From 0f652ac674be12716c0c70432a25bab0ac55fba5 Mon Sep 17 00:00:00 2001 From: cgeek <cem.moreau@gmail.com> Date: Tue, 19 Sep 2017 12:47:40 +0200 Subject: [PATCH] [fix] #1096 Server shutdown should not interrupt a document FIFO task --- app/service/GlobalFifoPromise.ts | 19 +++++++++++++--- server.ts | 24 +++++++++++++------- test/integration/server-shutdown.ts | 34 +++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 11 deletions(-) create mode 100644 test/integration/server-shutdown.ts diff --git a/app/service/GlobalFifoPromise.ts b/app/service/GlobalFifoPromise.ts index d250e1c52..27485cab6 100644 --- a/app/service/GlobalFifoPromise.ts +++ b/app/service/GlobalFifoPromise.ts @@ -1,5 +1,5 @@ "use strict"; -import {CommonConstants} from "../lib/common-libs/constants"; +import {CommonConstants} from "../lib/common-libs/constants" const async = require('async'); export class GlobalFifoPromise { @@ -9,6 +9,7 @@ export class GlobalFifoPromise { }, 1) private operations:{ [k:string]: boolean } = {} + private currentPromise:Promise<any>|null constructor() { } @@ -29,7 +30,8 @@ export class GlobalFifoPromise { this.fifo.push(async (cb:any) => { // OK its the turn of given promise, execute it try { - const res = await p(); + this.currentPromise = p() + const res = await this.currentPromise delete this.operations[operationId] // Finished, we end the function in the FIFO cb(null, res); @@ -45,5 +47,16 @@ export class GlobalFifoPromise { resolve(res); }); }); - }; + } + + async closeFIFO() { + this.fifo.pause() + if (this.currentPromise) { + await this.currentPromise + } + } + + remainingTasksCount() { + return this.fifo.length() + } } diff --git a/server.ts b/server.ts index fbc57016d..85e4b59ea 100644 --- a/server.ts +++ b/server.ts @@ -67,6 +67,7 @@ export class Server extends stream.Duplex implements HookableServer { PeeringService:PeeringService BlockchainService:BlockchainService TransactionsService:TransactionService + private documentFIFO:GlobalFifoPromise constructor(home:string, memoryOnly:boolean, private overrideConf:any) { super({ objectMode: true }) @@ -78,14 +79,18 @@ export class Server extends stream.Duplex implements HookableServer { this.paramsP = directory.getHomeParams(memoryOnly, home) - const documentFIFO = new GlobalFifoPromise() + this.documentFIFO = new GlobalFifoPromise() this.MerkleService = require("./app/lib/helpers/merkle").processForURL - this.IdentityService = new IdentityService(documentFIFO) - this.MembershipService = new MembershipService(documentFIFO) - this.PeeringService = new PeeringService(this, documentFIFO) - this.BlockchainService = new BlockchainService(this, documentFIFO) - this.TransactionsService = new TransactionService(documentFIFO) + this.IdentityService = new IdentityService(this.documentFIFO) + this.MembershipService = new MembershipService(this.documentFIFO) + this.PeeringService = new PeeringService(this, this.documentFIFO) + this.BlockchainService = new BlockchainService(this, this.documentFIFO) + this.TransactionsService = new TransactionService(this.documentFIFO) + } + + getDocumentsFIFO() { + return this.documentFIFO } // Unused, but made mandatory by Duplex interface @@ -459,8 +464,11 @@ export class Server extends stream.Duplex implements HookableServer { } } - disconnect() { - return Promise.resolve(this.dal && this.dal.close()) + async disconnect() { + await this.documentFIFO.closeFIFO() + if (this.dal) { + await this.dal.close() + } } revert() { diff --git a/test/integration/server-shutdown.ts b/test/integration/server-shutdown.ts new file mode 100644 index 000000000..854e20eae --- /dev/null +++ b/test/integration/server-shutdown.ts @@ -0,0 +1,34 @@ +import {NewTestingServer} from "./tools/toolbox" + +const should = require('should') +const querablep = require('querablep') + +describe("Server shutdown", () => { + + it('should not interrupt a task in the documents FIFO', async () => { + const s1 = NewTestingServer({}) + + const fifo = s1._server.getDocumentsFIFO() + const ops:any[] = [] + for (let i = 0; i < 10; i++) { + ops.push(querablep(fifo.pushFIFOPromise('op_' + i, async () => { + // Wait 100ms + await new Promise(res => setTimeout(res, 15)) + }))) + } + fifo.remainingTasksCount().should.equal(10) + while(fifo.remainingTasksCount() >= 9) { + // Wait 1ms until two tasks have been taken + await new Promise(res => setTimeout(res, 5)) + } + await fifo.closeFIFO() + await ops[0] + await ops[1] + fifo.remainingTasksCount().should.equal(8) + ops[0].isFulfilled().should.equal(true) + ops[1].isFulfilled().should.equal(true) + for (let i = 2; i < 10; i++) { + ops[i].isFulfilled().should.equal(false) + } + }) +}) -- GitLab