diff --git a/.env.example b/.env.example index 72cfb0779207ccf855d803f31a87fabe25fc9712..1f8c977b2a0721c1a56f4ad1cad9c70fa74c4f4e 100644 --- a/.env.example +++ b/.env.example @@ -5,6 +5,7 @@ HASURA_GRAPHQL_ADMIN_SECRET=my_hasura_password HASURA_LISTEN_PORT=8080 DUNITER_ENDPOINTS="['wss://gdev.p2p.legal/ws', 'wss://gdev.coinduf.eu/ws', 'wss://gdev.cgeek.fr/ws']" -### OPTIONNAL ### +### OPTIONNAL: only for Cs+ data migration ### IMPORT_CSPLUS_DATA=false CSPLUS_ENDPOINT="https://g1.data.e-is.pro" +SQUID_ENDPOINT="https://subsquid.gdev.coinduf.eu/graphql" diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index 73138f51ad99b06f7898a1becc3938ae4263f6db..f3ecb29cd31739a3f90e6c458c61c7e8c5edf16d 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -15,5 +15,6 @@ services: DB_DATABASE: ${DB_DATABASE} IMPORT_CSPLUS_DATA: ${IMPORT_CSPLUS_DATA} DUNITER_ENDPOINTS: ${DUNITER_ENDPOINTS} - CSPLUS_ENDPOINT: ${CSPLUS_ENDPOINT} HASURA_LISTEN_PORT: ${HASURA_LISTEN_PORT} + CSPLUS_ENDPOINT: ${CSPLUS_ENDPOINT} + SQUID_ENDPOINT: ${SQUID_ENDPOINT} diff --git a/lib/utils.ts b/lib/utils.ts index 10ebccac12557b83221b17495595b223f3bd2fe2..d1b9c8451bfc232099debdabb90fe82f3cb385c8 100644 --- a/lib/utils.ts +++ b/lib/utils.ts @@ -50,8 +50,6 @@ export async function runCsplusImport(isProduction: boolean) { if (!status.success) { throw new Error(`Process migrate_csplus exited with code: ${status.code}`); } - - console.log("End of Cs+ data import"); } export async function isProfilesTableEmpty(client: Client): Promise<boolean> { diff --git a/migrate_csplus/src/data_fetch.rs b/migrate_csplus/src/data_fetch.rs index f6a904d554b30af30d77b95d9a7b6d5dc6fc817f..2351fbdb660b0d5354acdb4d445843317e81b227 100644 --- a/migrate_csplus/src/data_fetch.rs +++ b/migrate_csplus/src/data_fetch.rs @@ -7,6 +7,7 @@ use tokio_postgres::NoTls; use crate::{ db::insert_profiles, + fetch_addresses_from_squid::fetch_ids_from_graphql, models::{Hit, ScrollResponse}, }; @@ -27,16 +28,22 @@ pub async fn fetch_profiles( let total_profiles = fetch_total_profiles_count(&url_profiles).await?; let total_pages = (total_profiles as f64 / page_size as f64).ceil() as usize; - println!("Start migration of {} CS+ profiles...", total_profiles); + println!( + "Start migration of {} CS+ profiles from {} endpoint ...", + total_profiles, csplus_endpoint + ); println!("Retrieving page: {}/{}", iteration, total_pages); // Fetch initial batch let mut future_scroll_response = fetch_batch(url_profiles.to_string(), initial_batch_body).await; + // Fetch IDs from the GraphQL endpoint + let ids = Arc::new(fetch_ids_from_graphql().await?); + while let Ok(current_response) = future_scroll_response { // Process the current batch - process_hits(pool_arc.clone(), ¤t_response.hits.hits).await?; + process_hits(pool_arc.clone(), ¤t_response.hits.hits, &ids).await?; // Log the retrieval or completion iteration += 1; @@ -77,6 +84,7 @@ async fn fetch_batch( async fn process_hits( pool: Arc<Mutex<Pool<PostgresConnectionManager<NoTls>>>>, hits: &[Hit], + ids: &Arc<Vec<String>>, ) -> Result<(), Box<dyn Error>> { println!("Processing..."); let mut tasks = Vec::new(); @@ -84,10 +92,12 @@ async fn process_hits( for chunk in hits.chunks(2000) { let pool_clone = pool.clone(); let chunk_clone = chunk.to_vec(); + let ids_clone = Arc::clone(&ids); let task = tokio::spawn(async move { if let Ok(mut client) = pool_clone.lock().await.get().await { - if let Err(e) = insert_profiles(&mut client, &chunk_clone).await { + if let Err(e) = insert_profiles(&mut client, &chunk_clone, ids_clone.as_ref()).await + { eprintln!("Insert error: {}", e); } } diff --git a/migrate_csplus/src/db.rs b/migrate_csplus/src/db.rs index 542e2195a28e779b02187f798cb6ea546c9312be..940b2dc3167786270586616eed7f830cec51246e 100644 --- a/migrate_csplus/src/db.rs +++ b/migrate_csplus/src/db.rs @@ -42,12 +42,25 @@ pub async fn database_connection( } /// Insert parsed profiles to Postgres database -pub async fn insert_profiles(client: &mut Client, hits: &[Hit]) -> Result<(), Box<dyn Error>> { +pub async fn insert_profiles( + client: &mut Client, + hits: &[Hit], + ids: &[String], +) -> Result<(), Box<dyn Error>> { let transaction = client.transaction().await?; for hit in hits { let profile_row = ProfileRow::from_hit(hit)?; + // Skip insert if address is not in the list of IDs + if profile_row + .address + .as_ref() + .map_or(true, |address| !ids.contains(address)) + { + continue; + } + // Construct the query using optional parameters for nullable fields let statement = transaction.prepare( "INSERT INTO public.profiles (address, avatar, description, geoloc, title, city, socials, created_at) diff --git a/migrate_csplus/src/fetch_addresses_from_squid.rs b/migrate_csplus/src/fetch_addresses_from_squid.rs new file mode 100644 index 0000000000000000000000000000000000000000..658d5da14bf1cbf8d4b60b59559e0482ee26f541 --- /dev/null +++ b/migrate_csplus/src/fetch_addresses_from_squid.rs @@ -0,0 +1,54 @@ +use reqwest; +use serde::Deserialize; +use serde_json::json; +use std::{env, error::Error}; + +// Define a structure to deserialize the GraphQL response +#[derive(Deserialize)] +struct GraphQLResponse { + data: GraphQLData, +} + +#[derive(Deserialize)] +struct GraphQLData { + accounts: Vec<Account>, +} + +#[derive(Deserialize)] +struct Account { + id: String, +} + +/// Get all existing accounts on squid indexer +pub async fn fetch_ids_from_graphql() -> Result<Vec<String>, Box<dyn Error>> { + let endpoint = env::var("SQUID_ENDPOINT").map_err(|_| "SQUID_ENDPOINT must be set")?; + println!("Fetch addresses from squid endpoint {} ...", endpoint); + + // Define the GraphQL query + let graphql_query = json!({ + "query": "{ accounts { id } }" + }); + + // Send the request + let client = reqwest::Client::new(); + let res = client.post(endpoint).json(&graphql_query).send().await?; + + // Check if the request was successful + if res.status().is_success() { + // Parse the response + let response_body: GraphQLResponse = res.json().await?; + // Extract the IDs + let ids = response_body + .data + .accounts + .into_iter() + .map(|account| account.id) + .collect(); + Ok(ids) + } else { + Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to fetch data from GraphQL endpoint", + ))) + } +} diff --git a/migrate_csplus/src/main.rs b/migrate_csplus/src/main.rs index e80298227c6d84db4012fbe9aa231b22b0d2dc2f..e5f5ff748df65e2d923f9551baff3dbb1d688ac2 100644 --- a/migrate_csplus/src/main.rs +++ b/migrate_csplus/src/main.rs @@ -2,6 +2,7 @@ mod data_fetch; mod db; mod models; mod utils; +mod fetch_addresses_from_squid; use crate::db::database_connection;