Skip to content
Snippets Groups Projects
Commit c95f7831 authored by Cédric Moreau's avatar Cédric Moreau Committed by Éloïs
Browse files

Resolve "Add RPC method to list peers" (!316)

* fix: Docker config + documentation

* Revert "review: test code, not dead code"

This reverts commit 922df964.

* review: test code, not dead code

* review: iodiomatic usage of stream

* review: add --public-endpoints option

* review: spawn_handle + BoundedVec

* fix: metadata.scale

* fix: warnings

* fix: node was no more starting

* clippy (again)

* clippy

* feat(#97): add RPC endpoint + gossip protocol for peerings
parent b202881c
Branches
Tags
1 merge request!316Resolve "Add RPC method to list peers"
Pipeline #40074 failed
Showing
with 2204 additions and 1016 deletions
This diff is collapsed.
......@@ -105,6 +105,8 @@ simple_logger = { version = "4.3.3", default-features = false }
bincode = { version = "1.3.3", default-features = false }
dubp-wot = { version = "0.11.1", default-features = false }
flate2 = { version = "1.0.28", default-features = false }
array-bytes = { version = "6.2.2", default-features = false }
parking_lot = { version = "0.12.1" }
# Subxt
subxt = { git = 'https://github.com/duniter/subxt', branch = 'subxt-v0.38.0-duniter-substrate-v1.17.0', default-features = false }
......@@ -213,6 +215,9 @@ sc-telemetry = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch
sc-transaction-pool = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-basic-authorship = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-network = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-network-sync = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-network-test = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-utils = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sp-keystore = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sp-storage = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sp-timestamp = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
......
......@@ -68,6 +68,9 @@ volumes:
| `DUNITER_DISABLE_PROMETHEUS` | Boolean to disable the Prometheus endpoint on port 9615. | `false` |
| `DUNITER_DISABLE_TELEMETRY` | Boolean to disable connecting to the Substrate telemetry server. | `false` |
| `DUNITER_PRUNING_PROFILE` | _ `default`<br> _ `archive`: keep all blocks and state blocks<br> \* `light`: keep only last 256 state blocks and last 14400 blocks (one day duration) | `default` |
| `DUNITER_PUBLIC_RPC` | The public RPC endpoint to gossip on the network and make available in the apps. | None |
| `DUNITER_PUBLIC_SQUID` | The public Squid graphql endpoint to gossip on the network and make available in the apps. | None |
| `DUNITER_PUBLIC_ENDPOINTS` | Path to a JSON file containing public endpoints to gossip on the network. The file should use the following format:<br>```{"endpoints": [ { "protocol": "rpc", "address": "wss://gdev.example.com" }, { "protocol": "squid", "address": "gdev.example.com/graphql/v1" }]}``` | None |
## Other Duniter options
......
......@@ -59,6 +59,21 @@ if [ -n "$DUNITER_PUBLIC_ADDR" ]; then
set -- "$@" --public-addr "$DUNITER_PUBLIC_ADDR"
fi
# Define public RPC endpoint (gossiped on the network)
if [ -n "$DUNITER_PUBLIC_RPC" ]; then
set -- "$@" --public-rpc "$DUNITER_PUBLIC_RPC"
fi
# Define public Squid endpoint (gossiped on the network)
if [ -n "$DUNITER_PUBLIC_SQUID" ]; then
set -- "$@" --public-squid "$DUNITER_PUBLIC_SQUID"
fi
# Define public endpoints from JSON file (gossiped on the network)
if [ -n "$DUNITER_PUBLIC_ENDPOINTS" ]; then
set -- "$@" --public-endpoints "$DUNITER_PUBLIC_ENDPOINTS"
fi
# Define listen address (inside docker)
if [ -n "$DUNITER_LISTEN_ADDR" ]; then
set -- "$@" --listen-addr "$DUNITER_LISTEN_ADDR"
......
......@@ -130,6 +130,9 @@ num-format = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
codec = { workspace = true }
array-bytes = { workspace = true }
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
# Local
......@@ -162,11 +165,13 @@ sc-consensus-manual-seal = { workspace = true, default-features = true }
sc-executor = { workspace = true, default-features = true }
sc-keystore = { workspace = true, default-features = true }
sc-network = { workspace = true, default-features = true }
sc-network-sync = { workspace = true, default-features = true }
sc-offchain = { workspace = true, default-features = true }
sc-rpc-api = { workspace = true, default-features = true }
sc-telemetry = { workspace = true, default-features = true }
sc-transaction-pool = { workspace = true, default-features = true }
sc-transaction-pool-api = { workspace = true, default-features = true }
sc-utils = { workspace = true, default-features = true }
sp-api = { workspace = true, default-features = true }
sp-authority-discovery = { workspace = true, default-features = true }
sp-block-builder = { workspace = true, default-features = true }
......@@ -190,6 +195,12 @@ sp-transaction-pool = { workspace = true, default-features = true }
sp-transaction-storage-proof = { workspace = true, default-features = true }
substrate-frame-rpc-system = { workspace = true, default-features = true }
[dev-dependencies]
sc-network-test = { workspace = true, default-features = true }
async-trait = { version = "0.1.79" }
env_logger = "0.10.2"
async-channel = "2.3.1"
[build-dependencies]
substrate-build-script-utils = { workspace = true, default-features = true }
......
......@@ -19,9 +19,14 @@ pub struct Cli {
#[clap(subcommand)]
pub subcommand: Option<Subcommand>,
/// substrate base options
#[clap(flatten)]
pub run: sc_cli::RunCmd,
/// duniter specific options
#[clap(flatten)]
pub duniter_options: DuniterConfigExtension,
/// How blocks should be sealed
///
/// Options are "production", "instant", "manual", or timer interval in milliseconds
......@@ -29,6 +34,33 @@ pub struct Cli {
pub sealing: crate::cli::Sealing,
}
/// add options specific to duniter client
#[derive(Debug, Default, Clone, clap::Parser)]
pub struct DuniterConfigExtension {
/// Public RPC endpoint to gossip on the network and make available in the apps.
#[arg(long)]
pub public_rpc: Option<String>,
/// Public Squid graphql endpoint to gossip on the network and make available in the apps.
#[arg(long)]
pub public_squid: Option<String>,
/// Public endpoints from a JSON file, using following format where `protocol` and `address` are
/// strings (value is free) :
///
/// ```json
/// {
/// "endpoints": [
/// { "protocol": "rpc", "address": "wss://gdev.example.com" },
/// { "protocol": "squid", "address": "gdev.example.com/graphql/v1" },
/// { "protocol": "other", "address": "gdev.example.com/other" }
/// ]
/// }
/// ```
#[arg(long, value_name = "JSON_FILE_PATH")]
pub public_endpoints: Option<String>,
}
#[derive(Debug, clap::Subcommand)]
pub enum Subcommand {
/// Build a chain specification.
......
......@@ -22,7 +22,7 @@ pub mod utils;
use crate::{
chain_spec,
cli::{Cli, Subcommand},
cli::{Cli, DuniterConfigExtension, Subcommand},
service,
service::{runtime_executor::Executor, RuntimeType},
};
......@@ -374,6 +374,7 @@ pub fn run() -> sc_cli::Result<()> {
}
None => {
let runner = cli.create_runner(&cli.run)?;
let duniter_options: DuniterConfigExtension = cli.duniter_options;
runner.run_node_until_exit(|mut config| async move {
// Force offchain worker and offchain indexing if we have the role Authority
if config.role.is_authority() {
......@@ -386,7 +387,7 @@ pub fn run() -> sc_cli::Result<()> {
service::runtime_executor::runtime::RuntimeApi,
Executor,
sc_network::Litep2pNetworkBackend,
>(config, cli.sealing)
>(config, cli.sealing, duniter_options)
.map_err(sc_cli::Error::Service)
}
})
......
use crate::endpoint_gossip::{
types::validation_result::DuniterStreamValidationResult, DuniterEndpoints, Peer, Peering,
PROPAGATE_TIMEOUT,
};
use codec::{Decode, Encode};
use futures::{stream, FutureExt, Stream, StreamExt};
use log::debug;
use sc_network::{
service::traits::{NotificationEvent, ValidationResult},
utils::interval,
NetworkEventStream, NetworkPeers, NetworkStateInfo, NotificationService, ObservedRole, PeerId,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_api::__private::BlockT;
use std::{collections::HashMap, marker::PhantomData, pin::Pin};
pub fn build<
B: BlockT + 'static,
N: NetworkPeers + NetworkEventStream + NetworkStateInfo + Clone,
>(
notification_service: Box<dyn NotificationService>,
network: N,
rpc_sink: TracingUnboundedSender<DuniterPeeringEvent>,
command_rx: Option<TracingUnboundedReceiver<DuniterPeeringCommand>>,
endpoints: DuniterEndpoints,
) -> GossipsHandler<B, N> {
let local_peer_id = network.local_peer_id();
GossipsHandler {
b: PhantomData::<B>,
notification_service,
propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
as Pin<Box<dyn Stream<Item = ()> + Send>>)
.fuse(),
network,
peers: HashMap::new(),
command_rx: command_rx.unwrap_or_else(|| {
let (_tx, rx) = tracing_unbounded("mpsc_duniter_peering_rpc_command", 1_000);
rx
}),
self_peering: Peering { endpoints },
events_reporter: DuniterEventsReporter {
sink: rpc_sink,
local_peer_id,
},
}
}
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum DuniterPeeringEvent {
StreamOpened(PeerId, ObservedRole),
StreamValidation(PeerId, DuniterStreamValidationResult),
StreamClosed(PeerId),
/// Received gossip from a peer, `bool` indicates whether the gossip was successfully decoded.
GossipReceived(PeerId, bool),
GoodPeering(PeerId, Peering),
AlreadyReceivedPeering(PeerId),
SelfPeeringPropagationSuccess(PeerId, Peering),
SelfPeeringPropagationFailed(PeerId, Peering, String),
}
pub enum DuniterPeeringCommand {
/// Send a peering to a peer.
#[allow(dead_code)] // only used in tests for now, maybe in the future by RPC
SendPeering(PeerId, Peering),
}
struct DuniterEventsReporter {
sink: TracingUnboundedSender<DuniterPeeringEvent>,
local_peer_id: PeerId,
}
impl DuniterEventsReporter {
/// Report an event for monitoring purposes (logs + unit tests).
fn report_event(&self, event: DuniterPeeringEvent) {
self.sink.unbounded_send(event.clone())
.unwrap_or_else(|e| {
log::error!(target: "duniter-libp2p", "[{}] Failed to send notification: {}", self.local_peer_id, e);
})
}
}
/// Handler for gossips. Call [`GossipsHandler::run`] to start the processing.
pub struct GossipsHandler<
B: BlockT + 'static,
N: NetworkPeers + NetworkEventStream + NetworkStateInfo,
> {
b: PhantomData<B>,
/// Interval at which we try to propagate our peering
propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
/// Network service to use to send messages and manage peers.
network: N,
/// All connected peers and their known peering.
peers: HashMap<PeerId, Peer>,
/// The interal peering of the node.
self_peering: Peering,
/// Internal sink to report events.
events_reporter: DuniterEventsReporter,
/// Receiver for external commands (tests/RPC methods).
command_rx: TracingUnboundedReceiver<DuniterPeeringCommand>,
/// Handle that is used to communicate with `sc_network::Notifications`.
notification_service: Box<dyn NotificationService>,
}
impl<B, N> GossipsHandler<B, N>
where
B: BlockT + 'static,
N: NetworkPeers + NetworkEventStream + NetworkStateInfo,
{
/// Turns the [`TransactionsHandler`] into a future that should run forever and not be
/// interrupted.
pub async fn run(mut self) {
// Share self peering do listeners of current handler
self.events_reporter
.report_event(DuniterPeeringEvent::GoodPeering(
self.network.local_peer_id(),
self.self_peering.clone(),
));
// Then start the network loop
loop {
futures::select! {
_ = self.propagate_timeout.next() => {
for (peer, peer_data) in self.peers.iter_mut() {
if !peer_data.sent_peering {
match self.notification_service.send_async_notification(peer, self.self_peering.encode()).await {
Ok(_) => {
peer_data.sent_peering = true;
self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationSuccess(*peer, self.self_peering.clone()));
}
Err(e) => {
self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationFailed(*peer, self.self_peering.clone(), e.to_string()));
}
}
}
}
},
command = self.command_rx.next().fuse() => {
if let Some(command) = command {
self.handle_command(command).await
}
},
event = self.notification_service.next_event().fuse() => {
if let Some(event) = event {
self.handle_notification_event(event)
} else {
// `Notifications` has seemingly closed. Closing as well.
return
}
}
}
}
}
fn handle_notification_event(&mut self, event: NotificationEvent) {
match event {
NotificationEvent::ValidateInboundSubstream {
peer,
handshake,
result_tx,
..
} => {
// only accept peers whose role can be determined
let result = self
.network
.peer_role(peer, handshake)
.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
let duniter_validation = DuniterStreamValidationResult::from(result);
self.events_reporter
.report_event(DuniterPeeringEvent::StreamValidation(
peer,
duniter_validation.clone(),
));
let _ = result_tx.send(duniter_validation.into());
}
NotificationEvent::NotificationStreamOpened {
peer, handshake, ..
} => {
let Some(role) = self.network.peer_role(peer, handshake) else {
debug!(target: "duniter-libp2p", "[{}] role for {peer} couldn't be determined", self.network.local_peer_id());
return;
};
let _was_in = self.peers.insert(
peer,
Peer {
sent_peering: false,
known_peering: None,
},
);
debug_assert!(_was_in.is_none());
self.events_reporter
.report_event(DuniterPeeringEvent::StreamOpened(peer, role));
}
NotificationEvent::NotificationStreamClosed { peer } => {
let _peer = self.peers.remove(&peer);
debug_assert!(_peer.is_some());
self.events_reporter
.report_event(DuniterPeeringEvent::StreamClosed(peer));
}
NotificationEvent::NotificationReceived { peer, notification } => {
if let Ok(peering) = <Peering as Decode>::decode(&mut notification.as_ref()) {
self.events_reporter
.report_event(DuniterPeeringEvent::GossipReceived(peer, true));
self.on_peering(peer, peering);
} else {
self.events_reporter
.report_event(DuniterPeeringEvent::GossipReceived(peer, false));
self.network.report_peer(peer, rep::BAD_PEERING);
}
}
}
}
/// Called when peer sends us new peerings
fn on_peering(&mut self, who: PeerId, peering: Peering) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
if peer.known_peering.is_some() {
// Peering has already been received for this peer. Only one is allowed per connection.
self.network.report_peer(who, rep::BAD_PEERING);
self.events_reporter
.report_event(DuniterPeeringEvent::AlreadyReceivedPeering(who));
} else {
peer.known_peering = Some(peering.clone());
self.events_reporter
.report_event(DuniterPeeringEvent::GoodPeering(who, peering.clone()));
self.network.report_peer(who, rep::GOOD_PEERING);
}
}
}
async fn handle_command(&mut self, cmd: DuniterPeeringCommand) {
match cmd {
DuniterPeeringCommand::SendPeering(peer, peering) => {
debug!(target: "duniter-libp2p", "[{}]Sending COMMANDED self peering to {}", self.network.local_peer_id(), peer);
match self
.notification_service
.send_async_notification(&peer, peering.encode())
.await
{
Ok(_) => {
self.events_reporter.report_event(
DuniterPeeringEvent::SelfPeeringPropagationSuccess(peer, peering),
);
}
Err(e) => {
self.events_reporter.report_event(
DuniterPeeringEvent::SelfPeeringPropagationFailed(
peer,
peering,
e.to_string(),
),
);
}
}
}
};
}
}
mod rep {
use sc_network::ReputationChange as Rep;
/// Reputation change when a peer sends us an peering that we didn't know about.
pub const GOOD_PEERING: Rep = Rep::new(1 << 7, "Good peering");
/// Reputation change when a peer sends us a bad peering.
pub const BAD_PEERING: Rep = Rep::new(-(1 << 12), "Bad peering");
}
pub(crate) mod handler;
pub(crate) mod rpc;
#[cfg(test)]
mod tests;
mod types;
use crate::endpoint_gossip::duniter_peering_protocol_name::NAME;
use codec::{Decode, Encode};
use frame_benchmarking::__private::traits::ConstU32;
use sc_network::{
config::{PeerStoreProvider, SetConfig},
types::ProtocolName,
NetworkBackend, NotificationMetrics, NotificationService, MAX_RESPONSE_SIZE,
};
use serde::{Deserialize, Serialize};
use sp_api::__private::BlockT;
use sp_core::bounded_vec::BoundedVec;
use std::{sync::Arc, time};
pub mod well_known_endpoint_types {
pub const RPC: &str = "rpc";
pub const SQUID: &str = "squid";
}
pub struct DuniterPeeringParams {
/// Handle that is used to communicate with `sc_network::Notifications`.
pub notification_service: Box<dyn NotificationService>,
}
/// Maximum allowed size for a transactions notification.
pub(crate) const MAX_GOSSIP_SIZE: u64 = MAX_RESPONSE_SIZE;
/// Interval at which we propagate gossips;
pub(crate) const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_secs(1);
pub mod duniter_peering_protocol_name {
pub(crate) const NAME: &str = "duniter-peerings/1";
}
impl DuniterPeeringParams {
/// Create a new instance.
pub fn new<
Hash: AsRef<[u8]>,
Block: BlockT,
Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
>(
genesis_hash: Hash,
fork_id: Option<&str>,
metrics: NotificationMetrics,
peer_store_handle: Arc<dyn PeerStoreProvider>,
) -> (Self, Net::NotificationProtocolConfig) {
let genesis_hash = genesis_hash.as_ref();
let protocol_name: ProtocolName = if let Some(fork_id) = fork_id {
format!(
"/{}/{}/{}",
array_bytes::bytes2hex("", genesis_hash),
fork_id,
NAME,
)
} else {
format!("/{}/{}", array_bytes::bytes2hex("", genesis_hash), NAME)
}
.into();
let (config, notification_service) = Net::notification_config(
protocol_name.clone(),
vec![format!("/{}/{}", array_bytes::bytes2hex("", genesis_hash), NAME).into()],
MAX_GOSSIP_SIZE,
None,
// Default config, allowing some non-reserved nodes to connect
SetConfig::default(),
metrics,
peer_store_handle,
);
(
Self {
notification_service,
},
config,
)
}
}
/// Peer information
#[derive(Debug)]
struct Peer {
/// Holds a set of transactions known to this peer.
known_peering: Option<Peering>,
sent_peering: bool,
}
#[derive(Encode, Decode, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct DuniterEndpoint {
/// The name of the endpoint (e.g. "rpc" or "squid") are well-known names
pub protocol: String,
/// The endpoint itself (e.g. "squid.example.com/v1/graphql")
pub address: String,
}
pub type DuniterEndpoints = BoundedVec<DuniterEndpoint, ConstU32<10>>;
#[derive(Encode, Decode, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct Peering {
pub endpoints: DuniterEndpoints,
}
//! # Duniter Peering RPC API
//!
//! Exposes the `duniter_peerings` RPC method.
use crate::endpoint_gossip::rpc::{data::DuniterPeeringsData, state::DuniterPeeringsState};
use jsonrpsee::{core::async_trait, proc_macros::rpc, Extensions};
use sc_consensus_babe_rpc::Error;
/// The exposed RPC methods
#[rpc(client, server)]
pub trait DuniterPeeringRpcApi {
/// Returns the known peerings list received by network gossips
#[method(name = "duniter_peerings", with_extensions)]
async fn duniter_peerings(&self) -> Result<Option<DuniterPeeringsData>, Error>;
}
/// API implementation
pub struct DuniterPeeringRpcApiImpl {
shared_peer_state: DuniterPeeringsState,
}
impl DuniterPeeringRpcApiImpl {
/// Creates a new instance of the Duniter Peering Rpc handler.
pub fn new(shared_peer_state: DuniterPeeringsState) -> Self {
Self { shared_peer_state }
}
}
#[async_trait]
impl DuniterPeeringRpcApiServer for DuniterPeeringRpcApiImpl {
async fn duniter_peerings(
&self,
_ext: &Extensions,
) -> Result<Option<DuniterPeeringsData>, Error> {
let option = self.shared_peer_state.peer_state();
Ok(option)
}
}
use crate::endpoint_gossip::rpc::state::PeeringWithId;
use jsonrpsee::core::Serialize;
use serde::Deserialize;
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(Debug))]
pub struct DuniterPeeringsData {
pub peerings: Vec<PeeringWithId>,
}
//! # RPC for peering
//!
//! This module gathers all known peering documents for connected peers in memory and provides
//! an RPC interface to query them.
//!
//! ## RPC methods
//!
//! Currently, only one RPC method is available to query the currently known peerings.
//! In the future, the RPC interface could add methods to dynamically change the current node's peering
//! without restarting the node.
//!
//! ### `duniter_peerings`
//!
//! Returns the known peerings list received by network gossips.
//!
//! ```json
//! {
//! "jsonrpc": "2.0",
//! "id": 0,
//! "result": {
//! "peers": [
//! {
//! "endpoints": [
//! "/rpc/wss://gdev.example.com",
//! "/squid/https://squid.gdev.gyroi.de/v1/graphql"
//! ]
//! },
//! {
//! "endpoints": [
//! "/rpc/ws://gdev.example.com:9944"
//! ]
//! }
//! ]
//! }
//! }
//! ```
//!
pub mod api;
pub mod data;
pub mod state;
#[cfg(test)]
mod tests;
use crate::endpoint_gossip::{
handler::DuniterPeeringEvent, rpc::data::DuniterPeeringsData, DuniterEndpoints,
};
use codec::{Decode, Encode};
use futures::StreamExt;
use jsonrpsee::core::Serialize;
use parking_lot::RwLock;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use serde::Deserialize;
use std::sync::Arc;
/// A struct to hold a peer endpoints along with its id for RPC exposure.
#[derive(Encode, Decode, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct PeeringWithId {
pub peer_id: String,
pub endpoints: DuniterEndpoints,
}
#[derive(Clone)]
pub struct DuniterPeeringsState {
inner: Arc<RwLock<Option<Box<DuniterPeeringsData>>>>,
}
/// Dummy CRUD operations for the state to be exposed, plus a listening sink to be notified of
/// network events and automatically insert/remove peers from the state.
impl DuniterPeeringsState {
pub fn empty() -> Self {
Self {
inner: Arc::new(RwLock::new(Some(Box::new(DuniterPeeringsData {
peerings: Vec::new(),
})))),
}
}
pub fn insert(&self, peering: PeeringWithId) -> &Self {
if let Some(vs) = self.inner.write().as_mut() {
vs.peerings.push(peering);
}
self
}
pub fn remove(&self, peer_id: String) -> &Self {
if let Some(vs) = self.inner.write().as_mut() {
vs.peerings.retain(|p| p.peer_id != peer_id);
}
self
}
pub fn peer_state(&self) -> Option<DuniterPeeringsData> {
self.inner.read().as_ref().map(|vs| vs.as_ref().clone())
}
/// Creates a channel for binding to the network events.
pub fn listen(&self) -> TracingUnboundedSender<DuniterPeeringEvent> {
let (sink, stream) = tracing_unbounded("mpsc_duniter_peering_rpc_stream", 1_000);
let state = self.clone();
tokio::spawn(async move {
stream
.for_each(|event| async {
match event {
DuniterPeeringEvent::GoodPeering(who, peering) => {
state.insert(PeeringWithId {
peer_id: who.to_base58(),
endpoints: peering.endpoints,
});
}
DuniterPeeringEvent::StreamClosed(who) => {
state.remove(who.to_base58());
}
_ => {}
}
})
.await
});
sink
}
}
use crate::endpoint_gossip::{
rpc::{
api::{DuniterPeeringRpcApiImpl, DuniterPeeringRpcApiServer},
state::{DuniterPeeringsState, PeeringWithId},
},
well_known_endpoint_types::{RPC, SQUID},
DuniterEndpoint, DuniterEndpoints,
};
use jsonrpsee::RpcModule;
#[tokio::test]
async fn empty_peers_rpc_handler() {
let rpc = setup_io_handler();
let expected_response = r#"{"jsonrpc":"2.0","id":0,"result":{"peerings":[]}}"#.to_string();
let request = r#"{"jsonrpc":"2.0","method":"duniter_peerings","params":[],"id":0}"#;
let (response, _) = rpc.raw_json_request(request, 1).await.unwrap();
assert_eq!(expected_response, response);
}
#[tokio::test]
async fn expose_known_peers() {
let rpc = setup_new_rpc_with_initial_peerings(vec![
PeeringWithId {
peer_id: "12D3KooWRkDXunbB64VegYPCQaitcgtdtEtbsbd7f19nsS7aMjDp".into(),
endpoints: DuniterEndpoints::truncate_from(vec![
DuniterEndpoint {
protocol: RPC.into(),
address: "/rpc/wss://gdev.example.com".into(),
},
DuniterEndpoint {
protocol: SQUID.into(),
address: "/squid/https://squid.gdev.gyroi.de/v1/graphql".into(),
},
]),
},
PeeringWithId {
peer_id: "12D3KooWFiUBo3Kjiryvrpz8b3kfNVk7baezhab7SHdfafgY7nmN".into(),
endpoints: DuniterEndpoints::truncate_from(vec![DuniterEndpoint {
protocol: RPC.into(),
address: "/rpc/ws://gdev.example.com:9944".into(),
}]),
},
]);
let expected_response = r#"{"jsonrpc":"2.0","id":0,"result":{"peerings":[{"peer_id":"12D3KooWRkDXunbB64VegYPCQaitcgtdtEtbsbd7f19nsS7aMjDp","endpoints":[{"protocol":"rpc","address":"/rpc/wss://gdev.example.com"},{"protocol":"squid","address":"/squid/https://squid.gdev.gyroi.de/v1/graphql"}]},{"peer_id":"12D3KooWFiUBo3Kjiryvrpz8b3kfNVk7baezhab7SHdfafgY7nmN","endpoints":[{"protocol":"rpc","address":"/rpc/ws://gdev.example.com:9944"}]}]}}"#.to_string();
let request = r#"{"jsonrpc":"2.0","method":"duniter_peerings","params":[],"id":0}"#;
let (response, _) = rpc.raw_json_request(request, 1).await.unwrap();
assert_eq!(expected_response, response);
}
fn setup_io_handler() -> RpcModule<DuniterPeeringRpcApiImpl> {
DuniterPeeringRpcApiImpl::new(DuniterPeeringsState::empty()).into_rpc()
}
fn setup_new_rpc_with_initial_peerings(
peers: Vec<PeeringWithId>,
) -> RpcModule<DuniterPeeringRpcApiImpl> {
let state = DuniterPeeringsState::empty();
for peer in peers {
state.insert(peer);
}
DuniterPeeringRpcApiImpl::new(state).into_rpc()
}
use crate::{
endpoint_gossip,
endpoint_gossip::{
duniter_peering_protocol_name,
handler::{DuniterPeeringCommand, DuniterPeeringEvent},
well_known_endpoint_types::RPC,
DuniterEndpoint, DuniterEndpoints, Peering,
},
};
use async_channel::Receiver;
use futures::{future, stream, FutureExt, StreamExt};
use log::{debug, warn};
use parking_lot::Mutex;
use sc_consensus::{
BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport, ImportResult,
ImportedAux,
};
use sc_network::{NetworkStateInfo, ObservedRole, PeerId};
use sc_network_test::{
Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, PeersClient, TestNetFactory,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_api::__private::BlockT;
use sp_consensus::Error as ConsensusError;
use sp_runtime::traits::Header;
use std::{future::Future, pin::pin, sync::Arc, task::Poll, time::Duration};
#[tokio::test]
async fn peering_is_forwarded_and_only_once_per_connection() {
let _ = env_logger::try_init();
let authorities_count = 3;
let full_count = 1;
let total_peers = authorities_count + full_count;
let mut net = DuniterPeeringTestNet::new(authorities_count, full_count);
tokio::spawn(start_network(&mut net, total_peers));
let net = Arc::new(Mutex::new(net));
// make sure the network is ready (each peering is received by all other peers)
let wait_for_all_peering_notifications =
watch_events_and_wait_for_all_peerings(total_peers, &net);
let wait_for = futures::future::join_all(wait_for_all_peering_notifications).map(|_| ());
tokio::time::timeout(Duration::from_secs(5), run_until_complete(wait_for, &net))
.await
.unwrap();
// rule: only one peering is accepted per connection (disconnecting/restarting allows to change the peering value)
let already_received = ensure_only_one_peering_is_accepted(&net);
tokio::time::timeout(
Duration::from_secs(5),
run_until_complete(already_received, &net),
)
.await
.unwrap();
}
fn ensure_only_one_peering_is_accepted(
net: &Arc<Mutex<DuniterPeeringTestNet>>,
) -> impl Future<Output = ()> {
let command_0 = net.lock().peer_commands[0].clone();
let peer_id_0 = net.lock().peer_ids[0];
let peer_id_1 = net.lock().peer_ids[1];
let stream_1 = net.lock().peer_streams[1].clone();
let already_received = async move {
let mut stream1 = pin!(stream_1);
while let Some(event) = stream1.next().await {
if let DuniterPeeringEvent::AlreadyReceivedPeering(peer) = event {
if peer == peer_id_0 {
// We did receive the peering from peer 0
break;
}
}
}
};
let already_received = futures::future::join_all(vec![already_received]).map(|_| ());
command_0
.unbounded_send(DuniterPeeringCommand::SendPeering(
peer_id_1,
Peering {
endpoints: DuniterEndpoints::truncate_from(vec![DuniterEndpoint {
protocol: RPC.into(),
address: "gdev.example.com:9944".into(),
}]),
},
))
.unwrap();
already_received
}
fn watch_events_and_wait_for_all_peerings(
total_peers: usize,
net: &Arc<Mutex<DuniterPeeringTestNet>>,
) -> Vec<impl Future<Output = ()> + Sized> {
let mut peering_notifications = Vec::new();
for peer_id in 0..total_peers {
let local_peer_id = net.lock().peer_ids[peer_id];
let stream = net.lock().peer_streams[peer_id].clone();
peering_notifications.push(async move {
let mut identified = 0;
let mut stream = pin!(stream);
while let Some(event) = stream.next().await {
debug_event(event.clone(), local_peer_id);
if let DuniterPeeringEvent::GoodPeering(peer, _) = event {
debug!(target: "duniter-libp2p", "[{}] Received peering from {}",local_peer_id, peer);
identified += 1;
if identified == (total_peers - 1) {
// all peers identified
break;
}
}
}
warn!("All peers sent their peering");
})
}
peering_notifications
}
fn debug_event(event: DuniterPeeringEvent, local_peer_id: PeerId) {
match event {
DuniterPeeringEvent::StreamOpened(peer, role) => {
debug!(target: "duniter-libp2p", "[{}] Peer {peer} connected with role {}", local_peer_id, observed_role_to_str(role));
}
DuniterPeeringEvent::StreamValidation(peer, result) => {
debug!(target: "duniter-libp2p", "[{}] Validating inbound substream from {peer} with result {}", local_peer_id, result);
}
DuniterPeeringEvent::StreamClosed(peer) => {
debug!(target: "duniter-libp2p", "[{}] Peer {peer} disconnected", local_peer_id);
}
DuniterPeeringEvent::GossipReceived(peer, success) => {
if success {
debug!(target: "duniter-libp2p", "[{}] Received peering message from {peer}", local_peer_id);
} else {
debug!(target: "duniter-libp2p", "[{}] Failed to receive peering message from {peer}", local_peer_id);
}
}
DuniterPeeringEvent::GoodPeering(peer, _) => {
debug!(target: "duniter-libp2p", "[{}] Received peering from {}", local_peer_id, peer);
}
DuniterPeeringEvent::AlreadyReceivedPeering(peer) => {
debug!(target: "duniter-libp2p", "[{}] Already received peering from {}", local_peer_id, peer);
panic!("Received peering from the same peer twice");
}
DuniterPeeringEvent::SelfPeeringPropagationFailed(peer, _peering, e) => {
debug!(target: "duniter-libp2p", "[{}] Failed to propagate self peering to {}: {}", local_peer_id, peer, e);
panic!("Failed to propagate self peering");
}
DuniterPeeringEvent::SelfPeeringPropagationSuccess(peer, _peering) => {
debug!(target: "duniter-libp2p", "[{}] Successfully propagated self peering to {}", local_peer_id, peer);
}
}
}
fn observed_role_to_str(role: ObservedRole) -> &'static str {
match role {
ObservedRole::Authority => "Authority",
ObservedRole::Full => "Full",
ObservedRole::Light => "Light",
}
}
// Spawns duniter nodes. Returns a future to spawn on the runtime.
fn start_network(net: &mut DuniterPeeringTestNet, peers: usize) -> impl Future<Output = ()> {
let nodes = stream::FuturesUnordered::new();
for peer_id in 0..peers {
let net_service = net.peers[peer_id].network_service().clone();
net.peer_ids.push(net_service.local_peer_id());
let notification_service = net.peers[peer_id]
.take_notification_service(&format!("/{}", duniter_peering_protocol_name::NAME).into())
.unwrap();
let (rpc_sink, mut stream_unbounded) =
tracing_unbounded("mpsc_duniter_gossip_peering_test", 100_000);
let (sink_unbounded, stream) = async_channel::unbounded();
let (command_tx, command_rx) =
tracing_unbounded("mpsc_duniter_gossip_peering_test_command", 100_000);
// mapping from mpsc TracingUnboundedReceiver to mpmc Receiver
tokio::spawn(async move {
// forward the event
while let Some(command) = stream_unbounded.next().await {
sink_unbounded.send(command).await.unwrap();
}
});
let handler = endpoint_gossip::handler::build::<Block, _>(
notification_service,
net_service,
rpc_sink,
Some(command_rx),
DuniterEndpoints::new(),
);
// To send external commands to the handler (for tests or RPC commands).
net.peer_streams.push(stream);
net.peer_commands.push(command_tx);
let node = handler.run();
fn assert_send<T: Send>(_: &T) {}
assert_send(&node);
nodes.push(node);
}
nodes.for_each(|_| async move {})
}
#[derive(Default)]
struct DuniterPeeringTestNet {
// Peers
peers: Vec<DuniterPeeringPeer>,
// IDs of the peers
peer_ids: Vec<PeerId>,
// RX of the gossip events
peer_streams: Vec<Receiver<DuniterPeeringEvent>>,
// TX to drive the handler (for tests or configuration)
peer_commands: Vec<TracingUnboundedSender<DuniterPeeringCommand>>,
}
type DuniterPeeringPeer = sc_network_test::Peer<PeerData, DuniterTestBlockImport>;
impl DuniterPeeringTestNet {
fn new(n_authority: usize, n_full: usize) -> Self {
let mut net = DuniterPeeringTestNet {
peers: Vec::with_capacity(n_authority + n_full),
peer_ids: Vec::new(),
peer_streams: Vec::new(),
peer_commands: Vec::new(),
};
for _ in 0..n_authority {
net.add_authority_peer();
}
for _ in 0..n_full {
net.add_full_peer();
}
net
}
fn add_authority_peer(&mut self) {
self.add_full_peer_with_config(FullPeerConfig {
notifications_protocols: vec![
format!("/{}", duniter_peering_protocol_name::NAME).into()
],
is_authority: true,
..Default::default()
})
}
}
#[derive(Default)]
struct PeerData;
impl TestNetFactory for DuniterPeeringTestNet {
type BlockImport = DuniterTestBlockImport;
type PeerData = PeerData;
type Verifier = PassThroughVerifier;
fn make_verifier(&self, _client: PeersClient, _: &PeerData) -> Self::Verifier {
PassThroughVerifier::new(false) // use non-instant finality.
}
fn peer(&mut self, i: usize) -> &mut DuniterPeeringPeer {
&mut self.peers[i]
}
fn peers(&self) -> &Vec<DuniterPeeringPeer> {
&self.peers
}
fn peers_mut(&mut self) -> &mut Vec<DuniterPeeringPeer> {
&mut self.peers
}
fn mut_peers<F: FnOnce(&mut Vec<DuniterPeeringPeer>)>(&mut self, closure: F) {
closure(&mut self.peers);
}
fn make_block_import(
&self,
_client: PeersClient,
) -> (
BlockImportAdapter<Self::BlockImport>,
Option<BoxJustificationImport<Block>>,
Self::PeerData,
) {
(
BlockImportAdapter::new(DuniterTestBlockImport),
None,
PeerData,
)
}
fn add_full_peer(&mut self) {
self.add_full_peer_with_config(FullPeerConfig {
notifications_protocols: vec![
format!("/{}", duniter_peering_protocol_name::NAME).into()
],
is_authority: false,
..Default::default()
})
}
}
async fn run_until_complete(future: impl Future + Unpin, net: &Arc<Mutex<DuniterPeeringTestNet>>) {
let drive_to_completion = futures::future::poll_fn(|cx| {
net.lock().poll(cx);
Poll::<()>::Pending
});
future::select(future, drive_to_completion).await;
}
#[derive(Clone)]
struct DuniterTestBlockImport;
/// Inspired by GrandpaBlockImport
#[async_trait::async_trait]
impl<Block: BlockT> BlockImport<Block> for DuniterTestBlockImport {
type Error = ConsensusError;
/// Fake check block, always succeeds.
async fn check_block(
&self,
_block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
Ok(ImportResult::Imported(ImportedAux {
is_new_best: true,
bad_justification: false,
clear_justification_requests: false,
header_only: false,
needs_justification: false,
}))
}
/// Fake import block, always succeeds.
async fn import_block(
&self,
block: BlockImportParams<Block>,
) -> Result<ImportResult, Self::Error> {
debug!("Importing block #{}", block.header.number());
Ok(ImportResult::Imported(ImportedAux {
is_new_best: true,
bad_justification: false,
clear_justification_requests: false,
header_only: false,
needs_justification: false,
}))
}
}
pub mod validation_result;
use sc_network::service::traits::ValidationResult;
use std::fmt::Display;
/// Clonable version of sc_network::service::traits::ValidationResult
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DuniterStreamValidationResult {
/// Accept inbound substream.
Accept,
/// Reject inbound substream.
Reject,
}
impl Display for DuniterStreamValidationResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DuniterStreamValidationResult::Accept => write!(f, "Accept"),
DuniterStreamValidationResult::Reject => write!(f, "Reject"),
}
}
}
impl From<ValidationResult> for DuniterStreamValidationResult {
fn from(result: ValidationResult) -> Self {
match result {
ValidationResult::Accept => DuniterStreamValidationResult::Accept,
ValidationResult::Reject => DuniterStreamValidationResult::Reject,
}
}
}
impl From<DuniterStreamValidationResult> for ValidationResult {
fn from(result: DuniterStreamValidationResult) -> Self {
match result {
DuniterStreamValidationResult::Accept => ValidationResult::Accept,
DuniterStreamValidationResult::Reject => ValidationResult::Reject,
}
}
}
......@@ -65,5 +65,6 @@
pub mod chain_spec;
pub mod cli;
pub mod command;
pub mod endpoint_gossip;
pub mod rpc;
pub mod service;
......@@ -24,6 +24,7 @@ mod chain_spec;
mod service;
pub(crate) mod cli;
mod command;
mod endpoint_gossip;
mod rpc;
fn main() -> sc_cli::Result<()> {
......
......@@ -22,6 +22,7 @@
#![warn(missing_docs)]
use crate::endpoint_gossip::rpc::{api::DuniterPeeringRpcApiServer, state::DuniterPeeringsState};
use common_runtime::{AccountId, Balance, Block, BlockNumber, Hash, Index};
use jsonrpsee::RpcModule;
use sc_consensus_babe::{BabeApi, BabeWorkerHandle};
......@@ -61,6 +62,13 @@ pub struct GrandpaDeps<B> {
pub finality_provider: Arc<FinalityProofProvider<B, Block>>,
}
/// Dependencies for DuniterPeering
#[derive(Clone)]
pub struct DuniterPeeringRpcModuleDeps {
/// The state of the DuniterPeering RPC module which will be exposed.
pub state: DuniterPeeringsState,
}
/// Full client dependencies.
pub struct FullDeps<C, P, SC, B> {
/// The client instance to use.
......@@ -77,6 +85,8 @@ pub struct FullDeps<C, P, SC, B> {
pub babe: Option<BabeDeps>,
/// GRANDPA specific dependencies.
pub grandpa: GrandpaDeps<B>,
/// DuniterPeering specific dependencies.
pub duniter_peering: DuniterPeeringRpcModuleDeps,
}
/// Instantiate all full RPC extensions.
......@@ -109,6 +119,7 @@ where
command_sink_opt,
babe,
grandpa,
duniter_peering: endpoint_gossip,
} = deps;
if let Some(babe) = babe {
......@@ -140,7 +151,7 @@ where
)?;
module.merge(System::new(client.clone(), pool).into_rpc())?;
module.merge(TransactionPayment::new(client).into_rpc())?;
module.merge(TransactionPayment::new(client.clone()).into_rpc())?;
if let Some(command_sink) = command_sink_opt {
// We provide the rpc handler with the sending end of the channel to allow the rpc
// send EngineCommands to the background block authorship task.
......@@ -151,6 +162,10 @@ where
// `YourRpcStruct` should have a reference to a client, which is needed
// to call into the runtime.
// `module.merge(YourRpcTrait::into_rpc(YourRpcStruct::new(ReferenceToClient, ...)))?;`
module.merge(
crate::endpoint_gossip::rpc::api::DuniterPeeringRpcApiImpl::new(endpoint_gossip.state)
.into_rpc(),
)?;
Ok(module)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment