Commit c2253019 authored by Cédric Moreau

[fix] sync was strangling the fastest peers

parent 38386a75
@@ -31,7 +31,7 @@ export class P2PSyncDownloader implements ISyncDownloader {
   private nodes: Querable<ProfiledNode>[] = []
   private nbDownloadsTried = 0
   private nbDownloading = 0
-  private downloads: { [chunk: number]: any } = {}
+  private downloads: { [chunk: number]: ProfiledNode } = {}
   private fifoPromise = new GlobalFifoPromise()
   private nbWaitFailed = 0
@@ -85,6 +85,7 @@ export class P2PSyncDownloader implements ISyncDownloader {
         tta: 1,
         ttas: [],
         nbSuccess: 1,
+        excluded: false,
         readyForDownload: manualp,
         hostName: syncApi && syncApi.api.hostName || '',
       }
@@ -130,7 +131,7 @@ export class P2PSyncDownloader implements ISyncDownloader {
       // We filter on all the available nodes, since serveral can be ready at the same time
       const readyNodes:ProfiledNode[] = await Promise.all(this.nodes.filter(p => p.isResolved()))
       // We remove the nodes impossible to reach (timeout)
-      let withGoodDelays = Underscore.filter(readyNodes, (c) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD)
+      let withGoodDelays = Underscore.filter(readyNodes, (c) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD && !c.excluded && c.readyForDownload.isResolved())
       if (withGoodDelays.length === 0) {
         readyNodes.map(c => {
           if (c.tta >= this.MAX_DELAY_PER_DOWNLOAD) {
@@ -142,7 +143,14 @@ export class P2PSyncDownloader implements ISyncDownloader {
       withGoodDelays = Underscore.sortBy(withGoodDelays, c => c.tta)
       withGoodDelays = withGoodDelays.slice(0, parallelMax)
       // We temporarily augment the tta to avoid asking several times to the same node in parallel
-      withGoodDelays.forEach(c => c.tta = this.MAX_DELAY_PER_DOWNLOAD)
+      withGoodDelays.forEach(c => {
+        c.tta = this.MAX_DELAY_PER_DOWNLOAD
+        c.readyForDownload = newManualPromise()
+      })
+      if (withGoodDelays.length === 0) {
+        this.logger.warn('No node found to download this chunk.')
+        throw Error(DataErrors[DataErrors.NO_NODE_FOUND_TO_DOWNLOAD_CHUNK])
+      }
       return withGoodDelays
     })
   }
@@ -158,19 +166,14 @@ export class P2PSyncDownloader implements ISyncDownloader {
     const lastSupplier = this.downloads[chunkIndex]
     if (lastSupplier) {
       lastSupplier.excluded = true
-      this.logger.warn('Excluding node %s as it returns unchainable chunks', [lastSupplier.host, lastSupplier.port].join(':'))
+      this.logger.warn('Excluding node %s as it returns unchainable chunks', lastSupplier.hostName)
     }
     let candidates = await this.getP2Pcandidates();
-    if (candidates.length === 0) {
-      this.logger.warn('No node found to download this chunk.')
-      throw Error(DataErrors[DataErrors.NO_NODE_FOUND_TO_DOWNLOAD_CHUNK])
-    }
     // Book the nodes
     return await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node:ProfiledNode) => {
       try {
         const start = Date.now();
         this.handler[chunkIndex] = node;
-        node.readyForDownload = newManualPromise()
         this.nbDownloading++;
         this.watcher.writeStatus('Getting chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName);
         let blocks = await node.api.getBlocks(count, from);
@@ -263,5 +266,6 @@ interface ProfiledNode {
   ttas: number[]
   nbSuccess: number
   hostName: string
+  excluded: boolean
   readyForDownload: ManualPromise<boolean>
 }
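For context, the change moves peer booking from the download attempt into candidate selection: a candidate must now be not excluded and have a resolved readyForDownload promise, and picked candidates immediately get a fresh, unresolved promise so the same peer is not asked twice in parallel. The sketch below is a minimal illustration of that booking pattern, not the duniter implementation: the ManualPromise/newManualPromise helper is modeled on the names appearing in the diff, and the release step (resolving the promise once a chunk has arrived) is assumed, since it is not shown in these hunks.

// Illustrative sketch only; names ManualPromise/newManualPromise/pickCandidates
// mirror the diff but this is not the duniter code.
interface ManualPromise<T> extends Promise<T> {
  isResolved(): boolean
  resolve(value: T): void
}

function newManualPromise<T>(): ManualPromise<T> {
  let resolveFn!: (value: T) => void
  let resolved = false
  const p = new Promise<T>(res => { resolveFn = res }) as ManualPromise<T>
  p.isResolved = () => resolved
  p.resolve = (value: T) => { resolved = true; resolveFn(value) }
  return p
}

interface PeerLike {
  tta: number
  excluded: boolean
  readyForDownload: ManualPromise<boolean>
}

function pickCandidates(nodes: PeerLike[], maxDelay: number, parallelMax: number): PeerLike[] {
  // Only reachable peers that are neither excluded nor already booked may be picked.
  const free = nodes.filter(c => c.tta <= maxDelay && !c.excluded && c.readyForDownload.isResolved())
  // Prefer the fastest peers, limited by the parallelism cap.
  const picked = free.sort((a, b) => a.tta - b.tta).slice(0, parallelMax)
  // Booking: swap in a fresh, unresolved promise so the same peer is not asked twice in parallel.
  picked.forEach(c => { c.readyForDownload = newManualPromise<boolean>() })
  return picked
}

// Assumed release step (not part of this diff): once a peer has delivered its chunk,
// it is made available again, e.g. node.readyForDownload.resolve(true)

Presumably this is what the commit message refers to: booking is now an explicit, per-peer gate at selection time rather than a side effect inside each download attempt, so a fast peer is released and re-eligible as soon as its booking is resolved instead of being held back by the candidate logic.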