diff --git a/app/lib/common-libs/errors.ts b/app/lib/common-libs/errors.ts index b01172a5a8760de8c2ddb2ec33acacec181dd001..d44e0f6eadc87a6f1b7594e9e753251c3735a6fa 100755 --- a/app/lib/common-libs/errors.ts +++ b/app/lib/common-libs/errors.ts @@ -1,5 +1,6 @@ export enum DataErrors { + WS2P_SYNC_PERIMETER_IS_LIMITED, PEER_REJECTED, TOO_OLD_PEER, LOKI_DIVIDEND_GET_WRITTEN_ON_SHOULD_NOT_BE_USED, diff --git a/app/lib/dal/fileDAL.ts b/app/lib/dal/fileDAL.ts index f5e556edb57f5f5424ae7b2eca139783c2d26924..76ef93581aed4e6c00f336e3d5b72b7c3a9c15fc 100644 --- a/app/lib/dal/fileDAL.ts +++ b/app/lib/dal/fileDAL.ts @@ -959,7 +959,7 @@ export class FileDAL { return this.peerDAL.removePeerByPubkey(pubkey) } - async findAllPeersNEWUPBut(pubkeys:string[]) { + async findAllPeersBut(pubkeys:string[]) { const peers = await this.listAllPeers(); return peers.filter((peer:DBPeer) => pubkeys.indexOf(peer.pubkey) == -1 && ['UP'].indexOf(peer.status) !== -1); diff --git a/app/lib/dto/ConfDTO.ts b/app/lib/dto/ConfDTO.ts index c8075acaf9813938fb59bd6ebe759bbcb83bd59b..cc33c9f29f6400152738c31e6dc5a934efcbfe53 100644 --- a/app/lib/dto/ConfDTO.ts +++ b/app/lib/dto/ConfDTO.ts @@ -98,6 +98,7 @@ export interface WS2PConfDTO { preferedOnly: boolean privilegedNodes?: string[] privilegedOnly: boolean + syncLimit?: number } } @@ -178,6 +179,7 @@ export class ConfDTO implements CurrencyConfDTO, KeypairConfDTO, NetworkConfDTO, privilegedOnly: boolean maxPublic?:number maxPrivate?:number + syncLimit?:number }, public powNoSecurity = false ) {} diff --git a/app/modules/crawler/lib/crawler.ts b/app/modules/crawler/lib/crawler.ts index 0a0acbfa89d2bfacae717a12cab6344b59f03fbc..40e10ccc0da111939e3ed0aa5dd8ea3f5b6945f2 100644 --- a/app/modules/crawler/lib/crawler.ts +++ b/app/modules/crawler/lib/crawler.ts @@ -367,7 +367,7 @@ export class BlockCrawler { if (current) { this.pullingEvent(server, 'start', current.number); this.logger && this.logger.info("Pulling blocks from the network..."); - let peers = await server.dal.findAllPeersNEWUPBut([server.conf.pair.pub]); + let peers = await server.dal.findAllPeersBut([server.conf.pair.pub]); peers = Underscore.shuffle(peers); if (pubkey) { peers = Underscore.filter(peers, (p:any) => p.pubkey == pubkey) diff --git a/app/modules/ws2p/lib/WS2PCluster.ts b/app/modules/ws2p/lib/WS2PCluster.ts index 999c28a6b8f0bda0eb8ec968c69bb79d4b32b4af..66d6dea38063e9ad1a02f873f0cccb6a67dbea0f 100644 --- a/app/modules/ws2p/lib/WS2PCluster.ts +++ b/app/modules/ws2p/lib/WS2PCluster.ts @@ -84,6 +84,9 @@ export class WS2PCluster { // A cache to remember the banned keys private banned:{ [k:string]: string } = {} + // A cache to remember the banned keys for synchronization + private banned4Sync:{ [k:string]: string } = {} + // A cache to know if a block exists or not in the DB private blockstampsCache:{ [k:string]: number } = {} @@ -285,8 +288,8 @@ export class WS2PCluster { if (this.ws2pServer) { await this.ws2pServer.close() } - this.ws2pServer = await WS2PServer.bindOn(this.server, host, port, this.fifo, (pubkey:string, connectedPubkeys:string[]) => { - return this.acceptPubkey(pubkey, connectedPubkeys, [], () => this.servedCount(), this.maxLevel2Peers, this.privilegedNodes(), (this.server.conf.ws2p !== undefined && this.server.conf.ws2p.privilegedOnly)) + this.ws2pServer = await WS2PServer.bindOn(this.server, host, port, this.fifo, (pubkey:string, isSync: boolean, syncConnectedPubkeys:string[], connectedPubkeys:string[]) => { + return this.acceptPubkey(pubkey, isSync, syncConnectedPubkeys, connectedPubkeys, [], () => this.servedCount(), this.maxLevel2Peers, this.privilegedNodes(), (this.server.conf.ws2p !== undefined && this.server.conf.ws2p.privilegedOnly)) }, this.keyPriorityLevel, this.messageHandler) this.host = host this.port = port @@ -343,10 +346,11 @@ export class WS2PCluster { try { const fullEndpointAddress = WS2PCluster.getFullAddress(host, port, path) const ws2pc = await WS2PClient.connectTo(this.server, fullEndpointAddress, endpointVersion, ws2pEndpointUUID, messageHandler, expectedPub, (pub:string) => { + const syncPubkeys: string[] = [] // The connection won't be considered as a SYNC connection, so there is no check to do const connectedPubkeys = this.getConnectedPubkeys() const connectedWS2PUID = this.getConnectedWS2PUID() const preferedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) ? this.server.conf.ws2p.preferedNodes:[] - return this.acceptPubkey(expectedPub, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEndpointUUID) + return this.acceptPubkey(expectedPub, false, syncPubkeys, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEndpointUUID) }) this.ws2pClients[uuid] = ws2pc pub = ws2pc.connection.pubkey @@ -518,10 +522,11 @@ export class WS2PCluster { const ws2pEnpoint = peer.getOnceWS2PEndpoint(ProxiesConf.canReachTorEndpoint(this.server.conf.proxiesConf), ProxiesConf.canReachClearEndpoint(this.server.conf.proxiesConf)) if (ws2pEnpoint) { // Check if already connected to the pubkey (in any way: server or client) + const syncPubkeys: string[] = [] // The connection won't be considered as a SYNC connection, so there is no check to do const connectedPubkeys = this.getConnectedPubkeys() const connectedWS2PUID = this.getConnectedWS2PUID() const preferedKeys = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) ? this.server.conf.ws2p.preferedNodes:[] - const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedKeys, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEnpoint.uuid) + const shouldAccept = await this.acceptPubkey(peer.pubkey, false, syncPubkeys, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedKeys, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEnpoint.uuid) if (shouldAccept && (!this.server.conf.ws2p || ws2pEnpoint.uuid !== this.server.conf.ws2p.uuid || peer.pubkey !== this.server.conf.pair.pub)) { await this.connectToRemoteWS(ws2pEnpoint.version, ws2pEnpoint.host, ws2pEnpoint.port, ws2pEnpoint.path, this.messageHandler, peer.pubkey, ws2pEnpoint.uuid) await this.removeLowPriorityConnections(preferedKeys) @@ -749,6 +754,8 @@ export class WS2PCluster { protected async acceptPubkey( pub:string, + isSync: boolean, + syncConnectedPubkeys:string[], connectedPubkeys:string[], connectedWS2PUID:string[], getConcurrentConnexionsCount:()=>number, @@ -757,6 +764,25 @@ export class WS2PCluster { priorityKeysOnly:boolean, targetWS2PUID = "" ) { + + // Sync case is specific + if (isSync) { + if (this.banned4Sync[pub]) { + return false + } + // Already connected + if (syncConnectedPubkeys.indexOf(pub) !== -1) { + return false + } + const limit = (this.server.conf.ws2p && this.server.conf.ws2p.syncLimit) || WS2PConstants.WS2P_SYNC_LIMIT + const ok = syncConnectedPubkeys.length < limit + if (ok) { + // The connection will OK: we prepare the ban right now to give room for future users + this.banSyncConnection(pub) + } + return ok + } + if (this.server.conf.pair.pub === pub) { // We do not accept oneself connetion if (this.server.conf.ws2p && this.server.conf.ws2p.uuid === targetWS2PUID || targetWS2PUID === '11111111') { @@ -838,7 +864,7 @@ export class WS2PCluster { } getAllConnections() { - const all:WS2PConnection[] = this.ws2pServer ? this.ws2pServer.getConnexions() : [] + const all:WS2PConnection[] = this.ws2pServer ? this.ws2pServer.getConnexions().concat(this.ws2pServer.getConnexionsForSync()) : [] for (const uuid of Object.keys(this.ws2pClients)) { all.push(this.ws2pClients[uuid].connection) } @@ -955,4 +981,12 @@ export class WS2PCluster { } } } + + banSyncConnection(pub: string) { + this.server.logger.warn('Banning SYNC connection of %s for %ss (for room)', pub.slice(0, 8), WS2PConstants.SYNC_BAN_DURATION_IN_SECONDS) + this.banned4Sync[pub] = 'sync' + setTimeout(() => { + delete this.banned4Sync[pub] + }, 1000 * WS2PConstants.SYNC_BAN_DURATION_IN_SECONDS) + } } diff --git a/app/modules/ws2p/lib/WS2PConnection.ts b/app/modules/ws2p/lib/WS2PConnection.ts index 0f8cf34a77e5f6aeafb579f1b7fc5132eb32b93e..be9fa5851bc5d917a7a9efe632ad10daa511eae8 100644 --- a/app/modules/ws2p/lib/WS2PConnection.ts +++ b/app/modules/ws2p/lib/WS2PConnection.ts @@ -62,12 +62,13 @@ export interface WS2PAuth { } export interface WS2PRemoteAuth extends WS2PAuth { - registerCONNECT(ws2pVersion:number, challenge:string, sig: string, pub: string, ws2pId:string): Promise<boolean> + registerCONNECT(type: 'CONNECT'|'SYNC', ws2pVersion:number, challenge:string, sig: string, pub: string, ws2pId:string): Promise<boolean> sendACK(ws:any): Promise<void> registerOK(sig: string): Promise<boolean> isAuthenticatedByRemote(): boolean getPubkey(): string getVersion(): number + isSync(): boolean } export interface WS2PLocalAuth extends WS2PAuth { @@ -90,11 +91,12 @@ export class WS2PPubkeyRemoteAuth implements WS2PRemoteAuth { protected serverAuth:Promise<void> protected serverAuthResolve:()=>void protected serverAuthReject:(err:any)=>void + protected isSyncConnection = false constructor( protected currency:string, protected pair:Key, - protected tellIsAuthorizedPubkey:(pub: string) => Promise<boolean> = () => Promise.resolve(true) + protected tellIsAuthorizedPubkey:(pub: string, isSync: boolean) => Promise<boolean> = () => Promise.resolve(true) ) { this.challenge = nuuid.v4() + nuuid.v4() this.serverAuth = new Promise((resolve, reject) => { @@ -111,6 +113,10 @@ export class WS2PPubkeyRemoteAuth implements WS2PRemoteAuth { return this.remotePub } + isSync() { + return this.isSyncConnection + } + async sendACK(ws: any): Promise<void> { const challengeMessage = `WS2P:ACK:${this.currency}:${this.pair.pub}:${this.challenge}` Logger.log('sendACK >>> ' + challengeMessage) @@ -122,12 +128,13 @@ export class WS2PPubkeyRemoteAuth implements WS2PRemoteAuth { })) } - async registerCONNECT(ws2pVersion:number, challenge:string, sig: string, pub: string, ws2pId:string = ""): Promise<boolean> { - const allow = await this.tellIsAuthorizedPubkey(pub) + async registerCONNECT(type: 'CONNECT'|'SYNC', ws2pVersion:number, challenge:string, sig: string, pub: string, ws2pId:string = ""): Promise<boolean> { + this.isSyncConnection = type === 'SYNC' + const allow = await this.tellIsAuthorizedPubkey(pub, this.isSyncConnection) if (!allow) { return false } - const challengeMessage = (ws2pVersion > 1) ? `WS2P:CONNECT:${this.currency}:${pub}:${ws2pId}:${challenge}`:`WS2P:CONNECT:${this.currency}:${pub}:${challenge}` + const challengeMessage = (ws2pVersion > 1) ? `WS2P:${type}:${this.currency}:${pub}:${ws2pId}:${challenge}`:`WS2P:${type}:${this.currency}:${pub}:${challenge}` Logger.log('registerCONNECT >>> ' + challengeMessage) const verified = verify(challengeMessage, sig, pub) if (verified) { @@ -170,6 +177,7 @@ export class WS2PPubkeyLocalAuth implements WS2PLocalAuth { protected serverAuth:Promise<void> protected serverAuthResolve:()=>void protected serverAuthReject:(err:any)=>void + protected isSync: boolean constructor( protected currency:string, @@ -182,15 +190,17 @@ export class WS2PPubkeyLocalAuth implements WS2PLocalAuth { this.serverAuthResolve = resolve this.serverAuthReject = reject }) + this.isSync = false } async sendCONNECT(ws:any, ws2pVersion:number): Promise<void> { + const connectWord = this.isSync ? 'SYNC' : 'CONNECT' if (ws2pVersion > 1) { - const challengeMessage = `WS2P:${ws2pVersion}:CONNECT:${this.currency}:${this.pair.pub}:${this.ws2pId}:${this.challenge}` + const challengeMessage = `WS2P:${ws2pVersion}:${connectWord}:${this.currency}:${this.pair.pub}:${this.ws2pId}:${this.challenge}` Logger.log('sendCONNECT >>> ' + challengeMessage) const sig = this.pair.signSync(challengeMessage) await ws.send(JSON.stringify({ - auth: 'CONNECT', + auth: `${connectWord}`, version: ws2pVersion, pub: this.pair.pub, ws2pid: this.ws2pId, @@ -199,11 +209,11 @@ export class WS2PPubkeyLocalAuth implements WS2PLocalAuth { })) return this.serverAuth } else if (ws2pVersion == 1) { - const challengeMessage = `WS2P:CONNECT:${this.currency}:${this.pair.pub}:${this.challenge}` + const challengeMessage = `WS2P:${connectWord}:${this.currency}:${this.pair.pub}:${this.challenge}` Logger.log('sendCONNECT >>> ' + challengeMessage) const sig = this.pair.signSync(challengeMessage) await ws.send(JSON.stringify({ - auth: 'CONNECT', + auth: `${connectWord}`, pub: this.pair.pub, challenge: this.challenge, sig @@ -248,6 +258,19 @@ export class WS2PPubkeyLocalAuth implements WS2PLocalAuth { } } +export class WS2PPubkeySyncLocalAuth extends WS2PPubkeyLocalAuth { + + constructor( + protected currency:string, + protected pair:Key, + protected ws2pId:string, + protected tellIsAuthorizedPubkey:(pub: string) => Promise<boolean> = () => Promise.resolve(true) + ) { + super(currency, pair, ws2pId, tellIsAuthorizedPubkey) + this.isSync = true + } +} + export interface WS2PRequest { name:string, params?:any @@ -377,6 +400,10 @@ export class WS2PConnection { return this.expectedWS2PUID } + get isSync() { + return this.remoteAuth.isSync() + } + get nbRequests() { return this.nbRequestsCount } @@ -405,7 +432,12 @@ export class WS2PConnection { return this.ws.close() } - async connect() { + async connectAsInitiator() { + return this.connect(true) + } + + async connect(initiator = false) { + const whoIs = initiator ? 'INITIATOR' : 'SERVER' if (!this.connectp) { this.connectp = (async () => { const connectionTimeout = new Promise((res, rej) => { @@ -448,7 +480,7 @@ export class WS2PConnection { if (data.auth && typeof data.auth === "string") { - if (data.auth === "CONNECT") { + if (data.auth === "CONNECT" || data.auth === "SYNC") { if (data.version) { if (typeof data.version !== "number") { await this.errorDetected(WS2P_ERR.AUTH_INVALID_ASK_FIELDS) @@ -466,7 +498,7 @@ export class WS2PConnection { if (this.expectedPub && data.pub !== this.expectedPub) { await this.errorDetected(WS2P_ERR.INCORRECT_PUBKEY_FOR_REMOTE) } else { - const valid = await this.remoteAuth.registerCONNECT(this.ws2pVersion, data.challenge, data.sig, data.pub, (this.ws2pVersion > 1) ? data.ws2pID:"") + const valid = await this.remoteAuth.registerCONNECT(data.auth, this.ws2pVersion, data.challenge, data.sig, data.pub, (this.ws2pVersion > 1) ? data.ws2pID:"") if (valid) { await this.remoteAuth.sendACK(this.ws) } else { @@ -527,7 +559,7 @@ export class WS2PConnection { // Request message else if (data.reqId && typeof data.reqId === "string") { try { - const answer = await this.messageHandler.answerToRequest(data.body) + const answer = await this.messageHandler.answerToRequest(data.body, this) this.ws.send(JSON.stringify({ resId: data.reqId, body: answer })) } catch (e) { this.ws.send(JSON.stringify({ resId: data.reqId, err: e })) @@ -567,7 +599,7 @@ export class WS2PConnection { } async request(body:WS2PRequest) { - await this.connect() + await this.connectAsInitiator() const uuid = nuuid.v4() return new Promise((resolve, reject) => { this.nbRequestsCount++ @@ -643,7 +675,7 @@ export class WS2PConnection { } async pushData(type:WS2P_PUSH, key:string, data:any) { - await this.connect() + await this.connectAsInitiator() return new Promise((resolve, reject) => { this.nbPushsToRemoteCount++ try { diff --git a/app/modules/ws2p/lib/WS2PRequester.ts b/app/modules/ws2p/lib/WS2PRequester.ts index 29f72b72f5d8a85593172abb719710759c047a32..bfa2ea23589915a181d8488b74573b89e1f851e0 100644 --- a/app/modules/ws2p/lib/WS2PRequester.ts +++ b/app/modules/ws2p/lib/WS2PRequester.ts @@ -15,6 +15,8 @@ import {WS2PConnection} from "./WS2PConnection" import {BlockDTO} from "../../../lib/dto/BlockDTO" export enum WS2P_REQ { + KNOWN_PEERS, + PEER_DOCUMENT, WOT_REQUIREMENTS_OF_PENDING, BLOCKS_CHUNK, BLOCK_BY_NUMBER, diff --git a/app/modules/ws2p/lib/WS2PServer.ts b/app/modules/ws2p/lib/WS2PServer.ts index a9091d43ebd0b4fb221c755f99682bc2203c19c2..9ee4847df74f6f66ea572d5f66a9cc6af761cbf7 100644 --- a/app/modules/ws2p/lib/WS2PServer.ts +++ b/app/modules/ws2p/lib/WS2PServer.ts @@ -26,13 +26,14 @@ export class WS2PServer extends events.EventEmitter { private wss:any private connections:WS2PConnection[] = [] + private synConnections:WS2PConnection[] = [] private constructor( private server:Server, private host:string, private port:number, private fifo:GlobalFifoPromise, - private shouldAcceptConnection:(pubkey:string, connectedPubkeys:string[])=>Promise<boolean>, + private shouldAcceptConnection:(pubkey:string, isSync: boolean, syncConnectedPubkeys:string[], connectedPubkeys:string[])=>Promise<boolean>, public keyPriorityLevel:(pubkey:string, privilegedKeys:string[])=>Promise<number>) { super() } @@ -48,6 +49,10 @@ export class WS2PServer extends events.EventEmitter { return this.connections.slice() } + getConnexionsForSync() { + return this.synConnections.slice() + } + countConnexions() { const connections = this.getConnexions() let count = 0 @@ -71,7 +76,7 @@ export class WS2PServer extends events.EventEmitter { ******************/ let saidPubkey:string = "" - const acceptPubkey = async (pub:string) => { + const acceptPubkey = async (pub:string, isSync: boolean) => { if (!saidPubkey) { saidPubkey = pub } @@ -79,7 +84,7 @@ export class WS2PServer extends events.EventEmitter { // The key must be identical return false } - return await this.shouldAcceptConnection(pub, this.getConnexions().map(c => c.pubkey)) + return await this.shouldAcceptConnection(pub, isSync, this.getConnexionsForSync().map(c => c.pubkey), this.getConnexions().map(c => c.pubkey)) } let timeout = { connectionTimeout: WS2PConstants.CONNEXION_TIMEOUT, @@ -95,13 +100,40 @@ export class WS2PServer extends events.EventEmitter { const c = WS2PConnection.newConnectionFromWebSocketServer( ws, messageHandler, - new WS2PPubkeyLocalAuth(this.server.conf.currency, key, myWs2pId, acceptPubkey), + new WS2PPubkeyLocalAuth(this.server.conf.currency, key, myWs2pId, pub => acceptPubkey(pub, false)), new WS2PPubkeyRemoteAuth(this.server.conf.currency, key, acceptPubkey), timeout ) try { await c.connect() + + /** + * Sync is a particular case: + * - we remember the connection + * - we allow it to run for a limited period of time + * - we don't broadcast any data to it + * - we only allow blocks+peering fetching, any other request is forbidden and closes the connection + */ + if (c.isSync) { + // We remember it + this.synConnections.push(c) + // When the connection closes: + ws.on('close', () => { + // Remove the connection + const index = this.synConnections.indexOf(c) + if (index !== -1) { + // Remove the connection + this.synConnections.splice(index, 1) + c.close() + } + }) + // We close the connection after a given delay + setTimeout(() => c.close(), WS2PConstants.SYNC_CONNECTION_DURATION_IN_SECONDS) + // We don't broadcast or pipe data + return + } + const host = ws._sender._socket._handle.owner.remoteAddress const port = ws._sender._socket._handle.owner.remotePort this.server.push({ @@ -217,7 +249,7 @@ export class WS2PServer extends events.EventEmitter { })) } - static async bindOn(server:Server, host:string, port:number, fifo:GlobalFifoPromise, shouldAcceptConnection:(pubkey:string, connectedPubkeys:string[])=>Promise<boolean>, keyPriorityLevel:(pubkey:string, privilegedKeys:string[])=>Promise<number>, messageHandler:WS2PMessageHandler) { + static async bindOn(server:Server, host:string, port:number, fifo:GlobalFifoPromise, shouldAcceptConnection:(pubkey:string, isSync: boolean, syncConnectedPubkeys:string[], connectedPubkeys:string[])=>Promise<boolean>, keyPriorityLevel:(pubkey:string, privilegedKeys:string[])=>Promise<number>, messageHandler:WS2PMessageHandler) { const ws2ps = new WS2PServer(server, host, port, fifo, shouldAcceptConnection, keyPriorityLevel) await ws2ps.listenToWebSocketConnections(messageHandler) server.logger.info('WS2P server %s listening on %s:%s', server.conf.pair.pub, host, port) diff --git a/app/modules/ws2p/lib/constants.ts b/app/modules/ws2p/lib/constants.ts index 3ec09645fbeab14b286b71a533b7ac10bf3c9ea9..ac58306db71978c1d278c21264dc34dfcc098801 100644 --- a/app/modules/ws2p/lib/constants.ts +++ b/app/modules/ws2p/lib/constants.ts @@ -12,6 +12,7 @@ // GNU Affero General Public License for more details. import {CommonConstants} from "../../../lib/common-libs/constants" + export const WS2PConstants = { NETWORK: { @@ -56,6 +57,7 @@ export const WS2PConstants = { }, BAN_DURATION_IN_SECONDS: 120, + SYNC_BAN_DURATION_IN_SECONDS: 240, BAN_ON_REPEAT_THRESHOLD: 5, ERROR_RECALL_DURATION_IN_SECONDS: 60, SINGLE_RECORD_PROTECTION_IN_SECONDS: 60, @@ -93,5 +95,8 @@ export const WS2PConstants = { INITIAL_CONNECTION_PEERS_BUNDLE_SIZE: 5, - HEADS_SPREAD_TIMEOUT: 100 // Wait 100ms before sending a bunch of signed heads + HEADS_SPREAD_TIMEOUT: 100, // Wait 100ms before sending a bunch of signed heads + + WS2P_SYNC_LIMIT: 15, // Number of concurrent peers for sync + SYNC_CONNECTION_DURATION_IN_SECONDS: 120, // Duration of the SYNC connection } \ No newline at end of file diff --git a/app/modules/ws2p/lib/impl/WS2PMessageHandler.ts b/app/modules/ws2p/lib/impl/WS2PMessageHandler.ts index 56c7870d2c3a27502352ab475553ded10cc2bbf2..c07e07b5e0c27e14fea34662720afc408e9181ba 100644 --- a/app/modules/ws2p/lib/impl/WS2PMessageHandler.ts +++ b/app/modules/ws2p/lib/impl/WS2PMessageHandler.ts @@ -13,8 +13,9 @@ import {WS2PResponse} from "./WS2PResponse" import {WS2PConnection} from "../WS2PConnection" + export interface WS2PMessageHandler { handlePushMessage(json:any, c:WS2PConnection): Promise<void> - answerToRequest(json:any): Promise<WS2PResponse> + answerToRequest(json:any, c:WS2PConnection): Promise<WS2PResponse> } \ No newline at end of file diff --git a/app/modules/ws2p/lib/impl/WS2PReqMapperByServer.ts b/app/modules/ws2p/lib/impl/WS2PReqMapperByServer.ts index ae805fdc2d5a5abe34c3027940db285ea03d9504..ce683320513e017e84cd456111c437222376e9c0 100644 --- a/app/modules/ws2p/lib/impl/WS2PReqMapperByServer.ts +++ b/app/modules/ws2p/lib/impl/WS2PReqMapperByServer.ts @@ -11,11 +11,12 @@ // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. -import {IdentityForRequirements} from './../../../../service/BlockchainService'; +import {IdentityForRequirements} from '../../../../service/BlockchainService'; import {Server} from "../../../../../server" import {WS2PReqMapper} from "../interface/WS2PReqMapper" import {BlockDTO} from "../../../../lib/dto/BlockDTO" import {DBBlock} from "../../../../lib/db/DBBlock" +import {PeerDTO} from "../../../../lib/dto/PeerDTO" export class WS2PReqMapperByServer implements WS2PReqMapper { @@ -68,4 +69,12 @@ export class WS2PReqMapperByServer implements WS2PReqMapper { identities: all } } + + async getPeer(): Promise<PeerDTO> { + return this.server.PeeringService.peer() + } + + async getKnownPeers(): Promise<PeerDTO[]> { + return (await this.server.dal.findAllPeersBut([])).map(p => PeerDTO.fromDBPeer(p)) + } } \ No newline at end of file diff --git a/app/modules/ws2p/lib/interface/WS2PReqMapper.ts b/app/modules/ws2p/lib/interface/WS2PReqMapper.ts index b2575e4a26c5beb51d43b1fbfe67137b37d69822..83b37d29b6bc8f49fe10bd0ebd649309297860b1 100644 --- a/app/modules/ws2p/lib/interface/WS2PReqMapper.ts +++ b/app/modules/ws2p/lib/interface/WS2PReqMapper.ts @@ -13,6 +13,7 @@ import {BlockDTO} from "../../../../lib/dto/BlockDTO" import {DBBlock} from "../../../../lib/db/DBBlock" +import {PeerDTO} from "../../../../lib/dto/PeerDTO" export interface WS2PReqMapper { @@ -20,4 +21,6 @@ export interface WS2PReqMapper { getBlock(number:number): Promise<BlockDTO> getBlocks(count:number, fromNumber:number): Promise<BlockDTO[]> getRequirementsOfPending(minCert:number): Promise<any> + getPeer(): Promise<PeerDTO> + getKnownPeers(): Promise<PeerDTO[]> } \ No newline at end of file diff --git a/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts b/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts index 19a0a85deaa08d9af5cddff4335ebd3c739da7f5..27f43f308fbd25961d51defd74f0153a2c9f10de 100644 --- a/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts +++ b/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts @@ -27,6 +27,7 @@ import {WS2PCluster} from "../WS2PCluster" import {WS2PConnection} from "../WS2PConnection" import {WS2PConstants} from "../constants" import {CommonConstants} from "../../../../lib/common-libs/constants" +import {DataErrors} from "../../../../lib/common-libs/errors" export enum WS2P_REQERROR { UNKNOWN_REQUEST @@ -49,6 +50,13 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler { } async handlePushMessage(json: any, c:WS2PConnection): Promise<void> { + + if (c.isSync) { + // Push messages are forbidden on sync connection + c.close() + return + } + let documentHash = '' try { if (json.body) { @@ -144,7 +152,7 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler { } } - async answerToRequest(data: any): Promise<WS2PResponse> { + async answerToRequest(data: any, c:WS2PConnection): Promise<WS2PResponse> { /********** * REQUEST @@ -152,11 +160,20 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler { let body:any = {} + const forbiddenRequestsForSync: string[] = [] // For now, no WS2P requests are forbidden + if (c.isSync && (!data || !data.name || forbiddenRequestsForSync.indexOf(data.name) !== -1)) { + // Some messages are forbidden on sync connection + c.close() + throw Error(DataErrors[DataErrors.WS2P_SYNC_PERIMETER_IS_LIMITED]) + } + if (data && data.name) { switch (data.name) { + case WS2P_REQ[WS2P_REQ.CURRENT]: body = await this.mapper.getCurrent() break; + case WS2P_REQ[WS2P_REQ.BLOCK_BY_NUMBER]: if (isNaN(data.params.number)) { throw "Wrong param `number`" @@ -164,6 +181,7 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler { const number:number = data.params.number body = await this.mapper.getBlock(number) break; + case WS2P_REQ[WS2P_REQ.BLOCKS_CHUNK]: if (isNaN(data.params.count)) { throw "Wrong param `count`" @@ -175,6 +193,7 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler { const fromNumber:number = data.params.fromNumber body = await this.mapper.getBlocks(count, fromNumber) break; + case WS2P_REQ[WS2P_REQ.WOT_REQUIREMENTS_OF_PENDING]: if (isNaN(data.params.minCert)) { throw "Wrong param `minCert`" @@ -182,6 +201,15 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler { const minCert:number = data.params.minCert body = await this.mapper.getRequirementsOfPending(minCert) break; + + case WS2P_REQ[WS2P_REQ.PEER_DOCUMENT]: + body = await this.mapper.getPeer() + break; + + case WS2P_REQ[WS2P_REQ.KNOWN_PEERS]: + body = await this.mapper.getKnownPeers() + break; + default: throw Error(WS2P_REQERROR[WS2P_REQERROR.UNKNOWN_REQUEST]) } diff --git a/test/integration/tools/TestUser.ts b/test/integration/tools/TestUser.ts index ddd6e515a24eb91ac9807fe9c4584c7abbecc9d4..74941a78a7675e26ad6aee2bc43b1e42674cce31 100644 --- a/test/integration/tools/TestUser.ts +++ b/test/integration/tools/TestUser.ts @@ -11,7 +11,7 @@ // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. -import {KeyGen} from "../../../app/lib/common-libs/crypto/keyring" +import {Key, KeyGen} from "../../../app/lib/common-libs/crypto/keyring" import {IdentityDTO} from "../../../app/lib/dto/IdentityDTO"; import {TestingServer} from "./toolbox" import {CommonConstants} from "../../../app/lib/common-libs/constants" @@ -54,6 +54,10 @@ export class TestUser { } } + get keypair(): Key { + return new Key(this.pub, this.sec) + } + private init(done:()=>void) { if (this.options.pub && this.options.sec) { this.pub = this.options.pub diff --git a/test/integration/tools/test-framework.ts b/test/integration/tools/test-framework.ts index 0f691805f034bb3e849c865330862ed081e9b665..52ced0319e67cefb91e669420afdfd896603b502 100644 --- a/test/integration/tools/test-framework.ts +++ b/test/integration/tools/test-framework.ts @@ -26,6 +26,18 @@ export function writeBasicTestWith2Users(writeTests: (test: (testTitle: string, }) } +export async function createCurrencyWith2Blocks(s: TestingServer, cat: TestUser, tac: TestUser) { + await cat.createIdentity() + await tac.createIdentity() + await cat.cert(tac) + await tac.cert(cat) + await cat.join() + await tac.join() + await s.commit() + await s.commit() + await s.commit() +} + export function assertEqual(value: number, expected: number) { assert.equal(value, expected) } @@ -34,6 +46,10 @@ export function assertTrue(expected: boolean) { assert.equal(true, expected) } +export function assertNotNull(value: any) { + assert.notEqual(value, null) +} + export function assertFalse(expected: boolean) { assert.equal(false, expected) } \ No newline at end of file diff --git a/test/integration/tools/toolbox.ts b/test/integration/tools/toolbox.ts index 29090c972a26ca4d3d1e9cd88f31160f53337ce2..ad761b1c3303e0e7eaf251748c31ee562fab91c6 100644 --- a/test/integration/tools/toolbox.ts +++ b/test/integration/tools/toolbox.ts @@ -29,7 +29,12 @@ import {FileDAL} from "../../../app/lib/dal/fileDAL" import {MembershipDTO} from "../../../app/lib/dto/MembershipDTO" import {TransactionDTO} from "../../../app/lib/dto/TransactionDTO" import {Key} from "../../../app/lib/common-libs/crypto/keyring" -import {WS2PConnection, WS2PPubkeyLocalAuth, WS2PPubkeyRemoteAuth} from "../../../app/modules/ws2p/lib/WS2PConnection" +import { + WS2PConnection, + WS2PPubkeyLocalAuth, + WS2PPubkeyRemoteAuth, + WS2PPubkeySyncLocalAuth +} from "../../../app/modules/ws2p/lib/WS2PConnection" import {WS2PResponse} from "../../../app/modules/ws2p/lib/impl/WS2PResponse" import {WS2PMessageHandler} from "../../../app/modules/ws2p/lib/impl/WS2PMessageHandler" import {WS2PCluster} from "../../../app/modules/ws2p/lib/WS2PCluster" @@ -51,6 +56,7 @@ import {sync} from "./test-sync" import {expectAnswer, expectError, expectJSON} from "./http-expect" import {WebSocketServer} from "../../../app/lib/common-libs/websocket" import {CommonConstants} from "../../../app/lib/common-libs/constants" +import {WS2PRequester} from "../../../app/modules/ws2p/lib/WS2PRequester" const assert = require('assert'); const rp = require('request-promise'); @@ -657,6 +663,32 @@ export class TestingServer { await farm.shutDownEngine() } } + + async enableWS2P(port: number = PORT++) { + const cluster = WS2PCluster.plugOn(this._server) + await (this._server.ws2pCluster as WS2PCluster).listen(HOST, port) + const doConnection = (pair: Key, ws2pId: string, constructor: new ( + currency:string, + pair:Key, + ws2pId:string, + ) => WS2PPubkeyLocalAuth) => { + const connection = WS2PConnection.newConnectionToAddress(1, + `ws://${HOST}:${port}`, + new WS2PServerMessageHandler(this._server, cluster), + new constructor(this.conf.currency, pair, ws2pId), + new WS2PPubkeyRemoteAuth(this.conf.currency, pair) + ) + return WS2PRequester.fromConnection(connection) + } + return { + connect: (pair: Key, ws2pId: string) => { + return doConnection(pair, ws2pId, WS2PPubkeyLocalAuth) + }, + connectForSync: (pair: Key, ws2pId: string) => { + return doConnection(pair, ws2pId, WS2PPubkeySyncLocalAuth) + } + } + } } export async function newWS2PBidirectionnalConnection(currency:string, k1:Key, k2:Key, serverHandler:WS2PMessageHandler) { diff --git a/test/integration/ws2p/ws2p_connection.ts b/test/integration/ws2p/ws2p_connection.ts index 84defd19e0329e87f6105976ea04209f323d3862..ebdb2add42a793e5f42b5b3fbe656b6d7dbb1128 100644 --- a/test/integration/ws2p/ws2p_connection.ts +++ b/test/integration/ws2p/ws2p_connection.ts @@ -442,7 +442,7 @@ class WS2PNoRemoteAuth implements WS2PRemoteAuth { async sendACK(ws: any): Promise<void> { } - async registerCONNECT(version:number, challenge:string, sig: string, pub: string, ws2pId:string): Promise<boolean> { + async registerCONNECT(type: 'CONNECT'|'SYNC', version:number, challenge:string, sig: string, pub: string, ws2pId:string): Promise<boolean> { return true } @@ -460,6 +460,10 @@ class WS2PNoRemoteAuth implements WS2PRemoteAuth { async authenticationIsDone(): Promise<void> { } + + isSync(): boolean { + return false + } } class WS2PMutedHandler implements WS2PMessageHandler { diff --git a/test/integration/ws2p/ws2p_sync.ts b/test/integration/ws2p/ws2p_sync.ts new file mode 100644 index 0000000000000000000000000000000000000000..a1c812b8ea0efd76fd6fdc6d2040820f5bb0a889 --- /dev/null +++ b/test/integration/ws2p/ws2p_sync.ts @@ -0,0 +1,33 @@ +// Source file from duniter: Crypto-currency software to manage libre currency such as Ğ1 +// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com> +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +import {WS2PConstants} from "../../../app/modules/ws2p/lib/constants" +import {assertEqual, assertNotNull, createCurrencyWith2Blocks, writeBasicTestWith2Users} from "../tools/test-framework" + +describe('WS2P sync', () => writeBasicTestWith2Users((test) => { + + WS2PConstants.CONNEXION_TIMEOUT = 100 + WS2PConstants.REQUEST_TIMEOUT= 100 + + test('should be able to init with 2 blocks', async (s1, cat, tac) => { + await createCurrencyWith2Blocks(s1, cat, tac) + }) + + test('if we disable the changes API', async (s1, cat, tac) => { + const ws2p = await s1.enableWS2P() + const ws = (await ws2p).connectForSync(tac.keypair, '12345678') + const current = await ws.getCurrent() + assertNotNull(current) + assertEqual(2, current.number) + }) +}))