Mise à jour de GitLab prévue ce samedi 23 octobre 2021 à partir de 9h00 CET

Commit 49c4011f authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[fix] #1112 Accept multiple connections of local node's key

parent 45fc8e1a
......@@ -95,9 +95,9 @@ export class PeerDTO implements Cloneable {
}
getWS2P() {
let api:any = null;
this.endpoints.forEach((ep) => {
const matches = !api && ep.match(CommonConstants.WS2P_REGEXP)
let api:{ uuid:string, host:string, port:number }|null = null
for (const ep of this.endpoints) {
const matches:any = !api && ep.match(CommonConstants.WS2P_REGEXP)
if (matches) {
api = {
uuid: matches[1],
......@@ -105,8 +105,8 @@ export class PeerDTO implements Cloneable {
port: parseInt(matches[3]) || 0
}
}
})
return api || null
}
return api || null
}
getDns() {
......
......@@ -30,6 +30,7 @@ class PeerSignalEmitter {
// The interval duration
const SIGNAL_INTERVAL = 1000 * this.conf.avgGenTime * constants.NETWORK.STATUS_INTERVAL.UPDATE;
const SIGNAL_INITIAL_DELAY = 1000 * 60
// We eventually clean an existing interval
if (this.INTERVAL)
......@@ -47,8 +48,8 @@ class PeerSignalEmitter {
})
}, SIGNAL_INTERVAL)
// Launches it a first time, immediately
await this.server.PeeringService.generateSelfPeer(this.conf, SIGNAL_INTERVAL)
// Launches it a first time few seconds after startup
setTimeout(() => this.server.PeeringService.generateSelfPeer(this.conf, SIGNAL_INTERVAL - SIGNAL_INITIAL_DELAY), SIGNAL_INITIAL_DELAY)
}
stopService() {
......
......@@ -9,13 +9,13 @@ export class WS2PClient {
private constructor(public connection:WS2PConnection) {}
static async connectTo(server:Server, host:string, port:number, messageHandler:WS2PMessageHandler, expectedPub:string) {
static async connectTo(server:Server, host:string, port:number, messageHandler:WS2PMessageHandler, expectedPub:string, allowKey:(pub:string)=>Promise<boolean> ) {
const k2 = new Key(server.conf.pair.pub, server.conf.pair.sec)
const c = WS2PConnection.newConnectionToAddress(
[host, port].join(':'),
messageHandler,
new WS2PPubkeyLocalAuth(server.conf.currency , k2),
new WS2PPubkeyRemoteAuth(server.conf.currency, k2),
new WS2PPubkeyLocalAuth(server.conf.currency , k2, allowKey),
new WS2PPubkeyRemoteAuth(server.conf.currency, k2, allowKey),
{
connectionTimeout: WS2PConstants.REQUEST_TIMEOUT,
requestTimeout: WS2PConstants.REQUEST_TIMEOUT
......
......@@ -207,11 +207,14 @@ export class WS2PCluster {
return this.ws2pServer ? this.ws2pServer.getConnexions().length : 0
}
async connect(host: string, port: number, messageHandler:WS2PMessageHandler, expectedPub:string): Promise<WS2PConnection> {
async connect(host: string, port: number, messageHandler:WS2PMessageHandler, expectedPub:string, ws2pEndpointUUID:string = ""): Promise<WS2PConnection> {
const uuid = nuuid.v4()
let pub = "--------"
try {
const ws2pc = await WS2PClient.connectTo(this.server, host, port, messageHandler, expectedPub)
const ws2pc = await WS2PClient.connectTo(this.server, host, port, messageHandler, expectedPub, (pub:string) => {
const connectedPubkeys = this.getConnectedPubkeys()
return this.acceptPubkey(expectedPub, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes || []), ws2pEndpointUUID)
})
this.ws2pClients[uuid] = ws2pc
pub = ws2pc.connection.pubkey
ws2pc.connection.closed.then(() => {
......@@ -231,6 +234,7 @@ export class WS2PCluster {
ws2p: 'connected',
to: { host, port, pubkey: pub }
})
await this.server.dal.setPeerUP(pub)
return ws2pc.connection
} catch (e) {
this.server.logger.info('WS2P: Could not connect to peer %s using `WS2P %s %s: %s`', pub.slice(0, 8), host, port, (e && e.message || e))
......@@ -241,7 +245,9 @@ export class WS2PCluster {
async connectToWS2Peers() {
const potentials = await this.server.dal.getWS2Peers()
const peers:PeerDTO[] = potentials.map((p:any) => PeerDTO.fromJSONObject(p))
const prefered = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) || []
const prefered = ((this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) || []).slice() // Copy
// Our key is also a prefered one, so we connect to our siblings
prefered.push(this.server.conf.pair.pub)
peers.sort((a, b) => {
const aIsPrefered = prefered.indexOf(a.pubkey) !== -1
const bIsPrefered = prefered.indexOf(b.pubkey) !== -1
......@@ -257,9 +263,9 @@ export class WS2PCluster {
while (i < peers.length && this.clientsCount() < this.maxLevel1Size) {
const p = peers[i]
const api = p.getWS2P()
if (p.pubkey !== this.server.conf.pair.pub) {
if (api) {
try {
await this.connect(api.host, api.port, this.messageHandler, p.pubkey)
await this.connect(api.host, api.port, this.messageHandler, p.pubkey, api.uuid)
} catch (e) {
this.server.logger.debug('WS2P: init: failed connection')
}
......@@ -277,10 +283,10 @@ export class WS2PCluster {
if (data.endpoints) {
const peer = PeerDTO.fromJSONObject(data)
const ws2pEnpoint = peer.getWS2P()
if (ws2pEnpoint && peer.pubkey !== this.server.conf.pair.pub) {
if (ws2pEnpoint) {
// Check if already connected to the pubkey (in any way: server or client)
const connectedPubkeys = this.getConnectedPubkeys()
const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes || []))
const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes || []), ws2pEnpoint.uuid)
if (shouldAccept) {
await this.connect(ws2pEnpoint.host, ws2pEnpoint.port, this.messageHandler, peer.pubkey)
await this.trimClientConnections()
......@@ -367,8 +373,10 @@ export class WS2PCluster {
uuids = _.shuffle(uuids)
for (const uuid of uuids) {
const client = this.ws2pClients[uuid]
const isMember = await this.server.dal.isMember(client.connection.pubkey)
if (!isMember && !disconnectedOne) {
const pub = client.connection.pubkey
const isNotOurself = pub !== this.server.conf.pair.pub
const isMember = await this.server.dal.isMember(pub)
if (isNotOurself && !isMember && !disconnectedOne) {
client.connection.close()
await client.connection.closed
disconnectedOne = true
......@@ -386,7 +394,9 @@ export class WS2PCluster {
uuids = _.shuffle(uuids)
for (const uuid of uuids) {
const client = this.ws2pClients[uuid]
if (!disconnectedOne && this.getPreferedNodes().indexOf(client.connection.pubkey) === -1) {
const pub = client.connection.pubkey
const isNotOurself = pub !== this.server.conf.pair.pub
if (isNotOurself && !disconnectedOne && this.getPreferedNodes().indexOf(pub) === -1) {
client.connection.close()
disconnectedOne = true
await client.connection.closed
......@@ -425,12 +435,17 @@ export class WS2PCluster {
connectedPubkeys:string[],
getConcurrentConnexionsCount:()=>number,
maxConcurrentConnexionsSize:number,
priorityKeys:string[]
priorityKeys:string[],
targetWS2PUID = ""
) {
let accept = priorityKeys.indexOf(pub) !== -1
if (!accept && connectedPubkeys.indexOf(pub) === -1) {
// Do we have room?
if (getConcurrentConnexionsCount() < maxConcurrentConnexionsSize) {
if (this.isThisNode(pub, targetWS2PUID)) {
// We do not connect to local host
return false
}
else if (getConcurrentConnexionsCount() < maxConcurrentConnexionsSize) {
// Yes: just connect to it
accept = true
}
......@@ -458,10 +473,25 @@ export class WS2PCluster {
}
}
}
} else {
// The pubkey is already connected: we accept only self nodes, and they have a supreme priority (these are siblings)
if (targetWS2PUID) {
if (this.isSiblingNode(pub, targetWS2PUID)) {
accept = true
}
}
}
return accept
}
isThisNode(pub:string, uuid:string) {
return !!(this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid === uuid)
}
isSiblingNode(pub:string, uuid:string) {
return !!(this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid !== uuid)
}
async getLevel1Connections() {
const all:WS2PConnection[] = []
for (const uuid of Object.keys(this.ws2pClients)) {
......
......@@ -106,6 +106,9 @@ export class WS2PServer extends events.EventEmitter {
})
await this.trimConnections()
await this.server.dal.setPeerUP(c.pubkey)
} catch (e) {
this.server.logger.warn('WS2P: cannot connect to incoming WebSocket connection: %s', e)
}
......@@ -126,15 +129,6 @@ export class WS2PServer extends events.EventEmitter {
}
}
}
// Disconnect eventual self
if (this.connections.length > this.maxLevel2Size) {
for (const c of this.connections) {
if (c.pubkey === this.server.conf.pair.pub) {
c.close()
this.removeConnection(c)
}
}
}
// Disconnect members
while (this.connections.length > this.maxLevel2Size) {
for (const c of this.connections) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment