diff --git a/distance-oracle/src/api.rs b/distance-oracle/src/api.rs index 4144648aad061b5f4d9bc0009b3c3cc501bb000c..9e507dd031907d919a5340c5c3d2a9d0e4db57c0 100644 --- a/distance-oracle/src/api.rs +++ b/distance-oracle/src/api.rs @@ -19,11 +19,12 @@ use crate::runtime; use log::debug; -use subxt::utils::H256; - pub type Client = subxt::OnlineClient<crate::RuntimeConfig>; pub type AccountId = subxt::utils::AccountId32; pub type IdtyIndex = u32; +pub type EvaluationPool = + runtime::runtime_types::pallet_distance::types::EvaluationPool<AccountId, IdtyIndex>; +pub type H256 = subxt::utils::H256; pub async fn client(rpc_url: impl AsRef<str>) -> Client { Client::from_insecure_url(rpc_url) @@ -54,7 +55,7 @@ pub async fn current_pool( client: &Client, parent_hash: H256, current_pool_index: u32, -) -> Option<runtime::runtime_types::pallet_distance::types::EvaluationPool<AccountId, IdtyIndex>> { +) -> Option<EvaluationPool> { client .storage() .at(parent_hash) diff --git a/distance-oracle/src/lib.rs b/distance-oracle/src/lib.rs index 668995d8e38614865d59234611258ac22ce6a814..f54567b48250cc9647182845d1730ebac89958e8 100644 --- a/distance-oracle/src/lib.rs +++ b/distance-oracle/src/lib.rs @@ -24,15 +24,14 @@ mod tests; #[cfg(test)] pub use mock as api; -use api::{AccountId, IdtyIndex}; +use api::{AccountId, EvaluationPool, IdtyIndex, H256}; use codec::Encode; use fnv::{FnvHashMap, FnvHashSet}; -use log::{debug, error, info, warn}; +use log::{debug, info, warn}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::{io::Write, path::PathBuf}; -// TODO select metadata file using features #[subxt::subxt(runtime_metadata_path = "../resources/metadata.scale")] pub mod runtime {} @@ -83,10 +82,18 @@ impl Default for Settings { } } -/// Asynchronously runs a computation using the provided client and saves the result to a file. -pub async fn run_and_save(client: &api::Client, settings: &Settings) { +/// Runs the evaluation process, saves the results, and cleans up old files. +/// +/// This function performs the following steps: +/// 1. Runs the evaluation task by invoking `compute_distance_evaluation`, which provides: +/// - The evaluation results. +/// - The current period index. +/// - The file path where the results should be stored. +/// 2. Saves the evaluation results to a file in the specified directory. +/// 3. Cleans up outdated evaluation files. +pub async fn run(client: &api::Client, settings: &Settings) { let Some((evaluation, current_period_index, evaluation_result_path)) = - run(client, settings, true).await + compute_distance_evaluation(client, settings).await else { return; }; @@ -144,57 +151,24 @@ pub async fn run_and_save(client: &api::Client, settings: &Settings) { }); } -/// Asynchronously runs a computation based on the provided client and settings. -/// Returns `Option<(evaluation, current_period_index, evaluation_result_path)>`. -pub async fn run( +/// Evaluates distance for the current period and prepares results for storage. +/// +/// This function performs the following steps: +/// 1. Prepares the evaluation context using `prepare_evaluation_context`. If the context is not +/// ready (e.g., no pending evaluations, or results already exist), it returns `None`. +/// 2. Evaluates distances for all identities in the evaluation pool based on referees and received certifications. +/// 3. Returns the evaluation results, the current period index, and the path to store the results. +/// +pub async fn compute_distance_evaluation( client: &api::Client, settings: &Settings, - handle_fs: bool, ) -> Option<(Vec<sp_runtime::Perbill>, u32, PathBuf)> { - let parent_hash = api::parent_hash(client).await; - - let max_depth = api::max_referee_distance(client).await; - - let current_period_index = api::current_period_index(client, parent_hash).await; - - // Fetch the pending identities - let Some(evaluation_pool) = - api::current_pool(client, parent_hash, current_period_index % 3).await - else { - info!("Nothing to do: Pool does not exist"); - return None; - }; - - // Stop if nothing to evaluate - if evaluation_pool.evaluations.0.is_empty() { - info!("Nothing to do: Pool is empty"); - return None; - } - - let evaluation_result_path = settings - .evaluation_result_dir - .join((current_period_index + 1).to_string()); - - if handle_fs { - // Stop if already evaluated - if evaluation_result_path - .try_exists() - .expect("Result path unavailable") - { - info!("Nothing to do: File already exists"); - return None; - } - - std::fs::create_dir_all(&settings.evaluation_result_dir).unwrap_or_else(|e| { - error!( - "Cannot create distance evaluation result directory `{0:?}`: {e:?}", - settings.evaluation_result_dir - ); - }); - } + let (evaluation_block, current_period_index, evaluation_pool, evaluation_result_path) = + prepare_evaluation_context(client, settings).await?; info!("Evaluating distance for period {}", current_period_index); - let evaluation_block = api::evaluation_block(client, parent_hash).await; + + let max_depth = api::max_referee_distance(client).await; // member idty -> issued certs let mut members = FnvHashMap::<IdtyIndex, u32>::default(); @@ -252,6 +226,70 @@ pub async fn run( Some((evaluation, current_period_index, evaluation_result_path)) } +/// Prepares the context for the next evaluation task. +/// +/// This function performs the following steps: +/// 1. Fetches the parent hash of the latest block from the API. +/// 2. Determines the current period index. +/// 3. Retrieves the evaluation pool for the current period. +/// - If the pool does not exist or is empty, it returns `None`. +/// 4. Checks if the evaluation result file for the next period already exists. +/// - If it exists, the task has already been completed, so the function returns `None`. +/// 5. Ensures the evaluation result directory is available, creating it if necessary. +/// 6. Retrieves the block number of the evaluation. +/// +async fn prepare_evaluation_context( + client: &api::Client, + settings: &Settings, +) -> Option<(H256, u32, EvaluationPool, PathBuf)> { + let parent_hash = api::parent_hash(client).await; + + let current_period_index = api::current_period_index(client, parent_hash).await; + + // Fetch the pending identities + let Some(evaluation_pool) = + api::current_pool(client, parent_hash, current_period_index % 3).await + else { + info!("Nothing to do: Pool does not exist"); + return None; + }; + + // Stop if nothing to evaluate + if evaluation_pool.evaluations.0.is_empty() { + info!("Nothing to do: Pool is empty"); + return None; + } + + let evaluation_result_path = settings + .evaluation_result_dir + .join((current_period_index + 1).to_string()); + + // Stop if already evaluated + if evaluation_result_path + .try_exists() + .expect("Result path unavailable") + { + info!("Nothing to do: File already exists"); + return None; + } + + #[cfg(not(test))] + std::fs::create_dir_all(&settings.evaluation_result_dir).unwrap_or_else(|e| { + panic!( + "Cannot create distance evaluation result directory `{0:?}`: {e:?}", + settings.evaluation_result_dir + ); + }); + + Some(( + api::evaluation_block(client, parent_hash).await, + current_period_index, + evaluation_pool, + evaluation_result_path, + )) +} + +/// Recursively explores the certification graph to identify referees accessible within a given depth. fn distance_rule_recursive( received_certs: &FnvHashMap<IdtyIndex, Vec<IdtyIndex>>, referees: &FnvHashMap<IdtyIndex, u32>, @@ -297,7 +335,7 @@ fn distance_rule_recursive( } } -/// Returns the fraction `nb_accessible_referees / nb_referees` +/// Calculates the fraction of accessible referees to total referees for a given identity. fn distance_rule( received_certs: &FnvHashMap<IdtyIndex, Vec<IdtyIndex>>, referees: &FnvHashMap<IdtyIndex, u32>, diff --git a/distance-oracle/src/main.rs b/distance-oracle/src/main.rs index 405abda20c09adbbaea89379d0e7a0fa84b84b7a..bd39816a29ed90dfe1b2f10cdbaca63efd9532ec 100644 --- a/distance-oracle/src/main.rs +++ b/distance-oracle/src/main.rs @@ -51,10 +51,10 @@ async fn main() { let mut interval = tokio::time::interval(std::time::Duration::from_secs(duration)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { - distance_oracle::run_and_save(&client, &settings).await; + distance_oracle::run(&client, &settings).await; interval.tick().await; } } else { - distance_oracle::run_and_save(&client, &settings).await; + distance_oracle::run(&client, &settings).await; } } diff --git a/distance-oracle/src/mock.rs b/distance-oracle/src/mock.rs index 103c940d2ac27f27ff87e1368797b30772bf81d0..881022205f92e5725ad8be079a6f84d8fd2093b3 100644 --- a/distance-oracle/src/mock.rs +++ b/distance-oracle/src/mock.rs @@ -19,7 +19,6 @@ use crate::runtime::runtime_types::{ }; use dubp_wot::{data::rusty::RustyWebOfTrust, WebOfTrust, WotId}; -use sp_core::H256; use std::collections::BTreeSet; pub struct Client { @@ -28,8 +27,9 @@ pub struct Client { } pub type AccountId = subxt::ext::sp_runtime::AccountId32; pub type IdtyIndex = u32; +pub type H256 = subxt::utils::H256; -pub struct EvaluationPool<AccountId: Ord, IdtyIndex> { +pub struct EvaluationPool { pub evaluations: (Vec<(IdtyIndex, MedianAcc<Perbill>)>,), pub evaluators: BTreeSet<AccountId>, } @@ -54,7 +54,7 @@ pub async fn current_pool( client: &Client, _parent_hash: H256, _current_session: u32, -) -> Option<EvaluationPool<AccountId, IdtyIndex>> { +) -> Option<EvaluationPool> { Some(EvaluationPool { evaluations: (client .wot diff --git a/distance-oracle/src/tests.rs b/distance-oracle/src/tests.rs index a1c197fa82121eb99b9e16293e85368a916404d4..23df3f2874b5f46ce9b33abb7e1152f08e338058 100644 --- a/distance-oracle/src/tests.rs +++ b/distance-oracle/src/tests.rs @@ -58,7 +58,7 @@ async fn test_distance_against_v1() { client.pool_len = n; let t_a = std::time::Instant::now(); - let results = crate::run(&client, &Default::default(), false) + let results = crate::compute_distance_evaluation(&client, &Default::default()) .await .unwrap(); println!("new time: {}", t_a.elapsed().as_millis()); diff --git a/end2end-tests/tests/common/distance.rs b/end2end-tests/tests/common/distance.rs index f9f43d6707559ea0145d72ff043b0b7ecd50cc09..f7d02df5544cf80a7b8922d90296d2ae8478ad85 100644 --- a/end2end-tests/tests/common/distance.rs +++ b/end2end-tests/tests/common/distance.rs @@ -51,15 +51,15 @@ pub async fn run_oracle( let origin = PairSigner::new(origin.pair()); let account_id: &AccountId32 = origin.account_id(); - if let Some((distances, _current_session, _evaluation_result_path)) = distance_oracle::run( - &distance_oracle::api::client(rpc_url.clone()).await, - &distance_oracle::Settings { - evaluation_result_dir: PathBuf::default(), - rpc_url, - }, - false, - ) - .await + if let Some((distances, _current_session, _evaluation_result_path)) = + distance_oracle::compute_distance_evaluation( + &distance_oracle::api::client(rpc_url.clone()).await, + &distance_oracle::Settings { + evaluation_result_dir: PathBuf::default(), + rpc_url, + }, + ) + .await { // Distance evaluation period is 7 blocks for _ in 0..7 { diff --git a/node/src/command.rs b/node/src/command.rs index 9aa124d912fb453143f61420438c5ac90ded6bf5..0189fe1706f40cfcb9da32f255d9969b3781038c 100644 --- a/node/src/command.rs +++ b/node/src/command.rs @@ -296,11 +296,11 @@ pub fn run() -> sc_cli::Result<()> { let mut interval = tokio::time::interval(std::time::Duration::from_secs(duration)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { - distance_oracle::run_and_save(&client, &settings).await; + distance_oracle::run(&client, &settings).await; interval.tick().await; } } else { - distance_oracle::run_and_save(&client, &settings).await; + distance_oracle::run(&client, &settings).await; } Ok(()) }),