Commit 8cbeb23b authored by Cédric Moreau's avatar Cédric Moreau

[enh] Add `sync-mempool-fwd` command

parent 0938a372
Pipeline #5256 waiting for manual action with stages
in 31 minutes and 14 seconds
......@@ -44,6 +44,7 @@ const UNLOCK = "(SIG\\(" + INTEGER + "\\)|XHX\\(" + XUNLOCK + "\\))"
const CONDITIONS = "(&&|\\|\\|| |[()]|(SIG\\(" + PUBKEY + "\\)|(XHX\\([A-F0-9]{64}\\)|CLTV\\(" + CLTV_INTEGER + "\\)|CSV\\(" + CSV_INTEGER + "\\))))*"
const BMA_REGEXP = /^BASIC_MERKLED_API( ([a-z_][a-z0-9-_.]*))?( ([0-9.]+))?( ([0-9a-f:]+))?( ([0-9]+))$/
const BMAS_REGEXP = /^BMAS( ([a-z_][a-z0-9-_.]*))?( ([0-9.]+))?( ([0-9a-f:]+))?( ([0-9]+))( (\/.+))?$/
const BMATOR_REGEXP = /^BMATOR( ([a-z0-9]{16})\.onion)( ([0-9.]+))?( ([0-9a-f:]+))?( ([0-9]+))$/
const WS2P_REGEXP = /^WS2P (?:[1-9][0-9]* )?([a-f0-9]{8}) ([a-z_][a-z0-9-_.]*|[0-9.]+|[0-9a-f:]+) ([0-9]+)(?: (.+))?$/
const WS2P_V2_REGEXP = /^WS2P ([1-9][0-9]*) ([a-f0-9]{8}) ([a-z_][a-z0-9-_.]*|[0-9.]+|[0-9a-f:]+) ([0-9]+)(?: (.+))?$/
......@@ -110,6 +111,7 @@ export const CommonConstants = {
SWITCH_ON_BRANCH_AHEAD_BY_X_BLOCKS: 3,
BMA_REGEXP,
BMAS_REGEXP,
BMATOR_REGEXP,
WS2P_REGEXP,
WS2P_V2_REGEXP,
......
......@@ -94,17 +94,28 @@ export class PeerDTO implements Cloneable {
}
getBMA() {
let bma: { dns?: string, ipv4?: string, ipv6?: string, port?: number } = {}
let bma: { dns?: string, ipv4?: string, ipv6?: string, port?: number, path?: string } = {}
let notFound = true
this.endpoints.forEach((ep) => {
const matches = notFound && ep.match(CommonConstants.BMA_REGEXP);
if (matches) {
const matchesBMA = notFound && ep.match(CommonConstants.BMA_REGEXP);
const matchesBMAS = notFound && ep.match(CommonConstants.BMAS_REGEXP);
if (matchesBMA) {
notFound = false
bma = {
"dns": matches[2] || '',
"ipv4": matches[4] || '',
"ipv6": matches[6] || '',
"port": parseInt(matches[8]) || 9101
"dns": matchesBMA[2] || '',
"ipv4": matchesBMA[4] || '',
"ipv6": matchesBMA[6] || '',
"port": parseInt(matchesBMA[8]) || 9101
};
}
else if (matchesBMAS) {
notFound = false
bma = {
"dns": matchesBMAS[2] || '',
"ipv4": matchesBMAS[4] || '',
"ipv6": matchesBMAS[6] || '',
"port": parseInt(matchesBMAS[8]) || 9101,
"path": matchesBMAS[10] || ''
};
}
});
......@@ -207,6 +218,11 @@ export class PeerDTO implements Cloneable {
return bma.port ? bma.port : null;
}
getPath() {
const bma = this.getBMA();
return bma.path ? bma.path : null;
}
getHostPreferDNS() {
const bma = this.getBMA();
return (bma.dns ? bma.dns :
......
......@@ -33,9 +33,9 @@ import {CrawlerConstants} from "./lib/constants"
import {ExitCodes} from "../../lib/common-libs/exit-codes"
import {connect} from "./lib/connect"
import {BMARemoteContacter} from "./lib/sync/BMARemoteContacter"
import {applyMempoolRequirements, pullSandboxToLocalServer} from "./lib/sandbox"
import {applyMempoolRequirements, forwardToServer, pullSandboxToLocalServer} from "./lib/sandbox"
const HOST_PATTERN = /^[^:/]+(:[0-9]{1,5})?$/
const HOST_PATTERN = /^[^:/]+(:[0-9]{1,5})?(\/.*)?$/
const FILE_PATTERN = /^(\/.+)$/
export const CrawlerDependency = {
......@@ -281,6 +281,39 @@ export const CrawlerDependency = {
}
}
}, {
name: 'sync-mempool-fwd <from> <to> <search>',
desc: 'Import all pending data from matching <search>',
onDatabaseExecute: async (server:Server, conf:ConfDTO, program:any, params:any) => {
const source: string = params[0]
const target: string = params[1]
const search: string = params[2]
if (!source || !(source.match(HOST_PATTERN) || source.match(FILE_PATTERN))) {
throw 'Source of sync is required. (host[:port])'
}
if (!target || !(target.match(HOST_PATTERN) || target.match(FILE_PATTERN))) {
throw 'Target of sync is required. (host[:port])'
}
const logger = NewLogger()
const { host, port } = extractHostPort(source)
const { host: toHost, port: toPort } = extractHostPort(target)
try {
const peer = PeerDTO.fromJSONObject({ endpoints: [['BASIC_MERKLED_API', host, port].join(' ')] })
logger.info('Looking at %s...', source)
try {
const fromHost = await connect(peer)
const res = await fromHost.getRequirements(search)
await forwardToServer(server.conf.currency, res, toHost, toPort, logger)
} catch (e) {
logger.error(e);
}
await server.disconnect();
} catch(e) {
logger.error(e);
throw Error("Exiting");
}
}
}, {
name: 'forward <number> <fromHost> <fromPort> <toHost> <toPort>',
desc: 'Forward existing block <number> from a host to another',
onDatabaseExecute: async (server:Server, conf:ConfDTO, program:any, params:any) => {
......@@ -474,7 +507,7 @@ export const CrawlerDependency = {
function extractHostPort(source: string) {
const sp = source.split(':')
const onHost = sp[0]
const onPort = parseInt(sp[1] ? sp[1] : '443') // Defaults to 443
const onPort = sp[1] ? sp[1] : '443' // Defaults to 443
return {
host: onHost,
port: onPort,
......
......@@ -18,7 +18,11 @@ import {PeerDTO} from "../../../lib/dto/PeerDTO";
const DEFAULT_HOST = 'localhost';
export const connect = (peer:PeerDTO, timeout:number|null = null) => {
return Promise.resolve(new Contacter(peer.getDns() || peer.getIPv4() || peer.getIPv6() || DEFAULT_HOST, peer.getPort() as number, {
return Promise.resolve(Contacter.fromHostPortPath(
peer.getDns() || peer.getIPv4() || peer.getIPv6() || DEFAULT_HOST,
peer.getPort() as number,
peer.getPath() as string,
{
timeout: timeout || CrawlerConstants.DEFAULT_TIMEOUT
}))
}
......@@ -12,6 +12,7 @@
// GNU Affero General Public License for more details.
import {CrawlerConstants} from "./constants"
import {HttpMembershipList} from "../../bma/lib/dtos"
const rp = require('request-promise');
const sanitize = require('../../../modules/bma/lib/sanitize')
......@@ -19,6 +20,7 @@ const dtos = require('../../../modules/bma').BmaDependency.duniter.methods.dtos;
export class Contacter {
path: string = ''
options:{ timeout:number }
fullyQualifiedHost:string
......@@ -30,6 +32,12 @@ export class Contacter {
this.fullyQualifiedHost = [host, port].join(':');
}
public static fromHostPortPath(host:string, port:number, path:string, opts: { timeout?: number }) {
const contacter = new Contacter(host, port, opts)
contacter.path = path
return contacter
}
getSummary() {
return this.get('/node/summary/', dtos.Summary)
}
......@@ -106,7 +114,7 @@ export class Contacter {
return this.post('/wot/revoke', dtos.Identity, { revocation: rev })
}
wotPending() {
wotPending(): Promise<HttpMembershipList> {
return this.get('/wot/pending', dtos.MembershipList)
}
......@@ -128,8 +136,9 @@ export class Contacter {
param = '?' + Object.keys(param).map((k) => [k, param[k]].join('=')).join('&');
}
try {
const path = this.path || ''
const json = await rp.get({
url: Contacter.protocol(this.port) + this.fullyQualifiedHost + url + (param !== undefined ? param : ''),
url: Contacter.protocol(this.port) + this.fullyQualifiedHost + path + url + (param !== undefined ? param : ''),
json: true,
timeout: this.options.timeout
});
......@@ -142,8 +151,9 @@ export class Contacter {
private async post(url:string, dtoContract:any, data:any) {
try {
const path = this.path || ''
const json = await rp.post({
url: Contacter.protocol(this.port) + this.fullyQualifiedHost + url,
url: Contacter.protocol(this.port) + this.fullyQualifiedHost + path + url,
body: data,
json: true,
timeout: this.options.timeout
......
......@@ -18,6 +18,8 @@ import {parsers} from "../../../lib/common-libs/parsers/index"
import {IRemoteContacter} from "./sync/IRemoteContacter"
import {HttpRequirements} from "../../bma/lib/dtos"
import {Watcher} from "./sync/Watcher"
import {PeerDTO} from "../../../lib/dto/PeerDTO"
import {connect} from "./connect"
export const pullSandboxToLocalServer = async (currency:string, fromHost:IRemoteContacter, toServer:Server, logger:any, watcher:any = null, nbCertsMin = 1, notify = true) => {
let res
......@@ -70,6 +72,74 @@ export async function applyMempoolRequirements(currency: string, res: HttpRequir
watcher && watcher.sbxPercent(100)
}
export async function forwardToServer(currency: string, res: HttpRequirements, toHost: string, toPort: string, logger?: any, watcher?: Watcher) {
const docs = getDocumentsTree(currency, res)
const [port, path] = toPort.split('/')
let ep = [port == '443' ? 'BMAS' : 'BASIC_MERKLED_API', toHost, port].join(' ')
if (path) {
ep += ' /' + path
}
toPort = ':' + toPort
const peer = PeerDTO.fromJSONObject({endpoints: [ep]})
logger.info('Forwarded to %s...', toHost + toPort)
const target = await connect(peer)
let t = 0
let T = docs.identities.length + docs.certifications.length + docs.revocations.length + docs.memberships.length
for (let i = 0; i < docs.identities.length; i++) {
const idty = docs.identities[i];
watcher && watcher.writeStatus('Identity ' + (i+1) + '/' + docs.identities.length)
watcher && watcher.sbxPercent((t++) / T * 100)
try {
await target.postIdentity(idty)
logger.info('Forwarded identity to %s...', toHost + toPort)
} catch (e) {
logger.warn(e)
}
}
for (let i = 0; i < docs.revocations.length; i++) {
const revo = docs.revocations[i];
watcher && watcher.writeStatus('Revocation ' + (i+1) + '/' + docs.revocations.length)
watcher && watcher.sbxPercent((t++) / T * 100)
try {
await target.postRevocation(revo)
logger.info('Forwarded revocation to %s...', toHost + toPort)
} catch (e) {
logger.warn(e)
}
}
for (let i = 0; i < docs.certifications.length; i++) {
const cert = docs.certifications[i];
watcher && watcher.writeStatus('Certification ' + (i+1) + '/' + docs.certifications.length)
watcher && watcher.sbxPercent((t++) / T * 100)
try {
await target.postCert(cert)
logger.info('Forwarded cert to %s...', toHost + toPort)
} catch (e) {
logger.warn(e)
}
}
for (let i = 0; i < docs.memberships.length; i++) {
const ms = docs.memberships[i];
watcher && watcher.writeStatus('Membership ' + (i+1) + '/' + docs.memberships.length)
watcher && watcher.sbxPercent((t++) / T * 100)
try {
await target.postRenew(ms)
logger.info('Forwarded membership to %s...', toHost + toPort)
} catch (e) {
logger.warn(e)
}
}
watcher && watcher.sbxPercent(100)
}
function getDocumentsTree(currency:string, res:HttpRequirements) {
const documents:any = {
identities: [],
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment