Commit 01bae009 authored by Cédric Moreau's avatar Cédric Moreau

[fix] #1037 Migrate streams "multicaster" "router"

parent a9d607f2
......@@ -12,7 +12,8 @@ app/lib/dal/fileDALs/*.js
app/lib/dal/fileDAL.js
app/service/*.js
app/lib/rules/*.js
app/lib/system/directory.js
app/lib/system/*.js
app/lib/streams/*.js
app/modules/wizard.js
app/modules/router.js
app/modules/revert.js
......
......@@ -50,6 +50,7 @@ app/lib/dal/fileDAL.js*
app/lib/rules/*.js*
app/lib/logger*js*
app/lib/system/directory.js*
app/lib/streams/*.js*
app/service/*.js*
app/lib/wot.js*
app/modules/prover/*.js*
......
This diff is collapsed.
import {ConfDTO} from "../dto/ConfDTO"
import * as stream from "stream"
import {DBPeer} from "../dal/sqliteDAL/PeerDAL"
const request = require('request');
const constants = require('../../lib/constants');
const Peer = require('../../lib/entity/peer');
const Identity = require('../../lib/entity/identity');
const Certification = require('../../lib/entity/certification');
const Revocation = require('../../lib/entity/revocation');
const Membership = require('../../lib/entity/membership');
const Block = require('../../lib/entity/block');
const Transaction = require('../../lib/entity/transaction');
const logger = require('../logger').NewLogger('multicaster');
const WITH_ISOLATION = true;
export class Multicaster extends stream.Transform {
constructor(private conf:ConfDTO|null = null, private timeout:number = 0) {
super({ objectMode: true })
this.on('identity', (data:any, peers:DBPeer[]) => this.idtyForward(data, peers))
this.on('cert', (data:any, peers:DBPeer[]) => this.certForward(data, peers))
this.on('revocation', (data:any, peers:DBPeer[]) => this.revocationForward(data, peers))
this.on('block', (data:any, peers:DBPeer[]) => this.blockForward(data, peers))
this.on('transaction', (data:any, peers:DBPeer[]) => this.txForward(data, peers))
this.on('peer', (data:any, peers:DBPeer[]) => this.peerForward(data, peers))
this.on('membership', (data:any, peers:DBPeer[]) => this.msForward(data, peers))
}
async blockForward(doc:any, peers:DBPeer[]) {
return this.forward({
transform: Block.statics.fromJSON,
type: 'Block',
uri: '/blockchain/block',
getObj: (block:any) => {
return {
"block": block.getRawSigned()
};
},
getDocID: (block:any) => 'block#' + block.number
})(doc, peers)
}
async idtyForward(doc:any, peers:DBPeer[]) {
return this.forward({
transform: Identity.statics.fromJSON,
type: 'Identity',
uri: '/wot/add',
getObj: (idty:any) => {
return {
"identity": idty.createIdentity()
};
},
getDocID: (idty:any) => 'with ' + (idty.certs || []).length + ' certs'
})(doc, peers)
}
async certForward(doc:any, peers:DBPeer[]) {
return this.forward({
transform: Certification.statics.fromJSON,
type: 'Cert',
uri: '/wot/certify',
getObj: (cert:any) => {
return {
"cert": cert.getRaw()
};
},
getDocID: (idty:any) => 'with ' + (idty.certs || []).length + ' certs'
})(doc, peers)
}
async revocationForward(doc:any, peers:DBPeer[]) {
return this.forward({
transform: Revocation.statics.fromJSON,
type: 'Revocation',
uri: '/wot/revoke',
getObj: (revocation:any) => {
return {
"revocation": revocation.getRaw()
};
}
})(doc, peers)
}
async txForward(doc:any, peers:DBPeer[]) {
return this.forward({
transform: Transaction.statics.fromJSON,
type: 'Transaction',
uri: '/tx/process',
getObj: (transaction:any) => {
return {
"transaction": transaction.getRaw(),
"signature": transaction.signature
};
}
})(doc, peers)
}
async peerForward(doc:any, peers:DBPeer[]) {
return this.forward({
type: 'Peer',
uri: '/network/peering/peers',
transform: Peer.statics.peerize,
getObj: (peering:any) => {
return {
peer: peering.getRawSigned()
};
},
getDocID: (doc:any) => doc.keyID() + '#' + doc.block.match(/(\d+)-/)[1],
withIsolation: WITH_ISOLATION,
onError: (resJSON:any, peering:any, to:any) => {
const sentPeer = Peer.statics.peerize(peering);
if (Peer.statics.blockNumber(resJSON.peer) > sentPeer.blockNumber()) {
this.push({ outdated: true, peer: resJSON.peer });
logger.warn('Outdated peer document (%s) sent to %s', sentPeer.keyID() + '#' + sentPeer.block.match(/(\d+)-/)[1], to);
}
return Promise.resolve();
}
})(doc, peers)
}
async msForward(doc:any, peers:DBPeer[]) {
return this.forward({
transform: Membership.statics.fromJSON,
type: 'Membership',
uri: '/blockchain/membership',
getObj: (membership:any) => {
return {
"membership": membership.getRaw(),
"signature": membership.signature
};
}
})(doc, peers)
}
_write(obj:any, enc:any, done:any) {
this.emit(obj.type, obj.obj, obj.peers)
done()
}
sendBlock(toPeer:any, block:any) {
return this.blockForward(block, [toPeer])
}
sendPeering(toPeer:any, peer:any) {
return this.peerForward(peer, [toPeer])
}
forward(params:any) {
return async (doc:any, peers:DBPeer[]) => {
try {
if(!params.withIsolation || !(this.conf && this.conf.isolate)) {
let theDoc = params.transform ? params.transform(doc) : doc;
logger.debug('--> new %s to be sent to %s peer(s)', params.type, peers.length);
if (params.getDocID) {
logger.info('POST %s %s', params.type, params.getDocID(theDoc));
} else {
logger.info('POST %s', params.type);
}
// Parallel treatment for superfast propagation
await Promise.all(peers.map(async (p) => {
let peer = Peer.statics.peerize(p);
const namedURL = peer.getNamedURL();
logger.debug(' `--> to peer %s [%s] (%s)', peer.keyID(), peer.member ? 'member' : '------', namedURL);
try {
await this.post(peer, params.uri, params.getObj(theDoc))
} catch (e) {
if (params.onError) {
try {
const json = JSON.parse(e.body);
await params.onError(json, doc, namedURL)
} catch (ex) {
logger.warn('Could not reach %s', namedURL);
}
}
}
}))
} else {
logger.debug('[ISOLATE] Prevent --> new Peer to be sent to %s peer(s)', peers.length);
}
} catch (err) {
logger.error(err);
}
}
}
post(peer:any, uri:string, data:any) {
if (!peer.isReachable()) {
return Promise.resolve();
}
return new Promise((resolve, reject) => {
const postReq = request.post({
"uri": protocol(peer.getPort()) + '://' + peer.getURL() + uri,
"timeout": this.timeout || constants.NETWORK.DEFAULT_TIMEOUT
}, (err:any, res:any) => {
if (err) {
this.push({ unreachable: true, peer: { pubkey: peer.pubkey }});
logger.warn(err.message || err);
}
if (res && res.statusCode != 200) {
return reject(res);
}
resolve(res);
})
postReq.form(data);
});
}
}
function protocol(port:number) {
return port == 443 ? 'https' : 'http';
}
This diff is collapsed.
import * as stream from "stream"
import {PeeringService} from "../../service/PeeringService"
import {FileDAL} from "../dal/fileDAL"
import {DBPeer} from "../dal/sqliteDAL/PeerDAL"
const Peer = require('../entity/peer');
const constants = require('../constants');
export class RouterStream extends stream.Transform {
logger:any
active = true
constructor(private peeringService:PeeringService, private dal:FileDAL) {
super({ objectMode: true })
this.logger = require('../logger').NewLogger('router')
}
setConfDAL(theDAL:FileDAL) {
this.dal = theDAL
}
setActive(shouldBeActive:boolean) {
this.active = shouldBeActive
}
async _write(obj:any, enc:any, done:any) {
try {
if (obj.joiners) {
await this.route('block', obj, () => this.getRandomInUPPeers(obj.issuer === this.peeringService.pubkey)());
}
else if (obj.revocation) {
await this.route('revocation', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)());
}
else if (obj.pubkey && obj.uid) {
await this.route('identity', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)());
}
else if (obj.idty_uid) {
await this.route('cert', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)());
}
else if (obj.userid) {
await this.route('membership', obj, () => this.getRandomInUPPeers(obj.issuer === this.peeringService.pubkey)());
}
else if (obj.inputs) {
await this.route('transaction', obj, () => this.getRandomInUPPeers(obj.issuers.indexOf(this.peeringService.pubkey) !== -1)());
}
else if (obj.endpoints) {
await this.route('peer', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)());
}
else if (obj.from && obj.from == this.peeringService.pubkey) {
// Route ONLY status emitted by this node
await this.route('status', obj, () => this.getTargeted(obj.to || obj.idty_issuer)());
}
else if (obj.unreachable) {
await this.dal.setPeerDown(obj.peer.pubkey);
this.logger.info("Peer %s unreachable: now considered as DOWN.", obj.peer.pubkey);
}
else if (obj.outdated) {
await this.peeringService.handleNewerPeer(obj.peer);
}
} catch (e) {
if (e && e.uerr && e.uerr.ucode == constants.ERRORS.NEWER_PEER_DOCUMENT_AVAILABLE.uerr.ucode) {
this.logger.info('Newer peer document available on the network for local node');
} else {
this.logger.error("Routing error: %s", e && (e.stack || e.message || (e.uerr && e.uerr.message) || e));
}
}
done && done();
}
private async route(type:string, obj:any, getPeersFunc:any) {
if (!this.active) return;
const peers = await getPeersFunc();
this.push({
'type': type,
'obj': obj,
'peers': (peers || []).map(Peer.statics.peerize)
})
}
private getRandomInUPPeers (isSelfDocument:boolean): () => Promise<any> {
return this.getValidUpPeers([this.peeringService.pubkey], isSelfDocument);
}
private getValidUpPeers (without:any, isSelfDocument:boolean) {
return async () => {
let members:DBPeer[] = [];
let nonmembers:DBPeer[] = [];
let peers = await this.dal.getRandomlyUPsWithout(without); // Peers with status UP
for (const p of peers) {
let isMember = await this.dal.isMember(p.pubkey);
isMember ? members.push(p) : nonmembers.push(p);
}
members = RouterStream.chooseXin(members, isSelfDocument ? constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO);
nonmembers = RouterStream.chooseXin(nonmembers, isSelfDocument ? constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO);
let mainRoutes:any = members.map((p:any) => (p.member = true) && p).concat(nonmembers);
let mirrors = await this.peeringService.mirrorEndpoints();
return mainRoutes.concat(mirrors.map((mep, index) => { return {
pubkey: 'M' + index + '_' + this.peeringService.pubkey,
endpoints: [mep]
}}));
}
}
/**
* Get the peer targeted by `to` argument, this node excluded (for not to loop on self).
*/
private getTargeted(to:string) {
return async () => {
if (to == this.peeringService.pubkey) {
return [];
}
const peer = await this.dal.getPeer(to);
return [peer];
};
}
static chooseXin(peers:DBPeer[], max:number) {
const chosen:DBPeer[] = [];
const nbPeers = peers.length;
for (let i = 0; i < Math.min(nbPeers, max); i++) {
const randIndex = Math.max(Math.floor(Math.random() * 10) - (10 - nbPeers) - i, 0);
chosen.push(peers[randIndex]);
peers.splice(randIndex, 1);
}
return chosen;
}
}
"use strict";
import {ConfDTO} from "../lib/dto/ConfDTO"
import * as stream from "stream"
import {Multicaster} from "../lib/streams/multicaster"
import {RouterStream} from "../lib/streams/router"
const constants = require('../lib/constants');
const router = require('../lib/streams/router');
const multicaster = require('../lib/streams/multicaster');
module.exports = {
duniter: {
......@@ -28,7 +28,7 @@ module.exports = {
class Router extends stream.Transform {
theRouter:any
theMulticaster:any = multicaster()
theMulticaster:Multicaster = new Multicaster()
constructor(private server:any) {
super({ objectMode: true })
......@@ -44,7 +44,7 @@ class Router extends stream.Transform {
async startService() {
if (!this.theRouter) {
this.theRouter = router(this.server.PeeringService, this.server.dal);
this.theRouter = new RouterStream(this.server.PeeringService, this.server.dal)
}
this.theRouter.setActive(true);
this.theRouter.setConfDAL(this.server.dal);
......
......@@ -3,12 +3,12 @@ import {ConfDTO} from "../lib/dto/ConfDTO"
import {FileDAL} from "../lib/dal/fileDAL"
import {DBPeer} from "../lib/dal/sqliteDAL/PeerDAL"
import {DBBlock} from "../lib/db/DBBlock"
import {Multicaster} from "../lib/streams/multicaster"
const util = require('util');
const _ = require('underscore');
const events = require('events');
const rp = require('request-promise');
const multicaster = require('../lib/streams/multicaster');
const keyring = require('duniter-common').keyring;
const logger = require('../lib/logger').NewLogger('peering');
const dos2unix = require('duniter-common').dos2unix;
......@@ -121,7 +121,7 @@ export class PeeringService {
peerEntity = Peer.statics.peerize(found);
if (interfacesChanged) {
// Warns the old peer of the change
const caster = multicaster();
const caster = new Multicaster();
caster.sendPeering(Peer.statics.peerize(peerEntity), Peer.statics.peerize(thePeer));
}
thePeer.copyValues(peerEntity);
......
......@@ -9,7 +9,7 @@ const user = require('./tools/user');
const commit = require('./tools/commit');
const until = require('./tools/until');
const toolbox = require('./tools/toolbox');
const multicaster = require('../../app/lib/streams/multicaster');
const Multicaster = require('../../app/lib/streams/multicaster').Multicaster
const Peer = require('../../app/lib/entity/peer');
const s1 = toolbox.server({
......@@ -74,7 +74,7 @@ describe("Peer document expiry", function() {
}));
it('routing V1 peer document should raise an "outdated" event', () => co(function*() {
const caster = multicaster();
const caster = new Multicaster();
return new Promise((resolve) => {
caster
.pipe(es.mapSync((obj) => {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment