Skip to content
Snippets Groups Projects
Select Git revision
  • c98d492f3fa3f214db607deaa202eefbf7aeff6e
  • dev default protected
  • release/1.9.1 protected
  • pini-1.8-docker
  • pini-sync-onlypeers
  • duniter-v2s-issue-123-industrialize-releases
  • feature/build-aarch64-nodejs16
  • release/1.8 protected
  • pini-docker
  • ci_tags
  • fix/1448/1.8/txs_not_stored
  • feature/node-20
  • fix/1441/node_summary_with_storage
  • fix/1442/improve_bma_tx_history
  • feature/wotwizard-1.8
  • release/1.9 protected
  • 1.7 protected
  • feature/docker-set-latest protected
  • feature/fast-docker-build-1.8.4
  • fast-docker-build protected
  • feature/dump-distance
  • v1.8.7 protected
  • v1.8.7-rc4 protected
  • v1.8.7-rc3 protected
  • v1.8.7-rc2 protected
  • v1.8.7-rc1 protected
  • v1.8.6 protected
  • v1.7.23 protected
  • v1.8.5 protected
  • v1.8.4 protected
  • v1.8.3 protected
  • v1.8.2 protected
  • v1.8.1 protected
  • v1.8.0 protected
  • v1.8.0-rc1 protected
  • v1.8.0-beta5 protected
  • v1.8.0-beta4 protected
  • v1.8.0-beta3 protected
  • v1.8.0-beta2 protected
  • v1.8.0-beta protected
  • v1.7.21 protected
41 results

server.js

Blame
  • server.js 16.89 KiB
    "use strict";
    const stream      = require('stream');
    const util        = require('util');
    const path        = require('path');
    const co          = require('co');
    const _           = require('underscore');
    const Q           = require('q');
    const archiver    = require('archiver');
    const unzip       = require('unzip2');
    const fs          = require('fs');
    const daemonize   = require("daemonize2")
    const parsers     = require('./app/lib/streams/parsers');
    const constants   = require('./app/lib/constants');
    const fileDAL     = require('./app/lib/dal/fileDAL');
    const jsonpckg    = require('./package.json');
    const keyring      = require('duniter-common').keyring;
    const directory   = require('./app/lib/system/directory');
    const rawer       = require('duniter-common').rawer;
    
    function Server (home, memoryOnly, overrideConf) {
    
      stream.Duplex.call(this, { objectMode: true });
    
      const paramsP = directory.getHomeParams(memoryOnly, home);
      const logger = require('./app/lib/logger')('server');
      const that = this;
      that.home = home;
      that.conf = null;
      that.dal = null;
      that.version = jsonpckg.version;
      that.logger = logger;
    
      // External libs
      that.lib = {};
      that.lib.keyring = require('duniter-common').keyring;
      that.lib.Identity = require('./app/lib/entity/identity');
      that.lib.Certification = require('./app/lib/entity/certification');
      that.lib.Transaction = require('./app/lib/entity/transaction');
      that.lib.Peer = require('./app/lib/entity/peer');
      that.lib.Membership = require('./app/lib/entity/membership');
      that.lib.Block = require('./app/lib/entity/block');
      that.lib.Stat = require('./app/lib/entity/stat');
      that.lib.rawer = require('duniter-common').rawer;
      that.lib.parsers = require('./app/lib/streams/parsers');
      that.lib.http2raw = require('duniter-bma').duniter.methods.http2raw;
      that.lib.dos2unix = require('duniter-common').dos2unix;
      that.lib.bma = require('duniter-bma').duniter.methods.bma;
      that.lib.constants = require('./app/lib/constants');
      that.lib.ucp = require('duniter-common').buid;
      that.lib.hashf = require('duniter-common').hashf;
      that.lib.indexer = require('./app/lib/dup/indexer');
      that.lib.rules = require('./app/lib/rules');
    
      that.MerkleService       = require("./app/lib/helpers/merkle");
      that.IdentityService     = require('./app/service/IdentityService')();
      that.MembershipService   = require('./app/service/MembershipService')();
      that.PeeringService      = require('./app/service/PeeringService')(that);
      that.BlockchainService   = require('./app/service/BlockchainService')(that);
      that.TransactionsService = require('./app/service/TransactionsService')();
    
      // Create document mapping
      const documentsMapping = {
        'identity':      { action: that.IdentityService.submitIdentity,                                               parser: parsers.parseIdentity },
        'certification': { action: that.IdentityService.submitCertification,                                          parser: parsers.parseCertification},
        'revocation':    { action: that.IdentityService.submitRevocation,                                             parser: parsers.parseRevocation },
        'membership':    { action: that.MembershipService.submitMembership,                                           parser: parsers.parseMembership },
        'peer':          { action: that.PeeringService.submitP,                                                       parser: parsers.parsePeer },
        'transaction':   { action: that.TransactionsService.processTx,                                                parser: parsers.parseTransaction },
        'block':         { action: _.partial(that.BlockchainService.submitBlock, _, true, constants.NO_FORK_ALLOWED), parser: parsers.parseBlock }
      };
    
      // Unused, but made mandatory by Duplex interface
      this._read = () => null;
    
      this._write = (obj, enc, writeDone) => that.submit(obj, false, () => writeDone);
    
      /**
       * Facade method to control what is pushed to the stream (we don't want it to be closed)
       * @param obj An object to be pushed to the stream.
       */
      this.streamPush = (obj) => {
        if (obj) {
          that.push(obj);
        }
      };
    
      this.getBcContext = () => this.BlockchainService.getContext();
    
      this.plugFileSystem = () => co(function *() {
        logger.debug('Plugging file system...');
        const params = yield paramsP;
        that.dal = fileDAL(params);
      });
    
      this.unplugFileSystem = () => co(function *() {
        logger.debug('Unplugging file system...');
        yield that.dal.close();
      });
    
      this.loadConf = (useDefaultConf) => co(function *() {
        logger.debug('Loading conf...');
        that.conf = yield that.dal.loadConf(overrideConf, useDefaultConf);
        // Default values
        const defaultValues = {
          remoteipv6:         that.conf.ipv6,
          remoteport:         that.conf.port,
          c:                  constants.CONTRACT.DEFAULT.C,
          dt:                 constants.CONTRACT.DEFAULT.DT,
          ud0:                constants.CONTRACT.DEFAULT.UD0,
          stepMax:            constants.CONTRACT.DEFAULT.STEPMAX,
          sigPeriod:          constants.CONTRACT.DEFAULT.SIGPERIOD,
          sigStock:           constants.CONTRACT.DEFAULT.SIGSTOCK,
          sigWindow:          constants.CONTRACT.DEFAULT.SIGWINDOW,
          sigValidity:        constants.CONTRACT.DEFAULT.SIGVALIDITY,
          msValidity:         constants.CONTRACT.DEFAULT.MSVALIDITY,
          sigQty:             constants.CONTRACT.DEFAULT.SIGQTY,
          idtyWindow:         constants.CONTRACT.DEFAULT.IDTYWINDOW,
          msWindow:           constants.CONTRACT.DEFAULT.MSWINDOW,
          xpercent:           constants.CONTRACT.DEFAULT.X_PERCENT,
          percentRot:         constants.CONTRACT.DEFAULT.PERCENTROT,
          powDelay:           constants.CONTRACT.DEFAULT.POWDELAY,
          avgGenTime:         constants.CONTRACT.DEFAULT.AVGGENTIME,
          dtDiffEval:         constants.CONTRACT.DEFAULT.DTDIFFEVAL,
          medianTimeBlocks:   constants.CONTRACT.DEFAULT.MEDIANTIMEBLOCKS,
          rootoffset:         0,
          forksize:           constants.BRANCHES.DEFAULT_WINDOW_SIZE
        };
        _.keys(defaultValues).forEach(function(key){
          if (that.conf[key] == undefined) {
            that.conf[key] = defaultValues[key];
          }
        });
        // Extract key pair
        that.keyPair = keyring.Key(that.conf.pair.pub, that.conf.pair.sec);
        that.sign = that.keyPair.sign;
        // Update services
        [that.IdentityService, that.MembershipService, that.PeeringService, that.BlockchainService, that.TransactionsService].map((service) => {
          service.setConfDAL(that.conf, that.dal, that.keyPair);
        });
        return that.conf;
      });
    
      this.initWithDAL = () => co(function *() {
        yield that.plugFileSystem();
        yield that.loadConf();
        yield that.initDAL();
        return that;
      });
    
      this.submit = (obj, isInnerWrite, done) => {
        return co(function *() {
          if (!obj.documentType) {
            throw 'Document type not given';
          }
          try {
            const action = documentsMapping[obj.documentType].action;
            let res;
            if (typeof action == 'function') {
              // Handle the incoming object
              res = yield action(obj);
            } else {
              throw 'Unknown document type \'' + obj.documentType + '\'';
            }
            if (res) {
              // Only emit valid documents
              that.emit(obj.documentType, _.clone(res));
              that.streamPush(_.clone(res));
            }
            if (done) {
              isInnerWrite ? done(null, res) : done();
            }
            return res;
          } catch (err) {
            if (err && !err.uerr) {
              // Unhandled error, display it
              logger.debug('Document write error: ', err);
            }
            if (done) {
              isInnerWrite ? done(err, null) : done();
            } else {
              throw err;
            }
          }
        });
      };
    
      this.submitP = (obj, isInnerWrite) => Q.nbind(this.submit, this)(obj, isInnerWrite);
    
      this.initDAL = () => co(function*() {
        yield that.dal.init();
        // Maintenance
        let head_1 = yield that.dal.bindexDAL.head(1);
        if (head_1) {
          // Case 1: b_index < block
          yield that.dal.blockDAL.exec('DELETE FROM block WHERE NOT fork AND number > ' + head_1.number);
          // Case 2: b_index > block
          const current = yield that.dal.blockDAL.getCurrent();
          const nbBlocksToRevert = (head_1.number - current.number);
          for (let i = 0; i < nbBlocksToRevert; i++) {
            yield that.revert();
          }
        }
      });
    
      this.recomputeSelfPeer = () => that.PeeringService.generateSelfPeer(that.conf, 0);
    
      this.getCountOfSelfMadePoW = () => this.BlockchainService.getCountOfSelfMadePoW();
      this.isServerMember = () => this.BlockchainService.isMember();
    
      this.checkConfig = () => co(function*() {
        if (!that.conf.pair) {
          throw new Error('No keypair was given.');
        }
      });
    
      this.resetHome = () => co(function *() {
        const params = yield paramsP;
        const myFS = params.fs;
        const rootPath = params.home;
        const existsDir = yield myFS.exists(rootPath);
        if (existsDir) {
          yield myFS.removeTree(rootPath);
        }
      });
    
      this.resetAll = (done) => co(function*() {
        const files = ['stats', 'cores', 'current', directory.DUNITER_DB_NAME, directory.DUNITER_DB_NAME + '.db', directory.DUNITER_DB_NAME + '.log', directory.WOTB_FILE, 'export.zip', 'import.zip', 'conf'];
        const dirs  = ['blocks', 'blockchain', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb'];
        return resetFiles(files, dirs, done);
      });
    
      this.resetData = (done) => co(function*(){
        const files = ['stats', 'cores', 'current', directory.DUNITER_DB_NAME, directory.DUNITER_DB_NAME + '.db', directory.DUNITER_DB_NAME + '.log', directory.WOTB_FILE];
        const dirs  = ['blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb'];
        yield resetFiles(files, dirs, done);
      });
    
      this.resetConf = (done) => {
        const files = ['conf'];
        const dirs  = [];
        return resetFiles(files, dirs, done);
      };
    
      this.resetStats = (done) => {
        const files = ['stats'];
        const dirs  = ['ud_history'];
        return resetFiles(files, dirs, done);
      };
    
      this.resetPeers = (done) => {
        return that.dal.resetPeers(done);
      };
    
      this.exportAllDataAsZIP = () => co(function *() {
        const params = yield paramsP;
        const rootPath = params.home;
        const myFS = params.fs;
        const archive = archiver('zip');
        if (yield myFS.exists(path.join(rootPath, 'indicators'))) {
          archive.directory(path.join(rootPath, 'indicators'), '/indicators', undefined, { name: 'indicators'});
        }
        const files = ['duniter.db', 'stats.json', 'wotb.bin'];
        for (const file of files) {
          if (yield myFS.exists(path.join(rootPath, file))) {
            archive.file(path.join(rootPath, file), { name: file });
          }
        }
        archive.finalize();
        return archive;
      });
    
      this.importAllDataFromZIP = (zipFile) => co(function *() {
        const params = yield paramsP;
        yield that.resetData();
        const output = unzip.Extract({ path: params.home });
        fs.createReadStream(zipFile).pipe(output);
        return new Promise((resolve, reject) => {
          output.on('error', reject);
          output.on('close', resolve);
        });
      });
    
      this.cleanDBData = () => co(function *() {
        yield that.dal.cleanCaches();
        that.dal.wotb.resetWoT();
        const files = ['stats', 'cores', 'current', directory.DUNITER_DB_NAME, directory.DUNITER_DB_NAME + '.db', directory.DUNITER_DB_NAME + '.log'];
        const dirs  = ['blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb'];
        return resetFiles(files, dirs);
      });
    
      function resetFiles(files, dirs, done) {
        return co(function *() {
          try {
            const params = yield paramsP;
            const myFS = params.fs;
            const rootPath = params.home;
            for (const fName of files) {
              // JSON file?
              const existsJSON = yield myFS.exists(rootPath + '/' + fName + '.json');
              if (existsJSON) {
                const theFilePath = rootPath + '/' + fName + '.json';
                yield myFS.remove(theFilePath);
                if (yield myFS.exists(theFilePath)) {
                  throw Error('Failed to delete file "' + theFilePath + '"');
                }
              } else {
                // Normal file?
                const normalFile = path.join(rootPath, fName);
                const existsFile = yield myFS.exists(normalFile);
                if (existsFile) {
                  yield myFS.remove(normalFile);
                  if (yield myFS.exists(normalFile)) {
                    throw Error('Failed to delete file "' + normalFile + '"');
                  }
                }
              }
            }
            for (const dirName of dirs) {
              const existsDir = yield myFS.exists(rootPath + '/' + dirName);
              if (existsDir) {
                yield myFS.removeTree(rootPath + '/' + dirName);
                if (yield myFS.exists(rootPath + '/' + dirName)) {
                  throw Error('Failed to delete folder "' + rootPath + '/' + dirName + '"');
                }
              }
            }
            done && done();
        } catch(e) {
              done && done(e);
              throw e;
          }
        });
      }
    
      this.disconnect = () => Promise.resolve(that.dal && that.dal.close());
    
      this.revert = () => this.BlockchainService.revertCurrentBlock();
    
      this.revertTo = (number) => co(function *() {
        const current = yield that.BlockchainService.current();
        for (let i = 0, count = current.number - number; i < count; i++) {
          yield that.BlockchainService.revertCurrentBlock();
        }
        if (current.number <= number) {
          logger.warn('Already reached');
        }
      });
    
      this.reapplyTo = (number) => co(function *() {
        const current = yield that.BlockchainService.current();
        if (current.number == number) {
          logger.warn('Already reached');
        } else {
          for (let i = 0, count = number - current.number; i < count; i++) {
            yield that.BlockchainService.applyNextAvailableFork();
          }
        }
      });
    
      this.singleWritePromise = (obj) => that.submit(obj);
    
      this.rawer = rawer;
    
      this.writeRaw = (raw, type) => co(function *() {
        const parser = documentsMapping[type] && documentsMapping[type].parser;
        const obj = parser.syncWrite(raw);
        return yield that.singleWritePromise(obj);
      });
    
      /*****************
       * DAEMONIZATION
       ****************/
    
      /**
       * Get the daemon handle. Eventually give arguments to launch a new daemon.
       * @param overrideCommand The new command to launch.
       * @param insteadOfCmd The current command to be replaced by `overrideCommand` command.
       * @returns {*} The daemon handle.
       */
      this.getDaemon = function getDaemon(overrideCommand, insteadOfCmd) {
        const mainModule = process.argv[1]
        const argv = getCommand(overrideCommand, insteadOfCmd)
        return daemonize.setup({
          main: mainModule,
          name: directory.INSTANCE_NAME,
          pidfile: path.join(directory.INSTANCE_HOME, "app.pid"),
          argv
        });
      }
    
      /**
       * Return current script full command arguments except the two firsts (which are node executable + js file).
       * If the two optional `cmd` and `insteadOfCmd` parameters are given, replace `insteadOfCmd`'s value by `cmd` in
       * the script arguments.
       *
       *   Ex:
       *     * process.argv: ['/usr/bin/node', '/opt/duniter/sources/bin/duniter', 'restart', '--mdb', 'g1']
       *
       *     Then `getCommand('direct_start', 'restart') will return:
       *
       *     * ['direct_start', '--mdb', 'g1']
       *
       *     This new array is what will be given to a *fork* of current script, resulting in a new process with:
       *
       *     * process.argv: ['/usr/bin/node', '/opt/duniter/sources/bin/duniter', 'direct_start', '--mdb', 'g1']
       *
       * @param cmd
       * @param insteadOfCmd
       * @returns {*}
       */
      function getCommand(cmd, insteadOfCmd) {
        if (insteadOfCmd) {
          // Return the same command args, except the command `insteadOfCmd` which is replaced by `cmd`
          return process.argv.slice(2).map((arg) => {
            if (arg == insteadOfCmd) {
              return cmd
            } else {
              return arg
            }
          })
        } else {
          // Return the exact same args (generally for stop/status commands)
          return process.argv.slice(2)
        }
      }
    
      /**
       * Retrieve the last linesQuantity lines from the log file.
       * @param linesQuantity
       */
      this.getLastLogLines = (linesQuantity) => this.dal.getLogContent(linesQuantity);
    
      /*****************
       * MODULES PLUGS
       ****************/
    
      /**
       * Default endpoint. To be overriden by a module to specify another endpoint value (for ex. BMA).
       */
      this.getMainEndpoint = () => Promise.resolve('DEFAULT_ENDPOINT')
    
      /**
       * Default WoT incoming data for new block. To be overriden by a module.
       */
      this.generatorGetJoinData = () => Promise.resolve({})
    
      /**
       * Default WoT incoming certifications for new block, filtering wrong certs. To be overriden by a module.
       */
      this.generatorComputeNewCerts = () => Promise.resolve({})
    
      /**
       * Default WoT transforming method for certs => links. To be overriden by a module.
       */
      this.generatorNewCertsToLinks = () => Promise.resolve({})
    }
    
    util.inherits(Server, stream.Duplex);
    
    module.exports = Server;