Skip to content
Snippets Groups Projects
Unverified Commit 91a91749 authored by bgallois's avatar bgallois
Browse files

refactor oracle

parent 7e57cb65
No related branches found
No related tags found
No related merge requests found
Pipeline #38616 passed
This commit is part of merge request !290. Comments created here will be created in the context of that merge request.
......@@ -19,11 +19,12 @@
use crate::runtime;
use log::debug;
use subxt::utils::H256;
pub type Client = subxt::OnlineClient<crate::RuntimeConfig>;
pub type AccountId = subxt::utils::AccountId32;
pub type IdtyIndex = u32;
pub type EvaluationPool =
runtime::runtime_types::pallet_distance::types::EvaluationPool<AccountId, IdtyIndex>;
pub type H256 = subxt::utils::H256;
pub async fn client(rpc_url: impl AsRef<str>) -> Client {
Client::from_insecure_url(rpc_url)
......@@ -54,7 +55,7 @@ pub async fn current_pool(
client: &Client,
parent_hash: H256,
current_pool_index: u32,
) -> Option<runtime::runtime_types::pallet_distance::types::EvaluationPool<AccountId, IdtyIndex>> {
) -> Option<EvaluationPool> {
client
.storage()
.at(parent_hash)
......
......@@ -24,15 +24,14 @@ mod tests;
#[cfg(test)]
pub use mock as api;
use api::{AccountId, IdtyIndex};
use api::{AccountId, EvaluationPool, IdtyIndex, H256};
use codec::Encode;
use fnv::{FnvHashMap, FnvHashSet};
use log::{debug, error, info, warn};
use log::{debug, info, warn};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::{io::Write, path::PathBuf};
// TODO select metadata file using features
#[subxt::subxt(runtime_metadata_path = "../resources/metadata.scale")]
pub mod runtime {}
......@@ -83,10 +82,18 @@ impl Default for Settings {
}
}
/// Asynchronously runs a computation using the provided client and saves the result to a file.
pub async fn run_and_save(client: &api::Client, settings: &Settings) {
/// Runs the evaluation process, saves the results, and cleans up old files.
///
/// This function performs the following steps:
/// 1. Runs the evaluation task by invoking `compute_distance_evaluation`, which provides:
/// - The evaluation results.
/// - The current period index.
/// - The file path where the results should be stored.
/// 2. Saves the evaluation results to a file in the specified directory.
/// 3. Cleans up outdated evaluation files.
pub async fn run(client: &api::Client, settings: &Settings) {
let Some((evaluation, current_period_index, evaluation_result_path)) =
run(client, settings, true).await
compute_distance_evaluation(client, settings).await
else {
return;
};
......@@ -144,57 +151,24 @@ pub async fn run_and_save(client: &api::Client, settings: &Settings) {
});
}
/// Asynchronously runs a computation based on the provided client and settings.
/// Returns `Option<(evaluation, current_period_index, evaluation_result_path)>`.
pub async fn run(
/// Evaluates distance for the current period and prepares results for storage.
///
/// This function performs the following steps:
/// 1. Prepares the evaluation context using `prepare_evaluation_context`. If the context is not
/// ready (e.g., no pending evaluations, or results already exist), it returns `None`.
/// 2. Evaluates distances for all identities in the evaluation pool based on referees and received certifications.
/// 3. Returns the evaluation results, the current period index, and the path to store the results.
///
pub async fn compute_distance_evaluation(
client: &api::Client,
settings: &Settings,
handle_fs: bool,
) -> Option<(Vec<sp_runtime::Perbill>, u32, PathBuf)> {
let parent_hash = api::parent_hash(client).await;
let max_depth = api::max_referee_distance(client).await;
let current_period_index = api::current_period_index(client, parent_hash).await;
// Fetch the pending identities
let Some(evaluation_pool) =
api::current_pool(client, parent_hash, current_period_index % 3).await
else {
info!("Nothing to do: Pool does not exist");
return None;
};
// Stop if nothing to evaluate
if evaluation_pool.evaluations.0.is_empty() {
info!("Nothing to do: Pool is empty");
return None;
}
let evaluation_result_path = settings
.evaluation_result_dir
.join((current_period_index + 1).to_string());
if handle_fs {
// Stop if already evaluated
if evaluation_result_path
.try_exists()
.expect("Result path unavailable")
{
info!("Nothing to do: File already exists");
return None;
}
std::fs::create_dir_all(&settings.evaluation_result_dir).unwrap_or_else(|e| {
error!(
"Cannot create distance evaluation result directory `{0:?}`: {e:?}",
settings.evaluation_result_dir
);
});
}
let (evaluation_block, current_period_index, evaluation_pool, evaluation_result_path) =
prepare_evaluation_context(client, settings).await?;
info!("Evaluating distance for period {}", current_period_index);
let evaluation_block = api::evaluation_block(client, parent_hash).await;
let max_depth = api::max_referee_distance(client).await;
// member idty -> issued certs
let mut members = FnvHashMap::<IdtyIndex, u32>::default();
......@@ -252,6 +226,70 @@ pub async fn run(
Some((evaluation, current_period_index, evaluation_result_path))
}
/// Prepares the context for the next evaluation task.
///
/// This function performs the following steps:
/// 1. Fetches the parent hash of the latest block from the API.
/// 2. Determines the current period index.
/// 3. Retrieves the evaluation pool for the current period.
/// - If the pool does not exist or is empty, it returns `None`.
/// 4. Checks if the evaluation result file for the next period already exists.
/// - If it exists, the task has already been completed, so the function returns `None`.
/// 5. Ensures the evaluation result directory is available, creating it if necessary.
/// 6. Retrieves the block number of the evaluation.
///
async fn prepare_evaluation_context(
client: &api::Client,
settings: &Settings,
) -> Option<(H256, u32, EvaluationPool, PathBuf)> {
let parent_hash = api::parent_hash(client).await;
let current_period_index = api::current_period_index(client, parent_hash).await;
// Fetch the pending identities
let Some(evaluation_pool) =
api::current_pool(client, parent_hash, current_period_index % 3).await
else {
info!("Nothing to do: Pool does not exist");
return None;
};
// Stop if nothing to evaluate
if evaluation_pool.evaluations.0.is_empty() {
info!("Nothing to do: Pool is empty");
return None;
}
let evaluation_result_path = settings
.evaluation_result_dir
.join((current_period_index + 1).to_string());
// Stop if already evaluated
if evaluation_result_path
.try_exists()
.expect("Result path unavailable")
{
info!("Nothing to do: File already exists");
return None;
}
#[cfg(not(test))]
std::fs::create_dir_all(&settings.evaluation_result_dir).unwrap_or_else(|e| {
panic!(
"Cannot create distance evaluation result directory `{0:?}`: {e:?}",
settings.evaluation_result_dir
);
});
Some((
api::evaluation_block(client, parent_hash).await,
current_period_index,
evaluation_pool,
evaluation_result_path,
))
}
/// Recursively explores the certification graph to identify referees accessible within a given depth.
fn distance_rule_recursive(
received_certs: &FnvHashMap<IdtyIndex, Vec<IdtyIndex>>,
referees: &FnvHashMap<IdtyIndex, u32>,
......@@ -297,7 +335,7 @@ fn distance_rule_recursive(
}
}
/// Returns the fraction `nb_accessible_referees / nb_referees`
/// Calculates the fraction of accessible referees to total referees for a given identity.
fn distance_rule(
received_certs: &FnvHashMap<IdtyIndex, Vec<IdtyIndex>>,
referees: &FnvHashMap<IdtyIndex, u32>,
......
......@@ -51,10 +51,10 @@ async fn main() {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(duration));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
distance_oracle::run_and_save(&client, &settings).await;
distance_oracle::run(&client, &settings).await;
interval.tick().await;
}
} else {
distance_oracle::run_and_save(&client, &settings).await;
distance_oracle::run(&client, &settings).await;
}
}
......@@ -19,7 +19,6 @@ use crate::runtime::runtime_types::{
};
use dubp_wot::{data::rusty::RustyWebOfTrust, WebOfTrust, WotId};
use sp_core::H256;
use std::collections::BTreeSet;
pub struct Client {
......@@ -28,8 +27,9 @@ pub struct Client {
}
pub type AccountId = subxt::ext::sp_runtime::AccountId32;
pub type IdtyIndex = u32;
pub type H256 = subxt::utils::H256;
pub struct EvaluationPool<AccountId: Ord, IdtyIndex> {
pub struct EvaluationPool {
pub evaluations: (Vec<(IdtyIndex, MedianAcc<Perbill>)>,),
pub evaluators: BTreeSet<AccountId>,
}
......@@ -54,7 +54,7 @@ pub async fn current_pool(
client: &Client,
_parent_hash: H256,
_current_session: u32,
) -> Option<EvaluationPool<AccountId, IdtyIndex>> {
) -> Option<EvaluationPool> {
Some(EvaluationPool {
evaluations: (client
.wot
......
......@@ -58,7 +58,7 @@ async fn test_distance_against_v1() {
client.pool_len = n;
let t_a = std::time::Instant::now();
let results = crate::run(&client, &Default::default(), false)
let results = crate::compute_distance_evaluation(&client, &Default::default())
.await
.unwrap();
println!("new time: {}", t_a.elapsed().as_millis());
......
......@@ -51,13 +51,13 @@ pub async fn run_oracle(
let origin = PairSigner::new(origin.pair());
let account_id: &AccountId32 = origin.account_id();
if let Some((distances, _current_session, _evaluation_result_path)) = distance_oracle::run(
if let Some((distances, _current_session, _evaluation_result_path)) =
distance_oracle::compute_distance_evaluation(
&distance_oracle::api::client(rpc_url.clone()).await,
&distance_oracle::Settings {
evaluation_result_dir: PathBuf::default(),
rpc_url,
},
false,
)
.await
{
......
......@@ -296,11 +296,11 @@ pub fn run() -> sc_cli::Result<()> {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(duration));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
distance_oracle::run_and_save(&client, &settings).await;
distance_oracle::run(&client, &settings).await;
interval.tick().await;
}
} else {
distance_oracle::run_and_save(&client, &settings).await;
distance_oracle::run(&client, &settings).await;
}
Ok(())
}),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment