Skip to content
Snippets Groups Projects
Commit 20e89508 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[fix] #1037 Stream files not deleted

parent 01bae009
Branches
Tags
No related merge requests found
"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
Object.defineProperty(exports, "__esModule", { value: true });
const stream = require("stream");
const request = require('request');
const constants = require('../../lib/constants');
const Peer = require('../../lib/entity/peer');
const Identity = require('../../lib/entity/identity');
const Certification = require('../../lib/entity/certification');
const Revocation = require('../../lib/entity/revocation');
const Membership = require('../../lib/entity/membership');
const Block = require('../../lib/entity/block');
const Transaction = require('../../lib/entity/transaction');
const logger = require('../logger').NewLogger('multicaster');
const WITH_ISOLATION = true;
class Multicaster extends stream.Transform {
constructor(conf = null, timeout = 0) {
super({ objectMode: true });
this.conf = conf;
this.timeout = timeout;
this.on('identity', (data, peers) => this.idtyForward(data, peers));
this.on('cert', (data, peers) => this.certForward(data, peers));
this.on('revocation', (data, peers) => this.revocationForward(data, peers));
this.on('block', (data, peers) => this.blockForward(data, peers));
this.on('transaction', (data, peers) => this.txForward(data, peers));
this.on('peer', (data, peers) => this.peerForward(data, peers));
this.on('membership', (data, peers) => this.msForward(data, peers));
}
blockForward(doc, peers) {
return __awaiter(this, void 0, void 0, function* () {
return this.forward({
transform: Block.statics.fromJSON,
type: 'Block',
uri: '/blockchain/block',
getObj: (block) => {
return {
"block": block.getRawSigned()
};
},
getDocID: (block) => 'block#' + block.number
})(doc, peers);
});
}
idtyForward(doc, peers) {
return __awaiter(this, void 0, void 0, function* () {
return this.forward({
transform: Identity.statics.fromJSON,
type: 'Identity',
uri: '/wot/add',
getObj: (idty) => {
return {
"identity": idty.createIdentity()
};
},
getDocID: (idty) => 'with ' + (idty.certs || []).length + ' certs'
})(doc, peers);
});
}
certForward(doc, peers) {
return __awaiter(this, void 0, void 0, function* () {
return this.forward({
transform: Certification.statics.fromJSON,
type: 'Cert',
uri: '/wot/certify',
getObj: (cert) => {
return {
"cert": cert.getRaw()
};
},
getDocID: (idty) => 'with ' + (idty.certs || []).length + ' certs'
})(doc, peers);
});
}
revocationForward(doc, peers) {
return __awaiter(this, void 0, void 0, function* () {
return this.forward({
transform: Revocation.statics.fromJSON,
type: 'Revocation',
uri: '/wot/revoke',
getObj: (revocation) => {
return {
"revocation": revocation.getRaw()
};
}
})(doc, peers);
});
}
txForward(doc, peers) {
return __awaiter(this, void 0, void 0, function* () {
return this.forward({
transform: Transaction.statics.fromJSON,
type: 'Transaction',
uri: '/tx/process',
getObj: (transaction) => {
return {
"transaction": transaction.getRaw(),
"signature": transaction.signature
};
}
})(doc, peers);
});
}
peerForward(doc, peers) {
return __awaiter(this, void 0, void 0, function* () {
return this.forward({
type: 'Peer',
uri: '/network/peering/peers',
transform: Peer.statics.peerize,
getObj: (peering) => {
return {
peer: peering.getRawSigned()
};
},
getDocID: (doc) => doc.keyID() + '#' + doc.block.match(/(\d+)-/)[1],
withIsolation: WITH_ISOLATION,
onError: (resJSON, peering, to) => {
const sentPeer = Peer.statics.peerize(peering);
if (Peer.statics.blockNumber(resJSON.peer) > sentPeer.blockNumber()) {
this.push({ outdated: true, peer: resJSON.peer });
logger.warn('Outdated peer document (%s) sent to %s', sentPeer.keyID() + '#' + sentPeer.block.match(/(\d+)-/)[1], to);
}
return Promise.resolve();
}
})(doc, peers);
});
}
msForward(doc, peers) {
return __awaiter(this, void 0, void 0, function* () {
return this.forward({
transform: Membership.statics.fromJSON,
type: 'Membership',
uri: '/blockchain/membership',
getObj: (membership) => {
return {
"membership": membership.getRaw(),
"signature": membership.signature
};
}
})(doc, peers);
});
}
_write(obj, enc, done) {
this.emit(obj.type, obj.obj, obj.peers);
done();
}
sendBlock(toPeer, block) {
return this.blockForward(block, [toPeer]);
}
sendPeering(toPeer, peer) {
return this.peerForward(peer, [toPeer]);
}
forward(params) {
return (doc, peers) => __awaiter(this, void 0, void 0, function* () {
try {
if (!params.withIsolation || !(this.conf && this.conf.isolate)) {
let theDoc = params.transform ? params.transform(doc) : doc;
logger.debug('--> new %s to be sent to %s peer(s)', params.type, peers.length);
if (params.getDocID) {
logger.info('POST %s %s', params.type, params.getDocID(theDoc));
}
else {
logger.info('POST %s', params.type);
}
// Parallel treatment for superfast propagation
yield Promise.all(peers.map((p) => __awaiter(this, void 0, void 0, function* () {
let peer = Peer.statics.peerize(p);
const namedURL = peer.getNamedURL();
logger.debug(' `--> to peer %s [%s] (%s)', peer.keyID(), peer.member ? 'member' : '------', namedURL);
try {
yield this.post(peer, params.uri, params.getObj(theDoc));
}
catch (e) {
if (params.onError) {
try {
const json = JSON.parse(e.body);
yield params.onError(json, doc, namedURL);
}
catch (ex) {
logger.warn('Could not reach %s', namedURL);
}
}
}
})));
}
else {
logger.debug('[ISOLATE] Prevent --> new Peer to be sent to %s peer(s)', peers.length);
}
}
catch (err) {
logger.error(err);
}
});
}
post(peer, uri, data) {
if (!peer.isReachable()) {
return Promise.resolve();
}
return new Promise((resolve, reject) => {
const postReq = request.post({
"uri": protocol(peer.getPort()) + '://' + peer.getURL() + uri,
"timeout": this.timeout || constants.NETWORK.DEFAULT_TIMEOUT
}, (err, res) => {
if (err) {
this.push({ unreachable: true, peer: { pubkey: peer.pubkey } });
logger.warn(err.message || err);
}
if (res && res.statusCode != 200) {
return reject(res);
}
resolve(res);
});
postReq.form(data);
});
}
}
exports.Multicaster = Multicaster;
function protocol(port) {
return port == 443 ? 'https' : 'http';
}
//# sourceMappingURL=multicaster.js.map
\ No newline at end of file
"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
Object.defineProperty(exports, "__esModule", { value: true });
const stream = require("stream");
const Peer = require('../entity/peer');
const constants = require('../constants');
class RouterStream extends stream.Transform {
constructor(peeringService, dal) {
super({ objectMode: true });
this.peeringService = peeringService;
this.dal = dal;
this.active = true;
this.logger = require('../logger').NewLogger('router');
}
setConfDAL(theDAL) {
this.dal = theDAL;
}
setActive(shouldBeActive) {
this.active = shouldBeActive;
}
_write(obj, enc, done) {
return __awaiter(this, void 0, void 0, function* () {
try {
if (obj.joiners) {
yield this.route('block', obj, () => this.getRandomInUPPeers(obj.issuer === this.peeringService.pubkey)());
}
else if (obj.revocation) {
yield this.route('revocation', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)());
}
else if (obj.pubkey && obj.uid) {
yield this.route('identity', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)());
}
else if (obj.idty_uid) {
yield this.route('cert', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)());
}
else if (obj.userid) {
yield this.route('membership', obj, () => this.getRandomInUPPeers(obj.issuer === this.peeringService.pubkey)());
}
else if (obj.inputs) {
yield this.route('transaction', obj, () => this.getRandomInUPPeers(obj.issuers.indexOf(this.peeringService.pubkey) !== -1)());
}
else if (obj.endpoints) {
yield this.route('peer', obj, () => this.getRandomInUPPeers(obj.pubkey === this.peeringService.pubkey)());
}
else if (obj.from && obj.from == this.peeringService.pubkey) {
// Route ONLY status emitted by this node
yield this.route('status', obj, () => this.getTargeted(obj.to || obj.idty_issuer)());
}
else if (obj.unreachable) {
yield this.dal.setPeerDown(obj.peer.pubkey);
this.logger.info("Peer %s unreachable: now considered as DOWN.", obj.peer.pubkey);
}
else if (obj.outdated) {
yield this.peeringService.handleNewerPeer(obj.peer);
}
}
catch (e) {
if (e && e.uerr && e.uerr.ucode == constants.ERRORS.NEWER_PEER_DOCUMENT_AVAILABLE.uerr.ucode) {
this.logger.info('Newer peer document available on the network for local node');
}
else {
this.logger.error("Routing error: %s", e && (e.stack || e.message || (e.uerr && e.uerr.message) || e));
}
}
done && done();
});
}
route(type, obj, getPeersFunc) {
return __awaiter(this, void 0, void 0, function* () {
if (!this.active)
return;
const peers = yield getPeersFunc();
this.push({
'type': type,
'obj': obj,
'peers': (peers || []).map(Peer.statics.peerize)
});
});
}
getRandomInUPPeers(isSelfDocument) {
return this.getValidUpPeers([this.peeringService.pubkey], isSelfDocument);
}
getValidUpPeers(without, isSelfDocument) {
return () => __awaiter(this, void 0, void 0, function* () {
let members = [];
let nonmembers = [];
let peers = yield this.dal.getRandomlyUPsWithout(without); // Peers with status UP
for (const p of peers) {
let isMember = yield this.dal.isMember(p.pubkey);
isMember ? members.push(p) : nonmembers.push(p);
}
members = RouterStream.chooseXin(members, isSelfDocument ? constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO);
nonmembers = RouterStream.chooseXin(nonmembers, isSelfDocument ? constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO);
let mainRoutes = members.map((p) => (p.member = true) && p).concat(nonmembers);
let mirrors = yield this.peeringService.mirrorEndpoints();
return mainRoutes.concat(mirrors.map((mep, index) => {
return {
pubkey: 'M' + index + '_' + this.peeringService.pubkey,
endpoints: [mep]
};
}));
});
}
/**
* Get the peer targeted by `to` argument, this node excluded (for not to loop on self).
*/
getTargeted(to) {
return () => __awaiter(this, void 0, void 0, function* () {
if (to == this.peeringService.pubkey) {
return [];
}
const peer = yield this.dal.getPeer(to);
return [peer];
});
}
static chooseXin(peers, max) {
const chosen = [];
const nbPeers = peers.length;
for (let i = 0; i < Math.min(nbPeers, max); i++) {
const randIndex = Math.max(Math.floor(Math.random() * 10) - (10 - nbPeers) - i, 0);
chosen.push(peers[randIndex]);
peers.splice(randIndex, 1);
}
return chosen;
}
}
exports.RouterStream = RouterStream;
//# sourceMappingURL=router.js.map
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment