From 65ebf7625848fdb8e6c6685cabfd07911f617470 Mon Sep 17 00:00:00 2001
From: cgeek <cem.moreau@gmail.com>
Date: Sat, 5 Jan 2019 16:39:45 +0100
Subject: [PATCH] [enh] #1325 Spread events about synchronization for better
 reporting

---
 app/modules/crawler/lib/sync.ts               |   9 ++
 .../crawler/lib/sync/BMARemoteContacter.ts    |   2 +
 .../crawler/lib/sync/IRemoteContacter.ts      |   2 +
 .../crawler/lib/sync/P2PSyncDownloader.ts     |   7 +
 .../crawler/lib/sync/WS2PRemoteContacter.ts   |   2 +
 app/modules/crawler/lib/sync/Watcher.ts       | 125 ++++++++++++++++++
 .../crawler/lib/sync/p2p/p2p-candidate.ts     |   6 +-
 .../crawler/lib/sync/v2/DownloadStream.ts     |   1 +
 app/modules/ws2p/lib/WS2PDocpoolPuller.ts     |   1 +
 9 files changed, 154 insertions(+), 1 deletion(-)

diff --git a/app/modules/crawler/lib/sync.ts b/app/modules/crawler/lib/sync.ts
index 9d0fa1f51..358bd1547 100644
--- a/app/modules/crawler/lib/sync.ts
+++ b/app/modules/crawler/lib/sync.ts
@@ -48,6 +48,15 @@ export class Synchroniser extends stream.Duplex {
     this.watcher.onEvent('appliedChange',  () => this.push(this.watcher.getStats()))
     this.watcher.onEvent('sbxChange',      () => this.push(this.watcher.getStats()))
     this.watcher.onEvent('peersChange',    () => this.push(this.watcher.getStats()))
+    this.watcher.onEvent('addWrongChunkFailure',  (data) => this.push({ p2pData: { name: 'addWrongChunkFailure', data }}))
+    this.watcher.onEvent('failToGetChunk',        (data) => this.push({ p2pData: { name: 'failToGetChunk', data }}))
+    this.watcher.onEvent('gettingChunk',          (data) => this.push({ p2pData: { name: 'gettingChunk', data }}))
+    this.watcher.onEvent('gotChunk',              (data) => this.push({ p2pData: { name: 'gotChunk', data }}))
+    this.watcher.onEvent('reserveNodes',          (data) => this.push({ p2pData: { name: 'reserveNodes', data }}))
+    this.watcher.onEvent('unableToDownloadChunk', (data) => this.push({ p2pData: { name: 'unableToDownloadChunk', data }}))
+    this.watcher.onEvent('wantToDownload',        (data) => this.push({ p2pData: { name: 'wantToDownload', data }}))
+    this.watcher.onEvent('wantToLoad',            (data) => this.push({ p2pData: { name: 'wantToLoad', data }}))
+    this.watcher.onEvent('beforeReadyNodes',      (data) => this.push({ p2pData: { name: 'beforeReadyNodes', data }}))
 
     this.syncStrategy.setWatcher(this.watcher)
 
diff --git a/app/modules/crawler/lib/sync/BMARemoteContacter.ts b/app/modules/crawler/lib/sync/BMARemoteContacter.ts
index 14fdbdeba..63ddb845d 100644
--- a/app/modules/crawler/lib/sync/BMARemoteContacter.ts
+++ b/app/modules/crawler/lib/sync/BMARemoteContacter.ts
@@ -22,6 +22,8 @@ const logger = NewLogger()
 
 export class BMARemoteContacter implements IRemoteContacter {
 
+  public type: 'BMA'| 'WS2P' = 'BMA'
+
   constructor(protected contacter: Contacter) {
   }
 
diff --git a/app/modules/crawler/lib/sync/IRemoteContacter.ts b/app/modules/crawler/lib/sync/IRemoteContacter.ts
index ef0c2e706..9a4ea10d2 100644
--- a/app/modules/crawler/lib/sync/IRemoteContacter.ts
+++ b/app/modules/crawler/lib/sync/IRemoteContacter.ts
@@ -34,4 +34,6 @@ export interface IRemoteContacter {
   getRequirementsPending(number: number): Promise<HttpRequirements>
 
   hostName: string
+
+  type: 'BMA' | 'WS2P'
 }
diff --git a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts
index 96db98a2d..a7c759444 100644
--- a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts
+++ b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts
@@ -57,6 +57,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
   }
 
   private async waitForAvailableNodesAndReserve(needed = 1): Promise<P2pCandidate[]> {
+    this.watcher.beforeReadyNodes(this.p2pCandidates)
     let nodesToWaitFor = this.p2pCandidates.slice()
     let nodesAvailable: P2pCandidate[] = []
     let i = 0
@@ -88,6 +89,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
       byAvgAnswerTime = byAvgAnswerTime.slice(0, parallelMax)
       if (byAvgAnswerTime.length === 0) {
         this.logger.warn('No node found to download chunk #%s.', chunkIndex)
+        this.watcher.unableToDownloadChunk(chunkIndex)
         throw Error(DataErrors[DataErrors.NO_NODE_FOUND_TO_DOWNLOAD_CHUNK])
       }
       return byAvgAnswerTime
@@ -104,17 +106,21 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
     // if this chunk has already been downloaded before, we exclude its supplier node from the download list as it won't give correct answer now
     const lastSupplier = this.downloads[chunkIndex]
     if (lastSupplier) {
+      this.watcher.addWrongChunkFailure(chunkIndex, lastSupplier)
       lastSupplier.addFailure()
     }
+    this.watcher.wantToDownload(chunkIndex)
     // Only 1 candidate for now
     const candidates = await this.getP2Pcandidates(chunkIndex)
     // Book the nodes
+    this.watcher.gettingChunk(chunkIndex, candidates)
     return await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node) => {
       try {
         this.handler[chunkIndex] = node;
         this.nbDownloading++;
         this.watcher.writeStatus('Getting chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName);
         let blocks = await node.downloadBlocks(count, from);
+        this.watcher.gotChunk(chunkIndex, node)
         this.watcher.writeStatus('GOT chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName);
         if (this.PARALLEL_PER_CHUNK === 1) {
           // Only works if we have 1 concurrent peer per chunk
@@ -124,6 +130,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
         this.nbDownloadsTried++;
         return blocks;
       } catch (e) {
+        this.watcher.failToGetChunk(chunkIndex, node)
         this.nbDownloading--;
         this.nbDownloadsTried++;
         throw e;
diff --git a/app/modules/crawler/lib/sync/WS2PRemoteContacter.ts b/app/modules/crawler/lib/sync/WS2PRemoteContacter.ts
index eeb7226cb..3af758ce9 100644
--- a/app/modules/crawler/lib/sync/WS2PRemoteContacter.ts
+++ b/app/modules/crawler/lib/sync/WS2PRemoteContacter.ts
@@ -23,6 +23,8 @@ const logger = NewLogger()
 
 export class WS2PRemoteContacter implements IRemoteContacter {
 
+  public type: 'BMA'| 'WS2P' = 'WS2P'
+
   getRequirementsPending(min: number): Promise<HttpRequirements> {
     return this.requester.getRequirementsPending(min)
   }
diff --git a/app/modules/crawler/lib/sync/Watcher.ts b/app/modules/crawler/lib/sync/Watcher.ts
index bbb5f113e..9e0c945a1 100644
--- a/app/modules/crawler/lib/sync/Watcher.ts
+++ b/app/modules/crawler/lib/sync/Watcher.ts
@@ -1,5 +1,6 @@
 import * as events from "events"
 import {cliprogram} from "../../../../lib/common-libs/programOptions"
+import {P2pCandidate} from "./p2p/p2p-candidate"
 
 const multimeter   = require('multimeter')
 
@@ -11,9 +12,36 @@ export interface Watcher {
   sbxPercent(pct?: number): number
   peersPercent(pct?: number): number
   end(): void
+
+  reserveNodes(nodesAvailable: P2pCandidate[]): void
+
+  unableToDownloadChunk(chunkIndex: number): void
+
+  gettingChunk(chunkIndex: number, candidates: P2pCandidate[]): void
+
+  gotChunk(chunkIndex: number, node: P2pCandidate): void
+
+  failToGetChunk(chunkIndex: number, node: P2pCandidate): void
+
+  wantToDownload(chunkIndex: number): void
+
+  addWrongChunkFailure(chunkIndex: number, lastSupplier: P2pCandidate): void
+
+  wantToLoad(chunkIndex: number): void
+
+  beforeReadyNodes(p2pCandidates: P2pCandidate[]): void
 }
 
 export type EventName = 'downloadChange'|'storageChange'|'appliedChange'|'sbxChange'|'peersChange'
+  | 'addWrongChunkFailure'
+  | 'failToGetChunk'
+  | 'gettingChunk'
+  | 'gotChunk'
+  | 'reserveNodes'
+  | 'unableToDownloadChunk'
+  | 'wantToDownload'
+  | 'wantToLoad'
+  | 'beforeReadyNodes'
 
 export class EventWatcher extends events.EventEmitter implements Watcher {
 
@@ -69,6 +97,44 @@ export class EventWatcher extends events.EventEmitter implements Watcher {
       peersSync: this.peersPercent(),
     }
   }
+
+  /************* P2P DOWNLOAD EVENTS ****************/
+
+  addWrongChunkFailure(chunkIndex: number, lastSupplier: P2pCandidate): void {
+    this.emit('addWrongChunkFailure', { chunkIndex, node: lastSupplier })
+  }
+
+  failToGetChunk(chunkIndex: number, node: P2pCandidate): void {
+    this.emit('failToGetChunk', { chunkIndex, node })
+  }
+
+  gettingChunk(chunkIndex: number, candidates: P2pCandidate[]): void {
+    this.emit('gettingChunk', { chunkIndex, nodes: candidates })
+  }
+
+  gotChunk(chunkIndex: number, node: P2pCandidate): void {
+    this.emit('gotChunk', { chunkIndex, node })
+  }
+
+  reserveNodes(nodesAvailable: P2pCandidate[]): void {
+    this.emit('reserveNodes', { nodes: nodesAvailable })
+  }
+
+  unableToDownloadChunk(chunkIndex: number): void {
+    this.emit('unableToDownloadChunk', { chunkIndex })
+  }
+
+  wantToDownload(chunkIndex: number): void {
+    this.emit('wantToDownload', { chunkIndex })
+  }
+
+  wantToLoad(chunkIndex: number): void {
+    this.emit('wantToLoad', { chunkIndex })
+  }
+
+  beforeReadyNodes(p2pCandidates: P2pCandidate[]): void {
+    this.emit('beforeReadyNodes', { nodes: p2pCandidates })
+  }
 }
 
 export class MultimeterWatcher implements Watcher {
@@ -176,6 +242,36 @@ export class MultimeterWatcher implements Watcher {
       empty : { text : ' ' }
     })
   }
+
+  /************* NOT IMPLEMENTED ****************/
+
+  addWrongChunkFailure(chunkIndex: number, lastSupplier: P2pCandidate): void {
+  }
+
+  failToGetChunk(chunkIndex: number, node: P2pCandidate): void {
+  }
+
+  gettingChunk(chunkIndex: number, candidates: P2pCandidate[]): void {
+  }
+
+  gotChunk(chunkIndex: number, node: P2pCandidate): void {
+  }
+
+  reserveNodes(nodesAvailable: P2pCandidate[]): void {
+  }
+
+  unableToDownloadChunk(chunkIndex: number): void {
+  }
+
+  wantToDownload(chunkIndex: number): void {
+  }
+
+  wantToLoad(chunkIndex: number): void {
+  }
+
+  beforeReadyNodes(p2pCandidates: P2pCandidate[]): void {
+  }
+
 }
 
 export class LoggerWatcher implements Watcher {
@@ -239,4 +335,33 @@ export class LoggerWatcher implements Watcher {
   end() {
   }
 
+  /************* NOT IMPLEMENTED ****************/
+
+  addWrongChunkFailure(chunkIndex: number, lastSupplier: P2pCandidate): void {
+  }
+
+  failToGetChunk(chunkIndex: number, node: P2pCandidate): void {
+  }
+
+  gettingChunk(chunkIndex: number, candidates: P2pCandidate[]): void {
+  }
+
+  gotChunk(chunkIndex: number, node: P2pCandidate): void {
+  }
+
+  reserveNodes(nodesAvailable: P2pCandidate[]): void {
+  }
+
+  unableToDownloadChunk(chunkIndex: number): void {
+  }
+
+  wantToDownload(chunkIndex: number): void {
+  }
+
+  wantToLoad(chunkIndex: number): void {
+  }
+
+  beforeReadyNodes(p2pCandidates: P2pCandidate[]): void {
+  }
+
 }
diff --git a/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts b/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts
index 7bdfdbe36..5819db553 100644
--- a/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts
+++ b/app/modules/crawler/lib/sync/p2p/p2p-candidate.ts
@@ -18,7 +18,7 @@ export class P2pCandidate {
   private reserved = false
 
   constructor(
-    private p: PeerDTO,
+    public p: PeerDTO,
     private keypair: Keypair,
     private logger: any,
     private allowLocalSync: boolean,
@@ -131,6 +131,10 @@ export class P2pCandidate {
   reserve() {
     this.reserved = true
   }
+
+  get apiName() {
+    return this.api && this.api.type
+  }
 }
 
 interface RemoteAPI {
diff --git a/app/modules/crawler/lib/sync/v2/DownloadStream.ts b/app/modules/crawler/lib/sync/v2/DownloadStream.ts
index d903b598b..cae155f5a 100644
--- a/app/modules/crawler/lib/sync/v2/DownloadStream.ts
+++ b/app/modules/crawler/lib/sync/v2/DownloadStream.ts
@@ -62,6 +62,7 @@ export class DownloadStream extends Duplex {
         let chunk: BlockDTO[]
         // We don't have the file locally: we loop on P2P download until we have it (or until P2P throws a general error)
         do {
+          this.watcher.wantToLoad(i)
           chunk = await downloader.getChunk(i)
           if (chunk.length) {
             // NewLogger().info("Chunk #%s is COMPLETE", i)
diff --git a/app/modules/ws2p/lib/WS2PDocpoolPuller.ts b/app/modules/ws2p/lib/WS2PDocpoolPuller.ts
index 4178a7a49..08b6cfe35 100644
--- a/app/modules/ws2p/lib/WS2PDocpoolPuller.ts
+++ b/app/modules/ws2p/lib/WS2PDocpoolPuller.ts
@@ -27,6 +27,7 @@ export class WS2PDocpoolPuller {
     const requester = WS2PRequester.fromConnection(this.connection)
     // node.pubkey = p.pubkey;
     return pullSandboxToLocalServer(this.server.conf.currency, {
+      type: 'WS2P',
       getRequirementsPending: (minCert = 1) => {
         return requester.getRequirementsPending(minCert)
       },
-- 
GitLab