Commit 78ae846b authored by Éloïs

[feat] dex: add command export-bc

parent 8676bab4
Merge request !1343: [feat] dex: add command export-bc
@@ -1072,10 +1072,13 @@ dependencies = [
  "duniter-gva",
  "duniter-gva-db-writer",
  "fast-threadpool",
+ "flume",
+ "once_cell",
  "rayon",
  "serde",
  "serde_json",
  "structopt",
+ "termprogress",
  "unwrap",
 ]
@@ -3474,6 +3477,26 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
+[[package]]
+name = "terminal_size"
+version = "0.1.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4bd2d183bd3fac5f5fe38ddbeb4dc9aec4a39a9d7d59e7491d900302da01cbe1"
+dependencies = [
+ "libc",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "termprogress"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6611ecf7fedefdb0f74d6194da1918f15c00ef97ae4bbd1f60a9c7ca2ae0eb14"
+dependencies = [
+ "rustc_version",
+ "terminal_size",
+]
+
 [[package]]
 name = "textwrap"
 version = "0.11.0"
...
@@ -28,9 +28,12 @@ duniter-dbs-write-ops = { path = "../../rust-libs/duniter-dbs-write-ops", defaul
 duniter-gva = { path = "../../rust-libs/modules/gva" }
 duniter-gva-db-writer = { path = "../../rust-libs/modules/gva/db-writer" }
 fast-threadpool = "0.2.2"
+flume = "0.9.1"
+once_cell = "1.5.2"
 rayon = "1.3.1"
 serde_json = "1.0.53"
 structopt = "0.3.16"
+termprogress = "0.3.4"
 
 [dev-dependencies]
 serde = { version = "1.0.105", features = ["derive"] }
...
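The three new runtime dependencies each back one piece of the export command below: flume provides the channels between the pipeline stages, once_cell pins the output directory into a static so writer tasks can borrow it as &'static Path, and termprogress renders the progress bar.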
@@ -63,6 +63,18 @@ impl FromStr for Database {
 pub enum SubCommand {
     /// Count collection entries
     Count { collection: String },
+    /// Export blockchain
+    ExportBc {
+        /// Chunk size (in number of blocks)
+        #[structopt(short, long, default_value = "1000")]
+        chunk_size: usize,
+        /// Output directory
+        #[structopt(parse(from_os_str))]
+        output_dir: PathBuf,
+        /// Write pretty json
+        #[structopt(short, long)]
+        pretty: bool,
+    },
     /// Get one value
     Get { collection: String, key: String },
     /// Search values by criteria
...
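structopt derives the command-line surface directly from this variant: ExportBc is exposed as the export-bc subcommand, chunk_size as -c/--chunk-size (default 1000), pretty as -p/--pretty, and output_dir as a positional path. A hypothetical invocation (binary name and any global options elided, since they are not part of this diff):

    <dex-binary> export-bc --chunk-size 500 --pretty /tmp/bc-export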
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
use duniter_dbs::{
    databases::bc_v1::{BcV1Db, BcV1DbReadable},
    kv_typed::prelude::Backend,
};
use fast_threadpool::{ThreadPool, ThreadPoolConfig};
use once_cell::sync::OnceCell;
use std::{
    io::BufWriter,
    path::{Path, PathBuf},
};
use termprogress::prelude::*;

// Holds the output directory for the whole process lifetime, so that writer
// tasks on the threadpool can borrow it as `&'static Path`.
static OUTPUT_DIR: OnceCell<PathBuf> = OnceCell::new();

pub(crate) fn export_bc<B: Backend>(
    bc_v1: BcV1Db<B>,
    chunk_size: usize,
    output_dir: PathBuf,
    pretty: bool,
) -> anyhow::Result<()> {
    if !output_dir.exists() {
        std::fs::create_dir_all(output_dir.clone())?;
    }
    let output_dir: &'static Path = OUTPUT_DIR.get_or_init(|| output_dir).as_path();

    if let Some(last_block) = bc_v1
        .main_blocks()
        .iter_rev(.., |it| it.keys().next_res())?
    {
        let mut chunks_count = (last_block.0).0 as usize / chunk_size;
        if (last_block.0).0 as usize % chunk_size > 0 {
            chunks_count += 1;
        }
        let chunks_count = chunks_count as f64;

        let mut progress_bar = Bar::default();
        let start_time = Instant::now();

        // Stage 1: stream blocks out of the database on a dedicated thread.
        let (s, r) = flume::unbounded();
        let reader_handle = std::thread::spawn(move || {
            bc_v1.main_blocks().iter(.., |it| {
                it.values()
                    .map(|block_res| s.send(block_res).map_err(|_| anyhow!("fail to send")))
                    .collect::<anyhow::Result<()>>()
            })
        });

        // Stage 2: serialize each block to JSON on a second thread.
        let (s2, r2) = flume::unbounded();
        let jsonifier_handle = std::thread::spawn(move || {
            r.iter()
                .map(|block_res| {
                    let json_block_res = match block_res {
                        Ok(block) => serde_json::to_value(&block)
                            .map_err(|e| KvError::DeserError(e.to_string())),
                        Err(e) => Err(e),
                    };
                    s2.send(json_block_res).map_err(|_| anyhow!("fail to send"))
                })
                .collect::<anyhow::Result<()>>()
        });

        // Stage 3: batch the JSON values into chunks and write each full chunk
        // on the threadpool.
        let threadpool = ThreadPool::start(ThreadPoolConfig::default(), ()).into_sync_handler();
        let mut chunk_index = 0;
        let mut json_blocks = Vec::with_capacity(chunk_size);
        let mut writers_handle = Vec::with_capacity(500_000 / chunk_size);
        r2.into_iter()
            .try_for_each::<_, anyhow::Result<()>>(|json_block_res| {
                let json_block = json_block_res?;
                json_blocks.push(json_block);
                if json_blocks.len() == chunk_size {
                    let chunk = std::mem::take(&mut json_blocks);
                    json_blocks.reserve_exact(chunk_size);
                    // Write chunk "asynchronously"
                    writers_handle.push(
                        threadpool
                            .launch(move |_| write_chunk(chunk_index, chunk, output_dir, pretty))?,
                    );
                    chunk_index += 1;
                    if chunk_index % 8 == 0 {
                        progress_bar.set_progress(chunk_index as f64 / chunks_count);
                    }
                }
                Ok(())
            })?;
        // Write last chunk
        write_chunk(chunk_index, json_blocks, output_dir, pretty)?;
        progress_bar.set_progress(1.0);

        reader_handle.join().map_err(|_| anyhow!("reader panic"))??;
        jsonifier_handle
            .join()
            .map_err(|_| anyhow!("jsonifier panic"))??;
        for writer_handle in writers_handle {
            writer_handle.join()??;
        }
        progress_bar.complete();

        let duration = start_time.elapsed();
        println!(
            "Blockchain successfully exported in {}.{} seconds.",
            duration.as_secs(),
            duration.subsec_millis()
        );
        Ok(())
    } else {
        Err(anyhow!("no blockchain"))
    }
}
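export_bc is thus a three-stage pipeline: a reader thread streams blocks out of the database, a second thread serializes each block to a serde_json::Value, and the calling thread batches values into chunks handed to a threadpool for writing. A minimal, self-contained sketch of the same flume hand-off pattern, using toy integer data instead of Duniter types:

// Minimal sketch of the reader -> transformer -> consumer pipeline above.
// Toy i32 data stands in for blocks; only `flume` and std are required.
fn main() {
    let (s, r) = flume::unbounded();
    let reader = std::thread::spawn(move || {
        for block in 0..10_000 {
            s.send(block).expect("receiver alive"); // stage 1: produce
        }
        // `s` is dropped here, which closes the channel and ends `r.iter()`.
    });

    let (s2, r2) = flume::unbounded();
    let transformer = std::thread::spawn(move || {
        for block in r.iter() {
            s2.send(block * 2).expect("receiver alive"); // stage 2: transform
        }
    });

    // Stage 3: the calling thread drains the final channel in batches.
    let mut batch = Vec::with_capacity(1000);
    for value in r2.into_iter() {
        batch.push(value);
        if batch.len() == 1000 {
            batch.clear(); // a real consumer would write the chunk out here
        }
    }
    reader.join().unwrap();
    transformer.join().unwrap();
}

Dropping each sender is what terminates the downstream iterator, which is why the real code moves s and s2 into their respective threads.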
fn write_chunk(
    chunk_index: usize,
    chunk: Vec<serde_json::Value>,
    output_dir: &'static Path,
    pretty: bool,
) -> anyhow::Result<()> {
    let file =
        File::create(output_dir.join(format!("chunk_{}-{}.json", chunk_index, chunk.len())))?;
    let mut buffer = BufWriter::new(file);
    if pretty {
        serde_json::to_writer_pretty(&mut buffer, &serde_json::Value::Array(chunk))?;
    } else {
        serde_json::to_writer(&mut buffer, &serde_json::Value::Array(chunk))?;
    }
    buffer.flush()?;
    Ok(())
}
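Each chunk is written to output_dir as chunk_{index}-{count}.json and contains a plain JSON array, so an exported chunk can be loaded back with serde_json alone. A hypothetical consumer (the path is assumed to point at one of the generated files):

use std::{fs::File, io::BufReader, path::Path};

// Hypothetical helper: parse one exported chunk back into JSON values.
fn read_chunk(path: &Path) -> anyhow::Result<Vec<serde_json::Value>> {
    let file = File::open(path)?;
    let blocks: Vec<serde_json::Value> = serde_json::from_reader(BufReader::new(file))?;
    Ok(blocks)
}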
@@ -23,6 +23,7 @@
 )]
 
 mod cli;
+mod export_bc;
 mod migrate;
 mod print_found_data;
 mod stringify_json_value;
@@ -86,9 +87,22 @@ fn main() -> anyhow::Result<()> {
         ));
     }
 
-    if let SubCommand::Migrate = opt.cmd {
-        migrate::migrate(profile_path)
-    } else {
+    match opt.cmd {
+        SubCommand::Migrate => migrate::migrate(profile_path),
+        SubCommand::ExportBc {
+            chunk_size,
+            output_dir,
+            pretty,
+        } => export_bc::export_bc(
+            BcV1Db::<LevelDb>::open(LevelDbConf {
+                db_path: data_path.as_path().join("leveldb"),
+                ..Default::default()
+            })?,
+            chunk_size,
+            output_dir,
+            pretty,
+        ),
+        _ => {
             let open_db_start_time = Instant::now();
             match opt.database {
                 Database::BcV1 => apply_subcommand(
@@ -134,6 +148,7 @@ fn main() -> anyhow::Result<()> {
             }
         }
     }
+}
 
 fn apply_subcommand<DB: DbExplorable>(
     db: DB,
@@ -285,7 +300,7 @@ fn apply_subcommand<DB: DbExplorable>(
         SubCommand::Schema => {
            show_db_schema(DB::list_collections());
         }
-        SubCommand::Migrate => unreachable!(),
+        _ => unreachable!(),
     };
     Ok(())
...
@@ -62,7 +62,7 @@ fn migrate_inner(
     if let Some(target) = get_target_block_number(&duniter_js_db)? {
         println!("target block: #{}", target.0);
 
-        let (s, r) = std::sync::mpsc::channel();
+        let (s, r) = flume::unbounded();
         let reader_handle = std::thread::spawn(move || {
             duniter_js_db.main_blocks().iter(.., |it| {
                 it.values()
@@ -70,7 +70,7 @@ fn migrate_inner(
                     .collect::<anyhow::Result<()>>()
             })
         });
-        let (s2, r2) = std::sync::mpsc::channel();
+        let (s2, r2) = flume::unbounded();
         let parser_handle = std::thread::spawn(move || {
             let target_u64 = target.0 as u64;
             let mut db_blocks = Vec::with_capacity(CHUNK_SIZE);
...
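The migration pipeline is otherwise unchanged; only the channel type moves from std::sync::mpsc to flume, matching the channels used by export_bc. Both ends of a flume channel are Clone and the receiver exposes the same iter()/into_iter() draining style as mpsc, so the swap is drop-in here.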