Skip to content
Snippets Groups Projects
Commit c92802ee authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] #739 Added tests on `serviceStart`, `serviceStop` and correct streaming

parent def0fbe6
No related branches found
No related tags found
No related merge requests found
...@@ -197,12 +197,10 @@ function Stack(dependencies) { ...@@ -197,12 +197,10 @@ function Stack(dependencies) {
return yield command.onPluggedDALExecute(server, conf, program, params, return yield command.onPluggedDALExecute(server, conf, program, params,
// Start services and streaming between them // Start services and streaming between them
() => { () => co(function*() {
const modules = streams.input.concat(streams.process).concat(streams.output); const modules = streams.input.concat(streams.process).concat(streams.output);
for (const module of modules) {
// Any streaming module must implement a `startService` method // Any streaming module must implement a `startService` method
module.startService(); yield modules.map(module => module.startService());
}
// All inputs write to global INPUT stream // All inputs write to global INPUT stream
for (const module of streams.input) module.pipe(INPUT); for (const module of streams.input) module.pipe(INPUT);
// All processes read from global INPUT stream // All processes read from global INPUT stream
...@@ -210,16 +208,14 @@ function Stack(dependencies) { ...@@ -210,16 +208,14 @@ function Stack(dependencies) {
// All processes write to global PROCESS stream // All processes write to global PROCESS stream
for (const module of streams.process) module.pipe(PROCESS); for (const module of streams.process) module.pipe(PROCESS);
// All ouputs read from global PROCESS stream // All ouputs read from global PROCESS stream
for (const module of streams.process) PROCESS.pipe(module); for (const module of streams.output) PROCESS.pipe(module);
}, }),
// Stop services and streaming between them // Stop services and streaming between them
() => { () => co(function*() {
const modules = streams.input.concat(streams.process).concat(streams.output); const modules = streams.input.concat(streams.process).concat(streams.output);
for (const module of modules) {
// Any streaming module must implement a `stopService` method // Any streaming module must implement a `stopService` method
module.stopService(); yield modules.map(module => module.stopService());
}
// Stop reading inputs // Stop reading inputs
for (const module of streams.input) module.unpipe(); for (const module of streams.input) module.unpipe();
// Stop reading from global INPUT // Stop reading from global INPUT
...@@ -227,7 +223,7 @@ function Stack(dependencies) { ...@@ -227,7 +223,7 @@ function Stack(dependencies) {
for (const module of streams.process) module.unpipe(); for (const module of streams.process) module.unpipe();
// Stop reading from global PROCESS // Stop reading from global PROCESS
PROCESS.unpipe(); PROCESS.unpipe();
}); }));
} catch (e) { } catch (e) {
server.disconnect(); server.disconnect();
throw e; throw e;
...@@ -430,8 +426,8 @@ function InputStream() { ...@@ -430,8 +426,8 @@ function InputStream() {
if (typeof str === 'string') { if (typeof str === 'string') {
// Keep only strings // Keep only strings
const matches = str.match(/Type: (.*)\n/); const matches = str.match(/Type: (.*)\n/);
if (matches && matches[0].match(/(Block|Membership|Identity|Certification|Transaction|Peer)/)) { if (matches && matches[1].match(/(Block|Membership|Identity|Certification|Transaction|Peer)/)) {
const type = matches[0].toLowerCase(); const type = matches[1].toLowerCase();
that.push({ type, doc: str }); that.push({ type, doc: str });
} }
} }
......
...@@ -3,7 +3,11 @@ ...@@ -3,7 +3,11 @@
const co = require('co'); const co = require('co');
const _ = require('underscore'); const _ = require('underscore');
const should = require('should'); const should = require('should');
const util = require('util');
const stream = require('stream');
const duniter = require('../../index'); const duniter = require('../../index');
const parsers = require('../../app/lib/streams/parsers');
const querablep = require('../../app/lib/querablep');
describe("v1.0 Module API", () => { describe("v1.0 Module API", () => {
...@@ -126,7 +130,155 @@ describe("v1.0 Module API", () => { ...@@ -126,7 +130,155 @@ describe("v1.0 Module API", () => {
})); }));
}); });
// TODO: test serviceStart /***********************
// TODO: test serviceStop * SERVICE START/STOP
// TODO: test streaming **********************/
describe("Service triggers", () => {
let stack;
let fakeI;
let fakeP;
let fakeO;
const run = (...args) => stack.executeStack(['node', 'index.js', '--memory'].concat(args));
before(() => co(function*() {
stack = duniter.statics.simpleStack();
fakeI = new FakeStream((that, data) => {
// Note: we never pass here
if (typeof data == "string") {
that.push(data);
}
}); });
fakeP = new FakeStream((that, data) => {
if (typeof data == "object" && data.type == "transaction") {
const tx = parsers.parseTransaction.syncWrite(data.doc);
that.push(tx);
}
});
fakeO = new FakeStream((that, data) => {
if (data.issuers) {
that.resolveData();
}
});
// Fake output has a special promise of data receival, for our tests
fakeO.outputed = querablep(new Promise((res) => fakeO.resolveData = res));
const dummyStartServiceDependency = {
duniter: {
cli: [{
name: 'hello-service',
desc: 'Says hello to the world, at service phase. And feed INPUT with a transaction.',
onPluggedDALExecute: (duniterServer, conf, program, programArgs, startServices, stopServices) => co(function*(){
yield startServices();
fakeI.push("Version: 10\n" +
"Type: Transaction\n" +
"Currency: test_net\n" +
"Blockstamp: 3-2A27BD040B16B7AF59DDD88890E616987F4DD28AA47B9ABDBBEE46257B88E945\n" +
"Locktime: 0\n" +
"Issuers:\n" +
"HnFcSms8jzwngtVomTTnzudZx7SHUQY8sVE1y8yBmULk\n" +
"Inputs:\n" +
"100000:0:D:HnFcSms8jzwngtVomTTnzudZx7SHUQY8sVE1y8yBmULk:3428\n" +
"Unlocks:\n" +
"0:SIG(0)\n" +
"Outputs:\n" +
"1000:0:SIG(yGKRRB18B4eaZQdksWBZubea4VJKFSSpii2okemP7x1)\n" +
"99000:0:SIG(HnFcSms8jzwngtVomTTnzudZx7SHUQY8sVE1y8yBmULk)\n" +
"Comment: reessai\n" +
"P6MxJ/2SdkvNDyIyWuOkTz3MUwsgsfo70j+rpWeQWcm6GdvKQsbplB8482Ar1HMz2q0h5V3tfMqjCuAeWVQ+Ag==\n");
yield fakeO.outputed;
return fakeO.outputed;
})
}],
service: {
input: fakeI,
process: fakeP,
output: fakeO
}
}
};
const dummyStopServiceDependency = {
duniter: {
cli: [{
name: 'bye-service',
desc: 'Says goodbye to the world, at service phase.',
onPluggedDALExecute: (duniterServer, conf, program, programArgs, startServices, stopServices) => co(function*(){
yield stopServices();
return Promise.resolve();
})
}],
service: {
input: fakeI,
process: fakeP,
output: fakeO
}
}
};
stack.registerDependency(dummyStartServiceDependency);
stack.registerDependency(dummyStopServiceDependency);
}));
it('verify that services are started', () => co(function*() {
fakeI.started.isResolved().should.equal(false);
fakeP.started.isResolved().should.equal(false);
fakeO.started.isResolved().should.equal(false);
fakeI.stopped.isResolved().should.equal(false);
fakeP.stopped.isResolved().should.equal(false);
fakeO.stopped.isResolved().should.equal(false);
yield run('hello-service');
fakeO.outputed.isResolved().should.equal(true); // The transaction has successfully gone through the whole stream
fakeI.started.isResolved().should.equal(true);
fakeP.started.isResolved().should.equal(true);
fakeO.started.isResolved().should.equal(true);
fakeI.stopped.isResolved().should.equal(false);
fakeP.stopped.isResolved().should.equal(false);
fakeO.stopped.isResolved().should.equal(false);
}));
it('verify that services are stopped', () => co(function*() {
fakeI.stopped.isResolved().should.equal(false);
fakeP.stopped.isResolved().should.equal(false);
fakeO.stopped.isResolved().should.equal(false);
fakeI.started.isResolved().should.equal(true);
fakeP.started.isResolved().should.equal(true);
fakeO.started.isResolved().should.equal(true);
yield run('bye-service');
fakeI.started.isResolved().should.equal(false);
fakeP.started.isResolved().should.equal(false);
fakeO.started.isResolved().should.equal(false);
fakeI.stopped.isResolved().should.equal(true);
fakeP.stopped.isResolved().should.equal(true);
fakeO.stopped.isResolved().should.equal(true);
}));
});
});
function FakeStream(onWrite) {
const that = this;
stream.Transform.call(this, { objectMode: true });
let resolveStart = () => null;
let resolveStop = () => null;
this._write = onWrite.bind(this, that);
this.started = querablep(new Promise(res => resolveStart = res));
this.stopped = querablep(new Promise(res => resolveStop = res));
this.startService = () => co(function*() {
resolveStart();
that.stopped = querablep(new Promise(res => resolveStop = res));
});
this.stopService = () => co(function*() {
resolveStop();
that.started = querablep(new Promise(res => resolveStart = res));
});
}
util.inherits(FakeStream, stream.Transform);
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment