diff --git a/app/modules/ws2p/lib/WS2PCluster.ts b/app/modules/ws2p/lib/WS2PCluster.ts index b8a312d7a77b12bb414b5fbb5055d43e4a1ba62e..84def3f8b341ac56819c5ee86823634fd01bb5ad 100644 --- a/app/modules/ws2p/lib/WS2PCluster.ts +++ b/app/modules/ws2p/lib/WS2PCluster.ts @@ -8,6 +8,7 @@ import {WS2PBlockPuller} from "./WS2PBlockPuller" import {WS2PDocpoolPuller} from "./WS2PDocpoolPuller" import {WS2PConstants} from "./constants" import {PeerDTO} from "../../../lib/dto/PeerDTO" +import {GlobalFifoPromise} from "../../../service/GlobalFifoPromise" const es = require('event-stream') const nuuid = require('node-uuid') @@ -20,6 +21,7 @@ export class WS2PCluster { private port:number|null = null private syncBlockInterval:NodeJS.Timer private syncDocpoolInterval:NodeJS.Timer + private fifo:GlobalFifoPromise = new GlobalFifoPromise() private constructor(private server:Server) {} @@ -33,7 +35,7 @@ export class WS2PCluster { if (this.ws2pServer) { await this.ws2pServer.close() } - this.ws2pServer = await WS2PServer.bindOn(this.server, host, port) + this.ws2pServer = await WS2PServer.bindOn(this.server, host, port, this.fifo, (pubkey:string, connectedPubkeys:string[]) => this.acceptPubkey(pubkey, connectedPubkeys)) this.host = host this.port = port return this.ws2pServer @@ -65,6 +67,10 @@ export class WS2PCluster { }) try { this.server.logger.info('WS2P: connected to peer %s using `WS2P %s %s`!', ws2pc.connection.pubkey.slice(0, 8), host, port) + this.server.push({ + ws2p: 'connected', + to: { host, port, pubkey: ws2pc.connection.pubkey } + }) return ws2pc.connection } catch (e) { this.server.logger.info('WS2P: Could not connect to peer %s using `WS2P %s %s: %s`', ws2pc.connection.pubkey.slice(0, 8), host, port, (e && e.message || e)) @@ -79,7 +85,9 @@ export class WS2PCluster { while (i < peers.length && this.clientsCount() < WS2PConstants.MAX_LEVEL_1_PEERS) { const p = peers[i] const api = p.getWS2P() - await this.connect(api.host, api.port) + if (p.pubkey !== this.server.conf.pair.pub) { + await this.connect(api.host, api.port) + } i++ } @@ -88,13 +96,79 @@ export class WS2PCluster { if (data.endpoints) { const peer = PeerDTO.fromJSONObject(data) const ws2pEnpoint = peer.getWS2P() - if (ws2pEnpoint) { - this.connect(ws2pEnpoint.host, ws2pEnpoint.port) + if (ws2pEnpoint && peer.pubkey !== this.server.conf.pair.pub) { + await this.fifo.pushFIFOPromise('connect_peer_' + peer.pubkey, async () => { + // Check if already connected to the pubkey (in any way: server or client) + const connectedPubkeys = this.getConnectedPubkeys() + const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys) + if (shouldAccept) { + await this.connect(ws2pEnpoint.host, ws2pEnpoint.port) + // Trim the eventual extra connections + await this.trimClientConnections() + } + }) } } })) } + async trimClientConnections() { + let disconnectedOne = true + // Disconnect non-members + while (disconnectedOne && this.clientsCount() > WS2PConstants.MAX_LEVEL_1_PEERS) { + disconnectedOne = false + let uuids = Object.keys(this.ws2pClients) + uuids = _.shuffle(uuids) + for (const uuid of uuids) { + const client = this.ws2pClients[uuid] + const isMember = await this.server.dal.isMember(client.connection.pubkey) + if (!isMember && !disconnectedOne) { + client.connection.close() + disconnectedOne = true + } + } + } + // Disconnect members + while (this.clientsCount() > WS2PConstants.MAX_LEVEL_1_PEERS) { + let uuids = Object.keys(this.ws2pClients) + uuids = _.shuffle(uuids) + for (const uuid of uuids) { + const client = this.ws2pClients[uuid] + client.connection.close() + } + } + } + + protected async acceptPubkey(pub:string, connectedPubkeys:string[]) { + let accept = false + if (connectedPubkeys.indexOf(pub) === -1) { + // Do we have room? + if (this.clientsCount() < WS2PConstants.MAX_LEVEL_1_PEERS) { + // Yes: just connect to it + accept = true + } + else { + // No: + // Does this node have the priority over at least one node? + const isMemberPeer = await this.server.dal.isMember(pub) + if (isMemberPeer) { + // The node may have the priority over at least 1 other node + let i = 0, existsOneNonMemberNode = false + while (!existsOneNonMemberNode && i < connectedPubkeys.length) { + const isAlsoAMemberPeer = await this.server.dal.isMember(connectedPubkeys[i]) + existsOneNonMemberNode = !isAlsoAMemberPeer + i++ + } + if (existsOneNonMemberNode) { + // The node has the priority over a non-member peer: try to connect + accept = true + } + } + } + } + return accept + } + async getAllConnections() { const all:WS2PConnection[] = this.ws2pServer ? this.ws2pServer.getConnexions() : [] for (const uuid of Object.keys(this.ws2pClients)) { @@ -156,4 +230,10 @@ export class WS2PCluster { await puller.pull() })) } + + getConnectedPubkeys() { + const clients = Object.keys(this.ws2pClients).map(k => this.ws2pClients[k].connection.pubkey) + const served = this.ws2pServer ? this.ws2pServer.getConnexions().map(c => c.pubkey) : [] + return clients.concat(served) + } } \ No newline at end of file diff --git a/app/modules/ws2p/lib/WS2PConnection.ts b/app/modules/ws2p/lib/WS2PConnection.ts index aa763e3d1af2b193c67074c17a7cc1c91f508346..780ef13385ac3880427fb5bc28c316e7ca87b5a1 100644 --- a/app/modules/ws2p/lib/WS2PConnection.ts +++ b/app/modules/ws2p/lib/WS2PConnection.ts @@ -39,7 +39,6 @@ export enum WS2P_PUSH { } export interface WS2PAuth { - isAuthorizedPubkey(pub:string): Promise<boolean> authenticationIsDone(): Promise<void> } @@ -70,7 +69,10 @@ export class WS2PPubkeyRemoteAuth implements WS2PRemoteAuth { protected serverAuthResolve:()=>void protected serverAuthReject:(err:any)=>void - constructor(protected pair:Key) { + constructor( + protected pair:Key, + protected tellIsAuthorizedPubkey:(pub: string) => Promise<boolean> = () => Promise.resolve(true) + ) { this.challenge = nuuid.v4() + nuuid.v4() this.serverAuth = new Promise((resolve, reject) => { this.serverAuthResolve = resolve @@ -94,7 +96,7 @@ export class WS2PPubkeyRemoteAuth implements WS2PRemoteAuth { } async registerCONNECT(challenge:string, sig: string, pub: string): Promise<boolean> { - const allow = await this.isAuthorizedPubkey(pub) + const allow = await this.tellIsAuthorizedPubkey(pub) if (!allow) { return false } @@ -124,10 +126,6 @@ export class WS2PPubkeyRemoteAuth implements WS2PRemoteAuth { return this.authenticatedByRemote } - async isAuthorizedPubkey(pub: string): Promise<boolean> { - return true - } - authenticationIsDone(): Promise<void> { return this.serverAuth } @@ -144,7 +142,10 @@ export class WS2PPubkeyLocalAuth implements WS2PLocalAuth { protected serverAuthResolve:()=>void protected serverAuthReject:(err:any)=>void - constructor(protected pair:Key) { + constructor( + protected pair:Key, + protected tellIsAuthorizedPubkey:(pub: string) => Promise<boolean> = () => Promise.resolve(true) + ) { this.challenge = nuuid.v4() + nuuid.v4() this.serverAuth = new Promise((resolve, reject) => { this.serverAuthResolve = resolve @@ -166,7 +167,7 @@ export class WS2PPubkeyLocalAuth implements WS2PLocalAuth { } async registerACK(sig: string, pub: string): Promise<boolean> { - const allow = await this.isAuthorizedPubkey(pub) + const allow = await this.tellIsAuthorizedPubkey(pub) if (!allow) { return false } @@ -199,10 +200,6 @@ export class WS2PPubkeyLocalAuth implements WS2PLocalAuth { authenticationIsDone(): Promise<void> { return this.serverAuth } - - async isAuthorizedPubkey(pub: string): Promise<boolean> { - return true - } } export interface WS2PRequest { diff --git a/app/modules/ws2p/lib/WS2PServer.ts b/app/modules/ws2p/lib/WS2PServer.ts index d3119fcfb14ca10d958f9a350d8487170ee0f7d7..7c4cba0a3586351957f8e08ab19cac6c1179b9b0 100644 --- a/app/modules/ws2p/lib/WS2PServer.ts +++ b/app/modules/ws2p/lib/WS2PServer.ts @@ -3,10 +3,13 @@ import {WS2PConnection, WS2PPubkeyLocalAuth, WS2PPubkeyRemoteAuth} from "./WS2PC import {WS2PServerMessageHandler} from "./interface/WS2PServerMessageHandler" import {WS2PStreamer} from "../../../lib/streams/WS2PStreamer" import {Key} from "../../../lib/common-libs/crypto/keyring" +import {GlobalFifoPromise} from "../../../service/GlobalFifoPromise" +import * as events from "events" +import {WS2PConstants} from "./constants" const WebSocketServer = require('ws').Server -export class WS2PServer { +export class WS2PServer extends events.EventEmitter { private wss:any private connections:WS2PConnection[] = [] @@ -14,7 +17,10 @@ export class WS2PServer { private constructor( private server:Server, private host:string, - private port:number) { + private port:number, + private fifo:GlobalFifoPromise, + private shouldAcceptConnection:(pubkey:string, connectedPubkeys:string[])=>Promise<boolean>) { + super() } getConnexions() { @@ -26,39 +32,93 @@ export class WS2PServer { this.wss = new WebSocketServer({ host: this.host, port: this.port }) this.wss.on('connection', async (ws:any) => { - /****************** - * A NEW CONNECTION - ******************/ this.server.logger.info('WS2P: new incoming connection from %s:%s!', ws._sender._socket._handle.owner.remoteAddress, ws._sender._socket._handle.owner.remotePort) - const c = WS2PConnection.newConnectionFromWebSocketServer( - ws, - new WS2PServerMessageHandler(this.server), - new WS2PPubkeyLocalAuth(key), - new WS2PPubkeyRemoteAuth(key), { - connectionTimeout: 5000, - requestTimeout: 5000 - }) - this.connections.push(c) + await this.fifo.pushFIFOPromise('wss.connect:' + [ws._sender._socket._handle.owner.remoteAddress, ws._sender._socket._handle.owner.remotePort].join(':'), async () => { - c.connect() - .then(() => this.server.logger.info('WS2P: established incoming connection from %s:%s', ws._sender._socket._handle.owner.remoteAddress, ws._sender._socket._handle.owner.remotePort)) - .catch((e:any) => console.error('WS2P: cannot connect to incoming WebSocket connection: %s', e)) + /****************** + * A NEW CONNECTION + ******************/ + let saidPubkey:string = "" - // Broadcasting - const ws2pStreamer = new WS2PStreamer(c) - this.server.pipe(ws2pStreamer) + const acceptPubkey = async (pub:string) => { + if (!saidPubkey) { + saidPubkey = pub + } + if (saidPubkey !== pub) { + // The key must be identical + return false + } + return await this.shouldAcceptConnection(pub, this.getConnexions().map(c => c.pubkey)) + } - ws.on('error', (e:any) => { - this.server.logger.error(e) - }) + const c = WS2PConnection.newConnectionFromWebSocketServer( + ws, + new WS2PServerMessageHandler(this.server), + new WS2PPubkeyLocalAuth(key, acceptPubkey), + new WS2PPubkeyRemoteAuth(key, acceptPubkey), + { + connectionTimeout: 5000, + requestTimeout: 5000 + } + ) + + try { + await c.connect() + this.connections.push(c) + this.emit('newConnection', c) + this.server.logger.info('WS2P: established incoming connection from %s:%s', ws._sender._socket._handle.owner.remoteAddress, ws._sender._socket._handle.owner.remotePort) + + // Broadcasting + const ws2pStreamer = new WS2PStreamer(c) + this.server.pipe(ws2pStreamer) - ws.on('close', () => { - this.server.unpipe(ws2pStreamer) + ws.on('error', (e:any) => { + this.server.logger.error(e) + }) + + ws.on('close', () => { + this.server.unpipe(ws2pStreamer) + this.removeConnection(c) + }) + + await this.trimConnections() + } catch (e) { + this.server.logger.warn('WS2P: cannot connect to incoming WebSocket connection: %s', e) + } }) }) } + async trimConnections() { + let disconnectedOne = true + // Disconnect non-members + while (disconnectedOne && this.connections.length > WS2PConstants.MAX_LEVEL_2_PEERS) { + disconnectedOne = false + for (const c of this.connections) { + const isMember = await this.server.dal.isMember(c.pubkey) + if (!isMember && !disconnectedOne) { + c.close() + disconnectedOne = true + } + } + } + // Disconnect members + while (this.connections.length > WS2PConstants.MAX_LEVEL_2_PEERS) { + for (const c of this.connections) { + c.close() + } + } + } + + private removeConnection(c:WS2PConnection) { + const index = this.connections.indexOf(c) + if (index !== -1) { + // Remove the connection + this.connections.splice(index, 1) + } + } + async close() { await Promise.all(this.connections.map(c => c.close())) return this.wss.close() @@ -66,7 +126,7 @@ export class WS2PServer { async getConnection(pubkeyOfConnection:string) { if (this.connections.length === 0) { - throw "No connections to look into." + throw Error("No connections to look into.") } return Promise.race(this.connections.map(async (c) => { await c.connected @@ -74,13 +134,13 @@ export class WS2PServer { return c } else { await new Promise(resolve => setTimeout(resolve, 5000)) - throw "Pubkey not matching or too long to be obtained" + throw Error("Pubkey not matching or too long to be obtained") } })) } - static async bindOn(server:Server, host:string, port:number) { - const ws2ps = new WS2PServer(server, host, port) + static async bindOn(server:Server, host:string, port:number, fifo:GlobalFifoPromise, shouldAcceptConnection:(pubkey:string, connectedPubkeys:string[])=>Promise<boolean>) { + const ws2ps = new WS2PServer(server, host, port, fifo, shouldAcceptConnection) await ws2ps.listenToWebSocketConnections() server.logger.info('WS2P server listening on %s:%s', host, port) return ws2ps diff --git a/app/modules/ws2p/lib/constants.ts b/app/modules/ws2p/lib/constants.ts index 8d250ccd52fd24d20d5c68d7b66ebd76c937e66f..f4a63317d3d0538dcc2fae2c6f2a4388eee54fdd 100644 --- a/app/modules/ws2p/lib/constants.ts +++ b/app/modules/ws2p/lib/constants.ts @@ -10,4 +10,5 @@ export const WS2PConstants = { SANDBOX_FIRST_PULL_DELAY: 300 * 2, // 10 minutes after the start MAX_LEVEL_1_PEERS: 10, + MAX_LEVEL_2_PEERS: 10, } \ No newline at end of file diff --git a/test/integration/tools/toolbox.ts b/test/integration/tools/toolbox.ts index f123cf83260dcd4fd1ac171568fa291610aad558..b641d4c1ed6e1de6ed7b062d350bafaec3f5ecea 100644 --- a/test/integration/tools/toolbox.ts +++ b/test/integration/tools/toolbox.ts @@ -274,6 +274,18 @@ export const waitForkResolution = async (server:Server, number:number) => { }) } +export const waitForkWS2PConnection = async (server:Server, pubkey:string) => { + await new Promise(res => { + server.pipe(es.mapSync((e:any) => { + if (e.ws2p === 'connected' && e.to.pubkey === pubkey) { + res() + } + return e + })) + + }) +} + export class TestingServer { private prover:Prover @@ -633,6 +645,9 @@ export const simpleWS2PNetwork: (s1: TestingServer, s2: TestingServer) => Promis const ws2ps = await cluster1.listen('localhost', port) const ws2pc = await cluster2.connect('localhost', port) + await new Promise(res => { + ws2ps.on('newConnection', res) + }) w1 = await ws2ps.getConnection(clientPub) if (!w1) { throw "Connection coming from " + clientPub + " was not found" diff --git a/test/integration/ws2p_cluster.ts b/test/integration/ws2p_cluster.ts new file mode 100644 index 0000000000000000000000000000000000000000..1b53dade96dbb9bac77eac797781fbf33f557778 --- /dev/null +++ b/test/integration/ws2p_cluster.ts @@ -0,0 +1,153 @@ +import { + getNewTestingPort, + simpleTestingConf, + simpleTestingServer, + simpleUser, + TestingServer, + waitForkWS2PConnection +} from "./tools/toolbox" +import {WS2PCluster} from "../../app/modules/ws2p/lib/WS2PCluster" + +const assert = require('assert') + +describe("WS2P cluster", function() { + + const now = 1500000000 + let s1:TestingServer, s2:TestingServer, s3:TestingServer + let cat:any, tac:any, toc:any + const catKeyring = { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'} + const tacKeyring = { pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc', sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'} + const tocKeyring = { pub: 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo', sec: '64EYRvdPpTfLGGmaX5nijLXRqWXaVz8r1Z1GtaahXwVSJGQRn7tqkxLb288zwSYzELMEG5ZhXSBYSxsTsz1m9y8F'} + + let b0, b1, b2, portBMA1:number, portWS1:number, portWS2:number, portWS3:number + + before(async () => { + const t1 = getTestingServer(catKeyring) + const t2 = getTestingServer(tacKeyring) + const t3 = getTestingServer(tocKeyring) + s1 = t1.server + s2 = t2.server + s3 = t3.server + portWS1 = t1.portWS + portWS2 = t2.portWS + portWS3 = t3.portWS + portBMA1 = t1.portBMA + cat = simpleUser('cat', catKeyring, s1) + tac = simpleUser('tac', tacKeyring, s1) + toc = simpleUser('toc', tocKeyring, s1) + await s1.initDalBmaConnections() + await s2.initDalBmaConnections() + await s3.initDalBmaConnections() + + await cat.createIdentity(); + await tac.createIdentity(); + await toc.createIdentity(); + await cat.cert(tac); + await tac.cert(toc); + await toc.cert(cat); + await cat.join(); + await tac.join(); + await toc.join(); + + b0 = await s1.commit({ time: now }) + b1 = await s1.commit({ time: now }) + b2 = await s1.commit({ time: now }) + + await s2.writeBlock(b0) + await s2.writeBlock(b1) + await s2.writeBlock(b2) + await s3.writeBlock(b0) + await s3.writeBlock(b1) + await s3.writeBlock(b2) + await s1.waitToHaveBlock(2) + await s2.waitToHaveBlock(2) + await s3.waitToHaveBlock(2) + WS2PCluster.plugOn(s1._server) + WS2PCluster.plugOn(s2._server) + WS2PCluster.plugOn(s3._server) + await (s1._server.ws2pCluster as WS2PCluster).listen('127.0.0.1', portWS1) + await (s2._server.ws2pCluster as WS2PCluster).listen('127.0.0.1', portWS2) + await (s3._server.ws2pCluster as WS2PCluster).listen('127.0.0.1', portWS3) + }) + + after(() => (s1._server.ws2pCluster as WS2PCluster).close()) + + it('should have b#2 on s1, s2 and s3', async () => { + const currentS1 = await s1.BlockchainService.current() + const currentS2 = await s2.BlockchainService.current() + const currentS3 = await s3.BlockchainService.current() + assert.equal(currentS1.number, 2) + assert.equal(currentS2.number, 2) + assert.equal(currentS3.number, 2) + }) + + it('should be able to have a connected network on s2 start', async () => { + const p1 = await s1.getPeer() + assert.deepEqual(p1.endpoints, [ + 'BASIC_MERKLED_API 127.0.0.1 ' + portBMA1, + 'WS2P 11111111 127.0.0.1 ' + portWS1 + ]) + await s2.writePeer(p1) + WS2PCluster.plugOn(s2._server); + await (s2._server.ws2pCluster as WS2PCluster).startCrawling() + await s1.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 1) + }) + await s2.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 1) + assert.equal(res.peers.level2, 0) + }) + }) + + it('should not start another connection if peer is already connected', async () => { + await (s1._server.ws2pCluster as WS2PCluster).startCrawling() + await s1.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) // <- Does not increase! + assert.equal(res.peers.level2, 1) + }) + await s2.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 1) + assert.equal(res.peers.level2, 0) // <- Does not increase either! + }) + }) + + it('should be able to connect on s3 when s3 submits its peer document', async () => { + const p3 = await s3.getPeer() + await s2.writePeer(p3) + await waitForkWS2PConnection(s2._server, 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo') + await waitForkWS2PConnection(s1._server, 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo') + await s1.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 1) // <- New connection to s3! + assert.equal(res.peers.level2, 1) + }) + await s2.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 2) // <- New connection to s3! + assert.equal(res.peers.level2, 0) + }) + await s3.expect('/network/ws2p/info', (res:any) => { + assert.equal(res.peers.level1, 0) + assert.equal(res.peers.level2, 2) // <- New connections from s1 + s2! + }) + }) + + function getTestingServer(keyring:{ pub:string, sec:string }) { + const conf1 = simpleTestingConf(now, keyring) + const portBMA = getNewTestingPort() + const portWS = getNewTestingPort() + conf1.host = '127.0.0.1' + conf1.port = portBMA + // A server + conf1.ws2p = { + upnp: false, + uuid: '11111111', + host: '127.0.0.1', + port: portWS, + remotehost: '127.0.0.1', + remoteport: portWS + } + const server = simpleTestingServer(conf1) + server._server.addEndpointsDefinitions(async () => 'WS2P 11111111 127.0.0.1 ' + portWS) + return { server, portWS, portBMA } + } +})