diff --git a/app/modules/ws2p/lib/WS2PClient.ts b/app/modules/ws2p/lib/WS2PClient.ts index 830b35722ca7079d861ee8a0dbf1b62bf0c202e7..af885353d0a0afe5f573cd5322dcced2b2f1a638 100644 --- a/app/modules/ws2p/lib/WS2PClient.ts +++ b/app/modules/ws2p/lib/WS2PClient.ts @@ -17,7 +17,7 @@ export class WS2PClient { const k2 = new Key(server.conf.pair.pub, server.conf.pair.sec) const myWs2pId = (server.conf.ws2p && server.conf.ws2p.uuid) ? server.conf.ws2p.uuid:"" const c = WS2PConnection.newConnectionToAddress( - Math.min(endpointVersion, WS2PConstants.WS2P_VERSION), + Math.min(endpointVersion, WS2PConstants.WS2P_API_VERSION), fullEndpointAddress, messageHandler, new WS2PPubkeyLocalAuth(server.conf.currency , k2, myWs2pId, allowKey), diff --git a/app/modules/ws2p/lib/WS2PCluster.ts b/app/modules/ws2p/lib/WS2PCluster.ts index 2498faffe626fc0dc28d7c9442fcd36cd43912ed..ff238cbfb131f6f9eacf7e44cf6b9388aed34f0f 100644 --- a/app/modules/ws2p/lib/WS2PCluster.ts +++ b/app/modules/ws2p/lib/WS2PCluster.ts @@ -26,6 +26,8 @@ const _ = require('underscore') export interface WS2PHead { message:string sig:string + messageV2?:string + sigV2?:string step?:number } @@ -99,13 +101,14 @@ export class WS2PCluster { const heads:WS2PHead[] = [] const ws2pId = (this.server.conf.ws2p && this.server.conf.ws2p.uuid) || '000000' const localPub = this.server.conf.pair.pub - const fullId = [localPub, ws2pId].join('-') - if (!this.headsCache[fullId]) { + const myFullId = [localPub, ws2pId].join('-') + if (!this.headsCache[myFullId]) { const current = await this.server.dal.getCurrentBlockOrNull() if (current) { - const { sig, message } = this.sayHeadChangedTo(current.number, current.hash) + const myHead = this.sayHeadChangedTo(current.number, current.hash) const blockstamp = [current.number, current.hash].join('-') - this.headsCache[fullId] = { blockstamp, message, sig } + this.headsCache[myFullId] = { blockstamp, message: myHead.message,sig: myHead.sig, messageV2: myHead.message, sigV2: myHead.sig } + } } for (const ws2pFullId of Object.keys(this.headsCache)) { @@ -121,94 +124,85 @@ export class WS2PCluster { const added:WS2PHead[] = [] await Promise.all(heads.map(async (h:WS2PHead) => { try { - const step = (h.step !== undefined) ? h.step:undefined + const step = h.step const message = h.message const sig = h.sig - if (!message) { - throw "EMPTY_MESSAGE_FOR_HEAD" - } - if (message.match(WS2PConstants.HEAD_V0_REGEXP)) { - const [,, pub, blockstamp]:string[] = message.split(':') - const ws2pId = (this.server.conf.ws2p && this.server.conf.ws2p.uuid) || '000000' - const fullId = [pub, ws2pId].join('-') - const sigOK = verify(message, sig, pub) - if (sigOK) { - // Already known? - if (!this.headsCache[fullId] || this.headsCache[fullId].blockstamp !== blockstamp) { - // More recent? - if (!this.headsCache[fullId] || parseInt(this.headsCache[fullId].blockstamp) < parseInt(blockstamp)) { - // Check that issuer is a member and that the block exists - const isAllowed = pub === this.server.conf.pair.pub || this.isConnectedKey(pub) || (await this.isMemberKey(pub)) - if (isAllowed) { - const exists = await this.existsBlock(blockstamp) - if (exists) { - this.headsCache[fullId] = { blockstamp, message, sig } - this.newHeads.push({message, sig}) - added.push({message, sig}) - // Cancel a pending "heads" to be spread - if (this.headsTimeout) { - clearTimeout(this.headsTimeout) - } - // Reprogram it a few moments later - this.headsTimeout = setTimeout(async () => { - const heads = this.newHeads.splice(0, this.newHeads.length) - if (heads.length) { - await this.spreadNewHeads(heads) - } - }, WS2PConstants.HEADS_SPREAD_TIMEOUT) - } - } - } - } - } else { + const messageV2 = h.messageV2 + const sigV2 = h.sigV2 + let sigOK = false + let fullId = '' + let pubkey = '' + let blockstamp = '' + if (messageV2) { + if (!sigV2) { throw "HEAD_MESSAGE_WRONGLY_SIGNED" } - } - else if (message.match(WS2PConstants.HEAD_V1_REGEXP)) { - const [,,, pub, blockstamp, ws2pId, software, softVersion, prefix]:string[] = message.split(':') - const sigOK = verify(message, sig, pub) + const [,,, pub, blockstamp, ws2pId,,,,,]:string[] = messageV2.split(':') const fullId = [pub, ws2pId].join('-') - if (sigOK) { - // Already known? - if (!this.headsCache[fullId] || this.headsCache[fullId].blockstamp !== blockstamp) { - // More recent? - if (!this.headsCache[fullId] || parseInt(this.headsCache[fullId].blockstamp) < parseInt(blockstamp)) { - // Check that issuer is a member and that the block exists - const isAllowed = pub === this.server.conf.pair.pub || this.isConnectedKey(pub) || (await this.isMemberKey(pub)) - if (isAllowed) { - const exists = await this.existsBlock(blockstamp) - if (exists) { - this.headsCache[fullId] = { blockstamp, message, sig } - this.newHeads.push({message, sig}) - added.push({message, sig}) - // Cancel a pending "heads" to be spread - if (this.headsTimeout) { - clearTimeout(this.headsTimeout) - } - // Reprogram it a few moments later - this.headsTimeout = setTimeout(async () => { - const heads = this.newHeads.splice(0, this.newHeads.length) - if (heads.length) { - await this.spreadNewHeads(heads) - } - }, WS2PConstants.HEADS_SPREAD_TIMEOUT) - } - } - } - } - } else { - throw "HEAD_MESSAGE_WRONGLY_SIGNED" + this.headReceived(messageV2, sigV2, pub, fullId, blockstamp) + } + if (!message) { + throw "EMPTY_MESSAGE_FOR_HEAD" + } + if (message.match(WS2PConstants.HEAD_V0_REGEXP)) { + const [,, pub, blockstamp]:string[] = message.split(':') + const ws2pId = (this.server.conf.ws2p && this.server.conf.ws2p.uuid) || '000000' + const fullId = [pub, ws2pId].join('-') + this.headReceived(message, sig, pub, fullId, blockstamp) } + else if (message.match(WS2PConstants.HEAD_V1_REGEXP)) { + const [,,, pub, blockstamp, ws2pId, software, softVersion, prefix]:string[] = message.split(':') + const sigOK = verify(message, sig, pub) + const fullId = [pub, ws2pId].join('-') + await this.headReceived(message, sig, pub, fullId, blockstamp) + } + } catch (e) { + this.server.logger.trace(e) } - } catch (e) { - this.server.logger.trace(e) - } })) this.server.push({ ws2p: 'heads', - added + added: this.newHeads }) - return added + this.newHeads = [] + } + + private async headReceived(message:string, sig:string, pub:string, fullId:string, blockstamp:string) { + try { + const sigOK = verify(message, sig, pub) + if (sigOK) { + // Already known? + if (!this.headsCache[fullId] || this.headsCache[fullId].blockstamp !== blockstamp) { + // More recent? + if (!this.headsCache[fullId] || parseInt(this.headsCache[fullId].blockstamp) < parseInt(blockstamp)) { + // Check that issuer is a member and that the block exists + const isAllowed = pub === this.server.conf.pair.pub || this.isConnectedKey(pub) || (await this.isMemberKey(pub)) + if (isAllowed) { + const exists = await this.existsBlock(blockstamp) + if (exists) { + this.headsCache[fullId] = { blockstamp, message, sig } + this.newHeads.push({message, sig}) + // Cancel a pending "heads" to be spread + if (this.headsTimeout) { + clearTimeout(this.headsTimeout) + } + // Reprogram it a few moments later + this.headsTimeout = setTimeout(async () => { + const heads = this.newHeads.splice(0, this.newHeads.length) + if (heads.length) { + await this.spreadNewHeads(heads) + } + }, WS2PConstants.HEADS_SPREAD_TIMEOUT) + } + } + } + } + } else { + throw "HEAD_MESSAGE_WRONGLY_SIGNED" + } + } catch (e) { + this.server.logger.trace(e) + } } private async isMemberKey(pub:string) { @@ -366,6 +360,7 @@ export class WS2PCluster { const prefered = ((this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) || []).slice() // Copy // Our key is also a prefered one, so we connect to our siblings const canReachTorEndpoint = ProxiesConf.canReachTorEndpoint(this.server.conf.proxiesConf) + const canReachClearEndpoint = ProxiesConf.canReachClearEndpoint(this.server.conf.proxiesConf) peers.sort((a, b) => { // Top priority at our own nodes if (a.pubkey === this.server.conf.pair.pub && b.pubkey !== this.server.conf.pair.pub) { @@ -376,6 +371,8 @@ export class WS2PCluster { const aIsPrefered = prefered.indexOf(a.pubkey) !== -1 const bIsPrefered = prefered.indexOf(b.pubkey) !== -1 + const aNumberOfFreeRooms = this.numberOfFreeRooms(a, canReachTorEndpoint, canReachClearEndpoint) + const bNumberOfFreeRooms = this.numberOfFreeRooms(b, canReachTorEndpoint, canReachClearEndpoint) if (canReachTorEndpoint) { const aAtWs2pTorEnpoint = a.endpoints.filter(function (element) { return element.match(CommonConstants.WS2PTOR_REGEXP); }).length > 0 @@ -383,30 +380,36 @@ export class WS2PCluster { if ( (aAtWs2pTorEnpoint && bAtWs2pTorEnpoint) || (!aAtWs2pTorEnpoint && !bAtWs2pTorEnpoint) ) { if ((aIsPrefered && bIsPrefered) || (!aIsPrefered && !bIsPrefered)) { + if (aNumberOfFreeRooms > bNumberOfFreeRooms) { + return -1 + } else if (aNumberOfFreeRooms < bNumberOfFreeRooms) { + return 1 + } return 0 } else if (aIsPrefered) { return -1 - } else { - return 1 } + return 1 } else { if (aAtWs2pTorEnpoint) { return -1 - } else { - return 1 } + return 1 } } else { if ((aIsPrefered && bIsPrefered) || (!aIsPrefered && !bIsPrefered)) { + if (aNumberOfFreeRooms > bNumberOfFreeRooms) { + return -1 + } else if (aNumberOfFreeRooms < bNumberOfFreeRooms) { + return 1 + } return 0 } else if (aIsPrefered) { return -1 - } else { - return 1 } + return 1 } }) - const canReachClearEndpoint = ProxiesConf.canReachClearEndpoint(this.server.conf.proxiesConf) let i = 0 let countPublicNodesWithSameKey:number = 1 // Necessary if maxPrivate = 0 let endpointsNodesWithSameKey:WS2PEndpoint[] = [] @@ -426,7 +429,7 @@ export class WS2PCluster { } } } else { - const api = p.getOnceWS2PEndpoint(canReachTorEndpoint, canReachClearEndpoint) + const api = p.getOnceWS2PEndpoint(canReachTorEndpoint, canReachClearEndpoint) if (api) { try { // We do not connect to local host @@ -442,6 +445,22 @@ export class WS2PCluster { } } + private numberOfFreeRooms(p:PeerDTO, canReachTorEndpoint:boolean, canReachClearEndpoint:boolean) { + const api = p.getOnceWS2PEndpoint(canReachTorEndpoint, canReachClearEndpoint) + if (api) { + for (const ws2pFullId in this.headsCache) { + if (ws2pFullId.slice(0, 8) == api.uuid) { + const messageV2 = this.headsCache[ws2pFullId].messageV2 + if (messageV2 !== undefined) { + const [,,, pub, blockstamp, ws2pId,,,,freeMemberRoom,freeMirorRoom]:string[] = messageV2.split(':') + return (this.server.dal.isMember(this.server.conf.pair.pub)) ? freeMemberRoom:freeMirorRoom + } + } + } + } + return 0 + } + listenServerFlow() { let connectingToNodesByFlow = false @@ -475,9 +494,9 @@ export class WS2PCluster { // HEAD changed else if (data.bcEvent === OtherConstants.BC_EVENT.HEAD_CHANGED || data.bcEvent === OtherConstants.BC_EVENT.SWITCHED) { // Propagate this change to the network - const { sig, message } = this.sayHeadChangedTo(data.block.number, data.block.hash) + const myHead = this.sayHeadChangedTo(data.block.number, data.block.hash) try { - await this.broadcastHead(message, sig) + await this.broadcastHead(myHead) } catch (e) { this.server.logger.warn(e) } @@ -488,33 +507,123 @@ export class WS2PCluster { })) } - private async broadcastHead(message:string, sig:string) { - await this.headsReceived([{ message, sig }]) - return this.spreadNewHeads([{ message, sig }]) + private async broadcastHead(head:WS2PHead) { + await this.headsReceived([head]) + return this.spreadNewHeads([head]) } - private async spreadNewHeads(heads:{ message:string, sig:string }[]) { + private async spreadNewHeads(heads:WS2PHead[]) { + heads = this.incrementHeadsStep(heads) const connexions = this.getAllConnections() return Promise.all(connexions.map(async (c) => { try { - await c.pushHeads(heads) + if (c.version >= 2) { + await c.pushHeadsV2(heads) + } else { + await c.pushHeads(heads) + } } catch (e) { this.server.logger.warn('Could not spread new HEAD info to %s WS2PID %s', c.pubkey, c.uuid) } })) } + private incrementHeadsStep(heads:WS2PHead[]) { + for (let head of heads) { + if (head.step) { + head.step++ + } + } + return heads + } + private sayHeadChangedTo(number:number, hash:string) { - const api = (this.server.conf.ws2p && this.server.conf.ws2p.remotehost && this.server.conf.ws2p.remotehost.match(WS2PConstants.HOST_ONION_REGEX)) ? 'WS2P':'WS2P' + const api = this.getApi() const key = new Key(this.server.conf.pair.pub, this.server.conf.pair.sec) - const pub = key.publicKey const software = 'duniter' const softVersion = Package.getInstance().version const ws2pId = (this.server.conf.ws2p && this.server.conf.ws2p.uuid) || '00000000' const prefix = this.server.conf.prefix || ProverConstants.DEFAULT_PEER_ID - const message = `${api}:HEAD:1:${pub}:${number}-${hash}:${ws2pId}:${software}:${softVersion}:${prefix}` + const { freeMemberRoom , freeMirorRoom } = this.countFreeRooms() + const message = `${api}:HEAD:1:${key.publicKey}:${number}-${hash}:${ws2pId}:${software}:${softVersion}:${prefix}` const sig = key.signSync(message) - return { sig, message, pub } + const messageV2 = `${api}:HEAD:2:${key.publicKey}:${number}-${hash}:${ws2pId}:${software}:${softVersion}:${prefix}:${freeMemberRoom}:${freeMirorRoom}` + const sigV2 = key.signSync(messageV2) + + const myHead:WS2PHead = { + message, + sig, + messageV2, + sigV2, + step: 0 } + + return myHead + } + + private getApi() { + let api = 'WS2P' + let network = '' + let ws2pPrivate = '' + let ws2pPublic = '' + if (this.server.conf.proxiesConf && (this.server.conf.proxiesConf.proxyTorAddress || this.server.conf.proxiesConf.forceTor)) { + network = 'TOR' + } + if (this.server.conf.ws2p) { + if (this.server.conf.ws2p.remotehost) { + if (this.server.conf.ws2p.remotehost.match(WS2PConstants.HOST_ONION_REGEX)) { + network = 'TOR' + } + if (this.server.conf.ws2p.publicAccess) { + ws2pPublic = 'I' + switch (network) { + case 'TOR': ws2pPublic += 'T'; break; + default: ws2pPublic += 'C'; break; + } + } + } + if (this.server.conf.ws2p.privateAccess) { + ws2pPrivate = 'O' + switch (network) { + case 'TOR': ws2pPrivate += 'T'; + if (this.server.conf.proxiesConf && this.server.conf.proxiesConf.reachingClearEp) { + switch (this.server.conf.proxiesConf.reachingClearEp) { + case 'none': ws2pPrivate += 'S'; break; + case 'tor': ws2pPrivate += 'A'; break; + default: ws2pPrivate += 'M'; break; + } + } + break; + default: ws2pPrivate += 'C'; break; + } + } + } + + + api += network + ws2pPrivate + ws2pPublic + return api + } + + private countFreeRooms() { + if (!this.ws2pServer) { + return { + freeMemberRoom: 0, + freeMirorRoom: 0 + } + } + + let freeMirorRoom = this.maxLevel2Peers - this.ws2pServer.countConnexions() + let freeMemberRoom = freeMirorRoom + const privilegedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.privilegedNodes) ? this.server.conf.ws2p.privilegedNodes:[] + for (const c of this.ws2pServer.getConnexions()) { + if (this.keyPriorityLevel(c.pubkey, privilegedNodes) < WS2PConstants.CONNECTIONS_PRIORITY.MEMBER_KEY_LEVEL) { + freeMemberRoom++ + } + } + + return { + freeMemberRoom, + freeMirorRoom + } } async removeLowPriorityConnections(privilegedKeys:string[]) { diff --git a/app/modules/ws2p/lib/WS2PConnection.ts b/app/modules/ws2p/lib/WS2PConnection.ts index 2d57a2abc156f9a00da12a5ac79c69ad105a2d44..72d30aceb47342aa975329879de42ff8e69e1c00 100644 --- a/app/modules/ws2p/lib/WS2PConnection.ts +++ b/app/modules/ws2p/lib/WS2PConnection.ts @@ -6,7 +6,7 @@ import {CertificationDTO} from "../../../lib/dto/CertificationDTO" import {MembershipDTO} from "../../../lib/dto/MembershipDTO" import {TransactionDTO} from "../../../lib/dto/TransactionDTO" import {PeerDTO} from "../../../lib/dto/PeerDTO" -import {WS2PConstants} from "./constants" +import { WS2PConstants } from './constants'; import { ProxiesConf } from '../../../lib/proxy'; const ws = require('ws') const SocksProxyAgent = require('socks-proxy-agent'); @@ -54,6 +54,7 @@ export interface WS2PRemoteAuth extends WS2PAuth { registerOK(sig: string): Promise<boolean> isAuthenticatedByRemote(): boolean getPubkey(): string + getVersion(): number } export interface WS2PLocalAuth extends WS2PAuth { @@ -89,6 +90,10 @@ export class WS2PPubkeyRemoteAuth implements WS2PRemoteAuth { }) } + getVersion() { + return this.remoteVersion + } + getPubkey() { return this.remotePub } @@ -339,6 +344,10 @@ export class WS2PConnection { return new WS2PConnection(WS2PConstants.WS2P_DEFAULT_VERSION, websocket, onWsOpened, onWsClosed, messageHandler, localAuth, remoteAuth, options, expectedPub) } + get version() { + return Math.min(WS2PConstants.WS2P_HEAD_VERSION, this.remoteAuth.getVersion()) + } + get pubkey() { return this.remoteAuth.getPubkey() } @@ -610,6 +619,10 @@ export class WS2PConnection { return this.pushData(WS2P_PUSH.HEAD, 'heads', heads) } + async pushHeadsV2(heads:{ message:string, sig:string, messageV2?:string, sigV2?:string, step?:number }[]) { + return this.pushData(WS2P_PUSH.HEAD, 'heads', heads) + } + async pushData(type:WS2P_PUSH, key:string, data:any) { await this.connect() return new Promise((resolve, reject) => { diff --git a/app/modules/ws2p/lib/constants.ts b/app/modules/ws2p/lib/constants.ts index 25e37e2c1c84c860317f3bf092e49c188a774b48..74d5c586aae320f99315794f9d0ad295aa82dfc6 100644 --- a/app/modules/ws2p/lib/constants.ts +++ b/app/modules/ws2p/lib/constants.ts @@ -2,7 +2,8 @@ import {CommonConstants} from "../../../lib/common-libs/constants" export const WS2PConstants = { WS2P_DEFAULT_VERSION:1, - WS2P_VERSION: 1, + WS2P_API_VERSION: 1, + WS2P_HEAD_VERSION: 2, WS2P_UPNP_TTL: 600, WS2P_PORTS_START: 20900, diff --git a/test/integration/ws2p_connection.ts b/test/integration/ws2p_connection.ts index 73826eb038749e61377df3aa3d0c8e82aaa4df33..e444726b3494123b084e81a6b865c8b9d0547c6e 100644 --- a/test/integration/ws2p_connection.ts +++ b/test/integration/ws2p_connection.ts @@ -415,6 +415,10 @@ class WS2PNoLocalAuth implements WS2PLocalAuth { class WS2PNoRemoteAuth implements WS2PRemoteAuth { + getVersion(): number { + return 1 + } + getPubkey(): string { return "" }