tests.rs

    use crate::{
        endpoint_gossip,
        endpoint_gossip::{
            duniter_peering_protocol_name,
            handler::{DuniterPeeringCommand, DuniterPeeringEvent},
            well_known_endpoint_types::RPC,
            DuniterEndpoint, Peering,
        },
    };
    use futures::{future, stream, FutureExt, StreamExt};
    use log::{debug, warn};
    use parking_lot::{Mutex, RwLock};
    use sc_consensus::{
        BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport, ImportResult,
        ImportedAux,
    };
    use sc_network::{NetworkStateInfo, ObservedRole, PeerId};
    use sc_network_test::{
        Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, PeersClient, TestNetFactory,
    };
    use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
    use sp_api::__private::BlockT;
    use sp_consensus::Error as ConsensusError;
    use sp_runtime::traits::Header;
    use std::{future::Future, sync::Arc, task::Poll, time::Duration};
    
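    /// Checks that each peer's peering is gossiped to every other peer, and that a
    /// second peering received on the same connection is reported as already
    /// received instead of being accepted again.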
    #[tokio::test]
    async fn peering_is_forwarded_and_only_once_per_connection() {
        let _ = env_logger::try_init();
        let authorities_count = 3;
        let full_count = 1;
        let total_peers = authorities_count + full_count;
        let mut net = DuniterPeeringTestNet::new(authorities_count, full_count);
        tokio::spawn(start_network(&mut net, total_peers));
        let net = Arc::new(Mutex::new(net));
    
        // make sure the network is ready (each peering is received by all other peers)
        let wait_for_all_peering_notifications =
            watch_events_and_wait_for_all_peerings(total_peers, &net);
        let wait_for = futures::future::join_all(wait_for_all_peering_notifications).map(|_| ());
        tokio::time::timeout(Duration::from_secs(5), run_until_complete(wait_for, &net))
            .await
            .unwrap();
    
        // rule: only one peering is accepted per connection (disconnecting and reconnecting allows the peering value to change)
        let already_received = ensure_only_one_peering_is_accepted(&net);
        tokio::time::timeout(
            Duration::from_secs(5),
            run_until_complete(already_received, &net),
        )
        .await
        .unwrap();
    }
    
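    /// Makes peer 0 send a second peering to peer 1 on the existing connection and
    /// returns a future that completes once peer 1 reports it with an
    /// `AlreadyReceivedPeering` event.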
    fn ensure_only_one_peering_is_accepted(
        net: &Arc<Mutex<DuniterPeeringTestNet>>,
    ) -> impl Future<Output = ()> {
        let command_0 = net.lock().peer_commands[0].clone();
        let stream1 = net.lock().peer_streams[1].clone();
        let peer_id_0 = net.lock().peer_ids[0].clone();
        let peer_id_1 = net.lock().peer_ids[1].clone();
        let already_received = async move {
            while let Some(event) = stream1.write().next().await {
                if let DuniterPeeringEvent::AlreadyReceivedPeering(peer) = event {
                    if peer == peer_id_0 {
                        // peer 1 reported the peering from peer 0 as already received
                        break;
                    }
                }
            }
        };
        let already_received = futures::future::join_all(vec![already_received]).map(|_| ());
        command_0
            .unbounded_send(DuniterPeeringCommand::SendPeering(
                peer_id_1,
                Peering {
                    endpoints: vec![DuniterEndpoint {
                        protocol: RPC.into(),
                        address: "gdev.example.com:9944".into(),
                    }],
                },
            ))
            .unwrap();
        already_received
    }
    
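    /// Returns one future per peer, each completing once that peer has received a
    /// `GoodPeering` event from every other peer.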
    fn watch_events_and_wait_for_all_peerings(
        total_peers: usize,
        net: &Arc<Mutex<DuniterPeeringTestNet>>,
    ) -> Vec<impl Future<Output = ()> + Sized> {
        let mut peering_notifications = Vec::new();
    
        for peer_id in 0..total_peers {
            let local_peer_id = net.lock().peer_ids[peer_id];
            let stream = net.lock().peer_streams[peer_id].clone();
            peering_notifications.push(async move {
                let mut identified = 0;
                while let Some(event) = stream.write().next().await {
                    debug_event(event.clone(), local_peer_id);
                    if let DuniterPeeringEvent::GoodPeering(peer, _) = event {
                        debug!(target: "duniter-libp2p", "[{}] Received peering from {}", local_peer_id, peer);
                        identified += 1;
                        if identified == (total_peers - 1) {
                            // peerings received from all other peers
                            break;
                        }
                    }
                }
                warn!("All peers sent their peering");
            })
        }
        peering_notifications
    }
    
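    /// Logs every handler event and panics on events that must not happen while the
    /// network is setting up (a duplicate peering or a failed propagation of the
    /// local peering).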
    fn debug_event(event: DuniterPeeringEvent, local_peer_id: PeerId) {
        match event {
            DuniterPeeringEvent::StreamOpened(peer, role) => {
                debug!(target: "duniter-libp2p", "[{}] Peer {peer} connected with role {}", local_peer_id, observed_role_to_str(role));
            }
            DuniterPeeringEvent::StreamValidation(peer, result) => {
                debug!(target: "duniter-libp2p", "[{}] Validating inbound substream from {peer} with result {}", local_peer_id, result);
            }
            DuniterPeeringEvent::StreamClosed(peer) => {
                debug!(target: "duniter-libp2p", "[{}] Peer {peer} disconnected", local_peer_id);
            }
            DuniterPeeringEvent::GossipReceived(peer, success) => {
                if success {
                    debug!(target: "duniter-libp2p", "[{}] Received peering message from {peer}", local_peer_id);
                } else {
                    debug!(target: "duniter-libp2p", "[{}] Failed to receive peering message from {peer}", local_peer_id);
                }
            }
            DuniterPeeringEvent::GoodPeering(peer, _) => {
                debug!(target: "duniter-libp2p", "[{}] Received peering from {}", local_peer_id, peer);
            }
            DuniterPeeringEvent::AlreadyReceivedPeering(peer) => {
                debug!(target: "duniter-libp2p", "[{}] Already received peering from {}", local_peer_id, peer);
                panic!("Received peering from the same peer twice");
            }
            DuniterPeeringEvent::SelfPeeringPropagationFailed(peer, _peering, e) => {
                debug!(target: "duniter-libp2p", "[{}] Failed to propagate self peering to {}: {}", local_peer_id, peer, e);
                panic!("Failed to propagate self peering");
            }
            DuniterPeeringEvent::SelfPeeringPropagationSuccess(peer, _peering) => {
                debug!(target: "duniter-libp2p", "[{}] Successfully propagated self peering to {}", local_peer_id, peer);
            }
        }
    }
    
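    /// Human-readable label of an `ObservedRole`, used in the logs above.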
    fn observed_role_to_str(role: ObservedRole) -> &'static str {
        match role {
            ObservedRole::Authority => "Authority",
            ObservedRole::Full => "Full",
            ObservedRole::Light => "Light",
        }
    }
    
    // Builds the Duniter peering handler of each peer. Returns a future, to be spawned on the runtime, that drives them all.
    fn start_network(net: &mut DuniterPeeringTestNet, peers: usize) -> impl Future<Output = ()> {
        let nodes = stream::FuturesUnordered::new();
    
        for peer_id in 0..peers {
            let net_service = net.peers[peer_id].network_service().clone();
            net.peer_ids.push(net_service.local_peer_id().clone());
            let notification_service = net.peers[peer_id]
                .take_notification_service(&format!("/{}", duniter_peering_protocol_name::NAME).into())
                .unwrap();
    
            let (rpc_sink, stream) = tracing_unbounded("mpsc_duniter_gossip_peering_test", 100_000);
            let (command_tx, command_rx) =
                tracing_unbounded("mpsc_duniter_gossip_peering_test_command", 100_000);
    
            let handler = endpoint_gossip::handler::build::<Block, _>(
                notification_service,
                net_service,
                rpc_sink,
                Some(command_rx),
                vec![],
            );
            // Keep the event stream and the command sender so the test can observe and drive this handler.
            net.peer_streams.push(Arc::new(RwLock::new(stream)));
            net.peer_commands.push(command_tx);
            let node = handler.run();
    
            fn assert_send<T: Send>(_: &T) {}
            assert_send(&node);
    
            nodes.push(node);
        }
    
        nodes.for_each(|_| async move {})
    }
    
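    /// Test network: the simulated peers together with, for each of them, its peer
    /// id, the receiver of its gossip events and the sender used to drive its handler.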
    #[derive(Default)]
    struct DuniterPeeringTestNet {
        // Peers
        peers: Vec<DuniterPeeringPeer>,
        // IDs of the peers
        peer_ids: Vec<PeerId>,
        // RX of the gossip events
        peer_streams: Vec<Arc<RwLock<TracingUnboundedReceiver<DuniterPeeringEvent>>>>,
        // TX to drive the handler (for tests or configuration)
        peer_commands: Vec<TracingUnboundedSender<DuniterPeeringCommand>>,
    }
    
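    /// A peer of the test network, backed by the no-op `DuniterTestBlockImport`.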
    type DuniterPeeringPeer = sc_network_test::Peer<PeerData, DuniterTestBlockImport>;
    
    impl DuniterPeeringTestNet {
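        /// Builds a network with `n_authority` authority peers and `n_full` full peers.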
        fn new(n_authority: usize, n_full: usize) -> Self {
            let mut net = DuniterPeeringTestNet {
                peers: Vec::with_capacity(n_authority + n_full),
                peer_ids: Vec::new(),
                peer_streams: Vec::new(),
                peer_commands: Vec::new(),
            };
    
            for _ in 0..n_authority {
                net.add_authority_peer();
            }
    
            for _ in 0..n_full {
                net.add_full_peer();
            }
    
            net
        }
    
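        /// Adds a peer flagged as an authority and subscribed to the Duniter peering notification protocol.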
        fn add_authority_peer(&mut self) {
            self.add_full_peer_with_config(FullPeerConfig {
                notifications_protocols: vec![
                    format!("/{}", duniter_peering_protocol_name::NAME).into()
                ],
                is_authority: true,
                ..Default::default()
            })
        }
    }
    
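    /// Per-peer data required by `TestNetFactory`; nothing is needed for these tests.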
    #[derive(Default)]
    struct PeerData;
    
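    /// Wiring required by `sc_network_test` so that peers are built with the Duniter
    /// peering protocol, a pass-through verifier and the fake block import below.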
    impl TestNetFactory for DuniterPeeringTestNet {
        type BlockImport = DuniterTestBlockImport;
        type PeerData = PeerData;
        type Verifier = PassThroughVerifier;
    
        fn make_verifier(&self, _client: PeersClient, _: &PeerData) -> Self::Verifier {
            PassThroughVerifier::new(false) // use non-instant finality.
        }
    
        fn peer(&mut self, i: usize) -> &mut DuniterPeeringPeer {
            &mut self.peers[i]
        }
    
        fn peers(&self) -> &Vec<DuniterPeeringPeer> {
            &self.peers
        }
    
        fn peers_mut(&mut self) -> &mut Vec<DuniterPeeringPeer> {
            &mut self.peers
        }
    
        fn mut_peers<F: FnOnce(&mut Vec<DuniterPeeringPeer>)>(&mut self, closure: F) {
            closure(&mut self.peers);
        }
    
        fn make_block_import(
            &self,
            _client: PeersClient,
        ) -> (
            BlockImportAdapter<Self::BlockImport>,
            Option<BoxJustificationImport<Block>>,
            Self::PeerData,
        ) {
            (
                BlockImportAdapter::new(DuniterTestBlockImport),
                None,
                PeerData::default(),
            )
        }
    
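        /// Adds a non-authority peer subscribed to the Duniter peering notification protocol.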
        fn add_full_peer(&mut self) {
            self.add_full_peer_with_config(FullPeerConfig {
                notifications_protocols: vec![
                    format!("/{}", duniter_peering_protocol_name::NAME).into()
                ],
                is_authority: false,
                ..Default::default()
            })
        }
    }
    
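    /// Polls the test network in the background until `future` completes.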
    async fn run_until_complete(future: impl Future + Unpin, net: &Arc<Mutex<DuniterPeeringTestNet>>) {
        let drive_to_completion = futures::future::poll_fn(|cx| {
            net.lock().poll(cx);
            Poll::<()>::Pending
        });
        future::select(future, drive_to_completion).await;
    }
    
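    /// Block import stub whose checks and imports always succeed; the peering tests
    /// do not exercise block import.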
    #[derive(Clone)]
    struct DuniterTestBlockImport;
    
    /// Inspired by GrandpaBlockImport
    #[async_trait::async_trait]
    impl<Block: BlockT> BlockImport<Block> for DuniterTestBlockImport {
        type Error = ConsensusError;
    
        /// Fake check block, always succeeds.
        async fn check_block(
            &self,
            _block: BlockCheckParams<Block>,
        ) -> Result<ImportResult, Self::Error> {
            Ok(ImportResult::Imported(ImportedAux {
                is_new_best: true,
                bad_justification: false,
                clear_justification_requests: false,
                header_only: false,
                needs_justification: false,
            }))
        }
    
        /// Fake import block, always succeeds.
        async fn import_block(
            &self,
            block: BlockImportParams<Block>,
        ) -> Result<ImportResult, Self::Error> {
            debug!("Importing block #{}", block.header.number());
            Ok(ImportResult::Imported(ImportedAux {
                is_new_best: true,
                bad_justification: false,
                clear_justification_requests: false,
                header_only: false,
                needs_justification: false,
            }))
        }
    }