diff --git a/app/modules/crawler/lib/sync.ts b/app/modules/crawler/lib/sync.ts index 9d0fa1f51886db1b43604130557938df1476e2db..358bd1547f3965ba34e286c393541709531f3944 100644 --- a/app/modules/crawler/lib/sync.ts +++ b/app/modules/crawler/lib/sync.ts @@ -48,6 +48,15 @@ export class Synchroniser extends stream.Duplex { this.watcher.onEvent('appliedChange', () => this.push(this.watcher.getStats())) this.watcher.onEvent('sbxChange', () => this.push(this.watcher.getStats())) this.watcher.onEvent('peersChange', () => this.push(this.watcher.getStats())) + this.watcher.onEvent('addWrongChunkFailure', (data) => this.push({ p2pData: { name: 'addWrongChunkFailure', data }})) + this.watcher.onEvent('failToGetChunk', (data) => this.push({ p2pData: { name: 'failToGetChunk', data }})) + this.watcher.onEvent('gettingChunk', (data) => this.push({ p2pData: { name: 'gettingChunk', data }})) + this.watcher.onEvent('gotChunk', (data) => this.push({ p2pData: { name: 'gotChunk', data }})) + this.watcher.onEvent('reserveNodes', (data) => this.push({ p2pData: { name: 'reserveNodes', data }})) + this.watcher.onEvent('unableToDownloadChunk', (data) => this.push({ p2pData: { name: 'unableToDownloadChunk', data }})) + this.watcher.onEvent('wantToDownload', (data) => this.push({ p2pData: { name: 'wantToDownload', data }})) + this.watcher.onEvent('wantToLoad', (data) => this.push({ p2pData: { name: 'wantToLoad', data }})) + this.watcher.onEvent('beforeReadyNodes', (data) => this.push({ p2pData: { name: 'beforeReadyNodes', data }})) this.syncStrategy.setWatcher(this.watcher) diff --git a/app/modules/crawler/lib/sync/BMARemoteContacter.ts b/app/modules/crawler/lib/sync/BMARemoteContacter.ts index 14fdbdebabae43ad000007a55207a70f44c76a72..63ddb845d247205ede03bd9b77aea2dc7d709971 100644 --- a/app/modules/crawler/lib/sync/BMARemoteContacter.ts +++ b/app/modules/crawler/lib/sync/BMARemoteContacter.ts @@ -22,6 +22,8 @@ const logger = NewLogger() export class BMARemoteContacter implements IRemoteContacter { + public type: 'BMA'| 'WS2P' = 'BMA' + constructor(protected contacter: Contacter) { } diff --git a/app/modules/crawler/lib/sync/IRemoteContacter.ts b/app/modules/crawler/lib/sync/IRemoteContacter.ts index ef0c2e706fbfcb758e99499f5fd06c0e8c83c534..9a4ea10d238b666762692ad55355acfd044eb403 100644 --- a/app/modules/crawler/lib/sync/IRemoteContacter.ts +++ b/app/modules/crawler/lib/sync/IRemoteContacter.ts @@ -34,4 +34,6 @@ export interface IRemoteContacter { getRequirementsPending(number: number): Promise<HttpRequirements> hostName: string + + type: 'BMA' | 'WS2P' } diff --git a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts index 96db98a2df962daab97956d4fd103351c8498f86..a7c759444569b4125a4a49257adb35ab464742ff 100644 --- a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts +++ b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts @@ -57,6 +57,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade } private async waitForAvailableNodesAndReserve(needed = 1): Promise<P2pCandidate[]> { + this.watcher.beforeReadyNodes(this.p2pCandidates) let nodesToWaitFor = this.p2pCandidates.slice() let nodesAvailable: P2pCandidate[] = [] let i = 0 @@ -88,6 +89,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade byAvgAnswerTime = byAvgAnswerTime.slice(0, parallelMax) if (byAvgAnswerTime.length === 0) { this.logger.warn('No node found to download chunk #%s.', chunkIndex) + this.watcher.unableToDownloadChunk(chunkIndex) throw Error(DataErrors[DataErrors.NO_NODE_FOUND_TO_DOWNLOAD_CHUNK]) } return byAvgAnswerTime @@ -104,17 +106,21 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade // if this chunk has already been downloaded before, we exclude its supplier node from the download list as it won't give correct answer now const lastSupplier = this.downloads[chunkIndex] if (lastSupplier) { + this.watcher.addWrongChunkFailure(chunkIndex, lastSupplier) lastSupplier.addFailure() } + this.watcher.wantToDownload(chunkIndex) // Only 1 candidate for now const candidates = await this.getP2Pcandidates(chunkIndex) // Book the nodes + this.watcher.gettingChunk(chunkIndex, candidates) return await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node) => { try { this.handler[chunkIndex] = node; this.nbDownloading++; this.watcher.writeStatus('Getting chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName); let blocks = await node.downloadBlocks(count, from); + this.watcher.gotChunk(chunkIndex, node) this.watcher.writeStatus('GOT chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName); if (this.PARALLEL_PER_CHUNK === 1) { // Only works if we have 1 concurrent peer per chunk @@ -124,6 +130,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade this.nbDownloadsTried++; return blocks; } catch (e) { + this.watcher.failToGetChunk(chunkIndex, node) this.nbDownloading--; this.nbDownloadsTried++; throw e; diff --git a/app/modules/crawler/lib/sync/WS2PRemoteContacter.ts b/app/modules/crawler/lib/sync/WS2PRemoteContacter.ts index eeb7226cbb581273ec52104a121ed8ebd376eb79..3af758ce9500452a57f751f0f169fbcc0082d790 100644 --- a/app/modules/crawler/lib/sync/WS2PRemoteContacter.ts +++ b/app/modules/crawler/lib/sync/WS2PRemoteContacter.ts @@ -23,6 +23,8 @@ const logger = NewLogger() export class WS2PRemoteContacter implements IRemoteContacter { + public type: 'BMA'| 'WS2P' = 'WS2P' + getRequirementsPending(min: number): Promise<HttpRequirements> { return this.requester.getRequirementsPending(min) } diff --git a/app/modules/crawler/lib/sync/Watcher.ts b/app/modules/crawler/lib/sync/Watcher.ts index bbb5f113ea3b4679b1e2bfe57c86add88c62f7a1..9e0c945a1baaef4bceb99ae8cd4376cc4404dace 100644 --- a/app/modules/crawler/lib/sync/Watcher.ts +++ b/app/modules/crawler/lib/sync/Watcher.ts @@ -1,5 +1,6 @@ import * as events from "events" import {cliprogram} from "../../../../lib/common-libs/programOptions" +import {P2pCandidate} from "./p2p/p2p-candidate" const multimeter = require('multimeter') @@ -11,9 +12,36 @@ export interface Watcher { sbxPercent(pct?: number): number peersPercent(pct?: number): number end(): void + + reserveNodes(nodesAvailable: P2pCandidate[]): void + + unableToDownloadChunk(chunkIndex: number): void + + gettingChunk(chunkIndex: number, candidates: P2pCandidate[]): void + + gotChunk(chunkIndex: number, node: P2pCandidate): void + + failToGetChunk(chunkIndex: number, node: P2pCandidate): void + + wantToDownload(chunkIndex: number): void + + addWrongChunkFailure(chunkIndex: number, lastSupplier: P2pCandidate): void + + wantToLoad(chunkIndex: number): void + + beforeReadyNodes(p2pCandidates: P2pCandidate[]): void } export type EventName = 'downloadChange'|'storageChange'|'appliedChange'|'sbxChange'|'peersChange' + | 'addWrongChunkFailure' + | 'failToGetChunk' + | 'gettingChunk' + | 'gotChunk' + | 'reserveNodes' + | 'unableToDownloadChunk' + | 'wantToDownload' + | 'wantToLoad' + | 'beforeReadyNodes' export class EventWatcher extends events.EventEmitter implements Watcher { @@ -69,6 +97,44 @@ export class EventWatcher extends events.EventEmitter implements Watcher { peersSync: this.peersPercent(), } } + + /************* P2P DOWNLOAD EVENTS ****************/ + + addWrongChunkFailure(chunkIndex: number, lastSupplier: P2pCandidate): void { + this.emit('addWrongChunkFailure', { chunkIndex, node: lastSupplier }) + } + + failToGetChunk(chunkIndex: number, node: P2pCandidate): void { + this.emit('failToGetChunk', { chunkIndex, node }) + } + + gettingChunk(chunkIndex: number, candidates: P2pCandidate[]): void { + this.emit('gettingChunk', { chunkIndex, nodes: candidates }) + } + + gotChunk(chunkIndex: number, node: P2pCandidate): void { + this.emit('gotChunk', { chunkIndex, node }) + } + + reserveNodes(nodesAvailable: P2pCandidate[]): void { + this.emit('reserveNodes', { nodes: nodesAvailable }) + } + + unableToDownloadChunk(chunkIndex: number): void { + this.emit('unableToDownloadChunk', { chunkIndex }) + } + + wantToDownload(chunkIndex: number): void { + this.emit('wantToDownload', { chunkIndex }) + } + + wantToLoad(chunkIndex: number): void { + this.emit('wantToLoad', { chunkIndex }) + } + + beforeReadyNodes(p2pCandidates: P2pCandidate[]): void { + this.emit('beforeReadyNodes', { nodes: p2pCandidates }) + } } export class MultimeterWatcher implements Watcher { @@ -176,6 +242,36 @@ export class MultimeterWatcher implements Watcher { empty : { text : ' ' } }) } + + /************* NOT IMPLEMENTED ****************/ + + addWrongChunkFailure(chunkIndex: number, lastSupplier: P2pCandidate): void { + } + + failToGetChunk(chunkIndex: number, node: P2pCandidate): void { + } + + gettingChunk(chunkIndex: number, candidates: P2pCandidate[]): void { + } + + gotChunk(chunkIndex: number, node: P2pCandidate): void { + } + + reserveNodes(nodesAvailable: P2pCandidate[]): void { + } + + unableToDownloadChunk(chunkIndex: number): void { + } + + wantToDownload(chunkIndex: number): void { + } + + wantToLoad(chunkIndex: number): void { + } + + beforeReadyNodes(p2pCandidates: P2pCandidate[]): void { + } + } export class LoggerWatcher implements Watcher { @@ -239,4 +335,33 @@ export class LoggerWatcher implements Watcher { end() { } + /************* NOT IMPLEMENTED ****************/ + + addWrongChunkFailure(chunkIndex: number, lastSupplier: P2pCandidate): void { + } + + failToGetChunk(chunkIndex: number, node: P2pCandidate): void { + } + + gettingChunk(chunkIndex: number, candidates: P2pCandidate[]): void { + } + + gotChunk(chunkIndex: number, node: P2pCandidate): void { + } + + reserveNodes(nodesAvailable: P2pCandidate[]): void { + } + + unableToDownloadChunk(chunkIndex: number): void { + } + + wantToDownload(chunkIndex: number): void { + } + + wantToLoad(chunkIndex: number): void { + } + + beforeReadyNodes(p2pCandidates: P2pCandidate[]): void { + } + } diff --git a/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts b/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts index 7bdfdbe36974ff636865a70983d1d560469dbbf7..5819db553d2d9d44eea920c5a172df0c3d40a59c 100644 --- a/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts +++ b/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts @@ -18,7 +18,7 @@ export class P2pCandidate { private reserved = false constructor( - private p: PeerDTO, + public p: PeerDTO, private keypair: Keypair, private logger: any, private allowLocalSync: boolean, @@ -131,6 +131,10 @@ export class P2pCandidate { reserve() { this.reserved = true } + + get apiName() { + return this.api && this.api.type + } } interface RemoteAPI { diff --git a/app/modules/crawler/lib/sync/v2/DownloadStream.ts b/app/modules/crawler/lib/sync/v2/DownloadStream.ts index d903b598b4e015dc271930200ecb80bbb67d4149..cae155f5a826d0e004631fe76a97aaafe9046f9d 100644 --- a/app/modules/crawler/lib/sync/v2/DownloadStream.ts +++ b/app/modules/crawler/lib/sync/v2/DownloadStream.ts @@ -62,6 +62,7 @@ export class DownloadStream extends Duplex { let chunk: BlockDTO[] // We don't have the file locally: we loop on P2P download until we have it (or until P2P throws a general error) do { + this.watcher.wantToLoad(i) chunk = await downloader.getChunk(i) if (chunk.length) { // NewLogger().info("Chunk #%s is COMPLETE", i) diff --git a/app/modules/ws2p/lib/WS2PDocpoolPuller.ts b/app/modules/ws2p/lib/WS2PDocpoolPuller.ts index 4178a7a4946a4e52779a4db2ff7d5f1f74e4e268..08b6cfe354489157f3e3edbe8b2fdb099a4f76c2 100644 --- a/app/modules/ws2p/lib/WS2PDocpoolPuller.ts +++ b/app/modules/ws2p/lib/WS2PDocpoolPuller.ts @@ -27,6 +27,7 @@ export class WS2PDocpoolPuller { const requester = WS2PRequester.fromConnection(this.connection) // node.pubkey = p.pubkey; return pullSandboxToLocalServer(this.server.conf.currency, { + type: 'WS2P', getRequirementsPending: (minCert = 1) => { return requester.getRequirementsPending(minCert) },