diff --git a/Cargo.lock b/Cargo.lock
index bea864b280641d246b2d9a55f79345b04a3fee05..e6c8605dd5f08e407a3675a5390fe23d650f2e25 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1072,10 +1072,13 @@ dependencies = [
  "duniter-gva",
  "duniter-gva-db-writer",
  "fast-threadpool",
+ "flume",
+ "once_cell",
  "rayon",
  "serde",
  "serde_json",
  "structopt",
+ "termprogress",
  "unwrap",
 ]
 
@@ -3474,6 +3477,26 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
+[[package]]
+name = "terminal_size"
+version = "0.1.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4bd2d183bd3fac5f5fe38ddbeb4dc9aec4a39a9d7d59e7491d900302da01cbe1"
+dependencies = [
+ "libc",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "termprogress"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6611ecf7fedefdb0f74d6194da1918f15c00ef97ae4bbd1f60a9c7ca2ae0eb14"
+dependencies = [
+ "rustc_version",
+ "terminal_size",
+]
+
 [[package]]
 name = "textwrap"
 version = "0.11.0"
diff --git a/rust-bins/duniter-dbex/Cargo.toml b/rust-bins/duniter-dbex/Cargo.toml
index 23439503c80327d1db5ef8e5c0bdd6ac17339eb9..9cca136c239cc817011635e0b25026694382cedb 100644
--- a/rust-bins/duniter-dbex/Cargo.toml
+++ b/rust-bins/duniter-dbex/Cargo.toml
@@ -28,9 +28,12 @@ duniter-dbs-write-ops = { path = "../../rust-libs/duniter-dbs-write-ops", defaul
 duniter-gva = { path = "../../rust-libs/modules/gva" }
 duniter-gva-db-writer = { path = "../../rust-libs/modules/gva/db-writer" }
 fast-threadpool = "0.2.2"
+flume = "0.9.1"
+once_cell = "1.5.2"
 rayon = "1.3.1"
 serde_json = "1.0.53"
 structopt = "0.3.16"
+termprogress = "0.3.4"
 
 [dev-dependencies]
 serde = { version = "1.0.105", features = ["derive"] }
diff --git a/rust-bins/duniter-dbex/src/cli.rs b/rust-bins/duniter-dbex/src/cli.rs
index aa8aa5420be98fc258621c5030333fda7c6daa4f..205a07bcc837684d1032f7f3aef0bd79980c8768 100644
--- a/rust-bins/duniter-dbex/src/cli.rs
+++ b/rust-bins/duniter-dbex/src/cli.rs
@@ -63,6 +63,18 @@ impl FromStr for Database {
 pub enum SubCommand {
     /// Count collection entries
     Count { collection: String },
+    /// Export blockchain
+    ExportBc {
+        /// Chunk size (in number of blocks)
+        #[structopt(short, long, default_value = "1000")]
+        chunk_size: usize,
+        /// Output directory
+        #[structopt(parse(from_os_str))]
+        output_dir: PathBuf,
+        /// Write pretty json
+        #[structopt(short, long)]
+        pretty: bool,
+    },
     /// Get one value
     Get { collection: String, key: String },
     /// Search values by criteria
diff --git a/rust-bins/duniter-dbex/src/export_bc.rs b/rust-bins/duniter-dbex/src/export_bc.rs
new file mode 100644
index 0000000000000000000000000000000000000000..6578aab162e7937187a99731ce9b6e79cdebc2cd
--- /dev/null
+++ b/rust-bins/duniter-dbex/src/export_bc.rs
@@ -0,0 +1,148 @@
+// Copyright (C) 2020 Éloïs SANCHEZ.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use crate::*;
+use duniter_dbs::{
+    databases::bc_v1::{BcV1Db, BcV1DbReadable},
+    kv_typed::prelude::Backend,
+};
+use fast_threadpool::{ThreadPool, ThreadPoolConfig};
+use once_cell::sync::OnceCell;
+use std::{
+    io::{BufWriter, Write},
+    path::{Path, PathBuf},
+};
+use termprogress::prelude::*;
+
+static OUTPUT_DIR: OnceCell<PathBuf> = OnceCell::new();
+
+pub(crate) fn export_bc<B: Backend>(
+    bc_v1: BcV1Db<B>,
+    chunk_size: usize,
+    output_dir: PathBuf,
+    pretty: bool,
+) -> anyhow::Result<()> {
+    if !output_dir.exists() {
+        std::fs::create_dir_all(&output_dir)?;
+    }
+    let output_dir: &'static Path = OUTPUT_DIR.get_or_init(|| output_dir).as_path();
+
+    if let Some(last_block) = bc_v1
+        .main_blocks()
+        .iter_rev(.., |it| it.keys().next_res())?
+    {
+        let mut chunks_count = ((last_block.0).0 as usize + 1) / chunk_size;
+        if ((last_block.0).0 as usize + 1) % chunk_size > 0 {
+            chunks_count += 1;
+        }
+        let chunks_count = chunks_count as f64;
+        let mut progress_bar = Bar::default();
+
+        let start_time = Instant::now();
+
+        let (s, r) = flume::unbounded();
+        let reader_handle = std::thread::spawn(move || {
+            bc_v1.main_blocks().iter(.., |it| {
+                it.values()
+                    .map(|block_res| s.send(block_res).map_err(|_| anyhow!("failed to send")))
+                    .collect::<anyhow::Result<()>>()
+            })
+        });
+
+        let (s2, r2) = flume::unbounded();
+        let jsonifier_handle = std::thread::spawn(move || {
+            r.iter()
+                .map(|block_res| {
+                    let json_block_res = match block_res {
+                        Ok(block) => serde_json::to_value(&block)
+                            .map_err(|e| KvError::DeserError(e.to_string())),
+                        Err(e) => Err(e),
+                    };
+                    s2.send(json_block_res).map_err(|_| anyhow!("failed to send"))
+                })
+                .collect::<anyhow::Result<()>>()
+        });
+
+        let threadpool = ThreadPool::start(ThreadPoolConfig::default(), ()).into_sync_handler();
+
+        let mut chunk_index = 0;
+        let mut json_blocks = Vec::with_capacity(chunk_size);
+        let mut writers_handle = Vec::with_capacity(500_000 / chunk_size);
+        r2.into_iter()
+            .try_for_each::<_, anyhow::Result<()>>(|json_block_res| {
+                let json_block = json_block_res?;
+                json_blocks.push(json_block);
+                if json_blocks.len() == chunk_size {
+                    let chunk = std::mem::take(&mut json_blocks);
+                    json_blocks.reserve_exact(chunk_size);
+                    // Write chunk "asynchronously"
+                    writers_handle
+                        .push(threadpool.launch(move |_| {
+                            write_chunk(chunk_index, chunk, output_dir, pretty)
+                        })?);
+                    chunk_index += 1;
+                    if chunk_index % 8 == 0 {
+                        progress_bar.set_progress(chunk_index as f64 / chunks_count);
+                    }
+                }
+                Ok(())
+            })?;
+        // Write last chunk
+        write_chunk(chunk_index, json_blocks, output_dir, pretty)?;
+        progress_bar.set_progress(1.0);
+
+        reader_handle
+            .join()
+            .map_err(|_| anyhow!("reader panic"))??;
+        jsonifier_handle
+            .join()
+            .map_err(|_| anyhow!("jsonifier panic"))??;
+        for writer_handle in writers_handle {
+            writer_handle.join()??;
+        }
+
+        progress_bar.complete();
+
+        let duration = start_time.elapsed();
+        println!(
+            "Blockchain successfully exported in {}.{:03} seconds.",
+            duration.as_secs(),
+            duration.subsec_millis()
+        );
+        Ok(())
+    } else {
+        Err(anyhow!("no blockchain"))
+    }
+}
+
+fn write_chunk(
+    chunk_index: usize,
+    chunk: Vec<serde_json::Value>,
+    output_dir: &'static Path,
+    pretty: bool,
+) -> anyhow::Result<()> {
+    let file =
+        File::create(output_dir.join(format!("chunk_{}-{}.json", chunk_index, chunk.len())))?;
+
+    let mut buffer = BufWriter::new(file);
+    if pretty {
+        serde_json::to_writer_pretty(&mut buffer, &serde_json::Value::Array(chunk))?;
+    } else {
+        serde_json::to_writer(&mut buffer, &serde_json::Value::Array(chunk))?;
+    }
+    buffer.flush()?;
+
+    Ok(())
+}
diff --git a/rust-bins/duniter-dbex/src/main.rs b/rust-bins/duniter-dbex/src/main.rs
index 7002d803e983b046c6d6052844412c8be4442a2a..fa68156ae6fce1d6ad03ea89e72c68c70e14327a 100644
--- a/rust-bins/duniter-dbex/src/main.rs
+++ b/rust-bins/duniter-dbex/src/main.rs
@@ -23,6 +23,7 @@
 )]
 
 mod cli;
+mod export_bc;
 mod migrate;
 mod print_found_data;
 mod stringify_json_value;
@@ -86,51 +87,65 @@ fn main() -> anyhow::Result<()> {
         ));
     }
 
-    if let SubCommand::Migrate = opt.cmd {
-        migrate::migrate(profile_path)
-    } else {
-        let open_db_start_time = Instant::now();
-        match opt.database {
-            Database::BcV1 => apply_subcommand(
-                BcV1Db::<LevelDb>::open(LevelDbConf {
-                    db_path: data_path.as_path().join("leveldb"),
-                    ..Default::default()
-                })?,
-                opt.cmd,
-                open_db_start_time,
-            ),
-            Database::BcV2 => apply_subcommand(
-                BcV2Db::<Sled>::open(Sled::gen_backend_conf(
-                    BcV2Db::<Sled>::NAME,
-                    Some(profile_path.as_path()),
-                ))?,
-                opt.cmd,
-                open_db_start_time,
-            ),
-            Database::DunpV1 => apply_subcommand(
-                DunpV1Db::<Sled>::open(Sled::gen_backend_conf(
-                    DunpV1Db::<Sled>::NAME,
-                    Some(profile_path.as_path()),
-                ))?,
-                opt.cmd,
-                open_db_start_time,
-            ),
-            Database::GvaV1 => apply_subcommand(
-                GvaV1Db::<Sled>::open(Sled::gen_backend_conf(
-                    GvaV1Db::<Sled>::NAME,
-                    Some(profile_path.as_path()),
-                ))?,
-                opt.cmd,
-                open_db_start_time,
-            ),
-            Database::TxsMpV2 => apply_subcommand(
-                TxsMpV2Db::<Sled>::open(Sled::gen_backend_conf(
-                    TxsMpV2Db::<Sled>::NAME,
-                    Some(profile_path.as_path()),
-                ))?,
-                opt.cmd,
-                open_db_start_time,
-            ),
+    match opt.cmd {
+        SubCommand::Migrate => migrate::migrate(profile_path),
+        SubCommand::ExportBc {
+            chunk_size,
+            output_dir,
+            pretty,
+        } => export_bc::export_bc(
+            BcV1Db::<LevelDb>::open(LevelDbConf {
+                db_path: data_path.as_path().join("leveldb"),
+                ..Default::default()
+            })?,
+            chunk_size,
+            output_dir,
+            pretty,
+        ),
+        _ => {
+            let open_db_start_time = Instant::now();
+            match opt.database {
+                Database::BcV1 => apply_subcommand(
+                    BcV1Db::<LevelDb>::open(LevelDbConf {
+                        db_path: data_path.as_path().join("leveldb"),
+                        ..Default::default()
+                    })?,
+                    opt.cmd,
+                    open_db_start_time,
+                ),
+                Database::BcV2 => apply_subcommand(
+                    BcV2Db::<Sled>::open(Sled::gen_backend_conf(
+                        BcV2Db::<Sled>::NAME,
+                        Some(profile_path.as_path()),
+                    ))?,
+                    opt.cmd,
+                    open_db_start_time,
+                ),
+                Database::DunpV1 => apply_subcommand(
+                    DunpV1Db::<Sled>::open(Sled::gen_backend_conf(
+                        DunpV1Db::<Sled>::NAME,
+                        Some(profile_path.as_path()),
+                    ))?,
+                    opt.cmd,
+                    open_db_start_time,
+                ),
+                Database::GvaV1 => apply_subcommand(
+                    GvaV1Db::<Sled>::open(Sled::gen_backend_conf(
+                        GvaV1Db::<Sled>::NAME,
+                        Some(profile_path.as_path()),
+                    ))?,
+                    opt.cmd,
+                    open_db_start_time,
+                ),
+                Database::TxsMpV2 => apply_subcommand(
+                    TxsMpV2Db::<Sled>::open(Sled::gen_backend_conf(
+                        TxsMpV2Db::<Sled>::NAME,
+                        Some(profile_path.as_path()),
+                    ))?,
+                    opt.cmd,
+                    open_db_start_time,
+                ),
+            }
         }
     }
 }
@@ -285,7 +300,7 @@ fn apply_subcommand<DB: DbExplorable>(
         SubCommand::Schema => {
             show_db_schema(DB::list_collections());
         }
-        SubCommand::Migrate => unreachable!(),
+        _ => unreachable!(),
     };
 
     Ok(())
diff --git a/rust-bins/duniter-dbex/src/migrate.rs b/rust-bins/duniter-dbex/src/migrate.rs
index aba30ba3c6db70e20d8b704fa1d77bb02a29b0c0..69699aad8ee66af6262b2655bf693e93927634ae 100644
--- a/rust-bins/duniter-dbex/src/migrate.rs
+++ b/rust-bins/duniter-dbex/src/migrate.rs
@@ -62,7 +62,7 @@ fn migrate_inner(
     if let Some(target) = get_target_block_number(&duniter_js_db)?
     {
         println!("target block: #{}", target.0);
-        let (s, r) = std::sync::mpsc::channel();
+        let (s, r) = flume::unbounded();
         let reader_handle = std::thread::spawn(move || {
             duniter_js_db.main_blocks().iter(.., |it| {
                 it.values()
@@ -70,7 +70,7 @@ fn migrate_inner(
                     .collect::<anyhow::Result<()>>()
             })
         });
-        let (s2, r2) = std::sync::mpsc::channel();
+        let (s2, r2) = flume::unbounded();
         let parser_handle = std::thread::spawn(move || {
             let target_u64 = target.0 as u64;
             let mut db_blocks = Vec::with_capacity(CHUNK_SIZE);
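
Note on the design: export_bc.rs streams blocks through two flume channels. A reader thread pulls raw blocks out of LevelDB, a second thread serializes them to JSON values, and the main thread groups the values into fixed-size chunks that are written to disk through fast-threadpool. The following is a minimal, standalone sketch of that three-stage shape, assuming only the flume crate; the u64 payload and all names are illustrative, not the real duniter-dbs types.

use std::thread;

fn main() {
    let chunk_size = 3usize;

    // Stage 1: producer thread (stands in for the LevelDB block reader).
    let (s, r) = flume::unbounded();
    let producer = thread::spawn(move || {
        for n in 0u64..10 {
            s.send(n).expect("receiver dropped");
        }
        // Dropping `s` here closes the channel, which ends stage 2's loop.
    });

    // Stage 2: transformer thread (stands in for JSON serialization).
    let (s2, r2) = flume::unbounded();
    let transformer = thread::spawn(move || {
        for n in r.iter() {
            s2.send(n * n).expect("receiver dropped");
        }
    });

    // Stage 3: batch results into fixed-size chunks, as write_chunk does.
    let mut chunk = Vec::with_capacity(chunk_size);
    let mut chunk_index = 0usize;
    for v in r2.iter() {
        chunk.push(v);
        if chunk.len() == chunk_size {
            println!("chunk {}: {:?}", chunk_index, std::mem::take(&mut chunk));
            chunk_index += 1;
        }
    }
    // Flush the final, possibly partial, chunk.
    if !chunk.is_empty() {
        println!("chunk {}: {:?}", chunk_index, chunk);
    }

    producer.join().expect("producer panicked");
    transformer.join().expect("transformer panicked");
}

Unbounded channels let the reader run ahead of serialization and disk writes, which is what the patch opts for in both export_bc.rs and migrate.rs; flume::bounded would instead apply backpressure and cap memory use if the downstream stages fell behind.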