Select Git revision
app-controllers.js
-
Benoit Lavenier authored
- Login : Alert user if login into a empty account - Fix #167 - Login : auto compute pubkey (if device has grade 'a') - Fix #170
Benoit Lavenier authored- Login : Alert user if login into a empty account - Fix #167 - Login : auto compute pubkey (if device has grade 'a') - Fix #170
index.js 13.14 KiB
"use strict";
const Q = require('q');
const co = require('co');
const util = require('util');
const stream = require('stream');
const _ = require('underscore');
const Server = require('./server');
const directory = require('./app/lib/system/directory');
const constants = require('./app/lib/constants');
const wizard = require('./app/lib/wizard');
const logger = require('./app/lib/logger')('duniter');
const dkeypairDependency = require('duniter-keypair');
const configDependency = require('./app/modules/config');
const wizardDependency = require('./app/modules/wizard');
const genDependency = require('./app/modules/gen');
const syncDependency = require('./app/modules/synchronization');
const MINIMAL_DEPENDENCIES = [
{ name: 'duniter-config', required: configDependency }
];
const DEFAULT_DEPENDENCIES = [
{ name: 'duniter-config', required: configDependency },
{ name: 'duniter-sync', required: syncDependency },
{ name: 'duniter-wizard', required: wizardDependency },
{ name: 'duniter-gen', required: genDependency },
{ name: 'duniter-keypair', required: dkeypairDependency }
];
module.exports = function (home, memory, overConf) {
return new Server(home, memory, overConf);
};
module.exports.statics = {
logger: logger,
/**
* Creates a new stack with minimal registrations only.
*/
minimalStack: () => new Stack(MINIMAL_DEPENDENCIES),
/**
* Creates a new stack with core registrations only.
*/
simpleStack: () => new Stack(DEFAULT_DEPENDENCIES),
/**
* Creates a new stack pre-registered with compliant modules found in package.json
*/
autoStack: () => {
const pjson = require('./package.json');
const duniterModules = [];
// Look for compliant packages
const prodDeps = Object.keys(pjson.dependencies);
const devDeps = Object.keys(pjson.devDependencies);
const duniterDeps = _.filter(prodDeps.concat(devDeps), (dep) => dep.match(/^duniter-/));
for(const dep of duniterDeps) {
const required = require(dep);
if (required.duniter) {
duniterModules.push({
name: dep,
required
});
}
}
// The final stack
return new Stack(DEFAULT_DEPENDENCIES.concat(duniterModules));
}
};
function Stack(dependencies) {
const that = this;
const cli = require('./app/cli')();
const configLoadingCallbacks = [];
const configBeforeSaveCallbacks = [];
const INPUT = new InputStream();
const PROCESS = new ProcessStream();
const loaded = {};
const wizardTasks = {};
const streams = {
input: [],
process: [],
output: [],
};
this.registerDependency = (requiredObject, name) => {
if (name && loaded[name]) {
// Do not try to load it twice
return;
}
loaded[name] = true;
const def = requiredObject.duniter;
for (const opt of (def.cliOptions || [])) {
cli.addOption(opt.value, opt.desc, opt.parser);
}
for (const command of (def.cli || [])) {
cli.addCommand({
name: command.name,
desc: command.desc
}, (...args) => that.processCommand.apply(null, [command].concat(args)));
}
/**
* Configuration injection
* -----------------------
*/
if (def.config) {
if (def.config.onLoading) {
configLoadingCallbacks.push(def.config.onLoading);
}
// Before the configuration is saved, the module can make some injection/cleaning
if (def.config.beforeSave) {
configBeforeSaveCallbacks.push(def.config.beforeSave);
}
}
/**
* Wizard injection
* -----------------------
*/
if (def.wizard) {
const tasks = Object.keys(def.wizard);
for (const name of tasks) {
wizardTasks[name] = def.wizard[name];
}
}
/**
* Service injection
* -----------------
*/
if (def.service) {
// To feed data coming from some I/O (network, disk, other module, ...)
if (def.service.input) {
streams.input.push(def.service.input);
}
// To handle data that has been submitted by INPUT stream
if (def.service.process) {
streams.process.push(def.service.process);
}
// To handle data that has been validated by PROCESS stream
if (def.service.output) {
streams.output.push(def.service.output);
}
}
};
this.processCommand = (...args) => co(function*() {
const command = args[0];
const program = args[1];
const params = args.slice(2);
params.pop(); // Don't need the command argument
const dbName = program.mdb;
const dbHome = program.home;
const home = directory.getHome(dbName, dbHome);
// Add log files for this instance
logger.addHomeLogs(home);
const server = new Server(home, program.memory === true, commandLineConf(program));
// If ever the process gets interrupted
let isSaving = false;
process.on('SIGINT', () => {
co(function*() {
if (!isSaving) {
isSaving = true;
// Save DB
return server.disconnect();
}
process.exit();
});
});
// Initialize server (db connection, ...)
try {
yield server.plugFileSystem();
// Register the configuration hook for loading phase (overrides the loaded data)
server.dal.loadConfHook = (conf) => co(function*() {
// Loading injection
for (const callback of configLoadingCallbacks) {
yield callback(conf, program);
}
});
// Register the configuration hook for saving phase (overrides the saved data)
server.dal.saveConfHook = (conf) => co(function*() {
const clonedConf = _.clone(conf);
for (const callback of configBeforeSaveCallbacks) {
yield callback(clonedConf, program);
}
return clonedConf;
});
const conf = yield server.loadConf();
// Auto-configuration default
yield configure(program, server, server.conf || {});
// Autosave conf
try {
yield server.dal.saveConf(conf);
logger.debug("Configuration saved.");
} catch (e) {
logger.error("Configuration could not be saved: " + e);
throw Error(e);
}
// First possible class of commands: post-config
if (command.onConfiguredExecute) {
return yield command.onConfiguredExecute(server, conf, program, params, wizardTasks);
}
// Second possible class of commands: post-service
yield server.initDAL();
return yield command.onPluggedDALExecute(server, conf, program, params,
// Start services and streaming between them
() => co(function*() {
const modules = streams.input.concat(streams.process).concat(streams.output);
// Any streaming module must implement a `startService` method
yield modules.map(module => module.startService());
// All inputs write to global INPUT stream
for (const module of streams.input) module.pipe(INPUT);
// All processes read from global INPUT stream
for (const module of streams.process) INPUT.pipe(module);
// All processes write to global PROCESS stream
for (const module of streams.process) module.pipe(PROCESS);
// All ouputs read from global PROCESS stream
for (const module of streams.output) PROCESS.pipe(module);
}),
// Stop services and streaming between them
() => co(function*() {
const modules = streams.input.concat(streams.process).concat(streams.output);
// Any streaming module must implement a `stopService` method
yield modules.map(module => module.stopService());
// Stop reading inputs
for (const module of streams.input) module.unpipe();
// Stop reading from global INPUT
INPUT.unpipe();
for (const module of streams.process) module.unpipe();
// Stop reading from global PROCESS
PROCESS.unpipe();
}));
} catch (e) {
server.disconnect();
throw e;
}
});
this.executeStack = (argv) => {
// Trace these errors
process.on('unhandledRejection', (reason) => {
logger.error('Unhandled rejection: ' + reason);
});
// Executes the command
return cli.execute(argv);
};
// We register the initial dependencies right now. Others can be added thereafter.
for (const dep of dependencies) {
that.registerDependency(dep.required, dep.name);
}
}
function commandLineConf(program, conf) {
conf = conf || {};
conf.sync = conf.sync || {};
const cli = {
currency: program.currency,
cpu: program.cpu,
server: {
port: program.port,
ipv4address: program.ipv4,
ipv6address: program.ipv6,
remote: {
host: program.remoteh,
ipv4: program.remote4,
ipv6: program.remote6,
port: program.remotep
}
},
db: {
mport: program.mport,
mdb: program.mdb,
home: program.home
},
net: {
upnp: program.upnp,
noupnp: program.noupnp
},
logs: {
http: program.httplogs,
nohttp: program.nohttplogs
},
endpoints: [],
rmEndpoints: [],
isolate: program.isolate,
forksize: program.forksize,
nofork: program.nofork,
timeout: program.timeout
};
// Update conf
if (cli.currency) conf.currency = cli.currency;
if (cli.server.ipv4address) conf.ipv4 = cli.server.ipv4address;
if (cli.server.ipv6address) conf.ipv6 = cli.server.ipv6address;
if (cli.server.port) conf.port = cli.server.port;
if (cli.server.remote.host != undefined) conf.remotehost = cli.server.remote.host;
if (cli.server.remote.ipv4 != undefined) conf.remoteipv4 = cli.server.remote.ipv4;
if (cli.server.remote.ipv6 != undefined) conf.remoteipv6 = cli.server.remote.ipv6;
if (cli.server.remote.port != undefined) conf.remoteport = cli.server.remote.port;
if (cli.net.upnp) conf.upnp = true;
if (cli.net.noupnp) conf.upnp = false;
if (cli.cpu) conf.cpu = Math.max(0.01, Math.min(1.0, cli.cpu));
if (cli.logs.http) conf.httplogs = true;
if (cli.logs.nohttp) conf.httplogs = false;
if (cli.db.mport) conf.mport = cli.db.mport;
if (cli.db.home) conf.home = cli.db.home;
if (cli.db.mdb) conf.mdb = cli.db.mdb;
if (cli.isolate) conf.isolate = cli.isolate;
if (cli.timeout) conf.timeout = cli.timeout;
if (cli.forksize != null) conf.forksize = cli.forksize;
// Specific internal settings
conf.createNext = true;
return _(conf).extend({routing: true});
}
function configure(program, server, conf) {
return co(function *() {
if (typeof server == "string" || typeof conf == "string") {
throw constants.ERRORS.CLI_CALLERR_CONFIG;
}
let wiz = wizard();
// UPnP override
if (program.noupnp === true) {
conf.upnp = false;
}
if (program.upnp === true) {
conf.upnp = true;
}
// Network autoconf
const autoconfNet = program.autoconf
|| !(conf.ipv4 || conf.ipv6)
|| !(conf.remoteipv4 || conf.remoteipv6 || conf.remotehost)
|| !(conf.port && conf.remoteport);
if (autoconfNet) {
yield Q.nbind(wiz.networkReconfiguration, wiz)(conf, autoconfNet, program.noupnp);
}
// Try to add an endpoint if provided
if (program.addep) {
if (conf.endpoints.indexOf(program.addep) === -1) {
conf.endpoints.push(program.addep);
}
// Remove it from "to be removed" list
const indexInRemove = conf.rmEndpoints.indexOf(program.addep);
if (indexInRemove !== -1) {
conf.rmEndpoints.splice(indexInRemove, 1);
}
}
// Try to remove an endpoint if provided
if (program.remep) {
if (conf.rmEndpoints.indexOf(program.remep) === -1) {
conf.rmEndpoints.push(program.remep);
}
// Remove it from "to be added" list
const indexInToAdd = conf.endpoints.indexOf(program.remep);
if (indexInToAdd !== -1) {
conf.endpoints.splice(indexInToAdd, 1);
}
}
});
}
/**
* InputStream is a special stream that filters what passes in.
* Only DUP-like documents should be treated by the processing tools, to avoid JSON injection and save CPU cycles.
* @constructor
*/
function InputStream() {
const that = this;
stream.Transform.call(this, { objectMode: true });
this._write = function (str, enc, done) {
if (typeof str === 'string') {
// Keep only strings
const matches = str.match(/Type: (.*)\n/);
if (matches && matches[1].match(/(Block|Membership|Identity|Certification|Transaction|Peer)/)) {
const type = matches[1].toLowerCase();
that.push({ type, doc: str });
}
}
done && done();
};
}
function ProcessStream() {
const that = this;
stream.Transform.call(this, { objectMode: true });
this._write = function (obj, enc, done) {
// Never close the stream
if (obj !== undefined && obj !== null) {
that.push(obj);
}
done && done();
};
}
util.inherits(InputStream, stream.Transform);
util.inherits(ProcessStream, stream.Transform);