From b12fdb30e0cf82b33d7e13569a0e46fbbccff6ba Mon Sep 17 00:00:00 2001 From: Benoit Lavenier <benoit.lavenier@e-is.pro> Date: Tue, 6 Jun 2023 18:37:14 +0200 Subject: [PATCH] fix(sync): don't lose nodes on single download err - cherry-pick of commit 7a59f46 from branch `dev` --- .../crawler/lib/sync/P2PSyncDownloader.ts | 20 ++++++---- .../crawler/lib/sync/p2p/p2p-candidate.ts | 40 ++++++++++++------- 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts index a677cdb4b..ad5bda842 100644 --- a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts +++ b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts @@ -136,7 +136,11 @@ export class P2PSyncDownloader extends ASyncDownloader * @param count The number of blocks to download. * @param chunkIndex The # of the chunk in local algorithm (logging purposes only) */ - private async p2pDownload(from: number, count: number, chunkIndex: number) { + private async p2pDownload( + from: number, + count: number, + chunkIndex: number + ): Promise<BlockDTO[]> { // if this chunk has already been downloaded before, we exclude its supplier node from the download list as it won't give correct answer now const lastSupplier = this.downloads[chunkIndex]; if (lastSupplier) { @@ -148,14 +152,14 @@ export class P2PSyncDownloader extends ASyncDownloader const candidates = await this.getP2Pcandidates(chunkIndex); // Book the nodes this.watcher.gettingChunk(chunkIndex, candidates); - return await this.raceOrCancelIfTimeout( + return await this.raceOrCancelIfTimeout<BlockDTO[]>( this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node) => { try { this.handler[chunkIndex] = node; this.nbDownloading++; this.watcher.writeStatus( - "Getting chunck #" + + "Getting chunk #" + chunkIndex + "/" + (this.numberOfChunksToDownload - 1) + @@ -166,10 +170,10 @@ export class P2PSyncDownloader extends ASyncDownloader " on peer " + node.hostName ); - let blocks = await node.downloadBlocks(count, from); + let blocks = (await node.downloadBlocks(count, from)) || []; this.watcher.gotChunk(chunkIndex, node); this.watcher.writeStatus( - "GOT chunck #" + + "GOT chunk #" + chunkIndex + "/" + (this.numberOfChunksToDownload - 1) + @@ -226,7 +230,7 @@ export class P2PSyncDownloader extends ASyncDownloader throw Error(DataErrors[DataErrors.NO_NODE_FOUND_TO_DOWNLOAD_CHUNK]); } } - await new Promise((res) => setTimeout(res, 1000)); // Wait 1s before retrying + await new Promise<void>((res) => setTimeout(res, 1000)); // Wait 1s before retrying return this.downloadChunk(index); } } @@ -237,11 +241,11 @@ export class P2PSyncDownloader extends ASyncDownloader * @param races * @returns {Promise} */ - private raceOrCancelIfTimeout(timeout: number, races: any[]) { + private raceOrCancelIfTimeout<T = any>(timeout: number, races: Promise<T>[]) { return Promise.race( [ // Process the race, but cancel it if we don't get an anwser quickly enough - new Promise((resolve, reject) => { + new Promise<T>((resolve, reject) => { setTimeout(() => { reject(this.TOO_LONG_TIME_DOWNLOAD); }, timeout); diff --git a/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts b/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts index 56fca9b04..9df30f665 100644 --- a/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts +++ b/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts @@ -83,32 +83,44 @@ export class P2pCandidate { return (this.api && this.api.hostName) || "NO_API"; } - async downloadBlocks(count: number, from: number) { + async downloadBlocks(count: number, from: number): Promise<BlockDTO[]> { const start = Date.now(); - let error: Error | undefined; this.reserved = false; - this.dlPromise = querablep( + const promise = querablep( (async () => { // We try to download the blocks - let blocks: BlockDTO[] | null; - try { - blocks = await (this.api as IRemoteContacter).getBlocks(count, from); - } catch (e) { - // Unfortunately this can fail - blocks = null; - error = e; + let blocks: BlockDTO[] = []; + let tries = 5; + while (tries > 0) { + try { + blocks = await (this.api as IRemoteContacter).getBlocks( + count, + from + ); + } catch (e) { + // Unfortunately this can fail + blocks = []; + this.logger.error(e); + } + if (blocks.length != count) { + this.logger.error("Wrong number of blocks from %s", this.hostName); + tries--; + } else { + break; + } + } + if (!blocks || blocks.length != count) { + throw new Error("Wrong number of blocks from " + this.hostName); } this.responseTimes.push(Date.now() - start); // Only keep a flow of 5 ttas for the node if (this.responseTimes.length > 5) this.responseTimes.shift(); this.nbSuccess++; - if (error) { - throw error; - } return blocks; })() ); - return this.dlPromise; + this.dlPromise = promise; + return promise; } private getRemoteAPIs() { -- GitLab