Skip to content
Snippets Groups Projects
Select Git revision
  • release/1.9.1 protected
  • pini-1.8-docker
  • pini-sync-onlypeers
  • duniter-v2s-issue-123-industrialize-releases
  • feature/build-aarch64-nodejs16
  • release/1.8 protected
  • pini-docker
  • ci_tags
  • fix/1448/1.8/txs_not_stored
  • dev default protected
  • feature/node-20
  • fix/1441/node_summary_with_storage
  • fix/1442/improve_bma_tx_history
  • feature/wotwizard-1.8
  • release/1.9 protected
  • 1.7 protected
  • feature/docker-set-latest protected
  • feature/fast-docker-build-1.8.4
  • fast-docker-build protected
  • feature/dump-distance
  • v1.8.7 protected
  • v1.8.7-rc4 protected
  • v1.8.7-rc3 protected
  • v1.8.7-rc2 protected
  • v1.8.7-rc1 protected
  • v1.8.6 protected
  • v1.7.23 protected
  • v1.8.5 protected
  • v1.8.4 protected
  • v1.8.3 protected
  • v1.8.2 protected
  • v1.8.1 protected
  • v1.8.0 protected
  • v1.8.0-rc1 protected
  • v1.8.0-beta5 protected
  • v1.8.0-beta4 protected
  • v1.8.0-beta3 protected
  • v1.8.0-beta2 protected
  • v1.8.0-beta protected
  • v1.7.21 protected
40 results

index.js

