...
 
Commits (2)
......@@ -7,6 +7,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"memchr 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
name = "adler32"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "ansi_term"
......@@ -137,6 +140,11 @@ dependencies = [
"byte-tools 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "build_const"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "byte-tools"
version = "0.2.0"
......@@ -203,6 +211,33 @@ name = "constant_time_eq"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "core_affinity"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crc"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"build_const 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crc32fast"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-deque"
version = "0.2.0"
......@@ -642,6 +677,17 @@ name = "fake-simd"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "flate2"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)",
"miniz-sys 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"miniz_oxide_c_api 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fnv"
version = "1.0.6"
......@@ -782,6 +828,20 @@ dependencies = [
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "kv-parts-dbs"
version = "0.1.0-a0.1"
dependencies = [
"bincode 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"core_affinity 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)",
"flate2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.86 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "lazy_static"
version = "1.2.0"
......@@ -836,6 +896,34 @@ name = "memoffset"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "miniz-sys"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cc 1.0.28 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "miniz_oxide"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"adler32 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "miniz_oxide_c_api"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cc 1.0.28 (registry+https://github.com/rust-lang/crates.io-index)",
"crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)",
"miniz_oxide 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "mio"
version = "0.6.16"
......@@ -1593,6 +1681,7 @@ dependencies = [
[metadata]
"checksum aho-corasick 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e6f484ae0c99fec2e858eb6134949117399f222608d84cadb3f58c1f97c2364c"
"checksum adler32 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7e522997b529f05601e05166c07ed17789691f562762c7f3b987263d2dedee5c"
"checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
"checksum argon2rs 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3f67b0b6a86dae6e67ff4ca2b6201396074996379fba2b92ff649126f37cb392"
"checksum arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "0d382e583f07208808f6b1249e60848879ba3543f57c32277bf52d69c2f0f0ee"
......@@ -1609,6 +1698,7 @@ dependencies = [
"checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12"
"checksum blake2-rfc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "5d6d530bdd2d52966a6d03b7a964add7ae1a288d25214066fd4b600f0f796400"
"checksum block-buffer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a076c298b9ecdb530ed9d967e74a6027d6a7478924520acddcddc24c1c8ab3ab"
"checksum build_const 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "39092a32794787acd8525ee150305ff051b0aa6cc2abaf193924f5ab05425f39"
"checksum byte-tools 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "560c32574a12a89ecd91f5e742165893f86e3ab98d21f8ea548658eb9eef5f40"
"checksum byteorder 1.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "94f88df23a25417badc922ab0f5716cc1330e87f71ddd9203b3a3ccd9cedf75d"
"checksum bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "40ade3d27603c2cb345eb0912aec461a6dec7e06a4ae48589904e808335c7afa"
......@@ -1618,6 +1708,9 @@ dependencies = [
"checksum clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b957d88f4b6a63b9d70d5f454ac8011819c6efa7727858f458ab71c756ce2d3e"
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
"checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e"
"checksum core_affinity 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)" = "6d162c6e463c31dbf78fefa99d042156c1c74d404e299cfe3df2923cb857595b"
"checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f739f8c5363aca78cfb059edf753d8f0d36908c348f3d8d1503f03d8b75d9cf3"
"checksum crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "927121f5407de9956180ff5e936fe3cf4324279280001cd56b669d28ee7e9150"
"checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9"
......@@ -1628,6 +1721,7 @@ dependencies = [
"checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2"
"checksum failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1"
"checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
"checksum flate2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "f87e68aa82b2de08a6e037f1385455759df6e445a8df5e005b4297191dbf18aa"
"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
"checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
"checksum foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
......@@ -1655,6 +1749,9 @@ dependencies = [
"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
"checksum memchr 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2efc7bc57c883d4a4d6e3246905283d8dae951bb3bd32f49d6ef297f546e1c39"
"checksum memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0f9dc261e2b62d7a622bf416ea3c5245cdd5d9a7fcc428c0d06804dfce1775b3"
"checksum miniz-sys 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0300eafb20369952951699b68243ab4334f4b10a88f411c221d444b36c40e649"
"checksum miniz_oxide 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c468f2369f07d651a5d0bb2c9079f8488a66d5466efe42d0c5c6466edcb7f71e"
"checksum miniz_oxide_c_api 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b7fe927a42e3807ef71defb191dc87d4e24479b221e67015fe38ae2b7b447bab"
"checksum mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)" = "71646331f2619b1026cc302f87a2b8b648d5c6dd6937846a16cc8ce0f347f432"
"checksum mio-extras 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "46e73a04c2fa6250b8d802134d56d554a9ec2922bf977777c805ea5def61ce40"
"checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919"
......
......@@ -17,6 +17,7 @@ members = [
"lib/tests-tools/documents-tests-tools",
"lib/tests-tools/common-tests-tools",
"lib/tools/crypto",
"lib/tools/kv-parts-dbs",
"lib/tools/common-tools",
"lib/tools/documents",
"lib/tools/json-pest-parser",
......
......@@ -231,10 +231,10 @@ fn parse_value<S: std::hash::BuildHasher + Default>(pair: Pair<Rule>) -> JSONVal
}
pub fn get_optional_usize<S: std::hash::BuildHasher>(
json_block: &HashMap<&str, JSONValue<S>, S>,
json_object: &HashMap<&str, JSONValue<S>, S>,
field: &str,
) -> Result<Option<usize>, Error> {
Ok(match json_block.get(field) {
Ok(match json_object.get(field) {
Some(value) => {
if !value.is_null() {
Some(
......@@ -257,10 +257,10 @@ pub fn get_optional_usize<S: std::hash::BuildHasher>(
}
pub fn get_optional_str_not_empty<'a, S: std::hash::BuildHasher>(
json_block: &'a HashMap<&str, JSONValue<S>, S>,
json_object: &'a HashMap<&str, JSONValue<S>, S>,
field: &str,
) -> Result<Option<&'a str>, Error> {
let result = get_optional_str(json_block, field);
let result = get_optional_str(json_object, field);
if let Ok(Some(value)) = result {
if !value.is_empty() {
Ok(Some(value))
......@@ -273,10 +273,10 @@ pub fn get_optional_str_not_empty<'a, S: std::hash::BuildHasher>(
}
pub fn get_optional_str<'a, S: std::hash::BuildHasher>(
json_block: &'a HashMap<&str, JSONValue<S>, S>,
json_object: &'a HashMap<&str, JSONValue<S>, S>,
field: &str,
) -> Result<Option<&'a str>, Error> {
Ok(match json_block.get(field) {
Ok(match json_object.get(field) {
Some(value) => {
if !value.is_null() {
Some(value.to_str().ok_or_else(|| ParseJsonError {
......@@ -291,10 +291,10 @@ pub fn get_optional_str<'a, S: std::hash::BuildHasher>(
}
pub fn get_number<S: std::hash::BuildHasher>(
json_block: &HashMap<&str, JSONValue<S>, S>,
json_object: &HashMap<&str, JSONValue<S>, S>,
field: &str,
) -> Result<f64, Error> {
Ok(json_block
Ok(json_object
.get(field)
.ok_or_else(|| ParseJsonError {
cause: format!("Fail to parse json : field '{}' must exist !", field),
......@@ -306,10 +306,10 @@ pub fn get_number<S: std::hash::BuildHasher>(
}
pub fn get_str<'a, S: std::hash::BuildHasher>(
json_block: &'a HashMap<&str, JSONValue<S>, S>,
json_object: &'a HashMap<&str, JSONValue<S>, S>,
field: &str,
) -> Result<&'a str, Error> {
Ok(json_block
Ok(json_object
.get(field)
.ok_or_else(|| ParseJsonError {
cause: format!("Fail to parse json : field '{}' must exist !", field),
......@@ -321,10 +321,10 @@ pub fn get_str<'a, S: std::hash::BuildHasher>(
}
pub fn get_str_array<'a, S: std::hash::BuildHasher>(
json_block: &'a HashMap<&str, JSONValue<S>, S>,
json_object: &'a HashMap<&str, JSONValue<S>, S>,
field: &str,
) -> Result<Vec<&'a str>, ParseJsonError> {
json_block
json_object
.get(field)
.ok_or_else(|| ParseJsonError {
cause: format!("Fail to parse json : field '{}' must exist !", field),
......@@ -346,10 +346,10 @@ pub fn get_str_array<'a, S: std::hash::BuildHasher>(
}
pub fn get_object_array<'a, S: std::hash::BuildHasher>(
json_block: &'a JsonObject<'a, S>,
json_object: &'a JsonObject<'a, S>,
field: &str,
) -> Result<Vec<&'a JsonObject<'a, S>>, ParseJsonError> {
json_block
json_object
.get(field)
.ok_or_else(|| ParseJsonError {
cause: format!("Fail to parse json : field '{}' must exist !", field),
......
[package]
name = "kv-parts-dbs"
version = "0.1.0-a0.1"
authors = ["librelois <elois@ifee.fr>"]
description = "Manage key-value databases persisted in compressed binary files."
license = "AGPL-3.0"
edition = "2018"
[lib]
path = "src/lib.rs"
[dependencies]
bincode = "1.0.1"
core_affinity = "0.5.9"
flate2 = "1.0.7"
fnv = "1.0.6"
log = "0.4.*"
num_cpus = "1.8.*"
rayon = "1.0.*"
serde = { version = "1.0.*", features = ["derive"] }
[dev-dependencies]
# KV Parts DBs
Manage key-value databases persisted in compressed binary files.
\ No newline at end of file
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define KvPartsDB caches
use crate::kv_collection::KvPartsCollection;
use crate::PartId;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::{BTreeSet, HashMap, VecDeque};
use std::fmt::Debug;
use std::marker::PhantomData;
/// Database caches
#[derive(Debug)]
pub struct KvPartsDbCaches<K, V, C>
where
K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync,
C: KvPartsCollection<K, V>,
{
/// Cache
pub cache: HashMap<PartId, C>,
/// Cache size (in number of items)
pub cache_size: usize,
/// Raw cache
pub raw_cache: HashMap<PartId, Vec<u8>>,
/// Queue of the part identifiers that have been read
pub parts_have_been_read: VecDeque<PartId>,
/// Identifiers of the parts to be written
pub parts_to_be_written: BTreeSet<PartId>,
/// Phantom key
pub phantom_key: PhantomData<K>,
// Phantom value
pub phantom_value: PhantomData<V>,
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Manage databases compression.
use flate2::bufread::GzDecoder;
use flate2::bufread::GzEncoder;
use flate2::Compression as FlateCompression;
use std::io::prelude::*;
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
/// Describe the compression level with an explicitly specified integer.
/// The integer here is typically on a scale of 0-9 where 0 means "no compression" and 9 means "take as long as you'd like".
pub struct CompressionLevel(u32);
impl CompressionLevel {
/// Create CompressionLevel
pub fn new(level: u32) -> Self {
if level < 10 {
CompressionLevel(level)
} else {
panic!("CompressionLevel::new() : level must be between 0 and 9 !")
}
}
/// Get compression level
pub fn level(self) -> u32 {
self.0
}
}
impl Default for CompressionLevel {
fn default() -> CompressionLevel {
CompressionLevel(1)
}
}
impl Into<FlateCompression> for CompressionLevel {
fn into(self) -> FlateCompression {
FlateCompression::new(self.0)
}
}
/// Compress binary datas (use gzip)
pub fn compress_binary_datas(
datas: &[u8],
compression: CompressionLevel,
) -> std::io::Result<Vec<u8>> {
let mut gz = GzEncoder::new(datas, compression.into());
let mut buffer = Vec::new();
gz.read_to_end(&mut buffer)?;
Ok(buffer)
}
/// Uncompress gzip binary datas
pub fn uncompress_binary_datas(datas: &[u8]) -> std::io::Result<Vec<u8>> {
let mut gz = GzDecoder::new(datas);
let mut buffer = Vec::new();
gz.read_to_end(&mut buffer)?;
Ok(buffer)
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! KvPartsDB Constants
/// Max cache sixe (in number of items)
pub static DEFAULT_MAX_CACHE_SIZE: &'static usize = &10_000;
/// Max raw cache sixe (in number of parts)
pub static DEFAULT_MAX_RAW_CACHE_SIZE: &'static usize = &2_000;
/// Clone requested values to cache ?
pub static DEFAULT_MAX_CLONED_VALUES: &'static usize = &0;
pub static PART_FILE_NAME_BEGIN: &'static str = "_";
pub static PART_FILE_NAME_END: &'static str = ".bin.gz";
pub static PART_FILE_NAME_MIN_LEN: &'static usize = &9;
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define possible DB Errors
use crate::readers::DbReadError;
use crate::writers::DbWriteError;
/// DB Error
#[derive(Debug)]
pub enum DbError {
/// Spawn error
SpawnError(std::io::Error),
/// DB read error
DbReadError(DbReadError),
/// DB write error
DbWriteError(DbWriteError),
/// Need save before execute request
NeedSave,
/// Error when send request
SendReqError,
/// Error when closing the DB
CloseError,
/// The database is no longer reachable
UnreachableError,
/// Forbidden request
ForbiddenRequest,
}
impl From<DbReadError> for DbError {
fn from(err: DbReadError) -> Self {
DbError::DbReadError(err)
}
}
impl From<DbWriteError> for DbError {
fn from(err: DbWriteError) -> Self {
DbError::DbWriteError(err)
}
}
impl From<std::io::Error> for DbError {
fn from(err: std::io::Error) -> Self {
DbError::SpawnError(err)
}
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! KbPartsDB implementation using HashMap to store collection
use crate::kv_collection::KvPartsCollection;
use crate::responses::DbResponse;
use crate::{AbstractKvPartsDb, PartId};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
/// HashMap Database
#[derive(Debug, Clone)]
pub struct KvPartsHashDB<K, V, S, M>
where
K: Clone + Debug + DeserializeOwned + Eq + Hash + Into<PartId> + Send + Serialize + Sync,
V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
S: Default + std::hash::BuildHasher + Send + Sync,
M: 'static
+ Debug
+ From<DbResponse<K, V, HashMap<K, V, S>>>
+ Into<Option<DbResponse<K, V, HashMap<K, V, S>>>>
+ Send
+ Sync,
{
/// Phantom key
phantom_key: PhantomData<K>,
/// Phantom value
phantom_value: PhantomData<V>,
/// Phantom msg
phantom_msg: PhantomData<M>,
/// Phantom hasher
phantom_hasher: PhantomData<S>,
}
impl<K, V, S, M> AbstractKvPartsDb<K, V, HashMap<K, V, S>, M> for KvPartsHashDB<K, V, S, M>
where
K: 'static
+ Clone
+ Debug
+ DeserializeOwned
+ Eq
+ Hash
+ Into<PartId>
+ Send
+ Serialize
+ Sized
+ Sync,
V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
S: 'static + Clone + Default + std::hash::BuildHasher + Send + Sync,
M: 'static
+ Debug
+ From<DbResponse<K, V, HashMap<K, V, S>>>
+ Into<Option<DbResponse<K, V, HashMap<K, V, S>>>>
+ Send
+ Sync,
{
}
impl<K, V, S> KvPartsCollection<K, V> for HashMap<K, V, S>
where
K: Debug + DeserializeOwned + Eq + Hash + Send + Serialize + Sync,
V: Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
S: Default + std::hash::BuildHasher + Send + Sync,
{
#[inline]
fn new() -> Self {
HashMap::with_hasher(S::default())
}
#[inline]
fn contains_key(&self, key: &K) -> bool {
self.contains_key(key)
}
#[inline]
fn insert(&mut self, key: K, value: V) -> Option<V> {
self.insert(key, value)
}
#[inline]
fn get(&self, key: &K) -> Option<&V> {
self.get(key)
}
#[inline]
fn remove(&mut self, key: &K) -> Option<V> {
self.remove(key)
}
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! KbPartsDB implementations
pub mod hash_db;
pub mod tree_db;
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! KbPartsDB implementation using BTreeMap to store collection
use crate::kv_collection::KvPartsCollection;
use crate::responses::DbResponse;
use crate::{AbstractKvPartsDb, PartId};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::marker::PhantomData;
/// BTreeMap Database
#[derive(Debug, Clone)]
pub struct KvPartsTreeDB<K, V, M>
where
K: Clone + Debug + DeserializeOwned + Eq + Ord + Into<PartId> + Send + Serialize + Sync,
V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
M: 'static
+ Debug
+ From<DbResponse<K, V, BTreeMap<K, V>>>
+ Into<Option<DbResponse<K, V, BTreeMap<K, V>>>>
+ Send
+ Sync,
{
/// Phantom key
phantom_key: PhantomData<K>,
/// Phantom value
phantom_value: PhantomData<V>,
/// Phantom msg
phantom_msg: PhantomData<M>,
}
impl<K, V, M> AbstractKvPartsDb<K, V, BTreeMap<K, V>, M> for KvPartsTreeDB<K, V, M>
where
K: 'static
+ Clone
+ Debug
+ DeserializeOwned
+ Eq
+ Ord
+ Into<PartId>
+ Send
+ Serialize
+ Sized
+ Sync,
V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
M: 'static
+ Debug
+ From<DbResponse<K, V, BTreeMap<K, V>>>
+ Into<Option<DbResponse<K, V, BTreeMap<K, V>>>>
+ Send
+ Sync,
{
}
impl<K, V> KvPartsCollection<K, V> for BTreeMap<K, V>
where
K: Debug + DeserializeOwned + Eq + Ord + Send + Serialize + Sync,
V: Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
{
#[inline]
fn new() -> Self {
BTreeMap::new()
}
#[inline]
fn contains_key(&self, key: &K) -> bool {
self.contains_key(key)
}
#[inline]
fn insert(&mut self, key: K, value: V) -> Option<V> {
self.insert(key, value)
}
#[inline]
fn get(&self, key: &K) -> Option<&V> {
self.get(key)
}
#[inline]
fn remove(&mut self, key: &K) -> Option<V> {
self.remove(key)
}
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define KvPart map trait and implement it to some collections
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use std::iter::{FromIterator, IntoIterator};
/// KvParts Map
pub trait KvPartsCollection<K, V>:
Debug
+ DeserializeOwned
+ Eq
+ Extend<(K, V)>
+ FromIterator<(K, V)>
+ IntoIterator<Item = (K, V)>
+ Send
+ Serialize
+ Sync
{
/// Instantiate new empty collection
fn new() -> Self;
/// Return true if the map contains a value for the specified key
fn contains_key(&self, key: &K) -> bool;
/// Insert a key-value pair into the map. An existing value for a
/// key is replaced by the new value.
///
/// If the map did not have this key present, `None` is returned.
///
/// If the map did have this key present, the value is updated, and the old
/// value is returned.
fn insert(&mut self, key: K, value: V) -> Option<V>;
/// Returns a reference to the value corresponding to the key.
/// The key may be any borrowed form of the map's key type.
fn get(&self, key: &K) -> Option<&V>;
/// Remove a key-value pair from the map. Return the value if the key
/// was present in the map, otherwise return None.
fn remove(&mut self, key: &K) -> Option<V>;
}
This diff is collapsed.
This diff is collapsed.
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Read databases persisted in compressed binary files.
use crate::compression::uncompress_binary_datas;
use crate::kv_collection::KvPartsCollection;
use crate::requests::DatasFormat;
use crate::responses::DbDatas;
use crate::PartId;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
#[derive(Debug)]
/// DB read error
pub enum DbReadError {
/// Serialize error
DeserializeError(bincode::Error),
/// I/O error
IoError(std::io::Error),
/// Part not found
PartNotFound(PartId),
}
impl std::fmt::Display for DbReadError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
DbReadError::DeserializeError(e) => write!(f, "DbReadError: deserialize error: {}", e),
DbReadError::IoError(e) => write!(f, "DbReadError: I/O error: {}", e),
DbReadError::PartNotFound(part_id) => write!(
f,
"DbReadError: part '{}' not found",
part_id.to_file_name()
),
}
}
}
impl From<std::io::Error> for DbReadError {
fn from(err: std::io::Error) -> Self {
DbReadError::IoError(err)
}
}
impl From<bincode::Error> for DbReadError {
fn from(err: bincode::Error) -> Self {
DbReadError::DeserializeError(err)
}
}
/*/// Read on DB part
pub fn read_db_part<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>(
db: &BinDb,
part_id: PartId,
unzip_and_deser: bool,
) -> Result<DbDatas<D>, DbReadError> {
match db {
BinDb::File { ref dir_path } => read_part_from_file(dir_path, part_id, unzip_and_deser),
BinDb::Mem { ref parts_datas } => {
if let Some(compressed_bin_datas) = parts_datas.get(&part_id) {
if unzip_and_deser {
Ok(DbDatas::Datas(uncompress_and_deserialize(
compressed_bin_datas,
)?))
} else {
Ok(DbDatas::CompressedBinDatas(compressed_bin_datas.to_vec()))
}
} else {
Err(DbReadError::PartNotFound(part_id))
}
}
}
}
/// Read several Db parts
pub fn read_db_parts<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>(
db: &BinDb,
parts_ids: &[PartId],
unzip_and_deser: bool,
) -> Result<HashMap<PartId, DbDatas<D>>, DbReadError> {
match db {
BinDb::File { ref dir_path } => parts_ids
.par_iter()
.map(|part_id| {
read_part_from_file(dir_path, *part_id, unzip_and_deser)
.map(|datas| (*part_id, datas))
})
.collect::<Result<HashMap<PartId, DbDatas<D>>, DbReadError>>(),
BinDb::Mem { ref parts_datas } => parts_ids
.par_iter()
.map(|part_id| {
if let Some(compressed_bin_datas) = parts_datas.get(&part_id) {
if unzip_and_deser {
uncompress_and_deserialize(compressed_bin_datas)
.map(|datas| (*part_id, DbDatas::Datas(datas)))
} else {
Ok((
*part_id,
DbDatas::CompressedBinDatas(compressed_bin_datas.to_vec()),
))
}
} else {
Err(DbReadError::PartNotFound(*part_id))
}
})
.collect::<Result<HashMap<PartId, DbDatas<D>>, DbReadError>>(),
}
}*/
#[inline]
pub fn uncompress_and_deserialize<D: DeserializeOwned>(
compressed_bin_datas: &[u8],
) -> Result<D, DbReadError> {
let bin_datas = uncompress_binary_datas(compressed_bin_datas)?;
Ok(bincode::deserialize(&bin_datas[..])?)
}
pub fn read_part_from_file<K, V, C>(
dir_path: &PathBuf,
part_id: PartId,
expected_format: DatasFormat,
) -> Result<DbDatas<K, V, C>, DbReadError>
where
K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync,
C: KvPartsCollection<K, V>,
{
let compressed_bin_datas = read_raw_part_from_file(dir_path, part_id)?;
match expected_format {
DatasFormat::SerAndZip => {
let mut raw_parts_datas = BTreeMap::new();
raw_parts_datas.insert(part_id, compressed_bin_datas);
Ok(DbDatas::SerAndZip(raw_parts_datas))
}
DatasFormat::UnzipAndSer => {
let mut ser_parts_datas = BTreeMap::new();
ser_parts_datas.insert(part_id, uncompress_binary_datas(&compressed_bin_datas)?);
Ok(DbDatas::UnzipAndSer(ser_parts_datas))
}
DatasFormat::UnzipAndDeser => {
let mut parts_datas = BTreeMap::new();
parts_datas.insert(
part_id,
uncompress_and_deserialize(&compressed_bin_datas[..])?,
);
Ok(DbDatas::UnzipAndDeser(parts_datas))
}
}
}
pub fn read_raw_part_from_file(
dir_path: &PathBuf,
part_id: PartId,
) -> Result<Vec<u8>, DbReadError> {
let mut file_path = dir_path.clone();
file_path.push(part_id.to_file_name());
let mut file = File::open(file_path)?;
let mut compressed_bin_datas = Vec::new();
file.read_to_end(&mut compressed_bin_datas)?;
Ok(compressed_bin_datas)
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define possible requests to a KvPartsDB
use crate::compression::CompressionLevel;
use crate::kv_collection::KvPartsCollection;
use crate::responses::DbResponse;
use crate::{DbReqId, PartId};
use fnv::FnvHashSet;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::mpsc;
/// Datas format
#[derive(Debug, Copy, Clone)]
pub enum DatasFormat {
/// Compressed and serialize
SerAndZip,
/// Uncompressed and serialize
UnzipAndSer,
/// Uncompressed and deserialize
UnzipAndDeser,
}
/// Database Request
#[derive(Debug, Clone)]
pub struct DbRequestMsg<K, V, C, M>
where
K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
M: 'static
+ Debug
+ From<DbResponse<K, V, C>>
+ Into<Option<DbResponse<K, V, C>>>
+ Send
+ Sync,
C: KvPartsCollection<K, V>,
{
/// Request id
pub req_id: DbReqId,
/// Request content
pub req_content: DbRequest<K, V, C>,
/// The response will be sent via this sender
pub send_res_to: mpsc::Sender<M>,
}
/// Database Request content
#[derive(Debug, Clone)]
pub enum DbRequest<K, V, C>
where
K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync,
C: KvPartsCollection<K, V>,
{
/// Read request
Read(DbReadRequest<K>),
/// Write request
Write(DbWriteRequest<K, V, C>),
/// Admin request
Admin(DbAdminRequest),
}
/// Database read request
#[derive(Debug, Clone)]
pub enum DbReadRequest<K>
where
K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
{
/// Get all parts ids
GetAllPartsIds,
/// Force the retention of specific parts in the cache
ForceCacheParts {
/// Parts identifiers
parts_id: FnvHashSet<PartId>,
},
/// Read values
ReadValues {
/// Keys
keys: Vec<K>,
},
/// Read parts
ReadParts {
/// Identifiers of the requested parts
parts_ids: Vec<PartId>,
/// Expected format
expected_format: DatasFormat,
},
/// Disable forcing retention of specific parts in the cache
UnforceCacheParts {
/// Parts identifiers
parts_id: Vec<PartId>,
},
}
/// Database write request
#[derive(Debug, Clone)]
pub enum DbWriteRequest<K, V, C>
where
K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync,
C: KvPartsCollection<K, V>,
{
/// Write values
Write {
/// Data to be written
datas: C,
/// Write in cache or write directly in backend
cache: bool,
/// Compression level (if None, use value in db settings)
compression: Option<CompressionLevel>,
},
/// Remove values
Remove {
/// Keys to be removed
keys: Vec<K>,
},
/// Save write cache in backend
Save,
/// Phantom
Phantom(PhantomData<K>, PhantomData<V>),
}
/// Database admin request
#[derive(Debug, Copy, Clone)]
pub enum DbAdminRequest {
/// Close database
Close,
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define possible responses to a DB request
use crate::errors::DbError;
use crate::kv_collection::KvPartsCollection;
use crate::{DbReqId, PartId};
use fnv::FnvHashSet;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::marker::PhantomData;
/// Database Response
#[derive(Debug)]
pub struct DbResponse<K, V, C>
where
K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
C: KvPartsCollection<K, V>,
{
/// Request id
pub req_id: DbReqId,
/// Request result
pub req_result: Result<DbResContent<K, V, C>, DbError>,
}
/// DB response content
#[derive(Debug, PartialEq, Eq)]
pub enum DbResContent<K, V, C>
where
K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
C: KvPartsCollection<K, V>,
{
/// Successfully load all DB parts in memory
LoadDbOk,
/// All parts ids
AllPartsIds(FnvHashSet<PartId>),
/// Values read or deleted
Values {
/// Found values
found: C,
/// List of requested keys that do not exist in the database
missing_keys: Vec<K>,
},
/// Values by part
ValuesByPart {
/// Found values by part
found: BTreeMap<PartId, C>,
/// List of requested keys that do not exist in the database
missing_keys: Vec<K>,
},
/// Parts datas
Parts(DbDatas<K, V, C>),
/// Successfully write datas
WriteOk,
/// Successfully save DB into backend
SaveOk,
/// Successfully close DB
CloseOk,
}
/// DB Datas
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DbDatas<K, V, C>
where
K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync,
C: KvPartsCollection<K, V>,
{
/// Compressed and serialize
SerAndZip(BTreeMap<PartId, Vec<u8>>),
/// Uncompressed and serialize
UnzipAndSer(BTreeMap<PartId, Vec<u8>>),
/// Uncompressed and deserialize
UnzipAndDeser(BTreeMap<PartId, C>),
/// Phantom
Phantom(PhantomData<K>, PhantomData<V>),
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define KvPartsDB settings
use crate::compression::CompressionLevel;
use std::num::NonZeroUsize;
use std::path::PathBuf;
#[derive(Debug, Default, Clone)]
/// Builder for DbSettings
pub struct DbSettingsBuilder {
/// Backend settings
backend: Option<DbSettingsBackend>,
/// Cache sixe (in number of items)
cache_max_size: Option<NonZeroUsize>,
/// Maximum number of values to clone in the cache per request
max_clone_values: Option<usize>,
/// Default compression level
default_compression: Option<CompressionLevel>,
// Number of workers in the pool processing requests to the database
workers_count: Option<usize>,
}
impl DbSettingsBuilder {
/// Create builder
pub fn new() -> Self {
DbSettingsBuilder {
backend: None,
cache_max_size: None,
max_clone_values: None,
default_compression: None,
workers_count: None,
}
}
/// Number of workers in the pool processing requests to the database
pub fn cache_max_size(&mut self, cache_max_size: NonZeroUsize) -> &mut Self {
self.cache_max_size = Some(cache_max_size);
self
}
/// Change maximum number of values to clone in the cache per request
pub fn max_clone_values(&mut self, max_clone_values: usize) -> &mut Self {
self.max_clone_values = Some(max_clone_values);
self
}
/// Change default compression level
pub fn default_compression(&mut self, default_compression: usize) -> &mut Self {
self.default_compression = Some(CompressionLevel::new(default_compression as u32));
self
}
/// Number of workers in the pool processing requests to the database
pub fn workers_count(&mut self, workers_count: usize) -> &mut Self {
self.workers_count = Some(workers_count);
self
}
/// Use file database
pub fn file_db(&mut self, dir_path: PathBuf) -> &mut Self {
self.backend = Some(DbSettingsBackend::new_file_backend(dir_path));
self
}
/// Use memory database
pub fn memory_db(&mut self) -> &mut Self {
self.backend = Some(DbSettingsBackend::Mem);
self
}
/// Change raw_cache_max_size param (file backend only)
pub fn raw_cache_max_size(&mut self, raw_cache_max_size_: usize) -> &mut Self {
if let Some(DbSettingsBackend::Files {
ref mut raw_cache_max_size,
..
}) = self.backend
{
*raw_cache_max_size = raw_cache_max_size_;
} else {
panic!("raw_cache_max_size parameter available only for file database !")
}
self
}
/// Change cache_all_db param (file backend only)
pub fn cache_all_db(&mut self, cache_all_db_: bool) -> &mut Self {
if let Some(DbSettingsBackend::Files {
ref mut cache_all_db,
..
}) = self.backend
{
*cache_all_db = cache_all_db_;
} else {
panic!("cache_all_db parameter available only for file database !")
}
self
}
/// Build DbSettings
pub fn build(&self) -> DbSettings {
DbSettings {
cache_max_size: self.cache_max_size,
max_clone_values: self
.max_clone_values
.unwrap_or(*crate::constants::DEFAULT_MAX_CLONED_VALUES),
default_compression: self
.default_compression
.unwrap_or_else(|| CompressionLevel::new(1u32)),
backend: self.backend.clone().unwrap_or(DbSettingsBackend::Mem),
workers_count: self.workers_count.unwrap_or_else(num_cpus::get),
}
}
}
#[derive(Debug, Clone)]
/// DB Settings
pub struct DbSettings {
/// Backend settings
pub backend: DbSettingsBackend,
/// Cache max sixe (in number of items)
/// None = infinite size
pub cache_max_size: Option<NonZeroUsize>,
/// Maximum number of values to clone in the cache per request
pub max_clone_values: usize,
/// Default compression level
pub default_compression: CompressionLevel,
/// Number of workers in the pool processing requests to the database
pub workers_count: usize,
}
impl DbSettings {
/// Default settings
pub fn default(dir_path: PathBuf) -> Self {
DbSettings {
backend: DbSettingsBackend::Files {
dir_path,
raw_cache_max_size: *crate::constants::DEFAULT_MAX_RAW_CACHE_SIZE,
cache_all_db: false,
},
cache_max_size: NonZeroUsize::new(*crate::constants::DEFAULT_MAX_CACHE_SIZE),
max_clone_values: *crate::constants::DEFAULT_MAX_CLONED_VALUES,
default_compression: CompressionLevel::new(1u32),
workers_count: num_cpus::get(),
}
}
}
/// KvPartsDb backend
#[derive(Debug, Clone)]
pub enum DbSettingsBackend {
/// Files database
Files {
/// Path to directory store database files
dir_path: PathBuf,
/// Raw cache max size (in number of parts)
raw_cache_max_size: usize,
/// Cache all database in memory (false by default)
/// The maximum cache size is not considered
cache_all_db: bool,
},
/// Memory database
Mem,
}
impl DbSettingsBackend {
fn new_file_backend(dir_path: PathBuf) -> Self {
DbSettingsBackend::Files {
dir_path,
raw_cache_max_size: *crate::constants::DEFAULT_MAX_RAW_CACHE_SIZE,
cache_all_db: false,
}
}
/// Get directory path
pub fn get_dir_path(&self) -> Option<PathBuf> {
if let DbSettingsBackend::Files { dir_path, .. } = self {
Some(dir_path.clone())
} else {
None
}
}
}
// Copyright (C) 2019 Éloïs SANCHEZ
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Manage custom threadpool for database.
pub mod workers;
pub mod workers_msg;
use crate::errors::DbError;
use crate::kv_collection::KvPartsCollection;
use crate::orchestrator::DbOrchestratorMsg;
use crate::responses::{DbResContent, DbResponse};
use crate::threadpool::workers_msg::{WorkerReq, WorkerReqContent};
use crate::{DbReqId, PartId};
use core_affinity::CoreId;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::cmp;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::path::PathBuf;
use std::sync::mpsc::{SendError, Sender};
use std::thread;
use std::thread::JoinHandle;
#[derive(Debug)]
pub struct ThreadPool<K, V, C, M>
where
K: 'static
+ Clone
+ Debug
+ DeserializeOwned
+ Eq
+ Into<PartId>
+ Send
+ Serialize
+ Sized
+ Sync,
V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
C: KvPartsCollection<K, V>,
M: 'static
+ Debug
+ From<DbResponse<K, V, C>>
+ Into<Option<DbResponse<K, V, C>>>
+ Send
+ Sync,
{
workers_senders: Vec<Sender<WorkerReq<K, V, C>>>,
workers_handlers: Vec<JoinHandle<()>>,
available_workers: VecDeque<usize>,
pending_jobs: VecDeque<WorkerReq<K, V, C>>,
phantom: PhantomData<M>,
}
impl<K, V, C, M> ThreadPool<K, V, C, M>
where
K: 'static
+ Clone
+ Debug
+ DeserializeOwned
+ Eq
+ Into<PartId>
+ Send
+ Serialize
+ Sized
+ Sync,
V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
C: 'static + Clone + KvPartsCollection<K, V>,
M: 'static
+ Debug
+ From<DbResponse<K, V, C>>
+ Into<Option<DbResponse<K, V, C>>>
+ Send
+ Sync,
{
pub fn new(
workers_count: usize,
db_dir_path: Option<PathBuf>,
orchestrator_sender: Sender<DbOrchestratorMsg<K, V, C, M>>,
) -> Self {
let core_ids: Vec<Option<CoreId>> = if let Some(core_ids) = core_affinity::get_core_ids() {
core_ids.into_iter().map(Some).collect()
} else {
(0..num_cpus::get()).map(|_| None).collect()
};
let cpus_count = core_ids.len();
let mut workers_senders = Vec::with_capacity(workers_count);
let mut workers_handlers = Vec::with_capacity(workers_count);
for i in 0..workers_count {
let dir_path = db_dir_path.clone();
let orchestrator_sender_clone = orchestrator_sender.clone();
let (sender, recv) = std::sync::mpsc::channel();
workers_senders.push(sender);
let core_id = core_ids[i % cpus_count];
let thread_handler = thread::Builder::new()
.name(format!("KvPartsDB worker {}", i))
.spawn(move || {
crate::threadpool::workers::launch_worker::<K, V, C, M>(
i,
core_id,
dir_path,
orchestrator_sender_clone,
recv,
)
})
.expect("Fatal error: fail to spawn worker thread !");
workers_handlers.push(thread_handler);
}
ThreadPool {
workers_senders,
workers_handlers,
available_workers: (0..workers_count).collect(),
pending_jobs: VecDeque::new(),
phantom: PhantomData,
}
}
pub fn new_jobs(
&mut self,
pending_reqs_senders: &mut BTreeMap<DbReqId, Sender<M>>,
req_id: DbReqId,
jobs: Vec<WorkerReq<K, V, C>>,
) {
self.pending_jobs.append(&mut VecDeque::from(jobs));
if self.launch_pending_jobs().is_err() {
let _ = pending_reqs_senders
.remove(&req_id)
.unwrap_or_else(|| panic!(dbg!("dev error")))
.send(
DbResponse {
req_id,
req_result: Err(DbError::SendReqError),
}
.into(),
);
}
}
#[inline]
pub fn worker_available(
&mut self,
worker_id: usize,
) -> Result<(), SendError<WorkerReq<K, V, C>>> {
self.available_workers.push_back(worker_id);
self.launch_pending_jobs()
}
fn launch_pending_jobs(&mut self) -> Result<(), SendError<WorkerReq<K, V, C>>> {
let count_launchable_jobs = cmp::min(self.pending_jobs.len(), self.available_workers.len());
for _ in 0..count_launchable_jobs {
let worker_id = self
.available_workers