Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
Loading items

Target

Select target project
  • librelois/duniter-rs
  • ji_emme/duniter-rs
  • vindarel/duniter-rs
  • Hiroty1er/duniter-rs
  • dvermd/duniter-rs
  • 666titi999/duniter-rs
6 results
Select Git revision
Loading items
Show changes
Commits on Source (2)
Showing
with 2385 additions and 14 deletions
......@@ -7,6 +7,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"memchr 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
name = "adler32"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "ansi_term"
......@@ -137,6 +140,11 @@ dependencies = [
"byte-tools 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "build_const"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "byte-tools"
version = "0.2.0"
......@@ -203,6 +211,33 @@ name = "constant_time_eq"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "core_affinity"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crc"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"build_const 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crc32fast"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-deque"
version = "0.2.0"
......@@ -642,6 +677,17 @@ name = "fake-simd"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "flate2"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)",
"miniz-sys 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"miniz_oxide_c_api 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fnv"
version = "1.0.6"
......@@ -782,6 +828,20 @@ dependencies = [
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "kv-parts-dbs"
version = "0.1.0-a0.1"
dependencies = [
"bincode 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"core_affinity 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)",
"flate2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.86 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "lazy_static"
version = "1.2.0"
......@@ -836,6 +896,34 @@ name = "memoffset"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "miniz-sys"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cc 1.0.28 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "miniz_oxide"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"adler32 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "miniz_oxide_c_api"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cc 1.0.28 (registry+https://github.com/rust-lang/crates.io-index)",
"crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)",
"miniz_oxide 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "mio"
version = "0.6.16"
......@@ -1593,6 +1681,7 @@ dependencies = [
[metadata]
"checksum aho-corasick 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e6f484ae0c99fec2e858eb6134949117399f222608d84cadb3f58c1f97c2364c"
"checksum adler32 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7e522997b529f05601e05166c07ed17789691f562762c7f3b987263d2dedee5c"
"checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
"checksum argon2rs 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3f67b0b6a86dae6e67ff4ca2b6201396074996379fba2b92ff649126f37cb392"
"checksum arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "0d382e583f07208808f6b1249e60848879ba3543f57c32277bf52d69c2f0f0ee"
......@@ -1609,6 +1698,7 @@ dependencies = [
"checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12"
"checksum blake2-rfc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "5d6d530bdd2d52966a6d03b7a964add7ae1a288d25214066fd4b600f0f796400"
"checksum block-buffer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a076c298b9ecdb530ed9d967e74a6027d6a7478924520acddcddc24c1c8ab3ab"
"checksum build_const 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "39092a32794787acd8525ee150305ff051b0aa6cc2abaf193924f5ab05425f39"
"checksum byte-tools 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "560c32574a12a89ecd91f5e742165893f86e3ab98d21f8ea548658eb9eef5f40"
"checksum byteorder 1.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "94f88df23a25417badc922ab0f5716cc1330e87f71ddd9203b3a3ccd9cedf75d"
"checksum bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "40ade3d27603c2cb345eb0912aec461a6dec7e06a4ae48589904e808335c7afa"
......@@ -1618,6 +1708,9 @@ dependencies = [
"checksum clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b957d88f4b6a63b9d70d5f454ac8011819c6efa7727858f458ab71c756ce2d3e"
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
"checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e"
"checksum core_affinity 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)" = "6d162c6e463c31dbf78fefa99d042156c1c74d404e299cfe3df2923cb857595b"
"checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f739f8c5363aca78cfb059edf753d8f0d36908c348f3d8d1503f03d8b75d9cf3"
"checksum crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "927121f5407de9956180ff5e936fe3cf4324279280001cd56b669d28ee7e9150"
"checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9"
......@@ -1628,6 +1721,7 @@ dependencies = [
"checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2"
"checksum failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1"
"checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
"checksum flate2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "f87e68aa82b2de08a6e037f1385455759df6e445a8df5e005b4297191dbf18aa"
"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
"checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
"checksum foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
......@@ -1655,6 +1749,9 @@ dependencies = [
"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
"checksum memchr 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2efc7bc57c883d4a4d6e3246905283d8dae951bb3bd32f49d6ef297f546e1c39"
"checksum memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0f9dc261e2b62d7a622bf416ea3c5245cdd5d9a7fcc428c0d06804dfce1775b3"
"checksum miniz-sys 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0300eafb20369952951699b68243ab4334f4b10a88f411c221d444b36c40e649"
"checksum miniz_oxide 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c468f2369f07d651a5d0bb2c9079f8488a66d5466efe42d0c5c6466edcb7f71e"
"checksum miniz_oxide_c_api 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b7fe927a42e3807ef71defb191dc87d4e24479b221e67015fe38ae2b7b447bab"
"checksum mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)" = "71646331f2619b1026cc302f87a2b8b648d5c6dd6937846a16cc8ce0f347f432"
"checksum mio-extras 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "46e73a04c2fa6250b8d802134d56d554a9ec2922bf977777c805ea5def61ce40"
"checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919"
......
......@@ -17,6 +17,7 @@ members = [
"lib/tests-tools/documents-tests-tools",
"lib/tests-tools/common-tests-tools",
"lib/tools/crypto",
"lib/tools/kv-parts-dbs",
"lib/tools/common-tools",
"lib/tools/documents",
"lib/tools/json-pest-parser",
......
......@@ -231,10 +231,10 @@ fn parse_value<S: std::hash::BuildHasher + Default>(pair: Pair<Rule>) -> JSONVal
}
pub fn get_optional_usize<S: std::hash::BuildHasher>(
json_block: &HashMap<&str, JSONValue<S>, S>,
json_object: &HashMap<&str, JSONValue<S>, S>,
field: &str,
) -> Result<Option<usize>, Error> {
Ok(match json_block.get(field) {
Ok(match json_object.get(field) {
Some(value) => {
if !value.is_null() {
Some(
......@@ -257,10 +257,10 @@ pub fn get_optional_usize<S: std::hash::BuildHasher>(
}
pub fn get_optional_str_not_empty<'a, S: std::hash::BuildHasher>(
json_block: &'a HashMap<&str, JSONValue<S>, S>,
json_object: &'a HashMap<&str, JSONValue<S>, S>,
field: &str,
) -> Result<Option<&'a str>, Error> {
let result = get_optional_str(json_block, field);
let result = get_optional_str(json_object, field);
if let Ok(Some(value)) = result {
if !value.is_empty() {
Ok(Some(value))
......@@ -273,10 +273,10 @@ pub fn get_optional_str_not_empty<'a, S: std::hash::BuildHasher>(
}
pub fn get_optional_str<'a, S: std::hash::BuildHasher>(
json_block: &'a HashMap<&str, JSONValue<S>, S>,
json_object: &'a HashMap<&str, JSONValue<S>, S>,
field: &str,
) -> Result<Option<&'a str>, Error> {
Ok(match json_block.get(field) {
Ok(match json_object.get(field) {
Some(value) => {
if !value.is_null() {
Some(value.to_str().ok_or_else(|| ParseJsonError {
......@@ -291,10 +291,10 @@ pub fn get_optional_str<'a, S: std::hash::BuildHasher>(
}
pub fn get_number<S: std::hash::BuildHasher>(
json_block: &HashMap<&str, JSONValue<S>, S>,
json_object: &HashMap<&str, JSONValue<S>, S>,
field: &str,
) -> Result<f64, Error> {
Ok(json_block
Ok(json_object
.get(field)
.ok_or_else(|| ParseJsonError {
cause: format!("Fail to parse json : field '{}' must exist !", field),
......@@ -306,10 +306,10 @@ pub fn get_number<S: std::hash::BuildHasher>(
}
pub fn get_str<'a, S: std::hash::BuildHasher>(
json_block: &'a HashMap<&str, JSONValue<S>, S>,
json_object: &'a HashMap<&str, JSONValue<S>, S>,
field: &str,
) -> Result<&'a str, Error> {
Ok(json_block
Ok(json_object
.get(field)
.ok_or_else(|| ParseJsonError {
cause: format!("Fail to parse json : field '{}' must exist !", field),
......@@ -321,10 +321,10 @@ pub fn get_str<'a, S: std::hash::BuildHasher>(
}
pub fn get_str_array<'a, S: std::hash::BuildHasher>(
json_block: &'a HashMap<&str, JSONValue<S>, S>,
json_object: &'a HashMap<&str, JSONValue<S>, S>,
field: &str,
) -> Result<Vec<&'a str>, ParseJsonError> {
json_block
json_object
.get(field)
.ok_or_else(|| ParseJsonError {
cause: format!("Fail to parse json : field '{}' must exist !", field),
......@@ -346,10 +346,10 @@ pub fn get_str_array<'a, S: std::hash::BuildHasher>(
}
pub fn get_object_array<'a, S: std::hash::BuildHasher>(
json_block: &'a JsonObject<'a, S>,
json_object: &'a JsonObject<'a, S>,
field: &str,
) -> Result<Vec<&'a JsonObject<'a, S>>, ParseJsonError> {
json_block
json_object
.get(field)
.ok_or_else(|| ParseJsonError {
cause: format!("Fail to parse json : field '{}' must exist !", field),
......
[package]
name = "kv-parts-dbs"
version = "0.1.0-a0.1"
authors = ["librelois <elois@ifee.fr>"]
description = "Manage key-value databases persisted in compressed binary files."
license = "AGPL-3.0"
edition = "2018"
[lib]
path = "src/lib.rs"
[dependencies]
bincode = "1.0.1"
core_affinity = "0.5.9"
flate2 = "1.0.7"
fnv = "1.0.6"
log = "0.4.*"
num_cpus = "1.8.*"
rayon = "1.0.*"
serde = { version = "1.0.*", features = ["derive"] }
[dev-dependencies]
# KV Parts DBs
Manage key-value databases persisted in compressed binary files.
\ No newline at end of file
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define KvPartsDB caches
use crate::kv_collection::KvPartsCollection;
use crate::PartId;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::{BTreeSet, HashMap, VecDeque};
use std::fmt::Debug;
use std::marker::PhantomData;
/// Database caches
///
/// In-memory state of a `KvPartsDB`: deserialized parts (`cache`), raw
/// serialized part bytes (`raw_cache` — presumably gzip-compressed, see the
/// `compression` module; TODO confirm against the readers/writers modules),
/// plus bookkeeping of which parts were read and which still must be written.
#[derive(Debug)]
pub struct KvPartsDbCaches<K, V, C>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
    V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync,
    C: KvPartsCollection<K, V>,
{
    /// Deserialized parts, keyed by part id.
    pub cache: HashMap<PartId, C>,
    /// Cache size (in number of items)
    pub cache_size: usize,
    /// Raw (still serialized) parts, keyed by part id.
    pub raw_cache: HashMap<PartId, Vec<u8>>,
    /// Queue of the part identifiers that have been read
    pub parts_have_been_read: VecDeque<PartId>,
    /// Identifiers of the parts to be written
    pub parts_to_be_written: BTreeSet<PartId>,
    /// Phantom key
    pub phantom_key: PhantomData<K>,
    /// Phantom value
    pub phantom_value: PhantomData<V>,
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Manage databases compression.
use flate2::bufread::GzDecoder;
use flate2::bufread::GzEncoder;
use flate2::Compression as FlateCompression;
use std::io::prelude::*;
/// Compression level given as an explicit integer on a 0-9 scale,
/// where 0 means "no compression" and 9 means "take as long as you'd like".
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct CompressionLevel(u32);

impl CompressionLevel {
    /// Build a `CompressionLevel` from an integer.
    ///
    /// # Panics
    ///
    /// Panics if `level` is not in `0..=9`.
    pub fn new(level: u32) -> Self {
        if level >= 10 {
            panic!("CompressionLevel::new() : level must be between 0 and 9 !")
        }
        CompressionLevel(level)
    }
    /// Inner integer compression level.
    pub fn level(self) -> u32 {
        self.0
    }
}

impl Default for CompressionLevel {
    /// Defaults to the fastest meaningful level (1).
    fn default() -> CompressionLevel {
        CompressionLevel::new(1)
    }
}
/// Conversion into `flate2`'s compression level type.
///
/// Implemented as `From` rather than `Into` (clippy `from_over_into`):
/// the standard-library blanket impl derives
/// `Into<FlateCompression> for CompressionLevel` automatically, so existing
/// `.into()` call sites keep working unchanged.
impl From<CompressionLevel> for FlateCompression {
    fn from(level: CompressionLevel) -> FlateCompression {
        FlateCompression::new(level.0)
    }
}
/// Compress binary data using gzip at the requested compression level.
///
/// # Errors
///
/// Propagates any I/O error raised while reading the encoder stream.
pub fn compress_binary_datas(
    datas: &[u8],
    compression: CompressionLevel,
) -> std::io::Result<Vec<u8>> {
    let mut compressed = Vec::new();
    GzEncoder::new(datas, compression.into()).read_to_end(&mut compressed)?;
    Ok(compressed)
}
/// Decompress gzip binary data.
///
/// # Errors
///
/// Propagates any I/O error raised while reading the decoder stream
/// (e.g. when `datas` is not valid gzip).
pub fn uncompress_binary_datas(datas: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut plain = Vec::new();
    GzDecoder::new(datas).read_to_end(&mut plain)?;
    Ok(plain)
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! KvPartsDB Constants
/// Max cache size (in number of items)
pub static DEFAULT_MAX_CACHE_SIZE: &usize = &10_000;
/// Max raw cache size (in number of parts)
pub static DEFAULT_MAX_RAW_CACHE_SIZE: &usize = &2_000;
/// Default maximum number of requested values cloned to the cache.
/// NOTE(review): original comment read "Clone requested values to cache ?" —
/// presumably 0 disables this behaviour; confirm in the readers module.
pub static DEFAULT_MAX_CLONED_VALUES: &usize = &0;
/// Prefix of a part file name (part files look like `_<id>.bin.gz`).
pub static PART_FILE_NAME_BEGIN: &str = "_";
/// Suffix (extension) of a part file name.
pub static PART_FILE_NAME_END: &str = ".bin.gz";
/// Minimum length of a valid part file name (`_0.bin.gz` = 9 chars).
pub static PART_FILE_NAME_MIN_LEN: &usize = &9;
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define possible DB Errors
use crate::readers::DbReadError;
use crate::writers::DbWriteError;
/// DB Error
#[derive(Debug)]
pub enum DbError {
    /// Failed to spawn the orchestrator thread (wraps the underlying
    /// `std::io::Error` via the `From` impl below).
    SpawnError(std::io::Error),
    /// DB read error
    DbReadError(DbReadError),
    /// DB write error
    DbWriteError(DbWriteError),
    /// Need save before execute request
    NeedSave,
    /// Failed to send a request to the orchestrator (its channel is closed).
    SendReqError,
    /// Error when closing the DB
    CloseError,
    /// The database is no longer reachable
    UnreachableError,
    /// Forbidden request
    ForbiddenRequest,
}
/// Wrap a read error.
impl From<DbReadError> for DbError {
    fn from(e: DbReadError) -> Self {
        DbError::DbReadError(e)
    }
}

/// Wrap a write error.
impl From<DbWriteError> for DbError {
    fn from(e: DbWriteError) -> Self {
        DbError::DbWriteError(e)
    }
}

/// An I/O error here can only come from spawning the orchestrator thread.
impl From<std::io::Error> for DbError {
    fn from(e: std::io::Error) -> Self {
        DbError::SpawnError(e)
    }
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! KvPartsDB implementation using HashMap to store collection
use crate::kv_collection::KvPartsCollection;
use crate::responses::DbResponse;
use crate::{AbstractKvPartsDb, PartId};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
/// HashMap Database
///
/// `KvPartsDB` flavour whose parts are stored as `HashMap<K, V, S>`
/// collections. The struct carries no runtime state: it only fixes the
/// generic parameters through `PhantomData` markers.
#[derive(Debug, Clone)]
pub struct KvPartsHashDB<K, V, S, M>
where
    K: Clone + Debug + DeserializeOwned + Eq + Hash + Into<PartId> + Send + Serialize + Sync,
    V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    S: Default + std::hash::BuildHasher + Send + Sync,
    M: 'static
        + Debug
        + From<DbResponse<K, V, HashMap<K, V, S>>>
        + Into<Option<DbResponse<K, V, HashMap<K, V, S>>>>
        + Send
        + Sync,
{
    /// Phantom key
    phantom_key: PhantomData<K>,
    /// Phantom value
    phantom_value: PhantomData<V>,
    /// Phantom msg
    phantom_msg: PhantomData<M>,
    /// Phantom hasher
    phantom_hasher: PhantomData<S>,
}
// `AbstractKvPartsDb`'s methods (`open`, `prepare_open`) are all provided
// with default bodies, so this impl only has to satisfy the trait bounds.
impl<K, V, S, M> AbstractKvPartsDb<K, V, HashMap<K, V, S>, M> for KvPartsHashDB<K, V, S, M>
where
    K: 'static
        + Clone
        + Debug
        + DeserializeOwned
        + Eq
        + Hash
        + Into<PartId>
        + Send
        + Serialize
        + Sized
        + Sync,
    V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    S: 'static + Clone + Default + std::hash::BuildHasher + Send + Sync,
    M: 'static
        + Debug
        + From<DbResponse<K, V, HashMap<K, V, S>>>
        + Into<Option<DbResponse<K, V, HashMap<K, V, S>>>>
        + Send
        + Sync,
{
}
/// `KvPartsCollection` backed by the standard `HashMap`.
///
/// Every method delegates to the inherent `HashMap` method of the same name.
impl<K, V, S> KvPartsCollection<K, V> for HashMap<K, V, S>
where
    K: Debug + DeserializeOwned + Eq + Hash + Send + Serialize + Sync,
    V: Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    S: Default + std::hash::BuildHasher + Send + Sync,
{
    // `HashMap::default()` builds with `S::default()`, exactly like
    // `HashMap::with_hasher(S::default())`.
    #[inline]
    fn new() -> Self {
        Self::default()
    }
    #[inline]
    fn contains_key(&self, k: &K) -> bool {
        HashMap::contains_key(self, k)
    }
    #[inline]
    fn get(&self, k: &K) -> Option<&V> {
        HashMap::get(self, k)
    }
    #[inline]
    fn insert(&mut self, k: K, v: V) -> Option<V> {
        HashMap::insert(self, k, v)
    }
    #[inline]
    fn remove(&mut self, k: &K) -> Option<V> {
        HashMap::remove(self, k)
    }
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! KvPartsDB implementations
pub mod hash_db;
pub mod tree_db;
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! KvPartsDB implementation using BTreeMap to store collection
use crate::kv_collection::KvPartsCollection;
use crate::responses::DbResponse;
use crate::{AbstractKvPartsDb, PartId};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::marker::PhantomData;
/// BTreeMap Database
///
/// `KvPartsDB` flavour whose parts are stored as ordered `BTreeMap<K, V>`
/// collections. The struct carries no runtime state: it only fixes the
/// generic parameters through `PhantomData` markers.
#[derive(Debug, Clone)]
pub struct KvPartsTreeDB<K, V, M>
where
    K: Clone + Debug + DeserializeOwned + Eq + Ord + Into<PartId> + Send + Serialize + Sync,
    V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    M: 'static
        + Debug
        + From<DbResponse<K, V, BTreeMap<K, V>>>
        + Into<Option<DbResponse<K, V, BTreeMap<K, V>>>>
        + Send
        + Sync,
{
    /// Phantom key
    phantom_key: PhantomData<K>,
    /// Phantom value
    phantom_value: PhantomData<V>,
    /// Phantom msg
    phantom_msg: PhantomData<M>,
}
// `AbstractKvPartsDb`'s methods (`open`, `prepare_open`) are all provided
// with default bodies, so this impl only has to satisfy the trait bounds.
impl<K, V, M> AbstractKvPartsDb<K, V, BTreeMap<K, V>, M> for KvPartsTreeDB<K, V, M>
where
    K: 'static
        + Clone
        + Debug
        + DeserializeOwned
        + Eq
        + Ord
        + Into<PartId>
        + Send
        + Serialize
        + Sized
        + Sync,
    V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    M: 'static
        + Debug
        + From<DbResponse<K, V, BTreeMap<K, V>>>
        + Into<Option<DbResponse<K, V, BTreeMap<K, V>>>>
        + Send
        + Sync,
{
}
/// `KvPartsCollection` backed by the standard `BTreeMap`.
///
/// Every method delegates to the inherent `BTreeMap` method of the same name.
impl<K, V> KvPartsCollection<K, V> for BTreeMap<K, V>
where
    K: Debug + DeserializeOwned + Eq + Ord + Send + Serialize + Sync,
    V: Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
{
    // `BTreeMap::default()` is defined as `BTreeMap::new()`.
    #[inline]
    fn new() -> Self {
        Self::default()
    }
    #[inline]
    fn contains_key(&self, k: &K) -> bool {
        BTreeMap::contains_key(self, k)
    }
    #[inline]
    fn get(&self, k: &K) -> Option<&V> {
        BTreeMap::get(self, k)
    }
    #[inline]
    fn insert(&mut self, k: K, v: V) -> Option<V> {
        BTreeMap::insert(self, k, v)
    }
    #[inline]
    fn remove(&mut self, k: &K) -> Option<V> {
        BTreeMap::remove(self, k)
    }
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define KvPart map trait and implement it to some collections
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use std::iter::{FromIterator, IntoIterator};
/// KvParts Map
///
/// Minimal map interface a collection must provide to serve as the per-part
/// storage of a `KvPartsDB`. The serde super-traits let the database
/// (de)serialize whole parts; the iterator super-traits let it move entries
/// in and out of a part.
pub trait KvPartsCollection<K, V>:
    Debug
    + DeserializeOwned
    + Eq
    + Extend<(K, V)>
    + FromIterator<(K, V)>
    + IntoIterator<Item = (K, V)>
    + Send
    + Serialize
    + Sync
{
    /// Instantiate new empty collection
    fn new() -> Self;
    /// Return true if the map contains a value for the specified key
    fn contains_key(&self, key: &K) -> bool;
    /// Insert a key-value pair into the map. An existing value for a
    /// key is replaced by the new value.
    ///
    /// If the map did not have this key present, `None` is returned.
    ///
    /// If the map did have this key present, the value is updated, and the old
    /// value is returned.
    fn insert(&mut self, key: K, value: V) -> Option<V>;
    /// Returns a reference to the value corresponding to the key.
    fn get(&self, key: &K) -> Option<&V>;
    /// Remove a key-value pair from the map. Return the value if the key
    /// was present in the map, otherwise return None.
    fn remove(&mut self, key: &K) -> Option<V>;
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Manage databases persisted in compressed binary files.
#![deny(
missing_docs,
missing_debug_implementations,
missing_copy_implementations,
trivial_casts,
trivial_numeric_casts,
unsafe_code,
unstable_features,
unused_import_braces,
unused_qualifications
)]
#[macro_use]
extern crate log;
mod caches;
pub mod compression;
mod constants;
pub mod errors;
pub mod impls;
pub mod kv_collection;
mod orchestrator;
mod readers;
pub mod requests;
pub mod responses;
pub mod settings;
mod threadpool;
mod writers;
use crate::caches::KvPartsDbCaches;
use crate::errors::DbError;
use crate::kv_collection::KvPartsCollection;
use crate::orchestrator::{DbOrchestratorMsg, Orchestrator};
pub use crate::readers::DbReadError;
use crate::requests::{DbAdminRequest, DbReadRequest, DbRequest, DbRequestMsg, DbWriteRequest};
use crate::responses::DbResponse;
use crate::settings::{DbSettings, DbSettingsBackend};
pub use crate::writers::DbWriteError;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeSet, HashMap, VecDeque};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::mpsc;
use std::thread;
/// Database files extension
///
/// NOTE(review): `constants::PART_FILE_NAME_END` holds the same value —
/// consider deduplicating.
pub static PART_EXT: &str = ".bin.gz";
/// DB part id
#[derive(Copy, Clone, Debug, Deserialize, Ord, PartialEq, PartialOrd, Eq, Hash, Serialize)]
pub struct PartId(pub u32);

impl PartId {
    /// On-disk file name of this part: `_<id>` followed by [`PART_EXT`].
    fn to_file_name(self) -> String {
        let mut name = String::from("_");
        name.push_str(&self.0.to_string());
        name.push_str(PART_EXT);
        name
    }
}
/// DB request id
///
/// Identifier attached to a request (`DbRequestMsg::req_id`); presumably
/// echoed back so responses can be matched to requests — confirm in the
/// `responses` module.
#[derive(Copy, Clone, Debug, Deserialize, Ord, PartialEq, PartialOrd, Eq, Hash, Serialize)]
pub struct DbReqId(pub usize);
/// Abstract KvParts database
///
/// Both methods have default bodies, so implementors (e.g. `KvPartsHashDB`,
/// `KvPartsTreeDB`) only need to satisfy the bounds.
pub trait AbstractKvPartsDb<K, V, C, M>
where
    K: 'static + Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
    V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    C: 'static + Clone + KvPartsCollection<K, V>,
    M: 'static
        + Debug
        + From<DbResponse<K, V, C>>
        + Into<Option<DbResponse<K, V, C>>>
        + Send
        + Sync,
    Self: Sized,
{
    /// Open database
    ///
    /// Spawns the orchestrator thread and returns a handle wired to it.
    fn open(settings: DbSettings) -> Result<KvPartsDB<K, V, C, M>, DbError> {
        let orchestrator_sender = Self::prepare_open(settings)?;
        Ok(KvPartsDB::new(orchestrator_sender))
    }
    /// Prepare open database
    ///
    /// Builds empty caches, spawns the orchestrator in a dedicated thread,
    /// and returns the channel used to talk to it.
    ///
    /// # Errors
    ///
    /// Returns `DbError::SpawnError` (via `From<std::io::Error>`) if the
    /// orchestrator thread cannot be spawned.
    fn prepare_open(
        settings: DbSettings,
    ) -> Result<mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>, DbError> {
        // Pre-size the raw cache only for file-backed databases.
        let raw_cache = match settings.backend {
            DbSettingsBackend::Mem => HashMap::new(),
            DbSettingsBackend::Files {
                raw_cache_max_size, ..
            } => HashMap::with_capacity(raw_cache_max_size),
        };
        let db_caches = KvPartsDbCaches::<K, V, C> {
            cache: HashMap::new(),
            cache_size: 0,
            raw_cache,
            parts_have_been_read: VecDeque::new(),
            parts_to_be_written: BTreeSet::new(),
            phantom_key: PhantomData,
            phantom_value: PhantomData,
        };
        let (orchestrator_sender, orchestrator_recv) = mpsc::channel();
        // The orchestrator also receives a clone of its own sender —
        // presumably so it can post messages to itself; confirm in the
        // `orchestrator` module.
        let orchestrator_sender_clone = orchestrator_sender.clone();
        // The join handle is deliberately dropped: the thread is detached
        // and lives until the orchestrator's main loop exits.
        let _thread_handler = thread::Builder::new()
            .name("KvPartsDB orchestrator".to_owned())
            .spawn(move || {
                Orchestrator::new(
                    orchestrator_sender_clone,
                    orchestrator_recv,
                    settings,
                    db_caches,
                )
                .start_main_loop();
            })?;
        Ok(orchestrator_sender)
    }
}
/// Key Value parts Database
///
/// Handle on an open database: owns the channel to the orchestrator thread.
/// Use `get_reader()` / `get_writer()` to obtain request endpoints and
/// `close()` to shut the database down.
#[derive(Debug)]
pub struct KvPartsDB<K, V, C, M>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sized + Sync,
    V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    C: Clone + KvPartsCollection<K, V>,
    M: 'static
        + Debug
        + From<DbResponse<K, V, C>>
        + Into<Option<DbResponse<K, V, C>>>
        + Send
        + Sync,
{
    /// Channel used to forward every request to the orchestrator thread.
    orchestrator_sender: mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>,
}
impl<K, V, C, M> KvPartsDB<K, V, C, M>
where
    K: 'static
        + Clone
        + Debug
        + DeserializeOwned
        + Eq
        + Into<PartId>
        + Send
        + Serialize
        + Sized
        + Sync,
    V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    C: 'static + Clone + KvPartsCollection<K, V>,
    M: Debug + From<DbResponse<K, V, C>> + Into<Option<DbResponse<K, V, C>>> + Send + Sync,
{
    /// Close database
    ///
    /// Sends an admin `Close` request to the orchestrator and waits for its
    /// acknowledgement.
    ///
    /// # Errors
    /// - `DbError::SendReqError` if the orchestrator channel is disconnected.
    /// - `DbError::UnreachableError` if no response arrives, or if the
    ///   response message cannot be converted back into a `DbResponse`
    ///   (previously this case panicked via `unwrap()`).
    pub fn close(self) -> Result<(), DbError> {
        let (send_res_to, recv) = mpsc::channel();
        self.orchestrator_sender
            .send(DbOrchestratorMsg::Req(DbRequestMsg {
                req_id: DbReqId(0),
                req_content: DbRequest::Admin(DbAdminRequest::Close),
                send_res_to,
            }))
            .map_err(|_| DbError::SendReqError)?;
        match recv.recv() {
            Ok(msg) => {
                // `M` may wrap non-DB messages; `None` means the orchestrator
                // answered with an unexpected message kind.
                let response: DbResponse<K, V, C> =
                    msg.into().ok_or(DbError::UnreachableError)?;
                response.req_result?;
                Ok(())
            }
            Err(_) => Err(DbError::UnreachableError),
        }
    }
    /// Get db reader
    pub fn get_reader(&self) -> KvPartsDBReader<K, V, C, M> {
        KvPartsDBReader::new(self.orchestrator_sender.clone())
    }
    /// Get db writer
    pub fn get_writer(&self) -> KvPartsDBWriter<K, V, C, M> {
        KvPartsDBWriter::new(self.orchestrator_sender.clone())
    }
    /// Instantiate administrator
    fn new(orchestrator_sender: mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>) -> Self {
        KvPartsDB {
            orchestrator_sender,
        }
    }
}
/// Database reader
///
/// Cloneable read endpoint: wraps the sender to the orchestrator thread.
#[derive(Debug, Clone)]
pub struct KvPartsDBReader<K, V, C, M>(mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>)
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sized + Sync,
    V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    C: Clone + KvPartsCollection<K, V>,
    M: 'static
        + Debug
        + From<DbResponse<K, V, C>>
        + Into<Option<DbResponse<K, V, C>>>
        + Send
        + Sync;
impl<K, V, C, M> KvPartsDBReader<K, V, C, M>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sized + Sync,
    V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    C: Clone + KvPartsCollection<K, V>,
    M: Debug + From<DbResponse<K, V, C>> + Into<Option<DbResponse<K, V, C>>> + Send + Sync,
{
    /// Instantiate reader
    fn new(sender: mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>) -> Self {
        Self(sender)
    }
    /// Send read request
    ///
    /// Fire-and-forget: the response arrives later on `send_res_to`.
    /// Returns `DbError::SendReqError` if the orchestrator is gone.
    pub fn send_request(
        &self,
        req_id: DbReqId,
        req_content: DbReadRequest<K>,
        send_res_to: mpsc::Sender<M>,
    ) -> Result<(), DbError> {
        let msg = DbRequestMsg {
            req_id,
            req_content: DbRequest::Read(req_content),
            send_res_to,
        };
        match self.0.send(DbOrchestratorMsg::Req(msg)) {
            Ok(()) => Ok(()),
            Err(_) => Err(DbError::SendReqError),
        }
    }
}
/// Database writer
///
/// Cloneable write endpoint: wraps the sender to the orchestrator thread.
#[derive(Debug, Clone)]
pub struct KvPartsDBWriter<K, V, C, M>(mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>)
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sized + Sync,
    V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    C: Clone + KvPartsCollection<K, V>,
    M: 'static
        + Debug
        + From<DbResponse<K, V, C>>
        + Into<Option<DbResponse<K, V, C>>>
        + Send
        + Sync;
impl<K, V, C, M> KvPartsDBWriter<K, V, C, M>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sized + Sync,
    V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    C: Clone + KvPartsCollection<K, V>,
    M: Debug + From<DbResponse<K, V, C>> + Into<Option<DbResponse<K, V, C>>> + Send + Sync,
{
    /// Instantiate writer
    fn new(sender: mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>) -> Self {
        KvPartsDBWriter(sender)
    }
    /// Send write request
    ///
    /// Fire-and-forget: the response arrives later on `send_res_to`.
    /// Returns `DbError::SendReqError` if the orchestrator is gone.
    pub fn send_request(
        &self,
        req_id: DbReqId,
        req_content: DbWriteRequest<K, V, C>,
        send_res_to: mpsc::Sender<M>,
    ) -> Result<(), DbError> {
        self.0
            .send(DbOrchestratorMsg::Req(DbRequestMsg {
                req_id,
                req_content: DbRequest::Write(req_content),
                send_res_to,
            }))
            .map_err(|_| DbError::SendReqError)
    }
}
/*impl DbBackend {
/// is memory DB
pub fn is_memory_db(&self) -> bool {
if let BinDb::Mem { .. } = self {
true
} else {
false
}
}
/// Open new memory database
pub fn new_memory() -> BinDb {
BinDb::Mem {
parts_datas: HashMap::new(),
}
}
/// Open database
pub fn open(dir_path: PathBuf) -> std::io::Result<BinDb> {
if !dir_path.as_path().exists() {
if let Err(io_error) = std::fs::create_dir(dir_path.as_path()) {
if io_error.kind() != std::io::ErrorKind::AlreadyExists {
error!(
"Impossible to create DB dir: {} !",
dir_path.as_path().display()
);
return Err(io_error);
}
}
}
Ok(BinDb::File { dir_path })
}
/// Get parts ids
pub fn get_parts_ids(&self, expected_parts_names: HashSet<&'static str>) -> Vec<PartId> {
match self {
BinDb::File { dir_path } => {
let file_list_result = std::fs::read_dir(dir_path.as_path());
if file_list_result.is_err() {
fatal_error("Fail to open bc-archiver database !");
}
let file_list_result = file_list_result.unwrap();
let mut parts_ids = Vec::new();
for dir_entry in file_list_result {
if let Ok(dir_entry) = dir_entry {
if let Ok(file_name) = dir_entry.file_name().into_string() {
if let Ok(file_type) = dir_entry.file_type() {
if file_type.is_file() {
let file_name_len = file_name.len();
let part_number_result: Result<usize, std::num::ParseIntError> =
file_name[1..file_name_len - PART_EXT.len()].parse();
if let Ok(part_number) = part_number_result {
parts_ids.push(PartId::Int(part_number));
} else if let Some(part_name) =
expected_parts_names.get(&file_name[..file_name_len - PART_EXT.len()]) {
parts_ids.push(PartId::Str(part_name));
}
}
}
}
}
}
parts_ids
}
BinDb::Mem { parts_datas } => {
parts_datas.keys().cloned().collect()
}
}
}
/// Write one part
#[inline]
pub fn write_part<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>(
&mut self,
part_id: PartId,
datas: &D,
compression: CompressionLevel,
) -> Result<(), DbWriteError> {
writers::write_db_part(self, part_id, datas, compression)
}
/// Write async one part
#[inline]
pub fn write_async_part<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>(
&mut self,
part_id: PartId,
datas: &D,
compression: CompressionLevel,
sender: mpsc::Sender<DbWriteRes>,
req_id: DbReqId,
) {
if let Err(err) = sender.send(DbWriteRes(
req_id,
writers::write_db_part(self, part_id, datas, compression),
)) {
error!("write_async_part(): fail to send response: {}", err)
}
}
/// Write several parts
#[inline]
pub fn write_parts<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>(
&mut self,
datas: &HashMap<PartId, D>,
compression: CompressionLevel,
) -> Result<(), DbWriteError> {
writers::write_db_parts(self, datas, compression)
}
/// Write several parts
#[inline]
pub fn write_async_parts<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>(
&mut self,
datas: &HashMap<PartId, D>,
compression: CompressionLevel,
sender: mpsc::Sender<DbWriteRes>,
req_id: DbReqId,
) -> Result<(), mpsc::SendError<DbWriteRes>> {
let req_result = writers::write_db_parts(self, datas, compression);
sender.send(DbWriteRes(req_id, req_result))
}
/// Read one part
#[inline]
pub fn read_part<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>(
&self,
part_id: PartId,
unzip_and_deser: bool,
) -> Result<DbDatas<D>, DbReadError> {
readers::read_db_part(self, part_id, unzip_and_deser)
}
/// Read async one part
#[inline]
pub fn read_async_part<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>(
&self,
part_id: PartId,
sender: mpsc::Sender<DbReadRes<D>>,
req_id: DbReqId,
unzip_and_deser: bool,
) -> Result<(), mpsc::SendError<DbReadRes<D>>> {
let req_result = readers::read_db_part(&self, part_id, unzip_and_deser).map(|datas| {
let mut response = HashMap::with_capacity(1);
response.insert(part_id, datas);
response
});
sender.send(DbReadRes(req_id, req_result))
}
/// Read several parts
#[inline]
pub fn read_parts<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>(
&mut self,
parts_ids: &[PartId],
unzip_and_deser: bool,
) -> Result<HashMap<PartId, DbDatas<D>>, DbReadError> {
readers::read_db_parts(self, parts_ids, unzip_and_deser)
}
/// Asynchronous reading of several parts
#[inline]
pub fn read_async_parts<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>(
&mut self,
parts_ids: &[PartId],
sender: mpsc::Sender<DbReadRes<D>>,
req_id: DbReqId,
unzip_and_deser: bool,
) -> Result<(), mpsc::SendError<DbReadRes<D>>> {
sender.send(DbReadRes(
req_id,
readers::read_db_parts(self, parts_ids, unzip_and_deser),
))
}
}*/
// NOTE: a collapsed-diff marker from the web view was captured here — this is
// the boundary between two concatenated source files, not source code.
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Read databases persisted in compressed binary files.
use crate::compression::uncompress_binary_datas;
use crate::kv_collection::KvPartsCollection;
use crate::requests::DatasFormat;
use crate::responses::DbDatas;
use crate::PartId;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
#[derive(Debug)]
/// DB read error
///
/// Errors that can occur while reading a DB part from the backend.
pub enum DbReadError {
    /// Deserialize error (invalid bincode payload)
    DeserializeError(bincode::Error),
    /// I/O error
    IoError(std::io::Error),
    /// Part not found
    PartNotFound(PartId),
}
impl std::fmt::Display for DbReadError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
DbReadError::DeserializeError(e) => write!(f, "DbReadError: deserialize error: {}", e),
DbReadError::IoError(e) => write!(f, "DbReadError: I/O error: {}", e),
DbReadError::PartNotFound(part_id) => write!(
f,
"DbReadError: part '{}' not found",
part_id.to_file_name()
),
}
}
}
impl From<std::io::Error> for DbReadError {
fn from(err: std::io::Error) -> Self {
DbReadError::IoError(err)
}
}
/// Allow `?` on bincode results inside read functions.
impl From<bincode::Error> for DbReadError {
    fn from(err: bincode::Error) -> Self {
        Self::DeserializeError(err)
    }
}
/*/// Read on DB part
pub fn read_db_part<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>(
db: &BinDb,
part_id: PartId,
unzip_and_deser: bool,
) -> Result<DbDatas<D>, DbReadError> {
match db {
BinDb::File { ref dir_path } => read_part_from_file(dir_path, part_id, unzip_and_deser),
BinDb::Mem { ref parts_datas } => {
if let Some(compressed_bin_datas) = parts_datas.get(&part_id) {
if unzip_and_deser {
Ok(DbDatas::Datas(uncompress_and_deserialize(
compressed_bin_datas,
)?))
} else {
Ok(DbDatas::CompressedBinDatas(compressed_bin_datas.to_vec()))
}
} else {
Err(DbReadError::PartNotFound(part_id))
}
}
}
}
/// Read several Db parts
pub fn read_db_parts<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>(
db: &BinDb,
parts_ids: &[PartId],
unzip_and_deser: bool,
) -> Result<HashMap<PartId, DbDatas<D>>, DbReadError> {
match db {
BinDb::File { ref dir_path } => parts_ids
.par_iter()
.map(|part_id| {
read_part_from_file(dir_path, *part_id, unzip_and_deser)
.map(|datas| (*part_id, datas))
})
.collect::<Result<HashMap<PartId, DbDatas<D>>, DbReadError>>(),
BinDb::Mem { ref parts_datas } => parts_ids
.par_iter()
.map(|part_id| {
if let Some(compressed_bin_datas) = parts_datas.get(&part_id) {
if unzip_and_deser {
uncompress_and_deserialize(compressed_bin_datas)
.map(|datas| (*part_id, DbDatas::Datas(datas)))
} else {
Ok((
*part_id,
DbDatas::CompressedBinDatas(compressed_bin_datas.to_vec()),
))
}
} else {
Err(DbReadError::PartNotFound(*part_id))
}
})
.collect::<Result<HashMap<PartId, DbDatas<D>>, DbReadError>>(),
}
}*/
#[inline]
pub fn uncompress_and_deserialize<D: DeserializeOwned>(
compressed_bin_datas: &[u8],
) -> Result<D, DbReadError> {
let bin_datas = uncompress_binary_datas(compressed_bin_datas)?;
Ok(bincode::deserialize(&bin_datas[..])?)
}
/// Read one part file and return its content in the requested format:
/// raw compressed bytes, uncompressed serialized bytes, or a fully
/// deserialized collection.
pub fn read_part_from_file<K, V, C>(
    dir_path: &PathBuf,
    part_id: PartId,
    expected_format: DatasFormat,
) -> Result<DbDatas<K, V, C>, DbReadError>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
    V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync,
    C: KvPartsCollection<K, V>,
{
    let compressed = read_raw_part_from_file(dir_path, part_id)?;
    let datas = match expected_format {
        DatasFormat::SerAndZip => {
            // No transformation needed: hand the raw bytes back.
            let mut raw_map = BTreeMap::new();
            raw_map.insert(part_id, compressed);
            DbDatas::SerAndZip(raw_map)
        }
        DatasFormat::UnzipAndSer => {
            let mut ser_map = BTreeMap::new();
            ser_map.insert(part_id, uncompress_binary_datas(&compressed)?);
            DbDatas::UnzipAndSer(ser_map)
        }
        DatasFormat::UnzipAndDeser => {
            let mut typed_map = BTreeMap::new();
            typed_map.insert(part_id, uncompress_and_deserialize(&compressed)?);
            DbDatas::UnzipAndDeser(typed_map)
        }
    };
    Ok(datas)
}
pub fn read_raw_part_from_file(
dir_path: &PathBuf,
part_id: PartId,
) -> Result<Vec<u8>, DbReadError> {
let mut file_path = dir_path.clone();
file_path.push(part_id.to_file_name());
let mut file = File::open(file_path)?;
let mut compressed_bin_datas = Vec::new();
file.read_to_end(&mut compressed_bin_datas)?;
Ok(compressed_bin_datas)
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define possible requests to a KvPartsDB
use crate::compression::CompressionLevel;
use crate::kv_collection::KvPartsCollection;
use crate::responses::DbResponse;
use crate::{DbReqId, PartId};
use fnv::FnvHashSet;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::mpsc;
/// Datas format
///
/// Requested representation for part datas returned by a read.
#[derive(Debug, Copy, Clone)]
pub enum DatasFormat {
    /// Compressed and serialized (raw on-disk bytes)
    SerAndZip,
    /// Uncompressed but still serialized bytes
    UnzipAndSer,
    /// Uncompressed and deserialized (typed collection)
    UnzipAndDeser,
}
/// Database Request
///
/// Envelope pairing a request with its id and the channel on which the
/// matching response (wrapped in `M`) must be sent back.
#[derive(Debug, Clone)]
pub struct DbRequestMsg<K, V, C, M>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
    V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    M: 'static
        + Debug
        + From<DbResponse<K, V, C>>
        + Into<Option<DbResponse<K, V, C>>>
        + Send
        + Sync,
    C: KvPartsCollection<K, V>,
{
    /// Request id (echoed back in the response)
    pub req_id: DbReqId,
    /// Request content
    pub req_content: DbRequest<K, V, C>,
    /// The response will be sent via this sender
    pub send_res_to: mpsc::Sender<M>,
}
/// Database Request content
///
/// Top-level request dispatch: read, write, or administrative.
#[derive(Debug, Clone)]
pub enum DbRequest<K, V, C>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
    V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync,
    C: KvPartsCollection<K, V>,
{
    /// Read request
    Read(DbReadRequest<K>),
    /// Write request
    Write(DbWriteRequest<K, V, C>),
    /// Admin request
    Admin(DbAdminRequest),
}
/// Database read request
#[derive(Debug, Clone)]
pub enum DbReadRequest<K>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
{
    /// Get all parts ids
    GetAllPartsIds,
    /// Force the retention of specific parts in the cache
    ForceCacheParts {
        /// Parts identifiers
        // NOTE(review): the field is named `parts_id` but holds several ids
        // (cf. `parts_ids` in `ReadParts`); renaming would break callers, so
        // the inconsistency is only flagged here.
        parts_id: FnvHashSet<PartId>,
    },
    /// Read values
    ReadValues {
        /// Keys of the values to read
        keys: Vec<K>,
    },
    /// Read parts
    ReadParts {
        /// Identifiers of the requested parts
        parts_ids: Vec<PartId>,
        /// Expected format of the returned datas (see `DatasFormat`)
        expected_format: DatasFormat,
    },
    /// Disable forcing retention of specific parts in the cache
    UnforceCacheParts {
        /// Parts identifiers
        parts_id: Vec<PartId>,
    },
}
/// Database write request
#[derive(Debug, Clone)]
pub enum DbWriteRequest<K, V, C>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
    V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync,
    C: KvPartsCollection<K, V>,
{
    /// Write values
    Write {
        /// Data to be written
        datas: C,
        /// Write in cache or write directly in backend
        cache: bool,
        /// Compression level (if None, use value in db settings)
        compression: Option<CompressionLevel>,
    },
    /// Remove values
    Remove {
        /// Keys to be removed
        keys: Vec<K>,
    },
    /// Save write cache in backend
    Save,
    /// Phantom variant: only here to use the `K`/`V` type parameters
    Phantom(PhantomData<K>, PhantomData<V>),
}
/// Database admin request
///
/// Requests that manage the database lifecycle rather than its content.
#[derive(Debug, Copy, Clone)]
pub enum DbAdminRequest {
    /// Close database
    Close,
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define possible responses to a DB request
use crate::errors::DbError;
use crate::kv_collection::KvPartsCollection;
use crate::{DbReqId, PartId};
use fnv::FnvHashSet;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::marker::PhantomData;
/// Database Response
///
/// Sent back on the requester's channel; `req_id` matches the originating
/// `DbRequestMsg` so responses can be correlated with requests.
#[derive(Debug)]
pub struct DbResponse<K, V, C>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
    V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    C: KvPartsCollection<K, V>,
{
    /// Request id (copied from the request)
    pub req_id: DbReqId,
    /// Request result
    pub req_result: Result<DbResContent<K, V, C>, DbError>,
}
/// DB response content
///
/// Successful payloads, one variant per request kind.
#[derive(Debug, PartialEq, Eq)]
pub enum DbResContent<K, V, C>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
    V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    C: KvPartsCollection<K, V>,
{
    /// Successfully load all DB parts in memory
    LoadDbOk,
    /// All parts ids
    AllPartsIds(FnvHashSet<PartId>),
    /// Values read or deleted
    Values {
        /// Found values
        found: C,
        /// List of requested keys that do not exist in the database
        missing_keys: Vec<K>,
    },
    /// Values by part
    ValuesByPart {
        /// Found values, grouped by the part each key belongs to
        found: BTreeMap<PartId, C>,
        /// List of requested keys that do not exist in the database
        missing_keys: Vec<K>,
    },
    /// Parts datas (format depends on the request's `DatasFormat`)
    Parts(DbDatas<K, V, C>),
    /// Successfully write datas
    WriteOk,
    /// Successfully save DB into backend
    SaveOk,
    /// Successfully close DB
    CloseOk,
}
/// DB Datas
///
/// Part datas keyed by part id, in one of the representations of
/// `DatasFormat`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DbDatas<K, V, C>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
    V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync,
    C: KvPartsCollection<K, V>,
{
    /// Compressed and serialized bytes, by part
    SerAndZip(BTreeMap<PartId, Vec<u8>>),
    /// Uncompressed but still serialized bytes, by part
    UnzipAndSer(BTreeMap<PartId, Vec<u8>>),
    /// Fully deserialized collections, by part
    UnzipAndDeser(BTreeMap<PartId, C>),
    /// Phantom variant: only here to use the `K`/`V` type parameters
    Phantom(PhantomData<K>, PhantomData<V>),
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define KvPartsDB settings
use crate::compression::CompressionLevel;
use std::num::NonZeroUsize;
use std::path::PathBuf;
#[derive(Debug, Default, Clone)]
/// Builder for DbSettings
///
/// Every field is optional; `build()` fills the gaps with crate defaults.
pub struct DbSettingsBuilder {
    /// Backend settings
    backend: Option<DbSettingsBackend>,
    /// Cache max size (in number of items)
    cache_max_size: Option<NonZeroUsize>,
    /// Maximum number of values to clone in the cache per request
    max_clone_values: Option<usize>,
    /// Default compression level
    default_compression: Option<CompressionLevel>,
    /// Number of workers in the pool processing requests to the database
    workers_count: Option<usize>,
}
impl DbSettingsBuilder {
    /// Create builder
    pub fn new() -> Self {
        DbSettingsBuilder {
            backend: None,
            cache_max_size: None,
            max_clone_values: None,
            default_compression: None,
            workers_count: None,
        }
    }
    /// Change cache max size (in number of items)
    pub fn cache_max_size(&mut self, cache_max_size: NonZeroUsize) -> &mut Self {
        self.cache_max_size = Some(cache_max_size);
        self
    }
    /// Change maximum number of values to clone in the cache per request
    pub fn max_clone_values(&mut self, max_clone_values: usize) -> &mut Self {
        self.max_clone_values = Some(max_clone_values);
        self
    }
    /// Change default compression level
    // NOTE(review): `as u32` silently truncates if the value exceeds u32 on a
    // 64-bit platform; compression levels are expected to be small.
    pub fn default_compression(&mut self, default_compression: usize) -> &mut Self {
        self.default_compression = Some(CompressionLevel::new(default_compression as u32));
        self
    }
    /// Number of workers in the pool processing requests to the database
    pub fn workers_count(&mut self, workers_count: usize) -> &mut Self {
        self.workers_count = Some(workers_count);
        self
    }
    /// Use file database
    pub fn file_db(&mut self, dir_path: PathBuf) -> &mut Self {
        self.backend = Some(DbSettingsBackend::new_file_backend(dir_path));
        self
    }
    /// Use memory database
    pub fn memory_db(&mut self) -> &mut Self {
        self.backend = Some(DbSettingsBackend::Mem);
        self
    }
    /// Change raw_cache_max_size param (file backend only)
    ///
    /// # Panics
    /// Panics if the backend has not been set to a file backend first
    /// (via `file_db`).
    pub fn raw_cache_max_size(&mut self, raw_cache_max_size_: usize) -> &mut Self {
        if let Some(DbSettingsBackend::Files {
            ref mut raw_cache_max_size,
            ..
        }) = self.backend
        {
            *raw_cache_max_size = raw_cache_max_size_;
        } else {
            panic!("raw_cache_max_size parameter available only for file database !")
        }
        self
    }
    /// Change cache_all_db param (file backend only)
    ///
    /// # Panics
    /// Panics if the backend has not been set to a file backend first
    /// (via `file_db`).
    pub fn cache_all_db(&mut self, cache_all_db_: bool) -> &mut Self {
        if let Some(DbSettingsBackend::Files {
            ref mut cache_all_db,
            ..
        }) = self.backend
        {
            *cache_all_db = cache_all_db_;
        } else {
            panic!("cache_all_db parameter available only for file database !")
        }
        self
    }
    /// Build DbSettings
    ///
    /// Unset fields fall back to defaults: memory backend, one worker per
    /// logical CPU, compression level 1, crate-level cache constants.
    pub fn build(&self) -> DbSettings {
        DbSettings {
            cache_max_size: self.cache_max_size,
            max_clone_values: self
                .max_clone_values
                .unwrap_or(*crate::constants::DEFAULT_MAX_CLONED_VALUES),
            default_compression: self
                .default_compression
                .unwrap_or_else(|| CompressionLevel::new(1u32)),
            backend: self.backend.clone().unwrap_or(DbSettingsBackend::Mem),
            workers_count: self.workers_count.unwrap_or_else(num_cpus::get),
        }
    }
}
#[derive(Debug, Clone)]
/// DB Settings
pub struct DbSettings {
    /// Backend settings
    pub backend: DbSettingsBackend,
    /// Cache max size (in number of items)
    /// None = infinite size
    pub cache_max_size: Option<NonZeroUsize>,
    /// Maximum number of values to clone in the cache per request
    pub max_clone_values: usize,
    /// Default compression level
    pub default_compression: CompressionLevel,
    /// Number of workers in the pool processing requests to the database
    pub workers_count: usize,
}
impl DbSettings {
    /// Default settings: file backend rooted at `dir_path`, crate-default
    /// cache sizes, compression level 1, one worker per logical CPU.
    pub fn default(dir_path: PathBuf) -> Self {
        DbSettings {
            // `new_file_backend` applies the same defaults as spelling the
            // `Files` variant out by hand.
            backend: DbSettingsBackend::new_file_backend(dir_path),
            cache_max_size: NonZeroUsize::new(*crate::constants::DEFAULT_MAX_CACHE_SIZE),
            max_clone_values: *crate::constants::DEFAULT_MAX_CLONED_VALUES,
            default_compression: CompressionLevel::new(1u32),
            workers_count: num_cpus::get(),
        }
    }
}
/// KvPartsDb backend
#[derive(Debug, Clone)]
pub enum DbSettingsBackend {
    /// Files database
    Files {
        /// Path to directory store database files
        dir_path: PathBuf,
        /// Raw cache max size (in number of parts)
        raw_cache_max_size: usize,
        /// Cache all database in memory (false by default)
        /// The maximum cache size is not considered
        cache_all_db: bool,
    },
    /// Memory database
    Mem,
}
impl DbSettingsBackend {
    /// Build a file backend for `dir_path` with default cache parameters.
    fn new_file_backend(dir_path: PathBuf) -> Self {
        DbSettingsBackend::Files {
            dir_path,
            raw_cache_max_size: *crate::constants::DEFAULT_MAX_RAW_CACHE_SIZE,
            cache_all_db: false,
        }
    }
    /// Get directory path (`None` for the memory backend).
    pub fn get_dir_path(&self) -> Option<PathBuf> {
        match self {
            DbSettingsBackend::Files { dir_path, .. } => Some(dir_path.clone()),
            DbSettingsBackend::Mem => None,
        }
    }
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Manage custom threadpool for database.
pub mod workers;
pub mod workers_msg;
use crate::errors::DbError;
use crate::kv_collection::KvPartsCollection;
use crate::orchestrator::DbOrchestratorMsg;
use crate::responses::{DbResContent, DbResponse};
use crate::threadpool::workers_msg::{WorkerReq, WorkerReqContent};
use crate::{DbReqId, PartId};
use core_affinity::CoreId;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::cmp;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::path::PathBuf;
use std::sync::mpsc::{SendError, Sender};
use std::thread;
use std::thread::JoinHandle;
/// Fixed-size pool of worker threads processing DB part jobs.
#[derive(Debug)]
pub struct ThreadPool<K, V, C, M>
where
    K: 'static
        + Clone
        + Debug
        + DeserializeOwned
        + Eq
        + Into<PartId>
        + Send
        + Serialize
        + Sized
        + Sync,
    V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    C: KvPartsCollection<K, V>,
    M: 'static
        + Debug
        + From<DbResponse<K, V, C>>
        + Into<Option<DbResponse<K, V, C>>>
        + Send
        + Sync,
{
    /// One request channel per worker, indexed by worker id.
    workers_senders: Vec<Sender<WorkerReq<K, V, C>>>,
    /// Worker thread handles, joined on `stop()`.
    workers_handlers: Vec<JoinHandle<()>>,
    /// Ids of currently idle workers.
    available_workers: VecDeque<usize>,
    /// Jobs waiting for an idle worker (FIFO).
    pending_jobs: VecDeque<WorkerReq<K, V, C>>,
    /// Marker for the response message type `M`.
    phantom: PhantomData<M>,
}
impl<K, V, C, M> ThreadPool<K, V, C, M>
where
K: 'static
+ Clone
+ Debug
+ DeserializeOwned
+ Eq
+ Into<PartId>
+ Send
+ Serialize
+ Sized
+ Sync,
V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
C: 'static + Clone + KvPartsCollection<K, V>,
M: 'static
+ Debug
+ From<DbResponse<K, V, C>>
+ Into<Option<DbResponse<K, V, C>>>
+ Send
+ Sync,
{
pub fn new(
workers_count: usize,
db_dir_path: Option<PathBuf>,
orchestrator_sender: Sender<DbOrchestratorMsg<K, V, C, M>>,
) -> Self {
let core_ids: Vec<Option<CoreId>> = if let Some(core_ids) = core_affinity::get_core_ids() {
core_ids.into_iter().map(Some).collect()
} else {
(0..num_cpus::get()).map(|_| None).collect()
};
let cpus_count = core_ids.len();
let mut workers_senders = Vec::with_capacity(workers_count);
let mut workers_handlers = Vec::with_capacity(workers_count);
for i in 0..workers_count {
let dir_path = db_dir_path.clone();
let orchestrator_sender_clone = orchestrator_sender.clone();
let (sender, recv) = std::sync::mpsc::channel();
workers_senders.push(sender);
let core_id = core_ids[i % cpus_count];
let thread_handler = thread::Builder::new()
.name(format!("KvPartsDB worker {}", i))
.spawn(move || {
crate::threadpool::workers::launch_worker::<K, V, C, M>(
i,
core_id,
dir_path,
orchestrator_sender_clone,
recv,
)
})
.expect("Fatal error: fail to spawn worker thread !");
workers_handlers.push(thread_handler);
}
ThreadPool {
workers_senders,
workers_handlers,
available_workers: (0..workers_count).collect(),
pending_jobs: VecDeque::new(),
phantom: PhantomData,
}
}
pub fn new_jobs(
&mut self,
pending_reqs_senders: &mut BTreeMap<DbReqId, Sender<M>>,
req_id: DbReqId,
jobs: Vec<WorkerReq<K, V, C>>,
) {
self.pending_jobs.append(&mut VecDeque::from(jobs));
if self.launch_pending_jobs().is_err() {
let _ = pending_reqs_senders
.remove(&req_id)
.unwrap_or_else(|| panic!(dbg!("dev error")))
.send(
DbResponse {
req_id,
req_result: Err(DbError::SendReqError),
}
.into(),
);
}
}
#[inline]
pub fn worker_available(
&mut self,
worker_id: usize,
) -> Result<(), SendError<WorkerReq<K, V, C>>> {
self.available_workers.push_back(worker_id);
self.launch_pending_jobs()
}
fn launch_pending_jobs(&mut self) -> Result<(), SendError<WorkerReq<K, V, C>>> {
let count_launchable_jobs = cmp::min(self.pending_jobs.len(), self.available_workers.len());
for _ in 0..count_launchable_jobs {
let worker_id = self
.available_workers
.pop_front()
.unwrap_or_else(|| panic!(dbg!("dev error")));
let job = self
.pending_jobs
.pop_front()
.unwrap_or_else(|| panic!(dbg!("dev error")));
self.workers_senders[worker_id].send(job)?
}
Ok(())
}
pub fn stop(self, req_id: DbReqId, send_res_to: Sender<M>) {
let mut err = None;
for (worker_sender, worker_handler) in self
.workers_senders
.into_iter()
.zip(self.workers_handlers.into_iter())
{
let _ = worker_sender.send(WorkerReq {
part_id: PartId(0),
req_id: DbReqId(0),
req_content: WorkerReqContent::Stop,
});
if let Err(err_) = worker_handler.join() {
err = Some(err_);
}
}
let req_result = if err.is_some() {
Err(DbError::CloseError)
} else {
Ok(DbResContent::CloseOk)
};
let _ = send_res_to.send(DbResponse { req_id, req_result }.into());
}
}