Commit fef6bfd8 authored by Éloïs

[opti] blockchain: local sync: parse json files in parallel, in batches of 16

parent 1764f24d
1 merge request: !109 Resolve "Fork resolution algorithm"
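In substance: the reader worker previously opened and parsed each JSON chunk file one at a time; after this commit it walks the chunk range in windows of `CHUNKS_STEP = 16`, parses each window's files in parallel with rayon, then drains the results in ascending chunk order before moving to the next window, so block delivery to the sync thread stays strictly ordered. A minimal self-contained sketch of that pattern, with a hypothetical `parse_chunk` standing in for the real file parser (names and counts here are illustrative, not the project's API):

```rust
use rayon::prelude::*;
use std::collections::HashMap;

const CHUNKS_STEP: usize = 16; // batch size, as in the commit

/// Hypothetical stand-in for the real parser: maps a chunk number to the
/// blocks (here plain strings) parsed from that chunk's file.
fn parse_chunk(chunk_number: usize) -> (usize, Vec<String>) {
    (chunk_number, vec![format!("block from chunk {}", chunk_number)])
}

fn main() {
    let max_chunk_number = 40; // hypothetical chunk count
    let mut begin = 0;
    while begin <= max_chunk_number {
        // Upper bound of the current window (exclusive), capped at the end.
        let end = (begin + CHUNKS_STEP).min(max_chunk_number + 1);
        // Parse one window of chunk files in parallel; collect keyed by
        // chunk number because parallel completion order is arbitrary.
        let mut batch: HashMap<usize, Vec<String>> =
            (begin..end).into_par_iter().map(parse_chunk).collect();
        // Drain the window in ascending chunk order, as the consumer expects.
        for n in begin..end {
            for block in batch.remove(&n).expect("window chunk was parsed") {
                println!("{}", block);
            }
        }
        begin += CHUNKS_STEP;
    }
}
```

Note the design choice: parallelism is confined to the parsing phase inside each window, while the ordered send phase remains sequential.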
Cargo.lock
@@ -392,10 +392,9 @@ dependencies = [
 "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
 "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "pbr 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
-"rustbreak 2.0.0-rc3 (registry+https://github.com/rust-lang/crates.io-index)",
+"rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "serde 1.0.86 (registry+https://github.com/rust-lang/crates.io-index)",
 "serde_json 1.0.38 (registry+https://github.com/rust-lang/crates.io-index)",
 "sqlite 0.23.9 (registry+https://github.com/rust-lang/crates.io-index)",
 "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
Cargo.toml
@@ -26,10 +26,9 @@ json-pest-parser = { path = "../../../tools/json-pest-parser" }
 log = "0.4.*"
 num_cpus = "1.8.*"
 pbr = "1.0.*"
-rustbreak = {version = "2.0.0-rc3", features = ["bin_enc"]}
+rayon = "1.0.3"
 serde = "1.0.*"
 serde_json = "1.0.*"
 sqlite = "0.23.*"
 threadpool = "1.7.*"

 [dev-dependencies]
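A note on the dependency change: rayon brings a global work-stealing pool (sized to the number of logical CPUs by default) for data-parallel iterators, while the existing threadpool crate keeps serving the long-lived reader/apply workers. If one wanted to bound how many chunk files are parsed at once independently of `CHUNKS_STEP`, rayon's global pool can be configured explicitly; a sketch (this configuration is not part of the commit, which relies on rayon's defaults):

```rust
// Hypothetical: cap rayon's global pool at 4 threads. Must run before
// any parallel iterator is used, and can only succeed once per process.
fn configure_rayon() -> Result<(), rayon::ThreadPoolBuildError> {
    rayon::ThreadPoolBuilder::new().num_threads(4).build_global()
}
```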
@@ -19,12 +19,16 @@ use dubp_documents::parsers::blocks::parse_json_block
 use dubp_documents::Blockstamp;
 use durs_common_tools::fatal_error;
 use failure::Error;
-use std::collections::HashSet;
+use rayon::prelude::*;
+use std::collections::{HashMap, HashSet};
 use std::io::Read;
 use std::path::{Path, PathBuf};
 use std::sync::mpsc;
 use threadpool::ThreadPool;

+/// Number of chunks parsed per batch before sending them to the apply workers
+static CHUNKS_STEP: &'static usize = &16;

 /// Json reader worker
 pub fn json_reader_worker(
     pool: &ThreadPool,
@@ -108,31 +112,25 @@ pub fn json_reader_worker(
             current_blockstamp.id.0 as usize / *crate::constants::CHUNK_SIZE;

         // Parse chunks
-        for chunk_number in first_chunk_number..=max_chunk_number {
-            if chunks_set.get(&chunk_number).is_some() {
-                // Open chunk file
-                let chunk_file_content_result =
-                    open_json_chunk_file(&json_chunks_path, chunk_number);
-                if chunk_file_content_result.is_err() {
-                    fatal_error(&format!("Fail to open chunk file n°{}", chunk_number));
-                }
-                // Parse chunk file content
-                let blocks_result =
-                    parse_json_chunk(&chunk_file_content_result.expect("safe unwrap"));
-                let blocks = match blocks_result {
-                    Ok(blocks) => blocks,
-                    Err(e) => {
-                        fatal_error(&format!(
-                            "Fail to parse chunk file n°{} : {}",
-                            chunk_number, e,
-                        ));
-                        panic!(); // for the compiler
-                    }
-                };
+        let mut begin_chunk_number = first_chunk_number;
+        while begin_chunk_number <= max_chunk_number {
+            let last_chunk_number = if begin_chunk_number + *CHUNKS_STEP < max_chunk_number + 1 {
+                begin_chunk_number + *CHUNKS_STEP
+            } else {
+                max_chunk_number + 1
+            };
+            let chunks_numbers: Vec<_> = (begin_chunk_number..last_chunk_number).collect();
+            // Parse this batch of chunk files in parallel
+            let mut chunks_blocks: HashMap<usize, Vec<BlockDocument>> = chunks_numbers
+                .par_iter()
+                .map(|chunk_number| treat_once_json_chunk(&json_chunks_path, *chunk_number))
+                .collect();
-                // Send all blocks of this chunk
-                for block in blocks {
+            // Send blocks
+            for chunk_number in chunks_numbers {
+                for block in chunks_blocks
+                    .remove(&chunk_number)
+                    .expect("Dev error: sync: chunks_blocks does not contain key chunk_number!")
+                {
                     // Verify if the block number is within the expected interval
                     let block_id = block.blockstamp().id;
                     if (block_id > current_blockstamp.id && block_id.0 <= max_block_id)
@@ -144,9 +142,9 @@ pub fn json_reader_worker(
                         .expect("Fatal error : sync_thread unreachable!");
                 }
             }
-            } else {
-                fatal_error(&format!("Missing chunk file n°{}", chunk_number));
-            }
+            begin_chunk_number += *CHUNKS_STEP;
         }

         sender_sync_thread
@@ -163,6 +161,32 @@ pub fn json_reader_worker(
         });
 }

+/// Treat one JSON chunk
+fn treat_once_json_chunk(
+    json_chunks_path: &PathBuf,
+    chunk_number: usize,
+) -> (usize, Vec<BlockDocument>) {
+    // Open chunk file
+    let chunk_file_content_result = open_json_chunk_file(json_chunks_path, chunk_number);
+    if chunk_file_content_result.is_err() {
+        fatal_error(&format!("Fail to open chunk file n°{}", chunk_number));
+    }
+
+    // Parse chunk file content
+    let blocks_result = parse_json_chunk(&chunk_file_content_result.expect("safe unwrap"));
+    let blocks = match blocks_result {
+        Ok(blocks) => blocks,
+        Err(e) => {
+            fatal_error(&format!(
+                "Fail to parse chunk file n°{} : {}",
+                chunk_number, e,
+            ));
+            panic!(); // for the compiler
+        }
+    };
+
+    (chunk_number, blocks)
+}

 /// Parse json chunk into BlockDocument Vector
 fn parse_json_chunk(json_chunk_content: &str) -> Result<Vec<BlockDocument>, Error> {
     let mut block_doc_vec = Vec::with_capacity(*crate::constants::CHUNK_SIZE);
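About the `panic!(); // for the compiler` line in `treat_once_json_chunk`: `fatal_error` evidently returns `()` rather than the never type, so the `Err` arm needs an unreachable `panic!()` to type-check (both match arms must yield `Vec<BlockDocument>`). Declaring such a helper as diverging removes the need; a sketch that assumes nothing about the actual signature of `durs_common_tools::fatal_error` (both functions below are hypothetical):

```rust
// Hypothetical diverging variant: the `-> !` return type tells the
// compiler this function never returns, so callers need no dummy panic.
fn fatal(msg: &str) -> ! {
    eprintln!("Fatal error: {}", msg);
    std::process::exit(1)
}

fn parse_or_die(input: &str) -> Vec<u64> {
    match input.parse::<u64>() {
        Ok(n) => vec![n],
        // No trailing `panic!()` required: `fatal` diverges.
        Err(e) => fatal(&format!("Fail to parse: {}", e)),
    }
}
```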
@@ -43,7 +43,7 @@ pub struct BlockHeader {
     pub issuer: PubKey,
 }

-#[derive(Debug)]
+#[derive(Debug, Clone)]
 /// Message for main sync thread
 pub enum MessForSyncThread {
     Target(CurrencyName, Blockstamp),
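On the last hunk: `MessForSyncThread` now derives `Clone`, presumably because the batched pipeline can need to duplicate a message before handing it to a channel, since `mpsc::Sender::send` moves its argument. A generic illustration of that constraint (the `Mess` enum here is a stand-in, not the real type):

```rust
use std::sync::mpsc;

#[derive(Debug, Clone)]
enum Mess {
    Target(String),
    End,
}

fn main() {
    let (tx_a, rx_a) = mpsc::channel();
    let (tx_b, rx_b) = mpsc::channel();
    let mess = Mess::Target("g1".to_owned());
    // send() takes ownership, so delivering the same message to two
    // channels requires Clone.
    tx_a.send(mess.clone()).expect("rx_a alive");
    tx_b.send(mess).expect("rx_b alive");
    println!("{:?} {:?}", rx_a.recv().unwrap(), rx_b.recv().unwrap());
}
```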