diff --git a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts index c3ecf5175cdd234b6eb8fe9e155d3208ff64fcfd..d4bfe7e12dc07478a2e74e4b832f4f41a5ee7d4b 100644 --- a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts +++ b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts @@ -31,7 +31,7 @@ export class P2PSyncDownloader implements ISyncDownloader { private nodes: Querable<ProfiledNode>[] = [] private nbDownloadsTried = 0 private nbDownloading = 0 - private downloads: { [chunk: number]: any } = {} + private downloads: { [chunk: number]: ProfiledNode } = {} private fifoPromise = new GlobalFifoPromise() private nbWaitFailed = 0 @@ -85,6 +85,7 @@ export class P2PSyncDownloader implements ISyncDownloader { tta: 1, ttas: [], nbSuccess: 1, + excluded: false, readyForDownload: manualp, hostName: syncApi && syncApi.api.hostName || '', } @@ -130,7 +131,7 @@ export class P2PSyncDownloader implements ISyncDownloader { // We filter on all the available nodes, since serveral can be ready at the same time const readyNodes:ProfiledNode[] = await Promise.all(this.nodes.filter(p => p.isResolved())) // We remove the nodes impossible to reach (timeout) - let withGoodDelays = Underscore.filter(readyNodes, (c) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD) + let withGoodDelays = Underscore.filter(readyNodes, (c) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD && !c.excluded && c.readyForDownload.isResolved()) if (withGoodDelays.length === 0) { readyNodes.map(c => { if (c.tta >= this.MAX_DELAY_PER_DOWNLOAD) { @@ -142,7 +143,14 @@ export class P2PSyncDownloader implements ISyncDownloader { withGoodDelays = Underscore.sortBy(withGoodDelays, c => c.tta) withGoodDelays = withGoodDelays.slice(0, parallelMax) // We temporarily augment the tta to avoid asking several times to the same node in parallel - withGoodDelays.forEach(c => c.tta = this.MAX_DELAY_PER_DOWNLOAD) + withGoodDelays.forEach(c => { + c.tta = this.MAX_DELAY_PER_DOWNLOAD + c.readyForDownload = newManualPromise() + }) + if (withGoodDelays.length === 0) { + this.logger.warn('No node found to download this chunk.') + throw Error(DataErrors[DataErrors.NO_NODE_FOUND_TO_DOWNLOAD_CHUNK]) + } return withGoodDelays }) } @@ -158,19 +166,14 @@ export class P2PSyncDownloader implements ISyncDownloader { const lastSupplier = this.downloads[chunkIndex] if (lastSupplier) { lastSupplier.excluded = true - this.logger.warn('Excluding node %s as it returns unchainable chunks', [lastSupplier.host, lastSupplier.port].join(':')) + this.logger.warn('Excluding node %s as it returns unchainable chunks', lastSupplier.hostName) } let candidates = await this.getP2Pcandidates(); - if (candidates.length === 0) { - this.logger.warn('No node found to download this chunk.') - throw Error(DataErrors[DataErrors.NO_NODE_FOUND_TO_DOWNLOAD_CHUNK]) - } // Book the nodes return await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node:ProfiledNode) => { try { const start = Date.now(); this.handler[chunkIndex] = node; - node.readyForDownload = newManualPromise() this.nbDownloading++; this.watcher.writeStatus('Getting chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName); let blocks = await node.api.getBlocks(count, from); @@ -263,5 +266,6 @@ interface ProfiledNode { ttas: number[] nbSuccess: number hostName: string + excluded: boolean readyForDownload: ManualPromise<boolean> }