diff --git a/app/lib/common-libs/manual-promise.ts b/app/lib/common-libs/manual-promise.ts new file mode 100644 index 0000000000000000000000000000000000000000..ea7f4c436cd80bdc43c48da2aa3ec16f451f572e --- /dev/null +++ b/app/lib/common-libs/manual-promise.ts @@ -0,0 +1,25 @@ +import {Querable} from "./querable" + +const querablePromise = require('querablep'); + +export interface ManualPromise<T> extends Querable<T> { + resolve: (data: T) => void + reject: (error: Error) => void +} + +/** + * Creates a new querable promise that can is manually triggered. + * @returns {ManualPromise<T>} + */ +export function newManualPromise<T>() { + let resolveCb: (data: T) => void = () => {} + let rejectCb: (error: Error) => void = () => {} + const p = new Promise((res, rej) => { + resolveCb = res + rejectCb = rej + }) + const q: ManualPromise<T> = querablePromise(p) + q.resolve = resolveCb + q.reject = rejectCb + return q +} diff --git a/app/lib/common-libs/querable.ts b/app/lib/common-libs/querable.ts index d5f4bd121e5b9eaf4f78b0022bea72d8640f6aef..b5cba5da23170c599d46d7dadd138376b0d9f20f 100644 --- a/app/lib/common-libs/querable.ts +++ b/app/lib/common-libs/querable.ts @@ -4,8 +4,11 @@ export interface Querable<T> extends Promise<T> { isFulfilled(): boolean isResolved(): boolean isRejected(): boolean + startedOn: number } export function querablep<T>(p: Promise<T>): Querable<T> { - return querablePromise(p) + const qp = querablePromise(p) + qp.startedOn = Date.now() + return qp } diff --git a/app/modules/crawler/lib/sync.ts b/app/modules/crawler/lib/sync.ts index 6cf552b34659303d7cdda0e44548428e35388d11..67f2455be358172e4d02ae06c7f763f1afddb707 100644 --- a/app/modules/crawler/lib/sync.ts +++ b/app/modules/crawler/lib/sync.ts @@ -215,6 +215,8 @@ export class Synchroniser extends stream.Duplex { this.watcher, this.otherDAL) + downloader.start() + let lastPullBlock:BlockDTO|null = null; let dao = new (class extends AbstractDAO { diff --git a/app/modules/crawler/lib/sync/ChunkGetter.ts b/app/modules/crawler/lib/sync/ChunkGetter.ts index c1f4a9f16046ce9dba228e2b1ddf36777f642449..be4fff0b0799296a110282fe0358462be22cb4f9 100644 --- a/app/modules/crawler/lib/sync/ChunkGetter.ts +++ b/app/modules/crawler/lib/sync/ChunkGetter.ts @@ -13,20 +13,47 @@ import {cliprogram} from "../../../../lib/common-libs/programOptions" import {P2PSyncDownloader} from "./P2PSyncDownloader" import {JSONDBPeer} from "../../../../lib/db/DBPeer" import {FsSyncDownloader} from "./FsSyncDownloader" +import {Querable, querablep} from "../../../../lib/common-libs/querable" const logger = NewLogger() +interface DownloadHandler { + downloader: ISyncDownloader +} + +interface WaitingState extends DownloadHandler { + state: 'WAITING', + chunk?: Querable<BlockDTO[]>, +} + +interface DownloadingState extends DownloadHandler { + state: 'DOWNLOADING', + chunk: Querable<BlockDTO[]>, +} + +interface DownloadedState extends DownloadHandler { + state: 'DOWNLOADED', + chunk: Querable<BlockDTO[]>, +} + +interface CompletedState extends DownloadHandler { + state: 'COMPLETED', + readBlocks: PromiseOfBlocksReading, +} + export class ChunkGetter { private resultsDeferers:{ resolve: (data: PromiseOfBlocksReading) => void, reject: () => void }[] private resultsData:Promise<PromiseOfBlocksReading>[] - private downloadStarter:Promise<void> - private startResolver:() => void + private downloadHandlers:(WaitingState|DownloadingState|DownloadedState|CompletedState)[] private fsDownloader: ISyncDownloader private p2PDownloader: ISyncDownloader private downloadedChunks = 0 private writtenChunks = 0 private numberOfChunksToDownload:number + private parallelDownloads = cliprogram.slow ? 1 : 5 + private maxDownloadAdvance = 10 // 10 chunks can be downloaded even if 10th chunk above is not completed + private MAX_DOWNLOAD_TIMEOUT = 15000 constructor( private currency:string, @@ -51,79 +78,149 @@ export class ChunkGetter { this.resultsData = Array.from({ length: this.numberOfChunksToDownload }).map((unused, index) => new Promise(async (resolve, reject) => { this.resultsDeferers[index] = { resolve, reject } })) + } - if (cliprogram.slow) { - // TODO: Handle slow option + /*** + * Triggers the downloading, and parallelize it. + */ + start() { + + // Initializes the downloads queue + this.downloadHandlers = [] + for (let i = 0; i < this.numberOfChunksToDownload; i++) { + this.downloadHandlers.push({ + state: 'WAITING', + downloader: this.fsDownloader, + }) } - /** - * Triggers for starting the download. - */ - this.downloadStarter = new Promise((resolve) => this.startResolver = resolve); - - this.resultsDeferers.map(async (deferer, i) => { - let isTopChunk = i === this.resultsDeferers.length - 1 - let promiseOfUpperChunk: PromiseOfBlocksReading = async () => [] - if (!isTopChunk) { - // We need to wait for upper chunk to be completed to be able to check blocks' correct chaining - promiseOfUpperChunk = await this.resultsData[i + 1] - } - const fileName = this.getChunkName(i) - - let chunk: BlockDTO[] = [] - let chainsWell = false - let downloader: ISyncDownloader = isTopChunk ? this.p2PDownloader : this.fsDownloader // We first try on FS only for non-top chunks - do { - chunk = await downloader.getChunk(i) - chainsWell = await chainsCorrectly(chunk, promiseOfUpperChunk, this.to, this.toHash) - if (!chainsWell) { - if (downloader === this.p2PDownloader) { - if (chunk.length === 0) { - logger.error('No block was downloaded') + // Download loop + (async () => { + let downloadFinished = false + while(!downloadFinished) { + + let usedSlots = 0 + let remainingDownloads = 0 + let firstNonCompleted = 0 + + // Scan loop: + for (let i = this.numberOfChunksToDownload - 1; i >= 0; i--) { + + let isTopChunk = i === this.resultsDeferers.length - 1 + const handler = this.downloadHandlers[i] + if (handler.state !== 'COMPLETED' && firstNonCompleted === 0) { + firstNonCompleted = i + } + if (handler.state === 'WAITING') { + // We reached a new ready slot. + // If there is no more available slot, just stop the scan loop: + if (usedSlots === this.parallelDownloads || i < firstNonCompleted - this.maxDownloadAdvance) { + remainingDownloads++ + break; } - logger.warn("Chunk #%s is DOES NOT CHAIN CORRECTLY. Retrying.", i) + // Otherwise let's start a download + if (isTopChunk) { + // The top chunk is always downloaded via P2P + handler.downloader = this.p2PDownloader + } + handler.chunk = querablep(handler.downloader.getChunk(i)) + ;(handler as any).state = 'DOWNLOADING' + remainingDownloads++ + usedSlots++ } - downloader = this.p2PDownloader // If ever the first call does not chains well, we try using P2P - } else if (downloader !== this.fsDownloader) { - // Store the file to avoid re-downloading - if (this.localNumber <= 0 && chunk.length === CommonConstants.CONST_BLOCKS_CHUNK) { - await this.dal.confDAL.coreFS.makeTree(this.currency); - await this.dal.confDAL.coreFS.writeJSON(fileName, { blocks: chunk.map((b:any) => DBBlock.fromBlockDTO(b)) }); + else if (handler.state === 'DOWNLOADING') { + if (handler.chunk.isResolved()) { + (handler as any).state = 'DOWNLOADED' + i++ // We loop back on this handler + } else if (Date.now() - handler.chunk.startedOn > this.MAX_DOWNLOAD_TIMEOUT) { + (handler as any).chunk = []; + (handler as any).state = 'DOWNLOADED' + i++ // We loop back on this handler + } else { + remainingDownloads++ + usedSlots++ + } + } + else if (handler.state === 'DOWNLOADED') { + // Chaining test: we must wait for upper chunk to be completed (= downloaded + chained) + const chunk = await handler.chunk + if (chunk.length === 0 && handler.downloader === this.fsDownloader) { + // Retry with P2P + handler.downloader = this.p2PDownloader + ;(handler as any).state = 'WAITING' + } + if (isTopChunk || this.downloadHandlers[i + 1].state === 'COMPLETED') { + const fileName = this.getChunkName(i) + let promiseOfUpperChunk: PromiseOfBlocksReading = async () => [] + if (!isTopChunk && chunk.length) { + // We need to wait for upper chunk to be completed to be able to check blocks' correct chaining + promiseOfUpperChunk = await this.resultsData[i + 1] + } + const chainsWell = await chainsCorrectly(chunk, promiseOfUpperChunk, this.to, this.toHash) + if (!chainsWell) { + if (handler.downloader === this.p2PDownloader) { + if (chunk.length === 0) { + logger.error('No block was downloaded') + } + logger.warn("Chunk #%s is DOES NOT CHAIN CORRECTLY. Retrying.", i) + } + handler.downloader = this.p2PDownloader // If ever the first call does not chains well, we try using P2P + ;(handler as any).state = 'WAITING' + i++ + } else if (handler.downloader !== this.fsDownloader) { + // Store the file to avoid re-downloading + if (this.localNumber <= 0 && chunk.length === CommonConstants.CONST_BLOCKS_CHUNK) { + await this.dal.confDAL.coreFS.makeTree(this.currency); + await this.dal.confDAL.coreFS.writeJSON(fileName, { blocks: chunk.map((b:any) => DBBlock.fromBlockDTO(b)) }); + } + } else { + logger.warn("Chunk #%s read from filesystem.", i) + } + + if (chainsWell) { + + // Chunk is COMPLETE + logger.warn("Chunk #%s is COMPLETE", i) + ;(handler as any).state = 'COMPLETED' + this.downloadedChunks++ + this.watcher.downloadPercent(parseInt((this.downloadedChunks / this.numberOfChunksToDownload * 100).toFixed(0))) + // We pre-save blocks only for non-cautious sync + if (this.nocautious) { + await this.dal.blockchainArchiveDAL.archive(chunk.map(b => { + const block = DBBlock.fromBlockDTO(b) + block.fork = false + return block + })) + this.writtenChunks++ + this.watcher.savedPercent(Math.round(this.writtenChunks / this.numberOfChunksToDownload * 100)); + } + + // Returns a promise of file content + this.resultsDeferers[i].resolve(async () => { + if (isTopChunk) { + return chunk + } + return (await this.dal.confDAL.coreFS.readJSON(fileName)).blocks + }) + } + } else { + remainingDownloads++ + } } - } else { - logger.warn("Chunk #%s read from filesystem.", i) - } - } - while (!chainsWell) - // Chunk is COMPLETE - logger.warn("Chunk #%s is COMPLETE", i) - this.downloadedChunks++ - watcher.downloadPercent(parseInt((this.downloadedChunks / this.numberOfChunksToDownload * 100).toFixed(0))) - // We pre-save blocks only for non-cautious sync - if (this.nocautious) { - await this.dal.blockchainArchiveDAL.archive(chunk.map(b => { - const block = DBBlock.fromBlockDTO(b) - block.fork = false - return block - })) - this.writtenChunks++ - watcher.savedPercent(Math.round(this.writtenChunks / this.numberOfChunksToDownload * 100)); - } - // Returns a promise of file content - deferer.resolve(async () => { - if (isTopChunk) { - return chunk } - return (await this.dal.confDAL.coreFS.readJSON(fileName)).blocks - }) - }) - } - /*** - * Triggers the downloading - */ - start() { - return this.startResolver() + downloadFinished = remainingDownloads === 0 + + // Wait for a download to be finished + if (!downloadFinished) { + const downloadsToWait = (this.downloadHandlers.filter(h => h.state === 'DOWNLOADING') as DownloadingState[]) + .map(h => h.chunk) + if (downloadsToWait.length) { + await Promise.race(downloadsToWait) + } + } + } + })() } async getChunk(i: number): Promise<PromiseOfBlocksReading> { diff --git a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts index a7e08f41a1cbe62fc94c29409f783b9a08efa128..7e6641ad7c89b7692d4d92821ea346c879338d36 100644 --- a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts +++ b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts @@ -6,13 +6,14 @@ import {BlockDTO} from "../../../../lib/dto/BlockDTO" import {Watcher} from "./Watcher" import {CommonConstants} from "../../../../lib/common-libs/constants" import {ISyncDownloader} from "./ISyncDownloader" +import {cliprogram} from "../../../../lib/common-libs/programOptions" const makeQuerablePromise = require('querablep'); export class P2PSyncDownloader implements ISyncDownloader { private PARALLEL_PER_CHUNK = 1; - private MAX_DELAY_PER_DOWNLOAD = 10000; + private MAX_DELAY_PER_DOWNLOAD = cliprogram.slow ? 15000 : 5000; private WAIT_DELAY_WHEN_MAX_DOWNLOAD_IS_REACHED = 3000; private NO_NODES_AVAILABLE = "No node available for download"; private TOO_LONG_TIME_DOWNLOAD:string @@ -85,7 +86,7 @@ export class P2PSyncDownloader implements ISyncDownloader { throw this.NO_NODES_AVAILABLE; } // We remove the nodes impossible to reach (timeout) - let withGoodDelays = Underscore.filter(candidates, (c:any) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD && !c.excluded); + let withGoodDelays = Underscore.filter(candidates, (c:any) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD && !c.excluded && !c.downloading); if (withGoodDelays.length === 0) { await new Promise(res => setTimeout(res, this.WAIT_DELAY_WHEN_MAX_DOWNLOAD_IS_REACHED)) // We wait a bit before continuing the downloads // We reinitialize the nodes