Blame
  • index.js 13.64 KiB
    "use strict";
    
    const Q = require('q');
    const co = require('co');
    const es = require('event-stream');
    const util = require('util');
    const stream = require('stream');
    const _ = require('underscore');
    const Server = require('./server');
    const directory = require('./app/lib/system/directory');
    const constants = require('./app/lib/constants');
    const wizard = require('./app/lib/wizard');
    const logger = require('./app/lib/logger')('duniter');
    
    const configDependency    = require('./app/modules/config');
    const wizardDependency    = require('./app/modules/wizard');
    const resetDependency     = require('./app/modules/reset');
    const checkConfDependency = require('./app/modules/check-config');
    const exportBcDependency  = require('./app/modules/export-bc');
    const reapplyDependency   = require('./app/modules/reapply');
    const revertDependency    = require('./app/modules/revert');
    const daemonDependency    = require('./app/modules/daemon');
    const pSignalDependency   = require('./app/modules/peersignal');
    const routerDependency    = require('./app/modules/router');
    
    const MINIMAL_DEPENDENCIES = [
      { name: 'duniter-config',    required: configDependency }
    ];
    
    const DEFAULT_DEPENDENCIES = MINIMAL_DEPENDENCIES.concat([
      { name: 'duniter-wizard',    required: wizardDependency },
      { name: 'duniter-reset',     required: resetDependency },
      { name: 'duniter-chkconf',   required: checkConfDependency },
      { name: 'duniter-exportbc',  required: exportBcDependency },
      { name: 'duniter-reapply',   required: reapplyDependency },
      { name: 'duniter-revert',    required: revertDependency },
      { name: 'duniter-daemon',    required: daemonDependency },
      { name: 'duniter-psignal',   required: pSignalDependency },
      { name: 'duniter-router',    required: routerDependency }
    ]);
    
    const PRODUCTION_DEPENDENCIES = DEFAULT_DEPENDENCIES.concat([
    ]);
    
    module.exports = function (home, memory, overConf) {
      return new Server(home, memory, overConf);
    };
    
    module.exports.statics = {
    
      logger: logger,
    
      /**
       * Creates a new stack with minimal registrations only.
       */
      minimalStack: () => new Stack(MINIMAL_DEPENDENCIES),
    
      /**
       * Creates a new stack with core registrations only.
       */
      simpleStack: () => new Stack(DEFAULT_DEPENDENCIES),
    
      /**
       * Creates a new stack pre-registered with compliant modules found in package.json
       */
      autoStack: (priorityModules) => {
        const pjson = require('./package.json');
        const duniterModules = [];
    
        // Look for compliant packages
        const prodDeps = Object.keys(pjson.dependencies);
        const devDeps = Object.keys(pjson.devDependencies);
        const duniterDeps = _.filter(prodDeps.concat(devDeps), (dep) => dep.match(/^duniter-/));
        for(const dep of duniterDeps) {
          const required = require(dep);
          if (required.duniter) {
            duniterModules.push({
              name: dep,
              required
            });
          }
        }
    
        // The final stack
        return new Stack((priorityModules || []).concat(PRODUCTION_DEPENDENCIES).concat(duniterModules));
      }
    };
    
    function Stack(dependencies) {
    
      const that = this;
      const cli = require('./app/cli')();
      const configLoadingCallbacks = [];
      const configBeforeSaveCallbacks = [];
      const INPUT = new InputStream();
      const PROCESS = new ProcessStream();
      const loaded = {};
      const wizardTasks = {};
    
      const definitions = [];
      const streams = {
        input: [],
        process: [],
        output: [],
        neutral: []
      };
    
      this.registerDependency = (requiredObject, name) => {
        if (name && loaded[name]) {
          // Do not try to load it twice
          return;
        }
        loaded[name] = true;
        const def = requiredObject.duniter;
        definitions.push(def);
        for (const opt of (def.cliOptions || [])) {
          cli.addOption(opt.value, opt.desc, opt.parser);
        }
        for (const command of (def.cli || [])) {
          cli.addCommand({
            name: command.name,
            desc: command.desc
          }, (...args) => that.processCommand.apply(null, [command].concat(args)));
        }
    
        /**
         * Configuration injection
         * -----------------------
         */
        if (def.config) {
          if (def.config.onLoading) {
            configLoadingCallbacks.push(def.config.onLoading);
          }
          // Before the configuration is saved, the module can make some injection/cleaning
          if (def.config.beforeSave) {
            configBeforeSaveCallbacks.push(def.config.beforeSave);
          }
        }
    
        /**
         * Wizard injection
         * -----------------------
         */
        if (def.wizard) {
          const tasks = Object.keys(def.wizard);
          for (const name of tasks) {
            wizardTasks[name] = def.wizard[name];
          }
        }
      };
    
      this.processCommand = (...args) => co(function*() {
        const command = args[0];
        const program = args[1];
        const params  = args.slice(2);
        params.pop(); // Don't need the command argument
    
        const dbName = program.mdb;
        const dbHome = program.home;
        const home = directory.getHome(dbName, dbHome);
    
        if (command.logs === false) {
          logger.mute();
        }
    
        // Add log files for this instance
        logger.addHomeLogs(home, program.loglevel);
    
        const server = new Server(home, program.memory === true, commandLineConf(program));
    
        // If ever the process gets interrupted
        let isSaving = false;
        process.on('SIGINT', () => {
          co(function*() {
            if (!isSaving) {
              isSaving = true;
              // Save DB
              try {
                yield server.disconnect();
                process.exit();
              } catch (e) {
                logger.error(e);
                process.exit(3);
              }
            }
          });
        });
    
        // Initialize server (db connection, ...)
        try {
          server.onPluggedFSHook = () => co(function*() {
    
            // Register the configuration hook for loading phase (overrides the loaded data)
            server.dal.loadConfHook = (conf) => co(function*() {
              // Loading injection
              for (const callback of configLoadingCallbacks) {
                yield callback(conf, program, logger, server.dal.confDAL);
              }
            });
    
            // Register the configuration hook for saving phase (overrides the saved data)
            server.dal.saveConfHook = (conf) => co(function*() {
              const clonedConf = _.clone(conf);
              for (const callback of configBeforeSaveCallbacks) {
                yield callback(clonedConf, program, logger, server.dal.confDAL);
              }
              return clonedConf;
            });
          })
          yield server.plugFileSystem();
    
          const conf = yield server.loadConf();
    
          // Eventually change the log level
          logger.addHomeLogs(home, conf.loglevel);
    
          // Auto-configuration default
          yield configure(program, server, server.conf || {});
          // Autosave conf
          try {
            yield server.dal.saveConf(conf);
            logger.debug("Configuration saved.");
          } catch (e) {
            logger.error("Configuration could not be saved: " + e);
            throw Error(e);
          }
    
          const daemon = server.getDaemon()
          if (command.preventIfRunning && daemon.status()) {
            throw 'Your node is currently running. Please stop it and relaunch your command.'
          }
    
          // First possible class of commands: post-config
          if (command.onConfiguredExecute) {
            return yield command.onConfiguredExecute(server, conf, program, params, wizardTasks, that);
          }
          // Second possible class of commands: post-service
          yield server.initDAL();
    
          /**
           * Service injection
           * -----------------
           */
          for (const def of definitions) {
            if (def.service) {
              // To feed data coming from some I/O (network, disk, other module, ...)
              if (def.service.input) {
                streams.input.push(def.service.input(server, conf, logger));
              }
              // To handle data that has been submitted by INPUT stream
              if (def.service.process) {
                streams.process.push(def.service.process(server, conf, logger));
              }
              // To handle data that has been validated by PROCESS stream
              if (def.service.output) {
                streams.output.push(def.service.output(server, conf, logger));
              }
              // Special service which does not stream anything particular (ex.: piloting the `server` object)
              if (def.service.neutral) {
                streams.neutral.push(def.service.neutral(server, conf, logger));
              }
            }
          }
          // All inputs write to global INPUT stream
          for (const module of streams.input) module.pipe(INPUT);
          // All processes read from global INPUT stream
          for (const module of streams.process) INPUT.pipe(module);
          // All processes write to global PROCESS stream
          for (const module of streams.process) module.pipe(PROCESS);
          // All ouputs read from global PROCESS stream
          for (const module of streams.output) PROCESS.pipe(module);
    
          return yield command.onDatabaseExecute(server, conf, program, params,
    
            // Start services and streaming between them
            () => co(function*() {
              // Any streaming module must implement a `startService` method
              for (const m of streams.input) {
                yield m.startService();
              }
              const modules = [].concat(streams.process).concat(streams.output).concat(streams.neutral);
              yield modules.map(module => module.startService());
            }),
    
            // Stop services and streaming between them
            () => co(function*() {
              const modules = streams.input.concat(streams.process).concat(streams.output);
              // Any streaming module must implement a `stopService` method
              yield modules.map(module => module.stopService());
              // // Stop reading inputs
              // for (const module of streams.input) module.unpipe();
              // Stop reading from global INPUT
              // INPUT.unpipe();
              // for (const module of streams.process) module.unpipe();
              // // Stop reading from global PROCESS
              // PROCESS.unpipe();
            }), that);
        } catch (e) {
          server.disconnect();
          throw e;
        }
      });
    
      this.executeStack = (argv) => {
    
        // Trace these errors
        process.on('unhandledRejection', (reason) => {
          logger.error('Unhandled rejection: ' + reason);
          logger.error(reason);
        });
    
        // Executes the command
        return cli.execute(argv);
      };
    
      // We register the initial dependencies right now. Others can be added thereafter.
      for (const dep of dependencies) {
        that.registerDependency(dep.required, dep.name);
      }
    }
    
    function commandLineConf(program, conf) {
    
      conf = conf || {};
      conf.sync = conf.sync || {};
      const cli = {
        currency: program.currency,
        cpu: program.cpu,
        server: {
          port: program.port,
        },
        db: {
          mport: program.mport,
          mdb: program.mdb,
          home: program.home
        },
        logs: {
          http: program.httplogs,
          nohttp: program.nohttplogs
        },
        endpoints: [],
        rmEndpoints: [],
        isolate: program.isolate,
        forksize: program.forksize,
        nofork: program.nofork,
        timeout: program.timeout
      };
    
      // Update conf
      if (cli.currency)                         conf.currency = cli.currency;
      if (cli.server.port)                      conf.port = cli.server.port;
      if (cli.cpu)                              conf.cpu = Math.max(0.01, Math.min(1.0, cli.cpu));
      if (cli.logs.http)                        conf.httplogs = true;
      if (cli.logs.nohttp)                      conf.httplogs = false;
      if (cli.db.mport)                         conf.mport = cli.db.mport;
      if (cli.db.home)                          conf.home = cli.db.home;
      if (cli.db.mdb)                           conf.mdb = cli.db.mdb;
      if (cli.isolate)                          conf.isolate = cli.isolate;
      if (cli.timeout)                          conf.timeout = cli.timeout;
      if (cli.forksize != null)                 conf.forksize = cli.forksize;
    
      return conf;
    }
    
    function configure(program, server, conf) {
      return co(function *() {
        if (typeof server == "string" || typeof conf == "string") {
          throw constants.ERRORS.CLI_CALLERR_CONFIG;
        }
        // Try to add an endpoint if provided
        if (program.addep) {
          if (conf.endpoints.indexOf(program.addep) === -1) {
            conf.endpoints.push(program.addep);
          }
          // Remove it from "to be removed" list
          const indexInRemove = conf.rmEndpoints.indexOf(program.addep);
          if (indexInRemove !== -1) {
            conf.rmEndpoints.splice(indexInRemove, 1);
          }
        }
        // Try to remove an endpoint if provided
        if (program.remep) {
          if (conf.rmEndpoints.indexOf(program.remep) === -1) {
            conf.rmEndpoints.push(program.remep);
          }
          // Remove it from "to be added" list
          const indexInToAdd = conf.endpoints.indexOf(program.remep);
          if (indexInToAdd !== -1) {
            conf.endpoints.splice(indexInToAdd, 1);
          }
        }
      });
    }
    
    /**
     * InputStream is a special stream that filters what passes in.
     * Only DUP-like documents should be treated by the processing tools, to avoid JSON injection and save CPU cycles.
     * @constructor
     */
    function InputStream() {
    
      const that = this;
    
      stream.Transform.call(this, { objectMode: true });
    
      this._write = function (str, enc, done) {
        if (typeof str === 'string') {
          // Keep only strings
          const matches = str.match(/Type: (.*)\n/);
          if (matches && matches[1].match(/(Block|Membership|Identity|Certification|Transaction|Peer)/)) {
            const type = matches[1].toLowerCase();
            that.push({ type, doc: str });
          }
        }
        done && done();
      };
    }
    
    function ProcessStream() {
    
      const that = this;
    
      stream.Transform.call(this, { objectMode: true });
    
      this._write = function (obj, enc, done) {
        // Never close the stream
        if (obj !== undefined && obj !== null) {
          that.push(obj);
        }
        done && done();
      };
    }
    
    util.inherits(InputStream, stream.Transform);
    util.inherits(ProcessStream, stream.Transform);