From 27430439bf993659d0c8f3a4e812424d5c81aabb Mon Sep 17 00:00:00 2001
From: cgeek <cem.moreau@gmail.com>
Date: Sat, 28 Jul 2018 17:06:27 +0200
Subject: [PATCH] [enh] handle P2P download slots augmentation/diminution

---
 app/lib/common-libs/constants.ts              |  2 ++
 .../crawler/lib/sync/FsSyncDownloader.ts      | 13 ++++++++++++
 .../crawler/lib/sync/ISyncDownloader.ts       |  2 ++
 .../crawler/lib/sync/P2PSyncDownloader.ts     | 21 ++++++++-----------
 4 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/app/lib/common-libs/constants.ts b/app/lib/common-libs/constants.ts
index 9aa102688..3f62a53c6 100755
--- a/app/lib/common-libs/constants.ts
+++ b/app/lib/common-libs/constants.ts
@@ -314,6 +314,8 @@ export const CommonConstants = {
 
   REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_AFTER: 20000, // Reject after 20 seconds without any change
   REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_MAX_FAILS: 5, // Maximum number of rejections of waiting for an available node
+
+  MAX_READING_SLOTS_FOR_FILE_SYNC: 20, // Number of file reading in parallel
 }
 
 function exact (regexpContent:string) {
diff --git a/app/modules/crawler/lib/sync/FsSyncDownloader.ts b/app/modules/crawler/lib/sync/FsSyncDownloader.ts
index 7078c6f88..8c830ff79 100644
--- a/app/modules/crawler/lib/sync/FsSyncDownloader.ts
+++ b/app/modules/crawler/lib/sync/FsSyncDownloader.ts
@@ -2,10 +2,12 @@ import {ISyncDownloader} from "./ISyncDownloader"
 import {BlockDTO} from "../../../../lib/dto/BlockDTO"
 import {FileSystem} from "../../../../lib/system/directory"
 import * as path from 'path'
+import {CommonConstants} from "../../../../lib/common-libs/constants"
 
 export class FsSyncDownloader implements ISyncDownloader {
 
   private ls: Promise<string[]>
+  private ttas: number[] = []
 
   constructor(
     private fs: FileSystem,
@@ -16,6 +18,7 @@ export class FsSyncDownloader implements ISyncDownloader {
   }
 
   async getChunk(i: number): Promise<BlockDTO[]> {
+    const start = Date.now()
     const files = await this.ls
     const filepath = path.join(this.basePath, this.getChunkName(i))
     const basename = path.basename(filepath)
@@ -26,9 +29,19 @@ export class FsSyncDownloader implements ISyncDownloader {
     }
     if (existsOnDAL) {
       const content: any = JSON.parse(await this.fs.fsReadFile(filepath))
+      // Record the reading duration
+      this.ttas.push(Date.now() - start)
       // Returns a promise of file content
       return content.blocks
     }
     return []
   }
+
+  get maxSlots(): number {
+    return CommonConstants.MAX_READING_SLOTS_FOR_FILE_SYNC
+  }
+
+  async getTimesToAnswer(): Promise<{ ttas: number[] }[]> {
+    return [{ ttas: this.ttas }]
+  }
 }
diff --git a/app/modules/crawler/lib/sync/ISyncDownloader.ts b/app/modules/crawler/lib/sync/ISyncDownloader.ts
index e66bdf108..6283f22a0 100644
--- a/app/modules/crawler/lib/sync/ISyncDownloader.ts
+++ b/app/modules/crawler/lib/sync/ISyncDownloader.ts
@@ -2,4 +2,6 @@ import {BlockDTO} from "../../../../lib/dto/BlockDTO"
 
 export interface ISyncDownloader {
   getChunk(i: number): Promise<BlockDTO[]>
+  maxSlots: number
+  getTimesToAnswer(): Promise<{ ttas: number[] }[]>
 }
diff --git a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts
index ccceac4d4..c3ecf5175 100644
--- a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts
+++ b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts
@@ -31,7 +31,6 @@ export class P2PSyncDownloader implements ISyncDownloader {
   private nodes: Querable<ProfiledNode>[] = []
   private nbDownloadsTried = 0
   private nbDownloading = 0
-  private lastAvgDelay:number
   private downloads: { [chunk: number]: any } = {}
   private fifoPromise = new GlobalFifoPromise()
   private nbWaitFailed = 0
@@ -53,9 +52,6 @@ export class P2PSyncDownloader implements ISyncDownloader {
     this.processing      = Array.from({ length: this.numberOfChunksToDownload }).map(() => false);
     this.handler         = Array.from({ length: this.numberOfChunksToDownload }).map(() => null);
 
-    // Create slots of download, in a ready stage
-    this.lastAvgDelay = this.MAX_DELAY_PER_DOWNLOAD;
-
     for (const thePeer of peers) {
       // Create the node
       let p = PeerDTO.fromJSONObject(thePeer)
@@ -104,6 +100,10 @@ export class P2PSyncDownloader implements ISyncDownloader {
     }
   }
 
+  get maxSlots(): number {
+    return this.nodes.length
+  }
+
   private async wait4AnAvailableNode(): Promise<any> {
     let promises: Promise<any>[] = this.nodes
     return await Promise.race(promises.concat(newRejectTimeoutPromise(CommonConstants.REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_AFTER)
@@ -185,14 +185,6 @@ export class P2PSyncDownloader implements ISyncDownloader {
           this.downloads[chunkIndex] = node
         }
         node.nbSuccess++;
-
-        const peers = await Promise.all(this.nodes.filter(p => p.isResolved()))
-        const downloading = Underscore.filter(peers, (p:any) => p.downloading && p.ttas.length);
-        this.lastAvgDelay = downloading.reduce((sum:number, c:any) => {
-          const tta = Math.round(c.ttas.reduce((sum:number, tta:number) => sum + tta, 0) / c.ttas.length)
-          return sum + tta
-        }, 0) / downloading.length
-
         this.nbDownloadsTried++;
         this.nbDownloading--;
         node.readyForDownload.resolve(true)
@@ -258,6 +250,11 @@ export class P2PSyncDownloader implements ISyncDownloader {
   getChunk(index:number): Promise<BlockDTO[]> {
     return this.downloadChunk(index)
   }
+
+  async getTimesToAnswer(): Promise<{ ttas: number[] }[]> {
+    const nodes = await Promise.all(this.nodes.filter(p => p.isResolved()))
+    return nodes.filter(n => n.ttas.length > 0)
+  }
 }
 
 interface ProfiledNode {
-- 
GitLab