Skip to content
Snippets Groups Projects
Commit 27430439 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] handle P2P download slots augmentation/diminution

parent 9ff516e5
No related branches found
No related tags found
No related merge requests found
......@@ -314,6 +314,8 @@ export const CommonConstants = {
REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_AFTER: 20000, // Reject after 20 seconds without any change
REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_MAX_FAILS: 5, // Maximum number of rejections of waiting for an available node
MAX_READING_SLOTS_FOR_FILE_SYNC: 20, // Number of file reading in parallel
}
function exact (regexpContent:string) {
......
......@@ -2,10 +2,12 @@ import {ISyncDownloader} from "./ISyncDownloader"
import {BlockDTO} from "../../../../lib/dto/BlockDTO"
import {FileSystem} from "../../../../lib/system/directory"
import * as path from 'path'
import {CommonConstants} from "../../../../lib/common-libs/constants"
export class FsSyncDownloader implements ISyncDownloader {
private ls: Promise<string[]>
private ttas: number[] = []
constructor(
private fs: FileSystem,
......@@ -16,6 +18,7 @@ export class FsSyncDownloader implements ISyncDownloader {
}
async getChunk(i: number): Promise<BlockDTO[]> {
const start = Date.now()
const files = await this.ls
const filepath = path.join(this.basePath, this.getChunkName(i))
const basename = path.basename(filepath)
......@@ -26,9 +29,19 @@ export class FsSyncDownloader implements ISyncDownloader {
}
if (existsOnDAL) {
const content: any = JSON.parse(await this.fs.fsReadFile(filepath))
// Record the reading duration
this.ttas.push(Date.now() - start)
// Returns a promise of file content
return content.blocks
}
return []
}
get maxSlots(): number {
return CommonConstants.MAX_READING_SLOTS_FOR_FILE_SYNC
}
async getTimesToAnswer(): Promise<{ ttas: number[] }[]> {
return [{ ttas: this.ttas }]
}
}
......@@ -2,4 +2,6 @@ import {BlockDTO} from "../../../../lib/dto/BlockDTO"
export interface ISyncDownloader {
getChunk(i: number): Promise<BlockDTO[]>
maxSlots: number
getTimesToAnswer(): Promise<{ ttas: number[] }[]>
}
......@@ -31,7 +31,6 @@ export class P2PSyncDownloader implements ISyncDownloader {
private nodes: Querable<ProfiledNode>[] = []
private nbDownloadsTried = 0
private nbDownloading = 0
private lastAvgDelay:number
private downloads: { [chunk: number]: any } = {}
private fifoPromise = new GlobalFifoPromise()
private nbWaitFailed = 0
......@@ -53,9 +52,6 @@ export class P2PSyncDownloader implements ISyncDownloader {
this.processing = Array.from({ length: this.numberOfChunksToDownload }).map(() => false);
this.handler = Array.from({ length: this.numberOfChunksToDownload }).map(() => null);
// Create slots of download, in a ready stage
this.lastAvgDelay = this.MAX_DELAY_PER_DOWNLOAD;
for (const thePeer of peers) {
// Create the node
let p = PeerDTO.fromJSONObject(thePeer)
......@@ -104,6 +100,10 @@ export class P2PSyncDownloader implements ISyncDownloader {
}
}
get maxSlots(): number {
return this.nodes.length
}
private async wait4AnAvailableNode(): Promise<any> {
let promises: Promise<any>[] = this.nodes
return await Promise.race(promises.concat(newRejectTimeoutPromise(CommonConstants.REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_AFTER)
......@@ -185,14 +185,6 @@ export class P2PSyncDownloader implements ISyncDownloader {
this.downloads[chunkIndex] = node
}
node.nbSuccess++;
const peers = await Promise.all(this.nodes.filter(p => p.isResolved()))
const downloading = Underscore.filter(peers, (p:any) => p.downloading && p.ttas.length);
this.lastAvgDelay = downloading.reduce((sum:number, c:any) => {
const tta = Math.round(c.ttas.reduce((sum:number, tta:number) => sum + tta, 0) / c.ttas.length)
return sum + tta
}, 0) / downloading.length
this.nbDownloadsTried++;
this.nbDownloading--;
node.readyForDownload.resolve(true)
......@@ -258,6 +250,11 @@ export class P2PSyncDownloader implements ISyncDownloader {
getChunk(index:number): Promise<BlockDTO[]> {
return this.downloadChunk(index)
}
async getTimesToAnswer(): Promise<{ ttas: number[] }[]> {
const nodes = await Promise.all(this.nodes.filter(p => p.isResolved()))
return nodes.filter(n => n.ttas.length > 0)
}
}
interface ProfiledNode {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment