From 2cfc3b4947e516b871bde61b722cc6d2475baf09 Mon Sep 17 00:00:00 2001
From: cgeek <cem.moreau@gmail.com>
Date: Mon, 9 Oct 2017 22:08:54 +0200
Subject: [PATCH] [fix] #1147 Prevent double-sending of a same document + ban
 on 5 errors

---
 app/modules/ws2p/lib/WS2PClient.ts            | 14 +++-
 app/modules/ws2p/lib/WS2PServer.ts            |  9 ++-
 app/modules/ws2p/lib/WS2PSingleWriteStream.ts | 81 +++++++++++++++++++
 app/modules/ws2p/lib/constants.ts             |  2 +
 .../lib/interface/WS2PServerMessageHandler.ts | 19 ++++-
 test/fast/modules/ws2p/single_write.ts        | 46 +++++++++++
 6 files changed, 162 insertions(+), 9 deletions(-)
 create mode 100644 app/modules/ws2p/lib/WS2PSingleWriteStream.ts
 create mode 100644 test/fast/modules/ws2p/single_write.ts

diff --git a/app/modules/ws2p/lib/WS2PClient.ts b/app/modules/ws2p/lib/WS2PClient.ts
index a3e599eba..34710012a 100644
--- a/app/modules/ws2p/lib/WS2PClient.ts
+++ b/app/modules/ws2p/lib/WS2PClient.ts
@@ -3,7 +3,8 @@ import {WS2PConnection, WS2PPubkeyLocalAuth, WS2PPubkeyRemoteAuth} from "./WS2PC
 import {Key} from "../../../lib/common-libs/crypto/keyring"
 import {WS2PMessageHandler} from "./impl/WS2PMessageHandler"
 import {WS2PConstants} from "./constants"
-import { WS2PStreamer } from "./WS2PStreamer";
+import {WS2PStreamer} from "./WS2PStreamer"
+import {WS2PSingleWriteStream} from "./WS2PSingleWriteStream"
 
 export class WS2PClient {
 
@@ -22,17 +23,22 @@ export class WS2PClient {
       },
       expectedPub
     )
+    const singleWriteProtection = new WS2PSingleWriteStream()
     const streamer = new WS2PStreamer(c)
     c.connected
       .then(() => {
         // Streaming
-        server.pipe(streamer)
+        server
+          .pipe(singleWriteProtection)
+          .pipe(streamer)
       })
       .catch(() => {
-        server.unpipe(streamer)
+        server.unpipe(singleWriteProtection)
+        singleWriteProtection.unpipe(streamer)
       })
     c.closed.then(() => {
-      server.unpipe(streamer)
+      server.unpipe(singleWriteProtection)
+      singleWriteProtection.unpipe(streamer)
     })
 
     // Connecting
diff --git a/app/modules/ws2p/lib/WS2PServer.ts b/app/modules/ws2p/lib/WS2PServer.ts
index 09da1f664..c9d3dc9fb 100644
--- a/app/modules/ws2p/lib/WS2PServer.ts
+++ b/app/modules/ws2p/lib/WS2PServer.ts
@@ -6,6 +6,7 @@ import * as events from "events"
 import {WS2PConstants} from "./constants"
 import {WS2PMessageHandler} from "./impl/WS2PMessageHandler"
 import {WS2PStreamer} from "./WS2PStreamer"
+import {WS2PSingleWriteStream} from "./WS2PSingleWriteStream"
 
 const WebSocketServer = require('ws').Server
 
@@ -87,15 +88,19 @@ export class WS2PServer extends events.EventEmitter {
         this.server.logger.info('WS2P: established incoming connection from %s:%s', host, port)
 
         // Broadcasting
+        const singleWriteProtection = new WS2PSingleWriteStream()
         const ws2pStreamer = new WS2PStreamer(c)
-        this.server.pipe(ws2pStreamer)
+        this.server
+          .pipe(singleWriteProtection)
+          .pipe(ws2pStreamer)
 
         ws.on('error', (e:any) => {
           this.server.logger.error(e)
         })
 
         ws.on('close', () => {
-          this.server.unpipe(ws2pStreamer)
+          this.server.unpipe(singleWriteProtection)
+          singleWriteProtection.unpipe(ws2pStreamer)
           this.removeConnection(c)
           this.server.push({
             ws2p: 'disconnected',
diff --git a/app/modules/ws2p/lib/WS2PSingleWriteStream.ts b/app/modules/ws2p/lib/WS2PSingleWriteStream.ts
new file mode 100644
index 000000000..69b619a82
--- /dev/null
+++ b/app/modules/ws2p/lib/WS2PSingleWriteStream.ts
@@ -0,0 +1,81 @@
+import * as stream from "stream"
+import {NewLogger} from "../../../lib/logger"
+import {CertificationDTO} from "../../../lib/dto/CertificationDTO"
+import {IdentityDTO} from "../../../lib/dto/IdentityDTO"
+import {BlockDTO} from "../../../lib/dto/BlockDTO"
+import {MembershipDTO} from "../../../lib/dto/MembershipDTO"
+import {TransactionDTO} from "../../../lib/dto/TransactionDTO"
+import {PeerDTO} from "../../../lib/dto/PeerDTO"
+import {WS2PConstants} from "./constants"
+
+const logger = NewLogger()
+
+export class WS2PSingleWriteStream extends stream.Transform {
+
+  private detections:{
+    [k:string]: number
+  } = {}
+
+  constructor(private protectionDuration = 1000 * WS2PConstants.SINGLE_RECORD_PROTECTION_IN_SECONDS) {
+    super({ objectMode: true })
+  }
+
+  getNbProtectionsCurrently() {
+    return Object.keys(this.detections).length
+  }
+
+  async _write(obj:any, enc:any, done:any) {
+    let documentHash = ''
+    let doStream = false
+    try {
+
+      if (obj.joiners) {
+        const dto = BlockDTO.fromJSONObject(obj)
+        documentHash = dto.getHash()
+      }
+      else if (obj.pubkey && obj.uid) {
+        const dto = IdentityDTO.fromJSONObject(obj)
+        documentHash = dto.getHash()
+      }
+      else if (obj.idty_uid) {
+        const dto = CertificationDTO.fromJSONObject(obj)
+        documentHash = dto.getHash()
+      }
+      else if (obj.userid) {
+        const dto = MembershipDTO.fromJSONObject(obj)
+        documentHash = dto.getHash()
+      }
+      else if (obj.issuers) {
+        const dto = TransactionDTO.fromJSONObject(obj)
+        documentHash = dto.getHash()
+      }
+      else if (obj.endpoints) {
+        const dto = PeerDTO.fromJSONObject(obj)
+        documentHash = dto.getHash()
+      }
+
+      if (documentHash) {
+        if (!this.detections[documentHash]) {
+          doStream = true
+          this.detections[documentHash] = 1
+        } else {
+          this.detections[documentHash]++
+          logger.warn('WS2P OUT => Document detected %s times: %s', this.detections[documentHash], JSON.stringify(obj))
+        }
+
+        setTimeout(() => {
+          delete this.detections[documentHash]
+        }, this.protectionDuration)
+      }
+
+      if (doStream) {
+        this.push(obj)
+      }
+
+    } catch (e) {
+      logger.warn('WS2P >> SingleWrite >>', e)
+    }
+
+    done && done()
+  }
+}
diff --git a/app/modules/ws2p/lib/constants.ts b/app/modules/ws2p/lib/constants.ts
index 5b887d5a2..ace1fe96e 100644
--- a/app/modules/ws2p/lib/constants.ts
+++ b/app/modules/ws2p/lib/constants.ts
@@ -18,7 +18,9 @@ export const WS2PConstants = {
   CONNECTIONS_LOW_LEVEL: 3,
 
   BAN_DURATION_IN_SECONDS: 120,
+  BAN_ON_REPEAT_THRESHOLD: 5,
   ERROR_RECALL_DURATION_IN_SECONDS: 60,
+  SINGLE_RECORD_PROTECTION_IN_SECONDS: 60,
 
   HEAD_V0_REGEXP: new RegExp('^WS2P:HEAD:'
     + CommonConstants.FORMATS.PUBKEY + ':'
diff --git a/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts b/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts
index 1f666afc5..1a64ac0fc 100644
--- a/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts
+++ b/app/modules/ws2p/lib/interface/WS2PServerMessageHandler.ts
@@ -26,7 +26,7 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler {
     [k:string]: {
       createdOn: number,
       pubkeys: {
-        [p:string]: boolean
+        [p:string]: any[]
       }
     }
   } = {}
@@ -88,7 +88,20 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler {
         && this.errors[documentHash]
         && this.errors[documentHash].pubkeys[c.pubkey] !== undefined
         && this.server.conf.pair.pub !== c.pubkey) { // We do not want to ban ourselves
-        this.cluster.banConnection(c, "Peer " + (c.pubkey || '--unknown--') + " sending again a wrong document")
+        this.errors[documentHash].pubkeys[c.pubkey].push(json.body)
+        if (this.errors[documentHash].pubkeys[c.pubkey].length >= WS2PConstants.BAN_ON_REPEAT_THRESHOLD) {
+          let message = "peer " + (c.pubkey || '--unknown--') + " sent " + WS2PConstants.BAN_ON_REPEAT_THRESHOLD + " times a same wrong document: " + (e && (e.message || (e.uerr && e.uerr.message)) || JSON.stringify(e))
+          this.cluster.banConnection(c, message)
+          for (const body of this.errors[documentHash].pubkeys[c.pubkey]) {
+            message += '\n => ' + JSON.stringify(body)
+          }
+        } else {
+          let message = "WS2P IN => " + (c.pubkey || '--unknown--') + " sent " + this.errors[documentHash].pubkeys[c.pubkey].length + " times a same wrong document: " + (e && (e.message || (e.uerr && e.uerr.message)) || JSON.stringify(e))
+          for (const body of this.errors[documentHash].pubkeys[c.pubkey]) {
+            message += '\n => ' + JSON.stringify(body)
+          }
+          this.server.logger.warn(message)
+        }
       } else {
         // Remember the error for some time
         if (!this.errors[documentHash]) {
@@ -97,7 +110,7 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler {
             pubkeys: {}
           }
         }
-        this.errors[documentHash].pubkeys[c.pubkey] = true
+        this.errors[documentHash].pubkeys[c.pubkey] = [json.body]
         setTimeout(() => {
           delete this.errors[documentHash]
         }, 1000 * WS2PConstants.ERROR_RECALL_DURATION_IN_SECONDS)
diff --git a/test/fast/modules/ws2p/single_write.ts b/test/fast/modules/ws2p/single_write.ts
new file mode 100644
index 000000000..a2eaff627
--- /dev/null
+++ b/test/fast/modules/ws2p/single_write.ts
@@ -0,0 +1,46 @@
+import * as stream from "stream"
+import * as assert from "assert"
+
+import {WS2PSingleWriteStream} from "../../../../app/modules/ws2p/lib/WS2PSingleWriteStream"
+
+const es = require('event-stream')
+
+describe('WS2P Single Write limiter', () => {
+
+  const PROTECTION_DURATION = 100
+  
+  it('should detect double writings', async () => {
+    const source = new Readable()
+    const protection = new WS2PSingleWriteStream(PROTECTION_DURATION)
+    let nbDocs = 0
+    await new Promise(res => {
+      source
+        .pipe(protection)
+        .pipe(es.mapSync(() => {
+        nbDocs++
+        if (nbDocs >= 2) {
+          res()
+        }
+      }))
+
+      // Writing
+      source.push({ joiners: [] }) // A block
+      source.push({ joiners: [] }) // A block
+      source.push({ endpoints: [] }) // A peer
+    })
+    assert.equal(nbDocs, 2)
+    assert.equal(protection.getNbProtectionsCurrently(), 2)
+    await new Promise(res => setTimeout(res, PROTECTION_DURATION + 100))
+    assert.equal(protection.getNbProtectionsCurrently(), 0)
+  })
+})
+
+class Readable extends stream.Readable {
+
+  constructor() {
+    super({ objectMode: true })
+  }
+
+  async _read() {
+  }
+}
\ No newline at end of file
-- 
GitLab