Commit 9fba84a7 authored by Cédric Moreau's avatar Cédric Moreau

Changing server instanciation, now emitting events on init

parent ea1bad93
var async = require('async');
var sha1 = require('sha1');
var util = require('util');
var stream = require('stream');
module.exports = function (server) {
return new Router(server);
};
function Router (server) {
stream.Writable.call(this, { objectMode: true });
var that = this;
this._write = function (obj, enc, done) {
if (typeof obj.email != undefined) {
getRandomInAllPeers(function (err, peers) {
that.emit('pubkey', pubkey, peers || []);
});
}
};
function getRandomInAllPeers (done) {
Peer.getRandomlyWithout([server.PeeringService.cert.fingerprint], done);
};
};
util.inherits(Router, stream.Writable);
......@@ -89,9 +89,6 @@ module.exports = function Synchroniser (server, host, port, authenticated, conf)
function (obj, next) {
obj.pubkey = { fingerprint: obj.fingerprint };
peer = obj;
server.initServer(next);
},
function (next){
// Temporarily manage ALL keys for sync
server.conf.kmanagement = "ALL";
KeyService = server.KeyService;
......
......@@ -31,6 +31,14 @@ var ConfigurationSchema = new Schema({
}}
});
ConfigurationSchema.virtual('createNext').get(function () {
return this._createNext;
});
ConfigurationSchema.virtual('createNext').set(function (create) {
this._createNext = create;
});
// Automatic Monetary Contract default parameters:
// - Voting start: None (must be given)
// - Voting frequency: 1 day
......
......@@ -32,14 +32,13 @@ PublicKeySchema.pre('save', function (next) {
PublicKeySchema.methods = {
json: function () {
var raw = this.raw.replace('-----BEGIN PGP PUBLIC KEY BLOCK-----', 'BEGIN PGP PUBLIC KEY BLOCK');
raw = raw.replace('-----END PGP PUBLIC KEY BLOCK-----', 'END PGP PUBLIC KEY BLOCK');
return {
"email": this.email,
"name": this.name,
"hash": this.hash,
"fingerprint": this.fingerprint,
"comment": this.comment,
"raw": raw
"raw": this.raw
};
},
......
......@@ -17,6 +17,10 @@ function keys (val) {
return val.split(',');
}
// Constants
var LISTEN_HTTP = true;
var DO_NOT_LISTEN_HTTP = false;
program
.version('0.5.6')
.usage('<command> [options]')
......@@ -51,7 +55,7 @@ program
program
.command('wizard [step]')
.description('Launch the configuration Wizard')
.action(connect(function (step, server, conf) {
.action(connect(DO_NOT_LISTEN_HTTP, ucoin.createRegistryServer, function (step, server, conf) {
var wiz = wizard(server);
var task = {
'currency': wiz.configCurrency,
......@@ -112,13 +116,10 @@ program
program
.command('allow-key [key]')
.description('Add given key to authorized keys of this node')
.action(connect(ucoin.createHDCServer, function (key, server, conf) {
.action(connect(DO_NOT_LISTEN_HTTP, ucoin.createHDCServer, function (key, server, conf) {
key = key || "";
key = key.toUpperCase();
async.waterfall([
function (next){
server.initServer(next);
},
function (next) {
if (!key.isSha1()) {
next("Key must match a SHA-1 hash");
......@@ -145,14 +146,14 @@ program
program
.command('manage-key [key]')
.description('Add given key to stack of managed keys of this node')
.action(connect(ucoin.createHDCServer, function (key, server, conf) {
.action(connect(DO_NOT_LISTEN_HTTP, ucoin.createHDCServer, function (key, server, conf) {
handleKey(server, key, true, 'Key %s is now managed');
}));
program
.command('forget-key [key]')
.description('Remove given key of the managed keys\' stack of this node')
.action(connect(ucoin.createHDCServer, function (key, server, conf) {
.action(connect(DO_NOT_LISTEN_HTTP, ucoin.createHDCServer, function (key, server, conf) {
handleKey(server, key, false, 'Key %s no more managed from now');
}));
......@@ -196,7 +197,7 @@ program
program
.command('config')
.description('Register configuration in database')
.action(connect(function (server, conf) {
.action(connect(false, function (server, conf) {
conf.save(function (err) {
if(err){
logger.error("Configuration could not be saved: " + err);
......@@ -244,10 +245,10 @@ program
program
.command('start')
.description('Start uCoin server using given --currency')
.action(connect(function (server, conf) {
.action(connect(LISTEN_HTTP, ucoin.createRegistryServer, function (server, conf) {
// Launching server
server.listenBMA(function (err, app) {
server.on('BMALoaded', function (err, app) {
if(err){
console.error(err);
this.disconnect();
......@@ -258,15 +259,10 @@ program
});
}));
var config = {};
function overrideConf(conn, conf) {
function overrideConf(conf) {
// Ensure is not null and have good structure
var Configuration = conn.model('Configuration');
conf = conf || new Configuration();
cli = {
conf.sync = conf.sync || {};
var cli = {
currency: program.currency,
server: {
port: program.port,
......@@ -307,52 +303,55 @@ function overrideConf(conn, conf) {
}
};
// Update conf
if(cli.server.pgp.key) cli.server.pgp.key = fs.readFileSync(cli.server.pgp.key, 'utf8');
conf.currency = cli.currency || conf.currency;
conf.openpgpjs = cli.server.openpgpjs != undefined ? cli.server.openpgpjs : conf.openpgpjs;
conf.ipv4 = cli.server.ipv4address || conf.ipv4;
conf.ipv6 = cli.server.ipv6address || conf.ipv6;
conf.port = cli.server.port || conf.port;
conf.pgpkey = cli.server.pgp.key || conf.pgpkey;
conf.pgppasswd = cli.server.pgp.password != undefined ? cli.server.pgp.password : conf.pgppasswd;
conf.remotehost = cli.server.remote.host != undefined ? cli.server.remote.host : conf.remotehost;
conf.remoteipv4 = cli.server.remote.ipv4 != undefined ? cli.server.remote.ipv4 : conf.remoteipv4;
conf.remoteipv6 = cli.server.remote.ipv6 != undefined ? cli.server.remote.ipv6 : conf.remoteipv6;
conf.remoteport = cli.server.remote.port != undefined ? cli.server.remote.port : conf.remoteport;
conf.kmanagement = cli.policy.keys || conf.kmanagement;
conf.kaccept = cli.policy.pubkeys || conf.kaccept;
conf.sync = {
AMDaemon: cli.sync.AMDaemon || conf.sync.AMDaemon,
AMStart: cli.sync.AMStart || conf.sync.AMStart,
AMFreq: cli.sync.AMFreq || conf.sync.AMFreq,
UDFreq: cli.sync.UDFreq || conf.sync.UDFreq,
UD0: cli.sync.UD0 || conf.sync.UD0,
UDPercent: cli.sync.UDPercent || conf.sync.UDPercent,
Consensus: cli.sync.Consensus || conf.sync.Consensus,
MSExpires: cli.sync.MSExpires || conf.sync.MSExpires,
VTExpires: cli.sync.VTExpires || conf.sync.VTExpires,
Algorithm: cli.sync.Algorithm || conf.sync.Algorithm
};
// Update conf
if (cli.currency) conf.currency = cli.currency;
if (cli.server.ipv4address) conf.ipv4 = cli.server.ipv4address;
if (cli.server.ipv6address) conf.ipv6 = cli.server.ipv6address;
if (cli.server.port) conf.port = cli.server.port;
if (cli.server.openpgpjs != undefined) conf.openpgpjs = cli.server.openpgpjs;
if (cli.server.pgp.key) conf.pgpkey = cli.server.pgp.key;
if (cli.server.pgp.password != undefined) conf.pgppasswd = cli.server.pgp.password;
if (cli.server.remote.host != undefined) conf.remotehost = cli.server.remote.host;
if (cli.server.remote.ipv4 != undefined) conf.remoteipv4 = cli.server.remote.ipv4;
if (cli.server.remote.ipv6 != undefined) conf.remoteipv6 = cli.server.remote.ipv6;
if (cli.server.remote.port != undefined) conf.remoteport = cli.server.remote.port;
if (cli.policy.keys) conf.kmanagement = cli.policy.keys;
if (cli.policy.pubkeys) conf.kaccept = cli.policy.pubkeys;
if (cli.sync.AMDaemon) conf.sync.AMDaemon = cli.sync.AMDaemon;
if (cli.sync.AMStart) conf.sync.AMStart = cli.sync.AMStart;
if (cli.sync.AMFreq) conf.sync.AMFreq = cli.sync.AMFreq;
if (cli.sync.UDFreq) conf.sync.UDFreq = cli.sync.UDFreq;
if (cli.sync.UD0) conf.sync.UD0 = cli.sync.UD0;
if (cli.sync.UDPercent) conf.sync.UDPercent = cli.sync.UDPercent;
if (cli.sync.Consensus) conf.sync.Consensus = cli.sync.Consensus;
if (cli.sync.MSExpires) conf.sync.MSExpires = cli.sync.MSExpires;
if (cli.sync.VTExpires) conf.sync.VTExpires = cli.sync.VTExpires;
if (cli.sync.Algorithm) conf.sync.Algorithm = cli.sync.Algorithm;
// Specific internal settings
conf.createNext = true;
return conf;
}
function connect(serverFactory, callback) {
function connect(listenHTTP, serverFactory, callback) {
if (arguments.length == 1) {
callback = listenHTTP;
listenHTTP = false;
serverFactory = ucoin.createHDCServer;
} else if (arguments.length == 2) {
callback = serverFactory;
serverFactory = ucoin.createRegistryServer;
serverFactory = ucoin.createHDCServer;
}
return function () {
var cbArgs = arguments;
var dbName = program.mdb || "ucoin_default";
var server = serverFactory({ name: dbName, host: program.mhost, port: program.mport });
var server = serverFactory({ name: dbName, host: program.mhost, port: program.mport, listenBMA: listenHTTP }, overrideConf({}));
// Connecting to DB
server.connect(function (err) {
server.on('services', function (err) {
if(err){
logger.warn(err);
......@@ -361,8 +360,6 @@ function connect(serverFactory, callback) {
return;
}
server.conf = overrideConf(server.conn, server.conf);
cbArgs.length--;
cbArgs[cbArgs.length++] = server;
cbArgs[cbArgs.length++] = server.conf;
......
var async = require('async');
var util = require('util');
var logger = require('./app/lib/logger')('hdcserver');
var Server = require('./server');
var async = require('async');
var util = require('util');
var parsers = require('./app/lib/streams/parsers/doc');
var Server = require('./server');
function HDCServer (dbConf, overrideConf, interceptors) {
function HDCServer (dbConf, overrideConf, interceptors, onInit) {
var logger = require('./app/lib/logger')(dbConf.name);
var selfInterceptors = [
{
......@@ -13,6 +15,7 @@ function HDCServer (dbConf, overrideConf, interceptors) {
},
treatment: function (server, obj, next) {
logger.debug('⬇ PUBKEY %s', obj.fingerprint);
// console.log(obj);
async.waterfall([
function (next){
server.PublicKeyService.submitPubkey(obj, next);
......@@ -61,27 +64,18 @@ function HDCServer (dbConf, overrideConf, interceptors) {
}
];
Server.call(this, dbConf, overrideConf, selfInterceptors.concat(interceptors || []));
Server.call(this, dbConf, overrideConf, selfInterceptors.concat(interceptors || []), onInit || []);
var that = this;
this._read = function (size) {
};
this.initServer = function (done) {
if (!that.peerInited) {
that.peerInited = true;
async.waterfall([
function (next){
that.connect(next);
},
function (next){
that.initServices(next);
},
], done);
} else {
done();
}
this.writeRawPubkey = function (raw) {
var source = parsers.parsePubkey();
var dest = that.singleWriteStream();
source.pipe(dest);
source.end(raw);
};
this._initServices = function(conn, done) {
......@@ -111,7 +105,7 @@ function HDCServer (dbConf, overrideConf, interceptors) {
};
this.listenHDC = function (app) {
var hdc = require('./app/controllers/hdc')(this);
var hdc = require('./app/controllers/hdc')(that);
app.get( '/hdc/amendments/promoted', hdc.amendments.promoted);
app.get( '/hdc/amendments/promoted/:am_number', hdc.amendments.promotedNumber);
app.get( '/hdc/amendments/view/:amendment_id/self', hdc.amendments.viewAM.self);
......
......@@ -10,7 +10,7 @@ var wlogger = require('./app/lib/logger')('wallet');
var HDCServer = require('./hdcserver');
var parsers = require('./app/lib/streams/parsers/doc');
function PeerServer (dbConf, overrideConf, interceptors) {
function PeerServer (dbConf, overrideConf, interceptors, onInit) {
var selfInterceptors = [
{
......@@ -88,7 +88,13 @@ function PeerServer (dbConf, overrideConf, interceptors) {
}
];
HDCServer.call(this, dbConf, overrideConf, selfInterceptors.concat(interceptors || []));
var initFunctions = [
function (done) {
that.initPeer(that.conn, that.conf, done);
}
].concat(onInit || []);
HDCServer.call(this, dbConf, overrideConf, selfInterceptors.concat(interceptors || []), initFunctions);
var that = this;
......@@ -128,25 +134,6 @@ function PeerServer (dbConf, overrideConf, interceptors) {
], done);
};
this.initServer = function (done) {
if (!that.peerInited) {
that.peerInited = true;
async.waterfall([
function (next){
that.connect(next);
},
function (next){
that.initServices(next);
},
function (next){
that.initPeer(that.conn, that.conf, next);
},
], done);
} else {
done();
}
};
this.checkConfig = function (done) {
async.waterfall([
function (next){
......@@ -259,7 +246,7 @@ function PeerServer (dbConf, overrideConf, interceptors) {
parser.end(that.PeeringService.cert.raw);
parser.on('readable', function () {
var parsed = parser.read();
that._write(parsed, null, done);
that.submit(parsed, false, done);
});
};
......@@ -306,7 +293,7 @@ function PeerServer (dbConf, overrideConf, interceptors) {
signature = signature.substring(signature.indexOf('-----BEGIN PGP SIGNATURE'));
p2.signature = signature;
p2.pubkey = { fingerprint: that.PeeringService.cert.fingerprint };
that.PeeringService.submit(p2, next);
that.submit(p2, false, next);
},
], function (err) {
next(err);
......
......@@ -57,7 +57,13 @@ function RegistryServer (dbConf, overrideConf, interceptors) {
}
];
PeerServer.call(this, dbConf, overrideConf, selfInterceptors.concat(interceptors || []));
var initFunctions = [
function (done) {
that.initRegistry(that.conn, that.conf, done);
}
];
PeerServer.call(this, dbConf, overrideConf, selfInterceptors.concat(interceptors || []), initFunctions);
var that = this;
......@@ -110,30 +116,6 @@ function RegistryServer (dbConf, overrideConf, interceptors) {
], done);
};
this.initServer = function (done) {
if (!that.peerInited) {
that.peerInited = true;
async.waterfall([
function (next){
that.connect(next);
},
function (next){
that.initServices(next);
},
function (next){
that.initPeer(that.conn, that.conf, next);
},
function (next){
that.initRegistry(that.conn, that.conf, next);
},
], function (err) {
done(err);
});
} else {
done();
}
};
this.initRegistry = function (conn, conf, done) {
async.waterfall([
function (next){
......
......@@ -9,22 +9,73 @@ var request = require('request');
var http = require('http');
var log4js = require('log4js');
var connectPgp = require('connect-pgp');
var logger = require('./app/lib/logger')('server');
var models = ['Amendment', 'Coin', 'Configuration', 'Forward', 'Key', 'CKey', 'Merkle', 'Peer', 'PublicKey', 'Wallet', 'Transaction', 'Vote', 'TxMemory', 'Membership', 'Voting', 'CommunityFlow'];
var INNER_WRITE = true;
function Server (dbConf, overrideConf, interceptors) {
function Server (dbConf, overrideConf, interceptors, onInit) {
stream.Duplex.call(this, { objectMode : true });
var logger = require('./app/lib/logger')(dbConf.name);
var that = this;
that.conn = null;
that.conf = null;
this._write = function (obj, enc, done, isInnerWrite) {
var queue = that.queue = async.queue(function (task, pushDone) {
task(function (err) {
pushDone(err);
});
}, 1);
var initFunctions = [
function (done) {
that.connect(function (err) {
that.emit('connected', err);
done(err);
});
},
function (done) {
that.initServices(function (err) {
that.emit('services', err);
done(err);
});
}
];
initFunctions.concat(onInit).forEach(function(f){
queue.push(f);
});
if (dbConf.listenBMA) {
queue.push(function (done) {
listenBMA(function (err, app) {
that.emit('BMALoaded', err, app);
done(err);
});
});
}
queue.push(function (done) {
queue.concurrency = 1;
done();
});
this._write = function (obj, enc, writeDone, isInnerWrite) {
queue.push(function (done) {
that.submit(obj, isInnerWrite, function (err, res) {
if (isInnerWrite) {
writeDone(err, res);
} else {
writeDone();
}
done();
});
});
};
this.submit = function (obj, isInnerWrite, done) {
async.waterfall([
async.apply(that.initServer.bind(that)),
function (next){
var i = 0;
var treatment = null;
......@@ -56,7 +107,7 @@ function Server (dbConf, overrideConf, interceptors) {
done();
}
});
};
}
this.connect = function (reset, done) {
var databaseName = dbConf.name || "ucoin_default";
......@@ -64,7 +115,7 @@ function Server (dbConf, overrideConf, interceptors) {
var port = dbConf.port;
if (arguments.length == 1) {
done = reset;
reset = false;
reset = dbConf.resetData;
}
// Init connection
if (!that.conn) {
......@@ -84,7 +135,7 @@ function Server (dbConf, overrideConf, interceptors) {
port = undefined;
}
host = host ? host : 'localhost';
logger.debug('Connecting to database %s', databaseName);
// logger.debug('Connecting to database `%s`', databaseName);
var conn = that.conn = mongoose.createConnection('mongodb://' + host + (port ? ':' + port : '') + '/' + databaseName);
conn.on('error', function (err) {
logger.error('connection error:', err);
......@@ -103,9 +154,14 @@ function Server (dbConf, overrideConf, interceptors) {
var Configuration = conn.model('Configuration');
that.conf = foundConf[0] || new Configuration();
if (overrideConf) {
_(overrideConf).keys().forEach(function(k){
_(_(overrideConf).keys()).without('sync').forEach(function(k){
that.conf[k] = overrideConf[k];
});
if (overrideConf.sync) {
_(overrideConf.sync).keys().forEach(function(k){
that.conf.sync[k] = overrideConf.sync[k];
});
}
}
if (reset) {
that.reset(next);
......@@ -182,10 +238,10 @@ function Server (dbConf, overrideConf, interceptors) {
this.initServices = function(done) {
if (!that.servicesInited) {
that.servicesInited = true;
this.HTTPService = require("./app/service/HTTPService");
this.MerkleService = require("./app/service/MerkleService");
this.ParametersService = require("./app/service/ParametersService").get(that.conn, that.conf.currency);
this._initServices(that.conn, done);
that.HTTPService = require("./app/service/HTTPService");
that.MerkleService = require("./app/service/MerkleService");
that.ParametersService = require("./app/service/ParametersService").get(that.conn, that.conf.currency);
that._initServices(that.conn, done);
} else {
done();
}
......@@ -195,7 +251,7 @@ function Server (dbConf, overrideConf, interceptors) {
// To override in child classes
};
this.listenBMA = function (overConf, onLoaded) {
function listenBMA (overConf, onLoaded) {
if (arguments.length == 1) {
onLoaded = overConf;
overConf = undefined;
......@@ -211,9 +267,6 @@ function Server (dbConf, overrideConf, interceptors) {
app.use(express.urlencoded());
app.use(express.json());
async.waterfall([
function (next) {
that.initServer(next);
},
function (next){
// HTTP Signatures
......
......@@ -13,24 +13,6 @@ var ucoin = require('./..');
var logger = require('../app/lib/logger')('test');
var currency = "testo";
var server = ucoin.createRegistryServer({ name: currency }, {
currency: currency,
pgpkey: fs.readFileSync(__dirname + "/data/lolcat.priv"),
pgppasswd: 'lolcat',
ipv4: '127.0.0.1',
port: 9106,
remoteipv4: '127.0.0.1',
remoteport: 9106,
sync: {
AMStart: now,
AMFreq: 1, // Every second
UDFreq: 2, // Dividend every 5 seconds
UD0: 10,
UDPercent: 0.5, // So it can be tested under 4 UD - this ultra high value of UD growth
Consensus: 2/3<