Skip to content
Snippets Groups Projects
Commit d7988cc2 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

Merge branch '1.6_fix_1234' into '1.6_fixes'

1.6 fix 1234

See merge request !1228
parents 6f01a449 155c98a7
Branches
Tags
4 merge requests!12331.6,!12291.6 into DockerBuild,!12281.6 fix 1234,!12251.6 fixes
import {Querable} from "./permanentProver"
const querablep = require('querablep')
/*********
*
* PoW worker
* ----------
*
* Its model is super simple: we ask him to find a proof, and we can wait for it.
* Eventually, we can tell him to cancel his proof, which makes it answer `null` as proof value.
*
* The worker also provides two properties:
*
* - `worker.online`: a promise which is resolved when the worker gets « online » for the first time
* - `worker.exit`: a promise which is resolved when the worker exits (which occurs when the worker is being closed or killed)
*
********/
export class PowWorker {
private onlinePromise:Promise<void>
private onlineResolver:()=>void
private exitPromise:Promise<void>
private exitResolver:()=>void
private proofPromise:Querable<{ message: { answer:any }}|null>
private proofResolver:(proof:{ message: { answer:any }}|null)=>void
private messageHandler:((worker:any, msg:any)=>void)
constructor(
private nodejsWorker:any,
private onPowMessage:(message:any)=>void,
private onlineHandler:()=>void,
private exitHandler:(code:any, signal:any)=>void) {
// Handle "online" promise
this.onlinePromise = new Promise(res => this.onlineResolver = res)
nodejsWorker.on('online', () => {
this.onlineHandler()
this.onlineResolver()
})
// Handle "exit" promise
this.exitPromise = new Promise(res => this.exitResolver = res)
nodejsWorker.on('exit', (code:any, signal:any) => {
this.exitHandler(code, signal)
this.exitResolver()
})
nodejsWorker.on('message', (message:any) => {
if (message) {
this.onPowMessage(message)
}
if (this.proofPromise && message.uuid && !this.proofPromise.isResolved() && this.proofResolver) {
const result:{ message: { answer:any }}|null = message ? { message } : null
this.proofResolver(result)
}
})
}
get online() {
return this.onlinePromise
}
get exited() {
return this.exitPromise
}
get pid() {
return this.nodejsWorker.process.pid
}
askProof(commandMessage:{ uuid:string, command:string, value:any }) {
this.proofPromise = querablep(new Promise<{ message: { answer:any }}|null>(res => this.proofResolver = res))
this.nodejsWorker.send(commandMessage)
return this.proofPromise
}
sendConf(confMessage:{ command:string, value:any }) {
this.nodejsWorker.send(confMessage)
}
sendCancel() {
this.nodejsWorker.send({
command: 'cancel'
})
}
kill() {
this.nodejsWorker.kill()
}
}
\ No newline at end of file
...@@ -33,9 +33,7 @@ export class PowEngine { ...@@ -33,9 +33,7 @@ export class PowEngine {
} }
async prove(stuff:any) { async prove(stuff:any) {
if (this.cluster.hasProofPending) {
await this.cluster.cancelWork() await this.cluster.cancelWork()
}
return await this.cluster.proveByWorkers(stuff) return await this.cluster.proveByWorkers(stuff)
} }
......
...@@ -9,7 +9,7 @@ import {Server} from "../../../../server" ...@@ -9,7 +9,7 @@ import {Server} from "../../../../server"
const querablep = require('querablep'); const querablep = require('querablep');
interface Querable<T> extends Promise<T> { export interface Querable<T> extends Promise<T> {
isFulfilled(): boolean isFulfilled(): boolean
isResolved(): boolean isResolved(): boolean
isRejected(): boolean isRejected(): boolean
......
import {ConfDTO} from "../../../lib/dto/ConfDTO" import {ConfDTO} from "../../../lib/dto/ConfDTO"
import {ProverConstants} from "./constants" import {ProverConstants} from "./constants"
import {createPowWorker} from "./proof"
import {PowWorker} from "./PowWorker"
const _ = require('underscore') const _ = require('underscore')
const nuuid = require('node-uuid'); const nuuid = require('node-uuid');
...@@ -9,6 +11,13 @@ const querablep = require('querablep') ...@@ -9,6 +11,13 @@ const querablep = require('querablep')
let clusterId = 0 let clusterId = 0
cluster.setMaxListeners(3) cluster.setMaxListeners(3)
export interface SlaveWorker {
worker:PowWorker,
index:number,
online:Promise<void>,
nonceBeginning:number
}
/** /**
* Cluster controller, handles the messages between the main program and the PoW cluster. * Cluster controller, handles the messages between the main program and the PoW cluster.
*/ */
...@@ -18,15 +27,14 @@ export class Master { ...@@ -18,15 +27,14 @@ export class Master {
clusterId:number clusterId:number
currentPromise:any|null = null currentPromise:any|null = null
slaves:any[] = [] slaves:SlaveWorker[] = []
slavesMap:any = {} slavesMap:{
[k:number]: SlaveWorker|null
} = {}
conf:any = {} conf:any = {}
logger:any logger:any
onInfoCallback:any onInfoCallback:any
workersOnline:Promise<any>[] workersOnline:Promise<any>[]
private exitHandler: (worker: any, code: any, signal: any) => void
private onlineHandler: (worker: any) => void
private messageHandler: (worker: any, msg: any) => void
constructor(private nbCores:number, logger:any) { constructor(private nbCores:number, logger:any) {
this.clusterId = clusterId++ this.clusterId = clusterId++
...@@ -34,53 +42,23 @@ export class Master { ...@@ -34,53 +42,23 @@ export class Master {
this.onInfoMessage = (message:any) => { this.onInfoMessage = (message:any) => {
this.logger.info(`${message.pow.pow} nonce = ${message.pow.block.nonce}`) this.logger.info(`${message.pow.pow} nonce = ${message.pow.block.nonce}`)
} }
this.exitHandler = (worker:any, code:any, signal:any) => {
this.logger.info(`worker ${worker.process.pid} died with code ${code} and signal ${signal}`)
}
this.onlineHandler = (worker:any) => {
// We just listen to the workers of this Master
if (this.slavesMap[worker.id]) {
this.logger.info(`[online] worker c#${this.clusterId}#w#${worker.id}`)
this.slavesMap[worker.id].online.extras.resolve()
worker.send({
command: 'conf',
value: this.conf
})
}
}
this.messageHandler = (worker:any, msg:any) => {
// Message for this cluster
if (this.slavesMap[worker.id]) {
this.onWorkerMessage(worker, msg)
}
}
} }
get nbWorkers() { get nbWorkers() {
return this.slaves.length return this.slaves.length
} }
get hasProofPending() {
return !!this.currentPromise
}
set onInfoMessage(callback:any) { set onInfoMessage(callback:any) {
this.onInfoCallback = callback this.onInfoCallback = callback
} }
onWorkerMessage(worker:any, message:any) { onWorkerMessage(workerIndex:number, message:any) {
// this.logger.info(`worker#${this.slavesMap[worker.id].index} sent message:${message}`) // this.logger.info(`worker#${this.slavesMap[worker.id].index} sent message:${message}`)
if (message.pow && message.pow.pow) { if (message && message.pow) {
this.onInfoCallback && this.onInfoCallback(message) this.onInfoCallback && this.onInfoCallback(message)
} }
if (this.currentPromise && message.uuid === this.currentPromise.extras.uuid && !this.currentPromise.isResolved() && message.answer) { if (this.currentPromise && message.uuid && !this.currentPromise.isResolved() && message.answer) {
this.logger.info(`ENGINE c#${this.clusterId}#${this.slavesMap[worker.id].index} HAS FOUND A PROOF #${message.answer.pow.pow}`) this.logger.info(`ENGINE c#${this.clusterId}#${workerIndex} HAS FOUND A PROOF #${message.answer.pow.pow}`)
this.currentPromise.extras.resolve(message.answer)
// Stop the slaves' current work
this.cancelWork()
} else if (message.canceled) { } else if (message.canceled) {
this.nbCancels++ this.nbCancels++
} }
...@@ -94,13 +72,26 @@ export class Master { ...@@ -94,13 +72,26 @@ export class Master {
initCluster() { initCluster() {
// Setup master // Setup master
cluster.setupMaster({ cluster.setupMaster({
exec: __filename exec: __filename,
execArgv: [] // Do not try to debug forks
}) })
this.slaves = Array.from({ length: this.nbCores }).map((value, index) => { this.slaves = Array.from({ length: this.nbCores }).map((value, index) => {
const worker = cluster.fork() const nodejsWorker = cluster.fork()
this.logger.info(`Creating worker c#${this.clusterId}#w#${worker.id}`) const worker = new PowWorker(nodejsWorker, message => {
this.slavesMap[worker.id] = { this.onWorkerMessage(index, message)
}, () => {
this.logger.info(`[online] worker c#${this.clusterId}#w#${index}`)
worker.sendConf({
command: 'conf',
value: this.conf
})
}, (code:any, signal:any) => {
this.logger.info(`worker ${worker.pid} died with code ${code} and signal ${signal}`)
})
this.logger.info(`Creating worker c#${this.clusterId}#w#${nodejsWorker.id}`)
const slave = {
// The Node.js worker // The Node.js worker
worker, worker,
...@@ -109,24 +100,16 @@ export class Master { ...@@ -109,24 +100,16 @@ export class Master {
index, index,
// Worker ready // Worker ready
online: (function onlinePromise() { online: worker.online,
let resolve
const p = querablep(new Promise(res => resolve = res))
p.extras = { resolve }
return p
})(),
// Each worker has his own chunk of possible nonces // Each worker has his own chunk of possible nonces
nonceBeginning: this.nbCores === 1 ? 0 : (index + 1) * ProverConstants.NONCE_RANGE nonceBeginning: this.nbCores === 1 ? 0 : (index + 1) * ProverConstants.NONCE_RANGE
} }
return this.slavesMap[worker.id] this.slavesMap[nodejsWorker.id] = slave
return slave
}) })
cluster.on('exit', this.exitHandler) this.workersOnline = this.slaves.map((s) => s.online)
cluster.on('online', this.onlineHandler)
cluster.on('message', this.messageHandler)
this.workersOnline = this.slaves.map((s:any) => s.online)
return Promise.all(this.workersOnline) return Promise.all(this.workersOnline)
} }
...@@ -135,7 +118,7 @@ export class Master { ...@@ -135,7 +118,7 @@ export class Master {
this.conf.cpu = conf.cpu || this.conf.cpu this.conf.cpu = conf.cpu || this.conf.cpu
this.conf.prefix = this.conf.prefix || conf.prefix this.conf.prefix = this.conf.prefix || conf.prefix
this.slaves.forEach(s => { this.slaves.forEach(s => {
s.worker.send({ s.worker.sendConf({
command: 'conf', command: 'conf',
value: this.conf value: this.conf
}) })
...@@ -143,41 +126,26 @@ export class Master { ...@@ -143,41 +126,26 @@ export class Master {
return Promise.resolve(_.clone(conf)) return Promise.resolve(_.clone(conf))
} }
cancelWork() { private cancelWorkersWork() {
this.logger.info(`Cancelling the work on PoW cluster of %s slaves`, this.slaves.length)
this.slaves.forEach(s => { this.slaves.forEach(s => {
s.worker.send({ s.worker.sendCancel()
command: 'cancel'
})
}) })
// Eventually force the end of current promise
if (this.currentPromise && !this.currentPromise.isFulfilled()) {
this.currentPromise.extras.resolve(null)
} }
async cancelWork() {
this.cancelWorkersWork()
const workEnded = this.currentPromise
// Current promise is done // Current promise is done
this.currentPromise = null this.currentPromise = null
return await workEnded
return Promise.resolve()
}
newPromise(uuid:string) {
let resolve
const p = querablep(new Promise(res => resolve = res))
p.extras = { resolve, uuid }
return p
} }
async shutDownWorkers() { async shutDownWorkers() {
if (this.workersOnline) { if (this.workersOnline) {
await Promise.all(this.workersOnline) await Promise.all(this.workersOnline)
await Promise.all(this.slaves.map(async (s:any) => { await Promise.all(this.slaves.map(async (s) => {
s.worker.kill() s.worker.kill()
})) }))
cluster.removeListener('exit', this.exitHandler)
cluster.removeListener('online', this.onlineHandler)
cluster.removeListener('message', this.messageHandler)
} }
this.slaves = [] this.slaves = []
} }
...@@ -191,9 +159,7 @@ export class Master { ...@@ -191,9 +159,7 @@ export class Master {
// Register the new proof uuid // Register the new proof uuid
const uuid = nuuid.v4() const uuid = nuuid.v4()
this.currentPromise = this.newPromise(uuid) this.currentPromise = querablep((async () => {
return (async () => {
await Promise.all(this.workersOnline) await Promise.all(this.workersOnline)
if (!this.currentPromise) { if (!this.currentPromise) {
...@@ -202,8 +168,8 @@ export class Master { ...@@ -202,8 +168,8 @@ export class Master {
} }
// Start the salves' job // Start the salves' job
this.slaves.forEach((s:any, index) => { const asks = this.slaves.map(async (s, index) => {
s.worker.send({ const proof = await s.worker.askProof({
uuid, uuid,
command: 'newPoW', command: 'newPoW',
value: { value: {
...@@ -222,10 +188,29 @@ export class Master { ...@@ -222,10 +188,29 @@ export class Master {
} }
} }
}) })
this.logger.info(`[done] worker c#${this.clusterId}#w#${index}`)
return {
workerID: index,
proof
}
}) })
return await this.currentPromise // Find a proof
})() const result = await Promise.race(asks)
this.cancelWorkersWork()
// Wait for all workers to have stopped looking for a proof
await Promise.all(asks)
if (!result.proof || !result.proof.message.answer) {
this.logger.info('No engine found the proof. It was probably cancelled.')
return null
} else {
this.logger.info(`ENGINE c#${this.clusterId}#${result.workerID} HAS FOUND A PROOF #${result.proof.message.answer.pow.pow}`)
return result.proof.message.answer
}
})())
return this.currentPromise
} }
static defaultLogger() { static defaultLogger() {
...@@ -250,5 +235,5 @@ if (cluster.isMaster) { ...@@ -250,5 +235,5 @@ if (cluster.isMaster) {
process.exit(0) process.exit(0)
}); });
require('./proof') createPowWorker()
} }
...@@ -11,6 +11,8 @@ import {ProcessCpuProfiler} from "../../../ProcessCpuProfiler" ...@@ -11,6 +11,8 @@ import {ProcessCpuProfiler} from "../../../ProcessCpuProfiler"
const moment = require('moment'); const moment = require('moment');
const querablep = require('querablep'); const querablep = require('querablep');
export function createPowWorker() {
let computing = querablep(Promise.resolve(null)); let computing = querablep(Promise.resolve(null));
let askedStop = false; let askedStop = false;
...@@ -185,7 +187,7 @@ function beginNewProofOfWork(stuff:any) { ...@@ -185,7 +187,7 @@ function beginNewProofOfWork(stuff:any) {
i++; i++;
testsCount++; testsCount++;
if (i % pausePeriod === 0) { if (i % pausePeriod === 0) {
await countDown(0); // Very low pause, just the time to process eventual end of the turn await countDown(1); // Very low pause, just the time to process eventual end of the turn
} }
} }
} }
...@@ -303,3 +305,4 @@ function pSend(stuff:any) { ...@@ -303,3 +305,4 @@ function pSend(stuff:any) {
} }
}); });
} }
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment