Commit 4c08c289 authored by Cédric Moreau's avatar Cédric Moreau

Completely rewritten status/forward workflow

parent 1056ba45
......@@ -266,10 +266,6 @@ module.exports = function (pgp, currency, conf) {
}
callback();
});
},
initForwards: function (callback) {
// Eventually renegociate FWD rules according to new Wallet entry
PeeringService.initForwards(callback);
}
},
function(err) {
......@@ -371,109 +367,10 @@ module.exports = function (pgp, currency, conf) {
], function (err, status, peer, wasStatus) {
http.answer(res, 400, err, function () {
slogger.debug('⬇ %s status %s', peer.fingerprint, status.status);
// Answers
process.nextTick(function () {
res.end(JSON.stringify(status.json()));
});
// Send forward request if not done yet
async.waterfall([
function (next){
if(status.isNew()){
// Any previous forward must be removed and resent by each other
Forward.remove({ $or: [ {from: peer.fingerprint}, {to: peer.fingerprint} ] }, function (err, fwds) {
next(err, true);
});
return;
}
next(null, false);
},
], function (err, needForward) {
if(err) slogger.error(err);
async.waterfall([
function (next){
if(needForward){
PeeringService.initForwards(next, peer ? [ peer.fingerprint ] : null);
}
},
function (next){
var newStatus = status.status;
var answerStatus = chooseActionForIncomingStatusAndPeer(wasStatus, newStatus, peer);
answerStatus(peer, next);
},
], function (err) {
if (err) slogger.error(err);
});
});
res.end(JSON.stringify(status.json()));
})
});
}
// 3D associative array
// Dim. 1: sent status
// Dim. 2: received status
// Dim. 3: incoming status
var actionMatrix = {};
_(Peer.status).keys().forEach(function(sentSt){
actionMatrix[sentSt] = {};
_(Peer.status).keys().forEach(function(receivedSt){
actionMatrix[sentSt][receivedSt] = {};
_(Peer.status).keys().forEach(function(incomingSt){
actionMatrix[sentSt][receivedSt][incomingSt] = doNothing;
});
});
});
_(Peer.status).keys().forEach(function(receivedSt){
_(Peer.status).keys().forEach(function(incomingSt){
actionMatrix["NOTHING"][receivedSt][incomingSt] = sendNewStatus;
});
});
_(Peer.status).keys().forEach(function(sentSt){
_(Peer.status).keys().forEach(function(receivedSt){
if (sentSt != 'NOTHING') {
actionMatrix[sentSt][receivedSt]["NEW"] = resetAndSendNewStatus;
}
});
});
// Avoid NEW infinite loop
actionMatrix["NEW"]["NOTHING"]["NEW"] = doNothing;
// Other reset cases
actionMatrix["NEW"]["NOTHING"]["UP"] = resetAndSendNewStatus;
actionMatrix["NEW"]["NOTHING"]["DOWN"] = resetAndSendNewStatus;
actionMatrix["UP"]["NOTHING"]["UP"] = resetAndSendNewStatus;
actionMatrix["UP"]["NOTHING"]["DOWN"] = resetAndSendNewStatus;
actionMatrix["DOWN"]["NOTHING"]["UP"] = resetAndSendNewStatus;
actionMatrix["DOWN"]["NOTHING"]["DOWN"] = resetAndSendNewStatus;
function chooseActionForIncomingStatusAndPeer (wasStatus , newStatus, peer) {
slogger.debug("Choose action for %s %s %s", peer.statusSent, wasStatus, newStatus);
return actionMatrix[peer.statusSent][wasStatus][newStatus];
}
function sendNewStatus (peer, done) {
slogger.debug("Send NEW status to %s", peer.fingerprint);
PeeringService.sendStatusTo(Peer.status.NEW, [ peer.fingerprint ], done);
}
function resetAndSendNewStatus (peer, done) {
slogger.debug("RESET and send NEW status to %s", peer.fingerprint);
async.waterfall([
function (next){
peer.status = Peer.status.NOTHING;
peer.statusSent = Peer.status.NOTHING;
peer.statusSentPending = false;
peer.save(function (err){
next(err);
});
},
function (next){
PeeringService.sendStatusTo(Peer.status.NEW, [ peer.fingerprint ], next);
},
], done);
}
function doNothing (peer, done) {
done();
}
return this;
}
var async = require('async');
var logger = require('../lib/logger')('networker');
var async = require('async');
var request = require('request');
var logger = require('../lib/logger')('networker');
var fifo = async.queue(function (task, callback) {
task(callback);
......@@ -12,7 +13,7 @@ module.exports = function (peeringService) {
peers.forEach(function(peer){
fifo.push(function (sent) {
// Do propagating
logger.debug('Propagating key %s to peer %s', pubkey.fingerprint, peer.fingerprint);
logger.debug('Propagating key %s to peer %s', pubkey.fingerprint, peer.keyID());
// Sent!
sent();
});
......@@ -24,7 +25,7 @@ module.exports = function (peeringService) {
peers.forEach(function(peer){
fifo.push(function (sent) {
// Do propagating
logger.debug('Propagating vote from %s to peer %s', vote.issuer, peer.fingerprint);
logger.debug('Propagating vote from %s to peer %s', vote.issuer, peer.keyID());
// Sent!
sent();
});
......@@ -36,7 +37,7 @@ module.exports = function (peeringService) {
peers.forEach(function(peer){
fifo.push(function (sent) {
// Do propagating
logger.debug('Propagating transaction from %s to peer %s', transaction.issuer, peer.fingerprint);
logger.debug('Propagating transaction from %s to peer %s', transaction.issuer, peer.keyID());
// Sent!
sent();
});
......@@ -48,33 +49,51 @@ module.exports = function (peeringService) {
peers.forEach(function(peer){
fifo.push(function (sent) {
// Do propagating
logger.debug('Propagating Wallet entry from %s to peer %s', thtentry.issuer, peer.fingerprint);
logger.debug('Propagating Wallet entry from %s to peer %s', thtentry.issuer, peer.keyID());
// Sent!
sent();
});
});
});
peeringService.on('peer', function(peering, peers) {
logger.debug('new peer to be sent to %s peers', peers.length);
peeringService.on('peer', function(peering, peers, done) {
peers.forEach(function(peer){
fifo.push(function (sent) {
// Do propagating
logger.debug('Propagating peering of peer %s to peer %s', peering.fingerprint, peer.fingerprint);
// Sent!
sent();
logger.debug('sending peering of %s to peer %s', peering.keyID(), peer.keyID());
post(peer, "/network/peering/peers", {
entry: peer.getRaw(),
signature: peer.signature
}, function (err, res, body) {
// Sent!
sent();
if (typeof done == 'function') {
done(err, res, body);
}
});
});
});
});
peeringService.on('status', function(status, peers) {
logger.debug('new status to be sent to %s peers', peers.length);
peeringService.on('status', function(status, peers, internal) {
peers.forEach(function(peer){
fifo.push(function (sent) {
// Do propagating
logger.debug('Propagating status of peer %s to peer %s', status.fingerprint, peer.fingerprint);
// Sent!
sent();
logger.debug('sending %s status to peer %s', status.status, peer.keyID());
post(peer, "/network/peering/status", {
status: status.getRaw(),
signature: status.signature
}, function (err, res, body) {
if (!err && res && res.statusCode == 400 && !internal) {
logger.debug('sending self peering to peer %s', peer.keyID());
peeringService.emit('peer', peeringService.peer(), [peer], function (err, res, body) {
peeringService.emit('status', status, [peer], true);
});
} else {
// Sent!
sent(err);
}
});
});
});
});
......@@ -84,7 +103,7 @@ module.exports = function (peeringService) {
peers.forEach(function(peer){
fifo.push(function (sent) {
// Do propagating
logger.debug('Propagating membership of peer %s to peer %s', membership.fingerprint, peer.fingerprint);
logger.debug('Propagating membership of peer %s to peer %s', membership.fingerprint, peer.keyID());
// Sent!
sent();
});
......@@ -96,48 +115,38 @@ module.exports = function (peeringService) {
peers.forEach(function(peer){
fifo.push(function (sent) {
// Do propagating
logger.debug('Propagating voting of peer %s to peer %s', voting.fingerprint, peer.fingerprint);
logger.debug('Propagating voting of peer %s to peer %s', voting.fingerprint, peer.keyID());
// Sent!
sent();
});
});
});
peeringService.on('forward', function(forward, peers, done) {
fifo.push(function (sent) {
async.forEach(peers, function(peer, callback){
// Do propagating
logger.debug('sending %s forward to peer %s', forward.forward, peer.keyID());
post(peer, "/network/peering/forward", {
forward: forward.getRaw(),
signature: forward.signature
}, function (err, res, body) {
// Sent!
sent();
if (typeof done == 'function') {
done(err, res, body);
}
});
}, function(err){
done(err);
sent();
});
});
});
};
// this.propagateToFingerprint = function (fpr, obj, sendMethod, done) {
// async.waterfall([
// function (next){
// Peer.find({ fingerprint: fpr }, next);
// },
// function (peers, next){
// if(peers.length > 0){
// var remote = peers[0];
// async.waterfall([
// function (next){
// if (remote.status == "NOTHING" && remote.statusSent == "NOTHING") {
// // Send peering entry
// logger.debug("NEVER KNOWN peer %s, send self peering", remote.fingerprint);
// that.submitSelfPeering(remote, function (err) {
// next(err);
// });
// } else {
// next();
// }
// },
// function (next){
// sendMethod.call(sendMethod, remote, obj, next);
// }
// ], next);
// }
// else next();
// },
// ], function (err) {
// done();
// });
// };
function sendPubkey(peer, pubkey, done) {
logger.info('POST pubkey to %s', peer.fingerprint);
logger.info('POST pubkey to %s', peer.keyID());
post(peer, '/pks/add', {
"keytext": pubkey.getRaw(),
"keysign": pubkey.signature
......@@ -145,7 +154,7 @@ function sendPubkey(peer, pubkey, done) {
}
function sendVote(peer, vote, done) {
logger.info('POST vote to %s', peer.fingerprint);
logger.info('POST vote to %s', peer.keyID());
post(peer, '/hdc/amendments/votes', {
"amendment": vote.getRaw(),
"signature": vote.signature
......@@ -153,7 +162,7 @@ function sendVote(peer, vote, done) {
}
function sendTransaction(peer, transaction, done) {
logger.info('POST transaction to %s', peer.fingerprint);
logger.info('POST transaction to %s', peer.keyID());
post(peer, '/hdc/transactions/process', {
"transaction": transaction.getRaw(),
"signature": transaction.signature
......@@ -161,7 +170,7 @@ function sendTransaction(peer, transaction, done) {
}
function sendWallet(peer, entry, done) {
logger.info('POST Wallet entry %s to %s', entry.fingerprint, peer.fingerprint);
logger.info('POST Wallet entry %s to %s', entry.fingerprint, peer.keyID());
post(peer, '/network/tht', {
"entry": entry.getRaw(),
"signature": entry.signature
......@@ -177,41 +186,13 @@ function sendPeering(toPeer, peer, done) {
}
function sendForward(peer, rawForward, signature, done) {
logger.info('POST forward to %s', peer.fingerprint);
logger.info('POST forward to %s', peer.keyID());
post(peer, '/network/peering/forward', {
"forward": rawForward,
"signature": signature
}, done);
}
function sendStatus(peer, status, done) {
logger.info('POST status %s to %s', status.status, peer.fingerprint);
var previouslySent = peer.statusSent;
async.waterfall([
function (next) {
peer.statusSent = status.status;
peer.statusSentPending = true;
peer.save(function (err) {
next(err);
});
},
function (next){
post(peer, '/network/peering/status', {
"status": status.getRaw(),
"signature": status.signature
}, next);
}
], function (err){
peer.statusSentPending = false;
if (err) {
peer.statusSent = previouslySent;
}
peer.save(function (err2) {
done(err || err2);
});
});
}
function post(peer, url, data, done) {
var postReq = request.post('http://' + peer.getURL() + url, function (err, res, body) {
done(err, res, body);
......
......@@ -377,10 +377,6 @@ module.exports.express = {
next(err);
});
},
function (next) {
// Initialize managed keys
PeeringService.initKeys(next);
},
function (next){
// Add selfkey as managed
mongoose.model('Key').setManaged(server.fingerprint(), true, next);
......
......@@ -18,6 +18,14 @@ var ForwardSchema = new Schema({
});
ForwardSchema.methods = {
fromKeyID: function () {
return this.from && this.from.length > 24 ? "0x" + this.from.substring(24) : "0x?";
},
toKeyID: function () {
return this.to && this.to.length > 24 ? "0x" + this.to.substring(24) : "0x?";
},
copyValues: function(to) {
var obj = this;
......@@ -196,6 +204,12 @@ ForwardSchema.statics.getTheOne = function (from, to, done) {
});
}
ForwardSchema.statics.removeTheOne = function (from, to, done) {
Forward.remove({ from: from, to: to }, function (err) {
done(err);
});
}
ForwardSchema.statics.findMatchingTransaction = function (tx, done) {
Forward.find({
$or: [
......
......@@ -7,7 +7,9 @@ var vucoin = require('vucoin');
var Schema = mongoose.Schema;
var STATUS = {
ASK: "ASK",
NEW: "NEW",
NEW_BACK: "NEW_BACK",
UP: "UP",
DOWN: "DOWN",
NOTHING: "NOTHING"
......@@ -23,7 +25,6 @@ var PeerSchema = new Schema({
hash: String,
status: { type: String, default: STATUS.NOTHING },
statusSent: { type: String, default: STATUS.NOTHING },
statusSentPending: { type: Boolean, default: false },
statusSigDate: { type: Date, default: function(){ return new Date(0); } },
propagated: { type: Boolean, default: false },
sigDate: { type: Date, default: function(){ return new Date(0); } },
......@@ -33,6 +34,10 @@ var PeerSchema = new Schema({
PeerSchema.methods = {
keyID: function () {
return this.fingerprint && this.fingerprint.length > 24 ? "0x" + this.fingerprint.substring(24) : "0x?";
},
setStatus: function (newStatus, done) {
if(this.status != newStatus){
this.status = newStatus;
......@@ -301,6 +306,10 @@ PeerSchema.statics.getList = function (fingerprints, done) {
Peer.find({ fingerprint: { $in: fingerprints }}, done);
};
PeerSchema.statics.allBut = function (fingerprint, done) {
Peer.find({ fingerprint: { $ne: fingerprint } }, done);
};
PeerSchema.statics.status = STATUS;
var Peer = mongoose.model('Peer', PeerSchema);
This diff is collapsed.
......@@ -72,34 +72,6 @@ program
});
}));
program
.command('manage-keys')
.description('Update managed keys configuration and send corresponding forwards to other peers')
.action(connect(function (conf) {
// Launching server
server.express.app(program.currency, conf, function (err, app) {
var PeeringService = service.Peering;
async.waterfall([
function (next) {
PeeringService.initKeys(next);
},
function (next) {
PeeringService.initForwards(next);
}
], function (err) {
if(err){
logger.error('An error occured: %s', err);
return;
}
server.database.disconnect();
process.exit();
});
});
}));
program
.command('allow-key [key]')
.description('Add given key to authorized keys of this node')
......
#!/bin/bash
host=127.0.0.1
port=8080
bin/ucoind --currency beta_brousouf reset config
bin/ucoind --currency beta_brousouf reset data
bin/ucoind --currency beta_brousouf allow-key 2E69197FAB029D8669EF85E82457A1587CA0ED9C
bin/ucoind --currency beta_brousouf config --pgpkey test/data/lolcat.priv --ipv4 $host --port $port --remote4 $host --remotep $port --pgppasswd "lolcat" --kmanagement ALL
bin/ucoind --currency beta_brousouf start
#bin/ucoind --currency beta_brousouf reset data
#bin/ucoind --currency beta_brousouf allow-key 2E69197FAB029D8669EF85E82457A1587CA0ED9C
#bin/ucoind --currency beta_brousouf start
#node --debug bin/ucoind --currency beta_brousouf start
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment