diff --git a/app/modules/ws2p/lib/WS2PClient.ts b/app/modules/ws2p/lib/WS2PClient.ts index 0bc7bc1160036877f3754dd81eb5726413f14fc9..569609f6e1c06bbdef4b7c8c9fadfcf48569972c 100644 --- a/app/modules/ws2p/lib/WS2PClient.ts +++ b/app/modules/ws2p/lib/WS2PClient.ts @@ -13,7 +13,7 @@ export class WS2PClient { private constructor(public connection:WS2PConnection) {} - static async connectTo(server:Server, fullEndpointAddress:string, expectedWS2PUID:string, messageHandler:WS2PMessageHandler, expectedPub:string, allowKey:(pub:string)=>Promise<boolean> ) { + static async connectTo(server:Server, fullEndpointAddress:string, uuid:string, messageHandler:WS2PMessageHandler, expectedPub:string, allowKey:(pub:string)=>Promise<boolean> ) { const k2 = new Key(server.conf.pair.pub, server.conf.pair.sec) const c = WS2PConnection.newConnectionToAddress( fullEndpointAddress, @@ -25,8 +25,7 @@ export class WS2PClient { connectionTimeout: WS2PConstants.REQUEST_TIMEOUT, requestTimeout: WS2PConstants.REQUEST_TIMEOUT }, - expectedPub, - expectedWS2PUID + expectedPub ) const singleWriteProtection = new WS2PSingleWriteStream() const streamer = new WS2PStreamer(c) diff --git a/app/modules/ws2p/lib/WS2PCluster.ts b/app/modules/ws2p/lib/WS2PCluster.ts index b89408190d6059ede8769bd429ca89bb1560cdec..f41ad9cb297093ab957e04d9a4e6652266a79e27 100644 --- a/app/modules/ws2p/lib/WS2PCluster.ts +++ b/app/modules/ws2p/lib/WS2PCluster.ts @@ -267,7 +267,7 @@ export class WS2PCluster { } this.ws2pServer = await WS2PServer.bindOn(this.server, host, port, this.fifo, (pubkey:string, connectedPubkeys:string[]) => { const privilegedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.privilegedNodes) ? this.server.conf.ws2p.privilegedNodes:[] - return this.acceptPubkey(pubkey, connectedPubkeys, [], () => this.servedCount(), this.maxLevel2Peers, privilegedNodes, (this.server.conf.ws2p !== undefined && this.server.conf.ws2p.privilegedOnly)) + return this.acceptPubkey(pubkey, connectedPubkeys, () => this.servedCount(), this.maxLevel2Peers, privilegedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.privilegedOnly || false)) }, this.messageHandler) this.host = host this.port = port @@ -298,9 +298,8 @@ export class WS2PCluster { const fullEndpointAddress = WS2PCluster.getFullAddress(host, port, path) const ws2pc = await WS2PClient.connectTo(this.server, fullEndpointAddress, ws2pEndpointUUID, messageHandler, expectedPub, (pub:string) => { const connectedPubkeys = this.getConnectedPubkeys() - const connectedWS2PUID = this.getConnectedWS2PUID() const preferedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) ? this.server.conf.ws2p.preferedNodes:[] - return this.acceptPubkey(expectedPub, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEndpointUUID) + return this.acceptPubkey(expectedPub, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEndpointUUID) }) this.ws2pClients[uuid] = ws2pc pub = ws2pc.connection.pubkey @@ -404,11 +403,10 @@ export class WS2PCluster { if (ws2pEnpoint) { // Check if already connected to the pubkey (in any way: server or client) const connectedPubkeys = this.getConnectedPubkeys() - const connectedWS2PUID = this.getConnectedWS2PUID() const preferedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) ? this.server.conf.ws2p.preferedNodes:[] - const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEnpoint.uuid) + const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEnpoint.uuid) if (shouldAccept && (!this.server.conf.ws2p || ws2pEnpoint.uuid !== this.server.conf.ws2p.uuid || peer.pubkey !== this.server.conf.pair.pub || ws2pEnpoint.uuid === '11111111')) { - await this.connectToRemoteWS(ws2pEnpoint.host, ws2pEnpoint.port, ws2pEnpoint.path, this.messageHandler, peer.pubkey, ws2pEnpoint.uuid) + await this.connectToRemoteWS(ws2pEnpoint.host, ws2pEnpoint.port, ws2pEnpoint.path, this.messageHandler, peer.pubkey) await this.trimClientConnections() } } @@ -575,75 +573,64 @@ export class WS2PCluster { protected async acceptPubkey( pub:string, connectedPubkeys:string[], - connectedWS2PUID:string[], getConcurrentConnexionsCount:()=>number, maxConcurrentConnexionsSize:number, priorityKeys:string[], priorityKeysOnly:boolean, targetWS2PUID = "" ) { - // We need ws2pServer instance - if (!this.ws2pServer) { - return false - } - - // We do not accept oneself connetion - if (this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid === targetWS2PUID) { - return false - } - // We do not accept banned keys if (this.banned[pub])Â { this.server.logger.warn('Connection to %s refused, reason: %s', pub.slice(0, 8), this.banned[pub]) return false } - - // Is priority key ? - let isPriorityKey = priorityKeys.indexOf(pub) !== -1 - - // We do not accept forbidden keys - if (priorityKeysOnly && !isPriorityKey && this.server.conf.pair.pub !== pub) { - return false - } - - // Is member key ? - const isMemberPeer = await this.server.dal.isMember(pub) - - // Do we have room? - if (getConcurrentConnexionsCount() < maxConcurrentConnexionsSize) { - // Yes: just connect to it - return true - } else if (this.server.conf.pair.pub === pub) { - // We always accept self nodes, and they have a supreme priority (these are siblings) - if (targetWS2PUID !== "") { - if (this.isNewSiblingNode(pub, targetWS2PUID, connectedWS2PUID)) { - return true - } + let accept = priorityKeys.indexOf(pub) !== -1 + if (!accept && !priorityKeysOnly && connectedPubkeys.indexOf(pub) === -1) { + // Do we have room? + if (this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid === targetWS2PUID) { + accept = false } - } - else if (connectedPubkeys.indexOf(pub) === -1) - { - let minPriorityLevel = WS2PConstants.MAX_PRIORITY_LEVEL - for (const connectedPubkey of connectedPubkeys) { - let connectedPubkeyPriorityLevel = this.ws2pServer.keyPriorityLevel(connectedPubkey, priorityKeys) - if (connectedPubkeyPriorityLevel < minPriorityLevel) { - minPriorityLevel = connectedPubkeyPriorityLevel + else if (getConcurrentConnexionsCount() < maxConcurrentConnexionsSize) { + // Yes: just connect to it + accept = true + } + else { + // No: let's verify some peer has a lower priority + if (connectedPubkeys.indexOf(this.server.conf.pair.pub) !== -1) { + // Yes, we are connected to ourself. Let's replace this connexion + accept = true + } + else { + // Does this node have the priority over at least one node? + const isMemberPeer = await this.server.dal.isMember(pub) + if (isMemberPeer) { + // The node may have the priority over at least 1 other node + let i = 0, existsOneNonMemberNode = false + while (!existsOneNonMemberNode && i < connectedPubkeys.length) { + const isAlsoAMemberPeer = await this.server.dal.isMember(connectedPubkeys[i]) + existsOneNonMemberNode = !isAlsoAMemberPeer + i++ + } + if (existsOneNonMemberNode) { + // The node has the priority over a non-member peer: try to connect + accept = true + } + } } } - if (this.ws2pServer.keyPriorityLevel(pub, priorityKeys) > minPriorityLevel) { - return true + } else { + // The pubkey is already connected: we accept only self nodes, and they have a supreme priority (these are siblings) + if (targetWS2PUID) { + if (this.isSiblingNode(pub, targetWS2PUID)) { + accept = true + } } } - return false + return accept } - isNewSiblingNode(pub:string, targetWS2PUID:string, connectedWS2PUID:string[]) { - for (const uuid of connectedWS2PUID) { - if (uuid === targetWS2PUID) { - return false - } - } - return true + isSiblingNode(pub:string, uuid:string) { + return !!(this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid !== uuid) } async getLevel1Connections() { @@ -755,12 +742,6 @@ export class WS2PCluster { return clients.concat(served) } - getConnectedWS2PUID() { - const clients = Object.keys(this.ws2pClients).map(k => this.ws2pClients[k].connection.uuid) - const served = this.ws2pServer ? this.ws2pServer.getConnexions().map(c => c.uuid) : [] - return clients.concat(served) - } - banConnection(c:WS2PConnection, reason:string) { this.server.logger.warn('Banning connections of %s for %ss, reason: %s', c.pubkey.slice(0, 8), WS2PConstants.BAN_DURATION_IN_SECONDS, reason) if (c.pubkey) { diff --git a/app/modules/ws2p/lib/WS2PConnection.ts b/app/modules/ws2p/lib/WS2PConnection.ts index eed3dfff8f4c7e288b753de4f6f2e4930d49eb76..0595e5626b4e307fffdd2be5d149ce6d33323bd7 100644 --- a/app/modules/ws2p/lib/WS2PConnection.ts +++ b/app/modules/ws2p/lib/WS2PConnection.ts @@ -256,8 +256,7 @@ export class WS2PConnection { connectionTimeout: REQUEST_TIMEOUT_VALUE, requestTimeout: REQUEST_TIMEOUT_VALUE }, - private expectedPub:string = "", - private expectedWS2PUID:string = "" + private expectedPub:string = "" ) { this.connectedp = new Promise((resolve, reject) => { this.connectedResolve = resolve @@ -278,8 +277,7 @@ export class WS2PConnection { connectionTimeout: REQUEST_TIMEOUT_VALUE, requestTimeout: REQUEST_TIMEOUT_VALUE }, - expectedPub:string = "", - expectedWS2PUID:string = "") { + expectedPub:string = "") { if (address.match(WS2PConstants.FULL_ADDRESS_ONION_REGEX)) { options = { connectionTimeout: WS2PConstants.CONNEXION_TOR_TIMEOUT, @@ -294,7 +292,7 @@ export class WS2PConnection { websocket.on('close', () => res()) }) websocket.on('error', () => websocket.close()) - return new WS2PConnection(websocket, onWsOpened, onWsClosed, messageHandler, localAuth, remoteAuth, options, expectedPub, expectedWS2PUID) + return new WS2PConnection(websocket, onWsOpened, onWsClosed, messageHandler, localAuth, remoteAuth, options, expectedPub) } static newConnectionFromWebSocketServer( @@ -321,10 +319,6 @@ export class WS2PConnection { return this.remoteAuth.getPubkey() } - get uuid() { - return this.expectedWS2PUID - } - get nbRequests() { return this.nbRequestsCount } diff --git a/app/modules/ws2p/lib/WS2PServer.ts b/app/modules/ws2p/lib/WS2PServer.ts index d11c3298d1fee8cddb64a3812e20259fb7d5332d..ae79815fe93af6f0c24ebb355e7d404681165748 100644 --- a/app/modules/ws2p/lib/WS2PServer.ts +++ b/app/modules/ws2p/lib/WS2PServer.ts @@ -116,8 +116,8 @@ export class WS2PServer extends events.EventEmitter { } }) }) - let privilegedKeys = (this.server.conf.ws2p && this.server.conf.ws2p.privilegedNodes) ? this.server.conf.ws2p.privilegedNodes:[] - await this.removeLowPriorityConnection(privilegedKeys) + + await this.trimConnections() await this.server.dal.setPeerUP(c.pubkey) @@ -128,26 +128,44 @@ export class WS2PServer extends events.EventEmitter { }) } - async removeLowPriorityConnection(privilegedKeys:string[]) { - let lowPriorityConnection:WS2PConnection = this.connections[0] - let minPriorityLevel = this.keyPriorityLevel(lowPriorityConnection.pubkey, privilegedKeys) - for (const c of this.connections) { - if (c !== lowPriorityConnection) { - let cPriorityLevel = this.keyPriorityLevel(c.pubkey, privilegedKeys) - if (cPriorityLevel < minPriorityLevel) { - lowPriorityConnection = c - minPriorityLevel = cPriorityLevel + async trimConnections() { + /*** OVERFLOW TRIMMING ***/ + let disconnectedOne = true + // Disconnect non-members + while (disconnectedOne && this.connections.length > this.maxLevel2Size) { + disconnectedOne = false + for (const c of this.connections) { + const isMember = await this.server.dal.isMember(c.pubkey) + if (!isMember && !disconnectedOne) { + c.close() + this.removeConnection(c) + disconnectedOne = true + } + } + } + // Disconnect members + while (this.connections.length > this.maxLevel2Size) { + for (const c of this.connections) { + c.close() + this.removeConnection(c) + } + } + /*** DUPLICATES TRIMMING ***/ + disconnectedOne = true + while (disconnectedOne) { + disconnectedOne = false + const pubkeysFound = [] + for (const c of this.connections) { + if (pubkeysFound.indexOf(c.pubkey) !== -1) { + c.close() + this.removeConnection(c) + disconnectedOne = true + } + else if (c.pubkey !== this.server.conf.pair.pub) { + pubkeysFound.push(c.pubkey) } } } - this.removeConnection(lowPriorityConnection) - } - - keyPriorityLevel(pubkey:string, privilegedKeys:string[]) { - let priorityLevel = (this.server.dal.isMember(pubkey)) ? 1:0 - priorityLevel += (privilegedKeys.indexOf(pubkey) !== -1) ? 2:0 - priorityLevel += (this.server.conf.pair.pub === pubkey) ? 4:0 - return priorityLevel } private removeConnection(c:WS2PConnection) { diff --git a/app/modules/ws2p/lib/constants.ts b/app/modules/ws2p/lib/constants.ts index cc75af0325e413ccdd5581caf9421332125f3e40..51972d7f7d1dca2c5406d8f7af8fea58bfe740a0 100644 --- a/app/modules/ws2p/lib/constants.ts +++ b/app/modules/ws2p/lib/constants.ts @@ -19,7 +19,6 @@ export const WS2PConstants = { MAX_LEVEL_1_PEERS: 10, MAX_LEVEL_2_PEERS: 10, CONNECTIONS_LOW_LEVEL: 3, - MAX_PRIORITY_LEVEL: 7, BAN_DURATION_IN_SECONDS: 120, BAN_ON_REPEAT_THRESHOLD: 5,