diff --git a/app/modules/ws2p/lib/WS2PCluster.ts b/app/modules/ws2p/lib/WS2PCluster.ts index d607e9c2f5f5998d5140e32e6f7e55ee53784f76..7cd2284be9adaad9b37840ee93f0b3eea6721714 100644 --- a/app/modules/ws2p/lib/WS2PCluster.ts +++ b/app/modules/ws2p/lib/WS2PCluster.ts @@ -100,83 +100,87 @@ export class WS2PCluster { async headsReceived(heads:[{ message:string, sig:string }]) { const added:{ message:string, sig:string }[] = [] await Promise.all(heads.map(async (h:{ message:string, sig:string }) => { - const message = h.message - const sig = h.sig - if (!message) { - throw "EMPTY_MESSAGE_FOR_HEAD" - } - if (message.match(WS2PConstants.HEAD_V0_REGEXP)) { - const [,, pub, blockstamp]:string[] = message.split(':') - const ws2pId = (this.server.conf.ws2p && this.server.conf.ws2p.uuid) || '000000' - const fullId = [pub, ws2pId].join('-') - const sigOK = verify(message, sig, pub) - if (sigOK) { - // Already known? - if (!this.headsCache[fullId] || this.headsCache[fullId].blockstamp !== blockstamp) { - // More recent? - if (!this.headsCache[fullId] || parseInt(this.headsCache[fullId].blockstamp) < parseInt(blockstamp)) { - // Check that issuer is a member and that the block exists - const isAllowed = pub === this.server.conf.pair.pub || this.isConnectedKey(pub) || (await this.isMemberKey(pub)) - if (isAllowed) { - const exists = await this.existsBlock(blockstamp) - if (exists) { - this.headsCache[fullId] = { blockstamp, message, sig } - this.newHeads.push({message, sig}) - added.push({message, sig}) - // Cancel a pending "heads" to be spread - if (this.headsTimeout) { - clearTimeout(this.headsTimeout) - } - // Reprogram it a few moments later - this.headsTimeout = setTimeout(async () => { - const heads = this.newHeads.splice(0, this.newHeads.length) - if (heads.length) { - await this.spreadNewHeads(heads) + try { + const message = h.message + const sig = h.sig + if (!message) { + throw "EMPTY_MESSAGE_FOR_HEAD" + } + if (message.match(WS2PConstants.HEAD_V0_REGEXP)) { + const [,, pub, blockstamp]:string[] = message.split(':') + const ws2pId = (this.server.conf.ws2p && this.server.conf.ws2p.uuid) || '000000' + const fullId = [pub, ws2pId].join('-') + const sigOK = verify(message, sig, pub) + if (sigOK) { + // Already known? + if (!this.headsCache[fullId] || this.headsCache[fullId].blockstamp !== blockstamp) { + // More recent? + if (!this.headsCache[fullId] || parseInt(this.headsCache[fullId].blockstamp) < parseInt(blockstamp)) { + // Check that issuer is a member and that the block exists + const isAllowed = pub === this.server.conf.pair.pub || this.isConnectedKey(pub) || (await this.isMemberKey(pub)) + if (isAllowed) { + const exists = await this.existsBlock(blockstamp) + if (exists) { + this.headsCache[fullId] = { blockstamp, message, sig } + this.newHeads.push({message, sig}) + added.push({message, sig}) + // Cancel a pending "heads" to be spread + if (this.headsTimeout) { + clearTimeout(this.headsTimeout) } - }, WS2PConstants.HEADS_SPREAD_TIMEOUT) + // Reprogram it a few moments later + this.headsTimeout = setTimeout(async () => { + const heads = this.newHeads.splice(0, this.newHeads.length) + if (heads.length) { + await this.spreadNewHeads(heads) + } + }, WS2PConstants.HEADS_SPREAD_TIMEOUT) + } } } } + } else { + throw "HEAD_MESSAGE_WRONGLY_SIGNED" } - } else { - throw "HEAD_MESSAGE_WRONGLY_SIGNED" } - } - else if (message.match(WS2PConstants.HEAD_V1_REGEXP)) { - const [,,, pub, blockstamp, software, ws2pId, softVersion, prefix]:string[] = message.split(':') - const sigOK = verify(message, sig, pub) - const fullId = [pub, ws2pId].join('-') - if (sigOK) { - // Already known? - if (!this.headsCache[fullId] || this.headsCache[fullId].blockstamp !== blockstamp) { - // More recent? - if (!this.headsCache[fullId] || parseInt(this.headsCache[fullId].blockstamp) < parseInt(blockstamp)) { - // Check that issuer is a member and that the block exists - const isAllowed = pub === this.server.conf.pair.pub || this.isConnectedKey(pub) || (await this.isMemberKey(pub)) - if (isAllowed) { - const exists = await this.existsBlock(blockstamp) - if (exists) { - this.headsCache[fullId] = { blockstamp, message, sig } - this.newHeads.push({message, sig}) - added.push({message, sig}) - // Cancel a pending "heads" to be spread - if (this.headsTimeout) { - clearTimeout(this.headsTimeout) - } - // Reprogram it a few moments later - this.headsTimeout = setTimeout(async () => { - const heads = this.newHeads.splice(0, this.newHeads.length) - if (heads.length) { - await this.spreadNewHeads(heads) + else if (message.match(WS2PConstants.HEAD_V1_REGEXP)) { + const [,,, pub, blockstamp, software, ws2pId, softVersion, prefix]:string[] = message.split(':') + const sigOK = verify(message, sig, pub) + const fullId = [pub, ws2pId].join('-') + if (sigOK) { + // Already known? + if (!this.headsCache[fullId] || this.headsCache[fullId].blockstamp !== blockstamp) { + // More recent? + if (!this.headsCache[fullId] || parseInt(this.headsCache[fullId].blockstamp) < parseInt(blockstamp)) { + // Check that issuer is a member and that the block exists + const isAllowed = pub === this.server.conf.pair.pub || this.isConnectedKey(pub) || (await this.isMemberKey(pub)) + if (isAllowed) { + const exists = await this.existsBlock(blockstamp) + if (exists) { + this.headsCache[fullId] = { blockstamp, message, sig } + this.newHeads.push({message, sig}) + added.push({message, sig}) + // Cancel a pending "heads" to be spread + if (this.headsTimeout) { + clearTimeout(this.headsTimeout) } - }, WS2PConstants.HEADS_SPREAD_TIMEOUT) + // Reprogram it a few moments later + this.headsTimeout = setTimeout(async () => { + const heads = this.newHeads.splice(0, this.newHeads.length) + if (heads.length) { + await this.spreadNewHeads(heads) + } + }, WS2PConstants.HEADS_SPREAD_TIMEOUT) + } } } } + } else { + throw "HEAD_MESSAGE_WRONGLY_SIGNED" } - } else { - throw "HEAD_MESSAGE_WRONGLY_SIGNED" } + } catch (e) { + this.server.logger.trace(e) } })) this.server.push({ @@ -339,6 +343,10 @@ export class WS2PCluster { // Trim the eventual extra connections setTimeout(() => this.trimClientConnections(), WS2PConstants.CONNEXION_TIMEOUT) } + } + + listenServerFlow() { + let connectingToNodesByFlow = false // Also listen for network updates, and connect to new nodes this.server.pipe(es.mapSync((data:any) => { @@ -375,6 +383,21 @@ export class WS2PCluster { this.server.logger.warn(e) } } + + // WS2P disconnection + else if (data.ws2p === 'disconnected') { + const nbConnections = this.getAllConnections().length + if (nbConnections < WS2PConstants.CONNECTIONS_LOW_LEVEL && !connectingToNodesByFlow) { + try { + connectingToNodesByFlow = true + await this.connectToWS2Peers() + } catch (e) { + throw e + } finally { + connectingToNodesByFlow = false + } + } + } })() return data @@ -594,6 +617,7 @@ export class WS2PCluster { // Pull blocks right on start const init = async () => { try { + await this.listenServerFlow() await this.connectToWS2Peers() await this.pullBlocks() } catch (e) { diff --git a/app/modules/ws2p/lib/constants.ts b/app/modules/ws2p/lib/constants.ts index f196b27d36e5dc33e40f7290c41653132fb6c8af..5b887d5a2882d9a3b92d43efa56988d9a85118c9 100644 --- a/app/modules/ws2p/lib/constants.ts +++ b/app/modules/ws2p/lib/constants.ts @@ -15,6 +15,7 @@ export const WS2PConstants = { MAX_LEVEL_1_PEERS: 10, MAX_LEVEL_2_PEERS: 10, + CONNECTIONS_LOW_LEVEL: 3, BAN_DURATION_IN_SECONDS: 120, ERROR_RECALL_DURATION_IN_SECONDS: 60, diff --git a/test/integration/ws2p_heads.ts b/test/integration/ws2p_heads.ts index 748bc9ed412c27229c521eba209df9b352c4e5d9..7f0e78e50fa47b3e4fa08c96f9eac99718480356 100644 --- a/test/integration/ws2p_heads.ts +++ b/test/integration/ws2p_heads.ts @@ -82,10 +82,12 @@ describe("WS2P heads propagation", function() { }) }) - it('should be able to connect to s3 if the we increase size limit', async () => { + it('should be able to receive HEADs', async () => { b3 = s1.commit({ time: now }) - await s2.waitToHaveBlock(3) - await s2.waitForHeads(1) + await Promise.all([ + s2.waitToHaveBlock(3), + s2.waitForHeads(1) + ]) await s1.expect('/network/ws2p/info', (res:any) => { assert.equal(res.peers.level1, 0) assert.equal(res.peers.level2, 1)