Skip to content
Snippets Groups Projects
Commit b062ee89 authored by Éloïs's avatar Éloïs
Browse files

perf(indexer): apply blocks_chunks with minimal gva db lock

parent 8abbee13
No related branches found
No related tags found
No related merge requests found
Pipeline #12474 passed
...@@ -29,22 +29,24 @@ pub fn apply_block_blocks_chunk<B: Backend>( ...@@ -29,22 +29,24 @@ pub fn apply_block_blocks_chunk<B: Backend>(
) -> KvResult<()> { ) -> KvResult<()> {
let block_number = block.number().0; let block_number = block.number().0;
let chunks_folder_path = profile_path.join("data/gva_v1_blocks_chunks"); let chunks_folder_path = profile_path.join("data/gva_v1_blocks_chunks");
gva_db.write(|mut db| {
db.current_blocks_chunk.upsert( gva_db.current_blocks_chunk_write().upsert(
U32BE(block_number), U32BE(block_number),
GvaBlockDbV1(DubpBlock::V10(block.clone())), GvaBlockDbV1(DubpBlock::V10(block.clone())),
); )?;
if (block_number + 1) % CHUNK_SIZE == 0 { if (block_number + 1) % CHUNK_SIZE == 0 {
let current_chunk: Vec<GvaBlockDbV1> = db let current_chunk: Vec<GvaBlockDbV1> = gva_db
.current_blocks_chunk .current_blocks_chunk()
.iter(.., |it| it.values().collect::<Result<Vec<_>, _>>())?; .iter(.., |it| it.values().collect::<Result<Vec<_>, _>>())?;
let current_chunk_bin = bincode_db() let current_chunk_bin = bincode_db()
.serialize(&current_chunk) .serialize(&current_chunk)
.map_err(|e| KvError::DeserError(e.into()))?; .map_err(|e| KvError::DeserError(e.into()))?;
let chunk_hash = Hash::compute_blake3(current_chunk_bin.as_ref()); let chunk_hash = Hash::compute_blake3(current_chunk_bin.as_ref());
let chunk_index = U32BE(block_number / CHUNK_SIZE); let chunk_index = U32BE(block_number / CHUNK_SIZE);
db.blocks_chunk_hash.upsert(chunk_index, HashDb(chunk_hash)); gva_db
.blocks_chunk_hash_write()
.upsert(chunk_index, HashDb(chunk_hash))?;
write_and_compress_chunk_in_file( write_and_compress_chunk_in_file(
current_chunk_bin.as_ref(), current_chunk_bin.as_ref(),
...@@ -52,10 +54,47 @@ pub fn apply_block_blocks_chunk<B: Backend>( ...@@ -52,10 +54,47 @@ pub fn apply_block_blocks_chunk<B: Backend>(
chunks_folder_path.as_path(), chunks_folder_path.as_path(),
) )
.map_err(|e| KvError::Custom(e.into()))?; .map_err(|e| KvError::Custom(e.into()))?;
gva_db.current_blocks_chunk_write().clear()?;
} }
Ok(()) Ok(())
})?; }
gva_db.current_blocks_chunk_write().clear()
pub fn revert_block_blocks_chunk<B: Backend>(
block: &DubpBlockV10,
gva_db: &GvaV1Db<B>,
profile_path: &Path,
) -> KvResult<()> {
let block_number = block.number().0;
let chunks_folder_path = profile_path.join("data/gva_v1_blocks_chunks");
gva_db.write(|mut db| {
if (block_number + 1) % CHUNK_SIZE == 0 {
// Uncompress last compressed chunk and replace it in current chunk
let chunk_index = U32BE(block_number / CHUNK_SIZE);
if let Some(current_chunk_bin) =
read_and_remove_compressed_chunk(chunk_index.0, chunks_folder_path.as_path())?
{
db.blocks_chunk_hash.remove(chunk_index);
let current_chunk: Vec<GvaBlockDbV1> = bincode_db()
.deserialize(current_chunk_bin.as_ref())
.map_err(|e| KvError::DeserError(e.into()))?;
let current_chunk_begin = block_number - CHUNK_SIZE + 1;
for (i, block) in current_chunk.into_iter().enumerate() {
db.current_blocks_chunk
.upsert(U32BE(current_chunk_begin + i as u32), block);
}
} else {
return Err(KvError::DbCorrupted(
"Not found last compressed chunk".to_owned(),
));
}
} else {
db.current_blocks_chunk.remove(U32BE(block_number));
}
Ok(())
})
} }
/// Read and decompress bytes from file /// Read and decompress bytes from file
...@@ -102,40 +141,3 @@ fn write_and_compress_chunk_in_file( ...@@ -102,40 +141,3 @@ fn write_and_compress_chunk_in_file(
Ok(()) Ok(())
} }
pub fn revert_block_blocks_chunk<B: Backend>(
block: &DubpBlockV10,
gva_db: &GvaV1Db<B>,
profile_path: &Path,
) -> KvResult<()> {
let block_number = block.number().0;
let chunks_folder_path = profile_path.join("data/gva_v1_blocks_chunks");
gva_db.write(|mut db| {
if (block_number + 1) % CHUNK_SIZE == 0 {
// Uncompress last compressed chunk and replace it in current chunk
let chunk_index = U32BE(block_number / CHUNK_SIZE);
if let Some(current_chunk_bin) =
read_and_remove_compressed_chunk(chunk_index.0, chunks_folder_path.as_path())?
{
db.blocks_chunk_hash.remove(chunk_index);
let current_chunk: Vec<GvaBlockDbV1> = bincode_db()
.deserialize(current_chunk_bin.as_ref())
.map_err(|e| KvError::DeserError(e.into()))?;
let current_chunk_begin = block_number - CHUNK_SIZE + 1;
for (i, block) in current_chunk.into_iter().enumerate() {
db.current_blocks_chunk
.upsert(U32BE(current_chunk_begin + i as u32), block);
}
} else {
return Err(KvError::DbCorrupted(
"Not found last compressed chunk".to_owned(),
));
}
} else {
db.current_blocks_chunk.remove(U32BE(block_number));
}
Ok(())
})
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment