From 76a34a5a1d66f1bf42f77bdc646b1461e4988ce0 Mon Sep 17 00:00:00 2001 From: cgeek <cem.moreau@gmail.com> Date: Tue, 5 Sep 2017 20:44:18 +0200 Subject: [PATCH] [enh] #1084 WS2P: implement block pulling --- .eslintignore | 1 + app/lib/common-libs/randomPick.ts | 11 ++ app/lib/ws2p/WS2PBlockPuller.ts | 129 ++++++++++++++++++ app/lib/ws2p/WS2PClient.ts | 4 +- app/lib/ws2p/WS2PCluster.ts | 35 ++++- app/lib/ws2p/WS2PConnection.ts | 16 ++- app/lib/ws2p/WS2PRequester.ts | 20 ++- app/lib/ws2p/impl/WS2PReqMapperByServer.ts | 17 +++ app/lib/ws2p/interface/WS2PReqMapper.ts | 2 + .../interface/WS2PServerMessageHandler.ts | 23 +++- app/modules/crawler/lib/constants.ts | 2 + app/modules/crawler/lib/crawler.ts | 19 +-- server.ts | 14 ++ test/integration/tools/toolbox.ts | 10 +- test/integration/ws2p_connection.ts | 6 +- test/integration/ws2p_pulling.ts | 68 +++++++++ 16 files changed, 343 insertions(+), 34 deletions(-) create mode 100644 app/lib/common-libs/randomPick.ts create mode 100644 app/lib/ws2p/WS2PBlockPuller.ts create mode 100644 test/integration/ws2p_pulling.ts diff --git a/.eslintignore b/.eslintignore index 7e4d97cdf..5d024870e 100644 --- a/.eslintignore +++ b/.eslintignore @@ -19,6 +19,7 @@ app/lib/system/*.js app/lib/streams/*.js app/lib/helpers/*.js app/lib/ws2p/*.js +app/lib/ws2p/*/*.js app/lib/*.js app/modules/wizard.js app/modules/router.js diff --git a/app/lib/common-libs/randomPick.ts b/app/lib/common-libs/randomPick.ts new file mode 100644 index 000000000..fa64c73de --- /dev/null +++ b/app/lib/common-libs/randomPick.ts @@ -0,0 +1,11 @@ + +export const randomPick = <T>(elements:T[], max:number) => { + const chosen:T[] = [] + const nbElements = elements.length + for (let i = 0; i < Math.min(nbElements, max); i++) { + const randIndex = Math.max(Math.floor(Math.random() * 10) - (10 - nbElements) - i, 0) + chosen.push(elements[randIndex]) + elements.splice(randIndex, 1) + } + return chosen +} \ No newline at end of file diff --git a/app/lib/ws2p/WS2PBlockPuller.ts b/app/lib/ws2p/WS2PBlockPuller.ts new file mode 100644 index 000000000..3613257cc --- /dev/null +++ b/app/lib/ws2p/WS2PBlockPuller.ts @@ -0,0 +1,129 @@ +import {BlockDTO} from "../dto/BlockDTO" +import {AbstractDAO} from "../../modules/crawler/lib/pulling" +import {Server} from "../../../server" +import {DBBlock} from "../db/DBBlock" +import {PeerDTO} from "../dto/PeerDTO" +import {CrawlerConstants} from "../../modules/crawler/lib/constants" +import {tx_cleaner} from "../../modules/crawler/lib/tx_cleaner" +import {WS2PConnection} from "./WS2PConnection" +import {WS2PRequester} from "./WS2PRequester" + +export class WS2PBlockPuller { + + constructor( + private server:Server, + private connection:WS2PConnection + ) {} + + async pull() { + const requester = WS2PRequester.fromConnection(this.connection) + // node.pubkey = p.pubkey; + let dao = new WS2PDao(this.server, requester) + await dao.pull(this.server.conf, this.server.logger) + } +} + +interface RemoteNode { + getCurrent: () => Promise<BlockDTO> + getBlock: (number:number) => Promise<BlockDTO> + getBlocks: (count:number, fromNumber:number) => Promise<BlockDTO[]> +} + +class WS2PDao extends AbstractDAO { + + private node:RemoteNode + private lastDownloaded:BlockDTO|null + private nodeCurrent:BlockDTO|null = null + public newCurrent:BlockDTO|null = null + + constructor( + private server:Server, + private requester:WS2PRequester + ) { + super() + this.node = { + getCurrent: async () => { + return this.requester.getCurrent() + }, + getBlock: async (number:number) => { + return this.requester.getBlock(number) + }, + getBlocks: async (count:number, fromNumber:number) => { + return this.requester.getBlocks(count, fromNumber) + } + } + } + + async localCurrent(): Promise<DBBlock | null> { + return this.server.dal.getCurrentBlockOrNull() + } + + async remoteCurrent(source: RemoteNode): Promise<BlockDTO | null> { + this.nodeCurrent = await source.getCurrent() + return this.nodeCurrent + } + + async remotePeers(source?: any): Promise<PeerDTO[]> { + const peer:any = this.node + return Promise.resolve([peer]) + } + + async getLocalBlock(number: number): Promise<DBBlock> { + return this.server.dal.getBlock(number) + } + + async getRemoteBlock(thePeer: any, number: number): Promise<BlockDTO> { + let block = null; + try { + block = await thePeer.getBlock(number); + tx_cleaner(block.transactions); + } catch (e) { + if (e.httpCode != 404) { + throw e; + } + } + return block; + } + + async applyMainBranch(block: BlockDTO): Promise<boolean> { + const existing = await this.server.dal.getAbsoluteBlockByNumberAndHash(block.number, block.hash) + if (!existing) { + let addedBlock = await this.server.writeBlock(block, false, true) + if (!this.lastDownloaded) { + this.lastDownloaded = await this.remoteCurrent(this.node) + } + this.server.pullingEvent('applying', {number: block.number, last: this.lastDownloaded && this.lastDownloaded.number}) + if (addedBlock) { + this.newCurrent = addedBlock + // Emit block events (for sharing with the network) only in forkWindowSize + if (this.nodeCurrent && this.nodeCurrent.number - addedBlock.number < this.server.conf.forksize) { + this.server.streamPush(addedBlock); + } + } + } + return true + } + + async removeForks(): Promise<boolean> { + return true + } + + async isMemberPeer(thePeer: PeerDTO): Promise<boolean> { + return true + } + + async downloadBlocks(thePeer: any, fromNumber: number, count?: number | undefined): Promise<BlockDTO[]> { + if (!count) { + count = CrawlerConstants.CRAWL_BLOCK_CHUNK + } + + let blocks = await thePeer.getBlocks(count, fromNumber); + // Fix for #734 + for (const block of blocks) { + for (const tx of block.transactions) { + tx.version = CrawlerConstants.TRANSACTION_VERSION; + } + } + return blocks; + } +} diff --git a/app/lib/ws2p/WS2PClient.ts b/app/lib/ws2p/WS2PClient.ts index ca5104817..f1eac48ea 100644 --- a/app/lib/ws2p/WS2PClient.ts +++ b/app/lib/ws2p/WS2PClient.ts @@ -6,6 +6,8 @@ import {Key} from "../common-libs/crypto/keyring" export class WS2PClient { + private constructor(public connection:WS2PConnection) {} + static async connectTo(server:Server, host:string, port:number) { const k2 = new Key(server.conf.pair.pub, server.conf.pair.sec) const c = WS2PConnection.newConnectionToAddress( @@ -23,6 +25,6 @@ export class WS2PClient { // Connecting await c.connect() - return c + return new WS2PClient(c) } } \ No newline at end of file diff --git a/app/lib/ws2p/WS2PCluster.ts b/app/lib/ws2p/WS2PCluster.ts index bc774ea42..84669505e 100644 --- a/app/lib/ws2p/WS2PCluster.ts +++ b/app/lib/ws2p/WS2PCluster.ts @@ -2,6 +2,9 @@ import {WS2PServer} from "./WS2PServer" import {Server} from "../../../server" import {WS2PClient} from "./WS2PClient" import {WS2PConnection} from "./WS2PConnection" +import {randomPick} from "../common-libs/randomPick" +import {CrawlerConstants} from "../../modules/crawler/lib/constants" +import {WS2PBlockPuller} from "./WS2PBlockPuller" const nuuid = require('node-uuid') @@ -28,9 +31,37 @@ export class WS2PCluster { const uuid = nuuid.v4() const ws2pc = await WS2PClient.connectTo(this.server, host, port) this.ws2pClients[uuid] = ws2pc - ws2pc.closed.then(() => { + ws2pc.connection.closed.then(() => { delete this.ws2pClients[uuid] }) - return ws2pc + return ws2pc.connection + } + + async getAllConnections() { + const all:WS2PConnection[] = this.ws2pServer ? this.ws2pServer.getConnexions() : [] + for (const uuid of Object.keys(this.ws2pClients)) { + all.push(this.ws2pClients[uuid].connection) + } + return all + } + + async pullBlocks() { + const connections = await this.getAllConnections() + const chosen = randomPick(connections, CrawlerConstants.CRAWL_PEERS_COUNT) + + await Promise.all(chosen.map(async (conn) => { + const puller = new WS2PBlockPuller(this.server, conn) + await puller.pull() + })) + + await this.server.BlockchainService.pushFIFO("WS2PCrawlerResolution", async () => { + await this.server.BlockchainService.blockResolution() + await this.server.BlockchainService.forkResolution() + }) + + const current = await this.server.dal.getCurrentBlockOrNull() + if (current) { + this.server.pullingEvent('end', current.number) + } } } \ No newline at end of file diff --git a/app/lib/ws2p/WS2PConnection.ts b/app/lib/ws2p/WS2PConnection.ts index 9d598f47c..1a85b5242 100644 --- a/app/lib/ws2p/WS2PConnection.ts +++ b/app/lib/ws2p/WS2PConnection.ts @@ -200,7 +200,8 @@ export class WS2PPubkeyLocalAuth implements WS2PLocalAuth { } export interface WS2PRequest { - name:string + name:string, + params?:any } /** @@ -214,7 +215,7 @@ export class WS2PConnection { private connectp:Promise<any>|undefined private connectedp:Promise<string> private connectedResolve:(pub:string)=>void - private connectedReject:()=>void + private connectedReject:(e:any)=>void private nbErrors = 0 private nbRequestsCount = 0 private nbResponsesCount = 0 @@ -442,8 +443,12 @@ export class WS2PConnection { // Request message else if (data.reqId && typeof data.reqId === "string") { - const answer = await this.messageHandler.answerToRequest(data.body) - this.ws.send(JSON.stringify({ resId: data.reqId, body: answer })) + try { + const answer = await this.messageHandler.answerToRequest(data.body) + this.ws.send(JSON.stringify({ resId: data.reqId, body: answer })) + } catch (e) { + this.ws.send(JSON.stringify({ resId: data.reqId, err: e })) + } } // Answer message @@ -470,7 +475,8 @@ export class WS2PConnection { this.connectedResolve(this.remoteAuth.getPubkey()) } catch (e) { - this.connectedReject() + this.connectedReject(e) + throw e } })() } diff --git a/app/lib/ws2p/WS2PRequester.ts b/app/lib/ws2p/WS2PRequester.ts index 5f0cc3cc9..8906ebaf9 100644 --- a/app/lib/ws2p/WS2PRequester.ts +++ b/app/lib/ws2p/WS2PRequester.ts @@ -1,6 +1,9 @@ import {WS2PConnection} from "./WS2PConnection" +import {BlockDTO} from "../dto/BlockDTO" -enum WS2P_REQ { +export enum WS2P_REQ { + BLOCKS_CHUNK, + BLOCK_BY_NUMBER, CURRENT } @@ -13,13 +16,22 @@ export class WS2PRequester { return new WS2PRequester(ws2pc) } - getCurrent() { + getCurrent(): Promise<BlockDTO> { return this.query(WS2P_REQ.CURRENT) } - private query(req:WS2P_REQ) { + getBlock(number:number): Promise<BlockDTO> { + return this.query(WS2P_REQ.BLOCK_BY_NUMBER, { number }) + } + + getBlocks(count:number, fromNumber:number): Promise<BlockDTO[]> { + return this.query(WS2P_REQ.BLOCKS_CHUNK, { count, fromNumber }) + } + + private query(req:WS2P_REQ, params:any = {}): Promise<any> { return this.ws2pc.request({ - name: WS2P_REQ[req] + name: WS2P_REQ[req], + params: params }) } } \ No newline at end of file diff --git a/app/lib/ws2p/impl/WS2PReqMapperByServer.ts b/app/lib/ws2p/impl/WS2PReqMapperByServer.ts index 9f0101a4d..d4f75a0bd 100644 --- a/app/lib/ws2p/impl/WS2PReqMapperByServer.ts +++ b/app/lib/ws2p/impl/WS2PReqMapperByServer.ts @@ -1,5 +1,6 @@ import {Server} from "../../../../server" import {WS2PReqMapper} from "../interface/WS2PReqMapper" +import {BlockDTO} from "../../dto/BlockDTO" export class WS2PReqMapperByServer implements WS2PReqMapper { @@ -8,4 +9,20 @@ export class WS2PReqMapperByServer implements WS2PReqMapper { async getCurrent() { return this.server.BlockchainService.current() } + + getBlock(number: number): Promise<BlockDTO[]> { + return this.server.dal.getBlock(number) + } + + async getBlocks(count: number, from: number): Promise<BlockDTO[]> { + if (count > 5000) { + throw 'Count is too high' + } + const current = await this.server.dal.getCurrentBlockOrNull() + count = Math.min(current.number - from + 1, count) + if (!current || current.number < from) { + return [] + } + return this.server.dal.getBlocksBetween(from, from + count - 1) + } } \ No newline at end of file diff --git a/app/lib/ws2p/interface/WS2PReqMapper.ts b/app/lib/ws2p/interface/WS2PReqMapper.ts index dbe16a237..fb9cdeb43 100644 --- a/app/lib/ws2p/interface/WS2PReqMapper.ts +++ b/app/lib/ws2p/interface/WS2PReqMapper.ts @@ -3,4 +3,6 @@ import {BlockDTO} from "../../dto/BlockDTO" export interface WS2PReqMapper { getCurrent(): Promise<BlockDTO> + getBlock(number:number): Promise<BlockDTO[]> + getBlocks(count:number, fromNumber:number): Promise<BlockDTO[]> } \ No newline at end of file diff --git a/app/lib/ws2p/interface/WS2PServerMessageHandler.ts b/app/lib/ws2p/interface/WS2PServerMessageHandler.ts index 8ed051fcf..a4d71ab75 100644 --- a/app/lib/ws2p/interface/WS2PServerMessageHandler.ts +++ b/app/lib/ws2p/interface/WS2PServerMessageHandler.ts @@ -9,10 +9,7 @@ import {CertificationDTO} from "../../dto/CertificationDTO" import {MembershipDTO} from "../../dto/MembershipDTO" import {TransactionDTO} from "../../dto/TransactionDTO" import {PeerDTO} from "../../dto/PeerDTO" - -enum WS2P_REQ { - CURRENT -} +import {WS2P_REQ} from "../WS2PRequester" export enum WS2P_REQERROR { UNKNOWN_REQUEST @@ -78,6 +75,24 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler { case WS2P_REQ[WS2P_REQ.CURRENT]: body = await this.mapper.getCurrent() break; + case WS2P_REQ[WS2P_REQ.BLOCK_BY_NUMBER]: + if (isNaN(data.params.number)) { + throw "Wrong param `number`" + } + const number:number = data.params.number + body = await this.mapper.getBlock(number) + break; + case WS2P_REQ[WS2P_REQ.BLOCKS_CHUNK]: + if (isNaN(data.params.count)) { + throw "Wrong param `count`" + } + if (isNaN(data.params.fromNumber)) { + throw "Wrong param `fromNumber`" + } + const count:number = data.params.count + const fromNumber:number = data.params.fromNumber + body = await this.mapper.getBlocks(count, fromNumber) + break; default: throw Error(WS2P_REQERROR[WS2P_REQERROR.UNKNOWN_REQUEST]) } diff --git a/app/modules/crawler/lib/constants.ts b/app/modules/crawler/lib/constants.ts index 8b19cba39..533ab0088 100644 --- a/app/modules/crawler/lib/constants.ts +++ b/app/modules/crawler/lib/constants.ts @@ -10,6 +10,8 @@ export const CrawlerConstants = { FORK_ALLOWED: true, MAX_NUMBER_OF_PEERS_FOR_PULLING: 4, PULLING_MINIMAL_DELAY: 20, + CRAWL_BLOCK_CHUNK: 50, // During a crawl, the quantity of blocks to download + CRAWL_PEERS_COUNT: 4, PULLING_INTERVAL_TARGET: 240, COUNT_FOR_ENOUGH_PEERS: 4, SANDBOX_FIRST_PULL_DELAY: 1000 * 60 * 10, // milliseconds diff --git a/app/modules/crawler/lib/crawler.ts b/app/modules/crawler/lib/crawler.ts index ef1e29638..4e1f92749 100644 --- a/app/modules/crawler/lib/crawler.ts +++ b/app/modules/crawler/lib/crawler.ts @@ -35,7 +35,7 @@ export class Crawler extends stream.Transform implements DuniterService { this.peerCrawler = new PeerCrawler(server, conf, logger) this.peerTester = new PeerTester(server, conf, logger) - this.blockCrawler = new BlockCrawler(server, logger, this) + this.blockCrawler = new BlockCrawler(server, logger) this.sandboxCrawler = new SandboxCrawler(server, conf, logger) } @@ -303,7 +303,7 @@ export class PeerTester implements DuniterService { export class BlockCrawler { - private CONST_BLOCKS_CHUNK = 50 + private CONST_BLOCKS_CHUNK = CrawlerConstants.CRAWL_BLOCK_CHUNK private pullingActualIntervalDuration = CrawlerConstants.PULLING_MINIMAL_DELAY private programStart = Date.now() private syncBlockFifo = async.queue((task:any, callback:any) => task(callback), 1) @@ -311,8 +311,7 @@ export class BlockCrawler { constructor( private server:Server, - private logger:any, - private PROCESS:stream.Transform) { + private logger:any) { } async startService() { @@ -470,17 +469,7 @@ export class BlockCrawler { } private pullingEvent(server:Server, type:string, number:any = null) { - server.push({ - pulling: { - type: type, - data: number - } - }); - if (type !== 'end') { - this.PROCESS.push({ pulling: 'processing' }); - } else { - this.PROCESS.push({ pulling: 'finished' }); - } + server.pullingEvent(type, number) } private isConnectionError(err:any) { diff --git a/server.ts b/server.ts index f872e3327..1b1f5ddfc 100644 --- a/server.ts +++ b/server.ts @@ -473,6 +473,20 @@ export class Server extends stream.Duplex implements HookableServer { } } + pullingEvent(type:string, number:any = null) { + this.push({ + pulling: { + type: type, + data: number + } + }) + if (type !== 'end') { + this.push({ pulling: 'processing' }) + } else { + this.push({ pulling: 'finished' }) + } + } + async reapplyTo(number:number) { const current = await this.BlockchainService.current(); if (current.number == number) { diff --git a/test/integration/tools/toolbox.ts b/test/integration/tools/toolbox.ts index 9d6e46032..cfdd28144 100644 --- a/test/integration/tools/toolbox.ts +++ b/test/integration/tools/toolbox.ts @@ -354,6 +354,10 @@ export class TestingServer { async writePeer(obj:any) { return this.server.writePeer(obj) } + + async pullingEvent(type:string, number:number) { + this.server.pullingEvent(type, number) + } exportAllDataAsZIP() { return this.server.exportAllDataAsZIP() @@ -609,7 +613,7 @@ export async function newWS2PBidirectionnalConnection(k1:Key, k2:Key, serverHand }) } -export const simpleWS2PNetwork: (s1: TestingServer, s2: TestingServer) => Promise<{ w1: WS2PConnection; ws2pc: WS2PConnection; wss: WS2PServer }> = async (s1: TestingServer, s2: TestingServer) => { +export const simpleWS2PNetwork: (s1: TestingServer, s2: TestingServer) => Promise<{ w1: WS2PConnection; ws2pc: WS2PConnection; wss: WS2PServer, cluster1:WS2PCluster, cluster2:WS2PCluster }> = async (s1: TestingServer, s2: TestingServer) => { let port = PORT++ const clientPub = s2.conf.pair.pub let w1: WS2PConnection | null @@ -627,7 +631,9 @@ export const simpleWS2PNetwork: (s1: TestingServer, s2: TestingServer) => Promis return { w1, ws2pc, - wss: ws2ps + wss: ws2ps, + cluster1, + cluster2 } } diff --git a/test/integration/ws2p_connection.ts b/test/integration/ws2p_connection.ts index 25185c66e..777f44a11 100644 --- a/test/integration/ws2p_connection.ts +++ b/test/integration/ws2p_connection.ts @@ -406,6 +406,10 @@ class WS2PNoLocalAuth implements WS2PLocalAuth { class WS2PNoRemoteAuth implements WS2PRemoteAuth { + getPubkey(): string { + return "" + } + async sendACK(ws: any): Promise<void> { } @@ -435,6 +439,6 @@ class WS2PMutedHandler implements WS2PMessageHandler { } async answerToRequest(json: any): Promise<WS2PResponse> { - return {} + throw "Does not answer" } } diff --git a/test/integration/ws2p_pulling.ts b/test/integration/ws2p_pulling.ts new file mode 100644 index 000000000..df5bcff4e --- /dev/null +++ b/test/integration/ws2p_pulling.ts @@ -0,0 +1,68 @@ +import {simpleTestingConf, simpleTestingServer, simpleUser, simpleWS2PNetwork, TestingServer} from "./tools/toolbox" +import {WS2PCluster} from "../../app/lib/ws2p/WS2PCluster" + +const assert = require('assert') + +describe("WS2P block pulling", function() { + + const now = 1500000000 + let s1:TestingServer, s2:TestingServer, wss:any + let cluster2:WS2PCluster + let cat:any, tac:any + const catKeyring = { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'} + const tacKeyring = { pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc', sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'} + + let b0, b1, b2 + + before(async () => { + const conf1 = simpleTestingConf(now, catKeyring) + const conf2 = simpleTestingConf(now, tacKeyring) + s1 = simpleTestingServer(conf1) + s2 = simpleTestingServer(conf2) + cat = simpleUser('cat', catKeyring, s1) + tac = simpleUser('tac', tacKeyring, s1) + await s1.initDalBmaConnections() + await s2.initDalBmaConnections() + + await cat.createIdentity(); + await tac.createIdentity(); + await cat.cert(tac); + await tac.cert(cat); + await cat.join(); + await tac.join(); + + b0 = await s1.commit({ time: now }) + b1 = await s1.commit({ time: now }) + b2 = await s1.commit({ time: now }) + await s1.commit({ time: now }) + await s1.commit({ time: now }) + await s1.commit({ time: now }) + await s1.commit({ time: now }) // b6 + + await s2.writeBlock(b0) + await s2.writeBlock(b1) + await s2.writeBlock(b2) + await s2.waitToHaveBlock(2) + + const network = await simpleWS2PNetwork(s1, s2) + wss = network.wss + cluster2 = network.cluster2 + }) + + after(() => wss.close()) + + it('should have b#6 on s1, b#2 on s2', async () => { + const currentS1 = await s1.BlockchainService.current() + const currentS2 = await s2.BlockchainService.current() + assert.equal(currentS1.number, 6) + assert.equal(currentS2.number, 2) + }) + + it('should be able to pull and have the same current block as a result', async () => { + await cluster2.pullBlocks() + const currentS1 = await s1.BlockchainService.current() + const currentS2 = await s2.BlockchainService.current() + assert.equal(currentS1.number, 6) + assert.equal(currentS2.number, 6) + }) +}) -- GitLab