diff --git a/app/lib/streams/multicaster.js b/app/lib/streams/multicaster.js deleted file mode 100644 index e82a8aaa2c8f1252e368c55661232c7e1c1f8943..0000000000000000000000000000000000000000 --- a/app/lib/streams/multicaster.js +++ /dev/null @@ -1,227 +0,0 @@ -"use strict"; -var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { - return new (P || (P = Promise))(function (resolve, reject) { - function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } - function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } - function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); } - step((generator = generator.apply(thisArg, _arguments || [])).next()); - }); -}; -Object.defineProperty(exports, "__esModule", { value: true }); -const stream = require("stream"); -const request = require('request'); -const constants = require('../../lib/constants'); -const Peer = require('../../lib/entity/peer'); -const Identity = require('../../lib/entity/identity'); -const Certification = require('../../lib/entity/certification'); -const Revocation = require('../../lib/entity/revocation'); -const Membership = require('../../lib/entity/membership'); -const Block = require('../../lib/entity/block'); -const Transaction = require('../../lib/entity/transaction'); -const logger = require('../logger').NewLogger('multicaster'); -const WITH_ISOLATION = true; -class Multicaster extends stream.Transform { - constructor(conf = null, timeout = 0) { - super({ objectMode: true }); - this.conf = conf; - this.timeout = timeout; - this.on('identity', (data, peers) => this.idtyForward(data, peers)); - this.on('cert', (data, peers) => this.certForward(data, peers)); - this.on('revocation', (data, peers) => this.revocationForward(data, peers)); - this.on('block', (data, peers) => this.blockForward(data, peers)); - this.on('transaction', (data, peers) => this.txForward(data, peers)); - this.on('peer', (data, peers) => this.peerForward(data, peers)); - this.on('membership', (data, peers) => this.msForward(data, peers)); - } - blockForward(doc, peers) { - return __awaiter(this, void 0, void 0, function* () { - return this.forward({ - transform: Block.statics.fromJSON, - type: 'Block', - uri: '/blockchain/block', - getObj: (block) => { - return { - "block": block.getRawSigned() - }; - }, - getDocID: (block) => 'block#' + block.number - })(doc, peers); - }); - } - idtyForward(doc, peers) { - return __awaiter(this, void 0, void 0, function* () { - return this.forward({ - transform: Identity.statics.fromJSON, - type: 'Identity', - uri: '/wot/add', - getObj: (idty) => { - return { - "identity": idty.createIdentity() - }; - }, - getDocID: (idty) => 'with ' + (idty.certs || []).length + ' certs' - })(doc, peers); - }); - } - certForward(doc, peers) { - return __awaiter(this, void 0, void 0, function* () { - return this.forward({ - transform: Certification.statics.fromJSON, - type: 'Cert', - uri: '/wot/certify', - getObj: (cert) => { - return { - "cert": cert.getRaw() - }; - }, - getDocID: (idty) => 'with ' + (idty.certs || []).length + ' certs' - })(doc, peers); - }); - } - revocationForward(doc, peers) { - return __awaiter(this, void 0, void 0, function* () { - return this.forward({ - transform: Revocation.statics.fromJSON, - type: 'Revocation', - uri: '/wot/revoke', - getObj: (revocation) => { - return { - "revocation": revocation.getRaw() - }; - } - })(doc, peers); - }); - } - txForward(doc, peers) { - return __awaiter(this, void 0, void 0, function* () { - return this.forward({ - transform: Transaction.statics.fromJSON, - type: 'Transaction', - uri: '/tx/process', - getObj: (transaction) => { - return { - "transaction": transaction.getRaw(), - "signature": transaction.signature - }; - } - })(doc, peers); - }); - } - peerForward(doc, peers) { - return __awaiter(this, void 0, void 0, function* () { - return this.forward({ - type: 'Peer', - uri: '/network/peering/peers', - transform: Peer.statics.peerize, - getObj: (peering) => { - return { - peer: peering.getRawSigned() - }; - }, - getDocID: (doc) => doc.keyID() + '#' + doc.block.match(/(\d+)-/)[1], - withIsolation: WITH_ISOLATION, - onError: (resJSON, peering, to) => { - const sentPeer = Peer.statics.peerize(peering); - if (Peer.statics.blockNumber(resJSON.peer) > sentPeer.blockNumber()) { - this.push({ outdated: true, peer: resJSON.peer }); - logger.warn('Outdated peer document (%s) sent to %s', sentPeer.keyID() + '#' + sentPeer.block.match(/(\d+)-/)[1], to); - } - return Promise.resolve(); - } - })(doc, peers); - }); - } - msForward(doc, peers) { - return __awaiter(this, void 0, void 0, function* () { - return this.forward({ - transform: Membership.statics.fromJSON, - type: 'Membership', - uri: '/blockchain/membership', - getObj: (membership) => { - return { - "membership": membership.getRaw(), - "signature": membership.signature - }; - } - })(doc, peers); - }); - } - _write(obj, enc, done) { - this.emit(obj.type, obj.obj, obj.peers); - done(); - } - sendBlock(toPeer, block) { - return this.blockForward(block, [toPeer]); - } - sendPeering(toPeer, peer) { - return this.peerForward(peer, [toPeer]); - } - forward(params) { - return (doc, peers) => __awaiter(this, void 0, void 0, function* () { - try { - if (!params.withIsolation || !(this.conf && this.conf.isolate)) { - let theDoc = params.transform ? params.transform(doc) : doc; - logger.debug('--> new %s to be sent to %s peer(s)', params.type, peers.length); - if (params.getDocID) { - logger.info('POST %s %s', params.type, params.getDocID(theDoc)); - } - else { - logger.info('POST %s', params.type); - } - // Parallel treatment for superfast propagation - yield Promise.all(peers.map((p) => __awaiter(this, void 0, void 0, function* () { - let peer = Peer.statics.peerize(p); - const namedURL = peer.getNamedURL(); - logger.debug(' `--> to peer %s [%s] (%s)', peer.keyID(), peer.member ? 'member' : '------', namedURL); - try { - yield this.post(peer, params.uri, params.getObj(theDoc)); - } - catch (e) { - if (params.onError) { - try { - const json = JSON.parse(e.body); - yield params.onError(json, doc, namedURL); - } - catch (ex) { - logger.warn('Could not reach %s', namedURL); - } - } - } - }))); - } - else { - logger.debug('[ISOLATE] Prevent --> new Peer to be sent to %s peer(s)', peers.length); - } - } - catch (err) { - logger.error(err); - } - }); - } - post(peer, uri, data) { - if (!peer.isReachable()) { - return Promise.resolve(); - } - return new Promise((resolve, reject) => { - const postReq = request.post({ - "uri": protocol(peer.getPort()) + '://' + peer.getURL() + uri, - "timeout": this.timeout || constants.NETWORK.DEFAULT_TIMEOUT - }, (err, res) => { - if (err) { - this.push({ unreachable: true, peer: { pubkey: peer.pubkey } }); - logger.warn(err.message || err); - } - if (res && res.statusCode != 200) { - return reject(res); - } - resolve(res); - }); - postReq.form(data); - }); - } -} -exports.Multicaster = Multicaster; -function protocol(port) { - return port == 443 ? 'https' : 'http'; -} -//# sourceMappingURL=multicaster.js.map \ No newline at end of file diff --git a/app/lib/streams/router.js b/app/lib/streams/router.js deleted file mode 100644 index 8a81f3094dc00dd51e59fb4a9fd43775285658a4..0000000000000000000000000000000000000000 --- a/app/lib/streams/router.js +++ /dev/null @@ -1,135 +0,0 @@ -"use strict"; -var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { - return new (P || (P = Promise))(function (resolve, reject) { - function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } - function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } - function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); } - step((generator = generator.apply(thisArg, _arguments || [])).next()); - }); -}; -Object.defineProperty(exports, "__esModule", { value: true }); -const stream = require("stream"); -const Peer = require('../entity/peer'); -const constants = require('../constants'); -class RouterStream extends stream.Transform { - constructor(peeringService, dal) { - super({ objectMode: true }); - this.peeringService = peeringService; - this.dal = dal; - this.active = true; - this.logger = require('../logger').NewLogger('router'); - } - setConfDAL(theDAL) { - this.dal = theDAL; - } - setActive(shouldBeActive) { - this.active = shouldBeActive; - } - _write(obj, enc, done) { - return __awaiter(this, void 0, void 0, function* () { - try { - if (obj.joiners) { - yield this.route('block', obj, () => this.getRandomInUPPeers(obj.issuer === this.peeringService.pubkey)()); - } - else if (obj.revocation) { - yield this.route('revocation', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)()); - } - else if (obj.pubkey && obj.uid) { - yield this.route('identity', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)()); - } - else if (obj.idty_uid) { - yield this.route('cert', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)()); - } - else if (obj.userid) { - yield this.route('membership', obj, () => this.getRandomInUPPeers(obj.issuer === this.peeringService.pubkey)()); - } - else if (obj.inputs) { - yield this.route('transaction', obj, () => this.getRandomInUPPeers(obj.issuers.indexOf(this.peeringService.pubkey) !== -1)()); - } - else if (obj.endpoints) { - yield this.route('peer', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)()); - } - else if (obj.from && obj.from == this.peeringService.pubkey) { - // Route ONLY status emitted by this node - yield this.route('status', obj, () => this.getTargeted(obj.to || obj.idty_issuer)()); - } - else if (obj.unreachable) { - yield this.dal.setPeerDown(obj.peer.pubkey); - this.logger.info("Peer %s unreachable: now considered as DOWN.", obj.peer.pubkey); - } - else if (obj.outdated) { - yield this.peeringService.handleNewerPeer(obj.peer); - } - } - catch (e) { - if (e && e.uerr && e.uerr.ucode == constants.ERRORS.NEWER_PEER_DOCUMENT_AVAILABLE.uerr.ucode) { - this.logger.info('Newer peer document available on the network for local node'); - } - else { - this.logger.error("Routing error: %s", e && (e.stack || e.message || (e.uerr && e.uerr.message) || e)); - } - } - done && done(); - }); - } - route(type, obj, getPeersFunc) { - return __awaiter(this, void 0, void 0, function* () { - if (!this.active) - return; - const peers = yield getPeersFunc(); - this.push({ - 'type': type, - 'obj': obj, - 'peers': (peers || []).map(Peer.statics.peerize) - }); - }); - } - getRandomInUPPeers(isSelfDocument) { - return this.getValidUpPeers([this.peeringService.pubkey], isSelfDocument); - } - getValidUpPeers(without, isSelfDocument) { - return () => __awaiter(this, void 0, void 0, function* () { - let members = []; - let nonmembers = []; - let peers = yield this.dal.getRandomlyUPsWithout(without); // Peers with status UP - for (const p of peers) { - let isMember = yield this.dal.isMember(p.pubkey); - isMember ? members.push(p) : nonmembers.push(p); - } - members = RouterStream.chooseXin(members, isSelfDocument ? constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO); - nonmembers = RouterStream.chooseXin(nonmembers, isSelfDocument ? constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO); - let mainRoutes = members.map((p) => (p.member = true) && p).concat(nonmembers); - let mirrors = yield this.peeringService.mirrorEndpoints(); - return mainRoutes.concat(mirrors.map((mep, index) => { - return { - pubkey: 'M' + index + '_' + this.peeringService.pubkey, - endpoints: [mep] - }; - })); - }); - } - /** - * Get the peer targeted by `to` argument, this node excluded (for not to loop on self). - */ - getTargeted(to) { - return () => __awaiter(this, void 0, void 0, function* () { - if (to == this.peeringService.pubkey) { - return []; - } - const peer = yield this.dal.getPeer(to); - return [peer]; - }); - } - static chooseXin(peers, max) { - const chosen = []; - const nbPeers = peers.length; - for (let i = 0; i < Math.min(nbPeers, max); i++) { - const randIndex = Math.max(Math.floor(Math.random() * 10) - (10 - nbPeers) - i, 0); - chosen.push(peers[randIndex]); - peers.splice(randIndex, 1); - } - return chosen; - } -} -exports.RouterStream = RouterStream; -//# sourceMappingURL=router.js.map \ No newline at end of file