From febd35694004846cea603bba8d27d4ea5b0d23cf Mon Sep 17 00:00:00 2001 From: cgeek <cem.moreau@gmail.com> Date: Mon, 17 Feb 2025 20:45:10 +0100 Subject: [PATCH] review: iodiomatic usage of stream --- node/src/endpoint_gossip/rpc/state.rs | 30 ++++++++++++++------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/node/src/endpoint_gossip/rpc/state.rs b/node/src/endpoint_gossip/rpc/state.rs index 75d8a252..b9d69c1f 100644 --- a/node/src/endpoint_gossip/rpc/state.rs +++ b/node/src/endpoint_gossip/rpc/state.rs @@ -52,23 +52,25 @@ impl DuniterPeeringsState { /// Creates a channel for binding to the network events. pub fn listen(&self) -> TracingUnboundedSender<DuniterPeeringEvent> { - let (sink, mut stream) = tracing_unbounded("mpsc_duniter_peering_rpc_stream", 1_000); + let (sink, stream) = tracing_unbounded("mpsc_duniter_peering_rpc_stream", 1_000); let state = self.clone(); tokio::spawn(async move { - while let Some(event) = stream.next().await { - match event { - DuniterPeeringEvent::GoodPeering(who, peering) => { - state.insert(PeeringWithId { - peer_id: who.to_base58(), - endpoints: peering.endpoints, - }); + stream + .for_each(|event| async { + match event { + DuniterPeeringEvent::GoodPeering(who, peering) => { + state.insert(PeeringWithId { + peer_id: who.to_base58(), + endpoints: peering.endpoints, + }); + } + DuniterPeeringEvent::StreamClosed(who) => { + state.remove(who.to_base58()); + } + _ => {} } - DuniterPeeringEvent::StreamClosed(who) => { - state.remove(who.to_base58()); - } - _ => {} - } - } + }) + .await }); sink } -- GitLab