Commit 3bb0aa73 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[fix] #1234 Synchronize workers on each PoW task

parent 6f01a449
import {Querable} from "./permanentProver"
const querablep = require('querablep')
/*********
*
* PoW worker
* ----------
*
* Its model is super simple: we ask him to find a proof, and we can wait for it.
* Eventually, we can tell him to cancel his proof, which makes it answer `null` as proof value.
*
* The worker also provides two properties:
*
* - `worker.online`: a promise which is resolved when the worker gets « online » for the first time
* - `worker.exit`: a promise which is resolved when the worker exits (which occurs when the worker is being closed or killed)
*
********/
export class PowWorker {
private onlinePromise:Promise<void>
private onlineResolver:()=>void
private exitPromise:Promise<void>
private exitResolver:()=>void
private proofPromise:Querable<{ message: { answer:any }}|null>
private proofResolver:(proof:{ message: { answer:any }}|null)=>void
private messageHandler:((worker:any, msg:any)=>void)
constructor(
private nodejsWorker:any,
private onPowMessage:(message:any)=>void,
private onlineHandler:()=>void,
private exitHandler:(code:any, signal:any)=>void) {
// Handle "online" promise
this.onlinePromise = new Promise(res => this.onlineResolver = res)
nodejsWorker.on('online', () => {
this.onlineHandler()
this.onlineResolver()
})
// Handle "exit" promise
this.exitPromise = new Promise(res => this.exitResolver = res)
nodejsWorker.on('exit', (code:any, signal:any) => {
this.exitHandler(code, signal)
this.exitResolver()
})
nodejsWorker.on('message', (message:any) => {
if (message) {
this.onPowMessage(message)
}
if (this.proofPromise && message.uuid && !this.proofPromise.isResolved() && this.proofResolver) {
const result:{ message: { answer:any }}|null = message ? { message } : null
this.proofResolver(result)
}
})
}
get online() {
return this.onlinePromise
}
get exited() {
return this.exitPromise
}
get pid() {
return this.nodejsWorker.process.pid
}
askProof(commandMessage:{ uuid:string, command:string, value:any }) {
this.proofPromise = querablep(new Promise<{ message: { answer:any }}|null>(res => this.proofResolver = res))
this.nodejsWorker.send(commandMessage)
return this.proofPromise
}
sendConf(confMessage:{ command:string, value:any }) {
this.nodejsWorker.send(confMessage)
}
sendCancel() {
this.nodejsWorker.send({
command: 'cancel'
})
}
kill() {
this.nodejsWorker.kill()
}
}
\ No newline at end of file
......@@ -33,9 +33,7 @@ export class PowEngine {
}
async prove(stuff:any) {
if (this.cluster.hasProofPending) {
await this.cluster.cancelWork()
}
await this.cluster.cancelWork()
return await this.cluster.proveByWorkers(stuff)
}
......
......@@ -9,7 +9,7 @@ import {Server} from "../../../../server"
const querablep = require('querablep');
interface Querable<T> extends Promise<T> {
export interface Querable<T> extends Promise<T> {
isFulfilled(): boolean
isResolved(): boolean
isRejected(): boolean
......
import {ConfDTO} from "../../../lib/dto/ConfDTO"
import {ProverConstants} from "./constants"
import {createPowWorker} from "./proof"
import {PowWorker} from "./PowWorker"
const _ = require('underscore')
const nuuid = require('node-uuid');
......@@ -9,6 +11,13 @@ const querablep = require('querablep')
let clusterId = 0
cluster.setMaxListeners(3)
export interface SlaveWorker {
worker:PowWorker,
index:number,
online:Promise<void>,
nonceBeginning:number
}
/**
* Cluster controller, handles the messages between the main program and the PoW cluster.
*/
......@@ -18,15 +27,14 @@ export class Master {
clusterId:number
currentPromise:any|null = null
slaves:any[] = []
slavesMap:any = {}
slaves:SlaveWorker[] = []
slavesMap:{
[k:number]: SlaveWorker|null
} = {}
conf:any = {}
logger:any
onInfoCallback:any
workersOnline:Promise<any>[]
private exitHandler: (worker: any, code: any, signal: any) => void
private onlineHandler: (worker: any) => void
private messageHandler: (worker: any, msg: any) => void
constructor(private nbCores:number, logger:any) {
this.clusterId = clusterId++
......@@ -34,53 +42,23 @@ export class Master {
this.onInfoMessage = (message:any) => {
this.logger.info(`${message.pow.pow} nonce = ${message.pow.block.nonce}`)
}
this.exitHandler = (worker:any, code:any, signal:any) => {
this.logger.info(`worker ${worker.process.pid} died with code ${code} and signal ${signal}`)
}
this.onlineHandler = (worker:any) => {
// We just listen to the workers of this Master
if (this.slavesMap[worker.id]) {
this.logger.info(`[online] worker c#${this.clusterId}#w#${worker.id}`)
this.slavesMap[worker.id].online.extras.resolve()
worker.send({
command: 'conf',
value: this.conf
})
}
}
this.messageHandler = (worker:any, msg:any) => {
// Message for this cluster
if (this.slavesMap[worker.id]) {
this.onWorkerMessage(worker, msg)
}
}
}
get nbWorkers() {
return this.slaves.length
}
get hasProofPending() {
return !!this.currentPromise
}
set onInfoMessage(callback:any) {
this.onInfoCallback = callback
}
onWorkerMessage(worker:any, message:any) {
onWorkerMessage(workerIndex:number, message:any) {
// this.logger.info(`worker#${this.slavesMap[worker.id].index} sent message:${message}`)
if (message.pow && message.pow.pow) {
if (message && message.pow) {
this.onInfoCallback && this.onInfoCallback(message)
}
if (this.currentPromise && message.uuid === this.currentPromise.extras.uuid && !this.currentPromise.isResolved() && message.answer) {
this.logger.info(`ENGINE c#${this.clusterId}#${this.slavesMap[worker.id].index} HAS FOUND A PROOF #${message.answer.pow.pow}`)
this.currentPromise.extras.resolve(message.answer)
// Stop the slaves' current work
this.cancelWork()
if (this.currentPromise && message.uuid && !this.currentPromise.isResolved() && message.answer) {
this.logger.info(`ENGINE c#${this.clusterId}#${workerIndex} HAS FOUND A PROOF #${message.answer.pow.pow}`)
} else if (message.canceled) {
this.nbCancels++
}
......@@ -94,13 +72,26 @@ export class Master {
initCluster() {
// Setup master
cluster.setupMaster({
exec: __filename
exec: __filename,
execArgv: [] // Do not try to debug forks
})
this.slaves = Array.from({ length: this.nbCores }).map((value, index) => {
const worker = cluster.fork()
this.logger.info(`Creating worker c#${this.clusterId}#w#${worker.id}`)
this.slavesMap[worker.id] = {
const nodejsWorker = cluster.fork()
const worker = new PowWorker(nodejsWorker, message => {
this.onWorkerMessage(index, message)
}, () => {
this.logger.info(`[online] worker c#${this.clusterId}#w#${index}`)
worker.sendConf({
command: 'conf',
value: this.conf
})
}, (code:any, signal:any) => {
this.logger.info(`worker ${worker.pid} died with code ${code} and signal ${signal}`)
})
this.logger.info(`Creating worker c#${this.clusterId}#w#${nodejsWorker.id}`)
const slave = {
// The Node.js worker
worker,
......@@ -109,24 +100,16 @@ export class Master {
index,
// Worker ready
online: (function onlinePromise() {
let resolve
const p = querablep(new Promise(res => resolve = res))
p.extras = { resolve }
return p
})(),
online: worker.online,
// Each worker has his own chunk of possible nonces
nonceBeginning: this.nbCores === 1 ? 0 : (index + 1) * ProverConstants.NONCE_RANGE
}
return this.slavesMap[worker.id]
this.slavesMap[nodejsWorker.id] = slave
return slave
})
cluster.on('exit', this.exitHandler)
cluster.on('online', this.onlineHandler)
cluster.on('message', this.messageHandler)
this.workersOnline = this.slaves.map((s:any) => s.online)
this.workersOnline = this.slaves.map((s) => s.online)
return Promise.all(this.workersOnline)
}
......@@ -135,7 +118,7 @@ export class Master {
this.conf.cpu = conf.cpu || this.conf.cpu
this.conf.prefix = this.conf.prefix || conf.prefix
this.slaves.forEach(s => {
s.worker.send({
s.worker.sendConf({
command: 'conf',
value: this.conf
})
......@@ -143,41 +126,26 @@ export class Master {
return Promise.resolve(_.clone(conf))
}
cancelWork() {
this.logger.info(`Cancelling the work on PoW cluster of %s slaves`, this.slaves.length)
private cancelWorkersWork() {
this.slaves.forEach(s => {
s.worker.send({
command: 'cancel'
})
s.worker.sendCancel()
})
}
// Eventually force the end of current promise
if (this.currentPromise && !this.currentPromise.isFulfilled()) {
this.currentPromise.extras.resolve(null)
}
async cancelWork() {
this.cancelWorkersWork()
const workEnded = this.currentPromise
// Current promise is done
this.currentPromise = null
return Promise.resolve()
}
newPromise(uuid:string) {
let resolve
const p = querablep(new Promise(res => resolve = res))
p.extras = { resolve, uuid }
return p
return await workEnded
}
async shutDownWorkers() {
if (this.workersOnline) {
await Promise.all(this.workersOnline)
await Promise.all(this.slaves.map(async (s:any) => {
await Promise.all(this.slaves.map(async (s) => {
s.worker.kill()
}))
cluster.removeListener('exit', this.exitHandler)
cluster.removeListener('online', this.onlineHandler)
cluster.removeListener('message', this.messageHandler)
}
this.slaves = []
}
......@@ -191,9 +159,7 @@ export class Master {
// Register the new proof uuid
const uuid = nuuid.v4()
this.currentPromise = this.newPromise(uuid)
return (async () => {
this.currentPromise = querablep((async () => {
await Promise.all(this.workersOnline)
if (!this.currentPromise) {
......@@ -202,8 +168,8 @@ export class Master {
}
// Start the salves' job
this.slaves.forEach((s:any, index) => {
s.worker.send({
const asks = this.slaves.map(async (s, index) => {
const proof = await s.worker.askProof({
uuid,
command: 'newPoW',
value: {
......@@ -222,10 +188,29 @@ export class Master {
}
}
})
this.logger.info(`[done] worker c#${this.clusterId}#w#${index}`)
return {
workerID: index,
proof
}
})
return await this.currentPromise
})()
// Find a proof
const result = await Promise.race(asks)
this.cancelWorkersWork()
// Wait for all workers to have stopped looking for a proof
await Promise.all(asks)
if (!result.proof || !result.proof.message.answer) {
this.logger.info('No engine found the proof. It was probably cancelled.')
return null
} else {
this.logger.info(`ENGINE c#${this.clusterId}#${result.workerID} HAS FOUND A PROOF #${result.proof.message.answer.pow.pow}`)
return result.proof.message.answer
}
})())
return this.currentPromise
}
static defaultLogger() {
......@@ -250,5 +235,5 @@ if (cluster.isMaster) {
process.exit(0)
});
require('./proof')
createPowWorker()
}
......@@ -11,295 +11,298 @@ import {ProcessCpuProfiler} from "../../../ProcessCpuProfiler"
const moment = require('moment');
const querablep = require('querablep');
let computing = querablep(Promise.resolve(null));
let askedStop = false;
export function createPowWorker() {
// By default, we do not prefix the PoW by any number
let prefix = 0;
let computing = querablep(Promise.resolve(null));
let askedStop = false;
let signatureFunc:any, lastSecret:any, currentCPU = 1;
// By default, we do not prefix the PoW by any number
let prefix = 0;
process.on('uncaughtException', (err:any) => {
console.error(err.stack || Error(err))
if (process.send) {
process.send({error: err});
} else {
throw Error('process.send() is not defined')
}
});
let signatureFunc:any, lastSecret:any, currentCPU = 1;
process.on('unhandledRejection', () => {
process.exit()
})
process.on('uncaughtException', (err:any) => {
console.error(err.stack || Error(err))
if (process.send) {
process.send({error: err});
} else {
throw Error('process.send() is not defined')
}
});
process.on('message', async (message) => {
process.on('unhandledRejection', () => {
process.exit()
})
switch (message.command) {
process.on('message', async (message) => {
case 'newPoW':
(async () => {
askedStop = true
switch (message.command) {
// Very important: do not await if the computation is already done, to keep the lock on JS engine
if (!computing.isFulfilled()) {
await computing;
}
case 'newPoW':
(async () => {
askedStop = true
const res = await beginNewProofOfWork(message.value);
answer(message, res);
})()
break;
// Very important: do not await if the computation is already done, to keep the lock on JS engine
if (!computing.isFulfilled()) {
await computing;
}
case 'cancel':
if (!computing.isFulfilled()) {
askedStop = true;
}
break;
const res = await beginNewProofOfWork(message.value);
answer(message, res);
})()
break;
case 'conf':
if (message.value.cpu !== undefined) {
currentCPU = message.value.cpu
}
if (message.value.prefix !== undefined) {
prefix = message.value.prefix
}
answer(message, { currentCPU, prefix });
break;
}
case 'cancel':
if (!computing.isFulfilled()) {
askedStop = true;
}
break;
})
function beginNewProofOfWork(stuff:any) {
askedStop = false;
computing = querablep((async () => {
/*****************
* PREPARE POW STUFF
****************/
let nonce = 0;
const maxDuration = stuff.maxDuration || 1000
const conf = stuff.conf;
const block = stuff.block;
const nonceBeginning = stuff.nonceBeginning;
const nbZeros = stuff.zeros;
const pair = stuff.pair;
const forcedTime = stuff.forcedTime;
currentCPU = conf.cpu || ProverConstants.DEFAULT_CPU;
prefix = parseInt(conf.prefix || prefix)
if (prefix && prefix < ProverConstants.NONCE_RANGE) {
prefix *= 100 * ProverConstants.NONCE_RANGE
}
const highMark = stuff.highMark;
let sigFunc = null;
if (signatureFunc && lastSecret === pair.sec) {
sigFunc = signatureFunc;
}
else {
lastSecret = pair.sec;
sigFunc = (msg:string) => KeyGen(pair.pub, pair.sec).signSync(msg)
case 'conf':
if (message.value.cpu !== undefined) {
currentCPU = message.value.cpu
}
if (message.value.prefix !== undefined) {
prefix = message.value.prefix
}
answer(message, { currentCPU, prefix });
break;
}
signatureFunc = sigFunc;
let pow = "", sig = "", raw = "";
/*****************
* GO!
****************/
})
function beginNewProofOfWork(stuff:any) {
askedStop = false;
computing = querablep((async () => {
let pausePeriod = 1;
let testsCount = 0;
let found = false;
let turn = 0;
const profiler = new ProcessCpuProfiler(100)
let cpuUsage = profiler.cpuUsageOverLastMilliseconds(1)
// We limit the number of tests according to CPU usage
let testsPerRound = stuff.initialTestsPerRound || 1
let turnDuration = 20 // We initially goes quickly to the max speed = 50 reevaluations per second (1000 / 20)
/*****************
* PREPARE POW STUFF
****************/
while (!found && !askedStop) {
let nonce = 0;
const maxDuration = stuff.maxDuration || 1000
const conf = stuff.conf;
const block = stuff.block;
const nonceBeginning = stuff.nonceBeginning;
const nbZeros = stuff.zeros;
const pair = stuff.pair;
const forcedTime = stuff.forcedTime;
currentCPU = conf.cpu || ProverConstants.DEFAULT_CPU;
prefix = parseInt(conf.prefix || prefix)
if (prefix && prefix < ProverConstants.NONCE_RANGE) {
prefix *= 100 * ProverConstants.NONCE_RANGE
}
const highMark = stuff.highMark;
let sigFunc = null;
if (signatureFunc && lastSecret === pair.sec) {
sigFunc = signatureFunc;
}
else {
lastSecret = pair.sec;
sigFunc = (msg:string) => KeyGen(pair.pub, pair.sec).signSync(msg)
}
signatureFunc = sigFunc;
let pow = "", sig = "", raw = "";
/*****************
* A TURN ~ 100ms
* GO!
****************/
await Promise.race([
let pausePeriod = 1;
let testsCount = 0;
let found = false;
let turn = 0;
const profiler = new ProcessCpuProfiler(100)
let cpuUsage = profiler.cpuUsageOverLastMilliseconds(1)
// We limit the number of tests according to CPU usage
let testsPerRound = stuff.initialTestsPerRound || 1
let turnDuration = 20 // We initially goes quickly to the max speed = 50 reevaluations per second (1000 / 20)
// I. Stop the turn if it exceeds `turnDuration` ms
countDown(turnDuration),