Commit bcefae6f authored by poka

feat: add rust script to migrate cs+ data to psql database

parent c7e28f02
.env
scrap_csplus/target/
@@ -5,3 +5,5 @@ COPY ./scripts/init-hasura.sh /init-hasura.sh
COPY ./config.yaml /config.yaml
RUN curl -sL https://github.com/hasura/graphql-engine/raw/stable/cli/get.sh | bash 2>/dev/null
ENTRYPOINT ["/init-hasura.sh"]
@@ -30,7 +30,6 @@ services:
HASURA_GRAPHQL_ADMIN_SECRET: ${HASURA_GRAPHQL_ADMIN_SECRET}
HASURA_GRAPHQL_UNAUTHORIZED_ROLE: public
HASURA_GRAPHQL_ENABLE_TELEMETRY: "false"
command: sh /init-hasura.sh
volumes:
  db_data:
@@ -4,6 +4,9 @@ CREATE TABLE public.profiles (
avatar bytea,
description text,
geoloc point,
title text,
city text,
socials jsonb,
created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP,
updated_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP
);
[package]
name = "scrap_csplus"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
base64 = "0.21.5"
bs58 = "0.5.0"
dotenv = "0.15.0"
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sp-core = "27.0.0"
tokio = { version = "1", features = ["full"] }
tokio-postgres = { version = "0.7.10", features = ["with-serde_json-1"] }
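// Fetch every Cesium+ (cs+) profile from the g1.data Elasticsearch scroll API and dump them to profile_csplus.json.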
use reqwest;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::error::Error;
use std::fs::File;
use std::io::Write;
use std::env;
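// Minimal view of the Elasticsearch scroll response: keep only the _scroll_id and each hit's _source.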
#[derive(Serialize, Deserialize, Debug)]
struct Hit {
#[serde(rename = "_source")]
source: HashMap<String, Value>,
}
#[derive(Serialize, Deserialize, Debug)]
struct ScrollResponse {
#[serde(rename = "_scroll_id")]
scroll_id: String,
hits: Hits,
}
#[derive(Serialize, Deserialize, Debug)]
struct Hits {
hits: Vec<Hit>,
}
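// Fetch the next page of results for an already-open scroll context.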
async fn fetch_scroll_batch(scroll_id: &str, scroll_time: &str) -> Result<ScrollResponse, Box<dyn Error>> {
let client = reqwest::Client::new();
let response = client
.post("https://g1.data.e-is.pro/_search/scroll")
.json(&serde_json::json!({ "scroll": scroll_time, "scroll_id": scroll_id }))
.send()
.await?
.json::<ScrollResponse>()
.await?;
Ok(response)
}
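// Strip pod-specific metadata (signature, version, hash, tags) from each profile before export.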
fn process_hits(hits: &[Hit]) -> Vec<HashMap<String, Value>> {
hits.iter().map(|hit| {
let mut profile = hit.source.clone();
profile.remove("signature");
profile.remove("version");
profile.remove("hash");
profile.remove("tags");
profile
}).collect()
}
async fn fetch_profiles() -> Result<(), Box<dyn Error>> {
let args: Vec<String> = env::args().collect();
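    // Pass `true` as the first CLI argument to also download avatars; they are excluded by default to keep the dump small.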
let get_avatar = if args.len() > 1 {
args[1].parse::<bool>().unwrap_or(false)
} else {
false
};
let mut url = String::from("https://g1.data.e-is.pro/user/profile/_search?scroll=5m");
if !get_avatar {
println!("Avatars are skipped");
url.push_str("&_source_exclude=avatar._content");
} else {
println!("Avatars are recovered");
}
let scroll_time = "5m"; // Scroll duration
let page_size = 5000; // Page size
let initial_response = reqwest::Client::new()
.post(url)
.json(&serde_json::json!({ "query": { "match_all": {} }, "size": page_size }))
.send()
.await?
.json::<ScrollResponse>()
.await?;
let mut scroll_id = initial_response.scroll_id;
let mut profiles: Vec<HashMap<String, Value>> = process_hits(&initial_response.hits.hits);
let mut iteration = 0;
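    // Keep scrolling until the pod returns an empty page of hits.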
while !scroll_id.is_empty() {
iteration += 1;
println!("Iteration: {}", iteration);
let response = fetch_scroll_batch(&scroll_id, scroll_time).await?;
let new_profiles = process_hits(&response.hits.hits);
if new_profiles.is_empty() {
break;
}
profiles.extend(new_profiles);
scroll_id = response.scroll_id;
}
let mut file = File::create("profile_csplus.json")?;
write!(file, "{}", serde_json::to_string(&profiles)?)?;
Ok(())
}
#[tokio::main]
async fn main() {
if let Err(e) = fetch_profiles().await {
eprintln!("Error: {}", e);
}
}
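// Migrate Cesium+ profiles from the g1.data Elasticsearch pod straight into the public.profiles Postgres table.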
use base64::{engine::general_purpose, Engine as _};
use bs58;
use dotenv::dotenv;
use reqwest;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sp_core::crypto::{AccountId32, Ss58Codec};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::error::Error;
use tokio_postgres::types::ToSql;
use tokio_postgres::{Client, NoTls};
#[derive(Serialize, Deserialize, Debug)]
struct Hit {
#[serde(rename = "_source")]
source: HashMap<String, Value>,
}
#[derive(Serialize, Deserialize, Debug)]
struct ScrollResponse {
#[serde(rename = "_scroll_id")]
scroll_id: String,
hits: Hits,
}
#[derive(Serialize, Deserialize, Debug)]
struct Hits {
hits: Vec<Hit>,
}
async fn fetch_scroll_batch(
scroll_id: &str,
scroll_time: &str,
) -> Result<ScrollResponse, Box<dyn Error>> {
let client = reqwest::Client::new();
let response = client
.post("https://g1.data.e-is.pro/_search/scroll")
.json(&serde_json::json!({ "scroll": scroll_time, "scroll_id": scroll_id }))
.send()
.await?
.json::<ScrollResponse>()
.await?;
Ok(response)
}
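// Insert a single profile, building the INSERT dynamically so that absent fields stay NULL instead of being written as empty defaults.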
async fn insert_profile(
client: &Client,
profile: &HashMap<String, Value>,
) -> Result<(), Box<dyn Error>> {
let address = base58_to_ss58(
profile
.get("issuer")
.unwrap_or(&Value::Null)
.as_str()
.unwrap_or_default(),
);
let description = profile
.get("description")
.unwrap_or(&Value::Null)
.as_str()
.unwrap_or_default();
let (lat, lon) =
convert_geoloc(profile.get("geoPoint").unwrap_or(&Value::Null)).unwrap_or_default();
let title = profile
.get("title")
.unwrap_or(&Value::Null)
.as_str()
.unwrap_or_default();
let city = profile
.get("city")
.unwrap_or(&Value::Null)
.as_str()
.unwrap_or_default();
let socials_str = profile.get("socials").unwrap_or(&Value::Null).to_string();
let socials_json =
serde_json::from_str::<serde_json::Value>(&socials_str).unwrap_or(serde_json::Value::Null);
let avatar_base64 = profile
.get("avatar")
.and_then(|av| av.get("_content"))
.and_then(|content| content.as_str())
.unwrap_or_default();
let mut avatar_bytes = Vec::new();
if !avatar_base64.is_empty() {
if general_purpose::STANDARD
.decode_vec(avatar_base64, &mut avatar_bytes)
.is_err()
{
avatar_bytes.clear();
}
}
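    // Build the column list and the matching $n placeholders only for the fields that are actually present.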
let mut query: String = String::from("INSERT INTO public.profiles (address");
let mut params: Vec<&(dyn ToSql + Sync)> = vec![&address];
let mut param_index: i32 = 1;
let mut param_index_value: String = String::from("$1");
if !avatar_bytes.is_empty() {
query.push_str(", avatar");
params.push(&avatar_bytes);
param_index += 1;
param_index_value.push_str(&format!(", ${}", param_index));
}
if !description.is_empty() {
query.push_str(", description");
params.push(&description);
param_index += 1;
param_index_value.push_str(&format!(", ${}", param_index));
}
if lat != 0.0 || lon != 0.0 {
query.push_str(", geoloc");
params.push(&lat);
params.push(&lon);
param_index += 1;
param_index_value.push_str(&format!(", point(${}, ${})", param_index, param_index + 1));
param_index += 1;
}
if !title.is_empty() {
query.push_str(", title");
params.push(&title);
param_index += 1;
param_index_value.push_str(&format!(", ${}", param_index));
}
if !city.is_empty() {
query.push_str(", city");
params.push(&city);
param_index += 1;
param_index_value.push_str(&format!(", ${}", param_index));
}
if socials_json != serde_json::Value::Null {
query.push_str(", socials");
params.push(&socials_json);
param_index += 1;
param_index_value.push_str(&format!(", ${}", param_index));
}
query.push_str(") VALUES (");
query.push_str(&param_index_value);
query.push_str(")");
let statement = client.prepare(&query).await?;
client.execute(&statement, &params).await?;
    // This would be the much cleaner way to do it, but we want NULL values instead of default values
// let statement = client.prepare_typed(
// "INSERT INTO public.profiles (address, avatar, description, geoloc, title, city, socials) VALUES ($1, $2, $3, point($4, $5), $6, $7, $8)",
// &[Type::VARCHAR, Type::BYTEA, Type::TEXT, Type::FLOAT8, Type::FLOAT8, Type::VARCHAR, Type::VARCHAR, Type::JSONB]
// ).await?;
// client.execute(
// &statement,
// &[&address, &avatar_bytes, &description, &lat, &lon, &title, &city, &socials_json]
// ).await?;
Ok(())
}
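// Return the Cesium+ geoPoint as (lon, lat), the (x, y) order that Postgres' point type expects (the caller binds the tuple with the names swapped).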
fn convert_geoloc(geoloc_value: &Value) -> Option<(f64, f64)> {
if let Some(geoloc) = geoloc_value.as_object() {
let lat = geoloc
.get("lat")
.and_then(Value::as_f64)
.unwrap_or_default();
let lon = geoloc
.get("lon")
.and_then(Value::as_f64)
.unwrap_or_default();
Some((lon, lat))
} else {
None
}
}
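// Convert a base58-encoded Duniter v1 public key into its SS58 address.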
fn base58_to_ss58(base58: &str) -> String {
let pubkey_bytes = bs58::decode(base58).into_vec().unwrap();
let pubkey_array = <[u8; 32]>::try_from(pubkey_bytes.as_slice()).unwrap();
let account_id = AccountId32::from(pubkey_array);
account_id.to_ss58check()
}
async fn process_hits(client: &Client, hits: &[Hit]) -> Result<(), Box<dyn Error>> {
for hit in hits {
insert_profile(client, &hit.source).await?;
}
Ok(())
}
async fn fetch_profiles() -> Result<(), Box<dyn Error>> {
let is_production: bool =
std::env::var("PRODUCTION").unwrap_or_else(|_| String::from("false")) == "true";
if !is_production {
dotenv().ok();
}
let db_user = std::env::var("DB_USER").unwrap_or_else(|_| "postgres".to_string());
let db_password = std::env::var("DB_PASSWORD").unwrap_or_else(|_| "password".to_string());
let db_database = std::env::var("DB_DATABASE").unwrap_or_else(|_| "postgres".to_string());
    // Use 'is_production' to choose 'db_hostname'
let db_hostname: &str = if is_production {
"postgres-datapod"
} else {
"localhost"
};
let database_url = format!(
"postgres://{}:{}@{}:5432/{}",
db_user, db_password, db_hostname, db_database
);
let (client, connection) = tokio_postgres::connect(&database_url, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
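    // The spawned task drives the connection; queries on `client` only make progress while it is running.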
let scroll_time = "5m"; // Scroll duration
let page_size = 5000; // Page size
let initial_response = reqwest::Client::new()
.post("https://g1.data.e-is.pro/user/profile/_search?scroll=5m")
.json(&serde_json::json!({ "query": { "match_all": {} }, "size": page_size }))
.send()
.await?
.json::<ScrollResponse>()
.await?;
let mut scroll_id = initial_response.scroll_id;
process_hits(&client, &initial_response.hits.hits).await?;
let mut iteration = 0;
while !scroll_id.is_empty() {
iteration += 1;
println!("page: {}", iteration);
let response = fetch_scroll_batch(&scroll_id, scroll_time).await?;
if response.hits.hits.is_empty() {
break;
}
process_hits(&client, &response.hits.hits).await?;
scroll_id = response.scroll_id;
}
Ok(())
}
#[tokio::main]
async fn main() {
if let Err(e) = fetch_profiles().await {
eprintln!("Error: {}", e);
}
}