Commit 99a87f0e authored by Cédric Moreau's avatar Cédric Moreau

Plugging again multicast

parent 78e2af73
var async = require('async');
var stream = require('stream');
var util = require('util');
var request = require('request');
var logger = require('../lib/logger')('networker');
var async = require('async');
var logger = require('../../lib/logger')('multicaster');
var fifo = async.queue(function (task, callback) {
task(callback);
}, 1);
module.exports = function (eventEmitter) {
module.exports = function () {
return new Multicaster();
}
function Multicaster () {
stream.Transform.call(this, { objectMode: true });
var that = this;
this._write = function (obj, enc, done) {
that.emit(obj.type, obj.obj, obj.peers);
done();
}
eventEmitter.on('pubkey', function(pubkey, peers) {
logger.debug('new pubkey to be sent to %s peers', peers.length);
that.on('pubkey', function(pubkey, peers) {
logger.debug('--> new Pubkey to be sent to %s peer(s)', peers.length);
peers.forEach(function(peer){
fifo.push(function (sent) {
sendPubkey(peer, pubkey, success(function (err) {
......@@ -19,8 +34,8 @@ module.exports = function (eventEmitter) {
});
});
eventEmitter.on('vote', function(vote, peers) {
logger.debug('new vote to be sent to %s peers', peers.length);
that.on('vote', function(vote, peers) {
logger.debug('--> new Vote to be sent to %s peer(s)', peers.length);
peers.forEach(function(peer){
fifo.push(function (sent) {
sendVote(peer, vote, success(function (err) {
......@@ -30,8 +45,8 @@ module.exports = function (eventEmitter) {
});
});
eventEmitter.on('transaction', function(transaction, peers) {
logger.debug('new transaction to be sent to %s peers', peers.length);
that.on('transaction', function(transaction, peers) {
logger.debug('--> new Transaction to be sent to %s peer(s)', peers.length);
peers.forEach(function(peer){
fifo.push(function (sent) {
sendTransaction(peer, transaction, success(function (err) {
......@@ -41,8 +56,8 @@ module.exports = function (eventEmitter) {
});
});
eventEmitter.on('wallet', function(wallet, peers) {
logger.debug('new Wallet to be sent to %s peers', peers.length);
that.on('wallet', function(wallet, peers) {
logger.debug('--> new Wallet to be sent to %s peer(s)', peers.length);
peers.forEach(function(peer){
fifo.push(function (sent) {
sendWallet(peer, wallet, success(function (err) {
......@@ -52,7 +67,8 @@ module.exports = function (eventEmitter) {
});
});
eventEmitter.on('peer', function(peering, peers, done) {
that.on('peer', function(peering, peers, done) {
logger.debug('--> new Peer to be sent to %s peer(s)', peers.length);
peers.forEach(function(peer){
fifo.push(function (sent) {
// Do propagating
......@@ -71,7 +87,8 @@ module.exports = function (eventEmitter) {
});
});
eventEmitter.on('status', function(status, peers, internal) {
that.on('status', function(status, peers) {
logger.debug('--> new Status to be sent to %s peer(s)', peers.length);
peers.forEach(function(peer){
fifo.push(function (sent) {
// Do propagating
......@@ -82,19 +99,13 @@ module.exports = function (eventEmitter) {
}, function (err, res, body) {
// Sent!
sent(err);
if (!err && res && res.statusCode == 400 && !internal) {
logger.debug('sending self peering to peer %s', peer.keyID());
eventEmitter.emit('peer', eventEmitter.peer(), [peer], function (err, res, body) {
eventEmitter.emit('status', status, [peer], true);
});
}
});
});
});
});
eventEmitter.on('membership', function(membership, peers) {
logger.debug('new membership to be sent to %s peers', peers.length);
that.on('membership', function(membership, peers) {
logger.debug('--> new Membership to be sent to %s peer(s)', peers.length);
peers.forEach(function(peer){
fifo.push(function (sent) {
sendMembership(peer, membership, success(function (err) {
......@@ -104,8 +115,8 @@ module.exports = function (eventEmitter) {
});
});
eventEmitter.on('voting', function(voting, peers) {
logger.debug('new voting to be sent to %s peers', peers.length);
that.on('voting', function(voting, peers) {
logger.debug('--> new Voting to be sent to %s peer(s)', peers.length);
peers.forEach(function(peer){
fifo.push(function (sent) {
sendVoting(peer, voting, success(function (err) {
......@@ -115,7 +126,8 @@ module.exports = function (eventEmitter) {
});
});
eventEmitter.on('forward', function(forward, peers, done) {
that.on('forward', function(forward, peers, done) {
logger.debug('--> new Forward to be sent to %s peer(s)', peers.length);
fifo.push(function (sent) {
async.forEach(peers, function(peer, callback){
// Do propagating
......@@ -131,12 +143,16 @@ module.exports = function (eventEmitter) {
}
});
}, function(err){
done(err);
if (typeof done == 'function') {
done(err);
}
sent();
});
});
});
};
}
util.inherits(Multicaster, stream.Transform);
function sendPubkey(peer, pubkey, done) {
logger.info('POST pubkey to %s', peer.keyID());
......
......@@ -3,27 +3,77 @@ var sha1 = require('sha1');
var util = require('util');
var stream = require('stream');
module.exports = function (server) {
return new Router(server);
module.exports = function (serverFPR, conn) {
return new Router(serverFPR, conn);
};
function Router (server) {
function Router (serverFPR, conn) {
stream.Writable.call(this, { objectMode: true });
var Merkle = conn.model('Merkle');
var Key = conn.model('Key');
var PublicKey = conn.model('PublicKey');
var Amendment = conn.model('Amendment');
var Vote = conn.model('Vote');
var Transaction = conn.model('Transaction');
var Peer = conn.model('Peer');
var Forward = conn.model('Forward');
var Wallet = conn.model('Wallet');
stream.Transform.call(this, { objectMode: true });
var that = this;
this._write = function (obj, enc, done) {
if (typeof obj.email != undefined) {
getRandomInAllPeers(function (err, peers) {
that.emit('pubkey', pubkey, peers || []);
});
if (typeof obj.email != 'undefined') { route('pubkey', obj, getRandomInUPPeers, done); }
else if (obj.amendment ? true : false) { route('vote', obj, getRandomInUPPeers, done); }
else if (obj.recipient ? true : false) { route('transaction', obj, getTargetedButSelf(obj.recipient), done); }
else if (obj.endpoints ? true : false) { route('peer', obj, getRandomInAllPeersButPeer(obj.fingerprint), done); }
else if (obj.forward ? true : false) { route('forward', obj, getTargetedButSelf(obj.to), done); }
else if (obj.status ? true : false) { route('status', obj, getTargetedButSelf(obj.to), done); }
else if (obj.requiredTrusts ? true : false) { route('wallet', obj, getRandomInUPPeers, done); }
else if (obj.type && obj.type == "MEMBERSHIP" ? true : false) { route('membership', obj, getRandomInUPPeers, done); }
else if (obj.type && obj.type == "VOTING" ? true : false) { route('voting', obj, getRandomInUPPeers, done); }
else if (obj.algorithm ? true : false) { route('flow', obj, getRandomInUPPeers, done); }
else {
done();
}
};
function getRandomInAllPeers (done) {
Peer.getRandomlyWithout([server.PeeringService.cert.fingerprint], done);
function route (type, obj, getPeersFunc, done) {
getPeersFunc(function (err, peers) {
that.push({
'type': type,
'obj': obj,
'peers': peers || []
});
done();
});
}
function getRandomInAllPeersButPeer (fpr) {
return function (done) {
Peer.getRandomlyWithout([serverFPR, fpr], done);
};
};
function getRandomInUPPeers (done) {
Peer.getRandomlyUPsWithout([serverFPR], done);
};
/**
* Get the peer targeted by `to` argument, this node excluded (for not to loop on self).
*/
function getTargetedButSelf (to) {
return function (done) {
if (to == serverFPR) {
done(null, []);
} else {
Peer.getTheOne(to, function (err, peer) {
done(err, [peer]);
});
}
};
}
};
util.inherits(Router, stream.Writable);
util.inherits(Router, stream.Transform);
......@@ -15,6 +15,7 @@ var ForwardSchema = new Schema({
keys: [String],
hash: String,
hashBasis: String,
signature: String,
upstream: { type: Boolean, default: false },
created: { type: Date, default: Date.now },
updated: { type: Date, default: Date.now }
......@@ -46,7 +47,7 @@ ForwardSchema.methods = {
copyValues: function(to) {
var obj = this;
["version", "currency", "from", "to", "forward", "keys", "upstream"].forEach(function (key) {
["version", "currency", "from", "to", "forward", "keys", "upstream", "signature"].forEach(function (key) {
to[key] = obj[key];
});
},
......@@ -57,6 +58,7 @@ ForwardSchema.methods = {
["version", "currency", "from", "to", "forward", "keys"].forEach(function (key) {
json[key] = obj[key];
});
json.raw = this.getRaw();
return json;
},
......
......@@ -83,6 +83,7 @@ PeerSchema.methods = {
["version", "currency", "fingerprint", "endpoints", "status", "signature"].forEach(function (key) {
json[key] = obj[key];
});
json.raw = this.getRaw();
return json;
},
......@@ -201,24 +202,42 @@ PeerSchema.statics.getRandomlyWithout = function (fingerprints, done) {
var that = this;
async.waterfall([
function (next){
that.find({ fingerprint: { $nin: fingerprints }, status: { $in: ['NEW_BACK', 'UP'] } })
that.find({ fingerprint: { $nin: fingerprints } })
.sort({ 'updated': -1 })
.limit(10)
.exec(next);
},
function (records, next){
var peers = [];
var recordsLength = records.length;
for (var i = 0; i < Math.min(recordsLength, 4); i++) {
var randIndex = Math.max(Math.floor(Math.random()*10) - (10 - recordsLength) - i, 0);
peers.push(records[randIndex]);
records.splice(randIndex, 1);
}
next(null, peers);
choose4in
], done);
};
/**
* Look for 10 last updated peers, and choose randomly 4 peers in it
*/
PeerSchema.statics.getRandomlyUPsWithout = function (fingerprints, done) {
var that = this;
async.waterfall([
function (next){
that.find({ fingerprint: { $nin: fingerprints }, status: { $in: ['NEW_BACK', 'UP'] } })
.sort({ 'updated': -1 })
.limit(10)
.exec(next);
},
choose4in
], done);
};
function choose4in (peers, done) {
var chosen = [];
var nbPeers = peers.length;
for (var i = 0; i < Math.min(nbPeers, 4); i++) {
var randIndex = Math.max(Math.floor(Math.random()*10) - (10 - nbPeers) - i, 0);
chosen.push(peers[randIndex]);
peers.splice(randIndex, 1);
}
done(null, chosen);
}
PeerSchema.statics.status = STATUS;
module.exports = PeerSchema;
......@@ -16,6 +16,8 @@ module.exports = function StatusMessage (values) {
['version', 'currency', 'status', 'from', 'to'].forEach(function(field){
obj[field] = that[field] || '';
});
obj.raw = this.getRaw();
obj.signature = this.signature;
return obj;
}
......
......@@ -284,7 +284,7 @@ function PeeringService(conn, conf, PublicKeyService, ParametersService) {
jpgp().sign(forward.getRaw(), that.privateKey, next);
},
function (signature, next) {
forward.signature = signature;
forward.signature = signature.substring(signature.indexOf('-----BEGIN PGP SIGNATURE'));;
next(null, forward);
}
], done);
......@@ -484,34 +484,34 @@ function PeeringService(conn, conf, PublicKeyService, ParametersService) {
* @param fingerprints List of peers' fingerprints to which status is to be sent
*/
this.sendStatusTo = function (statusStr, fingerprints, done) {
var status = new Status({
version: 1,
currency: currency,
status: statusStr
});
var raw = status.getRaw().unix2dos();
async.waterfall([
function (next){
jpgp().sign(raw, that.privateKey, next);
},
function (signature, next) {
status.signature = signature.substring(signature.indexOf('-----BEGIN PGP SIGNATURE'));
async.waterfall([
async.apply(Peer.getList.bind(Peer), fingerprints),
function (peers) {
that.emit('status', status, peers || [], false, function (err) {
async.forEach(peers, function(peer, callback){
peer.statusSent = status.status;
peer.statusSigDate = status.sigDate;
peer.save(function (err) {
if (err) logger.error(err);
callback();
});
async.apply(Peer.getList.bind(Peer), fingerprints),
function (peers, next) {
async.forEach(peers, function(peer, callback){
var status = new Status({
version: 1,
currency: currency,
status: statusStr,
from: that.cert.fingerprint,
to: peer.fingerprint
});
async.waterfall([
function (next){
jpgp().sign(status.getRaw(), that.privateKey, next);
},
function (signature, next) {
status.signature = signature.substring(signature.indexOf('-----BEGIN PGP SIGNATURE'));
status.sigDate = new Date();
that.emit('status', status);
peer.statusSent = status.status;
peer.statusSigDate = status.sigDate;
peer.save(function (err) {
if (err) logger.error(err);
next();
});
});
next();
}
], next);
},
], callback);
}, next);
},
], done);
}
......@@ -592,7 +592,7 @@ function PeeringService(conn, conf, PublicKeyService, ParametersService) {
};
function getRandomInAllPeers (done) {
Peer.getRandomlyWithout([that.cert.fingerprint], done);
Peer.getRandomlyUPsWithout([that.cert.fingerprint], done);
};
// TODO
......
#!/usr/bin/env node
var jpgp = require('../app/lib/jpgp');
var wizard = require('../app/lib/wizard');
var fs = require('fs');
var os = require('os');
var async = require('async');
var _ = require('underscore');
var program = require('commander');
var mongoose = require('mongoose');
var moment = require('moment');
var inquirer = require('inquirer');
var openpgp = require('openpgp');
var logger = require('../app/lib/logger')('ucoind');
var ucoin = require('./..');
var fs = require('fs');
var os = require('os');
var async = require('async');
var _ = require('underscore');
var program = require('commander');
var mongoose = require('mongoose');
var moment = require('moment');
var inquirer = require('inquirer');
var openpgp = require('openpgp');
var jpgp = require('../app/lib/jpgp');
var wizard = require('../app/lib/wizard');
var router = require('../app/lib/streams/router');
var multicaster = require('../app/lib/streams/multicaster');
var logger = require('../app/lib/logger')('ucoind');
var ucoin = require('./..');
function keys (val) {
return val.split(',');
......@@ -256,6 +258,10 @@ program
.description('Start uCoin server using given --currency')
.action(connect(LISTEN_HTTP, ucoin.createRegistryServer, function (server, conf) {
server
.pipe(router(server.PeeringService.cert.fingerprint, server.conn))
.pipe(multicaster());
// Launching server
server.on('BMALoaded', function (err, app) {
if(err){
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment