diff --git a/app/modules/ws2p/lib/WS2PClient.ts b/app/modules/ws2p/lib/WS2PClient.ts index a3e599eba9d94f1d557ee8bd10c8fe7172272f29..34710012a92988df92e4255bd6c2357c1381cebf 100644 --- a/app/modules/ws2p/lib/WS2PClient.ts +++ b/app/modules/ws2p/lib/WS2PClient.ts @@ -3,7 +3,8 @@ import {WS2PConnection, WS2PPubkeyLocalAuth, WS2PPubkeyRemoteAuth} from "./WS2PC import {Key} from "../../../lib/common-libs/crypto/keyring" import {WS2PMessageHandler} from "./impl/WS2PMessageHandler" import {WS2PConstants} from "./constants" -import { WS2PStreamer } from "./WS2PStreamer"; +import {WS2PStreamer} from "./WS2PStreamer" +import {WS2PSingleWriteStream} from "./WS2PSingleWriteStream" export class WS2PClient { @@ -22,17 +23,22 @@ export class WS2PClient { }, expectedPub ) + const singleWriteProtection = new WS2PSingleWriteStream() const streamer = new WS2PStreamer(c) c.connected .then(() => { // Streaming - server.pipe(streamer) + server + .pipe(singleWriteProtection) + .pipe(streamer) }) .catch(() => { - server.unpipe(streamer) + server.unpipe(singleWriteProtection) + singleWriteProtection.unpipe(streamer) }) c.closed.then(() => { - server.unpipe(streamer) + server.unpipe(singleWriteProtection) + singleWriteProtection.unpipe(streamer) }) // Connecting diff --git a/app/modules/ws2p/lib/WS2PServer.ts b/app/modules/ws2p/lib/WS2PServer.ts index 09da1f664184195e68f89c71a0ede409f08e34bb..c9d3dc9fb025dbcc318c7ef6ede9bbedf08c44de 100644 --- a/app/modules/ws2p/lib/WS2PServer.ts +++ b/app/modules/ws2p/lib/WS2PServer.ts @@ -6,6 +6,7 @@ import * as events from "events" import {WS2PConstants} from "./constants" import {WS2PMessageHandler} from "./impl/WS2PMessageHandler" import {WS2PStreamer} from "./WS2PStreamer" +import {WS2PSingleWriteStream} from "./WS2PSingleWriteStream" const WebSocketServer = require('ws').Server @@ -87,15 +88,19 @@ export class WS2PServer extends events.EventEmitter { this.server.logger.info('WS2P: established incoming connection from %s:%s', host, port) // Broadcasting + const singleWriteProtection = new WS2PSingleWriteStream() const ws2pStreamer = new WS2PStreamer(c) - this.server.pipe(ws2pStreamer) + this.server + .pipe(singleWriteProtection) + .pipe(ws2pStreamer) ws.on('error', (e:any) => { this.server.logger.error(e) }) ws.on('close', () => { - this.server.unpipe(ws2pStreamer) + this.server.unpipe(singleWriteProtection) + singleWriteProtection.unpipe(ws2pStreamer) this.removeConnection(c) this.server.push({ ws2p: 'disconnected', diff --git a/app/modules/ws2p/lib/WS2PSingleWriteStream.ts b/app/modules/ws2p/lib/WS2PSingleWriteStream.ts new file mode 100644 index 0000000000000000000000000000000000000000..69b619a829678711514d92746eced4539bc1b17b --- /dev/null +++ b/app/modules/ws2p/lib/WS2PSingleWriteStream.ts @@ -0,0 +1,81 @@ +import * as stream from "stream" +import {NewLogger} from "../../../lib/logger" +import {CertificationDTO} from "../../../lib/dto/CertificationDTO" +import {IdentityDTO} from "../../../lib/dto/IdentityDTO" +import {BlockDTO} from "../../../lib/dto/BlockDTO" +import {MembershipDTO} from "../../../lib/dto/MembershipDTO" +import {TransactionDTO} from "../../../lib/dto/TransactionDTO" +import {PeerDTO} from "../../../lib/dto/PeerDTO" +import {WS2PConstants} from "./constants" + +const logger = NewLogger() + +export class WS2PSingleWriteStream extends stream.Transform { + + private detections:{ + [k:string]: number + } = {} + + constructor(private protectionDuration = 1000 * WS2PConstants.SINGLE_RECORD_PROTECTION_IN_SECONDS) { + super({ objectMode: true }) + } + + getNbProtectionsCurrently() { + return Object.keys(this.detections).length + } + + async _write(obj:any, enc:any, done:any) { + let documentHash = '' + let doStream = false + try { + + if (obj.joiners) { + const dto = BlockDTO.fromJSONObject(obj) + documentHash = dto.getHash() + } + else if (obj.pubkey && obj.uid) { + const dto = IdentityDTO.fromJSONObject(obj) + documentHash = dto.getHash() + } + else if (obj.idty_uid) { + const dto = CertificationDTO.fromJSONObject(obj) + documentHash = dto.getHash() + } + else if (obj.userid) { + const dto = MembershipDTO.fromJSONObject(obj) + documentHash = dto.getHash() + } + else if (obj.issuers) { + const dto = TransactionDTO.fromJSONObject(obj) + documentHash = dto.getHash() + } + else if (obj.endpoints) { + const dto = PeerDTO.fromJSONObject(obj) + documentHash = dto.getHash() + } + + if (documentHash) { + if (!this.detections[documentHash]) { + doStream = true + this.detections[documentHash] = 1 + } else { + this.detections[documentHash]++ + logger.warn('WS2P OUT => Document detected %s times: %s', this.detections[documentHash], JSON.stringify(obj)) + } + + setTimeout(() => { + delete this.detections[documentHash] + }, this.protectionDuration) + } + + if (doStream) { + this.push(obj) + } + + } catch (e) { + logger.warn('WS2P >> SingleWrite >>', e) + } + + done && done() + } +} diff --git a/app/modules/ws2p/lib/constants.ts b/app/modules/ws2p/lib/constants.ts index 5b887d5a2882d9a3b92d43efa56988d9a85118c9..ace1fe96e1d5c67685ff2d36edbdd34a6002d541 100644 --- a/app/modules/ws2p/lib/constants.ts +++ b/app/modules/ws2p/lib/constants.ts @@ -18,7 +18,9 @@ export const WS2PConstants = { CONNECTIONS_LOW_LEVEL: 3, BAN_DURATION_IN_SECONDS: 120, + BAN_ON_REPEAT_THRESHOLD: 5, ERROR_RECALL_DURATION_IN_SECONDS: 60, + SINGLE_RECORD_PROTECTION_IN_SECONDS: 60, HEAD_V0_REGEXP: new RegExp('^WS2P:HEAD:' + CommonConstants.FORMATS.PUBKEY + ':' diff --git a/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts b/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts index 1f666afc5d37ce9bef94056c67e60e921e041210..1a64ac0fc5edc995ded732603f41e34d81e35a48 100644 --- a/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts +++ b/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts @@ -26,7 +26,7 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler { [k:string]: { createdOn: number, pubkeys: { - [p:string]: boolean + [p:string]: any[] } } } = {} @@ -88,7 +88,20 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler { && this.errors[documentHash] && this.errors[documentHash].pubkeys[c.pubkey] !== undefined && this.server.conf.pair.pub !== c.pubkey) { // We do not want to ban ourselves - this.cluster.banConnection(c, "Peer " + (c.pubkey || '--unknown--') + " sending again a wrong document") + this.errors[documentHash].pubkeys[c.pubkey].push(json.body) + if (this.errors[documentHash].pubkeys[c.pubkey].length >= WS2PConstants.BAN_ON_REPEAT_THRESHOLD) { + let message = "peer " + (c.pubkey || '--unknown--') + " sent " + WS2PConstants.BAN_ON_REPEAT_THRESHOLD + " times a same wrong document: " + (e && (e.message || (e.uerr && e.uerr.message)) || JSON.stringify(e)) + this.cluster.banConnection(c, message) + for (const body of this.errors[documentHash].pubkeys[c.pubkey]) { + message += '\n => ' + JSON.stringify(body) + } + } else { + let message = "WS2P IN => " + (c.pubkey || '--unknown--') + " sent " + this.errors[documentHash].pubkeys[c.pubkey].length + " times a same wrong document: " + (e && (e.message || (e.uerr && e.uerr.message)) || JSON.stringify(e)) + for (const body of this.errors[documentHash].pubkeys[c.pubkey]) { + message += '\n => ' + JSON.stringify(body) + } + this.server.logger.warn(message) + } } else { // Remember the error for some time if (!this.errors[documentHash]) { @@ -97,7 +110,7 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler { pubkeys: {} } } - this.errors[documentHash].pubkeys[c.pubkey] = true + this.errors[documentHash].pubkeys[c.pubkey] = [json.body] setTimeout(() => { delete this.errors[documentHash] }, 1000 * WS2PConstants.ERROR_RECALL_DURATION_IN_SECONDS) diff --git a/test/fast/modules/ws2p/single_write.ts b/test/fast/modules/ws2p/single_write.ts new file mode 100644 index 0000000000000000000000000000000000000000..a2eaff6273bafba669baf6359a0250f2ce817607 --- /dev/null +++ b/test/fast/modules/ws2p/single_write.ts @@ -0,0 +1,46 @@ +import * as stream from "stream" +import * as assert from "assert" + +import {WS2PSingleWriteStream} from "../../../../app/modules/ws2p/lib/WS2PSingleWriteStream" + +const es = require('event-stream') + +describe('WS2P Single Write limiter', () => { + + const PROTECTION_DURATION = 100 + + it('should detect double writings', async () => { + const source = new Readable() + const protection = new WS2PSingleWriteStream(PROTECTION_DURATION) + let nbDocs = 0 + await new Promise(res => { + source + .pipe(protection) + .pipe(es.mapSync(() => { + nbDocs++ + if (nbDocs >= 2) { + res() + } + })) + + // Writing + source.push({ joiners: [] }) // A block + source.push({ joiners: [] }) // A block + source.push({ endpoints: [] }) // A peer + }) + assert.equal(nbDocs, 2) + assert.equal(protection.getNbProtectionsCurrently(), 2) + await new Promise(res => setTimeout(res, PROTECTION_DURATION + 100)) + assert.equal(protection.getNbProtectionsCurrently(), 0) + }) +}) + +class Readable extends stream.Readable { + + constructor() { + super({ objectMode: true }) + } + + async _read() { + } +} \ No newline at end of file