Skip to content
Snippets Groups Projects
Commit 65ebf762 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] #1325 Spread events about synchronization for better reporting

parent 47f7f4ee
Branches
Tags
No related merge requests found
...@@ -48,6 +48,15 @@ export class Synchroniser extends stream.Duplex { ...@@ -48,6 +48,15 @@ export class Synchroniser extends stream.Duplex {
this.watcher.onEvent('appliedChange', () => this.push(this.watcher.getStats())) this.watcher.onEvent('appliedChange', () => this.push(this.watcher.getStats()))
this.watcher.onEvent('sbxChange', () => this.push(this.watcher.getStats())) this.watcher.onEvent('sbxChange', () => this.push(this.watcher.getStats()))
this.watcher.onEvent('peersChange', () => this.push(this.watcher.getStats())) this.watcher.onEvent('peersChange', () => this.push(this.watcher.getStats()))
this.watcher.onEvent('addWrongChunkFailure', (data) => this.push({ p2pData: { name: 'addWrongChunkFailure', data }}))
this.watcher.onEvent('failToGetChunk', (data) => this.push({ p2pData: { name: 'failToGetChunk', data }}))
this.watcher.onEvent('gettingChunk', (data) => this.push({ p2pData: { name: 'gettingChunk', data }}))
this.watcher.onEvent('gotChunk', (data) => this.push({ p2pData: { name: 'gotChunk', data }}))
this.watcher.onEvent('reserveNodes', (data) => this.push({ p2pData: { name: 'reserveNodes', data }}))
this.watcher.onEvent('unableToDownloadChunk', (data) => this.push({ p2pData: { name: 'unableToDownloadChunk', data }}))
this.watcher.onEvent('wantToDownload', (data) => this.push({ p2pData: { name: 'wantToDownload', data }}))
this.watcher.onEvent('wantToLoad', (data) => this.push({ p2pData: { name: 'wantToLoad', data }}))
this.watcher.onEvent('beforeReadyNodes', (data) => this.push({ p2pData: { name: 'beforeReadyNodes', data }}))
this.syncStrategy.setWatcher(this.watcher) this.syncStrategy.setWatcher(this.watcher)
......
...@@ -22,6 +22,8 @@ const logger = NewLogger() ...@@ -22,6 +22,8 @@ const logger = NewLogger()
export class BMARemoteContacter implements IRemoteContacter { export class BMARemoteContacter implements IRemoteContacter {
public type: 'BMA'| 'WS2P' = 'BMA'
constructor(protected contacter: Contacter) { constructor(protected contacter: Contacter) {
} }
......
...@@ -34,4 +34,6 @@ export interface IRemoteContacter { ...@@ -34,4 +34,6 @@ export interface IRemoteContacter {
getRequirementsPending(number: number): Promise<HttpRequirements> getRequirementsPending(number: number): Promise<HttpRequirements>
hostName: string hostName: string
type: 'BMA' | 'WS2P'
} }
...@@ -57,6 +57,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade ...@@ -57,6 +57,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
} }
private async waitForAvailableNodesAndReserve(needed = 1): Promise<P2pCandidate[]> { private async waitForAvailableNodesAndReserve(needed = 1): Promise<P2pCandidate[]> {
this.watcher.beforeReadyNodes(this.p2pCandidates)
let nodesToWaitFor = this.p2pCandidates.slice() let nodesToWaitFor = this.p2pCandidates.slice()
let nodesAvailable: P2pCandidate[] = [] let nodesAvailable: P2pCandidate[] = []
let i = 0 let i = 0
...@@ -88,6 +89,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade ...@@ -88,6 +89,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
byAvgAnswerTime = byAvgAnswerTime.slice(0, parallelMax) byAvgAnswerTime = byAvgAnswerTime.slice(0, parallelMax)
if (byAvgAnswerTime.length === 0) { if (byAvgAnswerTime.length === 0) {
this.logger.warn('No node found to download chunk #%s.', chunkIndex) this.logger.warn('No node found to download chunk #%s.', chunkIndex)
this.watcher.unableToDownloadChunk(chunkIndex)
throw Error(DataErrors[DataErrors.NO_NODE_FOUND_TO_DOWNLOAD_CHUNK]) throw Error(DataErrors[DataErrors.NO_NODE_FOUND_TO_DOWNLOAD_CHUNK])
} }
return byAvgAnswerTime return byAvgAnswerTime
...@@ -104,17 +106,21 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade ...@@ -104,17 +106,21 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
// if this chunk has already been downloaded before, we exclude its supplier node from the download list as it won't give correct answer now // if this chunk has already been downloaded before, we exclude its supplier node from the download list as it won't give correct answer now
const lastSupplier = this.downloads[chunkIndex] const lastSupplier = this.downloads[chunkIndex]
if (lastSupplier) { if (lastSupplier) {
this.watcher.addWrongChunkFailure(chunkIndex, lastSupplier)
lastSupplier.addFailure() lastSupplier.addFailure()
} }
this.watcher.wantToDownload(chunkIndex)
// Only 1 candidate for now // Only 1 candidate for now
const candidates = await this.getP2Pcandidates(chunkIndex) const candidates = await this.getP2Pcandidates(chunkIndex)
// Book the nodes // Book the nodes
this.watcher.gettingChunk(chunkIndex, candidates)
return await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node) => { return await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node) => {
try { try {
this.handler[chunkIndex] = node; this.handler[chunkIndex] = node;
this.nbDownloading++; this.nbDownloading++;
this.watcher.writeStatus('Getting chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName); this.watcher.writeStatus('Getting chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName);
let blocks = await node.downloadBlocks(count, from); let blocks = await node.downloadBlocks(count, from);
this.watcher.gotChunk(chunkIndex, node)
this.watcher.writeStatus('GOT chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName); this.watcher.writeStatus('GOT chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName);
if (this.PARALLEL_PER_CHUNK === 1) { if (this.PARALLEL_PER_CHUNK === 1) {
// Only works if we have 1 concurrent peer per chunk // Only works if we have 1 concurrent peer per chunk
...@@ -124,6 +130,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade ...@@ -124,6 +130,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
this.nbDownloadsTried++; this.nbDownloadsTried++;
return blocks; return blocks;
} catch (e) { } catch (e) {
this.watcher.failToGetChunk(chunkIndex, node)
this.nbDownloading--; this.nbDownloading--;
this.nbDownloadsTried++; this.nbDownloadsTried++;
throw e; throw e;
......
...@@ -23,6 +23,8 @@ const logger = NewLogger() ...@@ -23,6 +23,8 @@ const logger = NewLogger()
export class WS2PRemoteContacter implements IRemoteContacter { export class WS2PRemoteContacter implements IRemoteContacter {
public type: 'BMA'| 'WS2P' = 'WS2P'
getRequirementsPending(min: number): Promise<HttpRequirements> { getRequirementsPending(min: number): Promise<HttpRequirements> {
return this.requester.getRequirementsPending(min) return this.requester.getRequirementsPending(min)
} }
......
import * as events from "events" import * as events from "events"
import {cliprogram} from "../../../../lib/common-libs/programOptions" import {cliprogram} from "../../../../lib/common-libs/programOptions"
import {P2pCandidate} from "./p2p/p2p-candidate"
const multimeter = require('multimeter') const multimeter = require('multimeter')
...@@ -11,9 +12,36 @@ export interface Watcher { ...@@ -11,9 +12,36 @@ export interface Watcher {
sbxPercent(pct?: number): number sbxPercent(pct?: number): number
peersPercent(pct?: number): number peersPercent(pct?: number): number
end(): void end(): void
reserveNodes(nodesAvailable: P2pCandidate[]): void
unableToDownloadChunk(chunkIndex: number): void
gettingChunk(chunkIndex: number, candidates: P2pCandidate[]): void
gotChunk(chunkIndex: number, node: P2pCandidate): void
failToGetChunk(chunkIndex: number, node: P2pCandidate): void
wantToDownload(chunkIndex: number): void
addWrongChunkFailure(chunkIndex: number, lastSupplier: P2pCandidate): void
wantToLoad(chunkIndex: number): void
beforeReadyNodes(p2pCandidates: P2pCandidate[]): void
} }
export type EventName = 'downloadChange'|'storageChange'|'appliedChange'|'sbxChange'|'peersChange' export type EventName = 'downloadChange'|'storageChange'|'appliedChange'|'sbxChange'|'peersChange'
| 'addWrongChunkFailure'
| 'failToGetChunk'
| 'gettingChunk'
| 'gotChunk'
| 'reserveNodes'
| 'unableToDownloadChunk'
| 'wantToDownload'
| 'wantToLoad'
| 'beforeReadyNodes'
export class EventWatcher extends events.EventEmitter implements Watcher { export class EventWatcher extends events.EventEmitter implements Watcher {
...@@ -69,6 +97,44 @@ export class EventWatcher extends events.EventEmitter implements Watcher { ...@@ -69,6 +97,44 @@ export class EventWatcher extends events.EventEmitter implements Watcher {
peersSync: this.peersPercent(), peersSync: this.peersPercent(),
} }
} }
/************* P2P DOWNLOAD EVENTS ****************/
addWrongChunkFailure(chunkIndex: number, lastSupplier: P2pCandidate): void {
this.emit('addWrongChunkFailure', { chunkIndex, node: lastSupplier })
}
failToGetChunk(chunkIndex: number, node: P2pCandidate): void {
this.emit('failToGetChunk', { chunkIndex, node })
}
gettingChunk(chunkIndex: number, candidates: P2pCandidate[]): void {
this.emit('gettingChunk', { chunkIndex, nodes: candidates })
}
gotChunk(chunkIndex: number, node: P2pCandidate): void {
this.emit('gotChunk', { chunkIndex, node })
}
reserveNodes(nodesAvailable: P2pCandidate[]): void {
this.emit('reserveNodes', { nodes: nodesAvailable })
}
unableToDownloadChunk(chunkIndex: number): void {
this.emit('unableToDownloadChunk', { chunkIndex })
}
wantToDownload(chunkIndex: number): void {
this.emit('wantToDownload', { chunkIndex })
}
wantToLoad(chunkIndex: number): void {
this.emit('wantToLoad', { chunkIndex })
}
beforeReadyNodes(p2pCandidates: P2pCandidate[]): void {
this.emit('beforeReadyNodes', { nodes: p2pCandidates })
}
} }
export class MultimeterWatcher implements Watcher { export class MultimeterWatcher implements Watcher {
...@@ -176,6 +242,36 @@ export class MultimeterWatcher implements Watcher { ...@@ -176,6 +242,36 @@ export class MultimeterWatcher implements Watcher {
empty : { text : ' ' } empty : { text : ' ' }
}) })
} }
/************* NOT IMPLEMENTED ****************/
addWrongChunkFailure(chunkIndex: number, lastSupplier: P2pCandidate): void {
}
failToGetChunk(chunkIndex: number, node: P2pCandidate): void {
}
gettingChunk(chunkIndex: number, candidates: P2pCandidate[]): void {
}
gotChunk(chunkIndex: number, node: P2pCandidate): void {
}
reserveNodes(nodesAvailable: P2pCandidate[]): void {
}
unableToDownloadChunk(chunkIndex: number): void {
}
wantToDownload(chunkIndex: number): void {
}
wantToLoad(chunkIndex: number): void {
}
beforeReadyNodes(p2pCandidates: P2pCandidate[]): void {
}
} }
export class LoggerWatcher implements Watcher { export class LoggerWatcher implements Watcher {
...@@ -239,4 +335,33 @@ export class LoggerWatcher implements Watcher { ...@@ -239,4 +335,33 @@ export class LoggerWatcher implements Watcher {
end() { end() {
} }
/************* NOT IMPLEMENTED ****************/
addWrongChunkFailure(chunkIndex: number, lastSupplier: P2pCandidate): void {
}
failToGetChunk(chunkIndex: number, node: P2pCandidate): void {
}
gettingChunk(chunkIndex: number, candidates: P2pCandidate[]): void {
}
gotChunk(chunkIndex: number, node: P2pCandidate): void {
}
reserveNodes(nodesAvailable: P2pCandidate[]): void {
}
unableToDownloadChunk(chunkIndex: number): void {
}
wantToDownload(chunkIndex: number): void {
}
wantToLoad(chunkIndex: number): void {
}
beforeReadyNodes(p2pCandidates: P2pCandidate[]): void {
}
} }
...@@ -18,7 +18,7 @@ export class P2pCandidate { ...@@ -18,7 +18,7 @@ export class P2pCandidate {
private reserved = false private reserved = false
constructor( constructor(
private p: PeerDTO, public p: PeerDTO,
private keypair: Keypair, private keypair: Keypair,
private logger: any, private logger: any,
private allowLocalSync: boolean, private allowLocalSync: boolean,
...@@ -131,6 +131,10 @@ export class P2pCandidate { ...@@ -131,6 +131,10 @@ export class P2pCandidate {
reserve() { reserve() {
this.reserved = true this.reserved = true
} }
get apiName() {
return this.api && this.api.type
}
} }
interface RemoteAPI { interface RemoteAPI {
......
...@@ -62,6 +62,7 @@ export class DownloadStream extends Duplex { ...@@ -62,6 +62,7 @@ export class DownloadStream extends Duplex {
let chunk: BlockDTO[] let chunk: BlockDTO[]
// We don't have the file locally: we loop on P2P download until we have it (or until P2P throws a general error) // We don't have the file locally: we loop on P2P download until we have it (or until P2P throws a general error)
do { do {
this.watcher.wantToLoad(i)
chunk = await downloader.getChunk(i) chunk = await downloader.getChunk(i)
if (chunk.length) { if (chunk.length) {
// NewLogger().info("Chunk #%s is COMPLETE", i) // NewLogger().info("Chunk #%s is COMPLETE", i)
......
...@@ -27,6 +27,7 @@ export class WS2PDocpoolPuller { ...@@ -27,6 +27,7 @@ export class WS2PDocpoolPuller {
const requester = WS2PRequester.fromConnection(this.connection) const requester = WS2PRequester.fromConnection(this.connection)
// node.pubkey = p.pubkey; // node.pubkey = p.pubkey;
return pullSandboxToLocalServer(this.server.conf.currency, { return pullSandboxToLocalServer(this.server.conf.currency, {
type: 'WS2P',
getRequirementsPending: (minCert = 1) => { getRequirementsPending: (minCert = 1) => {
return requester.getRequirementsPending(minCert) return requester.getRequirementsPending(minCert)
}, },
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment