P2PSyncDownloader.ts 7.37 KB
Newer Older
1
2
3
4
5
6
import {JSONDBPeer} from "../../../../lib/db/DBPeer"
import {PeerDTO} from "../../../../lib/dto/PeerDTO"
import {Underscore} from "../../../../lib/common-libs/underscore"
import {BlockDTO} from "../../../../lib/dto/BlockDTO"
import {Watcher} from "./Watcher"
import {ISyncDownloader} from "./ISyncDownloader"
7
import {cliprogram} from "../../../../lib/common-libs/programOptions"
8
9
import {Keypair} from "../../../../lib/dto/ConfDTO"
import {IRemoteContacter} from "./IRemoteContacter"
10
import {ManualPromise} from "../../../../lib/common-libs/manual-promise"
11
12
13
14
import {GlobalFifoPromise} from "../../../../service/GlobalFifoPromise"
import {getNanosecondsTime} from "../../../../ProcessCpuProfiler"
import {CommonConstants} from "../../../../lib/common-libs/constants"
import {DataErrors} from "../../../../lib/common-libs/errors"
15
import {ASyncDownloader} from "./ASyncDownloader"
16
import {P2pCandidate} from "./p2p/p2p-candidate"
17

18
export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloader {
19
20

  private PARALLEL_PER_CHUNK = 1;
21
  private MAX_DELAY_PER_DOWNLOAD = cliprogram.slow ? 2 * 60000 : 15000;
22
23
24
25
26
  private TOO_LONG_TIME_DOWNLOAD:string
  private nbBlocksToDownload:number
  private numberOfChunksToDownload:number
  private processing:any
  private handler:any
27
  private p2pCandidates: P2pCandidate[] = []
28
29
  private nbDownloadsTried = 0
  private nbDownloading = 0
30
  private downloads: { [chunk: number]: P2pCandidate } = {}
31
32
  private fifoPromise = new GlobalFifoPromise()
  private nbWaitFailed = 0
33
34

  constructor(
35
36
    private currency: string,
    private keypair: Keypair,
37
38
39
40
41
    private localNumber:number,
    private to:number,
    private peers:JSONDBPeer[],
    private watcher:Watcher,
    private logger:any,
42
    public chunkSize: number,
43
    public allowLocalSync: boolean,
44
    ) {
45
    super(chunkSize)
46
47
    this.TOO_LONG_TIME_DOWNLOAD = "No answer after " + this.MAX_DELAY_PER_DOWNLOAD + "ms, will retry download later.";
    this.nbBlocksToDownload = Math.max(0, to - localNumber);
48
    this.numberOfChunksToDownload = Math.ceil(this.nbBlocksToDownload / this.chunkSize);
49
50
51
    this.processing      = Array.from({ length: this.numberOfChunksToDownload }).map(() => false);
    this.handler         = Array.from({ length: this.numberOfChunksToDownload }).map(() => null);

52
    this.p2pCandidates = peers.map(p => new P2pCandidate(PeerDTO.fromJSONObject(p), this.keypair, this.logger, allowLocalSync))
53
54
  }

55
  get maxSlots(): number {
56
    return this.p2pCandidates.filter(p => p.hasAvailableApi()).length
57
58
  }

59
60
61
62
63
64
65
66
67
68
69
70
  private async waitForAvailableNodes(needed = 1): Promise<P2pCandidate[]> {
    let nodesToWaitFor = this.p2pCandidates.slice()
    let nodesAvailable: P2pCandidate[] = []
    let i = 0
    while (nodesAvailable.length < needed && i < needed) {
      await Promise.race(nodesToWaitFor.map(p => p.waitAvailability(CommonConstants.WAIT_P2P_CANDIDATE_HEARTBEAT)))
      const readyNodes = nodesToWaitFor.filter(p => p.isReady())
      nodesToWaitFor = nodesToWaitFor.filter(p => !p.isReady())
      nodesAvailable = nodesAvailable.concat(readyNodes)
      i++
    }
    return nodesAvailable
71
72
73
74
75
76
77
  }

  /**
   * Get a list of P2P nodes to use for download.
   * If a node is not yet correctly initialized (we can test a node before considering it good for downloading), then
   * this method would not return it.
   */
78
  private async getP2Pcandidates(chunkIndex: number): Promise<P2pCandidate[]> {
79
    return this.fifoPromise.pushFIFOPromise('getP2Pcandidates_' + getNanosecondsTime(), async () => {
80
81
      // We wait a bit to have some available nodes
      const readyNodes = await this.waitForAvailableNodes()
82
      // We remove the nodes impossible to reach (timeout)
83
84
85
86
87
      let byAvgAnswerTime = Underscore.sortBy(readyNodes, p => p.avgResponseTime())
      const parallelMax = Math.min(this.PARALLEL_PER_CHUNK, byAvgAnswerTime.length)
      byAvgAnswerTime = byAvgAnswerTime.slice(0, parallelMax)
      if (byAvgAnswerTime.length === 0) {
        this.logger.warn('No node found to download chunk #%s.', chunkIndex)
88
89
        throw Error(DataErrors[DataErrors.NO_NODE_FOUND_TO_DOWNLOAD_CHUNK])
      }
90
      return byAvgAnswerTime
91
    })
92
93
94
95
96
97
98
99
100
101
102
103
  }

  /**
   * Download a chunk of blocks using P2P network through BMA API.
   * @param from The starting block to download
   * @param count The number of blocks to download.
   * @param chunkIndex The # of the chunk in local algorithm (logging purposes only)
   */
  private async p2pDownload(from:number, count:number, chunkIndex:number) {
    // if this chunk has already been downloaded before, we exclude its supplier node from the download list as it won't give correct answer now
    const lastSupplier = this.downloads[chunkIndex]
    if (lastSupplier) {
104
      lastSupplier.addFailure()
105
    }
106
107
    // Only 1 candidate for now
    const candidates = await this.getP2Pcandidates(chunkIndex)
108
    // Book the nodes
109
    return await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node) => {
110
111
112
      try {
        this.handler[chunkIndex] = node;
        this.nbDownloading++;
113
        this.watcher.writeStatus('Getting chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName);
114
        let blocks = await node.downloadBlocks(count, from);
115
        this.watcher.writeStatus('GOT chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName);
116
117
118
119
120
        if (this.PARALLEL_PER_CHUNK === 1) {
          // Only works if we have 1 concurrent peer per chunk
          this.downloads[chunkIndex] = node
        }
        this.nbDownloading--;
121
        this.nbDownloadsTried++;
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
        return blocks;
      } catch (e) {
        this.nbDownloading--;
        this.nbDownloadsTried++;
        throw e;
      }
    }))
  }

  /**
   * Function for downloading a chunk by its number.
   * @param index Number of the chunk.
   */
  private async downloadChunk(index:number): Promise<BlockDTO[]> {
    // The algorithm to download a chunk
137
138
    const from = this.localNumber + 1 + index * this.chunkSize;
    let count = this.chunkSize;
139
    if (index == this.numberOfChunksToDownload - 1) {
140
      count = this.nbBlocksToDownload % this.chunkSize || this.chunkSize;
141
142
143
144
145
    }
    try {
      return await this.p2pDownload(from, count, index) as BlockDTO[]
    } catch (e) {
      this.logger.error(e);
146
      await new Promise(res => setTimeout(res, 1000)) // Wait 1s before retrying
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
      return this.downloadChunk(index);
    }
  }

  /**
   * Utility function this starts a race between promises but cancels it if no answer is found before `timeout`
   * @param timeout
   * @param races
   * @returns {Promise}
   */
  private raceOrCancelIfTimeout(timeout:number, races:any[]) {
    return Promise.race([
      // Process the race, but cancel it if we don't get an anwser quickly enough
      new Promise((resolve, reject) => {
        setTimeout(() => {
          reject(this.TOO_LONG_TIME_DOWNLOAD);
        }, timeout)
      })
    ].concat(races));
  };

  /**
   * PUBLIC API
   */

  /***
   * Promises a chunk to be downloaded and returned
   * @param index The number of the chunk to download & return
   */
  getChunk(index:number): Promise<BlockDTO[]> {
    return this.downloadChunk(index)
  }
}
180
181
182
183
184
185
186

interface ProfiledNode {
  api: IRemoteContacter
  tta: number
  ttas: number[]
  nbSuccess: number
  hostName: string
187
  excluded: boolean
188
  readyForDownload: ManualPromise<boolean>
189
}