diff --git a/Cargo.lock b/Cargo.lock index b3436f1b58991546d1abe6b5805f242901383147..1f372cd0ac579d51259a7c878805bbeca08156b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7,6 +7,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "memchr 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +name = "adler32" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "ansi_term" @@ -137,6 +140,11 @@ dependencies = [ "byte-tools 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "build_const" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "byte-tools" version = "0.2.0" @@ -203,6 +211,33 @@ name = "constant_time_eq" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "core_affinity" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "crc" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "build_const 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "crc32fast" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "crossbeam-deque" version = "0.2.0" @@ -642,6 +677,17 @@ name = "fake-simd" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "flate2" +version = "1.0.7" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)", + "miniz-sys 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "miniz_oxide_c_api 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "fnv" version = "1.0.6" @@ -782,6 +828,20 @@ dependencies = [ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "kv-parts-dbs" +version = "0.1.0-a0.1" +dependencies = [ + "bincode 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "core_affinity 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)", + "flate2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.86 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "lazy_static" version = "1.2.0" @@ -836,6 +896,34 @@ name = "memoffset" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "miniz-sys" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cc 1.0.28 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "miniz_oxide" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "adler32 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "miniz_oxide_c_api" +version = "0.2.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cc 1.0.28 (registry+https://github.com/rust-lang/crates.io-index)", + "crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)", + "miniz_oxide 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "mio" version = "0.6.16" @@ -1593,6 +1681,7 @@ dependencies = [ [metadata] "checksum aho-corasick 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e6f484ae0c99fec2e858eb6134949117399f222608d84cadb3f58c1f97c2364c" +"checksum adler32 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7e522997b529f05601e05166c07ed17789691f562762c7f3b987263d2dedee5c" "checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" "checksum argon2rs 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3f67b0b6a86dae6e67ff4ca2b6201396074996379fba2b92ff649126f37cb392" "checksum arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "0d382e583f07208808f6b1249e60848879ba3543f57c32277bf52d69c2f0f0ee" @@ -1609,6 +1698,7 @@ dependencies = [ "checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12" "checksum blake2-rfc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "5d6d530bdd2d52966a6d03b7a964add7ae1a288d25214066fd4b600f0f796400" "checksum block-buffer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a076c298b9ecdb530ed9d967e74a6027d6a7478924520acddcddc24c1c8ab3ab" +"checksum build_const 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "39092a32794787acd8525ee150305ff051b0aa6cc2abaf193924f5ab05425f39" "checksum byte-tools 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = 
"560c32574a12a89ecd91f5e742165893f86e3ab98d21f8ea548658eb9eef5f40" "checksum byteorder 1.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "94f88df23a25417badc922ab0f5716cc1330e87f71ddd9203b3a3ccd9cedf75d" "checksum bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "40ade3d27603c2cb345eb0912aec461a6dec7e06a4ae48589904e808335c7afa" @@ -1618,6 +1708,9 @@ dependencies = [ "checksum clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b957d88f4b6a63b9d70d5f454ac8011819c6efa7727858f458ab71c756ce2d3e" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" +"checksum core_affinity 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)" = "6d162c6e463c31dbf78fefa99d042156c1c74d404e299cfe3df2923cb857595b" +"checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" +"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" "checksum crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f739f8c5363aca78cfb059edf753d8f0d36908c348f3d8d1503f03d8b75d9cf3" "checksum crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "927121f5407de9956180ff5e936fe3cf4324279280001cd56b669d28ee7e9150" "checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9" @@ -1628,6 +1721,7 @@ dependencies = [ "checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2" "checksum 
failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1" "checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" +"checksum flate2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "f87e68aa82b2de08a6e037f1385455759df6e445a8df5e005b4297191dbf18aa" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" "checksum foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" @@ -1655,6 +1749,9 @@ dependencies = [ "checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" "checksum memchr 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2efc7bc57c883d4a4d6e3246905283d8dae951bb3bd32f49d6ef297f546e1c39" "checksum memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0f9dc261e2b62d7a622bf416ea3c5245cdd5d9a7fcc428c0d06804dfce1775b3" +"checksum miniz-sys 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0300eafb20369952951699b68243ab4334f4b10a88f411c221d444b36c40e649" +"checksum miniz_oxide 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c468f2369f07d651a5d0bb2c9079f8488a66d5466efe42d0c5c6466edcb7f71e" +"checksum miniz_oxide_c_api 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b7fe927a42e3807ef71defb191dc87d4e24479b221e67015fe38ae2b7b447bab" "checksum mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)" = 
"71646331f2619b1026cc302f87a2b8b648d5c6dd6937846a16cc8ce0f347f432" "checksum mio-extras 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "46e73a04c2fa6250b8d802134d56d554a9ec2922bf977777c805ea5def61ce40" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" diff --git a/Cargo.toml b/Cargo.toml index c15c5c204603cb3cb5c7f97307a3d9e63fb01b19..f27534268bb6141b0992375eb3f273a130cc01c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "lib/tests-tools/documents-tests-tools", "lib/tests-tools/common-tests-tools", "lib/tools/crypto", + "lib/tools/kv-parts-dbs", "lib/tools/common-tools", "lib/tools/documents", "lib/tools/json-pest-parser", diff --git a/lib/tools/kv-parts-dbs/Cargo.toml b/lib/tools/kv-parts-dbs/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..361a13c0832957689176ae57885a442a51d2ffc3 --- /dev/null +++ b/lib/tools/kv-parts-dbs/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "kv-parts-dbs" +version = "0.1.0-a0.1" +authors = ["librelois <elois@ifee.fr>"] +description = "Manage key-value databases persisted in compressed binary files." +license = "AGPL-3.0" +edition = "2018" + +[lib] +path = "src/lib.rs" + +[dependencies] +bincode = "1.0.1" +core_affinity = "0.5.9" +flate2 = "1.0.7" +fnv = "1.0.6" +log = "0.4.*" +num_cpus = "1.8.*" +rayon = "1.0.*" +serde = { version = "1.0.*", features = ["derive"] } + +[dev-dependencies] diff --git a/lib/tools/kv-parts-dbs/README.md b/lib/tools/kv-parts-dbs/README.md new file mode 100644 index 0000000000000000000000000000000000000000..da98c31418ec96272fafec305f10bc96791b4404 --- /dev/null +++ b/lib/tools/kv-parts-dbs/README.md @@ -0,0 +1,3 @@ +# KV Parts DBs + +Manage key-value databases persisted in compressed binary files. 
\ No newline at end of file diff --git a/lib/tools/kv-parts-dbs/src/caches.rs b/lib/tools/kv-parts-dbs/src/caches.rs new file mode 100644 index 0000000000000000000000000000000000000000..c14db7f82298ed36ab0d04fd43395926c1529316 --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/caches.rs @@ -0,0 +1,48 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! 
Define KvPartsDB caches + +use crate::kv_collection::KvPartsCollection; +use crate::PartId; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::collections::{BTreeSet, HashMap, VecDeque}; +use std::fmt::Debug; +use std::marker::PhantomData; + +/// Database caches +#[derive(Debug)] +pub struct KvPartsDbCaches<K, V, C> +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync, + C: KvPartsCollection<K, V>, +{ + /// Cache + pub cache: HashMap<PartId, C>, + /// Cache size (in number of items) + pub cache_size: usize, + /// Raw cache + pub raw_cache: HashMap<PartId, Vec<u8>>, + /// Queue of the part identifiers that have been read + pub parts_have_been_read: VecDeque<PartId>, + /// Identifiers of the parts to be written + pub parts_to_be_written: BTreeSet<PartId>, + /// Phantom key + pub phantom_key: PhantomData<K>, + // Phantom value + pub phantom_value: PhantomData<V>, +} diff --git a/lib/tools/kv-parts-dbs/src/compression.rs b/lib/tools/kv-parts-dbs/src/compression.rs new file mode 100644 index 0000000000000000000000000000000000000000..a1e6aea1ab29c2c076ae2d3e52d631210a1a730e --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/compression.rs @@ -0,0 +1,72 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. 
If not, see <https://www.gnu.org/licenses/>. + +//! Manage databases compression. + +use flate2::bufread::GzDecoder; +use flate2::bufread::GzEncoder; +use flate2::Compression as FlateCompression; +use std::io::prelude::*; + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +/// Describe the compression level with an explicitly specified integer. +/// The integer here is typically on a scale of 0-9 where 0 means "no compression" and 9 means "take as long as you'd like". +pub struct CompressionLevel(u32); + +impl CompressionLevel { + /// Create CompressionLevel + pub fn new(level: u32) -> Self { + if level < 10 { + CompressionLevel(level) + } else { + panic!("CompressionLevel::new() : level must be between 0 and 9 !") + } + } + /// Get compression level + pub fn level(self) -> u32 { + self.0 + } +} + +impl Default for CompressionLevel { + fn default() -> CompressionLevel { + CompressionLevel(1) + } +} + +impl Into<FlateCompression> for CompressionLevel { + fn into(self) -> FlateCompression { + FlateCompression::new(self.0) + } +} + +/// Compress binary datas (use gzip) +pub fn compress_binary_datas( + datas: &[u8], + compression: CompressionLevel, +) -> std::io::Result<Vec<u8>> { + let mut gz = GzEncoder::new(datas, compression.into()); + let mut buffer = Vec::new(); + gz.read_to_end(&mut buffer)?; + Ok(buffer) +} + +/// Uncompress gzip binary datas +pub fn uncompress_binary_datas(datas: &[u8]) -> std::io::Result<Vec<u8>> { + let mut gz = GzDecoder::new(datas); + let mut buffer = Vec::new(); + gz.read_to_end(&mut buffer)?; + Ok(buffer) +} diff --git a/lib/tools/kv-parts-dbs/src/constants.rs b/lib/tools/kv-parts-dbs/src/constants.rs new file mode 100644 index 0000000000000000000000000000000000000000..18c91dace193a5614c1751606c353dc8e1526727 --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/constants.rs @@ -0,0 +1,29 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero 
General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! KvPartsDB Constants + +/// Max cache size (in number of items) +pub static DEFAULT_MAX_CACHE_SIZE: &'static usize = &10_000; + +/// Max raw cache size (in number of parts) +pub static DEFAULT_MAX_RAW_CACHE_SIZE: &'static usize = &2_000; + +/// Default max number of values cloned to cache +pub static DEFAULT_MAX_CLONED_VALUES: &'static usize = &0; + +pub static PART_FILE_NAME_BEGIN: &'static str = "_"; +pub static PART_FILE_NAME_END: &'static str = ".bin.gz"; +pub static PART_FILE_NAME_MIN_LEN: &'static usize = &9; diff --git a/lib/tools/kv-parts-dbs/src/errors.rs b/lib/tools/kv-parts-dbs/src/errors.rs new file mode 100644 index 0000000000000000000000000000000000000000..4aa62434152dd57a5cd506dccb1e6a13666b06d3 --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/errors.rs @@ -0,0 +1,58 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. 
+// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Define possible DB Errors + +use crate::readers::DbReadError; +use crate::writers::DbWriteError; + +/// DB Error +#[derive(Debug)] +pub enum DbError { + /// Spawn error + SpawnError(std::io::Error), + /// DB read error + DbReadError(DbReadError), + /// DB write error + DbWriteError(DbWriteError), + /// Need save before execute request + NeedSave, + /// Error when send request + SendReqError, + /// Error when closing the DB + CloseError, + /// The database is no longer reachable + UnreachableError, + /// Forbidden request + ForbiddenRequest, +} + +impl From<DbReadError> for DbError { + fn from(err: DbReadError) -> Self { + DbError::DbReadError(err) + } +} + +impl From<DbWriteError> for DbError { + fn from(err: DbWriteError) -> Self { + DbError::DbWriteError(err) + } +} + +impl From<std::io::Error> for DbError { + fn from(err: std::io::Error) -> Self { + DbError::SpawnError(err) + } +} diff --git a/lib/tools/kv-parts-dbs/src/impls/hash_db.rs b/lib/tools/kv-parts-dbs/src/impls/hash_db.rs new file mode 100644 index 0000000000000000000000000000000000000000..5029924d194476fd196a662ef84abed7c65c79cf --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/impls/hash_db.rs @@ -0,0 +1,106 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. 
+// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! KbPartsDB implementation using HashMap to store collection + +use crate::kv_collection::KvPartsCollection; +use crate::responses::DbResponse; +use crate::{AbstractKvPartsDb, PartId}; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::collections::HashMap; +use std::fmt::Debug; +use std::hash::Hash; +use std::marker::PhantomData; + +/// HashMap Database +#[derive(Debug, Clone)] +pub struct KvPartsHashDB<K, V, S, M> +where + K: Clone + Debug + DeserializeOwned + Eq + Hash + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + S: Default + std::hash::BuildHasher + Send + Sync, + M: 'static + + Debug + + From<DbResponse<K, V, HashMap<K, V, S>>> + + Into<Option<DbResponse<K, V, HashMap<K, V, S>>>> + + Send + + Sync, +{ + /// Phantom key + phantom_key: PhantomData<K>, + /// Phantom value + phantom_value: PhantomData<V>, + /// Phantom msg + phantom_msg: PhantomData<M>, + /// Phantom hasher + phantom_hasher: PhantomData<S>, +} + +impl<K, V, S, M> AbstractKvPartsDb<K, V, HashMap<K, V, S>, M> for KvPartsHashDB<K, V, S, M> +where + K: 'static + + Clone + + Debug + + DeserializeOwned + + Eq + + Hash + + Into<PartId> + + Send + + Serialize + + Sized + + Sync, + V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + S: 'static + Clone + Default + std::hash::BuildHasher + Send + Sync, + M: 'static + + Debug + + From<DbResponse<K, V, HashMap<K, V, S>>> + + Into<Option<DbResponse<K, V, HashMap<K, V, S>>>> + + Send + + Sync, +{ +} + +impl<K, V, S> KvPartsCollection<K, V> for HashMap<K, V, S> +where + K: Debug + DeserializeOwned + Eq + Hash + Send + Serialize + Sync, + V: Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + S: Default + std::hash::BuildHasher + Send + Sync, +{ + #[inline] + fn new() -> Self { + 
HashMap::with_hasher(S::default()) + } + + #[inline] + fn contains_key(&self, key: &K) -> bool { + self.contains_key(key) + } + + #[inline] + fn insert(&mut self, key: K, value: V) -> Option<V> { + self.insert(key, value) + } + + #[inline] + fn get(&self, key: &K) -> Option<&V> { + self.get(key) + } + + #[inline] + fn remove(&mut self, key: &K) -> Option<V> { + self.remove(key) + } +} diff --git a/lib/tools/kv-parts-dbs/src/impls/mod.rs b/lib/tools/kv-parts-dbs/src/impls/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..2a42526ad9b84518b5f2364c442a5d5c668213d2 --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/impls/mod.rs @@ -0,0 +1,19 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! 
KvPartsDB implementations + +pub mod hash_db; +pub mod tree_db; diff --git a/lib/tools/kv-parts-dbs/src/impls/tree_db.rs b/lib/tools/kv-parts-dbs/src/impls/tree_db.rs new file mode 100644 index 0000000000000000000000000000000000000000..5d53dc151a7a54216bc56c3fdfe49764b429436e --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/impls/tree_db.rs @@ -0,0 +1,100 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! 
KbPartsDB implementation using BTreeMap to store collection + +use crate::kv_collection::KvPartsCollection; +use crate::responses::DbResponse; +use crate::{AbstractKvPartsDb, PartId}; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::marker::PhantomData; + +/// BTreeMap Database +#[derive(Debug, Clone)] +pub struct KvPartsTreeDB<K, V, M> +where + K: Clone + Debug + DeserializeOwned + Eq + Ord + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + M: 'static + + Debug + + From<DbResponse<K, V, BTreeMap<K, V>>> + + Into<Option<DbResponse<K, V, BTreeMap<K, V>>>> + + Send + + Sync, +{ + /// Phantom key + phantom_key: PhantomData<K>, + /// Phantom value + phantom_value: PhantomData<V>, + /// Phantom msg + phantom_msg: PhantomData<M>, +} + +impl<K, V, M> AbstractKvPartsDb<K, V, BTreeMap<K, V>, M> for KvPartsTreeDB<K, V, M> +where + K: 'static + + Clone + + Debug + + DeserializeOwned + + Eq + + Ord + + Into<PartId> + + Send + + Serialize + + Sized + + Sync, + V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + M: 'static + + Debug + + From<DbResponse<K, V, BTreeMap<K, V>>> + + Into<Option<DbResponse<K, V, BTreeMap<K, V>>>> + + Send + + Sync, +{ +} + +impl<K, V> KvPartsCollection<K, V> for BTreeMap<K, V> +where + K: Debug + DeserializeOwned + Eq + Ord + Send + Serialize + Sync, + V: Debug + DeserializeOwned + Eq + Send + Serialize + Sync, +{ + #[inline] + fn new() -> Self { + BTreeMap::new() + } + + #[inline] + fn contains_key(&self, key: &K) -> bool { + self.contains_key(key) + } + + #[inline] + fn insert(&mut self, key: K, value: V) -> Option<V> { + self.insert(key, value) + } + + #[inline] + fn get(&self, key: &K) -> Option<&V> { + self.get(key) + } + + #[inline] + fn remove(&mut self, key: &K) -> Option<V> { + self.remove(key) + } +} diff --git a/lib/tools/kv-parts-dbs/src/kv_collection.rs 
b/lib/tools/kv-parts-dbs/src/kv_collection.rs new file mode 100644 index 0000000000000000000000000000000000000000..d711f688bb08fa85c3ccbb5fe959b6f1fbb72330 --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/kv_collection.rs @@ -0,0 +1,57 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Define KvPart map trait and implement it to some collections + +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::iter::{FromIterator, IntoIterator}; + +/// KvParts Map +pub trait KvPartsCollection<K, V>: + Debug + + DeserializeOwned + + Eq + + Extend<(K, V)> + + FromIterator<(K, V)> + + IntoIterator<Item = (K, V)> + + Send + + Serialize + + Sync +{ + /// Instantiate new empty collection + fn new() -> Self; + + /// Return true if the map contains a value for the specified key + fn contains_key(&self, key: &K) -> bool; + + /// Insert a key-value pair into the map. An existing value for a + /// key is replaced by the new value. + /// + /// If the map did not have this key present, `None` is returned. + /// + /// If the map did have this key present, the value is updated, and the old + /// value is returned. + fn insert(&mut self, key: K, value: V) -> Option<V>; + + /// Returns a reference to the value corresponding to the key. 
+ /// The key may be any borrowed form of the map's key type. + fn get(&self, key: &K) -> Option<&V>; + + /// Remove a key-value pair from the map. Return the value if the key + /// was present in the map, otherwise return None. + fn remove(&mut self, key: &K) -> Option<V>; +} diff --git a/lib/tools/kv-parts-dbs/src/lib.rs b/lib/tools/kv-parts-dbs/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..68ab23c36ed9f1b878f76e1a62e48e4f85cfb152 --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/lib.rs @@ -0,0 +1,479 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Manage databases persisted in compressed binary files. 
+ +#![deny( + missing_docs, + missing_debug_implementations, + missing_copy_implementations, + trivial_casts, + trivial_numeric_casts, + unsafe_code, + unstable_features, + unused_import_braces, + unused_qualifications +)] + +#[macro_use] +extern crate log; + +mod caches; +pub mod compression; +mod constants; +pub mod errors; +pub mod impls; +pub mod kv_collection; +mod orchestrator; +mod readers; +pub mod requests; +pub mod responses; +pub mod settings; +mod threadpool; +mod writers; + +use crate::caches::KvPartsDbCaches; +use crate::errors::DbError; +use crate::kv_collection::KvPartsCollection; +use crate::orchestrator::{DbOrchestratorMsg, Orchestrator}; +pub use crate::readers::DbReadError; +use crate::requests::{DbAdminRequest, DbReadRequest, DbRequest, DbRequestMsg, DbWriteRequest}; +use crate::responses::DbResponse; +use crate::settings::{DbSettings, DbSettingsBackend}; +pub use crate::writers::DbWriteError; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use std::collections::{BTreeSet, HashMap, VecDeque}; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::sync::mpsc; +use std::thread; + +/// Database files extension +pub static PART_EXT: &'static str = ".bin.gz"; + +/// DB part id +#[derive(Copy, Clone, Debug, Deserialize, Ord, PartialEq, PartialOrd, Eq, Hash, Serialize)] +pub struct PartId(pub u32); + +impl PartId { + fn to_file_name(self) -> String { + format!("_{}{}", self.0, PART_EXT) + } +} + +/// DB request id +#[derive(Copy, Clone, Debug, Deserialize, Ord, PartialEq, PartialOrd, Eq, Hash, Serialize)] +pub struct DbReqId(pub usize); + +/// Abstract KvParts database +pub trait AbstractKvPartsDb<K, V, C, M> +where + K: 'static + Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + C: 'static + Clone + KvPartsCollection<K, V>, + M: 'static + + Debug + + From<DbResponse<K, V, C>> + + Into<Option<DbResponse<K, V, 
C>>> + + Send + + Sync, + Self: Sized, +{ + /// Open database + fn open(settings: DbSettings) -> Result<KvPartsDB<K, V, C, M>, DbError> { + let orchestrator_sender = Self::prepare_open(settings)?; + + Ok(KvPartsDB::new(orchestrator_sender)) + } + /// Prepare open database + fn prepare_open( + settings: DbSettings, + ) -> Result<mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>, DbError> { + let raw_cache = match settings.backend { + DbSettingsBackend::Mem => HashMap::new(), + DbSettingsBackend::Files { + raw_cache_max_size, .. + } => HashMap::with_capacity(raw_cache_max_size), + }; + + let db_caches = KvPartsDbCaches::<K, V, C> { + cache: HashMap::new(), + cache_size: 0, + raw_cache, + parts_have_been_read: VecDeque::new(), + parts_to_be_written: BTreeSet::new(), + phantom_key: PhantomData, + phantom_value: PhantomData, + }; + + let (orchestrator_sender, orchestrator_recv) = mpsc::channel(); + + let orchestrator_sender_clone = orchestrator_sender.clone(); + let _thread_handler = thread::Builder::new() + .name("KvPartsDB orchestrator".to_owned()) + .spawn(move || { + Orchestrator::new( + orchestrator_sender_clone, + orchestrator_recv, + settings, + db_caches, + ) + .start_main_loop(); + })?; + Ok(orchestrator_sender) + } +} + +/// Key Value parts Database +#[derive(Debug)] +pub struct KvPartsDB<K, V, C, M> +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sized + Sync, + V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + C: Clone + KvPartsCollection<K, V>, + M: 'static + + Debug + + From<DbResponse<K, V, C>> + + Into<Option<DbResponse<K, V, C>>> + + Send + + Sync, +{ + orchestrator_sender: mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>, +} + +impl<K, V, C, M> KvPartsDB<K, V, C, M> +where + K: 'static + + Clone + + Debug + + DeserializeOwned + + Eq + + Into<PartId> + + Send + + Serialize + + Sized + + Sync, + V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + C: 'static + Clone + 
KvPartsCollection<K, V>, + M: Debug + From<DbResponse<K, V, C>> + Into<Option<DbResponse<K, V, C>>> + Send + Sync, +{ + /// Close database + pub fn close(self) -> Result<(), DbError> { + let (send_res_to, recv) = mpsc::channel(); + + if self + .orchestrator_sender + .send(DbOrchestratorMsg::Req(DbRequestMsg { + req_id: DbReqId(0), + req_content: DbRequest::Admin(DbAdminRequest::Close), + send_res_to, + })) + .is_err() + { + Err(DbError::SendReqError) + } else if let Ok(msg) = recv.recv() { + msg.into().unwrap().req_result?; + Ok(()) + } else { + Err(DbError::UnreachableError) + } + } + + /// Get db reader + pub fn get_reader(&self) -> KvPartsDBReader<K, V, C, M> { + KvPartsDBReader::new(self.orchestrator_sender.clone()) + } + + /// Get db writer + pub fn get_writer(&self) -> KvPartsDBWriter<K, V, C, M> { + KvPartsDBWriter::new(self.orchestrator_sender.clone()) + } + + /// Instantiate administrator + fn new(orchestrator_sender: mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>) -> Self { + KvPartsDB { + orchestrator_sender, + } + } +} + +/// Database reader +#[derive(Debug, Clone)] +pub struct KvPartsDBReader<K, V, C, M>(mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>) +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sized + Sync, + V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + C: Clone + KvPartsCollection<K, V>, + M: 'static + + Debug + + From<DbResponse<K, V, C>> + + Into<Option<DbResponse<K, V, C>>> + + Send + + Sync; + +impl<K, V, C, M> KvPartsDBReader<K, V, C, M> +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sized + Sync, + V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + C: Clone + KvPartsCollection<K, V>, + M: Debug + From<DbResponse<K, V, C>> + Into<Option<DbResponse<K, V, C>>> + Send + Sync, +{ + /// Instantiate reader + fn new(sender: mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>) -> Self { + KvPartsDBReader(sender) + } + + /// Send read request + 
pub fn send_request( + &self, + req_id: DbReqId, + req_content: DbReadRequest<K>, + send_res_to: mpsc::Sender<M>, + ) -> Result<(), DbError> { + self.0 + .send(DbOrchestratorMsg::Req(DbRequestMsg { + req_id, + req_content: DbRequest::Read(req_content), + send_res_to, + })) + .map_err(|_| DbError::SendReqError) + } +} + +/// Database writer +#[derive(Debug, Clone)] +pub struct KvPartsDBWriter<K, V, C, M>(mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>) +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sized + Sync, + V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + C: Clone + KvPartsCollection<K, V>, + M: 'static + + Debug + + From<DbResponse<K, V, C>> + + Into<Option<DbResponse<K, V, C>>> + + Send + + Sync; + +impl<K, V, C, M> KvPartsDBWriter<K, V, C, M> +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sized + Sync, + V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + C: Clone + KvPartsCollection<K, V>, + M: Debug + From<DbResponse<K, V, C>> + Into<Option<DbResponse<K, V, C>>> + Send + Sync, +{ + /// Instantiate writer + fn new(sender: mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>) -> Self { + KvPartsDBWriter(sender) + } + + /// Send read request + pub fn send_request( + &self, + req_id: DbReqId, + req_content: DbWriteRequest<K, V, C>, + send_res_to: mpsc::Sender<M>, + ) -> Result<(), DbError> { + self.0 + .send(DbOrchestratorMsg::Req(DbRequestMsg { + req_id, + req_content: DbRequest::Write(req_content), + send_res_to, + })) + .map_err(|_| DbError::SendReqError) + } +} + +/*impl DbBackend { + + /// is memory DB + pub fn is_memory_db(&self) -> bool { + if let BinDb::Mem { .. 
} = self { + true + } else { + false + } + } + + /// Open new memory database + pub fn new_memory() -> BinDb { + BinDb::Mem { + parts_datas: HashMap::new(), + } + } + + /// Open database + pub fn open(dir_path: PathBuf) -> std::io::Result<BinDb> { + if !dir_path.as_path().exists() { + if let Err(io_error) = std::fs::create_dir(dir_path.as_path()) { + if io_error.kind() != std::io::ErrorKind::AlreadyExists { + error!( + "Impossible to create DB dir: {} !", + dir_path.as_path().display() + ); + return Err(io_error); + } + } + } + + Ok(BinDb::File { dir_path }) + } + + /// Get parts ids + pub fn get_parts_ids(&self, expected_parts_names: HashSet<&'static str>) -> Vec<PartId> { + match self { + BinDb::File { dir_path } => { + let file_list_result = std::fs::read_dir(dir_path.as_path()); + if file_list_result.is_err() { + fatal_error("Fail to open bc-archiver database !"); + } + let file_list_result = file_list_result.unwrap(); + let mut parts_ids = Vec::new(); + + for dir_entry in file_list_result { + if let Ok(dir_entry) = dir_entry { + if let Ok(file_name) = dir_entry.file_name().into_string() { + if let Ok(file_type) = dir_entry.file_type() { + if file_type.is_file() { + let file_name_len = file_name.len(); + let part_number_result: Result<usize, std::num::ParseIntError> = + file_name[1..file_name_len - PART_EXT.len()].parse(); + + if let Ok(part_number) = part_number_result { + parts_ids.push(PartId::Int(part_number)); + } else if let Some(part_name) = + expected_parts_names.get(&file_name[..file_name_len - PART_EXT.len()]) { + parts_ids.push(PartId::Str(part_name)); + } + } + } + } + } + } + + parts_ids + } + BinDb::Mem { parts_datas } => { + parts_datas.keys().cloned().collect() + } + } + } + + /// Write one part + #[inline] + pub fn write_part<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>( + &mut self, + part_id: PartId, + datas: &D, + compression: CompressionLevel, + ) -> Result<(), DbWriteError> { + writers::write_db_part(self, part_id, datas, 
compression) + } + + /// Write async one part + #[inline] + pub fn write_async_part<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>( + &mut self, + part_id: PartId, + datas: &D, + compression: CompressionLevel, + sender: mpsc::Sender<DbWriteRes>, + req_id: DbReqId, + ) { + if let Err(err) = sender.send(DbWriteRes( + req_id, + writers::write_db_part(self, part_id, datas, compression), + )) { + error!("write_async_part(): fail to send response: {}", err) + } + } + + /// Write several parts + #[inline] + pub fn write_parts<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>( + &mut self, + datas: &HashMap<PartId, D>, + compression: CompressionLevel, + ) -> Result<(), DbWriteError> { + writers::write_db_parts(self, datas, compression) + } + + /// Write several parts + #[inline] + pub fn write_async_parts<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>( + &mut self, + datas: &HashMap<PartId, D>, + compression: CompressionLevel, + sender: mpsc::Sender<DbWriteRes>, + req_id: DbReqId, + ) -> Result<(), mpsc::SendError<DbWriteRes>> { + let req_result = writers::write_db_parts(self, datas, compression); + + sender.send(DbWriteRes(req_id, req_result)) + } + + /// Read one part + #[inline] + pub fn read_part<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>( + &self, + part_id: PartId, + unzip_and_deser: bool, + ) -> Result<DbDatas<D>, DbReadError> { + readers::read_db_part(self, part_id, unzip_and_deser) + } + + /// Read async one part + #[inline] + pub fn read_async_part<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>( + &self, + part_id: PartId, + sender: mpsc::Sender<DbReadRes<D>>, + req_id: DbReqId, + unzip_and_deser: bool, + ) -> Result<(), mpsc::SendError<DbReadRes<D>>> { + let req_result = readers::read_db_part(&self, part_id, unzip_and_deser).map(|datas| { + let mut response = HashMap::with_capacity(1); + response.insert(part_id, datas); + response + }); + + sender.send(DbReadRes(req_id, req_result)) 
+ } + + /// Read several parts + #[inline] + pub fn read_parts<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>( + &mut self, + parts_ids: &[PartId], + unzip_and_deser: bool, + ) -> Result<HashMap<PartId, DbDatas<D>>, DbReadError> { + readers::read_db_parts(self, parts_ids, unzip_and_deser) + } + + /// Asynchronous reading of several parts + #[inline] + pub fn read_async_parts<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>( + &mut self, + parts_ids: &[PartId], + sender: mpsc::Sender<DbReadRes<D>>, + req_id: DbReqId, + unzip_and_deser: bool, + ) -> Result<(), mpsc::SendError<DbReadRes<D>>> { + sender.send(DbReadRes( + req_id, + readers::read_db_parts(self, parts_ids, unzip_and_deser), + )) + } +}*/ diff --git a/lib/tools/kv-parts-dbs/src/orchestrator/mod.rs b/lib/tools/kv-parts-dbs/src/orchestrator/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..5e1c2ce493e04816bc3c599fa400974e5cc8598b --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/orchestrator/mod.rs @@ -0,0 +1,463 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! 
/// Message consumed by the orchestrator thread: either an external DB request
/// coming from a `KvPartsDB`/reader/writer handle, or a result posted back by
/// a threadpool worker.
#[derive(Debug)]
pub enum DbOrchestratorMsg<K, V, C, M>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
    V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    C: Clone + KvPartsCollection<K, V>,
    M: 'static
        + Debug
        + From<DbResponse<K, V, C>>
        + Into<Option<DbResponse<K, V, C>>>
        + Send
        + Sync,
{
    /// Request forwarded from a DB handle.
    Req(DbRequestMsg<K, V, C, M>),
    /// Result of a job executed by a pool worker.
    WorkerRes(WorkerRes<K, V, C>),
}

