diff --git a/app/modules/ws2p/lib/WS2PCluster.ts b/app/modules/ws2p/lib/WS2PCluster.ts index 34f380d7d04260e44f6e73d0380802c5ae1e8063..129426d3b2a97617ab31773897a1f2a452e9643f 100644 --- a/app/modules/ws2p/lib/WS2PCluster.ts +++ b/app/modules/ws2p/lib/WS2PCluster.ts @@ -35,6 +35,9 @@ export class WS2PCluster { private maxLevel1Size = WS2PConstants.MAX_LEVEL_1_PEERS private messageHandler: WS2PServerMessageHandler + // A cache to remember the banned keys + private banned:{ [k:string]: string } = {} + // A cache to know if a block exists or not in the DB private blockstampsCache:{ [k:string]: number } = {} @@ -83,42 +86,43 @@ export class WS2PCluster { await Promise.all(heads.map(async (h:{ message:string, sig:string }) => { const message = h.message const sig = h.sig - try { - if (message && message.match(WS2PConstants.HEAD_REGEXP)) { - const [,, pub, blockstamp]:string[] = message.split(':') - const sigOK = verify(message, sig, pub) - if (sigOK) { - // Already known? - if (!this.headsCache[pub] || this.headsCache[pub].blockstamp !== blockstamp) { - // More recent? - if (!this.headsCache[pub] || parseInt(this.headsCache[pub].blockstamp) < parseInt(blockstamp)) { - // Check that issuer is a member and that the block exists - const memberKey = await this.isMemberKey(pub) - if (memberKey) { - const exists = await this.existsBlock(blockstamp) - if (exists) { - this.headsCache[pub] = { blockstamp, message, sig } - this.newHeads.push({message, sig}) - added.push({message, sig}) - // Cancel a pending "heads" to be spread - if (this.headsTimeout) { - clearTimeout(this.headsTimeout) - } - // Reprogram it a few moments later - this.headsTimeout = setTimeout(async () => { - const heads = this.newHeads.splice(0, this.newHeads.length) - if (heads.length) { - await this.spreadNewHeads(heads) - } - }, WS2PConstants.HEADS_SPREAD_TIMEOUT) + if (!message) { + throw "EMPTY_MESSAGE_FOR_HEAD" + } + if (message.match(WS2PConstants.HEAD_REGEXP)) { + const [,, pub, blockstamp]:string[] = message.split(':') + const sigOK = verify(message, sig, pub) + if (sigOK) { + // Already known? + if (!this.headsCache[pub] || this.headsCache[pub].blockstamp !== blockstamp) { + // More recent? + if (!this.headsCache[pub] || parseInt(this.headsCache[pub].blockstamp) < parseInt(blockstamp)) { + // Check that issuer is a member and that the block exists + const memberKey = await this.isMemberKey(pub) + if (memberKey) { + const exists = await this.existsBlock(blockstamp) + if (exists) { + this.headsCache[pub] = { blockstamp, message, sig } + this.newHeads.push({message, sig}) + added.push({message, sig}) + // Cancel a pending "heads" to be spread + if (this.headsTimeout) { + clearTimeout(this.headsTimeout) } + // Reprogram it a few moments later + this.headsTimeout = setTimeout(async () => { + const heads = this.newHeads.splice(0, this.newHeads.length) + if (heads.length) { + await this.spreadNewHeads(heads) + } + }, WS2PConstants.HEADS_SPREAD_TIMEOUT) } } } } + } else { + throw "HEAD_MESSAGE_WRONGLY_SIGNED" } - } catch (e) { - this.server.logger.trace('Rejected message %s:', message, e) } })) this.server.push({ @@ -195,7 +199,7 @@ export class WS2PCluster { if (this.ws2pServer) { await this.ws2pServer.close() } - const connections = await this.getAllConnections() + const connections = this.getAllConnections() await Promise.all(connections.map(c => c.close())) } @@ -322,7 +326,7 @@ export class WS2PCluster { } private async spreadNewHeads(heads:{ message:string, sig:string }[]) { - const connexions = await this.getAllConnections() + const connexions = this.getAllConnections() return Promise.all(connexions.map(async (c) => { try { await c.pushHeads(heads) @@ -438,6 +442,11 @@ export class WS2PCluster { priorityKeys:string[], targetWS2PUID = "" ) { + // We do not accept banned keys + if (this.banned[pub]) { + this.server.logger.warn('Connection to %s refused, reason: %s', pub.slice(0, 8), this.banned[pub]) + return false + } let accept = priorityKeys.indexOf(pub) !== -1 if (!accept && connectedPubkeys.indexOf(pub) === -1) { // Do we have room? @@ -504,7 +513,7 @@ export class WS2PCluster { return this.ws2pServer ? this.ws2pServer.getConnexions() : [] } - async getAllConnections() { + getAllConnections() { const all:WS2PConnection[] = this.ws2pServer ? this.ws2pServer.getConnexions() : [] for (const uuid of Object.keys(this.ws2pClients)) { all.push(this.ws2pClients[uuid].connection) @@ -562,7 +571,7 @@ export class WS2PCluster { } private async makeApullShot() { - const connections = await this.getAllConnections() + const connections = this.getAllConnections() const chosen = randomPick(connections, CrawlerConstants.CRAWL_PEERS_COUNT) await Promise.all(chosen.map(async (conn) => { @@ -581,7 +590,7 @@ export class WS2PCluster { } async pullDocpool() { - const connections = await this.getAllConnections() + const connections = this.getAllConnections() const chosen = randomPick(connections, CrawlerConstants.CRAWL_PEERS_COUNT) await Promise.all(chosen.map(async (conn) => { const puller = new WS2PDocpoolPuller(this.server, conn) @@ -594,4 +603,20 @@ export class WS2PCluster { const served = this.ws2pServer ? this.ws2pServer.getConnexions().map(c => c.pubkey) : [] return clients.concat(served) } + + banConnection(c:WS2PConnection, reason:string) { + this.server.logger.warn('Banning connections of %s for %ss, reason: %s', c.pubkey.slice(0, 8), WS2PConstants.BAN_DURATION_IN_SECONDS, reason) + if (c.pubkey) { + this.banned[c.pubkey] = reason + setTimeout(() => { + delete this.banned[c.pubkey] + }, 1000 * WS2PConstants.BAN_DURATION_IN_SECONDS) + const connections = this.getAllConnections() + for (const connection of connections) { + if (c.pubkey == connection.pubkey) { + connection.close() + } + } + } + } } \ No newline at end of file diff --git a/app/modules/ws2p/lib/WS2PConnection.ts b/app/modules/ws2p/lib/WS2PConnection.ts index 14131644e41a4b40f2787e0c6959a1f3ee8a5c1c..139175bfc945163f88dc25151c0e352a64f2512e 100644 --- a/app/modules/ws2p/lib/WS2PConnection.ts +++ b/app/modules/ws2p/lib/WS2PConnection.ts @@ -476,7 +476,7 @@ export class WS2PConnection { // Push message else { this.nbPushsByRemoteCount++ - await this.messageHandler.handlePushMessage(data) + await this.messageHandler.handlePushMessage(data, this) } } } diff --git a/app/modules/ws2p/lib/constants.ts b/app/modules/ws2p/lib/constants.ts index 230bdda87b422c736389632ef98ffd8aeb6fc701..8fd9b790b34ff9845dbccd5b7c198b4a057d6e99 100644 --- a/app/modules/ws2p/lib/constants.ts +++ b/app/modules/ws2p/lib/constants.ts @@ -16,6 +16,9 @@ export const WS2PConstants = { MAX_LEVEL_1_PEERS: 10, MAX_LEVEL_2_PEERS: 10, + BAN_DURATION_IN_SECONDS: 120, + ERROR_RECALL_DURATION_IN_SECONDS: 60, + HEAD_REGEXP: new RegExp('^WS2P:HEAD:' + CommonConstants.FORMATS.PUBKEY + ':' + CommonConstants.FORMATS.BLOCKSTAMP + '$'), HEADS_SPREAD_TIMEOUT: 100 // Wait 100ms before sending a bunch of signed heads diff --git a/app/modules/ws2p/lib/impl/WS2PMessageHandler.ts b/app/modules/ws2p/lib/impl/WS2PMessageHandler.ts index 650e9f4e90c7de3dccc037af80489f5407c3a3ed..312fa614b458b8ff52de6f7aba3ce820b6748162 100644 --- a/app/modules/ws2p/lib/impl/WS2PMessageHandler.ts +++ b/app/modules/ws2p/lib/impl/WS2PMessageHandler.ts @@ -1,6 +1,7 @@ import {WS2PResponse} from "./WS2PResponse" +import {WS2PConnection} from "../WS2PConnection" export interface WS2PMessageHandler { - handlePushMessage(json:any): Promise<void> + handlePushMessage(json:any, c:WS2PConnection): Promise<void> answerToRequest(json:any): Promise<WS2PResponse> } \ No newline at end of file diff --git a/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts b/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts index f722a765e68ee5634eeb7adabf2c06eb8680d064..f6a19f1071697724b19c75dd7299fc8494b8df19 100644 --- a/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts +++ b/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts @@ -11,6 +11,8 @@ import {TransactionDTO} from "../../../../lib/dto/TransactionDTO" import {PeerDTO} from "../../../../lib/dto/PeerDTO" import {WS2P_REQ} from "../WS2PRequester" import {WS2PCluster} from "../WS2PCluster" +import {WS2PConnection} from "../WS2PConnection" +import {WS2PConstants} from "../constants" export enum WS2P_REQERROR { UNKNOWN_REQUEST @@ -19,49 +21,86 @@ export enum WS2P_REQERROR { export class WS2PServerMessageHandler implements WS2PMessageHandler { protected mapper:WS2PReqMapper + private errors:{ + [k:string]: { + createdOn: number, + pubkeys: { + [p:string]: boolean + } + } + } = {} constructor(protected server:Server, protected cluster:WS2PCluster) { this.mapper = new WS2PReqMapperByServer(server) } - async handlePushMessage(json: any): Promise<void> { + async handlePushMessage(json: any, c:WS2PConnection): Promise<void> { + let documentHash = '' try { if (json.body) { if (json.body.block) { const dto = BlockDTO.fromJSONObject(json.body.block) const raw = dto.getRawSigned() + documentHash = dto.getHash() await this.server.writeRawBlock(raw) } else if (json.body.identity) { const dto = IdentityDTO.fromJSONObject(json.body.identity) const raw = dto.getRawSigned() + documentHash = dto.getHash() await this.server.writeRawIdentity(raw) } else if (json.body.certification) { const dto = CertificationDTO.fromJSONObject(json.body.certification) const raw = dto.getRawSigned() + documentHash = dto.getHash() await this.server.writeRawCertification(raw) } else if (json.body.membership) { const dto = MembershipDTO.fromJSONObject(json.body.membership) const raw = dto.getRawSigned() + documentHash = dto.getHash() await this.server.writeRawMembership(raw) } else if (json.body.transaction) { const dto = TransactionDTO.fromJSONObject(json.body.transaction) const raw = dto.getRaw() + documentHash = dto.getHash() await this.server.writeRawTransaction(raw) } else if (json.body.peer) { const dto = PeerDTO.fromJSONObject(json.body.peer) const raw = dto.getRawSigned() + documentHash = dto.getHash() await this.server.writeRawPeer(raw) } else if (json.body.heads && typeof json.body.heads === "object" && json.body.heads.length !== undefined) { + if (!json.body.heads.length) { + documentHash = 'HEADs' + throw "Heads empty HEADs received" + } await this.cluster.headsReceived(json.body.heads || []) } } } catch(e) { + if (documentHash + && this.errors[documentHash] + && this.errors[documentHash].pubkeys[c.pubkey] !== undefined + && this.server.conf.pair.pub !== c.pubkey) { // We do not want to ban ourselves + this.cluster.banConnection(c, "Peer " + (c.pubkey || '--unknown--') + " sending again a wrong document") + } else { + // Remember the error for some time + if (!this.errors[documentHash]) { + this.errors[documentHash] = { + createdOn: Date.now(), + pubkeys: {} + } + } + this.errors[documentHash].pubkeys[c.pubkey] = true + setTimeout(() => { + delete this.errors[documentHash] + }, WS2PConstants.ERROR_RECALL_DURATION_IN_SECONDS) + } this.server.logger.warn(e) } }