Mise à jour effectuée, merci de nous signaler tout dysfonctionnement ! | Upgrade done, please let us know about any dysfunction!

Commit 414c5ece authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[fix] #1135 Never allow to loose all the WS2P connections

parent 68087678
......@@ -100,83 +100,87 @@ export class WS2PCluster {
async headsReceived(heads:[{ message:string, sig:string }]) {
const added:{ message:string, sig:string }[] = []
await Promise.all(heads.map(async (h:{ message:string, sig:string }) => {
const message = h.message
const sig = h.sig
if (!message) {
throw "EMPTY_MESSAGE_FOR_HEAD"
}
if (message.match(WS2PConstants.HEAD_V0_REGEXP)) {
const [,, pub, blockstamp]:string[] = message.split(':')
const ws2pId = (this.server.conf.ws2p && this.server.conf.ws2p.uuid) || '000000'
const fullId = [pub, ws2pId].join('-')
const sigOK = verify(message, sig, pub)
if (sigOK) {
// Already known?
if (!this.headsCache[fullId] || this.headsCache[fullId].blockstamp !== blockstamp) {
// More recent?
if (!this.headsCache[fullId] || parseInt(this.headsCache[fullId].blockstamp) < parseInt(blockstamp)) {
// Check that issuer is a member and that the block exists
const isAllowed = pub === this.server.conf.pair.pub || this.isConnectedKey(pub) || (await this.isMemberKey(pub))
if (isAllowed) {
const exists = await this.existsBlock(blockstamp)
if (exists) {
this.headsCache[fullId] = { blockstamp, message, sig }
this.newHeads.push({message, sig})
added.push({message, sig})
// Cancel a pending "heads" to be spread
if (this.headsTimeout) {
clearTimeout(this.headsTimeout)
}
// Reprogram it a few moments later
this.headsTimeout = setTimeout(async () => {
const heads = this.newHeads.splice(0, this.newHeads.length)
if (heads.length) {
await this.spreadNewHeads(heads)
try {
const message = h.message
const sig = h.sig
if (!message) {
throw "EMPTY_MESSAGE_FOR_HEAD"
}
if (message.match(WS2PConstants.HEAD_V0_REGEXP)) {
const [,, pub, blockstamp]:string[] = message.split(':')
const ws2pId = (this.server.conf.ws2p && this.server.conf.ws2p.uuid) || '000000'
const fullId = [pub, ws2pId].join('-')
const sigOK = verify(message, sig, pub)
if (sigOK) {
// Already known?
if (!this.headsCache[fullId] || this.headsCache[fullId].blockstamp !== blockstamp) {
// More recent?
if (!this.headsCache[fullId] || parseInt(this.headsCache[fullId].blockstamp) < parseInt(blockstamp)) {
// Check that issuer is a member and that the block exists
const isAllowed = pub === this.server.conf.pair.pub || this.isConnectedKey(pub) || (await this.isMemberKey(pub))
if (isAllowed) {
const exists = await this.existsBlock(blockstamp)
if (exists) {
this.headsCache[fullId] = { blockstamp, message, sig }
this.newHeads.push({message, sig})
added.push({message, sig})
// Cancel a pending "heads" to be spread
if (this.headsTimeout) {
clearTimeout(this.headsTimeout)
}
}, WS2PConstants.HEADS_SPREAD_TIMEOUT)
// Reprogram it a few moments later
this.headsTimeout = setTimeout(async () => {
const heads = this.newHeads.splice(0, this.newHeads.length)
if (heads.length) {
await this.spreadNewHeads(heads)
}
}, WS2PConstants.HEADS_SPREAD_TIMEOUT)
}
}
}
}
} else {
throw "HEAD_MESSAGE_WRONGLY_SIGNED"
}
} else {
throw "HEAD_MESSAGE_WRONGLY_SIGNED"
}
}
else if (message.match(WS2PConstants.HEAD_V1_REGEXP)) {
const [,,, pub, blockstamp, software, ws2pId, softVersion, prefix]:string[] = message.split(':')
const sigOK = verify(message, sig, pub)
const fullId = [pub, ws2pId].join('-')
if (sigOK) {
// Already known?
if (!this.headsCache[fullId] || this.headsCache[fullId].blockstamp !== blockstamp) {
// More recent?
if (!this.headsCache[fullId] || parseInt(this.headsCache[fullId].blockstamp) < parseInt(blockstamp)) {
// Check that issuer is a member and that the block exists
const isAllowed = pub === this.server.conf.pair.pub || this.isConnectedKey(pub) || (await this.isMemberKey(pub))
if (isAllowed) {
const exists = await this.existsBlock(blockstamp)
if (exists) {
this.headsCache[fullId] = { blockstamp, message, sig }
this.newHeads.push({message, sig})
added.push({message, sig})
// Cancel a pending "heads" to be spread
if (this.headsTimeout) {
clearTimeout(this.headsTimeout)
}
// Reprogram it a few moments later
this.headsTimeout = setTimeout(async () => {
const heads = this.newHeads.splice(0, this.newHeads.length)
if (heads.length) {
await this.spreadNewHeads(heads)
else if (message.match(WS2PConstants.HEAD_V1_REGEXP)) {
const [,,, pub, blockstamp, software, ws2pId, softVersion, prefix]:string[] = message.split(':')
const sigOK = verify(message, sig, pub)
const fullId = [pub, ws2pId].join('-')
if (sigOK) {
// Already known?
if (!this.headsCache[fullId] || this.headsCache[fullId].blockstamp !== blockstamp) {
// More recent?
if (!this.headsCache[fullId] || parseInt(this.headsCache[fullId].blockstamp) < parseInt(blockstamp)) {
// Check that issuer is a member and that the block exists
const isAllowed = pub === this.server.conf.pair.pub || this.isConnectedKey(pub) || (await this.isMemberKey(pub))
if (isAllowed) {
const exists = await this.existsBlock(blockstamp)
if (exists) {
this.headsCache[fullId] = { blockstamp, message, sig }
this.newHeads.push({message, sig})
added.push({message, sig})
// Cancel a pending "heads" to be spread
if (this.headsTimeout) {
clearTimeout(this.headsTimeout)
}
}, WS2PConstants.HEADS_SPREAD_TIMEOUT)
// Reprogram it a few moments later
this.headsTimeout = setTimeout(async () => {
const heads = this.newHeads.splice(0, this.newHeads.length)
if (heads.length) {
await this.spreadNewHeads(heads)
}
}, WS2PConstants.HEADS_SPREAD_TIMEOUT)
}
}
}
}
} else {
throw "HEAD_MESSAGE_WRONGLY_SIGNED"
}
} else {
throw "HEAD_MESSAGE_WRONGLY_SIGNED"
}
} catch (e) {
this.server.logger.trace(e)
}
}))
this.server.push({
......@@ -339,6 +343,10 @@ export class WS2PCluster {
// Trim the eventual extra connections
setTimeout(() => this.trimClientConnections(), WS2PConstants.CONNEXION_TIMEOUT)
}
}
listenServerFlow() {
let connectingToNodesByFlow = false
// Also listen for network updates, and connect to new nodes
this.server.pipe(es.mapSync((data:any) => {
......@@ -375,6 +383,21 @@ export class WS2PCluster {
this.server.logger.warn(e)
}
}
// WS2P disconnection
else if (data.ws2p === 'disconnected') {
const nbConnections = this.getAllConnections().length
if (nbConnections < WS2PConstants.CONNECTIONS_LOW_LEVEL && !connectingToNodesByFlow) {
try {
connectingToNodesByFlow = true
await this.connectToWS2Peers()
} catch (e) {
throw e
} finally {
connectingToNodesByFlow = false
}
}
}
})()
return data
......@@ -594,6 +617,7 @@ export class WS2PCluster {
// Pull blocks right on start
const init = async () => {
try {
await this.listenServerFlow()
await this.connectToWS2Peers()
await this.pullBlocks()
} catch (e) {
......
......@@ -15,6 +15,7 @@ export const WS2PConstants = {
MAX_LEVEL_1_PEERS: 10,
MAX_LEVEL_2_PEERS: 10,
CONNECTIONS_LOW_LEVEL: 3,
BAN_DURATION_IN_SECONDS: 120,
ERROR_RECALL_DURATION_IN_SECONDS: 60,
......
......@@ -82,10 +82,12 @@ describe("WS2P heads propagation", function() {
})
})
it('should be able to connect to s3 if the we increase size limit', async () => {
it('should be able to receive HEADs', async () => {
b3 = s1.commit({ time: now })
await s2.waitToHaveBlock(3)
await s2.waitForHeads(1)
await Promise.all([
s2.waitToHaveBlock(3),
s2.waitForHeads(1)
])
await s1.expect('/network/ws2p/info', (res:any) => {
assert.equal(res.peers.level1, 0)
assert.equal(res.peers.level2, 1)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment