Newer
Older
// Copyright 2021 Axiom-Team
//
// This file is part of Duniter-v2S.
// Duniter-v2S is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, version 3 of the License.
//
// Duniter-v2S is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with Duniter-v2S. If not, see <https://www.gnu.org/licenses/>.
//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
use self::client::{Client, ClientHandle, RuntimeApiCollection};
use async_io::Timer;
use common_runtime::Block;
use futures::{Stream, StreamExt};
use sc_client_api::client::BlockBackend;
use sc_client_api::Backend;
use sc_consensus_grandpa::SharedVoterState;
use sc_consensus_manual_seal::{run_manual_seal, EngineCommand, ManualSealParams};
pub use sc_executor::WasmExecutor;
use sc_service::WarpSyncParams;
use sc_service::{error::Error as ServiceError, Configuration, PartialComponents, TaskManager};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sp_core::H256;
use sp_runtime::traits::BlakeTwo256;
use std::{sync::Arc, time::Duration};
/// Host functions given to the Wasm executor when the `runtime-benchmarks`
/// feature is disabled.
#[cfg(not(feature = "runtime-benchmarks"))]
type HostFunctions = sp_io::SubstrateHostFunctions;
/// Host functions given to the Wasm executor when the `runtime-benchmarks`
/// feature is enabled: the Substrate set plus the `frame_benchmarking` set.
#[cfg(feature = "runtime-benchmarks")]
type HostFunctions = (
sp_io::SubstrateHostFunctions,
frame_benchmarking::benchmarking::HostFunctions,
);
/// Full client for `Block`, generic over the runtime API and executing through
/// a `WasmExecutor` parameterised by `HostFunctions`.
type FullClient<RuntimeApi> =
sc_service::TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
/// Full backend for `Block`.
type FullBackend = sc_service::TFullBackend<Block>;
/// Longest-chain selection rule over `FullBackend`.
type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
Benjamin Gallois
committed
/// Native execution dispatch for the `gdev_runtime` crate.
#[allow(dead_code)]
pub mod gdev_executor {
pub use gdev_runtime;
/// Marker type implementing `sc_executor::NativeExecutionDispatch` for GDev.
pub struct GDevExecutor;
impl sc_executor::NativeExecutionDispatch for GDevExecutor {
/// Extra host functions: the `frame_benchmarking` benchmarking set.
type ExtendHostFunctions = frame_benchmarking::benchmarking::HostFunctions;
/// Forwards the call to `gdev_runtime::api::dispatch`.
fn dispatch(method: &str, data: &[u8]) -> Option<Vec<u8>> {
gdev_runtime::api::dispatch(method, data)
}
/// Returns `gdev_runtime::native_version()`.
fn native_version() -> sc_executor::NativeVersion {
gdev_runtime::native_version()
}
Benjamin Gallois
committed
/// Native execution dispatch for the `g1_runtime` crate.
#[allow(dead_code)]
pub mod g1_executor {
pub use g1_runtime;
/// Marker type implementing `sc_executor::NativeExecutionDispatch` for G1.
pub struct G1Executor;
impl sc_executor::NativeExecutionDispatch for G1Executor {
/// Extra host functions: the `frame_benchmarking` benchmarking set.
type ExtendHostFunctions = frame_benchmarking::benchmarking::HostFunctions;
/// Forwards the call to `g1_runtime::api::dispatch`.
fn dispatch(method: &str, data: &[u8]) -> Option<Vec<u8>> {
g1_runtime::api::dispatch(method, data)
}
/// Returns `g1_runtime::native_version()`.
fn native_version() -> sc_executor::NativeVersion {
g1_runtime::native_version()
}
Benjamin Gallois
committed
/// Native execution dispatch for the `gtest_runtime` crate.
///
/// Only compiled when the `gtest` feature is enabled.
#[allow(dead_code)]
#[cfg(feature = "gtest")]
pub mod gtest_executor {
pub use gtest_runtime;
/// Marker type implementing `sc_executor::NativeExecutionDispatch` for GTest.
pub struct GTestExecutor;
impl sc_executor::NativeExecutionDispatch for GTestExecutor {
/// Extra host functions: the `frame_benchmarking` benchmarking set.
type ExtendHostFunctions = frame_benchmarking::benchmarking::HostFunctions;
/// Forwards the call to `gtest_runtime::api::dispatch`.
fn dispatch(method: &str, data: &[u8]) -> Option<Vec<u8>> {
gtest_runtime::api::dispatch(method, data)
}
/// Returns `gtest_runtime::native_version()`.
fn native_version() -> sc_executor::NativeVersion {
gtest_runtime::native_version()
}
/// The minimum period of blocks on which justifications will be
/// imported and generated.
///
/// In this file it is passed to `sc_consensus_grandpa::block_import` and used
/// as the `justification_generation_period` of the GRANDPA `Config`.
const GRANDPA_JUSTIFICATION_PERIOD: u32 = 512;
/// Runtime a chain specification can target, as reported by
/// `IdentifyRuntimeType::runtime_type`.
pub enum RuntimeType {
/// Reported for chain spec ids starting with `"g1"`.
G1,
/// Reported for chain spec ids starting with `"dev"` or `"gdev"`.
GDev,
/// Reported for chain spec ids starting with `"gtest"`.
GTest,
/// Identifies which runtime a chain specification targets.
///
/// It is implemented for `Box<dyn sc_chain_spec::ChainSpec>`, so it can be
/// called on the `chain_spec` field of a `Configuration` to check which
/// particular runtime type that configuration is for.
pub trait IdentifyRuntimeType {
/// Returns the runtime type.
fn runtime_type(&self) -> RuntimeType;
}
impl IdentifyRuntimeType for Box<dyn sc_chain_spec::ChainSpec> {
    /// Maps the chain spec id to a [`RuntimeType`] by prefix.
    ///
    /// Prefixes are tested in this order: `"g1"`, then `"dev"` or `"gdev"`,
    /// then `"gtest"`.
    ///
    /// # Panics
    ///
    /// Panics with `"unknown runtime"` when the id matches none of the
    /// prefixes above.
    fn runtime_type(&self) -> RuntimeType {
        // Read the id once; every guard below inspects the same string.
        let spec_id = self.id();
        match spec_id {
            id if id.starts_with("g1") => RuntimeType::G1,
            id if id.starts_with("dev") || id.starts_with("gdev") => RuntimeType::GDev,
            id if id.starts_with("gtest") => RuntimeType::GTest,
            _ => panic!("unknown runtime"),
        }
    }
}
/// Builds a new object suitable for chain operations.
///
/// Dispatches on `config.chain_spec.runtime_type()` and calls `new_partial`
/// with the runtime API of the matching runtime. On success it returns the
/// client wrapped in the matching `Client` variant, the backend, the import
/// queue and the task manager; the remaining partial components are dropped.
///
/// Errors from `new_partial` are propagated. Panics with `"unknown runtime"`
/// when no arm handles the detected runtime type.
config: &Configuration,
) -> Result<
(
Arc<Client>,
Arc<FullBackend>,
sc_consensus::BasicQueue<Block>,
TaskManager,
),
ServiceError,
> {
match config.chain_spec.runtime_type() {
// NOTE(review): `RuntimeType::G1::G1` does not look like a valid path to the
// `G1` variant of `RuntimeType` (expected `RuntimeType::G1`) — confirm.
RuntimeType::G1::G1 => {
let PartialComponents {
client,
backend,
import_queue,
task_manager,
..
} = new_partial::<g1_runtime::RuntimeApi>(config, manual_consensus)?;
Ok((
Arc::new(Client::G1(client)),
backend,
import_queue,
task_manager,
))
}
// Only compiled when the `gtest` feature is enabled.
#[cfg(feature = "gtest")]
RuntimeType::GTest => {
let PartialComponents {
client,
backend,
import_queue,
task_manager,
..
} = new_partial::<gtest_runtime::RuntimeApi>(config, manual_consensus)?;
Ok((
Arc::new(Client::GTest(client)),
backend,
import_queue,
task_manager,
))
}
// Only compiled when the `gdev` feature is enabled.
#[cfg(feature = "gdev")]
RuntimeType::GDev => {
let PartialComponents {
client,
backend,
import_queue,
task_manager,
..
} = new_partial::<gdev_runtime::RuntimeApi>(config, manual_consensus)?;
Ok((
Arc::new(Client::GDev(client)),
backend,
import_queue,
task_manager,
))
}
// Catches runtime types whose arm above was compiled out by its feature gate.
_ => panic!("unknown runtime"),
type FullGrandpaBlockImport<RuntimeApi> = sc_consensus_grandpa::GrandpaBlockImport<
FullClient<RuntimeApi>,
#[allow(clippy::type_complexity)]
pub fn new_partial<RuntimeApi>(
FullClient<RuntimeApi>,
sc_consensus::DefaultImportQueue<Block>,
sc_transaction_pool::FullPool<Block, FullClient<RuntimeApi>>,
sc_consensus_babe::BabeBlockImport<
FullClient<RuntimeApi>,
FullGrandpaBlockImport<RuntimeApi>,
sc_consensus_babe::BabeLink<Block>,
Option<sc_consensus_babe::BabeWorkerHandle<Block>>,
sc_consensus_grandpa::LinkHalf<Block, FullClient<RuntimeApi>, FullSelectChain>,
RuntimeApi: sp_api::ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
RuntimeApi::RuntimeApi: RuntimeApiCollection,
let telemetry = config
.telemetry_endpoints
.clone()
.filter(|x| !x.is_empty())
.map(|endpoints| -> Result<_, sc_telemetry::Error> {
let worker = TelemetryWorker::new(16)?;
let telemetry = worker.handle().new_telemetry(endpoints);
Ok((worker, telemetry))
})
.transpose()?;
let executor = sc_service::new_wasm_executor(config);
sc_service::new_full_parts::<Block, RuntimeApi, _>(
config,
)?;
let client = Arc::new(client);
let telemetry = telemetry.map(|(worker, telemetry)| {
task_manager
.spawn_handle()
.spawn("telemetry", None, worker.run());
telemetry
});
let select_chain = sc_consensus::LongestChain::new(backend.clone());
let transaction_pool = sc_transaction_pool::BasicPool::new_full(
config.transaction_pool.clone(),
config.role.is_authority().into(),
config.prometheus_registry(),
let client_ = client.clone();
let (grandpa_block_import, grandpa_link) = sc_consensus_grandpa::block_import(
GRANDPA_JUSTIFICATION_PERIOD,
&(client_ as Arc<_>),
select_chain.clone(),
telemetry.as_ref().map(|x| x.handle()),
)?;
let justification_import = grandpa_block_import.clone();
let (babe_block_import, babe_link) = sc_consensus_babe::block_import(
sc_consensus_babe::configuration(&*client)?,
grandpa_block_import,
client.clone(),
)?;
let (import_queue, babe_worker_handle) = if consensus_manual {
let import_queue = sc_consensus_manual_seal::import_queue(
Box::new(babe_block_import.clone()),
&task_manager.spawn_essential_handle(),
config.prometheus_registry(),
);
(import_queue, None)
let slot_duration = babe_link.config().slot_duration();
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
let (queue, handle) =
sc_consensus_babe::import_queue(sc_consensus_babe::ImportQueueParams {
link: babe_link.clone(),
block_import: babe_block_import.clone(),
justification_import: Some(Box::new(justification_import)),
client: client.clone(),
select_chain: select_chain.clone(),
create_inherent_data_providers: move |_, ()| async move {
let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
let slot =
sp_consensus_babe::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
*timestamp,
slot_duration,
);
Ok((slot, timestamp))
},
spawner: &task_manager.spawn_essential_handle(),
registry: config.prometheus_registry(),
telemetry: telemetry.as_ref().map(|x| x.handle()),
offchain_tx_pool_factory:
sc_transaction_pool_api::OffchainTransactionPoolFactory::new(
transaction_pool.clone(),
),
})?;
(queue, Some(handle))
Ok(sc_service::PartialComponents {
client,
backend,
task_manager,
import_queue,
keystore_container,
select_chain,
transaction_pool,
other: (
babe_block_import,
babe_link,
babe_worker_handle,
grandpa_link,
telemetry,
),
}
/// Builds a new service for a full client.
pub fn new_full<RuntimeApi>(
config: Configuration,
) -> Result<TaskManager, ServiceError>
where
RuntimeApi: sp_api::ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
RuntimeApi::RuntimeApi: RuntimeApiCollection,
let sc_service::PartialComponents {
client,
backend,
mut task_manager,
import_queue,
keystore_container,
other: (block_import, babe_link, babe_worker_handle, grandpa_link, mut telemetry),
} = new_partial::<RuntimeApi>(&config, sealing.is_manual_consensus())?;
let grandpa_protocol_name = sc_consensus_grandpa::protocol_standard_name(
&client
.block_hash(0)
.ok()
.flatten()
.expect("Genesis block exists; qed"),
&config.chain_spec,
);
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
let (grandpa_protocol_config, grandpa_notification_service) =
sc_consensus_grandpa::grandpa_peers_set_config(grandpa_protocol_name.clone());
net_config.add_notification_protocol(grandpa_protocol_config);
let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new(
backend.clone(),
grandpa_link.shared_authority_set().clone(),
Vec::default(),
));
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
block_relay: None,
})?;
let role = config.role.clone();
let force_authoring = config.force_authoring;
let backoff_authoring_blocks: Option<()> = None;
let name = config.network.node_name.clone();
let enable_grandpa = !config.disable_grandpa;
let prometheus_registry = config.prometheus_registry().cloned();
if config.offchain_worker.enabled {
use futures::FutureExt;
task_manager.spawn_handle().spawn(
"offchain-workers-runner",
"offchain-work",
sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
runtime_api_provider: client.clone(),
keystore: Some(keystore_container.keystore()),
offchain_db: backend.offchain_storage(),
transaction_pool: Some(
sc_transaction_pool_api::OffchainTransactionPoolFactory::new(
transaction_pool.clone(),
),
),
network_provider: network.clone(),
is_validator: role.is_authority(),
enable_http_requests: false,
custom_extensions: move |_| vec![],
})
.run(client.clone(), task_manager.spawn_handle())
.boxed(),
);
}
let mut command_sink_opt = None;
if role.is_authority() {
let distance_dir = config
.base_path
.config_dir(config.chain_spec.id())
.join("distance");
let proposer_factory = sc_basic_authorship::ProposerFactory::new(
task_manager.spawn_handle(),
client.clone(),
transaction_pool.clone(),
prometheus_registry.as_ref(),
telemetry.as_ref().map(|x| x.handle()),
);
let keystore_ptr = keystore_container.keystore();
let client = client.clone();
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
match sealing {
crate::cli::Sealing::Instant => {
Box::new(
// This bit cribbed from the implementation of instant seal.
transaction_pool
.pool()
.validated_pool()
.import_notification_stream()
.map(|_| EngineCommand::SealNewBlock {
create_empty: false,
finalize: false,
parent_hash: None,
sender: None,
}),
)
}
crate::cli::Sealing::Manual => {
let (sink, stream) = futures::channel::mpsc::channel(1000);
// Keep a reference to the other end of the channel. It goes to the RPC.
command_sink_opt = Some(sink);
Box::new(stream)
}
crate::cli::Sealing::Interval(millis) => Box::new(StreamExt::map(
Timer::interval(Duration::from_millis(millis)),
|_| EngineCommand::SealNewBlock {
create_empty: true,
finalize: false,
parent_hash: None,
sender: None,
},
)),
crate::cli::Sealing::Production => unreachable!(),
let babe_consensus_data_provider =
sc_consensus_manual_seal::consensus::babe::BabeConsensusDataProvider::new(
keystore_container.keystore(),
babe_link.epoch_changes().clone(),
vec![(
sp_consensus_babe::AuthorityId::from(
sp_keyring::sr25519::Keyring::Alice.public(),
),
1000,
)],
)
.expect("failed to create BabeConsensusDataProvider");
task_manager.spawn_essential_handle().spawn_blocking(
run_manual_seal(ManualSealParams {
block_import,
env: proposer_factory,
client: client.clone(),
select_chain: select_chain.clone(),
consensus_data_provider: Some(Box::new(babe_consensus_data_provider)),
create_inherent_data_providers: move |parent, _| {
let client = client.clone();
let distance_dir = distance_dir.clone();
let babe_owner_keys =
std::sync::Arc::new(sp_keystore::Keystore::sr25519_public_keys(
keystore_ptr.as_ref(),
sp_runtime::KeyTypeId(*b"babe"),
));
async move {
let timestamp =
sc_consensus_manual_seal::consensus::timestamp::SlotTimestampProvider::new_babe(
client.clone(),
)
.map_err(|err| format!("{:?}", err))?;
let babe = sp_consensus_babe::inherents::InherentDataProvider::new(
let distance =
dc_distance::create_distance_inherent_data_provider::<
Block,
FullClient<RuntimeApi>,
FullBackend,
>(
&*client, parent, distance_dir, &babe_owner_keys.clone()
)?;
Ok((timestamp, babe, distance))
let slot_duration = babe_link.config().slot_duration();
let babe_config = sc_consensus_babe::BabeParams {
keystore: keystore_container.keystore(),
select_chain: select_chain.clone(),
sync_oracle: sync_service.clone(),
justification_sync_link: sync_service.clone(),
// This closure is called during each block generation.
let client = client.clone();
let distance_dir = distance_dir.clone();
let babe_owner_keys =
std::sync::Arc::new(sp_keystore::Keystore::sr25519_public_keys(
keystore_ptr.as_ref(),
sp_runtime::KeyTypeId(*b"babe"),
));
let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
let slot =
sp_consensus_babe::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
*timestamp,
slot_duration,
);
let storage_proof =
sp_transaction_storage_proof::registration::new_data_provider(
&*client, &parent,
)?;
let distance = dc_distance::create_distance_inherent_data_provider::<
Block,
FullClient<RuntimeApi>,
FullBackend,
>(
&*client, parent, distance_dir, &babe_owner_keys.clone()
)?;
Ok((slot, timestamp, storage_proof, distance))
force_authoring,
backoff_authoring_blocks,
babe_link,
block_proposal_slot_portion: sc_consensus_babe::SlotProportion::new(2f32 / 3f32),
max_block_proposal_slot_portion: None,
telemetry: telemetry.as_ref().map(|x| x.handle()),
};
let babe = sc_consensus_babe::start_babe(babe_config)?;
// the BABE authoring task is considered essential, i.e. if it
// fails we take down the service with it.
task_manager.spawn_essential_handle().spawn_blocking(
"babe-proposer",
let rpc_extensions_builder = {
let client = client.clone();
let pool = transaction_pool.clone();
let select_chain = select_chain;
let chain_spec = config.chain_spec.cloned_box();
let keystore = keystore_container.keystore().clone();
let babe_deps = babe_worker_handle.map(|babe_worker_handle| crate::rpc::BabeDeps {
babe_worker_handle,
keystore: keystore.clone(),
});
Box::new(move |deny_unsafe, _| {
let deps = crate::rpc::FullDeps {
client: client.clone(),
pool: pool.clone(),
select_chain: select_chain.clone(),
chain_spec: chain_spec.cloned_box(),
babe: babe_deps.clone(),
command_sink_opt: command_sink_opt.clone(),
crate::rpc::create_full(deps).map_err(Into::into)
})
};
let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
sync_service: sync_service.clone(),
keystore: keystore_container.keystore(),
transaction_pool: transaction_pool.clone(),
rpc_builder: rpc_extensions_builder,
telemetry: telemetry.as_mut(),
})?;
// if the node isn't actively participating in consensus then it doesn't
// need a keystore, regardless of which protocol we use below.
let keystore = if role.is_authority() {
Some(keystore_container.keystore())
let grandpa_config = sc_consensus_grandpa::Config {
justification_generation_period: GRANDPA_JUSTIFICATION_PERIOD,
protocol_name: grandpa_protocol_name,
};
if enable_grandpa {
// start the full GRANDPA voter
// NOTE: non-authorities could run the GRANDPA observer protocol, but at
// this point the full voter should provide better guarantees of block
// and vote data availability than the observer. The observer has not
// been tested extensively yet and having most nodes in a network run it
// could lead to finality stalls.
let grandpa_config = sc_consensus_grandpa::GrandpaParams {
sync: sync_service,
voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(),
prometheus_registry,
shared_voter_state: SharedVoterState::empty(),
telemetry: telemetry.as_ref().map(|x| x.handle()),
notification_service: grandpa_notification_service,
offchain_tx_pool_factory: sc_transaction_pool_api::OffchainTransactionPoolFactory::new(
transaction_pool.clone(),
),
};
// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
task_manager.spawn_essential_handle().spawn_blocking(
"grandpa-voter",
sc_consensus_grandpa::run_grandpa_voter(grandpa_config)?,
log::info!("***** Duniter has fully started *****");
/// Rolls the node state back, at most down to the last finalized block.
///
/// Specifically, this reverts:
/// - Low level Babe and Grandpa consensus data.
///
/// # Errors
///
/// Any error returned by the `RevertConsensus` operation is converted into
/// the `sc_cli` error type and returned.
pub fn revert_backend(
    client: Arc<Client>,
    backend: Arc<FullBackend>,
    blocks: common_runtime::BlockNumber,
) -> sc_cli::Result<()> {
    // Revert the Substrate consensus-related components through the
    // runtime-agnostic client wrapper.
    let revert_op = RevertConsensus { blocks, backend };
    client.execute_with(revert_op).map_err(Into::into)
}
/// Operation run through `Client::execute_with` by `revert_backend` to revert
/// consensus data.
pub(super) struct RevertConsensus {
/// Number of blocks passed to the BABE and GRANDPA `revert` calls.
blocks: common_runtime::BlockNumber,
/// Backend passed to `sc_consensus_babe::revert`.
backend: Arc<FullBackend>,
}
impl client::ExecuteWithClient for RevertConsensus {
type Output = sp_blockchain::Result<()>;
fn execute_with_client<Client, Api, Backend>(self, client: Arc<Client>) -> Self::Output
where
Backend: sc_client_api::Backend<Block>,
Backend::State: sc_client_api::StateBackend<BlakeTwo256>,
Api: RuntimeApiCollection,
Client: client::AbstractClient<Block, Backend, Api = Api> + 'static,
{
// Revert consensus-related components.
// The operations are not correlated, thus call order is not relevant.
sc_consensus_babe::revert(client.clone(), self.backend, self.blocks)?;
sc_consensus_grandpa::revert(client, self.blocks)?;