Skip to content
Snippets Groups Projects
Commit f3a7d2d1 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] sync: support for `--slow` option

parent 03c300be
No related branches found
No related tags found
No related merge requests found
import {Querable} from "./querable"
const querablePromise = require('querablep');
export interface ManualPromise<T> extends Querable<T> {
resolve: (data: T) => void
reject: (error: Error) => void
}
/**
* Creates a new querable promise that can is manually triggered.
* @returns {ManualPromise<T>}
*/
export function newManualPromise<T>() {
let resolveCb: (data: T) => void = () => {}
let rejectCb: (error: Error) => void = () => {}
const p = new Promise((res, rej) => {
resolveCb = res
rejectCb = rej
})
const q: ManualPromise<T> = querablePromise(p)
q.resolve = resolveCb
q.reject = rejectCb
return q
}
......@@ -4,8 +4,11 @@ export interface Querable<T> extends Promise<T> {
isFulfilled(): boolean
isResolved(): boolean
isRejected(): boolean
startedOn: number
}
export function querablep<T>(p: Promise<T>): Querable<T> {
return querablePromise(p)
const qp = querablePromise(p)
qp.startedOn = Date.now()
return qp
}
......@@ -215,6 +215,8 @@ export class Synchroniser extends stream.Duplex {
this.watcher,
this.otherDAL)
downloader.start()
let lastPullBlock:BlockDTO|null = null;
let dao = new (class extends AbstractDAO {
......
......@@ -13,20 +13,47 @@ import {cliprogram} from "../../../../lib/common-libs/programOptions"
import {P2PSyncDownloader} from "./P2PSyncDownloader"
import {JSONDBPeer} from "../../../../lib/db/DBPeer"
import {FsSyncDownloader} from "./FsSyncDownloader"
import {Querable, querablep} from "../../../../lib/common-libs/querable"
const logger = NewLogger()
interface DownloadHandler {
downloader: ISyncDownloader
}
interface WaitingState extends DownloadHandler {
state: 'WAITING',
chunk?: Querable<BlockDTO[]>,
}
interface DownloadingState extends DownloadHandler {
state: 'DOWNLOADING',
chunk: Querable<BlockDTO[]>,
}
interface DownloadedState extends DownloadHandler {
state: 'DOWNLOADED',
chunk: Querable<BlockDTO[]>,
}
interface CompletedState extends DownloadHandler {
state: 'COMPLETED',
readBlocks: PromiseOfBlocksReading,
}
export class ChunkGetter {
private resultsDeferers:{ resolve: (data: PromiseOfBlocksReading) => void, reject: () => void }[]
private resultsData:Promise<PromiseOfBlocksReading>[]
private downloadStarter:Promise<void>
private startResolver:() => void
private downloadHandlers:(WaitingState|DownloadingState|DownloadedState|CompletedState)[]
private fsDownloader: ISyncDownloader
private p2PDownloader: ISyncDownloader
private downloadedChunks = 0
private writtenChunks = 0
private numberOfChunksToDownload:number
private parallelDownloads = cliprogram.slow ? 1 : 5
private maxDownloadAdvance = 10 // 10 chunks can be downloaded even if 10th chunk above is not completed
private MAX_DOWNLOAD_TIMEOUT = 15000
constructor(
private currency:string,
......@@ -51,79 +78,149 @@ export class ChunkGetter {
this.resultsData = Array.from({ length: this.numberOfChunksToDownload }).map((unused, index) => new Promise(async (resolve, reject) => {
this.resultsDeferers[index] = { resolve, reject }
}))
}
if (cliprogram.slow) {
// TODO: Handle slow option
/***
* Triggers the downloading, and parallelize it.
*/
start() {
// Initializes the downloads queue
this.downloadHandlers = []
for (let i = 0; i < this.numberOfChunksToDownload; i++) {
this.downloadHandlers.push({
state: 'WAITING',
downloader: this.fsDownloader,
})
}
/**
* Triggers for starting the download.
*/
this.downloadStarter = new Promise((resolve) => this.startResolver = resolve);
this.resultsDeferers.map(async (deferer, i) => {
let isTopChunk = i === this.resultsDeferers.length - 1
let promiseOfUpperChunk: PromiseOfBlocksReading = async () => []
if (!isTopChunk) {
// We need to wait for upper chunk to be completed to be able to check blocks' correct chaining
promiseOfUpperChunk = await this.resultsData[i + 1]
}
const fileName = this.getChunkName(i)
let chunk: BlockDTO[] = []
let chainsWell = false
let downloader: ISyncDownloader = isTopChunk ? this.p2PDownloader : this.fsDownloader // We first try on FS only for non-top chunks
do {
chunk = await downloader.getChunk(i)
chainsWell = await chainsCorrectly(chunk, promiseOfUpperChunk, this.to, this.toHash)
if (!chainsWell) {
if (downloader === this.p2PDownloader) {
if (chunk.length === 0) {
logger.error('No block was downloaded')
// Download loop
(async () => {
let downloadFinished = false
while(!downloadFinished) {
let usedSlots = 0
let remainingDownloads = 0
let firstNonCompleted = 0
// Scan loop:
for (let i = this.numberOfChunksToDownload - 1; i >= 0; i--) {
let isTopChunk = i === this.resultsDeferers.length - 1
const handler = this.downloadHandlers[i]
if (handler.state !== 'COMPLETED' && firstNonCompleted === 0) {
firstNonCompleted = i
}
if (handler.state === 'WAITING') {
// We reached a new ready slot.
// If there is no more available slot, just stop the scan loop:
if (usedSlots === this.parallelDownloads || i < firstNonCompleted - this.maxDownloadAdvance) {
remainingDownloads++
break;
}
logger.warn("Chunk #%s is DOES NOT CHAIN CORRECTLY. Retrying.", i)
// Otherwise let's start a download
if (isTopChunk) {
// The top chunk is always downloaded via P2P
handler.downloader = this.p2PDownloader
}
handler.chunk = querablep(handler.downloader.getChunk(i))
;(handler as any).state = 'DOWNLOADING'
remainingDownloads++
usedSlots++
}
downloader = this.p2PDownloader // If ever the first call does not chains well, we try using P2P
} else if (downloader !== this.fsDownloader) {
// Store the file to avoid re-downloading
if (this.localNumber <= 0 && chunk.length === CommonConstants.CONST_BLOCKS_CHUNK) {
await this.dal.confDAL.coreFS.makeTree(this.currency);
await this.dal.confDAL.coreFS.writeJSON(fileName, { blocks: chunk.map((b:any) => DBBlock.fromBlockDTO(b)) });
else if (handler.state === 'DOWNLOADING') {
if (handler.chunk.isResolved()) {
(handler as any).state = 'DOWNLOADED'
i++ // We loop back on this handler
} else if (Date.now() - handler.chunk.startedOn > this.MAX_DOWNLOAD_TIMEOUT) {
(handler as any).chunk = [];
(handler as any).state = 'DOWNLOADED'
i++ // We loop back on this handler
} else {
remainingDownloads++
usedSlots++
}
}
else if (handler.state === 'DOWNLOADED') {
// Chaining test: we must wait for upper chunk to be completed (= downloaded + chained)
const chunk = await handler.chunk
if (chunk.length === 0 && handler.downloader === this.fsDownloader) {
// Retry with P2P
handler.downloader = this.p2PDownloader
;(handler as any).state = 'WAITING'
}
if (isTopChunk || this.downloadHandlers[i + 1].state === 'COMPLETED') {
const fileName = this.getChunkName(i)
let promiseOfUpperChunk: PromiseOfBlocksReading = async () => []
if (!isTopChunk && chunk.length) {
// We need to wait for upper chunk to be completed to be able to check blocks' correct chaining
promiseOfUpperChunk = await this.resultsData[i + 1]
}
const chainsWell = await chainsCorrectly(chunk, promiseOfUpperChunk, this.to, this.toHash)
if (!chainsWell) {
if (handler.downloader === this.p2PDownloader) {
if (chunk.length === 0) {
logger.error('No block was downloaded')
}
logger.warn("Chunk #%s is DOES NOT CHAIN CORRECTLY. Retrying.", i)
}
handler.downloader = this.p2PDownloader // If ever the first call does not chains well, we try using P2P
;(handler as any).state = 'WAITING'
i++
} else if (handler.downloader !== this.fsDownloader) {
// Store the file to avoid re-downloading
if (this.localNumber <= 0 && chunk.length === CommonConstants.CONST_BLOCKS_CHUNK) {
await this.dal.confDAL.coreFS.makeTree(this.currency);
await this.dal.confDAL.coreFS.writeJSON(fileName, { blocks: chunk.map((b:any) => DBBlock.fromBlockDTO(b)) });
}
} else {
logger.warn("Chunk #%s read from filesystem.", i)
}
if (chainsWell) {
// Chunk is COMPLETE
logger.warn("Chunk #%s is COMPLETE", i)
;(handler as any).state = 'COMPLETED'
this.downloadedChunks++
this.watcher.downloadPercent(parseInt((this.downloadedChunks / this.numberOfChunksToDownload * 100).toFixed(0)))
// We pre-save blocks only for non-cautious sync
if (this.nocautious) {
await this.dal.blockchainArchiveDAL.archive(chunk.map(b => {
const block = DBBlock.fromBlockDTO(b)
block.fork = false
return block
}))
this.writtenChunks++
this.watcher.savedPercent(Math.round(this.writtenChunks / this.numberOfChunksToDownload * 100));
}
// Returns a promise of file content
this.resultsDeferers[i].resolve(async () => {
if (isTopChunk) {
return chunk
}
return (await this.dal.confDAL.coreFS.readJSON(fileName)).blocks
})
}
} else {
remainingDownloads++
}
}
} else {
logger.warn("Chunk #%s read from filesystem.", i)
}
}
while (!chainsWell)
// Chunk is COMPLETE
logger.warn("Chunk #%s is COMPLETE", i)
this.downloadedChunks++
watcher.downloadPercent(parseInt((this.downloadedChunks / this.numberOfChunksToDownload * 100).toFixed(0)))
// We pre-save blocks only for non-cautious sync
if (this.nocautious) {
await this.dal.blockchainArchiveDAL.archive(chunk.map(b => {
const block = DBBlock.fromBlockDTO(b)
block.fork = false
return block
}))
this.writtenChunks++
watcher.savedPercent(Math.round(this.writtenChunks / this.numberOfChunksToDownload * 100));
}
// Returns a promise of file content
deferer.resolve(async () => {
if (isTopChunk) {
return chunk
}
return (await this.dal.confDAL.coreFS.readJSON(fileName)).blocks
})
})
}
/***
* Triggers the downloading
*/
start() {
return this.startResolver()
downloadFinished = remainingDownloads === 0
// Wait for a download to be finished
if (!downloadFinished) {
const downloadsToWait = (this.downloadHandlers.filter(h => h.state === 'DOWNLOADING') as DownloadingState[])
.map(h => h.chunk)
if (downloadsToWait.length) {
await Promise.race(downloadsToWait)
}
}
}
})()
}
async getChunk(i: number): Promise<PromiseOfBlocksReading> {
......
......@@ -6,13 +6,14 @@ import {BlockDTO} from "../../../../lib/dto/BlockDTO"
import {Watcher} from "./Watcher"
import {CommonConstants} from "../../../../lib/common-libs/constants"
import {ISyncDownloader} from "./ISyncDownloader"
import {cliprogram} from "../../../../lib/common-libs/programOptions"
const makeQuerablePromise = require('querablep');
export class P2PSyncDownloader implements ISyncDownloader {
private PARALLEL_PER_CHUNK = 1;
private MAX_DELAY_PER_DOWNLOAD = 10000;
private MAX_DELAY_PER_DOWNLOAD = cliprogram.slow ? 15000 : 5000;
private WAIT_DELAY_WHEN_MAX_DOWNLOAD_IS_REACHED = 3000;
private NO_NODES_AVAILABLE = "No node available for download";
private TOO_LONG_TIME_DOWNLOAD:string
......@@ -85,7 +86,7 @@ export class P2PSyncDownloader implements ISyncDownloader {
throw this.NO_NODES_AVAILABLE;
}
// We remove the nodes impossible to reach (timeout)
let withGoodDelays = Underscore.filter(candidates, (c:any) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD && !c.excluded);
let withGoodDelays = Underscore.filter(candidates, (c:any) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD && !c.excluded && !c.downloading);
if (withGoodDelays.length === 0) {
await new Promise(res => setTimeout(res, this.WAIT_DELAY_WHEN_MAX_DOWNLOAD_IS_REACHED)) // We wait a bit before continuing the downloads
// We reinitialize the nodes
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment