diff --git a/app/lib/streams/router.js b/app/lib/streams/router.js index e0fef2bd127554bc6f6383921cf13842ab0dcc9b..d83afb891230034e3a97f4aaf746cbf723a3189f 100644 --- a/app/lib/streams/router.js +++ b/app/lib/streams/router.js @@ -29,67 +29,50 @@ function Router (PeeringService, conf, dal) { const that = this; this._write = function (obj, enc, done) { - if (obj.joiners) { - route('block', obj, getRandomInUPPeers(), done); - } - else if (obj.pubkey && obj.uid) { - route('identity', obj, getRandomInUPPeers(), done); - } - else if (obj.userid) { - route('membership', obj, getRandomInUPPeers(), done); - } - else if (obj.inputs) { - route('transaction', obj, getRandomInUPPeers(), done); - } - else if (obj.endpoints) { - route('peer', obj, getRandomInUPPeers(), done); - } - else if (obj.from && obj.from == PeeringService.pubkey) { - // Route ONLY status emitted by this node - route('status', obj, getTargeted(obj.to), done); - } - else if (obj.unreachable) { - async.waterfall([ - function (next) { - dal.setPeerDown(obj.peer.pubkey) - .then(function(){ - next(); - }) - .catch(next); + return co(function*() { + try { + if (obj.joiners) { + yield route('block', obj, getRandomInUPPeers()); } - ], function(err) { - if (err) logger.error(err); - else logger.info("Peer %s unreachable: now considered as DOWN.", obj.peer.pubkey); - done(); - }); - } - else if (obj.outdated) { - co(function*() { - try { - yield PeeringService.handleNewerPeer(obj.peer); - done(); - } catch (e) { - logger.warn(e); - done(e); - } - }); - } - else { - done(); - } + else if (obj.pubkey && obj.uid) { + yield route('identity', obj, getRandomInUPPeers()); + } + else if (obj.userid) { + yield route('membership', obj, getRandomInUPPeers()); + } + else if (obj.inputs) { + yield route('transaction', obj, getRandomInUPPeers()); + } + else if (obj.endpoints) { + yield route('peer', obj, getRandomInUPPeers()); + } + else if (obj.from && obj.from == PeeringService.pubkey) { + // Route ONLY status emitted by this node + yield route('status', obj, getTargeted(obj.to)); + } + else if (obj.unreachable) { + yield dal.setPeerDown(obj.peer.pubkey); + logger.info("Peer %s unreachable: now considered as DOWN.", obj.peer.pubkey); + } + else if (obj.outdated) { + yield PeeringService.handleNewerPeer(obj.peer); + } + } catch (e) { + logger.error("Routing error: %s", e && (e.stack || e.message || e)); + } + done && done(); + }); }; - function route (type, obj, getPeersFunc, done) { - if (!active) { - return done(); - } - getPeersFunc(function (err, peers) { + function route (type, obj, getPeersFunc) { + return co(function*() { + if (!active) return; + const peers = yield getPeersFunc(); that.push({ 'type': type, 'obj': obj, 'peers': (peers || []).map(Peer.statics.peerize) }); - done(); }); } @@ -98,7 +81,7 @@ function Router (PeeringService, conf, dal) { } function getValidUpPeers (without) { - return function (done) { + return function () { return co(function *() { let members = []; let nonmembers = []; @@ -115,8 +98,7 @@ function Router (PeeringService, conf, dal) { pubkey: 'M' + index + '_' + PeeringService.pubkey, endpoints: [mep] }})); - }) - .then(_.partial(done, null)).catch(done); + }); }; } @@ -124,14 +106,14 @@ function Router (PeeringService, conf, dal) { * Get the peer targeted by `to` argument, this node excluded (for not to loop on self). */ function getTargeted (to) { - return function (done) { - if (to == PeeringService.pubkey) { - done(null, []); - } else { - dal.getPeer(to) - .then((peer) => done(null, [peer])) - .catch((err) => done(err)); - } + return function () { + return co(function*() { + if (to == PeeringService.pubkey) { + return []; + } + const peer = yield dal.getPeer(to); + return [peer]; + }); }; }