
Commit 731b4b0e authored by Cédric Moreau

[enh] allow WS2P sync on a single WS2P peer

parent c58e9136
@@ -311,6 +311,9 @@ export const CommonConstants = {
BLOCKS_COLLECT_THRESHOLD: 30, // Number of blocks to wait before trimming the loki data
DEFAULT_NON_WOT_PEERS_LIMIT: 100, // Number of non-wot peers accepted in our peer document pool
REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_AFTER: 20000, // Reject after 20 seconds without any change
REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_MAX_FAILS: 5, // Maximum number of rejections of waiting for an available node
}
function exact (regexpContent:string) {
......
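The two new constants work as a pair: each wait for an available sync node is capped at 20 seconds, and the synchronization gives up after 5 such timeouts. A minimal sketch of that interplay, assuming `CommonConstants` and `DataErrors` are imported as in the hunks of this commit (the committed logic lives in `P2PSyncDownloader`, see further down):

```ts
// Sketch only: cap a single wait, count consecutive timeouts, abort after too many.
let nbWaitFailed = 0

async function waitForAnAvailableNode(nodeBecomesReady: Promise<void>): Promise<void> {
  const timeout = new Promise<never>((_, rej) =>
    setTimeout(rej, CommonConstants.REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_AFTER))
  try {
    await Promise.race([nodeBecomesReady, timeout])
  } catch {
    nbWaitFailed++
    if (nbWaitFailed >= CommonConstants.REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_MAX_FAILS) {
      throw Error("Impossible sync: no more compliant nodes to download from")
    }
    // Below the threshold, let the caller retry the chunk on another pass
    throw Error(DataErrors[DataErrors.REJECT_WAIT_FOR_AVAILABLE_NODES_BUT_CONTINUE])
  }
}
```

The committed version calls process.exit(2) once the threshold is reached; the sketch throws instead so it stays side-effect free.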
export enum DataErrors {
REJECT_WAIT_FOR_AVAILABLE_NODES_BUT_CONTINUE,
NO_NODE_FOUND_TO_DOWNLOAD_CHUNK,
WRONG_CURRENCY_DETECTED,
NO_PEERING_AVAILABLE_FOR_SYNC,
REMOTE_HAS_NO_CURRENT_BLOCK,
......
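`DataErrors` is a plain numeric enum, so its reverse mapping turns a member back into its name; that is how the new `REJECT_WAIT_FOR_AVAILABLE_NODES_BUT_CONTINUE` entry is surfaced as an error message in the downloader below:

```ts
// Numeric enums reverse-map: DataErrors[DataErrors.X] === "X"
throw Error(DataErrors[DataErrors.REJECT_WAIT_FOR_AVAILABLE_NODES_BUT_CONTINUE])
```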
// Source file from duniter: Crypto-currency software to manage libre currency such as Ğ1
// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
export function newRejectTimeoutPromise(timeout: number) {
return new Promise((res, rej) => {
setTimeout(rej, timeout)
})
}
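This helper only ever rejects, so its natural use is as the losing branch of a `Promise.race` that puts a hard ceiling on a wait. A small usage sketch (the `waitAtMost` wrapper is hypothetical, not part of the commit):

```ts
// Resolve with the work's value, or reject once `ms` milliseconds have elapsed.
function waitAtMost<T>(work: Promise<T>, ms: number): Promise<T> {
  return Promise.race([work, newRejectTimeoutPromise(ms) as Promise<T>])
}
```

Since the timer is never cleared, the rejection still fires even after the race is settled, which is why the caller in `P2PSyncDownloader` chains its failure handling directly onto the timeout promise with `.catch()`.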
@@ -96,7 +96,7 @@ export const CrawlerDependency = {
onDatabaseExecute: async (server:Server, conf:ConfDTO, program:any, params:any): Promise<any> => {
const source = params[0]
let currency = params[1]
const to = params.upTo
const to = program.upTo
const HOST_PATTERN = /^[^:/]+(:[0-9]{1,5})?$/
const FILE_PATTERN = /^(\/.+)$/
if (!source || !(source.match(HOST_PATTERN) || source.match(FILE_PATTERN))) {
......
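The one-line fix reads the block ceiling from the command's option object rather than from the positional parameters: positionals arrive in `params`, while a named flag such as `--up-to` is exposed on `program`. Illustrative reading (values made up), mirroring the updated tests at the end of this commit:

```ts
// duniter sync <source> [currency] --up-to <block>
const source   = params[0]     // positional, e.g. "some.host:10901" or a block dump path
const currency = params[1]     // positional
const to       = program.upTo  // named option, e.g. --up-to 150000
```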
@@ -137,7 +137,7 @@ export abstract class AbstractDAO extends PullingDao {
const applyCoroutine = async (peer:PeerDTO, blocks:BlockDTO[]) => {
if (blocks.length > 0) {
let isFork = localCurrent
// && localCurrent.number !== -1
&& localCurrent.number !== -1
&& !(blocks[0].previousHash == localCurrent.hash
&& blocks[0].number == localCurrent.number + 1);
if (!isFork) {
......
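Un-commenting `localCurrent.number !== -1` restores the guard on the fork test: a chunk is treated as a fork only when there is a real local HEAD, not the `-1` placeholder, and its first block does not chain directly onto that HEAD. A hypothetical free-standing form of the same test, for reading convenience:

```ts
function isForkChunk(localCurrent: { number: number, hash: string } | null, blocks: BlockDTO[]): boolean {
  return !!(localCurrent                                  // we have a current block...
    && localCurrent.number !== -1                         // ...which is a real HEAD, not the -1 placeholder...
    && !(blocks[0].previousHash == localCurrent.hash
      && blocks[0].number == localCurrent.number + 1))    // ...and the chunk does not extend it
}
```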
@@ -10,6 +10,12 @@ import {Keypair} from "../../../../lib/dto/ConfDTO";
import {IRemoteContacter} from "./IRemoteContacter";
import {Querable} from "../../../../lib/common-libs/querable";
import {cat} from "shelljs";
import {ManualPromise, newManualPromise} from "../../../../lib/common-libs/manual-promise"
import {GlobalFifoPromise} from "../../../../service/GlobalFifoPromise"
import {getNanosecondsTime} from "../../../../ProcessCpuProfiler"
import {CommonConstants} from "../../../../lib/common-libs/constants"
import {DataErrors} from "../../../../lib/common-libs/errors"
import {newRejectTimeoutPromise} from "../../../../lib/common-libs/timeout-promise"
const makeQuerablePromise = require('querablep');
@@ -17,8 +23,6 @@ export class P2PSyncDownloader implements ISyncDownloader {
private PARALLEL_PER_CHUNK = 1;
private MAX_DELAY_PER_DOWNLOAD = cliprogram.slow ? 15000 : 5000;
private WAIT_DELAY_WHEN_MAX_DOWNLOAD_IS_REACHED = 3000;
private NO_NODES_AVAILABLE = "No node available for download";
private TOO_LONG_TIME_DOWNLOAD:string
private nbBlocksToDownload:number
private numberOfChunksToDownload:number
@@ -29,6 +33,8 @@ export class P2PSyncDownloader implements ISyncDownloader {
private nbDownloading = 0
private lastAvgDelay:number
private downloads: { [chunk: number]: any } = {}
private fifoPromise = new GlobalFifoPromise()
private nbWaitFailed = 0
constructor(
private currency: string,
@@ -49,84 +55,96 @@ export class P2PSyncDownloader implements ISyncDownloader {
// Create slots of download, in a ready stage
this.lastAvgDelay = this.MAX_DELAY_PER_DOWNLOAD;
}
/**
* Get a list of P2P nodes to use for download.
* If a node is not yet correctly initialized (we can test a node before considering it good for downloading), then
* this method would not return it.
*/
private async getP2Pcandidates(): Promise<any[]> {
let promises = this.peers.reduce((chosens:Querable<ProfiledNode>[], thePeer, index:number) => {
if (!this.nodes[index]) {
// Create the node
let p = PeerDTO.fromJSONObject(thePeer)
this.nodes[index] = makeQuerablePromise((async () => {
const bmaAPI = p.getBMA()
const ws2pAPI = p.getFirstNonTorWS2P()
const apis: { host: string, port: number, path?: string }[] = []
const bmaHost = bmaAPI.dns || bmaAPI.ipv4 || bmaAPI.ipv6
if (bmaAPI.port && bmaHost) {
apis.push({
port: bmaAPI.port,
host: bmaHost
})
}
if (ws2pAPI) {
apis.push(ws2pAPI)
}
let syncApi: any = null
try {
syncApi = await RemoteSynchronizer.getSyncAPI(this.currency, apis, this.keypair)
} catch (e) {
}
for (const thePeer of peers) {
// Create the node
let p = PeerDTO.fromJSONObject(thePeer)
this.nodes.push(makeQuerablePromise((async () => {
const bmaAPI = p.getBMA()
const ws2pAPI = p.getFirstNonTorWS2P()
const apis: { isBMA?: boolean, isWS2P?: boolean, host: string, port: number, path?: string }[] = []
const bmaHost = bmaAPI.dns || bmaAPI.ipv4 || bmaAPI.ipv6
if (bmaAPI.port && bmaHost) {
apis.push({
isBMA: true,
port: bmaAPI.port,
host: bmaHost
})
}
if (ws2pAPI) {
apis.push({
isWS2P: true,
host: ws2pAPI.host,
port: ws2pAPI.port,
path: ws2pAPI.path,
})
}
let syncApi: any = null
try {
syncApi = await RemoteSynchronizer.getSyncAPI(this.currency, apis, this.keypair)
const manualp = newManualPromise<boolean>()
manualp.resolve(true)
const node: ProfiledNode = {
api: syncApi && syncApi.api,
connected: !!syncApi,
api: syncApi.api,
tta: 1,
ttas: [],
nbSuccess: 1,
excluded: false,
downloading: false,
readyForDownload: manualp,
hostName: syncApi && syncApi.api.hostName || '',
}
if (node.hostName.match(/^(localhost|192|127)/)) {
node.tta = this.MAX_DELAY_PER_DOWNLOAD
}
return node
})())
chosens.push(this.nodes[index]);
} else {
chosens.push(this.nodes[index]);
}
// Continue
return chosens;
}, []);
const eventuals:ProfiledNode[] = await Promise.all(promises)
const candidates: ProfiledNode[] = eventuals.filter(c => c.connected) as ProfiledNode[]
candidates.forEach((c) => {
c.tta = c.tta || 0; // By default we say a node is super slow to answer
c.ttas = c.ttas || []; // Memorize the answer delays
});
if (candidates.length === 0) {
throw this.NO_NODES_AVAILABLE;
}
// We remove the nodes impossible to reach (timeout)
let withGoodDelays = Underscore.filter(candidates, (c) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD && !c.excluded && !c.downloading);
if (withGoodDelays.length === 0) {
await new Promise(res => setTimeout(res, this.WAIT_DELAY_WHEN_MAX_DOWNLOAD_IS_REACHED)) // We wait a bit before continuing the downloads
// We reinitialize the nodes
this.nodes = []
// And try it all again
return this.getP2Pcandidates();
} catch (e) {
this.logger.warn(e)
return newManualPromise() // Which never resolves, so this node won't be used
}
})()))
}
const parallelMax = Math.min(this.PARALLEL_PER_CHUNK, withGoodDelays.length);
withGoodDelays = Underscore.sortBy(withGoodDelays, (c:any) => c.tta);
withGoodDelays = withGoodDelays.slice(0, parallelMax);
// We temporarily augment the tta to avoid asking several times to the same node in parallel
withGoodDelays.forEach((c:any) => c.tta = this.MAX_DELAY_PER_DOWNLOAD);
return withGoodDelays;
}
private async wait4AnAvailableNode(): Promise<any> {
let promises: Promise<any>[] = this.nodes
return await Promise.race(promises.concat(newRejectTimeoutPromise(CommonConstants.REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_AFTER)
.catch(() => {
if (this.nbWaitFailed >= CommonConstants.REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_MAX_FAILS) {
this.logger.error("Impossible sync: no more compliant nodes to download from")
process.exit(2)
}
else {
throw Error(DataErrors[DataErrors.REJECT_WAIT_FOR_AVAILABLE_NODES_BUT_CONTINUE])
}
})))
}
/**
* Get a list of P2P nodes to use for download.
* If a node is not yet correctly initialized (we can test a node before considering it good for downloading), then
* this method would not return it.
*/
private async getP2Pcandidates(): Promise<ProfiledNode[]> {
return this.fifoPromise.pushFIFOPromise('getP2Pcandidates_' + getNanosecondsTime(), async () => {
// We wait to have at least 1 available node
await this.wait4AnAvailableNode()
// We filter on all the available nodes, since several can be ready at the same time
const readyNodes:ProfiledNode[] = await Promise.all(this.nodes.filter(p => p.isResolved()))
// We remove the nodes impossible to reach (timeout)
let withGoodDelays = Underscore.filter(readyNodes, (c) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD)
if (withGoodDelays.length === 0) {
readyNodes.map(c => {
if (c.tta >= this.MAX_DELAY_PER_DOWNLOAD) {
c.tta = this.MAX_DELAY_PER_DOWNLOAD - 1
}
})
}
const parallelMax = Math.min(this.PARALLEL_PER_CHUNK, withGoodDelays.length)
withGoodDelays = Underscore.sortBy(withGoodDelays, c => c.tta)
withGoodDelays = withGoodDelays.slice(0, parallelMax)
// We temporarily augment the tta to avoid asking several times to the same node in parallel
withGoodDelays.forEach(c => c.tta = this.MAX_DELAY_PER_DOWNLOAD)
return withGoodDelays
})
}
/**
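Candidate selection is now funnelled through a `GlobalFifoPromise`, so concurrent chunk downloads pick their peers one after the other, and the `tta` bump done at the end of one selection is already visible to the next. A sketch of the behaviour being relied on (the `pickNodes` helper is a hypothetical stand-in for the selection logic above):

```ts
const fifo = new GlobalFifoPromise()
const pickNodes = async () => { /* hypothetical stand-in for the selection logic above */ }

// The second operation only starts once the first has completed, so two chunks
// cannot grab the same idle node at the same moment.
await Promise.all([
  fifo.pushFIFOPromise('getP2Pcandidates_' + getNanosecondsTime(), pickNodes),
  fifo.pushFIFOPromise('getP2Pcandidates_' + getNanosecondsTime(), pickNodes),
])
```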
@@ -143,12 +161,16 @@ export class P2PSyncDownloader implements ISyncDownloader {
this.logger.warn('Excluding node %s as it returns unchainable chunks', [lastSupplier.host, lastSupplier.port].join(':'))
}
let candidates = await this.getP2Pcandidates();
if (candidates.length === 0) {
this.logger.warn('No node found to download this chunk.')
throw Error(DataErrors[DataErrors.NO_NODE_FOUND_TO_DOWNLOAD_CHUNK])
}
// Book the nodes
return await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node:ProfiledNode) => {
try {
const start = Date.now();
this.handler[chunkIndex] = node;
node.downloading = true;
node.readyForDownload = newManualPromise()
this.nbDownloading++;
this.watcher.writeStatus('Getting chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName);
let blocks = await node.api.getBlocks(count, from);
@@ -173,12 +195,12 @@ export class P2PSyncDownloader implements ISyncDownloader {
this.nbDownloadsTried++;
this.nbDownloading--;
node.downloading = false;
node.readyForDownload.resolve(true)
return blocks;
} catch (e) {
this.nbDownloading--;
node.downloading = false;
node.readyForDownload.resolve(true)
this.nbDownloadsTried++;
node.ttas.push(this.MAX_DELAY_PER_DOWNLOAD + 1); // No more ask on this node
// Average time to answer
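A node is now booked through its `readyForDownload` promise: installing a fresh, unresolved `ManualPromise` marks it busy, and resolving that promise, on success or on error, signals that the node is available again. Condensed life cycle of one chunk download (sketch with a hypothetical wrapper; the real method also updates the `tta` statistics in its error branch):

```ts
async function downloadWithBooking(node: ProfiledNode, count: number, from: number) {
  node.downloading = true
  node.readyForDownload = newManualPromise()   // unresolved: the node counts as busy
  try {
    return await node.api.getBlocks(count, from)
  } finally {
    node.downloading = false
    node.readyForDownload.resolve(true)        // resolved: the node is available again
  }
}
```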
@@ -244,7 +266,5 @@ interface ProfiledNode {
ttas: number[]
nbSuccess: number
hostName: string
connected: boolean
excluded: boolean
downloading: boolean
readyForDownload: ManualPromise<boolean>
}
@@ -31,7 +31,7 @@ import {pullSandboxToLocalServer} from "../sandbox"
import * as path from 'path'
import {IRemoteContacter} from "./IRemoteContacter";
import {BMARemoteContacter} from "./BMARemoteContacter";
import {WS2PConnection, WS2PPubkeyLocalAuth, WS2PPubkeyRemoteAuth} from "../../../ws2p/lib/WS2PConnection";
import {WS2PConnection, WS2PPubkeyRemoteAuth, WS2PPubkeySyncLocalAuth} from "../../../ws2p/lib/WS2PConnection";
import {WS2PRequester} from "../../../ws2p/lib/WS2PRequester";
import {WS2PMessageHandler} from "../../../ws2p/lib/impl/WS2PMessageHandler";
import {WS2PResponse} from "../../../ws2p/lib/impl/WS2PResponse";
@@ -114,7 +114,7 @@ export class RemoteSynchronizer extends AbstractSynchronizer {
;(this.node as any).pubkey = this.peer.pubkey
}
public static async getSyncAPI(currency: string, hosts: { host: string, port: number, path?: string }[], keypair: Keypair) {
public static async getSyncAPI(currency: string, hosts: { isBMA?: boolean, isWS2P?: boolean, host: string, port: number, path?: string }[], keypair: Keypair) {
let api: IRemoteContacter|undefined
let peering: any
for (const access of hosts) {
@@ -122,16 +122,20 @@ export class RemoteSynchronizer extends AbstractSynchronizer {
const port = access.port
const path = access.path
logger.info(`Connecting to address ${host} :${port}...`)
try {
const contacter = await connect(PeerDTO.fromJSONObject({ endpoints: [`BASIC_MERKLED_API ${host} ${port}${path && ' ' + path || ''}`]}), 3000)
peering = await contacter.getPeer()
api = new BMARemoteContacter(contacter)
} catch (e) {
logger.warn(`Node does not support BMA at address ${host} :${port}, trying WS2P...`)
// If we know this is a WS2P connection, don't try BMA
if (access.isWS2P !== true) {
try {
const contacter = await connect(PeerDTO.fromJSONObject({ endpoints: [`BASIC_MERKLED_API ${host} ${port}${path && ' ' + path || ''}`]}), 3000)
peering = await contacter.getPeer()
api = new BMARemoteContacter(contacter)
} catch (e) {
logger.warn(`Node does not support BMA at address ${host} :${port}, trying WS2P...`)
}
}
// If BMA is unreachable, let's try WS2P
if (!api) {
// If BMA is unreachable and the connection is not marked as strict BMA, let's try WS2P
if (!api && access.isBMA !== true) {
const pair = KeyGen(keypair.pub, keypair.sec)
const connection = WS2PConnection.newConnectionToAddress(1,
`ws://${host}:${port}${path && ' ' + path || ''}`,
@@ -143,13 +147,9 @@ export class RemoteSynchronizer extends AbstractSynchronizer {
logger.warn('Receiving push messages, which are not allowed during a SYNC.', json)
}
}),
new WS2PPubkeyLocalAuth(currency, pair, '00000000'),
new WS2PPubkeySyncLocalAuth(currency, pair, '00000000'),
new WS2PPubkeyRemoteAuth(currency, pair),
undefined,
{
connectionTimeout: 1500,
requestTimeout: 1500,
}
undefined
)
try {
const requester = WS2PRequester.fromConnection(connection)
......
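This is the heart of the enhancement: each access passed to `getSyncAPI` can now be tagged, and the tag is honoured. `isWS2P` skips the BMA probe entirely, `isBMA` skips the WS2P fallback, and an untagged access keeps the old try-BMA-then-WS2P behaviour. Illustrative call for syncing from a single WS2P-only peer (host, port and path are made-up values; `currency` and `keypair` assumed in scope):

```ts
const accesses = [
  { isWS2P: true, host: 'ws2p.example.org', port: 20901, path: '/ws2p' },
]
const sync = await RemoteSynchronizer.getSyncAPI(currency, accesses, keypair)
// With isWS2P === true, no BASIC_MERKLED_API connection is attempted: the peer is
// contacted straight over WS2P, using the new sync-specific WS2PPubkeySyncLocalAuth.
```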
@@ -772,13 +772,13 @@ export class WS2PCluster {
// Sync case is specific
if (isSync) {
// OK for reconnection period of time
if (this.ok4reconnect[pub]) {
return true
}
if (this.banned4Sync[pub]) {
return false
}
// Already connected
if (syncConnectedPubkeys.indexOf(pub) !== -1) {
return !!this.ok4reconnect[pub]
}
const limit = (this.server.conf.ws2p && this.server.conf.ws2p.syncLimit) || WS2PConstants.WS2P_SYNC_LIMIT
const ok = syncConnectedPubkeys.length < limit
if (ok) {
......
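A compact reading of the SYNC acceptance rule after this hunk, assuming the reconnection allowance and the sync ban are meant to short-circuit before the concurrency quota (sketch of a hypothetical free-standing version, with the cluster state passed in):

```ts
function acceptSyncConnection(
  pub: string,
  ok4reconnect: { [pub: string]: boolean },
  banned4Sync: { [pub: string]: boolean },
  syncConnectedPubkeys: string[],
  syncLimit: number|undefined,
): boolean {
  if (ok4reconnect[pub]) return true      // pubkey still within its reconnection window
  if (banned4Sync[pub]) return false      // pubkey banned for SYNC misbehaviour
  const limit = syncLimit || WS2PConstants.WS2P_SYNC_LIMIT
  return syncConnectedPubkeys.length < limit  // otherwise bounded by the sync slot quota
}
```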
@@ -129,7 +129,7 @@ export class WS2PServer extends events.EventEmitter {
}
})
// We close the connection after a given delay
setTimeout(() => c.close(), WS2PConstants.SYNC_CONNECTION_DURATION_IN_SECONDS)
setTimeout(() => c.close(), 1000 * WS2PConstants.SYNC_CONNECTION_DURATION_IN_SECONDS)
// We don't broadcast or pipe data
return
}
......
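The `1000 *` factor fixes a unit mismatch: `SYNC_CONNECTION_DURATION_IN_SECONDS` is expressed in seconds while `setTimeout` expects milliseconds, so the previous call closed SYNC connections after barely 120 ms instead of the intended duration. With the constant raised to 1200 in the next hunk, the corrected call reads:

```ts
// 1000 * 1200 = 1_200_000 ms, i.e. a 20-minute SYNC connection window
setTimeout(() => c.close(), 1000 * WS2PConstants.SYNC_CONNECTION_DURATION_IN_SECONDS)
```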
@@ -57,7 +57,7 @@ export const WS2PConstants = {
},
BAN_DURATION_IN_SECONDS: 120,
SYNC_BAN_DURATION_IN_SECONDS: 240,
SYNC_BAN_DURATION_IN_SECONDS: 2400,
BAN_ON_REPEAT_THRESHOLD: 5,
ERROR_RECALL_DURATION_IN_SECONDS: 60,
SINGLE_RECORD_PROTECTION_IN_SECONDS: 60,
@@ -98,5 +98,5 @@ export const WS2PConstants = {
HEADS_SPREAD_TIMEOUT: 100, // Wait 100ms before sending a bunch of signed heads
WS2P_SYNC_LIMIT: 15, // Number of concurrent peers for sync
SYNC_CONNECTION_DURATION_IN_SECONDS: 120, // Duration of the SYNC connection
SYNC_CONNECTION_DURATION_IN_SECONDS: 1200, // Duration of the SYNC connection
}
\ No newline at end of file
@@ -11,9 +11,7 @@
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
import {MerkleDTO} from "../../../app/lib/dto/MerkleDTO"
import {hashf} from "../../../app/lib/common"
import {processForURL} from "../../../app/lib/helpers/merkle"
import {fakeSyncServer} from "../tools/toolbox"
import {Underscore} from "../../../app/lib/common-libs/underscore"
@@ -39,13 +37,7 @@ describe("CLI", function() {
*/
const onReadBlockchainChunk = (count:number, from:number) => Promise.resolve(blockchain.blocks.slice(from, from + count));
const onReadParticularBlock = (number:number) => Promise.resolve(blockchain.blocks[number]);
const onPeersRequested = async (req:any) => {
const merkle = new MerkleDTO();
merkle.initialize(leaves);
return processForURL(req, merkle, async () => {
return peersMap;
})
}
const onPeersRequested = async () => []
/**
* The fake hash in the blockchain
@@ -134,14 +126,14 @@ describe("CLI", function() {
it('sync 7 blocks (fast)', async () => {
await execute(['reset', 'data']);
await execute(['sync', fakeServer.host + ':' + String(fakeServer.port), '7', 'duniter_unit_test_currency', '--nocautious', '--nointeractive', '--noshuffle']);
await execute(['sync', fakeServer.host + ':' + String(fakeServer.port), 'duniter_unit_test_currency', '--nocautious', '--nointeractive', '--noshuffle', '--up-to', '7']);
const res = await execute(['export-bc', '--nostdout']);
res[res.length - 1].should.have.property('number').equal(7);
res.should.have.length(7 + 1); // blocks #0..#7
})
it('sync 4 blocks (cautious)', async () => {
await execute(['sync', fakeServer.host + ':' + String(fakeServer.port), '11', 'duniter_unit_test_currency', '--nointeractive']);
await execute(['sync', fakeServer.host + ':' + String(fakeServer.port), 'duniter_unit_test_currency', '--nointeractive', '--up-to', '11']);
const res = await execute(['export-bc', '--nostdout']);
res[res.length - 1].should.have.property('number').equal(11);
res.should.have.length(11 + 1);
@@ -154,7 +146,7 @@ describe("CLI", function() {
})
it('[spawn] sync 10 first blocks --memory', async () => {
await execute(['sync', fakeServer.host + ':' + String(fakeServer.port), '10', 'duniter_unit_test_currency', '--memory', '--cautious', '--nointeractive']);
await execute(['sync', fakeServer.host + ':' + String(fakeServer.port), 'duniter_unit_test_currency', '--memory', '--cautious', '--nointeractive', '--up-to', '10']);
})
});
......
@@ -188,7 +188,7 @@ export const fakeSyncServer = async (currency: string, readBlocksMethod:any, rea
}, noLimit);
// Mock BMA method for sync mocking
httpMethods.httpGET('/network/peering/peers', onPeersRequested, noLimit);
httpMethods.httpGET('/network/peers', onPeersRequested, noLimit);
// Another mock BMA method for sync mocking
httpMethods.httpGET('/blockchain/blocks/:count/:from', (req:any) => {
......