Skip to content
Snippets Groups Projects
Commit ab70e76a authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] #1084 WS2P: implement docpool pulling

parent 76a34a5a
Branches
Tags
No related merge requests found
...@@ -5,6 +5,7 @@ import {WS2PConnection} from "./WS2PConnection" ...@@ -5,6 +5,7 @@ import {WS2PConnection} from "./WS2PConnection"
import {randomPick} from "../common-libs/randomPick" import {randomPick} from "../common-libs/randomPick"
import {CrawlerConstants} from "../../modules/crawler/lib/constants" import {CrawlerConstants} from "../../modules/crawler/lib/constants"
import {WS2PBlockPuller} from "./WS2PBlockPuller" import {WS2PBlockPuller} from "./WS2PBlockPuller"
import {WS2PDocpoolPuller} from "./WS2PDocpoolPuller"
const nuuid = require('node-uuid') const nuuid = require('node-uuid')
...@@ -64,4 +65,13 @@ export class WS2PCluster { ...@@ -64,4 +65,13 @@ export class WS2PCluster {
this.server.pullingEvent('end', current.number) this.server.pullingEvent('end', current.number)
} }
} }
async pullDocpool() {
const connections = await this.getAllConnections()
const chosen = randomPick(connections, CrawlerConstants.CRAWL_PEERS_COUNT)
await Promise.all(chosen.map(async (conn) => {
const puller = new WS2PDocpoolPuller(this.server, conn)
await puller.pull()
}))
}
} }
\ No newline at end of file
import {Server} from "../../../server"
import {WS2PConnection} from "./WS2PConnection"
import {WS2PRequester} from "./WS2PRequester"
import {pullSandboxToLocalServer} from "../../modules/crawler/lib/sandbox"
export class WS2PDocpoolPuller {
constructor(
private server:Server,
private connection:WS2PConnection
) {}
async pull() {
const requester = WS2PRequester.fromConnection(this.connection)
// node.pubkey = p.pubkey;
return pullSandboxToLocalServer(this.server.conf.currency, {
getRequirementsPending: (minCert = 1) => {
return requester.getRequirementsPending(minCert)
}
}, this.server, this.server.logger)
}
}
...@@ -2,6 +2,7 @@ import {WS2PConnection} from "./WS2PConnection" ...@@ -2,6 +2,7 @@ import {WS2PConnection} from "./WS2PConnection"
import {BlockDTO} from "../dto/BlockDTO" import {BlockDTO} from "../dto/BlockDTO"
export enum WS2P_REQ { export enum WS2P_REQ {
WOT_REQUIREMENTS_OF_PENDING,
BLOCKS_CHUNK, BLOCKS_CHUNK,
BLOCK_BY_NUMBER, BLOCK_BY_NUMBER,
CURRENT CURRENT
...@@ -28,6 +29,10 @@ export class WS2PRequester { ...@@ -28,6 +29,10 @@ export class WS2PRequester {
return this.query(WS2P_REQ.BLOCKS_CHUNK, { count, fromNumber }) return this.query(WS2P_REQ.BLOCKS_CHUNK, { count, fromNumber })
} }
async getRequirementsPending(minCert = 1): Promise<any> {
return this.query(WS2P_REQ.WOT_REQUIREMENTS_OF_PENDING, { minCert })
}
private query(req:WS2P_REQ, params:any = {}): Promise<any> { private query(req:WS2P_REQ, params:any = {}): Promise<any> {
return this.ws2pc.request({ return this.ws2pc.request({
name: WS2P_REQ[req], name: WS2P_REQ[req],
......
...@@ -25,4 +25,12 @@ export class WS2PReqMapperByServer implements WS2PReqMapper { ...@@ -25,4 +25,12 @@ export class WS2PReqMapperByServer implements WS2PReqMapper {
} }
return this.server.dal.getBlocksBetween(from, from + count - 1) return this.server.dal.getBlocksBetween(from, from + count - 1)
} }
async getRequirementsOfPending(minsig: number): Promise<any> {
const identities = await this.server.dal.idtyDAL.query('SELECT i.*, count(c.sig) as nbSig FROM idty i, cert c WHERE c.target = i.hash group by i.hash having nbSig >= ?', minsig)
const all = await this.server.BlockchainService.requirementsOfIdentities(identities, false)
return {
identities: all
}
}
} }
\ No newline at end of file
...@@ -5,4 +5,5 @@ export interface WS2PReqMapper { ...@@ -5,4 +5,5 @@ export interface WS2PReqMapper {
getCurrent(): Promise<BlockDTO> getCurrent(): Promise<BlockDTO>
getBlock(number:number): Promise<BlockDTO[]> getBlock(number:number): Promise<BlockDTO[]>
getBlocks(count:number, fromNumber:number): Promise<BlockDTO[]> getBlocks(count:number, fromNumber:number): Promise<BlockDTO[]>
getRequirementsOfPending(minCert:number): Promise<any>
} }
\ No newline at end of file
...@@ -93,6 +93,13 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler { ...@@ -93,6 +93,13 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler {
const fromNumber:number = data.params.fromNumber const fromNumber:number = data.params.fromNumber
body = await this.mapper.getBlocks(count, fromNumber) body = await this.mapper.getBlocks(count, fromNumber)
break; break;
case WS2P_REQ[WS2P_REQ.WOT_REQUIREMENTS_OF_PENDING]:
if (isNaN(data.params.minCert)) {
throw "Wrong param `minCert`"
}
const minCert:number = data.params.minCert
body = await this.mapper.getRequirementsOfPending(minCert)
break;
default: default:
throw Error(WS2P_REQERROR[WS2P_REQERROR.UNKNOWN_REQUEST]) throw Error(WS2P_REQERROR[WS2P_REQERROR.UNKNOWN_REQUEST])
} }
......
import {simpleTestingConf, simpleTestingServer, simpleUser, simpleWS2PNetwork, TestingServer} from "./tools/toolbox"
import {WS2PCluster} from "../../app/lib/ws2p/WS2PCluster"
import {ProverDependency} from "../../app/modules/prover/index"
const assert = require('assert')
describe("WS2P docpool pulling", function() {
const now = 1500000000
let s1:TestingServer, s2:TestingServer, wss:any
let cluster2:WS2PCluster
let cat:any, tac:any, toc:any
const catKeyring = { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'}
const tacKeyring = { pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc', sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'}
const tocKeyring = { pub: 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo', sec: '64EYRvdPpTfLGGmaX5nijLXRqWXaVz8r1Z1GtaahXwVSJGQRn7tqkxLb288zwSYzELMEG5ZhXSBYSxsTsz1m9y8F'}
let b0, b1, b2
before(async () => {
const conf1 = simpleTestingConf(now, catKeyring)
const conf2 = simpleTestingConf(now, tacKeyring)
s1 = simpleTestingServer(conf1)
s2 = simpleTestingServer(conf2)
cat = simpleUser('cat', catKeyring, s1)
tac = simpleUser('tac', tacKeyring, s1)
toc = simpleUser('toc', tocKeyring, s1)
await s1.initDalBmaConnections()
await s2.initDalBmaConnections()
await cat.createIdentity();
await tac.createIdentity();
await cat.cert(tac);
await tac.cert(cat);
await cat.join();
await tac.join();
b0 = await s1.commit({ time: now })
b1 = await s1.commit({ time: now })
b2 = await s1.commit({ time: now })
await s2.writeBlock(b0)
await s2.writeBlock(b1)
await s2.writeBlock(b2)
await s2.waitToHaveBlock(2)
})
after(() => wss.close())
it('should have b#2 on s1 and s2', async () => {
const currentS1 = await s1.BlockchainService.current()
const currentS2 = await s2.BlockchainService.current()
assert.equal(currentS1.number, 2)
assert.equal(currentS2.number, 2)
})
it('should be able to pull the docpool', async () => {
await toc.createIdentity();
await cat.cert(toc);
await toc.join();
const network = await simpleWS2PNetwork(s1, s2)
wss = network.wss
cluster2 = network.cluster2
ProverDependency.duniter.methods.hookServer(s1._server)
await cluster2.pullDocpool()
await s2.expect('/wot/lookup/toc', (res:any) => {
assert.equal(res.results.length, 1)
assert.equal(res.results[0].uids[0].others.length, 1)
})
const currentS1 = await s1.BlockchainService.current()
const currentS2 = await s2.BlockchainService.current()
assert.equal(currentS1.number, 2)
assert.equal(currentS2.number, 2)
})
})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment