// Source file from duniter: Crypto-currency software to manage libre currency such as Ğ1
// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
import {ExecuteCommand} from "./app/cli"
import * as stream from "stream"
import {Server} from "./server"
import {ConfDTO} from "./app/lib/dto/ConfDTO"
import {ProverDependency} from "./app/modules/prover/index"
import {KeypairDependency} from "./app/modules/keypair/index"
import {CrawlerDependency} from "./app/modules/crawler/index"
import {BmaDependency} from "./app/modules/bma/index"
import {WS2PDependency} from "./app/modules/ws2p/index"
import {ProverConstants} from "./app/modules/prover/lib/constants"
import {ProxiesConf} from './app/lib/proxy';
import {RouterDependency} from "./app/modules/router"
import {OtherConstants} from "./app/lib/other_constants"
import {Directory} from "./app/lib/system/directory"
import {Underscore} from "./app/lib/common-libs/underscore"
import {CliCommand, DuniterDependency, DuniterModule} from "./app/modules/DuniterModule"
import {ProgramOptions} from "./app/lib/common-libs/programOptions"
import {ExitCodes} from "./app/lib/common-libs/exit-codes"
const path = require('path');
const constants = require('./app/lib/constants');
const logger = require('./app/lib/logger').NewLogger('duniter');
const configDependency = require('./app/modules/config');
const wizardDependency = require('./app/modules/wizard');
const resetDependency = require('./app/modules/reset');
const checkConfDependency = require('./app/modules/check-config');
const exportBcDependency = require('./app/modules/export-bc');
const reapplyDependency = require('./app/modules/reapply');
const revertDependency = require('./app/modules/revert');
const daemonDependency = require('./app/modules/daemon');
const pSignalDependency = require('./app/modules/peersignal');
const pluginDependency = require('./app/modules/plugin');
const dumpDependency = require('./app/modules/dump');
let sigintListening = false
// Trace errors
process.on('unhandledRejection', (reason) => {
logger.error('Unhandled rejection: ' + reason);
logger.error(reason);
});
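/**
 * Helpers to build and run a stack of Duniter modules from the command line.
 *
 * A minimal usage sketch (the module path below is purely hypothetical):
 *
 *   // Registers ./my-module.js as a "quick" module, then runs the CLI with process.argv
 *   Stacks.quickRun('./my-module.js')
 */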
export class Stacks {
static todoOnRunDone:() => any = () => process.exit()
static async quickRun(...args:any[]) {
const deps = Array.from(args).map((f, index) => {
const canonicalPath = path.resolve(f)
return {
name: 'duniter-quick-module-' + index,
required: require(canonicalPath)
}
})
const stack = Stacks.autoStack(deps)
let res
try {
res = await stack.executeStack(Stacks.quickRunGetArgs())
} catch(e) {
console.error(e)
}
Stacks.onRunDone()
return res
}
static quickRunGetArgs() {
return process.argv.slice()
}
static onRunDone() {
return Stacks.todoOnRunDone()
}
static autoStack(priorityModules?:DuniterModule[]) {
const duniterModules = [];
let duniterDeps:any = []
try {
const pjson = require(path.resolve('./package.json'))
// Look for compliant packages
const prodDeps = Object.keys(pjson.dependencies || {});
const devDeps = Object.keys(pjson.devDependencies || {});
duniterDeps = prodDeps.concat(devDeps)
} catch (e) { /* duniter may not be run from within an NPM project, so package.json can be missing */ }
for(const dep of duniterDeps) {
try {
const required = require(dep);
if (required.duniter) {
duniterModules.push({
name: dep,
required
});
}
} catch (e) { /* Silently ignore packages that fail to load */ }
}
// The final stack
return new Stack((priorityModules || []).concat(PRODUCTION_DEPENDENCIES).concat(duniterModules));
}
}
const MINIMAL_DEPENDENCIES = [
{ name: 'duniter-config', required: configDependency }
];
const DEFAULT_DEPENDENCIES = MINIMAL_DEPENDENCIES.concat([
{ name: 'duniter-wizard', required: wizardDependency },
{ name: 'duniter-reset', required: resetDependency },
{ name: 'duniter-chkconf', required: checkConfDependency },
{ name: 'duniter-exportbc', required: exportBcDependency },
{ name: 'duniter-reapply', required: reapplyDependency },
{ name: 'duniter-revert', required: revertDependency },
{ name: 'duniter-daemon', required: daemonDependency },
{ name: 'duniter-psignal', required: pSignalDependency },
{ name: 'duniter-router', required: RouterDependency },
{ name: 'duniter-plugin', required: pluginDependency },
{ name: 'duniter-prover', required: ProverDependency },
{ name: 'duniter-keypair', required: KeypairDependency },
{ name: 'duniter-crawler', required: CrawlerDependency },
{ name: 'duniter-bma', required: BmaDependency },
{ name: 'duniter-ws2p', required: WS2PDependency },
{ name: 'duniter-dump', required: dumpDependency },
]);
const PRODUCTION_DEPENDENCIES = DEFAULT_DEPENDENCIES.concat([
]);
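/**
 * Builds a standalone Server instance without going through the CLI stack.
 *
 * Illustrative only (the home directory and conf override are hypothetical):
 *
 *   const server = NewDuniterServer('/tmp/duniter-home', true, { loglevel: 'debug' })
 */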
export function NewDuniterServer(home:string, memory:boolean, overConf:any) {
return new Server(home, memory, overConf);
}
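/**
 * Entry points exposed to programs embedding Duniter.
 *
 * A minimal sketch of running the CLI from an embedder, assuming the embedding
 * process forwards its own argv:
 *
 *   Statics.autoStack().executeStack(process.argv)
 */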
export const Statics = {
logger: logger,
/**
* Creates a new stack with minimal registrations only.
*/
minimalStack: () => new Stack(MINIMAL_DEPENDENCIES),
/**
* Creates a new stack with core registrations only.
*/
simpleStack: () => new Stack(DEFAULT_DEPENDENCIES),
/**
* Creates a new stack pre-registered with compliant modules found in package.json
*/
autoStack: (...args:any[]) => {
return Stacks.autoStack.apply(null, args)
},
quickRun: (path:string) => {
return Stacks.quickRun(path)
},
setOnRunDone: (f:()=>any) => {
return Stacks.todoOnRunDone = f
}
};
export interface DuniterService {
startService: () => Promise<any>
stopService: () => Promise<any>
}
export interface ReadableDuniterService extends DuniterService, stream.Readable {}
export interface TransformableDuniterService extends DuniterService, stream.Transform {}
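/**
 * A Stack aggregates Duniter modules and drives the CLI built from them.
 *
 * Sketch of the module shape expected by registerDependency (the name is
 * hypothetical and every `duniter` field is optional):
 *
 *   {
 *     name: 'duniter-my-module',
 *     required: {
 *       duniter: {
 *         cliOptions: [],   // extra CLI options
 *         cli: [],          // extra CLI commands
 *         config: {},       // onLoading / beforeSave hooks
 *         onReset: {},      // data / config reset hooks
 *         wizard: {},       // interactive wizard tasks
 *         service: {}       // input / process / output / neutral services
 *       }
 *     }
 *   }
 */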
export class Stack {
private injectedServices = false
private cli:any
private configLoadingCallbacks:any[]
private configBeforeSaveCallbacks:any[]
private resetDataHooks:any[]
private resetConfigHooks:any[]
private INPUT:any
private PROCESS:any
private loaded:any
private wizardTasks:any
private definitions:DuniterDependency[] = []
private streams: {
input: ReadableDuniterService[]
process: TransformableDuniterService[]
output: TransformableDuniterService[]
neutral: DuniterService[]
} = {
input: [],
process: [],
output: [],
neutral: []
}
constructor(private dependencies:DuniterModule[]) {
this.cli = ExecuteCommand()
this.configLoadingCallbacks = []
this.configBeforeSaveCallbacks = []
this.resetDataHooks = []
this.resetConfigHooks = []
this.INPUT = new InputStream()
this.PROCESS = new ProcessStream()
this.loaded = {}
this.wizardTasks = {}
// We register the initial dependencies right now. Others can be added thereafter.
for (const dep of dependencies) {
this.registerDependency(dep.required, dep.name);
}
}
// Part of modules API
getModule(name:string) {
return this.loaded[name]
}
registerDependency(requiredObject:{ duniter: DuniterDependency }, name:string) {
if (name && this.loaded[name]) {
// Do not try to load it twice
return;
}
this.loaded[name] = requiredObject;
const def = requiredObject.duniter;
this.definitions.push(def);
for (const opt of (def.cliOptions || [])) {
this.cli.addOption(opt.value, opt.desc, opt.parser);
}
for (const command of (def.cli || [])) {
this.cli.addCommand({
name: command.name,
desc: command.desc
}, (...args:any[]) => this.processCommand.apply(this, [command].concat(args)));
}
/**
* Configuration injection
* -----------------------
*/
if (def.config) {
if (def.config.onLoading) {
this.configLoadingCallbacks.push(def.config.onLoading);
}
// Before the configuration is saved, the module can make some injection/cleaning
if (def.config.beforeSave) {
this.configBeforeSaveCallbacks.push(def.config.beforeSave);
}
}
/**
* Reset data/config injection
* -----------------------
*/
if (def.onReset) {
if (def.onReset.data) {
this.resetDataHooks.push(def.onReset.data);
}
// When the configuration is reset, the module can make some injection/cleaning
if (def.onReset.config) {
this.resetConfigHooks.push(def.onReset.config);
}
}
/**
* Wizard injection
* -----------------------
*/
if (def.wizard) {
const tasks = Object.keys(def.wizard);
for (const name of tasks) {
this.wizardTasks[name] = def.wizard[name];
}
}
};
async processCommand (...args:any[]) {
const command: CliCommand = args[0];
const program: ProgramOptions = args[1];
const params = args.slice(2);
params.pop(); // Don't need the command argument
OtherConstants.SQL_TRACES = program.sqlTraces === true
const dbName = program.mdb;
const dbHome = program.home;
const home = Directory.getHome(dbName, dbHome);
if (command.logs === false) {
logger.mute();
}
// Add log files for this instance (non-memory instances only)
if (!program.memory) {
logger.addHomeLogs(home, program.loglevel);
}
const server = new Server(home, program.memory === true, commandLineConf(program));
let piped = false
// If ever the process gets interrupted
let isSaving = false;
if (!sigintListening) {
process.on('SIGINT', async () => {
if (!isSaving) {
isSaving = true;
// Save DB
try {
await server.disconnect();
process.exit(ExitCodes.OK);
} catch (e) {
logger.error(e);
process.exit(ExitCodes.SIGINT);
}
}
})
sigintListening = true
}
// Config or Data reset hooks
server.resetDataHook = async () => {
for (const callback of this.resetDataHooks) {
await callback(server.conf, program, logger, server.dal.confDAL);
}
}
server.resetConfigHook = async () => {
for (const callback of this.resetConfigHooks) {
await callback(server.conf, program, logger, server.dal.confDAL);
}
}
// Initialize server (db connection, ...)
try {
server.onPluggedFSHook = async () => {
// Register the configuration hook for loading phase (overrides the loaded data)
server.dal.loadConfHook = async (conf:ConfDTO) => {
// Loading injection
for (const callback of this.configLoadingCallbacks) {
await callback(conf, program, logger, server.dal.confDAL);
}
}
// Register the configuration hook for saving phase (overrides the saved data)
server.dal.saveConfHook = async (conf:ConfDTO) => {
const clonedConf = Underscore.clone(conf)
for (const callback of this.configBeforeSaveCallbacks) {
await callback(clonedConf, program, logger, server.dal.confDAL);
}
return clonedConf;
}
}
await server.plugFileSystem();
const conf = await server.loadConf();
// Possibly change the log level
// Add log files for this instance (non-memory instances only)
if (!program.memory) {
logger.addHomeLogs(home, conf.loglevel);
}
// Auto-configuration default
await configure(program, server, server.conf || {});
// Autosave conf
try {
await server.dal.saveConf(conf);
logger.debug("Configuration saved.");
} catch (e) {
logger.error("Configuration could not be saved: " + e);
throw Error(e);
}
const daemon = server.getDaemon()
if (command.preventIfRunning && daemon.status()) {
throw 'Your node is currently running. Please stop it and relaunch your command.'
}
// First possible class of commands: post-config
if (command.onConfiguredExecute) {
const res = await command.onConfiguredExecute(server, conf, program, params, this.wizardTasks, this);
// If we don't have an execution callback, let's stop the command
if (!command.onDatabaseExecute) {
return res
}
} else if (!command.onDatabaseExecute) {
throw `Command ${command.name} does not implement onConfiguredExecute nor onDatabaseExecute.`
}
// Second possible class of commands: post-service
await server.initDAL(conf);
/**
* Service injection
* -----------------
*/
if (!this.injectedServices) {
this.injectedServices = true
for (const def of this.definitions) {
if (def.service) {
// To feed data coming from some I/O (network, disk, other module, ...)
if (def.service.input) {
this.streams.input.push(def.service.input(server, conf, logger));
}
// To handle data that has been submitted by the INPUT stream
if (def.service.process) {
this.streams.process.push(def.service.process(server, conf, logger));
}
// To handle data that has been validated by the PROCESS stream
if (def.service.output) {
this.streams.output.push(def.service.output(server, conf, logger));
}
// Special service which does not stream anything in particular (e.g. piloting the `server` object)
if (def.service.neutral) {
this.streams.neutral.push(def.service.neutral(server, conf, logger));
}
}
}
}
piped = true
// All inputs write to global INPUT stream
for (const module of this.streams.input) module.pipe(this.INPUT);
// All processes read from global INPUT stream
for (const module of this.streams.process) this.INPUT.pipe(module);
// All processes write to global PROCESS stream
for (const module of this.streams.process) module.pipe(this.PROCESS);
// All outputs read from global PROCESS stream
for (const module of this.streams.output) this.PROCESS.pipe(module);
const finalResult = await command.onDatabaseExecute(server, conf, program, params,
// Start services and streaming between them
async () => {
const modules: DuniterService[] = this.streams.neutral.concat(this.streams.input).concat(this.streams.process).concat(this.streams.output);
await Promise.all(modules.map((module:DuniterService) => module.startService()))
},
// Stop services and streaming between them
async () => {
const modules: DuniterService[] = this.streams.neutral.concat(this.streams.input).concat(this.streams.process).concat(this.streams.output);
// Any streaming module must implement a `stopService` method
await Promise.all(modules.map((module:DuniterService) => module.stopService()))
},
this);
// Close resources
await server.disconnect()
return finalResult
} catch (e) {
server.disconnect();
throw e;
} finally {
if (piped) {
// Unpipe everything, as the command is done
for (const module of this.streams.input) module.unpipe()
for (const module of this.streams.process) module.unpipe()
for (const module of this.streams.output) module.unpipe()
this.INPUT.unpipe()
this.PROCESS.unpipe()
}
}
}
executeStack(argv:string[]) {
// Executes the command
return this.cli.execute(argv);
}
}
function commandLineConf(program:any, conf:any = {}) {
conf = conf || {};
const cli = {
currency: program.currency,
cpu: program.cpu,
nbCores: program.nbCores,
prefix: program.prefix,
server: {
port: program.port,
},
proxies: {
proxySocks: program.socksProxy,
proxyTor: program.torProxy,
reachingClearEp: program.reachingClearEp,
forceTor: program.forceTor,
rmProxies: program.rmProxies
},
logs: {
http: program.httplogs,
nohttp: program.nohttplogs
},
endpoints: [],
rmEndpoints: [],
isolate: program.isolate,
forksize: program.forksize,
nofork: program.nofork,
timeout: program.timeout
};
// Declare and update proxiesConf
if (cli.proxies.proxySocks || cli.proxies.proxyTor || cli.proxies.reachingClearEp || cli.proxies.forceTor || cli.proxies.rmProxies) {
conf.proxiesConf = new ProxiesConf()
if (cli.proxies.proxySocks) conf.proxiesConf.proxySocksAddress = cli.proxies.proxySocks;
if (cli.proxies.proxyTor) conf.proxiesConf.proxyTorAddress = cli.proxies.proxyTor;
if (cli.proxies.reachingClearEp) {
switch (cli.proxies.reachingClearEp) {
case 'tor': conf.proxiesConf.reachingClearEp = 'tor'; break;
case 'none': conf.proxiesConf.reachingClearEp = 'none'; break;
}
}
if (cli.proxies.forceTor) conf.proxiesConf.forceTor = true
}
// Update the rest of the conf
if (cli.currency) conf.currency = cli.currency;
if (cli.server.port) conf.port = cli.server.port;
if (cli.cpu) conf.cpu = Math.max(0.01, Math.min(1.0, cli.cpu));
if (cli.nbCores) conf.nbCores = Math.max(1, Math.min(ProverConstants.CORES_MAXIMUM_USE_IN_PARALLEL, cli.nbCores));
if (cli.prefix) conf.prefix = Math.max(ProverConstants.MIN_PEER_ID, Math.min(ProverConstants.MAX_PEER_ID, cli.prefix));
if (cli.logs.http) conf.httplogs = true;
if (cli.logs.nohttp) conf.httplogs = false;
if (cli.isolate) conf.isolate = cli.isolate;
if (cli.timeout) conf.timeout = cli.timeout;
if (cli.forksize != null) conf.forksize = cli.forksize;
return conf;
}
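// Illustrative mapping only (assuming the CLI wires a --cpu flag to program.cpu):
// `--cpu 1.5` would end up as conf.cpu === 1.0, since the value is clamped to
// the [0.01, 1.0] range above.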
async function configure(program:any, server:Server, conf:ConfDTO) {
if (typeof server == "string" || typeof conf == "string") {
throw constants.ERRORS.CLI_CALLERR_CONFIG;
}
// Try to add an endpoint if provided
if (program.addep) {
if (conf.endpoints.indexOf(program.addep) === -1) {
conf.endpoints.push(program.addep);
}
// Remove it from "to be removed" list
const indexInRemove = conf.rmEndpoints.indexOf(program.addep);
if (indexInRemove !== -1) {
conf.rmEndpoints.splice(indexInRemove, 1);
}
}
// Try to remove an endpoint if provided
if (program.remep) {
if (conf.rmEndpoints.indexOf(program.remep) === -1) {
conf.rmEndpoints.push(program.remep);
}
// Remove it from "to be added" list
const indexInToAdd = conf.endpoints.indexOf(program.remep);
if (indexInToAdd !== -1) {
conf.endpoints.splice(indexInToAdd, 1);
}
}
}
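// Illustrative only (assuming --addep/--remep flags bound to program.addep and
// program.remep; the endpoint string is hypothetical): `--addep 'BMAS g1.example.org 443'`
// appends that endpoint to conf.endpoints and removes it from conf.rmEndpoints if present.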
/**
* InputStream is a special stream that filters what passes in.
* Only DUP-like documents should be treated by the processing tools, to avoid JSON injection and save CPU cycles.
* @constructor
*/
class InputStream extends stream.Transform {
constructor() {
super({ objectMode: true })
}
_write(str:string, enc:any, done:any) {
if (typeof str === 'string') {
// Keep only strings
const matches = str.match(/Type: (.*)\n/);
if (matches && matches[1].match(/(Block|Membership|Identity|Certification|Transaction|Peer)/)) {
const type = matches[1].toLowerCase();
this.push({ type, doc: str });
}
}
done && done();
};
}
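// Illustrative only: a raw document containing a header line such as "Type: Peer\n"
// matches the filter above and is pushed downstream as { type: 'peer', doc: <raw string> };
// anything else is silently dropped.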
class ProcessStream extends stream.Transform {
constructor() {
super({ objectMode: true })
}
_write(obj:any, enc:any, done:any) {
// Never close the stream
if (obj !== undefined && obj !== null) {
this.push(obj);
}
done && done();
};
}
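/**
 * Convenience runner: builds an auto stack, optionally with priority modules,
 * and executes the CLI.
 *
 * A minimal sketch, assuming the process is launched with a standard argv:
 *
 *   Duniter.run([], process.argv)
 */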
export const Duniter = {
run(modules: DuniterModule[] = [], args?: string[]) {
return Stacks.autoStack(modules).executeStack(args || process.argv)
}
}