diff --git a/app/lib/common-libs/constants.ts b/app/lib/common-libs/constants.ts index 776d8de7aa261786988dfee736b31b68f82474b5..ff000fb76ee3d2b477b01bcddf5af2c3c8dee1bd 100755 --- a/app/lib/common-libs/constants.ts +++ b/app/lib/common-libs/constants.ts @@ -302,6 +302,7 @@ export const CommonConstants = { BLOCK_MAX_TX_CHAINING_DEPTH: 5, CONST_BLOCKS_CHUNK: 250, + CHUNK_PREFIX: 'chunk_', BLOCKS_IN_MEMORY_MAX: 288 * 60, // 60 days of blocks MAX_AGE_OF_PEER_IN_BLOCKS: 200, // blocks diff --git a/app/lib/dal/fileDAL.ts b/app/lib/dal/fileDAL.ts index 5e3246d773cd552744358e8a7ee3e67e37ed2bb7..de828427156bd9d8ae6824d618778fcf5b3d13bd 100644 --- a/app/lib/dal/fileDAL.ts +++ b/app/lib/dal/fileDAL.ts @@ -103,6 +103,7 @@ export interface IndexBatch { export class FileDAL { rootPath:string + fs: FileSystem loki:LokiJsDriver sqliteDriver:SQLiteDriver wotb:WoTBInstance @@ -142,6 +143,7 @@ export class FileDAL { this.loki = params.dbf2() this.wotb = params.wotb this.profile = 'DAL' + this.fs = params.fs // DALs this.powDAL = new PowDAL(this.rootPath, params.fs) @@ -290,7 +292,7 @@ export class FileDAL { } async getBlockWeHaveItForSure(number:number): Promise<DBBlock> { - return (await this.blockDAL.getBlock(number)) as DBBlock + return (await this.blockDAL.getBlock(number)) as DBBlock || (await this.blockchainArchiveDAL.getBlockByNumber(number)) } // Duniter-UI dependency diff --git a/app/modules/crawler/index.ts b/app/modules/crawler/index.ts index e6078ae4dd0e567415131a265cdb2624650a50ed..1e4fc8e740b28ce3b7a201b92f1d8c88267a050e 100644 --- a/app/modules/crawler/index.ts +++ b/app/modules/crawler/index.ts @@ -23,6 +23,9 @@ import {Buid} from "../../lib/common-libs/buid" import {BlockDTO} from "../../lib/dto/BlockDTO" import {Directory} from "../../lib/system/directory" import {FileDAL} from "../../lib/dal/fileDAL" +import {RemoteSynchronizer} from "./lib/sync/RemoteSynchronizer" +import {AbstractSynchronizer} from "./lib/sync/AbstractSynchronizer" +import {LocalPathSynchronizer} from "./lib/sync/LocalPathSynchronizer" export const CrawlerDependency = { duniter: { @@ -46,7 +49,8 @@ export const CrawlerDependency = { }, synchronize: (server:Server, onHost:string, onPort:number, upTo:number, chunkLength:number) => { - const remote = new Synchroniser(server, onHost, onPort); + const strategy = new RemoteSynchronizer(onHost, onPort, server) + const remote = new Synchroniser(server, strategy) const syncPromise = remote.sync(upTo, chunkLength) return { flow: remote, @@ -54,9 +58,15 @@ export const CrawlerDependency = { }; }, + /** + * Used by duniter-ui + * @param {Server} server + * @param {string} onHost + * @param {number} onPort + * @returns {Promise<any>} + */ testForSync: (server:Server, onHost:string, onPort:number) => { - const remote = new Synchroniser(server, onHost, onPort); - return remote.test(); + return RemoteSynchronizer.test(onHost, onPort) } }, @@ -74,18 +84,16 @@ export const CrawlerDependency = { ], cli: [{ - name: 'sync [host] [port] [to]', + name: 'sync [source] [to]', desc: 'Synchronize blockchain from a remote Duniter node', preventIfRunning: true, onDatabaseExecute: async (server:Server, conf:ConfDTO, program:any, params:any) => { - const host = params[0]; - const port = params[1]; - const to = params[2]; - if (!host) { - throw 'Host is required.'; - } - if (!port) { - throw 'Port is required.'; + const source = params[0] + const to = params[1] + const HOST_PATTERN = /^[^:/]+(:[0-9]{1,5})?$/ + const FILE_PATTERN = /^(\/.+)$/ + if (!source || !(source.match(HOST_PATTERN) || source.match(FILE_PATTERN))) { + throw 'Source of sync is required. (either a host:port or a file path)' } let cautious; if (program.nocautious) { @@ -94,8 +102,6 @@ export const CrawlerDependency = { if (program.cautious) { cautious = true; } - const onHost = host; - const onPort = port; const upTo = parseInt(to); const chunkLength = 0; const interactive = !program.nointeractive; @@ -111,11 +117,20 @@ export const CrawlerDependency = { otherDAL = new FileDAL(params) } - const remote = new Synchroniser(server, onHost, onPort, interactive === true, otherDAL); + let strategy: AbstractSynchronizer + if (source.match(HOST_PATTERN)) { + const sp = source.split(':') + const onHost = sp[0] + const onPort = parseInt(sp[1] ? sp[1] : '443') // Defaults to 443 + strategy = new RemoteSynchronizer(onHost, onPort, server, noShufflePeers === true, otherDAL) + } else { + strategy = new LocalPathSynchronizer(source, server) + } if (program.onlypeers === true) { - return remote.syncPeers(true, onHost, onPort) + return strategy.syncPeers(true) } else { - return remote.sync(upTo, chunkLength, askedCautious, noShufflePeers === true) + const remote = new Synchroniser(server, strategy, interactive === true) + return remote.sync(upTo, chunkLength, askedCautious) } } }, { diff --git a/app/modules/crawler/lib/sync.ts b/app/modules/crawler/lib/sync.ts index 949b5e1f6086e9262ed760786b1eba0c368dcb40..99598fa62d8719768e30e4f328a1ea5c1d396494 100644 --- a/app/modules/crawler/lib/sync.ts +++ b/app/modules/crawler/lib/sync.ts @@ -13,28 +13,22 @@ import * as stream from "stream" import * as moment from "moment" -import {CrawlerConstants} from "./constants" import {Server} from "../../../../server" import {PeerDTO} from "../../../lib/dto/PeerDTO" import {FileDAL} from "../../../lib/dal/fileDAL" import {BlockDTO} from "../../../lib/dto/BlockDTO" -import {connect} from "./connect" -import {Contacter} from "./contacter" -import {pullSandboxToLocalServer} from "./sandbox" import {tx_cleaner} from "./tx_cleaner" import {AbstractDAO} from "./pulling" import {DBBlock} from "../../../lib/db/DBBlock" import {BlockchainService} from "../../../service/BlockchainService" -import {dos2unix} from "../../../lib/common-libs/dos2unix" import {ConfDTO} from "../../../lib/dto/ConfDTO" import {PeeringService} from "../../../service/PeeringService" import {CommonConstants} from "../../../lib/common-libs/constants" import {Underscore} from "../../../lib/common-libs/underscore" -import {HttpMerkleOfPeers} from "../../bma/lib/dtos" -import {DBPeer, JSONDBPeer} from "../../../lib/db/DBPeer" import {cliprogram} from "../../../lib/common-libs/programOptions" import {EventWatcher, LoggerWatcher, MultimeterWatcher, Watcher} from "./sync/Watcher" import {ChunkGetter} from "./sync/ChunkGetter" +import {AbstractSynchronizer} from "./sync/AbstractSynchronizer" const EVAL_REMAINING_INTERVAL = 1000; @@ -43,14 +37,11 @@ export class Synchroniser extends stream.Duplex { private watcher:EventWatcher private speed = 0 private blocksApplied = 0 - private contacterOptions:any constructor( private server:Server, - private host:string, - private port:number, - interactive = false, - private otherDAL?:FileDAL) { + private syncStrategy: AbstractSynchronizer, + interactive = false) { super({ objectMode: true }) @@ -62,13 +53,11 @@ export class Synchroniser extends stream.Duplex { this.watcher.onEvent('sbxChange', (pct: number) => this.push({ sandbox: pct })) this.watcher.onEvent('peersChange', (pct: number) => this.push({ peersSync: pct })) + this.syncStrategy.setWatcher(this.watcher) + if (interactive) { this.logger.mute(); } - - this.contacterOptions = { - timeout: CrawlerConstants.SYNC_LONG_TIMEOUT - } } get conf(): ConfDTO { @@ -108,22 +97,10 @@ export class Synchroniser extends stream.Duplex { } } - async test() { - const peering = await Contacter.fetchPeer(this.host, this.port, this.contacterOptions); - const node = await connect(PeerDTO.fromJSONObject(peering)); - return node.getCurrent(); - } - - async sync(to:number, chunkLen:number, askedCautious = false, noShufflePeers = false) { + async sync(to:number, chunkLen:number, askedCautious = false) { try { - - const peering = await Contacter.fetchPeer(this.host, this.port, this.contacterOptions); - - let peer = PeerDTO.fromJSONObject(peering); - this.logger.info("Try with %s %s", peer.getURL(), peer.pubkey.substr(0, 6)); - let node:any = await connect(peer); - node.pubkey = peer.pubkey; + await this.syncStrategy.init() this.logger.info('Sync started.'); const fullSync = !to; @@ -132,57 +109,23 @@ export class Synchroniser extends stream.Duplex { // Blockchain headers //============ this.logger.info('Getting remote blockchain info...'); - this.watcher.writeStatus('Connecting to ' + this.host + '...'); const lCurrent:DBBlock|null = await this.dal.getCurrentBlockOrNull(); const localNumber = lCurrent ? lCurrent.number : -1; - let rCurrent:BlockDTO + let rCurrent:BlockDTO|null if (isNaN(to)) { - rCurrent = await node.getCurrent(); - } else { - rCurrent = await node.getBlock(to); - } - to = rCurrent.number || 0 - - //======= - // Peers (just for P2P download) - //======= - let peers:(JSONDBPeer|null)[] = []; - if (!cliprogram.nopeers && (to - localNumber > 1000)) { // P2P download if more than 1000 blocs - this.watcher.writeStatus('Peers...'); - const merkle = await this.dal.merkleForPeers(); - const getPeers:(params:any) => Promise<HttpMerkleOfPeers> = node.getPeers.bind(node); - const json2 = await getPeers({}); - const rm = new NodesMerkle(json2); - if(rm.root() != merkle.root()){ - const leavesToAdd:string[] = []; - const json = await getPeers({ leaves: true }); - json.leaves.forEach((leaf:string) => { - if(merkle.leaves().indexOf(leaf) == -1){ - leavesToAdd.push(leaf); - } - }); - peers = await Promise.all(leavesToAdd.map(async (leaf) => { - try { - const json3 = await getPeers({ "leaf": leaf }); - const jsonEntry = json3.leaf.value; - const endpoint = jsonEntry.endpoints[0]; - this.watcher.writeStatus('Peer ' + endpoint); - return jsonEntry; - } catch (e) { - this.logger.warn("Could not get peer of leaf %s, continue...", leaf); - return null; - } - })) + rCurrent = await this.syncStrategy.getCurrent(); + if (!rCurrent) { + throw 'Remote does not have a current block. Sync aborted.' } - else { - this.watcher.writeStatus('Peers already known'); + } else { + rCurrent = await this.syncStrategy.getBlock(to) + if (!rCurrent) { + throw 'Remote does not have a target block. Sync aborted.' } } + to = rCurrent.number || 0 - if (!peers.length) { - peers.push(DBPeer.fromPeerDTO(peer)) - } - peers = peers.filter((p) => p); + await this.syncStrategy.initWithKnownLocalAndToAndCurrency(to, localNumber, rCurrent.currency) //============ // Blockchain @@ -191,21 +134,20 @@ export class Synchroniser extends stream.Duplex { // We use cautious mode if it is asked, or not particulary asked but blockchain has been started const cautious = (askedCautious === true || localNumber >= 0); - const shuffledPeers = (noShufflePeers ? peers : Underscore.shuffle(peers)).filter(p => !!(p)) as JSONDBPeer[] const downloader = new ChunkGetter( - rCurrent.currency, localNumber, to, rCurrent.hash, - shuffledPeers, + this.syncStrategy, this.dal, !cautious, - this.watcher, - this.otherDAL) + this.watcher) downloader.start() let lastPullBlock:BlockDTO|null = null; + let syncStrategy = this.syncStrategy + let node = this.syncStrategy.getPeer() let dao = new (class extends AbstractDAO { @@ -261,14 +203,17 @@ export class Synchroniser extends stream.Duplex { async getRemoteBlock(thePeer: PeerDTO, number: number): Promise<BlockDTO> { let block = null; try { - block = await node.getBlock(number); + block = await syncStrategy.getBlock(number) + if (!block) { + throw 'Could not get remote block' + } tx_cleaner(block.transactions); } catch (e) { if (e.httpCode != 404) { throw e; } } - return block; + return block as BlockDTO } async applyMainBranch(block: BlockDTO): Promise<boolean> { const addedBlock = await this.BlockchainService.submitBlock(block, true) @@ -317,15 +262,14 @@ export class Synchroniser extends stream.Duplex { //======= // Sandboxes //======= - this.watcher.writeStatus('Synchronizing the sandboxes...'); - await pullSandboxToLocalServer(this.conf.currency, node, this.server, this.server.logger, this.watcher, 1, false) + await this.syncStrategy.syncSandbox() } if (!cliprogram.nopeers) { //======= // Peers //======= - await this.syncPeers(fullSync, this.host, this.port, to) + await this.syncStrategy.syncPeers(fullSync, to) } // Trim the loki data @@ -341,120 +285,4 @@ export class Synchroniser extends stream.Duplex { throw err; } } - - async syncPeers(fullSync:boolean, host:string, port:number, to?:number) { - if (!cliprogram.nopeers && fullSync) { - - const peering = await Contacter.fetchPeer(host, port, this.contacterOptions); - - let peer = PeerDTO.fromJSONObject(peering); - this.logger.info("Try with %s %s", peer.getURL(), peer.pubkey.substr(0, 6)); - let node:any = await connect(peer); - node.pubkey = peer.pubkey; - this.logger.info('Sync started.'); - - this.watcher.writeStatus('Peers...'); - await this.syncPeer(node); - const merkle = await this.dal.merkleForPeers(); - const getPeers:(params:any) => Promise<HttpMerkleOfPeers> = node.getPeers.bind(node); - const json2 = await getPeers({}); - const rm = new NodesMerkle(json2); - if(rm.root() != merkle.root()){ - const leavesToAdd:string[] = []; - const json = await getPeers({ leaves: true }); - json.leaves.forEach((leaf:string) => { - if(merkle.leaves().indexOf(leaf) == -1){ - leavesToAdd.push(leaf); - } - }); - for (let i = 0; i < leavesToAdd.length; i++) { - try { - const leaf = leavesToAdd[i] - const json3 = await getPeers({ "leaf": leaf }); - const jsonEntry = json3.leaf.value; - const sign = json3.leaf.value.signature; - const entry:any = {}; - entry.version = jsonEntry.version - entry.currency = jsonEntry.currency - entry.pubkey = jsonEntry.pubkey - entry.endpoints = jsonEntry.endpoints - entry.block = jsonEntry.block - entry.signature = sign; - this.watcher.writeStatus('Peer ' + entry.pubkey); - this.watcher.peersPercent((i + 1) / leavesToAdd.length * 100) - await this.PeeringService.submitP(entry, false, to === undefined); - } catch (e) { - this.logger.warn(e && e.message || e) - } - } - this.watcher.peersPercent(100) - } - else { - this.watcher.writeStatus('Peers already known'); - } - } - } - - //============ - // Peer - //============ - private async syncPeer (node:any) { - - // Global sync vars - const remotePeer = PeerDTO.fromJSONObject({}); - let remoteJsonPeer:any = {}; - const json = await node.getPeer(); - remotePeer.version = json.version - remotePeer.currency = json.currency - remotePeer.pubkey = json.pub - remotePeer.endpoints = json.endpoints - remotePeer.blockstamp = json.block - remotePeer.signature = json.signature - const entry = remotePeer.getRawUnsigned(); - const signature = dos2unix(remotePeer.signature); - // Parameters - if(!(entry && signature)){ - throw 'Requires a peering entry + signature'; - } - - remoteJsonPeer = json; - remoteJsonPeer.pubkey = json.pubkey; - let signatureOK = this.PeeringService.checkPeerSignature(remoteJsonPeer); - if (!signatureOK) { - this.watcher.writeStatus('Wrong signature for peer #' + remoteJsonPeer.pubkey); - } - try { - await this.PeeringService.submitP(remoteJsonPeer); - } catch (err) { - if (err.indexOf !== undefined && err.indexOf(CrawlerConstants.ERRORS.NEWER_PEER_DOCUMENT_AVAILABLE.uerr.message) !== -1 && err != CrawlerConstants.ERROR.PEER.UNKNOWN_REFERENCE_BLOCK) { - throw err; - } - } - } -} - -class NodesMerkle { - - private depth:number - private nodesCount:number - private leavesCount:number - private merkleRoot:string - - constructor(json:any) { - this.depth = json.depth - this.nodesCount = json.nodesCount - this.leavesCount = json.leavesCount - this.merkleRoot = json.root; - } - - // var i = 0; - // this.levels = []; - // while(json && json.levels[i]){ - // this.levels.push(json.levels[i]); - // i++; - // } - - root() { - return this.merkleRoot - } } diff --git a/app/modules/crawler/lib/sync/AbstractSynchronizer.ts b/app/modules/crawler/lib/sync/AbstractSynchronizer.ts new file mode 100644 index 0000000000000000000000000000000000000000..4b9215f1307b4f317cc5526a2ade3f2b05a5e04c --- /dev/null +++ b/app/modules/crawler/lib/sync/AbstractSynchronizer.ts @@ -0,0 +1,48 @@ +// Source file from duniter: Crypto-currency software to manage libre currency such as Äž1 +// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com> +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +import {BlockDTO} from "../../../../lib/dto/BlockDTO" +import {ISyncDownloader} from "./ISyncDownloader" +import {CommonConstants} from "../../../../lib/common-libs/constants" +import {PeerDTO} from "../../../../lib/dto/PeerDTO" +import {Watcher} from "./Watcher" +import {FileDAL} from "../../../../lib/dal/fileDAL" +import * as path from 'path' + +export abstract class AbstractSynchronizer { + + constructor() { + } + + abstract init(): Promise<void> + abstract initWithKnownLocalAndToAndCurrency(to: number, localNumber: number, currency: string): Promise<void> + abstract getCurrent(): Promise<BlockDTO|null> + abstract getBlock(number: number): Promise<BlockDTO|null> + abstract p2pDownloader(): ISyncDownloader + abstract fsDownloader(): ISyncDownloader + abstract syncPeers(fullSync:boolean, to?:number): Promise<void> + abstract syncSandbox(): Promise<void> + abstract getPeer(): PeerDTO + abstract setWatcher(watcher: Watcher): void + public abstract getCurrency(): string + public abstract getChunksPath(): string + public abstract get readDAL(): FileDAL + + public getChunkRelativePath(i: number) { + return path.join(this.getCurrency(), this.getChunkName(i)) + } + + public getChunkName(i: number) { + return CommonConstants.CHUNK_PREFIX + i + "-" + CommonConstants.CONST_BLOCKS_CHUNK + ".json" + } +} diff --git a/app/modules/crawler/lib/sync/ChunkGetter.ts b/app/modules/crawler/lib/sync/ChunkGetter.ts index 91dca6501b1d4a73baa16a681301a1a76671cee2..2c0f146b2c4860ccba3e9afcc9c2e585046dd1b8 100644 --- a/app/modules/crawler/lib/sync/ChunkGetter.ts +++ b/app/modules/crawler/lib/sync/ChunkGetter.ts @@ -10,10 +10,8 @@ import {DBBlock} from "../../../../lib/db/DBBlock" import {FileDAL} from "../../../../lib/dal/fileDAL" import {Watcher} from "./Watcher" import {cliprogram} from "../../../../lib/common-libs/programOptions" -import {P2PSyncDownloader} from "./P2PSyncDownloader" -import {JSONDBPeer} from "../../../../lib/db/DBPeer" -import {FsSyncDownloader} from "./FsSyncDownloader" import {Querable, querablep} from "../../../../lib/common-libs/querable" +import {AbstractSynchronizer} from "./AbstractSynchronizer" const logger = NewLogger() @@ -54,26 +52,22 @@ export class ChunkGetter { private parallelDownloads = cliprogram.slow ? 1 : 5 private maxDownloadAdvance = 10 // 10 chunks can be downloaded even if 10th chunk above is not completed private MAX_DOWNLOAD_TIMEOUT = 15000 - private readDAL: FileDAL private writeDAL: FileDAL constructor( - private currency:string, private localNumber:number, private to:number, private toHash:string, - private peers:JSONDBPeer[], + private syncStrategy: AbstractSynchronizer, dal:FileDAL, private nocautious:boolean, private watcher:Watcher, - otherDAL?:FileDAL, ) { - this.readDAL = otherDAL || dal this.writeDAL = dal const nbBlocksToDownload = Math.max(0, to - localNumber) this.numberOfChunksToDownload = Math.ceil(nbBlocksToDownload / CommonConstants.CONST_BLOCKS_CHUNK) - this.p2PDownloader = new P2PSyncDownloader(localNumber, to, peers, this.watcher, logger) - this.fsDownloader = new FsSyncDownloader(localNumber, to, this.readDAL, this.getChunkName.bind(this), this.getChunksDir.bind(this)) + this.p2PDownloader = syncStrategy.p2pDownloader() + this.fsDownloader = syncStrategy.fsDownloader() this.resultsDeferers = Array.from({ length: this.numberOfChunksToDownload }).map(() => ({ resolve: () => { throw Error('resolve should not be called here') }, @@ -154,7 +148,7 @@ export class ChunkGetter { ;(handler as any).state = 'WAITING' } if (isTopChunk || this.downloadHandlers[i + 1].state === 'COMPLETED') { - const fileName = this.getChunkName(i) + const fileName = this.syncStrategy.getChunkRelativePath(i) let promiseOfUpperChunk: PromiseOfBlocksReading = async () => [] if (!isTopChunk && chunk.length) { // We need to wait for upper chunk to be completed to be able to check blocks' correct chaining @@ -174,8 +168,9 @@ export class ChunkGetter { } else if (handler.downloader !== this.fsDownloader) { // Store the file to avoid re-downloading if (this.localNumber <= 0 && chunk.length === CommonConstants.CONST_BLOCKS_CHUNK) { - await this.writeDAL.confDAL.coreFS.makeTree(this.currency); - await this.writeDAL.confDAL.coreFS.writeJSON(fileName, { blocks: chunk.map((b:any) => DBBlock.fromBlockDTO(b)) }); + await this.writeDAL.confDAL.coreFS.makeTree(this.syncStrategy.getCurrency()) + const content = { blocks: chunk.map((b:any) => DBBlock.fromBlockDTO(b)) } + await this.writeDAL.confDAL.coreFS.writeJSON(fileName, content) } } else { logger.warn("Chunk #%s read from filesystem.", i) @@ -209,7 +204,12 @@ export class ChunkGetter { if (isTopChunk) { return await handler.chunk // don't return directly "chunk" as it would prevent the GC to collect it } - return (await this.readDAL.confDAL.coreFS.readJSON(fileName)).blocks + let content: { blocks: BlockDTO[] } = await this.syncStrategy.readDAL.confDAL.coreFS.readJSON(fileName) + if (!content) { + // Reading from classical DAL doesn't work, maybe we are using --readfilesystem option. + content = await this.writeDAL.confDAL.coreFS.readJSON(fileName) + } + return content.blocks }) } } else { @@ -235,14 +235,6 @@ export class ChunkGetter { async getChunk(i: number): Promise<PromiseOfBlocksReading> { return this.resultsData[i] || Promise.resolve(async (): Promise<BlockDTO[]> => []) } - - private getChunkName(i: number) { - return this.getChunksDir() + "chunk_" + i + "-" + CommonConstants.CONST_BLOCKS_CHUNK + ".json" - } - - private getChunksDir() { - return this.currency + "/" - } } export async function chainsCorrectly(blocks:BlockDTO[], readNextChunk: PromiseOfBlocksReading, topNumber: number, topHash: string) { diff --git a/app/modules/crawler/lib/sync/FsSyncDownloader.ts b/app/modules/crawler/lib/sync/FsSyncDownloader.ts index b03c26809b462b49274fe469d1e28c7bf5de4251..7078c6f88784be22295f3c31fd6d389736c06dfa 100644 --- a/app/modules/crawler/lib/sync/FsSyncDownloader.ts +++ b/app/modules/crawler/lib/sync/FsSyncDownloader.ts @@ -1,6 +1,6 @@ import {ISyncDownloader} from "./ISyncDownloader" import {BlockDTO} from "../../../../lib/dto/BlockDTO" -import {FileDAL} from "../../../../lib/dal/fileDAL" +import {FileSystem} from "../../../../lib/system/directory" import * as path from 'path' export class FsSyncDownloader implements ISyncDownloader { @@ -8,26 +8,26 @@ export class FsSyncDownloader implements ISyncDownloader { private ls: Promise<string[]> constructor( - private localNumber:number, - private to:number, - private dal:FileDAL, + private fs: FileSystem, + private basePath: string, private getChunkName:(i: number) => string, - private getChunksDir:() => string, ) { - this.ls = this.dal.confDAL.coreFS.list(getChunksDir()) + this.ls = this.fs.fsList(basePath) } async getChunk(i: number): Promise<BlockDTO[]> { const files = await this.ls - const fileName = this.getChunkName(i) - const basename = path.basename(fileName) + const filepath = path.join(this.basePath, this.getChunkName(i)) + const basename = path.basename(filepath) let existsOnDAL = files.filter(f => f === basename).length === 1 if (!existsOnDAL) { - existsOnDAL = !!(await this.dal.confDAL.coreFS.exists(fileName)) + // We make another try in case the file was created after the initial `ls` test + existsOnDAL = await this.fs.fsExists(filepath) } - if (this.localNumber <= 0 && existsOnDAL) { + if (existsOnDAL) { + const content: any = JSON.parse(await this.fs.fsReadFile(filepath)) // Returns a promise of file content - return (await this.dal.confDAL.coreFS.readJSON(fileName)).blocks + return content.blocks } return [] } diff --git a/app/modules/crawler/lib/sync/LocalPathSynchronizer.ts b/app/modules/crawler/lib/sync/LocalPathSynchronizer.ts new file mode 100644 index 0000000000000000000000000000000000000000..db76e498e80935e3c76edc0f5c235feb2413f67d --- /dev/null +++ b/app/modules/crawler/lib/sync/LocalPathSynchronizer.ts @@ -0,0 +1,114 @@ +// Source file from duniter: Crypto-currency software to manage libre currency such as Äž1 +// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com> +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +import {ISyncDownloader} from "./ISyncDownloader" +import {BlockDTO} from "../../../../lib/dto/BlockDTO" +import {PeerDTO} from "../../../../lib/dto/PeerDTO" +import {Watcher} from "./Watcher" +import {PeeringService} from "../../../../service/PeeringService" +import {Server} from "../../../../../server" +import {FileDAL} from "../../../../lib/dal/fileDAL" +import {FsSyncDownloader} from "./FsSyncDownloader" +import {AbstractSynchronizer} from "./AbstractSynchronizer" +import {CommonConstants} from "../../../../lib/common-libs/constants" +import {RealFS} from "../../../../lib/system/directory" + +export class LocalPathSynchronizer extends AbstractSynchronizer { + + private theP2pDownloader: ISyncDownloader + private theFsDownloader: ISyncDownloader + private currency: string + private watcher: Watcher + private ls: Promise<string[]> + + constructor( + private path: string, + private server:Server, + ) { + super() + const fs = RealFS() + this.ls = fs.fsList(path) + // We read from the real file system here, directly. + this.theFsDownloader = new FsSyncDownloader(fs, this.path, this.getChunkName.bind(this)) + this.theP2pDownloader = new FsSyncDownloader(fs, this.path, this.getChunkName.bind(this)) + } + + get dal(): FileDAL { + return this.server.dal + } + + get readDAL(): FileDAL { + return this.dal + } + + get PeeringService(): PeeringService { + return this.server.PeeringService + } + + getCurrency(): string { + return this.currency + } + + getPeer(): PeerDTO { + return this as any + } + + getChunksPath(): string { + return this.path + } + + setWatcher(watcher: Watcher): void { + this.watcher = watcher + } + + async init(): Promise<void> { + // TODO: check that path exists and that files seem consistent + } + + async initWithKnownLocalAndToAndCurrency(to: number, localNumber: number, currency: string): Promise<void> { + this.currency = currency + } + + p2pDownloader(): ISyncDownloader { + return this.theP2pDownloader + } + + fsDownloader(): ISyncDownloader { + return this.theFsDownloader + } + + async getCurrent(): Promise<BlockDTO|null> { + const chunkNumbers: number[] = (await this.ls).map(s => parseInt(s.replace(CommonConstants.CHUNK_PREFIX, ''))) + const topChunk = chunkNumbers.reduce((number, max) => Math.max(number, max), -1) + if (topChunk === -1) { + return null + } + const chunk = await this.theFsDownloader.getChunk(topChunk) + return chunk[chunk.length - 1] // This is the top block of the top chunk = the current block + } + + async getBlock(number: number): Promise<BlockDTO|null> { + const chunkNumber = parseInt(String(number / CommonConstants.CONST_BLOCKS_CHUNK)) + const position = number % CommonConstants.CONST_BLOCKS_CHUNK + const chunk = await this.theFsDownloader.getChunk(chunkNumber) + return chunk[position] + } + + async syncPeers(fullSync: boolean, to?: number): Promise<void> { + // Does nothing on LocalPathSynchronizer + } + + async syncSandbox(): Promise<void> { + // Does nothing on LocalPathSynchronizer + } +} diff --git a/app/modules/crawler/lib/sync/RemoteSynchronizer.ts b/app/modules/crawler/lib/sync/RemoteSynchronizer.ts new file mode 100644 index 0000000000000000000000000000000000000000..8113ec42d4c6a0fcd9ca2ad123e9f0c22e7aaa68 --- /dev/null +++ b/app/modules/crawler/lib/sync/RemoteSynchronizer.ts @@ -0,0 +1,294 @@ +// Source file from duniter: Crypto-currency software to manage libre currency such as Äž1 +// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com> +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +import {ISyncDownloader} from "./ISyncDownloader" +import {BlockDTO} from "../../../../lib/dto/BlockDTO" +import {PeerDTO} from "../../../../lib/dto/PeerDTO" +import {Contacter} from "../contacter" +import {connect} from "../connect" +import {NewLogger} from "../../../../lib/logger" +import {CrawlerConstants} from "../constants" +import {HttpMerkleOfPeers} from "../../../bma/lib/dtos" +import {cliprogram} from "../../../../lib/common-libs/programOptions" +import {Watcher} from "./Watcher" +import {dos2unix} from "../../../../lib/common-libs/dos2unix" +import {PeeringService} from "../../../../service/PeeringService" +import {Server} from "../../../../../server" +import {DBPeer, JSONDBPeer} from "../../../../lib/db/DBPeer" +import {Underscore} from "../../../../lib/common-libs/underscore" +import {FileDAL} from "../../../../lib/dal/fileDAL" +import {P2PSyncDownloader} from "./P2PSyncDownloader" +import {FsSyncDownloader} from "./FsSyncDownloader" +import {AbstractSynchronizer} from "./AbstractSynchronizer" +import {pullSandboxToLocalServer} from "../sandbox" +import * as path from 'path' + +const logger = NewLogger() + +export class RemoteSynchronizer extends AbstractSynchronizer { + + private node:Contacter + private peer:PeerDTO + private shuffledPeers: JSONDBPeer[] + private theP2pDownloader: ISyncDownloader + private theFsDownloader: ISyncDownloader + private to: number + private localNumber: number + private currency: string + private watcher: Watcher + private static contacterOptions = { + timeout: CrawlerConstants.SYNC_LONG_TIMEOUT + } + + constructor( + private host: string, + private port: number, + private server:Server, + private noShufflePeers = false, + private otherDAL?:FileDAL, + ) { + super() + } + + get dal(): FileDAL { + return this.server.dal + } + + get readDAL(): FileDAL { + return this.otherDAL || this.dal + } + + get PeeringService(): PeeringService { + return this.server.PeeringService + } + + getCurrency(): string { + return this.currency + } + + getPeer(): PeerDTO { + return this.node as any + } + + setWatcher(watcher: Watcher): void { + this.watcher = watcher + } + + getChunksPath(): string { + return this.getCurrency() + } + + async init(): Promise<void> { + const peering = await Contacter.fetchPeer(this.host, this.port, RemoteSynchronizer.contacterOptions) + this.peer = PeerDTO.fromJSONObject(peering) + logger.info("Try with %s %s", this.peer.getURL(), this.peer.pubkey.substr(0, 6)) + this.node = await connect(this.peer) + ;(this.node as any).pubkey = this.peer.pubkey + this.watcher.writeStatus('Connecting to ' + this.host + '...') + } + + async initWithKnownLocalAndToAndCurrency(to: number, localNumber: number, currency: string): Promise<void> { + this.to = to + this.localNumber = localNumber + this.currency = currency + //======= + // Peers (just for P2P download) + //======= + let peers:(JSONDBPeer|null)[] = []; + if (!cliprogram.nopeers && (to - localNumber > 1000)) { // P2P download if more than 1000 blocs + this.watcher.writeStatus('Peers...'); + const merkle = await this.dal.merkleForPeers(); + const getPeers:(params:any) => Promise<HttpMerkleOfPeers> = this.node.getPeers.bind(this.node); + const json2 = await getPeers({}); + const rm = new NodesMerkle(json2); + if(rm.root() != merkle.root()){ + const leavesToAdd:string[] = []; + const json = await getPeers({ leaves: true }); + json.leaves.forEach((leaf:string) => { + if(merkle.leaves().indexOf(leaf) == -1){ + leavesToAdd.push(leaf); + } + }); + peers = await Promise.all(leavesToAdd.map(async (leaf) => { + try { + const json3 = await getPeers({ "leaf": leaf }); + const jsonEntry = json3.leaf.value; + const endpoint = jsonEntry.endpoints[0]; + this.watcher.writeStatus('Peer ' + endpoint); + return jsonEntry; + } catch (e) { + logger.warn("Could not get peer of leaf %s, continue...", leaf); + return null; + } + })) + } + else { + this.watcher.writeStatus('Peers already known'); + } + } + + if (!peers.length) { + peers.push(DBPeer.fromPeerDTO(this.peer)) + } + peers = peers.filter((p) => p); + this.shuffledPeers = (this.noShufflePeers ? peers : Underscore.shuffle(peers)).filter(p => !!(p)) as JSONDBPeer[] + } + + p2pDownloader(): ISyncDownloader { + if (!this.theP2pDownloader) { + this.theP2pDownloader = new P2PSyncDownloader(this.localNumber, this.to, this.shuffledPeers, this.watcher, logger) + } + return this.theP2pDownloader + } + + fsDownloader(): ISyncDownloader { + if (!this.theFsDownloader) { + this.theFsDownloader = new FsSyncDownloader(this.readDAL.fs, path.join(this.readDAL.rootPath, this.getChunksPath()), this.getChunkName.bind(this)) + } + return this.theFsDownloader + } + + getCurrent(): Promise<BlockDTO|null> { + return this.node.getCurrent() + } + + getBlock(number: number): Promise<BlockDTO|null> { + return this.node.getBlock(number) + } + + static async test(host: string, port: number): Promise<BlockDTO> { + const peering = await Contacter.fetchPeer(host, port, this.contacterOptions); + const node = await connect(PeerDTO.fromJSONObject(peering)); + return node.getCurrent() + } + + async syncPeers(fullSync: boolean, to?: number): Promise<void> { + if (!cliprogram.nopeers && fullSync) { + + const peering = await Contacter.fetchPeer(this.host, this.port, RemoteSynchronizer.contacterOptions); + + let peer = PeerDTO.fromJSONObject(peering); + logger.info("Try with %s %s", peer.getURL(), peer.pubkey.substr(0, 6)); + let node:any = await connect(peer); + node.pubkey = peer.pubkey; + logger.info('Sync started.'); + + this.watcher.writeStatus('Peers...'); + await this.syncPeer(node); + const merkle = await this.dal.merkleForPeers(); + const getPeers:(params:any) => Promise<HttpMerkleOfPeers> = node.getPeers.bind(node); + const json2 = await getPeers({}); + const rm = new NodesMerkle(json2); + if(rm.root() != merkle.root()){ + const leavesToAdd:string[] = []; + const json = await getPeers({ leaves: true }); + json.leaves.forEach((leaf:string) => { + if(merkle.leaves().indexOf(leaf) == -1){ + leavesToAdd.push(leaf); + } + }); + for (let i = 0; i < leavesToAdd.length; i++) { + try { + const leaf = leavesToAdd[i] + const json3 = await getPeers({ "leaf": leaf }); + const jsonEntry = json3.leaf.value; + const sign = json3.leaf.value.signature; + const entry:any = {}; + entry.version = jsonEntry.version + entry.currency = jsonEntry.currency + entry.pubkey = jsonEntry.pubkey + entry.endpoints = jsonEntry.endpoints + entry.block = jsonEntry.block + entry.signature = sign; + this.watcher.writeStatus('Peer ' + entry.pubkey); + this.watcher.peersPercent((i + 1) / leavesToAdd.length * 100) + await this.PeeringService.submitP(entry, false, to === undefined); + } catch (e) { + logger.warn(e && e.message || e) + } + } + this.watcher.peersPercent(100) + } + else { + this.watcher.writeStatus('Peers already known'); + } + } + } + + //============ + // Peer + //============ + private async syncPeer (node:any) { + + // Global sync vars + const remotePeer = PeerDTO.fromJSONObject({}); + const json = await node.getPeer(); + remotePeer.version = json.version + remotePeer.currency = json.currency + remotePeer.pubkey = json.pub + remotePeer.endpoints = json.endpoints + remotePeer.blockstamp = json.block + remotePeer.signature = json.signature + const entry = remotePeer.getRawUnsigned(); + const signature = dos2unix(remotePeer.signature); + // Parameters + if(!(entry && signature)){ + throw 'Requires a peering entry + signature'; + } + + let remoteJsonPeer:any = json + remoteJsonPeer.pubkey = json.pubkey; + let signatureOK = this.PeeringService.checkPeerSignature(remoteJsonPeer); + if (!signatureOK) { + this.watcher.writeStatus('Wrong signature for peer #' + remoteJsonPeer.pubkey); + } + try { + await this.PeeringService.submitP(remoteJsonPeer); + } catch (err) { + if (err.indexOf !== undefined && err.indexOf(CrawlerConstants.ERRORS.NEWER_PEER_DOCUMENT_AVAILABLE.uerr.message) !== -1 && err != CrawlerConstants.ERROR.PEER.UNKNOWN_REFERENCE_BLOCK) { + throw err; + } + } + } + + async syncSandbox(): Promise<void> { + this.watcher.writeStatus('Synchronizing the sandboxes...'); + await pullSandboxToLocalServer(this.currency, this.node, this.server, this.server.logger, this.watcher, 1, false) + } +} + +class NodesMerkle { + + private depth:number + private nodesCount:number + private leavesCount:number + private merkleRoot:string + + constructor(json:any) { + this.depth = json.depth + this.nodesCount = json.nodesCount + this.leavesCount = json.leavesCount + this.merkleRoot = json.root; + } + + // var i = 0; + // this.levels = []; + // while(json && json.levels[i]){ + // this.levels.push(json.levels[i]); + // i++; + // } + + root() { + return this.merkleRoot + } +}