Skip to content
Snippets Groups Projects
Commit cbe4cd67 authored by Hugo Trentesaux's avatar Hugo Trentesaux
Browse files

start indexer refac

parent b7990182
No related branches found
No related tags found
1 merge request!25indexer refac
...@@ -4,6 +4,13 @@ query IdentityNameByIndex($index: Int!) { ...@@ -4,6 +4,13 @@ query IdentityNameByIndex($index: Int!) {
} }
} }
query NamesByIndexes($idtyids: [Int!]) {
identities(where: {index_in: $idtyids}) {
index
name
}
}
query IdentityInfo($index: Int!) { query IdentityInfo($index: Int!) {
identities(where: { index_eq: $index }) { identities(where: { index_eq: $index }) {
name name
......
use crate::{indexer::*, *};
use std::collections::{hash_map, HashMap};
pub struct IdentityCache {
client: Client,
identities: HashMap<IdtyId, String>,
indexer: Option<Indexer>,
}
impl IdentityCache {
pub fn new(client: Client, indexer: Option<Indexer>) -> Self {
Self {
client,
identities: HashMap::new(),
indexer,
}
}
pub async fn fetch_identity(
&mut self,
identity_id: IdtyId,
parent_hash: sp_core::H256,
) -> anyhow::Result<String> {
Ok(match self.identities.entry(identity_id) {
hash_map::Entry::Occupied(entry) => entry.get().clone(),
hash_map::Entry::Vacant(entry) => entry
.insert({
let pubkey = self
.client
.storage()
.at(parent_hash)
.fetch(&runtime::storage().identity().identities(identity_id))
.await?
.ok_or_else(|| anyhow!("Identity {} not found", identity_id))?
.owner_key
.to_string();
format!(
"“ {} ”",
if let Some(indexer) = &self.indexer {
if let Some(username) = indexer.username_by_pubkey(&pubkey).await {
username
} else {
pubkey
}
} else {
pubkey
}
)
})
.clone(),
})
}
}
use crate::*; use crate::{indexer::*, *};
use futures::join; use futures::join;
use std::collections::BTreeMap; use std::collections::BTreeMap;
...@@ -27,7 +26,7 @@ pub async fn monitor_expirations(data: &Data, blocks: u32, _sessions: u32) -> an ...@@ -27,7 +26,7 @@ pub async fn monitor_expirations(data: &Data, blocks: u32, _sessions: u32) -> an
let end_block = current_block + blocks; let end_block = current_block + blocks;
let mut identity_cache = cache::IdentityCache::new(client.clone(), indexer); let mut identity_cache = IdentityCache::new(client.clone(), indexer);
// Certifications // Certifications
let mut basic_certs_iter = client let mut basic_certs_iter = client
...@@ -51,12 +50,12 @@ pub async fn monitor_expirations(data: &Data, blocks: u32, _sessions: u32) -> an ...@@ -51,12 +50,12 @@ pub async fn monitor_expirations(data: &Data, blocks: u32, _sessions: u32) -> an
println!( println!(
" {} ({}) -> {} ({})", " {} ({}) -> {} ({})",
identity_cache identity_cache
.fetch_identity(issuer_id, parent_hash) .fetch_identity(issuer_id,)
.await .await
.unwrap_or_else(|_| "?".into()), .unwrap_or_else(|_| "?".into()),
issuer_id, issuer_id,
identity_cache identity_cache
.fetch_identity(receiver_id, parent_hash) .fetch_identity(receiver_id,)
.await .await
.unwrap_or_else(|_| "?".into()), .unwrap_or_else(|_| "?".into()),
receiver_id, receiver_id,
...@@ -68,7 +67,8 @@ pub async fn monitor_expirations(data: &Data, blocks: u32, _sessions: u32) -> an ...@@ -68,7 +67,8 @@ pub async fn monitor_expirations(data: &Data, blocks: u32, _sessions: u32) -> an
// Memberships // Memberships
let mut basic_membership_iter = client let mut basic_membership_iter = client
.storage() .storage()
.at(parent_hash) .at_latest()
.await?
.iter(runtime::storage().membership().memberships_expire_on_iter()) .iter(runtime::storage().membership().memberships_expire_on_iter())
.await?; .await?;
let mut basic_memberships = BTreeMap::new(); let mut basic_memberships = BTreeMap::new();
...@@ -90,7 +90,7 @@ pub async fn monitor_expirations(data: &Data, blocks: u32, _sessions: u32) -> an ...@@ -90,7 +90,7 @@ pub async fn monitor_expirations(data: &Data, blocks: u32, _sessions: u32) -> an
println!( println!(
" {} ({})", " {} ({})",
identity_cache identity_cache
.fetch_identity(identity_id, parent_hash) .fetch_identity(identity_id)
.await .await
.unwrap_or_else(|_| "?".into()), .unwrap_or_else(|_| "?".into()),
identity_id, identity_id,
...@@ -101,3 +101,53 @@ pub async fn monitor_expirations(data: &Data, blocks: u32, _sessions: u32) -> an ...@@ -101,3 +101,53 @@ pub async fn monitor_expirations(data: &Data, blocks: u32, _sessions: u32) -> an
Ok(()) Ok(())
} }
use std::collections::{hash_map, HashMap};
pub struct IdentityCache {
client: Client,
identities: HashMap<IdtyId, String>,
indexer: Option<Indexer>,
}
impl IdentityCache {
pub fn new(client: Client, indexer: Option<Indexer>) -> Self {
Self {
client,
identities: HashMap::new(),
indexer,
}
}
pub async fn fetch_identity(&mut self, identity_id: IdtyId) -> anyhow::Result<String> {
Ok(match self.identities.entry(identity_id) {
hash_map::Entry::Occupied(entry) => entry.get().clone(),
hash_map::Entry::Vacant(entry) => entry
.insert({
let pubkey = self
.client
.storage()
.at_latest()
.await?
.fetch(&runtime::storage().identity().identities(identity_id))
.await?
.ok_or_else(|| anyhow!("Identity {} not found", identity_id))?
.owner_key
.to_string();
format!(
"“ {} ”",
if let Some(indexer) = &self.indexer {
if let Some(username) = indexer.username_by_pubkey(&pubkey).await {
username
} else {
pubkey
}
} else {
pubkey
}
)
})
.clone(),
})
}
}
...@@ -275,7 +275,7 @@ pub async fn get_identity( ...@@ -275,7 +275,7 @@ pub async fn get_identity(
let pseudo = pseudo.unwrap_or(if let Some(indexer) = &indexer { let pseudo = pseudo.unwrap_or(if let Some(indexer) = &indexer {
indexer indexer
.username_by_index(index) .username_by_index(index)
.await? .await
.ok_or_else(|| anyhow!("indexer does not have username for this index {index}"))? .ok_or_else(|| anyhow!("indexer does not have username for this index {index}"))?
} else { } else {
"<no indexer>".to_string() "<no indexer>".to_string()
......
...@@ -191,40 +191,18 @@ pub async fn go_offline(data: &Data) -> Result<(), subxt::Error> { ...@@ -191,40 +191,18 @@ pub async fn go_offline(data: &Data) -> Result<(), subxt::Error> {
/// get online authorities /// get online authorities
pub async fn online(data: &Data) -> Result<(), anyhow::Error> { pub async fn online(data: &Data) -> Result<(), anyhow::Error> {
let client = data.client(); let client = data.client();
let indexer = data.indexer.clone();
let parent_hash = client let online_authorities = client
.clone()
.storage() .storage()
.at_latest() .at_latest()
.await? .await?
.fetch(&runtime::storage().system().parent_hash())
.await?
.unwrap();
let mut identity_cache = cache::IdentityCache::new(client.clone(), indexer);
let online_authorities = client
.storage()
.at(parent_hash)
.fetch(&runtime::storage().authority_members().online_authorities()) .fetch(&runtime::storage().authority_members().online_authorities())
.await? .await?
.unwrap_or_default(); .unwrap_or_default();
println!("Online:");
for identity_id in online_authorities {
println!(
" {}",
identity_cache
.fetch_identity(identity_id, parent_hash)
.await
.unwrap_or_else(|_| format!("{identity_id}"))
);
}
let incoming_authorities = client let incoming_authorities = client
.storage() .storage()
.at(parent_hash) .at_latest()
.await?
.fetch( .fetch(
&runtime::storage() &runtime::storage()
.authority_members() .authority_members()
...@@ -232,21 +210,10 @@ pub async fn online(data: &Data) -> Result<(), anyhow::Error> { ...@@ -232,21 +210,10 @@ pub async fn online(data: &Data) -> Result<(), anyhow::Error> {
) )
.await? .await?
.unwrap_or_default(); .unwrap_or_default();
println!("Incoming:");
for identity_id in incoming_authorities {
println!(
" {}",
identity_cache
.fetch_identity(identity_id, parent_hash)
.await
.unwrap_or_else(|_| format!("{identity_id}"))
);
}
let outgoing_authorities = client let outgoing_authorities = client
.storage() .storage()
.at(parent_hash) .at_latest()
.await?
.fetch( .fetch(
&runtime::storage() &runtime::storage()
.authority_members() .authority_members()
...@@ -255,16 +222,14 @@ pub async fn online(data: &Data) -> Result<(), anyhow::Error> { ...@@ -255,16 +222,14 @@ pub async fn online(data: &Data) -> Result<(), anyhow::Error> {
.await? .await?
.unwrap_or_default(); .unwrap_or_default();
println!("Online:");
println!("{online_authorities:?}");
println!("Incoming:");
println!("{incoming_authorities:?}");
println!("Outgoing:"); println!("Outgoing:");
for identity_id in outgoing_authorities { println!("{outgoing_authorities:?}");
println!(
" {}",
identity_cache
.fetch_identity(identity_id, parent_hash)
.await
.unwrap_or_else(|_| format!("{identity_id}"))
);
}
Ok(()) Ok(())
} }
......
mod queries;
use crate::*; use crate::*;
use comfy_table::*; use comfy_table::*;
use comfy_table::{ContentArrangement, Table}; use comfy_table::{ContentArrangement, Table};
use graphql_client::{reqwest::post_graphql, GraphQLQuery}; use graphql_client::reqwest::post_graphql;
use identity_info::*; use graphql_client::GraphQLQuery;
use queries::*;
use sp_core::Bytes; use sp_core::Bytes;
// type used in parameters query
// #[allow(non_camel_case_types)]
// type jsonb = serde_json::Value;
// index → identity
#[derive(GraphQLQuery)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct IdentityNameByIndex;
// index → identity info
#[derive(GraphQLQuery)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct IdentityInfo;
// pubkey → identity
#[derive(GraphQLQuery)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct IdentityNameByPubkey;
// pubkey → wasidentity
#[derive(GraphQLQuery)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct WasIdentityNameByPubkey;
#[derive(GraphQLQuery)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct LatestBlock;
#[derive(GraphQLQuery)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct BlockByNumber;
#[derive(GraphQLQuery, Debug)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct GenesisHash;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Indexer { pub struct Indexer {
pub gql_client: reqwest::Client, pub gql_client: reqwest::Client,
...@@ -69,80 +15,67 @@ pub struct Indexer { ...@@ -69,80 +15,67 @@ pub struct Indexer {
} }
impl Indexer { impl Indexer {
// query
async fn query<T: GraphQLQuery>(
&self,
var: <T as GraphQLQuery>::Variables,
) -> <T as GraphQLQuery>::ResponseData {
post_graphql::<T, _>(&self.gql_client, &self.gql_url, var)
.await
.expect("indexer connexion error")
.data
.expect("indexer error")
}
/// index → name /// index → name
pub async fn username_by_index(&self, index: u32) -> anyhow::Result<Option<String>> { pub async fn username_by_index(&self, index: u32) -> Option<String> {
Ok(post_graphql::<IdentityNameByIndex, _>( self.query::<IdentityNameByIndex>(identity_name_by_index::Variables {
&self.gql_client,
&self.gql_url,
identity_name_by_index::Variables {
index: index.into(), index: index.into(),
}, })
) .await
.await? .identities
.data .pop()
.and_then(move |mut data| data.identities.pop().map(|idty| idty.name))) .map(|idty| idty.name)
} }
/// pubkey → name /// pubkey → name
pub async fn username_by_pubkey(&self, pubkey: &str) -> Option<String> { pub async fn username_by_pubkey(&self, pubkey: &str) -> Option<String> {
post_graphql::<IdentityNameByPubkey, _>( self.query::<IdentityNameByPubkey>(identity_name_by_pubkey::Variables {
&self.gql_client,
&self.gql_url,
identity_name_by_pubkey::Variables {
pubkey: pubkey.to_string(), pubkey: pubkey.to_string(),
}, })
)
.await .await
.expect("indexer connexion error") .identities
.data .pop()
.and_then(move |mut data| data.identities.pop().map(|idty| idty.name)) .map(|idty| idty.name)
} }
/// pubkey → was name /// pubkey → was name
pub async fn wasname_by_pubkey(&self, pubkey: &str) -> Option<String> { pub async fn wasname_by_pubkey(&self, pubkey: &str) -> Option<String> {
post_graphql::<WasIdentityNameByPubkey, _>( self.query::<WasIdentityNameByPubkey>(was_identity_name_by_pubkey::Variables {
&self.gql_client,
&self.gql_url,
was_identity_name_by_pubkey::Variables {
pubkey: pubkey.to_string(), pubkey: pubkey.to_string(),
}, })
)
.await .await
.expect("indexer connexion error") .account_by_id
.data
.and_then(move |data| {
data.account_by_id
.and_then(|mut acc| acc.was_identity.pop()) .and_then(|mut acc| acc.was_identity.pop())
.map(|idty| idty.identity.name) .map(|idty| idty.identity.name)
})
} }
/// index → info /// index → info
pub async fn identity_info(&self, index: u32) -> Option<IdentityInfoIdentities> { pub async fn identity_info(&self, index: u32) -> Option<identity_info::IdentityInfoIdentities> {
post_graphql::<IdentityInfo, _>( self.query::<IdentityInfo>(identity_info::Variables {
&self.gql_client,
&self.gql_url,
identity_info::Variables {
index: index.into(), index: index.into(),
}, })
)
.await .await
.expect("indexer connexion error") .identities
.data .pop()
.and_then(move |mut data| data.identities.pop())
} }
/// fetch latest block /// fetch latest block
pub async fn fetch_latest_block(&self) -> Option<latest_block::LatestBlockBlocks> { pub async fn fetch_latest_block(&self) -> Option<latest_block::LatestBlockBlocks> {
post_graphql::<LatestBlock, _>( self.query::<LatestBlock>(latest_block::Variables {})
&self.gql_client,
self.gql_url.clone(),
latest_block::Variables {},
)
.await .await
.expect("indexer connexion error") .blocks
.data .pop()
.and_then(move |mut data| data.blocks.pop())
} }
/// fetch block by number /// fetch block by number
...@@ -150,21 +83,16 @@ impl Indexer { ...@@ -150,21 +83,16 @@ impl Indexer {
&self, &self,
number: BlockNumber, number: BlockNumber,
) -> Option<block_by_number::BlockByNumberBlocks> { ) -> Option<block_by_number::BlockByNumberBlocks> {
post_graphql::<BlockByNumber, _>( self.query::<BlockByNumber>(block_by_number::Variables {
&self.gql_client,
self.gql_url.clone(),
block_by_number::Variables {
number: number.into(), number: number.into(),
}, })
)
.await .await
.expect("indexer connexion error") .blocks
.data .pop()
.and_then(move |mut data| data.blocks.pop())
} }
/// fetch genesis hash /// fetch genesis hash
// since this is always called before any other indexer request, check errors // since this is always called before any other indexer request, check errors in a more detailed way
pub async fn fetch_genesis_hash(&self) -> Result<Hash, GcliError> { pub async fn fetch_genesis_hash(&self) -> Result<Hash, GcliError> {
// try to connect to indexer // try to connect to indexer
let response = post_graphql::<GenesisHash, _>( let response = post_graphql::<GenesisHash, _>(
......
use graphql_client::GraphQLQuery;
use sp_core::Bytes;
// index → identity
#[derive(GraphQLQuery)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct IdentityNameByIndex;
// index → identity info
#[derive(GraphQLQuery)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct IdentityInfo;
// pubkey → identity
#[derive(GraphQLQuery)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct IdentityNameByPubkey;
// pubkey → wasidentity
#[derive(GraphQLQuery)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct WasIdentityNameByPubkey;
#[derive(GraphQLQuery)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct LatestBlock;
#[derive(GraphQLQuery)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct BlockByNumber;
#[derive(GraphQLQuery, Debug)]
#[graphql(
schema_path = "res/indexer-schema.json",
query_path = "res/indexer-queries.graphql"
)]
pub struct GenesisHash;
mod cache;
mod commands; mod commands;
mod conf; mod conf;
mod data; mod data;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment