Skip to content
Snippets Groups Projects
Commit 18822aeb authored by Éloïs's avatar Éloïs
Browse files

perf(indexer): store blocks chunks in files directly

parent bdb5ea59
No related branches found
No related tags found
No related merge requests found
Pipeline #12469 passed
...@@ -1028,8 +1028,9 @@ dependencies = [ ...@@ -1028,8 +1028,9 @@ dependencies = [
"dubp", "dubp",
"duniter-core", "duniter-core",
"duniter-gva-db", "duniter-gva-db",
"flate2",
"log",
"maplit", "maplit",
"miniz_oxide",
"once_cell", "once_cell",
"parking_lot", "parking_lot",
"resiter", "resiter",
...@@ -1176,6 +1177,18 @@ dependencies = [ ...@@ -1176,6 +1177,18 @@ dependencies = [
"num_cpus", "num_cpus",
] ]
[[package]]
name = "flate2"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd3aec53de10fe96d7d8c565eb17f2c687bb5518a2ec453b5b1252964526abe0"
dependencies = [
"cfg-if 1.0.0",
"crc32fast",
"libc",
"miniz_oxide",
]
[[package]] [[package]]
name = "float-cmp" name = "float-cmp"
version = "0.8.0" version = "0.8.0"
......
...@@ -57,7 +57,6 @@ db_schema!( ...@@ -57,7 +57,6 @@ db_schema!(
["blocks_with_ud", BlocksWithUd, U32BE, ()], ["blocks_with_ud", BlocksWithUd, U32BE, ()],
["blockchain_time", BlockchainTime, U32BE, u64], ["blockchain_time", BlockchainTime, U32BE, u64],
["blocks_chunk_hash", BlocksChunkHash, U32BE, HashDb], ["blocks_chunk_hash", BlocksChunkHash, U32BE, HashDb],
["compressed_blocks_chunk", CompressedBlocksChunk, U32BE, Vec<u8>],
["current_blocks_chunk", CurrentBlocksChunk, U32BE, GvaBlockDbV1], ["current_blocks_chunk", CurrentBlocksChunk, U32BE, GvaBlockDbV1],
["gva_identities", GvaIdentities, PubKeyKeyV2, GvaIdtyDbV1], ["gva_identities", GvaIdentities, PubKeyKeyV2, GvaIdtyDbV1],
[ [
......
...@@ -17,7 +17,8 @@ bincode = "1.3" ...@@ -17,7 +17,8 @@ bincode = "1.3"
duniter-core = { git = "https://git.duniter.org/nodes/rust/duniter-core" } duniter-core = { git = "https://git.duniter.org/nodes/rust/duniter-core" }
duniter-gva-db = { path = "../db" } duniter-gva-db = { path = "../db" }
dubp = { version = "0.54.1", features = ["duniter"] } dubp = { version = "0.54.1", features = ["duniter"] }
miniz_oxide = "0.4.4" flate2 = "1.0.16"
log = "0.4"
once_cell = "1.7" once_cell = "1.7"
resiter = "0.4.0" resiter = "0.4.0"
......
...@@ -14,65 +14,105 @@ ...@@ -14,65 +14,105 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*; use crate::*;
use flate2::read::ZlibDecoder;
use flate2::write::ZlibEncoder;
use flate2::Compression;
const CHUNK_SIZE: u32 = 4_096; const CHUNK_SIZE: u32 = 4_096;
pub(super) fn apply_block_blocks_chunk<B: Backend>( pub fn apply_block_blocks_chunk<B: Backend>(
block: &DubpBlockV10, block: &DubpBlockV10,
gva_db: &mut GvaV1DbTxRw<B::Col>, gva_db: &GvaV1Db<B>,
profile_path: &Path,
) -> KvResult<()> { ) -> KvResult<()> {
let block_number = block.number().0; let block_number = block.number().0;
gva_db.current_blocks_chunk.upsert( let chunks_folder_path = profile_path.join("data/gva_v1_blocks_chunks");
gva_db.write(|mut db| {
db.current_blocks_chunk.upsert(
U32BE(block_number), U32BE(block_number),
GvaBlockDbV1(DubpBlock::V10(block.clone())), GvaBlockDbV1(DubpBlock::V10(block.clone())),
); );
if (block_number + 1) % CHUNK_SIZE == 0 { if (block_number + 1) % CHUNK_SIZE == 0 {
let current_chunk: Vec<GvaBlockDbV1> = gva_db let current_chunk: Vec<GvaBlockDbV1> = db
.current_blocks_chunk .current_blocks_chunk
.iter(.., |it| it.values().collect::<Result<Vec<_>, _>>())?; .iter(.., |it| it.values().collect::<Result<Vec<_>, _>>())?;
let current_chunk_bin = bincode_db() let current_chunk_bin = bincode_db()
.serialize(&current_chunk) .serialize(&current_chunk)
.map_err(|e| KvError::DeserError(e.into()))?; .map_err(|e| KvError::DeserError(e.into()))?;
let chunk_hash = Hash::compute_blake3(current_chunk_bin.as_ref()); let chunk_hash = Hash::compute_blake3(current_chunk_bin.as_ref());
let chunk_index = U32BE(block_number / CHUNK_SIZE);
db.blocks_chunk_hash.upsert(chunk_index, HashDb(chunk_hash));
let compressed_chunk = miniz_oxide::deflate::compress_to_vec(current_chunk_bin.as_ref(), 3); write_and_compress_chunk_in_file(
current_chunk_bin.as_ref(),
chunk_index.0,
chunks_folder_path.as_path(),
)
.map_err(|e| KvError::Custom(e.into()))?;
}
Ok(())
})
}
let chunk_index = U32BE(block_number / CHUNK_SIZE); /// Read and decompress bytes from file
gva_db fn read_and_remove_compressed_chunk(
.blocks_chunk_hash chunk_index: u32,
.upsert(chunk_index, HashDb(chunk_hash)); chunks_folder_path: &Path,
gva_db ) -> std::io::Result<Option<Vec<u8>>> {
.compressed_blocks_chunk let file_path = chunks_folder_path.join(format!("_{}", chunk_index));
.upsert(chunk_index, compressed_chunk); if !file_path.exists() {
return Ok(None);
}
if std::fs::metadata(file_path.as_path())?.len() > 0 {
let file = std::fs::File::open(file_path)?;
let mut z = ZlibDecoder::new(file);
let mut decompressed_bytes = Vec::new();
z.read_to_end(&mut decompressed_bytes)?;
Ok(Some(decompressed_bytes))
} else {
Ok(None)
}
} }
/// Write and compress chunk in file
fn write_and_compress_chunk_in_file(
chunk: &[u8],
chunk_index: u32,
chunks_folder_path: &Path,
) -> Result<(), std::io::Error> {
log::info!("blocks_chunk_{}: {} bytes", chunk_index, chunk.len());
let file = std::fs::File::create(chunks_folder_path.join(format!("_{}", chunk_index)))?;
let mut e = ZlibEncoder::new(file, Compression::new(3));
e.write_all(chunk)?;
e.finish()?;
Ok(()) Ok(())
} }
pub(super) fn revert_block_blocks_chunk<B: Backend>( pub fn revert_block_blocks_chunk<B: Backend>(
block: &DubpBlockV10, block: &DubpBlockV10,
gva_db: &mut GvaV1DbTxRw<B::Col>, gva_db: &GvaV1Db<B>,
profile_path: &Path,
) -> KvResult<()> { ) -> KvResult<()> {
let block_number = block.number().0; let block_number = block.number().0;
let chunks_folder_path = profile_path.join("data/gva_v1_blocks_chunks");
gva_db.write(|mut db| {
if (block_number + 1) % CHUNK_SIZE == 0 { if (block_number + 1) % CHUNK_SIZE == 0 {
// Uncompress las compressed chunk and replace it in current chunk // Uncompress last compressed chunk and replace it in current chunk
let chunk_index = U32BE(block_number / CHUNK_SIZE); let chunk_index = U32BE(block_number / CHUNK_SIZE);
if let Some(compressed_chunk) = gva_db.compressed_blocks_chunk.get(&chunk_index)? { if let Some(current_chunk_bin) =
gva_db.blocks_chunk_hash.remove(chunk_index); read_and_remove_compressed_chunk(chunk_index.0, chunks_folder_path.as_path())?
gva_db.compressed_blocks_chunk.remove(chunk_index); {
db.blocks_chunk_hash.remove(chunk_index);
let current_chunk_bin =
miniz_oxide::inflate::decompress_to_vec(compressed_chunk.as_ref())
.map_err(|e| KvError::Custom(format!("{:?}", e).into()))?;
let current_chunk: Vec<GvaBlockDbV1> = bincode_db() let current_chunk: Vec<GvaBlockDbV1> = bincode_db()
.deserialize(current_chunk_bin.as_ref()) .deserialize(current_chunk_bin.as_ref())
.map_err(|e| KvError::DeserError(e.into()))?; .map_err(|e| KvError::DeserError(e.into()))?;
let current_chunk_begin = block_number - CHUNK_SIZE + 1; let current_chunk_begin = block_number - CHUNK_SIZE + 1;
for (i, block) in current_chunk.into_iter().enumerate() { for (i, block) in current_chunk.into_iter().enumerate() {
gva_db db.current_blocks_chunk
.current_blocks_chunk
.upsert(U32BE(current_chunk_begin + i as u32), block); .upsert(U32BE(current_chunk_begin + i as u32), block);
} }
} else { } else {
...@@ -81,8 +121,9 @@ pub(super) fn revert_block_blocks_chunk<B: Backend>( ...@@ -81,8 +121,9 @@ pub(super) fn revert_block_blocks_chunk<B: Backend>(
)); ));
} }
} else { } else {
gva_db.current_blocks_chunk.remove(U32BE(block_number)); db.current_blocks_chunk.remove(U32BE(block_number));
} }
Ok(()) Ok(())
})
} }
...@@ -27,6 +27,8 @@ mod identities; ...@@ -27,6 +27,8 @@ mod identities;
mod tx; mod tx;
mod utxos; mod utxos;
pub use blocks_chunks::{apply_block_blocks_chunk, revert_block_blocks_chunk};
use bincode::Options as _; use bincode::Options as _;
use dubp::{ use dubp::{
block::prelude::*, block::prelude::*,
...@@ -48,6 +50,7 @@ use once_cell::sync::OnceCell; ...@@ -48,6 +50,7 @@ use once_cell::sync::OnceCell;
use resiter::filter::Filter; use resiter::filter::Filter;
use std::{ use std::{
collections::{BTreeSet, HashMap, HashSet}, collections::{BTreeSet, HashMap, HashSet},
io::prelude::*,
ops::AddAssign, ops::AddAssign,
path::Path, path::Path,
}; };
...@@ -118,7 +121,6 @@ pub fn apply_block<B: Backend>( ...@@ -118,7 +121,6 @@ pub fn apply_block<B: Backend>(
hash: block.hash(), hash: block.hash(),
}; };
gva_db.write(|mut db| { gva_db.write(|mut db| {
blocks_chunks::apply_block_blocks_chunk::<B>(block, &mut db)?;
db.blocks_by_common_time db.blocks_by_common_time
.upsert(U64BE(block.common_time()), block.number().0); .upsert(U64BE(block.common_time()), block.number().0);
db.blockchain_time db.blockchain_time
...@@ -139,9 +141,7 @@ pub fn apply_block<B: Backend>( ...@@ -139,9 +141,7 @@ pub fn apply_block<B: Backend>(
block.common_time() as i64, block.common_time() as i64,
block.transactions(), block.transactions(),
) )
})?; })
Ok(())
} }
pub fn revert_block<B: Backend>( pub fn revert_block<B: Backend>(
...@@ -150,7 +150,6 @@ pub fn revert_block<B: Backend>( ...@@ -150,7 +150,6 @@ pub fn revert_block<B: Backend>(
gva_db: &GvaV1Db<B>, gva_db: &GvaV1Db<B>,
) -> KvResult<()> { ) -> KvResult<()> {
gva_db.write(|mut db| { gva_db.write(|mut db| {
blocks_chunks::revert_block_blocks_chunk::<B>(block, &mut db)?;
db.blocks_by_common_time.remove(U64BE(block.common_time())); db.blocks_by_common_time.remove(U64BE(block.common_time()));
db.blockchain_time.remove(U32BE(block.number().0)); db.blockchain_time.remove(U32BE(block.number().0));
identities::revert_identities::<B>(&block, &mut db.gva_identities)?; identities::revert_identities::<B>(&block, &mut db.gva_identities)?;
...@@ -178,9 +177,7 @@ pub fn revert_block<B: Backend>( ...@@ -178,9 +177,7 @@ pub fn revert_block<B: Backend>(
} }
db.txs_by_block.remove(U32BE(block.number().0)); db.txs_by_block.remove(U32BE(block.number().0));
Ok(()) Ok(())
})?; })
Ok(())
} }
fn apply_ud<B: Backend>( fn apply_ud<B: Backend>(
......
...@@ -76,8 +76,14 @@ impl duniter_core::module::DuniterModule for GvaModule { ...@@ -76,8 +76,14 @@ impl duniter_core::module::DuniterModule for GvaModule {
currency_params: CurrencyParameters, currency_params: CurrencyParameters,
profile_path_opt: Option<&Path>, profile_path_opt: Option<&Path>,
) -> KvResult<()> { ) -> KvResult<()> {
if let Some(profile_path) = profile_path_opt {
let gva_db = get_gva_db_rw(profile_path_opt); let gva_db = get_gva_db_rw(profile_path_opt);
duniter_gva_indexer::apply_block_blocks_chunk(&block, &gva_db, profile_path)?;
duniter_gva_indexer::apply_block(&block, currency_params, gva_db) duniter_gva_indexer::apply_block(&block, currency_params, gva_db)
} else {
let gva_db = get_gva_db_rw(profile_path_opt);
duniter_gva_indexer::apply_block(&block, currency_params, gva_db)
}
} }
fn revert_block( fn revert_block(
block: &DubpBlockV10, block: &DubpBlockV10,
...@@ -85,8 +91,14 @@ impl duniter_core::module::DuniterModule for GvaModule { ...@@ -85,8 +91,14 @@ impl duniter_core::module::DuniterModule for GvaModule {
currency_params: CurrencyParameters, currency_params: CurrencyParameters,
profile_path_opt: Option<&Path>, profile_path_opt: Option<&Path>,
) -> KvResult<()> { ) -> KvResult<()> {
if let Some(profile_path) = profile_path_opt {
let gva_db = get_gva_db_rw(profile_path_opt); let gva_db = get_gva_db_rw(profile_path_opt);
duniter_gva_indexer::revert_block_blocks_chunk(&block, &gva_db, profile_path)?;
duniter_gva_indexer::revert_block(&block, currency_params, gva_db) duniter_gva_indexer::revert_block(&block, currency_params, gva_db)
} else {
let gva_db = get_gva_db_rw(profile_path_opt);
duniter_gva_indexer::revert_block(&block, currency_params, gva_db)
}
} }
async fn init( async fn init(
conf: Self::Conf, conf: Self::Conf,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment