Skip to content
Snippets Groups Projects
Commit 0f652ac6 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[fix] #1096 Server shutdown should not interrupt a document FIFO task

parent c9541c41
No related branches found
No related tags found
No related merge requests found
"use strict"; "use strict";
import {CommonConstants} from "../lib/common-libs/constants"; import {CommonConstants} from "../lib/common-libs/constants"
const async = require('async'); const async = require('async');
export class GlobalFifoPromise { export class GlobalFifoPromise {
...@@ -9,6 +9,7 @@ export class GlobalFifoPromise { ...@@ -9,6 +9,7 @@ export class GlobalFifoPromise {
}, 1) }, 1)
private operations:{ [k:string]: boolean } = {} private operations:{ [k:string]: boolean } = {}
private currentPromise:Promise<any>|null
constructor() { constructor() {
} }
...@@ -29,7 +30,8 @@ export class GlobalFifoPromise { ...@@ -29,7 +30,8 @@ export class GlobalFifoPromise {
this.fifo.push(async (cb:any) => { this.fifo.push(async (cb:any) => {
// OK its the turn of given promise, execute it // OK its the turn of given promise, execute it
try { try {
const res = await p(); this.currentPromise = p()
const res = await this.currentPromise
delete this.operations[operationId] delete this.operations[operationId]
// Finished, we end the function in the FIFO // Finished, we end the function in the FIFO
cb(null, res); cb(null, res);
...@@ -45,5 +47,16 @@ export class GlobalFifoPromise { ...@@ -45,5 +47,16 @@ export class GlobalFifoPromise {
resolve(res); resolve(res);
}); });
}); });
}; }
async closeFIFO() {
this.fifo.pause()
if (this.currentPromise) {
await this.currentPromise
}
}
remainingTasksCount() {
return this.fifo.length()
}
} }
...@@ -67,6 +67,7 @@ export class Server extends stream.Duplex implements HookableServer { ...@@ -67,6 +67,7 @@ export class Server extends stream.Duplex implements HookableServer {
PeeringService:PeeringService PeeringService:PeeringService
BlockchainService:BlockchainService BlockchainService:BlockchainService
TransactionsService:TransactionService TransactionsService:TransactionService
private documentFIFO:GlobalFifoPromise
constructor(home:string, memoryOnly:boolean, private overrideConf:any) { constructor(home:string, memoryOnly:boolean, private overrideConf:any) {
super({ objectMode: true }) super({ objectMode: true })
...@@ -78,14 +79,18 @@ export class Server extends stream.Duplex implements HookableServer { ...@@ -78,14 +79,18 @@ export class Server extends stream.Duplex implements HookableServer {
this.paramsP = directory.getHomeParams(memoryOnly, home) this.paramsP = directory.getHomeParams(memoryOnly, home)
const documentFIFO = new GlobalFifoPromise() this.documentFIFO = new GlobalFifoPromise()
this.MerkleService = require("./app/lib/helpers/merkle").processForURL this.MerkleService = require("./app/lib/helpers/merkle").processForURL
this.IdentityService = new IdentityService(documentFIFO) this.IdentityService = new IdentityService(this.documentFIFO)
this.MembershipService = new MembershipService(documentFIFO) this.MembershipService = new MembershipService(this.documentFIFO)
this.PeeringService = new PeeringService(this, documentFIFO) this.PeeringService = new PeeringService(this, this.documentFIFO)
this.BlockchainService = new BlockchainService(this, documentFIFO) this.BlockchainService = new BlockchainService(this, this.documentFIFO)
this.TransactionsService = new TransactionService(documentFIFO) this.TransactionsService = new TransactionService(this.documentFIFO)
}
getDocumentsFIFO() {
return this.documentFIFO
} }
// Unused, but made mandatory by Duplex interface // Unused, but made mandatory by Duplex interface
...@@ -459,8 +464,11 @@ export class Server extends stream.Duplex implements HookableServer { ...@@ -459,8 +464,11 @@ export class Server extends stream.Duplex implements HookableServer {
} }
} }
disconnect() { async disconnect() {
return Promise.resolve(this.dal && this.dal.close()) await this.documentFIFO.closeFIFO()
if (this.dal) {
await this.dal.close()
}
} }
revert() { revert() {
......
import {NewTestingServer} from "./tools/toolbox"
const should = require('should')
const querablep = require('querablep')
describe("Server shutdown", () => {
it('should not interrupt a task in the documents FIFO', async () => {
const s1 = NewTestingServer({})
const fifo = s1._server.getDocumentsFIFO()
const ops:any[] = []
for (let i = 0; i < 10; i++) {
ops.push(querablep(fifo.pushFIFOPromise('op_' + i, async () => {
// Wait 100ms
await new Promise(res => setTimeout(res, 15))
})))
}
fifo.remainingTasksCount().should.equal(10)
while(fifo.remainingTasksCount() >= 9) {
// Wait 1ms until two tasks have been taken
await new Promise(res => setTimeout(res, 5))
}
await fifo.closeFIFO()
await ops[0]
await ops[1]
fifo.remainingTasksCount().should.equal(8)
ops[0].isFulfilled().should.equal(true)
ops[1].isFulfilled().should.equal(true)
for (let i = 2; i < 10; i++) {
ops[i].isFulfilled().should.equal(false)
}
})
})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment