Commit 685b7aee authored by Cédric Moreau's avatar Cédric Moreau

[fix] #1037 Migrate back duniter-crawler

parent 3858bcc9
......@@ -36,5 +36,7 @@ app/modules/bma/*.js
app/modules/bma/lib/*.js
app/modules/bma/lib/entity/*.js
app/modules/bma/lib/controllers/*.js
app/modules/crawler/*.js
app/modules/crawler/lib/*.js
test/*.js
test/**/*.js
\ No newline at end of file
......@@ -276,12 +276,12 @@ export class MetaDAL extends AbstractSQLite<DBMeta> {
}
}))
let amountNotDestroyed = 0;
await _.values(allUpdates).map(async (src:any) => {
await Promise.all(_.values(allUpdates).map(async (src:any) => {
const exist = await sindexDAL.getSource(src.identifier, src.pos);
if (exist && !exist.consumed) {
amountNotDestroyed += src.amount;
}
})
}))
}
await sindexDAL.insertBatch(sourcesMovements);
},
......
......@@ -40,6 +40,10 @@ export class DBBlock {
) {
}
toBlockDTO() {
return BlockDTO.fromJSONObject(this)
}
static fromBlockDTO(b:BlockDTO) {
const dbb = new DBBlock()
dbb.version = b.version
......
......@@ -6,6 +6,12 @@ export interface Keypair {
sec: string
}
export interface BranchingDTO {
swichOnTimeAheadBy:number
avgGenTime:number
forksize:number
}
export interface CurrencyConfDTO {
c: number
dt: number
......@@ -50,7 +56,7 @@ export interface NetworkConfDTO {
httplogs:boolean
}
export class ConfDTO implements CurrencyConfDTO, KeypairConfDTO, NetworkConfDTO {
export class ConfDTO implements CurrencyConfDTO, KeypairConfDTO, NetworkConfDTO, BranchingDTO {
constructor(
public loglevel: string,
......
......@@ -300,7 +300,7 @@ export class Indexer {
expired_on: null,
revokes_on: null,
revocation: revocation.revocation,
chainable_on: null,
chainable_on: block.medianTime + conf.msPeriod, // Note: this is useless, because a revoked identity cannot join back. But we let this property for data consistency
revoked_on: [block.number, block.hash].join('-'),
leaving: false
})
......
This diff is collapsed.
import {CrawlerConstants} from "./constants"
import {Contacter} from "./contacter"
const DEFAULT_HOST = 'localhost';
export const connect = (peer:any, timeout:number|null = null) => {
return Promise.resolve(new Contacter(peer.getDns() || peer.getIPv4() || peer.getIPv6() || DEFAULT_HOST, peer.getPort(), {
timeout: timeout || CrawlerConstants.DEFAULT_TIMEOUT
}))
}
const common = require('duniter-common')
export const CrawlerConstants = {
PEER_LONG_DOWN: 3600 * 24 * 2, // 48h
SYNC_LONG_TIMEOUT: 30 * 1000, // 30 seconds
DEFAULT_TIMEOUT: 10 * 1000, // 10 seconds
SWITCH_ON_BRANCH_AHEAD_BY_X_MINUTES: 30,
TRANSACTION_VERSION: common.constants.TRANSACTION_VERSION,
FORK_ALLOWED: true,
MAX_NUMBER_OF_PEERS_FOR_PULLING: 4,
PULLING_MINIMAL_DELAY: 20,
PULLING_INTERVAL_TARGET: 240,
COUNT_FOR_ENOUGH_PEERS: 4,
SANDBOX_PEERS_COUNT: 2,
SANDBOX_CHECK_INTERVAL: 288, // Every day (288 blocks = 1 day)
TEST_PEERS_INTERVAL: 10, // In seconds
SYNC_PEERS_INTERVAL: 3, // Every 3 block average generation time
DURATIONS: {
TEN_SECONDS: 10,
A_MINUTE: 60,
TEN_MINUTES: 600,
AN_HOUR: 3600,
A_DAY: 3600 * 24,
A_WEEK: 3600 * 24 * 7,
A_MONTH: (3600 * 24 * 365.25) / 12
},
ERRORS: {
NEWER_PEER_DOCUMENT_AVAILABLE: { httpCode: 409, uerr: { ucode: 2022, message: "A newer peer document is available" }},
},
ERROR: {
PEER: {
UNKNOWN_REFERENCE_BLOCK: 'Unknown reference block of peer'
}
}
}
\ No newline at end of file
import {CrawlerConstants} from "./constants"
const rp = require('request-promise');
const sanitize = require('duniter-bma').duniter.methods.sanitize;
const dtos = require('duniter-bma').duniter.methods.dtos;
export class Contacter {
options:{ timeout:number }
fullyQualifiedHost:string
constructor(private host:string, private port:number, opts:any = {}) {
this.options = {
timeout: (opts && opts.timeout) || CrawlerConstants.DEFAULT_TIMEOUT
}
// We suppose that IPv6 is already wrapped by [], for example 'http://[::1]:80/index.html'
this.fullyQualifiedHost = [host, port].join(':');
}
getSummary() {
return this.get('/node/summary/', dtos.Summary)
}
getCertifiedBy(search:string) {
return this.get('/wot/certified-by/' + search, dtos.Certifications)
}
getRequirements(search:string) {
return this.get('/wot/requirements/' + search, dtos.Requirements)
}
getRequirementsPending(minsig:number) {
return this.get('/wot/requirements-of-pending/' + minsig, dtos.Requirements)
}
getLookup(search:string) {
return this.get('/wot/lookup/', dtos.Lookup, search)
}
getBlock(number:number) {
return this.get('/blockchain/block/', dtos.Block, number)
}
getCurrent() {
return this.get('/blockchain/current', dtos.Block)
}
getPeer() {
return this.get('/network/peering', dtos.Peer)
}
getPeers(obj:any) {
return this.get('/network/peering/peers', dtos.MerkleOfPeers, obj)
}
getSources(pubkey:string) {
return this.get('/tx/sources/', dtos.Sources, pubkey)
}
getBlocks(count:number, fromNumber:number) {
return this.get('/blockchain/blocks/', dtos.Blocks, [count, fromNumber].join('/'))
}
postPeer(peer:any) {
return this.post('/network/peering/peers', dtos.Peer, { peer: peer })
}
postIdentity(raw:string) {
return this.post('/wot/add', dtos.Identity, { identity: raw })
}
postCert(cert:string) {
return this.post('/wot/certify', dtos.Cert, { cert: cert})
}
postRenew(ms:string) {
return this.post('/blockchain/membership', dtos.Membership, { membership: ms })
}
wotPending() {
return this.get('/wot/pending', dtos.MembershipList)
}
wotMembers() {
return this.get('/wot/members', dtos.Members)
}
postBlock(rawBlock:string) {
return this.post('/blockchain/block', dtos.Block, { block: rawBlock })
}
processTransaction(rawTX:string) {
return this.post('/tx/process', dtos.Transaction, { transaction: rawTX })
}
private async get(url:string, dtoContract:any, param?:any) {
if (typeof param === 'object') {
// Classical URL params (a=1&b=2&...)
param = '?' + Object.keys(param).map((k) => [k, param[k]].join('=')).join('&');
}
try {
const json = await rp.get({
url: Contacter.protocol(this.port) + this.fullyQualifiedHost + url + (param !== undefined ? param : ''),
json: true,
timeout: this.options.timeout
});
// Prevent JSON injection
return sanitize(json, dtoContract);
} catch (e) {
throw e.error;
}
}
private async post(url:string, dtoContract:any, data:any) {
try {
const json = await rp.post({
url: Contacter.protocol(this.port) + this.fullyQualifiedHost + url,
body: data,
json: true,
timeout: this.options.timeout
});
// Prevent JSON injection
return sanitize(json, dtoContract);
} catch (e) {
throw e.error;
}
}
static protocol(port:number) {
return port == 443 ? 'https://' : 'http://';
}
static async quickly(host:string, port:number, opts:any, callbackPromise:any) {
const node = new Contacter(host, port, opts);
return callbackPromise(node);
}
static async quickly2(peer:any, opts:any, callbackPromise:any) {
const Peer = require('./entity/peer');
const p = Peer.fromJSON(peer);
const node = new Contacter(p.getHostPreferDNS(), p.getPort(), opts);
return callbackPromise(node);
}
static fetchPeer(host:string, port:number, opts:any = {}) {
return Contacter.quickly(host, port, opts, (node:any) => node.getPeer())
}
static fetchBlock(number:number, peer:any, opts:any = {}) {
return Contacter.quickly2(peer, opts, (node:any) => node.getBlock(number))
}
static async isReachableFromTheInternet(peer:any, opts:any) {
const Peer = require('./entity/peer');
const p = Peer.fromJSON(peer);
const node = new Contacter(p.getHostPreferDNS(), p.getPort(), opts);
try {
await node.getPeer();
return true;
} catch (e) {
return false;
}
}
}
\ No newline at end of file
This diff is collapsed.
import {CrawlerConstants} from "./constants"
import {Server} from "../../../../server"
export const cleanLongDownPeers = async (server:Server, now:number) => {
const first_down_limit = now - CrawlerConstants.PEER_LONG_DOWN * 1000;
await server.dal.peerDAL.query('DELETE FROM peer WHERE first_down < ' + first_down_limit)
}
"use strict";
import {BlockDTO} from "../../../lib/dto/BlockDTO"
import {DBBlock} from "../../../lib/db/DBBlock"
import {PeerDTO} from "../../../lib/dto/PeerDTO"
import {BranchingDTO, ConfDTO} from "../../../lib/dto/ConfDTO"
const _ = require('underscore');
export abstract class PullingDao {
abstract applyBranch(blocks:BlockDTO[]): Promise<boolean>
abstract localCurrent(): Promise<DBBlock|null>
abstract remoteCurrent(source?:any): Promise<BlockDTO|null>
abstract remotePeers(source?:any): Promise<PeerDTO[]>
abstract getLocalBlock(number:number): Promise<DBBlock>
abstract getRemoteBlock(thePeer:PeerDTO, number:number): Promise<BlockDTO>
abstract applyMainBranch(block:BlockDTO): Promise<boolean>
abstract removeForks(): Promise<boolean>
abstract isMemberPeer(thePeer:PeerDTO): Promise<boolean>
abstract downloadBlocks(thePeer:PeerDTO, fromNumber:number, count?:number): Promise<BlockDTO[]>
}
export abstract class AbstractDAO extends PullingDao {
/**
* Sugar function. Apply a bunch of blocks instead of one.
* @param blocks
*/
async applyBranch (blocks:BlockDTO[]) {
for (const block of blocks) {
await this.applyMainBranch(block);
}
return true;
}
/**
* Binary search algorithm to find the common root block between a local and a remote blockchain.
* @param fork An object containing a peer, its current block and top fork block
* @param forksize The maximum length we can look at to find common root block.
* @returns {*|Promise}
*/
async findCommonRoot(fork:any, forksize:number) {
let commonRoot = null;
let localCurrent = await this.localCurrent();
if (!localCurrent) {
throw Error('Local blockchain is empty, cannot find a common root')
}
// We look between the top block that is known as fork ...
let topBlock = fork.block;
// ... and the bottom which is bounded by `forksize`
let bottomBlock = await this.getRemoteBlock(fork.peer, Math.max(0, localCurrent.number - forksize));
let lookBlock = bottomBlock;
let localEquivalent = await this.getLocalBlock(bottomBlock.number);
let isCommonBlock = lookBlock.hash == localEquivalent.hash;
if (isCommonBlock) {
// Then common root can be found between top and bottom. We process.
let position = -1, wrongRemotechain = false;
do {
isCommonBlock = lookBlock.hash == localEquivalent.hash;
if (!isCommonBlock) {
// Too high, look downward
topBlock = lookBlock;
position = middle(topBlock.number, bottomBlock.number);
}
else {
let upperBlock = await this.getRemoteBlock(fork.peer, lookBlock.number + 1);
let localUpper = await this.getLocalBlock(upperBlock.number);
let isCommonUpper = upperBlock.hash == localUpper.hash;
if (isCommonUpper) {
// Too low, look upward
bottomBlock = lookBlock;
position = middle(topBlock.number, bottomBlock.number);
}
else {
// Spotted!
commonRoot = lookBlock;
}
}
let noSpace = topBlock.number == bottomBlock.number + 1;
if (!commonRoot && noSpace) {
// Remote node have inconsistency blockchain, stop search
wrongRemotechain = true;
}
if (!wrongRemotechain) {
lookBlock = await this.getRemoteBlock(fork.peer, position);
localEquivalent = await this.getLocalBlock(position);
}
} while (!commonRoot && !wrongRemotechain);
}
// Otherwise common root is unreachable
return commonRoot;
}
static defaultLocalBlock() {
const localCurrent = new DBBlock()
localCurrent.number = -1
return localCurrent
}
/**
* Pull algorithm. Look at given peers' blockchain and try to pull blocks from it.
* May lead local blockchain to fork.
* @param conf The local node configuration
* @param dao An abstract layer to retrieve peers data (blocks).
* @param logger Logger of the main application.
*/
async pull(conf:BranchingDTO, logger:any) {
let localCurrent:DBBlock = await this.localCurrent() || AbstractDAO.defaultLocalBlock()
const forks:any = [];
if (!localCurrent) {
localCurrent = new DBBlock()
localCurrent.number = -1
}
const applyCoroutine = async (peer:PeerDTO, blocks:BlockDTO[]) => {
if (blocks.length > 0) {
let isFork = localCurrent
&& !(blocks[0].previousHash == localCurrent.hash
&& blocks[0].number == localCurrent.number + 1);
if (!isFork) {
await this.applyBranch(blocks);
const newLocalCurrent = await this.localCurrent()
localCurrent = newLocalCurrent || AbstractDAO.defaultLocalBlock()
const appliedSuccessfully = localCurrent.number == blocks[blocks.length - 1].number
&& localCurrent.hash == blocks[blocks.length - 1].hash;
return appliedSuccessfully;
} else {
let remoteCurrent = await this.remoteCurrent(peer);
forks.push({
peer: peer,
block: blocks[0],
current: remoteCurrent
});
return false;
}
}
return true;
}
const downloadCoroutine = async (peer:any, number:number) => {
return await this.downloadBlocks(peer, number);
}
const downloadChuncks = async (peer:PeerDTO) => {
let blocksToApply:BlockDTO[] = [];
const currentBlock = await this.localCurrent();
let currentChunckStart;
if (currentBlock) {
currentChunckStart = currentBlock.number + 1;
} else {
currentChunckStart = 0;
}
let res:any = { applied: {}, downloaded: [] }
do {
let [ applied, downloaded ] = await Promise.all([
applyCoroutine(peer, blocksToApply),
downloadCoroutine(peer, currentChunckStart)
])
res.applied = applied
res.downloaded = downloaded
blocksToApply = downloaded;
currentChunckStart += downloaded.length;
if (!applied) {
logger && logger.info("Blocks were not applied.")
}
} while (res.downloaded.length > 0 && res.applied);
}
let peers = await this.remotePeers();
// Try to get new legit blocks for local blockchain
const downloadChuncksTasks = [];
for (const peer of peers) {
downloadChuncksTasks.push(downloadChuncks(peer));
}
await Promise.all(downloadChuncksTasks)
// Filter forks: do not include mirror peers (non-member peers)
let memberForks = [];
for (const fork of forks) {
let isMember = await this.isMemberPeer(fork.peer);
if (isMember) {
memberForks.push(fork);
}
}
memberForks = memberForks.sort((f1, f2) => {
let result = compare(f1, f2, "number");
if (result == 0) {
result = compare(f1, f2, "medianTime");
}
return result;
});
let avgGenTime = conf.avgGenTime;
memberForks = _.filter(memberForks, (fork:any) => {
let blockDistance = (fork.current.number - localCurrent.number) * avgGenTime / 60;
let timeDistance = (fork.current.medianTime - localCurrent.medianTime) / 60;
logger && logger.debug('Fork of %s has blockDistance %s ; timeDistance %s ; required is >= %s for both values to try to follow the fork', fork.peer.pubkey.substr(0, 6), blockDistance.toFixed(2), timeDistance.toFixed(2), conf.swichOnTimeAheadBy);
return blockDistance >= conf.swichOnTimeAheadBy
&& timeDistance >= conf.swichOnTimeAheadBy;
});
// Remove any previous fork block
await this.removeForks();
// Find the common root block
let j = 0, successFork = false;
while (!successFork && j < memberForks.length) {
let fork = memberForks[j];
let commonRootBlock = await this.findCommonRoot(fork, conf.forksize);
if (commonRootBlock) {
let blocksToApply = await this.downloadBlocks(fork.peer, commonRootBlock.number + 1, conf.forksize);
successFork = await this.applyBranch(blocksToApply);
} else {
logger && logger.debug('No common root block with peer %s', fork.peer.pubkey.substr(0, 6));
}
j++;
}
return this.localCurrent();
}
}
function compare(f1:any, f2:any, field:string) {
if (f1[field] > f2[field]) {
return 1;
}
if (f1[field] < f2[field]) {
return -1;
}
return 0;
}
function middle(top:number, bottom:number) {
let difference = top - bottom;
if (difference % 2 == 1) {
// We look one step below to not forget any block
difference++;
}
return bottom + (difference / 2);
}
import {Contacter} from "./contacter"
const common = require('duniter-common')
export const req2fwd = async (requirements:any, toHost:string, toPort:number, logger:any) => {
const mss:any = {};
const identities:any = {};
const certs:any = {};
const targetPeer = new Contacter(toHost, toPort, { timeout: 10000 });
// Identities
for (const idty of requirements.identities) {
try {
const iid = [idty.pubkey, idty.uid, idty.meta.timestamp].join('-');
if (!identities[iid]) {
logger.info('New identity %s', idty.uid);
identities[iid] = idty;
try {
const rawIdty = common.rawer.getOfficialIdentity({
currency: 'g1',
issuer: idty.pubkey,
uid: idty.uid,
buid: idty.meta.timestamp,
sig: idty.sig
});
await targetPeer.postIdentity(rawIdty);
logger.info('Success idty %s', idty.uid);
} catch (e) {
logger.warn('Rejected idty %s...', idty.uid, e);
}
}
for (const received of idty.pendingCerts) {
const cid = [received.from, iid].join('-');
if (!certs[cid]) {
await new Promise((res) => setTimeout(res, 300));
certs[cid] = received;
const rawCert = common.rawer.getOfficialCertification({
currency: 'g1',
issuer: received.from,
idty_issuer: idty.pubkey,
idty_uid: idty.uid,
idty_buid: idty.meta.timestamp,
idty_sig: idty.sig,
buid: received.blockstamp,
sig: received.sig
});
const rawCertNoSig = common.rawer.getOfficialCertification({
currency: 'g1',
issuer: received.from,
idty_issuer: idty.pubkey,
idty_uid: idty.uid,
idty_buid: idty.meta.timestamp,
idty_sig: idty.sig,
buid: received.blockstamp
});
try {
const chkSig = common.keyring.verify(rawCertNoSig, received.sig, received.from)
if (!chkSig) {
throw "Wrong signature for certification?!"
}
await targetPeer.postCert(rawCert);
logger.info('Success cert %s -> %s', received.from, idty.uid);
} catch (e) {
logger.warn('Rejected cert %s -> %s', received.from, idty.uid, received.blockstamp.substr(0,18), e);
}
}
}
for (const theMS of idty.pendingMemberships) {
// + Membership
const