diff --git a/.eslintignore b/.eslintignore index 4816b4d21dcaf7abcb2492d04bd67c668ca583bd..7b9a68880f3504729c7ca57ea18d5c580e3cc457 100644 --- a/.eslintignore +++ b/.eslintignore @@ -12,7 +12,8 @@ app/lib/dal/fileDALs/*.js app/lib/dal/fileDAL.js app/service/*.js app/lib/rules/*.js -app/lib/system/directory.js +app/lib/system/*.js +app/lib/streams/*.js app/modules/wizard.js app/modules/router.js app/modules/revert.js diff --git a/.gitignore b/.gitignore index 1e66dced5b08e5085d80c6abbd713a55360d4b72..ac726b307dd5eee23eefb6db0904c99eb61e82fa 100644 --- a/.gitignore +++ b/.gitignore @@ -50,6 +50,7 @@ app/lib/dal/fileDAL.js* app/lib/rules/*.js* app/lib/logger*js* app/lib/system/directory.js* +app/lib/streams/*.js* app/service/*.js* app/lib/wot.js* app/modules/prover/*.js* diff --git a/app/lib/streams/multicaster.js b/app/lib/streams/multicaster.js index 64d778d04a283bdce9f3ffacc165531a7d07d18f..e82a8aaa2c8f1252e368c55661232c7e1c1f8943 100644 --- a/app/lib/streams/multicaster.js +++ b/app/lib/streams/multicaster.js @@ -1,203 +1,227 @@ "use strict"; -const stream = require('stream'); -const util = require('util'); +var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { + return new (P || (P = Promise))(function (resolve, reject) { + function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } + function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } + function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); } + step((generator = generator.apply(thisArg, _arguments || [])).next()); + }); +}; +Object.defineProperty(exports, "__esModule", { value: true }); +const stream = require("stream"); const request = require('request'); -const co = require('co'); const constants = require('../../lib/constants'); -const Peer = require('../../lib/entity/peer'); +const Peer = require('../../lib/entity/peer'); const Identity = require('../../lib/entity/identity'); const Certification = require('../../lib/entity/certification'); const Revocation = require('../../lib/entity/revocation'); const Membership = require('../../lib/entity/membership'); const Block = require('../../lib/entity/block'); const Transaction = require('../../lib/entity/transaction'); -const logger = require('../logger').NewLogger('multicaster'); - +const logger = require('../logger').NewLogger('multicaster'); const WITH_ISOLATION = true; - -module.exports = function (conf, timeout) { - return new Multicaster(conf, timeout); -}; - -function Multicaster (conf, timeout) { - - stream.Transform.call(this, { objectMode: true }); - - const that = this; - - let blockForward = forward({ - transform: Block.statics.fromJSON, - type: 'Block', - uri: '/blockchain/block', - getObj: (block) => { - return { - "block": block.getRawSigned() - }; - }, - getDocID: (block) => 'block#' + block.number - }); - - let idtyForward = forward({ - transform: Identity.statics.fromJSON, - type: 'Identity', - uri: '/wot/add', - getObj: (idty) => { - return { - "identity": idty.createIdentity() - }; - }, - getDocID: (idty) => 'with ' + (idty.certs || []).length + ' certs' - }); - - let certForward = forward({ - transform: Certification.statics.fromJSON, - type: 'Cert', - uri: '/wot/certify', - getObj: (cert) => { - return { - "cert": cert.getRaw() - }; - }, - getDocID: (idty) => 'with ' + (idty.certs || []).length + ' certs' - }); - - let revocationForward = forward({ - transform: Revocation.statics.fromJSON, - type: 'Revocation', - uri: '/wot/revoke', - getObj: (revocation) => { - return { - "revocation": revocation.getRaw() - }; +class Multicaster extends stream.Transform { + constructor(conf = null, timeout = 0) { + super({ objectMode: true }); + this.conf = conf; + this.timeout = timeout; + this.on('identity', (data, peers) => this.idtyForward(data, peers)); + this.on('cert', (data, peers) => this.certForward(data, peers)); + this.on('revocation', (data, peers) => this.revocationForward(data, peers)); + this.on('block', (data, peers) => this.blockForward(data, peers)); + this.on('transaction', (data, peers) => this.txForward(data, peers)); + this.on('peer', (data, peers) => this.peerForward(data, peers)); + this.on('membership', (data, peers) => this.msForward(data, peers)); } - }); - - let txForward = forward({ - transform: Transaction.statics.fromJSON, - type: 'Transaction', - uri: '/tx/process', - getObj: (transaction) => { - return { - "transaction": transaction.getRaw(), - "signature": transaction.signature - }; + blockForward(doc, peers) { + return __awaiter(this, void 0, void 0, function* () { + return this.forward({ + transform: Block.statics.fromJSON, + type: 'Block', + uri: '/blockchain/block', + getObj: (block) => { + return { + "block": block.getRawSigned() + }; + }, + getDocID: (block) => 'block#' + block.number + })(doc, peers); + }); } - }); - - let peerForward = forward({ - type: 'Peer', - uri: '/network/peering/peers', - transform: Peer.statics.peerize, - getObj: (peering) => { - return { - peer: peering.getRawSigned() - }; - }, - getDocID: (doc) => doc.keyID() + '#' + doc.block.match(/(\d+)-/)[1], - withIsolation: WITH_ISOLATION, - onError: (resJSON, peering, to) => { - const sentPeer = Peer.statics.peerize(peering); - if (Peer.statics.blockNumber(resJSON.peer) > sentPeer.blockNumber()) { - that.push({ outdated: true, peer: resJSON.peer }); - logger.warn('Outdated peer document (%s) sent to %s', sentPeer.keyID() + '#' + sentPeer.block.match(/(\d+)-/)[1], to); - } - return Promise.resolve(); + idtyForward(doc, peers) { + return __awaiter(this, void 0, void 0, function* () { + return this.forward({ + transform: Identity.statics.fromJSON, + type: 'Identity', + uri: '/wot/add', + getObj: (idty) => { + return { + "identity": idty.createIdentity() + }; + }, + getDocID: (idty) => 'with ' + (idty.certs || []).length + ' certs' + })(doc, peers); + }); } - }); - - let msForward = forward({ - transform: Membership.statics.fromJSON, - type: 'Membership', - uri: '/blockchain/membership', - getObj: (membership) => { - return { - "membership": membership.getRaw(), - "signature": membership.signature - }; + certForward(doc, peers) { + return __awaiter(this, void 0, void 0, function* () { + return this.forward({ + transform: Certification.statics.fromJSON, + type: 'Cert', + uri: '/wot/certify', + getObj: (cert) => { + return { + "cert": cert.getRaw() + }; + }, + getDocID: (idty) => 'with ' + (idty.certs || []).length + ' certs' + })(doc, peers); + }); } - }); - - that.on('identity', idtyForward); - that.on('cert', certForward); - that.on('revocation', revocationForward); - that.on('block', blockForward); - that.on('transaction', txForward); - that.on('peer', peerForward); - that.on('membership', msForward); - - this._write = function (obj, enc, done) { - that.emit(obj.type, obj.obj, obj.peers); - done(); - }; - - this.sendBlock = (toPeer, block) => blockForward(block, [toPeer]); - this.sendPeering = (toPeer, peer) => peerForward(peer, [toPeer]); - - function forward(params) { - return function(doc, peers) { - return co(function *() { - try { - if(!params.withIsolation || !(conf && conf.isolate)) { - let theDoc = params.transform ? params.transform(doc) : doc; - logger.debug('--> new %s to be sent to %s peer(s)', params.type, peers.length); - if (params.getDocID) { - logger.info('POST %s %s', params.type, params.getDocID(theDoc)); - } else { - logger.info('POST %s', params.type); - } - // Parallel treatment for superfast propagation - yield peers.map((p) => co(function*() { - let peer = Peer.statics.peerize(p); - const namedURL = peer.getNamedURL(); - logger.debug(' `--> to peer %s [%s] (%s)', peer.keyID(), peer.member ? 'member' : '------', namedURL); - try { - yield post(peer, params.uri, params.getObj(theDoc)); - } catch (e) { - if (params.onError) { - try { - const json = JSON.parse(e.body); - yield params.onError(json, doc, namedURL); - } catch (ex) { - logger.warn('Could not reach %s', namedURL); - } + revocationForward(doc, peers) { + return __awaiter(this, void 0, void 0, function* () { + return this.forward({ + transform: Revocation.statics.fromJSON, + type: 'Revocation', + uri: '/wot/revoke', + getObj: (revocation) => { + return { + "revocation": revocation.getRaw() + }; } - } - })); - } else { - logger.debug('[ISOLATE] Prevent --> new Peer to be sent to %s peer(s)', peers.length); - } - } catch (err) { - logger.error(err); - } - }); - }; - } - - function post(peer, uri, data) { - if (!peer.isReachable()) { - return Promise.resolve(); + })(doc, peers); + }); } - return new Promise(function(resolve, reject){ - const postReq = request.post({ - "uri": protocol(peer.getPort()) + '://' + peer.getURL() + uri, - "timeout": timeout || constants.NETWORK.DEFAULT_TIMEOUT - }, function (err, res) { - if (err) { - that.push({ unreachable: true, peer: { pubkey: peer.pubkey }}); - logger.warn(err.message || err); - } - if (res && res.statusCode != 200) { - return reject(res); + txForward(doc, peers) { + return __awaiter(this, void 0, void 0, function* () { + return this.forward({ + transform: Transaction.statics.fromJSON, + type: 'Transaction', + uri: '/tx/process', + getObj: (transaction) => { + return { + "transaction": transaction.getRaw(), + "signature": transaction.signature + }; + } + })(doc, peers); + }); + } + peerForward(doc, peers) { + return __awaiter(this, void 0, void 0, function* () { + return this.forward({ + type: 'Peer', + uri: '/network/peering/peers', + transform: Peer.statics.peerize, + getObj: (peering) => { + return { + peer: peering.getRawSigned() + }; + }, + getDocID: (doc) => doc.keyID() + '#' + doc.block.match(/(\d+)-/)[1], + withIsolation: WITH_ISOLATION, + onError: (resJSON, peering, to) => { + const sentPeer = Peer.statics.peerize(peering); + if (Peer.statics.blockNumber(resJSON.peer) > sentPeer.blockNumber()) { + this.push({ outdated: true, peer: resJSON.peer }); + logger.warn('Outdated peer document (%s) sent to %s', sentPeer.keyID() + '#' + sentPeer.block.match(/(\d+)-/)[1], to); + } + return Promise.resolve(); + } + })(doc, peers); + }); + } + msForward(doc, peers) { + return __awaiter(this, void 0, void 0, function* () { + return this.forward({ + transform: Membership.statics.fromJSON, + type: 'Membership', + uri: '/blockchain/membership', + getObj: (membership) => { + return { + "membership": membership.getRaw(), + "signature": membership.signature + }; + } + })(doc, peers); + }); + } + _write(obj, enc, done) { + this.emit(obj.type, obj.obj, obj.peers); + done(); + } + sendBlock(toPeer, block) { + return this.blockForward(block, [toPeer]); + } + sendPeering(toPeer, peer) { + return this.peerForward(peer, [toPeer]); + } + forward(params) { + return (doc, peers) => __awaiter(this, void 0, void 0, function* () { + try { + if (!params.withIsolation || !(this.conf && this.conf.isolate)) { + let theDoc = params.transform ? params.transform(doc) : doc; + logger.debug('--> new %s to be sent to %s peer(s)', params.type, peers.length); + if (params.getDocID) { + logger.info('POST %s %s', params.type, params.getDocID(theDoc)); + } + else { + logger.info('POST %s', params.type); + } + // Parallel treatment for superfast propagation + yield Promise.all(peers.map((p) => __awaiter(this, void 0, void 0, function* () { + let peer = Peer.statics.peerize(p); + const namedURL = peer.getNamedURL(); + logger.debug(' `--> to peer %s [%s] (%s)', peer.keyID(), peer.member ? 'member' : '------', namedURL); + try { + yield this.post(peer, params.uri, params.getObj(theDoc)); + } + catch (e) { + if (params.onError) { + try { + const json = JSON.parse(e.body); + yield params.onError(json, doc, namedURL); + } + catch (ex) { + logger.warn('Could not reach %s', namedURL); + } + } + } + }))); + } + else { + logger.debug('[ISOLATE] Prevent --> new Peer to be sent to %s peer(s)', peers.length); + } + } + catch (err) { + logger.error(err); + } + }); + } + post(peer, uri, data) { + if (!peer.isReachable()) { + return Promise.resolve(); } - resolve(res); - }); - postReq.form(data); - }); - } + return new Promise((resolve, reject) => { + const postReq = request.post({ + "uri": protocol(peer.getPort()) + '://' + peer.getURL() + uri, + "timeout": this.timeout || constants.NETWORK.DEFAULT_TIMEOUT + }, (err, res) => { + if (err) { + this.push({ unreachable: true, peer: { pubkey: peer.pubkey } }); + logger.warn(err.message || err); + } + if (res && res.statusCode != 200) { + return reject(res); + } + resolve(res); + }); + postReq.form(data); + }); + } } - +exports.Multicaster = Multicaster; function protocol(port) { - return port == 443 ? 'https' : 'http'; + return port == 443 ? 'https' : 'http'; } - -util.inherits(Multicaster, stream.Transform); +//# sourceMappingURL=multicaster.js.map \ No newline at end of file diff --git a/app/lib/streams/multicaster.ts b/app/lib/streams/multicaster.ts new file mode 100644 index 0000000000000000000000000000000000000000..c313b8d75ba8e8ca8ae6b768059464ed82163606 --- /dev/null +++ b/app/lib/streams/multicaster.ts @@ -0,0 +1,215 @@ +import {ConfDTO} from "../dto/ConfDTO" +import * as stream from "stream" +import {DBPeer} from "../dal/sqliteDAL/PeerDAL" + +const request = require('request'); +const constants = require('../../lib/constants'); +const Peer = require('../../lib/entity/peer'); +const Identity = require('../../lib/entity/identity'); +const Certification = require('../../lib/entity/certification'); +const Revocation = require('../../lib/entity/revocation'); +const Membership = require('../../lib/entity/membership'); +const Block = require('../../lib/entity/block'); +const Transaction = require('../../lib/entity/transaction'); +const logger = require('../logger').NewLogger('multicaster'); + +const WITH_ISOLATION = true; + +export class Multicaster extends stream.Transform { + + constructor(private conf:ConfDTO|null = null, private timeout:number = 0) { + + super({ objectMode: true }) + + this.on('identity', (data:any, peers:DBPeer[]) => this.idtyForward(data, peers)) + this.on('cert', (data:any, peers:DBPeer[]) => this.certForward(data, peers)) + this.on('revocation', (data:any, peers:DBPeer[]) => this.revocationForward(data, peers)) + this.on('block', (data:any, peers:DBPeer[]) => this.blockForward(data, peers)) + this.on('transaction', (data:any, peers:DBPeer[]) => this.txForward(data, peers)) + this.on('peer', (data:any, peers:DBPeer[]) => this.peerForward(data, peers)) + this.on('membership', (data:any, peers:DBPeer[]) => this.msForward(data, peers)) + } + + async blockForward(doc:any, peers:DBPeer[]) { + return this.forward({ + transform: Block.statics.fromJSON, + type: 'Block', + uri: '/blockchain/block', + getObj: (block:any) => { + return { + "block": block.getRawSigned() + }; + }, + getDocID: (block:any) => 'block#' + block.number + })(doc, peers) + } + + async idtyForward(doc:any, peers:DBPeer[]) { + return this.forward({ + transform: Identity.statics.fromJSON, + type: 'Identity', + uri: '/wot/add', + getObj: (idty:any) => { + return { + "identity": idty.createIdentity() + }; + }, + getDocID: (idty:any) => 'with ' + (idty.certs || []).length + ' certs' + })(doc, peers) + } + + async certForward(doc:any, peers:DBPeer[]) { + return this.forward({ + transform: Certification.statics.fromJSON, + type: 'Cert', + uri: '/wot/certify', + getObj: (cert:any) => { + return { + "cert": cert.getRaw() + }; + }, + getDocID: (idty:any) => 'with ' + (idty.certs || []).length + ' certs' + })(doc, peers) + } + + async revocationForward(doc:any, peers:DBPeer[]) { + return this.forward({ + transform: Revocation.statics.fromJSON, + type: 'Revocation', + uri: '/wot/revoke', + getObj: (revocation:any) => { + return { + "revocation": revocation.getRaw() + }; + } + })(doc, peers) + } + + async txForward(doc:any, peers:DBPeer[]) { + return this.forward({ + transform: Transaction.statics.fromJSON, + type: 'Transaction', + uri: '/tx/process', + getObj: (transaction:any) => { + return { + "transaction": transaction.getRaw(), + "signature": transaction.signature + }; + } + })(doc, peers) + } + + async peerForward(doc:any, peers:DBPeer[]) { + return this.forward({ + type: 'Peer', + uri: '/network/peering/peers', + transform: Peer.statics.peerize, + getObj: (peering:any) => { + return { + peer: peering.getRawSigned() + }; + }, + getDocID: (doc:any) => doc.keyID() + '#' + doc.block.match(/(\d+)-/)[1], + withIsolation: WITH_ISOLATION, + onError: (resJSON:any, peering:any, to:any) => { + const sentPeer = Peer.statics.peerize(peering); + if (Peer.statics.blockNumber(resJSON.peer) > sentPeer.blockNumber()) { + this.push({ outdated: true, peer: resJSON.peer }); + logger.warn('Outdated peer document (%s) sent to %s', sentPeer.keyID() + '#' + sentPeer.block.match(/(\d+)-/)[1], to); + } + return Promise.resolve(); + } + })(doc, peers) + } + + async msForward(doc:any, peers:DBPeer[]) { + return this.forward({ + transform: Membership.statics.fromJSON, + type: 'Membership', + uri: '/blockchain/membership', + getObj: (membership:any) => { + return { + "membership": membership.getRaw(), + "signature": membership.signature + }; + } + })(doc, peers) + } + + _write(obj:any, enc:any, done:any) { + this.emit(obj.type, obj.obj, obj.peers) + done() + } + + sendBlock(toPeer:any, block:any) { + return this.blockForward(block, [toPeer]) + } + + sendPeering(toPeer:any, peer:any) { + return this.peerForward(peer, [toPeer]) + } + + forward(params:any) { + return async (doc:any, peers:DBPeer[]) => { + try { + if(!params.withIsolation || !(this.conf && this.conf.isolate)) { + let theDoc = params.transform ? params.transform(doc) : doc; + logger.debug('--> new %s to be sent to %s peer(s)', params.type, peers.length); + if (params.getDocID) { + logger.info('POST %s %s', params.type, params.getDocID(theDoc)); + } else { + logger.info('POST %s', params.type); + } + // Parallel treatment for superfast propagation + await Promise.all(peers.map(async (p) => { + let peer = Peer.statics.peerize(p); + const namedURL = peer.getNamedURL(); + logger.debug(' `--> to peer %s [%s] (%s)', peer.keyID(), peer.member ? 'member' : '------', namedURL); + try { + await this.post(peer, params.uri, params.getObj(theDoc)) + } catch (e) { + if (params.onError) { + try { + const json = JSON.parse(e.body); + await params.onError(json, doc, namedURL) + } catch (ex) { + logger.warn('Could not reach %s', namedURL); + } + } + } + })) + } else { + logger.debug('[ISOLATE] Prevent --> new Peer to be sent to %s peer(s)', peers.length); + } + } catch (err) { + logger.error(err); + } + } + } + + post(peer:any, uri:string, data:any) { + if (!peer.isReachable()) { + return Promise.resolve(); + } + return new Promise((resolve, reject) => { + const postReq = request.post({ + "uri": protocol(peer.getPort()) + '://' + peer.getURL() + uri, + "timeout": this.timeout || constants.NETWORK.DEFAULT_TIMEOUT + }, (err:any, res:any) => { + if (err) { + this.push({ unreachable: true, peer: { pubkey: peer.pubkey }}); + logger.warn(err.message || err); + } + if (res && res.statusCode != 200) { + return reject(res); + } + resolve(res); + }) + postReq.form(data); + }); + } +} + +function protocol(port:number) { + return port == 443 ? 'https' : 'http'; +} diff --git a/app/lib/streams/router.js b/app/lib/streams/router.js index b8c75a01fab31e80af7d059d4fe83bf83edc24dd..8a81f3094dc00dd51e59fb4a9fd43775285658a4 100644 --- a/app/lib/streams/router.js +++ b/app/lib/streams/router.js @@ -1,140 +1,135 @@ "use strict"; - -const co = require('co'); -const util = require('util'); -const stream = require('stream'); -const Peer = require('../entity/peer'); -const constants = require('../constants'); - -module.exports = function (PeeringService, dal) { - return new Router(PeeringService, dal); -}; - -function Router (PeeringService, dal) { - - this.setConfDAL = (theDAL) => { - dal = theDAL; - }; - - const logger = require('../logger').NewLogger('router'); - - stream.Transform.call(this, { objectMode: true }); - - let active = true; - - this.setActive = (shouldBeActive) => active = shouldBeActive; - - const that = this; - - this._write = function (obj, enc, done) { - return co(function*() { - try { - if (obj.joiners) { - yield route('block', obj, getRandomInUPPeers(obj.issuer === PeeringService.pubkey)); - } - else if (obj.revocation) { - yield route('revocation', obj, getRandomInUPPeers(obj.pubkey === PeeringService.pubkey)); - } - else if (obj.pubkey && obj.uid) { - yield route('identity', obj, getRandomInUPPeers(obj.pubkey === PeeringService.pubkey)); - } - else if (obj.idty_uid) { - yield route('cert', obj, getRandomInUPPeers(obj.pubkey === PeeringService.pubkey)); - } - else if (obj.userid) { - yield route('membership', obj, getRandomInUPPeers(obj.issuer === PeeringService.pubkey)); - } - else if (obj.inputs) { - yield route('transaction', obj, getRandomInUPPeers(obj.issuers.indexOf(PeeringService.pubkey) !== -1)); - } - else if (obj.endpoints) { - yield route('peer', obj, getRandomInUPPeers(obj.pubkey === PeeringService.pubkey)); - } - else if (obj.from && obj.from == PeeringService.pubkey) { - // Route ONLY status emitted by this node - yield route('status', obj, getTargeted(obj.to || obj.idty_issuer)); - } - else if (obj.unreachable) { - yield dal.setPeerDown(obj.peer.pubkey); - logger.info("Peer %s unreachable: now considered as DOWN.", obj.peer.pubkey); - } - else if (obj.outdated) { - yield PeeringService.handleNewerPeer(obj.peer); - } - } catch (e) { - if (e && e.uerr && e.uerr.ucode == constants.ERRORS.NEWER_PEER_DOCUMENT_AVAILABLE.uerr.ucode) { - logger.info('Newer peer document available on the network for local node'); - } else { - logger.error("Routing error: %s", e && (e.stack || e.message || (e.uerr && e.uerr.message) || e)); - } - } - done && done(); +var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { + return new (P || (P = Promise))(function (resolve, reject) { + function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } + function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } + function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); } + step((generator = generator.apply(thisArg, _arguments || [])).next()); }); - }; - - function route (type, obj, getPeersFunc) { - return co(function*() { - if (!active) return; - const peers = yield getPeersFunc(); - that.push({ - 'type': type, - 'obj': obj, - 'peers': (peers || []).map(Peer.statics.peerize) - }); - }); - } - - function getRandomInUPPeers (isSelfDocument) { - return getValidUpPeers([PeeringService.pubkey], isSelfDocument); - } - - function getValidUpPeers (without, isSelfDocument) { - return function () { - return co(function *() { - let members = []; - let nonmembers = []; - let peers = yield dal.getRandomlyUPsWithout(without); // Peers with status UP - for (const p of peers) { - let isMember = yield dal.isMember(p.pubkey); - isMember ? members.push(p) : nonmembers.push(p); - } - members = chooseXin(members, isSelfDocument ? constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO); - nonmembers = chooseXin(nonmembers, isSelfDocument ? constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO); - let mainRoutes = members.map((p) => (p.member = true) && p).concat(nonmembers); - let mirrors = yield PeeringService.mirrorEndpoints(); - return mainRoutes.concat(mirrors.map((mep, index) => { return { - pubkey: 'M' + index + '_' + PeeringService.pubkey, - endpoints: [mep] - }})); - }); - }; - } - - /** - * Get the peer targeted by `to` argument, this node excluded (for not to loop on self). - */ - function getTargeted (to) { - return function () { - return co(function*() { - if (to == PeeringService.pubkey) { - return []; +}; +Object.defineProperty(exports, "__esModule", { value: true }); +const stream = require("stream"); +const Peer = require('../entity/peer'); +const constants = require('../constants'); +class RouterStream extends stream.Transform { + constructor(peeringService, dal) { + super({ objectMode: true }); + this.peeringService = peeringService; + this.dal = dal; + this.active = true; + this.logger = require('../logger').NewLogger('router'); + } + setConfDAL(theDAL) { + this.dal = theDAL; + } + setActive(shouldBeActive) { + this.active = shouldBeActive; + } + _write(obj, enc, done) { + return __awaiter(this, void 0, void 0, function* () { + try { + if (obj.joiners) { + yield this.route('block', obj, () => this.getRandomInUPPeers(obj.issuer === this.peeringService.pubkey)()); + } + else if (obj.revocation) { + yield this.route('revocation', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)()); + } + else if (obj.pubkey && obj.uid) { + yield this.route('identity', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)()); + } + else if (obj.idty_uid) { + yield this.route('cert', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)()); + } + else if (obj.userid) { + yield this.route('membership', obj, () => this.getRandomInUPPeers(obj.issuer === this.peeringService.pubkey)()); + } + else if (obj.inputs) { + yield this.route('transaction', obj, () => this.getRandomInUPPeers(obj.issuers.indexOf(this.peeringService.pubkey) !== -1)()); + } + else if (obj.endpoints) { + yield this.route('peer', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)()); + } + else if (obj.from && obj.from == this.peeringService.pubkey) { + // Route ONLY status emitted by this node + yield this.route('status', obj, () => this.getTargeted(obj.to || obj.idty_issuer)()); + } + else if (obj.unreachable) { + yield this.dal.setPeerDown(obj.peer.pubkey); + this.logger.info("Peer %s unreachable: now considered as DOWN.", obj.peer.pubkey); + } + else if (obj.outdated) { + yield this.peeringService.handleNewerPeer(obj.peer); + } + } + catch (e) { + if (e && e.uerr && e.uerr.ucode == constants.ERRORS.NEWER_PEER_DOCUMENT_AVAILABLE.uerr.ucode) { + this.logger.info('Newer peer document available on the network for local node'); + } + else { + this.logger.error("Routing error: %s", e && (e.stack || e.message || (e.uerr && e.uerr.message) || e)); + } + } + done && done(); + }); + } + route(type, obj, getPeersFunc) { + return __awaiter(this, void 0, void 0, function* () { + if (!this.active) + return; + const peers = yield getPeersFunc(); + this.push({ + 'type': type, + 'obj': obj, + 'peers': (peers || []).map(Peer.statics.peerize) + }); + }); + } + getRandomInUPPeers(isSelfDocument) { + return this.getValidUpPeers([this.peeringService.pubkey], isSelfDocument); + } + getValidUpPeers(without, isSelfDocument) { + return () => __awaiter(this, void 0, void 0, function* () { + let members = []; + let nonmembers = []; + let peers = yield this.dal.getRandomlyUPsWithout(without); // Peers with status UP + for (const p of peers) { + let isMember = yield this.dal.isMember(p.pubkey); + isMember ? members.push(p) : nonmembers.push(p); + } + members = RouterStream.chooseXin(members, isSelfDocument ? constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO); + nonmembers = RouterStream.chooseXin(nonmembers, isSelfDocument ? constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO); + let mainRoutes = members.map((p) => (p.member = true) && p).concat(nonmembers); + let mirrors = yield this.peeringService.mirrorEndpoints(); + return mainRoutes.concat(mirrors.map((mep, index) => { + return { + pubkey: 'M' + index + '_' + this.peeringService.pubkey, + endpoints: [mep] + }; + })); + }); + } + /** + * Get the peer targeted by `to` argument, this node excluded (for not to loop on self). + */ + getTargeted(to) { + return () => __awaiter(this, void 0, void 0, function* () { + if (to == this.peeringService.pubkey) { + return []; + } + const peer = yield this.dal.getPeer(to); + return [peer]; + }); + } + static chooseXin(peers, max) { + const chosen = []; + const nbPeers = peers.length; + for (let i = 0; i < Math.min(nbPeers, max); i++) { + const randIndex = Math.max(Math.floor(Math.random() * 10) - (10 - nbPeers) - i, 0); + chosen.push(peers[randIndex]); + peers.splice(randIndex, 1); } - const peer = yield dal.getPeer(to); - return [peer]; - }); - }; - } - - function chooseXin (peers, max) { - const chosen = []; - const nbPeers = peers.length; - for (let i = 0; i < Math.min(nbPeers, max); i++) { - const randIndex = Math.max(Math.floor(Math.random() * 10) - (10 - nbPeers) - i, 0); - chosen.push(peers[randIndex]); - peers.splice(randIndex, 1); + return chosen; } - return chosen; - } } - -util.inherits(Router, stream.Transform); +exports.RouterStream = RouterStream; +//# sourceMappingURL=router.js.map \ No newline at end of file diff --git a/app/lib/streams/router.ts b/app/lib/streams/router.ts new file mode 100644 index 0000000000000000000000000000000000000000..169f2c3963d1741dacdcf7b55f4f9f99c675fb8c --- /dev/null +++ b/app/lib/streams/router.ts @@ -0,0 +1,129 @@ +import * as stream from "stream" +import {PeeringService} from "../../service/PeeringService" +import {FileDAL} from "../dal/fileDAL" +import {DBPeer} from "../dal/sqliteDAL/PeerDAL" + +const Peer = require('../entity/peer'); +const constants = require('../constants'); + +export class RouterStream extends stream.Transform { + + logger:any + active = true + + constructor(private peeringService:PeeringService, private dal:FileDAL) { + super({ objectMode: true }) + + this.logger = require('../logger').NewLogger('router') + } + + setConfDAL(theDAL:FileDAL) { + this.dal = theDAL + } + + setActive(shouldBeActive:boolean) { + this.active = shouldBeActive + } + + async _write(obj:any, enc:any, done:any) { + try { + if (obj.joiners) { + await this.route('block', obj, () => this.getRandomInUPPeers(obj.issuer === this.peeringService.pubkey)()); + } + else if (obj.revocation) { + await this.route('revocation', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)()); + } + else if (obj.pubkey && obj.uid) { + await this.route('identity', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)()); + } + else if (obj.idty_uid) { + await this.route('cert', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)()); + } + else if (obj.userid) { + await this.route('membership', obj, () => this.getRandomInUPPeers(obj.issuer === this.peeringService.pubkey)()); + } + else if (obj.inputs) { + await this.route('transaction', obj, () => this.getRandomInUPPeers(obj.issuers.indexOf(this.peeringService.pubkey) !== -1)()); + } + else if (obj.endpoints) { + await this.route('peer', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)()); + } + else if (obj.from && obj.from == this.peeringService.pubkey) { + // Route ONLY status emitted by this node + await this.route('status', obj, () => this.getTargeted(obj.to || obj.idty_issuer)()); + } + else if (obj.unreachable) { + await this.dal.setPeerDown(obj.peer.pubkey); + this.logger.info("Peer %s unreachable: now considered as DOWN.", obj.peer.pubkey); + } + else if (obj.outdated) { + await this.peeringService.handleNewerPeer(obj.peer); + } + } catch (e) { + if (e && e.uerr && e.uerr.ucode == constants.ERRORS.NEWER_PEER_DOCUMENT_AVAILABLE.uerr.ucode) { + this.logger.info('Newer peer document available on the network for local node'); + } else { + this.logger.error("Routing error: %s", e && (e.stack || e.message || (e.uerr && e.uerr.message) || e)); + } + } + done && done(); + } + + private async route(type:string, obj:any, getPeersFunc:any) { + if (!this.active) return; + const peers = await getPeersFunc(); + this.push({ + 'type': type, + 'obj': obj, + 'peers': (peers || []).map(Peer.statics.peerize) + }) + } + + private getRandomInUPPeers (isSelfDocument:boolean): () => Promise<any> { + return this.getValidUpPeers([this.peeringService.pubkey], isSelfDocument); + } + + private getValidUpPeers (without:any, isSelfDocument:boolean) { + return async () => { + let members:DBPeer[] = []; + let nonmembers:DBPeer[] = []; + let peers = await this.dal.getRandomlyUPsWithout(without); // Peers with status UP + for (const p of peers) { + let isMember = await this.dal.isMember(p.pubkey); + isMember ? members.push(p) : nonmembers.push(p); + } + members = RouterStream.chooseXin(members, isSelfDocument ? constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO); + nonmembers = RouterStream.chooseXin(nonmembers, isSelfDocument ? constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO); + let mainRoutes:any = members.map((p:any) => (p.member = true) && p).concat(nonmembers); + let mirrors = await this.peeringService.mirrorEndpoints(); + return mainRoutes.concat(mirrors.map((mep, index) => { return { + pubkey: 'M' + index + '_' + this.peeringService.pubkey, + endpoints: [mep] + }})); + } + } + + /** + * Get the peer targeted by `to` argument, this node excluded (for not to loop on self). + */ + private getTargeted(to:string) { + return async () => { + if (to == this.peeringService.pubkey) { + return []; + } + const peer = await this.dal.getPeer(to); + return [peer]; + }; + } + + static chooseXin(peers:DBPeer[], max:number) { + const chosen:DBPeer[] = []; + const nbPeers = peers.length; + for (let i = 0; i < Math.min(nbPeers, max); i++) { + const randIndex = Math.max(Math.floor(Math.random() * 10) - (10 - nbPeers) - i, 0); + chosen.push(peers[randIndex]); + peers.splice(randIndex, 1); + } + return chosen; + } +} diff --git a/app/modules/router.ts b/app/modules/router.ts index a56c3de677fa47a540b6dfae108e9bce22bcb5c1..7a350a2be5e7e707cd7d4ef9d126a7549ea2bd49 100644 --- a/app/modules/router.ts +++ b/app/modules/router.ts @@ -1,10 +1,10 @@ "use strict"; import {ConfDTO} from "../lib/dto/ConfDTO" import * as stream from "stream" +import {Multicaster} from "../lib/streams/multicaster" +import {RouterStream} from "../lib/streams/router" const constants = require('../lib/constants'); -const router = require('../lib/streams/router'); -const multicaster = require('../lib/streams/multicaster'); module.exports = { duniter: { @@ -28,7 +28,7 @@ module.exports = { class Router extends stream.Transform { theRouter:any - theMulticaster:any = multicaster() + theMulticaster:Multicaster = new Multicaster() constructor(private server:any) { super({ objectMode: true }) @@ -44,7 +44,7 @@ class Router extends stream.Transform { async startService() { if (!this.theRouter) { - this.theRouter = router(this.server.PeeringService, this.server.dal); + this.theRouter = new RouterStream(this.server.PeeringService, this.server.dal) } this.theRouter.setActive(true); this.theRouter.setConfDAL(this.server.dal); diff --git a/app/service/PeeringService.ts b/app/service/PeeringService.ts index 05183b5743e9b8d9de6975ab7e29d5cad6fb7c27..63b9448d13a8658213ddac416342e9b534527fb9 100644 --- a/app/service/PeeringService.ts +++ b/app/service/PeeringService.ts @@ -3,12 +3,12 @@ import {ConfDTO} from "../lib/dto/ConfDTO" import {FileDAL} from "../lib/dal/fileDAL" import {DBPeer} from "../lib/dal/sqliteDAL/PeerDAL" import {DBBlock} from "../lib/db/DBBlock" +import {Multicaster} from "../lib/streams/multicaster" const util = require('util'); const _ = require('underscore'); const events = require('events'); const rp = require('request-promise'); -const multicaster = require('../lib/streams/multicaster'); const keyring = require('duniter-common').keyring; const logger = require('../lib/logger').NewLogger('peering'); const dos2unix = require('duniter-common').dos2unix; @@ -121,7 +121,7 @@ export class PeeringService { peerEntity = Peer.statics.peerize(found); if (interfacesChanged) { // Warns the old peer of the change - const caster = multicaster(); + const caster = new Multicaster(); caster.sendPeering(Peer.statics.peerize(peerEntity), Peer.statics.peerize(thePeer)); } thePeer.copyValues(peerEntity); diff --git a/test/integration/peer-outdated.js b/test/integration/peer-outdated.js index 619a40a6b545333656c06ba2c29c55f41a9d4e98..3e5a92dd14dfa3d284a16a0fde220fefba7fd20d 100644 --- a/test/integration/peer-outdated.js +++ b/test/integration/peer-outdated.js @@ -9,7 +9,7 @@ const user = require('./tools/user'); const commit = require('./tools/commit'); const until = require('./tools/until'); const toolbox = require('./tools/toolbox'); -const multicaster = require('../../app/lib/streams/multicaster'); +const Multicaster = require('../../app/lib/streams/multicaster').Multicaster const Peer = require('../../app/lib/entity/peer'); const s1 = toolbox.server({ @@ -74,7 +74,7 @@ describe("Peer document expiry", function() { })); it('routing V1 peer document should raise an "outdated" event', () => co(function*() { - const caster = multicaster(); + const caster = new Multicaster(); return new Promise((resolve) => { caster .pipe(es.mapSync((obj) => {