Commit c9bf2ba6 authored by poka

feat(migration): skip Cs+ profile if doesn't exist on squid

parent 0335d7cb
@@ -5,6 +5,7 @@ HASURA_GRAPHQL_ADMIN_SECRET=my_hasura_password
 HASURA_LISTEN_PORT=8080
 DUNITER_ENDPOINTS="['wss://gdev.p2p.legal/ws', 'wss://gdev.coinduf.eu/ws', 'wss://gdev.cgeek.fr/ws']"

-### OPTIONNAL ###
+### OPTIONNAL: only for Cs+ data migration ###
 IMPORT_CSPLUS_DATA=false
 CSPLUS_ENDPOINT="https://g1.data.e-is.pro"
+SQUID_ENDPOINT="https://subsquid.gdev.coinduf.eu/graphql"
@@ -15,5 +15,6 @@ services:
       DB_DATABASE: ${DB_DATABASE}
       IMPORT_CSPLUS_DATA: ${IMPORT_CSPLUS_DATA}
       DUNITER_ENDPOINTS: ${DUNITER_ENDPOINTS}
-      CSPLUS_ENDPOINT: ${CSPLUS_ENDPOINT}
       HASURA_LISTEN_PORT: ${HASURA_LISTEN_PORT}
+      CSPLUS_ENDPOINT: ${CSPLUS_ENDPOINT}
+      SQUID_ENDPOINT: ${SQUID_ENDPOINT}
@@ -50,8 +50,6 @@ export async function runCsplusImport(isProduction: boolean) {
   if (!status.success) {
     throw new Error(`Process migrate_csplus exited with code: ${status.code}`);
   }
-
-  console.log("End of Cs+ data import");
 }

 export async function isProfilesTableEmpty(client: Client): Promise<boolean> {
...
@@ -7,6 +7,7 @@ use tokio_postgres::NoTls;

 use crate::{
     db::insert_profiles,
+    fetch_addresses_from_squid::fetch_ids_from_graphql,
     models::{Hit, ScrollResponse},
 };
@@ -27,16 +28,22 @@ pub async fn fetch_profiles(
     let total_profiles = fetch_total_profiles_count(&url_profiles).await?;
     let total_pages = (total_profiles as f64 / page_size as f64).ceil() as usize;

-    println!("Start migration of {} CS+ profiles...", total_profiles);
+    println!(
+        "Start migration of {} CS+ profiles from {} endpoint ...",
+        total_profiles, csplus_endpoint
+    );
     println!("Retrieving page: {}/{}", iteration, total_pages);

     // Fetch initial batch
     let mut future_scroll_response =
         fetch_batch(url_profiles.to_string(), initial_batch_body).await;

+    // Fetch IDs from the GraphQL endpoint
+    let ids = Arc::new(fetch_ids_from_graphql().await?);
+
     while let Ok(current_response) = future_scroll_response {
         // Process the current batch
-        process_hits(pool_arc.clone(), &current_response.hits.hits).await?;
+        process_hits(pool_arc.clone(), &current_response.hits.hits, &ids).await?;

         // Log the retrieval or completion
         iteration += 1;
@@ -77,6 +84,7 @@ async fn fetch_batch(
 async fn process_hits(
     pool: Arc<Mutex<Pool<PostgresConnectionManager<NoTls>>>>,
     hits: &[Hit],
+    ids: &Arc<Vec<String>>,
 ) -> Result<(), Box<dyn Error>> {
     println!("Processing...");
     let mut tasks = Vec::new();
@@ -84,10 +92,12 @@ async fn process_hits(
     for chunk in hits.chunks(2000) {
         let pool_clone = pool.clone();
         let chunk_clone = chunk.to_vec();
+        let ids_clone = Arc::clone(&ids);
         let task = tokio::spawn(async move {
             if let Ok(mut client) = pool_clone.lock().await.get().await {
-                if let Err(e) = insert_profiles(&mut client, &chunk_clone).await {
+                if let Err(e) = insert_profiles(&mut client, &chunk_clone, ids_clone.as_ref()).await
+                {
                     eprintln!("Insert error: {}", e);
                 }
             }
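Aside: the fan-out above (2,000 hits per spawned task, all tasks joined afterwards) is the pattern the new `ids` list has to survive, hence the per-task `Arc::clone`. A self-contained sketch of the same shape with plain data, hypothetical and independent of this repo (`sink` stands in for the Postgres pool, the closure body for `insert_profiles`):

use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    // Shared, lock-guarded sink standing in for the connection pool.
    let sink = Arc::new(Mutex::new(Vec::new()));
    let hits: Vec<u32> = (0..10_000).collect();

    let mut tasks = Vec::new();
    for chunk in hits.chunks(2000) {
        let sink = Arc::clone(&sink);
        let chunk = chunk.to_vec(); // each task owns its chunk
        tasks.push(tokio::spawn(async move {
            // Stands in for insert_profiles: record the chunk size.
            sink.lock().await.push(chunk.len());
        }));
    }
    for task in tasks {
        task.await.unwrap();
    }
    println!("processed {} chunks", sink.lock().await.len());
}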
...
@@ -42,12 +42,25 @@ pub async fn database_connection(
 }

 /// Insert parsed profiles to Postgres database
-pub async fn insert_profiles(client: &mut Client, hits: &[Hit]) -> Result<(), Box<dyn Error>> {
+pub async fn insert_profiles(
+    client: &mut Client,
+    hits: &[Hit],
+    ids: &[String],
+) -> Result<(), Box<dyn Error>> {
     let transaction = client.transaction().await?;

     for hit in hits {
         let profile_row = ProfileRow::from_hit(hit)?;

+        // Skip insert if address is not in the list of IDs
+        if profile_row
+            .address
+            .as_ref()
+            .map_or(true, |address| !ids.contains(address))
+        {
+            continue;
+        }
+
         // Construct the query using optional parameters for nullable fields
         let statement = transaction.prepare(
             "INSERT INTO public.profiles (address, avatar, description, geoloc, title, city, socials, created_at)
...
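A note on the skip check in `insert_profiles` above: with `ids: &[String]`, `ids.contains(address)` is a linear scan per profile, so filtering costs O(hits × ids). A minimal sketch of the same predicate backed by a `HashSet` for O(1) lookups; this is not part of the commit, and `should_skip` is a hypothetical helper:

use std::collections::HashSet;

// Mirrors the commit's predicate: skip when the profile has no address,
// or when the address is not among the accounts known to the squid.
fn should_skip(address: Option<&String>, ids: &HashSet<String>) -> bool {
    address.map_or(true, |a| !ids.contains(a))
}

fn main() {
    let ids: HashSet<String> = ["known_address".to_string()].into_iter().collect();
    assert!(should_skip(None, &ids)); // no address -> skip
    assert!(should_skip(Some(&"unknown_address".to_string()), &ids)); // not on squid -> skip
    assert!(!should_skip(Some(&"known_address".to_string()), &ids)); // known -> insert
}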
@@ -0,0 +1,54 @@
+use reqwest;
+use serde::Deserialize;
+use serde_json::json;
+use std::{env, error::Error};
+
+// Define a structure to deserialize the GraphQL response
+#[derive(Deserialize)]
+struct GraphQLResponse {
+    data: GraphQLData,
+}
+
+#[derive(Deserialize)]
+struct GraphQLData {
+    accounts: Vec<Account>,
+}
+
+#[derive(Deserialize)]
+struct Account {
+    id: String,
+}
+
+/// Get all existing accounts on squid indexer
+pub async fn fetch_ids_from_graphql() -> Result<Vec<String>, Box<dyn Error>> {
+    let endpoint = env::var("SQUID_ENDPOINT").map_err(|_| "SQUID_ENDPOINT must be set")?;
+    println!("Fetch addresses from squid endpoint {} ...", endpoint);
+
+    // Define the GraphQL query
+    let graphql_query = json!({
+        "query": "{ accounts { id } }"
+    });
+
+    // Send the request
+    let client = reqwest::Client::new();
+    let res = client.post(endpoint).json(&graphql_query).send().await?;
+
+    // Check if the request was successful
+    if res.status().is_success() {
+        // Parse the response
+        let response_body: GraphQLResponse = res.json().await?;
+        // Extract the IDs
+        let ids = response_body
+            .data
+            .accounts
+            .into_iter()
+            .map(|account| account.id)
+            .collect();
+        Ok(ids)
+    } else {
+        Err(Box::new(std::io::Error::new(
+            std::io::ErrorKind::Other,
+            "Failed to fetch data from GraphQL endpoint",
+        )))
+    }
+}
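For context, `fetch_ids_from_graphql` is called once per run in `fetch_profiles` (see the diff above) and the result is shared across the insert tasks. A hedged sketch of that call site, assuming a tokio runtime, `SQUID_ENDPOINT` exported in the environment, and the module declared in main.rs as this commit does:

use std::sync::Arc;

use crate::fetch_addresses_from_squid::fetch_ids_from_graphql;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Fetch the squid account IDs once, then share the immutable list
    // cheaply with any spawned tasks via Arc.
    let ids = Arc::new(fetch_ids_from_graphql().await?);
    println!("Fetched {} squid account IDs", ids.len());
    Ok(())
}

Note that `{ accounts { id } }` pulls every account in a single response; if the squid schema ever required pagination, this call site is where a loop would go.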
@@ -2,6 +2,7 @@ mod data_fetch;
 mod db;
 mod models;
 mod utils;
+mod fetch_addresses_from_squid;

 use crate::db::database_connection;
...