From c980570d6de04015db6d15e2befeeda19f0a8ba4 Mon Sep 17 00:00:00 2001
From: cgeek <cem.moreau@gmail.com>
Date: Sun, 17 Sep 2017 16:52:06 +0200
Subject: [PATCH] [fix] WS2P document sharing must not close the streams

---
 app/lib/streams/WS2PStreamer.ts     | 41 ++++++++++-------
 app/modules/ws2p/lib/WS2PCluster.ts | 70 ++++++++++++++++-------------
 app/service/BlockchainService.ts    |  2 +-
 server.ts                           |  1 +
 4 files changed, 66 insertions(+), 48 deletions(-)

diff --git a/app/lib/streams/WS2PStreamer.ts b/app/lib/streams/WS2PStreamer.ts
index 1e8685cd0..b5497bb16 100644
--- a/app/lib/streams/WS2PStreamer.ts
+++ b/app/lib/streams/WS2PStreamer.ts
@@ -1,5 +1,8 @@
 import * as stream from "stream"
 import {WS2PConnection} from "../../modules/ws2p/lib/WS2PConnection"
+import {NewLogger} from "../logger"
+
+const logger = NewLogger()
 
 export class WS2PStreamer extends stream.Transform {
 
@@ -8,23 +11,27 @@ export class WS2PStreamer extends stream.Transform {
   }
 
   async _write(obj:any, enc:any, done:any) {
-    if (obj.joiners) {
-      await this.ws2pc.pushBlock(obj)
-    }
-    else if (obj.pubkey && obj.uid) {
-      await this.ws2pc.pushIdentity(obj)
-    }
-    else if (obj.idty_uid) {
-      await this.ws2pc.pushCertification(obj)
-    }
-    else if (obj.userid) {
-      await this.ws2pc.pushMembership(obj)
-    }
-    else if (obj.issuers) {
-      await this.ws2pc.pushTransaction(obj)
-    }
-    else if (obj.endpoints) {
-      await this.ws2pc.pushPeer(obj)
+    try {
+      if (obj.joiners) {
+        await this.ws2pc.pushBlock(obj)
+      }
+      else if (obj.pubkey && obj.uid) {
+        await this.ws2pc.pushIdentity(obj)
+      }
+      else if (obj.idty_uid) {
+        await this.ws2pc.pushCertification(obj)
+      }
+      else if (obj.userid) {
+        await this.ws2pc.pushMembership(obj)
+      }
+      else if (obj.issuers) {
+        await this.ws2pc.pushTransaction(obj)
+      }
+      else if (obj.endpoints) {
+        await this.ws2pc.pushPeer(obj)
+      }
+    } catch (e) {
+      logger.warn(e)
     }
     done && done();
   }
diff --git a/app/modules/ws2p/lib/WS2PCluster.ts b/app/modules/ws2p/lib/WS2PCluster.ts
index 520ca25a8..bdc77eba5 100644
--- a/app/modules/ws2p/lib/WS2PCluster.ts
+++ b/app/modules/ws2p/lib/WS2PCluster.ts
@@ -252,40 +252,44 @@ export class WS2PCluster {
     }
 
     // Also listen for network updates, and connect to new nodes
-    this.server.pipe(es.mapSync(async (data:any) => {
-
-      // New peer
-      if (data.endpoints) {
-        const peer = PeerDTO.fromJSONObject(data)
-        const ws2pEnpoint = peer.getWS2P()
-        if (ws2pEnpoint && peer.pubkey !== this.server.conf.pair.pub) {
-          // Check if already connected to the pubkey (in any way: server or client)
-          const connectedPubkeys = this.getConnectedPubkeys()
-          const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes || []))
-          if (shouldAccept) {
-            await this.connect(ws2pEnpoint.host, ws2pEnpoint.port, this.messageHandler)
-            // Trim the eventual extra connections
-            await this.trimClientConnections()
+    this.server.pipe(es.mapSync((data:any) => {
+
+      (async () => {
+        // New peer
+        if (data.endpoints) {
+          const peer = PeerDTO.fromJSONObject(data)
+          const ws2pEnpoint = peer.getWS2P()
+          if (ws2pEnpoint && peer.pubkey !== this.server.conf.pair.pub) {
+            // Check if already connected to the pubkey (in any way: server or client)
+            const connectedPubkeys = this.getConnectedPubkeys()
+            const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes || []))
+            if (shouldAccept) {
+              await this.connect(ws2pEnpoint.host, ws2pEnpoint.port, this.messageHandler)
+              // Trim the eventual extra connections
+              await this.trimClientConnections()
+            }
           }
         }
-      }
 
-      // Block received
-      else if (data.joiners) {
-        // Update the cache
-        this.blockstampsCache[[data.number, data.hash].join('-')] = Date.now()
-      }
+        // Block received
+        else if (data.joiners) {
+          // Update the cache
+          this.blockstampsCache[[data.number, data.hash].join('-')] = Date.now()
+        }
 
-      // HEAD changed
-      else if (data.bcEvent === OtherConstants.BC_EVENT.HEAD_CHANGED || data.bcEvent === OtherConstants.BC_EVENT.SWITCHED) {
-        // Propagate this change to the network
-        const { sig, message } = this.sayHeadChangedTo(data.block.number, data.block.hash)
-        try {
-          await this.broadcastHead(message, sig)
-        } catch (e) {
-          this.server.logger.warn(e)
+        // HEAD changed
+        else if (data.bcEvent === OtherConstants.BC_EVENT.HEAD_CHANGED || data.bcEvent === OtherConstants.BC_EVENT.SWITCHED) {
+          // Propagate this change to the network
+          const { sig, message } = this.sayHeadChangedTo(data.block.number, data.block.hash)
+          try {
+            await this.broadcastHead(message, sig)
+          } catch (e) {
+            this.server.logger.warn(e)
+          }
         }
-      }
+      })()
+
+      return data
     }))
   }
 
@@ -296,7 +300,13 @@ export class WS2PCluster {
 
   private async spreadNewHeads(heads:{ message:string, sig:string }[]) {
     const connexions = await this.getAllConnections()
-    return Promise.all(connexions.map(c => c.pushHeads(heads)))
+    return Promise.all(connexions.map(async (c) => {
+      try {
+        await c.pushHeads(heads)
+      } catch (e) {
+        this.server.logger.warn('Could not spread new HEAD info to %s WS2P %s %s', c.pubkey)
+      }
+    }))
   }
 
   private sayHeadChangedTo(number:number, hash:string) {
diff --git a/app/service/BlockchainService.ts b/app/service/BlockchainService.ts
index dade3d0e1..2735bb16f 100644
--- a/app/service/BlockchainService.ts
+++ b/app/service/BlockchainService.ts
@@ -159,7 +159,7 @@ export class BlockchainService extends FIFOService {
               await this.blockResolution()
               // Resolve the potential forks
               await this.forkResolution()
-              const current = this.current()
+              const current = await this.current()
               this.push({
                 bcEvent: OtherConstants.BC_EVENT.RESOLUTION_DONE,
                 block: current
diff --git a/server.ts b/server.ts
index 259276f1b..fbc57016d 100644
--- a/server.ts
+++ b/server.ts
@@ -190,6 +190,7 @@ export class Server extends stream.Duplex implements HookableServer {
           this.emit('bcEvent', e)
         }
         this.streamPush(e)
+        return e
       }))
 
     return this.conf;
-- 
GitLab