Skip to content
Snippets Groups Projects
Select Git revision
  • release/1.9.1 protected
  • pini-1.8-docker
  • pini-sync-onlypeers
  • duniter-v2s-issue-123-industrialize-releases
  • feature/build-aarch64-nodejs16
  • release/1.8 protected
  • pini-docker
  • ci_tags
  • fix/1448/1.8/txs_not_stored
  • dev default protected
  • feature/node-20
  • fix/1441/node_summary_with_storage
  • fix/1442/improve_bma_tx_history
  • feature/wotwizard-1.8
  • release/1.9 protected
  • 1.7 protected
  • feature/docker-set-latest protected
  • feature/fast-docker-build-1.8.4
  • fast-docker-build protected
  • feature/dump-distance
  • v1.8.7 protected
  • v1.8.7-rc4 protected
  • v1.8.7-rc3 protected
  • v1.8.7-rc2 protected
  • v1.8.7-rc1 protected
  • v1.8.6 protected
  • v1.7.23 protected
  • v1.8.5 protected
  • v1.8.4 protected
  • v1.8.3 protected
  • v1.8.2 protected
  • v1.8.1 protected
  • v1.8.0 protected
  • v1.8.0-rc1 protected
  • v1.8.0-beta5 protected
  • v1.8.0-beta4 protected
  • v1.8.0-beta3 protected
  • v1.8.0-beta2 protected
  • v1.8.0-beta protected
  • v1.7.21 protected
40 results

server.js

Blame
  • server.js 19.56 KiB
    "use strict";
    const stream      = require('stream');
    const util        = require('util');
    const path        = require('path');
    const co          = require('co');
    const _           = require('underscore');
    const Q           = require('q');
    const archiver    = require('archiver');
    const unzip       = require('unzip2');
    const fs          = require('fs');
    const parsers     = require('./app/lib/streams/parsers');
    const constants   = require('./app/lib/constants');
    const fileDAL     = require('./app/lib/dal/fileDAL');
    const jsonpckg    = require('./package.json');
    const router      = require('./app/lib/streams/router');
    const base58      = require('./app/lib/crypto/base58');
    const keyring      = require('./app/lib/crypto/keyring');
    const directory   = require('./app/lib/system/directory');
    const dos2unix    = require('./app/lib/system/dos2unix');
    const Synchroniser = require('./app/lib/sync');
    const multicaster = require('./app/lib/streams/multicaster');
    const upnp        = require('./app/lib/system/upnp');
    const rawer       = require('./app/lib/ucp/rawer');
    const permanentProver = require('./app/lib/computation/permanentProver');
    
    function Server (dbConf, overrideConf) {
    
      stream.Duplex.call(this, { objectMode: true });
    
      const home = directory.getHome(dbConf.name, dbConf.home);
      const paramsP = directory.getHomeParams(dbConf && dbConf.memory, home);
      const logger = require('./app/lib/logger')('server');
      const permaProver = this.permaProver = permanentProver(this);
      const that = this;
      that.home = home;
      that.conf = null;
      that.dal = null;
      that.version = jsonpckg.version;
      that.logger = logger;
    
      // External libs
      that.lib = {};
      that.lib.keyring = require('./app/lib/crypto/keyring');
      that.lib.Identity = require('./app/lib/entity/identity');
      that.lib.rawer = require('./app/lib/ucp/rawer');
      that.lib.http2raw = require('./app/lib/helpers/http2raw');
      that.lib.dos2unix = require('./app/lib/system/dos2unix');
      that.lib.contacter = require('./app/lib/contacter');
      that.lib.bma = require('./app/lib/streams/bma');
      that.lib.network = require('./app/lib/system/network');
      that.lib.constants = require('./app/lib/constants');
      that.lib.ucp = require('./app/lib/ucp/buid');
    
      that.MerkleService       = require("./app/lib/helpers/merkle");
      that.ParametersService   = require("./app/lib/helpers/parameters")();
      that.IdentityService     = require('./app/service/IdentityService')();
      that.MembershipService   = require('./app/service/MembershipService')();
      that.PeeringService      = require('./app/service/PeeringService')(that);
      that.BlockchainService   = require('./app/service/BlockchainService')(that);
      that.TransactionsService = require('./app/service/TransactionsService')();
    
      // Create document mapping
      const documentsMapping = {
        'identity':      { action: that.IdentityService.submitIdentity,                                               parser: parsers.parseIdentity },
        'certification': { action: that.IdentityService.submitCertification,                                          parser: parsers.parseCertification},
        'revocation':    { action: that.IdentityService.submitRevocation,                                             parser: parsers.parseRevocation },
        'membership':    { action: that.MembershipService.submitMembership,                                           parser: parsers.parseMembership },
        'peer':          { action: that.PeeringService.submitP,                                                       parser: parsers.parsePeer },
        'transaction':   { action: that.TransactionsService.processTx,                                                parser: parsers.parseTransaction },
        'block':         { action: _.partial(that.BlockchainService.submitBlock, _, true, constants.NO_FORK_ALLOWED), parser: parsers.parseBlock }
      };
    
      // Unused, but made mandatory by Duplex interface
      this._read = () => null;
    
      this._write = (obj, enc, writeDone) => that.submit(obj, false, () => writeDone);
    
      /**
       * Facade method to control what is pushed to the stream (we don't want it to be closed)
       * @param obj An object to be pushed to the stream.
       */
      this.streamPush = (obj) => {
        if (obj) {
          that.push(obj);
        }
      };
    
      this.getBcContext = () => this.BlockchainService.getContext();
    
      this.plugFileSystem = () => co(function *() {
        logger.debug('Plugging file system...');
        const params = yield paramsP;
        that.dal = fileDAL(params);
      });
    
      this.unplugFileSystem = () => co(function *() {
        logger.debug('Unplugging file system...');
        yield that.dal.close();
      });
    
      this.loadConf = (useDefaultConf) => co(function *() {
        logger.debug('Loading conf...');
        that.conf = yield that.dal.loadConf(overrideConf, useDefaultConf);
        // Default values
        const defaultValues = {
          remoteipv6:         that.conf.ipv6,
          remoteport:         that.conf.port,
          cpu:                constants.DEFAULT_CPU,
          c:                  constants.CONTRACT.DEFAULT.C,
          dt:                 constants.CONTRACT.DEFAULT.DT,
          ud0:                constants.CONTRACT.DEFAULT.UD0,
          stepMax:            constants.CONTRACT.DEFAULT.STEPMAX,
          sigPeriod:          constants.CONTRACT.DEFAULT.SIGPERIOD,
          sigStock:           constants.CONTRACT.DEFAULT.SIGSTOCK,
          sigWindow:          constants.CONTRACT.DEFAULT.SIGWINDOW,
          sigValidity:        constants.CONTRACT.DEFAULT.SIGVALIDITY,
          msValidity:         constants.CONTRACT.DEFAULT.MSVALIDITY,
          sigQty:             constants.CONTRACT.DEFAULT.SIGQTY,
          idtyWindow:         constants.CONTRACT.DEFAULT.IDTYWINDOW,
          msWindow:           constants.CONTRACT.DEFAULT.MSWINDOW,
          xpercent:           constants.CONTRACT.DEFAULT.X_PERCENT,
          percentRot:         constants.CONTRACT.DEFAULT.PERCENTROT,
          powDelay:           constants.CONTRACT.DEFAULT.POWDELAY,
          avgGenTime:         constants.CONTRACT.DEFAULT.AVGGENTIME,
          dtDiffEval:         constants.CONTRACT.DEFAULT.DTDIFFEVAL,
          medianTimeBlocks:   constants.CONTRACT.DEFAULT.MEDIANTIMEBLOCKS,
          rootoffset:         0,
          forksize:           constants.BRANCHES.DEFAULT_WINDOW_SIZE
        };
        _.keys(defaultValues).forEach(function(key){
          if (that.conf[key] == undefined) {
            that.conf[key] = defaultValues[key];
          }
        });
        logger.debug('Loading crypto functions...');
        // Extract key pair
        let keyPair = null;
        const keypairOverriden = overrideConf && (overrideConf.salt || overrideConf.passwd);
        if (!keypairOverriden && that.conf.pair) {
          keyPair = keyring.Key(that.conf.pair.pub, that.conf.pair.sec);
        }
        else if (that.conf.passwd || that.conf.salt) {
          keyPair = yield keyring.scryptKeyPair(that.conf.salt, that.conf.passwd);
        }
        if (keyPair) {
          that.keyPair = keyPair;
          that.sign = keyPair.sign;
          // Update services
          [that.IdentityService, that.MembershipService, that.PeeringService, that.BlockchainService, that.TransactionsService].map((service) => {
            service.setConfDAL(that.conf, that.dal, that.keyPair);
          });
          that.router().setConfDAL(that.conf, that.dal);
        }
        return that.conf;
      });
    
      this.initWithDAL = () => co(function *() {
        yield that.plugFileSystem();
        yield that.loadConf();
        yield that.initDAL();
        return that;
      });
    
      this.submit = (obj, isInnerWrite, done) => {
        return co(function *() {
          if (!obj.documentType) {
            throw 'Document type not given';
          }
          try {
            const action = documentsMapping[obj.documentType].action;
            let res;
            if (typeof action == 'function') {
              // Handle the incoming object
              res = yield action(obj);
            } else {
              throw 'Unknown document type \'' + obj.documentType + '\'';
            }
            if (res) {
              // Only emit valid documents
              that.emit(obj.documentType, _.clone(res));
              that.streamPush(_.clone(res));
            }
            if (done) {
              isInnerWrite ? done(null, res) : done();
            }
            return res;
          } catch (err) {
            if (err && !err.uerr) {
              // Unhandled error, display it
              logger.debug('Document write error: ', err);
            }
            if (done) {
              isInnerWrite ? done(err, null) : done();
            } else {
              throw err;
            }
          }
        });
      };
    
      this.submitP = (obj, isInnerWrite) => Q.nbind(this.submit, this)(obj, isInnerWrite);
    
      this.initDAL = () => this.dal.init();
    
      this.start = () => co(function*(){
        yield that.checkConfig();
        // Add signing & public key functions to PeeringService
        logger.info('Node version: ' + that.version);
        logger.info('Node pubkey: ' + that.PeeringService.pubkey);
        return that.initPeer();
      });
    
      this.recomputeSelfPeer = () => that.PeeringService.generateSelfPeer(that.conf, 0);
    
      this.initPeer = () => co(function*(){
          yield that.checkConfig();
          yield Q.nbind(that.PeeringService.regularCrawlPeers, that.PeeringService);
          logger.info('Storing self peer...');
          yield that.PeeringService.regularPeerSignal();
          yield Q.nbind(that.PeeringService.regularTestPeers, that.PeeringService);
          yield Q.nbind(that.PeeringService.regularSyncBlock, that.PeeringService);
      });
    
      this.stopBlockComputation = () => permaProver.stopEveryting();
      
      this.getCountOfSelfMadePoW = () => this.BlockchainService.getCountOfSelfMadePoW();
      this.isServerMember = () => this.BlockchainService.isMember();
    
      this.isPoWWaiting = () => permaProver.isPoWWaiting();
    
      this.startBlockComputation = () => permaProver.allowedToStart();
    
      permaProver.onBlockComputed((block) => co(function*() {
        try {
          const obj = parsers.parseBlock.syncWrite(dos2unix(block.getRawSigned()));
          yield that.singleWritePromise(obj);
        } catch (err) {
          logger.warn('Proof-of-work self-submission: %s', err.message || err);
        }
      }));
    
      this.checkConfig = () => {
        return that.checkPeeringConf(that.conf);
      };
    
      this.checkPeeringConf = (conf) => co(function*() {
          if (!conf.pair && conf.passwd == null) {
            throw new Error('No key password was given.');
          }
          if (!conf.pair && conf.salt == null) {
            throw new Error('No key salt was given.');
          }
          if(!conf.ipv4 && !conf.ipv6){
            throw new Error("No interface to listen to.");
          }
          if(!conf.remoteipv4 && !conf.remoteipv6 && !conf.remotehost){
            throw new Error('No interface for remote contact.');
          }
          if (!conf.remoteport) {
            throw new Error('No port for remote contact.');
          }
      });
    
      this.resetHome = () => co(function *() {
        const params = yield paramsP;
        const myFS = params.fs;
        const rootPath = params.home;
        const existsDir = yield myFS.exists(rootPath);
        if (existsDir) {
          yield myFS.removeTree(rootPath);
        }
      });
    
      this.resetAll = (done) => co(function*() {
        const files = ['stats', 'cores', 'current', directory.DUNITER_DB_NAME, directory.DUNITER_DB_NAME + '.db', directory.DUNITER_DB_NAME + '.log', directory.WOTB_FILE, 'export.zip', 'import.zip', 'conf'];
        const dirs  = ['blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb'];
        return resetFiles(files, dirs, done);
      });
    
      this.resetData = (done) => co(function*(){
        const files = ['stats', 'cores', 'current', directory.DUNITER_DB_NAME, directory.DUNITER_DB_NAME + '.db', directory.DUNITER_DB_NAME + '.log', directory.WOTB_FILE];
        const dirs  = ['blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb'];
        yield resetFiles(files, dirs, done);
      });
    
      this.resetConf = (done) => {
        const files = ['conf'];
        const dirs  = [];
        return resetFiles(files, dirs, done);
      };
    
      this.resetStats = (done) => {
        const files = ['stats'];
        const dirs  = ['ud_history'];
        return resetFiles(files, dirs, done);
      };
    
      this.resetPeers = (done) => {
        return that.dal.resetPeers(done);
      };
    
      this.exportAllDataAsZIP = () => co(function *() {
        const params = yield paramsP;
        const rootPath = params.home;
        const myFS = params.fs;
        const archive = archiver('zip');
        if (yield myFS.exists(path.join(rootPath, 'indicators'))) {
          archive.directory(path.join(rootPath, 'indicators'), '/indicators', undefined, { name: 'indicators'});
        }
        const files = ['duniter.db', 'stats.json', 'wotb.bin'];
        for (const file of files) {
          if (yield myFS.exists(path.join(rootPath, file))) {
            archive.file(path.join(rootPath, file), { name: file });
          }
        }
        archive.finalize();
        return archive;
      });
    
      this.importAllDataFromZIP = (zipFile) => co(function *() {
        const params = yield paramsP;
        yield that.resetData();
        const output = unzip.Extract({ path: params.home });
        fs.createReadStream(zipFile).pipe(output);
        return new Promise((resolve, reject) => {
          output.on('error', reject);
          output.on('close', resolve);
        });
      });
    
      this.cleanDBData = () => co(function *() {
        yield that.dal.cleanCaches();
        that.dal.wotb.resetWoT();
        const files = ['stats', 'cores', 'current', directory.DUNITER_DB_NAME, directory.DUNITER_DB_NAME + '.db', directory.DUNITER_DB_NAME + '.log'];
        const dirs  = ['blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb'];
        return resetFiles(files, dirs);
      });
    
      function resetFiles(files, dirs, done) {
        return co(function *() {
          try {
            const params = yield paramsP;
            const myFS = params.fs;
            const rootPath = params.home;
            for (const fName of files) {
              // JSON file?
              const existsJSON = yield myFS.exists(rootPath + '/' + fName + '.json');
              if (existsJSON) {
                const theFilePath = rootPath + '/' + fName + '.json';
                yield myFS.remove(theFilePath);
                if (yield myFS.exists(theFilePath)) {
                  throw Error('Failed to delete file "' + theFilePath + '"');
                }
              } else {
                // Normal file?
                const normalFile = path.join(rootPath, fName);
                const existsFile = yield myFS.exists(normalFile);
                if (existsFile) {
                  yield myFS.remove(normalFile);
                  if (yield myFS.exists(normalFile)) {
                    throw Error('Failed to delete file "' + normalFile + '"');
                  }
                }
              }
            }
            for (const dirName of dirs) {
              const existsDir = yield myFS.exists(rootPath + '/' + dirName);
              if (existsDir) {
                yield myFS.removeTree(rootPath + '/' + dirName);
                if (yield myFS.exists(rootPath + '/' + dirName)) {
                  throw Error('Failed to delete folder "' + rootPath + '/' + dirName + '"');
                }
              }
            }
            done && done();
        } catch(e) {
              done && done(e);
              throw e;
          }
        });
      }
    
      this.disconnect = () => Promise.resolve(that.dal && that.dal.close());
    
      this.pullBlocks = that.PeeringService.pullBlocks;
    
      // Unit Tests or Preview method
      this.doMakeNextBlock = (manualValues) => that.BlockchainService.makeNextBlock(null, null, manualValues);
    
      this.doCheckBlock = (block) => {
        const parsed = parsers.parseBlock.syncWrite(block.getRawSigned());
        return that.BlockchainService.checkBlock(parsed, false);
      };
    
      this.revert = () => this.BlockchainService.revertCurrentBlock();
    
      this.revertTo = (number) => co(function *() {
        const current = yield that.BlockchainService.current();
        for (let i = 0, count = current.number - number; i < count; i++) {
          yield that.BlockchainService.revertCurrentBlock();
        }
        if (current.number <= number) {
          logger.warn('Already reached');
        }
      });
    
      this.reapplyTo = (number) => co(function *() {
        const current = yield that.BlockchainService.current();
        if (current.number == number) {
          logger.warn('Already reached');
        } else {
          for (let i = 0, count = number - current.number; i < count; i++) {
            yield that.BlockchainService.applyNextAvailableFork();
          }
        }
      });
    
      this.singleWritePromise = (obj) => that.submit(obj);
    
      let theRouter;
    
      this.router = (active) => {
        if (!theRouter) {
          theRouter = router(that.PeeringService, that.conf, that.dal);
        }
        theRouter.setActive(active !== false);
        return theRouter;
      };
    
      /**
       * Synchronize the server with another server.
       *
       * If local server's blockchain is empty, process a fast sync: **no block is verified in such a case**, unless
       * you force value `askedCautious` to true.
       *
       * @param onHost Syncs on given host.
       * @param onPort Syncs on given port.
       * @param upTo Sync up to this number, if `upTo` value is a positive integer.
       * @param chunkLength Length of each chunk of blocks to download. Kind of buffer size.
       * @param interactive Tell if the loading bars should be used for console output.
       * @param askedCautious If true, force the verification of each downloaded block. This is the right way to have a valid blockchain for sure.
       * @param nopeers If true, sync will omit to retrieve peer documents.
       * @param noShufflePeers If true, sync will NOT shuffle the retrieved peers before downloading on them.
       */
      this.synchronize = (onHost, onPort, upTo, chunkLength, interactive, askedCautious, nopeers, noShufflePeers) => {
        const remote = new Synchroniser(that, onHost, onPort, that.conf, interactive === true);
        const syncPromise = remote.sync(upTo, chunkLength, askedCautious, nopeers, noShufflePeers === true);
        return {
          flow: remote,
          syncPromise: syncPromise
        };
      };
      
      this.testForSync = (onHost, onPort) => {
        const remote = new Synchroniser(that, onHost, onPort);
        return remote.test();
      };
    
      /**
       * Enable routing features:
       *   - The server will try to send documents to the network
       *   - The server will eventually be notified of network failures
       */
      this.routing = () => {
        // The router asks for multicasting of documents
        this.pipe(this.router())
          // The documents get sent to peers
          .pipe(multicaster(this.conf))
          // The multicaster may answer 'unreachable peer'
          .pipe(this.router());
      };
    
      this.upnp = () => co(function *() {
        const upnpAPI = yield upnp(that.conf.port, that.conf.remoteport);
        that.upnpAPI = upnpAPI;
        return upnpAPI;
      });
    
      this.applyCPU = (cpu) => that.BlockchainService.changeProverCPUSetting(cpu);
      
      this.rawer = rawer;
    
      this.writeRaw = (raw, type) => co(function *() {
        const parser = documentsMapping[type] && documentsMapping[type].parser;
        const obj = parser.syncWrite(raw);
        return yield that.singleWritePromise(obj);
      });
    
      /**
       * Retrieve the last linesQuantity lines from the log file.
       * @param linesQuantity
       */
      this.getLastLogLines = (linesQuantity) => this.dal.getLogContent(linesQuantity);
    
      this.startServices = () => co(function*(){
    
        /***************
         * HTTP ROUTING
         **************/
        that.router(that.conf.routing);
    
        /***************
         *    UPnP
         **************/
        if (that.conf.upnp) {
          try {
            if (that.upnpAPI) {
              that.upnpAPI.stopRegular();
            }
            yield that.upnp();
            that.upnpAPI.startRegular();
          } catch (e) {
            logger.warn(e);
          }
        }
    
        /*******************
         * BLOCK COMPUTING
         ******************/
        if (that.conf.participate) {
          that.startBlockComputation();
        }
    
        /***********************
         * CRYPTO NETWORK LAYER
         **********************/
        yield that.start();
      });
    
      this.stopServices = () => co(function*(){
        that.router(false);
        if (that.conf.participate) {
          that.stopBlockComputation();
        }
        return that.PeeringService.stopRegular();
      });
    }
    
    util.inherits(Server, stream.Duplex);
    
    module.exports = Server;