Skip to content
Snippets Groups Projects
Select Git revision
  • 23ca2c829635a9253f036b4ca8680aafbf4ffb74
  • dev default protected
  • release/1.9.1 protected
  • pini-1.8-docker
  • pini-sync-onlypeers
  • duniter-v2s-issue-123-industrialize-releases
  • feature/build-aarch64-nodejs16
  • release/1.8 protected
  • pini-docker
  • ci_tags
  • fix/1448/1.8/txs_not_stored
  • feature/node-20
  • fix/1441/node_summary_with_storage
  • fix/1442/improve_bma_tx_history
  • feature/wotwizard-1.8
  • release/1.9 protected
  • 1.7 protected
  • feature/docker-set-latest protected
  • feature/fast-docker-build-1.8.4
  • fast-docker-build protected
  • feature/dump-distance
  • v1.8.7 protected
  • v1.8.7-rc4 protected
  • v1.8.7-rc3 protected
  • v1.8.7-rc2 protected
  • v1.8.7-rc1 protected
  • v1.8.6 protected
  • v1.7.23 protected
  • v1.8.5 protected
  • v1.8.4 protected
  • v1.8.3 protected
  • v1.8.2 protected
  • v1.8.1 protected
  • v1.8.0 protected
  • v1.8.0-rc1 protected
  • v1.8.0-beta5 protected
  • v1.8.0-beta4 protected
  • v1.8.0-beta3 protected
  • v1.8.0-beta2 protected
  • v1.8.0-beta protected
  • v1.7.21 protected
41 results

index.ts

Blame
  • server.js 8.92 KiB
    var stream     = require('stream');
    var async      = require('async');
    var util       = require('util');
    var _          = require('underscore');
    var mongoose   = require('mongoose');
    var common     = require('./app/lib/common');
    var constants  = require('./app/lib/constants');
    var express    = require('express');
    var request    = require('request');
    var http       = require('http');
    var log4js     = require('log4js');
    
    var models = ['Identity', 'Certification', 'Configuration', 'Link', 'Merkle', 'Peer', 'Transaction', 'TxMemory', 'Membership', 'Block'];
    var INNER_WRITE = true;
    
    function Server (dbConf, overrideConf, interceptors, onInit) {
    
      stream.Duplex.call(this, { objectMode : true });
    
      var logger  = require('./app/lib/logger')(dbConf.name);
      var that = this;
      that.conn = null;
      that.conf = null;
    
      var initFunctions = [
        function (done) {
          that.connect(function (err) {
            that.emit('connected', err);
            done(err);
          });
        },
        function (done) {
          that.initServices(function (err) {
            that.emit('services', err);
            done(err);
          });
        }
      ];
    
      var todoOnInit = initFunctions.concat(onInit).concat([
      ]);
    
      this._write = function (obj, enc, writeDone, isInnerWrite) {
        that.submit(obj, isInnerWrite, function (err, res) {
          if (isInnerWrite) {
            writeDone(err, res);
          } else {
            writeDone();
          }
        });
      };
    
      this.submit = function (obj, isInnerWrite, done) {
        async.waterfall([
          function (next){
            var i = 0;
            var treatment = null;
            while (i < interceptors.length && !treatment) {
              if (interceptors[i].matches(obj)) {
                treatment = interceptors[i].treatment;
              }
              i++;
            }
            if (typeof treatment == 'function') {
              // Handle the incoming object
              treatment(that, obj, next);
            } else {
              var err = 'Unknown document type ' + JSON.stringify(obj);
              that.emit('error', Error(err));
              next(err);
            }
          },
        ], function (err, res) {
          if (err){
            switch (err) {
              case constants.ERROR.PUBKEY.ALREADY_UPDATED: err = 'Key already up-to-date'; break;
            }
            logger.debug(err);
          }
          if (res != null && res != undefined) {
            that.push(res);
          }
          if (isInnerWrite) {
            done(err, res);
          } else {
            done();
          }
        });
      }
    
      this.connect = function (reset, done) {
        var databaseName = dbConf.name || "ucoin_default";
        var host = dbConf.host || "localhost";
        var port = dbConf.port;
        if (arguments.length == 1) {
          done = reset;
          reset = dbConf.resetData;
        }
        // Init connection
        if (!that.conn) {
          // bad parameters
          if(!host && !port && !done){
            throw new Error('Bad parameters for database connection');
          }
          // host and port not provided
          if(!done && !port){
            done = host;
            host = 'localhost';
            port = undefined;
          }
          // port not provided
          if(!done && !port){
            done = port;
            port = undefined;
          }
          host = host ? host : 'localhost';
          // logger.debug('Connecting to database `%s`', databaseName);
          var conn = that.conn = mongoose.createConnection('mongodb://' + host + (port ? ':' + port : '') + '/' + databaseName);
          conn.on('error', function (err) {
            logger.error('connection error:', err);
          });
          async.waterfall([
            function (next){
              conn.once('open', next);
            },
            function (next){
              models.forEach(function (entity) {
                conn.model(entity, require(__dirname + '/app/models/' + entity.toLowerCase() + '.js'));
              });
              conn.model('Configuration').find(next);
            },
            function (foundConf, next){
              var Configuration = conn.model('Configuration');
              that.conf = foundConf[0] || new Configuration();
              if (overrideConf) {
                _(overrideConf).keys().forEach(function(k){
                  that.conf[k] = overrideConf[k];
                });
              }
              if (reset) {
                that.reset(next);
                return;
              }
              next();
            },
          ], done);
        }
        else {
          done();
        }
      };
    
      this.start = function (doListenHTTP, done) {
        if (arguments.length == 1) {
          done = doListenHTTP;
          doListenHTTP = true;
        }
        async.waterfall([
          function (next){
            that._start(next);
          },
          function (next) {
            if (!doListenHTTP) {
              next();
              return;
            }
            listenBMA(function (err, app) {
              that.emit('BMALoaded', err, app);
              next();
            });
          }
        ], done);
      };
    
      this._start = function (done) {
        // Method to override
        done();
      };
    
      this.reset = function(done) {
        async.waterfall([
          function (next){
            that.connect(next);
          },
          function (next){
            var deletableCollections = [
              'identities',
              'certifications',
              'blocks',
              'links',
              'merkles',
              'peers',
              'transactions',
              'txmemories',
              'memberships'];
            async.forEachSeries(deletableCollections, function(collection, next){
              if (that.conn.collections[collection]) {
                that.conn.collections[collection].drop(function (err) {
                  next();
                });
              } else {
                next();
              }
            }, function (err) {
              next(err);
            });
          }
        ], done);
      };
    
      this.resetConf = function(done) {
        async.waterfall([
          function (next){
            that.connect(next);
          },
          function (next){
            that.conn.model('Configuration').remove({}, function (err) {
              next(err);
            });
          },
        ], done);
      };
    
      this.disconnect = function(done) {
        that.conn.close(function (err) {
          if(err)
            logger.error(err);
          if (typeof done == 'function')
            done(err);
        });
      };
    
      this.initServices = function(done) {
        if (!that.servicesInited) {
          that.servicesInited = true;
          that.HTTPService      = require("./app/service/HTTPService");
          that.MerkleService    = require("./app/service/MerkleService");
          that.ParametersService = require("./app/service/ParametersService").get(that.conn, that.conf.currency);
          that._initServices(that.conn, done);
        } else {
          done();
        }
      };
    
      this._initServices = function(conn) {
        // To override in child classes
      };
    
      function listenBMA (overConf, onLoaded) {
        if (arguments.length == 1) {
          onLoaded = overConf;
          overConf = undefined;
        }
        var app = express();
        var conf = _.extend(that.conf, overConf || {});
        var port = process.env.PORT || conf.port;
        var currency = conf.currency;
    
        // all environments
        app.set('port', port);
        app.use(log4js.connectLogger(logger, { level: 'auto', format: '\x1b[90m:remote-addr - :method :url HTTP/:http-version :status :res[content-length] - :response-time ms\x1b[0m' }));
        app.use(express.urlencoded());
        app.use(express.json());
        async.waterfall([
          function (next){
    
            // Routing
            app.use(app.router);
    
            // development only
            if ('development' == app.get('env')) {
              app.use(express.errorHandler());
            }
            next();
          },
          function (next) {
            // Listen to interfaces
            that._listenBMA(app);
            next();
          },
          function (next) {
            if(conf.ipv4){
              logger.info('Connecting on interface %s:%s...', conf.ipv4, conf.port);
              http.createServer(app).listen(conf.port, conf.ipv4, function(){
                logger.info('uCoin server listening on ' + conf.ipv4 + ' port ' + conf.port);
                next();
              });
            }
            else next();
          },
          function (next) {
            if(conf.ipv6){
              logger.info('Connecting on interface %s:%s...', conf.ipv6, conf.port);
              http.createServer(app).listen(conf.port, conf.ipv6, function(){
                logger.info('uCoin server listening on ' + conf.ipv6 + ' port ' + conf.port);
                next();
              });
            }
            else next();
          },
        ], function (err) {
          if (typeof onLoaded == 'function')
            onLoaded.call(that, err, app);
        });
      };
    
      this._listenBMA = function (app) {
      };
    
      this.singleWriteStream = function (onError) {
        return new TempStream(that, onError);
      };
    
      function TempStream (parentStream, onError) {
    
        stream.Duplex.call(this, { objectMode : true });
    
        var self = this;
        this._write = function (obj, enc, done) {
          parentStream._write(obj, enc, function (err, res) {
            if (err && typeof onError == 'function') onError(err);
            if (res) self.push(res);
            self.push(null);
            done();
          }, INNER_WRITE);
        };
        this._read = function () {
        }
      }
    
      util.inherits(TempStream, stream.Duplex);
    
      // Launches the server
      async.forEachSeries(todoOnInit, function(f, cb){
        f(cb);
      });
    }
    
    util.inherits(Server, stream.Duplex);
    
    module.exports = Server;