diff --git a/app/lib/common-libs/constants.ts b/app/lib/common-libs/constants.ts index 664cecf585a0edf56cebe60e380ee729408b9c18..98bef61945b1bcdca02331a7c9943843f05ea61f 100755 --- a/app/lib/common-libs/constants.ts +++ b/app/lib/common-libs/constants.ts @@ -301,7 +301,8 @@ export const CommonConstants = { BLOCK_MAX_TX_CHAINING_DEPTH: 5, - CONST_BLOCKS_CHUNK: 250, + ARCHIVES_BLOCKS_CHUNK: 250, + SYNC_BLOCKS_CHUNK: 250, CHUNK_PREFIX: 'chunk_', BLOCKS_IN_MEMORY_MAX: 288 * 60, // 60 days of blocks diff --git a/app/lib/dal/fileDAL.ts b/app/lib/dal/fileDAL.ts index 76ef93581aed4e6c00f336e3d5b72b7c3a9c15fc..62e3799da2989938997639d3212d095b8b4b0f1a 100644 --- a/app/lib/dal/fileDAL.ts +++ b/app/lib/dal/fileDAL.ts @@ -153,7 +153,7 @@ export class FileDAL { this.powDAL = new PowDAL(this.rootPath, params.fs) this.confDAL = new ConfDAL(this.rootPath, params.fs) this.metaDAL = new (require('./sqliteDAL/MetaDAL').MetaDAL)(this.sqliteDriver); - this.blockchainArchiveDAL = new CFSBlockchainArchive(new CFSCore(path.join(this.rootPath, '/archives'), params.fs), CommonConstants.CONST_BLOCKS_CHUNK) + this.blockchainArchiveDAL = new CFSBlockchainArchive(new CFSCore(path.join(this.rootPath, '/archives'), params.fs), CommonConstants.ARCHIVES_BLOCKS_CHUNK) this.blockDAL = new LokiBlockchain(this.loki.getLokiInstance()) this.txsDAL = new LokiTransactions(this.loki.getLokiInstance()) this.statDAL = new StatDAL(this.rootPath, params.fs) diff --git a/app/lib/dto/PeerDTO.ts b/app/lib/dto/PeerDTO.ts index 9fb10b08f28752e8cbd4c3c5ae6675ae193400f7..7391067eee88ac164d1e51d4b98bb5edb0b060ca 100644 --- a/app/lib/dto/PeerDTO.ts +++ b/app/lib/dto/PeerDTO.ts @@ -94,19 +94,21 @@ export class PeerDTO implements Cloneable { } getBMA() { - let bma:any = null; + let bma: { dns?: string, ipv4?: string, ipv6?: string, port?: number } = {} + let notFound = true this.endpoints.forEach((ep) => { - const matches = !bma && ep.match(CommonConstants.BMA_REGEXP); + const matches = notFound && ep.match(CommonConstants.BMA_REGEXP); if (matches) { + notFound = false bma = { "dns": matches[2] || '', "ipv4": matches[4] || '', "ipv6": matches[6] || '', - "port": matches[8] || 9101 + "port": parseInt(matches[8]) || 9101 }; } }); - return bma || {}; + return bma } getOnceWS2PEndpoint(canReachTorEp:boolean, canReachClearEp:boolean, uuidExcluded:string[] = []) { @@ -181,6 +183,10 @@ export class PeerDTO implements Cloneable { return apis } + getFirstNonTorWS2P() { + return this.getOnceWS2PEndpoint(false, true) + } + getDns() { const bma = this.getBMA(); return bma.dns ? bma.dns : null; diff --git a/app/modules/bma/lib/bma.ts b/app/modules/bma/lib/bma.ts index 575ab3eb05369d70d38d08a9398394769eacabff..129bcbc15fcfb5ed4af0ff47a360fd024776a73a 100644 --- a/app/modules/bma/lib/bma.ts +++ b/app/modules/bma/lib/bma.ts @@ -48,7 +48,7 @@ export const bma = function(server:Server, interfaces:NetworkInterface[]|null, h } } - return Network.createServersAndListen('Duniter server', server, interfaces, httpLogs, logger, null, (app:any, httpMethods:any) => { + return Network.createServersAndListen('BMA server', server, interfaces, httpLogs, logger, null, (app:any, httpMethods:any) => { const node = new NodeBinding(server); const blockchain = new BlockchainBinding(server) diff --git a/app/modules/bma/lib/network.ts b/app/modules/bma/lib/network.ts index ba36638287c5b4dc7fd47ec7d4454ccc1e0cf052..1d68a32a2fa2e06e20f8dda89471e31c4b6b20f5 100644 --- a/app/modules/bma/lib/network.ts +++ b/app/modules/bma/lib/network.ts @@ -17,6 +17,7 @@ import {BMAConstants} from "./constants" import {BMALimitation} from "./limiter" import {Underscore} from "../../../lib/common-libs/underscore" import {CommonConstants} from "../../../lib/common-libs/constants" +import {NewLogger} from "../../../lib/logger"; const os = require('os'); const Q = require('q'); @@ -28,6 +29,7 @@ const errorhandler = require('errorhandler'); const bodyParser = require('body-parser'); const cors = require('cors'); const fileUpload = require('express-fileupload'); +const logger = NewLogger() export interface NetworkInterface { ip:string|null @@ -240,10 +242,10 @@ export class BmaApi { resolve(httpServer); }); }); - this.logger && this.logger.info(this.name + ' listening on http://' + (netInterface.match(/:/) ? '[' + netInterface + ']' : netInterface) + ':' + port); + logger.info(this.name + ' listening on http://' + (netInterface.match(/:/) ? '[' + netInterface + ']' : netInterface) + ':' + port); } catch (e) { - this.logger && this.logger.warn('Could NOT listen to http://' + netInterface + ':' + port); - this.logger && this.logger.warn(e); + logger.warn('Could NOT listen to http://' + netInterface + ':' + port); + logger.warn(e); } } } diff --git a/app/modules/crawler/index.ts b/app/modules/crawler/index.ts index b6ccfe1b8dfe406118f3b4ea712fb81ea75d1717..9c281406b7147106caafaed721b66e1a97ae7029 100644 --- a/app/modules/crawler/index.ts +++ b/app/modules/crawler/index.ts @@ -26,6 +26,7 @@ import {FileDAL} from "../../lib/dal/fileDAL" import {RemoteSynchronizer} from "./lib/sync/RemoteSynchronizer" import {AbstractSynchronizer} from "./lib/sync/AbstractSynchronizer" import {LocalPathSynchronizer} from "./lib/sync/LocalPathSynchronizer" +import {CommonConstants} from "../../lib/common-libs/constants"; export const CrawlerDependency = { duniter: { @@ -49,7 +50,7 @@ export const CrawlerDependency = { }, synchronize: (currency: string, server:Server, onHost:string, onPort:number, upTo:number, chunkLength:number) => { - const strategy = new RemoteSynchronizer(currency, onHost, onPort, server) + const strategy = new RemoteSynchronizer(currency, onHost, onPort, server, chunkLength) const remote = new Synchroniser(server, strategy) const syncPromise = (async () => { await server.dal.disableChangesAPI() @@ -84,7 +85,8 @@ export const CrawlerDependency = { { value: '--onlypeers', desc: 'Will only try to sync peers.'}, { value: '--slow', desc: 'Download slowly the blokchcain (for low connnections).'}, { value: '--readfilesystem',desc: 'Also read the filesystem to speed up block downloading.'}, - { value: '--minsig <minsig>', desc: 'Minimum pending signatures count for `crawl-lookup`. Default is 5.'} + { value: '--minsig <minsig>', desc: 'Minimum pending signatures count for `crawl-lookup`. Default is 5.'}, + { value: '--up-to <block-number>', desc: 'Block number to reach.'}, ], cli: [{ @@ -92,9 +94,9 @@ export const CrawlerDependency = { desc: 'Synchronize blockchain from a remote Duniter node', preventIfRunning: true, onDatabaseExecute: async (server:Server, conf:ConfDTO, program:any, params:any): Promise<any> => { - const source = params[0] - const to = params[1] - const currency = params[2] + const source = params[0] + let currency = params[1] + const to = params.upTo const HOST_PATTERN = /^[^:/]+(:[0-9]{1,5})?$/ const FILE_PATTERN = /^(\/.+)$/ if (!source || !(source.match(HOST_PATTERN) || source.match(FILE_PATTERN))) { @@ -130,9 +132,9 @@ export const CrawlerDependency = { if (!currency) { throw 'currency parameter is required for network synchronization' } - strategy = new RemoteSynchronizer(currency, onHost, onPort, server, noShufflePeers === true, otherDAL) + strategy = new RemoteSynchronizer(currency, onHost, onPort, server, CommonConstants.SYNC_BLOCKS_CHUNK, noShufflePeers === true, otherDAL) } else { - strategy = new LocalPathSynchronizer(source, server) + strategy = new LocalPathSynchronizer(source, server, CommonConstants.SYNC_BLOCKS_CHUNK) } if (program.onlypeers === true) { return strategy.syncPeers(true) @@ -163,7 +165,7 @@ export const CrawlerDependency = { } logger.info('Send self peering ...'); const p = PeerDTO.fromJSONObject(peering) - const contact = new Contacter(p.getHostPreferDNS(), p.getPort(), {}) + const contact = new Contacter(p.getHostPreferDNS(), p.getPort() as number, {}) await contact.postPeer(PeerDTO.fromJSONObject(selfPeer)) logger.info('Sent.'); await server.disconnect(); @@ -191,7 +193,7 @@ export const CrawlerDependency = { const fromPort = peer.getPort(); logger.info('Looking at %s:%s...', fromHost, fromPort); try { - const node = new Contacter(fromHost, fromPort, { timeout: 10000 }); + const node = new Contacter(fromHost, fromPort as number, { timeout: 10000 }); const requirements = await node.getRequirements(search); await req2fwd(requirements, toHost, toPort, logger) } catch (e) { @@ -344,7 +346,7 @@ export const CrawlerDependency = { const fromPort = peer.getPort(); logger.info('Looking at %s:%s...', fromHost, fromPort); try { - const node = new Contacter(fromHost, fromPort, { timeout: 10000 }); + const node = new Contacter(fromHost, fromPort as number, { timeout: 10000 }); const requirements = await node.getRequirementsPending(program.minsig ||Â 5); await req2fwd(requirements, toHost, toPort, logger) } catch (e) { diff --git a/app/modules/crawler/lib/connect.ts b/app/modules/crawler/lib/connect.ts index f87741c5e8c18243d62d17bcd41832c032da1f5e..be7b6a1fd28ac351dbbea2a4196c72b50d26babc 100644 --- a/app/modules/crawler/lib/connect.ts +++ b/app/modules/crawler/lib/connect.ts @@ -18,7 +18,7 @@ import {PeerDTO} from "../../../lib/dto/PeerDTO"; const DEFAULT_HOST = 'localhost'; export const connect = (peer:PeerDTO, timeout:number|null = null) => { - return Promise.resolve(new Contacter(peer.getDns() || peer.getIPv4() || peer.getIPv6() || DEFAULT_HOST, peer.getPort(), { + return Promise.resolve(new Contacter(peer.getDns() || peer.getIPv4() || peer.getIPv6() || DEFAULT_HOST, peer.getPort() as number, { timeout: timeout || CrawlerConstants.DEFAULT_TIMEOUT })) } diff --git a/app/modules/crawler/lib/contacter.ts b/app/modules/crawler/lib/contacter.ts index 0e493502f6fc2ceabf760d0de03422ec10cc1640..75f37f0e98ce38da4c152ec7033da4dbcecf2614 100644 --- a/app/modules/crawler/lib/contacter.ts +++ b/app/modules/crawler/lib/contacter.ts @@ -22,7 +22,7 @@ export class Contacter { options:{ timeout:number } fullyQualifiedHost:string - constructor(private host:string, private port:number, opts:any = {}) { + constructor(public readonly host:string, public readonly port:number, opts:any = {}) { this.options = { timeout: (opts && opts.timeout) || CrawlerConstants.DEFAULT_TIMEOUT } @@ -67,7 +67,7 @@ export class Contacter { } getPeersArray() { - return this.get('/network/peering/peers', dtos.Peers) + return this.get('/network/peers', dtos.Peers) } getSources(pubkey:string) { diff --git a/app/modules/crawler/lib/crawler.ts b/app/modules/crawler/lib/crawler.ts index 40e10ccc0da111939e3ed0aa5dd8ea3f5b6945f2..9a187fa8740640b248fbfb72ff2c14a9e717b6d9 100644 --- a/app/modules/crawler/lib/crawler.ts +++ b/app/modules/crawler/lib/crawler.ts @@ -25,6 +25,7 @@ import {CrawlerConstants} from "./constants" import {pullSandboxToLocalServer} from "./sandbox" import {cleanLongDownPeers} from "./garbager" import {Underscore} from "../../../lib/common-libs/underscore" +import {BMARemoteContacter} from "./sync/BMARemoteContacter" const async = require('async'); @@ -208,7 +209,8 @@ export class SandboxCrawler implements DuniterService { let peersToTest = randoms.slice().map((p) => PeerDTO.fromJSONObject(p)); for (const peer of peersToTest) { const fromHost = await connect(peer) - await pullSandboxToLocalServer(server.conf.currency, fromHost, server, this.logger) + const api = new BMARemoteContacter(fromHost) + await pullSandboxToLocalServer(server.conf.currency, api, server, this.logger) } this.logger && this.logger.info('Sandbox pulling done.'); } diff --git a/app/modules/crawler/lib/pulling.ts b/app/modules/crawler/lib/pulling.ts index 7c785d280b2c2ac671c3990c16168d52c892d3b9..52af56791d78688512f02fecf0547f9ef1686b00 100644 --- a/app/modules/crawler/lib/pulling.ts +++ b/app/modules/crawler/lib/pulling.ts @@ -137,6 +137,7 @@ export abstract class AbstractDAO extends PullingDao { const applyCoroutine = async (peer:PeerDTO, blocks:BlockDTO[]) => { if (blocks.length > 0) { let isFork = localCurrent + // && localCurrent.number !== -1 && !(blocks[0].previousHash == localCurrent.hash && blocks[0].number == localCurrent.number + 1); if (!isFork) { diff --git a/app/modules/crawler/lib/sync.ts b/app/modules/crawler/lib/sync.ts index 3bf8cc58d9d40700ae55402dadb2926fa7a2a062..ea2538e5b1bb13bbee87858d3298c80fb7595073 100644 --- a/app/modules/crawler/lib/sync.ts +++ b/app/modules/crawler/lib/sync.ts @@ -23,7 +23,6 @@ import {DBBlock} from "../../../lib/db/DBBlock" import {BlockchainService} from "../../../service/BlockchainService" import {ConfDTO} from "../../../lib/dto/ConfDTO" import {PeeringService} from "../../../service/PeeringService" -import {CommonConstants} from "../../../lib/common-libs/constants" import {Underscore} from "../../../lib/common-libs/underscore" import {cliprogram} from "../../../lib/common-libs/programOptions" import {EventWatcher, LoggerWatcher, MultimeterWatcher, Watcher} from "./sync/Watcher" @@ -241,7 +240,7 @@ export class Synchroniser extends stream.Duplex { async downloadBlocks(thePeer: PeerDTO, fromNumber: number, count?: number | undefined): Promise<BlockDTO[]> { // Note: we don't care about the particular peer asked by the method. We use the network instead. const numberOffseted = fromNumber - (localNumber + 1); - const targetChunk = Math.floor(numberOffseted / CommonConstants.CONST_BLOCKS_CHUNK); + const targetChunk = Math.floor(numberOffseted / syncStrategy.chunkSize); // Return the download promise! Simple. return (await downloader.getChunk(targetChunk))() } diff --git a/app/modules/crawler/lib/sync/AbstractSynchronizer.ts b/app/modules/crawler/lib/sync/AbstractSynchronizer.ts index 4b9215f1307b4f317cc5526a2ade3f2b05a5e04c..803088c44b03101c2178b43b2ca6927689124d41 100644 --- a/app/modules/crawler/lib/sync/AbstractSynchronizer.ts +++ b/app/modules/crawler/lib/sync/AbstractSynchronizer.ts @@ -21,7 +21,7 @@ import * as path from 'path' export abstract class AbstractSynchronizer { - constructor() { + constructor(public readonly chunkSize: number) { } abstract init(): Promise<void> @@ -43,6 +43,6 @@ export abstract class AbstractSynchronizer { } public getChunkName(i: number) { - return CommonConstants.CHUNK_PREFIX + i + "-" + CommonConstants.CONST_BLOCKS_CHUNK + ".json" + return CommonConstants.CHUNK_PREFIX + i + "-" + this.chunkSize + ".json" } } diff --git a/app/modules/crawler/lib/sync/BMARemoteContacter.ts b/app/modules/crawler/lib/sync/BMARemoteContacter.ts index 0726cec0b842ae8bf474b3bf85617d70ca3cbde2..1a3f5be406d1605215fef387fcb33d5e1d546a7e 100644 --- a/app/modules/crawler/lib/sync/BMARemoteContacter.ts +++ b/app/modules/crawler/lib/sync/BMARemoteContacter.ts @@ -14,17 +14,8 @@ import {NewLogger} from "../../../../lib/logger" import {IRemoteContacter} from "./IRemoteContacter"; import {Contacter} from "../contacter"; -import {HttpMerkleOfPeers, HttpRequirements} from "../../../bma/lib/dtos"; +import {HttpRequirements} from "../../../bma/lib/dtos"; import {JSONDBPeer} from "../../../../lib/db/DBPeer"; -import {FileDAL} from "../../../../lib/dal/fileDAL"; -import {Watcher} from "./Watcher"; -import {cliprogram} from "../../../../lib/common-libs/programOptions"; -import {connect} from "../connect"; -import {RemoteSynchronizer} from "./RemoteSynchronizer"; -import {PeerDTO} from "../../../../lib/dto/PeerDTO"; -import {CrawlerConstants} from "../constants"; -import {dos2unix} from "../../../../lib/common-libs/dos2unix"; -import {PeeringService} from "../../../../service/PeeringService"; import {BlockDTO} from "../../../../lib/dto/BlockDTO"; const logger = NewLogger() @@ -42,6 +33,10 @@ export class BMARemoteContacter implements IRemoteContacter { return this.contacter.getCurrent() } + getBlocks(count: number, from: number): Promise<BlockDTO[]> { + return this.contacter.getBlocks(count, from) + } + async getPeers(): Promise<(JSONDBPeer|null)[]> { return (await this.contacter.getPeersArray()).peers } @@ -53,4 +48,8 @@ export class BMARemoteContacter implements IRemoteContacter { getName(): string { return "BMA remote '" + this.contacter.fullyQualifiedHost + "'" } + + get hostName() { + return this.contacter.host + } } diff --git a/app/modules/crawler/lib/sync/ChunkGetter.ts b/app/modules/crawler/lib/sync/ChunkGetter.ts index c6a41f8735baedaf490bd5840e3d23f128930b91..2709e022b191288fc14e1de51681531a3f18b4a3 100644 --- a/app/modules/crawler/lib/sync/ChunkGetter.ts +++ b/app/modules/crawler/lib/sync/ChunkGetter.ts @@ -3,7 +3,6 @@ import {BlockDTO} from "../../../../lib/dto/BlockDTO" import {CrawlerConstants} from "../constants" import {hashf} from "../../../../lib/common" import {getBlockInnerHashAndNonceWithSignature, getBlockInnerPart} from "../../../../lib/common-libs/rawer" -import {CommonConstants} from "../../../../lib/common-libs/constants" import {NewLogger} from "../../../../lib/logger" import {ISyncDownloader} from "./ISyncDownloader" import {DBBlock} from "../../../../lib/db/DBBlock" @@ -65,7 +64,7 @@ export class ChunkGetter { ) { this.writeDAL = dal const nbBlocksToDownload = Math.max(0, to - localNumber) - this.numberOfChunksToDownload = Math.ceil(nbBlocksToDownload / CommonConstants.CONST_BLOCKS_CHUNK) + this.numberOfChunksToDownload = Math.ceil(nbBlocksToDownload / syncStrategy.chunkSize) this.p2PDownloader = syncStrategy.p2pDownloader() this.fsDownloader = syncStrategy.fsDownloader() @@ -154,7 +153,7 @@ export class ChunkGetter { // We need to wait for upper chunk to be completed to be able to check blocks' correct chaining promiseOfUpperChunk = await this.resultsData[i + 1] } - const chainsWell = await chainsCorrectly(chunk, promiseOfUpperChunk, this.to, this.toHash) + const chainsWell = await chainsCorrectly(chunk, promiseOfUpperChunk, this.to, this.toHash, this.syncStrategy.chunkSize) if (!chainsWell) { if (handler.downloader === this.p2PDownloader) { if (chunk.length === 0) { @@ -171,7 +170,7 @@ export class ChunkGetter { || !(await this.writeDAL.confDAL.coreFS.exists(fileName)) if (doWrite) { // Store the file to avoid re-downloading - if (this.localNumber <= 0 && chunk.length === CommonConstants.CONST_BLOCKS_CHUNK) { + if (this.localNumber <= 0 && chunk.length === this.syncStrategy.chunkSize) { await this.writeDAL.confDAL.coreFS.makeTree(this.syncStrategy.getCurrency()) const content = { blocks: chunk.map((b:any) => DBBlock.fromBlockDTO(b)) } await this.writeDAL.confDAL.coreFS.writeJSON(fileName, content) @@ -245,7 +244,7 @@ export class ChunkGetter { } } -export async function chainsCorrectly(blocks:BlockDTO[], readNextChunk: PromiseOfBlocksReading, topNumber: number, topHash: string) { +export async function chainsCorrectly(blocks:BlockDTO[], readNextChunk: PromiseOfBlocksReading, topNumber: number, topHash: string, chunkSize: number) { if (!blocks.length) { return false @@ -281,7 +280,7 @@ export async function chainsCorrectly(blocks:BlockDTO[], readNextChunk: PromiseO } const lastBlockOfChunk = blocks[blocks.length - 1]; - if ((lastBlockOfChunk.number === topNumber || blocks.length < CommonConstants.CONST_BLOCKS_CHUNK) && lastBlockOfChunk.hash != topHash) { + if ((lastBlockOfChunk.number === topNumber || blocks.length < chunkSize) && lastBlockOfChunk.hash != topHash) { // Top chunk logger.error('Top block is not on the right chain') return false diff --git a/app/modules/crawler/lib/sync/IRemoteContacter.ts b/app/modules/crawler/lib/sync/IRemoteContacter.ts index fc3b8c3bc22971cf389e2c9f71043f533265c618..42bbc491018fff7edca81cadcb286f4febfd4f50 100644 --- a/app/modules/crawler/lib/sync/IRemoteContacter.ts +++ b/app/modules/crawler/lib/sync/IRemoteContacter.ts @@ -25,5 +25,9 @@ export interface IRemoteContacter { getBlock(number: number): Promise<BlockDTO|null> + getBlocks(count: number, from: number): Promise<BlockDTO[]> + getRequirementsPending(number: number): Promise<HttpRequirements> + + hostName: string } diff --git a/app/modules/crawler/lib/sync/LocalPathSynchronizer.ts b/app/modules/crawler/lib/sync/LocalPathSynchronizer.ts index db76e498e80935e3c76edc0f5c235feb2413f67d..e3a48e841435a2ebe27d792075b18a8e49be3e9a 100644 --- a/app/modules/crawler/lib/sync/LocalPathSynchronizer.ts +++ b/app/modules/crawler/lib/sync/LocalPathSynchronizer.ts @@ -34,8 +34,9 @@ export class LocalPathSynchronizer extends AbstractSynchronizer { constructor( private path: string, private server:Server, + chunkSize: number, ) { - super() + super(chunkSize) const fs = RealFS() this.ls = fs.fsList(path) // We read from the real file system here, directly. @@ -98,8 +99,8 @@ export class LocalPathSynchronizer extends AbstractSynchronizer { } async getBlock(number: number): Promise<BlockDTO|null> { - const chunkNumber = parseInt(String(number / CommonConstants.CONST_BLOCKS_CHUNK)) - const position = number % CommonConstants.CONST_BLOCKS_CHUNK + const chunkNumber = parseInt(String(number / this.chunkSize)) + const position = number % this.chunkSize const chunk = await this.theFsDownloader.getChunk(chunkNumber) return chunk[position] } diff --git a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts index 474973635c43248f72bdf8b3569f286d14250c4c..3dacd0368ebe690f23b0a05d2150bf356a15e7c3 100644 --- a/app/modules/crawler/lib/sync/P2PSyncDownloader.ts +++ b/app/modules/crawler/lib/sync/P2PSyncDownloader.ts @@ -1,12 +1,15 @@ import {JSONDBPeer} from "../../../../lib/db/DBPeer" import {PeerDTO} from "../../../../lib/dto/PeerDTO" -import {connect} from "../connect" import {Underscore} from "../../../../lib/common-libs/underscore" import {BlockDTO} from "../../../../lib/dto/BlockDTO" import {Watcher} from "./Watcher" -import {CommonConstants} from "../../../../lib/common-libs/constants" import {ISyncDownloader} from "./ISyncDownloader" import {cliprogram} from "../../../../lib/common-libs/programOptions" +import {RemoteSynchronizer} from "./RemoteSynchronizer"; +import {Keypair} from "../../../../lib/dto/ConfDTO"; +import {IRemoteContacter} from "./IRemoteContacter"; +import {Querable} from "../../../../lib/common-libs/querable"; +import {cat} from "shelljs"; const makeQuerablePromise = require('querablep'); @@ -21,23 +24,26 @@ export class P2PSyncDownloader implements ISyncDownloader { private numberOfChunksToDownload:number private processing:any private handler:any - private nodes:any = {} + private nodes: Querable<ProfiledNode>[] = [] private nbDownloadsTried = 0 private nbDownloading = 0 private lastAvgDelay:number private downloads: { [chunk: number]: any } = {} constructor( + private currency: string, + private keypair: Keypair, private localNumber:number, private to:number, private peers:JSONDBPeer[], private watcher:Watcher, private logger:any, + private chunkSize: number, ) { this.TOO_LONG_TIME_DOWNLOAD = "No answer after " + this.MAX_DELAY_PER_DOWNLOAD + "ms, will retry download later."; this.nbBlocksToDownload = Math.max(0, to - localNumber); - this.numberOfChunksToDownload = Math.ceil(this.nbBlocksToDownload / CommonConstants.CONST_BLOCKS_CHUNK); + this.numberOfChunksToDownload = Math.ceil(this.nbBlocksToDownload / this.chunkSize); this.processing = Array.from({ length: this.numberOfChunksToDownload }).map(() => false); this.handler = Array.from({ length: this.numberOfChunksToDownload }).map(() => null); @@ -51,24 +57,44 @@ export class P2PSyncDownloader implements ISyncDownloader { * this method would not return it. */ private async getP2Pcandidates(): Promise<any[]> { - let promises = this.peers.reduce((chosens:any, other:any, index:number) => { + let promises = this.peers.reduce((chosens:Querable<ProfiledNode>[], thePeer, index:number) => { if (!this.nodes[index]) { // Create the node - let p = PeerDTO.fromJSONObject(this.peers[index]); + let p = PeerDTO.fromJSONObject(thePeer) this.nodes[index] = makeQuerablePromise((async () => { - // We wait for the download process to be triggered - // await downloadStarter; - // if (nodes[index - 1]) { - // try { await nodes[index - 1]; } catch (e) {} - // } - const node:any = await connect(p) - // We initialize nodes with the near worth possible notation - node.tta = 1; - node.nbSuccess = 0; - if (node.host.match(/^(localhost|192|127)/)) { + const bmaAPI = p.getBMA() + const ws2pAPI = p.getFirstNonTorWS2P() + const apis: { host: string, port: number, path?: string }[] = [] + const bmaHost = bmaAPI.dns || bmaAPI.ipv4 || bmaAPI.ipv6 + if (bmaAPI.port && bmaHost) { + apis.push({ + port: bmaAPI.port, + host: bmaHost + }) + } + if (ws2pAPI) { + apis.push(ws2pAPI) + } + let syncApi: any = null + try { + syncApi = await RemoteSynchronizer.getSyncAPI(this.currency, apis, this.keypair) + } catch (e) { + + } + const node: ProfiledNode = { + api: syncApi && syncApi.api, + connected: !!syncApi, + tta: 1, + ttas: [], + nbSuccess: 1, + excluded: false, + downloading: false, + hostName: syncApi && syncApi.api.hostName || '', + } + if (node.hostName.match(/^(localhost|192|127)/)) { node.tta = this.MAX_DELAY_PER_DOWNLOAD } - return node; + return node })()) chosens.push(this.nodes[index]); } else { @@ -77,8 +103,9 @@ export class P2PSyncDownloader implements ISyncDownloader { // Continue return chosens; }, []); - let candidates:any[] = await Promise.all(promises) - candidates.forEach((c:any) => { + const eventuals:ProfiledNode[] = await Promise.all(promises) + const candidates: ProfiledNode[] = eventuals.filter(c => c.connected) as ProfiledNode[] + candidates.forEach((c) => { c.tta = c.tta || 0; // By default we say a node is super slow to answer c.ttas = c.ttas || []; // Memorize the answer delays }); @@ -86,11 +113,11 @@ export class P2PSyncDownloader implements ISyncDownloader { throw this.NO_NODES_AVAILABLE; } // We remove the nodes impossible to reach (timeout) - let withGoodDelays = Underscore.filter(candidates, (c:any) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD && !c.excluded && !c.downloading); + let withGoodDelays = Underscore.filter(candidates, (c) => c.tta <= this.MAX_DELAY_PER_DOWNLOAD && !c.excluded && !c.downloading); if (withGoodDelays.length === 0) { await new Promise(res => setTimeout(res, this.WAIT_DELAY_WHEN_MAX_DOWNLOAD_IS_REACHED)) // We wait a bit before continuing the downloads // We reinitialize the nodes - this.nodes = {}; + this.nodes = [] // And try it all again return this.getP2Pcandidates(); } @@ -117,27 +144,27 @@ export class P2PSyncDownloader implements ISyncDownloader { } let candidates = await this.getP2Pcandidates(); // Book the nodes - return await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node:any) => { + return await this.raceOrCancelIfTimeout(this.MAX_DELAY_PER_DOWNLOAD, candidates.map(async (node:ProfiledNode) => { try { const start = Date.now(); this.handler[chunkIndex] = node; node.downloading = true; this.nbDownloading++; - this.watcher.writeStatus('Getting chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + [node.host, node.port].join(':')); - let blocks = await node.getBlocks(count, from); + this.watcher.writeStatus('Getting chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName); + let blocks = await node.api.getBlocks(count, from); node.ttas.push(Date.now() - start); // Only keep a flow of 5 ttas for the node if (node.ttas.length > 5) node.ttas.shift(); // Average time to answer node.tta = Math.round(node.ttas.reduce((sum:number, tta:number) => sum + tta, 0) / node.ttas.length); - this.watcher.writeStatus('GOT chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + [node.host, node.port].join(':')); + this.watcher.writeStatus('GOT chunck #' + chunkIndex + '/' + (this.numberOfChunksToDownload - 1) + ' from ' + from + ' to ' + (from + count - 1) + ' on peer ' + node.hostName); if (this.PARALLEL_PER_CHUNK === 1) { // Only works if we have 1 concurrent peer per chunk this.downloads[chunkIndex] = node } node.nbSuccess++; - const peers = await Promise.all(Underscore.values(this.nodes)) + const peers = await Promise.all(this.nodes) const downloading = Underscore.filter(peers, (p:any) => p.downloading && p.ttas.length); this.lastAvgDelay = downloading.reduce((sum:number, c:any) => { const tta = Math.round(c.ttas.reduce((sum:number, tta:number) => sum + tta, 0) / c.ttas.length) @@ -167,15 +194,16 @@ export class P2PSyncDownloader implements ISyncDownloader { */ private async downloadChunk(index:number): Promise<BlockDTO[]> { // The algorithm to download a chunk - const from = this.localNumber + 1 + index * CommonConstants.CONST_BLOCKS_CHUNK; - let count = CommonConstants.CONST_BLOCKS_CHUNK; + const from = this.localNumber + 1 + index * this.chunkSize; + let count = this.chunkSize; if (index == this.numberOfChunksToDownload - 1) { - count = this.nbBlocksToDownload % CommonConstants.CONST_BLOCKS_CHUNK || CommonConstants.CONST_BLOCKS_CHUNK; + count = this.nbBlocksToDownload % this.chunkSize || this.chunkSize; } try { return await this.p2pDownload(from, count, index) as BlockDTO[] } catch (e) { this.logger.error(e); + await new Promise(res => setTimeout(res, 1000)) // Wait 1s before retrying return this.downloadChunk(index); } } @@ -209,3 +237,14 @@ export class P2PSyncDownloader implements ISyncDownloader { return this.downloadChunk(index) } } + +interface ProfiledNode { + api: IRemoteContacter + tta: number + ttas: number[] + nbSuccess: number + hostName: string + connected: boolean + excluded: boolean + downloading: boolean +} diff --git a/app/modules/crawler/lib/sync/RemoteSynchronizer.ts b/app/modules/crawler/lib/sync/RemoteSynchronizer.ts index 08ef13a4244c171115a964a208744f5d16639375..3628fd6aa4d9e3a0d0f22a11fbfe0a8ca2aedc18 100644 --- a/app/modules/crawler/lib/sync/RemoteSynchronizer.ts +++ b/app/modules/crawler/lib/sync/RemoteSynchronizer.ts @@ -17,10 +17,8 @@ import {PeerDTO} from "../../../../lib/dto/PeerDTO" import {connect} from "../connect" import {NewLogger} from "../../../../lib/logger" import {CrawlerConstants} from "../constants" -import {HttpMerkleOfPeers} from "../../../bma/lib/dtos" import {cliprogram} from "../../../../lib/common-libs/programOptions" import {Watcher} from "./Watcher" -import {dos2unix} from "../../../../lib/common-libs/dos2unix" import {PeeringService} from "../../../../service/PeeringService" import {Server} from "../../../../../server" import {DBPeer, JSONDBPeer} from "../../../../lib/db/DBPeer" @@ -35,11 +33,10 @@ import {IRemoteContacter} from "./IRemoteContacter"; import {BMARemoteContacter} from "./BMARemoteContacter"; import {WS2PConnection, WS2PPubkeyLocalAuth, WS2PPubkeyRemoteAuth} from "../../../ws2p/lib/WS2PConnection"; import {WS2PRequester} from "../../../ws2p/lib/WS2PRequester"; -import {WS2PServerMessageHandler} from "../../../ws2p/lib/interface/WS2PServerMessageHandler"; import {WS2PMessageHandler} from "../../../ws2p/lib/impl/WS2PMessageHandler"; import {WS2PResponse} from "../../../ws2p/lib/impl/WS2PResponse"; import {DataErrors} from "../../../../lib/common-libs/errors"; -import {Key, KeyGen} from "../../../../lib/common-libs/crypto/keyring"; +import {KeyGen} from "../../../../lib/common-libs/crypto/keyring"; import {WS2PRemoteContacter} from "./WS2PRemoteContacter"; import {Keypair} from "../../../../lib/dto/ConfDTO"; import {cat} from "shelljs"; @@ -65,10 +62,11 @@ export class RemoteSynchronizer extends AbstractSynchronizer { private host: string, private port: number, private server:Server, + chunkSize: number, private noShufflePeers = false, private otherDAL?:FileDAL, ) { - super() + super(chunkSize) } get dal(): FileDAL { @@ -100,7 +98,7 @@ export class RemoteSynchronizer extends AbstractSynchronizer { } async init(): Promise<void> { - const syncApi = await RemoteSynchronizer.getSyncAPI(this.currency, this.host, this.port, this.server.conf.pair) + const syncApi = await RemoteSynchronizer.getSyncAPI(this.currency, [{ host: this.host, port: this.port }], this.server.conf.pair) if (!syncApi.api) { throw Error(DataErrors[DataErrors.CANNOT_CONNECT_TO_REMOTE_FOR_SYNC]) } @@ -108,41 +106,63 @@ export class RemoteSynchronizer extends AbstractSynchronizer { this.peer = PeerDTO.fromJSONObject(syncApi.peering) logger.info("Try with %s %s", this.peer.getURL(), this.peer.pubkey.substr(0, 6)) // We save this peer as a trusted peer for future contact - await this.server.PeeringService.submitP(DBPeer.fromPeerDTO(this.peer), false, false, true) + try { + await this.server.PeeringService.submitP(DBPeer.fromPeerDTO(this.peer), false, false, true) + } catch (e) { + logger.debug(e) + } ;(this.node as any).pubkey = this.peer.pubkey } - private static async getSyncAPI(currency: string, host: string, port: number, keypair: Keypair) { + public static async getSyncAPI(currency: string, hosts: { host: string, port: number, path?: string }[], keypair: Keypair) { let api: IRemoteContacter|undefined let peering: any - logger.info('Connecting to ' + host + '...') - try { - const contacter = await connect(PeerDTO.fromJSONObject({ endpoints: [`BASIC_MERKLED_API ${host} ${port}`]}), RemoteSynchronizer.contacterOptions.timeout) - peering = await contacter.getPeer() - api = new BMARemoteContacter(contacter) - } catch (e) { - logger.warn(`Node does not support BMA, trying WS2P...`) - } + for (const access of hosts) { + const host = access.host + const port = access.port + const path = access.path + logger.info(`Connecting to address ${host} :${port}...`) + try { + const contacter = await connect(PeerDTO.fromJSONObject({ endpoints: [`BASIC_MERKLED_API ${host} ${port}${path && ' ' + path || ''}`]}), 3000) + peering = await contacter.getPeer() + api = new BMARemoteContacter(contacter) + } catch (e) { + logger.warn(`Node does not support BMA at address ${host} :${port}, trying WS2P...`) + } - // If BMA is unreachable, let's try WS2P - if (!api) { - const pair = KeyGen(keypair.pub, keypair.sec) - const connection = WS2PConnection.newConnectionToAddress(1, - `ws://${host}:${port}`, - new (class SyncMessageHandler implements WS2PMessageHandler { - async answerToRequest(json: any, c: WS2PConnection): Promise<WS2PResponse> { - throw Error(DataErrors[DataErrors.CANNOT_ARCHIVE_CHUNK_WRONG_SIZE]) + // If BMA is unreachable, let's try WS2P + if (!api) { + const pair = KeyGen(keypair.pub, keypair.sec) + const connection = WS2PConnection.newConnectionToAddress(1, + `ws://${host}:${port}${path && ' ' + path || ''}`, + new (class SyncMessageHandler implements WS2PMessageHandler { + async answerToRequest(json: any, c: WS2PConnection): Promise<WS2PResponse> { + throw Error(DataErrors[DataErrors.CANNOT_ARCHIVE_CHUNK_WRONG_SIZE]) + } + async handlePushMessage(json: any, c: WS2PConnection): Promise<void> { + logger.warn('Receiving push messages, which are not allowed during a SYNC.', json) + } + }), + new WS2PPubkeyLocalAuth(currency, pair, '00000000'), + new WS2PPubkeyRemoteAuth(currency, pair), + undefined, + { + connectionTimeout: 1500, + requestTimeout: 1500, } - async handlePushMessage(json: any, c: WS2PConnection): Promise<void> { - logger.warn('Receiving push messages, which are not allowed during a SYNC.', json) - } - }), - new WS2PPubkeyLocalAuth(currency, pair, '00000000'), - new WS2PPubkeyRemoteAuth(currency, pair) - ) - const requester = WS2PRequester.fromConnection(connection) - peering = await requester.getPeer() - api = new WS2PRemoteContacter(requester) + ) + try { + const requester = WS2PRequester.fromConnection(connection) + peering = await requester.getPeer() + api = new WS2PRemoteContacter(requester) + } catch (e) { + logger.warn(`Node does not support WS2P at address ${host} :${port} either.`) + } + } + // If we have a working API: stop! + if (api && peering) { + break; + } } if (!api) { throw Error(DataErrors[DataErrors.CANNOT_CONNECT_TO_REMOTE_FOR_SYNC]) @@ -166,7 +186,7 @@ export class RemoteSynchronizer extends AbstractSynchronizer { // Peers (just for P2P download) //======= let peers:(JSONDBPeer|null)[] = []; - if (!cliprogram.nopeers && (to - localNumber > 1000)) { // P2P download if more than 1000 blocs + if (!cliprogram.nopeers) { this.watcher.writeStatus('Peers...'); peers = await this.node.getPeers() } @@ -180,7 +200,7 @@ export class RemoteSynchronizer extends AbstractSynchronizer { p2pDownloader(): ISyncDownloader { if (!this.theP2pDownloader) { - this.theP2pDownloader = new P2PSyncDownloader(this.localNumber, this.to, this.shuffledPeers, this.watcher, logger) + this.theP2pDownloader = new P2PSyncDownloader(this.currency, this.server.conf.pair, this.localNumber, this.to, this.shuffledPeers, this.watcher, logger, this.chunkSize) } return this.theP2pDownloader } @@ -201,7 +221,7 @@ export class RemoteSynchronizer extends AbstractSynchronizer { } static async test(currency: string, host: string, port: number, keypair: Keypair): Promise<BlockDTO> { - const syncApi = await RemoteSynchronizer.getSyncAPI(currency, host, port, keypair) + const syncApi = await RemoteSynchronizer.getSyncAPI(currency, [{ host, port }], keypair) const current = await syncApi.api.getCurrent() if (!current) { throw Error(DataErrors[DataErrors.REMOTE_HAS_NO_CURRENT_BLOCK]) diff --git a/app/modules/crawler/lib/sync/WS2PRemoteContacter.ts b/app/modules/crawler/lib/sync/WS2PRemoteContacter.ts index 1a45c64f92c10c7d4b9476ecf7f282ceb6e7b3db..a5ea8af5fa17d15051db0d2ed6751234932d23fd 100644 --- a/app/modules/crawler/lib/sync/WS2PRemoteContacter.ts +++ b/app/modules/crawler/lib/sync/WS2PRemoteContacter.ts @@ -13,7 +13,6 @@ import {NewLogger} from "../../../../lib/logger" import {IRemoteContacter} from "./IRemoteContacter"; -import {Contacter} from "../contacter"; import {WS2PRequester} from "../../../ws2p/lib/WS2PRequester"; import {DBPeer, JSONDBPeer} from "../../../../lib/db/DBPeer"; import {BlockDTO} from "../../../../lib/dto/BlockDTO"; @@ -39,6 +38,10 @@ export class WS2PRemoteContacter implements IRemoteContacter { return this.requester.getCurrent() } + getBlocks(count: number, from: number): Promise<BlockDTO[]> { + return this.requester.getBlocks(count, from) + } + async getPeers(): Promise<(JSONDBPeer | null)[]> { return (await this.requester.getPeers()).map(p => DBPeer.fromPeerDTO(PeerDTO.fromJSONObject(p))) } @@ -46,4 +49,8 @@ export class WS2PRemoteContacter implements IRemoteContacter { getName(): string { return "WS2P remote" } + + get hostName() { + return this.requester.hostName + } } diff --git a/app/modules/prover/index.ts b/app/modules/prover/index.ts index a33a84d4ecdc48dcdc06f08ea1dee15689b0475d..75b3fabf0fccf25ae0afe93d209d5d03ef770243 100644 --- a/app/modules/prover/index.ts +++ b/app/modules/prover/index.ts @@ -223,7 +223,7 @@ function proveAndSend(program:any, server:Server, block:any, issuer:any, difficu program.show && console.log(proven.getRawSigned()); logger.info('Posted block ' + proven.getRawSigned()); const p = PeerDTO.fromJSONObject(peer); - const contact = new Contacter(p.getHostPreferDNS(), p.getPort()); + const contact = new Contacter(p.getHostPreferDNS(), p.getPort() as number); await contact.postBlock(proven.getRawSigned()); next() } diff --git a/app/modules/ws2p/index.ts b/app/modules/ws2p/index.ts index 6b8e1a3f1d6f7d6593cad9c879249c69a586b1ec..832d08fdbfdcac4a75a6272fbacf7ac2ca7eeef7 100644 --- a/app/modules/ws2p/index.ts +++ b/app/modules/ws2p/index.ts @@ -11,7 +11,6 @@ // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. -"use strict"; import {WS2PConstants} from './lib/constants'; import {ConfDTO, WS2PConfDTO} from "../../lib/dto/ConfDTO" import {Server} from "../../../server" @@ -19,9 +18,10 @@ import * as stream from 'stream'; import {WS2PCluster} from "./lib/WS2PCluster" import {WS2PUpnp} from "./lib/ws2p-upnp" import {CommonConstants} from "../../lib/common-libs/constants" +import {NewLogger} from "../../lib/logger" const constants = require("../../lib/constants"); - +const logger = NewLogger() const nuuid = require('node-uuid') export const WS2PDependency = { @@ -122,16 +122,22 @@ export const WS2PDependency = { } }, - service: { - input: (server:Server, conf:ConfDTO, logger:any) => { - const api = new WS2PAPI(server, conf, logger) + methods: { + bindWS2P: (server: Server) => { + const api = new WS2PAPI(server, server.conf) server.ws2pCluster = api.getCluster() server.addEndpointsDefinitions(async () => api.getEndpoint()) - server.addWrongEndpointFilter((endpoints:string[]) => getWrongEndpoints(endpoints, conf)) + server.addWrongEndpointFilter((endpoints:string[]) => getWrongEndpoints(endpoints, server.conf)) return api } }, + service: { + input: (server:Server) => { + return WS2PDependency.duniter.methods.bindWS2P(server) + } + }, + cli: [{ name: 'ws2p [list-prefered|list-privileged|list-nodes|show-conf]', desc: 'WS2P operations for configuration and diagnosis tasks.', @@ -190,8 +196,7 @@ export class WS2PAPI extends stream.Transform { constructor( private server:Server, - private conf:ConfDTO, - private logger:any) { + private conf:ConfDTO) { super({ objectMode: true }) this.cluster = WS2PCluster.plugOn(server) } @@ -226,7 +231,7 @@ export class WS2PAPI extends stream.Transform { this.upnpAPI.stopRegular(); } try { - this.upnpAPI = new WS2PUpnp(this.logger, this.conf) + this.upnpAPI = new WS2PUpnp(logger, this.conf) const { host, port, available } = await this.upnpAPI.startRegular() if (available) { // Defaults UPnP to true if not defined and available @@ -235,7 +240,7 @@ export class WS2PAPI extends stream.Transform { await this.server.PeeringService.generateSelfPeer(this.server.conf) } } catch (e) { - this.logger.warn(e); + logger.warn(e); } } } diff --git a/app/modules/ws2p/lib/WS2PCluster.ts b/app/modules/ws2p/lib/WS2PCluster.ts index 66d6dea38063e9ad1a02f873f0cccb6a67dbea0f..372bac013055de0eaa586b0a06bac868c5cf0fe7 100644 --- a/app/modules/ws2p/lib/WS2PCluster.ts +++ b/app/modules/ws2p/lib/WS2PCluster.ts @@ -32,9 +32,11 @@ import {Package} from "../../../lib/common/package"; import {ProverConstants} from "../../prover/lib/constants"; import {ProxiesConf} from '../../../lib/proxy'; import {Underscore} from "../../../lib/common-libs/underscore" +import {NewLogger} from "../../../lib/logger"; const es = require('event-stream') const nuuid = require('node-uuid') +const logger = NewLogger() export interface WS2PHead { message:string @@ -84,6 +86,9 @@ export class WS2PCluster { // A cache to remember the banned keys private banned:{ [k:string]: string } = {} + // A cache to remember the keys OK for reconnect + private ok4reconnect:{ [k:string]: string } = {} + // A cache to remember the banned keys for synchronization private banned4Sync:{ [k:string]: string } = {} @@ -772,13 +777,13 @@ export class WS2PCluster { } // Already connected if (syncConnectedPubkeys.indexOf(pub) !== -1) { - return false + return !!this.ok4reconnect[pub] } const limit = (this.server.conf.ws2p && this.server.conf.ws2p.syncLimit) || WS2PConstants.WS2P_SYNC_LIMIT const ok = syncConnectedPubkeys.length < limit if (ok) { // The connection will OK: we prepare the ban right now to give room for future users - this.banSyncConnection(pub) + this.rememberAndPrepareBanSyncConnection(pub) } return ok } @@ -982,11 +987,24 @@ export class WS2PCluster { } } - banSyncConnection(pub: string) { - this.server.logger.warn('Banning SYNC connection of %s for %ss (for room)', pub.slice(0, 8), WS2PConstants.SYNC_BAN_DURATION_IN_SECONDS) - this.banned4Sync[pub] = 'sync' - setTimeout(() => { - delete this.banned4Sync[pub] - }, 1000 * WS2PConstants.SYNC_BAN_DURATION_IN_SECONDS) + rememberAndPrepareBanSyncConnection(pub: string) { + if (!this.ok4reconnect[pub]) { + + // 1. Remember that the key can reconnect within the next few minutes without issue + this.ok4reconnect[pub] = 'reconnect' + setTimeout(() => { + delete this.ok4reconnect[pub] + }, 1000 * WS2PConstants.SYNC_CONNECTION_DURATION_IN_SECONDS) + + // 2. Remember that the key will be banned between the reconnection period and the ban period + this.server.logger.warn('Prepare banning SYNC connection of %s for %ss (for room)', pub.slice(0, 8), WS2PConstants.SYNC_BAN_DURATION_IN_SECONDS) + this.banned4Sync[pub] = 'sync' + setTimeout(() => { + delete this.banned4Sync[pub] + }, 1000 * WS2PConstants.SYNC_BAN_DURATION_IN_SECONDS) + + // Anyway, the connection will be closed after the reconnection period (see WS2PServer), + // through the usage of constant SYNC_CONNECTION_DURATION_IN_SECONDS + } } } diff --git a/app/modules/ws2p/lib/WS2PConnection.ts b/app/modules/ws2p/lib/WS2PConnection.ts index 6c9e3687ea767ca0ad91d530dfa4f539e201fcf5..edee56a93ea7dba901411b57a7b9fcb3abea31ff 100644 --- a/app/modules/ws2p/lib/WS2PConnection.ts +++ b/app/modules/ws2p/lib/WS2PConnection.ts @@ -702,6 +702,12 @@ export class WS2PConnection { this.ws.terminate() } } + + get hostName(): string { + return (this.ws as any).url + .replace('ws://', '') + .replace('wss://', '') + } } class Logger { diff --git a/app/modules/ws2p/lib/WS2PDocpoolPuller.ts b/app/modules/ws2p/lib/WS2PDocpoolPuller.ts index a720d2455eacf9acd7e485bc1e00c4b7b9175d57..a782b9fec67895497070509af09f9bf7fae75ad4 100644 --- a/app/modules/ws2p/lib/WS2PDocpoolPuller.ts +++ b/app/modules/ws2p/lib/WS2PDocpoolPuller.ts @@ -34,6 +34,8 @@ export class WS2PDocpoolPuller { getPeers: async () => [], getCurrent: async () => null, getBlock: async () => null, + getBlocks: async () => [], + hostName: '' }, this.server, this.server.logger) } } diff --git a/app/modules/ws2p/lib/WS2PRequester.ts b/app/modules/ws2p/lib/WS2PRequester.ts index e043742eae4d1f2a82661e123992aec020eb9e70..afaa6783209c5680ac4461261f5083a2b8de9bdd 100644 --- a/app/modules/ws2p/lib/WS2PRequester.ts +++ b/app/modules/ws2p/lib/WS2PRequester.ts @@ -67,4 +67,8 @@ export class WS2PRequester { params: params }) } + + get hostName() { + return this.ws2pc.hostName + } } \ No newline at end of file diff --git a/app/service/PeeringService.ts b/app/service/PeeringService.ts index 6d9b9c6e5033749604a135a73e488402b26e3064..48ade7142029d042543c0f82ecb5a3ffa4687708 100755 --- a/app/service/PeeringService.ts +++ b/app/service/PeeringService.ts @@ -288,6 +288,10 @@ export class PeeringService { if (bmaAccess) { logger.info('BMA access:', bmaAccess) } + const ws2pAccess = PeerDTO.fromJSONObject(p2).getFirstNonTorWS2P() + if (ws2pAccess) { + logger.info(`WS2P access: ${ws2pAccess.host} :${ws2pAccess.port}${ws2pAccess.path && ' ' + ws2pAccess.path || ''}`) + } logger.debug('Generating server\'s peering entry based on block#%s...', p2.block.split('-')[0]); p2.signature = await this.server.sign(raw2); p2.pubkey = this.selfPubkey; diff --git a/test/integration/tools/toolbox.ts b/test/integration/tools/toolbox.ts index f6688e60dd7004535a26c6e790a55be673eee082..e4e7cf88d6e1461385ae3f2a1a57a5a795f6ac1d 100644 --- a/test/integration/tools/toolbox.ts +++ b/test/integration/tools/toolbox.ts @@ -57,6 +57,7 @@ import {expectAnswer, expectError, expectJSON} from "./http-expect" import {WebSocketServer} from "../../../app/lib/common-libs/websocket" import {CommonConstants} from "../../../app/lib/common-libs/constants" import {WS2PRequester} from "../../../app/modules/ws2p/lib/WS2PRequester" +import {WS2PDependency} from "../../../app/modules/ws2p/index" const assert = require('assert'); const rp = require('request-promise'); @@ -643,6 +644,10 @@ export class TestingServer { require('../../../app/modules/prover').ProverDependency.duniter.methods.hookServer(this.server); } + async disableBMA() { + await this.bma.closeConnections() + } + startBlockComputation() { if (!this.prover) { this.prover = require('../../../app/modules/prover').ProverDependency.duniter.methods.prover(this.server); @@ -666,8 +671,26 @@ export class TestingServer { } async enableWS2P(port: number = PORT++): Promise<TestWS2PAPI> { - const cluster = WS2PCluster.plugOn(this._server) - await (this._server.ws2pCluster as WS2PCluster).listen(HOST, port) + // Configure + this._server.conf.ws2p = { + host: HOST, + port, + remotehost: HOST, + remoteport: port, + upnp: false, + uuid: '00000000', + preferedOnly: false, + privilegedOnly: false, + privateAccess: true, + publicAccess: true, + } + // Plug cluster + const api = WS2PDependency.duniter.methods.bindWS2P(this._server) + // Bind cluster + await api.getCluster().listen(HOST, port) + // Recompute the peer + await this._server.recomputeSelfPeer() + // Then, create testing facilities to connect to the node const doConnection = (pair: Key, ws2pId: string, constructor: new ( currency:string, pair:Key, @@ -675,7 +698,7 @@ export class TestingServer { ) => WS2PPubkeyLocalAuth) => { const connection = WS2PConnection.newConnectionToAddress(1, `ws://${HOST}:${port}`, - new WS2PServerMessageHandler(this._server, cluster), + new WS2PServerMessageHandler(this._server, api.getCluster()), new constructor(this.conf.currency, pair, ws2pId), new WS2PPubkeyRemoteAuth(this.conf.currency, pair) ) diff --git a/test/integration/ws2p/ws2p_sync.ts b/test/integration/ws2p/ws2p_sync.ts index b6852290e1dea562a8040c5fc2797b3d0a0a9451..23dd3aa7002b4ac5d9a3f622ac5a1708db75b6e1 100644 --- a/test/integration/ws2p/ws2p_sync.ts +++ b/test/integration/ws2p/ws2p_sync.ts @@ -14,7 +14,6 @@ import {WS2PConstants} from "../../../app/modules/ws2p/lib/constants" import {assertEqual, assertNotNull, createCurrencyWith2Blocks, writeBasicTestWith2Users} from "../tools/test-framework" import {NewTestingServer, TestWS2PAPI} from "../tools/toolbox"; -import {assertThrows} from "../../unit-tools"; import {CrawlerDependency} from "../../../app/modules/crawler/index"; describe('WS2P sync', () => writeBasicTestWith2Users((test) => { @@ -28,6 +27,7 @@ describe('WS2P sync', () => writeBasicTestWith2Users((test) => { test('should be able to init with 2 blocks', async (s1, cat, tac) => { await createCurrencyWith2Blocks(s1, cat, tac) + await s1.disableBMA() }) test('we should be able to connect for SYNC', async (s1, cat, tac) => { @@ -38,9 +38,9 @@ describe('WS2P sync', () => writeBasicTestWith2Users((test) => { assertEqual(2, current.number) }) - test('we should NOT be able to reconnect for SYNC', async (s1, cat, tac) => { + test('we should be able to reconnect for SYNC', async (s1, cat, tac) => { const ws = ws2p.connectForSync(tac.keypair, '22222222') - await assertThrows(ws.getCurrent(), 'WS2P connection timeout') + await assertNotNull(ws.getCurrent()) }) test('we should be able to connect for SYNC with toc', async (s1, cat, tac, toc) => { @@ -54,7 +54,7 @@ describe('WS2P sync', () => writeBasicTestWith2Users((test) => { const s2 = NewTestingServer({ pair: cat.keypair }) await s2.initWithDAL() // We sync on s1 - await CrawlerDependency.duniter.methods.synchronize(s1.conf.currency, s2._server, ws2p.host, ws2p.port, 2, 250).syncPromise + await CrawlerDependency.duniter.methods.synchronize(s1.conf.currency, s2._server, ws2p.host, ws2p.port, 2, 2).syncPromise assertNotNull(await s2.dal.getCurrentBlockOrNull()) }) }))