Skip to content
Snippets Groups Projects
Commit 9df2429c authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] #1084 WS2P: add automatic connection on startup

parent 8ae1e140
No related branches found
No related tags found
No related merge requests found
...@@ -4,6 +4,9 @@ import {Server} from "../../../server" ...@@ -4,6 +4,9 @@ import {Server} from "../../../server"
import * as stream from "stream" import * as stream from "stream"
import {WS2PCluster} from "./lib/WS2PCluster" import {WS2PCluster} from "./lib/WS2PCluster"
import {WS2PUpnp} from "./lib/ws2p-upnp" import {WS2PUpnp} from "./lib/ws2p-upnp"
import {CommonConstants} from "../../lib/common-libs/constants"
const nuuid = require('node-uuid')
export const WS2PDependency = { export const WS2PDependency = {
duniter: { duniter: {
...@@ -21,7 +24,10 @@ export const WS2PDependency = { ...@@ -21,7 +24,10 @@ export const WS2PDependency = {
onLoading: async (conf:WS2PConfDTO, program:any, logger:any) => { onLoading: async (conf:WS2PConfDTO, program:any, logger:any) => {
conf.ws2p = conf.ws2p || {} conf.ws2p = conf.ws2p || { uuid: nuuid.v4().slice(0,8) }
// For config which does not have uuid filled in
conf.ws2p.uuid = conf.ws2p.uuid || nuuid.v4().slice(0,8)
if (program.ws2pHost !== undefined) conf.ws2p.host = program.ws2pHost if (program.ws2pHost !== undefined) conf.ws2p.host = program.ws2pHost
if (program.ws2pPort !== undefined) conf.ws2p.port = parseInt(program.ws2pPort) if (program.ws2pPort !== undefined) conf.ws2p.port = parseInt(program.ws2pPort)
...@@ -47,7 +53,8 @@ export const WS2PDependency = { ...@@ -47,7 +53,8 @@ export const WS2PDependency = {
service: { service: {
input: (server:Server, conf:WS2PConfDTO, logger:any) => { input: (server:Server, conf:WS2PConfDTO, logger:any) => {
const api = new WS2PAPI(server, conf, logger) const api = new WS2PAPI(server, conf, logger)
server.addEndpointsDefinitions(() => api.getEndpoint()) server.ws2pCluster = api.getCluster()
server.addEndpointsDefinitions(async () => api.getEndpoint())
server.addWrongEndpointFilter((endpoints:string[]) => getWrongEndpoints(endpoints, conf)) server.addWrongEndpointFilter((endpoints:string[]) => getWrongEndpoints(endpoints, conf))
return api return api
} }
...@@ -73,7 +80,11 @@ export class WS2PAPI extends stream.Transform { ...@@ -73,7 +80,11 @@ export class WS2PAPI extends stream.Transform {
private conf:WS2PConfDTO, private conf:WS2PConfDTO,
private logger:any) { private logger:any) {
super({ objectMode: true }) super({ objectMode: true })
this.cluster = new WS2PCluster(server) this.cluster = WS2PCluster.plugOn(server)
}
getCluster() {
return this.cluster
} }
startService = async () => { startService = async () => {
...@@ -104,10 +115,14 @@ export class WS2PAPI extends stream.Transform { ...@@ -104,10 +115,14 @@ export class WS2PAPI extends stream.Transform {
this.logger.warn(e); this.logger.warn(e);
} }
} }
// In any case, we trigger the Level 1 connection
await this.cluster.startCrawling()
} }
stopService = async () => { stopService = async () => {
if (this.cluster) { if (this.cluster) {
await this.cluster.stopCrawling()
await this.cluster.close() await this.cluster.close()
} }
if (this.upnpAPI) { if (this.upnpAPI) {
...@@ -116,6 +131,22 @@ export class WS2PAPI extends stream.Transform { ...@@ -116,6 +131,22 @@ export class WS2PAPI extends stream.Transform {
} }
async getEndpoint() { async getEndpoint() {
return this.upnpAPI ? this.upnpAPI.getRemoteEndpoint() : '' if (this.upnpAPI && this.server.conf.ws2p) {
const config = this.upnpAPI.getCurrentConfig()
return !config ? '' : ['WS2P', this.server.conf.ws2p.uuid, config.remotehost, config.port].join(' ')
}
else if (this.server.conf.ws2p
&& this.server.conf.ws2p.uuid
&& this.server.conf.ws2p.remotehost
&& this.server.conf.ws2p.remoteport) {
return ['WS2P',
this.server.conf.ws2p.uuid,
this.server.conf.ws2p.remotehost,
this.server.conf.ws2p.remoteport
].join(' ')
}
else {
return ''
}
} }
} }
\ No newline at end of file
...@@ -6,7 +6,10 @@ import {randomPick} from "../../../lib/common-libs/randomPick" ...@@ -6,7 +6,10 @@ import {randomPick} from "../../../lib/common-libs/randomPick"
import {CrawlerConstants} from "../../crawler/lib/constants" import {CrawlerConstants} from "../../crawler/lib/constants"
import {WS2PBlockPuller} from "./WS2PBlockPuller" import {WS2PBlockPuller} from "./WS2PBlockPuller"
import {WS2PDocpoolPuller} from "./WS2PDocpoolPuller" import {WS2PDocpoolPuller} from "./WS2PDocpoolPuller"
import {WS2PConstants} from "./constants"
import {PeerDTO} from "../../../lib/dto/PeerDTO"
const es = require('event-stream')
const nuuid = require('node-uuid') const nuuid = require('node-uuid')
export class WS2PCluster { export class WS2PCluster {
...@@ -15,8 +18,16 @@ export class WS2PCluster { ...@@ -15,8 +18,16 @@ export class WS2PCluster {
private ws2pClients:{[k:string]:WS2PClient} = {} private ws2pClients:{[k:string]:WS2PClient} = {}
private host:string|null = null private host:string|null = null
private port:number|null = null private port:number|null = null
private syncBlockInterval:NodeJS.Timer
private syncDocpoolInterval:NodeJS.Timer
constructor(private server:Server) {} private constructor(private server:Server) {}
static plugOn(server:Server) {
const cluster = new WS2PCluster(server)
server.ws2pCluster = cluster
return cluster
}
async listen(host:string, port:number) { async listen(host:string, port:number) {
if (this.ws2pServer) { if (this.ws2pServer) {
...@@ -40,14 +51,48 @@ export class WS2PCluster { ...@@ -40,14 +51,48 @@ export class WS2PCluster {
return Object.keys(this.ws2pClients).length return Object.keys(this.ws2pClients).length
} }
servedCount() {
return this.ws2pServer ? this.ws2pServer.getConnexions().length : 0
}
async connect(host: string, port: number): Promise<WS2PConnection> { async connect(host: string, port: number): Promise<WS2PConnection> {
const uuid = nuuid.v4() const uuid = nuuid.v4()
const ws2pc = await WS2PClient.connectTo(this.server, host, port) const ws2pc = await WS2PClient.connectTo(this.server, host, port)
this.ws2pClients[uuid] = ws2pc this.ws2pClients[uuid] = ws2pc
ws2pc.connection.closed.then(() => { ws2pc.connection.closed.then(() => {
this.server.logger.info('WS2P: connection [%s `WS2P %s %s`] has been closed', ws2pc.connection.pubkey.slice(0, 8), host, port)
delete this.ws2pClients[uuid] delete this.ws2pClients[uuid]
}) })
return ws2pc.connection try {
this.server.logger.info('WS2P: connected to peer %s using `WS2P %s %s`!', ws2pc.connection.pubkey.slice(0, 8), host, port)
return ws2pc.connection
} catch (e) {
this.server.logger.info('WS2P: Could not connect to peer %s using `WS2P %s %s: %s`', ws2pc.connection.pubkey.slice(0, 8), host, port, (e && e.message || e))
throw e
}
}
async connectToWS2Peers() {
const potentials = await this.server.dal.getWS2Peers()
const peers:PeerDTO[] = potentials.map((p:any) => PeerDTO.fromJSONObject(p))
let i = 0
while (i < peers.length && this.clientsCount() < WS2PConstants.MAX_LEVEL_1_PEERS) {
const p = peers[i]
const api = p.getWS2P()
await this.connect(api.host, api.port)
i++
}
// Also listen for network updates, and connect to new nodes
this.server.pipe(es.mapSync(async (data:any) => {
if (data.endpoints) {
const peer = PeerDTO.fromJSONObject(data)
const ws2pEnpoint = peer.getWS2P()
if (ws2pEnpoint) {
this.connect(ws2pEnpoint.host, ws2pEnpoint.port)
}
}
}))
} }
async getAllConnections() { async getAllConnections() {
...@@ -58,6 +103,31 @@ export class WS2PCluster { ...@@ -58,6 +103,31 @@ export class WS2PCluster {
return all return all
} }
async startCrawling() {
// For blocks
if (this.syncBlockInterval)
clearInterval(this.syncBlockInterval);
this.syncBlockInterval = setInterval(() => this.pullBlocks(), 1000 * WS2PConstants.BLOCK_PULLING_INTERVAL)
// Pull blocks right on start
await this.connectToWS2Peers()
await this.pullBlocks()
// For docpool
if (this.syncDocpoolInterval)
clearInterval(this.syncDocpoolInterval);
this.syncDocpoolInterval = setInterval(() => this.pullDocpool(), 1000 * WS2PConstants.DOCPOOL_PULLING_INTERVAL)
// The first pulling occurs 10 minutes after the start
setTimeout(() => this.pullDocpool(), WS2PConstants.SANDBOX_FIRST_PULL_DELAY)
}
async stopCrawling() {
if (this.syncBlockInterval) {
clearInterval(this.syncBlockInterval)
}
if (this.syncDocpoolInterval) {
clearInterval(this.syncDocpoolInterval)
}
}
async pullBlocks() { async pullBlocks() {
const connections = await this.getAllConnections() const connections = await this.getAllConnections()
const chosen = randomPick(connections, CrawlerConstants.CRAWL_PEERS_COUNT) const chosen = randomPick(connections, CrawlerConstants.CRAWL_PEERS_COUNT)
......
...@@ -3,6 +3,11 @@ export const WS2PConstants = { ...@@ -3,6 +3,11 @@ export const WS2PConstants = {
WS2P_UPNP_TTL: 600, WS2P_UPNP_TTL: 600,
WS2P_PORTS_START: 20900, WS2P_PORTS_START: 20900,
WS2P_PORTS_END: 20999, WS2P_PORTS_END: 20999,
WS2P_UPNP_INTERVAL: 300 WS2P_UPNP_INTERVAL: 300,
BLOCK_PULLING_INTERVAL: 300 * 2, // 10 minutes
DOCPOOL_PULLING_INTERVAL: 3600 * 4, // 4 hours
SANDBOX_FIRST_PULL_DELAY: 300 * 2, // 10 minutes after the start
MAX_LEVEL_1_PEERS: 10,
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment