diff --git a/.eslintignore b/.eslintignore index a457fd77528e4296ad87ecc7e140a1d3d356f492..0601c0236938e190f3eac13cbc643431a4572284 100644 --- a/.eslintignore +++ b/.eslintignore @@ -36,5 +36,7 @@ app/modules/bma/*.js app/modules/bma/lib/*.js app/modules/bma/lib/entity/*.js app/modules/bma/lib/controllers/*.js +app/modules/crawler/*.js +app/modules/crawler/lib/*.js test/*.js test/**/*.js \ No newline at end of file diff --git a/app/lib/dal/sqliteDAL/MetaDAL.ts b/app/lib/dal/sqliteDAL/MetaDAL.ts index a98dc93a2baa767e12477538e07db6e91709dd89..908c8967494774ad3b5f81afa36f4b1ce842c3a0 100644 --- a/app/lib/dal/sqliteDAL/MetaDAL.ts +++ b/app/lib/dal/sqliteDAL/MetaDAL.ts @@ -276,12 +276,12 @@ export class MetaDAL extends AbstractSQLite<DBMeta> { } })) let amountNotDestroyed = 0; - await _.values(allUpdates).map(async (src:any) => { + await Promise.all(_.values(allUpdates).map(async (src:any) => { const exist = await sindexDAL.getSource(src.identifier, src.pos); if (exist && !exist.consumed) { amountNotDestroyed += src.amount; } - }) + })) } await sindexDAL.insertBatch(sourcesMovements); }, diff --git a/app/lib/db/DBBlock.ts b/app/lib/db/DBBlock.ts index 0a754d09b363c085de80f7aa328e16ae89e06b6c..40cba2e0df490117fca39a8df66cb3cc12a74abe 100644 --- a/app/lib/db/DBBlock.ts +++ b/app/lib/db/DBBlock.ts @@ -40,6 +40,10 @@ export class DBBlock { ) { } + toBlockDTO() { + return BlockDTO.fromJSONObject(this) + } + static fromBlockDTO(b:BlockDTO) { const dbb = new DBBlock() dbb.version = b.version diff --git a/app/lib/dto/ConfDTO.ts b/app/lib/dto/ConfDTO.ts index f1049fdbf001b6c5464df2eda85ad0a5390c98e0..7b2a9777da544448f03f84f485055e3d05548a39 100644 --- a/app/lib/dto/ConfDTO.ts +++ b/app/lib/dto/ConfDTO.ts @@ -6,6 +6,12 @@ export interface Keypair { sec: string } +export interface BranchingDTO { + swichOnTimeAheadBy:number + avgGenTime:number + forksize:number +} + export interface CurrencyConfDTO { c: number dt: number @@ -50,7 +56,7 @@ export interface NetworkConfDTO { httplogs:boolean } -export class ConfDTO implements CurrencyConfDTO, KeypairConfDTO, NetworkConfDTO { +export class ConfDTO implements CurrencyConfDTO, KeypairConfDTO, NetworkConfDTO, BranchingDTO { constructor( public loglevel: string, diff --git a/app/lib/indexer.ts b/app/lib/indexer.ts index e0d3d52dcb4f63607cc2a846b1ccb778d7b347e5..dad52458fc31392582a958b1d305cbd0e0fc661b 100644 --- a/app/lib/indexer.ts +++ b/app/lib/indexer.ts @@ -300,7 +300,7 @@ export class Indexer { expired_on: null, revokes_on: null, revocation: revocation.revocation, - chainable_on: null, + chainable_on: block.medianTime + conf.msPeriod, // Note: this is useless, because a revoked identity cannot join back. 
But we keep this property for data consistency revoked_on: [block.number, block.hash].join('-'), leaving: false }) diff --git a/app/modules/crawler/index.ts b/app/modules/crawler/index.ts new file mode 100644 index 0000000000000000000000000000000000000000..aa0a078fdabe1b60155a4cc47d407a3fa599313d --- /dev/null +++ b/app/modules/crawler/index.ts @@ -0,0 +1,328 @@ +import { ConfDTO } from "../../lib/dto/ConfDTO"; +import { Server } from "../../../server"; +import {Contacter} from "./lib/contacter" +import {Crawler} from "./lib/crawler" +import {Synchroniser} from "./lib/sync" +import {req2fwd} from "./lib/req2fwd" +import {CrawlerConstants} from "./lib/constants" + +const common = require('duniter-common'); +const Peer = common.document.Peer + +export const CrawlerDependency = { + duniter: { + + config: { + onLoading: async (conf:ConfDTO) => { + conf.swichOnTimeAheadBy = CrawlerConstants.SWITCH_ON_BRANCH_AHEAD_BY_X_MINUTES; + }, + beforeSave: async(conf:ConfDTO) => { + delete conf.swichOnTimeAheadBy + } + }, + + service: { + input: (server:Server, conf:ConfDTO, logger:any) => new Crawler(server, conf, logger) + }, + + methods: { + + contacter: (host:string, port:number, opts:any) => new Contacter(host, port, opts), + + pullBlocks: async (server:Server, pubkey:string) => { + const crawler = new Crawler(server, server.conf, server.logger); + return crawler.pullBlocks(server, pubkey); + }, + + pullSandbox: async (server:Server) => { + const crawler = new Crawler(server, server.conf, server.logger); + return crawler.sandboxPull(server) + }, + + synchronize: (server:Server, onHost:string, onPort:number, upTo:number, chunkLength:number) => { + const remote = new Synchroniser(server, onHost, onPort); + const syncPromise = remote.sync(upTo, chunkLength) + return { + flow: remote, + syncPromise: syncPromise + }; + }, + + testForSync: (server:Server, onHost:string, onPort:number) => { + const remote = new Synchroniser(server, onHost, onPort); + return remote.test(); + } + }, + + cliOptions: [ + { value: '--nointeractive', desc: 'Disable interactive sync UI.'}, + { value: '--nocautious', desc: 'Do not check blocks validity during sync.'}, + { value: '--cautious', desc: 'Check blocks validity during sync (overrides --nocautious option).'}, + { value: '--nopeers', desc: 'Do not retrieve peers during sync.'}, + { value: '--onlypeers', desc: 'Will only try to sync peers.'}, + { value: '--slow', desc: 'Download the blockchain slowly (for slow connections).'}, + { value: '--minsig <minsig>', desc: 'Minimum pending signatures count for `crawl-lookup`.
Default is 5.'} + ], + + cli: [{ + name: 'sync [host] [port] [to]', + desc: 'Synchronize blockchain from a remote Duniter node', + preventIfRunning: true, + onDatabaseExecute: async (server:Server, conf:ConfDTO, program:any, params:any) => { + const host = params[0]; + const port = params[1]; + const to = params[2]; + if (!host) { + throw 'Host is required.'; + } + if (!port) { + throw 'Port is required.'; + } + let cautious; + if (program.nocautious) { + cautious = false; + } + if (program.cautious) { + cautious = true; + } + const onHost = host; + const onPort = port; + const upTo = parseInt(to); + const chunkLength = 0; + const interactive = !program.nointeractive; + const askedCautious = cautious; + const nopeers = program.nopeers; + const noShufflePeers = program.noshuffle; + const remote = new Synchroniser(server, onHost, onPort, interactive === true, program.slow === true); + if (program.onlypeers === true) { + return remote.syncPeers(nopeers, true, onHost, onPort) + } else { + return remote.sync(upTo, chunkLength, askedCautious, nopeers, noShufflePeers === true) + } + } + }, { + name: 'peer [host] [port]', + desc: 'Exchange peerings with another node', + preventIfRunning: true, + onDatabaseExecute: async (server:Server, conf:ConfDTO, program:any, params:any) => { + const host = params[0]; + const port = params[1]; + const logger = server.logger; + try { + const ERASE_IF_ALREADY_RECORDED = true; + logger.info('Fetching peering record at %s:%s...', host, port); + let peering = await Contacter.fetchPeer(host, port); + logger.info('Applying peering...'); + await server.PeeringService.submitP(peering, ERASE_IF_ALREADY_RECORDED, !program.nocautious); + logger.info('Applied'); + let selfPeer = await server.dal.getPeer(server.PeeringService.pubkey); + if (!selfPeer) { + await server.PeeringService.generateSelfPeer(server.conf, 0) + selfPeer = await server.dal.getPeer(server.PeeringService.pubkey); + } + logger.info('Sending self peering...'); + const p = Peer.fromJSON(peering); + const contact = new Contacter(p.getHostPreferDNS(), p.getPort(), {}) + await contact.postPeer(Peer.fromJSON(selfPeer)); + logger.info('Sent.'); + await server.disconnect(); + } catch(e) { + logger.error(e.code || e.message || e); + throw Error("Exiting"); + } + } + }, { + name: 'import <fromHost> <fromPort> <search> <toHost> <toPort>', + desc: 'Import all pending data matching <search>', + onDatabaseExecute: async (server:Server, conf:ConfDTO, program:any, params:any) => { + const fromHost = params[0]; + const fromPort = params[1]; + const search = params[2]; + const toHost = params[3]; + const toPort = params[4]; + const logger = server.logger; + try { + const peers = fromHost && fromPort ?
[{ endpoints: [['BASIC_MERKLED_API', fromHost, fromPort].join(' ')] }] : await server.dal.peerDAL.query('SELECT * FROM peer WHERE status = ?', ['UP']) + // Memberships + for (const p of peers) { + const peer = Peer.fromJSON(p); + const fromHost = peer.getHostPreferDNS(); + const fromPort = peer.getPort(); + logger.info('Looking at %s:%s...', fromHost, fromPort); + try { + const node = new Contacter(fromHost, fromPort, { timeout: 10000 }); + const requirements = await node.getRequirements(search); + await req2fwd(requirements, toHost, toPort, logger) + } catch (e) { + logger.error(e); + } + } + await server.disconnect(); + } catch(e) { + logger.error(e); + throw Error("Exiting"); + } + } + }, { + name: 'import-lookup [search] [fromhost] [fromport] [tohost] [toport]', + desc: 'Import pending identities and certifications matching [search] from one node to another', + onDatabaseExecute: async (server:Server, conf:ConfDTO, program:any, params:any) => { + const search = params[0]; + const fromhost = params[1]; + const fromport = params[2]; + const tohost = params[3]; + const toport = params[4]; + const logger = server.logger; + try { + logger.info('Looking for "%s" at %s:%s...', search, fromhost, fromport); + const sourcePeer = new Contacter(fromhost, fromport); + const targetPeer = new Contacter(tohost, toport); + const lookup = await sourcePeer.getLookup(search); + for (const res of lookup.results) { + for (const uid of res.uids) { + const rawIdty = common.rawer.getOfficialIdentity({ + currency: 'g1', + issuer: res.pubkey, + uid: uid.uid, + buid: uid.meta.timestamp, + sig: uid.self + }); + try { + await targetPeer.postIdentity(rawIdty); + logger.info('Success idty %s', uid.uid); + } catch (e) { + logger.error(e); + } + for (const received of uid.others) { + const rawCert = common.rawer.getOfficialCertification({ + currency: 'g1', + issuer: received.pubkey, + idty_issuer: res.pubkey, + idty_uid: uid.uid, + idty_buid: uid.meta.timestamp, + idty_sig: uid.self, + buid: common.buid.format.buid(received.meta.block_number, received.meta.block_hash), + sig: received.signature + }); + try { + await targetPeer.postCert(rawCert); + logger.info('Success cert %s -> %s', received.pubkey.slice(0, 8), uid.uid); + } catch (e) { + logger.error(e); + } + } + } + } + const certBy = await sourcePeer.getCertifiedBy(search) + const mapBlocks:any = {} + for (const signed of certBy.certifications) { + if (signed.written) { + logger.info('Already written cert %s -> %s', certBy.pubkey.slice(0, 8), signed.uid) + } else { + const lookupIdty = await sourcePeer.getLookup(signed.pubkey); + let idty = null + for (const result of lookupIdty.results) { + for (const uid of result.uids) { + if (uid.uid === signed.uid && result.pubkey === signed.pubkey && uid.meta.timestamp === signed.sigDate) { + idty = uid + } + } + } + let block = mapBlocks[signed.cert_time.block] + if (!block) { + block = await sourcePeer.getBlock(signed.cert_time.block) + mapBlocks[block.number] = block + } + const rawCert = common.rawer.getOfficialCertification({ + currency: 'g1', + issuer: certBy.pubkey, + idty_issuer: signed.pubkey, + idty_uid: signed.uid, + idty_buid: idty.meta.timestamp, + idty_sig: idty.self, + buid: common.buid.format.buid(block.number, block.hash), + sig: signed.signature + }); + try { + await targetPeer.postCert(rawCert); + logger.info('Success cert %s -> %s', certBy.pubkey.slice(0, 8), signed.uid); + } catch (e) { + logger.error(e); + } + } + } + logger.info('Sent.'); + await server.disconnect(); + } catch(e) { + logger.error(e); + throw Error("Exiting"); + } + } + }, { + name:
'crawl-lookup <toHost> <toPort> [<fromHost> [<fromPort>]]', + desc: 'Make a full network scan and rebroadcast every WoT pending document (identity, certification, membership)', + onDatabaseExecute: async (server:Server, conf:ConfDTO, program:any, params:any) => { + const toHost = params[0] + const toPort = params[1] + const fromHost = params[2] + const fromPort = params[3] + const logger = server.logger; + try { + const peers = fromHost && fromPort ? [{ endpoints: [['BASIC_MERKLED_API', fromHost, fromPort].join(' ')] }] : await server.dal.peerDAL.query('SELECT * FROM peer WHERE status = ?', ['UP']) + // Memberships + for (const p of peers) { + const peer = Peer.fromJSON(p); + const fromHost = peer.getHostPreferDNS(); + const fromPort = peer.getPort(); + logger.info('Looking at %s:%s...', fromHost, fromPort); + try { + const node = new Contacter(fromHost, fromPort, { timeout: 10000 }); + const requirements = await node.getRequirementsPending(program.minsig || 5); + await req2fwd(requirements, toHost, toPort, logger) + } catch (e) { + logger.error(e); + } + } + await server.disconnect(); + } catch(e) { + logger.error(e); + throw Error("Exiting"); + } + } + }, { + name: 'fwd-pending-ms', + desc: 'Forwards all the local pending memberships to target node', + onDatabaseExecute: async (server:Server, conf:ConfDTO, program:any, params:any) => { + const logger = server.logger; + try { + const pendingMSS = await server.dal.msDAL.getPendingIN() + const targetPeer = new Contacter('g1.cgeek.fr', 80, { timeout: 5000 }); + // Membership + let rawMS + for (const theMS of pendingMSS) { + console.log('New membership pending for %s', theMS.uid); + try { + rawMS = common.rawer.getMembership({ + currency: 'g1', + issuer: theMS.issuer, + block: theMS.block, + membership: theMS.membership, + userid: theMS.userid, + certts: theMS.certts, + signature: theMS.signature + }); + await targetPeer.postRenew(rawMS); + logger.info('Success ms idty %s', theMS.userid); + } catch (e) { + logger.warn(e); + } + } + await server.disconnect(); + } catch(e) { + logger.error(e); + throw Error("Exiting"); + } + } + }] + } +} diff --git a/app/modules/crawler/lib/connect.ts b/app/modules/crawler/lib/connect.ts new file mode 100644 index 0000000000000000000000000000000000000000..af727a516d42d31970a65002338ba7368deadc77 --- /dev/null +++ b/app/modules/crawler/lib/connect.ts @@ -0,0 +1,10 @@ +import {CrawlerConstants} from "./constants" +import {Contacter} from "./contacter" + +const DEFAULT_HOST = 'localhost'; + +export const connect = (peer:any, timeout:number|null = null) => { + return Promise.resolve(new Contacter(peer.getDns() || peer.getIPv4() || peer.getIPv6() || DEFAULT_HOST, peer.getPort(), { + timeout: timeout || CrawlerConstants.DEFAULT_TIMEOUT + })) +} diff --git a/app/modules/crawler/lib/constants.ts b/app/modules/crawler/lib/constants.ts new file mode 100644 index 0000000000000000000000000000000000000000..32c976546396af1962a59dc2a76717adcc11aeb1 --- /dev/null +++ b/app/modules/crawler/lib/constants.ts @@ -0,0 +1,40 @@ +const common = require('duniter-common') + +export const CrawlerConstants = { + + PEER_LONG_DOWN: 3600 * 24 * 2, // 48h + SYNC_LONG_TIMEOUT: 30 * 1000, // 30 seconds + DEFAULT_TIMEOUT: 10 * 1000, // 10 seconds + + SWITCH_ON_BRANCH_AHEAD_BY_X_MINUTES: 30, + TRANSACTION_VERSION: common.constants.TRANSACTION_VERSION, + FORK_ALLOWED: true, + MAX_NUMBER_OF_PEERS_FOR_PULLING: 4, + PULLING_MINIMAL_DELAY: 20, + PULLING_INTERVAL_TARGET: 240, + COUNT_FOR_ENOUGH_PEERS: 4, + SANDBOX_PEERS_COUNT: 2, + 
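// Number of random UP peers from which pending sandbox documents are pulled at each check
+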
SANDBOX_CHECK_INTERVAL: 288, // Every day (288 blocks = 1 day) + TEST_PEERS_INTERVAL: 10, // In seconds + SYNC_PEERS_INTERVAL: 3, // Every 3 block average generation time + + DURATIONS: { + TEN_SECONDS: 10, + A_MINUTE: 60, + TEN_MINUTES: 600, + AN_HOUR: 3600, + A_DAY: 3600 * 24, + A_WEEK: 3600 * 24 * 7, + A_MONTH: (3600 * 24 * 365.25) / 12 + }, + + ERRORS: { + NEWER_PEER_DOCUMENT_AVAILABLE: { httpCode: 409, uerr: { ucode: 2022, message: "A newer peer document is available" }}, + }, + + ERROR: { + PEER: { + UNKNOWN_REFERENCE_BLOCK: 'Unknown reference block of peer' + } + } +} \ No newline at end of file diff --git a/app/modules/crawler/lib/contacter.ts b/app/modules/crawler/lib/contacter.ts new file mode 100644 index 0000000000000000000000000000000000000000..5011485218778f1f5c981d533b462f31a6f83712 --- /dev/null +++ b/app/modules/crawler/lib/contacter.ts @@ -0,0 +1,164 @@ +import {CrawlerConstants} from "./constants" + +const rp = require('request-promise'); +const sanitize = require('duniter-bma').duniter.methods.sanitize; +const dtos = require('duniter-bma').duniter.methods.dtos; + +export class Contacter { + + options:{ timeout:number } + fullyQualifiedHost:string + + constructor(private host:string, private port:number, opts:any = {}) { + this.options = { + timeout: (opts && opts.timeout) || CrawlerConstants.DEFAULT_TIMEOUT + } + // We suppose that IPv6 is already wrapped by [], for example 'http://[::1]:80/index.html' + this.fullyQualifiedHost = [host, port].join(':'); + } + + getSummary() { + return this.get('/node/summary/', dtos.Summary) + } + + getCertifiedBy(search:string) { + return this.get('/wot/certified-by/' + search, dtos.Certifications) + } + + getRequirements(search:string) { + return this.get('/wot/requirements/' + search, dtos.Requirements) + } + + getRequirementsPending(minsig:number) { + return this.get('/wot/requirements-of-pending/' + minsig, dtos.Requirements) + } + + getLookup(search:string) { + return this.get('/wot/lookup/', dtos.Lookup, search) + } + + getBlock(number:number) { + return this.get('/blockchain/block/', dtos.Block, number) + } + + getCurrent() { + return this.get('/blockchain/current', dtos.Block) + } + + getPeer() { + return this.get('/network/peering', dtos.Peer) + } + + getPeers(obj:any) { + return this.get('/network/peering/peers', dtos.MerkleOfPeers, obj) + } + + getSources(pubkey:string) { + return this.get('/tx/sources/', dtos.Sources, pubkey) + } + + getBlocks(count:number, fromNumber:number) { + return this.get('/blockchain/blocks/', dtos.Blocks, [count, fromNumber].join('/')) + } + + postPeer(peer:any) { + return this.post('/network/peering/peers', dtos.Peer, { peer: peer }) + } + + postIdentity(raw:string) { + return this.post('/wot/add', dtos.Identity, { identity: raw }) + } + + postCert(cert:string) { + return this.post('/wot/certify', dtos.Cert, { cert: cert}) + } + + postRenew(ms:string) { + return this.post('/blockchain/membership', dtos.Membership, { membership: ms }) + } + + wotPending() { + return this.get('/wot/pending', dtos.MembershipList) + } + + wotMembers() { + return this.get('/wot/members', dtos.Members) + } + + postBlock(rawBlock:string) { + return this.post('/blockchain/block', dtos.Block, { block: rawBlock }) + } + + processTransaction(rawTX:string) { + return this.post('/tx/process', dtos.Transaction, { transaction: rawTX }) + } + + private async get(url:string, dtoContract:any, param?:any) { + if (typeof param === 'object') { + // Classical URL params (a=1&b=2&...) + param = '?' 
+ Object.keys(param).map((k) => [k, param[k]].join('=')).join('&'); + } + try { + const json = await rp.get({ + url: Contacter.protocol(this.port) + this.fullyQualifiedHost + url + (param !== undefined ? param : ''), + json: true, + timeout: this.options.timeout + }); + // Prevent JSON injection + return sanitize(json, dtoContract); + } catch (e) { + throw e.error; + } + } + + private async post(url:string, dtoContract:any, data:any) { + try { + const json = await rp.post({ + url: Contacter.protocol(this.port) + this.fullyQualifiedHost + url, + body: data, + json: true, + timeout: this.options.timeout + }); + // Prevent JSON injection + return sanitize(json, dtoContract); + } catch (e) { + throw e.error; + } + } + + static protocol(port:number) { + return port == 443 ? 'https://' : 'http://'; + } + + static async quickly(host:string, port:number, opts:any, callbackPromise:any) { + const node = new Contacter(host, port, opts); + return callbackPromise(node); + } + + static async quickly2(peer:any, opts:any, callbackPromise:any) { + const Peer = require('./entity/peer'); + const p = Peer.fromJSON(peer); + const node = new Contacter(p.getHostPreferDNS(), p.getPort(), opts); + return callbackPromise(node); + } + + static fetchPeer(host:string, port:number, opts:any = {}) { + return Contacter.quickly(host, port, opts, (node:any) => node.getPeer()) + } + + static fetchBlock(number:number, peer:any, opts:any = {}) { + return Contacter.quickly2(peer, opts, (node:any) => node.getBlock(number)) + } + + static async isReachableFromTheInternet(peer:any, opts:any) { + const Peer = require('./entity/peer'); + const p = Peer.fromJSON(peer); + const node = new Contacter(p.getHostPreferDNS(), p.getPort(), opts); + try { + await node.getPeer(); + return true; + } catch (e) { + return false; + } + } +} \ No newline at end of file diff --git a/app/modules/crawler/lib/crawler.ts b/app/modules/crawler/lib/crawler.ts new file mode 100644 index 0000000000000000000000000000000000000000..5e67bc1d5948d27352b8195ac4c2303c2fcca1c7 --- /dev/null +++ b/app/modules/crawler/lib/crawler.ts @@ -0,0 +1,513 @@ +import * as stream from 'stream' +import {Server} from "../../../../server" +import {ConfDTO} from "../../../lib/dto/ConfDTO" +import {DuniterService} from "../../../../index"; +import { PeerDTO } from "../../../lib/dto/PeerDTO"; +import { AbstractDAO } from "./pulling"; +import { BlockDTO } from "../../../lib/dto/BlockDTO"; +import { DBBlock } from "../../../lib/db/DBBlock"; +import { tx_cleaner } from "./tx_cleaner"; +import { connect } from "./connect"; +import { CrawlerConstants } from "./constants"; +import { pullSandboxToLocalServer } from "./sandbox"; +import { cleanLongDownPeers } from "./garbager"; + +const _ = require('underscore'); +const async = require('async'); +const querablep = require('querablep'); +const Peer = require('duniter-common').document.Peer; + +/** + * Service which triggers the server's peering generation (actualization of the Peer document). 
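+ * It aggregates four sub-services: a PeerCrawler (network peer discovery), a PeerTester
+ * (periodic probing of DOWN peers), a BlockCrawler (pulling blocks from the network)
+ * and a SandboxCrawler (pulling pending documents into the local sandbox).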
+ * @constructor + */ +export class Crawler extends stream.Transform implements DuniterService { + + peerCrawler:PeerCrawler + peerTester:PeerTester + blockCrawler:BlockCrawler + sandboxCrawler:SandboxCrawler + + constructor( + private server:Server, + private conf:ConfDTO, + private logger:any) { + super({ objectMode: true }) + + this.peerCrawler = new PeerCrawler(server, conf, logger) + this.peerTester = new PeerTester(server, conf, logger) + this.blockCrawler = new BlockCrawler(server, logger, this) + this.sandboxCrawler = new SandboxCrawler(server, conf, logger) + } + + pullBlocks(server:Server, pubkey:string) { + return this.blockCrawler.pullBlocks(server, pubkey) + } + + sandboxPull(server:Server) { + return this.sandboxCrawler.sandboxPull(server) + } + + startService() { + return Promise.all([ + this.peerCrawler.startService(), + this.peerTester.startService(), + this.blockCrawler.startService(), + this.sandboxCrawler.startService() + ]) + } + + stopService() { + return Promise.all([ + this.peerCrawler.stopService(), + this.peerTester.stopService(), + this.blockCrawler.stopService(), + this.sandboxCrawler.stopService() + ]) + } +} + +export class PeerCrawler implements DuniterService { + + private DONT_IF_MORE_THAN_FOUR_PEERS = true; + private crawlPeersInterval:NodeJS.Timer + private crawlPeersFifo = async.queue((task:any, callback:any) => task(callback), 1); + + constructor( + private server:Server, + private conf:ConfDTO, + private logger:any) {} + + async startService() { + if (this.crawlPeersInterval) + clearInterval(this.crawlPeersInterval); + this.crawlPeersInterval = setInterval(() => this.crawlPeersFifo.push((cb:any) => this.crawlPeers(this.server, this.conf).then(cb).catch(cb)), 1000 * this.conf.avgGenTime * CrawlerConstants.SYNC_PEERS_INTERVAL); + await this.crawlPeers(this.server, this.conf, this.DONT_IF_MORE_THAN_FOUR_PEERS); + } + + async stopService() { + this.crawlPeersFifo.kill(); + clearInterval(this.crawlPeersInterval); + } + + private async crawlPeers(server:Server, conf:ConfDTO, dontCrawlIfEnoughPeers = false) { + this.logger.info('Crawling the network...'); + const peers = await server.dal.listAllPeersWithStatusNewUPWithtout(conf.pair.pub); + if (peers.length > CrawlerConstants.COUNT_FOR_ENOUGH_PEERS && dontCrawlIfEnoughPeers == this.DONT_IF_MORE_THAN_FOUR_PEERS) { + return; + } + let peersToTest = peers.slice().map((p:PeerDTO) => Peer.fromJSON(p)); + let tested:string[] = []; + const found = []; + while (peersToTest.length > 0) { + const results = await Promise.all(peersToTest.map((p:PeerDTO) => this.crawlPeer(server, p))) + tested = tested.concat(peersToTest.map((p:PeerDTO) => p.pubkey)); + // End loop condition + peersToTest.splice(0); + // Eventually continue the loop + for (let i = 0, len = results.length; i < len; i++) { + const res:any = results[i]; + for (let j = 0, len2 = res.length; j < len2; j++) { + try { + const subpeer = res[j].leaf.value; + if (subpeer.currency && tested.indexOf(subpeer.pubkey) === -1) { + const p = Peer.fromJSON(subpeer); + peersToTest.push(p); + found.push(p); + } + } catch (e) { + this.logger.warn('Invalid peer %s', res[j]); + } + } + } + // Make unique list + peersToTest = _.uniq(peersToTest, false, (p:PeerDTO) => p.pubkey); + } + this.logger.info('Crawling done.'); + for (let i = 0, len = found.length; i < len; i++) { + let p = found[i]; + try { + // Try to write it + p.documentType = 'peer'; + await server.singleWritePromise(p); + } catch(e) { + // Silent error + } + } + await cleanLongDownPeers(server, Date.now()); + } + + 
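/**
+ * Fetches the peer documents known to `aPeer`: first the Merkle leaves of its
+ * /network/peering/peers tree, then each leaf's peer entry. On any network error,
+ * returns the sub-peers collected so far.
+ */
+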
private async crawlPeer(server:Server, aPeer:PeerDTO) { + let subpeers:any[] = []; + try { + this.logger.debug('Crawling peers of %s %s', aPeer.pubkey.substr(0, 6), aPeer.getNamedURL()); + const node = await connect(aPeer); + await checkPeerValidity(server, aPeer, node); + const json = await node.getPeers.bind(node)({ leaves: true }); + for (let i = 0, len = json.leaves.length; i < len; i++) { + let leaf = json.leaves[i]; + let subpeer = await node.getPeers.bind(node)({ leaf: leaf }); + subpeers.push(subpeer); + } + return subpeers; + } catch (e) { + return subpeers; + } + } +} + +export class SandboxCrawler implements DuniterService { + + private pullInterval:NodeJS.Timer + private pullFifo = async.queue((task:any, callback:any) => task(callback), 1); + + constructor( + private server:Server, + private conf:ConfDTO, + private logger:any) {} + + async startService() { + if (this.pullInterval) + clearInterval(this.pullInterval); + this.pullInterval = setInterval(() => this.pullFifo.push((cb:any) => this.sandboxPull(this.server).then(cb).catch(cb)), 1000 * this.conf.avgGenTime * CrawlerConstants.SANDBOX_CHECK_INTERVAL); + } + + async stopService() { + this.pullFifo.kill(); + clearInterval(this.pullInterval); + } + + async sandboxPull(server:Server) { + this.logger && this.logger.info('Sandbox pulling started...'); + const peers = await server.dal.getRandomlyUPsWithout([this.conf.pair.pub]) + const randoms = chooseXin(peers, CrawlerConstants.SANDBOX_PEERS_COUNT) + let peersToTest = randoms.slice().map((p) => Peer.fromJSON(p)); + for (const peer of peersToTest) { + const fromHost = await connect(peer) + await pullSandboxToLocalServer(server.conf.currency, fromHost, server, this.logger) + } + this.logger && this.logger.info('Sandbox pulling done.'); + } +} + +export class PeerTester implements DuniterService { + + private FIRST_CALL = true + private testPeerFifo = async.queue((task:any, callback:any) => task(callback), 1); + private testPeerFifoInterval:NodeJS.Timer + + constructor( + private server:Server, + private conf:ConfDTO, + private logger:any) {} + + async startService() { + if (this.testPeerFifoInterval) + clearInterval(this.testPeerFifoInterval); + this.testPeerFifoInterval = setInterval(() => this.testPeerFifo.push((cb:any) => this.testPeers.bind(null, this.server, this.conf, !this.FIRST_CALL)().then(cb).catch(cb)), 1000 * CrawlerConstants.TEST_PEERS_INTERVAL); + await this.testPeers(this.server, this.conf, this.FIRST_CALL); + } + + async stopService() { + clearInterval(this.testPeerFifoInterval); + this.testPeerFifo.kill(); + } + + private async testPeers(server:Server, conf:ConfDTO, displayDelays:boolean) { + let peers = await server.dal.listAllPeers(); + let now = (new Date().getTime()); + peers = _.filter(peers, (p:any) => p.pubkey != conf.pair.pub); + await Promise.all(peers.map(async (thePeer:any) => { + let p = Peer.fromJSON(thePeer); + if (thePeer.status == 'DOWN') { + let shouldDisplayDelays = displayDelays; + let downAt = thePeer.first_down || now; + let waitRemaining = this.getWaitRemaining(now, downAt, thePeer.last_try); + let nextWaitRemaining = this.getWaitRemaining(now, downAt, now); + let testIt = waitRemaining <= 0; + if (testIt) { + // We try to reconnect only with peers marked as DOWN + try { + this.logger.trace('Checking if node %s is UP... 
(%s:%s) ', p.pubkey.substr(0, 6), p.getHostPreferDNS(), p.getPort()); + // We register the try anyway + await server.dal.setPeerDown(p.pubkey); + // Now we test + let node = await connect(p); + let peering = await node.getPeer(); + await checkPeerValidity(server, p, node); + // The node answered, it is no longer DOWN! + this.logger.info('Node %s (%s:%s) is UP!', p.pubkey.substr(0, 6), p.getHostPreferDNS(), p.getPort()); + await server.dal.setPeerUP(p.pubkey); + // We try to forward its peering entry: compare the stored record's block with the fresh one + let sp1 = thePeer.block.split('-'); + let currentBlockNumber = sp1[0]; + let currentBlockHash = sp1[1]; + let sp2 = peering.block.split('-'); + let blockNumber = sp2[0]; + let blockHash = sp2[1]; + if (!(currentBlockNumber == blockNumber && currentBlockHash == blockHash)) { + // The peering changed + await server.PeeringService.submitP(peering); + } + // Do not need to display when next check will occur: the node is now UP + shouldDisplayDelays = false; + } catch (err) { + if (!err) { + err = "NO_REASON" + } + // Error: we set the peer as DOWN + this.logger.trace("Peer %s is DOWN (%s)", p.pubkey, (err.httpCode && 'HTTP ' + err.httpCode) || err.code || err.message || err); + await server.dal.setPeerDown(p.pubkey); + shouldDisplayDelays = true; + } + } + if (shouldDisplayDelays) { + this.logger.debug('Will check that node %s (%s:%s) is UP in %s min...', p.pubkey.substr(0, 6), p.getHostPreferDNS(), p.getPort(), (nextWaitRemaining / 60).toFixed(0)); + } + } + })) + } + + private getWaitRemaining(now:number, downAt:number, last_try:number) { + let downDelay = Math.floor((now - downAt) / 1000); + let waitedSinceLastTest = Math.floor((now - (last_try || now)) / 1000); + let waitRemaining = 1; + if (downDelay <= CrawlerConstants.DURATIONS.A_MINUTE) { + waitRemaining = CrawlerConstants.DURATIONS.TEN_SECONDS - waitedSinceLastTest; + } + else if (downDelay <= CrawlerConstants.DURATIONS.TEN_MINUTES) { + waitRemaining = CrawlerConstants.DURATIONS.A_MINUTE - waitedSinceLastTest; + } + else if (downDelay <= CrawlerConstants.DURATIONS.AN_HOUR) { + waitRemaining = CrawlerConstants.DURATIONS.TEN_MINUTES - waitedSinceLastTest; + } + else if (downDelay <= CrawlerConstants.DURATIONS.A_DAY) { + waitRemaining = CrawlerConstants.DURATIONS.AN_HOUR - waitedSinceLastTest; + } + else if (downDelay <= CrawlerConstants.DURATIONS.A_WEEK) { + waitRemaining = CrawlerConstants.DURATIONS.A_DAY - waitedSinceLastTest; + } + else if (downDelay <= CrawlerConstants.DURATIONS.A_MONTH) { + waitRemaining = CrawlerConstants.DURATIONS.A_WEEK - waitedSinceLastTest; + } + // Else do not check it, DOWN for too long + return waitRemaining; + } +} + +export class BlockCrawler { + + private CONST_BLOCKS_CHUNK = 50 + private pullingActualIntervalDuration = CrawlerConstants.PULLING_MINIMAL_DELAY + private programStart = Date.now() + private syncBlockFifo = async.queue((task:any, callback:any) => task(callback), 1) + private syncBlockInterval:NodeJS.Timer + + constructor( + private server:Server, + private logger:any, + private PROCESS:stream.Transform) { + } + + async startService() { + if (this.syncBlockInterval) + clearInterval(this.syncBlockInterval); + this.syncBlockInterval = setInterval(() => this.syncBlockFifo.push((cb:any) => this.syncBlock(this.server).then(cb).catch(cb)), 1000 * this.pullingActualIntervalDuration); + this.syncBlock(this.server); + } + + async stopService() { + clearInterval(this.syncBlockInterval); + this.syncBlockFifo.kill(); + } + + pullBlocks(server:Server, pubkey:string) { + return this.syncBlock(server, pubkey) + }
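+
+ /**
+ * Pulls blocks from at most MAX_NUMBER_OF_PEERS_FOR_PULLING random UP peers, or from
+ * the single peer matching `pubkey` when one is given. Each call also re-evaluates the
+ * pulling interval: it grows monotonically along a sine ramp from PULLING_MINIMAL_DELAY
+ * (20s) to PULLING_INTERVAL_TARGET (240s) over the first 240 minutes after start, and
+ * the timer is re-armed whenever the value changes.
+ */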
+ private async syncBlock(server:Server, pubkey:string = "") { + // Adjust the interval duration if needed + const minutesElapsed = Math.ceil((Date.now() - this.programStart) / (60 * 1000)); + const FACTOR = Math.sin((minutesElapsed / CrawlerConstants.PULLING_INTERVAL_TARGET) * (Math.PI / 2)); + // Make the interval always higher than before + const pullingTheoreticalIntervalNow = Math.max(Math.max(FACTOR * CrawlerConstants.PULLING_INTERVAL_TARGET, CrawlerConstants.PULLING_MINIMAL_DELAY), this.pullingActualIntervalDuration); + if (pullingTheoreticalIntervalNow !== this.pullingActualIntervalDuration) { + this.pullingActualIntervalDuration = pullingTheoreticalIntervalNow; + // Change the interval + if (this.syncBlockInterval) + clearInterval(this.syncBlockInterval); + this.syncBlockInterval = setInterval(() => this.syncBlockFifo.push((cb:any) => this.syncBlock(server).then(cb).catch(cb)), 1000 * this.pullingActualIntervalDuration); + } + + try { + let current = await server.dal.getCurrentBlockOrNull(); + if (current) { + this.pullingEvent(server, 'start', current.number); + this.logger && this.logger.info("Pulling blocks from the network..."); + let peers = await server.dal.findAllPeersNEWUPBut([server.conf.pair.pub]); + if (pubkey) { + // Only keep the peer with the given public key + peers = _.filter(peers, (p:any) => p.pubkey == pubkey); + } + // Shuffle the peers + peers = _.shuffle(peers); + // Only take at max X of them + peers = peers.slice(0, CrawlerConstants.MAX_NUMBER_OF_PEERS_FOR_PULLING); + await Promise.all(peers.map(async (thePeer:any, i:number) => { + let p = Peer.fromJSON(thePeer); + this.pullingEvent(server, 'peer', _.extend({number: i, length: peers.length}, p)); + this.logger && this.logger.trace("Try with %s %s", p.getURL(), p.pubkey.substr(0, 6)); + try { + let node:any = await connect(p); + node.pubkey = p.pubkey; + await checkPeerValidity(server, p, node); + + let dao = new (class extends AbstractDAO { + + private lastDownloaded:BlockDTO|null + + constructor(private crawler:BlockCrawler) { + super() + } + + async localCurrent(): Promise<DBBlock | null> { + return server.dal.getCurrentBlockOrNull() + } + async remoteCurrent(source?: any): Promise<BlockDTO | null> { + return node.getCurrent() + } + async remotePeers(source?: any): Promise<PeerDTO[]> { + return Promise.resolve([node]) + } + async getLocalBlock(number: number): Promise<DBBlock> { + return server.dal.getBlock(number) + } + async getRemoteBlock(thePeer: any, number: number): Promise<BlockDTO> { + let block = null; + try { + block = await thePeer.getBlock(number); + tx_cleaner(block.transactions); + } catch (e) { + if (e.httpCode != 404) { + throw e; + } + } + return block; + } + async applyMainBranch(block: BlockDTO): Promise<boolean> { + let addedBlock = await server.BlockchainService.submitBlock(block, true, CrawlerConstants.FORK_ALLOWED); + if (!this.lastDownloaded) { + this.lastDownloaded = await dao.remoteCurrent(node); + } + this.crawler.pullingEvent(server, 'applying', {number: block.number, last: this.lastDownloaded && this.lastDownloaded.number}); + if (addedBlock) { + current = addedBlock; + server.streamPush(addedBlock); + } + return true + } + async removeForks(): Promise<boolean> { + return true + } + async isMemberPeer(thePeer: PeerDTO): Promise<boolean> { + let idty = await server.dal.getWrittenIdtyByPubkey(thePeer.pubkey); + return (idty && idty.member) || false; + } + async downloadBlocks(thePeer: any, fromNumber: number, count?: number | undefined): Promise<BlockDTO[]> { + if (!count) { + count =
this.crawler.CONST_BLOCKS_CHUNK; + } + let blocks = await thePeer.getBlocks(count, fromNumber); + // Fix for #734 + for (const block of blocks) { + for (const tx of block.transactions) { + tx.version = CrawlerConstants.TRANSACTION_VERSION; + } + } + return blocks; + } + })(this) + + await dao.pull(server.conf, server.logger); + } catch (e) { + if (this.isConnectionError(e)) { + this.logger && this.logger.info("Peer %s unreachable: now considered as DOWN.", p.pubkey); + await server.dal.setPeerDown(p.pubkey); + } + else if (e.httpCode == 404) { + this.logger && this.logger.trace("No new block from %s %s", p.pubkey.substr(0, 6), p.getURL()); + } + else { + this.logger && this.logger.warn(e); + } + } + })) + this.pullingEvent(server, 'end', current.number); + } + this.logger && this.logger.info('Will pull blocks from the network in %s min %s sec', Math.floor(this.pullingActualIntervalDuration / 60), Math.floor(this.pullingActualIntervalDuration % 60)); + } catch(err) { + this.pullingEvent(server, 'error'); + this.logger && this.logger.warn(err.code || err.stack || err.message || err); + } + } + + private pullingEvent(server:Server, type:string, number:any = null) { + server.push({ + pulling: { + type: type, + data: number + } + }); + if (type !== 'end') { + this.PROCESS.push({ pulling: 'processing' }); + } else { + this.PROCESS.push({ pulling: 'finished' }); + } + } + + private isConnectionError(err:any) { + return err && ( + err.code == "E_DUNITER_PEER_CHANGED" + || err.code == "EINVAL" + || err.code == "ECONNREFUSED" + || err.code == "ETIMEDOUT" + || (err.httpCode !== undefined && err.httpCode !== 404)); + } +} + +function chooseXin (peers:PeerDTO[], max:number) { + const chosen = []; + const nbPeers = peers.length; + for (let i = 0; i < Math.min(nbPeers, max); i++) { + const randIndex = Math.max(Math.floor(Math.random() * 10) - (10 - nbPeers) - i, 0); + chosen.push(peers[randIndex]); + peers.splice(randIndex, 1); + } + return chosen; +} + +const checkPeerValidity = async (server:Server, p:PeerDTO, node:any) => { + try { + let document = await node.getPeer(); + let thePeer = Peer.fromJSON(document); + let goodSignature = server.PeeringService.checkPeerSignature(thePeer); + if (!goodSignature) { + throw 'Signature from a peer must match'; + } + if (p.currency !== thePeer.currency) { + throw 'Currency has changed from ' + p.currency + ' to ' + thePeer.currency; + } + if (p.pubkey !== thePeer.pubkey) { + throw 'Public key of the peer has changed from ' + p.pubkey + ' to ' + thePeer.pubkey; + } + let sp1 = p.block.split('-'); + let sp2 = thePeer.block.split('-'); + let blockNumber1 = parseInt(sp1[0]); + let blockNumber2 = parseInt(sp2[0]); + if (blockNumber2 < blockNumber1) { + throw 'Signature date has changed from block ' + blockNumber1 + ' to older block ' + blockNumber2; + } + } catch (e) { + throw { code: "E_DUNITER_PEER_CHANGED" }; + } +} diff --git a/app/modules/crawler/lib/garbager.ts b/app/modules/crawler/lib/garbager.ts new file mode 100644 index 0000000000000000000000000000000000000000..75b92b8dc766c9ea2ad7d4c35575208ac858c566 --- /dev/null +++ b/app/modules/crawler/lib/garbager.ts @@ -0,0 +1,7 @@ +import {CrawlerConstants} from "./constants" +import {Server} from "../../../../server" + +export const cleanLongDownPeers = async (server:Server, now:number) => { + const first_down_limit = now - CrawlerConstants.PEER_LONG_DOWN * 1000; + await server.dal.peerDAL.query('DELETE FROM peer WHERE first_down < ' + first_down_limit) +} diff --git a/app/modules/crawler/lib/pulling.ts 
b/app/modules/crawler/lib/pulling.ts new file mode 100644 index 0000000000000000000000000000000000000000..9e784215cc08b5a60391f847c918ab8598606c17 --- /dev/null +++ b/app/modules/crawler/lib/pulling.ts @@ -0,0 +1,245 @@ +"use strict"; +import {BlockDTO} from "../../../lib/dto/BlockDTO" +import {DBBlock} from "../../../lib/db/DBBlock" +import {PeerDTO} from "../../../lib/dto/PeerDTO" +import {BranchingDTO, ConfDTO} from "../../../lib/dto/ConfDTO" + +const _ = require('underscore'); + +export abstract class PullingDao { + abstract applyBranch(blocks:BlockDTO[]): Promise<boolean> + abstract localCurrent(): Promise<DBBlock|null> + abstract remoteCurrent(source?:any): Promise<BlockDTO|null> + abstract remotePeers(source?:any): Promise<PeerDTO[]> + abstract getLocalBlock(number:number): Promise<DBBlock> + abstract getRemoteBlock(thePeer:PeerDTO, number:number): Promise<BlockDTO> + abstract applyMainBranch(block:BlockDTO): Promise<boolean> + abstract removeForks(): Promise<boolean> + abstract isMemberPeer(thePeer:PeerDTO): Promise<boolean> + abstract downloadBlocks(thePeer:PeerDTO, fromNumber:number, count?:number): Promise<BlockDTO[]> +} + +export abstract class AbstractDAO extends PullingDao { + + /** + * Sugar function. Apply a bunch of blocks instead of one. + * @param blocks + */ + async applyBranch (blocks:BlockDTO[]) { + for (const block of blocks) { + await this.applyMainBranch(block); + } + return true; + } + + /** + * Binary search algorithm to find the common root block between a local and a remote blockchain. + * @param fork An object containing a peer, its current block and top fork block + * @param forksize The maximum length we can look at to find common root block. + * @returns {*|Promise} + */ + async findCommonRoot(fork:any, forksize:number) { + let commonRoot = null; + let localCurrent = await this.localCurrent(); + + if (!localCurrent) { + throw Error('Local blockchain is empty, cannot find a common root') + } + + // We look between the top block that is known as fork ... + let topBlock = fork.block; + // ... and the bottom which is bounded by `forksize` + let bottomBlock = await this.getRemoteBlock(fork.peer, Math.max(0, localCurrent.number - forksize)); + let lookBlock = bottomBlock; + let localEquivalent = await this.getLocalBlock(bottomBlock.number); + let isCommonBlock = lookBlock.hash == localEquivalent.hash; + if (isCommonBlock) { + + // Then common root can be found between top and bottom. We process. + let position = -1, wrongRemotechain = false; + do { + + isCommonBlock = lookBlock.hash == localEquivalent.hash; + if (!isCommonBlock) { + + // Too high, look downward + topBlock = lookBlock; + position = middle(topBlock.number, bottomBlock.number); + } + else { + let upperBlock = await this.getRemoteBlock(fork.peer, lookBlock.number + 1); + let localUpper = await this.getLocalBlock(upperBlock.number); + let isCommonUpper = upperBlock.hash == localUpper.hash; + if (isCommonUpper) { + + // Too low, look upward + bottomBlock = lookBlock; + position = middle(topBlock.number, bottomBlock.number); + } + else { + + // Spotted! 
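+ // lookBlock matches the local chain but lookBlock + 1 does not:
+ // lookBlock is therefore the last block common to both chains.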
+ commonRoot = lookBlock; + } + } + + let noSpace = topBlock.number == bottomBlock.number + 1; + if (!commonRoot && noSpace) { + // The remote node has an inconsistent blockchain: stop searching + wrongRemotechain = true; + } + + if (!wrongRemotechain) { + lookBlock = await this.getRemoteBlock(fork.peer, position); + localEquivalent = await this.getLocalBlock(position); + } + } while (!commonRoot && !wrongRemotechain); + } + // Otherwise common root is unreachable + + return commonRoot; + } + + static defaultLocalBlock() { + const localCurrent = new DBBlock() + localCurrent.number = -1 + return localCurrent + } + + /** + * Pull algorithm. Look at given peers' blockchain and try to pull blocks from it. + * May lead local blockchain to fork. + * @param conf The local node configuration + * @param logger Logger of the main application. + */ + async pull(conf:BranchingDTO, logger:any) { + let localCurrent:DBBlock = await this.localCurrent() || AbstractDAO.defaultLocalBlock() + const forks:any = []; + + if (!localCurrent) { + localCurrent = new DBBlock() + localCurrent.number = -1 + } + + const applyCoroutine = async (peer:PeerDTO, blocks:BlockDTO[]) => { + if (blocks.length > 0) { + let isFork = localCurrent + && !(blocks[0].previousHash == localCurrent.hash + && blocks[0].number == localCurrent.number + 1); + if (!isFork) { + await this.applyBranch(blocks); + const newLocalCurrent = await this.localCurrent() + localCurrent = newLocalCurrent || AbstractDAO.defaultLocalBlock() + const appliedSuccessfully = localCurrent.number == blocks[blocks.length - 1].number + && localCurrent.hash == blocks[blocks.length - 1].hash; + return appliedSuccessfully; + } else { + let remoteCurrent = await this.remoteCurrent(peer); + forks.push({ + peer: peer, + block: blocks[0], + current: remoteCurrent + }); + return false; + } + } + return true; + } + + const downloadCoroutine = async (peer:any, number:number) => { + return await this.downloadBlocks(peer, number); + } + + const downloadChuncks = async (peer:PeerDTO) => { + let blocksToApply:BlockDTO[] = []; + const currentBlock = await this.localCurrent(); + let currentChunckStart; + if (currentBlock) { + currentChunckStart = currentBlock.number + 1; + } else { + currentChunckStart = 0; + } + let res:any = { applied: {}, downloaded: [] } + do { + let [ applied, downloaded ] = await Promise.all([ + applyCoroutine(peer, blocksToApply), + downloadCoroutine(peer, currentChunckStart) + ]) + res.applied = applied + res.downloaded = downloaded + blocksToApply = downloaded; + currentChunckStart += downloaded.length; + if (!applied) { + logger && logger.info("Blocks were not applied.") + } + } while (res.downloaded.length > 0 && res.applied); + } + + let peers = await this.remotePeers(); + // Try to get new legit blocks for local blockchain + const downloadChuncksTasks = []; + for (const peer of peers) { + downloadChuncksTasks.push(downloadChuncks(peer)); + } + await Promise.all(downloadChuncksTasks) + // Filter forks: do not include mirror peers (non-member peers) + let memberForks = []; + for (const fork of forks) { + let isMember = await this.isMemberPeer(fork.peer); + if (isMember) { + memberForks.push(fork); + } + } + memberForks = memberForks.sort((f1, f2) => { + let result = compare(f1, f2, "number"); + if (result == 0) { + result = compare(f1, f2, "medianTime"); + } + return result; + }); + let avgGenTime = conf.avgGenTime; + memberForks = _.filter(memberForks, (fork:any) => { + let blockDistance =
(fork.current.number - localCurrent.number) * avgGenTime / 60; + let timeDistance = (fork.current.medianTime - localCurrent.medianTime) / 60; + logger && logger.debug('Fork of %s has blockDistance %s ; timeDistance %s ; required is >= %s for both values to try to follow the fork', fork.peer.pubkey.substr(0, 6), blockDistance.toFixed(2), timeDistance.toFixed(2), conf.swichOnTimeAheadBy); + return blockDistance >= conf.swichOnTimeAheadBy + && timeDistance >= conf.swichOnTimeAheadBy; + }); + // Remove any previous fork block + await this.removeForks(); + // Find the common root block + let j = 0, successFork = false; + while (!successFork && j < memberForks.length) { + let fork = memberForks[j]; + let commonRootBlock = await this.findCommonRoot(fork, conf.forksize); + if (commonRootBlock) { + let blocksToApply = await this.downloadBlocks(fork.peer, commonRootBlock.number + 1, conf.forksize); + successFork = await this.applyBranch(blocksToApply); + } else { + logger && logger.debug('No common root block with peer %s', fork.peer.pubkey.substr(0, 6)); + } + j++; + } + return this.localCurrent(); + } +} + +function compare(f1:any, f2:any, field:string) { + if (f1[field] > f2[field]) { + return 1; + } + if (f1[field] < f2[field]) { + return -1; + } + return 0; +} + +function middle(top:number, bottom:number) { + let difference = top - bottom; + if (difference % 2 == 1) { + // We look one step below to not forget any block + difference++; + } + return bottom + (difference / 2); +} diff --git a/app/modules/crawler/lib/req2fwd.ts b/app/modules/crawler/lib/req2fwd.ts new file mode 100644 index 0000000000000000000000000000000000000000..8592e56f2c81a64b9da0d731cfbc101d7d0c7228 --- /dev/null +++ b/app/modules/crawler/lib/req2fwd.ts @@ -0,0 +1,93 @@ +import {Contacter} from "./contacter" + +const common = require('duniter-common') + +export const req2fwd = async (requirements:any, toHost:string, toPort:number, logger:any) => { + const mss:any = {}; + const identities:any = {}; + const certs:any = {}; + const targetPeer = new Contacter(toHost, toPort, { timeout: 10000 }); + // Identities + for (const idty of requirements.identities) { + try { + const iid = [idty.pubkey, idty.uid, idty.meta.timestamp].join('-'); + if (!identities[iid]) { + logger.info('New identity %s', idty.uid); + identities[iid] = idty; + try { + const rawIdty = common.rawer.getOfficialIdentity({ + currency: 'g1', + issuer: idty.pubkey, + uid: idty.uid, + buid: idty.meta.timestamp, + sig: idty.sig + }); + await targetPeer.postIdentity(rawIdty); + logger.info('Success idty %s', idty.uid); + } catch (e) { + logger.warn('Rejected idty %s...', idty.uid, e); + } + } + for (const received of idty.pendingCerts) { + const cid = [received.from, iid].join('-'); + if (!certs[cid]) { + await new Promise((res) => setTimeout(res, 300)); + certs[cid] = received; + const rawCert = common.rawer.getOfficialCertification({ + currency: 'g1', + issuer: received.from, + idty_issuer: idty.pubkey, + idty_uid: idty.uid, + idty_buid: idty.meta.timestamp, + idty_sig: idty.sig, + buid: received.blockstamp, + sig: received.sig + }); + const rawCertNoSig = common.rawer.getOfficialCertification({ + currency: 'g1', + issuer: received.from, + idty_issuer: idty.pubkey, + idty_uid: idty.uid, + idty_buid: idty.meta.timestamp, + idty_sig: idty.sig, + buid: received.blockstamp + }); + try { + const chkSig = common.keyring.verify(rawCertNoSig, received.sig, received.from) + if (!chkSig) { + throw "Wrong signature for certification?!" 
+ } + await targetPeer.postCert(rawCert); + logger.info('Success cert %s -> %s', received.from, idty.uid); + } catch (e) { + logger.warn('Rejected cert %s -> %s', received.from, idty.uid, received.blockstamp.substr(0,18), e); + } + } + } + for (const theMS of idty.pendingMemberships) { + // + Membership + const id = [idty.pubkey, idty.uid, theMS.blockstamp].join('-'); + if (!mss[id]) { + mss[id] = theMS + try { + const rawMS = common.rawer.getMembership({ + currency: 'g1', + issuer: idty.pubkey, + userid: idty.uid, + block: theMS.blockstamp, + membership: theMS.type, + certts: idty.meta.timestamp, + signature: theMS.sig + }); + await targetPeer.postRenew(rawMS); + logger.info('Success ms idty %s', idty.uid); + } catch (e) { + logger.warn('Rejected ms idty %s', idty.uid, e); + } + } + } + } catch (e) { + logger.warn(e); + } + } +} \ No newline at end of file diff --git a/app/modules/crawler/lib/sandbox.ts b/app/modules/crawler/lib/sandbox.ts new file mode 100644 index 0000000000000000000000000000000000000000..430edc92140fd721a55eecc06a8baea37c1380f1 --- /dev/null +++ b/app/modules/crawler/lib/sandbox.ts @@ -0,0 +1,167 @@ +"use strict"; +import {Contacter} from "./contacter" +import {Server} from "../../../../server" + +const rawer = require('duniter-common').rawer; +const parsers = require('duniter-common').parsers; + +export const pullSandbox = async (currency:string, fromHost:string, fromPort:number, toHost:string, toPort:number, logger:any) => { + const from = new Contacter(fromHost, fromPort); + const to = new Contacter(toHost, toPort); + + let res + try { + res = await from.getRequirementsPending(1) + } catch (e) { + // Silent error + logger && logger.trace('Sandbox pulling: could not fetch requirements on %s', [fromHost, fromPort].join(':')) + } + + if (res) { + const docs = getDocumentsTree(currency, res) + for (const identity of docs.identities) { + await submitIdentity(identity, to) + } + for (const certification of docs.certifications) { + await submitCertification(certification, to) + } + for (const membership of docs.memberships) { + await submitMembership(membership, to) + } + } +} + +export const pullSandboxToLocalServer = async (currency:string, fromHost:any, toServer:Server, logger:any, watcher:any = null, nbCertsMin = 1) => { + let res + try { + res = await fromHost.getRequirementsPending(nbCertsMin || 1) + } catch (e) { + watcher && watcher.writeStatus('Sandbox pulling: could not fetch requirements on %s', [fromHost.host, fromHost.port].join(':')) + } + + if (res) { + const docs = getDocumentsTree(currency, res) + + for (let i = 0; i < docs.identities.length; i++) { + const idty = docs.identities[i]; + watcher && watcher.writeStatus('Identity ' + (i+1) + '/' + docs.identities.length) + await submitIdentityToServer(idty, toServer, logger) + } + + for (let i = 0; i < docs.certifications.length; i++) { + const cert = docs.certifications[i]; + watcher && watcher.writeStatus('Certification ' + (i+1) + '/' + docs.certifications.length) + await submitCertificationToServer(cert, toServer, logger) + } + + for (let i = 0; i < docs.memberships.length; i++) { + const ms = docs.memberships[i]; + watcher && watcher.writeStatus('Membership ' + (i+1) + '/' + docs.memberships.length) + await submitMembershipToServer(ms, toServer, logger) + } + } +} + +function getDocumentsTree(currency:string, res:any) { + const documents:any = { + identities: [], + certifications: [], + memberships: [] + } + for(const idty of res.identities) { + const identity = rawer.getOfficialIdentity({ + currency, + 
uid: idty.uid, + pubkey: idty.pubkey, + buid: idty.meta.timestamp, + sig: idty.sig + }) + documents.identities.push(identity) + for (const cert of idty.pendingCerts) { + const certification = rawer.getOfficialCertification({ + currency, + idty_issuer: idty.pubkey, + idty_uid: idty.uid, + idty_buid: idty.meta.timestamp, + idty_sig: idty.sig, + issuer: cert.from, + buid: cert.blockstamp, + sig: cert.sig + }) + documents.certifications.push(certification) + } + for (const ms of idty.pendingMemberships) { + const membership = rawer.getMembership({ + currency, + userid: idty.uid, + issuer: idty.pubkey, + certts: idty.meta.timestamp, + membership: ms.type, + block: ms.blockstamp, + signature: ms.sig + }) + documents.memberships.push(membership) + } + } + return documents +} + +async function submitIdentity(idty:any, to:any, logger:any = null) { + try { + await to.postIdentity(idty) + logger && logger.trace('Sandbox pulling: success with identity \'%s\'', idty.uid) + } catch (e) { + // Silent error + } +} + +async function submitCertification(cert:any, to:any, logger:any = null) { + try { + await to.postCert(cert) + logger && logger.trace('Sandbox pulling: success with cert key %s => %s', cert.from.substr(0, 6), cert.idty_uid) + } catch (e) { + // Silent error + } +} + +async function submitMembership(ms:any, to:any, logger:any = null) { + try { + await to.postRenew(ms) + logger && logger.trace('Sandbox pulling: success with membership \'%s\'', ms.uid) + } catch (e) { + // Silent error + } +} + +async function submitIdentityToServer(idty:any, toServer:any, logger:any = null) { + try { + const obj = parsers.parseIdentity.syncWrite(idty) + obj.documentType = 'identity' + await toServer.singleWritePromise(obj) + logger && logger.trace('Sandbox pulling: success with identity \'%s\'', idty.uid) + } catch (e) { + // Silent error + } +} + +async function submitCertificationToServer(cert:any, toServer:any, logger:any = null) { + try { + const obj = parsers.parseCertification.syncWrite(cert) + obj.documentType = 'certification' + await toServer.singleWritePromise(obj) + logger && logger.trace('Sandbox pulling: success with cert key %s => %s', cert.from.substr(0, 6), cert.idty_uid) + } catch (e) { + // Silent error + } +} + +async function submitMembershipToServer(ms:any, toServer:any, logger:any = null) { + try { + const obj = parsers.parseMembership.syncWrite(ms) + obj.documentType = 'membership' + await toServer.singleWritePromise(obj) + logger && logger.trace('Sandbox pulling: success with membership \'%s\'', ms.uid) + } catch (e) { + // Silent error + } +} diff --git a/app/modules/crawler/lib/sync.ts b/app/modules/crawler/lib/sync.ts new file mode 100644 index 0000000000000000000000000000000000000000..1c5b21a336a8432132e71d8455d146cb33b512d8 --- /dev/null +++ b/app/modules/crawler/lib/sync.ts @@ -0,0 +1,986 @@ +import {CrawlerConstants} from "./constants" +import * as stream from 'stream' +import {Server} from "../../../../server" +import {PeerDTO} from "../../../lib/dto/PeerDTO" +import {FileDAL} from "../../../lib/dal/fileDAL" +import { BlockDTO } from "../../../lib/dto/BlockDTO"; +import {connect} from "./connect" +import { Contacter } from "./contacter"; +import { pullSandboxToLocalServer } from "./sandbox"; +import { tx_cleaner } from "./tx_cleaner"; +import { AbstractDAO } from "./pulling"; +import { DBBlock } from "../../../lib/db/DBBlock"; +import { BlockchainService } from "../../../service/BlockchainService"; + +const util = require('util'); +const _ = require('underscore'); +const moment = 
require('moment'); +const multimeter = require('multimeter'); +const makeQuerablePromise = require('querablep'); +const common = require('duniter-common'); +const Peer = common.document.Peer; +const dos2unix = common.dos2unix; +const hashf = common.hashf; +const rawer = common.rawer; + +const CONST_BLOCKS_CHUNK = 250; +const EVAL_REMAINING_INTERVAL = 1000; +const INITIAL_DOWNLOAD_SLOTS = 1; + +export class Synchroniser extends stream.Duplex { + + private watcher:Watcher + private speed = 0 + private blocksApplied = 0 + private contacterOptions:any + + constructor( + private server:Server, + private host:string, + private port:number, + interactive = false, + private slowOption = false) { + + super({ objectMode: true }) + + // Wrapper to also push event stream + this.watcher = new EventWatcher( + interactive ? new MultimeterWatcher() : new LoggerWatcher(this.logger), + (pct:number, innerWatcher:Watcher) => { + if (pct !== undefined && innerWatcher.downloadPercent() < pct) { + this.push({ download: pct }); + } + }, + (pct:number, innerWatcher:Watcher) => { + if (pct !== undefined && innerWatcher.appliedPercent() < pct) { + this.push({ applied: pct }); + } + } + ) + + if (interactive) { + this.logger.mute(); + } + + this.contacterOptions = { + timeout: CrawlerConstants.SYNC_LONG_TIMEOUT + } + } + + get conf() { + return this.server.conf + } + + get logger() { + return this.server.logger + } + + get PeeringService() { + return this.server.PeeringService + } + + get BlockchainService() { + return this.server.BlockchainService + } + + get dal() { + return this.server.dal + } + + // Unused, but made mandatory by Duplex interface + _read() {} + _write() {} + + + private async logRemaining(to:number) { + const lCurrent = await this.dal.getCurrentBlockOrNull(); + const localNumber = lCurrent ? lCurrent.number : -1; + + if (to > 1 && this.speed > 0) { + const remain = (to - (localNumber + 1 + this.blocksApplied)); + const secondsLeft = remain / this.speed; + const momDuration = moment.duration(secondsLeft * 1000); + this.watcher.writeStatus('Remaining ' + momDuration.humanize() + ''); + } + } + + async test() { + const peering = await Contacter.fetchPeer(this.host, this.port, this.contacterOptions); + const node = await connect(Peer.fromJSON(peering)); + return node.getCurrent(); + } + + async sync(to:number, chunkLen:number, askedCautious = false, nopeers = false, noShufflePeers = false) { + + try { + + const peering = await Contacter.fetchPeer(this.host, this.port, this.contacterOptions); + + let peer = Peer.fromJSON(peering); + this.logger.info("Try with %s %s", peer.getURL(), peer.pubkey.substr(0, 6)); + let node:any = await connect(peer); + node.pubkey = peer.pubkey; + this.logger.info('Sync started.'); + + const fullSync = !to; + + //============ + // Blockchain headers + //============ + this.logger.info('Getting remote blockchain info...'); + this.watcher.writeStatus('Connecting to ' + this.host + '...'); + const lCurrent:DBBlock = await this.dal.getCurrentBlockOrNull(); + const localNumber = lCurrent ? 
lCurrent.number : -1; + let rCurrent:BlockDTO + if (isNaN(to)) { + rCurrent = await node.getCurrent(); + } else { + rCurrent = await node.getBlock(to); + } + to = rCurrent.number || 0 + + //======= + // Peers (just for P2P download) + //======= + let peers:PeerDTO[] = []; + if (!nopeers && (to - localNumber > 1000)) { // P2P download if more than 1000 blocks + this.watcher.writeStatus('Peers...'); + const merkle = await this.dal.merkleForPeers(); + const getPeers = node.getPeers.bind(node); + const json2 = await getPeers({}); + const rm = new NodesMerkle(json2); + if(rm.root() != merkle.root()){ + const leavesToAdd:string[] = []; + const json = await getPeers({ leaves: true }); + _(json.leaves).forEach((leaf:string) => { + if(merkle.leaves().indexOf(leaf) == -1){ + leavesToAdd.push(leaf); + } + }); + peers = await Promise.all(leavesToAdd.map(async (leaf) => { + try { + const json3 = await getPeers({ "leaf": leaf }); + const jsonEntry = json3.leaf.value; + const endpoint = jsonEntry.endpoints[0]; + this.watcher.writeStatus('Peer ' + endpoint); + return jsonEntry; + } catch (e) { + this.logger.warn("Could not get peer of leaf %s, continue...", leaf); + return null; + } + })) + } + else { + this.watcher.writeStatus('Peers already known'); + } + } + + if (!peers.length) { + peers.push(peer); + } + peers = peers.filter((p) => p); + + //============ + // Blockchain + //============ + this.logger.info('Downloading Blockchain...'); + + // We use cautious mode if it was explicitly asked, or if it was not explicitly asked but the local blockchain has already been started + const cautious = (askedCautious === true || localNumber >= 0); + const shuffledPeers = noShufflePeers ? peers : _.shuffle(peers); + const downloader = new P2PDownloader(localNumber, to, rCurrent.hash, shuffledPeers, this.watcher, this.logger, hashf, rawer, this.dal, this.slowOption); + + downloader.start(); + + let lastPullBlock:BlockDTO|null = null; + + let dao = new (class extends AbstractDAO { + + constructor( + private server:Server, + private watcher:Watcher, + private dal:FileDAL, + private BlockchainService:BlockchainService) { + super() + } + + async applyBranch(blocks:BlockDTO[]) { + blocks = _.filter(blocks, (b:BlockDTO) => b.number <= to); + if (cautious) { + for (const block of blocks) { + if (block.number == 0) { + await this.BlockchainService.saveParametersForRootBlock(block); + } + await dao.applyMainBranch(block); + } + } else { + await this.BlockchainService.fastBlockInsertions(blocks, to) + } + lastPullBlock = blocks[blocks.length - 1]; + this.watcher.appliedPercent(Math.floor(blocks[blocks.length - 1].number / to * 100)); + return true; + } + + // Get the local blockchain current block + async localCurrent(): Promise<DBBlock | null> { + if (cautious) { + return await this.dal.getCurrentBlockOrNull(); + } else { + if (lCurrent && !lastPullBlock) { + lastPullBlock = lCurrent.toBlockDTO() + } else if (!lastPullBlock) { + return null + } + return DBBlock.fromBlockDTO(lastPullBlock) + } + } + // Get the remote blockchain (bc) current block + async remoteCurrent(source?: any): Promise<BlockDTO | null> { + return Promise.resolve(rCurrent) + } + // Get the remote peers to be pulled + async remotePeers(source?: any): Promise<PeerDTO[]> { + return [node] + } + async getLocalBlock(number: number): Promise<DBBlock> { + return this.dal.getBlock(number) + } + async getRemoteBlock(thePeer: PeerDTO, number: number): Promise<BlockDTO> { + let block = null; + try { + block = await node.getBlock(number); + tx_cleaner(block.transactions); + } catch (e) { + if (e.httpCode 
!= 404) { + throw e; + } + } + return block; + } + async applyMainBranch(block: BlockDTO): Promise<boolean> { + const addedBlock = await this.BlockchainService.submitBlock(block, true, CrawlerConstants.FORK_ALLOWED); + this.server.streamPush(addedBlock); + this.watcher.appliedPercent(Math.floor(block.number / to * 100)); + return true + } + // Possibly remove forks later on + async removeForks(): Promise<boolean> { + return true + } + // Tells whether the given peer is a member peer + async isMemberPeer(thePeer: PeerDTO): Promise<boolean> { + let idty = await this.dal.getWrittenIdtyByPubkey(thePeer.pubkey); + return (idty && idty.member) || false; + } + downloadBlocks(thePeer: PeerDTO, fromNumber: number, count?: number | undefined): Promise<BlockDTO[]> { + // Note: we don't care about the particular peer asked by the method. We use the network instead. + const numberOffseted = fromNumber - (localNumber + 1); + const targetChunk = Math.floor(numberOffseted / CONST_BLOCKS_CHUNK); + // Return the download promise! Simple. + return downloader.getChunk(targetChunk); + } + + })(this.server, this.watcher, this.dal, this.BlockchainService) + + const logInterval = setInterval(() => this.logRemaining(to), EVAL_REMAINING_INTERVAL); + await dao.pull(this.conf, this.logger) + + // Finished blocks + this.watcher.downloadPercent(100.0); + this.watcher.appliedPercent(100.0); + + if (logInterval) { + clearInterval(logInterval); + } + + // Save currency parameters given by root block + const rootBlock = await this.server.dal.getBlock(0); + await this.BlockchainService.saveParametersForRootBlock(rootBlock); + this.server.dal.blockDAL.cleanCache(); + + //======= + // Sandboxes + //======= + this.watcher.writeStatus('Synchronizing the sandboxes...'); + await pullSandboxToLocalServer(this.conf.currency, node, this.server, this.server.logger, this.watcher) + + //======= + // Peers + //======= + await this.syncPeers(nopeers, fullSync, this.host, this.port, to) + + this.watcher.end(); + this.push({ sync: true }); + this.logger.info('Sync finished.'); + } catch (err) { + this.push({ sync: false, msg: err }); + err && this.watcher.writeStatus(err.message || (err.uerr && err.uerr.message) || String(err)); + this.watcher.end(); + throw err; + } + } + + async syncPeers(nopeers:boolean, fullSync:boolean, host:string, port:number, to?:number) { + if (!nopeers && fullSync) { + + const peering = await Contacter.fetchPeer(host, port, this.contacterOptions); + + let peer = Peer.fromJSON(peering); + this.logger.info("Try with %s %s", peer.getURL(), peer.pubkey.substr(0, 6)); + let node:any = await connect(peer); + node.pubkey = peer.pubkey; + this.logger.info('Sync started.'); + + this.watcher.writeStatus('Peers...'); + await this.syncPeer(node); + const merkle = await this.dal.merkleForPeers(); + const getPeers = node.getPeers.bind(node); + const json2 = await getPeers({}); + const rm = new NodesMerkle(json2); + if(rm.root() != merkle.root()){ + const leavesToAdd:string[] = []; + const json = await getPeers({ leaves: true }); + _(json.leaves).forEach((leaf:string) => { + if(merkle.leaves().indexOf(leaf) == -1){ + leavesToAdd.push(leaf); + } + }); + for (const leaf of leavesToAdd) { + try { + const json3 = await getPeers({ "leaf": leaf }); + const jsonEntry = json3.leaf.value; + const sign = json3.leaf.value.signature; + const entry:any = {}; + ["version", "currency", "pubkey", "endpoints", "block"].forEach((key) => { + entry[key] = jsonEntry[key]; + }); + entry.signature = sign; + this.watcher.writeStatus('Peer ' + 
entry.pubkey); + await this.PeeringService.submitP(entry, false, to === undefined); + } catch (e) { + this.logger.warn(e); + } + } + } + else { + this.watcher.writeStatus('Peers already known'); + } + } + } + + //============ + // Peer + //============ + private async syncPeer (node:any) { + + // Global sync vars + const remotePeer = Peer.fromJSON({}); + let remoteJsonPeer:any = {}; + const json = await node.getPeer(); + remotePeer.version = json.version + remotePeer.currency = json.currency + remotePeer.pub = json.pub + remotePeer.endpoints = json.endpoints + remotePeer.blockstamp = json.block + remotePeer.signature = json.signature + const entry = remotePeer.getRawUnsigned(); + const signature = dos2unix(remotePeer.signature); + // Parameters + if(!(entry && signature)){ + throw 'Requires a peering entry + signature'; + } + + remoteJsonPeer = json; + remoteJsonPeer.pubkey = json.pubkey; + let signatureOK = this.PeeringService.checkPeerSignature(remoteJsonPeer); + if (!signatureOK) { + this.watcher.writeStatus('Wrong signature for peer #' + remoteJsonPeer.pubkey); + } + try { + await this.PeeringService.submitP(remoteJsonPeer); + } catch (err) { + if (err.indexOf !== undefined && err.indexOf(CrawlerConstants.ERRORS.NEWER_PEER_DOCUMENT_AVAILABLE.uerr.message) !== -1 && err != CrawlerConstants.ERROR.PEER.UNKNOWN_REFERENCE_BLOCK) { + throw err; + } + } + } +} + +class NodesMerkle { + + private depth:number + private nodesCount:number + private leavesCount:number + private merkleRoot:string + + constructor(json:any) { + this.depth = json.depth + this.nodesCount = json.nodesCount + this.leavesCount = json.leavesCount + this.merkleRoot = json.root; + } + + // var i = 0; + // this.levels = []; + // while(json && json.levels[i]){ + // this.levels.push(json.levels[i]); + // i++; + // } + + root() { + return this.merkleRoot + } +} + +interface Watcher { + writeStatus(str: string): void + downloadPercent(pct?: number): number + appliedPercent(pct?: number): number + end(): void +} + +class EventWatcher implements Watcher { + + constructor( + private innerWatcher:Watcher, + private beforeDownloadPercentHook: (pct:number, innerWatcher:Watcher) => void, + private beforeAppliedPercentHook: (pct:number, innerWatcher:Watcher) => void) { + } + + writeStatus(str: string): void { + this.innerWatcher.writeStatus(str) + } + + downloadPercent(pct?: number): number { + this.beforeDownloadPercentHook(pct || 0, this.innerWatcher) + return this.innerWatcher.downloadPercent(pct) + } + + appliedPercent(pct?: number): number { + this.beforeAppliedPercentHook(pct || 0, this.innerWatcher) + return this.innerWatcher.appliedPercent(pct) + } + + end(): void { + this.innerWatcher.end() + } +} + +class MultimeterWatcher implements Watcher { + + private xPos:number + private yPos:number + private multi:any + private charm:any + private appliedBar:any + private downloadBar:any + private writtens:string[] = [] + + constructor() { + this.multi = multimeter(process); + this.charm = this.multi.charm; + this.charm.on('^C', process.exit); + this.charm.reset(); + + this.multi.write('Progress:\n\n'); + + this.multi.write("Download: \n"); + this.downloadBar = this.multi("Download: \n".length, 3, { + width : 20, + solid : { + text : '|', + foreground : 'white', + background : 'blue' + }, + empty : { text : ' ' } + }); + + this.multi.write("Apply: \n"); + this.appliedBar = this.multi("Apply: \n".length, 4, { + width : 20, + solid : { + text : '|', + foreground : 'white', + background : 'blue' + }, + empty : { text : ' ' } + }); + + 
this.multi.write('\nStatus: '); + + this.charm.position( (x:number, y:number) => { + this.xPos = x; + this.yPos = y; + }); + + this.writtens = []; + + this.downloadBar.percent(0); + this.appliedBar.percent(0); + } + + writeStatus(str:string) { + this.writtens.push(str); + //require('fs').writeFileSync('writtens.json', JSON.stringify(writtens)); + this.charm + .position(this.xPos, this.yPos) + .erase('end') + .write(str) + ; + }; + + downloadPercent(pct:number) { + return this.downloadBar.percent(pct) + } + + appliedPercent(pct:number) { + return this.appliedBar.percent(pct) + } + + end() { + this.multi.write('\nAll done.\n'); + this.multi.destroy(); + } +} + +class LoggerWatcher implements Watcher { + + private downPct = 0 + private appliedPct = 0 + private lastMsg = "" + + constructor(private logger:any) { + } + + showProgress() { + return this.logger.info('Downloaded %s%, Applied %s%', this.downPct, this.appliedPct) + } + + writeStatus(str:string) { + if (str != this.lastMsg) { + this.lastMsg = str; + this.logger.info(str); + } + } + + downloadPercent(pct:number) { + if (pct !== undefined) { + let changed = pct > this.downPct; + this.downPct = pct; + if (changed) this.showProgress(); + } + return this.downPct; + } + + appliedPercent(pct:number) { + if (pct !== undefined) { + let changed = pct > this.appliedPct; + this.appliedPct = pct; + if (changed) this.showProgress(); + } + return this.appliedPct; + } + + end() { + } + +} + +class P2PDownloader { + + private PARALLEL_PER_CHUNK = 1; + private MAX_DELAY_PER_DOWNLOAD = 15000; + private NO_NODES_AVAILABLE = "No node available for download"; + private TOO_LONG_TIME_DOWNLOAD:string + private nbBlocksToDownload:number + private numberOfChunksToDownload:number + private downloadSlots:number + private chunks:any + private processing:any + private handler:any + private resultsDeferers:any + private resultsData:Promise<BlockDTO[]>[] + private nodes:any = {} + private nbDownloadsTried = 0 + private nbDownloading = 0 + private lastAvgDelay:number + private aSlotWasAdded = false + private slots:number[] = []; + private downloads:any = {}; + private startResolver:any + private downloadStarter:Promise<any> + + constructor( + private localNumber:number, + private to:number, + private toHash:string, + private peers:PeerDTO[], + private watcher:Watcher, + private logger:any, + private hashf:any, + private rawer:any, + private dal:FileDAL, + private slowOption:any) { + + this.TOO_LONG_TIME_DOWNLOAD = "No answer after " + this.MAX_DELAY_PER_DOWNLOAD + "ms, will retry download later."; + this.nbBlocksToDownload = Math.max(0, to - localNumber); + this.numberOfChunksToDownload = Math.ceil(this.nbBlocksToDownload / CONST_BLOCKS_CHUNK); + this.chunks = Array.from({ length: this.numberOfChunksToDownload }).map(() => null); + this.processing = Array.from({ length: this.numberOfChunksToDownload }).map(() => false); + this.handler = Array.from({ length: this.numberOfChunksToDownload }).map(() => null); + this.resultsDeferers = Array.from({ length: this.numberOfChunksToDownload }).map(() => null); + this.resultsData = Array.from({ length: this.numberOfChunksToDownload }).map((unused, index) => new Promise((resolve, reject) => { + this.resultsDeferers[index] = { resolve, reject }; + })); + + // Create slots of download, in a ready stage + this.downloadSlots = slowOption ? 1 : Math.min(INITIAL_DOWNLOAD_SLOTS, peers.length); + this.lastAvgDelay = this.MAX_DELAY_PER_DOWNLOAD; + + /** + * Triggers for starting the download. 
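+ * + * In effect a deferred promise: the download worker below first awaits + * `downloadStarter`, and `start()` resolves it through `startResolver`, + * which is what actually kicks off the download loop.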
+ */ + this.downloadStarter = new Promise((resolve) => this.startResolver = resolve); + + /** + * Download worker + * @type {*|Promise} When finished. + */ + (async () => { + try { + await this.downloadStarter; + let doneCount = 0, resolvedCount = 0; + while (resolvedCount < this.chunks.length) { + doneCount = 0; + resolvedCount = 0; + // Start as many downloads as possible, and count the ones already done + for (let i = this.chunks.length - 1; i >= 0; i--) { + if (this.chunks[i] === null && !this.processing[i] && this.slots.indexOf(i) === -1 && this.slots.length < this.downloadSlots) { + this.slots.push(i); + this.processing[i] = true; + this.downloads[i] = makeQuerablePromise(this.downloadChunk(i)); // Starts a new download + } else if (this.downloads[i] && this.downloads[i].isFulfilled() && this.processing[i]) { + doneCount++; + } + // We count the number of perfectly downloaded & validated chunks + if (this.chunks[i]) { + resolvedCount++; + } + } + watcher.downloadPercent(Math.round(doneCount / this.numberOfChunksToDownload * 100)); + let races = this.slots.map((i) => this.downloads[i]); + if (races.length) { + try { + await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, races); + } catch (e) { + this.logger.warn(e); + } + for (let i = 0; i < this.slots.length; i++) { + // We must know the index of what resolved/rejected to free the slot + const doneIndex = this.slots.reduce((found:any, realIndex:number, index:number) => { + if (found !== null) return found; + if (this.downloads[realIndex].isFulfilled()) return index; + return null; + }, null); + if (doneIndex !== null) { + const realIndex = this.slots[doneIndex]; + if (this.downloads[realIndex].isResolved()) { + // IIFE to be safe about `realIndex` + (async () => { + const blocks = await this.downloads[realIndex]; + if (realIndex < this.chunks.length - 1) { + // We must wait for NEXT blocks to be STRONGLY validated before going any further, otherwise we + // could be on the wrong chain + await this.getChunk(realIndex + 1); + } + const chainsWell = await this.chainsCorrectly(blocks, realIndex); + if (chainsWell) { + // Chunk is COMPLETE + this.logger.warn("Chunk #%s is COMPLETE from %s", realIndex, [this.handler[realIndex].host, this.handler[realIndex].port].join(':')); + this.chunks[realIndex] = blocks; + this.resultsDeferers[realIndex].resolve(this.chunks[realIndex]); + } else { + this.logger.warn("Chunk #%s DOES NOT CHAIN CORRECTLY from %s", realIndex, [this.handler[realIndex].host, this.handler[realIndex].port].join(':')); + // Penalty on this node to avoid using it again + if (this.handler[realIndex].resetFunction) { + await this.handler[realIndex].resetFunction(); + } + if (this.handler[realIndex].tta !== undefined) { + this.handler[realIndex].tta += this.MAX_DELAY_PER_DOWNLOAD; + } + // Need a retry + this.processing[realIndex] = false; + } + })() + } else { + this.processing[realIndex] = false; // Need a retry + } + this.slots.splice(doneIndex, 1); + } + } + } + // Wait a bit + await new Promise((resolve, reject) => setTimeout(resolve, 10)); + } + } catch (e) { + this.logger.error('Fatal error in the downloader:'); + this.logger.error(e); + } + })() + } + + /** + * Get a list of P2P nodes to use for download. + * If a node is not yet correctly initialized (we can test a node before considering it good for downloading), then + * this method will not return it. 
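+ * + * Roughly: nodes whose average time-to-answer (tta) exceeds MAX_DELAY_PER_DOWNLOAD + * are filtered out; the remainder are sorted by tta and the PARALLEL_PER_CHUNK + * fastest ones are returned.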
+ */ + private async getP2Pcandidates(): Promise<any[]> { + let promises = this.peers.reduce((chosens:any, other:any, index:number) => { + if (!this.nodes[index]) { + // Create the node + let p = Peer.fromJSON(this.peers[index]); + this.nodes[index] = makeQuerablePromise((async () => { + // We wait for the download process to be triggered + // await downloadStarter; + // if (nodes[index - 1]) { + // try { await nodes[index - 1]; } catch (e) {} + // } + const node:any = await connect(p) + // A new node starts with a very low TTA, so that it gets tried quickly + node.tta = 1; + node.nbSuccess = 0; + return node; + })()) + chosens.push(this.nodes[index]); + } else { + chosens.push(this.nodes[index]); + } + // Continue + return chosens; + }, []); + let candidates = await Promise.all(promises) + candidates.forEach((c:any) => { + c.tta = c.tta || 0; // Default TTA when none has been measured yet + c.ttas = c.ttas || []; // Memorize the answer delays + }); + if (candidates.length === 0) { + throw this.NO_NODES_AVAILABLE; + } + // We remove the nodes impossible to reach (timeout) + let withGoodDelays = _.filter(candidates, (c:any) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD); + if (withGoodDelays.length === 0) { + // No node can be reached: try to lower the number of simultaneous downloads + this.downloadSlots = Math.floor(this.downloadSlots / 2); + // We reinitialize the nodes + this.nodes = {}; + // And try it all again + return this.getP2Pcandidates(); + } + const parallelMax = Math.min(this.PARALLEL_PER_CHUNK, withGoodDelays.length); + withGoodDelays = _.sortBy(withGoodDelays, (c:any) => c.tta); + withGoodDelays = withGoodDelays.slice(0, parallelMax); + // We temporarily raise the TTA to avoid asking the same node several times in parallel + withGoodDelays.forEach((c:any) => c.tta = this.MAX_DELAY_PER_DOWNLOAD); + return withGoodDelays; + } + + /** + * Download a chunk of blocks using P2P network through BMA API. + * @param from The starting block to download + * @param count The number of blocks to download. 
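+ * (All returned candidates are raced against each other below; the first node + * to answer the whole chunk wins, see raceOrCancelIfTimeout.)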
+ * @param chunkIndex The # of the chunk in local algorithm (logging purposes only) + */ + private async p2pDownload(from:number, count:number, chunkIndex:number) { + let candidates = await this.getP2Pcandidates(); + // Book the nodes + return await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node:any) => { + try { + const start = Date.now(); + this.handler[chunkIndex] = node; + node.downloading = true; + this.nbDownloading++; + this.watcher.writeStatus('Getting chunk #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + [node.host, node.port].join(':')); + let blocks = await node.getBlocks(count, from); + node.ttas.push(Date.now() - start); + // Only keep a sliding window of the last 5 TTAs for the node + if (node.ttas.length > 5) node.ttas.shift(); + // Average time to answer + node.tta = Math.round(node.ttas.reduce((sum:number, tta:number) => sum + tta, 0) / node.ttas.length); + this.watcher.writeStatus('GOT chunk #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + [node.host, node.port].join(':')); + node.nbSuccess++; + + // Opening/Closing slots depending on the Internet connection + if (this.slots.length == this.downloadSlots) { + const peers = await Promise.all(_.values(this.nodes)) + const downloading = _.filter(peers, (p:any) => p.downloading && p.ttas.length); + const currentAvgDelay = downloading.reduce((sum:number, c:any) => { + const tta = Math.round(c.ttas.reduce((sum:number, tta:number) => sum + tta, 0) / c.ttas.length); + return sum + tta; + }, 0) / downloading.length; + // Open or close download slots + if (!this.slowOption) { + // Check the impact of an added node (not first time) + if (!this.aSlotWasAdded) { + // We try to add a node + const newValue = Math.min(peers.length, this.downloadSlots + 1); + if (newValue !== this.downloadSlots) { + this.downloadSlots = newValue; + this.aSlotWasAdded = true; + this.logger.info('AUGMENTED DOWNLOAD SLOTS! Now has %s slots', this.downloadSlots); + } + } else { + this.aSlotWasAdded = false; + const decelerationPercent = currentAvgDelay / this.lastAvgDelay - 1; + const addedNodePercent = 1 / this.nbDownloading; + this.logger.info('Deceleration = %s (%s/%s), AddedNodePercent = %s', decelerationPercent, currentAvgDelay, this.lastAvgDelay, addedNodePercent); + if (decelerationPercent > addedNodePercent) { + this.downloadSlots = Math.max(1, this.downloadSlots - 1); // We reduce the number of slots, but we keep at least 1 slot + this.logger.info('REDUCED DOWNLOAD SLOT! Now has %s slots', this.downloadSlots); + } + } + } + this.lastAvgDelay = currentAvgDelay; + } + + this.nbDownloadsTried++; + this.nbDownloading--; + node.downloading = false; + + return blocks; + } catch (e) { + this.nbDownloading--; + node.downloading = false; + this.nbDownloadsTried++; + node.ttas.push(this.MAX_DELAY_PER_DOWNLOAD + 1); // Do not ask this node again + // Average time to answer + node.tta = Math.round(node.ttas.reduce((sum:number, tta:number) => sum + tta, 0) / node.ttas.length); + throw e; + } + })) + } + + /** + * Function for downloading a chunk by its number. + * @param index Number of the chunk. 
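+ * + * For instance, on a fresh sync (localNumber = -1) with CONST_BLOCKS_CHUNK = 250, + * index 3 covers blocks #750 to #999; only the very last chunk may be shorter + * (nbBlocksToDownload % CONST_BLOCKS_CHUNK).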
+ */ + private async downloadChunk(index:number): Promise<BlockDTO[]> { + // The algorithm to download a chunk + const from = this.localNumber + 1 + index * CONST_BLOCKS_CHUNK; + let count = CONST_BLOCKS_CHUNK; + if (index == this.numberOfChunksToDownload - 1) { + count = this.nbBlocksToDownload % CONST_BLOCKS_CHUNK || CONST_BLOCKS_CHUNK; + } + try { + const fileName = "blockchain/chunk_" + index + "-" + CONST_BLOCKS_CHUNK + ".json"; + if (this.localNumber <= 0 && (await this.dal.confDAL.coreFS.exists(fileName))) { + this.handler[index] = { + host: 'filesystem', + port: 'blockchain', + resetFunction: () => this.dal.confDAL.coreFS.remove(fileName) + }; + return (await this.dal.confDAL.coreFS.readJSON(fileName)).blocks; + } else { + const chunk:any = await this.p2pDownload(from, count, index); + // Store the file to avoid re-downloading + if (this.localNumber <= 0 && chunk.length === CONST_BLOCKS_CHUNK) { + await this.dal.confDAL.coreFS.makeTree('blockchain'); + await this.dal.confDAL.coreFS.writeJSON(fileName, { blocks: chunk }); + } + return chunk; + } + } catch (e) { + this.logger.error(e); + return this.downloadChunk(index); + } + } + + /** + * Utility function that starts a race between promises but cancels it if no answer is found before `timeout` + * @param timeout + * @param races + * @returns {Promise} + */ + private raceOrCancelIfTimeout(timeout:number, races:any[]) { + return Promise.race([ + // Process the race, but cancel it if we don't get an answer quickly enough + new Promise((resolve, reject) => { + setTimeout(() => { + reject(this.TOO_LONG_TIME_DOWNLOAD); + }, timeout) + }) + ].concat(races)); + }; + + private async chainsCorrectly(blocks:BlockDTO[], index:number) { + + if (!blocks.length) { + this.logger.error('No block was downloaded'); + return false; + } + + for (let i = blocks.length - 1; i > 0; i--) { + if (blocks[i].number !== blocks[i - 1].number + 1 || blocks[i].previousHash !== blocks[i - 1].hash) { + this.logger.error("Blocks do not chain correctly", blocks[i].number); + return false; + } + if (blocks[i].version != blocks[i - 1].version && blocks[i].version != blocks[i - 1].version + 1) { + this.logger.error("Version cannot be downgraded", blocks[i].number); + return false; + } + } + + // Check hashes + for (let i = 0; i < blocks.length; i++) { + // Note: the hash, in Duniter, is made only on the **signing part** of the block: InnerHash + Nonce + if (blocks[i].version >= 6) { + for (const tx of blocks[i].transactions) { + tx.version = CrawlerConstants.TRANSACTION_VERSION; + } + } + if (blocks[i].inner_hash !== hashf(rawer.getBlockInnerPart(blocks[i])).toUpperCase()) { + this.logger.error("Inner hash of block#%s does not match", blocks[i].number); + return false; + } + if (blocks[i].hash !== hashf(rawer.getBlockInnerHashAndNonceWithSignature(blocks[i])).toUpperCase()) { + this.logger.error("Hash of block#%s does not match", blocks[i].number); + return false; + } + } + + const lastBlockOfChunk = blocks[blocks.length - 1]; + if ((lastBlockOfChunk.number == this.to || blocks.length < CONST_BLOCKS_CHUNK) && lastBlockOfChunk.hash != this.toHash) { + // Top chunk + this.logger.error('Top block is not on the right chain'); + return false; + } else { + // Chaining between downloads + const previousChunk = await this.getChunk(index + 1); + const blockN = blocks[blocks.length - 1]; // The block n + const blockNp1 = previousChunk[0]; // The block n + 1 + if (blockN && blockNp1 && (blockN.number + 1 !== blockNp1.number || blockN.hash != 
blockNp1.previousHash)) { + this.logger.error('Chunk is not referenced by the upper one'); + return false; + } + } + return true; + } + + /** + * PUBLIC API + */ + + /*** + * Triggers the downloading + */ + start() { + return this.startResolver() + } + + /*** + * Promises a chunk to be downloaded and returned + * @param index The number of the chunk to download & return + */ + getChunk(index:number) { + return this.resultsData[index] || Promise.resolve([]) + } +} diff --git a/app/modules/crawler/lib/tx_cleaner.ts b/app/modules/crawler/lib/tx_cleaner.ts new file mode 100644 index 0000000000000000000000000000000000000000..67a72e9572c9752848b4e6a91190e0ba914f18cb --- /dev/null +++ b/app/modules/crawler/lib/tx_cleaner.ts @@ -0,0 +1,9 @@ +export const tx_cleaner = (txs:any) => + + // Remove unused signatories - see https://github.com/duniter/duniter/issues/494 + txs.forEach((tx:any) => { + if (tx.signatories) { + delete tx.signatories + } + return tx + }) diff --git a/app/service/PeeringService.ts b/app/service/PeeringService.ts index 9355de98445f82b4370f4772111061bbce7fe40e..52ed25c9941e90eaf637a4aa4d9073a0d0e896b9 100644 --- a/app/service/PeeringService.ts +++ b/app/service/PeeringService.ts @@ -194,7 +194,7 @@ export class PeeringService { let endpoint = await this.server.getMainEndpoint(theConf); let otherPotentialEndpoints = this.getOtherEndpoints(p1.endpoints, theConf); logger.info('Sibling endpoints:', otherPotentialEndpoints); - let reals = await otherPotentialEndpoints.map(async (theEndpoint:string) => { + let reals = await Promise.all(otherPotentialEndpoints.map(async (theEndpoint:string) => { let real = true; let remote = PeerDTO.endpoint2host(theEndpoint) try { @@ -214,7 +214,7 @@ export class PeeringService { real = false; } return real; - }) + })) let toConserve = otherPotentialEndpoints.filter((ep, i) => reals[i]); if (!currency || endpoint == 'BASIC_MERKLED_API') { logger.error('It seems there is an issue with your configuration.'); diff --git a/index.ts b/index.ts index 8946c887aef26f8a2aea03c994ac9bd3f2e29122..25307e568f08aad5afb3e9fec73afdc655f710e9 100644 --- a/index.ts +++ b/index.ts @@ -4,6 +4,7 @@ import {Server} from "./server" import {ConfDTO} from "./app/lib/dto/ConfDTO" import {ProverDependency} from "./app/modules/prover/index" import {KeypairDependency} from "./app/modules/keypair/index" +import {CrawlerDependency} from "./app/modules/crawler/index" const path = require('path'); const _ = require('underscore'); @@ -100,7 +101,8 @@ const DEFAULT_DEPENDENCIES = MINIMAL_DEPENDENCIES.concat([ { name: 'duniter-router', required: routerDependency }, { name: 'duniter-plugin', required: pluginDependency }, { name: 'duniter-prover', required: ProverDependency }, - { name: 'duniter-keypair', required: KeypairDependency } + { name: 'duniter-keypair', required: KeypairDependency }, + { name: 'duniter-crawler', required: CrawlerDependency } ]); const PRODUCTION_DEPENDENCIES = DEFAULT_DEPENDENCIES.concat([ @@ -140,10 +142,12 @@ module.exports.statics = { } }; -interface StreamingDuniterModule extends stream.Readable { +export interface DuniterService { startService: () => Promise<any> stopService: () => Promise<any> } +export interface ReadableDuniterService extends DuniterService, stream.Readable {} +export interface TransformableDuniterService extends DuniterService, stream.Transform {} class Stack { @@ -158,10 +162,10 @@ class Stack { private wizardTasks:any private definitions:any[] = [] private streams: { - input: stream.Readable[] - process: stream.Transform[] - output: 
stream.Transform[] - neutral: stream.Transform[] + input: ReadableDuniterService[] + process: TransformableDuniterService[] + output: TransformableDuniterService[] + neutral: TransformableDuniterService[] } = { input: [], process: [], @@ -390,14 +394,14 @@ class Stack { // Start services and streaming between them async () => { const modules = this.streams.input.concat(this.streams.process).concat(this.streams.output).concat(this.streams.neutral); - await Promise.all(modules.map((module:StreamingDuniterModule) => module.startService())) + await Promise.all(modules.map((module:DuniterService) => module.startService())) }, // Stop services and streaming between them async () => { const modules = this.streams.input.concat(this.streams.process).concat(this.streams.output).concat(this.streams.neutral); // Any streaming module must implement a `stopService` method - await Promise.all(modules.map((module:StreamingDuniterModule) => module.stopService())) + await Promise.all(modules.map((module:DuniterService) => module.stopService())) // // Stop reading inputs // for (const module of streams.input) module.unpipe(); // Stop reading from global INPUT diff --git a/package.json b/package.json index ee115a016408059cdea7f8a80e405bdbc6304934..28e76655ca6f57058357c1c8b20808b1582ecbf9 100644 --- a/package.json +++ b/package.json @@ -85,7 +85,6 @@ "@types/node": "^8.0.9", "@types/should": "^8.3.0", "coveralls": "2.11.4", - "duniter-crawler": "1.3.x", "duniter-ui": "1.3.x", "eslint": "3.13.1", "eslint-plugin-mocha": "4.8.0", @@ -100,7 +99,6 @@ "typescript": "^2.4.1" }, "peerDependencies": { - "duniter-crawler": "1.3.x", "duniter-ui": "1.3.x" }, "bin": { diff --git a/test/fast/modules/crawler/block_pulling.ts b/test/fast/modules/crawler/block_pulling.ts new file mode 100644 index 0000000000000000000000000000000000000000..709cf9f591a93d9bb3c98e5484a7eed2ec3d7f54 --- /dev/null +++ b/test/fast/modules/crawler/block_pulling.ts @@ -0,0 +1,265 @@ +import {AbstractDAO} from "../../../../app/modules/crawler/lib/pulling" +import {BlockDTO} from "../../../../app/lib/dto/BlockDTO" +import {NewLogger} from "../../../../app/lib/logger" + +const should = require('should'); +const _ = require('underscore'); + +let commonConf = { + swichOnTimeAheadBy: 30, + avgGenTime: 30 * 60, + forksize: 100 +}; + +describe('Pulling blocks', () => { + + it('from genesis with good sidechain should work', pullinTest({ + blockchain: [ + newBlock(0, 'A') + ], + sidechains: [ + [ + newBlock(0, 'A'), + newBlock(1, 'A') // <-- 1) checks this block: is good, we add it + ] + ], + expectHash: 'A1' + })); + + it('from genesis with fork sidechain should not work', pullinTest({ + blockchain: [ + newBlock(0, 'A') + ], + sidechains: [ + [ + newBlock(0, 'B'), // <-- 2) oh no, this is not common with blockchain A, leave this blockchain B alone + newBlock(1, 'B') // <-- 1) checks this block: ah, a fork! let's find common root ... + ] + ], + expectHash: 'A0' + })); + + it('from genesis with multiple good sidechains should work', pullinTest({ + blockchain: [ + newBlock(0, 'A') + ], + sidechains: [ + [ + newBlock(0, 'A'), + newBlock(1, 'A'), // <-- 1) checks this block: is good, we add it + newBlock(2, 'A') // <-- 2) checks this block: is good, we add it + ], + [ + newBlock(0, 'A'), + newBlock(1, 'A') // <-- 3) you are a bit late ... we are already on A2! 
+ ], + [ + newBlock(0, 'A'), + newBlock(1, 'A'), + newBlock(2, 'A'), + newBlock(3, 'A') // <-- 4) checks this block: is good, we add it + ], + [ + newBlock(0, 'A'), + newBlock(1, 'A') // <-- 5) really too late + ] + ], + expectHash: 'A3' + })); + + it('sync with a single fork', pullinTest({ + blockchain: [ + newBlock(0, 'A'), + newBlock(1, 'A'), + newBlock(2, 'A'), + newBlock(3, 'A') + ], + sidechains: [ + [ + newBlock(0, 'A'), // <-- 2) sees a common root, yet not *the* common root (A1 is not a fork block) + newBlock(1, 'A'), // <-- 4) yep this is the good one! sync from B2 to B5 + newBlock(2, 'B'), // <-- 3) check the middle, not the common root + newBlock(3, 'B'), + newBlock(4, 'B'), // <-- 1) checks this block: a fork, let's find common root + newBlock(5, 'B') + ] + ], + expectHash: 'B5' + })); + + it('sync with multiple forks', pullinTest({ + blockchain: [ + newBlock(0, 'A'), + newBlock(1, 'A'), + newBlock(2, 'A'), + newBlock(3, 'A') + ], + sidechains: [ + [ + newBlock(0, 'A'), // <-- 2) sees a common root, yet not *the* common root (A1 is not a fork block) + newBlock(1, 'A'), // <-- 4) yep this is the good one! sync from B2 to B5 + newBlock(2, 'B'), // <-- 3) check the middle, not the common root + newBlock(3, 'B'), + newBlock(4, 'B'), // <-- 1) checks this block: a fork, let's find common root + newBlock(5, 'B') + ], + // This fork should not be followed because we switch only once per pull, and B5 is already OK + [ + newBlock(0, 'A'), + newBlock(1, 'A'), + newBlock(2, 'B'), + newBlock(3, 'B'), + newBlock(4, 'B'), + newBlock(5, 'B'), + newBlock(6, 'B') + ] + ], + expectHash: 'B5' + })); + + it('sync with inconsistent fork should skip it', pullinTest({ + blockchain: [ + newBlock(0, 'A'), + newBlock(1, 'A'), + newBlock(2, 'A'), + newBlock(3, 'A') + ], + sidechains: [ + [ + newBlock(0, 'A'), // <-- 2) sees a common root, yet not *the* common root (A1 is not a fork block) + qwaBlock(1, 'A'), // <-- 4) checks the middle: the block has changed and now displays C! this is inconsistent + newBlock(2, 'C'), // <-- 3) checks the middle (binary search): too high, go downwards + newBlock(3, 'C'), + newBlock(4, 'C'), // <-- 1) sees a fork, try to find common root + newBlock(5, 'C') + ] + ], + expectHash: 'A3' + })); +}); + +function newBlock(number:number, branch:string, rootBranch = null, quantum = false) { + let previousNumber = number - 1; + let previousBranch:any = rootBranch || branch; + let previousHash = previousNumber >= 0 ? previousBranch + previousNumber : ''; + return { + number: number, + medianTime: number * 30 * 60, + hash: branch + number, + previousHash: previousHash, + // this is not a real field, just here for the sake of demonstration: a quantum block changes itself + // when we consult it, making the chain inconsistent + quantum: quantum + }; +} + +function qwaBlock(number:number, branch:any, rootBranch = null) { + return newBlock(number, branch, rootBranch, true); +} + +function pullinTest(testConfiguration:any) { + return async () => { + + // The blockchains we are testing against + let blockchain = testConfiguration.blockchain; + let sidechains = testConfiguration.sidechains; + + // The data access object simulating network access + let dao = new mockDao(blockchain, sidechains) + + // The very last block of a blockchain should have the right number + const local = await dao.localCurrent() + local.should.have.property('number').equal(blockchain[blockchain.length - 1].number); + + // And after we pull... 
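+ // (pull() asks each remote peer for its current block; when it sees a fork it + // binary-searches the common root, then applies the remote branch block by block + // through applyMainBranch; cf. the numbered comments in the scenarios above)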
+ await dao.pull(commonConf, NewLogger()) + + // We test the new local blockchain current block (it should have changed in case of successful pull) + let localCurrent = await dao.localCurrent(); + if (testConfiguration.expectHash !== undefined && testConfiguration.expectHash !== null) { + localCurrent.should.have.property('hash').equal(testConfiguration.expectHash); + } + if (testConfiguration.expectFunc !== undefined && testConfiguration.expectFunc !== null) { + testConfiguration.expectFunc(dao); + } + } +} + +/** + * Network mocker + * @param blockchain + * @param sideChains + * @returns DAO + */ +class mockDao extends AbstractDAO { + + constructor( + private blockchain:any, + private sideChains:any) { + super() + } + + // Get the local blockchain current block + async localCurrent() { + return this.blockchain[this.blockchain.length - 1] + } + + // Get the remote blockchain (bc) current block + async remoteCurrent(bc:any) { + return bc[bc.length - 1] + } + + // Get the remote peers to be pulled + remotePeers() { + return Promise.resolve(this.sideChains.map((sc:any, index:number) => { + sc.pubkey = 'PUBK' + index; + return sc; + })) + } + + // Get the local block with the given number + async getLocalBlock(number:number) { + return this.blockchain[number] || null + } + + // Get the block of the given remote chain with the given number + async getRemoteBlock(bc:any, number:number) { + let block = bc[number] || null; + // Quantum block implementation + if (block && block.quantum) { + bc[number] = _.clone(block); + bc[number].hash = 'Q' + block.hash; + } + return block; + } + + // Simulate the adding of a single new block on local blockchain + async applyMainBranch(block:BlockDTO) { + return this.blockchain.push(block) + } + + // Clean any fork blocks already registered in DB (the real fork mechanism uses them, so we want + // every old fork block to be removed) + async removeForks() { + return true + } + + // Tells whether the given peer is a member peer + async isMemberPeer() { + return true; + } + + // Simulates the downloading of blocks from a peer + async downloadBlocks(bc:any, fromNumber:number, count:number) { + if (!count) { + const block = await this.getRemoteBlock(bc, fromNumber); + if (block) { + return [block]; + } + else { + return []; + } + } + return bc.slice(fromNumber, fromNumber + count); + } +} diff --git a/test/fast/modules/crawler/peers_garbaging.js b/test/fast/modules/crawler/peers_garbaging.js new file mode 100644 index 0000000000000000000000000000000000000000..a29e0846af2b1728be792ca877195bb534e586a9 --- /dev/null +++ b/test/fast/modules/crawler/peers_garbaging.js @@ -0,0 +1,42 @@ +"use strict"; +const should = require('should'); +const co = require('co'); + +const garbager = require('../../../../app/modules/crawler/lib/garbager') +const duniter = require('../../../../index') + +let stack + +describe('Peers garbaging', () => { + + before(() => { + + stack = duniter.statics.autoStack([{ + name: 'garbager', + required: { + duniter: { + + cli: [{ + name: 'garbage', + desc: 'Garbage testing', + logs: false, + onDatabaseExecute: (server, conf, program, params) => co(function*() { + yield server.dal.peerDAL.savePeer({ pubkey: 'A', version: 1, currency: 'c', first_down: null, statusTS: 1485000000000, block: '2393-H' }); + yield server.dal.peerDAL.savePeer({ pubkey: 'B', version: 1, currency: 'c', first_down: 1484827199999, statusTS: 1485000000000, block: '2393-H' }); + yield server.dal.peerDAL.savePeer({ pubkey: 'C', version: 1, currency: 'c', first_down: 1484827200000, 
statusTS: 1485000000000, block: '2393-H' }); + yield server.dal.peerDAL.savePeer({ pubkey: 'D', version: 1, currency: 'c', first_down: 1484820000000, statusTS: 1485000000000, block: '2393-H' }); + (yield server.dal.peerDAL.sqlListAll()).should.have.length(4); + const now = 1485000000000; + yield garbager.cleanLongDownPeers(server, now); + (yield server.dal.peerDAL.sqlListAll()).should.have.length(2); + }) + }] + } + } + }]); + }) + + it('should be able to garbage some peers', () => co(function*() { + yield stack.executeStack(['node', 'b.js', '--memory', 'garbage']); + })); +}); diff --git a/yarn.lock b/yarn.lock index ee47024d021eca7eb30a8111af5621e31f69ee0b..4360e3cbf4de090125bfd291cbc1585f1447843f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -938,11 +938,11 @@ escape-html@~1.0.3: version "1.0.3" resolved "https://registry.yarnpkg.com/escape-html/-/escape-html-1.0.3.tgz#0258eae4d3d0c0974de1c169188ef0051d1d1988" -escape-string-regexp@1.0.2: +escape-string-regexp@1.0.2, escape-string-regexp@^1.0.2: version "1.0.2" resolved "https://registry.yarnpkg.com/escape-string-regexp/-/escape-string-regexp-1.0.2.tgz#4dbc2fe674e71949caf3fb2695ce7f2dc1d9a8d1" -escape-string-regexp@^1.0.2, escape-string-regexp@^1.0.5: +escape-string-regexp@^1.0.5: version "1.0.5" resolved "https://registry.yarnpkg.com/escape-string-regexp/-/escape-string-regexp-1.0.5.tgz#1b61c0562190a8dff6ae3bb2cf0200ca130b86d4"