diff --git a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts index a7c759444569b4125a4a49257adb35ab464742ff..b5b59a7f61e53cf4c26d12ebff0abd3937c46065 100644 --- a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts +++ b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts @@ -68,10 +68,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade nodesAvailable = nodesAvailable.concat(readyNodes) i++ } - return nodesAvailable.slice(0, needed).map(n => { - n.reserve() - return n - }) + return nodesAvailable } /** @@ -81,12 +78,17 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade */ private async getP2Pcandidates(chunkIndex: number): Promise<P2pCandidate[]> { return this.fifoPromise.pushFIFOPromise('getP2Pcandidates_' + getNanosecondsTime(), async () => { + const needed = 1 // We wait a bit to have some available nodes const readyNodes = await this.waitForAvailableNodesAndReserve() // We remove the nodes impossible to reach (timeout) let byAvgAnswerTime = Underscore.sortBy(readyNodes, p => p.avgResponseTime()) const parallelMax = Math.min(this.PARALLEL_PER_CHUNK, byAvgAnswerTime.length) byAvgAnswerTime = byAvgAnswerTime.slice(0, parallelMax) + this.watcher.reserveNodes(byAvgAnswerTime) + byAvgAnswerTime.slice(0, needed).forEach(n => { + n.reserve() + }) if (byAvgAnswerTime.length === 0) { this.logger.warn('No node found to download chunk #%s.', chunkIndex) this.watcher.unableToDownloadChunk(chunkIndex) diff --git a/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts b/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts index 5819db553d2d9d44eea920c5a172df0c3d40a59c..16c59e199d13669bb9a4e1519ebe80dc352178d4 100644 --- a/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts +++ b/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts @@ -43,6 +43,7 @@ export class P2pCandidate { return Promise.race([ // Wait for availablity (async () => !this.isExcluded + && !this.reserved && (this.apiPromise.isRejected() ? await newResolveTimeoutPromise(maxWait, false) : !!(await this.apiPromise)) && (this.dlPromise.isRejected() ? await newResolveTimeoutPromise(maxWait, false) : !!(await this.dlPromise)))(), // Maximum wait trigger @@ -55,6 +56,9 @@ export class P2pCandidate { } avgResponseTime() { + if (!this.responseTimes.length) { + return 0 + } return this.responseTimes.reduce((sum, rt) => sum + rt, 0) / this.responseTimes.length }