Mise à jour effectuée, merci de nous signaler tout dysfonctionnement ! | Upgrade done, please let us know about any dysfunction!

Commit 05cb0ce5 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] synchronization through WS2P first step

parent c6123e46
export enum DataErrors {
WRONG_CURRENCY_DETECTED,
NO_PEERING_AVAILABLE_FOR_SYNC,
REMOTE_HAS_NO_CURRENT_BLOCK,
CANNOT_CONNECT_TO_REMOTE_FOR_SYNC,
WS2P_SYNC_PERIMETER_IS_LIMITED,
PEER_REJECTED,
TOO_OLD_PEER,
......
......@@ -13,7 +13,7 @@
export const OtherConstants = {
MUTE_LOGS_DURING_UNIT_TESTS: true,
MUTE_LOGS_DURING_UNIT_TESTS: false,
SQL_TRACES: false,
BC_EVENT: {
......
......@@ -48,8 +48,8 @@ export const CrawlerDependency = {
return crawler.sandboxPull(server)
},
synchronize: (server:Server, onHost:string, onPort:number, upTo:number, chunkLength:number) => {
const strategy = new RemoteSynchronizer(onHost, onPort, server)
synchronize: (currency: string, server:Server, onHost:string, onPort:number, upTo:number, chunkLength:number) => {
const strategy = new RemoteSynchronizer(currency, onHost, onPort, server)
const remote = new Synchroniser(server, strategy)
const syncPromise = (async () => {
await server.dal.disableChangesAPI()
......@@ -69,8 +69,8 @@ export const CrawlerDependency = {
* @param {number} onPort
* @returns {Promise<any>}
*/
testForSync: (server:Server, onHost:string, onPort:number) => {
return RemoteSynchronizer.test(onHost, onPort)
testForSync: (currency: string, server:Server, onHost:string, onPort:number) => {
return RemoteSynchronizer.test(currency, onHost, onPort, server.conf.pair)
}
},
......@@ -88,12 +88,13 @@ export const CrawlerDependency = {
],
cli: [{
name: 'sync [source] [to]',
name: 'sync [source] [to] [currency]',
desc: 'Synchronize blockchain from a remote Duniter node',
preventIfRunning: true,
onDatabaseExecute: async (server:Server, conf:ConfDTO, program:any, params:any): Promise<any> => {
const source = params[0]
const to = params[1]
const source = params[0]
const to = params[1]
const currency = params[2]
const HOST_PATTERN = /^[^:/]+(:[0-9]{1,5})?$/
const FILE_PATTERN = /^(\/.+)$/
if (!source || !(source.match(HOST_PATTERN) || source.match(FILE_PATTERN))) {
......@@ -126,7 +127,10 @@ export const CrawlerDependency = {
const sp = source.split(':')
const onHost = sp[0]
const onPort = parseInt(sp[1] ? sp[1] : '443') // Defaults to 443
strategy = new RemoteSynchronizer(onHost, onPort, server, noShufflePeers === true, otherDAL)
if (!currency) {
throw 'currency parameter is required for network synchronization'
}
strategy = new RemoteSynchronizer(currency, onHost, onPort, server, noShufflePeers === true, otherDAL)
} else {
strategy = new LocalPathSynchronizer(source, server)
}
......
......@@ -13,10 +13,11 @@
import {CrawlerConstants} from "./constants"
import {Contacter} from "./contacter"
import {PeerDTO} from "../../../lib/dto/PeerDTO";
const DEFAULT_HOST = 'localhost';
export const connect = (peer:any, timeout:number|null = null) => {
export const connect = (peer:PeerDTO, timeout:number|null = null) => {
return Promise.resolve(new Contacter(peer.getDns() || peer.getIPv4() || peer.getIPv6() || DEFAULT_HOST, peer.getPort(), {
timeout: timeout || CrawlerConstants.DEFAULT_TIMEOUT
}))
......
......@@ -65,6 +65,10 @@ export class Contacter {
getPeers(obj?:any) {
return this.get('/network/peering/peers', dtos.MerkleOfPeers, obj)
}
getPeersArray() {
return this.get('/network/peering/peers', dtos.Peers)
}
getSources(pubkey:string) {
return this.get('/tx/sources/', dtos.Sources, pubkey)
......
......@@ -16,6 +16,7 @@ import {Contacter} from "./contacter"
import {Server} from "../../../../server"
import {rawer} from "../../../lib/common-libs/index"
import {parsers} from "../../../lib/common-libs/parsers/index"
import {IRemoteContacter} from "./sync/IRemoteContacter";
export const pullSandbox = async (currency:string, fromHost:string, fromPort:number, toHost:string, toPort:number, logger:any) => {
const from = new Contacter(fromHost, fromPort);
......@@ -43,12 +44,12 @@ export const pullSandbox = async (currency:string, fromHost:string, fromPort:num
}
}
export const pullSandboxToLocalServer = async (currency:string, fromHost:any, toServer:Server, logger:any, watcher:any = null, nbCertsMin = 1, notify = true) => {
export const pullSandboxToLocalServer = async (currency:string, fromHost:IRemoteContacter, toServer:Server, logger:any, watcher:any = null, nbCertsMin = 1, notify = true) => {
let res
try {
res = await fromHost.getRequirementsPending(nbCertsMin || 1)
} catch (e) {
watcher && watcher.writeStatus('Sandbox pulling: could not fetch requirements on %s', [fromHost.host, fromHost.port].join(':'))
watcher && watcher.writeStatus('Sandbox pulling: could not fetch requirements on %s', fromHost.getName())
}
if (res) {
......
// Source file from duniter: Crypto-currency software to manage libre currency such as Ğ1
// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
import {NewLogger} from "../../../../lib/logger"
import {IRemoteContacter} from "./IRemoteContacter";
import {Contacter} from "../contacter";
import {HttpMerkleOfPeers, HttpRequirements} from "../../../bma/lib/dtos";
import {JSONDBPeer} from "../../../../lib/db/DBPeer";
import {FileDAL} from "../../../../lib/dal/fileDAL";
import {Watcher} from "./Watcher";
import {cliprogram} from "../../../../lib/common-libs/programOptions";
import {connect} from "../connect";
import {RemoteSynchronizer} from "./RemoteSynchronizer";
import {PeerDTO} from "../../../../lib/dto/PeerDTO";
import {CrawlerConstants} from "../constants";
import {dos2unix} from "../../../../lib/common-libs/dos2unix";
import {PeeringService} from "../../../../service/PeeringService";
import {BlockDTO} from "../../../../lib/dto/BlockDTO";
const logger = NewLogger()
export class BMARemoteContacter implements IRemoteContacter {
constructor(protected contacter: Contacter) {
}
getBlock(number: number): Promise<BlockDTO | null> {
return this.contacter.getBlock(number)
}
getCurrent(): Promise<BlockDTO | null> {
return this.contacter.getCurrent()
}
async getPeers(): Promise<(JSONDBPeer|null)[]> {
return (await this.contacter.getPeersArray()).peers
}
getRequirementsPending(minsig: number): Promise<HttpRequirements> {
return this.contacter.getRequirementsPending(minsig)
}
getName(): string {
return "BMA remote '" + this.contacter.fullyQualifiedHost + "'"
}
}
// Source file from duniter: Crypto-currency software to manage libre currency such as Ğ1
// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
import {JSONDBPeer} from "../../../../lib/db/DBPeer";
import {BlockDTO} from "../../../../lib/dto/BlockDTO";
import {HttpRequirements} from "../../../bma/lib/dtos";
export interface IRemoteContacter {
getName(): string
getPeers(): Promise<(JSONDBPeer|null)[]>
getCurrent(): Promise<BlockDTO|null>
getBlock(number: number): Promise<BlockDTO|null>
getRequirementsPending(number: number): Promise<HttpRequirements>
}
......@@ -14,7 +14,6 @@
import {ISyncDownloader} from "./ISyncDownloader"
import {BlockDTO} from "../../../../lib/dto/BlockDTO"
import {PeerDTO} from "../../../../lib/dto/PeerDTO"
import {Contacter} from "../contacter"
import {connect} from "../connect"
import {NewLogger} from "../../../../lib/logger"
import {CrawlerConstants} from "../constants"
......@@ -32,25 +31,37 @@ import {FsSyncDownloader} from "./FsSyncDownloader"
import {AbstractSynchronizer} from "./AbstractSynchronizer"
import {pullSandboxToLocalServer} from "../sandbox"
import * as path from 'path'
import {IRemoteContacter} from "./IRemoteContacter";
import {BMARemoteContacter} from "./BMARemoteContacter";
import {WS2PConnection, WS2PPubkeyLocalAuth, WS2PPubkeyRemoteAuth} from "../../../ws2p/lib/WS2PConnection";
import {WS2PRequester} from "../../../ws2p/lib/WS2PRequester";
import {WS2PServerMessageHandler} from "../../../ws2p/lib/interface/WS2PServerMessageHandler";
import {WS2PMessageHandler} from "../../../ws2p/lib/impl/WS2PMessageHandler";
import {WS2PResponse} from "../../../ws2p/lib/impl/WS2PResponse";
import {DataErrors} from "../../../../lib/common-libs/errors";
import {Key, KeyGen} from "../../../../lib/common-libs/crypto/keyring";
import {WS2PRemoteContacter} from "./WS2PRemoteContacter";
import {Keypair} from "../../../../lib/dto/ConfDTO";
import {cat} from "shelljs";
const logger = NewLogger()
export class RemoteSynchronizer extends AbstractSynchronizer {
private node:Contacter
private node:IRemoteContacter
private peer:PeerDTO
private shuffledPeers: JSONDBPeer[]
private theP2pDownloader: ISyncDownloader
private theFsDownloader: ISyncDownloader
private to: number
private localNumber: number
private currency: string
private watcher: Watcher
private static contacterOptions = {
timeout: CrawlerConstants.SYNC_LONG_TIMEOUT
}
constructor(
private readonly currency: string,
private host: string,
private port: number,
private server:Server,
......@@ -89,54 +100,75 @@ export class RemoteSynchronizer extends AbstractSynchronizer {
}
async init(): Promise<void> {
const peering = await Contacter.fetchPeer(this.host, this.port, RemoteSynchronizer.contacterOptions)
this.peer = PeerDTO.fromJSONObject(peering)
const syncApi = await RemoteSynchronizer.getSyncAPI(this.currency, this.host, this.port, this.server.conf.pair)
if (!syncApi.api) {
throw Error(DataErrors[DataErrors.CANNOT_CONNECT_TO_REMOTE_FOR_SYNC])
}
this.node = syncApi.api
this.peer = PeerDTO.fromJSONObject(syncApi.peering)
logger.info("Try with %s %s", this.peer.getURL(), this.peer.pubkey.substr(0, 6))
// We save this peer as a trusted peer for future contact
await this.server.PeeringService.submitP(DBPeer.fromPeerDTO(this.peer), false, false, true)
logger.info("Try with %s %s", this.peer.getURL(), this.peer.pubkey.substr(0, 6))
this.node = await connect(this.peer)
;(this.node as any).pubkey = this.peer.pubkey
this.watcher.writeStatus('Connecting to ' + this.host + '...')
}
async initWithKnownLocalAndToAndCurrency(to: number, localNumber: number, currency: string): Promise<void> {
private static async getSyncAPI(currency: string, host: string, port: number, keypair: Keypair) {
let api: IRemoteContacter|undefined
let peering: any
logger.info('Connecting to ' + host + '...')
try {
const contacter = await connect(PeerDTO.fromJSONObject({ endpoints: [`BASIC_MERKLED_API ${host} ${port}`]}), RemoteSynchronizer.contacterOptions.timeout)
peering = await contacter.getPeer()
api = new BMARemoteContacter(contacter)
} catch (e) {
logger.warn(`Node does not support BMA, trying WS2P...`)
}
// If BMA is unreachable, let's try WS2P
if (!api) {
const pair = KeyGen(keypair.pub, keypair.sec)
const connection = WS2PConnection.newConnectionToAddress(1,
`ws://${host}:${port}`,
new (class SyncMessageHandler implements WS2PMessageHandler {
async answerToRequest(json: any, c: WS2PConnection): Promise<WS2PResponse> {
throw Error(DataErrors[DataErrors.CANNOT_ARCHIVE_CHUNK_WRONG_SIZE])
}
async handlePushMessage(json: any, c: WS2PConnection): Promise<void> {
logger.warn('Receiving push messages, which are not allowed during a SYNC.', json)
}
}),
new WS2PPubkeyLocalAuth(currency, pair, '00000000'),
new WS2PPubkeyRemoteAuth(currency, pair)
)
const requester = WS2PRequester.fromConnection(connection)
peering = await requester.getPeer()
api = new WS2PRemoteContacter(requester)
}
if (!api) {
throw Error(DataErrors[DataErrors.CANNOT_CONNECT_TO_REMOTE_FOR_SYNC])
}
if (!peering) {
throw Error(DataErrors[DataErrors.NO_PEERING_AVAILABLE_FOR_SYNC])
}
if (peering.currency !== currency) {
throw Error(DataErrors[DataErrors.WRONG_CURRENCY_DETECTED])
}
return {
api,
peering
}
}
async initWithKnownLocalAndToAndCurrency(to: number, localNumber: number): Promise<void> {
this.to = to
this.localNumber = localNumber
this.currency = currency
//=======
// Peers (just for P2P download)
//=======
let peers:(JSONDBPeer|null)[] = [];
if (!cliprogram.nopeers && (to - localNumber > 1000)) { // P2P download if more than 1000 blocs
this.watcher.writeStatus('Peers...');
const merkle = await this.dal.merkleForPeers();
const getPeers:(params:any) => Promise<HttpMerkleOfPeers> = this.node.getPeers.bind(this.node);
const json2 = await getPeers({});
const rm = new NodesMerkle(json2);
if(rm.root() != merkle.root()){
const leavesToAdd:string[] = [];
const json = await getPeers({ leaves: true });
json.leaves.forEach((leaf:string) => {
if(merkle.leaves().indexOf(leaf) == -1){
leavesToAdd.push(leaf);
}
});
peers = await Promise.all(leavesToAdd.map(async (leaf) => {
try {
const json3 = await getPeers({ "leaf": leaf });
const jsonEntry = json3.leaf.value;
const endpoint = jsonEntry.endpoints[0];
this.watcher.writeStatus('Peer ' + endpoint);
return jsonEntry;
} catch (e) {
logger.warn("Could not get peer of leaf %s, continue...", leaf);
return null;
}
}))
}
else {
this.watcher.writeStatus('Peers already known');
}
peers = await this.node.getPeers()
}
if (!peers.length) {
......@@ -168,97 +200,21 @@ export class RemoteSynchronizer extends AbstractSynchronizer {
return this.node.getBlock(number)
}
static async test(host: string, port: number): Promise<BlockDTO> {
const peering = await Contacter.fetchPeer(host, port, this.contacterOptions);
const node = await connect(PeerDTO.fromJSONObject(peering));
return node.getCurrent()
}
async syncPeers(fullSync: boolean, to?: number): Promise<void> {
if (!cliprogram.nopeers && fullSync) {
const peering = await Contacter.fetchPeer(this.host, this.port, RemoteSynchronizer.contacterOptions);
let peer = PeerDTO.fromJSONObject(peering);
logger.info("Try with %s %s", peer.getURL(), peer.pubkey.substr(0, 6));
let node:any = await connect(peer);
node.pubkey = peer.pubkey;
logger.info('Sync started.');
this.watcher.writeStatus('Peers...');
await this.syncPeer(node);
const merkle = await this.dal.merkleForPeers();
const getPeers:(params:any) => Promise<HttpMerkleOfPeers> = node.getPeers.bind(node);
const json2 = await getPeers({});
const rm = new NodesMerkle(json2);
if(rm.root() != merkle.root()){
const leavesToAdd:string[] = [];
const json = await getPeers({ leaves: true });
json.leaves.forEach((leaf:string) => {
if(merkle.leaves().indexOf(leaf) == -1){
leavesToAdd.push(leaf);
}
});
for (let i = 0; i < leavesToAdd.length; i++) {
try {
const leaf = leavesToAdd[i]
const json3 = await getPeers({ "leaf": leaf });
const jsonEntry = json3.leaf.value;
const sign = json3.leaf.value.signature;
const entry:any = {};
entry.version = jsonEntry.version
entry.currency = jsonEntry.currency
entry.pubkey = jsonEntry.pubkey
entry.endpoints = jsonEntry.endpoints
entry.block = jsonEntry.block
entry.signature = sign;
this.watcher.writeStatus('Peer ' + entry.pubkey);
this.watcher.peersPercent((i + 1) / leavesToAdd.length * 100)
await this.PeeringService.submitP(entry, false, to === undefined);
} catch (e) {
logger.warn(e && e.message || e)
}
}
this.watcher.peersPercent(100)
}
else {
this.watcher.writeStatus('Peers already known');
}
static async test(currency: string, host: string, port: number, keypair: Keypair): Promise<BlockDTO> {
const syncApi = await RemoteSynchronizer.getSyncAPI(currency, host, port, keypair)
const current = await syncApi.api.getCurrent()
if (!current) {
throw Error(DataErrors[DataErrors.REMOTE_HAS_NO_CURRENT_BLOCK])
}
return current
}
//============
// Peer
//============
private async syncPeer (node:any) {
// Global sync vars
const remotePeer = PeerDTO.fromJSONObject({});
const json = await node.getPeer();
remotePeer.version = json.version
remotePeer.currency = json.currency
remotePeer.pubkey = json.pub
remotePeer.endpoints = json.endpoints
remotePeer.blockstamp = json.block
remotePeer.signature = json.signature
const entry = remotePeer.getRawUnsigned();
const signature = dos2unix(remotePeer.signature);
// Parameters
if(!(entry && signature)){
throw 'Requires a peering entry + signature';
}
let remoteJsonPeer:any = json
remoteJsonPeer.pubkey = json.pubkey;
let signatureOK = this.PeeringService.checkPeerSignature(remoteJsonPeer);
if (!signatureOK) {
this.watcher.writeStatus('Wrong signature for peer #' + remoteJsonPeer.pubkey);
}
try {
await this.PeeringService.submitP(remoteJsonPeer);
} catch (err) {
if (err.indexOf !== undefined && err.indexOf(CrawlerConstants.ERRORS.NEWER_PEER_DOCUMENT_AVAILABLE.uerr.message) !== -1 && err != CrawlerConstants.ERROR.PEER.UNKNOWN_REFERENCE_BLOCK) {
throw err;
async syncPeers(fullSync: boolean, to?: number): Promise<void> {
const peers = await this.node.getPeers()
for (const p of peers) {
try {
await this.PeeringService.submitP(DBPeer.fromPeerDTO(PeerDTO.fromJSONObject(p)))
} catch (e) {
}
}
}
......@@ -268,29 +224,3 @@ export class RemoteSynchronizer extends AbstractSynchronizer {
await pullSandboxToLocalServer(this.currency, this.node, this.server, this.server.logger, this.watcher, 1, false)
}
}
class NodesMerkle {
private depth:number
private nodesCount:number
private leavesCount:number
private merkleRoot:string
constructor(json:any) {
this.depth = json.depth
this.nodesCount = json.nodesCount
this.leavesCount = json.leavesCount
this.merkleRoot = json.root;
}
// var i = 0;
// this.levels = [];
// while(json && json.levels[i]){
// this.levels.push(json.levels[i]);
// i++;
// }
root() {
return this.merkleRoot
}
}
// Source file from duniter: Crypto-currency software to manage libre currency such as Ğ1
// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
import {NewLogger} from "../../../../lib/logger"
import {IRemoteContacter} from "./IRemoteContacter";
import {Contacter} from "../contacter";
import {WS2PRequester} from "../../../ws2p/lib/WS2PRequester";
import {DBPeer, JSONDBPeer} from "../../../../lib/db/DBPeer";
import {BlockDTO} from "../../../../lib/dto/BlockDTO";
import {PeerDTO} from "../../../../lib/dto/PeerDTO";
import {HttpRequirements} from "../../../bma/lib/dtos";
const logger = NewLogger()
export class WS2PRemoteContacter implements IRemoteContacter {
getRequirementsPending(min: number): Promise<HttpRequirements> {
return this.requester.getRequirementsPending(min)
}
constructor(protected requester: WS2PRequester) {
}
getBlock(number: number): Promise<BlockDTO | null> {
return this.requester.getBlock(number)
}
getCurrent(): Promise<BlockDTO | null> {
return this.requester.getCurrent()
}
async getPeers(): Promise<(JSONDBPeer | null)[]> {
return (await this.requester.getPeers()).map(p => DBPeer.fromPeerDTO(PeerDTO.fromJSONObject(p)))
}
getName(): string {
return "WS2P remote"
}
}
......@@ -27,7 +27,6 @@ const nuuid = require('node-uuid');
const logger = require('../../../lib/logger').NewLogger('ws2p')
const MAXIMUM_ERRORS_COUNT = 5
const REQUEST_TIMEOUT_VALUE = WS2PConstants.REQUEST_TIMEOUT
enum WS2P_ERR {
REJECTED_PUBKEY_OR_INCORRECT_ASK_SIGNATURE_FROM_REMOTE,
......@@ -323,8 +322,8 @@ export class WS2PConnection {
connectionTimeout:number
requestTimeout:number
} = {
connectionTimeout: REQUEST_TIMEOUT_VALUE,
requestTimeout: REQUEST_TIMEOUT_VALUE
connectionTimeout: WS2PConstants.REQUEST_TIMEOUT,
requestTimeout: WS2PConstants.REQUEST_TIMEOUT
},
private expectedPub:string = "",
private expectedWS2PUID:string = ""
......@@ -346,8 +345,8 @@ export class WS2PConnection {
connectionTimeout:number,
requestTimeout:number
} = {