Skip to content
Snippets Groups Projects
Commit 14b553eb authored by Éloïs's avatar Éloïs
Browse files

[ref] improve modules threads management

parent 2f12f18c
No related branches found
No related tags found
No related merge requests found
......@@ -48,15 +48,13 @@ use simplelog::*;
//use std::fmt::{Debug, Formatter};
use crate::cli::keys::*;
use crate::cli::*;
use std::collections::HashMap;
use std::fs;
use std::fs::{File, OpenOptions};
use std::sync::mpsc;
use std::thread;
use structopt::clap::{App, ArgMatches};
use structopt::StructOpt;
use threadpool::ThreadPool;
/// Number of thread in plugins ThreadPool
pub static THREAD_POOL_SIZE: &'static usize = &4;
#[macro_export]
macro_rules! durs_core_server {
......@@ -143,7 +141,6 @@ pub struct TupleApp<'b, 'a: 'b>(&'b App<'a, 'b>);
}
}*/
#[derive(Clone)]
/// Duniter Core Datas
pub struct DuniterCore<'a, 'b: 'a, DC: DuniterConf> {
/// Command line configuration
......@@ -162,12 +159,12 @@ pub struct DuniterCore<'a, 'b: 'a, DC: DuniterConf> {
pub run_duration_in_secs: u64,
/// Sender channel of router thread
pub router_sender: Option<mpsc::Sender<RouterThreadMessage<DursMsg>>>,
/// Count the number of plugged modules
pub modules_count: usize,
/// Count the number of plugged network modules
pub network_modules_count: usize,
/// ThreadPool that execute plugged modules
pub thread_pool: ThreadPool,
/// Modules names
pub modules_names: Vec<ModuleStaticName>,
/// Threads handlers that execute plugged modules
pub threads: HashMap<ModuleStaticName, thread::JoinHandle<()>>,
}
impl<'a, 'b: 'a> DuniterCore<'b, 'a, DuRsConf> {
......@@ -195,9 +192,9 @@ impl<'a, 'b: 'a> DuniterCore<'b, 'a, DuRsConf> {
keypairs: None,
run_duration_in_secs,
router_sender: None,
modules_count: 1, // Count blockchain module
network_modules_count: 0,
thread_pool: ThreadPool::new(*THREAD_POOL_SIZE),
modules_names: Vec::new(),
threads: HashMap::new(),
}
}
/// Inject cli subcommand
......@@ -428,7 +425,7 @@ impl<'a, 'b: 'a> DuniterCore<'b, 'a, DuRsConf> {
}
}
/// Start core (=blockchain module)
pub fn start_core(&self) {
pub fn start_core(&mut self) {
if self.network_modules_count == 0 {
panic!("You must plug at least one network layer !");
}
......@@ -447,7 +444,9 @@ impl<'a, 'b: 'a> DuniterCore<'b, 'a, DuRsConf> {
// Send expected modules count to router thread
router_sender
.send(RouterThreadMessage::ModulesCount(self.modules_count))
.send(RouterThreadMessage::ModulesCount(
self.modules_names.len() + 1,
))
.expect("Fatal error: fail to send expected modules count to router thread !");
// Send blockchain module registration to router thread
......@@ -471,8 +470,25 @@ impl<'a, 'b: 'a> DuniterCore<'b, 'a, DuRsConf> {
);
info!("Success to load Blockchain module.");
// Start blockchain module in main thread
blockchain_module.start_blockchain(&blockchain_receiver);
// Start blockchain module in thread
let thread_builder = thread::Builder::new().name(BlockchainModule::name().0.into());
let blockchain_thread_handler = thread_builder
.spawn(move || blockchain_module.start_blockchain(&blockchain_receiver))
.expect("Fatal error: fail to spawn module main thread !");
// Wait until all modules threads are finished
for module_static_name in &self.modules_names {
if let Some(module_thread_handler) = self.threads.remove(module_static_name) {
if let Err(err) = module_thread_handler.join() {
error!("'{}' module thread panic : {:?}", module_static_name.0, err);
}
}
}
// Wait until blockchain main thread are finished
if let Err(err) = blockchain_thread_handler.join() {
error!("'blockchain' thread panic : {:?}", err);
}
}
}
/// Plug a network module
......@@ -496,7 +512,11 @@ impl<'a, 'b: 'a> DuniterCore<'b, 'a, DuRsConf> {
.cloned();
let keypairs = self.keypairs;
let sync_params = network_sync.clone();
self.thread_pool.execute(move || {
let thread_builder = thread::Builder::new().name(NM::name().0.into());
self.threads.insert(
NM::name(),
thread_builder
.spawn(move || {
// Load module conf and keys
let (module_conf, required_keys) = get_module_conf_and_keys::<NM>(
module_conf_json,
......@@ -515,8 +535,10 @@ impl<'a, 'b: 'a> DuniterCore<'b, 'a, DuRsConf> {
NM::name().to_string()
)
});
});
self.modules_count += 1;
})
.expect("Fatail error: fail to spawn network module main thread !"),
);
self.modules_names.push(NM::name());
info!("Success to load {} module.", NM::name().to_string());
} else {
self.plug_::<NM>(true);
......@@ -550,7 +572,11 @@ impl<'a, 'b: 'a> DuniterCore<'b, 'a, DuRsConf> {
.get(&M::name().to_string().as_str())
.cloned();
let keypairs = self.keypairs;
self.thread_pool.execute(move || {
let thread_builder = thread::Builder::new().name(M::name().0.into());
self.threads.insert(
M::name(),
thread_builder
.spawn(move || {
// Load module conf and keys
let (module_conf, required_keys) = get_module_conf_and_keys::<M>(
module_conf_json,
......@@ -569,8 +595,10 @@ impl<'a, 'b: 'a> DuniterCore<'b, 'a, DuRsConf> {
M::name().to_string()
)
});
});
self.modules_count += 1;
})
.expect("Fatail error: fail to spawn module main thread !"),
);
self.modules_names.push(M::name());
info!("Success to load {} module.", M::name().to_string());
} else if let Some(UserCommand::UnknowCommand(ref subcommand)) = self.user_command {
if M::have_subcommand() && *subcommand == M::name().to_string() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment