diff --git a/app/lib/dto/PeerDTO.ts b/app/lib/dto/PeerDTO.ts index ac62e617388aaa6b0d8ffb494e7914234285ccb3..98af4a9b0ee952503d02032b3e92212534593af8 100644 --- a/app/lib/dto/PeerDTO.ts +++ b/app/lib/dto/PeerDTO.ts @@ -95,9 +95,9 @@ export class PeerDTO implements Cloneable { } getWS2P() { - let api:any = null; - this.endpoints.forEach((ep) => { - const matches = !api && ep.match(CommonConstants.WS2P_REGEXP) + let api:{ uuid:string, host:string, port:number }|null = null + for (const ep of this.endpoints) { + const matches:any = !api && ep.match(CommonConstants.WS2P_REGEXP) if (matches) { api = { uuid: matches[1], @@ -105,8 +105,8 @@ export class PeerDTO implements Cloneable { port: parseInt(matches[3]) || 0 } } - }) - return api || null + } + return api || null } getDns() { diff --git a/app/modules/peersignal.ts b/app/modules/peersignal.ts index 6b810cbe5bde41f18618abc09b88a5dd8a412ae9..f68201de4df59593eb35fe22eda5366c4d12f3ac 100644 --- a/app/modules/peersignal.ts +++ b/app/modules/peersignal.ts @@ -30,6 +30,7 @@ class PeerSignalEmitter { // The interval duration const SIGNAL_INTERVAL = 1000 * this.conf.avgGenTime * constants.NETWORK.STATUS_INTERVAL.UPDATE; + const SIGNAL_INITIAL_DELAY = 1000 * 60 // We eventually clean an existing interval if (this.INTERVAL) @@ -47,8 +48,8 @@ class PeerSignalEmitter { }) }, SIGNAL_INTERVAL) - // Launches it a first time, immediately - await this.server.PeeringService.generateSelfPeer(this.conf, SIGNAL_INTERVAL) + // Launches it a first time few seconds after startup + setTimeout(() => this.server.PeeringService.generateSelfPeer(this.conf, SIGNAL_INTERVAL - SIGNAL_INITIAL_DELAY), SIGNAL_INITIAL_DELAY) } stopService() { diff --git a/app/modules/ws2p/lib/WS2PClient.ts b/app/modules/ws2p/lib/WS2PClient.ts index 9046fab2cf42534e8487b0bb5a568fe603e897a9..f30c105037481f0a285c29aaadb3a9f9c69d2d55 100644 --- a/app/modules/ws2p/lib/WS2PClient.ts +++ b/app/modules/ws2p/lib/WS2PClient.ts @@ -9,13 +9,13 @@ export class WS2PClient { private constructor(public connection:WS2PConnection) {} - static async connectTo(server:Server, host:string, port:number, messageHandler:WS2PMessageHandler, expectedPub:string) { + static async connectTo(server:Server, host:string, port:number, messageHandler:WS2PMessageHandler, expectedPub:string, allowKey:(pub:string)=>Promise<boolean> ) { const k2 = new Key(server.conf.pair.pub, server.conf.pair.sec) const c = WS2PConnection.newConnectionToAddress( [host, port].join(':'), messageHandler, - new WS2PPubkeyLocalAuth(server.conf.currency , k2), - new WS2PPubkeyRemoteAuth(server.conf.currency, k2), + new WS2PPubkeyLocalAuth(server.conf.currency , k2, allowKey), + new WS2PPubkeyRemoteAuth(server.conf.currency, k2, allowKey), { connectionTimeout: WS2PConstants.REQUEST_TIMEOUT, requestTimeout: WS2PConstants.REQUEST_TIMEOUT diff --git a/app/modules/ws2p/lib/WS2PCluster.ts b/app/modules/ws2p/lib/WS2PCluster.ts index 7caf0e249312740ab88ae90457df6b747d4ce331..85724e93cb0660f7ab25989933b1cfb5fc3770a5 100644 --- a/app/modules/ws2p/lib/WS2PCluster.ts +++ b/app/modules/ws2p/lib/WS2PCluster.ts @@ -207,11 +207,14 @@ export class WS2PCluster { return this.ws2pServer ? this.ws2pServer.getConnexions().length : 0 } - async connect(host: string, port: number, messageHandler:WS2PMessageHandler, expectedPub:string): Promise<WS2PConnection> { + async connect(host: string, port: number, messageHandler:WS2PMessageHandler, expectedPub:string, ws2pEndpointUUID:string = ""): Promise<WS2PConnection> { const uuid = nuuid.v4() let pub = "--------" try { - const ws2pc = await WS2PClient.connectTo(this.server, host, port, messageHandler, expectedPub) + const ws2pc = await WS2PClient.connectTo(this.server, host, port, messageHandler, expectedPub, (pub:string) => { + const connectedPubkeys = this.getConnectedPubkeys() + return this.acceptPubkey(expectedPub, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes || []), ws2pEndpointUUID) + }) this.ws2pClients[uuid] = ws2pc pub = ws2pc.connection.pubkey ws2pc.connection.closed.then(() => { @@ -231,6 +234,7 @@ export class WS2PCluster { ws2p: 'connected', to: { host, port, pubkey: pub } }) + await this.server.dal.setPeerUP(pub) return ws2pc.connection } catch (e) { this.server.logger.info('WS2P: Could not connect to peer %s using `WS2P %s %s: %s`', pub.slice(0, 8), host, port, (e && e.message || e)) @@ -241,7 +245,9 @@ export class WS2PCluster { async connectToWS2Peers() { const potentials = await this.server.dal.getWS2Peers() const peers:PeerDTO[] = potentials.map((p:any) => PeerDTO.fromJSONObject(p)) - const prefered = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) || [] + const prefered = ((this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) || []).slice() // Copy + // Our key is also a prefered one, so we connect to our siblings + prefered.push(this.server.conf.pair.pub) peers.sort((a, b) => { const aIsPrefered = prefered.indexOf(a.pubkey) !== -1 const bIsPrefered = prefered.indexOf(b.pubkey) !== -1 @@ -257,9 +263,9 @@ export class WS2PCluster { while (i < peers.length && this.clientsCount() < this.maxLevel1Size) { const p = peers[i] const api = p.getWS2P() - if (p.pubkey !== this.server.conf.pair.pub) { + if (api) { try { - await this.connect(api.host, api.port, this.messageHandler, p.pubkey) + await this.connect(api.host, api.port, this.messageHandler, p.pubkey, api.uuid) } catch (e) { this.server.logger.debug('WS2P: init: failed connection') } @@ -277,10 +283,10 @@ export class WS2PCluster { if (data.endpoints) { const peer = PeerDTO.fromJSONObject(data) const ws2pEnpoint = peer.getWS2P() - if (ws2pEnpoint && peer.pubkey !== this.server.conf.pair.pub) { + if (ws2pEnpoint) { // Check if already connected to the pubkey (in any way: server or client) const connectedPubkeys = this.getConnectedPubkeys() - const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes || [])) + const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes || []), ws2pEnpoint.uuid) if (shouldAccept) { await this.connect(ws2pEnpoint.host, ws2pEnpoint.port, this.messageHandler, peer.pubkey) await this.trimClientConnections() @@ -367,8 +373,10 @@ export class WS2PCluster { uuids = _.shuffle(uuids) for (const uuid of uuids) { const client = this.ws2pClients[uuid] - const isMember = await this.server.dal.isMember(client.connection.pubkey) - if (!isMember && !disconnectedOne) { + const pub = client.connection.pubkey + const isNotOurself = pub !== this.server.conf.pair.pub + const isMember = await this.server.dal.isMember(pub) + if (isNotOurself && !isMember && !disconnectedOne) { client.connection.close() await client.connection.closed disconnectedOne = true @@ -386,7 +394,9 @@ export class WS2PCluster { uuids = _.shuffle(uuids) for (const uuid of uuids) { const client = this.ws2pClients[uuid] - if (!disconnectedOne && this.getPreferedNodes().indexOf(client.connection.pubkey) === -1) { + const pub = client.connection.pubkey + const isNotOurself = pub !== this.server.conf.pair.pub + if (isNotOurself && !disconnectedOne && this.getPreferedNodes().indexOf(pub) === -1) { client.connection.close() disconnectedOne = true await client.connection.closed @@ -425,12 +435,17 @@ export class WS2PCluster { connectedPubkeys:string[], getConcurrentConnexionsCount:()=>number, maxConcurrentConnexionsSize:number, - priorityKeys:string[] + priorityKeys:string[], + targetWS2PUID = "" ) { let accept = priorityKeys.indexOf(pub) !== -1 if (!accept && connectedPubkeys.indexOf(pub) === -1) { // Do we have room? - if (getConcurrentConnexionsCount() < maxConcurrentConnexionsSize) { + if (this.isThisNode(pub, targetWS2PUID)) { + // We do not connect to local host + return false + } + else if (getConcurrentConnexionsCount() < maxConcurrentConnexionsSize) { // Yes: just connect to it accept = true } @@ -458,10 +473,25 @@ export class WS2PCluster { } } } + } else { + // The pubkey is already connected: we accept only self nodes, and they have a supreme priority (these are siblings) + if (targetWS2PUID) { + if (this.isSiblingNode(pub, targetWS2PUID)) { + accept = true + } + } } return accept } + isThisNode(pub:string, uuid:string) { + return !!(this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid === uuid) + } + + isSiblingNode(pub:string, uuid:string) { + return !!(this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid !== uuid) + } + async getLevel1Connections() { const all:WS2PConnection[] = [] for (const uuid of Object.keys(this.ws2pClients)) { diff --git a/app/modules/ws2p/lib/WS2PServer.ts b/app/modules/ws2p/lib/WS2PServer.ts index 43c628e8c7dc35018aa0fd93faa0d459bc9a8c1e..b8af839d53620af1de09993a4407ce805080979f 100644 --- a/app/modules/ws2p/lib/WS2PServer.ts +++ b/app/modules/ws2p/lib/WS2PServer.ts @@ -106,6 +106,9 @@ export class WS2PServer extends events.EventEmitter { }) await this.trimConnections() + + await this.server.dal.setPeerUP(c.pubkey) + } catch (e) { this.server.logger.warn('WS2P: cannot connect to incoming WebSocket connection: %s', e) } @@ -126,15 +129,6 @@ export class WS2PServer extends events.EventEmitter { } } } - // Disconnect eventual self - if (this.connections.length > this.maxLevel2Size) { - for (const c of this.connections) { - if (c.pubkey === this.server.conf.pair.pub) { - c.close() - this.removeConnection(c) - } - } - } // Disconnect members while (this.connections.length > this.maxLevel2Size) { for (const c of this.connections) {