Commit 3be85224 authored by Cédric Moreau
Browse files

[fix] Sync was not using P2P correctly

parent d37aa060
......@@ -316,8 +316,7 @@ export const CommonConstants = {
DEFAULT_NON_WOT_PEERS_LIMIT: 100, // Number of non-wot peers accepted in our peer document pool
REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_AFTER: 20000, // Reject after 20 seconds without any change
REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_MAX_FAILS: 5, // Maximum number of rejections of waiting for an available node
WAIT_P2P_CANDIDATE_HEARTBEAT: 5000, // Wait X seconds for a node to answer about its state
MAX_READING_SLOTS_FOR_FILE_SYNC: 20, // Number of file reading in parallel
}
......
......@@ -72,8 +72,8 @@ export function randomKey() {
}
const keypair = nacl.sign.keyPair.fromSeed(byteseed)
return new Key(
Base58encode(keypair.publicKey),
Base58encode(keypair.secretKey)
Base58encode(new Buffer(keypair.publicKey)),
Base58encode(new Buffer(keypair.secretKey))
)
}
......
......@@ -4,9 +4,7 @@ export enum DataErrors {
INVALID_LEVELDB_IINDEX_DATA_TO_BE_KICKED,
IDENTITY_UID_NOT_FOUND,
INVALID_TRIMMABLE_DATA,
SYNC_FAST_MEM_ERROR_DURING_INJECTION,
CANNOT_GET_VALIDATION_BLOCK_FROM_REMOTE,
REJECT_WAIT_FOR_AVAILABLE_NODES_BUT_CONTINUE,
NO_NODE_FOUND_TO_DOWNLOAD_CHUNK,
WRONG_CURRENCY_DETECTED,
NO_PEERING_AVAILABLE_FOR_SYNC,
......@@ -20,7 +18,6 @@ export enum DataErrors {
NEGATIVE_BALANCE,
BLOCK_WASNT_COMMITTED,
CANNOT_ARCHIVE_CHUNK_WRONG_SIZE,
CORRUPTED_DATABASE,
BLOCKCHAIN_NOT_INITIALIZED_YET,
CANNOT_DETERMINATE_MEMBERSHIP_AGE,
CANNOT_DETERMINATE_IDENTITY_AGE,
......
......@@ -19,7 +19,7 @@ export interface ProgramOptions {
notrim?: boolean
nosbx?: boolean
nopeers?: boolean
p2psync?: boolean
nop2p?: boolean
syncTrace?: string
isSync: boolean
noSources: boolean
......@@ -36,7 +36,7 @@ export const cliprogram: ProgramOptions = {
notrim: opts.notrim,
nosbx: opts.nosbx,
nopeers: opts.nopeers,
p2psync: opts.p2psync,
nop2p: opts.nop2p,
noSources: !!opts.nosources,
syncTrace: opts['sync-trace'],
isSync: opts._[0] === 'sync',
......
......@@ -16,3 +16,9 @@ export function newRejectTimeoutPromise(timeout: number) {
setTimeout(rej, timeout)
})
}
/**
 * Builds a promise that settles successfully with `value` once `timeout`
 * milliseconds have elapsed. Resolving counterpart of newRejectTimeoutPromise.
 * @param timeout Delay in milliseconds before resolution.
 * @param value   The value the promise resolves with.
 */
export function newResolveTimeoutPromise<T>(timeout: number, value: T): Promise<T> {
  return new Promise<T>(res => {
    const deliver = () => res(value)
    setTimeout(deliver, timeout)
  })
}
......@@ -76,7 +76,7 @@ export const CrawlerDependency = {
{ value: '--nocautious', desc: 'Do not check blocks validity during sync.'},
{ value: '--cautious', desc: 'Check blocks validity during sync (overrides --nocautious option).'},
{ value: '--nopeers', desc: 'Do not retrieve peers during sync.'},
{ value: '--p2psync', desc: 'Force P2P downloading of blocs during sync.'},
{ value: '--nop2p', desc: 'Disables P2P downloading of blocs during sync.'},
{ value: '--nosources', desc: 'Do not parse sources (UD, TX) during sync (debug purposes).'},
{ value: '--nosbx', desc: 'Do not retrieve sandboxes during sync.'},
{ value: '--onlypeers', desc: 'Will only try to sync peers.'},
......@@ -86,8 +86,8 @@ export const CrawlerDependency = {
],
cli: [{
name: 'sync [source] [to] [currency]',
desc: 'Synchronize blockchain from a remote Duniter node',
name: 'sync [source] [to]',
desc: 'Synchronize blockchain from a remote Duniter node. [source] is [host][:port]. [to] defaults to remote current block number.',
preventIfRunning: true,
onConfiguredExecute: async (server:Server) => {
await server.resetData();
......
......@@ -15,5 +15,4 @@ export abstract class ASyncDownloader implements ISyncDownloader {
abstract maxSlots: number
abstract getChunk(i: number): Promise<BlockDTO[]>
abstract getTimesToAnswer(): Promise<{ ttas: number[] }[]>
}
......@@ -43,8 +43,4 @@ export class FsSyncDownloader extends ASyncDownloader implements ISyncDownloader
get maxSlots(): number {
return CommonConstants.MAX_READING_SLOTS_FOR_FILE_SYNC
}
async getTimesToAnswer(): Promise<{ ttas: number[] }[]> {
return [{ ttas: this.ttas }]
}
}
......@@ -5,5 +5,4 @@ export interface ISyncDownloader {
getBlock(number: number): Promise<BlockDTO|null>
maxSlots: number
chunkSize: number
getTimesToAnswer(): Promise<{ ttas: number[] }[]>
}
......@@ -5,20 +5,15 @@ import {BlockDTO} from "../../../../lib/dto/BlockDTO"
import {Watcher} from "./Watcher"
import {ISyncDownloader} from "./ISyncDownloader"
import {cliprogram} from "../../../../lib/common-libs/programOptions"
import {RemoteSynchronizer} from "./RemoteSynchronizer";
import {Keypair} from "../../../../lib/dto/ConfDTO";
import {IRemoteContacter} from "./IRemoteContacter";
import {Querable} from "../../../../lib/common-libs/querable";
import {cat} from "shelljs";
import {Keypair} from "../../../../lib/dto/ConfDTO"
import {IRemoteContacter} from "./IRemoteContacter"
import {ManualPromise, newManualPromise} from "../../../../lib/common-libs/manual-promise"
import {GlobalFifoPromise} from "../../../../service/GlobalFifoPromise"
import {getNanosecondsTime} from "../../../../ProcessCpuProfiler"
import {CommonConstants} from "../../../../lib/common-libs/constants"
import {DataErrors} from "../../../../lib/common-libs/errors"
import {newRejectTimeoutPromise} from "../../../../lib/common-libs/timeout-promise"
import {ASyncDownloader} from "./ASyncDownloader"
const makeQuerablePromise = require('querablep');
import {P2pCandidate} from "./p2p/p2p-candidate"
export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloader {
......@@ -29,10 +24,10 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
private numberOfChunksToDownload:number
private processing:any
private handler:any
private nodes: Querable<ProfiledNode>[] = []
private p2pCandidates: P2pCandidate[] = []
private nbDownloadsTried = 0
private nbDownloading = 0
private downloads: { [chunk: number]: ProfiledNode } = {}
private downloads: { [chunk: number]: P2pCandidate } = {}
private fifoPromise = new GlobalFifoPromise()
private nbWaitFailed = 0
......@@ -53,70 +48,25 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
this.processing = Array.from({ length: this.numberOfChunksToDownload }).map(() => false);
this.handler = Array.from({ length: this.numberOfChunksToDownload }).map(() => null);
for (const thePeer of peers) {
// Create the node
let p = PeerDTO.fromJSONObject(thePeer)
this.nodes.push(makeQuerablePromise((async () => {
const bmaAPI = p.getBMA()
const ws2pAPI = p.getFirstNonTorWS2P()
const apis: { isBMA?: boolean, isWS2P?: boolean, host: string, port: number, path?: string }[] = []
const bmaHost = bmaAPI.dns || bmaAPI.ipv4 || bmaAPI.ipv6
if (bmaAPI.port && bmaHost) {
apis.push({
isBMA: true,
port: bmaAPI.port,
host: bmaHost
})
}
if (ws2pAPI) {
apis.push({
isWS2P: true,
host: ws2pAPI.host,
port: ws2pAPI.port,
path: ws2pAPI.path,
})
}
let syncApi: any = null
try {
syncApi = await RemoteSynchronizer.getSyncAPI(apis, this.keypair)
const manualp = newManualPromise<boolean>()
manualp.resolve(true)
const node: ProfiledNode = {
api: syncApi.api,
tta: 1,
ttas: [],
nbSuccess: 1,
excluded: false,
readyForDownload: manualp,
hostName: syncApi && syncApi.api.hostName || '',
}
if (node.hostName.match(/^(localhost|192|127)/)) {
node.tta = this.MAX_DELAY_PER_DOWNLOAD
}
return node
} catch (e) {
return newManualPromise() // Which never resolves, so this node won't be used
}
})()))
}
this.p2pCandidates = peers.map(p => new P2pCandidate(PeerDTO.fromJSONObject(p), this.keypair, this.logger))
}
get maxSlots(): number {
return this.nodes.length
return this.p2pCandidates.filter(p => p.hasAvailableApi()).length
}
private async wait4AnAvailableNode(): Promise<any> {
let promises: Promise<any>[] = this.nodes
return await Promise.race(promises.concat(newRejectTimeoutPromise(CommonConstants.REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_AFTER)
.catch(() => {
if (this.nbWaitFailed >= CommonConstants.REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_MAX_FAILS) {
this.logger.error("Impossible sync: no more compliant nodes to download from")
process.exit(2)
}
else {
throw Error(DataErrors[DataErrors.REJECT_WAIT_FOR_AVAILABLE_NODES_BUT_CONTINUE])
}
})))
/**
 * Polls the P2P candidates until at least `needed` of them report ready,
 * bounded by at most `needed` polling rounds (see NOTE inside).
 * @param needed Minimum number of available candidates wanted (default 1).
 * @returns The candidates that became ready within the allotted rounds — this
 *          may be FEWER than `needed` (possibly empty) if none answered in time.
 */
private async waitForAvailableNodes(needed = 1): Promise<P2pCandidate[]> {
  let nodesToWaitFor = this.p2pCandidates.slice()
  let nodesAvailable: P2pCandidate[] = []
  let i = 0
  // NOTE(review): the `i < needed` bound caps the loop at `needed` rounds, not
  // "loop until enough are available" — with needed=1 a single heartbeat wait
  // can yield an empty result. Presumably a retry cap was intended; confirm.
  while (nodesAvailable.length < needed && i < needed) {
    // Each round waits at most WAIT_P2P_CANDIDATE_HEARTBEAT ms for ANY
    // remaining candidate to signal availability.
    await Promise.race(nodesToWaitFor.map(p => p.waitAvailability(CommonConstants.WAIT_P2P_CANDIDATE_HEARTBEAT)))
    // Move every candidate that is now ready into the result set.
    const readyNodes = nodesToWaitFor.filter(p => p.isReady())
    nodesToWaitFor = nodesToWaitFor.filter(p => !p.isReady())
    nodesAvailable = nodesAvailable.concat(readyNodes)
    i++
  }
  return nodesAvailable
}
/**
......@@ -124,34 +74,19 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
* If a node is not yet correctly initialized (we can test a node before considering it good for downloading), then
* this method would not return it.
*/
private async getP2Pcandidates(): Promise<ProfiledNode[]> {
private async getP2Pcandidates(chunkIndex: number): Promise<P2pCandidate[]> {
return this.fifoPromise.pushFIFOPromise('getP2Pcandidates_' + getNanosecondsTime(), async () => {
// We wait to have at least 1 available node
await this.wait4AnAvailableNode()
// We filter on all the available nodes, since serveral can be ready at the same time
const readyNodes:ProfiledNode[] = await Promise.all(this.nodes.filter(p => p.isResolved()))
// We wait a bit to have some available nodes
const readyNodes = await this.waitForAvailableNodes()
// We remove the nodes impossible to reach (timeout)
let withGoodDelays = Underscore.filter(readyNodes, (c) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD && !c.excluded && c.readyForDownload.isResolved())
if (withGoodDelays.length === 0) {
readyNodes.map(c => {
if (c.tta >= this.MAX_DELAY_PER_DOWNLOAD) {
c.tta = this.MAX_DELAY_PER_DOWNLOAD - 1
}
})
}
const parallelMax = Math.min(this.PARALLEL_PER_CHUNK, withGoodDelays.length)
withGoodDelays = Underscore.sortBy(withGoodDelays, c => c.tta)
withGoodDelays = withGoodDelays.slice(0, parallelMax)
// We temporarily augment the tta to avoid asking several times to the same node in parallel
withGoodDelays.forEach(c => {
c.tta = this.MAX_DELAY_PER_DOWNLOAD
c.readyForDownload = newManualPromise()
})
if (withGoodDelays.length === 0) {
this.logger.warn('No node found to download this chunk.')
let byAvgAnswerTime = Underscore.sortBy(readyNodes, p => p.avgResponseTime())
const parallelMax = Math.min(this.PARALLEL_PER_CHUNK, byAvgAnswerTime.length)
byAvgAnswerTime = byAvgAnswerTime.slice(0, parallelMax)
if (byAvgAnswerTime.length === 0) {
this.logger.warn('No node found to download chunk #%s.', chunkIndex)
throw Error(DataErrors[DataErrors.NO_NODE_FOUND_TO_DOWNLOAD_CHUNK])
}
return withGoodDelays
return byAvgAnswerTime
})
}
......@@ -165,41 +100,28 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
// if this chunk has already been downloaded before, we exclude its supplier node from the download list as it won't give correct answer now
const lastSupplier = this.downloads[chunkIndex]
if (lastSupplier) {
lastSupplier.excluded = true
this.logger.warn('Excluding node %s as it returns unchainable chunks', lastSupplier.hostName)
lastSupplier.addFailure()
}
let candidates = await this.getP2Pcandidates();
// Only 1 candidate for now
const candidates = await this.getP2Pcandidates(chunkIndex)
// Book the nodes
return await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node:ProfiledNode) => {
return await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node) => {
try {
const start = Date.now();
this.handler[chunkIndex] = node;
this.nbDownloading++;
this.watcher.writeStatus('Getting chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName);
let blocks = await node.api.getBlocks(count, from);
node.ttas.push(Date.now() - start);
// Only keep a flow of 5 ttas for the node
if (node.ttas.length > 5) node.ttas.shift();
// Average time to answer
node.tta = Math.round(node.ttas.reduce((sum:number, tta:number) => sum + tta, 0) / node.ttas.length);
let blocks = await node.downloadBlocks(count, from);
this.watcher.writeStatus('GOT chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName);
if (this.PARALLEL_PER_CHUNK === 1) {
// Only works if we have 1 concurrent peer per chunk
this.downloads[chunkIndex] = node
}
node.nbSuccess++;
this.nbDownloadsTried++;
this.nbDownloading--;
node.readyForDownload.resolve(true)
this.nbDownloadsTried++;
return blocks;
} catch (e) {
this.nbDownloading--;
node.readyForDownload.resolve(true)
this.nbDownloadsTried++;
node.ttas.push(this.MAX_DELAY_PER_DOWNLOAD + 1); // No more ask on this node
// Average time to answer
node.tta = Math.round(node.ttas.reduce((sum:number, tta:number) => sum + tta, 0) / node.ttas.length);
throw e;
}
}))
......@@ -253,11 +175,6 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
getChunk(index:number): Promise<BlockDTO[]> {
return this.downloadChunk(index)
}
async getTimesToAnswer(): Promise<{ ttas: number[] }[]> {
const nodes = await Promise.all(this.nodes.filter(p => p.isResolved()))
return nodes.filter(n => n.ttas.length > 0)
}
}
interface ProfiledNode {
......
......@@ -185,7 +185,8 @@ export class RemoteSynchronizer extends AbstractSynchronizer {
// Peers (just for P2P download)
//=======
let peers:(JSONDBPeer|null)[] = [];
if (cliprogram.p2psync) {
const p2psync = !cliprogram.nop2p
if (p2psync) {
this.watcher.writeStatus('Peers...');
peers = await this.node.getPeers()
}
......
import {Querable, querablep} from "../../../../../lib/common-libs/querable"
import {PeerDTO} from "../../../../../lib/dto/PeerDTO"
import {Keypair} from "../../../../../lib/dto/ConfDTO"
import {RemoteSynchronizer} from "../RemoteSynchronizer"
import {IRemoteContacter} from "../IRemoteContacter"
import {BlockDTO} from "../../../../../lib/dto/BlockDTO"
import {newResolveTimeoutPromise} from "../../../../../lib/common-libs/timeout-promise"
/**
 * A remote peer candidate for P2P block download during sync.
 *
 * Wraps a PeerDTO with connection bootstrap (BMA or non-Tor WS2P),
 * availability tracking, a failure counter with auto-exclusion, and a
 * sliding window of response times used by the scheduler to rank candidates.
 */
export class P2pCandidate {

  // Resolves with the contacter API, or null when the peer is unreachable or looks local
  private readonly apiPromise: Querable<any>
  // Tracks the in-flight (or last) chunk download; queried via isReady()
  private dlPromise: Querable<BlockDTO[]|null>
  // Sliding window of the last 5 download durations, in milliseconds
  private readonly responseTimes: number[] = []
  private api: IRemoteContacter|null|undefined
  private nbSuccess = 0
  // Fix: was declared without an initializer, so it started out `undefined`;
  // explicit `false` satisfies strictPropertyInitialization and is clearer.
  private isExcluded = false
  private failures = 0

  constructor(
    private p: PeerDTO,
    private keypair: Keypair,
    private logger: any
  ) {
    this.apiPromise = this.initAPI()
    this.dlPromise = querablep(Promise.resolve(null))
  }

  /**
   * Records a chunk-validation failure; after 5 failures the candidate is
   * permanently excluded from further downloads.
   */
  addFailure() {
    this.failures++
    if (this.failures >= 5 && !this.isExcluded) {
      this.isExcluded = true
      this.logger.warn('Excluding node %s as it returned unchainable chunks %s times', this.hostName, this.failures)
    }
  }

  /**
   * True when the API is connected, the previous download (if any) has
   * settled, and the candidate has not been excluded.
   */
  isReady() {
    return this.apiPromise.isResolved() && this.dlPromise.isResolved() && this.api && !this.isExcluded
  }

  /**
   * Waits up to `maxWait` ms for this candidate to become available.
   * @returns true when both the API and the previous download settled
   *          successfully in time; false on timeout, rejection or exclusion.
   */
  async waitAvailability(maxWait: number): Promise<boolean> {
    return Promise.race([
      // Wait for availability
      (async () => !this.isExcluded
        && (this.apiPromise.isRejected() ? await newResolveTimeoutPromise(maxWait, false) : !!(await this.apiPromise))
        && (this.dlPromise.isRejected() ? await newResolveTimeoutPromise(maxWait, false) : !!(await this.dlPromise)))(),
      // Maximum wait trigger
      newResolveTimeoutPromise(maxWait, false)
    ])
  }

  hasAvailableApi() {
    return !!this.api
  }

  /**
   * Average of the recorded response times (ms), used to rank candidates.
   * Fix: previously returned NaN for a fresh candidate with no samples
   * (0 / 0), which corrupts any sort ordering in callers; 0 now makes new
   * nodes rank first, giving them a chance to accumulate real timings.
   */
  avgResponseTime() {
    if (this.responseTimes.length === 0) {
      return 0
    }
    return this.responseTimes.reduce((sum, rt) => sum + rt, 0) / this.responseTimes.length
  }

  get hostName() {
    return (this.api && this.api.hostName) || 'NO_API'
  }

  /**
   * Downloads `count` blocks starting at `from` through the connected API,
   * recording the elapsed time in the response-time window. The promise is
   * also stored in `dlPromise` so isReady() reports the candidate busy until
   * the download settles.
   * @throws the underlying API error when the download fails.
   */
  async downloadBlocks(count: number, from: number) {
    const start = Date.now()
    let error: Error|undefined
    this.dlPromise = querablep((async () => {
      // We try to download the blocks
      let blocks: BlockDTO[]|null
      try {
        blocks = await (this.api as IRemoteContacter).getBlocks(count, from)
      }
      catch (e) {
        // Unfortunately this can fail
        blocks = null
        error = e
      }
      this.responseTimes.push(Date.now() - start);
      // Only keep a flow of 5 ttas for the node
      if (this.responseTimes.length > 5) this.responseTimes.shift()
      // NOTE(review): nbSuccess is incremented even when the download failed —
      // looks unintended, but the counter is not read here; confirm before changing.
      this.nbSuccess++
      if (error) {
        throw error
      }
      return blocks
    })())
    return this.dlPromise
  }

  // Extracts the reachable endpoints (BMA and/or non-Tor WS2P) from the peer document.
  private getRemoteAPIs() {
    const bmaAPI = this.p.getBMA()
    const ws2pAPI = this.p.getFirstNonTorWS2P()
    const apis: RemoteAPI[] = []
    const bmaHost = bmaAPI.dns || bmaAPI.ipv4 || bmaAPI.ipv6
    if (bmaAPI.port && bmaHost) {
      apis.push({
        isBMA: true,
        port: bmaAPI.port,
        host: bmaHost
      })
    }
    if (ws2pAPI) {
      apis.push({
        isWS2P: true,
        host: ws2pAPI.host,
        port: ws2pAPI.port,
        path: ws2pAPI.path,
      })
    }
    return apis
  }

  // Connects to the first working endpoint; resolves null (never rejects) on
  // failure or when the peer looks local (localhost/192*/127*), so such
  // candidates simply never become available.
  private initAPI() {
    return querablep((async (): Promise<IRemoteContacter|null> => {
      try {
        const apis = this.getRemoteAPIs()
        const syncApi = await RemoteSynchronizer.getSyncAPI(apis, this.keypair)
        if ((syncApi && syncApi.api.hostName || '').match(/^(localhost|192|127)/)) {
          return null
        }
        this.api = syncApi.api
        return syncApi.api
      } catch (e) {
        return null
      }
    })())
  }
}
// Shape of one candidate endpoint produced by P2pCandidate.getRemoteAPIs():
// either a BMA (HTTP) or a WS2P (websocket) access point of a peer.
interface RemoteAPI {
  isBMA?: boolean   // set when this describes a BMA endpoint
  isWS2P?: boolean  // set when this describes a WS2P endpoint
  host: string
  port: number
  path?: string     // only populated for WS2P endpoints
}
\ No newline at end of file
......@@ -141,186 +141,186 @@ export class GlobalIndexStream extends Duplex {
@MonitorExecutionTime()
private async transform(dataArray:ProtocolIndexesStream[]): Promise<GindexData[]> {
await this.beforeBlocks(dataArray.map(d => d.block))
// await this.beforeBlocks(dataArray.map(d => d.block))
const gindex: GindexData[] = []
for (const data of dataArray) {
const block = data.block
const gData: GindexData = {
lindex: {
mindex: data.mindex.slice(),
iindex: data.iindex.slice(),
sindex: data.sindex.slice(),
cindex: data.cindex.slice(),
},
gindex: {
mindex: [],
iindex: [],
sindex: [],
cindex: [],
},
block,
head: null as any,
}
// VERY FIRST: parameters, otherwise we compute wrong variables such as UDTime
if (block.number == 0) {
this.sync_currConf = BlockDTO.getConf(block)
await DuniterBlockchain.saveParametersForRoot(block, this.conf, this.dal)
}
if (block.number <= this.to - this.conf.forksize || cliprogram.noSources) { // If we require nosources option, this blockchain can't be valid so we don't make checks
const HEAD = await Indexer.quickCompleteGlobalScope(block, this.sync_currConf, sync_bindex, data.iindex, data.mindex, data.cindex, this.dal)
sync_bindex.push(HEAD)
// GINDEX
gData.head = HEAD
// Remember expiration dates
for (const entry of data.cindex) {
if (entry.expires_on) {
sync_expires.push(entry.expires_on)
}
}
for (const entry of data.mindex) {
if (entry.expires_on) {
sync_expires.push(entry.expires_on)
}
}
for (const entry of data.mindex) {
if (entry.revokes_on) {
sync_expires.push(entry.revokes_on)
}
}
if (data.iindex.length) {
await DuniterBlockchain.createNewcomers(data.iindex, this.dal, NewLogger(), this.wotbMem)
}
if ((block.dividend && !cliprogram.noSources)
|| block.joiners.length
|| block.actives.length
|| block.revoked.length
|| block.excluded.length
|| block.certifications.length
|| (block.transactions.length && !cliprogram.noSources)
|| block.medianTime >= sync_nextExpiring) {
const nextExpiringChanged = block.medianTime >= sync_nextExpiring
for (let i = 0; i < sync_expires.length; i++) {
let expire = sync_expires[i];
if (block.medianTime >= expire) {
sync_expires.splice(i, 1);
i--;
}
}
sync_nextExpiring = sync_expires.reduce((max, value) => max ? Math.min(max, value) : value, 9007199254740991); // Far far away date
if (!cliprogram.noSources) {
if (data.sindex.length) {
await this.blockFillTxSourcesConditions(data.sindex)
}