Skip to content
Snippets Groups Projects
Commit fb8323a3 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

Refactored multicaster

parent b212217f
No related branches found
No related tags found
No related merge requests found
......@@ -8,6 +8,8 @@ var constants = require('../../lib/constants');
var Peer = require('../../lib/entity/peer');
var logger = require('../../lib/logger')('multicaster');
const WITH_ISOLATION = true;
var fifo = async.queue(function (task, callback) {
task(callback);
}, constants.NETWORK.MAX_CONCURRENT_POST);
......@@ -22,140 +24,120 @@ function Multicaster (isolate, timeout) {
var that = this;
this._write = function (obj, enc, done) {
that.emit(obj.type, obj.obj, obj.peers);
done();
let blockForward = forward({
type: 'Block',
uri: '/blockchain/block',
getObj: (block) => {
return {
"block": block.getRawSigned()
};
that.on('identity', function(idty, peers) {
logger.debug('--> new Identity with %s certs to be sent to %s peer(s)', (idty.certs || []).length, peers.length);
peers.forEach(function(peer){
fifo.push(function (sent) {
sendIdentity(peer, idty).finally(sent);
});
});
},
getDocID: (block) => 'block#' + block.number
});
that.on('block', function(block, peers) {
logger.debug('--> new Block#%s to be sent to %s peer(s)', block.number, peers.length);
peers.forEach(function(peer){
fifo.push(function (sent) {
sendBlock(peer, block).finally(sent);
});
});
let idtyForward = forward({
type: 'Identity',
uri: '/wot/add',
getObj: (idty) => {
return {
"pubkey": idty.getRawPubkey(),
"self": idty.getRawSelf(),
"other": idty.getRawOther()
};
},
getDocID: (idty) => 'with ' + (idty.certs || []).length + ' certs'
});
that.on('transaction', function(transaction, peers) {
logger.debug('--> new Transaction to be sent to %s peer(s)', peers.length);
peers.forEach(function(peer){
fifo.push(function (sent) {
sendTransaction(peer, transaction).finally(sent);
let txForward = forward({
type: 'Transaction',
uri: '/tx/process',
getObj: (transaction) => {
return {
"transaction": transaction.getRaw(),
"signature": transaction.signature
};
}
});
let peerForward = forward({
type: 'Peer',
uri: '/network/peering/peers',
transform: Peer.statics.peerize,
getObj: (peering) => {
return {
peer: peering.getRawSigned()
};
},
getDocID: (doc) => doc.keyID(),
withIsolation: WITH_ISOLATION
});
let msForward = forward({
type: 'Membership',
uri: '/blockchain/membership',
getObj: (membership) => {
return {
"membership": membership.getRaw(),
"signature": membership.signature
};
}
});
that.on('peer', function(peering, peers) {
if(!isolate) {
logger.debug('--> new Peer to be sent to %s peer(s)', peers.length);
let thePeering = Peer.statics.peerize(peering);
logger.info('POST peering %s:', thePeering.keyID());
that.on('identity', idtyForward);
that.on('block', blockForward);
that.on('transaction', txForward);
that.on('peer', peerForward);
that.on('membership', msForward);
this._write = function (obj, enc, done) {
that.emit(obj.type, obj.obj, obj.peers);
done();
};
this.sendBlock = (toPeer, block) => blockForward(block, [toPeer]);
this.sendPeering = (toPeer, peer) => blockForward(peer, [toPeer]);
function forward(params) {
return function(doc, peers) {
if(!params.withIsolation || !isolate) {
let theDoc = params.transform ? params.transform(doc) : doc;
logger.debug('--> new %s to be sent to %s peer(s)', params.type, peers.length);
if (params.getDocID) {
logger.info('POST %s %s:', params.type, params.getDocID(theDoc));
} else {
logger.info('POST %s:', params.type);
}
peers.forEach(function(p){
let peer = Peer.statics.peerize(p);
logger.info(' `--> to peer %s (%s)', peer.keyID(), peer.getNamedURL());
logger.info(' `--> to peer %s [%s] (%s)', peer.keyID(), peer.member ? 'member' : '------', peer.getNamedURL());
fifo.push(function (sent) {
sendPeer(peer, thePeering).finally(sent);
return post(peer, params.uri, params.getObj(theDoc)).finally(sent);
});
});
} else {
logger.debug('[ISOLATE] Prevent --> new Peer to be sent to %s peer(s)', peers.length);
}
});
that.on('membership', function(membership, peers) {
logger.debug('--> new Membership to be sent to %s peer(s)', peers.length);
peers.forEach(function(peer){
fifo.push(function (sent) {
sendMembership(peer, membership).finally(sent);
});
});
});
this.sendBlock = sendBlock;
this.sendPeering = sendPeering;
};
}
function post(peer, url, data, done) {
function post(peer, url, data) {
if (!peer.isReachable()) {
return Q();
}
return Q.Promise(function(resolve, reject){
return Q.Promise(function(resolve){
var postReq = request.post({
"uri": 'http://' + peer.getURL() + url,
"timeout": timeout || constants.NETWORK.DEFAULT_TIMEOUT
}, function (err, res) {
// TODO: set unreachable only if problem of connection
/*if (err) {
if (err) {
that.push({ unreachable: true, peer: { pubkey: peer.pubkey }});
return reject(err);
}*/
logger.warn(err.message || err);
}
resolve(res);
});
postReq.form(data);
})
.then(function(){
done && done();
})
.catch(function(err) {
done && done(err);
throw err;
});
}
function sendIdentity(peer, idty, done) {
var keyID = peer.keyID();
logger.info('POST identity to %s', keyID.match(/Unknown/) ? peer.getURL() : keyID);
return post(peer, '/wot/add', {
"pubkey": idty.getRawPubkey(),
"self": idty.getRawSelf(),
"other": idty.getRawOther()
}, done);
}
function sendBlock(peer, block, done) {
var keyID = peer.keyID();
logger.info('POST block#%s to %s', block.number, keyID.match(/Unknown/) ? peer.getURL() : keyID);
return post(peer, '/blockchain/block', {
"block": block.getRawSigned()
}, done);
}
function sendTransaction(peer, transaction, done) {
logger.info('POST transaction to %s', peer.keyID());
return post(peer, '/tx/process', {
"transaction": transaction.getRaw(),
"signature": transaction.signature
}, done);
}
function sendPeering(toPeer, peer, done) {
logger.info('POST peering to %s (%s)', toPeer.keyID(), toPeer.getURL());
return post(toPeer, '/network/peering/peers', {
"peer": peer.getRawSigned()
}, done);
}
function sendMembership(peer, membership, done) {
logger.info('POST membership to %s', peer.keyID());
return post(peer, '/blockchain/membership', {
"membership": membership.getRaw(),
"signature": membership.signature
}, done);
}
function sendPeer(peer, thePeering, done) {
return post(peer, "/network/peering/peers", {
peer: thePeering.getRawSigned()
}, done);
}
}
util.inherits(Multicaster, stream.Transform);
......@@ -86,7 +86,7 @@ function Router (serverPubkey, conf, dal) {
}
members = chooseXin(members, constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO);
nonmembers = chooseXin(nonmembers, constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO);
return members.concat(nonmembers);
return members.map((p) => (p.member = true) && p).concat(nonmembers);
})
.then(_.partial(done, null)).catch(done);
};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment