Skip to content
Snippets Groups Projects

Elois/prevent module recv own event

Merged Éloïs requested to merge elois/prevent-module-recv-own-event into dev
8 files
+ 32
31
Compare changes
  • Side-by-side
  • Inline

Files

+ 17
28
@@ -40,7 +40,6 @@ enum DursMsgReceiver {
/// Start broadcasting thread
fn start_broadcasting_thread(
start_time: SystemTime,
run_duration_in_secs: u64,
receiver: &mpsc::Receiver<RouterThreadMessage<DursMsg>>,
_external_followers: &[mpsc::Sender<DursMsg>],
) {
@@ -178,32 +177,29 @@ fn start_broadcasting_thread(
}
RouterThreadMessage::ModuleMessage(msg) => match msg {
DursMsg::Stop => break,
DursMsg::Event { event_type, .. } => {
DursMsg::Event {
event_from,
event_type,
..
} => {
// the node to be started less than MAX_REGISTRATION_DELAY seconds ago,
// keep the message in memory to be able to send it back to modules not yet plugged
store_msg_in_pool(
start_time,
run_duration_in_secs,
msg.clone(),
&mut pool_msgs,
);
store_msg_in_pool(start_time, msg.clone(), &mut pool_msgs);
// Get list of receivers
let receivers = events_subscriptions
.get(&event_type)
.unwrap_or(&Vec::with_capacity(0))
.to_vec();
.iter()
.filter(|module_static_name| **module_static_name != event_from)
.cloned()
.collect::<Vec<ModuleStaticName>>();
// Send msg to receivers
send_msg_to_several_receivers(msg, &receivers, &modules_senders)
}
DursMsg::Request { req_to: role, .. } => {
// If the node to be started less than MAX_REGISTRATION_DELAY seconds ago,
// keep the message in memory to be able to send it back to modules not yet plugged
store_msg_in_pool(
start_time,
run_duration_in_secs,
msg.clone(),
&mut pool_msgs,
);
store_msg_in_pool(start_time, msg.clone(), &mut pool_msgs);
// Get list of receivers
let receivers =
roles.get(&role).unwrap_or(&Vec::with_capacity(0)).to_vec();
@@ -288,16 +284,14 @@ fn send_msg_to_several_receivers(
/// keep the message in memory to be able to send it back to modules not yet plugged
fn store_msg_in_pool(
start_time: SystemTime,
run_duration_in_secs: u64,
msg: DursMsg,
pool_msgs: &mut HashMap<DursMsgReceiver, Vec<DursMsg>>,
) {
if run_duration_in_secs > 0
&& SystemTime::now()
.duration_since(start_time)
.expect("Duration error !")
.as_secs()
< *MAX_REGISTRATION_DELAY
if SystemTime::now()
.duration_since(start_time)
.expect("Duration error !")
.as_secs()
< *MAX_REGISTRATION_DELAY
{
let msg_recv = match msg {
DursMsg::Event { event_type, .. } => Some(DursMsgReceiver::Event(event_type)),
@@ -339,12 +333,7 @@ pub fn start_router(
// Create broadcasting thread
thread::spawn(move || {
start_broadcasting_thread(
start_time,
run_duration_in_secs,
&broadcasting_receiver,
&external_followers,
);
start_broadcasting_thread(start_time, &broadcasting_receiver, &external_followers);
});
// Create conf thread channel
Loading