From b748ae5e9a848f62d301515bda6dada7619ee26d Mon Sep 17 00:00:00 2001 From: cgeek <cem.moreau@gmail.com> Date: Tue, 17 Jan 2017 14:38:23 +0100 Subject: [PATCH] [enh] "Routing and multicasting" modularized --- app/lib/computation/permanentProver.js | 2 +- app/lib/entity/configuration.js | 1 - app/lib/streams/router.js | 8 +-- app/modules/crawler.js | 2 +- app/modules/daemon.js | 9 +-- app/modules/peersignal.js | 2 +- app/modules/prover.js | 4 +- app/modules/router.js | 72 +++++++++++++++++++++++ index.js | 13 ++-- server.js | 51 ---------------- test/integration/branches2.js | 6 ++ test/integration/forwarding.js | 4 +- test/integration/peer-outdated.js | 6 +- test/integration/peerings.js | 8 +-- test/integration/peers-same-pubkey.js | 6 +- test/integration/start_generate_blocks.js | 6 +- test/integration/tools/node.js | 16 +---- test/integration/tools/toolbox.js | 10 +--- test/integration/v1.0-modules-api.js | 8 +-- 19 files changed, 113 insertions(+), 121 deletions(-) create mode 100644 app/modules/router.js diff --git a/app/lib/computation/permanentProver.js b/app/lib/computation/permanentProver.js index 63d60a1dc..b3167746c 100644 --- a/app/lib/computation/permanentProver.js +++ b/app/lib/computation/permanentProver.js @@ -149,7 +149,7 @@ function PermanentProver() { }); this.blockchainChanged = (gottenBlock) => co(function*() { - if (!gottenBlock || !lastComputedBlock || gottenBlock.hash !== lastComputedBlock.hash) { + if (server && (!gottenBlock || !lastComputedBlock || gottenBlock.hash !== lastComputedBlock.hash)) { // Cancel any processing proof yield server.BlockchainService.prover.cancel(gottenBlock); // If we were waiting, stop it and process the continuous generation diff --git a/app/lib/entity/configuration.js b/app/lib/entity/configuration.js index 1409067fb..acec9a379 100644 --- a/app/lib/entity/configuration.js +++ b/app/lib/entity/configuration.js @@ -33,7 +33,6 @@ const defaultConf = function() { "httplogs": false, "udid2": false, "timeout": 3000, - "routing": false, "isolate": false, "forksize": constants.BRANCHES.DEFAULT_WINDOW_SIZE }; diff --git a/app/lib/streams/router.js b/app/lib/streams/router.js index 0ad9fb6eb..a02a94a82 100644 --- a/app/lib/streams/router.js +++ b/app/lib/streams/router.js @@ -6,13 +6,13 @@ const stream = require('stream'); const Peer = require('../entity/peer'); const constants = require('../constants'); -module.exports = function (PeeringService, conf, dal) { - return new Router(PeeringService, conf, dal); +module.exports = function (PeeringService, dal) { + return new Router(PeeringService, dal); }; -function Router (PeeringService, conf, dal) { +function Router (PeeringService, dal) { - this.setConfDAL = (theConf, theDAL) => { + this.setConfDAL = (theDAL) => { dal = theDAL; }; diff --git a/app/modules/crawler.js b/app/modules/crawler.js index 30d9cd8de..18b6286a8 100644 --- a/app/modules/crawler.js +++ b/app/modules/crawler.js @@ -15,7 +15,7 @@ const crawler = new Crawler(); module.exports = { duniter: { service: { - neutral: crawler + neutral: () => crawler }, methods: { pullBlocks: crawler.pullBlocks } diff --git a/app/modules/daemon.js b/app/modules/daemon.js index 073ae58e6..fc6420dc2 100644 --- a/app/modules/daemon.js +++ b/app/modules/daemon.js @@ -11,13 +11,14 @@ module.exports = { onPluggedDALExecute: (server, conf, program, params, startServices) => co(function*() { const logger = server.logger; - logger.info(">> NODE STARTING"); + logger.info(">> Server starting..."); - // Routing documents - server.routing(); + yield server.checkConfig(); + // Add signing & public key functions to PeeringService + logger.info('Node version: ' + server.version); + logger.info('Node pubkey: ' + server.conf.pair.pub); // Services - yield server.startServices(); yield startServices(); logger.info('>> Server ready!'); diff --git a/app/modules/peersignal.js b/app/modules/peersignal.js index bbeefa00e..93c52a161 100644 --- a/app/modules/peersignal.js +++ b/app/modules/peersignal.js @@ -7,7 +7,7 @@ const constants = require('../lib/constants'); module.exports = { duniter: { service: { - neutral: new PeerSignalEmitter() + neutral: () => new PeerSignalEmitter() } } } diff --git a/app/modules/prover.js b/app/modules/prover.js index 729ed81a8..babeff7b2 100644 --- a/app/modules/prover.js +++ b/app/modules/prover.js @@ -6,12 +6,10 @@ const stream = require('stream'); const constants = require('../lib/constants'); const permanentProver = require('../lib/computation/permanentProver'); -const prover = new Prover(); - module.exports = { duniter: { service: { - output: prover + output: () => new Prover() }, methods: { diff --git a/app/modules/router.js b/app/modules/router.js new file mode 100644 index 000000000..af458c6a5 --- /dev/null +++ b/app/modules/router.js @@ -0,0 +1,72 @@ +"use strict"; + +const co = require('co'); +const constants = require('../lib/constants'); +const util = require('util'); +const stream = require('stream'); +const router = require('../lib/streams/router'); +const multicaster = require('../lib/streams/multicaster'); + +module.exports = { + duniter: { + service: { + output: () => new Router() + }, + methods: { + routeToNetwork: (server) => { + const router = new Router(); + router.startService(server); + server.pipe(router); + } + } + } +} + +/** + * Service which triggers the server's peering generation (actualization of the Peer document). + * @constructor + */ +function Router() { + + const that = this; + let theRouter, theMulticaster = multicaster(); + + stream.Transform.call(this, { objectMode: true }); + + this._write = function (obj, enc, done) { + // Never close the stream + if (obj) { + that.push(obj); + } + done && done(); + }; + + this.startService = (server) => co(function*() { + if (!theRouter) { + theRouter = router(server.PeeringService, server.dal); + } + theRouter.setActive(true); + theRouter.setConfDAL(server.dal); + + /** + * Enable routing features: + * - The server will try to send documents to the network + * - The server will eventually be notified of network failures + */ + // The router asks for multicasting of documents + server.pipe(that) + .pipe(theRouter) + // The documents get sent to peers + .pipe(theMulticaster) + // The multicaster may answer 'unreachable peer' + .pipe(theRouter); + }); + + this.stopService = () => co(function*() { + that.unpipe(); + theRouter.unpipe(); + theMulticaster.unpipe(); + }); +} + +util.inherits(Router, stream.Transform); diff --git a/index.js b/index.js index f46a44513..7c01587da 100644 --- a/index.js +++ b/index.js @@ -27,6 +27,7 @@ const pSignalDependency = require('./app/modules/peersignal'); const crawlerDependency = require('./app/modules/crawler'); const proverDependency = require('./app/modules/prover'); const bmapiDependency = require('./app/modules/bmapi'); +const routerDependency = require('./app/modules/router'); const MINIMAL_DEPENDENCIES = [ { name: 'duniter-config', required: configDependency } @@ -46,6 +47,7 @@ const DEFAULT_DEPENDENCIES = MINIMAL_DEPENDENCIES.concat([ { name: 'duniter-psignal', required: pSignalDependency }, { name: 'duniter-crawler', required: crawlerDependency }, { name: 'duniter-bmapi', required: bmapiDependency }, + { name: 'duniter-router', required: routerDependency }, { name: 'duniter-keypair', required: dkeypairDependency } ]); @@ -168,15 +170,15 @@ function Stack(dependencies) { } // To handle data that has been submitted by INPUT stream if (def.service.process) { - streams.process.push(def.service.process); + streams.process.push(def.service.process()); } // To handle data that has been validated by PROCESS stream if (def.service.output) { - streams.output.push(def.service.output); + streams.output.push(def.service.output()); } // Special service which does not stream anything particular (ex.: piloting the `server` object) if (def.service.neutral) { - streams.neutral.push(def.service.neutral); + streams.neutral.push(def.service.neutral()); } } }; @@ -297,6 +299,7 @@ function Stack(dependencies) { // Trace these errors process.on('unhandledRejection', (reason) => { logger.error('Unhandled rejection: ' + reason); + logger.error(reason); }); // Executes the command @@ -363,9 +366,7 @@ function commandLineConf(program, conf) { if (cli.timeout) conf.timeout = cli.timeout; if (cli.forksize != null) conf.forksize = cli.forksize; - // Specific internal settings - conf.createNext = true; - return _(conf).extend({routing: true}); + return conf; } function configure(program, server, conf) { diff --git a/server.js b/server.js index 38606038b..46289b304 100644 --- a/server.js +++ b/server.js @@ -12,12 +12,10 @@ const parsers = require('./app/lib/streams/parsers'); const constants = require('./app/lib/constants'); const fileDAL = require('./app/lib/dal/fileDAL'); const jsonpckg = require('./package.json'); -const router = require('./app/lib/streams/router'); const keyring = require('./app/lib/crypto/keyring'); const directory = require('./app/lib/system/directory'); const dos2unix = require('./app/lib/system/dos2unix'); const Synchroniser = require('./app/lib/sync'); -const multicaster = require('./app/lib/streams/multicaster'); const rawer = require('./app/lib/ucp/rawer'); function Server (home, memoryOnly, overrideConf) { @@ -134,7 +132,6 @@ function Server (home, memoryOnly, overrideConf) { [that.IdentityService, that.MembershipService, that.PeeringService, that.BlockchainService, that.TransactionsService].map((service) => { service.setConfDAL(that.conf, that.dal, that.keyPair); }); - that.router().setConfDAL(that.conf, that.dal); return that.conf; }); @@ -186,13 +183,6 @@ function Server (home, memoryOnly, overrideConf) { this.initDAL = () => this.dal.init(); - this.start = () => co(function*(){ - yield that.checkConfig(); - // Add signing & public key functions to PeeringService - logger.info('Node version: ' + that.version); - logger.info('Node pubkey: ' + that.PeeringService.pubkey); - }); - this.recomputeSelfPeer = () => that.PeeringService.generateSelfPeer(that.conf, 0); this.getCountOfSelfMadePoW = () => this.BlockchainService.getCountOfSelfMadePoW(); @@ -368,16 +358,6 @@ function Server (home, memoryOnly, overrideConf) { this.singleWritePromise = (obj) => that.submit(obj); - let theRouter; - - this.router = (active) => { - if (!theRouter) { - theRouter = router(that.PeeringService, that.conf, that.dal); - } - theRouter.setActive(active !== false); - return theRouter; - }; - /** * Synchronize the server with another server. * @@ -407,20 +387,6 @@ function Server (home, memoryOnly, overrideConf) { return remote.test(); }; - /** - * Enable routing features: - * - The server will try to send documents to the network - * - The server will eventually be notified of network failures - */ - this.routing = () => { - // The router asks for multicasting of documents - this.pipe(this.router()) - // The documents get sent to peers - .pipe(multicaster(this.conf)) - // The multicaster may answer 'unreachable peer' - .pipe(this.router()); - }; - this.applyCPU = (cpu) => that.BlockchainService.changeProverCPUSetting(cpu); this.rawer = rawer; @@ -436,23 +402,6 @@ function Server (home, memoryOnly, overrideConf) { * @param linesQuantity */ this.getLastLogLines = (linesQuantity) => this.dal.getLogContent(linesQuantity); - - this.startServices = () => co(function*(){ - - /*************** - * HTTP ROUTING - **************/ - that.router(that.conf.routing); - - /*********************** - * CRYPTO NETWORK LAYER - **********************/ - yield that.start(); - }); - - this.stopServices = () => co(function*(){ - that.router(false); - }); } util.inherits(Server, stream.Duplex); diff --git a/test/integration/branches2.js b/test/integration/branches2.js index 13bbb1584..5dcc3804f 100644 --- a/test/integration/branches2.js +++ b/test/integration/branches2.js @@ -18,6 +18,12 @@ if (constants.MUTE_LOGS_DURING_UNIT_TESTS) { require('../../app/lib/logger')().mute(); } +// Trace these errors +process.on('unhandledRejection', (reason) => { + console.error('Unhandled rejection: ' + reason); + console.error(reason); +}); + const MEMORY_MODE = true; const commonConf = { ipv4: '127.0.0.1', diff --git a/test/integration/forwarding.js b/test/integration/forwarding.js index dc16a21f8..605b43944 100644 --- a/test/integration/forwarding.js +++ b/test/integration/forwarding.js @@ -19,8 +19,8 @@ describe("Forwarding", function() { const common = { currency: 'bb', ipv4: '127.0.0.1', remoteipv4: '127.0.0.1', upnp: false, rootoffset: 0, sigQty: 1 }; - const node1 = node({ name: 'db_1', memory: MEMORY_MODE }, _({ httplogs: false, port: 9600, remoteport: 9600, pair: { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'}, routing: true }).extend(common)); - const node2 = node({ name: 'db_2', memory: MEMORY_MODE }, _({ httplogs: false, port: 9601, remoteport: 9601, pair: { pub: 'G2CBgZBPLe6FSFUgpx2Jf1Aqsgta6iib3vmDRA1yLiqU', sec: '58LDg8QLmF5pv6Dn9h7X4yFKfMTdP8fdAiWVcyDoTRJu454fwRihCLULH4MW37zncsg4ruoTGJPZneWk22QmG1w4'}, routing: true }).extend(common)); + const node1 = node({ name: 'db_1', memory: MEMORY_MODE }, _({ httplogs: false, port: 9600, remoteport: 9600, pair: { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'} }).extend(common)); + const node2 = node({ name: 'db_2', memory: MEMORY_MODE }, _({ httplogs: false, port: 9601, remoteport: 9601, pair: { pub: 'G2CBgZBPLe6FSFUgpx2Jf1Aqsgta6iib3vmDRA1yLiqU', sec: '58LDg8QLmF5pv6Dn9h7X4yFKfMTdP8fdAiWVcyDoTRJu454fwRihCLULH4MW37zncsg4ruoTGJPZneWk22QmG1w4'} }).extend(common)); const cat = user('cat', { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'}, node1); const tac = user('tac', { pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc', sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'}, node1); diff --git a/test/integration/peer-outdated.js b/test/integration/peer-outdated.js index eb13c91c0..7c7e7112d 100644 --- a/test/integration/peer-outdated.js +++ b/test/integration/peer-outdated.js @@ -44,11 +44,7 @@ describe("Peer document expiry", function() { const bmaAPI = yield bma(server); yield bmaAPI.openConnections(); server.bma = bmaAPI; - server - .pipe(server.router()) // The router asks for multicasting of documents - .pipe(multicaster()) - .pipe(server.router()); - return server.start(); + require('../../app/modules/router').duniter.methods.routeToNetwork(server); }), Q()); // Server 1 diff --git a/test/integration/peerings.js b/test/integration/peerings.js index 7d85ec23b..e963b555f 100644 --- a/test/integration/peerings.js +++ b/test/integration/peerings.js @@ -88,14 +88,8 @@ describe("Network", function() { return bmaAPI.openConnections() .then(() => { server.bma = bmaAPI; - server - .pipe(server.router()) // The router asks for multicasting of documents - .pipe(multicaster()) - .pipe(server.router()); + require('../../app/modules/router').duniter.methods.routeToNetwork(server); }); - }) - .then(function(){ - return server.start(); }); }); }, Q()) diff --git a/test/integration/peers-same-pubkey.js b/test/integration/peers-same-pubkey.js index 6861ddba3..cd16aa371 100644 --- a/test/integration/peers-same-pubkey.js +++ b/test/integration/peers-same-pubkey.js @@ -40,11 +40,7 @@ describe("Peer document", function() { const bmaAPI = yield bma(server); yield bmaAPI.openConnections(); server.bma = bmaAPI; - server - .pipe(server.router()) // The router asks for multicasting of documents - .pipe(multicaster()) - .pipe(server.router()); - return server.start(); + require('../../app/modules/router').duniter.methods.routeToNetwork(server); }), Q()); // Server 1 diff --git a/test/integration/start_generate_blocks.js b/test/integration/start_generate_blocks.js index 0e231368a..38fd8c92f 100644 --- a/test/integration/start_generate_blocks.js +++ b/test/integration/start_generate_blocks.js @@ -70,11 +70,7 @@ describe("Generation", function() { yield server.initWithDAL(); server.bma = yield bma(server); yield server.bma.openConnections(); - server - .pipe(server.router()) // The router asks for multicasting of documents - .pipe(multicaster()) - .pipe(server.router()); - yield server.start(); + require('../../app/modules/router').duniter.methods.routeToNetwork(server); yield server.PeeringService.generateSelfPeer(server.conf, 0); const prover = require('../../app/modules/prover').duniter.methods.prover(); server.startBlockComputation = () => prover.startService(server); diff --git a/test/integration/tools/node.js b/test/integration/tools/node.js index 92f2b0369..d5faaf13e 100644 --- a/test/integration/tools/node.js +++ b/test/integration/tools/node.js @@ -125,20 +125,8 @@ function Node (dbName, options) { function (server, next){ // Launching server that.server = server; - co(function*(){ - try { - yield that.server.start(); - if (server.conf.routing) { - server - .pipe(server.router()) // The router asks for multicasting of documents - .pipe(multicaster()); - } - started = true; - next(); - } catch (e) { - next(e); - } - }); + started = true; + next(); }, function (next) { that.http = contacter(options.remoteipv4, options.remoteport); diff --git a/test/integration/tools/toolbox.js b/test/integration/tools/toolbox.js index 63f6f746a..c6a61fd34 100644 --- a/test/integration/tools/toolbox.js +++ b/test/integration/tools/toolbox.js @@ -50,8 +50,8 @@ module.exports = { yield tac.join(); // Each server forwards to each other - s1.pipe(s1.router()).pipe(multicaster()); - s2.pipe(s2.router()).pipe(multicaster()); + require('../../../app/modules/router').duniter.methods.routeToNetwork(s1); + require('../../../app/modules/router').duniter.methods.routeToNetwork(s2); return { s1, s2, cat, tac }; }), @@ -283,11 +283,7 @@ module.exports = { const bmaAPI = yield bma(server); yield bmaAPI.openConnections(); server.bma = bmaAPI; - server - .pipe(server.router()) // The router asks for multicasting of documents - .pipe(multicaster()) - .pipe(server.router()); - return server.start(); + require('../../../app/modules/router').duniter.methods.routeToNetwork(server); }); const prover = require('../../../app/modules/prover').duniter.methods.prover(); diff --git a/test/integration/v1.0-modules-api.js b/test/integration/v1.0-modules-api.js index 25a386a75..cff55ee69 100644 --- a/test/integration/v1.0-modules-api.js +++ b/test/integration/v1.0-modules-api.js @@ -201,8 +201,8 @@ describe("v1.0 Module API", () => { }], service: { input: () => fakeI, - process: fakeP, - output: fakeO + process: () => fakeP, + output: () => fakeO } } }; @@ -218,8 +218,8 @@ describe("v1.0 Module API", () => { }], service: { input: () => fakeI, - process: fakeP, - output: fakeO + process: () => fakeP, + output: () => fakeO } } }; -- GitLab