Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (2)
Showing 2204 additions and 1016 deletions.

This diff is collapsed.
@@ -105,6 +105,8 @@ simple_logger = { version = "4.3.3", default-features = false }
bincode = { version = "1.3.3", default-features = false }
dubp-wot = { version = "0.11.1", default-features = false }
flate2 = { version = "1.0.28", default-features = false }
array-bytes = { version = "6.2.2", default-features = false }
parking_lot = { version = "0.12.1" }
# Subxt
subxt = { git = 'https://github.com/duniter/subxt', branch = 'subxt-v0.38.0-duniter-substrate-v1.17.0', default-features = false }
@@ -213,6 +215,9 @@ sc-telemetry = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-transaction-pool = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-basic-authorship = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-network = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-network-sync = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-network-test = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sc-utils = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sp-keystore = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sp-storage = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
sp-timestamp = { git = 'https://github.com/duniter/duniter-polkadot-sdk', branch = 'duniter-substrate-v1.17.0', default-features = false }
......
@@ -68,6 +68,9 @@ volumes:
| `DUNITER_DISABLE_PROMETHEUS` | Boolean to disable the Prometheus endpoint on port 9615. | `false` |
| `DUNITER_DISABLE_TELEMETRY` | Boolean to disable connecting to the Substrate telemetry server. | `false` |
| `DUNITER_PRUNING_PROFILE` | * `default`<br>* `archive`: keep all blocks and state blocks<br>* `light`: keep only last 256 state blocks and last 14400 blocks (one day duration) | `default` |
| `DUNITER_PUBLIC_RPC` | The public RPC endpoint to gossip on the network and make available in the apps. | None |
| `DUNITER_PUBLIC_SQUID` | The public Squid GraphQL endpoint to gossip on the network and make available in the apps. | None |
| `DUNITER_PUBLIC_ENDPOINTS` | Path to a JSON file containing public endpoints to gossip on the network. The file should use the following format:<br>```{"endpoints": [ { "protocol": "rpc", "address": "wss://gdev.example.com" }, { "protocol": "squid", "address": "gdev.example.com/graphql/v1" }]}``` | None |
## Other Duniter options
......
@@ -59,6 +59,21 @@ if [ -n "$DUNITER_PUBLIC_ADDR" ]; then
set -- "$@" --public-addr "$DUNITER_PUBLIC_ADDR"
fi
# Define public RPC endpoint (gossiped on the network)
if [ -n "$DUNITER_PUBLIC_RPC" ]; then
set -- "$@" --public-rpc "$DUNITER_PUBLIC_RPC"
fi
# Define public Squid endpoint (gossiped on the network)
if [ -n "$DUNITER_PUBLIC_SQUID" ]; then
set -- "$@" --public-squid "$DUNITER_PUBLIC_SQUID"
fi
# Define public endpoints from JSON file (gossiped on the network)
if [ -n "$DUNITER_PUBLIC_ENDPOINTS" ]; then
set -- "$@" --public-endpoints "$DUNITER_PUBLIC_ENDPOINTS"
fi
# Define listen address (inside docker)
if [ -n "$DUNITER_LISTEN_ADDR" ]; then
set -- "$@" --listen-addr "$DUNITER_LISTEN_ADDR"
......
@@ -130,6 +130,9 @@ num-format = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
codec = { workspace = true }
array-bytes = { workspace = true }
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
# Local
@@ -162,11 +165,13 @@ sc-consensus-manual-seal = { workspace = true, default-features = true }
sc-executor = { workspace = true, default-features = true }
sc-keystore = { workspace = true, default-features = true }
sc-network = { workspace = true, default-features = true }
sc-network-sync = { workspace = true, default-features = true }
sc-offchain = { workspace = true, default-features = true }
sc-rpc-api = { workspace = true, default-features = true }
sc-telemetry = { workspace = true, default-features = true }
sc-transaction-pool = { workspace = true, default-features = true }
sc-transaction-pool-api = { workspace = true, default-features = true }
sc-utils = { workspace = true, default-features = true }
sp-api = { workspace = true, default-features = true }
sp-authority-discovery = { workspace = true, default-features = true }
sp-block-builder = { workspace = true, default-features = true }
@@ -190,6 +195,12 @@ sp-transaction-pool = { workspace = true, default-features = true }
sp-transaction-storage-proof = { workspace = true, default-features = true }
substrate-frame-rpc-system = { workspace = true, default-features = true }
[dev-dependencies]
sc-network-test = { workspace = true, default-features = true }
async-trait = { version = "0.1.79" }
env_logger = "0.10.2"
async-channel = "2.3.1"
[build-dependencies]
substrate-build-script-utils = { workspace = true, default-features = true }
@@ -228,4 +239,4 @@ assets = [
{ source = "../resources/debian/duniter-mirror.service", dest = "/usr/lib/systemd/system/duniter-mirror.service", mode = "0644" },
{ source = "../resources/debian/duniter-smith.service", dest = "/usr/lib/systemd/system/duniter-smith.service", mode = "0644" },
{ source = "../resources/debian/duniter-smith.service", dest = "/usr/lib/systemd/system/distance-oracle.service", mode = "0644" },
]
]
\ No newline at end of file
@@ -19,9 +19,14 @@ pub struct Cli {
#[clap(subcommand)]
pub subcommand: Option<Subcommand>,
/// Substrate base options
#[clap(flatten)]
pub run: sc_cli::RunCmd,
/// Duniter-specific options
#[clap(flatten)]
pub duniter_options: DuniterConfigExtension,
/// How blocks should be sealed
///
/// Options are "production", "instant", "manual", or timer interval in milliseconds
@@ -29,6 +34,33 @@
pub sealing: crate::cli::Sealing,
}
/// Additional options specific to the Duniter client
#[derive(Debug, Default, Clone, clap::Parser)]
pub struct DuniterConfigExtension {
/// Public RPC endpoint to gossip on the network and make available in the apps.
#[arg(long)]
pub public_rpc: Option<String>,
/// Public Squid GraphQL endpoint to gossip on the network and make available in the apps.
#[arg(long)]
pub public_squid: Option<String>,
/// Public endpoints from a JSON file, using the following format, where `protocol` and
/// `address` are free-form strings:
///
/// ```json
/// {
/// "endpoints": [
/// { "protocol": "rpc", "address": "wss://gdev.example.com" },
/// { "protocol": "squid", "address": "gdev.example.com/graphql/v1" },
/// { "protocol": "other", "address": "gdev.example.com/other" }
/// ]
/// }
/// ```
#[arg(long, value_name = "JSON_FILE_PATH")]
pub public_endpoints: Option<String>,
}
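For illustration only, a minimal sketch of how the JSON file passed via `--public-endpoints` could be loaded. The helper name is hypothetical and not part of this changeset; it assumes the workspace's `serde_json` plus the `DuniterEndpoints`/`DuniterEndpoint` types from the `endpoint_gossip` module:

```rust
/// Hypothetical helper: parse the JSON file given via `--public-endpoints`.
/// Endpoints beyond the `DuniterEndpoints` bound (10) are silently truncated.
fn read_public_endpoints(path: &str) -> Result<crate::endpoint_gossip::DuniterEndpoints, String> {
    #[derive(serde::Deserialize)]
    struct EndpointsFile {
        endpoints: Vec<crate::endpoint_gossip::DuniterEndpoint>,
    }
    let raw = std::fs::read_to_string(path).map_err(|e| e.to_string())?;
    let parsed: EndpointsFile = serde_json::from_str(&raw).map_err(|e| e.to_string())?;
    Ok(crate::endpoint_gossip::DuniterEndpoints::truncate_from(
        parsed.endpoints,
    ))
}
```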
#[derive(Debug, clap::Subcommand)]
pub enum Subcommand {
/// Build a chain specification.
......
@@ -22,7 +22,7 @@ pub mod utils;
use crate::{
chain_spec,
cli::{Cli, Subcommand},
cli::{Cli, DuniterConfigExtension, Subcommand},
service,
service::{runtime_executor::Executor, RuntimeType},
};
@@ -374,6 +374,7 @@ pub fn run() -> sc_cli::Result<()> {
}
None => {
let runner = cli.create_runner(&cli.run)?;
let duniter_options: DuniterConfigExtension = cli.duniter_options;
runner.run_node_until_exit(|mut config| async move {
// Force offchain worker and offchain indexing if we have the role Authority
if config.role.is_authority() {
@@ -386,7 +387,7 @@
service::runtime_executor::runtime::RuntimeApi,
Executor,
sc_network::Litep2pNetworkBackend,
>(config, cli.sealing)
>(config, cli.sealing, duniter_options)
.map_err(sc_cli::Error::Service)
}
})
......
use crate::endpoint_gossip::{
types::validation_result::DuniterStreamValidationResult, DuniterEndpoints, Peer, Peering,
PROPAGATE_TIMEOUT,
};
use codec::{Decode, Encode};
use futures::{stream, FutureExt, Stream, StreamExt};
use log::debug;
use sc_network::{
service::traits::{NotificationEvent, ValidationResult},
utils::interval,
NetworkEventStream, NetworkPeers, NetworkStateInfo, NotificationService, ObservedRole, PeerId,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_api::__private::BlockT;
use std::{collections::HashMap, marker::PhantomData, pin::Pin};
pub fn build<
B: BlockT + 'static,
N: NetworkPeers + NetworkEventStream + NetworkStateInfo + Clone,
>(
notification_service: Box<dyn NotificationService>,
network: N,
rpc_sink: TracingUnboundedSender<DuniterPeeringEvent>,
command_rx: Option<TracingUnboundedReceiver<DuniterPeeringCommand>>,
endpoints: DuniterEndpoints,
) -> GossipsHandler<B, N> {
let local_peer_id = network.local_peer_id();
GossipsHandler {
b: PhantomData::<B>,
notification_service,
propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
as Pin<Box<dyn Stream<Item = ()> + Send>>)
.fuse(),
network,
peers: HashMap::new(),
command_rx: command_rx.unwrap_or_else(|| {
let (_tx, rx) = tracing_unbounded("mpsc_duniter_peering_rpc_command", 1_000);
rx
}),
self_peering: Peering { endpoints },
events_reporter: DuniterEventsReporter {
sink: rpc_sink,
local_peer_id,
},
}
}
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum DuniterPeeringEvent {
StreamOpened(PeerId, ObservedRole),
StreamValidation(PeerId, DuniterStreamValidationResult),
StreamClosed(PeerId),
/// Received gossip from a peer, `bool` indicates whether the gossip was successfully decoded.
GossipReceived(PeerId, bool),
GoodPeering(PeerId, Peering),
AlreadyReceivedPeering(PeerId),
SelfPeeringPropagationSuccess(PeerId, Peering),
SelfPeeringPropagationFailed(PeerId, Peering, String),
}
pub enum DuniterPeeringCommand {
/// Send a peering to a peer.
#[allow(dead_code)] // only used in tests for now, maybe in the future by RPC
SendPeering(PeerId, Peering),
}
struct DuniterEventsReporter {
sink: TracingUnboundedSender<DuniterPeeringEvent>,
local_peer_id: PeerId,
}
impl DuniterEventsReporter {
/// Report an event for monitoring purposes (logs + unit tests).
fn report_event(&self, event: DuniterPeeringEvent) {
self.sink.unbounded_send(event.clone())
.unwrap_or_else(|e| {
log::error!(target: "duniter-libp2p", "[{}] Failed to send notification: {}", self.local_peer_id, e);
})
}
}
/// Handler for gossips. Call [`GossipsHandler::run`] to start the processing.
pub struct GossipsHandler<
B: BlockT + 'static,
N: NetworkPeers + NetworkEventStream + NetworkStateInfo,
> {
b: PhantomData<B>,
/// Interval at which we try to propagate our peering
propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
/// Network service to use to send messages and manage peers.
network: N,
/// All connected peers and their known peering.
peers: HashMap<PeerId, Peer>,
/// The node's own peering.
self_peering: Peering,
/// Internal sink to report events.
events_reporter: DuniterEventsReporter,
/// Receiver for external commands (tests/RPC methods).
command_rx: TracingUnboundedReceiver<DuniterPeeringCommand>,
/// Handle that is used to communicate with `sc_network::Notifications`.
notification_service: Box<dyn NotificationService>,
}
impl<B, N> GossipsHandler<B, N>
where
B: BlockT + 'static,
N: NetworkPeers + NetworkEventStream + NetworkStateInfo,
{
/// Turns the [`GossipsHandler`] into a future that should run forever and not be
/// interrupted.
pub async fn run(mut self) {
// Share the node's own peering with listeners of the current handler
self.events_reporter
.report_event(DuniterPeeringEvent::GoodPeering(
self.network.local_peer_id(),
self.self_peering.clone(),
));
// Then start the network loop
loop {
futures::select! {
_ = self.propagate_timeout.next() => {
for (peer, peer_data) in self.peers.iter_mut() {
if !peer_data.sent_peering {
match self.notification_service.send_async_notification(peer, self.self_peering.encode()).await {
Ok(_) => {
peer_data.sent_peering = true;
self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationSuccess(*peer, self.self_peering.clone()));
}
Err(e) => {
self.events_reporter.report_event(DuniterPeeringEvent::SelfPeeringPropagationFailed(*peer, self.self_peering.clone(), e.to_string()));
}
}
}
}
},
command = self.command_rx.next().fuse() => {
if let Some(command) = command {
self.handle_command(command).await
}
},
event = self.notification_service.next_event().fuse() => {
if let Some(event) = event {
self.handle_notification_event(event)
} else {
// `Notifications` has seemingly closed. Closing as well.
return
}
}
}
}
}
fn handle_notification_event(&mut self, event: NotificationEvent) {
match event {
NotificationEvent::ValidateInboundSubstream {
peer,
handshake,
result_tx,
..
} => {
// only accept peers whose role can be determined
let result = self
.network
.peer_role(peer, handshake)
.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
let duniter_validation = DuniterStreamValidationResult::from(result);
self.events_reporter
.report_event(DuniterPeeringEvent::StreamValidation(
peer,
duniter_validation.clone(),
));
let _ = result_tx.send(duniter_validation.into());
}
NotificationEvent::NotificationStreamOpened {
peer, handshake, ..
} => {
let Some(role) = self.network.peer_role(peer, handshake) else {
debug!(target: "duniter-libp2p", "[{}] role for {peer} couldn't be determined", self.network.local_peer_id());
return;
};
let _was_in = self.peers.insert(
peer,
Peer {
sent_peering: false,
known_peering: None,
},
);
debug_assert!(_was_in.is_none());
self.events_reporter
.report_event(DuniterPeeringEvent::StreamOpened(peer, role));
}
NotificationEvent::NotificationStreamClosed { peer } => {
let _peer = self.peers.remove(&peer);
debug_assert!(_peer.is_some());
self.events_reporter
.report_event(DuniterPeeringEvent::StreamClosed(peer));
}
NotificationEvent::NotificationReceived { peer, notification } => {
if let Ok(peering) = <Peering as Decode>::decode(&mut notification.as_ref()) {
self.events_reporter
.report_event(DuniterPeeringEvent::GossipReceived(peer, true));
self.on_peering(peer, peering);
} else {
self.events_reporter
.report_event(DuniterPeeringEvent::GossipReceived(peer, false));
self.network.report_peer(peer, rep::BAD_PEERING);
}
}
}
}
/// Called when a peer sends us a new peering.
fn on_peering(&mut self, who: PeerId, peering: Peering) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
if peer.known_peering.is_some() {
// Peering has already been received for this peer. Only one is allowed per connection.
self.network.report_peer(who, rep::BAD_PEERING);
self.events_reporter
.report_event(DuniterPeeringEvent::AlreadyReceivedPeering(who));
} else {
peer.known_peering = Some(peering.clone());
self.events_reporter
.report_event(DuniterPeeringEvent::GoodPeering(who, peering.clone()));
self.network.report_peer(who, rep::GOOD_PEERING);
}
}
}
async fn handle_command(&mut self, cmd: DuniterPeeringCommand) {
match cmd {
DuniterPeeringCommand::SendPeering(peer, peering) => {
debug!(target: "duniter-libp2p", "[{}]Sending COMMANDED self peering to {}", self.network.local_peer_id(), peer);
match self
.notification_service
.send_async_notification(&peer, peering.encode())
.await
{
Ok(_) => {
self.events_reporter.report_event(
DuniterPeeringEvent::SelfPeeringPropagationSuccess(peer, peering),
);
}
Err(e) => {
self.events_reporter.report_event(
DuniterPeeringEvent::SelfPeeringPropagationFailed(
peer,
peering,
e.to_string(),
),
);
}
}
}
};
}
}
mod rep {
use sc_network::ReputationChange as Rep;
/// Reputation change when a peer sends us a peering that we didn't know about.
pub const GOOD_PEERING: Rep = Rep::new(1 << 7, "Good peering");
/// Reputation change when a peer sends us a bad peering.
pub const BAD_PEERING: Rep = Rep::new(-(1 << 12), "Bad peering");
}
pub(crate) mod handler;
pub(crate) mod rpc;
#[cfg(test)]
mod tests;
mod types;
use crate::endpoint_gossip::duniter_peering_protocol_name::NAME;
use codec::{Decode, Encode};
use frame_benchmarking::__private::traits::ConstU32;
use sc_network::{
config::{PeerStoreProvider, SetConfig},
types::ProtocolName,
NetworkBackend, NotificationMetrics, NotificationService, MAX_RESPONSE_SIZE,
};
use serde::{Deserialize, Serialize};
use sp_api::__private::BlockT;
use sp_core::bounded_vec::BoundedVec;
use std::{sync::Arc, time};
pub mod well_known_endpoint_types {
pub const RPC: &str = "rpc";
pub const SQUID: &str = "squid";
}
pub struct DuniterPeeringParams {
/// Handle that is used to communicate with `sc_network::Notifications`.
pub notification_service: Box<dyn NotificationService>,
}
/// Maximum allowed size for a peering gossip notification.
pub(crate) const MAX_GOSSIP_SIZE: u64 = MAX_RESPONSE_SIZE;
/// Interval at which we propagate gossips.
pub(crate) const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_secs(1);
pub mod duniter_peering_protocol_name {
pub(crate) const NAME: &str = "duniter-peerings/1";
}
impl DuniterPeeringParams {
/// Create a new instance.
pub fn new<
Hash: AsRef<[u8]>,
Block: BlockT,
Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
>(
genesis_hash: Hash,
fork_id: Option<&str>,
metrics: NotificationMetrics,
peer_store_handle: Arc<dyn PeerStoreProvider>,
) -> (Self, Net::NotificationProtocolConfig) {
let genesis_hash = genesis_hash.as_ref();
let protocol_name: ProtocolName = if let Some(fork_id) = fork_id {
format!(
"/{}/{}/{}",
array_bytes::bytes2hex("", genesis_hash),
fork_id,
NAME,
)
} else {
format!("/{}/{}", array_bytes::bytes2hex("", genesis_hash), NAME)
}
.into();
let (config, notification_service) = Net::notification_config(
protocol_name.clone(),
vec![format!("/{}/{}", array_bytes::bytes2hex("", genesis_hash), NAME).into()],
MAX_GOSSIP_SIZE,
None,
// Default config, allowing some non-reserved nodes to connect
SetConfig::default(),
metrics,
peer_store_handle,
);
(
Self {
notification_service,
},
config,
)
}
}
/// Peer information
#[derive(Debug)]
struct Peer {
/// The peering received from this peer, if any (at most one per connection).
known_peering: Option<Peering>,
/// Whether we already sent our own peering to this peer.
sent_peering: bool,
}
#[derive(Encode, Decode, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct DuniterEndpoint {
/// The protocol of the endpoint (e.g. "rpc" or "squid"; these are well-known names)
pub protocol: String,
/// The endpoint itself (e.g. "squid.example.com/v1/graphql")
pub address: String,
}
pub type DuniterEndpoints = BoundedVec<DuniterEndpoint, ConstU32<10>>;
#[derive(Encode, Decode, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct Peering {
pub endpoints: DuniterEndpoints,
}
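The gossip payload exchanged by the handler is simply the SCALE-encoded `Peering`. A small round-trip sketch using the types above (illustrative only, written as a test in this module):

```rust
use codec::{Decode, Encode};

#[test]
fn peering_scale_round_trip() {
    // Build a peering with one well-known "rpc" endpoint, encode it as it would be
    // sent over the notification protocol, then decode it back.
    let peering = Peering {
        endpoints: DuniterEndpoints::truncate_from(vec![DuniterEndpoint {
            protocol: well_known_endpoint_types::RPC.into(),
            address: "wss://gdev.example.com".into(),
        }]),
    };
    let bytes = peering.encode();
    let decoded = Peering::decode(&mut bytes.as_slice()).expect("valid encoding");
    assert_eq!(peering, decoded);
}
```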
//! # Duniter Peering RPC API
//!
//! Exposes the `duniter_peerings` RPC method.
use crate::endpoint_gossip::rpc::{data::DuniterPeeringsData, state::DuniterPeeringsState};
use jsonrpsee::{core::async_trait, proc_macros::rpc, Extensions};
use sc_consensus_babe_rpc::Error;
/// The exposed RPC methods
#[rpc(client, server)]
pub trait DuniterPeeringRpcApi {
/// Returns the known peerings list received by network gossips
#[method(name = "duniter_peerings", with_extensions)]
async fn duniter_peerings(&self) -> Result<Option<DuniterPeeringsData>, Error>;
}
/// API implementation
pub struct DuniterPeeringRpcApiImpl {
shared_peer_state: DuniterPeeringsState,
}
impl DuniterPeeringRpcApiImpl {
/// Creates a new instance of the Duniter Peering Rpc handler.
pub fn new(shared_peer_state: DuniterPeeringsState) -> Self {
Self { shared_peer_state }
}
}
#[async_trait]
impl DuniterPeeringRpcApiServer for DuniterPeeringRpcApiImpl {
async fn duniter_peerings(
&self,
_ext: &Extensions,
) -> Result<Option<DuniterPeeringsData>, Error> {
let option = self.shared_peer_state.peer_state();
Ok(option)
}
}
use crate::endpoint_gossip::rpc::state::PeeringWithId;
use jsonrpsee::core::Serialize;
use serde::Deserialize;
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(Debug))]
pub struct DuniterPeeringsData {
pub peerings: Vec<PeeringWithId>,
}
//! # RPC for peering
//!
//! This module gathers all known peering documents for connected peers in memory and provides
//! an RPC interface to query them.
//!
//! ## RPC methods
//!
//! Currently, only one RPC method is available to query the currently known peerings.
//! In the future, the RPC interface could add methods to dynamically change the current node's peering
//! without restarting the node.
//!
//! ### `duniter_peerings`
//!
//! Returns the known peerings list received by network gossips.
//!
//! ```json
//! {
//!   "jsonrpc": "2.0",
//!   "id": 0,
//!   "result": {
//!     "peerings": [
//!       {
//!         "peer_id": "12D3KooWRkDXunbB64VegYPCQaitcgtdtEtbsbd7f19nsS7aMjDp",
//!         "endpoints": [
//!           { "protocol": "rpc", "address": "wss://gdev.example.com" },
//!           { "protocol": "squid", "address": "squid.gdev.gyroi.de/v1/graphql" }
//!         ]
//!       },
//!       {
//!         "peer_id": "12D3KooWFiUBo3Kjiryvrpz8b3kfNVk7baezhab7SHdfafgY7nmN",
//!         "endpoints": [
//!           { "protocol": "rpc", "address": "ws://gdev.example.com:9944" }
//!         ]
//!       }
//!     ]
//!   }
//! }
//! ```
//!
pub mod api;
pub mod data;
pub mod state;
#[cfg(test)]
mod tests;
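The `duniter_peerings` method documented above can be called like any JSON-RPC method. A minimal client-side sketch (assuming jsonrpsee's HTTP client and a node exposing its RPC on `127.0.0.1:9944`; the URL and the use of `serde_json::Value` for the response are illustrative):

```rust
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = HttpClientBuilder::default().build("http://127.0.0.1:9944")?;
    // Query the peerings gossiped by the node's connected peers.
    let peerings: serde_json::Value = client.request("duniter_peerings", rpc_params![]).await?;
    println!("{peerings:#}");
    Ok(())
}
```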
use crate::endpoint_gossip::{
handler::DuniterPeeringEvent, rpc::data::DuniterPeeringsData, DuniterEndpoints,
};
use codec::{Decode, Encode};
use futures::StreamExt;
use jsonrpsee::core::Serialize;
use parking_lot::RwLock;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use serde::Deserialize;
use std::sync::Arc;
/// Holds a peer's endpoints along with its peer id, for RPC exposure.
#[derive(Encode, Decode, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct PeeringWithId {
pub peer_id: String,
pub endpoints: DuniterEndpoints,
}
#[derive(Clone)]
pub struct DuniterPeeringsState {
inner: Arc<RwLock<Option<Box<DuniterPeeringsData>>>>,
}
/// Simple CRUD operations on the exposed state, plus a listening sink that receives network
/// events and automatically inserts/removes peers from the state.
impl DuniterPeeringsState {
pub fn empty() -> Self {
Self {
inner: Arc::new(RwLock::new(Some(Box::new(DuniterPeeringsData {
peerings: Vec::new(),
})))),
}
}
pub fn insert(&self, peering: PeeringWithId) -> &Self {
if let Some(vs) = self.inner.write().as_mut() {
vs.peerings.push(peering);
}
self
}
pub fn remove(&self, peer_id: String) -> &Self {
if let Some(vs) = self.inner.write().as_mut() {
vs.peerings.retain(|p| p.peer_id != peer_id);
}
self
}
pub fn peer_state(&self) -> Option<DuniterPeeringsData> {
self.inner.read().as_ref().map(|vs| vs.as_ref().clone())
}
/// Creates a channel for binding to the network events.
pub fn listen(&self) -> TracingUnboundedSender<DuniterPeeringEvent> {
let (sink, stream) = tracing_unbounded("mpsc_duniter_peering_rpc_stream", 1_000);
let state = self.clone();
tokio::spawn(async move {
stream
.for_each(|event| async {
match event {
DuniterPeeringEvent::GoodPeering(who, peering) => {
state.insert(PeeringWithId {
peer_id: who.to_base58(),
endpoints: peering.endpoints,
});
}
DuniterPeeringEvent::StreamClosed(who) => {
state.remove(who.to_base58());
}
_ => {}
}
})
.await
});
sink
}
}
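To connect the pieces, the service layer hands the sink returned by `listen()` to the gossip handler, so that network events populate the RPC-exposed state. An illustrative fragment, not part of this diff: `notification_service`, `network`, `duniter_endpoints` and `spawn_handle` are assumed to come from the node's service setup.

```rust
// Illustrative wiring, under the assumptions stated above.
let peering_state = DuniterPeeringsState::empty();
let rpc_sink = peering_state.listen(); // events sent to this sink update the RPC state

let handler = crate::endpoint_gossip::handler::build::<Block, _>(
    notification_service,
    network.clone(),
    rpc_sink,
    None, // no external command channel outside of tests
    duniter_endpoints,
);
spawn_handle.spawn("duniter-endpoint-gossip", None, handler.run());
// `peering_state` is later passed to the RPC builder via `DuniterPeeringRpcModuleDeps`.
```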
use crate::endpoint_gossip::{
rpc::{
api::{DuniterPeeringRpcApiImpl, DuniterPeeringRpcApiServer},
state::{DuniterPeeringsState, PeeringWithId},
},
well_known_endpoint_types::{RPC, SQUID},
DuniterEndpoint, DuniterEndpoints,
};
use jsonrpsee::RpcModule;
#[tokio::test]
async fn empty_peers_rpc_handler() {
let rpc = setup_io_handler();
let expected_response = r#"{"jsonrpc":"2.0","id":0,"result":{"peerings":[]}}"#.to_string();
let request = r#"{"jsonrpc":"2.0","method":"duniter_peerings","params":[],"id":0}"#;
let (response, _) = rpc.raw_json_request(request, 1).await.unwrap();
assert_eq!(expected_response, response);
}
#[tokio::test]
async fn expose_known_peers() {
let rpc = setup_new_rpc_with_initial_peerings(vec![
PeeringWithId {
peer_id: "12D3KooWRkDXunbB64VegYPCQaitcgtdtEtbsbd7f19nsS7aMjDp".into(),
endpoints: DuniterEndpoints::truncate_from(vec![
DuniterEndpoint {
protocol: RPC.into(),
address: "/rpc/wss://gdev.example.com".into(),
},
DuniterEndpoint {
protocol: SQUID.into(),
address: "/squid/https://squid.gdev.gyroi.de/v1/graphql".into(),
},
]),
},
PeeringWithId {
peer_id: "12D3KooWFiUBo3Kjiryvrpz8b3kfNVk7baezhab7SHdfafgY7nmN".into(),
endpoints: DuniterEndpoints::truncate_from(vec![DuniterEndpoint {
protocol: RPC.into(),
address: "/rpc/ws://gdev.example.com:9944".into(),
}]),
},
]);
let expected_response = r#"{"jsonrpc":"2.0","id":0,"result":{"peerings":[{"peer_id":"12D3KooWRkDXunbB64VegYPCQaitcgtdtEtbsbd7f19nsS7aMjDp","endpoints":[{"protocol":"rpc","address":"/rpc/wss://gdev.example.com"},{"protocol":"squid","address":"/squid/https://squid.gdev.gyroi.de/v1/graphql"}]},{"peer_id":"12D3KooWFiUBo3Kjiryvrpz8b3kfNVk7baezhab7SHdfafgY7nmN","endpoints":[{"protocol":"rpc","address":"/rpc/ws://gdev.example.com:9944"}]}]}}"#.to_string();
let request = r#"{"jsonrpc":"2.0","method":"duniter_peerings","params":[],"id":0}"#;
let (response, _) = rpc.raw_json_request(request, 1).await.unwrap();
assert_eq!(expected_response, response);
}
fn setup_io_handler() -> RpcModule<DuniterPeeringRpcApiImpl> {
DuniterPeeringRpcApiImpl::new(DuniterPeeringsState::empty()).into_rpc()
}
fn setup_new_rpc_with_initial_peerings(
peers: Vec<PeeringWithId>,
) -> RpcModule<DuniterPeeringRpcApiImpl> {
let state = DuniterPeeringsState::empty();
for peer in peers {
state.insert(peer);
}
DuniterPeeringRpcApiImpl::new(state).into_rpc()
}
use crate::{
endpoint_gossip,
endpoint_gossip::{
duniter_peering_protocol_name,
handler::{DuniterPeeringCommand, DuniterPeeringEvent},
well_known_endpoint_types::RPC,
DuniterEndpoint, DuniterEndpoints, Peering,
},
};
use async_channel::Receiver;
use futures::{future, stream, FutureExt, StreamExt};
use log::{debug, warn};
use parking_lot::Mutex;
use sc_consensus::{
BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport, ImportResult,
ImportedAux,
};
use sc_network::{NetworkStateInfo, ObservedRole, PeerId};
use sc_network_test::{
Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, PeersClient, TestNetFactory,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_api::__private::BlockT;
use sp_consensus::Error as ConsensusError;
use sp_runtime::traits::Header;
use std::{future::Future, pin::pin, sync::Arc, task::Poll, time::Duration};
#[tokio::test]
async fn peering_is_forwarded_and_only_once_per_connection() {
let _ = env_logger::try_init();
let authorities_count = 3;
let full_count = 1;
let total_peers = authorities_count + full_count;
let mut net = DuniterPeeringTestNet::new(authorities_count, full_count);
tokio::spawn(start_network(&mut net, total_peers));
let net = Arc::new(Mutex::new(net));
// make sure the network is ready (each peering is received by all other peers)
let wait_for_all_peering_notifications =
watch_events_and_wait_for_all_peerings(total_peers, &net);
let wait_for = futures::future::join_all(wait_for_all_peering_notifications).map(|_| ());
tokio::time::timeout(Duration::from_secs(5), run_until_complete(wait_for, &net))
.await
.unwrap();
// rule: only one peering is accepted per connection (disconnecting/reconnecting allows changing the peering value)
let already_received = ensure_only_one_peering_is_accepted(&net);
tokio::time::timeout(
Duration::from_secs(5),
run_until_complete(already_received, &net),
)
.await
.unwrap();
}
fn ensure_only_one_peering_is_accepted(
net: &Arc<Mutex<DuniterPeeringTestNet>>,
) -> impl Future<Output = ()> {
let command_0 = net.lock().peer_commands[0].clone();
let peer_id_0 = net.lock().peer_ids[0];
let peer_id_1 = net.lock().peer_ids[1];
let stream_1 = net.lock().peer_streams[1].clone();
let already_received = async move {
let mut stream1 = pin!(stream_1);
while let Some(event) = stream1.next().await {
if let DuniterPeeringEvent::AlreadyReceivedPeering(peer) = event {
if peer == peer_id_0 {
// We did receive the peering from peer 0
break;
}
}
}
};
let already_received = futures::future::join_all(vec![already_received]).map(|_| ());
command_0
.unbounded_send(DuniterPeeringCommand::SendPeering(
peer_id_1,
Peering {
endpoints: DuniterEndpoints::truncate_from(vec![DuniterEndpoint {
protocol: RPC.into(),
address: "gdev.example.com:9944".into(),
}]),
},
))
.unwrap();
already_received
}
fn watch_events_and_wait_for_all_peerings(
total_peers: usize,
net: &Arc<Mutex<DuniterPeeringTestNet>>,
) -> Vec<impl Future<Output = ()> + Sized> {
let mut peering_notifications = Vec::new();
for peer_id in 0..total_peers {
let local_peer_id = net.lock().peer_ids[peer_id];
let stream = net.lock().peer_streams[peer_id].clone();
peering_notifications.push(async move {
let mut identified = 0;
let mut stream = pin!(stream);
while let Some(event) = stream.next().await {
debug_event(event.clone(), local_peer_id);
if let DuniterPeeringEvent::GoodPeering(peer, _) = event {
debug!(target: "duniter-libp2p", "[{}] Received peering from {}",local_peer_id, peer);
identified += 1;
if identified == (total_peers - 1) {
// all peers identified
break;
}
}
}
warn!("All peers sent their peering");
})
}
peering_notifications
}
fn debug_event(event: DuniterPeeringEvent, local_peer_id: PeerId) {
match event {
DuniterPeeringEvent::StreamOpened(peer, role) => {
debug!(target: "duniter-libp2p", "[{}] Peer {peer} connected with role {}", local_peer_id, observed_role_to_str(role));
}
DuniterPeeringEvent::StreamValidation(peer, result) => {
debug!(target: "duniter-libp2p", "[{}] Validating inbound substream from {peer} with result {}", local_peer_id, result);
}
DuniterPeeringEvent::StreamClosed(peer) => {
debug!(target: "duniter-libp2p", "[{}] Peer {peer} disconnected", local_peer_id);
}
DuniterPeeringEvent::GossipReceived(peer, success) => {
if success {
debug!(target: "duniter-libp2p", "[{}] Received peering message from {peer}", local_peer_id);
} else {
debug!(target: "duniter-libp2p", "[{}] Failed to receive peering message from {peer}", local_peer_id);
}
}
DuniterPeeringEvent::GoodPeering(peer, _) => {
debug!(target: "duniter-libp2p", "[{}] Received peering from {}", local_peer_id, peer);
}
DuniterPeeringEvent::AlreadyReceivedPeering(peer) => {
debug!(target: "duniter-libp2p", "[{}] Already received peering from {}", local_peer_id, peer);
panic!("Received peering from the same peer twice");
}
DuniterPeeringEvent::SelfPeeringPropagationFailed(peer, _peering, e) => {
debug!(target: "duniter-libp2p", "[{}] Failed to propagate self peering to {}: {}", local_peer_id, peer, e);
panic!("Failed to propagate self peering");
}
DuniterPeeringEvent::SelfPeeringPropagationSuccess(peer, _peering) => {
debug!(target: "duniter-libp2p", "[{}] Successfully propagated self peering to {}", local_peer_id, peer);
}
}
}
fn observed_role_to_str(role: ObservedRole) -> &'static str {
match role {
ObservedRole::Authority => "Authority",
ObservedRole::Full => "Full",
ObservedRole::Light => "Light",
}
}
// Spawns duniter nodes. Returns a future to spawn on the runtime.
fn start_network(net: &mut DuniterPeeringTestNet, peers: usize) -> impl Future<Output = ()> {
let nodes = stream::FuturesUnordered::new();
for peer_id in 0..peers {
let net_service = net.peers[peer_id].network_service().clone();
net.peer_ids.push(net_service.local_peer_id());
let notification_service = net.peers[peer_id]
.take_notification_service(&format!("/{}", duniter_peering_protocol_name::NAME).into())
.unwrap();
let (rpc_sink, mut stream_unbounded) =
tracing_unbounded("mpsc_duniter_gossip_peering_test", 100_000);
let (sink_unbounded, stream) = async_channel::unbounded();
let (command_tx, command_rx) =
tracing_unbounded("mpsc_duniter_gossip_peering_test_command", 100_000);
// mapping from mpsc TracingUnboundedReceiver to mpmc Receiver
tokio::spawn(async move {
// forward the event
while let Some(command) = stream_unbounded.next().await {
sink_unbounded.send(command).await.unwrap();
}
});
let handler = endpoint_gossip::handler::build::<Block, _>(
notification_service,
net_service,
rpc_sink,
Some(command_rx),
DuniterEndpoints::new(),
);
// To send external commands to the handler (for tests or RPC commands).
net.peer_streams.push(stream);
net.peer_commands.push(command_tx);
let node = handler.run();
fn assert_send<T: Send>(_: &T) {}
assert_send(&node);
nodes.push(node);
}
nodes.for_each(|_| async move {})
}
#[derive(Default)]
struct DuniterPeeringTestNet {
// Peers
peers: Vec<DuniterPeeringPeer>,
// IDs of the peers
peer_ids: Vec<PeerId>,
// RX of the gossip events
peer_streams: Vec<Receiver<DuniterPeeringEvent>>,
// TX to drive the handler (for tests or configuration)
peer_commands: Vec<TracingUnboundedSender<DuniterPeeringCommand>>,
}
type DuniterPeeringPeer = sc_network_test::Peer<PeerData, DuniterTestBlockImport>;
impl DuniterPeeringTestNet {
fn new(n_authority: usize, n_full: usize) -> Self {
let mut net = DuniterPeeringTestNet {
peers: Vec::with_capacity(n_authority + n_full),
peer_ids: Vec::new(),
peer_streams: Vec::new(),
peer_commands: Vec::new(),
};
for _ in 0..n_authority {
net.add_authority_peer();
}
for _ in 0..n_full {
net.add_full_peer();
}
net
}
fn add_authority_peer(&mut self) {
self.add_full_peer_with_config(FullPeerConfig {
notifications_protocols: vec![
format!("/{}", duniter_peering_protocol_name::NAME).into()
],
is_authority: true,
..Default::default()
})
}
}
#[derive(Default)]
struct PeerData;
impl TestNetFactory for DuniterPeeringTestNet {
type BlockImport = DuniterTestBlockImport;
type PeerData = PeerData;
type Verifier = PassThroughVerifier;
fn make_verifier(&self, _client: PeersClient, _: &PeerData) -> Self::Verifier {
PassThroughVerifier::new(false) // use non-instant finality.
}
fn peer(&mut self, i: usize) -> &mut DuniterPeeringPeer {
&mut self.peers[i]
}
fn peers(&self) -> &Vec<DuniterPeeringPeer> {
&self.peers
}
fn peers_mut(&mut self) -> &mut Vec<DuniterPeeringPeer> {
&mut self.peers
}
fn mut_peers<F: FnOnce(&mut Vec<DuniterPeeringPeer>)>(&mut self, closure: F) {
closure(&mut self.peers);
}
fn make_block_import(
&self,
_client: PeersClient,
) -> (
BlockImportAdapter<Self::BlockImport>,
Option<BoxJustificationImport<Block>>,
Self::PeerData,
) {
(
BlockImportAdapter::new(DuniterTestBlockImport),
None,
PeerData,
)
}
fn add_full_peer(&mut self) {
self.add_full_peer_with_config(FullPeerConfig {
notifications_protocols: vec![
format!("/{}", duniter_peering_protocol_name::NAME).into()
],
is_authority: false,
..Default::default()
})
}
}
async fn run_until_complete(future: impl Future + Unpin, net: &Arc<Mutex<DuniterPeeringTestNet>>) {
let drive_to_completion = futures::future::poll_fn(|cx| {
net.lock().poll(cx);
Poll::<()>::Pending
});
future::select(future, drive_to_completion).await;
}
#[derive(Clone)]
struct DuniterTestBlockImport;
/// Inspired by GrandpaBlockImport
#[async_trait::async_trait]
impl<Block: BlockT> BlockImport<Block> for DuniterTestBlockImport {
type Error = ConsensusError;
/// Fake check block, always succeeds.
async fn check_block(
&self,
_block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
Ok(ImportResult::Imported(ImportedAux {
is_new_best: true,
bad_justification: false,
clear_justification_requests: false,
header_only: false,
needs_justification: false,
}))
}
/// Fake import block, always succeeds.
async fn import_block(
&self,
block: BlockImportParams<Block>,
) -> Result<ImportResult, Self::Error> {
debug!("Importing block #{}", block.header.number());
Ok(ImportResult::Imported(ImportedAux {
is_new_best: true,
bad_justification: false,
clear_justification_requests: false,
header_only: false,
needs_justification: false,
}))
}
}
pub mod validation_result;
use sc_network::service::traits::ValidationResult;
use std::fmt::Display;
/// Cloneable version of sc_network::service::traits::ValidationResult
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DuniterStreamValidationResult {
/// Accept inbound substream.
Accept,
/// Reject inbound substream.
Reject,
}
impl Display for DuniterStreamValidationResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DuniterStreamValidationResult::Accept => write!(f, "Accept"),
DuniterStreamValidationResult::Reject => write!(f, "Reject"),
}
}
}
impl From<ValidationResult> for DuniterStreamValidationResult {
fn from(result: ValidationResult) -> Self {
match result {
ValidationResult::Accept => DuniterStreamValidationResult::Accept,
ValidationResult::Reject => DuniterStreamValidationResult::Reject,
}
}
}
impl From<DuniterStreamValidationResult> for ValidationResult {
fn from(result: DuniterStreamValidationResult) -> Self {
match result {
DuniterStreamValidationResult::Accept => ValidationResult::Accept,
DuniterStreamValidationResult::Reject => ValidationResult::Reject,
}
}
}
@@ -65,5 +65,6 @@
pub mod chain_spec;
pub mod cli;
pub mod command;
pub mod endpoint_gossip;
pub mod rpc;
pub mod service;
@@ -24,6 +24,7 @@ mod chain_spec;
mod service;
pub(crate) mod cli;
mod command;
mod endpoint_gossip;
mod rpc;
fn main() -> sc_cli::Result<()> {
......
@@ -22,6 +22,7 @@
#![warn(missing_docs)]
use crate::endpoint_gossip::rpc::{api::DuniterPeeringRpcApiServer, state::DuniterPeeringsState};
use common_runtime::{AccountId, Balance, Block, BlockNumber, Hash, Index};
use jsonrpsee::RpcModule;
use sc_consensus_babe::{BabeApi, BabeWorkerHandle};
@@ -61,6 +62,13 @@ pub struct GrandpaDeps<B> {
pub finality_provider: Arc<FinalityProofProvider<B, Block>>,
}
/// Dependencies for DuniterPeering
#[derive(Clone)]
pub struct DuniterPeeringRpcModuleDeps {
/// The state of the DuniterPeering RPC module which will be exposed.
pub state: DuniterPeeringsState,
}
/// Full client dependencies.
pub struct FullDeps<C, P, SC, B> {
/// The client instance to use.
@@ -77,6 +85,8 @@ pub struct FullDeps<C, P, SC, B> {
pub babe: Option<BabeDeps>,
/// GRANDPA specific dependencies.
pub grandpa: GrandpaDeps<B>,
/// DuniterPeering specific dependencies.
pub duniter_peering: DuniterPeeringRpcModuleDeps,
}
/// Instantiate all full RPC extensions.
@@ -109,6 +119,7 @@ where
command_sink_opt,
babe,
grandpa,
duniter_peering: endpoint_gossip,
} = deps;
if let Some(babe) = babe {
@@ -140,7 +151,7 @@ where
)?;
module.merge(System::new(client.clone(), pool).into_rpc())?;
module.merge(TransactionPayment::new(client).into_rpc())?;
module.merge(TransactionPayment::new(client.clone()).into_rpc())?;
if let Some(command_sink) = command_sink_opt {
// We provide the RPC handler with the sending end of the channel to allow it to
// send EngineCommands to the background block authorship task.
@@ -151,6 +162,10 @@ where
// `YourRpcStruct` should have a reference to a client, which is needed
// to call into the runtime.
// `module.merge(YourRpcTrait::into_rpc(YourRpcStruct::new(ReferenceToClient, ...)))?;`
module.merge(
crate::endpoint_gossip::rpc::api::DuniterPeeringRpcApiImpl::new(endpoint_gossip.state)
.into_rpc(),
)?;
Ok(module)
}