Skip to content
Snippets Groups Projects

Resolve "#297 has not been merged to master (CPU 100%)"

Merged Resolve "#297 has not been merged to master (CPU 100%)"
1 unresolved thread
Merged Cédric Moreau requested to merge 301-297-has-not-been-merged-to-master-cpu-100 into master
1 unresolved thread
3 files
+ 47
24
Compare changes
  • Side-by-side
  • Inline

Files

+ 36
14
@@ -3,16 +3,16 @@ use crate::endpoint_gossip::{
@@ -3,16 +3,16 @@ use crate::endpoint_gossip::{
PROPAGATE_TIMEOUT,
PROPAGATE_TIMEOUT,
};
};
use codec::{Decode, Encode};
use codec::{Decode, Encode};
use futures::{stream, FutureExt, Stream, StreamExt};
use futures::{future, stream, FutureExt, Stream, StreamExt};
use log::debug;
use log::debug;
use sc_network::{
use sc_network::{
service::traits::{NotificationEvent, ValidationResult},
service::traits::{NotificationEvent, ValidationResult},
utils::interval,
utils::interval,
NetworkEventStream, NetworkPeers, NetworkStateInfo, NotificationService, ObservedRole, PeerId,
NetworkEventStream, NetworkPeers, NetworkStateInfo, NotificationService, ObservedRole, PeerId,
};
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
use sp_api::__private::BlockT;
use sp_api::__private::BlockT;
use std::{collections::HashMap, marker::PhantomData, pin::Pin};
use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin};
pub fn build<
pub fn build<
B: BlockT + 'static,
B: BlockT + 'static,
@@ -34,10 +34,7 @@ pub fn build<
@@ -34,10 +34,7 @@ pub fn build<
.fuse(),
.fuse(),
network,
network,
peers: HashMap::new(),
peers: HashMap::new(),
command_rx: command_rx.unwrap_or_else(|| {
command_rx: CommandHandler(command_rx),
let (_tx, rx) = tracing_unbounded("mpsc_duniter_peering_rpc_command", 1_000);
rx
}),
self_peering: Peering { endpoints },
self_peering: Peering { endpoints },
events_reporter: DuniterEventsReporter {
events_reporter: DuniterEventsReporter {
sink: rpc_sink,
sink: rpc_sink,
@@ -46,6 +43,21 @@ pub fn build<
@@ -46,6 +43,21 @@ pub fn build<
}
}
}
}
 
// Structure to avoid borrowing issues with the command receiver.
 
struct CommandHandler(Option<TracingUnboundedReceiver<DuniterPeeringCommand>>);
 
impl CommandHandler {
 
/// Wait for the next command to be received.
 
pub fn get_next_command(
 
&mut self,
 
) -> Pin<Box<dyn Future<Output = Option<DuniterPeeringCommand>> + Send + '_>> {
 
match &mut self.0 {
 
Some(tx) => Box::pin(tx.next()),
 
// Cannot receive any command
 
None => Box::pin(future::pending()),
 
}
 
}
 
}
 
#[allow(dead_code)]
#[allow(dead_code)]
#[derive(Debug, Clone)]
#[derive(Debug, Clone)]
pub enum DuniterPeeringEvent {
pub enum DuniterPeeringEvent {
@@ -98,7 +110,7 @@ pub struct GossipsHandler<
@@ -98,7 +110,7 @@ pub struct GossipsHandler<
/// Internal sink to report events.
/// Internal sink to report events.
events_reporter: DuniterEventsReporter,
events_reporter: DuniterEventsReporter,
/// Receiver for external commands (tests/RPC methods).
/// Receiver for external commands (tests/RPC methods).
command_rx: TracingUnboundedReceiver<DuniterPeeringCommand>,
command_rx: CommandHandler,
/// Handle that is used to communicate with `sc_network::Notifications`.
/// Handle that is used to communicate with `sc_network::Notifications`.
notification_service: Box<dyn NotificationService>,
notification_service: Box<dyn NotificationService>,
}
}
@@ -123,23 +135,21 @@ where
@@ -123,23 +135,21 @@ where
_ = self.propagate_timeout.next() => {
_ = self.propagate_timeout.next() => {
for (peer, peer_data) in self.peers.iter_mut() {
for (peer, peer_data) in self.peers.iter_mut() {
if !peer_data.sent_peering {
if !peer_data.sent_peering {
 
debug!(target: "duniter-libp2p", "[{}] sending self peering to {}", self.network.local_peer_id(), peer);
match self.notification_service.send_async_notification(peer, self.self_peering.encode()).await {
match self.notification_service.send_async_notification(peer, self.self_peering.encode()).await {
Ok(_) => {
Ok(_) => {
peer_data.sent_peering = true;
peer_data.sent_peering = true;
 
debug!(target: "duniter-libp2p", "[{}] self peering sent to {}", self.network.local_peer_id(), peer);
self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationSuccess(*peer, self.self_peering.clone()));
self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationSuccess(*peer, self.self_peering.clone()));
}
}
Err(e) => {
Err(e) => {
 
debug!(target: "duniter-libp2p", "[{}] failed to send self peering to {}: {}", self.network.local_peer_id(), peer, e);
self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationFailed(*peer, self.self_peering.clone(), e.to_string()));
self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationFailed(*peer, self.self_peering.clone(), e.to_string()));
}
}
}
}
}
}
}
}
},
},
command = self.command_rx.next().fuse() => {
if let Some(command) = command {
self.handle_command(command).await
}
},
event = self.notification_service.next_event().fuse() => {
event = self.notification_service.next_event().fuse() => {
if let Some(event) = event {
if let Some(event) = event {
self.handle_notification_event(event)
self.handle_notification_event(event)
@@ -147,7 +157,12 @@ where
@@ -147,7 +157,12 @@ where
// `Notifications` has seemingly closed. Closing as well.
// `Notifications` has seemingly closed. Closing as well.
return
return
}
}
}
},
 
command = self.command_rx.get_next_command().fuse() => {
 
if let Some(command) = command {
 
self.handle_command(command).await
 
}
 
},
}
}
}
}
}
}
@@ -160,12 +175,14 @@ where
@@ -160,12 +175,14 @@ where
result_tx,
result_tx,
..
..
} => {
} => {
 
debug!(target: "duniter-libp2p", "[{}] validating stream from {}", self.network.local_peer_id(), peer);
// only accept peers whose role can be determined
// only accept peers whose role can be determined
let result = self
let result = self
.network
.network
.peer_role(peer, handshake)
.peer_role(peer, handshake)
.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
let duniter_validation = DuniterStreamValidationResult::from(result);
let duniter_validation = DuniterStreamValidationResult::from(result);
 
debug!(target: "duniter-libp2p", "[{}] stream validation result for {}: {:?}", self.network.local_peer_id(), peer, duniter_validation);
self.events_reporter
self.events_reporter
.report_event(DuniterPeeringEvent::StreamValidation(
.report_event(DuniterPeeringEvent::StreamValidation(
peer,
peer,
@@ -189,23 +206,28 @@ where
@@ -189,23 +206,28 @@ where
},
},
);
);
debug_assert!(_was_in.is_none());
debug_assert!(_was_in.is_none());
 
debug!(target: "duniter-libp2p", "[{}] stream opened with {peer}", self.network.local_peer_id());
self.events_reporter
self.events_reporter
.report_event(DuniterPeeringEvent::StreamOpened(peer, role));
.report_event(DuniterPeeringEvent::StreamOpened(peer, role));
}
}
NotificationEvent::NotificationStreamClosed { peer } => {
NotificationEvent::NotificationStreamClosed { peer } => {
let _peer = self.peers.remove(&peer);
let _peer = self.peers.remove(&peer);
debug_assert!(_peer.is_some());
debug_assert!(_peer.is_some());
 
debug!(target: "duniter-libp2p", "[{}] stream closed with {peer}", self.network.local_peer_id());
self.events_reporter
self.events_reporter
.report_event(DuniterPeeringEvent::StreamClosed(peer));
.report_event(DuniterPeeringEvent::StreamClosed(peer));
}
}
NotificationEvent::NotificationReceived { peer, notification } => {
NotificationEvent::NotificationReceived { peer, notification } => {
 
debug!(target: "duniter-libp2p", "[{}] received gossip from {}", self.network.local_peer_id(), peer);
if let Ok(peering) = <Peering as Decode>::decode(&mut notification.as_ref()) {
if let Ok(peering) = <Peering as Decode>::decode(&mut notification.as_ref()) {
self.events_reporter
self.events_reporter
.report_event(DuniterPeeringEvent::GossipReceived(peer, true));
.report_event(DuniterPeeringEvent::GossipReceived(peer, true));
 
debug!(target: "duniter-libp2p", "[{}] received gossip from {}: {:?}", self.network.local_peer_id(), peer, peering);
self.on_peering(peer, peering);
self.on_peering(peer, peering);
} else {
} else {
self.events_reporter
self.events_reporter
.report_event(DuniterPeeringEvent::GossipReceived(peer, false));
.report_event(DuniterPeeringEvent::GossipReceived(peer, false));
 
debug!(target: "duniter-libp2p", "[{}] received gossip from {} but couldn't decode it", self.network.local_peer_id(), peer);
self.network.report_peer(peer, rep::BAD_PEERING);
self.network.report_peer(peer, rep::BAD_PEERING);
}
}
}
}
Loading