Skip to content
Snippets Groups Projects
Commit 11c68005 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

fix(#297): duniter-endpoints task was using 100% of one CPU core

parent 31ef3447
No related branches found
No related tags found
No related merge requests found
Pipeline #40268 failed
...@@ -3,16 +3,16 @@ use crate::endpoint_gossip::{ ...@@ -3,16 +3,16 @@ use crate::endpoint_gossip::{
PROPAGATE_TIMEOUT, PROPAGATE_TIMEOUT,
}; };
use codec::{Decode, Encode}; use codec::{Decode, Encode};
use futures::{stream, FutureExt, Stream, StreamExt}; use futures::{future, stream, FutureExt, Stream, StreamExt};
use log::debug; use log::debug;
use sc_network::{ use sc_network::{
service::traits::{NotificationEvent, ValidationResult}, service::traits::{NotificationEvent, ValidationResult},
utils::interval, utils::interval,
NetworkEventStream, NetworkPeers, NetworkStateInfo, NotificationService, ObservedRole, PeerId, NetworkEventStream, NetworkPeers, NetworkStateInfo, NotificationService, ObservedRole, PeerId,
}; };
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
use sp_api::__private::BlockT; use sp_api::__private::BlockT;
use std::{collections::HashMap, marker::PhantomData, pin::Pin}; use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin};
pub fn build< pub fn build<
B: BlockT + 'static, B: BlockT + 'static,
...@@ -34,10 +34,7 @@ pub fn build< ...@@ -34,10 +34,7 @@ pub fn build<
.fuse(), .fuse(),
network, network,
peers: HashMap::new(), peers: HashMap::new(),
command_rx: command_rx.unwrap_or_else(|| { command_rx: CommandHandler(command_rx),
let (_tx, rx) = tracing_unbounded("mpsc_duniter_peering_rpc_command", 1_000);
rx
}),
self_peering: Peering { endpoints }, self_peering: Peering { endpoints },
events_reporter: DuniterEventsReporter { events_reporter: DuniterEventsReporter {
sink: rpc_sink, sink: rpc_sink,
...@@ -46,6 +43,21 @@ pub fn build< ...@@ -46,6 +43,21 @@ pub fn build<
} }
} }
// Structure to avoid borrowing issues with the command receiver.
struct CommandHandler(Option<TracingUnboundedReceiver<DuniterPeeringCommand>>);
impl CommandHandler {
/// Wait for the next command to be received.
pub fn get_next_command(
&mut self,
) -> Pin<Box<dyn Future<Output = Option<DuniterPeeringCommand>> + Send + '_>> {
match &mut self.0 {
Some(tx) => Box::pin(tx.next()),
// Cannot receive any command
None => Box::pin(future::pending()),
}
}
}
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum DuniterPeeringEvent { pub enum DuniterPeeringEvent {
...@@ -98,7 +110,7 @@ pub struct GossipsHandler< ...@@ -98,7 +110,7 @@ pub struct GossipsHandler<
/// Internal sink to report events. /// Internal sink to report events.
events_reporter: DuniterEventsReporter, events_reporter: DuniterEventsReporter,
/// Receiver for external commands (tests/RPC methods). /// Receiver for external commands (tests/RPC methods).
command_rx: TracingUnboundedReceiver<DuniterPeeringCommand>, command_rx: CommandHandler,
/// Handle that is used to communicate with `sc_network::Notifications`. /// Handle that is used to communicate with `sc_network::Notifications`.
notification_service: Box<dyn NotificationService>, notification_service: Box<dyn NotificationService>,
} }
...@@ -123,23 +135,21 @@ where ...@@ -123,23 +135,21 @@ where
_ = self.propagate_timeout.next() => { _ = self.propagate_timeout.next() => {
for (peer, peer_data) in self.peers.iter_mut() { for (peer, peer_data) in self.peers.iter_mut() {
if !peer_data.sent_peering { if !peer_data.sent_peering {
debug!(target: "duniter-libp2p", "[{}] sending self peering to {}", self.network.local_peer_id(), peer);
match self.notification_service.send_async_notification(peer, self.self_peering.encode()).await { match self.notification_service.send_async_notification(peer, self.self_peering.encode()).await {
Ok(_) => { Ok(_) => {
peer_data.sent_peering = true; peer_data.sent_peering = true;
debug!(target: "duniter-libp2p", "[{}] self peering sent to {}", self.network.local_peer_id(), peer);
self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationSuccess(*peer, self.self_peering.clone())); self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationSuccess(*peer, self.self_peering.clone()));
} }
Err(e) => { Err(e) => {
debug!(target: "duniter-libp2p", "[{}] failed to send self peering to {}: {}", self.network.local_peer_id(), peer, e);
self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationFailed(*peer, self.self_peering.clone(), e.to_string())); self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationFailed(*peer, self.self_peering.clone(), e.to_string()));
} }
} }
} }
} }
}, },
command = self.command_rx.next().fuse() => {
if let Some(command) = command {
self.handle_command(command).await
}
},
event = self.notification_service.next_event().fuse() => { event = self.notification_service.next_event().fuse() => {
if let Some(event) = event { if let Some(event) = event {
self.handle_notification_event(event) self.handle_notification_event(event)
...@@ -147,7 +157,12 @@ where ...@@ -147,7 +157,12 @@ where
// `Notifications` has seemingly closed. Closing as well. // `Notifications` has seemingly closed. Closing as well.
return return
} }
} },
command = self.command_rx.get_next_command().fuse() => {
if let Some(command) = command {
self.handle_command(command).await
}
},
} }
} }
} }
...@@ -160,12 +175,14 @@ where ...@@ -160,12 +175,14 @@ where
result_tx, result_tx,
.. ..
} => { } => {
debug!(target: "duniter-libp2p", "[{}] validating stream from {}", self.network.local_peer_id(), peer);
// only accept peers whose role can be determined // only accept peers whose role can be determined
let result = self let result = self
.network .network
.peer_role(peer, handshake) .peer_role(peer, handshake)
.map_or(ValidationResult::Reject, |_| ValidationResult::Accept); .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
let duniter_validation = DuniterStreamValidationResult::from(result); let duniter_validation = DuniterStreamValidationResult::from(result);
debug!(target: "duniter-libp2p", "[{}] stream validation result for {}: {:?}", self.network.local_peer_id(), peer, duniter_validation);
self.events_reporter self.events_reporter
.report_event(DuniterPeeringEvent::StreamValidation( .report_event(DuniterPeeringEvent::StreamValidation(
peer, peer,
...@@ -189,23 +206,28 @@ where ...@@ -189,23 +206,28 @@ where
}, },
); );
debug_assert!(_was_in.is_none()); debug_assert!(_was_in.is_none());
debug!(target: "duniter-libp2p", "[{}] stream opened with {peer}", self.network.local_peer_id());
self.events_reporter self.events_reporter
.report_event(DuniterPeeringEvent::StreamOpened(peer, role)); .report_event(DuniterPeeringEvent::StreamOpened(peer, role));
} }
NotificationEvent::NotificationStreamClosed { peer } => { NotificationEvent::NotificationStreamClosed { peer } => {
let _peer = self.peers.remove(&peer); let _peer = self.peers.remove(&peer);
debug_assert!(_peer.is_some()); debug_assert!(_peer.is_some());
debug!(target: "duniter-libp2p", "[{}] stream closed with {peer}", self.network.local_peer_id());
self.events_reporter self.events_reporter
.report_event(DuniterPeeringEvent::StreamClosed(peer)); .report_event(DuniterPeeringEvent::StreamClosed(peer));
} }
NotificationEvent::NotificationReceived { peer, notification } => { NotificationEvent::NotificationReceived { peer, notification } => {
debug!(target: "duniter-libp2p", "[{}] received gossip from {}", self.network.local_peer_id(), peer);
if let Ok(peering) = <Peering as Decode>::decode(&mut notification.as_ref()) { if let Ok(peering) = <Peering as Decode>::decode(&mut notification.as_ref()) {
self.events_reporter self.events_reporter
.report_event(DuniterPeeringEvent::GossipReceived(peer, true)); .report_event(DuniterPeeringEvent::GossipReceived(peer, true));
debug!(target: "duniter-libp2p", "[{}] received gossip from {}: {:?}", self.network.local_peer_id(), peer, peering);
self.on_peering(peer, peering); self.on_peering(peer, peering);
} else { } else {
self.events_reporter self.events_reporter
.report_event(DuniterPeeringEvent::GossipReceived(peer, false)); .report_event(DuniterPeeringEvent::GossipReceived(peer, false));
debug!(target: "duniter-libp2p", "[{}] received gossip from {} but couldn't decode it", self.network.local_peer_id(), peer);
self.network.report_peer(peer, rep::BAD_PEERING); self.network.report_peer(peer, rep::BAD_PEERING);
} }
} }
......
...@@ -101,7 +101,7 @@ fn watch_events_and_wait_for_all_peerings( ...@@ -101,7 +101,7 @@ fn watch_events_and_wait_for_all_peerings(
while let Some(event) = stream.next().await { while let Some(event) = stream.next().await {
debug_event(event.clone(), local_peer_id); debug_event(event.clone(), local_peer_id);
if let DuniterPeeringEvent::GoodPeering(peer, _) = event { if let DuniterPeeringEvent::GoodPeering(peer, _) = event {
debug!(target: "duniter-libp2p", "[{}] Received peering from {}",local_peer_id, peer); warn!(target: "duniter-libp2p", "[{}] Received peering from {}",local_peer_id, peer);
identified += 1; identified += 1;
if identified == (total_peers - 1) { if identified == (total_peers - 1) {
// all peers identified // all peers identified
...@@ -118,34 +118,34 @@ fn watch_events_and_wait_for_all_peerings( ...@@ -118,34 +118,34 @@ fn watch_events_and_wait_for_all_peerings(
fn debug_event(event: DuniterPeeringEvent, local_peer_id: PeerId) { fn debug_event(event: DuniterPeeringEvent, local_peer_id: PeerId) {
match event { match event {
DuniterPeeringEvent::StreamOpened(peer, role) => { DuniterPeeringEvent::StreamOpened(peer, role) => {
debug!(target: "duniter-libp2p", "[{}] Peer {peer} connected with role {}", local_peer_id, observed_role_to_str(role)); warn!(target: "duniter-libp2p", "[{}] Peer {peer} connected with role {}", local_peer_id, observed_role_to_str(role));
} }
DuniterPeeringEvent::StreamValidation(peer, result) => { DuniterPeeringEvent::StreamValidation(peer, result) => {
debug!(target: "duniter-libp2p", "[{}] Validating inbound substream from {peer} with result {}", local_peer_id, result); warn!(target: "duniter-libp2p", "[{}] Validating inbound substream from {peer} with result {}", local_peer_id, result);
} }
DuniterPeeringEvent::StreamClosed(peer) => { DuniterPeeringEvent::StreamClosed(peer) => {
debug!(target: "duniter-libp2p", "[{}] Peer {peer} disconnected", local_peer_id); warn!(target: "duniter-libp2p", "[{}] Peer {peer} disconnected", local_peer_id);
} }
DuniterPeeringEvent::GossipReceived(peer, success) => { DuniterPeeringEvent::GossipReceived(peer, success) => {
if success { if success {
debug!(target: "duniter-libp2p", "[{}] Received peering message from {peer}", local_peer_id); warn!(target: "duniter-libp2p", "[{}] Received peering message from {peer}", local_peer_id);
} else { } else {
debug!(target: "duniter-libp2p", "[{}] Failed to receive peering message from {peer}", local_peer_id); warn!(target: "duniter-libp2p", "[{}] Failed to receive peering message from {peer}", local_peer_id);
} }
} }
DuniterPeeringEvent::GoodPeering(peer, _) => { DuniterPeeringEvent::GoodPeering(peer, _) => {
debug!(target: "duniter-libp2p", "[{}] Received peering from {}", local_peer_id, peer); warn!(target: "duniter-libp2p", "[{}] Received peering from {}", local_peer_id, peer);
} }
DuniterPeeringEvent::AlreadyReceivedPeering(peer) => { DuniterPeeringEvent::AlreadyReceivedPeering(peer) => {
debug!(target: "duniter-libp2p", "[{}] Already received peering from {}", local_peer_id, peer); warn!(target: "duniter-libp2p", "[{}] Already received peering from {}", local_peer_id, peer);
panic!("Received peering from the same peer twice"); panic!("Received peering from the same peer twice");
} }
DuniterPeeringEvent::SelfPeeringPropagationFailed(peer, _peering, e) => { DuniterPeeringEvent::SelfPeeringPropagationFailed(peer, _peering, e) => {
debug!(target: "duniter-libp2p", "[{}] Failed to propagate self peering to {}: {}", local_peer_id, peer, e); warn!(target: "duniter-libp2p", "[{}] Failed to propagate self peering to {}: {}", local_peer_id, peer, e);
panic!("Failed to propagate self peering"); panic!("Failed to propagate self peering");
} }
DuniterPeeringEvent::SelfPeeringPropagationSuccess(peer, _peering) => { DuniterPeeringEvent::SelfPeeringPropagationSuccess(peer, _peering) => {
debug!(target: "duniter-libp2p", "[{}] Successfully propagated self peering to {}", local_peer_id, peer); warn!(target: "duniter-libp2p", "[{}] Successfully propagated self peering to {}", local_peer_id, peer);
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment