Commit 6a785867 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[fix] tests: remove EventEmitter leaks

parent 4aa66888
......@@ -73,7 +73,7 @@ export class WorkerFarm {
}
shutDownEngine() {
this.theEngine.shutDown()
return this.theEngine.shutDown()
}
/**
......
......@@ -213,6 +213,8 @@ export class PermanentProver {
await this.prover.cancel();
// If we were waiting, stop it and process the continuous generation
this.blockchainChangedResolver && this.blockchainChangedResolver();
const farm = await this.prover.getWorker()
await farm.shutDownEngine()
}
private checkTrialIsNotTooHigh(trial:number, current:DBBlock, selfPubkey:string) {
......
......@@ -3,12 +3,11 @@ import {ProverConstants} from "./constants"
const _ = require('underscore')
const nuuid = require('node-uuid');
const moment = require('moment');
const cluster = require('cluster')
const querablep = require('querablep')
const logger = require('../../../lib/logger').NewLogger()
let clusterId = 0
cluster.setMaxListeners(3)
/**
* Cluster controller, handles the messages between the main program and the PoW cluster.
......@@ -25,6 +24,9 @@ export class Master {
logger:any
onInfoCallback:any
workersOnline:Promise<any>[]
private exitHandler: (worker: any, code: any, signal: any) => void
private onlineHandler: (worker: any) => void
private messageHandler: (worker: any, msg: any) => void
constructor(private nbCores:number, logger:any) {
this.clusterId = clusterId++
......@@ -32,6 +34,29 @@ export class Master {
this.onInfoMessage = (message:any) => {
this.logger.info(`${message.pow.pow} nonce = ${message.pow.block.nonce}`)
}
this.exitHandler = (worker:any, code:any, signal:any) => {
this.logger.info(`worker ${worker.process.pid} died with code ${code} and signal ${signal}`)
}
this.onlineHandler = (worker:any) => {
// We just listen to the workers of this Master
if (this.slavesMap[worker.id]) {
this.logger.info(`[online] worker c#${this.clusterId}#w#${worker.id}`)
this.slavesMap[worker.id].online.extras.resolve()
worker.send({
command: 'conf',
value: this.conf
})
}
}
this.messageHandler = (worker:any, msg:any) => {
// Message for this cluster
if (this.slavesMap[worker.id]) {
this.onWorkerMessage(worker, msg)
}
}
}
get nbWorkers() {
......@@ -62,6 +87,10 @@ export class Master {
// this.logger.debug(`ENGINE c#${this.clusterId}#${this.slavesMap[worker.id].index}:`, message)
}
/*****************
* CLUSTER METHODS
****************/
initCluster() {
// Setup master
cluster.setupMaster({
......@@ -93,28 +122,9 @@ export class Master {
return this.slavesMap[worker.id]
})
cluster.on('exit', (worker:any, code:any, signal:any) => {
this.logger.info(`worker ${worker.process.pid} died with code ${code} and signal ${signal}`)
})
cluster.on('online', (worker:any) => {
// We just listen to the workers of this Master
if (this.slavesMap[worker.id]) {
this.logger.info(`[online] worker c#${this.clusterId}#w#${worker.id}`)
this.slavesMap[worker.id].online.extras.resolve()
worker.send({
command: 'conf',
value: this.conf
})
}
})
cluster.on('message', (worker:any, msg:any) => {
// Message for this cluster
if (this.slavesMap[worker.id]) {
this.onWorkerMessage(worker, msg)
}
})
cluster.on('exit', this.exitHandler)
cluster.on('online', this.onlineHandler)
cluster.on('message', this.messageHandler)
this.workersOnline = this.slaves.map((s:any) => s.online)
return Promise.all(this.workersOnline)
......@@ -165,7 +175,11 @@ export class Master {
await Promise.all(this.slaves.map(async (s:any) => {
s.worker.kill()
}))
cluster.removeListener('exit', this.exitHandler)
cluster.removeListener('online', this.onlineHandler)
cluster.removeListener('message', this.messageHandler)
}
this.slaves = []
}
proveByWorkers(stuff:any) {
......
......@@ -28,6 +28,10 @@ process.on('uncaughtException', (err:any) => {
}
});
process.on('unhandledRejection', () => {
process.exit()
})
process.on('message', async (message) => {
switch (message.command) {
......
......@@ -28,6 +28,8 @@ const daemonDependency = require('./app/modules/daemon');
const pSignalDependency = require('./app/modules/peersignal');
const pluginDependency = require('./app/modules/plugin');
let sigintListening = false
class Stacks {
static todoOnRunDone:() => any = () => process.exit()
......@@ -157,6 +159,8 @@ export interface TransformableDuniterService extends DuniterService, stream.Tran
class Stack {
private injectedServices = false
private cli:any
private configLoadingCallbacks:any[]
private configBeforeSaveCallbacks:any[]
......@@ -279,10 +283,12 @@ class Stack {
}
const server = new Server(home, program.memory === true, commandLineConf(program));
let piped = false
// If ever the process gets interrupted
let isSaving = false;
process.on('SIGINT', async () => {
if (!sigintListening) {
process.on('SIGINT', async () => {
if (!isSaving) {
isSaving = true;
// Save DB
......@@ -294,7 +300,9 @@ class Stack {
process.exit(3);
}
}
});
})
sigintListening = true
}
// Config or Data reset hooks
server.resetDataHook = async () => {
......@@ -366,26 +374,30 @@ class Stack {
* Service injection
* -----------------
*/
for (const def of this.definitions) {
if (def.service) {
// To feed data coming from some I/O (network, disk, other module, ...)
if (def.service.input) {
this.streams.input.push(def.service.input(server, conf, logger));
}
// To handle data this has been submitted by INPUT stream
if (def.service.process) {
this.streams.process.push(def.service.process(server, conf, logger));
}
// To handle data this has been validated by PROCESS stream
if (def.service.output) {
this.streams.output.push(def.service.output(server, conf, logger));
}
// Special service which does not stream anything particular (ex.: piloting the `server` object)
if (def.service.neutral) {
this.streams.neutral.push(def.service.neutral(server, conf, logger));
if (!this.injectedServices) {
this.injectedServices = true
for (const def of this.definitions) {
if (def.service) {
// To feed data coming from some I/O (network, disk, other module, ...)
if (def.service.input) {
this.streams.input.push(def.service.input(server, conf, logger));
}
// To handle data this has been submitted by INPUT stream
if (def.service.process) {
this.streams.process.push(def.service.process(server, conf, logger));
}
// To handle data this has been validated by PROCESS stream
if (def.service.output) {
this.streams.output.push(def.service.output(server, conf, logger));
}
// Special service which does not stream anything particular (ex.: piloting the `server` object)
if (def.service.neutral) {
this.streams.neutral.push(def.service.neutral(server, conf, logger));
}
}
}
}
piped = true
// All inputs write to global INPUT stream
for (const module of this.streams.input) module.pipe(this.INPUT);
// All processes read from global INPUT stream
......@@ -408,13 +420,6 @@ class Stack {
const modules = this.streams.input.concat(this.streams.process).concat(this.streams.output).concat(this.streams.neutral);
// Any streaming module must implement a `stopService` method
await Promise.all(modules.map((module:DuniterService) => module.stopService()))
// // Stop reading inputs
// for (const module of streams.input) module.unpipe();
// Stop reading from global INPUT
// INPUT.unpipe();
// for (const module of streams.process) module.unpipe();
// // Stop reading from global PROCESS
// PROCESS.unpipe();
},
this);
......@@ -422,6 +427,15 @@ class Stack {
} catch (e) {
server.disconnect();
throw e;
} finally {
if (piped) {
// Unpipe everything, as the command is done
for (const module of this.streams.input) module.unpipe()
for (const module of this.streams.process) module.unpipe()
for (const module of this.streams.output) module.unpipe()
this.INPUT.unpipe()
this.PROCESS.unpipe()
}
}
}
......
......@@ -12,6 +12,10 @@ describe('PoW Cluster', () => {
master = new Master(1, logger)
})
after(() => {
return master.shutDownWorkers()
})
it('should have an empty cluster if no PoW was asked', () => {
master.nbWorkers.should.equal(0)
})
......
......@@ -10,6 +10,7 @@ describe('PoW Engine', () => {
it('should be configurable', () => co(function*(){
const e1 = new PowEngine({ nbCores: 1 }, logger);
(yield e1.setConf({ cpu: 0.2, prefix: '34' })).should.deepEqual({ cpu: 0.2, prefix: '34' });
yield e1.shutDown()
}));
it('should be able to make a proof', () => co(function*(){
......@@ -52,6 +53,7 @@ describe('PoW Engine', () => {
pow: '009A52E6E2E4EA7DE950A2DA673114FA55B070EBE350D75FF0C62C6AAE9A37E5'
}
});
yield e1.shutDown()
}));
it('should be able to stop a proof', () => co(function*(){
......@@ -85,5 +87,6 @@ describe('PoW Engine', () => {
yield e1.cancel()
// const proof = yield proofPromise;
// should.not.exist(proof);
yield e1.shutDown()
}));
});
......@@ -11,6 +11,7 @@ const rp = require('request-promise');
const httpTest = require('./tools/http');
const commit = require('./tools/commit');
const sync = require('./tools/sync');
const cluster = require('cluster');
const shutDownEngine = require('./tools/shutDownEngine');
const expectJSON = httpTest.expectJSON;
const MEMORY_MODE = true;
......@@ -25,6 +26,7 @@ const commonConf = {
let s1, s2, cat, toc;
describe("Switch", function () {
before(() => co(function* () {
cluster.setMaxListeners(6);
s1 = duniter('/bb11', MEMORY_MODE, _.extend({
switchOnHeadAdvance: 0,
port: '7788',
......@@ -78,6 +80,7 @@ describe("Switch", function () {
// S1 should have switched to the other branch
}));
after(() => {
cluster.setMaxListeners(3);
return Promise.all([
shutDownEngine(s1),
shutDownEngine(s2)
......
......@@ -11,6 +11,7 @@ const rp = require('request-promise');
const httpTest = require('./tools/http');
const commit = require('./tools/commit');
const sync = require('./tools/sync');
const cluster = require('cluster')
const shutDownEngine = require('./tools/shutDownEngine');
const expectJSON = httpTest.expectJSON;
......@@ -31,6 +32,8 @@ describe("Switch", function() {
before(() => co(function *() {
cluster.setMaxListeners(6)
s1 = duniter(
'/bb11',
MEMORY_MODE,
......@@ -97,6 +100,7 @@ describe("Switch", function() {
}));
after(() => {
cluster.setMaxListeners(3)
return Promise.all([
shutDownEngine(s1),
shutDownEngine(s2)
......
......@@ -37,6 +37,7 @@ describe("Continous proof-of-work", function() {
yield i1.join();
yield i2.join();
yield s1.commit();
yield s1.closeCluster();
}));
it('should automatically stop waiting if nothing happens', () => co(function*() {
......@@ -104,7 +105,7 @@ describe("Continous proof-of-work", function() {
s2.conf.cpu = 1.0;
s2.startBlockComputation();
yield s2.until('block', 15);
s2.stopBlockComputation();
yield s2.stopBlockComputation();
yield [
require('../../app/modules/crawler').CrawlerDependency.duniter.methods.pullBlocks(s3),
new Promise(res => {
......@@ -121,11 +122,6 @@ describe("Continous proof-of-work", function() {
const current = yield s3.get('/blockchain/current')
yield s3.stopBlockComputation();
current.number.should.be.aboveOrEqual(14)
yield s1.closeCluster()
}));
after(() => {
return Promise.all([
s1.closeCluster()
])
})
});
"use strict";
const should = require('should');
import {NewLogger} from "../../app/lib/logger"
import {BmaDependency} from "../../app/modules/bma/index"
import {TestUser} from "./tools/TestUser"
import {simpleTestingConf, simpleTestingServer, TestingServer} from "./tools/toolbox"
import {RouterDependency} from "../../app/modules/router"
require('should');
const assert = require('assert');
const async = require('async');
const _ = require('underscore');
const co = require('co');
const node = require('./tools/node');
const TestUser = require('./tools/TestUser').TestUser
const jspckg = require('../../package');
const constants = require('../../app/lib/constants');
require('../../app/modules/bma').BmaDependency.duniter.methods.noLimit(); // Disables the HTTP limiter
BmaDependency.duniter.methods.noLimit(); // Disables the HTTP limiter
if (constants.MUTE_LOGS_DURING_UNIT_TESTS) {
require('../../app/lib/logger').NewLogger().mute();
NewLogger().mute()
}
describe("Forwarding", function() {
describe("Nodes", function() {
const common = { currency: 'bb', nobma: false, bmaWithCrawler:true, ws2p: { upnp: false }, ipv4: '127.0.0.1', remoteipv4: '127.0.0.1', rootoffset: 0, sigQty: 1 };
const now = 1500000000
const conf1 = simpleTestingConf(now, { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'})
const conf2 = simpleTestingConf(now, { pub: 'G2CBgZBPLe6FSFUgpx2Jf1Aqsgta6iib3vmDRA1yLiqU', sec: '58LDg8QLmF5pv6Dn9h7X4yFKfMTdP8fdAiWVcyDoTRJu454fwRihCLULH4MW37zncsg4ruoTGJPZneWk22QmG1w4'})
const node1 = node('db_1', _({ upnp: false, httplogs: false, port: 9600, remoteport: 9600, pair: { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'} }).extend(common));
const node2 = node('db_2', _({ upnp: false, httplogs: false, port: 9601, remoteport: 9601, pair: { pub: 'G2CBgZBPLe6FSFUgpx2Jf1Aqsgta6iib3vmDRA1yLiqU', sec: '58LDg8QLmF5pv6Dn9h7X4yFKfMTdP8fdAiWVcyDoTRJu454fwRihCLULH4MW37zncsg4ruoTGJPZneWk22QmG1w4'} }).extend(common));
const node1 = simpleTestingServer(conf1)
const node2 = simpleTestingServer(conf2)
const cat = new TestUser('cat', { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'}, node1);
const tac = new TestUser('tac', { pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc', sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'}, node1);
const tic = new TestUser('tic', { pub: 'DNann1Lh55eZMEDXeYt59bzHbA3NJR46DeQYCS2qQdLV', sec: '468Q1XtTq7h84NorZdWBZFJrGkB18CbmbHr9tkp9snt5GiERP7ySs3wM8myLccbAAGejgMRC9rqnXuW3iAfZACm7'}, node1);
const toc = new TestUser('toc', { pub: 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo', sec: '64EYRvdPpTfLGGmaX5nijLXRqWXaVz8r1Z1GtaahXwVSJGQRn7tqkxLb288zwSYzELMEG5ZhXSBYSxsTsz1m9y8F'}, node1);
before(() => co(function*(){
yield [node1, node2].map((theNode) => theNode.startTesting());
yield new Promise(function(resolve, reject){
async.waterfall([
function(next) {
node2.peering(next);
},
function(peer, next) {
node1.submitPeer(peer, function(err) {
next(err);
});
},
function(next) {
node1.peering(next);
},
function(peer, next) {
node2.submitPeer(peer, next);
}
], function(err) {
err ? reject(err) : resolve();
});
});
yield [
before(async () => {
await node1.initDalBmaConnections()
await node2.initDalBmaConnections()
await node1.sharePeeringWith(node2)
await node2.sharePeeringWith(node1)
RouterDependency.duniter.methods.routeToNetwork(node1._server)
RouterDependency.duniter.methods.routeToNetwork(node2._server)
await Promise.all([
node2.until('identity', 4),
node2.until('certification', 2),
node2.until('block', 1),
co(function *() {
(async () => {
// Self certifications
yield cat.createIdentity();
yield tac.createIdentity();
yield tic.createIdentity();
yield toc.createIdentity();
await cat.createIdentity();
await tac.createIdentity();
await tic.createIdentity();
await toc.createIdentity();
// Certifications
yield cat.cert(tac);
yield tac.cert(cat);
yield cat.join();
yield tac.join();
yield node1.commitP();
})
];
yield [
await cat.cert(tac);
await tac.cert(cat);
await cat.join();
await tac.join();
await node1.commit({ time: now })
})()
])
await Promise.all([
node2.until('revocation', 1),
co(function *() {
yield cat.revoke();
})
];
}));
cat.revoke()
])
})
describe("Testing technical API", function(){
it('Node1 should be up and running', node1.summary(function(summary, done){
it('Node1 should be up and running', () => node1.expectThat('/node/summary', (summary:any) => {
should.exists(summary);
should.exists(summary.duniter);
should.exists(summary.duniter.software);
should.exists(summary.duniter.version);
assert.equal(summary.duniter.software, "duniter");
assert.equal(summary.duniter.version, jspckg.version);
done();
}));
}))
it('Node2 should be up and running', node2.summary(function(summary, done){
it('Node2 should be up and running', () => node2.expectThat('/node/summary', (summary:any) => {
should.exists(summary);
should.exists(summary.duniter);
should.exists(summary.duniter.software);
should.exists(summary.duniter.version);
assert.equal(summary.duniter.software, "duniter");
assert.equal(summary.duniter.version, jspckg.version);
done();
}));
}))
});
describe('Node 1', doTests(node1));
......@@ -107,78 +90,47 @@ describe("Forwarding", function() {
});
});
function doTests(theNode) {
function doTests(theNode:TestingServer) {
return function(){
return () => {
describe("user cat", function(){
describe("user cat", () => {
it('should give only 1 result', theNode.lookup('cat', function(res, done){
try {
should.exists(res);
assert.equal(res.results.length, 1);
done();
} catch (e) {
done(e);
}
it('should give only 1 result', () => theNode.expectThat('/wot/lookup/cat', (res:any) => {
should.exists(res);
assert.equal(res.results.length, 1);
}));
it('should have sent 1 signature', theNode.lookup('cat', function(res, done){
try {
should.exists(res);
assert.equal(res.results[0].signed.length, 1);
should.exists(res.results[0].signed[0].isMember);
should.exists(res.results[0].signed[0].wasMember);
assert.equal(res.results[0].signed[0].isMember, true);
assert.equal(res.results[0].signed[0].wasMember, true);
done();
} catch (e) {
done(e);
}
it('should have sent 1 signature', () => theNode.expectThat('/wot/lookup/cat', (res:any) => {
should.exists(res);
assert.equal(res.results[0].signed.length, 1);
should.exists(res.results[0].signed[0].isMember);
should.exists(res.results[0].signed[0].wasMember);
assert.equal(res.results[0].signed[0].isMember, true);
assert.equal(res.results[0].signed[0].wasMember, true);
}));
});
describe("user tac", function(){
describe("user tac", () => {
it('should give only 1 result', theNode.lookup('tac', function(res, done){
try {
should.exists(res);
assert.equal(res.results.length, 1);
done();
} catch (e) {
done(e);
}
}));
it('should give only 1 result', () => theNode.expectThat('/wot/lookup/tac', (res:any) => {
should.exists(res);
assert.equal(res.results.length, 1);
}))
it('should have 1 signature', theNode.lookup('tac', function(res, done){
try {
should.exists(res);
assert.equal(res.results[0].uids[0].others.length, 1);
done();
} catch (e) {
done(e);
}
}));