Commit 8ed08060 authored by Cédric Moreau

feat(#97): add RPC endpoint + gossip protocol for peerings

parent b202881c
Merge request !316: Resolve "Add RPC method to list peers"
Showing 3192 additions and 3412 deletions
Source diff could not be displayed: it is too large.
@@ -105,6 +105,8 @@ simple_logger = { version = "4.3.3", default-features = false }
bincode = { version = "1.3.3", default-features = false }
dubp-wot = { version = "0.11.1", default-features = false }
flate2 = { version = "1.0.28", default-features = false }
array-bytes = { version = "6.2.2", default-features = false }
parking_lot = { version = "0.12.1" }
# Subxt
subxt = { git = 'https://github.com/duniter/subxt', branch = 'subxt-v0.38.0-duniter-substrate-v1.17.0', default-features = false }
@@ -213,6 +215,9 @@ sc-telemetry = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-transaction-pool = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-basic-authorship = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-network = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-network-sync = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-network-test = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-utils = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sp-keystore = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sp-storage = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sp-timestamp = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
......
@@ -130,6 +130,9 @@ num-format = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
codec = { workspace = true }
array-bytes = { workspace = true }
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
# Local
@@ -162,11 +165,13 @@ sc-consensus-manual-seal = { workspace = true, default-features = true }
sc-executor = { workspace = true, default-features = true }
sc-keystore = { workspace = true, default-features = true }
sc-network = { workspace = true, default-features = true }
sc-network-sync = { workspace = true, default-features = true }
sc-offchain = { workspace = true, default-features = true }
sc-rpc-api = { workspace = true, default-features = true }
sc-telemetry = { workspace = true, default-features = true }
sc-transaction-pool = { workspace = true, default-features = true }
sc-transaction-pool-api = { workspace = true, default-features = true }
sc-utils = { workspace = true, default-features = true }
sp-api = { workspace = true, default-features = true }
sp-authority-discovery = { workspace = true, default-features = true }
sp-block-builder = { workspace = true, default-features = true }
@@ -190,6 +195,11 @@ sp-transaction-pool = { workspace = true, default-features = true }
sp-transaction-storage-proof = { workspace = true, default-features = true }
substrate-frame-rpc-system = { workspace = true, default-features = true }
[dev-dependencies]
sc-network-test = { workspace = true, default-features = true }
async-trait = { version = "0.1.79" }
env_logger = "0.10.2"
[build-dependencies]
substrate-build-script-utils = { workspace = true, default-features = true }
......
@@ -19,9 +19,14 @@ pub struct Cli {
#[clap(subcommand)]
pub subcommand: Option<Subcommand>,
/// Substrate base options
#[clap(flatten)]
pub run: sc_cli::RunCmd,
/// Duniter-specific options
#[clap(flatten)]
pub duniter_options: DuniterConfigExtension,
/// How blocks should be sealed
///
/// Options are "production", "instant", "manual", or timer interval in milliseconds
@@ -29,6 +34,18 @@
pub sealing: crate::cli::Sealing,
}
/// Additional options specific to the Duniter client
#[derive(Debug, Default, Clone, clap::Parser)]
pub struct DuniterConfigExtension {
/// Public RPC endpoint to gossip on the network and make available in the apps.
#[arg(long)]
pub public_rpc: Option<String>,
/// Public Squid GraphQL endpoint to gossip on the network and make available in the apps.
#[arg(long)]
pub public_squid: Option<String>,
}
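As a quick illustration of the two new flags, here is a minimal, hypothetical sketch (clap 4 derive API; the binary name and values are illustrative, not part of the commit) of how they parse:

```rust
use clap::Parser;

/// Stand-alone copy of the extension, for illustration only.
#[derive(Debug, Default, Clone, Parser)]
struct DuniterConfigExtension {
    /// Public RPC endpoint to gossip on the network.
    #[arg(long)]
    public_rpc: Option<String>,
    /// Public Squid GraphQL endpoint to gossip on the network.
    #[arg(long)]
    public_squid: Option<String>,
}

fn main() {
    // Both flags are optional; an omitted flag simply gossips no endpoint.
    let opts = DuniterConfigExtension::parse_from([
        "duniter",
        "--public-rpc",
        "wss://gdev.example.com",
    ]);
    assert_eq!(opts.public_rpc.as_deref(), Some("wss://gdev.example.com"));
    assert_eq!(opts.public_squid, None);
}
```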
#[derive(Debug, clap::Subcommand)]
pub enum Subcommand {
/// Build a chain specification.
......
@@ -22,7 +22,7 @@ pub mod utils;
use crate::{
chain_spec,
cli::{Cli, Subcommand},
cli::{Cli, DuniterConfigExtension, Subcommand},
service,
service::{runtime_executor::Executor, RuntimeType},
};
@@ -374,6 +374,7 @@ pub fn run() -> sc_cli::Result<()> {
}
None => {
let runner = cli.create_runner(&cli.run)?;
let duniter_options: DuniterConfigExtension = cli.duniter_options;
runner.run_node_until_exit(|mut config| async move {
// Force offchain worker and offchain indexing if we have the role Authority
if config.role.is_authority() {
@@ -386,7 +387,7 @@
service::runtime_executor::runtime::RuntimeApi,
Executor,
sc_network::Litep2pNetworkBackend,
>(config, cli.sealing)
>(config, cli.sealing, duniter_options)
.map_err(sc_cli::Error::Service)
}
})
......
use crate::endpoint_gossip::{
types::validation_result::DuniterStreamValidationResult, DuniterEndpoint, Peer, Peering,
PROPAGATE_TIMEOUT,
};
use codec::{Decode, Encode};
use futures::{stream, FutureExt, Stream, StreamExt};
use log::debug;
use sc_network::{
service::traits::{NotificationEvent, ValidationResult},
utils::interval,
NetworkEventStream, NetworkPeers, NetworkStateInfo, NotificationService, ObservedRole, PeerId,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_api::__private::BlockT;
use std::{collections::HashMap, marker::PhantomData, pin::Pin};
pub fn build<
B: BlockT + 'static,
N: NetworkPeers + NetworkEventStream + NetworkStateInfo + Clone,
>(
notification_service: Box<dyn NotificationService>,
network: N,
rpc_sink: TracingUnboundedSender<DuniterPeeringEvent>,
command_rx: Option<TracingUnboundedReceiver<DuniterPeeringCommand>>,
endpoints: Vec<DuniterEndpoint>,
) -> GossipsHandler<B, N> {
let local_peer_id = network.local_peer_id().clone();
let handler = GossipsHandler {
b: PhantomData::<B>,
notification_service,
propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
as Pin<Box<dyn Stream<Item = ()> + Send>>)
.fuse(),
network,
peers: HashMap::new(),
command_rx: command_rx.unwrap_or_else(|| {
let (tx, rx) = tracing_unbounded("mpsc_duniter_peering_rpc_command", 1_000);
// Keep the sender alive: if it were dropped here, the receiver would yield
// `None` on its first poll and terminate the handler loop immediately.
std::mem::forget(tx);
rx
}),
self_peering: Peering { endpoints },
events_reporter: DuniterEventsReporter {
sink: rpc_sink,
local_peer_id,
},
};
handler
}
#[derive(Debug, Clone)]
pub enum DuniterPeeringEvent {
StreamOpened(PeerId, ObservedRole),
StreamValidation(PeerId, DuniterStreamValidationResult),
StreamClosed(PeerId),
/// Received gossip from a peer, `bool` indicates whether the gossip was successfully decoded.
GossipReceived(PeerId, bool),
GoodPeering(PeerId, Peering),
AlreadyReceivedPeering(PeerId),
SelfPeeringPropagationSuccess(PeerId, Peering),
SelfPeeringPropagationFailed(PeerId, Peering, String),
}
pub enum DuniterPeeringCommand {
/// Send a peering to a peer.
#[allow(dead_code)] // only used in tests for now, maybe in the future by RPC
SendPeering(PeerId, Peering),
}
struct DuniterEventsReporter {
sink: TracingUnboundedSender<DuniterPeeringEvent>,
local_peer_id: PeerId,
}
impl DuniterEventsReporter {
/// Report an event for monitoring purposes (logs + unit tests).
fn report_event(&self, event: DuniterPeeringEvent) {
self.sink.unbounded_send(event.clone())
.unwrap_or_else(|e| {
log::error!(target: "duniter-libp2p", "[{}] Failed to send notification: {}", self.local_peer_id, e);
})
}
}
/// Handler for gossips. Call [`GossipsHandler::run`] to start the processing.
pub struct GossipsHandler<
B: BlockT + 'static,
N: NetworkPeers + NetworkEventStream + NetworkStateInfo,
> {
b: PhantomData<B>,
/// Interval at which we try to propagate our peering.
propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
/// Network service to use to send messages and manage peers.
network: N,
/// All connected peers and their known peering.
peers: HashMap<PeerId, Peer>,
/// The internal peering of the node.
self_peering: Peering,
/// Internal sink to report events.
events_reporter: DuniterEventsReporter,
/// Receiver for external commands (tests/RPC methods).
command_rx: TracingUnboundedReceiver<DuniterPeeringCommand>,
/// Handle that is used to communicate with `sc_network::Notifications`.
notification_service: Box<dyn NotificationService>,
}
impl<B, N> GossipsHandler<B, N>
where
B: BlockT + 'static,
N: NetworkPeers + NetworkEventStream + NetworkStateInfo,
{
/// Turns the [`GossipsHandler`] into a future that should run forever and not be
/// interrupted.
pub async fn run(mut self) {
// Share self peering with listeners of the current handler
self.events_reporter
.report_event(DuniterPeeringEvent::GoodPeering(
self.network.local_peer_id(),
self.self_peering.clone(),
));
// Then start the network loop
loop {
futures::select! {
_ = self.propagate_timeout.next() => {
for (peer, peer_data) in self.peers.iter_mut() {
if !peer_data.sent_peering {
match self.notification_service.send_async_notification(peer, self.self_peering.encode()).await {
Ok(_) => {
peer_data.sent_peering = true;
self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationSuccess(peer.clone(), self.self_peering.clone()));
}
Err(e) => {
self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationFailed(peer.clone(), self.self_peering.clone(), e.to_string()));
}
}
}
}
},
command = self.command_rx.next().fuse() => {
if let Some(command) = command {
self.handle_command(command).await
} else {
// `Notifications` has seemingly closed. Closing as well.
return
}
},
event = self.notification_service.next_event().fuse() => {
if let Some(event) = event {
self.handle_notification_event(event)
} else {
// `Notifications` has seemingly closed. Closing as well.
return
}
}
}
}
}
fn handle_notification_event(&mut self, event: NotificationEvent) {
match event {
NotificationEvent::ValidateInboundSubstream {
peer,
handshake,
result_tx,
..
} => {
// only accept peers whose role can be determined
let result = self
.network
.peer_role(peer, handshake)
.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
let duniter_validation = DuniterStreamValidationResult::from(result);
self.events_reporter
.report_event(DuniterPeeringEvent::StreamValidation(
peer,
duniter_validation.clone(),
));
let _ = result_tx.send(duniter_validation.into());
}
NotificationEvent::NotificationStreamOpened {
peer, handshake, ..
} => {
let Some(role) = self.network.peer_role(peer, handshake) else {
debug!(target: "duniter-libp2p", "[{}] role for {peer} couldn't be determined", self.network.local_peer_id());
return;
};
let _was_in = self.peers.insert(
peer,
Peer {
sent_peering: false,
known_peering: None,
},
);
debug_assert!(_was_in.is_none());
self.events_reporter
.report_event(DuniterPeeringEvent::StreamOpened(peer, role));
}
NotificationEvent::NotificationStreamClosed { peer } => {
let _peer = self.peers.remove(&peer);
debug_assert!(_peer.is_some());
self.events_reporter
.report_event(DuniterPeeringEvent::StreamClosed(peer));
}
NotificationEvent::NotificationReceived { peer, notification } => {
if let Ok(peering) = <Peering as Decode>::decode(&mut notification.as_ref()) {
self.events_reporter
.report_event(DuniterPeeringEvent::GossipReceived(peer, true));
self.on_peering(peer, peering);
} else {
self.events_reporter
.report_event(DuniterPeeringEvent::GossipReceived(peer, false));
self.network.report_peer(peer, rep::BAD_PEERING);
}
}
}
}
/// Called when a peer sends us a new peering
fn on_peering(&mut self, who: PeerId, peering: Peering) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
if peer.known_peering.is_some() {
// Peering has already been received for this peer. Only one is allowed per connection.
self.network.report_peer(who, rep::BAD_PEERING);
self.events_reporter
.report_event(DuniterPeeringEvent::AlreadyReceivedPeering(who));
} else {
peer.known_peering = Some(peering.clone());
self.events_reporter
.report_event(DuniterPeeringEvent::GoodPeering(who, peering.clone()));
self.network.report_peer(who, rep::GOOD_PEERING);
}
}
}
async fn handle_command(&mut self, cmd: DuniterPeeringCommand) {
match cmd {
DuniterPeeringCommand::SendPeering(peer, peering) => {
debug!(target: "duniter-libp2p", "[{}] Sending commanded self peering to {}", self.network.local_peer_id(), peer);
match self
.notification_service
.send_async_notification(&peer, peering.encode())
.await
{
Ok(_) => {
self.events_reporter.report_event(
DuniterPeeringEvent::SelfPeeringPropagationSuccess(peer, peering),
);
}
Err(e) => {
self.events_reporter.report_event(
DuniterPeeringEvent::SelfPeeringPropagationFailed(
peer,
peering,
e.to_string(),
),
);
}
}
}
};
}
}
mod rep {
use sc_network::ReputationChange as Rep;
/// Reputation change when a peer sends us a peering that we didn't know about.
pub const GOOD_PEERING: Rep = Rep::new(1 << 7, "Good peering");
/// Reputation change when a peer sends us a bad peering.
pub const BAD_PEERING: Rep = Rep::new(-(1 << 12), "Bad peering");
}
pub(crate) mod handler;
pub(crate) mod rpc;
#[cfg(test)]
mod tests;
mod types;
use crate::endpoint_gossip::duniter_peering_protocol_name::NAME;
use codec::{Decode, Encode};
use sc_network::{
config::{PeerStoreProvider, SetConfig},
types::ProtocolName,
NetworkBackend, NotificationMetrics, NotificationService, MAX_RESPONSE_SIZE,
};
use serde::{Deserialize, Serialize};
use sp_api::__private::BlockT;
use std::{sync::Arc, time};
pub mod well_known_endpoint_types {
pub const RPC: &str = "rpc";
pub const SQUID: &str = "squid";
}
#[derive(Debug, Encode, Decode)]
enum EndpointMsg {
/// RPC endpoint of Duniter
RpcEndpoint(String),
/// GraphQL endpoint of Squid
SquidEndpoint(String),
}
pub struct DuniterPeeringParams {
/// Handle that is used to communicate with `sc_network::Notifications`.
pub notification_service: Box<dyn NotificationService>,
}
/// Maximum allowed size for a peering gossip notification.
pub(crate) const MAX_GOSSIP_SIZE: u64 = MAX_RESPONSE_SIZE;
/// Interval at which we propagate gossips.
pub(crate) const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_secs(1);
pub mod duniter_peering_protocol_name {
pub(crate) const NAME: &str = "duniter-peerings/1";
}
impl DuniterPeeringParams {
/// Create a new instance.
pub fn new<
Hash: AsRef<[u8]>,
Block: BlockT,
Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
>(
genesis_hash: Hash,
fork_id: Option<&str>,
metrics: NotificationMetrics,
peer_store_handle: Arc<dyn PeerStoreProvider>,
) -> (Self, Net::NotificationProtocolConfig) {
let genesis_hash = genesis_hash.as_ref();
let protocol_name: ProtocolName = if let Some(fork_id) = fork_id {
format!(
"/{}/{}/{}",
array_bytes::bytes2hex("", genesis_hash),
fork_id,
NAME,
)
} else {
format!("/{}/{}", array_bytes::bytes2hex("", genesis_hash), NAME)
}
.into();
let (config, notification_service) = Net::notification_config(
protocol_name.clone(),
vec![format!("/{}/{}", array_bytes::bytes2hex("", genesis_hash), NAME).into()],
MAX_GOSSIP_SIZE,
None,
// Default config, allowing some non-reserved nodes to connect
SetConfig::default(),
metrics,
peer_store_handle,
);
(
Self {
notification_service,
},
config,
)
}
}
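For intuition, a small sketch (hypothetical genesis hash and fork id) of the protocol names this construction yields; note that the fallback name passed to `Net::notification_config` above always omits the fork id:

```rust
#[test]
fn protocol_name_shape() {
    // Hypothetical values, for illustration only.
    let genesis_hash = [0xde, 0xad, 0xbe, 0xef];
    let hex = array_bytes::bytes2hex("", genesis_hash);

    // With a fork id: /<genesis-hash-hex>/<fork-id>/duniter-peerings/1
    let with_fork = format!("/{}/{}/{}", hex, "gdev", "duniter-peerings/1");
    assert_eq!(with_fork, "/deadbeef/gdev/duniter-peerings/1");

    // Without a fork id: /<genesis-hash-hex>/duniter-peerings/1
    let without_fork = format!("/{}/{}", hex, "duniter-peerings/1");
    assert_eq!(without_fork, "/deadbeef/duniter-peerings/1");
}
```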
/// Peer information
#[derive(Debug)]
struct Peer {
/// The peering received from this peer, if any. Only one is accepted per connection.
known_peering: Option<Peering>,
/// Whether our own peering has already been sent to this peer.
sent_peering: bool,
}
#[derive(Encode, Decode, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct DuniterEndpoint {
/// The protocol of the endpoint; "rpc" and "squid" are well-known names
pub protocol: String,
/// The endpoint itself (e.g. "squid.example.com/v1/graphql")
pub address: String,
}
#[derive(Encode, Decode, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct Peering {
endpoints: Vec<DuniterEndpoint>,
}
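Since a `Peering` crosses the notification substream as SCALE bytes, here is a round-trip sketch (placed in this module, where the private `endpoints` field is visible) of what `send_async_notification` ships and the handler decodes:

```rust
#[test]
fn peering_scale_round_trip() {
    use codec::{Decode, Encode};

    let peering = Peering {
        endpoints: vec![DuniterEndpoint {
            protocol: "rpc".into(),
            address: "wss://gdev.example.com".into(),
        }],
    };

    // The handler sends `peering.encode()` over the substream and decodes the
    // received bytes with `<Peering as Decode>::decode`.
    let bytes = peering.encode();
    let decoded = Peering::decode(&mut bytes.as_slice()).expect("valid SCALE bytes");
    assert_eq!(peering, decoded);
}
```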
//! # Duniter Peering RPC API
//!
//! Exposes the `duniter_peerings` RPC method.
use crate::endpoint_gossip::rpc::{data::DuniterPeeringsData, state::DuniterPeeringsState};
use jsonrpsee::{core::async_trait, proc_macros::rpc, Extensions};
use sc_consensus_babe_rpc::Error;
/// The exposed RPC methods
#[rpc(client, server)]
pub trait DuniterPeeringRpcApi {
/// Returns the known peerings list received by network gossips
#[method(name = "duniter_peerings", with_extensions)]
async fn duniter_peerings(&self) -> Result<Option<DuniterPeeringsData>, Error>;
}
/// API implementation
pub struct DuniterPeeringRpcApiImpl {
shared_peer_state: DuniterPeeringsState,
}
impl DuniterPeeringRpcApiImpl {
/// Creates a new instance of the Duniter Peering RPC handler.
pub fn new(shared_peer_state: DuniterPeeringsState) -> Self {
Self { shared_peer_state }
}
}
#[async_trait]
impl DuniterPeeringRpcApiServer for DuniterPeeringRpcApiImpl {
async fn duniter_peerings(
&self,
_ext: &Extensions,
) -> Result<Option<DuniterPeeringsData>, Error> {
let option = self.shared_peer_state.peer_state();
Ok(option)
}
}
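Because the trait is declared with `#[rpc(client, server)]`, jsonrpsee also generates a `DuniterPeeringRpcApiClient` extension trait. A hypothetical client-side call (the endpoint address is an assumption) might look like:

```rust
use crate::endpoint_gossip::rpc::api::DuniterPeeringRpcApiClient;
use jsonrpsee::ws_client::WsClientBuilder;

async fn fetch_peerings() -> Result<(), Box<dyn std::error::Error>> {
    // Assumes a node exposing its RPC locally on the default port.
    let client = WsClientBuilder::default()
        .build("ws://127.0.0.1:9944")
        .await?;
    // Generated client method for the `duniter_peerings` RPC.
    let peerings = client.duniter_peerings().await?;
    println!("known peerings: {peerings:?}");
    Ok(())
}
```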
use crate::endpoint_gossip::rpc::state::PeeringWithId;
use jsonrpsee::core::Serialize;
use serde::Deserialize;
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(Debug))]
pub struct DuniterPeeringsData {
pub peerings: Vec<PeeringWithId>,
}
impl DuniterPeeringsData {
pub fn push(&mut self, peering: PeeringWithId) {
self.peerings.push(peering);
}
}
//! # RPC for peering
//!
//! This module gathers all known peering documents for connected peers in memory and provides
//! an RPC interface to query them.
//!
//! ## RPC methods
//!
//! Currently, only one RPC method is available to query the currently known peerings.
//! In the future, the RPC interface could add methods to dynamically change the current node's peering
//! without restarting the node.
//!
//! ### `duniter_peerings`
//!
//! Returns the known peerings list received by network gossips.
//!
//! ```json
//! {
//! "jsonrpc": "2.0",
//! "id": 0,
//! "result": {
//! "peers": [
//! {
//! "endpoints": [
//! "/rpc/wss://gdev.example.com",
//! "/squid/https://squid.gdev.gyroi.de/v1/graphql"
//! ]
//! },
//! {
//! "endpoints": [
//! "/rpc/ws://gdev.example.com:9944"
//! ]
//! }
//! ]
//! }
//! }
//! ```
//!
pub mod api;
pub mod data;
pub mod state;
#[cfg(test)]
mod tests;
use crate::endpoint_gossip::{
handler::DuniterPeeringEvent, rpc::data::DuniterPeeringsData, DuniterEndpoint,
};
use codec::{Decode, Encode};
use futures::StreamExt;
use jsonrpsee::core::Serialize;
use parking_lot::RwLock;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use serde::Deserialize;
use std::sync::Arc;
/// A struct holding a peer's endpoints along with its id, for RPC exposure.
#[derive(Encode, Decode, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct PeeringWithId {
pub peer_id: String,
pub endpoints: Vec<DuniterEndpoint>,
}
#[derive(Clone)]
pub struct DuniterPeeringsState {
inner: Arc<RwLock<Option<Box<DuniterPeeringsData>>>>,
}
/// Basic CRUD operations for the exposed state, plus a listening sink that is notified of
/// network events and automatically inserts/removes peers from the state.
impl DuniterPeeringsState {
pub fn empty() -> Self {
Self {
inner: Arc::new(RwLock::new(Some(Box::new(DuniterPeeringsData {
peerings: Vec::new(),
})))),
}
}
pub fn insert(&self, peering: PeeringWithId) -> &Self {
self.inner.write().as_mut().map(|vs| {
vs.peerings.push(peering);
});
&self
}
pub fn remove(&self, peer_id: String) -> &Self {
self.inner.write().as_mut().map(|vs| {
vs.peerings.retain(|p| p.peer_id != peer_id);
});
&self
}
pub fn peer_state(&self) -> Option<DuniterPeeringsData> {
self.inner.read().as_ref().map(|vs| vs.as_ref().clone())
}
/// Creates a channel for binding to the network events.
pub fn listen(&self) -> TracingUnboundedSender<DuniterPeeringEvent> {
let (sink, mut stream) = tracing_unbounded("mpsc_duniter_peering_rpc_stream", 1_000);
let state = self.clone();
tokio::spawn(async move {
while let Some(event) = stream.next().await {
match event {
DuniterPeeringEvent::GoodPeering(who, peering) => {
state.insert(PeeringWithId {
peer_id: who.to_base58(),
endpoints: peering.endpoints,
});
}
DuniterPeeringEvent::StreamClosed(who) => {
state.remove(who.to_base58());
}
_ => {}
}
}
});
sink
}
}
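A short sketch of the state's lifecycle as the listener drives it (sample peer id taken from the tests below; a peering is inserted on `GoodPeering` and removed on `StreamClosed`):

```rust
#[test]
fn state_crud_round_trip() {
    let state = DuniterPeeringsState::empty();

    // What the listener does on `GoodPeering`.
    state.insert(PeeringWithId {
        peer_id: "12D3KooWRkDXunbB64VegYPCQaitcgtdtEtbsbd7f19nsS7aMjDp".into(),
        endpoints: vec![],
    });
    assert_eq!(state.peer_state().unwrap().peerings.len(), 1);

    // What the listener does on `StreamClosed`.
    state.remove("12D3KooWRkDXunbB64VegYPCQaitcgtdtEtbsbd7f19nsS7aMjDp".into());
    assert!(state.peer_state().unwrap().peerings.is_empty());
}
```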
use crate::endpoint_gossip::{
rpc::{
api::{DuniterPeeringRpcApiImpl, DuniterPeeringRpcApiServer},
state::{DuniterPeeringsState, PeeringWithId},
},
well_known_endpoint_types::{RPC, SQUID},
DuniterEndpoint,
};
use jsonrpsee::RpcModule;
#[tokio::test]
async fn empty_peers_rpc_handler() {
let rpc = setup_io_handler();
let expected_response = r#"{"jsonrpc":"2.0","id":0,"result":{"peerings":[]}}"#.to_string();
let request = r#"{"jsonrpc":"2.0","method":"duniter_peerings","params":[],"id":0}"#;
let (response, _) = rpc.raw_json_request(&request, 1).await.unwrap();
assert_eq!(expected_response, response);
}
#[tokio::test]
async fn expose_known_peers() {
let rpc = setup_new_rpc_with_initial_peerings(vec![
PeeringWithId {
peer_id: "12D3KooWRkDXunbB64VegYPCQaitcgtdtEtbsbd7f19nsS7aMjDp".into(),
endpoints: vec![
DuniterEndpoint {
protocol: RPC.into(),
address: "/rpc/wss://gdev.example.com".into(),
},
DuniterEndpoint {
protocol: SQUID.into(),
address: "/squid/https://squid.gdev.gyroi.de/v1/graphql".into(),
},
],
},
PeeringWithId {
peer_id: "12D3KooWFiUBo3Kjiryvrpz8b3kfNVk7baezhab7SHdfafgY7nmN".into(),
endpoints: vec![DuniterEndpoint {
protocol: RPC.into(),
address: "/rpc/ws://gdev.example.com:9944".into(),
}],
},
]);
let expected_response = r#"{"jsonrpc":"2.0","id":0,"result":{"peerings":[{"peer_id":"12D3KooWRkDXunbB64VegYPCQaitcgtdtEtbsbd7f19nsS7aMjDp","endpoints":[{"protocol":"rpc","address":"/rpc/wss://gdev.example.com"},{"protocol":"squid","address":"/squid/https://squid.gdev.gyroi.de/v1/graphql"}]},{"peer_id":"12D3KooWFiUBo3Kjiryvrpz8b3kfNVk7baezhab7SHdfafgY7nmN","endpoints":[{"protocol":"rpc","address":"/rpc/ws://gdev.example.com:9944"}]}]}}"#.to_string();
let request = r#"{"jsonrpc":"2.0","method":"duniter_peerings","params":[],"id":0}"#;
let (response, _) = rpc.raw_json_request(&request, 1).await.unwrap();
assert_eq!(expected_response, response);
}
fn setup_io_handler() -> RpcModule<DuniterPeeringRpcApiImpl> {
DuniterPeeringRpcApiImpl::new(DuniterPeeringsState::empty()).into_rpc()
}
fn setup_new_rpc_with_initial_peerings(
peers: Vec<PeeringWithId>,
) -> RpcModule<DuniterPeeringRpcApiImpl> {
let state = DuniterPeeringsState::empty();
for peer in peers {
state.insert(peer);
}
DuniterPeeringRpcApiImpl::new(state).into_rpc()
}
use crate::{
endpoint_gossip,
endpoint_gossip::{
duniter_peering_protocol_name,
handler::{DuniterPeeringCommand, DuniterPeeringEvent},
well_known_endpoint_types::RPC,
DuniterEndpoint, Peering,
},
};
use futures::{future, stream, FutureExt, StreamExt};
use log::{debug, warn};
use parking_lot::{Mutex, RwLock};
use sc_consensus::{
BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport, ImportResult,
ImportedAux,
};
use sc_network::{NetworkStateInfo, ObservedRole, PeerId};
use sc_network_test::{
Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, PeersClient, TestNetFactory,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_api::__private::BlockT;
use sp_consensus::Error as ConsensusError;
use sp_runtime::traits::Header;
use std::{future::Future, sync::Arc, task::Poll, time::Duration};
#[tokio::test]
async fn peering_is_forwarded_and_only_once_per_connection() {
let _ = env_logger::try_init();
let authorities_count = 3;
let full_count = 1;
let total_peers = authorities_count + full_count;
let mut net = DuniterPeeringTestNet::new(authorities_count, full_count);
tokio::spawn(start_network(&mut net, total_peers));
let net = Arc::new(Mutex::new(net));
// make sure the network is ready (each peering is received by all other peers)
let wait_for_all_peering_notifications =
watch_events_and_wait_for_all_peerings(total_peers, &net);
let wait_for = futures::future::join_all(wait_for_all_peering_notifications).map(|_| ());
tokio::time::timeout(Duration::from_secs(5), run_until_complete(wait_for, &net))
.await
.unwrap();
// rule: only one peering is accepted per connection (disconnecting/restarting allows changing the peering value)
let already_received = ensure_only_one_peering_is_accepted(&net);
tokio::time::timeout(
Duration::from_secs(5),
run_until_complete(already_received, &net),
)
.await
.unwrap();
}
fn ensure_only_one_peering_is_accepted(
net: &Arc<Mutex<DuniterPeeringTestNet>>,
) -> impl Future<Output = ()> {
let command_0 = net.lock().peer_commands[0].clone();
let stream1 = net.lock().peer_streams[1].clone();
let peer_id_0 = net.lock().peer_ids[0].clone();
let peer_id_1 = net.lock().peer_ids[1].clone();
let already_received = async move {
while let Some(event) = stream1.write().next().await {
match event {
DuniterPeeringEvent::AlreadyReceivedPeering(peer) => {
if peer == peer_id_0 {
// We did receive the peering from peer 0
break;
}
}
_ => {}
}
}
};
let already_received = futures::future::join_all(vec![already_received]).map(|_| ());
command_0
.unbounded_send(DuniterPeeringCommand::SendPeering(
peer_id_1,
Peering {
endpoints: vec![DuniterEndpoint {
protocol: RPC.into(),
address: "gdev.example.com:9944".into(),
}],
},
))
.unwrap();
already_received
}
fn watch_events_and_wait_for_all_peerings(
total_peers: usize,
net: &Arc<Mutex<DuniterPeeringTestNet>>,
) -> Vec<impl Future<Output = ()> + Sized> {
let mut peering_notifications = Vec::new();
for peer_id in 0..total_peers {
let local_peer_id = net.lock().peer_ids[peer_id];
let stream = net.lock().peer_streams[peer_id].clone();
peering_notifications.push(async move {
let mut identified = 0;
while let Some(event) = stream.write().next().await {
debug_event(event.clone(), local_peer_id);
match event {
DuniterPeeringEvent::GoodPeering(peer, _) => {
debug!(target: "duniter-libp2p", "[{}] Received peering from {}",local_peer_id, peer);
identified += 1;
if identified == (total_peers - 1) {
// all peers identified
break;
}
},
_ => {}
}
}
warn!("All peers sent their peering");
})
}
peering_notifications
}
fn debug_event(event: DuniterPeeringEvent, local_peer_id: PeerId) {
match event {
DuniterPeeringEvent::StreamOpened(peer, role) => {
debug!(target: "duniter-libp2p", "[{}] Peer {peer} connected with role {}", local_peer_id, observed_role_to_str(role));
}
DuniterPeeringEvent::StreamValidation(peer, result) => {
debug!(target: "duniter-libp2p", "[{}] Validating inbound substream from {peer} with result {}", local_peer_id, result);
}
DuniterPeeringEvent::StreamClosed(peer) => {
debug!(target: "duniter-libp2p", "[{}] Peer {peer} disconnected", local_peer_id);
}
DuniterPeeringEvent::GossipReceived(peer, success) => {
if success {
debug!(target: "duniter-libp2p", "[{}] Received peering message from {peer}", local_peer_id);
} else {
debug!(target: "duniter-libp2p", "[{}] Failed to receive peering message from {peer}", local_peer_id);
}
}
DuniterPeeringEvent::GoodPeering(peer, _) => {
debug!(target: "duniter-libp2p", "[{}] Received peering from {}", local_peer_id, peer);
}
DuniterPeeringEvent::AlreadyReceivedPeering(peer) => {
debug!(target: "duniter-libp2p", "[{}] Already received peering from {}", local_peer_id, peer);
panic!("Received peering from the same peer twice");
}
DuniterPeeringEvent::SelfPeeringPropagationFailed(peer, _peering, e) => {
debug!(target: "duniter-libp2p", "[{}] Failed to propagate self peering to {}: {}", local_peer_id, peer, e);
panic!("Failed to propagate self peering");
}
DuniterPeeringEvent::SelfPeeringPropagationSuccess(peer, _peering) => {
debug!(target: "duniter-libp2p", "[{}] Successfully propagated self peering to {}", local_peer_id, peer);
}
}
}
fn observed_role_to_str(role: ObservedRole) -> &'static str {
match role {
ObservedRole::Authority => "Authority",
ObservedRole::Full => "Full",
ObservedRole::Light => "Light",
}
}
// Spawns Duniter nodes. Returns a future to spawn on the runtime.
fn start_network(net: &mut DuniterPeeringTestNet, peers: usize) -> impl Future<Output = ()> {
let nodes = stream::FuturesUnordered::new();
for peer_id in 0..peers {
let net_service = net.peers[peer_id].network_service().clone();
net.peer_ids.push(net_service.local_peer_id().clone());
let notification_service = net.peers[peer_id]
.take_notification_service(&format!("/{}", duniter_peering_protocol_name::NAME).into())
.unwrap();
let (rpc_sink, stream) = tracing_unbounded("mpsc_duniter_gossip_peering_test", 100_000);
let (command_tx, command_rx) =
tracing_unbounded("mpsc_duniter_gossip_peering_test_command", 100_000);
let handler = endpoint_gossip::handler::build::<Block, _>(
notification_service,
net_service,
rpc_sink,
Some(command_rx),
vec![],
);
// To send external commands to the handler (for tests or RPC commands).
net.peer_streams.push(Arc::new(RwLock::new(stream)));
net.peer_commands.push(command_tx);
let node = handler.run();
fn assert_send<T: Send>(_: &T) {}
assert_send(&node);
nodes.push(node);
}
nodes.for_each(|_| async move {})
}
#[derive(Default)]
struct DuniterPeeringTestNet {
// Peers
peers: Vec<DuniterPeeringPeer>,
// IDs of the peers
peer_ids: Vec<PeerId>,
// RX of the gossip events
peer_streams: Vec<Arc<RwLock<TracingUnboundedReceiver<DuniterPeeringEvent>>>>,
// TX to drive the handler (for tests or configuration)
peer_commands: Vec<TracingUnboundedSender<DuniterPeeringCommand>>,
}
type DuniterPeeringPeer = sc_network_test::Peer<PeerData, DuniterTestBlockImport>;
impl DuniterPeeringTestNet {
fn new(n_authority: usize, n_full: usize) -> Self {
let mut net = DuniterPeeringTestNet {
peers: Vec::with_capacity(n_authority + n_full),
peer_ids: Vec::new(),
peer_streams: Vec::new(),
peer_commands: Vec::new(),
};
for _ in 0..n_authority {
net.add_authority_peer();
}
for _ in 0..n_full {
net.add_full_peer();
}
net
}
fn add_authority_peer(&mut self) {
self.add_full_peer_with_config(FullPeerConfig {
notifications_protocols: vec![
format!("/{}", duniter_peering_protocol_name::NAME).into()
],
is_authority: true,
..Default::default()
})
}
}
#[derive(Default)]
struct PeerData;
impl TestNetFactory for DuniterPeeringTestNet {
type BlockImport = DuniterTestBlockImport;
type PeerData = PeerData;
type Verifier = PassThroughVerifier;
fn make_verifier(&self, _client: PeersClient, _: &PeerData) -> Self::Verifier {
PassThroughVerifier::new(false) // use non-instant finality.
}
fn peer(&mut self, i: usize) -> &mut DuniterPeeringPeer {
&mut self.peers[i]
}
fn peers(&self) -> &Vec<DuniterPeeringPeer> {
&self.peers
}
fn peers_mut(&mut self) -> &mut Vec<DuniterPeeringPeer> {
&mut self.peers
}
fn mut_peers<F: FnOnce(&mut Vec<DuniterPeeringPeer>)>(&mut self, closure: F) {
closure(&mut self.peers);
}
fn make_block_import(
&self,
_client: PeersClient,
) -> (
BlockImportAdapter<Self::BlockImport>,
Option<BoxJustificationImport<Block>>,
Self::PeerData,
) {
(
BlockImportAdapter::new(DuniterTestBlockImport),
None,
PeerData::default(),
)
}
fn add_full_peer(&mut self) {
self.add_full_peer_with_config(FullPeerConfig {
notifications_protocols: vec![
format!("/{}", duniter_peering_protocol_name::NAME).into()
],
is_authority: false,
..Default::default()
})
}
}
async fn run_until_complete(future: impl Future + Unpin, net: &Arc<Mutex<DuniterPeeringTestNet>>) {
let drive_to_completion = futures::future::poll_fn(|cx| {
net.lock().poll(cx);
Poll::<()>::Pending
});
future::select(future, drive_to_completion).await;
}
#[derive(Clone)]
struct DuniterTestBlockImport;
/// Inspired by GrandpaBlockImport
#[async_trait::async_trait]
impl<Block: BlockT> BlockImport<Block> for DuniterTestBlockImport {
type Error = ConsensusError;
/// Fake check block, always succeeds.
async fn check_block(
&self,
_block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
Ok(ImportResult::Imported(ImportedAux {
is_new_best: true,
bad_justification: false,
clear_justification_requests: false,
header_only: false,
needs_justification: false,
}))
}
/// Fake import block, always succeeds.
async fn import_block(
&self,
block: BlockImportParams<Block>,
) -> Result<ImportResult, Self::Error> {
debug!("Importing block #{}", block.header.number());
Ok(ImportResult::Imported(ImportedAux {
is_new_best: true,
bad_justification: false,
clear_justification_requests: false,
header_only: false,
needs_justification: false,
}))
}
}
pub mod validation_result;
use sc_network::service::traits::ValidationResult;
use std::fmt::Display;
/// Clonable version of sc_network::service::traits::ValidationResult
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DuniterStreamValidationResult {
/// Accept inbound substream.
Accept,
/// Reject inbound substream.
Reject,
}
impl Display for DuniterStreamValidationResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DuniterStreamValidationResult::Accept => write!(f, "Accept"),
DuniterStreamValidationResult::Reject => write!(f, "Reject"),
}
}
}
impl From<ValidationResult> for DuniterStreamValidationResult {
fn from(result: ValidationResult) -> Self {
match result {
ValidationResult::Accept => DuniterStreamValidationResult::Accept,
ValidationResult::Reject => DuniterStreamValidationResult::Reject,
}
}
}
impl From<DuniterStreamValidationResult> for ValidationResult {
fn from(result: DuniterStreamValidationResult) -> Self {
match result {
DuniterStreamValidationResult::Accept => ValidationResult::Accept,
DuniterStreamValidationResult::Reject => ValidationResult::Reject,
}
}
}
@@ -65,5 +65,6 @@
pub mod chain_spec;
pub mod cli;
pub mod command;
pub mod endpoint_gossip;
pub mod rpc;
pub mod service;
@@ -24,6 +24,7 @@ mod chain_spec;
mod service;
pub(crate) mod cli;
mod command;
mod endpoint_gossip;
mod rpc;
fn main() -> sc_cli::Result<()> {
......
@@ -22,6 +22,7 @@
#![warn(missing_docs)]
use crate::endpoint_gossip::rpc::{api::DuniterPeeringRpcApiServer, state::DuniterPeeringsState};
use common_runtime::{AccountId, Balance, Block, BlockNumber, Hash, Index};
use jsonrpsee::RpcModule;
use sc_consensus_babe::{BabeApi, BabeWorkerHandle};
@@ -61,6 +62,13 @@ pub struct GrandpaDeps<B> {
pub finality_provider: Arc<FinalityProofProvider<B, Block>>,
}
/// Dependencies for DuniterPeering
#[derive(Clone)]
pub struct DuniterPeeringRpcModuleDeps {
/// The state of the DuniterPeering RPC module which will be exposed.
pub state: DuniterPeeringsState,
}
/// Full client dependencies.
pub struct FullDeps<C, P, SC, B> {
/// The client instance to use.
@@ -77,6 +85,8 @@ pub struct FullDeps<C, P, SC, B> {
pub babe: Option<BabeDeps>,
/// GRANDPA specific dependencies.
pub grandpa: GrandpaDeps<B>,
/// DuniterPeering specific dependencies.
pub duniter_peering: DuniterPeeringRpcModuleDeps,
}
/// Instantiate all full RPC extensions.
@@ -109,6 +119,7 @@ where
command_sink_opt,
babe,
grandpa,
duniter_peering: endpoint_gossip,
} = deps;
if let Some(babe) = babe {
@@ -140,7 +151,7 @@ where
)?;
module.merge(System::new(client.clone(), pool).into_rpc())?;
module.merge(TransactionPayment::new(client).into_rpc())?;
module.merge(TransactionPayment::new(client.clone()).into_rpc())?;
if let Some(command_sink) = command_sink_opt {
// We provide the rpc handler with the sending end of the channel to allow the rpc
// send EngineCommands to the background block authorship task.
@@ -151,6 +162,10 @@ where
// `YourRpcStruct` should have a reference to a client, which is needed
// to call into the runtime.
// `module.merge(YourRpcTrait::into_rpc(YourRpcStruct::new(ReferenceToClient, ...)))?;`
module.merge(
crate::endpoint_gossip::rpc::api::DuniterPeeringRpcApiImpl::new(endpoint_gossip.state)
.into_rpc(),
)?;
Ok(module)
}
@@ -19,6 +19,14 @@
pub mod client;
use self::client::{Client, ClientHandle, RuntimeApiCollection};
use crate::{
endpoint_gossip::{
rpc::state::DuniterPeeringsState,
well_known_endpoint_types::{RPC, SQUID},
DuniterEndpoint,
},
rpc::DuniterPeeringRpcModuleDeps,
};
use async_io::Timer;
use common_runtime::Block;
use futures::{Stream, StreamExt};
@@ -119,15 +127,19 @@ pub fn new_chain_ops(
),
ServiceError,
> {
let PartialComponents {
let (
PartialComponents {
client,
backend,
import_queue,
task_manager,
..
} = new_partial::<runtime_executor::runtime::RuntimeApi, runtime_executor::Executor>(
},
_duniter_config,
) = new_partial::<runtime_executor::runtime::RuntimeApi, runtime_executor::Executor>(
config,
manual_consensus,
Default::default(),
)?;
Ok((
Arc::new(Client::Client(client)),
@@ -148,7 +160,9 @@ type FullGrandpaBlockImport<RuntimeApi, Executor> = sc_consensus_grandpa::Grandp
pub fn new_partial<RuntimeApi, Executor>(
config: &Configuration,
consensus_manual: bool,
duniter_options: crate::cli::DuniterConfigExtension,
) -> Result<
(
sc_service::PartialComponents<
FullClient<RuntimeApi, Executor>,
FullBackend,
@@ -171,6 +185,8 @@ pub fn new_partial<RuntimeApi, Executor>(
Option<Telemetry>,
),
>,
crate::cli::DuniterConfigExtension,
),
ServiceError,
>
where
@@ -281,7 +297,8 @@ where
(queue, Some(handle))
};
Ok(sc_service::PartialComponents {
Ok((
sc_service::PartialComponents {
client,
backend,
task_manager,
@@ -296,7 +313,9 @@ where
grandpa_link,
telemetry,
),
})
},
duniter_options,
))
}
/// Builds a new service for a full client.
@@ -307,6 +326,7 @@ pub fn new_full<
>(
config: Configuration,
sealing: crate::cli::Sealing,
duniter_options: crate::cli::DuniterConfigExtension,
) -> Result<TaskManager, ServiceError>
where
RuntimeApi: sp_api::ConstructRuntimeApi<Block, FullClient<RuntimeApi, Executor>>
@@ -317,7 +337,8 @@ where
Executor: sc_executor::NativeExecutionDispatch + 'static,
Executor: sc_executor::sp_wasm_interface::HostFunctions + 'static,
{
let sc_service::PartialComponents {
let (
sc_service::PartialComponents {
client,
backend,
mut task_manager,
@@ -326,17 +347,22 @@ where
select_chain,
transaction_pool,
other: (block_import, babe_link, babe_worker_handle, grandpa_link, mut telemetry),
} = new_partial::<RuntimeApi, Executor>(&config, sealing.is_manual_consensus())?;
},
duniter_options,
) = new_partial::<RuntimeApi, Executor>(
&config,
sealing.is_manual_consensus(),
duniter_options,
)?;
let grandpa_protocol_name = sc_consensus_grandpa::protocol_standard_name(
&client
// genesis hash used in protocol names
let genesis_hash = client
.block_hash(0)
.ok()
.flatten()
.expect("Genesis block exists; qed"),
&config.chain_spec,
);
.expect("Genesis block exists; qed");
// shared network config
let mut net_config = sc_network::config::FullNetworkConfiguration::<
Block,
<Block as sp_runtime::traits::Block>::Hash,
@@ -344,20 +370,35 @@ where
>::new(&config.network, config.prometheus_registry().cloned());
let metrics = N::register_notification_metrics(config.prometheus_registry());
let peer_store_handle = net_config.peer_store_handle();
// grandpa network config
let grandpa_protocol_name =
sc_consensus_grandpa::protocol_standard_name(&genesis_hash, &config.chain_spec);
let (grandpa_protocol_config, grandpa_notification_service) =
sc_consensus_grandpa::grandpa_peers_set_config::<_, N>(
grandpa_protocol_name.clone(),
metrics.clone(),
peer_store_handle,
peer_store_handle.clone(),
);
net_config.add_notification_protocol(grandpa_protocol_config);
let (duniter_peering_params, duniter_peering_config) =
crate::endpoint_gossip::DuniterPeeringParams::new::<_, Block, N>(
genesis_hash,
config.chain_spec.fork_id(),
metrics.clone(),
Arc::clone(&peer_store_handle),
);
net_config.add_notification_protocol(duniter_peering_config);
// warp sync network provider
let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new(
backend.clone(),
grandpa_link.shared_authority_set().clone(),
Vec::default(),
));
// build network service from params
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
@@ -372,6 +413,7 @@ where
metrics,
})?;
// aliases
let role = config.role;
let force_authoring = config.force_authoring;
let backoff_authoring_blocks: Option<()> = None;
@@ -580,6 +622,7 @@ where
let shared_voter_state = SharedVoterState::empty();
let finality_proof_provider =
FinalityProofProvider::new_for_service(backend.clone(), Some(shared_authority_set.clone()));
let shared_duniter_peerings_state = DuniterPeeringsState::empty();
let rpc_extensions_builder = {
let client = client.clone();
@@ -591,6 +634,7 @@ where
keystore: keystore.clone(),
});
let rpc_setup = shared_voter_state.clone();
let state_clone = shared_duniter_peerings_state.clone();
Box::new(
move |subscription_task_executor: SubscriptionTaskExecutor| {
@@ -601,6 +645,9 @@ where
subscription_executor: subscription_task_executor.clone(),
finality_provider: finality_proof_provider.clone(),
};
let endpoint_gossip_deps = DuniterPeeringRpcModuleDeps {
state: state_clone.clone(),
};
let deps = crate::rpc::FullDeps {
client: client.clone(),
@@ -608,6 +655,7 @@ where
select_chain: select_chain.clone(),
babe: babe_deps.clone(),
grandpa: grandpa_deps,
duniter_peering: endpoint_gossip_deps,
command_sink_opt: command_sink_opt.clone(),
};
@@ -660,8 +708,8 @@ where
let grandpa_config = sc_consensus_grandpa::GrandpaParams {
config: grandpa_config,
link: grandpa_link,
sync: sync_service,
network,
sync: sync_service.clone(),
network: network.clone(),
voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(),
prometheus_registry,
shared_voter_state,
@@ -681,6 +729,33 @@ where
);
}
let mut duniter_endpoints = vec![];
if let Some(rpc_endpoint) = duniter_options.public_rpc {
duniter_endpoints.push(DuniterEndpoint {
protocol: RPC.into(),
address: rpc_endpoint.into(),
});
}
if let Some(squid_endpoint) = duniter_options.public_squid {
duniter_endpoints.push(DuniterEndpoint {
protocol: SQUID.into(),
address: squid_endpoint.into(),
});
}
task_manager.spawn_essential_handle().spawn_blocking(
"duniter-endpoint-gossip-handler",
Some("networking"),
crate::endpoint_gossip::handler::build::<Block, _>(
duniter_peering_params.notification_service,
network.clone(),
shared_duniter_peerings_state.listen(),
None, // We don't send commands for now
duniter_endpoints,
)
.run(),
);
network_starter.start_network();
log::info!("***** Duniter has fully started *****");
......