diff --git a/app/lib/dto/ConfDTO.ts b/app/lib/dto/ConfDTO.ts index acaf52cdebddbde008216cc876f2c85195bcf89a..d9d8b744b71fc735715eff4b2241412110430ed3 100644 --- a/app/lib/dto/ConfDTO.ts +++ b/app/lib/dto/ConfDTO.ts @@ -66,6 +66,8 @@ export interface WS2PConfDTO { remoteport?: number|null port?: number host?: string + preferedNodes?: string[] + alwaysAccept?: string[] } } @@ -132,6 +134,8 @@ export class ConfDTO implements CurrencyConfDTO, KeypairConfDTO, NetworkConfDTO, remoteport?: number|null port?: number host?: string + preferedNodes?: string[] + alwaysAccept?: string[] } ) {} diff --git a/app/modules/ws2p/lib/WS2PCluster.ts b/app/modules/ws2p/lib/WS2PCluster.ts index 84def3f8b341ac56819c5ee86823634fd01bb5ad..ade2cb4bf61c82c0d41f51ad7e1e6768e6a1b4fc 100644 --- a/app/modules/ws2p/lib/WS2PCluster.ts +++ b/app/modules/ws2p/lib/WS2PCluster.ts @@ -12,6 +12,7 @@ import {GlobalFifoPromise} from "../../../service/GlobalFifoPromise" const es = require('event-stream') const nuuid = require('node-uuid') +const _ = require('underscore') export class WS2PCluster { @@ -22,6 +23,7 @@ export class WS2PCluster { private syncBlockInterval:NodeJS.Timer private syncDocpoolInterval:NodeJS.Timer private fifo:GlobalFifoPromise = new GlobalFifoPromise() + private maxLevel1Size = WS2PConstants.MAX_LEVEL_1_PEERS private constructor(private server:Server) {} @@ -31,11 +33,30 @@ export class WS2PCluster { return cluster } + set maxLevel1Peers(newValue:number) { + this.maxLevel1Size = Math.max(newValue, 0) || 0 + } + + set maxLevel2Peers(newValue:number) { + if (this.ws2pServer) { + this.ws2pServer.maxLevel2Peers = Math.max(newValue, 0) + } + } + + get maxLevel2Peers() { + if (this.ws2pServer) { + return this.ws2pServer.maxLevel2Peers || 0 + } + return 0 + } + async listen(host:string, port:number) { if (this.ws2pServer) { await this.ws2pServer.close() } - this.ws2pServer = await WS2PServer.bindOn(this.server, host, port, this.fifo, (pubkey:string, connectedPubkeys:string[]) => this.acceptPubkey(pubkey, connectedPubkeys)) + this.ws2pServer = await WS2PServer.bindOn(this.server, host, port, this.fifo, (pubkey:string, connectedPubkeys:string[]) => { + return this.acceptPubkey(pubkey, connectedPubkeys, () => this.servedCount(), this.maxLevel2Peers, (this.server.conf.ws2p && this.server.conf.ws2p.alwaysAccept || [])) + }) this.host = host this.port = port return this.ws2pServer @@ -63,7 +84,15 @@ export class WS2PCluster { this.ws2pClients[uuid] = ws2pc ws2pc.connection.closed.then(() => { this.server.logger.info('WS2P: connection [%s `WS2P %s %s`] has been closed', ws2pc.connection.pubkey.slice(0, 8), host, port) - delete this.ws2pClients[uuid] + this.server.push({ + ws2p: 'disconnected', + peer: { + pub: ws2pc.connection.pubkey + } + }) + if (this.ws2pClients[uuid]) { + delete this.ws2pClients[uuid] + } }) try { this.server.logger.info('WS2P: connected to peer %s using `WS2P %s %s`!', ws2pc.connection.pubkey.slice(0, 8), host, port) @@ -82,7 +111,7 @@ export class WS2PCluster { const potentials = await this.server.dal.getWS2Peers() const peers:PeerDTO[] = potentials.map((p:any) => PeerDTO.fromJSONObject(p)) let i = 0 - while (i < peers.length && this.clientsCount() < WS2PConstants.MAX_LEVEL_1_PEERS) { + while (i < peers.length && this.clientsCount() < this.maxLevel1Size) { const p = peers[i] const api = p.getWS2P() if (p.pubkey !== this.server.conf.pair.pub) { @@ -97,16 +126,14 @@ export class WS2PCluster { const peer = PeerDTO.fromJSONObject(data) const ws2pEnpoint = peer.getWS2P() if (ws2pEnpoint && peer.pubkey !== this.server.conf.pair.pub) { - await this.fifo.pushFIFOPromise('connect_peer_' + peer.pubkey, async () => { - // Check if already connected to the pubkey (in any way: server or client) - const connectedPubkeys = this.getConnectedPubkeys() - const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys) - if (shouldAccept) { - await this.connect(ws2pEnpoint.host, ws2pEnpoint.port) - // Trim the eventual extra connections - await this.trimClientConnections() - } - }) + // Check if already connected to the pubkey (in any way: server or client) + const connectedPubkeys = this.getConnectedPubkeys() + const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes || [])) + if (shouldAccept) { + await this.connect(ws2pEnpoint.host, ws2pEnpoint.port) + // Trim the eventual extra connections + await this.trimClientConnections() + } } } })) @@ -115,7 +142,7 @@ export class WS2PCluster { async trimClientConnections() { let disconnectedOne = true // Disconnect non-members - while (disconnectedOne && this.clientsCount() > WS2PConstants.MAX_LEVEL_1_PEERS) { + while (disconnectedOne && this.clientsCount() > this.maxLevel1Size) { disconnectedOne = false let uuids = Object.keys(this.ws2pClients) uuids = _.shuffle(uuids) @@ -123,27 +150,65 @@ export class WS2PCluster { const client = this.ws2pClients[uuid] const isMember = await this.server.dal.isMember(client.connection.pubkey) if (!isMember && !disconnectedOne) { + client.connection.close() + await client.connection.closed + disconnectedOne = true + } + } + } + disconnectedOne = true + // Disconnect non-prefered members + while (disconnectedOne && this.clientsCount() > this.maxLevel1Size) { + disconnectedOne = false + let uuids = Object.keys(this.ws2pClients) + uuids = _.shuffle(uuids) + for (const uuid of uuids) { + const client = this.ws2pClients[uuid] + if (!disconnectedOne && this.getPreferedNodes().indexOf(client.connection.pubkey) === -1) { client.connection.close() disconnectedOne = true + await client.connection.closed + if (this.ws2pClients[uuid]) { + delete this.ws2pClients[uuid] + } } } } - // Disconnect members - while (this.clientsCount() > WS2PConstants.MAX_LEVEL_1_PEERS) { + // Disconnect anything + disconnectedOne = true + while (disconnectedOne && this.clientsCount() > this.maxLevel1Size) { + disconnectedOne = false let uuids = Object.keys(this.ws2pClients) uuids = _.shuffle(uuids) for (const uuid of uuids) { const client = this.ws2pClients[uuid] - client.connection.close() + if (!disconnectedOne) { + client.connection.close() + disconnectedOne = true + await client.connection.closed + if (this.ws2pClients[uuid]) { + delete this.ws2pClients[uuid] + } + } } } } - protected async acceptPubkey(pub:string, connectedPubkeys:string[]) { - let accept = false - if (connectedPubkeys.indexOf(pub) === -1) { + private getPreferedNodes(): string[] { + return (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) || [] + } + + protected async acceptPubkey( + pub:string, + connectedPubkeys:string[], + getConcurrentConnexionsCount:()=>number, + maxConcurrentConnexionsSize:number, + priorityKeys:string[] + ) { + let accept = priorityKeys.indexOf(pub) !== -1 + if (!accept && connectedPubkeys.indexOf(pub) === -1) { // Do we have room? - if (this.clientsCount() < WS2PConstants.MAX_LEVEL_1_PEERS) { + if (getConcurrentConnexionsCount() < maxConcurrentConnexionsSize) { // Yes: just connect to it accept = true } diff --git a/app/modules/ws2p/lib/WS2PConnection.ts b/app/modules/ws2p/lib/WS2PConnection.ts index 780ef13385ac3880427fb5bc28c316e7ca87b5a1..597bf24ddb21871804d4259b63ca0c3da90a6f04 100644 --- a/app/modules/ws2p/lib/WS2PConnection.ts +++ b/app/modules/ws2p/lib/WS2PConnection.ts @@ -13,6 +13,7 @@ const MAXIMUM_ERRORS_COUNT = 5 const REQUEST_TIMEOUT_VALUE = 1000 * 5 // 10 seconds enum WS2P_ERR { + REJECTED_PUBKEY_OR_INCORRECT_ASK_SIGNATURE_FROM_REMOTE, AUTH_INVALID_ASK_FIELDS, AUTH_INVALID_ACK_FIELDS, AUTH_INVALID_OK_FIELDS, @@ -391,7 +392,7 @@ export class WS2PConnection { if (valid) { await this.remoteAuth.sendACK(this.ws) } else { - await this.errorDetected(WS2P_ERR.INCORRECT_ASK_SIGNATURE_FROM_REMOTE) + await this.errorDetected(WS2P_ERR.REJECTED_PUBKEY_OR_INCORRECT_ASK_SIGNATURE_FROM_REMOTE) } } } diff --git a/app/modules/ws2p/lib/WS2PServer.ts b/app/modules/ws2p/lib/WS2PServer.ts index 7c4cba0a3586351957f8e08ab19cac6c1179b9b0..211b35fa9a403112dfefa590adc4f699398ece44 100644 --- a/app/modules/ws2p/lib/WS2PServer.ts +++ b/app/modules/ws2p/lib/WS2PServer.ts @@ -13,6 +13,7 @@ export class WS2PServer extends events.EventEmitter { private wss:any private connections:WS2PConnection[] = [] + private maxLevel2Size = WS2PConstants.MAX_LEVEL_2_PEERS private constructor( private server:Server, @@ -23,6 +24,14 @@ export class WS2PServer extends events.EventEmitter { super() } + get maxLevel2Peers() { + return this.maxLevel2Size || 0 + } + + set maxLevel2Peers(newValue:number) { + this.maxLevel2Size = Math.max(newValue, 0) + } + getConnexions() { return this.connections.slice() } @@ -32,81 +41,93 @@ export class WS2PServer extends events.EventEmitter { this.wss = new WebSocketServer({ host: this.host, port: this.port }) this.wss.on('connection', async (ws:any) => { - this.server.logger.info('WS2P: new incoming connection from %s:%s!', ws._sender._socket._handle.owner.remoteAddress, ws._sender._socket._handle.owner.remotePort) - - await this.fifo.pushFIFOPromise('wss.connect:' + [ws._sender._socket._handle.owner.remoteAddress, ws._sender._socket._handle.owner.remotePort].join(':'), async () => { + this.server.logger.info('WS2P %s: new incoming connection from %s:%s!', this.server.conf.pair.pub, ws._sender._socket._handle.owner.remoteAddress, ws._sender._socket._handle.owner.remotePort) - /****************** - * A NEW CONNECTION - ******************/ - let saidPubkey:string = "" + /****************** + * A NEW CONNECTION + ******************/ + let saidPubkey:string = "" - const acceptPubkey = async (pub:string) => { - if (!saidPubkey) { - saidPubkey = pub - } - if (saidPubkey !== pub) { - // The key must be identical - return false - } - return await this.shouldAcceptConnection(pub, this.getConnexions().map(c => c.pubkey)) + const acceptPubkey = async (pub:string) => { + if (!saidPubkey) { + saidPubkey = pub } + if (saidPubkey !== pub) { + // The key must be identical + return false + } + return await this.shouldAcceptConnection(pub, this.getConnexions().map(c => c.pubkey)) + } - const c = WS2PConnection.newConnectionFromWebSocketServer( - ws, - new WS2PServerMessageHandler(this.server), - new WS2PPubkeyLocalAuth(key, acceptPubkey), - new WS2PPubkeyRemoteAuth(key, acceptPubkey), - { - connectionTimeout: 5000, - requestTimeout: 5000 - } - ) - - try { - await c.connect() - this.connections.push(c) - this.emit('newConnection', c) - this.server.logger.info('WS2P: established incoming connection from %s:%s', ws._sender._socket._handle.owner.remoteAddress, ws._sender._socket._handle.owner.remotePort) - - // Broadcasting - const ws2pStreamer = new WS2PStreamer(c) - this.server.pipe(ws2pStreamer) - - ws.on('error', (e:any) => { - this.server.logger.error(e) - }) - - ws.on('close', () => { - this.server.unpipe(ws2pStreamer) - this.removeConnection(c) + const c = WS2PConnection.newConnectionFromWebSocketServer( + ws, + new WS2PServerMessageHandler(this.server), + new WS2PPubkeyLocalAuth(key, acceptPubkey), + new WS2PPubkeyRemoteAuth(key, acceptPubkey), + { + connectionTimeout: 5000, + requestTimeout: 5000 + } + ) + + try { + this.server.logger.info('WS2P %s: [c.connect()...] new incoming connection from %s:%s!', this.server.conf.pair.pub, ws._sender._socket._handle.owner.remoteAddress, ws._sender._socket._handle.owner.remotePort) + await c.connect() + const host = ws._sender._socket._handle.owner.remoteAddress + const port = ws._sender._socket._handle.owner.remotePort + this.server.push({ + ws2p: 'connected', + to: { host, port, pubkey: c.pubkey } + }) + this.connections.push(c) + this.emit('newConnection', c) + this.server.logger.info('WS2P: established incoming connection from %s:%s', host, port) + + // Broadcasting + const ws2pStreamer = new WS2PStreamer(c) + this.server.pipe(ws2pStreamer) + + ws.on('error', (e:any) => { + this.server.logger.error(e) + }) + + ws.on('close', () => { + this.server.unpipe(ws2pStreamer) + this.removeConnection(c) + this.server.push({ + ws2p: 'disconnected', + peer: { + pub: c.pubkey + } }) + }) - await this.trimConnections() - } catch (e) { - this.server.logger.warn('WS2P: cannot connect to incoming WebSocket connection: %s', e) - } - }) + await this.trimConnections() + } catch (e) { + this.server.logger.warn('WS2P: cannot connect to incoming WebSocket connection: %s', e) + } }) } async trimConnections() { let disconnectedOne = true // Disconnect non-members - while (disconnectedOne && this.connections.length > WS2PConstants.MAX_LEVEL_2_PEERS) { + while (disconnectedOne && this.connections.length > this.maxLevel2Size) { disconnectedOne = false for (const c of this.connections) { const isMember = await this.server.dal.isMember(c.pubkey) if (!isMember && !disconnectedOne) { c.close() + this.removeConnection(c) disconnectedOne = true } } } // Disconnect members - while (this.connections.length > WS2PConstants.MAX_LEVEL_2_PEERS) { + while (this.connections.length > this.maxLevel2Size) { for (const c of this.connections) { c.close() + this.removeConnection(c) } } } @@ -142,7 +163,7 @@ export class WS2PServer extends events.EventEmitter { static async bindOn(server:Server, host:string, port:number, fifo:GlobalFifoPromise, shouldAcceptConnection:(pubkey:string, connectedPubkeys:string[])=>Promise<boolean>) { const ws2ps = new WS2PServer(server, host, port, fifo, shouldAcceptConnection) await ws2ps.listenToWebSocketConnections() - server.logger.info('WS2P server listening on %s:%s', host, port) + server.logger.info('WS2P server %s listening on %s:%s', server.conf.pair.pub, host, port) return ws2ps } } \ No newline at end of file diff --git a/test/integration/tools/toolbox.ts b/test/integration/tools/toolbox.ts index b641d4c1ed6e1de6ed7b062d350bafaec3f5ecea..2174004963500f1b190fc4d130f74c3942469e34 100644 --- a/test/integration/tools/toolbox.ts +++ b/test/integration/tools/toolbox.ts @@ -286,6 +286,21 @@ export const waitForkWS2PConnection = async (server:Server, pubkey:string) => { }) } +export const waitForkWS2PDisconnection = async (server:Server, pubkey:string) => { + await new Promise(res => { + server.pipe(es.mapSync((e:any) => { + if (e.ws2p === 'disconnected') { + console.log('>>>>>>> EVENT ', e) + } + if (e.ws2p === 'disconnected' && e.peer.pub === pubkey) { + res() + } + return e + })) + + }) +} + export class TestingServer { private prover:Prover diff --git a/test/integration/ws2p_client_limitations.ts b/test/integration/ws2p_client_limitations.ts new file mode 100644 index 0000000000000000000000000000000000000000..866f94dbe5d86df01f06e4a5dfb24c9e3ce43eb2 --- /dev/null +++ b/test/integration/ws2p_client_limitations.ts @@ -0,0 +1,240 @@ +import { + getNewTestingPort, + simpleTestingConf, + simpleTestingServer, + simpleUser, + TestingServer, + waitForkWS2PConnection, + waitForkWS2PDisconnection +} from "./tools/toolbox" +import {WS2PCluster} from "../../app/modules/ws2p/lib/WS2PCluster" + +const assert = require('assert') + +describe("WS2P client limitations", function() { + + const now = 1500000000 + let s1:TestingServer, s2:TestingServer, s3:TestingServer, s4:TestingServer + let cluster1:WS2PCluster, cluster2:WS2PCluster, cluster3:WS2PCluster, cluster4:WS2PCluster + let cat:any, tac:any, toc:any, tic:any + const catKeyring = { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'} + const tacKeyring = { pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc', sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'} + const tocKeyring = { pub: 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo', sec: '64EYRvdPpTfLGGmaX5nijLXRqWXaVz8r1Z1GtaahXwVSJGQRn7tqkxLb288zwSYzELMEG5ZhXSBYSxsTsz1m9y8F'} + const ticKeyring = { pub: 'DNann1Lh55eZMEDXeYt59bzHbA3NJR46DeQYCS2qQdLV', sec: '468Q1XtTq7h84NorZdWBZFJrGkB18CbmbHr9tkp9snt5GiERP7ySs3wM8myLccbAAGejgMRC9rqnXuW3iAfZACm7'} + + let b0, b1, b2, b3:any, b4:any, portBMA1:number, portWS1:number, portWS2:number, portWS3:number, portWS4:number + + before(async () => { + const t1 = getTestingServer(catKeyring) + const t2 = getTestingServer(tacKeyring) + const t3 = getTestingServer(tocKeyring) + const t4 = getTestingServer(ticKeyring) + s1 = t1.server + s2 = t2.server + s3 = t3.server + s4 = t4.server; + portWS1 = t1.portWS + portWS2 = t2.portWS + portWS3 = t3.portWS + portWS4 = t4.portWS + portBMA1 = t1.portBMA + cat = simpleUser('cat', catKeyring, s1) + tac = simpleUser('tac', tacKeyring, s1) + toc = simpleUser('toc', tocKeyring, s1) + tic = simpleUser('tic', ticKeyring, s1) + await s1.initDalBmaConnections() + await s2.initDalBmaConnections() + await s3.initDalBmaConnections() + await s4.initDalBmaConnections() + if (s1._server.conf.ws2p) { + s1._server.conf.ws2p.preferedNodes = ['DNann1Lh55eZMEDXeYt59bzHbA3NJR46DeQYCS2qQdLV'] + } + + await cat.createIdentity(); + await tac.createIdentity(); + await toc.createIdentity(); + await tic.createIdentity(); + await cat.cert(tac); + await tac.cert(toc); + await toc.cert(tic); + await tic.cert(cat); + await cat.join(); + await tac.join(); + await toc.join(); + await tic.join(); + + b0 = await s1.commit({ time: now }) + b1 = await s1.commit({ time: now }) + b2 = await s1.commit({ time: now }) + + await s2.writeBlock(b0) + await s2.writeBlock(b1) + await s2.writeBlock(b2) + await s3.writeBlock(b0) + await s3.writeBlock(b1) + await s3.writeBlock(b2) + await s4.writeBlock(b0) + await s4.writeBlock(b1) + await s4.writeBlock(b2) + await s1.waitToHaveBlock(2) + await s2.waitToHaveBlock(2) + await s3.waitToHaveBlock(2) + await s4.waitToHaveBlock(2) + cluster1 = WS2PCluster.plugOn(s1._server) + cluster2 = WS2PCluster.plugOn(s2._server) + cluster3 = WS2PCluster.plugOn(s3._server) + cluster4 = WS2PCluster.plugOn(s4._server) + await (s1._server.ws2pCluster as WS2PCluster).listen('127.0.0.1', portWS1) + await (s2._server.ws2pCluster as WS2PCluster).listen('127.0.0.1', portWS2) + await (s3._server.ws2pCluster as WS2PCluster).listen('127.0.0.1', portWS3) + await (s4._server.ws2pCluster as WS2PCluster).listen('127.0.0.1', portWS4) + }) + + after(() => (s1._server.ws2pCluster as WS2PCluster).close()) + + it('should have b#2 on s1, s2 and s3', async () => { + const currentS1 = await s1.BlockchainService.current() + const currentS2 = await s2.BlockchainService.current() + const currentS3 = await s3.BlockchainService.current() + const currentS4 = await s4.BlockchainService.current() + assert.equal(currentS1.number, 2) + assert.equal(currentS2.number, 2) + assert.equal(currentS3.number, 2) + assert.equal(currentS4.number, 2) + }) + + it('should be able to have a connected network on s2 start', async () => { + const p1 = await s1.getPeer() + assert.deepEqual(p1.endpoints, [ + 'BASIC_MERKLED_API 127.0.0.1 ' + portBMA1, + 'WS2P 11111111 127.0.0.1 ' + portWS1 + ]) + await s2.writePeer(p1) + await (s1._server.ws2pCluster as WS2PCluster).startCrawling() + await (s2._server.ws2pCluster as WS2PCluster).startCrawling() + await s1.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 1) + }) + await s2.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 1) + assert.equal(res.peers.level2, 0) + }) + await s3.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 0) + }) + await s4.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 0) + }) + }) + + it('should not connect to s3 because of connection size limit', async () => { + cluster1.maxLevel1Peers = 0 + cluster2.maxLevel1Peers = 1 + const p3 = await s3.getPeer() + await s2.writePeer(p3) + b3 = await s1.commit({ time: now }) + await s1.waitToHaveBlock(3) + await s2.waitToHaveBlock(3) + await s1.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 1) + }) + await s2.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 1) + assert.equal(res.peers.level2, 0) + }) + await s3.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 0) + }) + await s4.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 0) + }) + }) + + it('should connect to s4 because of configuration favorism', async () => { + cluster1.maxLevel1Peers = 1 + cluster2.maxLevel1Peers = 1 + await s3._server.PeeringService.generateSelfPeer(s3._server.conf) + const p3 = await s3.getPeer() + await s2.writePeer(p3) + await s1.waitToHaveBlock(3) + await s2.waitToHaveBlock(3) + await waitForkWS2PConnection(s1._server, 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo') + b4 = await s1.commit({ time: now }) + await s2.waitToHaveBlock(4) + const p4 = await s4.getPeer() + await s2.writePeer(p4) + await waitForkWS2PConnection(s1._server, 'DNann1Lh55eZMEDXeYt59bzHbA3NJR46DeQYCS2qQdLV') + await waitForkWS2PDisconnection(s1._server, 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo') + await s1.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 1) // <- New connection to s4 + assert.equal(res.peers.level2, 1) + }) + await s2.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 1) + assert.equal(res.peers.level2, 0) + }) + await s3.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 0) + }) + await s4.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 1) + }) + }) + + it('should be able to connect to s3 if the we increase size limit', async () => { + await s3.writeBlock(b3) + await s3.waitToHaveBlock(4) + cluster1.maxLevel1Peers = 2 + cluster2.maxLevel1Peers = 2 + await s3._server.PeeringService.generateSelfPeer(s3._server.conf) + const p3 = await s3.getPeer() + await s2.writePeer(p3) + await waitForkWS2PConnection(s2._server, 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo') + await waitForkWS2PConnection(s1._server, 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo') + await s1.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 2) // <- New connection to s3! + assert.equal(res.peers.level2, 1) + }) + await s2.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 2) // <- New connection to s3! + assert.equal(res.peers.level2, 0) + }) + await s3.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 2) // <- New connections from s1 + s2! + }) + await s4.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 1) + }) + }) + + function getTestingServer(keyring:{ pub:string, sec:string }) { + const conf1 = simpleTestingConf(now, keyring) + const portBMA = getNewTestingPort() + const portWS = getNewTestingPort() + conf1.host = '127.0.0.1' + conf1.port = portBMA + // A server + conf1.ws2p = { + upnp: false, + uuid: '11111111', + host: '127.0.0.1', + port: portWS, + remotehost: '127.0.0.1', + remoteport: portWS, + alwaysAccept: [] + } + const server = simpleTestingServer(conf1) + server._server.addEndpointsDefinitions(async () => 'WS2P 11111111 127.0.0.1 ' + portWS) + return { server, portWS, portBMA } + } +}) diff --git a/test/integration/ws2p_server_limitations.ts b/test/integration/ws2p_server_limitations.ts new file mode 100644 index 0000000000000000000000000000000000000000..ed31c32e6d5399a0909ad56cd16678ee441eec05 --- /dev/null +++ b/test/integration/ws2p_server_limitations.ts @@ -0,0 +1,215 @@ +import { + getNewTestingPort, + simpleTestingConf, + simpleTestingServer, + simpleUser, + TestingServer, + waitForkWS2PConnection, + waitForkWS2PDisconnection +} from "./tools/toolbox" +import {WS2PCluster} from "../../app/modules/ws2p/lib/WS2PCluster" + +const assert = require('assert') + +describe("WS2P server limitations", function() { + + const now = 1500000000 + let s1:TestingServer, s2:TestingServer, s3:TestingServer, s4:TestingServer + let cluster1:WS2PCluster, cluster2:WS2PCluster, cluster3:WS2PCluster, cluster4:WS2PCluster + let cat:any, tac:any, toc:any, tic:any + const catKeyring = { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'} + const tacKeyring = { pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc', sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'} + const tocKeyring = { pub: 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo', sec: '64EYRvdPpTfLGGmaX5nijLXRqWXaVz8r1Z1GtaahXwVSJGQRn7tqkxLb288zwSYzELMEG5ZhXSBYSxsTsz1m9y8F'} + const ticKeyring = { pub: 'DNann1Lh55eZMEDXeYt59bzHbA3NJR46DeQYCS2qQdLV', sec: '468Q1XtTq7h84NorZdWBZFJrGkB18CbmbHr9tkp9snt5GiERP7ySs3wM8myLccbAAGejgMRC9rqnXuW3iAfZACm7'} + + let b0, b1, b2, b3:any, b4:any, portBMA1:number, portWS1:number, portWS2:number, portWS3:number, portWS4:number + + before(async () => { + const t1 = getTestingServer(catKeyring) + const t2 = getTestingServer(tacKeyring) + const t3 = getTestingServer(tocKeyring) + const t4 = getTestingServer(ticKeyring) + s1 = t1.server + s2 = t2.server + s3 = t3.server + s4 = t4.server; + portWS1 = t1.portWS + portWS2 = t2.portWS + portWS3 = t3.portWS + portWS4 = t4.portWS + portBMA1 = t1.portBMA + cat = simpleUser('cat', catKeyring, s1) + tac = simpleUser('tac', tacKeyring, s1) + toc = simpleUser('toc', tocKeyring, s1) + tic = simpleUser('tic', ticKeyring, s1) + await s1.initDalBmaConnections() + await s2.initDalBmaConnections() + await s3.initDalBmaConnections() + await s4.initDalBmaConnections() + if (s1._server.conf.ws2p) { + s1._server.conf.ws2p.preferedNodes = ['DNann1Lh55eZMEDXeYt59bzHbA3NJR46DeQYCS2qQdLV'] + } + if (s3._server.conf.ws2p) { + s3._server.conf.ws2p.alwaysAccept = ['HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd'] + } + + await cat.createIdentity(); + await tac.createIdentity(); + await toc.createIdentity(); + await tic.createIdentity(); + await cat.cert(tac); + await tac.cert(toc); + await toc.cert(tic); + await tic.cert(cat); + await cat.join(); + await tac.join(); + await toc.join(); + await tic.join(); + + b0 = await s1.commit({ time: now }) + b1 = await s1.commit({ time: now }) + b2 = await s1.commit({ time: now }) + + await s2.writeBlock(b0) + await s2.writeBlock(b1) + await s2.writeBlock(b2) + await s3.writeBlock(b0) + await s3.writeBlock(b1) + await s3.writeBlock(b2) + await s4.writeBlock(b0) + await s4.writeBlock(b1) + await s4.writeBlock(b2) + await s1.waitToHaveBlock(2) + await s2.waitToHaveBlock(2) + await s3.waitToHaveBlock(2) + await s4.waitToHaveBlock(2) + cluster1 = WS2PCluster.plugOn(s1._server) + cluster2 = WS2PCluster.plugOn(s2._server) + cluster3 = WS2PCluster.plugOn(s3._server) + cluster4 = WS2PCluster.plugOn(s4._server) + await (s1._server.ws2pCluster as WS2PCluster).listen('127.0.0.1', portWS1) + await (s2._server.ws2pCluster as WS2PCluster).listen('127.0.0.1', portWS2) + await (s3._server.ws2pCluster as WS2PCluster).listen('127.0.0.1', portWS3) + await (s4._server.ws2pCluster as WS2PCluster).listen('127.0.0.1', portWS4) + }) + + after(() => (s1._server.ws2pCluster as WS2PCluster).close()) + + it('should have b#2 on s1, s2 and s3', async () => { + const currentS1 = await s1.BlockchainService.current() + const currentS2 = await s2.BlockchainService.current() + const currentS3 = await s3.BlockchainService.current() + const currentS4 = await s4.BlockchainService.current() + assert.equal(currentS1.number, 2) + assert.equal(currentS2.number, 2) + assert.equal(currentS3.number, 2) + assert.equal(currentS4.number, 2) + }) + + it('should be able to have a connected network on s2 start', async () => { + const p1 = await s1.getPeer() + assert.deepEqual(p1.endpoints, [ + 'BASIC_MERKLED_API 127.0.0.1 ' + portBMA1, + 'WS2P 11111111 127.0.0.1 ' + portWS1 + ]) + await s2.writePeer(p1) + await (s1._server.ws2pCluster as WS2PCluster).startCrawling() + await (s2._server.ws2pCluster as WS2PCluster).startCrawling() + await (s3._server.ws2pCluster as WS2PCluster).startCrawling() + await s1.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 1) + }) + await s2.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 1) + assert.equal(res.peers.level2, 0) + }) + await s3.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 0) + }) + await s4.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 0) + }) + }) + + it('should not connect to s3 because of connection size limit', async () => { + cluster3.maxLevel2Peers = 0 + const p3 = await s3.getPeer() + await s2.writePeer(p3) + b3 = await s1.commit({ time: now }) + await s1.waitToHaveBlock(3) + await s2.waitToHaveBlock(3) + await s1.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 1) + }) + await s2.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 1) + assert.equal(res.peers.level2, 0) + }) + await s3.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 0) + }) + await s4.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 0) + }) + }) + + it('should connect to s3 because of configuration favorism', async () => { + cluster3.maxLevel2Peers = 1 + await s3.writeBlock(b3) + await s3._server.PeeringService.generateSelfPeer(s3._server.conf) + await s3._server.PeeringService.generateSelfPeer(s3._server.conf) + await s1.waitToHaveBlock(3) + await s2.waitToHaveBlock(3) + // await waitForkWS2PConnection(s3._server, 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd') + b4 = await s1.commit({ time: now }) + await s2.waitToHaveBlock(4) + await s3._server.PeeringService.generateSelfPeer(s3._server.conf) + const p3 = await s3.getPeer() + await s2.writePeer(p3) + await waitForkWS2PConnection(s3._server, 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd') + await waitForkWS2PDisconnection(s3._server, '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc') + await s1.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 1) // <- New connection to s3 + assert.equal(res.peers.level2, 1) + }) + await s2.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 1) + assert.equal(res.peers.level2, 0) + }) + await s3.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 1) // <- New connection from s1 + }) + await s4.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 0) + }) + }) + + function getTestingServer(keyring:{ pub:string, sec:string }) { + const conf1 = simpleTestingConf(now, keyring) + const portBMA = getNewTestingPort() + const portWS = getNewTestingPort() + conf1.host = '127.0.0.1' + conf1.port = portBMA + // A server + conf1.ws2p = { + upnp: false, + uuid: '11111111', + host: '127.0.0.1', + port: portWS, + remotehost: '127.0.0.1', + remoteport: portWS, + alwaysAccept: [] + } + const server = simpleTestingServer(conf1) + server._server.addEndpointsDefinitions(async () => 'WS2P 11111111 127.0.0.1 ' + portWS) + return { server, portWS, portBMA } + } +})