Select Git revision
-
Cédric Moreau authoredCédric Moreau authored
index.js 13.64 KiB
"use strict";
const Q = require('q');
const co = require('co');
const es = require('event-stream');
const util = require('util');
const stream = require('stream');
const _ = require('underscore');
const Server = require('./server');
const directory = require('./app/lib/system/directory');
const constants = require('./app/lib/constants');
const wizard = require('./app/lib/wizard');
const logger = require('./app/lib/logger')('duniter');
const configDependency = require('./app/modules/config');
const wizardDependency = require('./app/modules/wizard');
const resetDependency = require('./app/modules/reset');
const checkConfDependency = require('./app/modules/check-config');
const exportBcDependency = require('./app/modules/export-bc');
const reapplyDependency = require('./app/modules/reapply');
const revertDependency = require('./app/modules/revert');
const daemonDependency = require('./app/modules/daemon');
const pSignalDependency = require('./app/modules/peersignal');
const routerDependency = require('./app/modules/router');
const MINIMAL_DEPENDENCIES = [
{ name: 'duniter-config', required: configDependency }
];
const DEFAULT_DEPENDENCIES = MINIMAL_DEPENDENCIES.concat([
{ name: 'duniter-wizard', required: wizardDependency },
{ name: 'duniter-reset', required: resetDependency },
{ name: 'duniter-chkconf', required: checkConfDependency },
{ name: 'duniter-exportbc', required: exportBcDependency },
{ name: 'duniter-reapply', required: reapplyDependency },
{ name: 'duniter-revert', required: revertDependency },
{ name: 'duniter-daemon', required: daemonDependency },
{ name: 'duniter-psignal', required: pSignalDependency },
{ name: 'duniter-router', required: routerDependency }
]);
const PRODUCTION_DEPENDENCIES = DEFAULT_DEPENDENCIES.concat([
]);
module.exports = function (home, memory, overConf) {
return new Server(home, memory, overConf);
};
module.exports.statics = {
logger: logger,
/**
* Creates a new stack with minimal registrations only.
*/
minimalStack: () => new Stack(MINIMAL_DEPENDENCIES),
/**
* Creates a new stack with core registrations only.
*/
simpleStack: () => new Stack(DEFAULT_DEPENDENCIES),
/**
* Creates a new stack pre-registered with compliant modules found in package.json
*/
autoStack: (priorityModules) => {
const pjson = require('./package.json');
const duniterModules = [];
// Look for compliant packages
const prodDeps = Object.keys(pjson.dependencies);
const devDeps = Object.keys(pjson.devDependencies);
const duniterDeps = _.filter(prodDeps.concat(devDeps), (dep) => dep.match(/^duniter-/));
for(const dep of duniterDeps) {
const required = require(dep);
if (required.duniter) {
duniterModules.push({
name: dep,
required
});
}
}
// The final stack
return new Stack((priorityModules || []).concat(PRODUCTION_DEPENDENCIES).concat(duniterModules));
}
};
function Stack(dependencies) {
const that = this;
const cli = require('./app/cli')();
const configLoadingCallbacks = [];
const configBeforeSaveCallbacks = [];
const INPUT = new InputStream();
const PROCESS = new ProcessStream();
const loaded = {};
const wizardTasks = {};
const definitions = [];
const streams = {
input: [],
process: [],
output: [],
neutral: []
};
this.registerDependency = (requiredObject, name) => {
if (name && loaded[name]) {
// Do not try to load it twice
return;
}
loaded[name] = true;
const def = requiredObject.duniter;
definitions.push(def);
for (const opt of (def.cliOptions || [])) {
cli.addOption(opt.value, opt.desc, opt.parser);
}
for (const command of (def.cli || [])) {
cli.addCommand({
name: command.name,
desc: command.desc
}, (...args) => that.processCommand.apply(null, [command].concat(args)));
}
/**
* Configuration injection
* -----------------------
*/
if (def.config) {
if (def.config.onLoading) {
configLoadingCallbacks.push(def.config.onLoading);
}
// Before the configuration is saved, the module can make some injection/cleaning
if (def.config.beforeSave) {
configBeforeSaveCallbacks.push(def.config.beforeSave);
}
}
/**
* Wizard injection
* -----------------------
*/
if (def.wizard) {
const tasks = Object.keys(def.wizard);
for (const name of tasks) {
wizardTasks[name] = def.wizard[name];
}
}
};
this.processCommand = (...args) => co(function*() {
const command = args[0];
const program = args[1];
const params = args.slice(2);
params.pop(); // Don't need the command argument
const dbName = program.mdb;
const dbHome = program.home;
const home = directory.getHome(dbName, dbHome);
if (command.logs === false) {
logger.mute();
}
// Add log files for this instance
logger.addHomeLogs(home, program.loglevel);
const server = new Server(home, program.memory === true, commandLineConf(program));
// If ever the process gets interrupted
let isSaving = false;
process.on('SIGINT', () => {
co(function*() {
if (!isSaving) {
isSaving = true;
// Save DB
try {
yield server.disconnect();
process.exit();
} catch (e) {
logger.error(e);
process.exit(3);
}
}
});
});
// Initialize server (db connection, ...)
try {
server.onPluggedFSHook = () => co(function*() {
// Register the configuration hook for loading phase (overrides the loaded data)
server.dal.loadConfHook = (conf) => co(function*() {
// Loading injection
for (const callback of configLoadingCallbacks) {
yield callback(conf, program, logger, server.dal.confDAL);
}
});
// Register the configuration hook for saving phase (overrides the saved data)
server.dal.saveConfHook = (conf) => co(function*() {
const clonedConf = _.clone(conf);
for (const callback of configBeforeSaveCallbacks) {
yield callback(clonedConf, program, logger, server.dal.confDAL);
}
return clonedConf;
});
})
yield server.plugFileSystem();
const conf = yield server.loadConf();
// Eventually change the log level
logger.addHomeLogs(home, conf.loglevel);
// Auto-configuration default
yield configure(program, server, server.conf || {});
// Autosave conf
try {
yield server.dal.saveConf(conf);
logger.debug("Configuration saved.");
} catch (e) {
logger.error("Configuration could not be saved: " + e);
throw Error(e);
}
const daemon = server.getDaemon()
if (command.preventIfRunning && daemon.status()) {
throw 'Your node is currently running. Please stop it and relaunch your command.'
}
// First possible class of commands: post-config
if (command.onConfiguredExecute) {
return yield command.onConfiguredExecute(server, conf, program, params, wizardTasks, that);
}
// Second possible class of commands: post-service
yield server.initDAL();
/**
* Service injection
* -----------------
*/
for (const def of definitions) {
if (def.service) {
// To feed data coming from some I/O (network, disk, other module, ...)
if (def.service.input) {
streams.input.push(def.service.input(server, conf, logger));
}
// To handle data that has been submitted by INPUT stream
if (def.service.process) {
streams.process.push(def.service.process(server, conf, logger));
}
// To handle data that has been validated by PROCESS stream
if (def.service.output) {
streams.output.push(def.service.output(server, conf, logger));
}
// Special service which does not stream anything particular (ex.: piloting the `server` object)
if (def.service.neutral) {
streams.neutral.push(def.service.neutral(server, conf, logger));
}
}
}
// All inputs write to global INPUT stream
for (const module of streams.input) module.pipe(INPUT);
// All processes read from global INPUT stream
for (const module of streams.process) INPUT.pipe(module);
// All processes write to global PROCESS stream
for (const module of streams.process) module.pipe(PROCESS);
// All ouputs read from global PROCESS stream
for (const module of streams.output) PROCESS.pipe(module);
return yield command.onDatabaseExecute(server, conf, program, params,
// Start services and streaming between them
() => co(function*() {
// Any streaming module must implement a `startService` method
for (const m of streams.input) {
yield m.startService();
}
const modules = [].concat(streams.process).concat(streams.output).concat(streams.neutral);
yield modules.map(module => module.startService());
}),
// Stop services and streaming between them
() => co(function*() {
const modules = streams.input.concat(streams.process).concat(streams.output);
// Any streaming module must implement a `stopService` method
yield modules.map(module => module.stopService());
// // Stop reading inputs
// for (const module of streams.input) module.unpipe();
// Stop reading from global INPUT
// INPUT.unpipe();
// for (const module of streams.process) module.unpipe();
// // Stop reading from global PROCESS
// PROCESS.unpipe();
}), that);
} catch (e) {
server.disconnect();
throw e;
}
});
this.executeStack = (argv) => {
// Trace these errors
process.on('unhandledRejection', (reason) => {
logger.error('Unhandled rejection: ' + reason);
logger.error(reason);
});
// Executes the command
return cli.execute(argv);
};
// We register the initial dependencies right now. Others can be added thereafter.
for (const dep of dependencies) {
that.registerDependency(dep.required, dep.name);
}
}
function commandLineConf(program, conf) {
conf = conf || {};
conf.sync = conf.sync || {};
const cli = {
currency: program.currency,
cpu: program.cpu,
server: {
port: program.port,
},
db: {
mport: program.mport,
mdb: program.mdb,
home: program.home
},
logs: {
http: program.httplogs,
nohttp: program.nohttplogs
},
endpoints: [],
rmEndpoints: [],
isolate: program.isolate,
forksize: program.forksize,
nofork: program.nofork,
timeout: program.timeout
};
// Update conf
if (cli.currency) conf.currency = cli.currency;
if (cli.server.port) conf.port = cli.server.port;
if (cli.cpu) conf.cpu = Math.max(0.01, Math.min(1.0, cli.cpu));
if (cli.logs.http) conf.httplogs = true;
if (cli.logs.nohttp) conf.httplogs = false;
if (cli.db.mport) conf.mport = cli.db.mport;
if (cli.db.home) conf.home = cli.db.home;
if (cli.db.mdb) conf.mdb = cli.db.mdb;
if (cli.isolate) conf.isolate = cli.isolate;
if (cli.timeout) conf.timeout = cli.timeout;
if (cli.forksize != null) conf.forksize = cli.forksize;
return conf;
}
function configure(program, server, conf) {
return co(function *() {
if (typeof server == "string" || typeof conf == "string") {
throw constants.ERRORS.CLI_CALLERR_CONFIG;
}
// Try to add an endpoint if provided
if (program.addep) {
if (conf.endpoints.indexOf(program.addep) === -1) {
conf.endpoints.push(program.addep);
}
// Remove it from "to be removed" list
const indexInRemove = conf.rmEndpoints.indexOf(program.addep);
if (indexInRemove !== -1) {
conf.rmEndpoints.splice(indexInRemove, 1);
}
}
// Try to remove an endpoint if provided
if (program.remep) {
if (conf.rmEndpoints.indexOf(program.remep) === -1) {
conf.rmEndpoints.push(program.remep);
}
// Remove it from "to be added" list
const indexInToAdd = conf.endpoints.indexOf(program.remep);
if (indexInToAdd !== -1) {
conf.endpoints.splice(indexInToAdd, 1);
}
}
});
}
/**
* InputStream is a special stream that filters what passes in.
* Only DUP-like documents should be treated by the processing tools, to avoid JSON injection and save CPU cycles.
* @constructor
*/
function InputStream() {
const that = this;
stream.Transform.call(this, { objectMode: true });
this._write = function (str, enc, done) {
if (typeof str === 'string') {
// Keep only strings
const matches = str.match(/Type: (.*)\n/);
if (matches && matches[1].match(/(Block|Membership|Identity|Certification|Transaction|Peer)/)) {
const type = matches[1].toLowerCase();
that.push({ type, doc: str });
}
}
done && done();
};
}
function ProcessStream() {
const that = this;
stream.Transform.call(this, { objectMode: true });
this._write = function (obj, enc, done) {
// Never close the stream
if (obj !== undefined && obj !== null) {
that.push(obj);
}
done && done();
};
}
util.inherits(InputStream, stream.Transform);
util.inherits(ProcessStream, stream.Transform);