diff --git a/app/modules/ws2p/lib/WS2PClient.ts b/app/modules/ws2p/lib/WS2PClient.ts index 569609f6e1c06bbdef4b7c8c9fadfcf48569972c..0bc7bc1160036877f3754dd81eb5726413f14fc9 100644 --- a/app/modules/ws2p/lib/WS2PClient.ts +++ b/app/modules/ws2p/lib/WS2PClient.ts @@ -13,7 +13,7 @@ export class WS2PClient { private constructor(public connection:WS2PConnection) {} - static async connectTo(server:Server, fullEndpointAddress:string, uuid:string, messageHandler:WS2PMessageHandler, expectedPub:string, allowKey:(pub:string)=>Promise<boolean> ) { + static async connectTo(server:Server, fullEndpointAddress:string, expectedWS2PUID:string, messageHandler:WS2PMessageHandler, expectedPub:string, allowKey:(pub:string)=>Promise<boolean> ) { const k2 = new Key(server.conf.pair.pub, server.conf.pair.sec) const c = WS2PConnection.newConnectionToAddress( fullEndpointAddress, @@ -25,7 +25,8 @@ export class WS2PClient { connectionTimeout: WS2PConstants.REQUEST_TIMEOUT, requestTimeout: WS2PConstants.REQUEST_TIMEOUT }, - expectedPub + expectedPub, + expectedWS2PUID ) const singleWriteProtection = new WS2PSingleWriteStream() const streamer = new WS2PStreamer(c) diff --git a/app/modules/ws2p/lib/WS2PCluster.ts b/app/modules/ws2p/lib/WS2PCluster.ts index f41ad9cb297093ab957e04d9a4e6652266a79e27..b89408190d6059ede8769bd429ca89bb1560cdec 100644 --- a/app/modules/ws2p/lib/WS2PCluster.ts +++ b/app/modules/ws2p/lib/WS2PCluster.ts @@ -267,7 +267,7 @@ export class WS2PCluster { } this.ws2pServer = await WS2PServer.bindOn(this.server, host, port, this.fifo, (pubkey:string, connectedPubkeys:string[]) => { const privilegedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.privilegedNodes) ? this.server.conf.ws2p.privilegedNodes:[] - return this.acceptPubkey(pubkey, connectedPubkeys, () => this.servedCount(), this.maxLevel2Peers, privilegedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.privilegedOnly || false)) + return this.acceptPubkey(pubkey, connectedPubkeys, [], () => this.servedCount(), this.maxLevel2Peers, privilegedNodes, (this.server.conf.ws2p !== undefined && this.server.conf.ws2p.privilegedOnly)) }, this.messageHandler) this.host = host this.port = port @@ -298,8 +298,9 @@ export class WS2PCluster { const fullEndpointAddress = WS2PCluster.getFullAddress(host, port, path) const ws2pc = await WS2PClient.connectTo(this.server, fullEndpointAddress, ws2pEndpointUUID, messageHandler, expectedPub, (pub:string) => { const connectedPubkeys = this.getConnectedPubkeys() + const connectedWS2PUID = this.getConnectedWS2PUID() const preferedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) ? this.server.conf.ws2p.preferedNodes:[] - return this.acceptPubkey(expectedPub, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEndpointUUID) + return this.acceptPubkey(expectedPub, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEndpointUUID) }) this.ws2pClients[uuid] = ws2pc pub = ws2pc.connection.pubkey @@ -403,10 +404,11 @@ export class WS2PCluster { if (ws2pEnpoint) { // Check if already connected to the pubkey (in any way: server or client) const connectedPubkeys = this.getConnectedPubkeys() + const connectedWS2PUID = this.getConnectedWS2PUID() const preferedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) ? this.server.conf.ws2p.preferedNodes:[] - const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEnpoint.uuid) + const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEnpoint.uuid) if (shouldAccept && (!this.server.conf.ws2p || ws2pEnpoint.uuid !== this.server.conf.ws2p.uuid || peer.pubkey !== this.server.conf.pair.pub || ws2pEnpoint.uuid === '11111111')) { - await this.connectToRemoteWS(ws2pEnpoint.host, ws2pEnpoint.port, ws2pEnpoint.path, this.messageHandler, peer.pubkey) + await this.connectToRemoteWS(ws2pEnpoint.host, ws2pEnpoint.port, ws2pEnpoint.path, this.messageHandler, peer.pubkey, ws2pEnpoint.uuid) await this.trimClientConnections() } } @@ -573,64 +575,75 @@ export class WS2PCluster { protected async acceptPubkey( pub:string, connectedPubkeys:string[], + connectedWS2PUID:string[], getConcurrentConnexionsCount:()=>number, maxConcurrentConnexionsSize:number, priorityKeys:string[], priorityKeysOnly:boolean, targetWS2PUID = "" ) { + // We need ws2pServer instance + if (!this.ws2pServer) { + return false + } + + // We do not accept oneself connetion + if (this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid === targetWS2PUID) { + return false + } + // We do not accept banned keys if (this.banned[pub]) { this.server.logger.warn('Connection to %s refused, reason: %s', pub.slice(0, 8), this.banned[pub]) return false } - let accept = priorityKeys.indexOf(pub) !== -1 - if (!accept && !priorityKeysOnly && connectedPubkeys.indexOf(pub) === -1) { - // Do we have room? - if (this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid === targetWS2PUID) { - accept = false - } - else if (getConcurrentConnexionsCount() < maxConcurrentConnexionsSize) { - // Yes: just connect to it - accept = true - } - else { - // No: let's verify some peer has a lower priority - if (connectedPubkeys.indexOf(this.server.conf.pair.pub) !== -1) { - // Yes, we are connected to ourself. Let's replace this connexion - accept = true - } - else { - // Does this node have the priority over at least one node? - const isMemberPeer = await this.server.dal.isMember(pub) - if (isMemberPeer) { - // The node may have the priority over at least 1 other node - let i = 0, existsOneNonMemberNode = false - while (!existsOneNonMemberNode && i < connectedPubkeys.length) { - const isAlsoAMemberPeer = await this.server.dal.isMember(connectedPubkeys[i]) - existsOneNonMemberNode = !isAlsoAMemberPeer - i++ - } - if (existsOneNonMemberNode) { - // The node has the priority over a non-member peer: try to connect - accept = true - } - } + + // Is priority key ? + let isPriorityKey = priorityKeys.indexOf(pub) !== -1 + + // We do not accept forbidden keys + if (priorityKeysOnly && !isPriorityKey && this.server.conf.pair.pub !== pub) { + return false + } + + // Is member key ? + const isMemberPeer = await this.server.dal.isMember(pub) + + // Do we have room? + if (getConcurrentConnexionsCount() < maxConcurrentConnexionsSize) { + // Yes: just connect to it + return true + } else if (this.server.conf.pair.pub === pub) { + // We always accept self nodes, and they have a supreme priority (these are siblings) + if (targetWS2PUID !== "") { + if (this.isNewSiblingNode(pub, targetWS2PUID, connectedWS2PUID)) { + return true } } - } else { - // The pubkey is already connected: we accept only self nodes, and they have a supreme priority (these are siblings) - if (targetWS2PUID) { - if (this.isSiblingNode(pub, targetWS2PUID)) { - accept = true + } + else if (connectedPubkeys.indexOf(pub) === -1) + { + let minPriorityLevel = WS2PConstants.MAX_PRIORITY_LEVEL + for (const connectedPubkey of connectedPubkeys) { + let connectedPubkeyPriorityLevel = this.ws2pServer.keyPriorityLevel(connectedPubkey, priorityKeys) + if (connectedPubkeyPriorityLevel < minPriorityLevel) { + minPriorityLevel = connectedPubkeyPriorityLevel } } + if (this.ws2pServer.keyPriorityLevel(pub, priorityKeys) > minPriorityLevel) { + return true + } } - return accept + return false } - isSiblingNode(pub:string, uuid:string) { - return !!(this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid !== uuid) + isNewSiblingNode(pub:string, targetWS2PUID:string, connectedWS2PUID:string[]) { + for (const uuid of connectedWS2PUID) { + if (uuid === targetWS2PUID) { + return false + } + } + return true } async getLevel1Connections() { @@ -742,6 +755,12 @@ export class WS2PCluster { return clients.concat(served) } + getConnectedWS2PUID() { + const clients = Object.keys(this.ws2pClients).map(k => this.ws2pClients[k].connection.uuid) + const served = this.ws2pServer ? this.ws2pServer.getConnexions().map(c => c.uuid) : [] + return clients.concat(served) + } + banConnection(c:WS2PConnection, reason:string) { this.server.logger.warn('Banning connections of %s for %ss, reason: %s', c.pubkey.slice(0, 8), WS2PConstants.BAN_DURATION_IN_SECONDS, reason) if (c.pubkey) { diff --git a/app/modules/ws2p/lib/WS2PConnection.ts b/app/modules/ws2p/lib/WS2PConnection.ts index 0595e5626b4e307fffdd2be5d149ce6d33323bd7..eed3dfff8f4c7e288b753de4f6f2e4930d49eb76 100644 --- a/app/modules/ws2p/lib/WS2PConnection.ts +++ b/app/modules/ws2p/lib/WS2PConnection.ts @@ -256,7 +256,8 @@ export class WS2PConnection { connectionTimeout: REQUEST_TIMEOUT_VALUE, requestTimeout: REQUEST_TIMEOUT_VALUE }, - private expectedPub:string = "" + private expectedPub:string = "", + private expectedWS2PUID:string = "" ) { this.connectedp = new Promise((resolve, reject) => { this.connectedResolve = resolve @@ -277,7 +278,8 @@ export class WS2PConnection { connectionTimeout: REQUEST_TIMEOUT_VALUE, requestTimeout: REQUEST_TIMEOUT_VALUE }, - expectedPub:string = "") { + expectedPub:string = "", + expectedWS2PUID:string = "") { if (address.match(WS2PConstants.FULL_ADDRESS_ONION_REGEX)) { options = { connectionTimeout: WS2PConstants.CONNEXION_TOR_TIMEOUT, @@ -292,7 +294,7 @@ export class WS2PConnection { websocket.on('close', () => res()) }) websocket.on('error', () => websocket.close()) - return new WS2PConnection(websocket, onWsOpened, onWsClosed, messageHandler, localAuth, remoteAuth, options, expectedPub) + return new WS2PConnection(websocket, onWsOpened, onWsClosed, messageHandler, localAuth, remoteAuth, options, expectedPub, expectedWS2PUID) } static newConnectionFromWebSocketServer( @@ -319,6 +321,10 @@ export class WS2PConnection { return this.remoteAuth.getPubkey() } + get uuid() { + return this.expectedWS2PUID + } + get nbRequests() { return this.nbRequestsCount } diff --git a/app/modules/ws2p/lib/WS2PServer.ts b/app/modules/ws2p/lib/WS2PServer.ts index ae79815fe93af6f0c24ebb355e7d404681165748..d11c3298d1fee8cddb64a3812e20259fb7d5332d 100644 --- a/app/modules/ws2p/lib/WS2PServer.ts +++ b/app/modules/ws2p/lib/WS2PServer.ts @@ -116,8 +116,8 @@ export class WS2PServer extends events.EventEmitter { } }) }) - - await this.trimConnections() + let privilegedKeys = (this.server.conf.ws2p && this.server.conf.ws2p.privilegedNodes) ? this.server.conf.ws2p.privilegedNodes:[] + await this.removeLowPriorityConnection(privilegedKeys) await this.server.dal.setPeerUP(c.pubkey) @@ -128,44 +128,26 @@ export class WS2PServer extends events.EventEmitter { }) } - async trimConnections() { - /*** OVERFLOW TRIMMING ***/ - let disconnectedOne = true - // Disconnect non-members - while (disconnectedOne && this.connections.length > this.maxLevel2Size) { - disconnectedOne = false - for (const c of this.connections) { - const isMember = await this.server.dal.isMember(c.pubkey) - if (!isMember && !disconnectedOne) { - c.close() - this.removeConnection(c) - disconnectedOne = true - } - } - } - // Disconnect members - while (this.connections.length > this.maxLevel2Size) { - for (const c of this.connections) { - c.close() - this.removeConnection(c) - } - } - /*** DUPLICATES TRIMMING ***/ - disconnectedOne = true - while (disconnectedOne) { - disconnectedOne = false - const pubkeysFound = [] - for (const c of this.connections) { - if (pubkeysFound.indexOf(c.pubkey) !== -1) { - c.close() - this.removeConnection(c) - disconnectedOne = true - } - else if (c.pubkey !== this.server.conf.pair.pub) { - pubkeysFound.push(c.pubkey) + async removeLowPriorityConnection(privilegedKeys:string[]) { + let lowPriorityConnection:WS2PConnection = this.connections[0] + let minPriorityLevel = this.keyPriorityLevel(lowPriorityConnection.pubkey, privilegedKeys) + for (const c of this.connections) { + if (c !== lowPriorityConnection) { + let cPriorityLevel = this.keyPriorityLevel(c.pubkey, privilegedKeys) + if (cPriorityLevel < minPriorityLevel) { + lowPriorityConnection = c + minPriorityLevel = cPriorityLevel } } } + this.removeConnection(lowPriorityConnection) + } + + keyPriorityLevel(pubkey:string, privilegedKeys:string[]) { + let priorityLevel = (this.server.dal.isMember(pubkey)) ? 1:0 + priorityLevel += (privilegedKeys.indexOf(pubkey) !== -1) ? 2:0 + priorityLevel += (this.server.conf.pair.pub === pubkey) ? 4:0 + return priorityLevel } private removeConnection(c:WS2PConnection) { diff --git a/app/modules/ws2p/lib/constants.ts b/app/modules/ws2p/lib/constants.ts index 51972d7f7d1dca2c5406d8f7af8fea58bfe740a0..cc75af0325e413ccdd5581caf9421332125f3e40 100644 --- a/app/modules/ws2p/lib/constants.ts +++ b/app/modules/ws2p/lib/constants.ts @@ -19,6 +19,7 @@ export const WS2PConstants = { MAX_LEVEL_1_PEERS: 10, MAX_LEVEL_2_PEERS: 10, CONNECTIONS_LOW_LEVEL: 3, + MAX_PRIORITY_LEVEL: 7, BAN_DURATION_IN_SECONDS: 120, BAN_ON_REPEAT_THRESHOLD: 5,