diff --git a/app/lib/ws2p/WS2PCluster.ts b/app/lib/ws2p/WS2PCluster.ts index 84669505e9e6818445f04328f0ffc8782a5d0f56..90af4d2f57d401408682b530dd047c0cdad6388b 100644 --- a/app/lib/ws2p/WS2PCluster.ts +++ b/app/lib/ws2p/WS2PCluster.ts @@ -5,6 +5,7 @@ import {WS2PConnection} from "./WS2PConnection" import {randomPick} from "../common-libs/randomPick" import {CrawlerConstants} from "../../modules/crawler/lib/constants" import {WS2PBlockPuller} from "./WS2PBlockPuller" +import {WS2PDocpoolPuller} from "./WS2PDocpoolPuller" const nuuid = require('node-uuid') @@ -64,4 +65,13 @@ export class WS2PCluster { this.server.pullingEvent('end', current.number) } } + + async pullDocpool() { + const connections = await this.getAllConnections() + const chosen = randomPick(connections, CrawlerConstants.CRAWL_PEERS_COUNT) + await Promise.all(chosen.map(async (conn) => { + const puller = new WS2PDocpoolPuller(this.server, conn) + await puller.pull() + })) + } } \ No newline at end of file diff --git a/app/lib/ws2p/WS2PDocpoolPuller.ts b/app/lib/ws2p/WS2PDocpoolPuller.ts new file mode 100644 index 0000000000000000000000000000000000000000..d95da8a5ccee2289c92429a261f442cfea9c022c --- /dev/null +++ b/app/lib/ws2p/WS2PDocpoolPuller.ts @@ -0,0 +1,22 @@ +import {Server} from "../../../server" +import {WS2PConnection} from "./WS2PConnection" +import {WS2PRequester} from "./WS2PRequester" +import {pullSandboxToLocalServer} from "../../modules/crawler/lib/sandbox" + +export class WS2PDocpoolPuller { + + constructor( + private server:Server, + private connection:WS2PConnection + ) {} + + async pull() { + const requester = WS2PRequester.fromConnection(this.connection) + // node.pubkey = p.pubkey; + return pullSandboxToLocalServer(this.server.conf.currency, { + getRequirementsPending: (minCert = 1) => { + return requester.getRequirementsPending(minCert) + } + }, this.server, this.server.logger) + } +} diff --git a/app/lib/ws2p/WS2PRequester.ts b/app/lib/ws2p/WS2PRequester.ts index 8906ebaf91b93cc4f87005dbc48c46050f340912..7c3725d510f792bfceb6e60f5f977488d0427d1e 100644 --- a/app/lib/ws2p/WS2PRequester.ts +++ b/app/lib/ws2p/WS2PRequester.ts @@ -2,6 +2,7 @@ import {WS2PConnection} from "./WS2PConnection" import {BlockDTO} from "../dto/BlockDTO" export enum WS2P_REQ { + WOT_REQUIREMENTS_OF_PENDING, BLOCKS_CHUNK, BLOCK_BY_NUMBER, CURRENT @@ -28,6 +29,10 @@ export class WS2PRequester { return this.query(WS2P_REQ.BLOCKS_CHUNK, { count, fromNumber }) } + async getRequirementsPending(minCert = 1): Promise<any> { + return this.query(WS2P_REQ.WOT_REQUIREMENTS_OF_PENDING, { minCert }) + } + private query(req:WS2P_REQ, params:any = {}): Promise<any> { return this.ws2pc.request({ name: WS2P_REQ[req], diff --git a/app/lib/ws2p/impl/WS2PReqMapperByServer.ts b/app/lib/ws2p/impl/WS2PReqMapperByServer.ts index d4f75a0bdd1a3ec75d0f8530877d0fbfcde37946..8f7675233e424f83fc7e10addb8808f553016f48 100644 --- a/app/lib/ws2p/impl/WS2PReqMapperByServer.ts +++ b/app/lib/ws2p/impl/WS2PReqMapperByServer.ts @@ -25,4 +25,12 @@ export class WS2PReqMapperByServer implements WS2PReqMapper { } return this.server.dal.getBlocksBetween(from, from + count - 1) } + + async getRequirementsOfPending(minsig: number): Promise<any> { + const identities = await this.server.dal.idtyDAL.query('SELECT i.*, count(c.sig) as nbSig FROM idty i, cert c WHERE c.target = i.hash group by i.hash having nbSig >= ?', minsig) + const all = await this.server.BlockchainService.requirementsOfIdentities(identities, false) + return { + identities: all + } + } } \ No newline at end of file diff --git a/app/lib/ws2p/interface/WS2PReqMapper.ts b/app/lib/ws2p/interface/WS2PReqMapper.ts index fb9cdeb43233b6cd9015b72f6f45d28564f964b8..ded05ead01d3c1a7c99f977045a3c54956c605e6 100644 --- a/app/lib/ws2p/interface/WS2PReqMapper.ts +++ b/app/lib/ws2p/interface/WS2PReqMapper.ts @@ -5,4 +5,5 @@ export interface WS2PReqMapper { getCurrent(): Promise<BlockDTO> getBlock(number:number): Promise<BlockDTO[]> getBlocks(count:number, fromNumber:number): Promise<BlockDTO[]> + getRequirementsOfPending(minCert:number): Promise<any> } \ No newline at end of file diff --git a/app/lib/ws2p/interface/WS2PServerMessageHandler.ts b/app/lib/ws2p/interface/WS2PServerMessageHandler.ts index a4d71ab7556413ec4a53658880510c49fc705450..47cd0a0645c50fea24d4c7cfdfa092da6c326bc1 100644 --- a/app/lib/ws2p/interface/WS2PServerMessageHandler.ts +++ b/app/lib/ws2p/interface/WS2PServerMessageHandler.ts @@ -93,6 +93,13 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler { const fromNumber:number = data.params.fromNumber body = await this.mapper.getBlocks(count, fromNumber) break; + case WS2P_REQ[WS2P_REQ.WOT_REQUIREMENTS_OF_PENDING]: + if (isNaN(data.params.minCert)) { + throw "Wrong param `minCert`" + } + const minCert:number = data.params.minCert + body = await this.mapper.getRequirementsOfPending(minCert) + break; default: throw Error(WS2P_REQERROR[WS2P_REQERROR.UNKNOWN_REQUEST]) } diff --git a/test/integration/ws2p_docpool.ts b/test/integration/ws2p_docpool.ts new file mode 100644 index 0000000000000000000000000000000000000000..e3745292034d7f816f30fd20df14e7f519cca2d2 --- /dev/null +++ b/test/integration/ws2p_docpool.ts @@ -0,0 +1,74 @@ +import {simpleTestingConf, simpleTestingServer, simpleUser, simpleWS2PNetwork, TestingServer} from "./tools/toolbox" +import {WS2PCluster} from "../../app/lib/ws2p/WS2PCluster" +import {ProverDependency} from "../../app/modules/prover/index" + +const assert = require('assert') + +describe("WS2P docpool pulling", function() { + + const now = 1500000000 + let s1:TestingServer, s2:TestingServer, wss:any + let cluster2:WS2PCluster + let cat:any, tac:any, toc:any + const catKeyring = { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'} + const tacKeyring = { pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc', sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'} + const tocKeyring = { pub: 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo', sec: '64EYRvdPpTfLGGmaX5nijLXRqWXaVz8r1Z1GtaahXwVSJGQRn7tqkxLb288zwSYzELMEG5ZhXSBYSxsTsz1m9y8F'} + + let b0, b1, b2 + + before(async () => { + const conf1 = simpleTestingConf(now, catKeyring) + const conf2 = simpleTestingConf(now, tacKeyring) + s1 = simpleTestingServer(conf1) + s2 = simpleTestingServer(conf2) + cat = simpleUser('cat', catKeyring, s1) + tac = simpleUser('tac', tacKeyring, s1) + toc = simpleUser('toc', tocKeyring, s1) + await s1.initDalBmaConnections() + await s2.initDalBmaConnections() + + await cat.createIdentity(); + await tac.createIdentity(); + await cat.cert(tac); + await tac.cert(cat); + await cat.join(); + await tac.join(); + + b0 = await s1.commit({ time: now }) + b1 = await s1.commit({ time: now }) + b2 = await s1.commit({ time: now }) + + await s2.writeBlock(b0) + await s2.writeBlock(b1) + await s2.writeBlock(b2) + await s2.waitToHaveBlock(2) + }) + + after(() => wss.close()) + + it('should have b#2 on s1 and s2', async () => { + const currentS1 = await s1.BlockchainService.current() + const currentS2 = await s2.BlockchainService.current() + assert.equal(currentS1.number, 2) + assert.equal(currentS2.number, 2) + }) + + it('should be able to pull the docpool', async () => { + await toc.createIdentity(); + await cat.cert(toc); + await toc.join(); + const network = await simpleWS2PNetwork(s1, s2) + wss = network.wss + cluster2 = network.cluster2 + ProverDependency.duniter.methods.hookServer(s1._server) + await cluster2.pullDocpool() + await s2.expect('/wot/lookup/toc', (res:any) => { + assert.equal(res.results.length, 1) + assert.equal(res.results[0].uids[0].others.length, 1) + }) + const currentS1 = await s1.BlockchainService.current() + const currentS2 = await s2.BlockchainService.current() + assert.equal(currentS1.number, 2) + assert.equal(currentS2.number, 2) + }) +})