From 9df2429c198ca89a1d5a0f92c8522ff009caf2df Mon Sep 17 00:00:00 2001 From: cgeek <cem.moreau@gmail.com> Date: Thu, 7 Sep 2017 17:45:04 +0200 Subject: [PATCH] [enh] #1084 WS2P: add automatic connection on startup --- app/modules/ws2p/index.ts | 39 +++++++++++++-- app/modules/ws2p/lib/WS2PCluster.ts | 74 ++++++++++++++++++++++++++++- app/modules/ws2p/lib/constants.ts | 7 ++- 3 files changed, 113 insertions(+), 7 deletions(-) diff --git a/app/modules/ws2p/index.ts b/app/modules/ws2p/index.ts index aedf27ec5..2950107b8 100644 --- a/app/modules/ws2p/index.ts +++ b/app/modules/ws2p/index.ts @@ -4,6 +4,9 @@ import {Server} from "../../../server" import * as stream from "stream" import {WS2PCluster} from "./lib/WS2PCluster" import {WS2PUpnp} from "./lib/ws2p-upnp" +import {CommonConstants} from "../../lib/common-libs/constants" + +const nuuid = require('node-uuid') export const WS2PDependency = { duniter: { @@ -21,7 +24,10 @@ export const WS2PDependency = { onLoading: async (conf:WS2PConfDTO, program:any, logger:any) => { - conf.ws2p = conf.ws2p || {} + conf.ws2p = conf.ws2p || { uuid: nuuid.v4().slice(0,8) } + + // For config which does not have uuid filled in + conf.ws2p.uuid = conf.ws2p.uuid || nuuid.v4().slice(0,8) if (program.ws2pHost !== undefined) conf.ws2p.host = program.ws2pHost if (program.ws2pPort !== undefined) conf.ws2p.port = parseInt(program.ws2pPort) @@ -47,7 +53,8 @@ export const WS2PDependency = { service: { input: (server:Server, conf:WS2PConfDTO, logger:any) => { const api = new WS2PAPI(server, conf, logger) - server.addEndpointsDefinitions(() => api.getEndpoint()) + server.ws2pCluster = api.getCluster() + server.addEndpointsDefinitions(async () => api.getEndpoint()) server.addWrongEndpointFilter((endpoints:string[]) => getWrongEndpoints(endpoints, conf)) return api } @@ -73,7 +80,11 @@ export class WS2PAPI extends stream.Transform { private conf:WS2PConfDTO, private logger:any) { super({ objectMode: true }) - this.cluster = new WS2PCluster(server) + this.cluster = WS2PCluster.plugOn(server) + } + + getCluster() { + return this.cluster } startService = async () => { @@ -104,10 +115,14 @@ export class WS2PAPI extends stream.Transform { this.logger.warn(e); } } + + // In any case, we trigger the Level 1 connection + await this.cluster.startCrawling() } stopService = async () => { if (this.cluster) { + await this.cluster.stopCrawling() await this.cluster.close() } if (this.upnpAPI) { @@ -116,6 +131,22 @@ export class WS2PAPI extends stream.Transform { } async getEndpoint() { - return this.upnpAPI ? this.upnpAPI.getRemoteEndpoint() : '' + if (this.upnpAPI && this.server.conf.ws2p) { + const config = this.upnpAPI.getCurrentConfig() + return !config ? '' : ['WS2P', this.server.conf.ws2p.uuid, config.remotehost, config.port].join(' ') + } + else if (this.server.conf.ws2p + && this.server.conf.ws2p.uuid + && this.server.conf.ws2p.remotehost + && this.server.conf.ws2p.remoteport) { + return ['WS2P', + this.server.conf.ws2p.uuid, + this.server.conf.ws2p.remotehost, + this.server.conf.ws2p.remoteport + ].join(' ') + } + else { + return '' + } } } \ No newline at end of file diff --git a/app/modules/ws2p/lib/WS2PCluster.ts b/app/modules/ws2p/lib/WS2PCluster.ts index 4699cc86c..b8a312d7a 100644 --- a/app/modules/ws2p/lib/WS2PCluster.ts +++ b/app/modules/ws2p/lib/WS2PCluster.ts @@ -6,7 +6,10 @@ import {randomPick} from "../../../lib/common-libs/randomPick" import {CrawlerConstants} from "../../crawler/lib/constants" import {WS2PBlockPuller} from "./WS2PBlockPuller" import {WS2PDocpoolPuller} from "./WS2PDocpoolPuller" +import {WS2PConstants} from "./constants" +import {PeerDTO} from "../../../lib/dto/PeerDTO" +const es = require('event-stream') const nuuid = require('node-uuid') export class WS2PCluster { @@ -15,8 +18,16 @@ export class WS2PCluster { private ws2pClients:{[k:string]:WS2PClient} = {} private host:string|null = null private port:number|null = null + private syncBlockInterval:NodeJS.Timer + private syncDocpoolInterval:NodeJS.Timer - constructor(private server:Server) {} + private constructor(private server:Server) {} + + static plugOn(server:Server) { + const cluster = new WS2PCluster(server) + server.ws2pCluster = cluster + return cluster + } async listen(host:string, port:number) { if (this.ws2pServer) { @@ -40,14 +51,48 @@ export class WS2PCluster { return Object.keys(this.ws2pClients).length } + servedCount() { + return this.ws2pServer ? this.ws2pServer.getConnexions().length : 0 + } + async connect(host: string, port: number): Promise<WS2PConnection> { const uuid = nuuid.v4() const ws2pc = await WS2PClient.connectTo(this.server, host, port) this.ws2pClients[uuid] = ws2pc ws2pc.connection.closed.then(() => { + this.server.logger.info('WS2P: connection [%s `WS2P %s %s`] has been closed', ws2pc.connection.pubkey.slice(0, 8), host, port) delete this.ws2pClients[uuid] }) - return ws2pc.connection + try { + this.server.logger.info('WS2P: connected to peer %s using `WS2P %s %s`!', ws2pc.connection.pubkey.slice(0, 8), host, port) + return ws2pc.connection + } catch (e) { + this.server.logger.info('WS2P: Could not connect to peer %s using `WS2P %s %s: %s`', ws2pc.connection.pubkey.slice(0, 8), host, port, (e && e.message || e)) + throw e + } + } + + async connectToWS2Peers() { + const potentials = await this.server.dal.getWS2Peers() + const peers:PeerDTO[] = potentials.map((p:any) => PeerDTO.fromJSONObject(p)) + let i = 0 + while (i < peers.length && this.clientsCount() < WS2PConstants.MAX_LEVEL_1_PEERS) { + const p = peers[i] + const api = p.getWS2P() + await this.connect(api.host, api.port) + i++ + } + + // Also listen for network updates, and connect to new nodes + this.server.pipe(es.mapSync(async (data:any) => { + if (data.endpoints) { + const peer = PeerDTO.fromJSONObject(data) + const ws2pEnpoint = peer.getWS2P() + if (ws2pEnpoint) { + this.connect(ws2pEnpoint.host, ws2pEnpoint.port) + } + } + })) } async getAllConnections() { @@ -58,6 +103,31 @@ export class WS2PCluster { return all } + async startCrawling() { + // For blocks + if (this.syncBlockInterval) + clearInterval(this.syncBlockInterval); + this.syncBlockInterval = setInterval(() => this.pullBlocks(), 1000 * WS2PConstants.BLOCK_PULLING_INTERVAL) + // Pull blocks right on start + await this.connectToWS2Peers() + await this.pullBlocks() + // For docpool + if (this.syncDocpoolInterval) + clearInterval(this.syncDocpoolInterval); + this.syncDocpoolInterval = setInterval(() => this.pullDocpool(), 1000 * WS2PConstants.DOCPOOL_PULLING_INTERVAL) + // The first pulling occurs 10 minutes after the start + setTimeout(() => this.pullDocpool(), WS2PConstants.SANDBOX_FIRST_PULL_DELAY) + } + + async stopCrawling() { + if (this.syncBlockInterval) { + clearInterval(this.syncBlockInterval) + } + if (this.syncDocpoolInterval) { + clearInterval(this.syncDocpoolInterval) + } + } + async pullBlocks() { const connections = await this.getAllConnections() const chosen = randomPick(connections, CrawlerConstants.CRAWL_PEERS_COUNT) diff --git a/app/modules/ws2p/lib/constants.ts b/app/modules/ws2p/lib/constants.ts index e4fd5f3c7..8d250ccd5 100644 --- a/app/modules/ws2p/lib/constants.ts +++ b/app/modules/ws2p/lib/constants.ts @@ -3,6 +3,11 @@ export const WS2PConstants = { WS2P_UPNP_TTL: 600, WS2P_PORTS_START: 20900, WS2P_PORTS_END: 20999, - WS2P_UPNP_INTERVAL: 300 + WS2P_UPNP_INTERVAL: 300, + BLOCK_PULLING_INTERVAL: 300 * 2, // 10 minutes + DOCPOOL_PULLING_INTERVAL: 3600 * 4, // 4 hours + SANDBOX_FIRST_PULL_DELAY: 300 * 2, // 10 minutes after the start + + MAX_LEVEL_1_PEERS: 10, } \ No newline at end of file -- GitLab