diff --git a/app/lib/computation/QuickSync.ts b/app/lib/computation/QuickSync.ts index b7f4cd5f6ea6828eb55476008ebacb7a1b020cf5..c94868a2ff2ed37ae4dfdb89896a732e3d7d4ac8 100644 --- a/app/lib/computation/QuickSync.ts +++ b/app/lib/computation/QuickSync.ts @@ -20,6 +20,7 @@ import {FileDAL} from "../dal/fileDAL" import {DBBlock} from "../db/DBBlock" import {DBTx} from "../db/DBTx" import {Underscore} from "../common-libs/underscore" +import {CommonConstants} from "../common-libs/constants" const constants = require('../constants') @@ -110,6 +111,9 @@ export class QuickSynchronizer { return block })) + // We only keep approx 2 months of blocks in memory, so memory consumption keeps approximately constant during the sync + await this.dal.blockDAL.trimBlocks(blocks[blocks.length - 1].number - CommonConstants.BLOCKS_IN_MEMORY_MAX) + for (const block of blocks) { // VERY FIRST: parameters, otherwise we compute wrong variables such as UDTime diff --git a/app/modules/crawler/lib/sync.ts b/app/modules/crawler/lib/sync.ts index ffc453149b864c68ba9efd0f1fde546b0c4c709f..d971ac59675cac887e5c09ccd8e56c04c4448228 100644 --- a/app/modules/crawler/lib/sync.ts +++ b/app/modules/crawler/lib/sync.ts @@ -35,6 +35,7 @@ import {Underscore} from "../../../lib/common-libs/underscore" import {HttpMerkleOfPeers} from "../../bma/lib/dtos" import {DBPeer, JSONDBPeer} from "../../../lib/db/DBPeer" import {cliprogram} from "../../../lib/common-libs/programOptions" +import {Querable} from "../../prover/lib/permanentProver" const multimeter = require('multimeter'); const makeQuerablePromise = require('querablep'); @@ -295,12 +296,12 @@ export class Synchroniser extends stream.Duplex { let idty = await this.dal.getWrittenIdtyByPubkeyForIsMember(thePeer.pubkey); return (idty && idty.member) || false; } - downloadBlocks(thePeer: PeerDTO, fromNumber: number, count?: number | undefined): Promise<BlockDTO[]> { + async downloadBlocks(thePeer: PeerDTO, fromNumber: number, count?: number | undefined): Promise<BlockDTO[]> { // Note: we don't care about the particular peer asked by the method. We use the network instead. const numberOffseted = fromNumber - (localNumber + 1); const targetChunk = Math.floor(numberOffseted / CONST_BLOCKS_CHUNK); // Return the download promise! Simple. - return downloader.getChunk(targetChunk); + return (await downloader.getChunk(targetChunk))() } })(this.server, this.watcher, this.dal, this.BlockchainService) @@ -390,7 +391,7 @@ export class Synchroniser extends stream.Duplex { this.watcher.writeStatus('Peer ' + entry.pubkey); await this.PeeringService.submitP(entry, false, to === undefined); } catch (e) { - this.logger.warn(e); + this.logger.warn(e && e.message || e) } } } @@ -652,10 +653,14 @@ class LoggerWatcher implements Watcher { } +export interface PromiseOfBlocksReading { + (): Promise<BlockDTO[]> +} + class P2PDownloader { private PARALLEL_PER_CHUNK = 1; - private MAX_DELAY_PER_DOWNLOAD = 15000; + private MAX_DELAY_PER_DOWNLOAD = 5000; private WAIT_DELAY_WHEN_MAX_DOWNLOAD_IS_REACHED = 3000; private NO_NODES_AVAILABLE = "No node available for download"; private TOO_LONG_TIME_DOWNLOAD:string @@ -663,18 +668,18 @@ class P2PDownloader { private numberOfChunksToDownload:number private downloadSlots:number private writtenChunks = 0 - private chunks:any + private chunks: (PromiseOfBlocksReading|null)[] private processing:any private handler:any private resultsDeferers:any - private resultsData:Promise<BlockDTO[]>[] + private resultsData:Promise<PromiseOfBlocksReading>[] private nodes:any = {} private nbDownloadsTried = 0 private nbDownloading = 0 private lastAvgDelay:number private aSlotWasAdded = false private slots:number[] = []; - private downloads:any = {}; + private downloads: { [k:number]: Querable<PromiseOfBlocksReading> } = {}; private startResolver:any private downloadStarter:Promise<any> @@ -757,7 +762,8 @@ class P2PDownloader { if (this.downloads[realIndex].isResolved()) { // IIFE to be safe about `realIndex` (async () => { - const blocks = await this.downloads[realIndex]; + const promiseOfBlocks = await this.downloads[realIndex] + const blocks = await promiseOfBlocks() if (realIndex < this.chunks.length - 1) { // We must wait for NEXT blocks to be STRONGLY validated before going any further, otherwise we // could be on the wrong chain @@ -767,9 +773,14 @@ class P2PDownloader { if (chainsWell) { // Chunk is COMPLETE this.logger.warn("Chunk #%s is COMPLETE from %s", realIndex, [this.handler[realIndex].host, this.handler[realIndex].port].join(':')); - this.chunks[realIndex] = blocks; + this.chunks[realIndex] = promiseOfBlocks // We pre-save blocks only for non-cautious sync if (this.nocautious) { + await this.dal.blockchainArchiveDAL.archive(blocks.map((b:any) => { + const block = DBBlock.fromBlockDTO(b) + block.fork = false + return block + })) this.writtenChunks++ watcher.savedPercent(Math.round(this.writtenChunks / this.numberOfChunksToDownload * 100)); } @@ -795,7 +806,7 @@ class P2PDownloader { } } // Wait a bit - await new Promise((resolve, reject) => setTimeout(resolve, 10)); + await new Promise((resolve, reject) => setTimeout(resolve, 1)); } } catch (e) { this.logger.error('Fatal error in the downloader:'); @@ -939,7 +950,7 @@ class P2PDownloader { * Function for downloading a chunk by its number. * @param index Number of the chunk. */ - private async downloadChunk(index:number): Promise<BlockDTO[]> { + private async downloadChunk(index:number): Promise<() => Promise<BlockDTO[]>> { // The algorithm to download a chunk const from = this.localNumber + 1 + index * CONST_BLOCKS_CHUNK; let count = CONST_BLOCKS_CHUNK; @@ -960,15 +971,26 @@ class P2PDownloader { if (!existsOnDAL) { theDAL = this.otherDAL as FileDAL } - return (await theDAL.confDAL.coreFS.readJSON(fileName)).blocks; + // Returns a promise of file content + return async () => { + return (await theDAL.confDAL.coreFS.readJSON(fileName)).blocks + } } else { - const chunk:any = await this.p2pDownload(from, count, index); + const chunk:BlockDTO[] = await this.p2pDownload(from, count, index) as BlockDTO[] // Store the file to avoid re-downloading if (this.localNumber <= 0 && chunk.length === CONST_BLOCKS_CHUNK) { await this.dal.confDAL.coreFS.makeTree(this.currency); await this.dal.confDAL.coreFS.writeJSON(fileName, { blocks: chunk.map((b:any) => DBBlock.fromBlockDTO(b)) }); + // Returns a promise of file content + return async () => { + const json = await this.dal.confDAL.coreFS.readJSON(fileName) + return json.blocks + } + } + // Returns a promise of file content + return async () => { + return chunk } - return chunk; } } catch (e) { this.logger.error(e); @@ -1038,7 +1060,7 @@ class P2PDownloader { // Chaining between downloads const previousChunk = await this.getChunk(index + 1); const blockN = blocks[blocks.length - 1]; // The block n - const blockNp1 = previousChunk[0]; // The block n + 1 + const blockNp1 = (await previousChunk())[0] // The block n + 1 if (blockN && blockNp1 && (blockN.number + 1 !== blockNp1.number || blockN.hash != blockNp1.previousHash)) { this.logger.error('Chunk is not referenced by the upper one'); return false; @@ -1062,7 +1084,7 @@ class P2PDownloader { * Promises a chunk to be downloaded and returned * @param index The number of the chunk to download & return */ - getChunk(index:number) { - return this.resultsData[index] || Promise.resolve([]) + getChunk(index:number): Promise<PromiseOfBlocksReading> { + return this.resultsData[index] || Promise.resolve(async () => [] as BlockDTO[]) } }