Skip to content
Snippets Groups Projects

Distance Oracle

Merged Pascal Engélibert requested to merge distance into master
Compare and Show latest version
4 files
+ 161
21
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 154
16
use codec::{Decode, Encode};
use codec::Encode;
 
use rayon::iter::{IntoParallelIterator, ParallelIterator};
 
use std::collections::{HashMap, HashSet};
 
use std::io::Write;
use std::path::PathBuf;
use std::path::PathBuf;
use subxt::{
use subxt::ext::sp_runtime::Perbill;
rpc::rpc_params,
use subxt::storage::StorageKey;
tx::{BaseExtrinsicParams, BaseExtrinsicParamsBuilder},
Config,
};
#[subxt::subxt(runtime_metadata_path = "../resources/metadata.scale")]
#[subxt::subxt(runtime_metadata_path = "../resources/metadata.scale")]
pub mod gdev {}
pub mod gdev {}
@@ -45,8 +45,12 @@ impl From<u64> for Tip {
@@ -45,8 +45,12 @@ impl From<u64> for Tip {
}
}
}
}
 
type IdtyIndex = u32;
 
#[tokio::main]
#[tokio::main]
async fn main() {
async fn main() {
 
let min_certs: u32 = 5;
 
let client = Client::new().await.unwrap();
let client = Client::new().await.unwrap();
let parent_hash = client
let parent_hash = client
@@ -64,10 +68,10 @@ async fn main() {
@@ -64,10 +68,10 @@ async fn main() {
)
)
.await
.await
.unwrap()
.unwrap()
.unwrap();
.unwrap_or_default();
// Fetch the pending identities
// Fetch the pending identities
let evaluation_pool = client
let Some(evaluation_pool) = client
.storage()
.storage()
.fetch(
.fetch(
&match current_session % 3 {
&match current_session % 3 {
@@ -79,25 +83,159 @@ async fn main() {
@@ -79,25 +83,159 @@ async fn main() {
Some(parent_hash),
Some(parent_hash),
)
)
.await
.await
.unwrap()
.unwrap() else {
.unwrap();
// Nothing to do
 
return
 
};
// Stop if nothing to evaluate
// Stop if nothing to evaluate
if evaluation_pool.0 .0.is_empty() {
if evaluation_pool.0 .0.is_empty() {
return;
return;
}
}
let evaluation_result_path = PathBuf::from(format!(
let evaluation_result_dir = PathBuf::from("/tmp/duniter/chains/gdev/distance");
"/tmp/duniter/chains/gdev/distance/{}",
let evaluation_result_path = evaluation_result_dir.join((current_session + 1).to_string());
current_session + 1
));
// Stop if already evaluated
// Stop if already evaluated
if evaluation_result_path.try_exists().unwrap() {
if evaluation_result_path.try_exists().unwrap() {
return;
return;
}
}
// Evaluate
let evaluation_block = client
 
.storage()
 
.fetch(
 
&gdev::storage().distance().evaluation_block(),
 
Some(parent_hash),
 
)
 
.await
 
.unwrap()
 
.unwrap();
 
 
std::fs::create_dir_all(evaluation_result_dir).unwrap();
 
 
// member idty -> issued certs
 
let mut members = HashMap::<IdtyIndex, u32>::new();
 
 
let mut members_iter = client
 
.storage()
 
.iter(
 
gdev::storage().membership().membership(0),
 
100,
 
Some(evaluation_block),
 
)
 
.await
 
.unwrap();
 
while let Some((member_idty, _membership_expire)) = members_iter.next().await.unwrap() {
 
members.insert(idty_id_from_storage_key(&member_idty), 0);
 
}
 
 
// idty -> received certs
 
let mut received_certs = HashMap::<IdtyIndex, Vec<IdtyIndex>>::new();
 
 
let mut certs_iter = client
 
.storage()
 
.iter(
 
gdev::storage().cert().certs_by_receiver(0),
 
100,
 
Some(evaluation_block),
 
)
 
.await
 
.unwrap();
 
while let Some((receiver, issuers)) = certs_iter.next().await.unwrap() {
 
let receiver = idty_id_from_storage_key(&receiver);
 
// Update members' issued certs count
 
if issuers.len() as u32 >= min_certs {
 
for (issuer, _removable_on) in issuers.iter() {
 
if let Some(issued_certs) = members.get_mut(issuer) {
 
*issued_certs += 1;
 
}
 
}
 
} else {
 
// This member is not referee
 
members.remove(&receiver);
 
}
 
received_certs.insert(
 
receiver,
 
issuers
 
.into_iter()
 
.map(|(issuer, _removable_on)| issuer)
 
.collect(),
 
);
 
}
 
 
// Only retain referees
 
// TODO benchmark: can it be faster? (maybe using drain_filter)
 
members.retain(|_idty, issued_certs| *issued_certs >= min_certs);
 
let referees = members;
 
 
let evaluation: Vec<Perbill> = evaluation_pool
 
.0
 
.0
 
.into_par_iter()
 
.map(|(idty, _)| {
 
Perbill::from_rational(
 
distance_rule(&received_certs, &referees, min_certs, idty),
 
referees.len() as u32,
 
)
 
})
 
.collect();
 
 
let mut evaluation_result_file = std::fs::OpenOptions::new()
 
.write(true)
 
.create_new(true)
 
.open(evaluation_result_path)
 
.unwrap();
 
evaluation_result_file
 
.write(
 
&sp_distance::ComputationResult {
 
distances: evaluation,
 
}
 
.encode(),
 
)
 
.unwrap();
 
}
 
 
fn distance_rule_recursive(
 
received_certs: &HashMap<IdtyIndex, Vec<IdtyIndex>>,
 
referees: &HashMap<IdtyIndex, u32>,
 
idty: IdtyIndex,
 
accessible_referees: &mut std::collections::HashSet<IdtyIndex>,
 
depth: u32,
 
) {
 
if referees.contains_key(&idty) {
 
accessible_referees.insert(idty);
 
}
 
if depth == 0 {
 
return;
 
}
 
for &certifier in received_certs.get(&idty).expect("unreachable").iter() {
 
distance_rule_recursive(
 
received_certs,
 
referees,
 
certifier,
 
accessible_referees,
 
depth - 1,
 
);
 
}
 
}
 
 
fn distance_rule(
 
received_certs: &HashMap<IdtyIndex, Vec<IdtyIndex>>,
 
referees: &HashMap<IdtyIndex, u32>,
 
depth: u32,
 
idty: IdtyIndex,
 
) -> u32 {
 
let mut accessible_referees = HashSet::<u32>::new();
 
distance_rule_recursive(
 
received_certs,
 
referees,
 
idty,
 
&mut accessible_referees,
 
depth + 1,
 
);
 
accessible_referees.len() as u32
 
}
// Save file
fn idty_id_from_storage_key(storage_key: &StorageKey) -> IdtyIndex {
 
u32::from_le_bytes(storage_key.as_ref()[40..44].try_into().unwrap())
}
}
Loading