diff --git a/index.js b/index.js index 9eddb7a6de43e73316cfc26770d3f577a787a204..15623d84fd48c6b80d23673e5ad339a4381e66ec 100644 --- a/index.js +++ b/index.js @@ -197,12 +197,10 @@ function Stack(dependencies) { return yield command.onPluggedDALExecute(server, conf, program, params, // Start services and streaming between them - () => { + () => co(function*() { const modules = streams.input.concat(streams.process).concat(streams.output); - for (const module of modules) { - // Any streaming module must implement a `startService` method - module.startService(); - } + // Any streaming module must implement a `startService` method + yield modules.map(module => module.startService()); // All inputs write to global INPUT stream for (const module of streams.input) module.pipe(INPUT); // All processes read from global INPUT stream @@ -210,16 +208,14 @@ function Stack(dependencies) { // All processes write to global PROCESS stream for (const module of streams.process) module.pipe(PROCESS); // All ouputs read from global PROCESS stream - for (const module of streams.process) PROCESS.pipe(module); - }, + for (const module of streams.output) PROCESS.pipe(module); + }), // Stop services and streaming between them - () => { + () => co(function*() { const modules = streams.input.concat(streams.process).concat(streams.output); - for (const module of modules) { - // Any streaming module must implement a `stopService` method - module.stopService(); - } + // Any streaming module must implement a `stopService` method + yield modules.map(module => module.stopService()); // Stop reading inputs for (const module of streams.input) module.unpipe(); // Stop reading from global INPUT @@ -227,7 +223,7 @@ function Stack(dependencies) { for (const module of streams.process) module.unpipe(); // Stop reading from global PROCESS PROCESS.unpipe(); - }); + })); } catch (e) { server.disconnect(); throw e; @@ -430,8 +426,8 @@ function InputStream() { if (typeof str === 'string') { // Keep only strings const matches = str.match(/Type: (.*)\n/); - if (matches && matches[0].match(/(Block|Membership|Identity|Certification|Transaction|Peer)/)) { - const type = matches[0].toLowerCase(); + if (matches && matches[1].match(/(Block|Membership|Identity|Certification|Transaction|Peer)/)) { + const type = matches[1].toLowerCase(); that.push({ type, doc: str }); } } diff --git a/test/fast/v1.0-modules-api.js b/test/fast/v1.0-modules-api.js index 130b60b07888931b91dacbc94b4617137d6e2da6..7a2c5650d40b84812526188f3daaa879680c3a9f 100644 --- a/test/fast/v1.0-modules-api.js +++ b/test/fast/v1.0-modules-api.js @@ -3,7 +3,11 @@ const co = require('co'); const _ = require('underscore'); const should = require('should'); +const util = require('util'); +const stream = require('stream'); const duniter = require('../../index'); +const parsers = require('../../app/lib/streams/parsers'); +const querablep = require('../../app/lib/querablep'); describe("v1.0 Module API", () => { @@ -126,7 +130,155 @@ describe("v1.0 Module API", () => { })); }); - // TODO: test serviceStart - // TODO: test serviceStop - // TODO: test streaming + /*********************** + * SERVICE START/STOP + **********************/ + + describe("Service triggers", () => { + + let stack; + let fakeI; + let fakeP; + let fakeO; + const run = (...args) => stack.executeStack(['node', 'index.js', '--memory'].concat(args)); + + before(() => co(function*() { + + stack = duniter.statics.simpleStack(); + fakeI = new FakeStream((that, data) => { + // Note: we never pass here + if (typeof data == "string") { + that.push(data); + } + }); + fakeP = new FakeStream((that, data) => { + if (typeof data == "object" && data.type == "transaction") { + const tx = parsers.parseTransaction.syncWrite(data.doc); + that.push(tx); + } + }); + fakeO = new FakeStream((that, data) => { + if (data.issuers) { + that.resolveData(); + } + }); + // Fake output has a special promise of data receival, for our tests + fakeO.outputed = querablep(new Promise((res) => fakeO.resolveData = res)); + const dummyStartServiceDependency = { + duniter: { + cli: [{ + name: 'hello-service', + desc: 'Says hello to the world, at service phase. And feed INPUT with a transaction.', + onPluggedDALExecute: (duniterServer, conf, program, programArgs, startServices, stopServices) => co(function*(){ + yield startServices(); + fakeI.push("Version: 10\n" + + "Type: Transaction\n" + + "Currency: test_net\n" + + "Blockstamp: 3-2A27BD040B16B7AF59DDD88890E616987F4DD28AA47B9ABDBBEE46257B88E945\n" + + "Locktime: 0\n" + + "Issuers:\n" + + "HnFcSms8jzwngtVomTTnzudZx7SHUQY8sVE1y8yBmULk\n" + + "Inputs:\n" + + "100000:0:D:HnFcSms8jzwngtVomTTnzudZx7SHUQY8sVE1y8yBmULk:3428\n" + + "Unlocks:\n" + + "0:SIG(0)\n" + + "Outputs:\n" + + "1000:0:SIG(yGKRRB18B4eaZQdksWBZubea4VJKFSSpii2okemP7x1)\n" + + "99000:0:SIG(HnFcSms8jzwngtVomTTnzudZx7SHUQY8sVE1y8yBmULk)\n" + + "Comment: reessai\n" + + "P6MxJ/2SdkvNDyIyWuOkTz3MUwsgsfo70j+rpWeQWcm6GdvKQsbplB8482Ar1HMz2q0h5V3tfMqjCuAeWVQ+Ag==\n"); + yield fakeO.outputed; + return fakeO.outputed; + }) + }], + service: { + input: fakeI, + process: fakeP, + output: fakeO + } + } + }; + const dummyStopServiceDependency = { + duniter: { + cli: [{ + name: 'bye-service', + desc: 'Says goodbye to the world, at service phase.', + onPluggedDALExecute: (duniterServer, conf, program, programArgs, startServices, stopServices) => co(function*(){ + yield stopServices(); + return Promise.resolve(); + }) + }], + service: { + input: fakeI, + process: fakeP, + output: fakeO + } + } + }; + + stack.registerDependency(dummyStartServiceDependency); + stack.registerDependency(dummyStopServiceDependency); + })); + + it('verify that services are started', () => co(function*() { + fakeI.started.isResolved().should.equal(false); + fakeP.started.isResolved().should.equal(false); + fakeO.started.isResolved().should.equal(false); + fakeI.stopped.isResolved().should.equal(false); + fakeP.stopped.isResolved().should.equal(false); + fakeO.stopped.isResolved().should.equal(false); + yield run('hello-service'); + fakeO.outputed.isResolved().should.equal(true); // The transaction has successfully gone through the whole stream + fakeI.started.isResolved().should.equal(true); + fakeP.started.isResolved().should.equal(true); + fakeO.started.isResolved().should.equal(true); + fakeI.stopped.isResolved().should.equal(false); + fakeP.stopped.isResolved().should.equal(false); + fakeO.stopped.isResolved().should.equal(false); + })); + + it('verify that services are stopped', () => co(function*() { + fakeI.stopped.isResolved().should.equal(false); + fakeP.stopped.isResolved().should.equal(false); + fakeO.stopped.isResolved().should.equal(false); + fakeI.started.isResolved().should.equal(true); + fakeP.started.isResolved().should.equal(true); + fakeO.started.isResolved().should.equal(true); + yield run('bye-service'); + fakeI.started.isResolved().should.equal(false); + fakeP.started.isResolved().should.equal(false); + fakeO.started.isResolved().should.equal(false); + fakeI.stopped.isResolved().should.equal(true); + fakeP.stopped.isResolved().should.equal(true); + fakeO.stopped.isResolved().should.equal(true); + })); + }); + }); + + +function FakeStream(onWrite) { + + const that = this; + stream.Transform.call(this, { objectMode: true }); + + let resolveStart = () => null; + let resolveStop = () => null; + + this._write = onWrite.bind(this, that); + + this.started = querablep(new Promise(res => resolveStart = res)); + this.stopped = querablep(new Promise(res => resolveStop = res)); + + this.startService = () => co(function*() { + resolveStart(); + that.stopped = querablep(new Promise(res => resolveStop = res)); + }); + + this.stopService = () => co(function*() { + resolveStop(); + that.started = querablep(new Promise(res => resolveStart = res)); + }); +} + +util.inherits(FakeStream, stream.Transform);