diff --git a/node/src/endpoint_gossip/handler.rs b/node/src/endpoint_gossip/handler.rs index 2491298ca85b374fd04247add61db155bf1dfe9e..bd995ae21602418e9e2a219450e424470e125951 100644 --- a/node/src/endpoint_gossip/handler.rs +++ b/node/src/endpoint_gossip/handler.rs @@ -3,16 +3,16 @@ use crate::endpoint_gossip::{ PROPAGATE_TIMEOUT, }; use codec::{Decode, Encode}; -use futures::{stream, FutureExt, Stream, StreamExt}; +use futures::{future, stream, FutureExt, Stream, StreamExt}; use log::debug; use sc_network::{ service::traits::{NotificationEvent, ValidationResult}, utils::interval, NetworkEventStream, NetworkPeers, NetworkStateInfo, NotificationService, ObservedRole, PeerId, }; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender}; use sp_api::__private::BlockT; -use std::{collections::HashMap, marker::PhantomData, pin::Pin}; +use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin}; pub fn build< B: BlockT + 'static, @@ -34,10 +34,7 @@ pub fn build< .fuse(), network, peers: HashMap::new(), - command_rx: command_rx.unwrap_or_else(|| { - let (_tx, rx) = tracing_unbounded("mpsc_duniter_peering_rpc_command", 1_000); - rx - }), + command_rx: CommandHandler(command_rx), self_peering: Peering { endpoints }, events_reporter: DuniterEventsReporter { sink: rpc_sink, @@ -46,6 +43,21 @@ pub fn build< } } +// Structure to avoid borrowing issues with the command receiver. +struct CommandHandler(Option<TracingUnboundedReceiver<DuniterPeeringCommand>>); +impl CommandHandler { + /// Wait for the next command to be received. + pub fn get_next_command( + &mut self, + ) -> Pin<Box<dyn Future<Output = Option<DuniterPeeringCommand>> + Send + '_>> { + match &mut self.0 { + Some(tx) => Box::pin(tx.next()), + // Cannot receive any command + None => Box::pin(future::pending()), + } + } +} + #[allow(dead_code)] #[derive(Debug, Clone)] pub enum DuniterPeeringEvent { @@ -98,7 +110,7 @@ pub struct GossipsHandler< /// Internal sink to report events. events_reporter: DuniterEventsReporter, /// Receiver for external commands (tests/RPC methods). - command_rx: TracingUnboundedReceiver<DuniterPeeringCommand>, + command_rx: CommandHandler, /// Handle that is used to communicate with `sc_network::Notifications`. notification_service: Box<dyn NotificationService>, } @@ -123,23 +135,21 @@ where _ = self.propagate_timeout.next() => { for (peer, peer_data) in self.peers.iter_mut() { if !peer_data.sent_peering { + debug!(target: "duniter-libp2p", "[{}] sending self peering to {}", self.network.local_peer_id(), peer); match self.notification_service.send_async_notification(peer, self.self_peering.encode()).await { Ok(_) => { peer_data.sent_peering = true; + debug!(target: "duniter-libp2p", "[{}] self peering sent to {}", self.network.local_peer_id(), peer); self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationSuccess(*peer, self.self_peering.clone())); } Err(e) => { + debug!(target: "duniter-libp2p", "[{}] failed to send self peering to {}: {}", self.network.local_peer_id(), peer, e); self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationFailed(*peer, self.self_peering.clone(), e.to_string())); } } } } }, - command = self.command_rx.next().fuse() => { - if let Some(command) = command { - self.handle_command(command).await - } - }, event = self.notification_service.next_event().fuse() => { if let Some(event) = event { self.handle_notification_event(event) @@ -147,7 +157,12 @@ where // `Notifications` has seemingly closed. Closing as well. return } - } + }, + command = self.command_rx.get_next_command().fuse() => { + if let Some(command) = command { + self.handle_command(command).await + } + }, } } } @@ -160,12 +175,14 @@ where result_tx, .. } => { + debug!(target: "duniter-libp2p", "[{}] validating stream from {}", self.network.local_peer_id(), peer); // only accept peers whose role can be determined let result = self .network .peer_role(peer, handshake) .map_or(ValidationResult::Reject, |_| ValidationResult::Accept); let duniter_validation = DuniterStreamValidationResult::from(result); + debug!(target: "duniter-libp2p", "[{}] stream validation result for {}: {:?}", self.network.local_peer_id(), peer, duniter_validation); self.events_reporter .report_event(DuniterPeeringEvent::StreamValidation( peer, @@ -189,23 +206,28 @@ where }, ); debug_assert!(_was_in.is_none()); + debug!(target: "duniter-libp2p", "[{}] stream opened with {peer}", self.network.local_peer_id()); self.events_reporter .report_event(DuniterPeeringEvent::StreamOpened(peer, role)); } NotificationEvent::NotificationStreamClosed { peer } => { let _peer = self.peers.remove(&peer); debug_assert!(_peer.is_some()); + debug!(target: "duniter-libp2p", "[{}] stream closed with {peer}", self.network.local_peer_id()); self.events_reporter .report_event(DuniterPeeringEvent::StreamClosed(peer)); } NotificationEvent::NotificationReceived { peer, notification } => { + debug!(target: "duniter-libp2p", "[{}] received gossip from {}", self.network.local_peer_id(), peer); if let Ok(peering) = <Peering as Decode>::decode(&mut notification.as_ref()) { self.events_reporter .report_event(DuniterPeeringEvent::GossipReceived(peer, true)); + debug!(target: "duniter-libp2p", "[{}] received gossip from {}: {:?}", self.network.local_peer_id(), peer, peering); self.on_peering(peer, peering); } else { self.events_reporter .report_event(DuniterPeeringEvent::GossipReceived(peer, false)); + debug!(target: "duniter-libp2p", "[{}] received gossip from {} but couldn't decode it", self.network.local_peer_id(), peer); self.network.report_peer(peer, rep::BAD_PEERING); } } diff --git a/node/src/endpoint_gossip/tests.rs b/node/src/endpoint_gossip/tests.rs index 454c0bc6bf1ac5629e91d92bee9a21f14a7bd396..fd77d4f41131d9e4fc7eb6b72f0e35adaa8d2ee0 100644 --- a/node/src/endpoint_gossip/tests.rs +++ b/node/src/endpoint_gossip/tests.rs @@ -101,7 +101,7 @@ fn watch_events_and_wait_for_all_peerings( while let Some(event) = stream.next().await { debug_event(event.clone(), local_peer_id); if let DuniterPeeringEvent::GoodPeering(peer, _) = event { - debug!(target: "duniter-libp2p", "[{}] Received peering from {}",local_peer_id, peer); + warn!(target: "duniter-libp2p", "[{}] Received peering from {}",local_peer_id, peer); identified += 1; if identified == (total_peers - 1) { // all peers identified @@ -118,34 +118,34 @@ fn watch_events_and_wait_for_all_peerings( fn debug_event(event: DuniterPeeringEvent, local_peer_id: PeerId) { match event { DuniterPeeringEvent::StreamOpened(peer, role) => { - debug!(target: "duniter-libp2p", "[{}] Peer {peer} connected with role {}", local_peer_id, observed_role_to_str(role)); + warn!(target: "duniter-libp2p", "[{}] Peer {peer} connected with role {}", local_peer_id, observed_role_to_str(role)); } DuniterPeeringEvent::StreamValidation(peer, result) => { - debug!(target: "duniter-libp2p", "[{}] Validating inbound substream from {peer} with result {}", local_peer_id, result); + warn!(target: "duniter-libp2p", "[{}] Validating inbound substream from {peer} with result {}", local_peer_id, result); } DuniterPeeringEvent::StreamClosed(peer) => { - debug!(target: "duniter-libp2p", "[{}] Peer {peer} disconnected", local_peer_id); + warn!(target: "duniter-libp2p", "[{}] Peer {peer} disconnected", local_peer_id); } DuniterPeeringEvent::GossipReceived(peer, success) => { if success { - debug!(target: "duniter-libp2p", "[{}] Received peering message from {peer}", local_peer_id); + warn!(target: "duniter-libp2p", "[{}] Received peering message from {peer}", local_peer_id); } else { - debug!(target: "duniter-libp2p", "[{}] Failed to receive peering message from {peer}", local_peer_id); + warn!(target: "duniter-libp2p", "[{}] Failed to receive peering message from {peer}", local_peer_id); } } DuniterPeeringEvent::GoodPeering(peer, _) => { - debug!(target: "duniter-libp2p", "[{}] Received peering from {}", local_peer_id, peer); + warn!(target: "duniter-libp2p", "[{}] Received peering from {}", local_peer_id, peer); } DuniterPeeringEvent::AlreadyReceivedPeering(peer) => { - debug!(target: "duniter-libp2p", "[{}] Already received peering from {}", local_peer_id, peer); + warn!(target: "duniter-libp2p", "[{}] Already received peering from {}", local_peer_id, peer); panic!("Received peering from the same peer twice"); } DuniterPeeringEvent::SelfPeeringPropagationFailed(peer, _peering, e) => { - debug!(target: "duniter-libp2p", "[{}] Failed to propagate self peering to {}: {}", local_peer_id, peer, e); + warn!(target: "duniter-libp2p", "[{}] Failed to propagate self peering to {}: {}", local_peer_id, peer, e); panic!("Failed to propagate self peering"); } DuniterPeeringEvent::SelfPeeringPropagationSuccess(peer, _peering) => { - debug!(target: "duniter-libp2p", "[{}] Successfully propagated self peering to {}", local_peer_id, peer); + warn!(target: "duniter-libp2p", "[{}] Successfully propagated self peering to {}", local_peer_id, peer); } } }