From ab70e76a4620882d2a20b217185e2b0b29f17600 Mon Sep 17 00:00:00 2001 From: cgeek <cem.moreau@gmail.com> Date: Wed, 6 Sep 2017 07:56:30 +0200 Subject: [PATCH] [enh] #1084 WS2P: implement docpool pulling --- app/lib/ws2p/WS2PCluster.ts | 10 +++ app/lib/ws2p/WS2PDocpoolPuller.ts | 22 ++++++ app/lib/ws2p/WS2PRequester.ts | 5 ++ app/lib/ws2p/impl/WS2PReqMapperByServer.ts | 8 ++ app/lib/ws2p/interface/WS2PReqMapper.ts | 1 + .../interface/WS2PServerMessageHandler.ts | 7 ++ test/integration/ws2p_docpool.ts | 74 +++++++++++++++++++ 7 files changed, 127 insertions(+) create mode 100644 app/lib/ws2p/WS2PDocpoolPuller.ts create mode 100644 test/integration/ws2p_docpool.ts diff --git a/app/lib/ws2p/WS2PCluster.ts b/app/lib/ws2p/WS2PCluster.ts index 84669505e..90af4d2f5 100644 --- a/app/lib/ws2p/WS2PCluster.ts +++ b/app/lib/ws2p/WS2PCluster.ts @@ -5,6 +5,7 @@ import {WS2PConnection} from "./WS2PConnection" import {randomPick} from "../common-libs/randomPick" import {CrawlerConstants} from "../../modules/crawler/lib/constants" import {WS2PBlockPuller} from "./WS2PBlockPuller" +import {WS2PDocpoolPuller} from "./WS2PDocpoolPuller" const nuuid = require('node-uuid') @@ -64,4 +65,13 @@ export class WS2PCluster { this.server.pullingEvent('end', current.number) } } + + async pullDocpool() { + const connections = await this.getAllConnections() + const chosen = randomPick(connections, CrawlerConstants.CRAWL_PEERS_COUNT) + await Promise.all(chosen.map(async (conn) => { + const puller = new WS2PDocpoolPuller(this.server, conn) + await puller.pull() + })) + } } \ No newline at end of file diff --git a/app/lib/ws2p/WS2PDocpoolPuller.ts b/app/lib/ws2p/WS2PDocpoolPuller.ts new file mode 100644 index 000000000..d95da8a5c --- /dev/null +++ b/app/lib/ws2p/WS2PDocpoolPuller.ts @@ -0,0 +1,22 @@ +import {Server} from "../../../server" +import {WS2PConnection} from "./WS2PConnection" +import {WS2PRequester} from "./WS2PRequester" +import {pullSandboxToLocalServer} from "../../modules/crawler/lib/sandbox" + +export class WS2PDocpoolPuller { + + constructor( + private server:Server, + private connection:WS2PConnection + ) {} + + async pull() { + const requester = WS2PRequester.fromConnection(this.connection) + // node.pubkey = p.pubkey; + return pullSandboxToLocalServer(this.server.conf.currency, { + getRequirementsPending: (minCert = 1) => { + return requester.getRequirementsPending(minCert) + } + }, this.server, this.server.logger) + } +} diff --git a/app/lib/ws2p/WS2PRequester.ts b/app/lib/ws2p/WS2PRequester.ts index 8906ebaf9..7c3725d51 100644 --- a/app/lib/ws2p/WS2PRequester.ts +++ b/app/lib/ws2p/WS2PRequester.ts @@ -2,6 +2,7 @@ import {WS2PConnection} from "./WS2PConnection" import {BlockDTO} from "../dto/BlockDTO" export enum WS2P_REQ { + WOT_REQUIREMENTS_OF_PENDING, BLOCKS_CHUNK, BLOCK_BY_NUMBER, CURRENT @@ -28,6 +29,10 @@ export class WS2PRequester { return this.query(WS2P_REQ.BLOCKS_CHUNK, { count, fromNumber }) } + async getRequirementsPending(minCert = 1): Promise<any> { + return this.query(WS2P_REQ.WOT_REQUIREMENTS_OF_PENDING, { minCert }) + } + private query(req:WS2P_REQ, params:any = {}): Promise<any> { return this.ws2pc.request({ name: WS2P_REQ[req], diff --git a/app/lib/ws2p/impl/WS2PReqMapperByServer.ts b/app/lib/ws2p/impl/WS2PReqMapperByServer.ts index d4f75a0bd..8f7675233 100644 --- a/app/lib/ws2p/impl/WS2PReqMapperByServer.ts +++ b/app/lib/ws2p/impl/WS2PReqMapperByServer.ts @@ -25,4 +25,12 @@ export class WS2PReqMapperByServer implements WS2PReqMapper { } return this.server.dal.getBlocksBetween(from, from + count - 1) } + + async getRequirementsOfPending(minsig: number): Promise<any> { + const identities = await this.server.dal.idtyDAL.query('SELECT i.*, count(c.sig) as nbSig FROM idty i, cert c WHERE c.target = i.hash group by i.hash having nbSig >= ?', minsig) + const all = await this.server.BlockchainService.requirementsOfIdentities(identities, false) + return { + identities: all + } + } } \ No newline at end of file diff --git a/app/lib/ws2p/interface/WS2PReqMapper.ts b/app/lib/ws2p/interface/WS2PReqMapper.ts index fb9cdeb43..ded05ead0 100644 --- a/app/lib/ws2p/interface/WS2PReqMapper.ts +++ b/app/lib/ws2p/interface/WS2PReqMapper.ts @@ -5,4 +5,5 @@ export interface WS2PReqMapper { getCurrent(): Promise<BlockDTO> getBlock(number:number): Promise<BlockDTO[]> getBlocks(count:number, fromNumber:number): Promise<BlockDTO[]> + getRequirementsOfPending(minCert:number): Promise<any> } \ No newline at end of file diff --git a/app/lib/ws2p/interface/WS2PServerMessageHandler.ts b/app/lib/ws2p/interface/WS2PServerMessageHandler.ts index a4d71ab75..47cd0a064 100644 --- a/app/lib/ws2p/interface/WS2PServerMessageHandler.ts +++ b/app/lib/ws2p/interface/WS2PServerMessageHandler.ts @@ -93,6 +93,13 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler { const fromNumber:number = data.params.fromNumber body = await this.mapper.getBlocks(count, fromNumber) break; + case WS2P_REQ[WS2P_REQ.WOT_REQUIREMENTS_OF_PENDING]: + if (isNaN(data.params.minCert)) { + throw "Wrong param `minCert`" + } + const minCert:number = data.params.minCert + body = await this.mapper.getRequirementsOfPending(minCert) + break; default: throw Error(WS2P_REQERROR[WS2P_REQERROR.UNKNOWN_REQUEST]) } diff --git a/test/integration/ws2p_docpool.ts b/test/integration/ws2p_docpool.ts new file mode 100644 index 000000000..e37452920 --- /dev/null +++ b/test/integration/ws2p_docpool.ts @@ -0,0 +1,74 @@ +import {simpleTestingConf, simpleTestingServer, simpleUser, simpleWS2PNetwork, TestingServer} from "./tools/toolbox" +import {WS2PCluster} from "../../app/lib/ws2p/WS2PCluster" +import {ProverDependency} from "../../app/modules/prover/index" + +const assert = require('assert') + +describe("WS2P docpool pulling", function() { + + const now = 1500000000 + let s1:TestingServer, s2:TestingServer, wss:any + let cluster2:WS2PCluster + let cat:any, tac:any, toc:any + const catKeyring = { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'} + const tacKeyring = { pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc', sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'} + const tocKeyring = { pub: 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo', sec: '64EYRvdPpTfLGGmaX5nijLXRqWXaVz8r1Z1GtaahXwVSJGQRn7tqkxLb288zwSYzELMEG5ZhXSBYSxsTsz1m9y8F'} + + let b0, b1, b2 + + before(async () => { + const conf1 = simpleTestingConf(now, catKeyring) + const conf2 = simpleTestingConf(now, tacKeyring) + s1 = simpleTestingServer(conf1) + s2 = simpleTestingServer(conf2) + cat = simpleUser('cat', catKeyring, s1) + tac = simpleUser('tac', tacKeyring, s1) + toc = simpleUser('toc', tocKeyring, s1) + await s1.initDalBmaConnections() + await s2.initDalBmaConnections() + + await cat.createIdentity(); + await tac.createIdentity(); + await cat.cert(tac); + await tac.cert(cat); + await cat.join(); + await tac.join(); + + b0 = await s1.commit({ time: now }) + b1 = await s1.commit({ time: now }) + b2 = await s1.commit({ time: now }) + + await s2.writeBlock(b0) + await s2.writeBlock(b1) + await s2.writeBlock(b2) + await s2.waitToHaveBlock(2) + }) + + after(() => wss.close()) + + it('should have b#2 on s1 and s2', async () => { + const currentS1 = await s1.BlockchainService.current() + const currentS2 = await s2.BlockchainService.current() + assert.equal(currentS1.number, 2) + assert.equal(currentS2.number, 2) + }) + + it('should be able to pull the docpool', async () => { + await toc.createIdentity(); + await cat.cert(toc); + await toc.join(); + const network = await simpleWS2PNetwork(s1, s2) + wss = network.wss + cluster2 = network.cluster2 + ProverDependency.duniter.methods.hookServer(s1._server) + await cluster2.pullDocpool() + await s2.expect('/wot/lookup/toc', (res:any) => { + assert.equal(res.results.length, 1) + assert.equal(res.results[0].uids[0].others.length, 1) + }) + const currentS1 = await s1.BlockchainService.current() + const currentS2 = await s2.BlockchainService.current() + assert.equal(currentS1.number, 2) + assert.equal(currentS2.number, 2) + }) +}) -- GitLab