/// State of the orchestrator thread: caches, worker pool, and the bookkeeping
/// needed to match asynchronous worker results back to pending requests.
pub struct Orchestrator<K, V, C, M>
where
    K: 'static + Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
    V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
    C: 'static + Clone + Debug + KvPartsCollection<K, V>,
    M: 'static
        + Debug
        + From<DbResponse<K, V, C>>
        + Into<Option<DbResponse<K, V, C>>>
        + Send
        + Sync,
{
    // Known part ids; `None` until listed from the backend (or derived from the
    // raw cache for the in-memory backend).
    all_parts_ids: Option<FnvHashSet<PartId>>,
    // Deserialized cache, raw (compressed) cache and associated bookkeeping.
    caches: KvPartsDbCaches<K, V, C>,
    // Backend directory; `None` for the in-memory backend.
    dir_path: Option<PathBuf>,
    // Parts pinned in cache by `ForceCacheParts`.
    force_cache_parts_ids: FnvHashSet<PartId>,
    // Requests waiting for the part-id listing to complete.
    get_all_parts_ids: Vec<DbReqId>,
    // For each pending read request: keys still to resolve, grouped by part.
    pending_read_reqs_remaining_keys_by_parts: BTreeMap<DbReqId, BTreeMap<PartId, Vec<K>>>,
    // For each pending request: parts whose worker job has not completed yet.
    pending_reqs_remaining_parts: BTreeMap<DbReqId, FnvHashSet<PartId>>,
    // Where to send each pending request's final response.
    pending_reqs_senders: BTreeMap<DbReqId, Sender<M>>,
    pool: ThreadPool<K, V, C, M>,
    // Parts whose raw bytes were handed to a worker for extraction
    // (removed from raw_cache meanwhile).
    raw_parts_being_extracted: FnvHashSet<PartId>,
    recv: Receiver<DbOrchestratorMsg<K, V, C, M>>,
    settings: DbSettings,
}
pool: ThreadPool<K, V, C, M>, + raw_parts_being_extracted: FnvHashSet<PartId>, + recv: Receiver<DbOrchestratorMsg<K, V, C, M>>, + settings: DbSettings, +} + +impl<K, V, C, M> Orchestrator<K, V, C, M> +where + K: 'static + Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + C: 'static + Clone + Debug + KvPartsCollection<K, V>, + M: 'static + + Debug + + From<DbResponse<K, V, C>> + + Into<Option<DbResponse<K, V, C>>> + + Send + + Sync, +{ + pub fn new( + sender: Sender<DbOrchestratorMsg<K, V, C, M>>, + recv: Receiver<DbOrchestratorMsg<K, V, C, M>>, + settings: DbSettings, + caches: KvPartsDbCaches<K, V, C>, + ) -> Orchestrator<K, V, C, M> { + let dir_path = settings.backend.get_dir_path(); + let pool = ThreadPool::new(settings.workers_count, dir_path.clone(), sender); + Orchestrator { + all_parts_ids: None, + caches, + dir_path, + force_cache_parts_ids: FnvHashSet::default(), + get_all_parts_ids: Vec::new(), + pending_read_reqs_remaining_keys_by_parts: BTreeMap::new(), + pending_reqs_remaining_parts: BTreeMap::new(), + pending_reqs_senders: BTreeMap::new(), + pool, + raw_parts_being_extracted: FnvHashSet::default(), + recv, + settings, + } + } + + pub fn start_main_loop(mut self) { + let close_req_id; + self.load_datas(); + loop { + match self.recv.recv() { + Ok(DbOrchestratorMsg::Req(DbRequestMsg { + req_id, + req_content, + send_res_to, + })) => { + trace!("KvPartDB orchestrator receive request : {:?}", req_content); + self.pending_reqs_senders.insert(req_id, send_res_to); + match req_content { + DbRequest::Admin(DbAdminRequest::Close) => { + close_req_id = Some(req_id); + break; + } + DbRequest::Read(read_req) => match read_req { + DbReadRequest::GetAllPartsIds => { + if self.all_parts_ids.is_none() { + if self.dir_path.is_some() { + self.get_all_parts_ids.push(req_id); + self.pool.new_jobs( + &mut self.pending_reqs_senders, + req_id, + vec![WorkerReq { + 
part_id: PartId(0), + req_id, + req_content: WorkerReqContent::ListAllPartsIds, + }], + ); + } else { + self.all_parts_ids = + Some(self.caches.raw_cache.keys().cloned().collect()); + } + } + if let Some(ref all_parts_ids) = self.all_parts_ids { + self.send_response( + req_id, + Ok(DbResContent::AllPartsIds(all_parts_ids.clone())), + ); + } + } + DbReadRequest::ForceCacheParts { parts_id } => { + self.force_cache_parts_ids = self + .force_cache_parts_ids + .union(&parts_id) + .cloned() + .collect(); + let missing_parts_ids: Vec<PartId> = parts_id + .into_iter() + .filter(|part_id| !self.caches.cache.contains_key(part_id)) + .collect(); + self.get_parts_datas(req_id, missing_parts_ids); + } + DbReadRequest::UnforceCacheParts { parts_id } => { + for part_id in parts_id { + self.force_cache_parts_ids.remove(&part_id); + } + } + DbReadRequest::ReadValues { keys } => self.read_values(req_id, keys), + DbReadRequest::ReadParts { .. } => {} + }, + DbRequest::Write(DbWriteRequest::Save) => { + if self.dir_path.is_some() { + let parts_ids = self + .caches + .raw_cache + .keys() + .cloned() + .collect::<Vec<PartId>>(); + let jobs = parts_ids + .into_iter() + .map(|part_id| WorkerReq { + part_id, + req_id, + req_content: WorkerReqContent::WriteRawPartInFile { + raw_part_datas: self + .caches + .raw_cache + .remove(&part_id) + .unwrap_or_else(|| panic!(dbg!("dev error"))), + }, + }) + .collect(); + self.pool + .new_jobs(&mut self.pending_reqs_senders, req_id, jobs); + } + } + DbRequest::Write(DbWriteRequest::Write { + datas, + cache, + compression, + }) => { + // TODO + let mut datas_by_parts: BTreeMap<PartId, C> = BTreeMap::new(); + for (k, v) in datas { + datas_by_parts + .entry(k.clone().into()) + .or_insert_with(C::new) + .insert(k, v); + } + if cache { + for (part_id, part_datas) in datas_by_parts.into_iter() { + self.caches + .cache + .entry(part_id) + .or_insert_with(C::new) + .extend(part_datas); + } + } else { + self.pool.new_jobs( + &mut self.pending_reqs_senders, + 
req_id, + if self.dir_path.is_some() { + datas_by_parts + .into_iter() + .map(|(part_id, part_datas)| WorkerReq { + part_id, + req_id, + req_content: WorkerReqContent::WritePartInFile { + part_datas, + compression: compression.unwrap_or_else(|| { + CompressionLevel::new(0) + }), + }, + }) + .collect() + } else { + datas_by_parts + .into_iter() + .map(|(part_id, part_datas)| WorkerReq { + part_id, + req_id, + req_content: WorkerReqContent::ZipAndSerPart { + part_datas, + compression: compression.unwrap_or_else(|| { + CompressionLevel::new(0) + }), + }, + }) + .collect() + }, + ); + } + } + DbRequest::Write(DbWriteRequest::Remove { .. }) => {} + DbRequest::Write(DbWriteRequest::Phantom(_, _)) => {} + } + } + Ok(DbOrchestratorMsg::WorkerRes(WorkerRes { + worker_id, + part_id, + req_id, + req_result, + })) => { + let _part_id = part_id; + let _ = self.pool.worker_available(worker_id); + match req_result { + WorkerReqResult::AllPartsIds(parts_ids) => { + if let DbSettingsBackend::Files { cache_all_db, .. 
} = + self.settings.backend + { + if cache_all_db { + // Get all parts data + self.pool.new_jobs( + &mut self.pending_reqs_senders, + req_id, + parts_ids + .iter() + .map(|part_id| WorkerReq { + part_id: *part_id, + req_id, + req_content: WorkerReqContent::ReadPartFromFile { + unzip_and_deser: true, + }, + }) + .collect(), + ); + } + } + + for _ in 0..self.get_all_parts_ids.len() { + let req_id = self + .get_all_parts_ids + .pop() + .unwrap_or_else(|| panic!(dbg!("dev error"))); + self.send_response( + req_id, + Ok(DbResContent::AllPartsIds(parts_ids.clone())), + ); + } + + self.all_parts_ids = Some(parts_ids); + } + WorkerReqResult::SaveOk => { + self.send_response(req_id, Ok(DbResContent::SaveOk)); + } + _ => {} + } + } + Err(_) => {} + } + } + let close_req_id = close_req_id.unwrap_or_else(|| panic!(dbg!("dev error"))); + let send_close_result_to = self + .pending_reqs_senders + .remove(&close_req_id) + .unwrap_or_else(|| panic!(dbg!("dev error"))); + self.pool.stop(close_req_id, send_close_result_to); + } + + fn load_datas(&mut self) { + if let DbSettingsBackend::Files { cache_all_db, .. 
} = self.settings.backend { + if cache_all_db { + self.pool.new_jobs( + &mut self.pending_reqs_senders, + DbReqId(0), + vec![WorkerReq { + part_id: PartId(0), + req_id: DbReqId(0), + req_content: WorkerReqContent::ListAllPartsIds, + }], + ); + } + } + } + + fn get_parts_datas(&mut self, req_id: DbReqId, parts_ids: Vec<PartId>) -> (bool, Vec<PartId>) { + let mut missing_parts = Vec::new(); + let jobs: Vec<WorkerReq<K, V, C>> = parts_ids + .iter() + .filter_map(|part_id| { + if let Some(raw_part_datas) = self.caches.raw_cache.remove(part_id) { + self.raw_parts_being_extracted.insert(*part_id); + Some(WorkerReq { + part_id: *part_id, + req_id, + req_content: WorkerReqContent::UnzipAndDeserPart { raw_part_datas }, + }) + } else if self.dir_path.is_some() { + Some(WorkerReq { + part_id: *part_id, + req_id, + req_content: WorkerReqContent::ReadPartFromFile { + unzip_and_deser: true, + }, + }) + } else { + missing_parts.push(*part_id); + None + } + }) + .collect(); + let send_jobs = if !jobs.is_empty() { + self.pool + .new_jobs(&mut self.pending_reqs_senders, req_id, jobs); + true + } else { + false + }; + (send_jobs, missing_parts) + } + + fn read_values(&mut self, req_id: DbReqId, keys: Vec<K>) { + let mut send_jobs = false; + let mut keys_by_parts: BTreeMap<PartId, Vec<K>> = BTreeMap::new(); + for key in keys { + keys_by_parts + .entry(key.clone().into()) + .or_insert_with(Vec::new) + .push(key); + } + + // Get from cache + let mut missing_keys_by_parts = BTreeMap::new(); + let datas = keys_by_parts + .iter() + .filter_map(|(part_id, part_keys)| { + if let Some(part_datas) = self.caches.cache.get(part_id) { + Some( + part_keys + .iter() + .filter_map(|k| { + if let Some(v) = part_datas.get(k) { + Some((k.clone(), v.clone())) + } else { + missing_keys_by_parts + .entry(*part_id) + .or_insert_with(Vec::new) + .push(k.clone()); + None + } + }) + .collect::<Vec<(K, V)>>(), + ) + } else { + missing_keys_by_parts.insert(*part_id, part_keys.to_vec()); + None + } + }) + 
.flatten() + .collect::<C>(); + // Get missing parts from raw cache and files + let mut missing_parts = Vec::new(); + if !missing_keys_by_parts.is_empty() { + let (send_jobs_, missing_parts_) = + self.get_parts_datas(req_id, missing_keys_by_parts.keys().cloned().collect()); + send_jobs = send_jobs_; + missing_parts = missing_parts_; + self.pending_read_reqs_remaining_keys_by_parts + .insert(req_id, missing_keys_by_parts); + } + // If no jobs sent, we can send response now + if !send_jobs { + let mut missing_keys_by_parts = self + .pending_read_reqs_remaining_keys_by_parts + .remove(&req_id) + .unwrap_or_else(BTreeMap::new); + let missing_keys = missing_parts + .into_iter() + .filter_map(|part_id| missing_keys_by_parts.remove(&part_id)) + .flatten() + .collect(); + self.send_response( + req_id, + Ok(DbResContent::Values { + found: datas, + missing_keys, + }), + ); + } + + self.pending_reqs_remaining_parts + .insert(req_id, keys_by_parts.into_iter().map(|(k, _)| k).collect()); + } + + fn send_response( + &mut self, + req_id: DbReqId, + req_result: Result<DbResContent<K, V, C>, DbError>, + ) { + let _ = self + .pending_reqs_senders + .remove(&req_id) + .unwrap_or_else(|| panic!(dbg!("dev error"))) + .send(DbResponse { req_id, req_result }.into()); + } +} diff --git a/lib/tools/kv-parts-dbs/src/readers.rs b/lib/tools/kv-parts-dbs/src/readers.rs new file mode 100644 index 0000000000000000000000000000000000000000..e3b1c266c0d933073d2be62a61e73ef91f18edad --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/readers.rs @@ -0,0 +1,181 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Read databases persisted in compressed binary files. + +use crate::compression::uncompress_binary_datas; +use crate::kv_collection::KvPartsCollection; +use crate::requests::DatasFormat; +use crate::responses::DbDatas; +use crate::PartId; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::fs::File; +use std::io::Read; +use std::path::PathBuf; + +#[derive(Debug)] +/// DB read error +pub enum DbReadError { + /// Serialize error + DeserializeError(bincode::Error), + /// I/O error + IoError(std::io::Error), + /// Part not found + PartNotFound(PartId), +} + +impl std::fmt::Display for DbReadError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + DbReadError::DeserializeError(e) => write!(f, "DbReadError: deserialize error: {}", e), + DbReadError::IoError(e) => write!(f, "DbReadError: I/O error: {}", e), + DbReadError::PartNotFound(part_id) => write!( + f, + "DbReadError: part '{}' not found", + part_id.to_file_name() + ), + } + } +} + +impl From<std::io::Error> for DbReadError { + fn from(err: std::io::Error) -> Self { + DbReadError::IoError(err) + } +} + +impl From<bincode::Error> for DbReadError { + fn from(err: bincode::Error) -> Self { + DbReadError::DeserializeError(err) + } +} + +/*/// Read on DB part +pub fn read_db_part<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>( + db: &BinDb, + part_id: PartId, + unzip_and_deser: bool, +) -> Result<DbDatas<D>, DbReadError> { + match db { + BinDb::File { ref dir_path } => 
read_part_from_file(dir_path, part_id, unzip_and_deser), + BinDb::Mem { ref parts_datas } => { + if let Some(compressed_bin_datas) = parts_datas.get(&part_id) { + if unzip_and_deser { + Ok(DbDatas::Datas(uncompress_and_deserialize( + compressed_bin_datas, + )?)) + } else { + Ok(DbDatas::CompressedBinDatas(compressed_bin_datas.to_vec())) + } + } else { + Err(DbReadError::PartNotFound(part_id)) + } + } + } +} + +/// Read several Db parts +pub fn read_db_parts<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>( + db: &BinDb, + parts_ids: &[PartId], + unzip_and_deser: bool, +) -> Result<HashMap<PartId, DbDatas<D>>, DbReadError> { + match db { + BinDb::File { ref dir_path } => parts_ids + .par_iter() + .map(|part_id| { + read_part_from_file(dir_path, *part_id, unzip_and_deser) + .map(|datas| (*part_id, datas)) + }) + .collect::<Result<HashMap<PartId, DbDatas<D>>, DbReadError>>(), + BinDb::Mem { ref parts_datas } => parts_ids + .par_iter() + .map(|part_id| { + if let Some(compressed_bin_datas) = parts_datas.get(&part_id) { + if unzip_and_deser { + uncompress_and_deserialize(compressed_bin_datas) + .map(|datas| (*part_id, DbDatas::Datas(datas))) + } else { + Ok(( + *part_id, + DbDatas::CompressedBinDatas(compressed_bin_datas.to_vec()), + )) + } + } else { + Err(DbReadError::PartNotFound(*part_id)) + } + }) + .collect::<Result<HashMap<PartId, DbDatas<D>>, DbReadError>>(), + } +}*/ + +#[inline] +pub fn uncompress_and_deserialize<D: DeserializeOwned>( + compressed_bin_datas: &[u8], +) -> Result<D, DbReadError> { + let bin_datas = uncompress_binary_datas(compressed_bin_datas)?; + Ok(bincode::deserialize(&bin_datas[..])?) 
+} + +pub fn read_part_from_file<K, V, C>( + dir_path: &PathBuf, + part_id: PartId, + expected_format: DatasFormat, +) -> Result<DbDatas<K, V, C>, DbReadError> +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync, + C: KvPartsCollection<K, V>, +{ + let compressed_bin_datas = read_raw_part_from_file(dir_path, part_id)?; + + match expected_format { + DatasFormat::SerAndZip => { + let mut raw_parts_datas = BTreeMap::new(); + raw_parts_datas.insert(part_id, compressed_bin_datas); + Ok(DbDatas::SerAndZip(raw_parts_datas)) + } + DatasFormat::UnzipAndSer => { + let mut ser_parts_datas = BTreeMap::new(); + ser_parts_datas.insert(part_id, uncompress_binary_datas(&compressed_bin_datas)?); + Ok(DbDatas::UnzipAndSer(ser_parts_datas)) + } + DatasFormat::UnzipAndDeser => { + let mut parts_datas = BTreeMap::new(); + parts_datas.insert( + part_id, + uncompress_and_deserialize(&compressed_bin_datas[..])?, + ); + Ok(DbDatas::UnzipAndDeser(parts_datas)) + } + } +} + +pub fn read_raw_part_from_file( + dir_path: &PathBuf, + part_id: PartId, +) -> Result<Vec<u8>, DbReadError> { + let mut file_path = dir_path.clone(); + file_path.push(part_id.to_file_name()); + + let mut file = File::open(file_path)?; + let mut compressed_bin_datas = Vec::new(); + file.read_to_end(&mut compressed_bin_datas)?; + + Ok(compressed_bin_datas) +} diff --git a/lib/tools/kv-parts-dbs/src/requests.rs b/lib/tools/kv-parts-dbs/src/requests.rs new file mode 100644 index 0000000000000000000000000000000000000000..e6a79c0a9bb2a59bf353167523cd5283b1ce41b3 --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/requests.rs @@ -0,0 +1,143 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any 
later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Define possible requests to a KvPartsDB + +use crate::compression::CompressionLevel; +use crate::kv_collection::KvPartsCollection; +use crate::responses::DbResponse; +use crate::{DbReqId, PartId}; +use fnv::FnvHashSet; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::sync::mpsc; + +/// Datas format +#[derive(Debug, Copy, Clone)] +pub enum DatasFormat { + /// Compressed and serialize + SerAndZip, + /// Uncompressed and serialize + UnzipAndSer, + /// Uncompressed and deserialize + UnzipAndDeser, +} + +/// Database Request +#[derive(Debug, Clone)] +pub struct DbRequestMsg<K, V, C, M> +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + M: 'static + + Debug + + From<DbResponse<K, V, C>> + + Into<Option<DbResponse<K, V, C>>> + + Send + + Sync, + C: KvPartsCollection<K, V>, +{ + /// Request id + pub req_id: DbReqId, + /// Request content + pub req_content: DbRequest<K, V, C>, + /// The response will be sent via this sender + pub send_res_to: mpsc::Sender<M>, +} + +/// Database Request content +#[derive(Debug, Clone)] +pub enum DbRequest<K, V, C> +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync, + C: KvPartsCollection<K, V>, +{ + /// Read request + Read(DbReadRequest<K>), + /// Write request + Write(DbWriteRequest<K, V, C>), + /// Admin request + 
/// Database read request
#[derive(Debug, Clone)]
pub enum DbReadRequest<K>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
{
    /// Get all parts ids
    GetAllPartsIds,
    /// Force the retention of specific parts in the cache
    ForceCacheParts {
        /// Parts identifiers
        parts_id: FnvHashSet<PartId>,
    },
    /// Read values
    ReadValues {
        /// Keys
        keys: Vec<K>,
    },
    /// Read parts
    ReadParts {
        /// Identifiers of the requested parts
        parts_ids: Vec<PartId>,
        /// Expected format
        expected_format: DatasFormat,
    },
    /// Disable forcing retention of specific parts in the cache
    // NOTE(review): takes a `Vec` while `ForceCacheParts` takes an `FnvHashSet`
    // — inconsistent; confirm whether this asymmetry is intentional.
    UnforceCacheParts {
        /// Parts identifiers
        parts_id: Vec<PartId>,
    },
}

/// Database write request
#[derive(Debug, Clone)]
pub enum DbWriteRequest<K, V, C>
where
    K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
    V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync,
    C: KvPartsCollection<K, V>,
{
    /// Write values
    Write {
        /// Data to be written
        datas: C,
        /// Write in cache or write directly in backend
        cache: bool,
        /// Compression level (if None, use value in db settings)
        compression: Option<CompressionLevel>,
    },
    /// Remove values
    Remove {
        /// Keys to be removed
        keys: Vec<K>,
    },
    /// Save write cache in backend
    Save,
    /// Phantom
    // Keeps type parameters K and V used even when no data-carrying variant
    // mentions them directly.
    Phantom(PhantomData<K>, PhantomData<V>),
}

/// Database admin request
#[derive(Debug, Copy, Clone)]
pub enum DbAdminRequest {
    /// Close database
    Close,
}
General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Define possible responses to a DB request + +use crate::errors::DbError; +use crate::kv_collection::KvPartsCollection; +use crate::{DbReqId, PartId}; +use fnv::FnvHashSet; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::marker::PhantomData; + +/// Database Response +#[derive(Debug)] +pub struct DbResponse<K, V, C> +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + C: KvPartsCollection<K, V>, +{ + /// Request id + pub req_id: DbReqId, + /// Request result + pub req_result: Result<DbResContent<K, V, C>, DbError>, +} + +/// DB response content +#[derive(Debug, PartialEq, Eq)] +pub enum DbResContent<K, V, C> +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + C: KvPartsCollection<K, V>, +{ + /// Successfully load all DB parts in memory + LoadDbOk, + /// All parts ids + AllPartsIds(FnvHashSet<PartId>), + /// Values read or deleted + Values { + /// Found values + found: C, + /// List of requested keys that do not exist in the database + missing_keys: Vec<K>, + }, + /// Values by part + ValuesByPart { + /// Found values by part + found: BTreeMap<PartId, C>, + /// List of requested keys that do not exist in the database 
+ missing_keys: Vec<K>, + }, + /// Parts datas + Parts(DbDatas<K, V, C>), + /// Successfully write datas + WriteOk, + /// Successfully save DB into backend + SaveOk, + /// Successfully close DB + CloseOk, +} + +/// DB Datas +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum DbDatas<K, V, C> +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync, + C: KvPartsCollection<K, V>, +{ + /// Compressed and serialize + SerAndZip(BTreeMap<PartId, Vec<u8>>), + /// Uncompressed and serialize + UnzipAndSer(BTreeMap<PartId, Vec<u8>>), + /// Uncompressed and deserialize + UnzipAndDeser(BTreeMap<PartId, C>), + /// Phantom + Phantom(PhantomData<K>, PhantomData<V>), +} diff --git a/lib/tools/kv-parts-dbs/src/settings.rs b/lib/tools/kv-parts-dbs/src/settings.rs new file mode 100644 index 0000000000000000000000000000000000000000..6568225855592a668fef6ea070077851d2423360 --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/settings.rs @@ -0,0 +1,195 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! 
Define KvPartsDB settings + +use crate::compression::CompressionLevel; +use std::num::NonZeroUsize; +use std::path::PathBuf; + +#[derive(Debug, Default, Clone)] +/// Builder for DbSettings +pub struct DbSettingsBuilder { + /// Backend settings + backend: Option<DbSettingsBackend>, + /// Cache sixe (in number of items) + cache_max_size: Option<NonZeroUsize>, + /// Maximum number of values to clone in the cache per request + max_clone_values: Option<usize>, + /// Default compression level + default_compression: Option<CompressionLevel>, + // Number of workers in the pool processing requests to the database + workers_count: Option<usize>, +} + +impl DbSettingsBuilder { + /// Create builder + pub fn new() -> Self { + DbSettingsBuilder { + backend: None, + cache_max_size: None, + max_clone_values: None, + default_compression: None, + workers_count: None, + } + } + + /// Number of workers in the pool processing requests to the database + pub fn cache_max_size(&mut self, cache_max_size: NonZeroUsize) -> &mut Self { + self.cache_max_size = Some(cache_max_size); + self + } + + /// Change maximum number of values to clone in the cache per request + pub fn max_clone_values(&mut self, max_clone_values: usize) -> &mut Self { + self.max_clone_values = Some(max_clone_values); + self + } + + /// Change default compression level + pub fn default_compression(&mut self, default_compression: usize) -> &mut Self { + self.default_compression = Some(CompressionLevel::new(default_compression as u32)); + self + } + + /// Number of workers in the pool processing requests to the database + pub fn workers_count(&mut self, workers_count: usize) -> &mut Self { + self.workers_count = Some(workers_count); + self + } + + /// Use file database + pub fn file_db(&mut self, dir_path: PathBuf) -> &mut Self { + self.backend = Some(DbSettingsBackend::new_file_backend(dir_path)); + self + } + + /// Use memory database + pub fn memory_db(&mut self) -> &mut Self { + self.backend = 
Some(DbSettingsBackend::Mem); + self + } + + /// Change raw_cache_max_size param (file backend only) + pub fn raw_cache_max_size(&mut self, raw_cache_max_size_: usize) -> &mut Self { + if let Some(DbSettingsBackend::Files { + ref mut raw_cache_max_size, + .. + }) = self.backend + { + *raw_cache_max_size = raw_cache_max_size_; + } else { + panic!("raw_cache_max_size parameter available only for file database !") + } + self + } + + /// Change cache_all_db param (file backend only) + pub fn cache_all_db(&mut self, cache_all_db_: bool) -> &mut Self { + if let Some(DbSettingsBackend::Files { + ref mut cache_all_db, + .. + }) = self.backend + { + *cache_all_db = cache_all_db_; + } else { + panic!("cache_all_db parameter available only for file database !") + } + self + } + + /// Build DbSettings + pub fn build(&self) -> DbSettings { + DbSettings { + cache_max_size: self.cache_max_size, + max_clone_values: self + .max_clone_values + .unwrap_or(*crate::constants::DEFAULT_MAX_CLONED_VALUES), + default_compression: self + .default_compression + .unwrap_or_else(|| CompressionLevel::new(1u32)), + backend: self.backend.clone().unwrap_or(DbSettingsBackend::Mem), + workers_count: self.workers_count.unwrap_or_else(num_cpus::get), + } + } +} + +#[derive(Debug, Clone)] +/// DB Settings +pub struct DbSettings { + /// Backend settings + pub backend: DbSettingsBackend, + /// Cache max sixe (in number of items) + /// None = infinite size + pub cache_max_size: Option<NonZeroUsize>, + /// Maximum number of values to clone in the cache per request + pub max_clone_values: usize, + /// Default compression level + pub default_compression: CompressionLevel, + /// Number of workers in the pool processing requests to the database + pub workers_count: usize, +} + +impl DbSettings { + /// Default settings + pub fn default(dir_path: PathBuf) -> Self { + DbSettings { + backend: DbSettingsBackend::Files { + dir_path, + raw_cache_max_size: *crate::constants::DEFAULT_MAX_RAW_CACHE_SIZE, + cache_all_db: 
false, + }, + cache_max_size: NonZeroUsize::new(*crate::constants::DEFAULT_MAX_CACHE_SIZE), + max_clone_values: *crate::constants::DEFAULT_MAX_CLONED_VALUES, + default_compression: CompressionLevel::new(1u32), + workers_count: num_cpus::get(), + } + } +} + +/// KvPartsDb backend +#[derive(Debug, Clone)] +pub enum DbSettingsBackend { + /// Files database + Files { + /// Path to directory store database files + dir_path: PathBuf, + /// Raw cache max size (in number of parts) + raw_cache_max_size: usize, + /// Cache all database in memory (false by default) + /// The maximum cache size is not considered + cache_all_db: bool, + }, + /// Memory database + Mem, +} + +impl DbSettingsBackend { + fn new_file_backend(dir_path: PathBuf) -> Self { + DbSettingsBackend::Files { + dir_path, + raw_cache_max_size: *crate::constants::DEFAULT_MAX_RAW_CACHE_SIZE, + cache_all_db: false, + } + } + /// Get directory path + pub fn get_dir_path(&self) -> Option<PathBuf> { + if let DbSettingsBackend::Files { dir_path, .. } = self { + Some(dir_path.clone()) + } else { + None + } + } +} diff --git a/lib/tools/kv-parts-dbs/src/threadpool/mod.rs b/lib/tools/kv-parts-dbs/src/threadpool/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..975c771243fd070ff0337808d01931a86ab8e05d --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/threadpool/mod.rs @@ -0,0 +1,204 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. 
+// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Manage custom threadpool for database. + +pub mod workers; +pub mod workers_msg; + +use crate::errors::DbError; +use crate::kv_collection::KvPartsCollection; +use crate::orchestrator::DbOrchestratorMsg; +use crate::responses::{DbResContent, DbResponse}; +use crate::threadpool::workers_msg::{WorkerReq, WorkerReqContent}; +use crate::{DbReqId, PartId}; +use core_affinity::CoreId; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::cmp; +use std::collections::{BTreeMap, VecDeque}; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::path::PathBuf; +use std::sync::mpsc::{SendError, Sender}; +use std::thread; +use std::thread::JoinHandle; + +#[derive(Debug)] +pub struct ThreadPool<K, V, C, M> +where + K: 'static + + Clone + + Debug + + DeserializeOwned + + Eq + + Into<PartId> + + Send + + Serialize + + Sized + + Sync, + V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + C: KvPartsCollection<K, V>, + M: 'static + + Debug + + From<DbResponse<K, V, C>> + + Into<Option<DbResponse<K, V, C>>> + + Send + + Sync, +{ + workers_senders: Vec<Sender<WorkerReq<K, V, C>>>, + workers_handlers: Vec<JoinHandle<()>>, + available_workers: VecDeque<usize>, + pending_jobs: VecDeque<WorkerReq<K, V, C>>, + phantom: PhantomData<M>, +} + +impl<K, V, C, M> ThreadPool<K, V, C, M> +where + K: 'static + + Clone + + Debug + + DeserializeOwned + + Eq + + Into<PartId> + + Send + + Serialize + + Sized + + Sync, + V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + C: 'static + Clone + KvPartsCollection<K, V>, + M: 'static + + Debug + + From<DbResponse<K, V, C>> + + Into<Option<DbResponse<K, V, C>>> + + Send + + Sync, +{ + pub fn new( + workers_count: usize, + db_dir_path: Option<PathBuf>, + orchestrator_sender: Sender<DbOrchestratorMsg<K, V, C, M>>, + ) -> Self { 
+ let core_ids: Vec<Option<CoreId>> = if let Some(core_ids) = core_affinity::get_core_ids() { + core_ids.into_iter().map(Some).collect() + } else { + (0..num_cpus::get()).map(|_| None).collect() + }; + let cpus_count = core_ids.len(); + let mut workers_senders = Vec::with_capacity(workers_count); + let mut workers_handlers = Vec::with_capacity(workers_count); + + for i in 0..workers_count { + let dir_path = db_dir_path.clone(); + let orchestrator_sender_clone = orchestrator_sender.clone(); + let (sender, recv) = std::sync::mpsc::channel(); + workers_senders.push(sender); + let core_id = core_ids[i % cpus_count]; + + let thread_handler = thread::Builder::new() + .name(format!("KvPartsDB worker {}", i)) + .spawn(move || { + crate::threadpool::workers::launch_worker::<K, V, C, M>( + i, + core_id, + dir_path, + orchestrator_sender_clone, + recv, + ) + }) + .expect("Fatal error: fail to spawn worker thread !"); + workers_handlers.push(thread_handler); + } + + ThreadPool { + workers_senders, + workers_handlers, + available_workers: (0..workers_count).collect(), + pending_jobs: VecDeque::new(), + phantom: PhantomData, + } + } + + pub fn new_jobs( + &mut self, + pending_reqs_senders: &mut BTreeMap<DbReqId, Sender<M>>, + req_id: DbReqId, + jobs: Vec<WorkerReq<K, V, C>>, + ) { + self.pending_jobs.append(&mut VecDeque::from(jobs)); + if self.launch_pending_jobs().is_err() { + let _ = pending_reqs_senders + .remove(&req_id) + .unwrap_or_else(|| panic!(dbg!("dev error"))) + .send( + DbResponse { + req_id, + req_result: Err(DbError::SendReqError), + } + .into(), + ); + } + } + + #[inline] + pub fn worker_available( + &mut self, + worker_id: usize, + ) -> Result<(), SendError<WorkerReq<K, V, C>>> { + self.available_workers.push_back(worker_id); + self.launch_pending_jobs() + } + + fn launch_pending_jobs(&mut self) -> Result<(), SendError<WorkerReq<K, V, C>>> { + let count_launchable_jobs = cmp::min(self.pending_jobs.len(), self.available_workers.len()); + for _ in 
0..count_launchable_jobs { + let worker_id = self + .available_workers + .pop_front() + .unwrap_or_else(|| panic!(dbg!("dev error"))); + let job = self + .pending_jobs + .pop_front() + .unwrap_or_else(|| panic!(dbg!("dev error"))); + self.workers_senders[worker_id].send(job)? + } + Ok(()) + } + + pub fn stop(self, req_id: DbReqId, send_res_to: Sender<M>) { + let mut err = None; + for (worker_sender, worker_handler) in self + .workers_senders + .into_iter() + .zip(self.workers_handlers.into_iter()) + { + let _ = worker_sender.send(WorkerReq { + part_id: PartId(0), + req_id: DbReqId(0), + req_content: WorkerReqContent::Stop, + }); + if let Err(err_) = worker_handler.join() { + err = Some(err_); + } + } + let req_result = if err.is_some() { + Err(DbError::CloseError) + } else { + Ok(DbResContent::CloseOk) + }; + + let _ = send_res_to.send(DbResponse { req_id, req_result }.into()); + } +} diff --git a/lib/tools/kv-parts-dbs/src/threadpool/workers.rs b/lib/tools/kv-parts-dbs/src/threadpool/workers.rs new file mode 100644 index 0000000000000000000000000000000000000000..766faff65b123e9d3f17587f8d844d4dbb1db8ed --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/threadpool/workers.rs @@ -0,0 +1,240 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! 
Manage workers threads of a DB + +use crate::constants; +use crate::errors::DbError; +use crate::kv_collection::KvPartsCollection; +use crate::orchestrator::DbOrchestratorMsg; +use crate::readers::{read_part_from_file, uncompress_and_deserialize}; +use crate::requests::DatasFormat; +use crate::responses::{DbDatas, DbResponse}; +use crate::threadpool::workers_msg::{WorkerReq, WorkerReqContent, WorkerReqResult, WorkerRes}; +use crate::PartId; +use core_affinity::CoreId; +use fnv::FnvHashSet; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::fs; +use std::path::PathBuf; +use std::sync::mpsc; + +pub fn launch_worker<K, V, C, M>( + worker_id: usize, + core_id: Option<CoreId>, + mut dir_path_opt: Option<PathBuf>, + orchestrator_sender: mpsc::Sender<DbOrchestratorMsg<K, V, C, M>>, + recv: mpsc::Receiver<WorkerReq<K, V, C>>, +) where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync, + C: Clone + KvPartsCollection<K, V>, + M: 'static + + Debug + + From<DbResponse<K, V, C>> + + Into<Option<DbResponse<K, V, C>>> + + Send + + Sync, +{ + if let Some(core_id) = core_id { + core_affinity::set_for_current(core_id); + } + + while let Ok(WorkerReq { + part_id, + req_id, + req_content, + }) = recv.recv() + { + match req_content { + WorkerReqContent::ReadPartFromFile { unzip_and_deser } => { + let req_result: WorkerReqResult<K, V, C> = if let Some(ref dir_path) = dir_path_opt + { + match read_part_from_file::<K, V, C>(&dir_path, part_id, DatasFormat::SerAndZip) + { + Ok(datas) => { + if let DbDatas::SerAndZip(mut raw_parts_datas) = datas { + let raw_datas = raw_parts_datas + .remove(&part_id) + .unwrap_or_else(|| panic!(dbg!("dev error"))); + if unzip_and_deser { + WorkerReqResult::PartDatas { + datas: uncompress_and_deserialize(&raw_datas).expect( + "KvPartsDB: your DB is corrupted, please reset data !", + ), + raw_datas, + } + } else { + 
WorkerReqResult::PartRawDatas(raw_datas) + } + } else { + panic!(dbg!("dev error")); + } + } + Err(err) => WorkerReqResult::Err(DbError::DbReadError(err)), + } + } else { + WorkerReqResult::Err(DbError::ForbiddenRequest) + }; + if orchestrator_sender + .send(DbOrchestratorMsg::WorkerRes(WorkerRes { + worker_id, + part_id, + req_id, + req_result, + })) + .is_err() + { + // Break worker loop (quit thread) + break; + } + } + WorkerReqContent::ListAllPartsIds => { + let req_result: WorkerReqResult<K, V, C> = if let Some(ref dir_path) = dir_path_opt + { + let all_parts_ids = list_all_parts_ids(dir_path); + WorkerReqResult::AllPartsIds(all_parts_ids) + } else { + WorkerReqResult::Err(DbError::ForbiddenRequest) + }; + if orchestrator_sender + .send(DbOrchestratorMsg::WorkerRes(WorkerRes { + worker_id, + part_id, + req_id, + req_result, + })) + .is_err() + { + // Break worker loop (quit thread) + break; + } + } + WorkerReqContent::WritePartInFile { + part_datas, + compression, + } => { + let req_result = if let Some(ref mut dir_path) = dir_path_opt { + match crate::writers::write_part_in_file( + dir_path, + part_id, + &part_datas, + compression, + ) { + Ok(()) => WorkerReqResult::SaveOk, + Err(err) => WorkerReqResult::Err(DbError::from(err)), + } + } else { + WorkerReqResult::Err(DbError::ForbiddenRequest) + }; + if orchestrator_sender + .send(DbOrchestratorMsg::WorkerRes(WorkerRes { + worker_id, + part_id, + req_id, + req_result, + })) + .is_err() + { + break; + } + } + WorkerReqContent::WriteRawPartInFile { raw_part_datas } => { + let req_result = if let Some(ref mut dir_path) = dir_path_opt { + match crate::writers::write_raw_part_in_file(dir_path, part_id, &raw_part_datas) + { + Ok(()) => WorkerReqResult::SaveOk, + Err(err) => WorkerReqResult::Err(DbError::from(err)), + } + } else { + WorkerReqResult::Err(DbError::ForbiddenRequest) + }; + if orchestrator_sender + .send(DbOrchestratorMsg::WorkerRes(WorkerRes { + worker_id, + part_id, + req_id, + req_result, + })) + 
.is_err() + { + break; + } + } + WorkerReqContent::ZipAndSerPart { + part_datas, + compression, + } => { + let _part_datas = part_datas; + let _compression = compression; + } + WorkerReqContent::UnzipAndDeserPart { raw_part_datas } => { + let _raw_part_datas = raw_part_datas; + } + WorkerReqContent::Stop => break, + WorkerReqContent::_Phantom(_, _) => {} + } + } +} + +fn list_all_parts_ids(dir_path: &PathBuf) -> FnvHashSet<PartId> { + let file_list_result = fs::read_dir(dir_path); + let db_name = dir_path + .file_name() + .unwrap_or_else(|| panic!(dbg!("dev error: This check is supposed to be done before"))) + .to_str() + .unwrap_or_else(|| panic!(dbg!("dev error: This check is supposed to be done before"))); + if let Err(err) = file_list_result { + error!("Fail to read DB '{}': {}", db_name, err); + panic!("Fail to read DB '{}': {}", db_name, err); + } + + let mut parts_ids = FnvHashSet::default(); + + for dir_entry in file_list_result.expect("Dev error: err case must be treat before.") { + if let Ok(dir_entry) = dir_entry { + if let Ok(file_name) = dir_entry.file_name().into_string() { + let file_name_len = file_name.len(); + + if let Ok(file_type) = dir_entry.file_type() { + if file_type.is_file() + && file_name_len > (*constants::PART_FILE_NAME_MIN_LEN) + && file_name[0..constants::PART_FILE_NAME_BEGIN.len()] + == *constants::PART_FILE_NAME_BEGIN + && file_name[file_name_len - constants::PART_FILE_NAME_END.len()..] 
+ == *constants::PART_FILE_NAME_END + { + let part_id_result: Result<usize, std::num::ParseIntError> = file_name + [constants::PART_FILE_NAME_BEGIN.len() + ..file_name_len - constants::PART_FILE_NAME_END.len()] + .parse(); + + if let Ok(part_id) = part_id_result { + parts_ids.insert(PartId(part_id as u32)); + } else { + warn!( + "KvPartsDb: DB '{}' contains an intruder file: '{}'", + db_name, file_name + ) + } + } + } + } + } + } + parts_ids +} diff --git a/lib/tools/kv-parts-dbs/src/threadpool/workers_msg.rs b/lib/tools/kv-parts-dbs/src/threadpool/workers_msg.rs new file mode 100644 index 0000000000000000000000000000000000000000..e13a053cdb017ba7e06e199b0656ad33b3c408ce --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/threadpool/workers_msg.rs @@ -0,0 +1,112 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! 
define workers messages + +use crate::compression::CompressionLevel; +use crate::errors::DbError; +use crate::kv_collection::KvPartsCollection; +use crate::{DbReqId, PartId}; +use fnv::FnvHashSet; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::marker::PhantomData; + +#[derive(Debug, Clone)] +/// Worker request +pub struct WorkerReq<K, V, C> +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync, + C: KvPartsCollection<K, V>, +{ + /// Part id + pub part_id: PartId, + /// User request id + pub req_id: DbReqId, + /// Orchestrator request content + pub req_content: WorkerReqContent<K, V, C>, +} + +#[derive(Debug, Clone)] +/// Worker request content +pub enum WorkerReqContent<K, V, C> +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync, + C: KvPartsCollection<K, V>, +{ + ReadPartFromFile { + unzip_and_deser: bool, + }, + ListAllPartsIds, + WritePartInFile { + part_datas: C, + compression: CompressionLevel, + }, + WriteRawPartInFile { + raw_part_datas: Vec<u8>, + }, + ZipAndSerPart { + part_datas: C, + compression: CompressionLevel, + }, + UnzipAndDeserPart { + raw_part_datas: Vec<u8>, + }, + Stop, + _Phantom(PhantomData<K>, PhantomData<V>), +} + +#[derive(Debug)] +/// Worker response +pub struct WorkerRes<K, V, C> +where + K: Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync, + C: Clone + KvPartsCollection<K, V>, +{ + /// Worker id + pub worker_id: usize, + /// Part id + pub part_id: PartId, + /// User request id + pub req_id: DbReqId, + /// Orchestrator request result + pub req_result: WorkerReqResult<K, V, C>, +} + +#[derive(Debug)] +/// Worker request result +pub enum WorkerReqResult<K, V, C> +where + K: Clone + Debug + 
DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync, + V: Clone + Debug + DeserializeOwned + Send + Serialize + Sync, + C: Clone + KvPartsCollection<K, V>, +{ + /// List all parts ids + AllPartsIds(FnvHashSet<PartId>), + /// Successfully get part datas + PartDatas { datas: C, raw_datas: Vec<u8> }, + /// Successfully get part raw datas + PartRawDatas(Vec<u8>), + /// Successfully save + SaveOk, + /// Request in error + Err(DbError), + /// Phantom + _Phantom(PhantomData<K>, PhantomData<V>), +} diff --git a/lib/tools/kv-parts-dbs/src/writers.rs b/lib/tools/kv-parts-dbs/src/writers.rs new file mode 100644 index 0000000000000000000000000000000000000000..3be3697b7e3449b3e88c92775058ae4f3db5b454 --- /dev/null +++ b/lib/tools/kv-parts-dbs/src/writers.rs @@ -0,0 +1,155 @@ +// Copyright (C) 2019 Éloïs SANCHEZ +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Write databases persisted in compressed binary files. 
+ +use crate::compression::compress_binary_datas; +use crate::compression::CompressionLevel; +use crate::PartId; +use serde::Serialize; +//use std::collections::HashMap; +use std::fs::File; +use std::io::Write; +use std::path::PathBuf; + +#[derive(Debug)] +/// Write error +pub enum DbWriteError { + /// Serialize error + SerializeError(bincode::Error), + /// I/O error + IoError(std::io::Error), +} + +impl std::fmt::Display for DbWriteError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + DbWriteError::SerializeError(e) => write!(f, "DbReadError: serialize error: {}", e), + DbWriteError::IoError(e) => write!(f, "DbReadError: I/O error: {}", e), + } + } +} + +impl From<std::io::Error> for DbWriteError { + fn from(err: std::io::Error) -> Self { + DbWriteError::IoError(err) + } +} + +impl From<bincode::Error> for DbWriteError { + fn from(err: bincode::Error) -> Self { + DbWriteError::SerializeError(err) + } +} + +/*/// Write on DB part +pub fn write_db_part<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>( + db: &mut BinDb, + part_id: PartId, + part_datas: &D, + compression: CompressionLevel, +) -> Result<(), DbWriteError> { + match db { + BinDb::File { ref dir_path } => { + write_part_in_file(dir_path, part_id, part_datas, compression) + } + BinDb::Mem { + ref mut parts_datas, + } => { + let compressed_bin_datas = serialize_and_compress(part_datas, compression)?; + parts_datas.insert(part_id, compressed_bin_datas); + Ok(()) + } + } +} + +/// Write several Db parts +pub fn write_db_parts<D: Clone + Debug + DeserializeOwned + Send + Serialize + Sync>( + db: &mut BinDb, + datas: &HashMap<PartId, D>, + compression: CompressionLevel, +) -> Result<(), DbWriteError> { + match db { + BinDb::File { ref dir_path } => { + datas + .par_iter() + .map(|(part_id, part_datas)| { + write_part_in_file(dir_path, *part_id, part_datas, compression) + }) + .collect::<Result<(), DbWriteError>>()?; + } + BinDb::Mem { + ref mut parts_datas, + 
} => { + let new_parts_datas = datas + .par_iter() + .map(|(part_id, part_datas)| { + match serialize_and_compress(part_datas, compression) { + Ok(compressed_bin_datas) => Ok((*part_id, compressed_bin_datas)), + Err(err) => Err(err), + } + }) + .collect::<Result<(HashMap<PartId, Vec<u8>>), DbWriteError>>()?; + + for (new_part_id, new_part_datas) in new_parts_datas { + parts_datas.insert(new_part_id, new_part_datas); + } + } + } + + Ok(()) +}*/ + +#[inline] +fn _serialize_and_compress<D: Serialize>( + datas: &D, + compression: CompressionLevel, +) -> Result<Vec<u8>, DbWriteError> { + let bin_datas: Vec<u8> = bincode::serialize(datas)?; + Ok(compress_binary_datas(&bin_datas[..], compression)?) +} + +/// Write part datas in its corresponding file +pub fn write_part_in_file<D: Serialize>( + dir_path: &mut PathBuf, + part_id: PartId, + datas: &D, + compression: CompressionLevel, +) -> Result<(), DbWriteError> { + let compressed_bin_datas = _serialize_and_compress(datas, compression)?; + + write_raw_part_in_file(dir_path, part_id, &compressed_bin_datas) +} + +/// Write raw part datas in its corresponding file +pub fn write_raw_part_in_file( + dir_path: &mut PathBuf, + part_id: PartId, + datas: &[u8], +) -> Result<(), DbWriteError> { + dir_path.push(part_id.to_file_name()); + match File::create(dir_path.as_path()) { + Ok(mut file) => { + dir_path.pop(); + file.write_all(&datas[..])?; + + Ok(()) + } + Err(err) => { + dir_path.pop(); + Err(err.into()) + } + } +} diff --git a/lib/tools/kv-parts-dbs/tests/common.rs b/lib/tools/kv-parts-dbs/tests/common.rs new file mode 100644 index 0000000000000000000000000000000000000000..6e83c99df03bff48e7778328dcb54aad6d09cc8b --- /dev/null +++ b/lib/tools/kv-parts-dbs/tests/common.rs @@ -0,0 +1,26 @@ +// Copyright (C) 2018 The Durs Project Developers. 
+// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use kv_parts_dbs::PartId; +use serde::{Deserialize, Serialize}; + +#[derive(Copy, Clone, Debug, Deserialize, Ord, PartialEq, PartialOrd, Eq, Hash, Serialize)] +pub struct KeyInt(pub u32); + +impl Into<PartId> for KeyInt { + fn into(self) -> PartId { + PartId(self.0) + } +} diff --git a/lib/tools/kv-parts-dbs/tests/hash_db.rs b/lib/tools/kv-parts-dbs/tests/hash_db.rs new file mode 100644 index 0000000000000000000000000000000000000000..c2fc4e0111ce9d59e91e59ef8529e2ac0d070bdf --- /dev/null +++ b/lib/tools/kv-parts-dbs/tests/hash_db.rs @@ -0,0 +1,88 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. 
+ +use common::*; +use fnv::FnvBuildHasher; +use kv_parts_dbs::impls::hash_db::KvPartsHashDB; +use kv_parts_dbs::responses::*; +use std::collections::HashMap; +use tests_empty_memory_db::*; + +mod common; +mod tests_empty_memory_db; + +type DefaultHasher = std::hash::BuildHasherDefault<std::collections::hash_map::DefaultHasher>; +type DatasMap = HashMap<KeyInt, u64, DefaultHasher>; +type DatasFnvMap = HashMap<KeyInt, u64, FnvBuildHasher>; + +#[derive(Debug)] +enum Msg { + DbRes(DbResponse<KeyInt, u64, DatasMap>), + Db2Res(DbResponse<KeyInt, u64, DatasFnvMap>), +} + +impl From<DbResponse<KeyInt, u64, DatasMap>> for Msg { + fn from(res: DbResponse<KeyInt, u64, DatasMap>) -> Self { + Msg::DbRes(res) + } +} + +impl Into<Option<DbResponse<KeyInt, u64, DatasMap>>> for Msg { + fn into(self) -> Option<DbResponse<KeyInt, u64, DatasMap>> { + if let Msg::DbRes(res) = self { + Some(res) + } else { + None + } + } +} + +impl From<DbResponse<KeyInt, u64, DatasFnvMap>> for Msg { + fn from(res: DbResponse<KeyInt, u64, DatasFnvMap>) -> Self { + Msg::Db2Res(res) + } +} + +impl Into<Option<DbResponse<KeyInt, u64, DatasFnvMap>>> for Msg { + fn into(self) -> Option<DbResponse<KeyInt, u64, DatasFnvMap>> { + if let Msg::Db2Res(res) = self { + Some(res) + } else { + None + } + } +} + +#[test] +fn tests_empty_memory_hash_db() { + tests_empty_memory_abstract_db::< + KeyInt, + u64, + DatasMap, + Msg, + KvPartsHashDB<KeyInt, u64, DefaultHasher, Msg>, + >(vec![KeyInt(1), KeyInt(2)]); +} + +#[test] +fn tests_empty_memory_fnv_hash_db() { + tests_empty_memory_abstract_db::< + KeyInt, + u64, + DatasFnvMap, + Msg, + KvPartsHashDB<KeyInt, u64, FnvBuildHasher, Msg>, + >(vec![KeyInt(1), KeyInt(2)]); +} diff --git a/lib/tools/kv-parts-dbs/tests/tests_empty_memory_db.rs b/lib/tools/kv-parts-dbs/tests/tests_empty_memory_db.rs new file mode 100644 index 0000000000000000000000000000000000000000..7a3d6a1fcb3b1dbca1c701920fff7326e95dc047 --- /dev/null +++ 
b/lib/tools/kv-parts-dbs/tests/tests_empty_memory_db.rs
@@ -0,0 +1,103 @@
+// Copyright (C) 2018 The Durs Project Developers.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use kv_parts_dbs::kv_collection::KvPartsCollection;
+use kv_parts_dbs::requests::*;
+use kv_parts_dbs::responses::*;
+use kv_parts_dbs::settings::*;
+use kv_parts_dbs::*;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use std::fmt::Debug;
+use std::sync::mpsc::*;
+use std::thread;
+
+/// Wait (up to one second) for the next message on `recv`, convert it into a
+/// `DbResponse` and assert that it carries `req_id` and the `expected`
+/// successful content. Panics on timeout or on a non-`DbResponse` message.
+pub fn expected_response<K, V, C, M>(
+    recv: &Receiver<M>,
+    req_id: DbReqId,
+    expected: DbResContent<K, V, C>,
+) where
+    K: 'static + Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
+    V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
+    C: 'static + Clone + KvPartsCollection<K, V>,
+    M: 'static
+        + Debug
+        + From<DbResponse<K, V, C>>
+        + Into<Option<DbResponse<K, V, C>>>
+        + Send
+        + Sync,
+{
+    let res: DbResponse<K, V, C> = recv
+        .recv_timeout(std::time::Duration::from_millis(1_000))
+        .expect("No response from DB after timeout !")
+        .into()
+        .expect("Received a message that is not a DbResponse !");
+    assert_eq!(req_id, res.req_id);
+    assert!(res.req_result.is_ok());
+    let res_content = res.req_result.unwrap();
+    assert_eq!(expected, res_content);
+}
+
+/// Generic scenario shared by the per-implementation tests: open an empty DB,
+/// request the values for `keys`, and check that the DB answers with an empty
+/// collection and all `keys` reported as missing; finally close the DB.
+pub fn tests_empty_memory_abstract_db<K, V, C, M, DB>(keys: Vec<K>)
+where
+    K: 'static + Clone + Debug + DeserializeOwned + Eq + Into<PartId> + Send + Serialize + Sync,
+    V: 'static + Clone + Debug + DeserializeOwned + Eq + Send + Serialize + Sync,
+    C: 'static + Clone + KvPartsCollection<K, V>,
+    M: 'static
+        + Debug
+        + From<DbResponse<K, V, C>>
+        + Into<Option<DbResponse<K, V, C>>>
+        + Send
+        + Sync,
+    DB: AbstractKvPartsDb<K, V, C, M>,
+{
+    let db = DB::open(DbSettingsBuilder::new().build()).expect("Fail to open DB");
+
+    let db_reader = db.get_reader();
+
+    {
+        let (my_sender, my_recv) = channel();
+        {
+            db_reader
+                .send_request(
+                    DbReqId(1),
+                    DbReadRequest::ReadValues { keys: keys.clone() },
+                    my_sender,
+                )
+                .expect("Fail to send DB request");
+        }
+
+        expected_response(
+            &my_recv,
+            DbReqId(1),
+            DbResContent::Values {
+                found: C::new(),
+                missing_keys: keys,
+            },
+        );
+    }
+
+    // NOTE(review): presumably gives the DB's worker threads time to settle
+    // before closing — confirm and replace with explicit synchronization.
+    thread::sleep(std::time::Duration::from_millis(500));
+
+    db.close().expect("Fail to close DB properly");
+}
diff --git a/lib/tools/kv-parts-dbs/tests/tree_db.rs b/lib/tools/kv-parts-dbs/tests/tree_db.rs
new file mode 100644
index 0000000000000000000000000000000000000000..8ae64ac4ac225848a5a080c5b61cbd0d22d12bc3
--- /dev/null
+++ b/lib/tools/kv-parts-dbs/tests/tree_db.rs
@@ -0,0 +1,54 @@
+// Copyright (C) 2018 The Durs Project Developers.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use common::*;
+use kv_parts_dbs::impls::tree_db::KvPartsTreeDB;
+use kv_parts_dbs::responses::*;
+use std::collections::BTreeMap;
+use tests_empty_memory_db::*;
+
+mod common;
+mod tests_empty_memory_db;
+
+type DatasMap = BTreeMap<KeyInt, u64>;
+
+#[derive(Debug)]
+enum Msg {
+    DbRes(DbResponse<KeyInt, u64, DatasMap>),
+    _Other,
+}
+
+impl From<DbResponse<KeyInt, u64, DatasMap>> for Msg {
+    fn from(res: DbResponse<KeyInt, u64, DatasMap>) -> Self {
+        Msg::DbRes(res)
+    }
+}
+
+impl From<Msg> for Option<DbResponse<KeyInt, u64, DatasMap>> {
+    fn from(msg: Msg) -> Option<DbResponse<KeyInt, u64, DatasMap>> {
+        if let Msg::DbRes(res) = msg {
+            Some(res)
+        } else {
+            None
+        }
+    }
+}
+
+#[test]
+fn tests_empty_memory_tree_db() {
+    tests_empty_memory_abstract_db::<KeyInt, u64, DatasMap, Msg, KvPartsTreeDB<KeyInt, u64, Msg>>(
+        vec![KeyInt(1), KeyInt(2)],
+    );
+}