diff --git a/docker-compose.yml b/docker-compose.yml
index e6c6e38ae4aa5e8e1240ab4a64be1fc04f691ff8..fcddee36af738acd23e0549708a431c1767c554f 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,7 +1,7 @@
 version: "3.6"
 services:
   postgres-datapod:
-    image: postgres:15
+    image: postgres:16
     restart: always
     volumes:
       - db_data:/var/lib/postgresql/data
diff --git a/migrate_csplus/Cargo.lock b/migrate_csplus/Cargo.lock
index 49c8e743934b2ce83abe30b355da73bdfb28ac09..2d773d7beaab5ffbab2621ede0a2aea64bd37558 100644
--- a/migrate_csplus/Cargo.lock
+++ b/migrate_csplus/Cargo.lock
@@ -295,6 +295,31 @@ version = "0.21.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9"
 
+[[package]]
+name = "bb8"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "98b4b0f25f18bcdc3ac72bdb486ed0acf7e185221fd4dc985bc15db5800b0ba2"
+dependencies = [
+ "async-trait",
+ "futures-channel",
+ "futures-util",
+ "parking_lot",
+ "tokio",
+]
+
+[[package]]
+name = "bb8-postgres"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "56ac82c42eb30889b5c4ee4763a24b8c566518171ebea64bd37558"
+dependencies = [
+ "async-trait",
+ "bb8",
+ "tokio",
+ "tokio-postgres",
+]
+
 [[package]]
 name = "bincode"
 version = "1.3.3"
@@ -1425,6 +1450,8 @@ name = "migrate_csplus"
 version = "0.1.0"
 dependencies = [
  "base64 0.21.5",
+ "bb8",
+ "bb8-postgres",
  "bs58",
  "dotenv",
  "reqwest",
diff --git a/migrate_csplus/Cargo.toml b/migrate_csplus/Cargo.toml
index 5c612e71a98b69fc5d316f46045140a4dcf299a6..c357793a03cdd0bc72d468abcd720b569cd9b5ee 100644
--- a/migrate_csplus/Cargo.toml
+++ b/migrate_csplus/Cargo.toml
@@ -7,6 +7,8 @@ edition = "2018"
 
 [dependencies]
 base64 = "0.21.5"
+bb8 = "0.8.1"
+bb8-postgres = "0.8.1"
 bs58 = "0.5.0"
 dotenv = "0.15.0"
 reqwest = { version = "0.11", features = ["json"] }
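Note: the new `bb8`/`bb8-postgres` dependencies replace the single `tokio_postgres::connect` call with an async connection pool. For reference, a minimal sketch of how such a pool is typically built and exercised — the connection string and pool size here are illustrative placeholders, not values from this patch:

```rust
use bb8_postgres::PostgresConnectionManager;
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative connection string; the real one is assembled from env vars below.
    let manager = PostgresConnectionManager::new_from_stringlike(
        "postgres://postgres:password@localhost:5432/postgres",
        NoTls,
    )?;
    let pool = bb8::Pool::builder().max_size(16).build(manager).await?;

    // Check a connection out of the pool and run a trivial query.
    let conn = pool.get().await?;
    let row = conn.query_one("SELECT 1::INT4", &[]).await?;
    assert_eq!(row.get::<_, i32>(0), 1);
    Ok(())
}
```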
diff --git a/migrate_csplus/src/data_fetch.rs b/migrate_csplus/src/data_fetch.rs
new file mode 100644
index 0000000000000000000000000000000000000000..2991fbc8bb099dbe759d800275b9883307c886fe
--- /dev/null
+++ b/migrate_csplus/src/data_fetch.rs
@@ -0,0 +1,98 @@
+use bb8::Pool;
+use bb8_postgres::PostgresConnectionManager;
+use reqwest;
+use std::error::Error;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use tokio::task;
+use tokio_postgres::NoTls;
+
+use crate::{
+    db::insert_profiles,
+    models::{Hit, ScrollResponse},
+};
+
+/// Fetches profiles and processes them in batches.
+pub async fn fetch_profiles(
+    pool_arc: Arc<Mutex<Pool<PostgresConnectionManager<NoTls>>>>,
+) -> Result<(), Box<dyn Error>> {
+    let scroll_time = "5m";
+    let page_size = 5000;
+    let mut iteration = 1;
+    println!("Retrieving page: {}", iteration);
+    let initial_response = reqwest::Client::new()
+        .post("https://g1.data.e-is.pro/user/profile/_search?scroll=5m")
+        .json(&serde_json::json!({ "query": { "match_all": {} }, "size": page_size }))
+        .send()
+        .await?
+        .json::<ScrollResponse>()
+        .await?;
+
+    let mut scroll_id = initial_response.scroll_id;
+
+    println!("Processing...");
+    process_hits(pool_arc.clone(), &initial_response.hits.hits).await?;
+
+    while !scroll_id.is_empty() {
+        iteration += 1;
+        println!("Retrieving page: {}", iteration);
+
+        let response = fetch_scroll_batch(&scroll_id, scroll_time).await?;
+
+        if response.hits.hits.is_empty() {
+            println!("CS+ data fully imported!");
+            break;
+        }
+
+        println!("Processing...");
+        process_hits(pool_arc.clone(), &response.hits.hits).await?;
+        scroll_id = response.scroll_id;
+    }
+
+    Ok(())
+}
+
+/// Fetches a batch of data based on the scroll ID and time.
+pub async fn fetch_scroll_batch(
+    scroll_id: &str,
+    scroll_time: &str,
+) -> Result<ScrollResponse, Box<dyn Error>> {
+    let client = reqwest::Client::new();
+    let response = client
+        .post("https://g1.data.e-is.pro/_search/scroll")
+        .json(&serde_json::json!({ "scroll": scroll_time, "scroll_id": scroll_id }))
+        .send()
+        .await?
+        .json::<ScrollResponse>()
+        .await?;
+    Ok(response)
+}
+
+/// Processes the hits from the fetched data, one spawned task per chunk.
+pub async fn process_hits(
+    pool: Arc<Mutex<Pool<PostgresConnectionManager<NoTls>>>>,
+    hits: &[Hit],
+) -> Result<(), Box<dyn Error>> {
+    let mut tasks = Vec::new();
+
+    for chunk in hits.chunks(1000) {
+        let pool_clone = pool.clone();
+        let chunk_clone = chunk.to_vec(); // Clone the chunk so the task can own it
+
+        let task = task::spawn(async move {
+            if let Ok(mut client) = pool_clone.lock().await.get().await {
+                if let Err(e) = insert_profiles(&mut client, &chunk_clone).await {
+                    eprintln!("Insert error: {}", e);
+                }
+            }
+        });
+
+        tasks.push(task);
+    }
+
+    for task in tasks {
+        task.await?;
+    }
+
+    Ok(())
+}
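One thing worth noting in `process_hits`: each spawned task goes through `pool_clone.lock().await` before calling `get()`, and since the checked-out connection borrows through that guard, the lock is held for the whole insert — the 1000-row chunks therefore run one after another. bb8's `Pool` is itself cheaply clonable and safe to share across tasks, so a lock-free variant is possible; a hypothetical sketch (`process_chunk` and its `Vec<String>` payload are illustrative, not part of this patch):

```rust
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use tokio_postgres::NoTls;

// Hypothetical variant: each task owns a clone of the pool and checks a
// connection out directly, so chunks can insert concurrently.
async fn process_chunk(
    pool: Pool<PostgresConnectionManager<NoTls>>,
    chunk: Vec<String>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let conn = pool.get().await?; // no surrounding Mutex needed
    for item in chunk {
        conn.execute("SELECT $1::TEXT", &[&item]).await?;
    }
    Ok(())
}
```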
diff --git a/migrate_csplus/src/db.rs b/migrate_csplus/src/db.rs
new file mode 100644
index 0000000000000000000000000000000000000000..1ef928391f7dbdbe9d10fa8c5333e41d9babe3f6
--- /dev/null
+++ b/migrate_csplus/src/db.rs
@@ -0,0 +1,72 @@
+use crate::models::{Hit, ProfileRow};
+use bb8::Pool;
+use bb8_postgres::PostgresConnectionManager;
+use dotenv::dotenv;
+use std::{env, error::Error, sync::Arc};
+use tokio::sync::Mutex;
+use tokio_postgres::{types::ToSql, Client, NoTls};
+
+/// Create a pool connection to the Postgres database
+pub async fn database_connection(
+) -> Result<Arc<Mutex<Pool<PostgresConnectionManager<NoTls>>>>, Box<dyn Error>> {
+    let is_production = env::var("PRODUCTION").unwrap_or_else(|_| String::from("false")) == "true";
+
+    if !is_production {
+        dotenv().ok();
+    }
+
+    let db_user = env::var("DB_USER").map_err(|_| "DB_USER must be set")?;
+    let db_password = env::var("DB_PASSWORD").map_err(|_| "DB_PASSWORD must be set")?;
+    let db_database = env::var("DB_DATABASE").map_err(|_| "DB_DATABASE must be set")?;
+    let db_hostname = if is_production {
+        "postgres-datapod"
+    } else {
+        "localhost"
+    };
+    let database_url = format!(
+        "postgres://{}:{}@{}:5432/{}",
+        db_user, db_password, db_hostname, db_database
+    );
+
+    // Database connection setup
+    let manager = PostgresConnectionManager::new_from_stringlike(database_url, NoTls)
+        .map_err(|e| Box::new(e) as Box<dyn Error>)?;
+    let pool = Pool::builder()
+        .build(manager)
+        .await
+        .map_err(|e| Box::new(e) as Box<dyn Error>)?;
+
+    Ok(Arc::new(Mutex::new(pool)))
+}
+
+/// Insert parsed profiles into the Postgres database
+pub async fn insert_profiles(client: &mut Client, hits: &[Hit]) -> Result<(), Box<dyn Error>> {
+    let transaction = client.transaction().await?;
+
+    // Construct the query once, using optional parameters for nullable fields
+    let statement = transaction
+        .prepare(
+            "INSERT INTO public.profiles (address, avatar, description, geoloc, title, city, socials)
+            VALUES ($1, $2, $3, point($4, $5), $6, $7, $8)",
+        )
+        .await?;
+
+    for hit in hits {
+        let profile_row = ProfileRow::from_hit(hit)?;
+
+        // Prepare parameters, handling None values appropriately
+        let params: [&(dyn ToSql + Sync); 8] = [
+            &profile_row.address,
+            &profile_row.avatar,
+            &profile_row.description,
+            &profile_row.geoloc.map(|(lat, _)| lat),
+            &profile_row.geoloc.map(|(_, lon)| lon),
+            &profile_row.title,
+            &profile_row.city,
+            &profile_row.socials,
+        ];
+
+        transaction.execute(&statement, &params).await?;
+    }
+
+    transaction.commit().await?;
+    Ok(())
+}
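The fixed-shape `INSERT` works because `Option<T>` implements `ToSql` whenever `T` does, sending `None` as SQL `NULL`; and since `point()` is strict in Postgres, `point(NULL, NULL)` evaluates to `NULL`, so absent coordinates leave `geoloc` empty. A condensed sketch of just that mechanism (`insert_one` and its two-column statement are illustrative):

```rust
use tokio_postgres::{types::ToSql, Client};

// Illustrative helper: None parameters become SQL NULLs, so a single
// statement shape covers sparse rows.
async fn insert_one(
    client: &Client,
    address: Option<String>,
    geoloc: Option<(f64, f64)>,
) -> Result<(), tokio_postgres::Error> {
    let lat = geoloc.map(|(lat, _)| lat);
    let lon = geoloc.map(|(_, lon)| lon);
    let params: [&(dyn ToSql + Sync); 3] = [&address, &lat, &lon];
    client
        .execute(
            "INSERT INTO public.profiles (address, geoloc) VALUES ($1, point($2, $3))",
            &params,
        )
        .await?;
    Ok(())
}
```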
diff --git a/migrate_csplus/src/main.rs b/migrate_csplus/src/main.rs
index 9b4c690f39f345d46aea9d585201e94b79a7d16e..083c016b59bcb4a522c26a7025f55e28063f46db 100644
--- a/migrate_csplus/src/main.rs
+++ b/migrate_csplus/src/main.rs
@@ -1,257 +1,22 @@
-use base64::{engine::general_purpose, Engine as _};
-use bs58;
-use dotenv::dotenv;
-use reqwest;
-use serde::{Deserialize, Serialize};
-use serde_json::Value;
-use sp_core::crypto::{AccountId32, Ss58Codec};
-use std::collections::HashMap;
-use std::convert::TryFrom;
-use std::error::Error;
-use tokio_postgres::types::ToSql;
-use tokio_postgres::{Client, NoTls};
+mod data_fetch;
+mod db;
+mod models;
+mod utils;
-
-#[derive(Serialize, Deserialize, Debug)]
-struct Hit {
-    #[serde(rename = "_source")]
-    source: HashMap<String, Value>,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-struct ScrollResponse {
-    #[serde(rename = "_scroll_id")]
-    scroll_id: String,
-    hits: Hits,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-struct Hits {
-    hits: Vec<Hit>,
-}
-
-async fn fetch_scroll_batch(
-    scroll_id: &str,
-    scroll_time: &str,
-) -> Result<ScrollResponse, Box<dyn Error>> {
-    let client = reqwest::Client::new();
-    let response = client
-        .post("https://g1.data.e-is.pro/_search/scroll")
-        .json(&serde_json::json!({ "scroll": scroll_time, "scroll_id": scroll_id }))
-        .send()
-        .await?
-        .json::<ScrollResponse>()
-        .await?;
-    Ok(response)
-}
-
-async fn insert_profile(
-    client: &Client,
-    profile: &HashMap<String, Value>,
-) -> Result<(), Box<dyn Error>> {
-    let address = base58_to_ss58(
-        profile
-            .get("issuer")
-            .unwrap_or(&Value::Null)
-            .as_str()
-            .unwrap_or_default(),
-    );
-    let description = profile
-        .get("description")
-        .unwrap_or(&Value::Null)
-        .as_str()
-        .unwrap_or_default();
-    let (lat, lon) =
-        convert_geoloc(profile.get("geoPoint").unwrap_or(&Value::Null)).unwrap_or_default();
-    let title = profile
-        .get("title")
-        .unwrap_or(&Value::Null)
-        .as_str()
-        .unwrap_or_default();
-    let city = profile
-        .get("city")
-        .unwrap_or(&Value::Null)
-        .as_str()
-        .unwrap_or_default();
-    let socials_str = profile.get("socials").unwrap_or(&Value::Null).to_string();
-    let socials_json =
-        serde_json::from_str::<serde_json::Value>(&socials_str).unwrap_or(serde_json::Value::Null);
-    let avatar_base64 = profile
-        .get("avatar")
-        .and_then(|av| av.get("_content"))
-        .and_then(|content| content.as_str())
-        .unwrap_or_default();
-    let mut avatar_bytes = Vec::new();
-    if !avatar_base64.is_empty() {
-        if general_purpose::STANDARD
-            .decode_vec(avatar_base64, &mut avatar_bytes)
-            .is_err()
-        {
-            avatar_bytes.clear();
-        }
-    }
-
-    let mut query: String = String::from("INSERT INTO public.profiles (address");
-    let mut params: Vec<&(dyn ToSql + Sync)> = vec![&address];
-    let mut param_index: i32 = 1;
-    let mut param_index_value: String = String::from("$1");
-
-    if !avatar_bytes.is_empty() {
-        query.push_str(", avatar");
-        params.push(&avatar_bytes);
-        param_index += 1;
-        param_index_value.push_str(&format!(", ${}", param_index));
-    }
-    if !description.is_empty() {
-        query.push_str(", description");
-        params.push(&description);
-        param_index += 1;
-        param_index_value.push_str(&format!(", ${}", param_index));
-    }
-    if lat != 0.0 || lon != 0.0 {
-        query.push_str(", geoloc");
-        params.push(&lat);
-        params.push(&lon);
-        param_index += 1;
-        param_index_value.push_str(&format!(", point(${}, ${})", param_index, param_index + 1));
-        param_index += 1;
-    }
-    if !title.is_empty() {
-        query.push_str(", title");
-        params.push(&title);
-        param_index += 1;
-        param_index_value.push_str(&format!(", ${}", param_index));
-    }
-    if !city.is_empty() {
-        query.push_str(", city");
-        params.push(&city);
-        param_index += 1;
-        param_index_value.push_str(&format!(", ${}", param_index));
-    }
-    if socials_json != serde_json::Value::Null {
-        query.push_str(", socials");
-        params.push(&socials_json);
-        param_index += 1;
-        param_index_value.push_str(&format!(", ${}", param_index));
-    }
-
-    query.push_str(") VALUES (");
-    query.push_str(&param_index_value);
-    query.push_str(")");
-
-    let statement = client.prepare(&query).await?;
-    client.execute(&statement, &params).await?;
-
-    // This is the mush better way to do, but we want null values instead of default values
-    // let statement = client.prepare_typed(
-    //     "INSERT INTO public.profiles (address, avatar, description, geoloc, title, city, socials) VALUES ($1, $2, $3, point($4, $5), $6, $7, $8)",
-    //     &[Type::VARCHAR, Type::BYTEA, Type::TEXT, Type::FLOAT8, Type::FLOAT8, Type::VARCHAR, Type::VARCHAR, Type::JSONB]
-    // ).await?;
-
-    // client.execute(
-    //     &statement,
-    //     &[&address, &avatar_bytes, &description, &lat, &lon, &title, &city, &socials_json]
-    // ).await?;
-
-    Ok(())
-}
-
-fn convert_geoloc(geoloc_value: &Value) -> Option<(f64, f64)> {
-    if let Some(geoloc) = geoloc_value.as_object() {
-        let lat = geoloc
-            .get("lat")
-            .and_then(Value::as_f64)
-            .unwrap_or_default();
-        let lon = geoloc
-            .get("lon")
-            .and_then(Value::as_f64)
-            .unwrap_or_default();
-        Some((lon, lat))
-    } else {
-        None
-    }
-}
-
-fn base58_to_ss58(base58: &str) -> String {
-    let pubkey_bytes = bs58::decode(base58).into_vec().unwrap();
-    let pubkey_array = <[u8; 32]>::try_from(pubkey_bytes.as_slice()).unwrap();
-    let account_id = AccountId32::from(pubkey_array);
-    account_id.to_ss58check()
-}
-
-async fn process_hits(client: &Client, hits: &[Hit]) -> Result<(), Box<dyn Error>> {
-    for hit in hits {
-        insert_profile(client, &hit.source).await?;
-    }
-    Ok(())
-}
-
-async fn fetch_profiles() -> Result<(), Box<dyn Error>> {
-    let is_production: bool =
-        std::env::var("PRODUCTION").unwrap_or_else(|_| String::from("false")) == "true";
-
-    if !is_production {
-        dotenv().ok();
-    }
-
-    let db_user = std::env::var("DB_USER").unwrap_or_else(|_| "postgres".to_string());
-    let db_password = std::env::var("DB_PASSWORD").unwrap_or_else(|_| "password".to_string());
-    let db_database = std::env::var("DB_DATABASE").unwrap_or_else(|_| "postgres".to_string());
-
-    // Utilisez 'is_production' pour définir 'db_hostname'
-    let db_hostname: &str = if is_production {
-        "postgres-datapod"
-    } else {
-        "localhost"
-    };
-
-    let database_url = format!(
-        "postgres://{}:{}@{}:5432/{}",
-        db_user, db_password, db_hostname, db_database
-    );
-
-    let (client, connection) = tokio_postgres::connect(&database_url, NoTls).await?;
-
-    tokio::spawn(async move {
-        if let Err(e) = connection.await {
-            eprintln!("connection error: {}", e);
-        }
-    });
-
-    let scroll_time = "5m"; // Scroll duration
-    let page_size = 5000; // Page size
-    let initial_response = reqwest::Client::new()
-        .post("https://g1.data.e-is.pro/user/profile/_search?scroll=5m")
-        .json(&serde_json::json!({ "query": { "match_all": {} }, "size": page_size }))
-        .send()
-        .await?
-        .json::<ScrollResponse>()
-        .await?;
-
-    let mut scroll_id = initial_response.scroll_id;
-    process_hits(&client, &initial_response.hits.hits).await?;
-    let mut iteration = 0;
-
-    while !scroll_id.is_empty() {
-        iteration += 1;
-        println!("page: {}", iteration);
-
-        let response = fetch_scroll_batch(&scroll_id, scroll_time).await?;
-
-        if response.hits.hits.is_empty() {
-            break;
-        }
-
-        process_hits(&client, &response.hits.hits).await?;
-        scroll_id = response.scroll_id;
-    }
-
-    Ok(())
-}
+use crate::db::database_connection;
 
 #[tokio::main]
 async fn main() {
-    println!("Starting Cs+ data migration...");
-    if let Err(e) = fetch_profiles().await {
-        eprintln!("Error: {}", e);
+    println!("Starting data migration...");
+
+    // Initialize database connection
+    match database_connection().await {
+        Ok(db_pool_arc) => {
+            // Start the data fetching process
+            if let Err(e) = data_fetch::fetch_profiles(db_pool_arc).await {
+                eprintln!("Error during profile fetching: {}", e);
+            }
+        }
+        Err(e) => eprintln!("Failed to establish database connection: {}", e),
     }
 }
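`main` now only wires the pool into the fetcher. An equivalent shape, sketched here rather than taken from the patch, returns `Result` from `main` and lets `?` replace the nested `match`:

```rust
// Sketch of an equivalent entry point using `?` instead of nested matching.
// Assumes the same `db` and `data_fetch` modules declared above.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Starting data migration...");
    let db_pool_arc = db::database_connection().await?;
    data_fetch::fetch_profiles(db_pool_arc).await?;
    Ok(())
}
```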
diff --git a/migrate_csplus/src/models.rs b/migrate_csplus/src/models.rs
new file mode 100644
index 0000000000000000000000000000000000000000..2a0ca83997331f41da9dca95401466c2a6f94cfa
--- /dev/null
+++ b/migrate_csplus/src/models.rs
@@ -0,0 +1,56 @@
+use crate::utils::*;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use std::{collections::HashMap, error::Error};
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct Hit {
+    #[serde(rename = "_source")]
+    pub source: HashMap<String, Value>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ScrollResponse {
+    #[serde(rename = "_scroll_id")]
+    pub scroll_id: String,
+    pub hits: Hits,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct Hits {
+    pub hits: Vec<Hit>,
+}
+
+pub struct ProfileRow {
+    pub address: Option<String>,
+    pub avatar: Option<Vec<u8>>,
+    pub description: Option<String>,
+    pub geoloc: Option<(f64, f64)>,
+    pub title: Option<String>,
+    pub city: Option<String>,
+    pub socials: Option<Value>,
+}
+
+impl ProfileRow {
+    pub fn from_hit(hit: &Hit) -> Result<Self, Box<dyn Error>> {
+        let profile = &hit.source;
+
+        Ok(Self {
+            // Propagate conversion failures instead of panicking
+            address: empty_str_to_none(profile.get("issuer").and_then(Value::as_str))
+                .map(|s| {
+                    base58_to_ss58(&s)
+                        .map_err(|e| format!("Cannot convert pubkey {} to address: {}", s, e))
+                })
+                .transpose()?,
+            avatar: profile
+                .get("avatar")
+                .and_then(|av| av.get("_content"))
+                .and_then(Value::as_str)
+                .and_then(|base64| decode_avatar(base64).ok()),
+            description: empty_str_to_none(profile.get("description").and_then(Value::as_str)),
+            geoloc: convert_geoloc(profile.get("geoPoint")),
+            title: empty_str_to_none(profile.get("title").and_then(Value::as_str)),
+            city: empty_str_to_none(profile.get("city").and_then(Value::as_str)),
+            socials: profile.get("socials").cloned(),
+        })
+    }
+}
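The `#[serde(rename = ...)]` attributes are what map Elasticsearch's underscored keys (`_scroll_id`, `_source`) onto the Rust field names. A small self-contained check of that mapping — the payload is made up:

```rust
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;

#[derive(Deserialize, Debug)]
struct Hit {
    #[serde(rename = "_source")]
    source: HashMap<String, Value>,
}

fn main() {
    // Made-up payload using the Elasticsearch field name.
    let payload = r#"{"_source": {"title": "demo", "city": "Nantes"}}"#;
    let hit: Hit = serde_json::from_str(payload).unwrap();
    assert_eq!(hit.source["title"], "demo");
}
```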
diff --git a/migrate_csplus/src/utils.rs b/migrate_csplus/src/utils.rs
new file mode 100644
index 0000000000000000000000000000000000000000..a0115cb51c41f780f9a3a94368125e7b328d37f2
--- /dev/null
+++ b/migrate_csplus/src/utils.rs
@@ -0,0 +1,43 @@
+use base64::{engine::general_purpose, DecodeError, Engine as _};
+use bs58;
+use serde_json::Value;
+use sp_core::crypto::{AccountId32, Ss58Codec};
+use std::convert::TryFrom;
+
+/// Converts a geolocation value (if present) to a tuple of (latitude, longitude).
+pub fn convert_geoloc(geoloc_value: Option<&Value>) -> Option<(f64, f64)> {
+    geoloc_value.and_then(|v| v.as_object()).map(|geoloc| {
+        let lat = geoloc
+            .get("lat")
+            .and_then(Value::as_f64)
+            .unwrap_or_default();
+        let lon = geoloc
+            .get("lon")
+            .and_then(Value::as_f64)
+            .unwrap_or_default();
+        (lat, lon)
+    })
+}
+
+/// Decodes a Base64-encoded avatar image into a byte vector.
+pub fn decode_avatar(base64_str: &str) -> Result<Vec<u8>, DecodeError> {
+    let mut buf = Vec::new();
+    general_purpose::STANDARD.decode_vec(base64_str.as_bytes(), &mut buf)?;
+    Ok(buf)
+}
+
+/// Converts a Base58-encoded string to an SS58-encoded string.
+pub fn base58_to_ss58(base58: &str) -> Result<String, Box<dyn std::error::Error>> {
+    let pubkey_bytes = bs58::decode(base58).into_vec()?;
+    let pubkey_array = <[u8; 32]>::try_from(pubkey_bytes.as_slice())?;
+    let account_id = AccountId32::from(pubkey_array);
+    Ok(account_id.to_ss58check())
+}
+
+/// Replaces an empty (or absent) string with None.
+pub fn empty_str_to_none(value: Option<&str>) -> Option<String> {
+    match value {
+        Some(s) if !s.is_empty() => Some(s.to_string()),
+        _ => None,
+    }
+}
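For a quick sanity check of these helpers, a hypothetical round trip: encode a 32-byte key with Base58, convert it to its SS58 address form, and exercise `empty_str_to_none` (the all-zero key is a dummy value, not a real Ğ1 pubkey):

```rust
// Hypothetical usage, assuming the helpers above are in scope.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pubkey = [0u8; 32]; // dummy key
    let base58 = bs58::encode(&pubkey).into_string();
    let ss58 = base58_to_ss58(&base58)?;
    println!("{} -> {}", base58, ss58);

    assert!(empty_str_to_none(Some("")).is_none());
    assert_eq!(empty_str_to_none(Some("city")).as_deref(), Some("city"));
    Ok(())
}
```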