diff --git a/rust-libs/tools/kv_typed/Cargo.toml b/rust-libs/tools/kv_typed/Cargo.toml index d6a71e55a265b1a22cf6d3dbefc8d2c446978a25..6f7b52371860d64d0f2ad7d436b943c97d616749 100644 --- a/rust-libs/tools/kv_typed/Cargo.toml +++ b/rust-libs/tools/kv_typed/Cargo.toml @@ -12,19 +12,26 @@ edition = "2018" path = "src/lib.rs" [dependencies] +blake3 = { version = "0.3.7", optional = true } +cfg-if = "0.1.10" flume = "0.9.1" kv_typed_code_gen = { path = "../kv_typed_code_gen" } leveldb_minimal = { version = "0.1.0", optional = true } mockall = { version = "0.8.0", optional = true } -parking_lot = { version = "0.11.0", optional = true } +parking_lot = "0.11.0" +paste = "1.0.2" rayon = { version = "1.3.1", optional = true } regex = { version = "1.3.9", optional = true } serde_json = { version = "1.0.53", optional = true } sled = { version = "0.34.4", optional = true } smallvec = { version = "1.4.0", features = ["serde", "write"] } thiserror = "1.0.20" +uninit = "0.4.0" zerocopy = "0.3.0" +[target.'cfg(target_arch="x86_64")'.dependencies] +lmdb-zero = "0.4.4" + [[bench]] name = "compare_backends" harness = false @@ -34,24 +41,23 @@ required-features = ["leveldb_backend", "memory_backend", "sled_backend"] async-std = { version = "1.6.3", features = ["attributes"] } maybe-async = "0.2.0" smallvec = { version = "1.4.0", features = ["serde", "write"] } +tempdir = "0.3.7" unwrap = "1.2.1" # Benches dependencies criterion = { version = "0.3.1" } [features] -default = ["memory_backend"] +#default = ["memory_backend"] async = [] explorer = ["rayon", "regex", "serde_json"] leveldb_backend = ["leveldb_minimal"] -memory_backend = ["parking_lot"] +memory_backend = ["blake3"] sled_backend = ["sled"] -subscription = ["parking_lot"] -sync = [] -mock = ["mockall"] +#mock = ["mockall"] -#default = ["memory_backend", "subscription", "sync"] -#default = ["memory_backend", "subscription", "sync", "explorer"] -#default = ["memory_backend", "subscription", "sync", "mock"] +default = ["memory_backend", "sled_backend"] +#default = ["memory_backend", "explorer"] +#default = ["memory_backend", "mock"] diff --git a/rust-libs/tools/kv_typed/benches/compare_backends.rs b/rust-libs/tools/kv_typed/benches/compare_backends.rs index b89d4be1a2256a7e63a16f2c5c097cd56dfb24f9..ac8e2d92b1ab46012df03e8b2b2a178d78424136 100644 --- a/rust-libs/tools/kv_typed/benches/compare_backends.rs +++ b/rust-libs/tools/kv_typed/benches/compare_backends.rs @@ -3,19 +3,28 @@ use kv_typed::prelude::*; use std::{fmt::Debug, path::PathBuf}; db_schema!(Test, [["c1", col_1, u32, String],]); -const LEVELDB_DIR_PATH: &str = "/dev/shm/kv_typed/benches/compare_backends/leveldb"; +//const LEVELDB_DIR_PATH: &str = "/dev/shm/kv_typed/benches/compare_backends/leveldb"; +//const LMDB_DIR_PATH: &str = "/dev/shm/kv_typed/benches/compare_backends/lmdb"; +const LEVELDB_DIR_PATH: &str = "/home/elois/tmp/kv_typed/benches/compare_backends/leveldb"; +const LMDB_DIR_PATH: &str = "/home/elois/tmp/kv_typed/benches/compare_backends/lmdb"; +const SLED_DIR_PATH: &str = "/home/elois/tmp/kv_typed/benches/compare_backends/sled"; static SMALL_VAL: &str = "abcdefghijklmnopqrst"; static LARGE_VAL: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; fn read_n_entries<B: Backend>(db: &TestDb<B>, n: u32, val: String) { - let mut iter = db.col_1().iter(..).values(); - for _ in 0..n { - assert_eq!(iter.next_res().expect(""), Some(val.clone())); - //assert_eq!(db.col_1().get(&i).expect(""), Some(val.clone())); + for i in 0..n { + assert_eq!(db.col_1().get(&i).expect("db err"), Some(val.clone())); } - assert_eq!(iter.next_res().expect(""), None); + /*db.col_1().iter(.., |iter| { + let mut iter = iter.values(); + for _ in 0..n { + assert_eq!(iter.next_res().expect(""), Some(val.clone())); + //assert_eq!(db.col_1().get(&i).expect(""), Some(val.clone())); + } + assert_eq!(iter.next_res().expect(""), None); + });*/ } -fn write_n_entries<B: Backend>(db: &TestDb<B>, n: u32, val: String) { +fn remove_and_write_n_entries<B: Backend>(db: &TestDb<B>, n: u32, val: String) { for i in 0..n { db.col_1_write().remove(i).expect("fail to write"); db.col_1_write() @@ -23,10 +32,17 @@ fn write_n_entries<B: Backend>(db: &TestDb<B>, n: u32, val: String) { .expect("fail to write"); } } +fn write_n_entries<B: Backend>(db: &TestDb<B>, n: u32, val: String) { + for i in 0..n { + db.col_1_write() + .upsert(i, val.clone()) + .expect("fail to write"); + } +} pub fn benchmark(c: &mut Criterion) { // Read chart config - let read_chart_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic); + //let read_chart_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic); // Create DBs std::fs::create_dir_all(LEVELDB_DIR_PATH).expect("fail to create leveldb dir"); @@ -35,32 +51,47 @@ pub fn benchmark(c: &mut Criterion) { ..Default::default() }) .expect("fail to open db"); - let mem_db = TestDb::<Mem>::open(MemConf::default()).expect("fail to open db"); + let lmdb_db = + TestDb::<Lmdb>::open(LmdbConf::default().folder_path(PathBuf::from(LMDB_DIR_PATH))) + .expect("fail to open db"); + //let mem_db = TestDb::<Mem>::open(MemConf::default()).expect("fail to open db"); let sled_db = - TestDb::<Sled>::open(SledConf::default().temporary(true)).expect("fail to open db"); + TestDb::<Sled>::open(SledConf::default().path(SLED_DIR_PATH)).expect("fail to open db"); // Test write small values let mut group = c.benchmark_group("write small values"); - group.bench_function("leveldb", |b| { - b.iter(|| write_n_entries::<LevelDb>(&leveldb_db, 100, String::from(SMALL_VAL))) + group.bench_function("lmdb", |b| { + b.iter(|| remove_and_write_n_entries(&lmdb_db, 100, String::from(SMALL_VAL))) }); - group.bench_function("mem", |b| { - b.iter(|| write_n_entries::<Mem>(&mem_db, 100, String::from(SMALL_VAL))) + group.bench_function("leveldb", |b| { + b.iter(|| remove_and_write_n_entries(&leveldb_db, 100, String::from(SMALL_VAL))) }); + /*group.bench_function("mem", |b| { + b.iter(|| remove_and_write_n_entries(&mem_db, 100, String::from(SMALL_VAL))) + });*/ group.bench_function("sled", |b| { - b.iter(|| write_n_entries::<Sled>(&sled_db, 100, String::from(SMALL_VAL))) + b.iter(|| remove_and_write_n_entries(&sled_db, 100, String::from(SMALL_VAL))) }); group.finish(); + // Prepare read test + write_n_entries(&lmdb_db, 100, String::from(SMALL_VAL)); + write_n_entries(&leveldb_db, 100, String::from(SMALL_VAL)); + //write_n_entries(&mem_db, 100, String::from(SMALL_VAL)); + write_n_entries(&sled_db, 100, String::from(SMALL_VAL)); + // Test read small values let mut group = c.benchmark_group("read small values"); - group.plot_config(read_chart_config.clone()); + //group.plot_config(read_chart_config.clone()); + group.bench_function("lmdb", |b| { + b.iter(|| read_n_entries(&lmdb_db, 100, String::from(SMALL_VAL))) + }); group.bench_function("leveldb", |b| { b.iter(|| read_n_entries(&leveldb_db, 100, String::from(SMALL_VAL))) }); - group.bench_function("mem", |b| { + /*group.bench_function("mem", |b| { b.iter(|| read_n_entries(&mem_db, 100, String::from(SMALL_VAL))) - }); + });*/ group.bench_function("sled", |b| { b.iter(|| read_n_entries(&sled_db, 100, String::from(SMALL_VAL))) }); @@ -68,33 +99,47 @@ pub fn benchmark(c: &mut Criterion) { // Test write large values let mut group = c.benchmark_group("write large values"); - group.bench_function("leveldb", |b| { - b.iter(|| write_n_entries::<LevelDb>(&leveldb_db, 100, String::from(LARGE_VAL))) + group.bench_function("lmdb", |b| { + b.iter(|| remove_and_write_n_entries(&lmdb_db, 100, String::from(LARGE_VAL))) }); - group.bench_function("mem", |b| { - b.iter(|| write_n_entries::<Mem>(&mem_db, 100, String::from(LARGE_VAL))) + group.bench_function("leveldb", |b| { + b.iter(|| remove_and_write_n_entries(&leveldb_db, 100, String::from(LARGE_VAL))) }); + /*group.bench_function("mem", |b| { + b.iter(|| remove_and_write_n_entries(&mem_db, 100, String::from(LARGE_VAL))) + });*/ group.bench_function("sled", |b| { - b.iter(|| write_n_entries::<Sled>(&sled_db, 100, String::from(LARGE_VAL))) + b.iter(|| remove_and_write_n_entries(&sled_db, 100, String::from(LARGE_VAL))) }); group.finish(); + // Prepare read test + write_n_entries(&lmdb_db, 100, String::from(LARGE_VAL)); + write_n_entries(&leveldb_db, 100, String::from(LARGE_VAL)); + //write_n_entries(&mem_db, 100, String::from(LARGE_VAL)); + write_n_entries(&sled_db, 100, String::from(LARGE_VAL)); + // Test read large values let mut group = c.benchmark_group("read large values"); - group.plot_config(read_chart_config); + //group.plot_config(read_chart_config); + group.bench_function("lmdb", |b| { + b.iter(|| read_n_entries(&lmdb_db, 100, String::from(LARGE_VAL))) + }); group.bench_function("leveldb", |b| { b.iter(|| read_n_entries(&leveldb_db, 100, String::from(LARGE_VAL))) }); - group.bench_function("mem", |b| { + /*group.bench_function("mem", |b| { b.iter(|| read_n_entries(&mem_db, 100, String::from(LARGE_VAL))) - }); + });*/ group.bench_function("sled", |b| { b.iter(|| read_n_entries(&sled_db, 100, String::from(LARGE_VAL))) }); group.finish(); // Close DBs - std::fs::remove_dir_all(LEVELDB_DIR_PATH).expect("fail to create leveldb dir"); + std::fs::remove_dir_all(LEVELDB_DIR_PATH).expect("fail to remove leveldb dir"); + std::fs::remove_dir_all(LMDB_DIR_PATH).expect("fail to remove lmdb dir"); + std::fs::remove_dir_all(SLED_DIR_PATH).expect("fail to remove sled dir"); } criterion_group!(benches, benchmark); diff --git a/rust-libs/tools/kv_typed/src/as_bytes.rs b/rust-libs/tools/kv_typed/src/as_bytes.rs index f35ad1f2b9494a429b4c88cf8a2d13db9aeab108..0ba24d8e07edab661687ce0010693134924f3394 100644 --- a/rust-libs/tools/kv_typed/src/as_bytes.rs +++ b/rust-libs/tools/kv_typed/src/as_bytes.rs @@ -2,20 +2,63 @@ use crate::*; pub trait KeyAsBytes { fn as_bytes<T, F: FnMut(&[u8]) -> T>(&self, f: F) -> T; + fn fill_bytes_size(&self) -> Option<usize> { + None + } + fn fill_bytes(&self, _: &mut [u8]) { + unimplemented!() + } } pub trait ValueAsBytes { fn as_bytes<T, F: FnMut(&[u8]) -> Result<T, KvError>>(&self, f: F) -> Result<T, KvError>; + fn fill_bytes_size(&self) -> Option<usize> { + None + } + fn fill_bytes(&self, _: &mut [u8]) { + unimplemented!() + } +} + +impl KeyAsBytes for () { + fn as_bytes<T, F: FnMut(&[u8]) -> T>(&self, mut f: F) -> T { + f(&[]) + } + fn fill_bytes_size(&self) -> Option<usize> { + Some(0) + } + fn fill_bytes(&self, _: &mut [u8]) {} +} +impl ValueAsBytes for () { + fn as_bytes<T, F: FnMut(&[u8]) -> Result<T, KvError>>(&self, mut f: F) -> Result<T, KvError> { + f(&[]) + } + fn fill_bytes_size(&self) -> Option<usize> { + Some(0) + } + fn fill_bytes(&self, _: &mut [u8]) {} } impl KeyAsBytes for String { fn as_bytes<T, F: FnMut(&[u8]) -> T>(&self, mut f: F) -> T { f(self.as_bytes()) } + fn fill_bytes_size(&self) -> Option<usize> { + Some(self.len()) + } + fn fill_bytes(&self, buffer: &mut [u8]) { + buffer.copy_from_slice(self.as_bytes()) + } } impl ValueAsBytes for String { fn as_bytes<T, F: FnMut(&[u8]) -> Result<T, KvError>>(&self, mut f: F) -> Result<T, KvError> { f(self.as_bytes()) } + fn fill_bytes_size(&self) -> Option<usize> { + Some(self.len()) + } + fn fill_bytes(&self, buffer: &mut [u8]) { + buffer.copy_from_slice(self.as_bytes()) + } } impl<T> ValueAsBytes for Vec<T> @@ -26,6 +69,13 @@ where use zerocopy::AsBytes as _; f((&self[..]).as_bytes()) } + fn fill_bytes_size(&self) -> Option<usize> { + Some(self.len() * std::mem::size_of::<T>()) + } + fn fill_bytes(&self, buffer: &mut [u8]) { + use zerocopy::AsBytes as _; + buffer.copy_from_slice((&self[..]).as_bytes()) + } } macro_rules! impl_as_bytes_for_smallvec { @@ -38,6 +88,11 @@ macro_rules! impl_as_bytes_for_smallvec { use zerocopy::AsBytes as _; f((&self[..]).as_bytes()) } + fn fill_bytes_size(&self) -> Option<usize> { Some(self.len() * std::mem::size_of::<T>()) } + fn fill_bytes(&self, buffer: &mut [u8]) { + use zerocopy::AsBytes as _; + buffer.copy_from_slice((&self[..]).as_bytes()) + } } )*}; } @@ -67,12 +122,12 @@ macro_rules! impl_as_bytes_for_numbers { ($($T:ty),*) => {$( impl KeyAsBytes for $T { fn as_bytes<T, F: FnMut(&[u8]) -> T>(&self, mut f: F) -> T { - f(&self.to_be_bytes()[..]) + f(&self.to_le_bytes()[..]) } } impl ValueAsBytes for $T { fn as_bytes<T, F: FnMut(&[u8]) -> Result<T, KvError>>(&self, mut f: F) -> Result<T, KvError> { - f(&self.to_be_bytes()[..]) + f(&self.to_le_bytes()[..]) } } )*}; diff --git a/rust-libs/tools/kv_typed/src/backend.rs b/rust-libs/tools/kv_typed/src/backend.rs index a8fcc35070a90ca956f81ebfa8ca1d14b8adda75..81af043ea130184169c2f14a36ae2d0264727498 100644 --- a/rust-libs/tools/kv_typed/src/backend.rs +++ b/rust-libs/tools/kv_typed/src/backend.rs @@ -17,6 +17,8 @@ #[cfg(feature = "leveldb_backend")] pub mod leveldb; +#[cfg(target_arch = "x86_64")] +pub mod lmdb; #[cfg(feature = "memory_backend")] pub mod memory; #[cfg(feature = "mock")] @@ -26,22 +28,6 @@ pub mod sled; use crate::*; -pub trait TransactionalBackend<DbReader: From<Vec<Self::TxCol>>, DbWriter: From<Vec<Self::TxCol>>>: - Backend -{ - type Err: Error + Send + Sync + 'static; - type TxCol: BackendCol; - - fn read<A: Debug, D, F: Fn(&DbReader) -> TransactionResult<D, A, Self::Err>>( - &self, - f: F, - ) -> TransactionResult<D, A, Self::Err>; - fn write<A: Debug, F: Fn(&DbWriter) -> TransactionResult<(), A, Self::Err>>( - &self, - f: F, - ) -> TransactionResult<(), A, Self::Err>; -} - pub trait Backend: 'static + Clone + Sized { const NAME: &'static str; type Col: BackendCol; @@ -51,12 +37,11 @@ pub trait Backend: 'static + Clone + Sized { fn open_col(&mut self, conf: &Self::Conf, col_name: &str) -> KvResult<Self::Col>; } -pub trait BackendCol: 'static + Clone + Send + Sync { +pub trait BackendCol: 'static + Clone + Debug + Send + Sync { type Batch: BackendBatch; - type KeyBytes: AsRef<[u8]>; - type ValueBytes: AsRef<[u8]>; - type Iter: Iterator<Item = Result<(Self::KeyBytes, Self::ValueBytes), DynErr>> - + ReversableIterator; + type KeyBytes: KeyBytes; + type ValueBytes: ValueBytes; + type Iter: BackendIter<Self::KeyBytes, Self::ValueBytes>; fn get<K: Key, V: Value>(&self, k: &K) -> KvResult<Option<V>>; fn get_ref<K: Key, V: ValueZc, D, F: Fn(&V::Ref) -> KvResult<D>>( @@ -69,18 +54,30 @@ pub trait BackendCol: 'static + Clone + Send + Sync { k: &K, f: F, ) -> KvResult<Option<D>>; - fn clear(&self) -> KvResult<()>; + fn clear(&mut self) -> KvResult<()>; fn count(&self) -> KvResult<usize>; fn iter<K: Key, V: Value>(&self, range: RangeBytes) -> Self::Iter; - fn put<K: Key, V: Value>(&self, k: &K, value: &V) -> KvResult<()>; - fn delete<K: Key>(&self, k: &K) -> KvResult<()>; + fn put<K: Key, V: Value>(&mut self, k: &K, value: &V) -> KvResult<()>; + fn delete<K: Key>(&mut self, k: &K) -> KvResult<()>; fn new_batch() -> Self::Batch; - fn write_batch(&self, inner_batch: Self::Batch) -> KvResult<()>; + fn write_batch(&mut self, inner_batch: Self::Batch) -> KvResult<()>; fn save(&self) -> KvResult<()>; } +pub trait BackendIter<K: KeyBytes, V: ValueBytes>: + Iterator<Item = Result<(K, V), DynErr>> + ReversableIterator +{ +} + #[cfg_attr(feature = "mock", mockall::automock)] -pub trait BackendBatch: Default { +pub trait BackendBatch: Debug + Default { fn upsert(&mut self, k: &[u8], v: &[u8]); fn remove(&mut self, k: &[u8]); } + +#[cfg(feature = "mock")] +impl Debug for MockBackendBatch { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + unimplemented!() + } +} diff --git a/rust-libs/tools/kv_typed/src/backend/leveldb.rs b/rust-libs/tools/kv_typed/src/backend/leveldb.rs index d9c7b9a904215c303d5f215a338c35782cfb3071..d0d710b5c9fc22a8333e25e1b23e626307d14cf5 100644 --- a/rust-libs/tools/kv_typed/src/backend/leveldb.rs +++ b/rust-libs/tools/kv_typed/src/backend/leveldb.rs @@ -57,27 +57,53 @@ impl Debug for LevelDbCol { } } -impl BackendBatch for WriteBatch { +#[derive(Default)] +pub struct LevelDbBatch(WriteBatch); + +impl Debug for LevelDbBatch { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LevelDbBatch") + .field("0", &"WriteBatch") + .finish() + } +} + +impl BackendBatch for LevelDbBatch { fn upsert(&mut self, k: &[u8], v: &[u8]) { - self.put(k, v) + self.0.put(k, v) } fn remove(&mut self, k: &[u8]) { - self.delete(k) + self.0.delete(k) + } +} + +#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)] +pub struct LevelDbBytes(Vec<u8>); +impl AsRef<[u8]> for LevelDbBytes { + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} +impl FromBytes for LevelDbBytes { + type Err = std::convert::Infallible; + + fn from_bytes(bytes: &[u8]) -> Result<Self, Self::Err> { + Ok(Self(bytes.into())) } } impl BackendCol for LevelDbCol { - type Batch = WriteBatch; - type KeyBytes = Vec<u8>; - type ValueBytes = Vec<u8>; + type Batch = LevelDbBatch; + type KeyBytes = LevelDbBytes; + type ValueBytes = LevelDbBytes; type Iter = LevelDbIter; #[inline(always)] fn new_batch() -> Self::Batch { - WriteBatch::default() + LevelDbBatch(WriteBatch::default()) } - fn clear(&self) -> KvResult<()> { + fn clear(&mut self) -> KvResult<()> { let keys = self .0 .iter(ReadOptions::new()) @@ -159,20 +185,20 @@ impl BackendCol for LevelDbCol { }) } #[inline(always)] - fn delete<K: Key>(&self, k: &K) -> KvResult<()> { + fn delete<K: Key>(&mut self, k: &K) -> KvResult<()> { k.as_bytes(|k_bytes| self.0.delete(WriteOptions::new(), k_bytes))?; Ok(()) } #[inline(always)] - fn put<K: Key, V: Value>(&self, k: &K, value: &V) -> KvResult<()> { + fn put<K: Key, V: Value>(&mut self, k: &K, value: &V) -> KvResult<()> { value.as_bytes(|value_bytes| { k.as_bytes(|k_bytes| self.0.put(WriteOptions::new(), k_bytes, value_bytes))?; Ok(()) }) } #[inline(always)] - fn write_batch(&self, inner_batch: Self::Batch) -> KvResult<()> { - self.0.write(WriteOptions::new(), &inner_batch)?; + fn write_batch(&mut self, inner_batch: Self::Batch) -> KvResult<()> { + self.0.write(WriteOptions::new(), &inner_batch.0)?; Ok(()) } #[inline(always)] @@ -195,11 +221,13 @@ impl Debug for LevelDbIter { } impl Iterator for LevelDbIter { - type Item = Result<(Vec<u8>, Vec<u8>), DynErr>; + type Item = Result<(LevelDbBytes, LevelDbBytes), DynErr>; #[inline(always)] fn next(&mut self) -> Option<Self::Item> { - self.0.next().map(Ok) + self.0 + .next() + .map(|(k, v)| Ok((LevelDbBytes(k), LevelDbBytes(v)))) } } impl ReversableIterator for LevelDbIter { @@ -208,6 +236,7 @@ impl ReversableIterator for LevelDbIter { Self(self.0.reverse()) } } +impl BackendIter<LevelDbBytes, LevelDbBytes> for LevelDbIter {} #[derive(Clone, Debug)] /// leveldb configuration diff --git a/rust-libs/tools/kv_typed/src/backend/lmdb.rs b/rust-libs/tools/kv_typed/src/backend/lmdb.rs new file mode 100644 index 0000000000000000000000000000000000000000..64dcd1f8e42801111c055141b3289298b1eb3de7 --- /dev/null +++ b/rust-libs/tools/kv_typed/src/backend/lmdb.rs @@ -0,0 +1,370 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Memory backend for KV Typed, + +use crate::*; +use lmdb::{traits::CreateCursor as _, LmdbResultExt as _}; +use lmdb_zero as lmdb; +use std::path::PathBuf; + +#[derive(Clone, Copy, Debug)] +/// Be careful with this backend +/// LMDB does not support multiple iterators in the same thread. So you need to make sure that : +/// 1. Any iterator must be drop before any new call to the `iter()` method. +/// 2. If you are in an asynchronous context, an async task should never yield when it to an instantiated iterator. +pub struct Lmdb; + +#[derive(Clone, Debug, Default)] +pub struct LmdbConf { + folder_path: PathBuf, +} + +impl LmdbConf { + pub fn folder_path(mut self, folder_path: PathBuf) -> Self { + self.folder_path = folder_path; + self + } +} + +impl Backend for Lmdb { + const NAME: &'static str = "lmdb"; + type Col = LmdbCol; + type Conf = LmdbConf; + + fn open(conf: &Self::Conf) -> KvResult<Self> { + std::fs::create_dir_all(conf.folder_path.as_path())?; + Ok(Lmdb) + } + fn open_col(&mut self, conf: &Self::Conf, col_name: &str) -> KvResult<Self::Col> { + let path: PathBuf = conf.folder_path.join(col_name); + let exist = path.as_path().exists(); + if !exist { + std::fs::create_dir(path.as_path())?; + } + let path = path + .into_os_string() + .into_string() + .expect("Invalid DB path"); + let mut env_flags = lmdb::open::Flags::empty(); + env_flags.insert(lmdb::open::WRITEMAP); + env_flags.insert(lmdb::open::MAPASYNC); + env_flags.insert(lmdb::open::NOLOCK); + let col_options = if exist { + lmdb::DatabaseOptions::defaults() + } else { + lmdb::DatabaseOptions::new(lmdb::db::CREATE) + }; + let env = + std::sync::Arc::new(unsafe { lmdb::EnvBuilder::new()?.open(&path, env_flags, 0o600)? }); + let tree = std::sync::Arc::new(lmdb::Database::open(env.clone(), None, &col_options)?); + Ok(LmdbCol(LmdbColInner { env, tree })) + } +} + +#[derive(Clone, Debug)] +pub struct LmdbCol(LmdbColInner); + +#[derive(Clone, Debug)] +struct LmdbColInner { + env: std::sync::Arc<lmdb::Environment>, + tree: std::sync::Arc<lmdb::Database<'static>>, +} + +#[derive(Debug, Default)] +pub struct LmdbBatch { + upsert_ops: Vec<(IVec, IVec)>, + remove_ops: Vec<IVec>, +} + +impl BackendBatch for LmdbBatch { + fn upsert(&mut self, k: &[u8], v: &[u8]) { + self.upsert_ops.push((k.into(), v.into())); + } + + fn remove(&mut self, k: &[u8]) { + self.remove_ops.push(k.into()); + } +} + +#[derive(Debug)] +struct LmdbIterAccess { + env: std::sync::Arc<lmdb::Environment>, + access: lmdb::ConstAccessor<'static>, + tree: std::sync::Arc<lmdb::Database<'static>>, + tx: lmdb::ReadTransaction<'static>, +} + +#[derive(Debug)] +pub struct LmdbIter { + access: Arc<LmdbIterAccess>, + cursor: lmdb::Cursor<'static, 'static>, + reversed: bool, + started: bool, +} + +impl LmdbIter { + fn new( + env: std::sync::Arc<lmdb::Environment>, + tree: std::sync::Arc<lmdb::Database<'static>>, + ) -> Self { + let tx = lmdb::ReadTransaction::new(env.clone()).expect("fail to read DB"); + let tx_static: &'static lmdb::ReadTransaction<'static> = + unsafe { std::mem::transmute(&tx) }; + let access = tx_static.access(); + let cursor = tx_static + .cursor(tree.clone()) + .expect("fail to create DB cursor"); + LmdbIter { + access: Arc::new(LmdbIterAccess { + access, + env, + tree, + tx, + }), + cursor, + reversed: false, + started: false, + } + } +} + +impl Iterator for LmdbIter { + type Item = Result<(&'static [u8], &'static [u8]), DynErr>; + + fn next(&mut self) -> Option<Self::Item> { + if self.reversed { + if self.started { + match self + .cursor + .prev::<[u8], [u8]>(unsafe { + // # Safety + // Lifetime of accessor is used to track db and lmdb_tx lifetimes: These are already static. + // It's safe because the byte references will be transformed into K and V owned types before + // being exposed to the user API. + std::mem::transmute(&self.access.access) + }) + .to_opt() + { + Ok(Some((k, v))) => Some(Ok((k, v))), + Ok(None) => None, + Err(e) => Some(Err(e.into())), + } + } else { + self.started = true; + match self + .cursor + .last::<[u8], [u8]>(unsafe { + // # Safety + // Lifetime of accessor is used to track db and lmdb_tx lifetimes: These are already static. + // It's safe because the byte references will be transformed into K and V owned types before + // being exposed to the user API. + std::mem::transmute(&self.access.access) + }) + .to_opt() + { + Ok(Some((k, v))) => Some(Ok((k, v))), + Ok(None) => None, + Err(e) => Some(Err(e.into())), + } + } + } else if self.started { + match self + .cursor + .next::<[u8], [u8]>(unsafe { + // # Safety + // Lifetime of accessor is used to track db and lmdb_tx lifetimes: These are already static. + // It's safe because the byte references will be transformed into K and V owned types before + // being exposed to the user API. + std::mem::transmute(&self.access.access) + }) + .to_opt() + { + Ok(Some((k, v))) => Some(Ok((k, v))), + Ok(None) => None, + Err(e) => Some(Err(e.into())), + } + } else { + self.started = true; + match self + .cursor + .first::<[u8], [u8]>(unsafe { + // # Safety + // Lifetime of accessor is used to track db and lmdb_tx lifetimes: These are already static. + // It's safe because the byte references will be transformed into K and V owned types before + // being exposed to the user API. + std::mem::transmute(&self.access.access) + }) + .to_opt() + { + Ok(Some((k, v))) => Some(Ok((k, v))), + Ok(None) => None, + Err(e) => Some(Err(e.into())), + } + } + } +} + +impl ReversableIterator for LmdbIter { + fn reverse(mut self) -> Self { + self.reversed = true; + self + } +} + +impl BackendIter<&'static [u8], &'static [u8]> for LmdbIter {} + +impl BackendCol for LmdbCol { + type Batch = LmdbBatch; + type KeyBytes = &'static [u8]; + type ValueBytes = &'static [u8]; + type Iter = LmdbIter; + + fn get<K: Key, V: Value>(&self, k: &K) -> KvResult<Option<V>> { + let tx = lmdb::ReadTransaction::new(self.0.tree.env())?; + let access = tx.access(); + k.as_bytes(|k_bytes| { + access + .get(&self.0.tree, k_bytes) + .to_opt()? + .map(|bytes| { + V::from_bytes(&bytes).map_err(|e| KvError::DeserError(format!("{}", e))) + }) + .transpose() + }) + } + + fn get_ref<K: Key, V: ValueZc, D, F: Fn(&V::Ref) -> KvResult<D>>( + &self, + k: &K, + f: F, + ) -> KvResult<Option<D>> { + k.as_bytes(|k_bytes| { + let tx = lmdb::ReadTransaction::new(self.0.tree.env())?; + let access = tx.access(); + access + .get::<_, [u8]>(&self.0.tree, k_bytes) + .to_opt()? + .map(|bytes| { + if let Some(layout_verified) = zerocopy::LayoutVerified::<_, V::Ref>::new(bytes) + { + f(&layout_verified) + } else { + Err(KvError::DeserError( + "Bytes are invalid length or alignment.".to_owned(), + )) + } + }) + .transpose() + }) + } + + fn get_ref_slice<K: Key, V: ValueSliceZc, D, F: Fn(&[V::Elem]) -> KvResult<D>>( + &self, + k: &K, + f: F, + ) -> KvResult<Option<D>> { + k.as_bytes(|k_bytes| { + let tx = lmdb::ReadTransaction::new(self.0.tree.env())?; + let access = tx.access(); + access + .get::<_, [u8]>(&self.0.tree, k_bytes) + .to_opt()? + .map(|bytes| { + if let Some(layout_verified) = + zerocopy::LayoutVerified::<_, [V::Elem]>::new_slice( + &bytes[V::prefix_len()..], + ) + { + f(&layout_verified) + } else { + Err(KvError::DeserError( + "Bytes are invalid length or alignment.".to_owned(), + )) + } + }) + .transpose() + }) + } + + fn clear(&mut self) -> KvResult<()> { + let tx = lmdb::WriteTransaction::new(self.0.tree.env())?; + { + let mut access = tx.access(); + access.clear_db(&self.0.tree)?; + } + tx.commit()?; + Ok(()) + } + + fn count(&self) -> KvResult<usize> { + let tx = lmdb::ReadTransaction::new(self.0.tree.env())?; + Ok(tx.db_stat(&self.0.tree)?.entries) + } + + fn iter<K: Key, V: Value>(&self, _range: RangeBytes) -> Self::Iter { + LmdbIter::new(self.0.env.clone(), self.0.tree.clone()) + } + + fn put<K: Key, V: Value>(&mut self, k: &K, value: &V) -> KvResult<()> { + value.as_bytes(|v_bytes| { + let tx = lmdb::WriteTransaction::new(self.0.tree.env())?; + k.as_bytes(|k_bytes| { + let mut access = tx.access(); + access.put(&self.0.tree, k_bytes, v_bytes, lmdb::put::Flags::empty()) + })?; + tx.commit()?; + Ok(()) + }) + } + + fn delete<K: Key>(&mut self, k: &K) -> KvResult<()> { + let tx = lmdb::WriteTransaction::new(self.0.tree.env())?; + k.as_bytes(|k_bytes| { + let mut access = tx.access(); + access.del_key(&self.0.tree, k_bytes).to_opt() + })?; + tx.commit()?; + Ok(()) + } + + fn new_batch() -> Self::Batch { + LmdbBatch::default() + } + + fn write_batch(&mut self, inner_batch: Self::Batch) -> KvResult<()> { + let tx = lmdb::WriteTransaction::new(self.0.tree.env())?; + { + let mut access = tx.access(); + for (k, v) in inner_batch.upsert_ops { + access.put( + &self.0.tree, + k.as_ref(), + v.as_ref(), + lmdb::put::Flags::empty(), + )?; + } + for k in inner_batch.remove_ops { + access.del_key(&self.0.tree, k.as_ref()).to_opt()?; + } + } + tx.commit()?; + Ok(()) + } + + fn save(&self) -> KvResult<()> { + Ok(self.0.tree.env().sync(true)?) + } +} diff --git a/rust-libs/tools/kv_typed/src/backend/memory.rs b/rust-libs/tools/kv_typed/src/backend/memory.rs index da8fd227c4a135d5140bc988edac80e9887f4bca..d5e57e487e202087b491960b3bb10edda96cab50 100644 --- a/rust-libs/tools/kv_typed/src/backend/memory.rs +++ b/rust-libs/tools/kv_typed/src/backend/memory.rs @@ -16,23 +16,20 @@ //! Memory backend for KV Typed, use crate::*; -use parking_lot::{RwLock, RwLockReadGuard}; use std::collections::BTreeMap; +use uninit::extension_traits::VecCapacity as _; #[derive(Clone, Copy, Debug)] pub struct Mem; -#[derive(Clone, Copy, Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct MemConf { - // Allows to prevent `MemConf` from being instantiated without using the `Default` trait. - // Thus the eventual addition of a field in the future will not be a breaking change. - phantom: PhantomData<()>, + folder_path: Option<std::path::PathBuf>, } type KeyBytes = IVec; type ValueBytes = IVec; -type Map = BTreeMap<KeyBytes, ValueBytes>; -type ArcSharedMap = Arc<RwLock<Map>>; +type Tree = BTreeMap<KeyBytes, ValueBytes>; impl Backend for Mem { const NAME: &'static str = "mem"; @@ -42,15 +39,22 @@ impl Backend for Mem { fn open(_conf: &Self::Conf) -> KvResult<Self> { Ok(Mem) } - fn open_col(&mut self, _conf: &Self::Conf, _col_name: &str) -> KvResult<Self::Col> { - Ok(MemCol(Arc::new(RwLock::new(BTreeMap::new())))) + fn open_col(&mut self, conf: &Self::Conf, col_name: &str) -> KvResult<Self::Col> { + if let Some(ref folder_path) = conf.folder_path { + MemCol::from_file(folder_path.join(col_name)) + } else { + Ok(MemCol { + path: None, + tree: BTreeMap::new(), + }) + } } } #[derive(Debug, Default)] pub struct MemBatch { - upsert_ops: Vec<(KeyBytes, ValueBytes)>, - remove_ops: Vec<KeyBytes>, + upsert_ops: Vec<(IVec, IVec)>, + remove_ops: Vec<IVec>, } impl BackendBatch for MemBatch { @@ -64,7 +68,158 @@ impl BackendBatch for MemBatch { } #[derive(Clone, Debug)] -pub struct MemCol(ArcSharedMap); +pub struct MemCol { + path: Option<std::path::PathBuf>, + tree: Tree, +} + +impl MemCol { + fn from_file(file_path: std::path::PathBuf) -> KvResult<Self> { + let mut file = std::fs::File::open(file_path.as_path())?; + let bytes = Vec::<u8>::new(); + if file.metadata()?.len() > 0 { + let mut bytes = Vec::new(); + use std::io::Read as _; + file.read_to_end(&mut bytes)?; + } + + Ok(MemCol { + path: Some(file_path), + tree: Self::tree_from_bytes(&bytes)?, + }) + } + fn tree_from_bytes(bytes: &[u8]) -> KvResult<BTreeMap<IVec, IVec>> { + let mut tree = BTreeMap::new(); + + if bytes.len() < 32 { + return Err(KvError::BackendError( + StringErr("Corrupted tree".to_owned()).into(), + )); + } else { + let hash = blake3::hash(&bytes[32..]); + if hash.as_bytes() != &bytes[..32] { + return Err(KvError::BackendError( + StringErr("Corrupted tree: wrong hash".to_owned()).into(), + )); + } + }; + + let mut len = 32; + while len < bytes.len() { + let len_add_4 = len + 4; + if bytes.len() >= len_add_4 { + let mut k_len = [0u8; 4]; + k_len.copy_from_slice(&bytes[len..len_add_4]); + let k_len = u32::from_le_bytes(k_len); + len = len_add_4 + k_len as usize; + + if bytes.len() >= len { + let k = IVec::from(&bytes[len_add_4..len]); + + if bytes.len() >= len_add_4 { + let len_add_4 = len + 4; + let mut v_len = [0u8; 4]; + v_len.copy_from_slice(&bytes[len..len_add_4]); + let v_len = u32::from_le_bytes(v_len); + len = len_add_4 + v_len as usize; + + if bytes.len() >= len { + let v = IVec::from(&bytes[len_add_4..len]); + + tree.insert(k, v); + } else { + return Err(KvError::BackendError( + StringErr("Corrupted tree".to_owned()).into(), + )); + } + } else { + return Err(KvError::BackendError( + StringErr("Corrupted tree".to_owned()).into(), + )); + } + } else { + return Err(KvError::BackendError( + StringErr("Corrupted tree".to_owned()).into(), + )); + } + } else { + return Err(KvError::BackendError( + StringErr("Corrupted tree".to_owned()).into(), + )); + } + } + + Ok(tree) + } + fn tree_to_bytes(tree: &BTreeMap<IVec, IVec>) -> Vec<u8> { + let mut bytes = Vec::with_capacity(tree.len() * 20); + let mut len = 32; + for (k, v) in tree.iter() { + // Write key len + let k_len = (k.len() as u32).to_le_bytes(); + let len_add_4 = len + 4; + bytes.reserve_uninit(4).r().copy_from_slice(&k_len[..]); + unsafe { + // # Safety + // + // - `.copy_from_slice()` contract guarantees initialization + // of 4 additional bytes, which, in turn, from `reserve_uninit`'s contract, + // leads to the `vec` extra capacity having been initialized. + bytes.set_len(len_add_4) + } + + // Write key content + bytes + .reserve_uninit(k.len()) + .r() + .copy_from_slice(k.as_ref()); + let new_len = len_add_4 + k.len(); + unsafe { + // # Safety + // + // - `.copy_from_slice()` contract guarantees initialization + // of `k.len()` additional bytes, which, in turn, from `reserve_uninit`'s contract, + // leads to the `vec` extra capacity having been initialized. + bytes.set_len(new_len) + } + len = new_len; + + // Write value len + let v_len = (v.len() as u32).to_le_bytes(); + let len_add_4 = len + 4; + bytes.reserve_uninit(4).r().copy_from_slice(&v_len[..]); + unsafe { + // # Safety + // + // - `.copy_from_slice()` contract guarantees initialization + // of 4 additional bytes, which, in turn, from `reserve_uninit`'s contract, + // leads to the `vec` extra capacity having been initialized. + bytes.set_len(len_add_4) + } + + // Write value content + bytes + .reserve_uninit(v.len()) + .r() + .copy_from_slice(v.as_ref()); + let new_len = len_add_4 + v.len(); + unsafe { + // # Safety + // + // - `.copy_from_slice()` contract guarantees initialization + // of `v.len()` additional bytes, which, in turn, from `reserve_uninit`'s contract, + // leads to the `vec` extra capacity having been initialized. + bytes.set_len(new_len) + } + len = new_len; + } + + let hash = blake3::hash(&bytes[32..]); + (&mut bytes[..32]).copy_from_slice(hash.as_bytes()); + + bytes + } +} impl BackendCol for MemCol { type Batch = MemBatch; @@ -77,21 +232,18 @@ impl BackendCol for MemCol { MemBatch::default() } #[inline(always)] - fn clear(&self) -> KvResult<()> { - let mut writer = self.0.write(); - writer.clear(); + fn clear(&mut self) -> KvResult<()> { + self.tree.clear(); Ok(()) } #[inline(always)] fn count(&self) -> KvResult<usize> { - let reader = self.0.read(); - Ok(reader.len()) + Ok(self.tree.len()) } #[inline(always)] fn get<K: Key, V: Value>(&self, k: &K) -> KvResult<Option<V>> { k.as_bytes(|k_bytes| { - let reader = self.0.read(); - reader + self.tree .get(k_bytes) .map(|bytes| { V::from_bytes(&bytes).map_err(|e| KvError::DeserError(format!("{}", e))) @@ -106,8 +258,7 @@ impl BackendCol for MemCol { f: F, ) -> KvResult<Option<D>> { k.as_bytes(|k_bytes| { - let reader = self.0.read(); - reader + self.tree .get(k_bytes) .map(|bytes| { if let Some(layout_verified) = @@ -130,8 +281,7 @@ impl BackendCol for MemCol { f: F, ) -> KvResult<Option<D>> { k.as_bytes(|k_bytes| { - let reader = self.0.read(); - reader + self.tree .get(k_bytes) .map(|bytes| { if let Some(layout_verified) = @@ -150,88 +300,70 @@ impl BackendCol for MemCol { }) } #[inline(always)] - fn delete<K: Key>(&self, k: &K) -> KvResult<()> { - k.as_bytes(|k_bytes| { - let mut writer = self.0.write(); - writer.remove(k_bytes) - }); + fn delete<K: Key>(&mut self, k: &K) -> KvResult<()> { + k.as_bytes(|k_bytes| self.tree.remove(k_bytes)); Ok(()) } #[inline(always)] - fn put<K: Key, V: Value>(&self, k: &K, value: &V) -> KvResult<()> { + fn put<K: Key, V: Value>(&mut self, k: &K, value: &V) -> KvResult<()> { value.as_bytes(|value_bytes| { k.as_bytes(|k_bytes| { - let mut writer = self.0.write(); - writer.insert(k_bytes.into(), value_bytes.into()); + self.tree.insert(k_bytes.into(), value_bytes.into()); }); Ok(()) }) } #[inline(always)] - fn write_batch(&self, inner_batch: Self::Batch) -> KvResult<()> { - let mut writer = self.0.write(); + fn write_batch(&mut self, inner_batch: Self::Batch) -> KvResult<()> { for (k, v) in inner_batch.upsert_ops { - writer.insert(k, v); + self.tree.insert(k, v); } for k in inner_batch.remove_ops { - writer.remove(&k); + self.tree.remove(&k); } Ok(()) } #[inline(always)] fn iter<K: Key, V: Value>(&self, range: RangeBytes) -> Self::Iter { - let map_shared_arc = self.0.clone(); - let map_shared_ref = map_shared_arc.as_ref(); - - let reader = map_shared_ref.read(); - self.iter_inner(range, reader) + MemIter::new(unsafe { + // # Safety + // On front API, the iterator is given to a closure executed inside of a `ColRo` method, + // so that ensure borrowed tree keep alive + std::mem::transmute(self.tree.range(range)) + }) } #[inline(always)] fn save(&self) -> KvResult<()> { + if let Some(ref file_path) = self.path { + let bytes = Self::tree_to_bytes(&self.tree); + + let mut file = + std::fs::File::create(file_path).map_err(|e| KvError::BackendError(e.into()))?; + use std::io::Write as _; + file.write_all(&bytes[..]) + .map_err(|e| KvError::BackendError(e.into()))?; + } + Ok(()) } } -impl MemCol { - fn iter_inner( - &self, - range: RangeBytes, - reader: RwLockReadGuard<BTreeMap<KeyBytes, ValueBytes>>, - ) -> MemIter { - let reader = unsafe { - std::mem::transmute::< - RwLockReadGuard<BTreeMap<KeyBytes, ValueBytes>>, - RwLockReadGuard<'static, BTreeMap<KeyBytes, ValueBytes>>, - >(reader) - }; - let reader_ref = unsafe { - std::mem::transmute::< - &RwLockReadGuard<BTreeMap<KeyBytes, ValueBytes>>, - &'static RwLockReadGuard<'static, BTreeMap<KeyBytes, ValueBytes>>, - >(&reader) - }; - let iter = reader_ref.range(range); +pub struct MemIter { + iter: std::collections::btree_map::Range<'static, KeyBytes, ValueBytes>, + reversed: bool, +} +impl MemIter { + fn new( + tree_iter: std::collections::btree_map::Range<'static, KeyBytes, ValueBytes>, + ) -> MemIter { MemIter { - col: self.clone(), - reader: Some(reader), - iter, + iter: tree_iter, reversed: false, } } } -pub struct MemIter { - #[allow(dead_code)] - // Needed for safety - col: MemCol, - #[allow(dead_code)] - // Needed for safety - reader: Option<RwLockReadGuard<'static, BTreeMap<KeyBytes, ValueBytes>>>, - iter: std::collections::btree_map::Range<'static, KeyBytes, ValueBytes>, - reversed: bool, -} - impl Debug for MemIter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("MemIter").field("0", &"???").finish() @@ -258,3 +390,41 @@ impl ReversableIterator for MemIter { self } } + +impl BackendIter<IVec, IVec> for MemIter {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_save() -> KvResult<()> { + let mut tree = BTreeMap::new(); + + let k1 = IVec::from(&[1, 2, 3]); + let v1 = IVec::from(&[1, 2, 3, 4, 5]); + let k2 = IVec::from(&[1, 2]); + let v2 = IVec::from(&[]); + let k3 = IVec::from(&[1, 2, 3, 4, 5, 6, 7]); + let v3 = IVec::from(&[1, 2, 3, 4, 5, 6]); + let k4 = IVec::from(&[]); + let v4 = IVec::from(&[1, 2, 3, 4, 5, 6, 7]); + + tree.insert(k1.clone(), v1.clone()); + tree.insert(k2.clone(), v2.clone()); + tree.insert(k3.clone(), v3.clone()); + tree.insert(k4.clone(), v4.clone()); + + let bytes = MemCol::tree_to_bytes(&tree); + + let tree2 = MemCol::tree_from_bytes(&bytes)?; + + assert_eq!(tree2.len(), 4); + assert_eq!(tree2.get(&k1), Some(&v1)); + assert_eq!(tree2.get(&k2), Some(&v2)); + assert_eq!(tree2.get(&k3), Some(&v3)); + assert_eq!(tree2.get(&k4), Some(&v4)); + + Ok(()) + } +} diff --git a/rust-libs/tools/kv_typed/src/backend/mock.rs b/rust-libs/tools/kv_typed/src/backend/mock.rs index a18c140022c42afc3fd297e6d3c57ec44c52e7f0..0d479a398f5caa7256334402b360790e937765b2 100644 --- a/rust-libs/tools/kv_typed/src/backend/mock.rs +++ b/rust-libs/tools/kv_typed/src/backend/mock.rs @@ -18,11 +18,10 @@ use super::MockBackendBatch; use crate::*; -#[cfg(feature = "mock")] mockall::mock! { pub BackendIter {} trait Iterator { - type Item = Result<(Vec<u8>, Vec<u8>), DynErr>; + type Item = Result<(IVec, IVec), DynErr>; fn next(&mut self) -> Option<<Self as Iterator>::Item>; } @@ -30,8 +29,8 @@ mockall::mock! { fn reverse(self) -> Self; } } +impl BackendIter<IVec, IVec> for MockBackendIter {} -#[cfg(feature = "mock")] mockall::mock! { pub BackendCol {} trait Clone { @@ -39,11 +38,21 @@ mockall::mock! { } trait BackendCol { type Batch = MockBackendBatch; - type KeyBytes = Vec<u8>; - type ValueBytes = Vec<u8>; + type KeyBytes = IVec; + type ValueBytes = IVec; type Iter = MockBackendIter; fn get<K: Key, V: Value>(&self, k: &K) -> KvResult<Option<V>>; + fn get_ref<K: Key, V: ValueZc, D, F: Fn(&V::Ref) -> KvResult<D>>( + &self, + k: &K, + f: F, + ) -> KvResult<Option<D>>; + fn get_ref_slice<K: Key, V: ValueSliceZc, D, F: Fn(&[V::Elem]) -> KvResult<D>>( + &self, + k: &K, + f: F, + ) -> KvResult<Option<D>>; fn clear(&self) -> KvResult<()>; fn count(&self) -> KvResult<usize>; fn iter<K: Key, V: Value>(&self, range: RangeBytes) -> MockBackendIter; @@ -51,10 +60,15 @@ mockall::mock! { fn delete<K: Key>(&self, k: &K) -> KvResult<()>; fn new_batch() -> MockBackendBatch; fn write_batch(&self, inner_batch: MockBackendBatch) -> KvResult<()>; + fn save(&self) -> KvResult<()>; + } +} +impl Debug for MockBackendCol { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + unimplemented!() } } -#[cfg(feature = "mock")] mockall::mock! { pub Backend {} trait Clone { diff --git a/rust-libs/tools/kv_typed/src/backend/sled.rs b/rust-libs/tools/kv_typed/src/backend/sled.rs index 071fc5ce1f872af11adf8c4add826844c227d929..ffef96fcfed42239a204de2d39478b75f3b6e069 100644 --- a/rust-libs/tools/kv_typed/src/backend/sled.rs +++ b/rust-libs/tools/kv_typed/src/backend/sled.rs @@ -15,8 +15,6 @@ //! Sled backend for KV Typed, -mod transactional; - pub use sled::Config; use crate::*; @@ -69,7 +67,7 @@ impl BackendCol for SledCol { sled::Batch::default() } #[inline(always)] - fn clear(&self) -> KvResult<()> { + fn clear(&mut self) -> KvResult<()> { self.0.clear()?; Ok(()) } @@ -137,19 +135,19 @@ impl BackendCol for SledCol { }) } #[inline(always)] - fn delete<K: Key>(&self, k: &K) -> KvResult<()> { + fn delete<K: Key>(&mut self, k: &K) -> KvResult<()> { k.as_bytes(|k_bytes| self.0.remove(k_bytes))?; Ok(()) } #[inline(always)] - fn put<K: Key, V: Value>(&self, k: &K, value: &V) -> KvResult<()> { + fn put<K: Key, V: Value>(&mut self, k: &K, value: &V) -> KvResult<()> { value.as_bytes(|value_bytes| { k.as_bytes(|k_bytes| self.0.insert(k_bytes, value_bytes))?; Ok(()) }) } #[inline(always)] - fn write_batch(&self, inner_batch: Self::Batch) -> KvResult<()> { + fn write_batch(&mut self, inner_batch: Self::Batch) -> KvResult<()> { self.0.apply_batch(inner_batch)?; Ok(()) } @@ -201,3 +199,5 @@ impl ReversableIterator for SledIter { } } } + +impl BackendIter<IVec, IVec> for SledIter {} diff --git a/rust-libs/tools/kv_typed/src/backend/sled/transactional.rs b/rust-libs/tools/kv_typed/src/backend/sled/transactional.rs deleted file mode 100644 index 5ec9b43bbb8166b28c1472f8a322996805504484..0000000000000000000000000000000000000000 --- a/rust-libs/tools/kv_typed/src/backend/sled/transactional.rs +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright (C) 2020 Éloïs SANCHEZ. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <https://www.gnu.org/licenses/>. - -//! Sled backend for KV Typed, - -/*use crate::*; -use sled::transaction::{ConflictableTransactionError, Transactional, TransactionalTree}; - -enum AbortType<A> { - User(A), - Kv(KvError), -} - -#[allow(missing_copy_implementations)] -pub struct SledTxCol(&'static TransactionalTree); - -impl Debug for SledTxCol { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SledTxCol") - .field("0", &"TransactionalTree") - .finish() - } -} - -impl<DbReader: From<Vec<SledTxCol>>, DbWriter> TransactionalBackend<DbReader, DbWriter> for Sled { - type Err = sled::Error; - type TxCol = SledTxCol; - - fn read<A: Debug, D, F: Fn(&DbReader) -> TransactionResult<D, A, Self::Err>>( - &self, - f: F, - ) -> TransactionResult<D, A, Self::Err> { - match self - .trees - .transaction::<_, D>(|tx_trees: &Vec<TransactionalTree>| { - let reader = DbReader::from( - tx_trees - .iter() - .map(|tx_tree| SledTxCol(unsafe { to_static_ref(tx_tree) })) - .collect(), - ); - f(&reader).map_err(|e| match e { - TransactionError::Abort(a) => { - ConflictableTransactionError::Abort(AbortType::User(a)) - } - TransactionError::BackendErr(e) => ConflictableTransactionError::Storage(e), - TransactionError::KvError(e) => { - ConflictableTransactionError::Abort(AbortType::Kv(e)) - } - }) - }) { - Ok(t) => Ok(t), - Err(e) => match e { - sled::transaction::TransactionError::Abort(a) => match a { - AbortType::User(a) => Err(TransactionError::Abort(a)), - AbortType::Kv(e) => Err(TransactionError::KvError(e)), - }, - sled::transaction::TransactionError::Storage(e) => { - Err(TransactionError::BackendErr(e)) - } - }, - } - } - - fn write<A: Debug, F: Fn(&DbWriter) -> TransactionResult<(), A, Self::Err>>( - &self, - _f: F, - ) -> TransactionResult<(), A, Self::Err> { - todo!() - } -}*/ diff --git a/rust-libs/tools/kv_typed/src/batch.rs b/rust-libs/tools/kv_typed/src/batch.rs index c7d3d8c64e7e70dc603be1418a89dee5b5cbcf2c..90b9738dfe577271cb043a536b180400c12db61b 100644 --- a/rust-libs/tools/kv_typed/src/batch.rs +++ b/rust-libs/tools/kv_typed/src/batch.rs @@ -14,21 +14,28 @@ // along with this program. If not, see <https://www.gnu.org/licenses/>. use crate::*; -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; #[derive(Debug)] pub struct Batch<BC: BackendCol, C: DbCollectionRw> { - backend_batch: BC::Batch, - upsert_keys_bytes: BTreeSet<IVec>, + phantom: PhantomData<BC>, + pub(crate) tree: BTreeMap<IVec, Option<IVec>>, upsert_ops: HashMap<C::K, C::V>, delete_ops: HashSet<C::K>, } +#[derive(Debug, PartialEq)] +pub enum BatchGet<'v, V: Value> { + None, + Deleted, + Updated(&'v V), +} + impl<BC: BackendCol, C: DbCollectionRw> Default for Batch<BC, C> { fn default() -> Self { Batch { - backend_batch: BC::Batch::default(), - upsert_keys_bytes: BTreeSet::default(), + phantom: PhantomData, + tree: BTreeMap::default(), upsert_ops: HashMap::default(), delete_ops: HashSet::default(), } @@ -36,39 +43,64 @@ impl<BC: BackendCol, C: DbCollectionRw> Default for Batch<BC, C> { } impl<BC: BackendCol, C: DbCollectionRw> Batch<BC, C> { - pub fn get(&self, k: &C::K) -> Option<&C::V> { + pub fn clear(&mut self) { + self.tree.clear(); + self.upsert_ops.clear(); + self.delete_ops.clear(); + } + pub fn get(&self, k: &C::K) -> BatchGet<C::V> { if self.delete_ops.contains(k) { - None + BatchGet::Deleted + } else if let Some(v) = self.upsert_ops.get(k) { + BatchGet::Updated(v) } else { - self.upsert_ops.get(k) + BatchGet::None } } pub fn upsert(&mut self, k: C::K, v: C::V) { let _ = k.as_bytes(|k_bytes| { - self.upsert_keys_bytes.insert(k_bytes.into()); v.as_bytes(|v_bytes| { - self.backend_batch.upsert(k_bytes, v_bytes); + self.tree + .insert(IVec::from(k_bytes), Some(IVec::from(v_bytes))); Ok(()) }) }); self.upsert_ops.insert(k, v); } pub fn remove(&mut self, k: C::K) { - let _ = k.as_bytes(|k_bytes| self.backend_batch.remove(k_bytes)); + let _ = k.as_bytes(|k_bytes| { + self.tree.insert(IVec::from(k_bytes), None); + }); self.delete_ops.insert(k); } - #[cfg(not(feature = "subscription"))] + #[doc(hidden)] pub fn into_backend_batch(self) -> BC::Batch { - self.backend_batch + let mut backend_batch = BC::Batch::default(); + for (k_bytes, v_bytes_opt) in self.tree { + if let Some(v_bytes) = v_bytes_opt { + backend_batch.upsert(k_bytes.as_ref(), v_bytes.as_ref()); + } else { + backend_batch.remove(k_bytes.as_ref()); + } + } + backend_batch } - #[cfg(feature = "subscription")] + #[doc(hidden)] pub fn into_backend_batch_and_events(self) -> (BC::Batch, SmallVec<[C::Event; 4]>) { + let mut backend_batch = BC::Batch::default(); + for (k_bytes, v_bytes_opt) in self.tree { + if let Some(v_bytes) = v_bytes_opt { + backend_batch.upsert(k_bytes.as_ref(), v_bytes.as_ref()); + } else { + backend_batch.remove(k_bytes.as_ref()); + } + } let mut events: SmallVec<[C::Event; 4]> = self .upsert_ops .into_iter() .map(|(k, v)| C::Event::upsert(k, v)) .collect(); events.extend(self.delete_ops.into_iter().map(C::Event::remove)); - (self.backend_batch, events) + (backend_batch, events) } } diff --git a/rust-libs/tools/kv_typed/src/bytes.rs b/rust-libs/tools/kv_typed/src/bytes.rs new file mode 100644 index 0000000000000000000000000000000000000000..54a4d29dd0bc9285ee61db86ab2f55e847bcc1d6 --- /dev/null +++ b/rust-libs/tools/kv_typed/src/bytes.rs @@ -0,0 +1,62 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! KV Typed bytes + +use crate::*; + +pub trait KeyBytes: AsRef<[u8]> + Debug + Ord {} +impl<T> KeyBytes for T where T: AsRef<[u8]> + Debug + Ord {} +pub trait ValueBytes: AsRef<[u8]> + Debug {} +impl<T> ValueBytes for T where T: AsRef<[u8]> + Debug {} + +#[derive(Debug, Eq, PartialEq)] +pub enum CowKB<'a, B: KeyBytes> { + B(&'a [u8]), + O(B), +} +impl<'a, B: KeyBytes> AsRef<[u8]> for CowKB<'a, B> { + fn as_ref(&self) -> &[u8] { + match self { + CowKB::B(b_ref) => b_ref, + CowKB::O(b) => b.as_ref(), + } + } +} + +impl<'a, B: KeyBytes> PartialOrd for CowKB<'a, B> { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + self.as_ref().partial_cmp(other.as_ref()) + } +} +impl<'a, B: KeyBytes> Ord for CowKB<'a, B> { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.as_ref().cmp(other.as_ref()) + } +} + +#[derive(Debug)] +pub enum CowVB<'a, B: ValueBytes> { + B(&'a [u8]), + O(B), +} +impl<'a, B: ValueBytes> AsRef<[u8]> for CowVB<'a, B> { + fn as_ref(&self) -> &[u8] { + match self { + CowVB::B(b_ref) => b_ref, + CowVB::O(b) => b.as_ref(), + } + } +} diff --git a/rust-libs/tools/kv_typed/src/collection_inner.rs b/rust-libs/tools/kv_typed/src/collection_inner.rs new file mode 100644 index 0000000000000000000000000000000000000000..a03d9883ef2e6bab986194de3fb6eb42360b088d --- /dev/null +++ b/rust-libs/tools/kv_typed/src/collection_inner.rs @@ -0,0 +1,43 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::*; + +#[derive(Debug)] +pub(crate) struct ColInner<BC: BackendCol, E: EventTrait> { + pub(crate) backend_col: BC, + subscribers: ColSubscribers<E>, +} + +impl<BC: BackendCol, E: EventTrait> ColInner<BC, E> { + pub(crate) fn new(backend_col: BC) -> (Self, SubscriptionsSender<E>) { + let subscribers = ColSubscribers::<E>::default(); + let subscription_sender = subscribers.get_subscription_sender(); + + ( + ColInner { + backend_col, + subscribers, + }, + subscription_sender, + ) + } + pub(crate) fn notify_subscribers(&mut self, events: Events<E>) { + // Add new subscribers, notify all subscribers them prune died subscribers + self.subscribers.add_new_subscribers(); + let died_subscribers = self.subscribers.notify_subscribers(Arc::new(events)); + self.subscribers.prune_subscribers(died_subscribers); + } +} diff --git a/rust-libs/tools/kv_typed/src/collection_ro.rs b/rust-libs/tools/kv_typed/src/collection_ro.rs index ab81f517596c06a9263c618b85aeec3c2363a0d3..182031c7b5c6c4136da27de80b516b951d1a64d8 100644 --- a/rust-libs/tools/kv_typed/src/collection_ro.rs +++ b/rust-libs/tools/kv_typed/src/collection_ro.rs @@ -8,11 +8,25 @@ pub trait DbCollectionRo: Sized { fn count(&self) -> KvResult<usize>; fn get(&self, k: &Self::K) -> KvResult<Option<Self::V>>; - fn iter<R: 'static + RangeBounds<Self::K>>( + /// Don't worry about complex iter type. Use it like an `impl Iterator<Item=KvResult<(K, V)>>`. + fn iter< + D: Send + Sync, + R: 'static + RangeBounds<Self::K>, + F: FnOnce( + KvIter< + Self::BackendCol, + <Self::BackendCol as BackendCol>::KeyBytes, + <Self::BackendCol as BackendCol>::ValueBytes, + <Self::BackendCol as BackendCol>::Iter, + Self::K, + Self::V, + >, + ) -> D, + >( &self, range: R, - ) -> KvIter<Self::BackendCol, Self::K, Self::V>; - #[cfg(feature = "subscription")] + f: F, + ) -> D; fn subscribe(&self, subscriber_sender: Subscriber<Self::Event>) -> KvResult<()>; } @@ -28,27 +42,21 @@ mockall::mock! { fn count(&self) -> KvResult<usize>; fn get(&self, k: &E::K) -> KvResult<Option<E::V>>; fn iter<R: 'static + RangeBounds<E::K>>(&self, range: R) - -> KvIter<MockBackendCol, E::K, E::V>; - #[cfg(feature = "subscription")] + -> KvIter<MockBackendCol, MockBackendIter, E::K, E::V>; fn subscribe(&self, subscriber_sender: Subscriber<E>) -> KvResult<()>; } } #[derive(Debug)] pub struct ColRo<BC: BackendCol, E: EventTrait> { - pub(crate) inner: BC, - #[cfg(not(feature = "subscription"))] - pub(crate) phantom: PhantomData<E>, - #[cfg(feature = "subscription")] - pub(crate) subscription_sender: crate::subscription::SubscriptionsSender<E>, + pub(crate) inner: Arc<parking_lot::RwLock<ColInner<BC, E>>>, + pub(crate) subscription_sender: SubscriptionsSender<E>, } + impl<BC: BackendCol, E: EventTrait> Clone for ColRo<BC, E> { fn clone(&self) -> Self { Self { - inner: self.inner.clone(), - #[cfg(not(feature = "subscription"))] - phantom: PhantomData, - #[cfg(feature = "subscription")] + inner: Arc::clone(&self.inner), subscription_sender: self.subscription_sender.clone(), } } @@ -61,21 +69,38 @@ impl<BC: BackendCol, E: EventTrait> DbCollectionRo for ColRo<BC, E> { #[inline(always)] fn count(&self) -> KvResult<usize> { - self.inner.count() + let r = self.inner.read(); + r.backend_col.count() } #[inline(always)] fn get(&self, k: &Self::K) -> KvResult<Option<Self::V>> { - self.inner.get(k) + let r = self.inner.read(); + r.backend_col.get(k) } #[inline(always)] - fn iter<R: 'static + RangeBounds<Self::K>>( + fn iter< + D: Send + Sync, + R: 'static + RangeBounds<Self::K>, + F: FnOnce( + KvIter< + Self::BackendCol, + <Self::BackendCol as BackendCol>::KeyBytes, + <Self::BackendCol as BackendCol>::ValueBytes, + <Self::BackendCol as BackendCol>::Iter, + Self::K, + Self::V, + >, + ) -> D, + >( &self, range: R, - ) -> KvIter<Self::BackendCol, Self::K, Self::V> { - let range: RangeBytes = KvIter::<BC, Self::K, Self::V>::convert_range::<R>(range); - KvIter::new(self.inner.iter::<Self::K, Self::V>(range.clone()), range) + f: F, + ) -> D { + let range: RangeBytes = crate::iter::convert_range::<Self::K, R>(range); + let r = self.inner.read(); + let iter = r.backend_col.iter::<Self::K, Self::V>(range.clone()); + f(KvIter::new(iter, range)) } - #[cfg(feature = "subscription")] #[inline(always)] fn subscribe(&self, subscriber_sender: Subscriber<Self::Event>) -> KvResult<()> { self.subscription_sender @@ -94,7 +119,8 @@ pub trait DbCollectionRoGetRef<V: ValueZc>: DbCollectionRo<V = V> { impl<V: ValueZc, BC: BackendCol, E: EventTrait<V = V>> DbCollectionRoGetRef<V> for ColRo<BC, E> { fn get_ref<D, F: Fn(&V::Ref) -> KvResult<D>>(&self, k: &E::K, f: F) -> KvResult<Option<D>> { - self.inner.get_ref::<E::K, V, D, F>(k, f) + let r = self.inner.read(); + r.backend_col.get_ref::<E::K, V, D, F>(k, f) } } @@ -114,6 +140,7 @@ impl<V: ValueSliceZc, BC: BackendCol, E: EventTrait<V = V>> DbCollectionRoGetRef k: &E::K, f: F, ) -> KvResult<Option<D>> { - self.inner.get_ref_slice::<E::K, V, D, F>(k, f) + let r = self.inner.read(); + r.backend_col.get_ref_slice::<E::K, V, D, F>(k, f) } } diff --git a/rust-libs/tools/kv_typed/src/collection_rw.rs b/rust-libs/tools/kv_typed/src/collection_rw.rs index 1d245358d5e4d6504ec91129a31b39a3bd220aba..dbe112476732b627cbd6c2f1aba0061e9a3f3a56 100644 --- a/rust-libs/tools/kv_typed/src/collection_rw.rs +++ b/rust-libs/tools/kv_typed/src/collection_rw.rs @@ -1,6 +1,5 @@ use crate::*; -#[cfg(feature = "subscription")] -use parking_lot::Mutex; +use parking_lot::RwLockWriteGuard as WriteGuard; pub trait DbCollectionRw { type K: Key; @@ -15,25 +14,13 @@ pub trait DbCollectionRw { #[derive(Debug)] pub struct ColRw<BC: BackendCol, E: EventTrait> { - inner: ColRo<BC, E>, - #[cfg(feature = "subscription")] - pending_events_receiver: Receiver<Events<E>>, - #[cfg(feature = "subscription")] - pending_events_sender: Sender<Events<E>>, - #[cfg(feature = "subscription")] - subscribers: Arc<Mutex<ColSubscribers<E>>>, + pub(crate) inner: ColRo<BC, E>, } impl<BC: BackendCol, E: EventTrait> Clone for ColRw<BC, E> { fn clone(&self) -> Self { Self { inner: self.inner.clone(), - #[cfg(feature = "subscription")] - pending_events_receiver: self.pending_events_receiver.clone(), - #[cfg(feature = "subscription")] - pending_events_sender: self.pending_events_sender.clone(), - #[cfg(feature = "subscription")] - subscribers: self.subscribers.clone(), } } } @@ -44,93 +31,61 @@ impl<BC: BackendCol, E: EventTrait> DbCollectionRw for ColRw<BC, E> { type Event = E; fn clear(&self) -> KvResult<()> { - self.inner.inner.clear()?; - #[cfg(feature = "subscription")] - { - let events = smallvec::smallvec![E::clear()]; - self.notify_subscribers(events); - } + let mut w = self.inner.inner.write(); + w.backend_col.clear()?; + let events = smallvec::smallvec![E::clear()]; + w.notify_subscribers(events); Ok(()) } fn remove(&self, k: Self::K) -> KvResult<()> { - self.inner.inner.delete(&k)?; - #[cfg(feature = "subscription")] - { - let events = smallvec::smallvec![E::remove(k)]; - self.notify_subscribers(events); - } + let mut w = self.inner.inner.write(); + w.backend_col.delete(&k)?; + let events = smallvec::smallvec![E::remove(k)]; + w.notify_subscribers(events); Ok(()) } fn save(&self) -> KvResult<()> { - self.inner.inner.save()?; + let w = self.inner.inner.write(); + w.backend_col.save()?; Ok(()) } fn upsert(&self, k: Self::K, v: Self::V) -> KvResult<()> { - self.inner.inner.put(&k, &v)?; - #[cfg(feature = "subscription")] - { - let events = smallvec::smallvec![E::upsert(k, v)]; - self.notify_subscribers(events); - } + let mut w = self.inner.inner.write(); + w.backend_col.put(&k, &v)?; + let events = smallvec::smallvec![E::upsert(k, v)]; + w.notify_subscribers(events); Ok(()) } } impl<BC: BackendCol, E: EventTrait> ColRw<BC, E> { - #[cfg(not(feature = "subscription"))] - pub fn new(col_backend: BC) -> Self { + pub fn new(backend_col: BC) -> Self { + let (col_inner, subscription_sender) = ColInner::new(backend_col); Self { inner: ColRo { - inner: col_backend, - phantom: PhantomData, + inner: Arc::new(parking_lot::RwLock::new(col_inner)), + subscription_sender, }, } } - #[cfg(feature = "subscription")] - pub fn new(col_backend: BC) -> Self { - let subscribers = ColSubscribers::<E>::default(); - let subscription_sender = subscribers.get_subscription_sender(); - let inner = ColRo { - inner: col_backend, - subscription_sender, - }; - let (pending_events_sender, pending_events_receiver) = unbounded(); - Self { - inner, - pending_events_sender, - pending_events_receiver, - subscribers: Arc::new(Mutex::new(subscribers)), - } - } pub fn to_ro(&self) -> &ColRo<BC, E> { &self.inner } - #[cfg(feature = "subscription")] - fn notify_subscribers(&self, mut events: Events<E>) { - if let Some(mut subscribers_guard) = self.subscribers.try_lock() { - // Take pending events - while let Ok(pending_events) = self.pending_events_receiver.try_recv() { - events.extend(pending_events); - } - // Add new subscribers, notify all subscribers them prune died subscribers - subscribers_guard.add_new_subscribers(); - let died_subscribers = subscribers_guard.notify_subscribers(Arc::new(events)); - subscribers_guard.prune_subscribers(died_subscribers); - } else if !events.is_empty() { - // Push pending events into the queue - let _ = self.pending_events_sender.try_send(events); - } - } - #[cfg(not(feature = "subscription"))] pub fn write_batch(&self, batch: Batch<BC, Self>) -> KvResult<()> { - self.inner.inner.write_batch(batch.into_backend_batch())?; + let (backend_batch, events) = batch.into_backend_batch_and_events(); + let mut w = self.inner.inner.write(); + w.backend_col.write_batch(backend_batch)?; + w.notify_subscribers(events); Ok(()) } - #[cfg(feature = "subscription")] - pub fn write_batch(&self, batch: Batch<BC, Self>) -> KvResult<()> { - let (backend_batch, events) = batch.into_backend_batch_and_events(); - self.inner.inner.write_batch(backend_batch)?; - self.notify_subscribers(events); + pub(crate) fn write_backend_batch( + &self, + backend_batch: BC::Batch, + events: Events<E>, + write_guard: &mut WriteGuard<ColInner<BC, E>>, + ) -> KvResult<()> { + write_guard.backend_col.write_batch(backend_batch)?; + write_guard.notify_subscribers(events); Ok(()) } } diff --git a/rust-libs/tools/kv_typed/src/error.rs b/rust-libs/tools/kv_typed/src/error.rs index 9245d0059d38e4563943791df41148d8fdcd9cca..d9602e7471b55ec1b8e7702f01ec094cedcc16c7 100644 --- a/rust-libs/tools/kv_typed/src/error.rs +++ b/rust-libs/tools/kv_typed/src/error.rs @@ -26,6 +26,10 @@ pub type DynErr = Box<dyn Error + Send + Sync + 'static>; /// KV Typed error pub type KvResult<T> = Result<T, KvError>; +#[allow(type_alias_bounds)] +pub(crate) type BackendResult<BC: BackendCol> = + Result<(<BC as BackendCol>::KeyBytes, <BC as BackendCol>::ValueBytes), DynErr>; + /// KV Typed error #[derive(Debug, Error)] pub enum KvError { @@ -34,7 +38,7 @@ pub enum KvError { BackendError(DynErr), /// Custom #[error("{0}")] - Custom(String), + Custom(DynErr), // DB corrupted #[error("DB corrupted:{0}")] DbCorrupted(String), @@ -49,23 +53,27 @@ pub enum KvError { FailToSubscribe, } +impl From<std::io::Error> for KvError { + fn from(e: std::io::Error) -> Self { + KvError::BackendError(e.into()) + } +} + #[cfg(feature = "leveldb_backend")] impl From<crate::backend::leveldb::LevelDbError> for KvError { fn from(e: crate::backend::leveldb::LevelDbError) -> Self { KvError::BackendError(Box::new(e).into()) } } +#[cfg(target_arch = "x86_64")] +impl From<lmdb_zero::Error> for KvError { + fn from(e: lmdb_zero::Error) -> Self { + KvError::BackendError(e.into()) + } +} #[cfg(feature = "sled_backend")] impl From<sled::Error> for KvError { fn from(e: sled::Error) -> Self { KvError::BackendError(Box::new(e).into()) } } - -pub type TransactionResult<D, A, BE> = Result<D, TransactionError<A, BE>>; -#[derive(Debug)] -pub enum TransactionError<A: Debug, BE: Error + Send + Sync + 'static> { - Abort(A), - BackendErr(BE), - KvError(KvError), -} diff --git a/rust-libs/tools/kv_typed/src/event.rs b/rust-libs/tools/kv_typed/src/event.rs index 0a2c79abe82bc8ab9cea316804bd8095eacb2e47..e259e267c5e545b7f3488192c653c0b1bf8fa319 100644 --- a/rust-libs/tools/kv_typed/src/event.rs +++ b/rust-libs/tools/kv_typed/src/event.rs @@ -29,3 +29,18 @@ pub trait EventTrait: 'static + Debug + PartialEq + Send + Sync { fn upsert(k: Self::K, v: Self::V) -> Self; fn remove(k: Self::K) -> Self; } + +impl EventTrait for () { + type K = (); + type V = (); + + fn clear() -> Self { + unimplemented!() + } + fn upsert(_: Self::K, _: Self::V) -> Self { + unimplemented!() + } + fn remove(_: Self::K) -> Self { + unimplemented!() + } +} diff --git a/rust-libs/tools/kv_typed/src/explorer.rs b/rust-libs/tools/kv_typed/src/explorer.rs index 721c839b0299c9e8040774c4e34c6f7184695a27..9522f03b3bc6ef15e7836aa2babb056dd52452c5 100644 --- a/rust-libs/tools/kv_typed/src/explorer.rs +++ b/rust-libs/tools/kv_typed/src/explorer.rs @@ -1,5 +1,4 @@ use crate::*; -#[cfg(not(feature = "async"))] use rayon::{iter::ParallelBridge, prelude::*}; use std::num::NonZeroUsize; @@ -18,6 +17,16 @@ pub trait ExplorableKey: Sized { fn to_explorer_string(&self) -> KvResult<String>; } +impl ExplorableKey for () { + fn from_explorer_str(_: &str) -> Result<Self, StringErr> { + Ok(()) + } + + fn to_explorer_string(&self) -> KvResult<String> { + Ok(String::with_capacity(0)) + } +} + impl ExplorableKey for String { fn from_explorer_str(source: &str) -> Result<Self, StringErr> { Ok(source.to_owned()) @@ -49,6 +58,16 @@ pub trait ExplorableValue: Sized { fn to_explorer_json(&self) -> KvResult<serde_json::Value>; } +impl ExplorableValue for () { + fn from_explorer_str(_: &str) -> Result<Self, StringErr> { + Ok(()) + } + + fn to_explorer_json(&self) -> KvResult<serde_json::Value> { + Ok(serde_json::Value::String(String::with_capacity(0))) + } +} + impl ExplorableValue for String { fn from_explorer_str(source: &str) -> Result<Self, StringErr> { Ok(source.to_owned()) @@ -231,37 +250,21 @@ impl<'a> ExplorerAction<'a> { }; if let Some(limit) = limit { - let iter = col.iter(range); - - if reverse { - iter.reverse() - .step_by(step.get()) - .filter_map(filter_map_closure) - .take(limit) - .collect() - } else { - iter.step_by(step.get()) - .filter_map(filter_map_closure) - .take(limit) - .collect() - } - } else { - #[cfg(feature = "async")] - { + col.iter(range, |iter| { if reverse { - col.iter(range) - .reverse() + iter.reverse() .step_by(step.get()) .filter_map(filter_map_closure) + .take(limit) .collect() } else { - col.iter(range) - .step_by(step.get()) + iter.step_by(step.get()) .filter_map(filter_map_closure) + .take(limit) .collect() } - } - #[cfg(not(feature = "async"))] + }) + } else { { let (send, recv) = unbounded(); @@ -271,22 +274,24 @@ impl<'a> ExplorerAction<'a> { iter.filter_map(filter_map_closure).collect() }); - if reverse { - for entry_res in col.iter(range).reverse() { - if send.try_send(entry_res).is_err() { - return handler.join().expect("child thread panic"); + col.iter(range, |iter| { + if reverse { + for entry_res in iter.reverse() { + if send.try_send(entry_res).is_err() { + return handler.join().expect("child thread panic"); + } } - } - } else { - for entry_res in col.iter(range) { - if send.try_send(entry_res).is_err() { - return handler.join().expect("child thread panic"); + } else { + for entry_res in iter { + if send.try_send(entry_res).is_err() { + return handler.join().expect("child thread panic"); + } } } - } - drop(send); + drop(send); - handler.join().expect("child thread panic") + handler.join().expect("child thread panic") + }) } } } diff --git a/rust-libs/tools/kv_typed/src/from_bytes.rs b/rust-libs/tools/kv_typed/src/from_bytes.rs index ef87edab6c7ad5e4e6874f74f4992779983e0c15..a8837d27d53aa461f6516a8f3fbd3e0d2dbaa572 100644 --- a/rust-libs/tools/kv_typed/src/from_bytes.rs +++ b/rust-libs/tools/kv_typed/src/from_bytes.rs @@ -7,13 +7,21 @@ pub trait FromBytes: Sized { fn from_bytes(bytes: &[u8]) -> Result<Self, Self::Err>; } +impl FromBytes for () { + type Err = std::convert::Infallible; + + fn from_bytes(_: &[u8]) -> Result<Self, Self::Err> { + Ok(()) + } +} + macro_rules! impl_from_bytes_for_numbers { ($($T:ty),*) => {$( impl FromBytes for $T { type Err = std::array::TryFromSliceError; fn from_bytes(bytes: &[u8]) -> Result<Self, Self::Err> { - Ok(<$T>::from_be_bytes(bytes.try_into()?)) + Ok(<$T>::from_le_bytes(bytes.try_into()?)) } } )*}; @@ -51,15 +59,21 @@ impl_from_bytes_for_smallvec!(1, 2, 4, 8, 16, 32, 64); impl<T> FromBytes for Vec<T> where - T: Copy + zerocopy::FromBytes, + T: Copy + Default + zerocopy::FromBytes, { type Err = StringErr; fn from_bytes(bytes: &[u8]) -> Result<Self, Self::Err> { - let layout_verified = zerocopy::LayoutVerified::<_, [T]>::new_slice(bytes) - .ok_or_else(|| StringErr("".to_owned()))?; + let layout_verified = + zerocopy::LayoutVerified::<_, [T]>::new_slice(bytes).ok_or_else(|| { + StringErr( + "Corrupted DB: Vec<T> bytes are wrong aligned or have invalid length" + .to_owned(), + ) + })?; let slice = layout_verified.into_slice(); let mut vec = Vec::with_capacity(slice.len()); + vec.resize_with(slice.len(), Default::default); vec.copy_from_slice(slice); Ok(vec) } @@ -92,3 +106,11 @@ where Ok(HashSet::from_iter(slice.iter().copied())) } } + +impl FromBytes for IVec { + type Err = std::convert::Infallible; + + fn from_bytes(bytes: &[u8]) -> Result<Self, Self::Err> { + Ok(Self::from(bytes)) + } +} diff --git a/rust-libs/tools/kv_typed/src/iter.rs b/rust-libs/tools/kv_typed/src/iter.rs index b177bb65d422de75b3917a68b77edd9196e907c0..2fee25780e6d7c43c306fe38f70e0db159dbe23a 100644 --- a/rust-libs/tools/kv_typed/src/iter.rs +++ b/rust-libs/tools/kv_typed/src/iter.rs @@ -41,13 +41,22 @@ impl<I, T, E> ResultIter<T, E> for I where I: Iterator<Item = Result<T, E>> + Si pub type RangeBytes = (Bound<IVec>, Bound<IVec>); #[derive(Debug)] -pub struct KvIter<C: BackendCol, K: Key, V: Value> { - range_iter: range::RangeIter<C>, +pub struct KvIter< + C: BackendCol, + KB: KeyBytes, + VB: ValueBytes, + BI: BackendIter<KB, VB>, + K: Key, + V: Value, +> { + range_iter: range::RangeIter<C, KB, VB, BI>, phantom_key: PhantomData<K>, phantom_value: PhantomData<V>, } -impl<C: BackendCol, K: Key, V: Value> Iterator for KvIter<C, K, V> { +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>, K: Key, V: Value> + Iterator for KvIter<C, KB, VB, BI, K, V> +{ type Item = KvResult<(K, V)>; fn next(&mut self) -> Option<Self::Item> { @@ -65,7 +74,9 @@ impl<C: BackendCol, K: Key, V: Value> Iterator for KvIter<C, K, V> { } } -impl<C: BackendCol, K: Key, V: Value> ReversableIterator for KvIter<C, K, V> { +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>, K: Key, V: Value> + ReversableIterator for KvIter<C, KB, VB, BI, K, V> +{ #[inline(always)] fn reverse(self) -> Self { Self { @@ -76,20 +87,11 @@ impl<C: BackendCol, K: Key, V: Value> ReversableIterator for KvIter<C, K, V> { } } -impl<C: BackendCol, K: Key, V: Value> KvIter<C, K, V> { - pub fn keys(self) -> KvIterKeys<C, K> { - KvIterKeys::new(self.range_iter) - } - pub fn values(self) -> KvIterValues<C, K, V> { - KvIterValues::new(self.range_iter) - } - pub(crate) fn convert_range<RK: RangeBounds<K>>(range: RK) -> RangeBytes { - let range_start = convert_bound(range.start_bound()); - let range_end = convert_bound(range.end_bound()); - (range_start, range_end) - } +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>, K: Key, V: Value> + KvIter<C, KB, VB, BI, K, V> +{ #[cfg(feature = "mock")] - pub fn new(backend_iter: C::Iter, range: RangeBytes) -> Self { + pub fn new(backend_iter: BI, range: RangeBytes) -> Self { Self { range_iter: range::RangeIter::new(backend_iter, range.0, range.1), phantom_key: PhantomData, @@ -97,7 +99,7 @@ impl<C: BackendCol, K: Key, V: Value> KvIter<C, K, V> { } } #[cfg(not(feature = "mock"))] - pub(crate) fn new(backend_iter: C::Iter, range: RangeBytes) -> Self { + pub(crate) fn new(backend_iter: BI, range: RangeBytes) -> Self { Self { range_iter: range::RangeIter::new(backend_iter, range.0, range.1), phantom_key: PhantomData, @@ -106,6 +108,38 @@ impl<C: BackendCol, K: Key, V: Value> KvIter<C, K, V> { } } +pub trait EntryIter { + type K: Key; + type V: Value; + type KeysIter: Iterator<Item = KvResult<Self::K>>; + type ValuesIter: Iterator<Item = KvResult<Self::V>>; + + fn keys(self) -> Self::KeysIter; + fn values(self) -> Self::ValuesIter; +} + +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>, K: Key, V: Value> + EntryIter for KvIter<C, KB, VB, BI, K, V> +{ + type K = K; + type V = V; + type KeysIter = KvIterKeys<C, KB, VB, BI, K>; + type ValuesIter = KvIterValues<C, KB, VB, BI, K, V>; + + fn keys(self) -> KvIterKeys<C, KB, VB, BI, K> { + KvIterKeys::new(self.range_iter) + } + fn values(self) -> KvIterValues<C, KB, VB, BI, K, V> { + KvIterValues::new(self.range_iter) + } +} + +pub(crate) fn convert_range<K: Key, RK: RangeBounds<K>>(range: RK) -> RangeBytes { + let range_start = convert_bound(range.start_bound()); + let range_end = convert_bound(range.end_bound()); + (range_start, range_end) +} + #[inline(always)] fn convert_bound<K: Key>(bound_key: Bound<&K>) -> Bound<IVec> { match bound_key { diff --git a/rust-libs/tools/kv_typed/src/iter/keys.rs b/rust-libs/tools/kv_typed/src/iter/keys.rs index bbe9a0d74e5f6231ff022471f9d0e2d30f9b3564..89c63f7a5ce67f8d8b6d9cada919337df347531b 100644 --- a/rust-libs/tools/kv_typed/src/iter/keys.rs +++ b/rust-libs/tools/kv_typed/src/iter/keys.rs @@ -18,12 +18,15 @@ use crate::*; #[derive(Debug)] -pub struct KvIterKeys<C: BackendCol, K: Key> { - range_iter: super::range::RangeIter<C>, +pub struct KvIterKeys<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>, K: Key> +{ + range_iter: super::range::RangeIter<C, KB, VB, BI>, phantom_key: PhantomData<K>, } -impl<C: BackendCol, K: Key> Iterator for KvIterKeys<C, K> { +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>, K: Key> Iterator + for KvIterKeys<C, KB, VB, BI, K> +{ type Item = KvResult<K>; fn next(&mut self) -> Option<Self::Item> { @@ -38,7 +41,9 @@ impl<C: BackendCol, K: Key> Iterator for KvIterKeys<C, K> { } } -impl<C: BackendCol, K: Key> ReversableIterator for KvIterKeys<C, K> { +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>, K: Key> + ReversableIterator for KvIterKeys<C, KB, VB, BI, K> +{ #[inline(always)] fn reverse(self) -> Self { Self { @@ -48,8 +53,10 @@ impl<C: BackendCol, K: Key> ReversableIterator for KvIterKeys<C, K> { } } -impl<C: BackendCol, K: Key> KvIterKeys<C, K> { - pub(super) fn new(range_iter: super::range::RangeIter<C>) -> Self { +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>, K: Key> + KvIterKeys<C, KB, VB, BI, K> +{ + pub(super) fn new(range_iter: super::range::RangeIter<C, KB, VB, BI>) -> Self { Self { range_iter, phantom_key: PhantomData, diff --git a/rust-libs/tools/kv_typed/src/iter/range.rs b/rust-libs/tools/kv_typed/src/iter/range.rs index 58afce651111c2bb4ffdc0e6782cd1db0df90c53..cabef97a24211929e74fdee83e1c28519998e873 100644 --- a/rust-libs/tools/kv_typed/src/iter/range.rs +++ b/rust-libs/tools/kv_typed/src/iter/range.rs @@ -17,15 +17,19 @@ use crate::*; -// V2 -pub(super) struct RangeIter<C: BackendCol> { - backend_iter: C::Iter, +pub(super) struct RangeIter<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>> { + backend_iter: BI, + phantom: PhantomData<C>, + phantom_kb: PhantomData<KB>, + phantom_vb: PhantomData<VB>, reversed: bool, range_start: Bound<IVec>, range_end: Bound<IVec>, } -impl<C: BackendCol> Debug for RangeIter<C> { +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>> Debug + for RangeIter<C, KB, VB, BI> +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("LevelDbCol") .field("backend_iter", &"BackendIter") @@ -36,15 +40,16 @@ impl<C: BackendCol> Debug for RangeIter<C> { } } -impl<C: BackendCol> RangeIter<C> { +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>> + RangeIter<C, KB, VB, BI> +{ #[inline(always)] - pub(crate) fn new( - backend_iter: C::Iter, - range_start: Bound<IVec>, - range_end: Bound<IVec>, - ) -> Self { + pub(crate) fn new(backend_iter: BI, range_start: Bound<IVec>, range_end: Bound<IVec>) -> Self { RangeIter { backend_iter, + phantom: PhantomData, + phantom_kb: PhantomData, + phantom_vb: PhantomData, reversed: false, range_start, range_end, @@ -52,8 +57,10 @@ impl<C: BackendCol> RangeIter<C> { } } -impl<C: BackendCol> Iterator for RangeIter<C> { - type Item = <C::Iter as Iterator>::Item; +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>> Iterator + for RangeIter<C, KB, VB, BI> +{ + type Item = Result<(KB, VB), DynErr>; fn next(&mut self) -> Option<Self::Item> { loop { @@ -97,11 +104,16 @@ impl<C: BackendCol> Iterator for RangeIter<C> { } } } -impl<C: BackendCol> ReversableIterator for RangeIter<C> { +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>> ReversableIterator + for RangeIter<C, KB, VB, BI> +{ #[inline(always)] fn reverse(self) -> Self { RangeIter { backend_iter: self.backend_iter.reverse(), + phantom: PhantomData, + phantom_kb: PhantomData, + phantom_vb: PhantomData, reversed: !self.reversed, range_start: self.range_start, range_end: self.range_end, diff --git a/rust-libs/tools/kv_typed/src/iter/values.rs b/rust-libs/tools/kv_typed/src/iter/values.rs index d75986e46ddb7b3a492ac3f80451108a76672d65..f701dab041c0beff984dfa388c91dfe3294af3a1 100644 --- a/rust-libs/tools/kv_typed/src/iter/values.rs +++ b/rust-libs/tools/kv_typed/src/iter/values.rs @@ -18,13 +18,22 @@ use crate::*; #[derive(Debug)] -pub struct KvIterValues<C: BackendCol, K: Key, V: Value> { - range_iter: super::range::RangeIter<C>, +pub struct KvIterValues< + C: BackendCol, + KB: KeyBytes, + VB: ValueBytes, + BI: BackendIter<KB, VB>, + K: Key, + V: Value, +> { + range_iter: super::range::RangeIter<C, KB, VB, BI>, phantom_key: PhantomData<K>, phantom_value: PhantomData<V>, } -impl<C: BackendCol, K: Key, V: Value> Iterator for KvIterValues<C, K, V> { +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>, K: Key, V: Value> + Iterator for KvIterValues<C, KB, VB, BI, K, V> +{ type Item = KvResult<V>; fn next(&mut self) -> Option<Self::Item> { @@ -39,7 +48,9 @@ impl<C: BackendCol, K: Key, V: Value> Iterator for KvIterValues<C, K, V> { } } -impl<C: BackendCol, K: Key, V: Value> ReversableIterator for KvIterValues<C, K, V> { +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>, K: Key, V: Value> + ReversableIterator for KvIterValues<C, KB, VB, BI, K, V> +{ #[inline(always)] fn reverse(self) -> Self { Self { @@ -50,8 +61,10 @@ impl<C: BackendCol, K: Key, V: Value> ReversableIterator for KvIterValues<C, K, } } -impl<C: BackendCol, K: Key, V: Value> KvIterValues<C, K, V> { - pub(super) fn new(range_iter: super::range::RangeIter<C>) -> Self { +impl<C: BackendCol, KB: KeyBytes, VB: ValueBytes, BI: BackendIter<KB, VB>, K: Key, V: Value> + KvIterValues<C, KB, VB, BI, K, V> +{ + pub(super) fn new(range_iter: super::range::RangeIter<C, KB, VB, BI>) -> Self { Self { range_iter, phantom_key: PhantomData, diff --git a/rust-libs/tools/kv_typed/src/lib.rs b/rust-libs/tools/kv_typed/src/lib.rs index 3c7e4ebd9c122d4d9e95b270bbdf8d3aafbf28c2..21800d9eed521ba7d8d993a2d3ef5905539e49c6 100644 --- a/rust-libs/tools/kv_typed/src/lib.rs +++ b/rust-libs/tools/kv_typed/src/lib.rs @@ -29,6 +29,8 @@ mod as_bytes; pub mod backend; mod batch; +mod bytes; +mod collection_inner; mod collection_ro; mod collection_rw; mod error; @@ -38,8 +40,9 @@ pub mod explorer; mod from_bytes; mod iter; mod key; -#[cfg(feature = "subscription")] mod subscription; +mod transactional_read; +mod transactional_write; mod utils; mod value; @@ -54,46 +57,50 @@ pub mod prelude { pub use crate::as_bytes::{KeyAsBytes, ValueAsBytes}; #[cfg(feature = "leveldb_backend")] pub use crate::backend::leveldb::{LevelDb, LevelDbConf}; + #[cfg(target_arch = "x86_64")] + pub use crate::backend::lmdb::{Lmdb, LmdbConf}; #[cfg(feature = "memory_backend")] pub use crate::backend::memory::{Mem, MemConf}; #[cfg(feature = "mock")] pub use crate::backend::mock::{MockBackend, MockBackendCol, MockBackendIter}; #[cfg(feature = "sled_backend")] pub use crate::backend::sled::{Config as SledConf, Sled}; - pub use crate::backend::{Backend, BackendCol, TransactionalBackend}; - pub use crate::batch::Batch; + pub use crate::backend::{Backend, BackendCol}; + pub use crate::batch::{Batch, BatchGet}; #[cfg(feature = "mock")] pub use crate::collection_ro::MockColRo; pub use crate::collection_ro::{ ColRo, DbCollectionRo, DbCollectionRoGetRef, DbCollectionRoGetRefSlice, }; pub use crate::collection_rw::{ColRw, DbCollectionRw}; - pub use crate::error::{ - DynErr, KvError, KvResult, StringErr, TransactionError, TransactionResult, - }; + pub use crate::error::{DynErr, KvError, KvResult, StringErr}; pub use crate::event::{EventTrait, Events}; #[cfg(feature = "explorer")] pub use crate::explorer::{ExplorableKey, ExplorableValue}; pub use crate::from_bytes::FromBytes; pub use crate::iter::{ - keys::KvIterKeys, values::KvIterValues, KvIter, ResultIter, ReversableIterator, + keys::KvIterKeys, values::KvIterValues, EntryIter, KvIter, ResultIter, ReversableIterator, }; pub use crate::key::Key; - #[cfg(feature = "subscription")] pub use crate::subscription::{NewSubscribers, Subscriber, Subscribers}; + pub use crate::transactional_read::TransactionalRead; + pub use crate::transactional_write::{DbTxCollectionRw, TransactionalWrite, TxColRw}; pub use crate::utils::arc::Arc; pub use crate::value::{Value, ValueSliceZc, ValueZc}; pub use kv_typed_code_gen::db_schema; } // Internal crate imports -pub(crate) use crate::backend::BackendBatch; +pub(crate) use crate::backend::{BackendBatch, BackendIter}; +pub(crate) use crate::bytes::{CowKB, CowVB, KeyBytes, ValueBytes}; +pub(crate) use crate::collection_inner::ColInner; +pub(crate) use crate::error::BackendResult; #[cfg(feature = "explorer")] pub(crate) use crate::explorer::{ExplorableKey, ExplorableValue}; pub(crate) use crate::iter::RangeBytes; pub(crate) use crate::prelude::*; -#[cfg(feature = "subscription")] -pub(crate) use crate::subscription::ColSubscribers; +pub(crate) use crate::subscription::{ColSubscribers, SubscriptionsSender}; +pub(crate) use crate::transactional_write::tx_iter::BackendTxIter; pub(crate) use crate::utils::arc::Arc; pub(crate) use crate::utils::ivec::IVec; use flume::{unbounded, Receiver, Sender, TrySendError}; diff --git a/rust-libs/tools/kv_typed/src/subscription.rs b/rust-libs/tools/kv_typed/src/subscription.rs index 3053328907df3907704235fa313d10b847aeb641..049aa446ae9238c255cc248bdf144a0a41d9b776 100644 --- a/rust-libs/tools/kv_typed/src/subscription.rs +++ b/rust-libs/tools/kv_typed/src/subscription.rs @@ -73,12 +73,6 @@ impl<E: EventTrait> ColSubscribers<E> { .unwrap_or_else(|| Arc::clone(&events)), ) { match e { - #[cfg(feature = "async")] - TrySendError::Closed(events_) => { - unsend_events_opt = Some(events_); - died_subscribers.push(*id); - } - #[cfg(not(feature = "async"))] TrySendError::Disconnected(events_) => { unsend_events_opt = Some(events_); died_subscribers.push(*id); diff --git a/rust-libs/tools/kv_typed/src/transactional_read.rs b/rust-libs/tools/kv_typed/src/transactional_read.rs new file mode 100644 index 0000000000000000000000000000000000000000..e4ef0b077fab0b5e0e8290292fb6497daa64a451 --- /dev/null +++ b/rust-libs/tools/kv_typed/src/transactional_read.rs @@ -0,0 +1,145 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! KV Typed transactional read + +use crate::*; +use parking_lot::RwLockReadGuard as ReadGuard; + +pub struct TxColRo<'db, BC: BackendCol, E: EventTrait> { + col_reader: ReadGuard<'db, ColInner<BC, E>>, +} +impl<'db, BC: BackendCol, E: EventTrait> Debug for TxColRo<'db, BC, E> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LevelDbCol") + .field("col_reader", &format!("{:?}", self.col_reader)) + .finish() + } +} +impl<'db, BC: BackendCol, E: EventTrait> TxColRo<'db, BC, E> { + #[inline(always)] + fn new(col_reader: ReadGuard<'db, ColInner<BC, E>>) -> Self { + TxColRo { col_reader } + } + #[inline(always)] + pub fn count(&self) -> KvResult<usize> { + self.col_reader.backend_col.count() + } + #[inline(always)] + pub fn get(&self, k: &E::K) -> KvResult<Option<E::V>> { + self.col_reader.backend_col.get(k) + } + #[allow(clippy::type_complexity)] + #[inline(always)] + /// Don't worry about complex iter type. Use it like an `impl Iterator<Item=KvResult<(K, V)>>`. + pub fn iter<D, R, F>(&self, range: R, f: F) -> D + where + D: Send + Sync, + R: 'static + RangeBounds<E::K>, + F: FnOnce(KvIter<BC, BC::KeyBytes, BC::ValueBytes, BC::Iter, E::K, E::V>) -> D, + { + let range_bytes = crate::iter::convert_range::<E::K, R>(range); + let backend_iter = self + .col_reader + .backend_col + .iter::<E::K, E::V>(range_bytes.clone()); + f(KvIter::new(backend_iter, range_bytes)) + } +} +impl<'db, V: ValueZc, BC: BackendCol, E: EventTrait<V = V>> TxColRo<'db, BC, E> { + pub fn get_ref<D, F: Fn(&V::Ref) -> KvResult<D>>(&self, k: &E::K, f: F) -> KvResult<Option<D>> { + self.col_reader.backend_col.get_ref::<E::K, V, D, F>(k, f) + } +} +impl<'db, V: ValueSliceZc, BC: BackendCol, E: EventTrait<V = V>> TxColRo<'db, BC, E> { + pub fn get_ref_slice<D, F: Fn(&[V::Elem]) -> KvResult<D>>( + &self, + k: &E::K, + f: F, + ) -> KvResult<Option<D>> { + self.col_reader + .backend_col + .get_ref_slice::<E::K, V, D, F>(k, f) + } +} + +pub trait TransactionalRead<'db, BC: BackendCol> { + type TxCols; + + fn read<D, F: Fn(Self::TxCols) -> KvResult<D>>(&'db self, f: F) -> KvResult<D>; + + fn try_read<D, F: Fn(Self::TxCols) -> KvResult<D>>(&'db self, f: F) -> Result<KvResult<D>, F>; +} + +impl<'db, BC: BackendCol, E: EventTrait> TransactionalRead<'db, BC> for &'db ColRo<BC, E> { + type TxCols = TxColRo<'db, BC, E>; + + fn read<D, F: Fn(Self::TxCols) -> KvResult<D>>(&'db self, f: F) -> KvResult<D> { + let read_guard_0 = self.inner.read(); + + f(TxColRo::new(read_guard_0)) + } + + fn try_read<D, F: Fn(Self::TxCols) -> KvResult<D>>(&'db self, f: F) -> Result<KvResult<D>, F> { + if let Some(read_guard_0) = self.inner.try_read() { + Ok(f(TxColRo::new(read_guard_0))) + } else { + Err(f) + } + } +} + +macro_rules! impl_transactional_read { + ($($i:literal),*) => { + paste::paste! { + impl<'db, BC: BackendCol $( ,[<E $i>]: EventTrait)*> TransactionalRead<'db, BC> + for ($(&'db ColRo<BC, [<E $i>]>, )*) + { + type TxCols = ($(TxColRo<'db, BC, [<E $i>]>, )*); + + fn read<D, F: Fn(Self::TxCols) -> KvResult<D>>( + &'db self, + f: F, + ) -> KvResult<D> { + $(let [<read_guard_ $i>] = self.$i.inner.read();)* + + f(($(TxColRo::new([<read_guard_ $i>]), )*)) + } + + fn try_read<D, F: Fn(Self::TxCols) -> KvResult<D>>( + &'db self, + f: F, + ) -> Result<KvResult<D>, F> { + $(let [<read_guard_opt_ $i>] = self.$i.inner.try_read();)* + + if $([<read_guard_opt_ $i>].is_none() || )* false { + Err(f) + } else { + Ok(f(($(TxColRo::new([<read_guard_opt_ $i>].expect("unreachable")), )*))) + } + } + } + } + }; +} +impl_transactional_read!(0, 1); +impl_transactional_read!(0, 1, 2); +impl_transactional_read!(0, 1, 2, 3); +impl_transactional_read!(0, 1, 2, 3, 4); +impl_transactional_read!(0, 1, 2, 3, 4, 5); +impl_transactional_read!(0, 1, 2, 3, 4, 5, 6); +impl_transactional_read!(0, 1, 2, 3, 4, 5, 6, 7); +impl_transactional_read!(0, 1, 2, 3, 4, 5, 6, 7, 8); +impl_transactional_read!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); diff --git a/rust-libs/tools/kv_typed/src/transactional_write.rs b/rust-libs/tools/kv_typed/src/transactional_write.rs new file mode 100644 index 0000000000000000000000000000000000000000..64b522788bdeecc4051f1464eabaf93771c3dfff --- /dev/null +++ b/rust-libs/tools/kv_typed/src/transactional_write.rs @@ -0,0 +1,206 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! KV Typed transactional write + +pub(crate) mod tx_iter; + +use crate::*; +use parking_lot::RwLockUpgradableReadGuard as UpgradableReadGuard; + +pub struct TxColRw<'db, BC: BackendCol, E: EventTrait> { + batch: &'static mut Batch<BC, ColRw<BC, E>>, + col_reader: &'db UpgradableReadGuard<'db, ColInner<BC, E>>, +} +impl<'db, BC: BackendCol, E: EventTrait> Debug for TxColRw<'db, BC, E> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LevelDbCol") + .field("batch", &format!("{:?}", self.batch)) + .field("col_reader", &format!("{:?}", self.col_reader)) + .finish() + } +} + +impl<'db, V: ValueZc, BC: BackendCol, E: EventTrait<V = V>> TxColRw<'db, BC, E> { + pub fn get_ref<D, F: Fn(&V::Ref) -> KvResult<D>>(&self, k: &E::K, f: F) -> KvResult<Option<D>> { + self.col_reader.backend_col.get_ref::<E::K, V, D, F>(k, f) + } +} +impl<'db, V: ValueSliceZc, BC: BackendCol, E: EventTrait<V = V>> TxColRw<'db, BC, E> { + pub fn get_ref_slice<D, F: Fn(&[V::Elem]) -> KvResult<D>>( + &self, + k: &E::K, + f: F, + ) -> KvResult<Option<D>> { + self.col_reader + .backend_col + .get_ref_slice::<E::K, V, D, F>(k, f) + } +} + +impl<'db, BC: BackendCol, E: EventTrait> TxColRw<'db, BC, E> { + /*type BackendCol = BC; + type K = E::K; + type V = E::V; + type Event = E;*/ + + #[inline(always)] + pub fn count(&self) -> KvResult<usize> { + self.col_reader.backend_col.count() + } + #[inline(always)] + pub fn get(&self, k: &E::K) -> KvResult<Option<E::V>> { + match self.batch.get(k) { + batch::BatchGet::None => self.col_reader.backend_col.get(k), + batch::BatchGet::Deleted => Ok(None), + batch::BatchGet::Updated(v) => Ok(Some(v.as_bytes(|v_bytes| { + E::V::from_bytes(v_bytes).map_err(|e| KvError::DeserError(format!("{}", e))) + })?)), + } + } + #[allow(clippy::type_complexity)] + #[inline(always)] + /// Don't worry about complex iter type. Use it like an `impl Iterator<Item=KvResult<(K, V)>>`. + pub fn iter<'tx, D, R, F>(&'tx self, range: R, f: F) -> D + where + D: Send + Sync, + R: 'static + RangeBounds<E::K>, + F: FnOnce( + KvIter< + BC, + CowKB<'tx, BC::KeyBytes>, + CowVB<'tx, BC::ValueBytes>, + BackendTxIter<BC>, + E::K, + E::V, + >, + ) -> D, + { + let range_bytes = crate::iter::convert_range::<E::K, R>(range); + let backend_iter = self + .col_reader + .backend_col + .iter::<E::K, E::V>(range_bytes.clone()); + f(KvIter::new( + BackendTxIter::new(backend_iter, &self.batch.tree), + range_bytes, + )) + } +} + +pub trait DbTxCollectionRw { + type K: Key; + type V: Value; + type Event: EventTrait<K = Self::K, V = Self::V>; + + fn remove(&mut self, k: Self::K); + fn upsert(&mut self, k: Self::K, v: Self::V); +} + +impl<'db, BC: BackendCol, E: EventTrait> DbTxCollectionRw for TxColRw<'db, BC, E> { + type K = E::K; + type V = E::V; + type Event = E; + + #[inline(always)] + fn remove(&mut self, k: Self::K) { + self.batch.remove(k) + } + #[inline(always)] + fn upsert(&mut self, k: Self::K, v: Self::V) { + self.batch.upsert(k, v) + } +} + +pub trait TransactionalWrite<'db, BC: BackendCol> { + type TxCols; + + fn write<D, F: FnOnce(Self::TxCols) -> KvResult<D>>(&'db self, f: F) -> KvResult<D>; +} + +impl<'db, BC: BackendCol, E: EventTrait> TransactionalWrite<'db, BC> for &'db ColRw<BC, E> { + type TxCols = TxColRw<'db, BC, E>; + + fn write<D, F: FnOnce(Self::TxCols) -> KvResult<D>>(&'db self, f: F) -> KvResult<D> { + let upgradable_guard = self.inner.inner.upgradable_read(); + + let mut batch = Batch::<BC, ColRw<BC, E>>::default(); + + let tx_col = TxColRw { + batch: unsafe { std::mem::transmute(&mut batch) }, + col_reader: unsafe { std::mem::transmute(&upgradable_guard) }, + }; + let data = f(tx_col)?; + + // Prepare commit + let (backend_batch, events) = batch.into_backend_batch_and_events(); + + // Acquire exclusive lock + let mut write_guard = UpgradableReadGuard::upgrade(upgradable_guard); + + // Commit + self.write_backend_batch(backend_batch, events, &mut write_guard)?; + + Ok(data) + } +} + +macro_rules! impl_transactional_write { + ($($i:literal),*) => { + paste::paste! { + impl<'db, BC: BackendCol $( ,[<E $i>]: EventTrait)*> TransactionalWrite<'db, BC> + for ($(&'db ColRw<BC, [<E $i>]>, )*) + { + type TxCols = ($(TxColRw<'db, BC, [<E $i>]>, )*); + + fn write<D, F: FnOnce(Self::TxCols) -> KvResult<D>>( + &'db self, + f: F, + ) -> KvResult<D> { + $(let [<upgradable_guard_ $i>] = self.$i.inner.inner.upgradable_read();)* + + $(let mut [<batch_ $i>] = Batch::<BC, ColRw<BC, [<E $i>]>>::default();)* + + $(let [<tx_col $i>] = TxColRw { + batch: unsafe { std::mem::transmute(&mut [<batch_ $i>]) }, + col_reader: unsafe { std::mem::transmute(&[<upgradable_guard_ $i>]) }, + };)* + + let data = f(($([<tx_col $i>], )*))?; + + // Prepare commit + $(let ([<backend_batch_ $i>], [<events_ $i>]) = [<batch_ $i>].into_backend_batch_and_events();)* + + // Acquire exclusive lock + $(let mut [<write_guard_ $i>] = UpgradableReadGuard::upgrade([<upgradable_guard_ $i>]);)* + + // Commit + $(self.$i.write_backend_batch([<backend_batch_ $i>], [<events_ $i>], &mut [<write_guard_ $i>])?;)* + + Ok(data) + } + } + } + }; +} +impl_transactional_write!(0, 1); +impl_transactional_write!(0, 1, 2); +impl_transactional_write!(0, 1, 2, 3); +impl_transactional_write!(0, 1, 2, 3, 4); +impl_transactional_write!(0, 1, 2, 3, 4, 5); +impl_transactional_write!(0, 1, 2, 3, 4, 5, 6); +impl_transactional_write!(0, 1, 2, 3, 4, 5, 6, 7); +impl_transactional_write!(0, 1, 2, 3, 4, 5, 6, 7, 8); +impl_transactional_write!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); diff --git a/rust-libs/tools/kv_typed/src/transactional_write/tx_iter.rs b/rust-libs/tools/kv_typed/src/transactional_write/tx_iter.rs new file mode 100644 index 0000000000000000000000000000000000000000..fe1c28bcf79c30b5a85fdc86baf551313a203596 --- /dev/null +++ b/rust-libs/tools/kv_typed/src/transactional_write/tx_iter.rs @@ -0,0 +1,157 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! KV Typed transactional iterator + +use crate::*; +use std::collections::BTreeMap; + +#[doc(hidden)] +#[derive(Debug)] +pub struct BackendTxIter<'b, BC: BackendCol> { + batch_end_reached: bool, + batch_iter: std::collections::btree_map::Iter<'b, IVec, Option<IVec>>, + batch_tree_ref: &'b BTreeMap<IVec, Option<IVec>>, + backend_iter: BC::Iter, + db_end_reached: bool, + next_batch_entry_opt: Option<(&'b IVec, &'b Option<IVec>)>, + next_db_entry_opt: Option<(BC::KeyBytes, BC::ValueBytes)>, + reverted: bool, +} + +impl<'b, BC: BackendCol> BackendTxIter<'b, BC> { + pub(crate) fn new( + backend_iter: BC::Iter, + batch_tree: &'b BTreeMap<IVec, Option<IVec>>, + ) -> Self { + Self { + batch_end_reached: false, + batch_iter: batch_tree.iter(), + batch_tree_ref: batch_tree, + backend_iter, + db_end_reached: false, + next_batch_entry_opt: None, + next_db_entry_opt: None, + reverted: false, + } + } +} + +impl<'b, BC: BackendCol> BackendTxIter<'b, BC> { + fn get_next_db_item(&mut self) -> Option<BackendResult<BC>> { + match self.backend_iter.next() { + Some(Ok(entry)) => { + if self.batch_tree_ref.contains_key(entry.0.as_ref()) { + self.get_next_db_item() + } else { + Some(Ok(entry)) + } + } + o => o, + } + } +} + +#[allow(type_alias_bounds)] +type CowBytesEntry<'a, BC: BackendCol> = (CowKB<'a, BC::KeyBytes>, CowVB<'a, BC::ValueBytes>); + +impl<'b, BC: BackendCol> Iterator for BackendTxIter<'b, BC> { + type Item = Result<CowBytesEntry<'b, BC>, DynErr>; + + fn next(&mut self) -> Option<Self::Item> { + if self.next_batch_entry_opt.is_none() { + self.next_batch_entry_opt = if self.reverted { + self.batch_iter.next_back() + } else { + self.batch_iter.next() + }; + } + if self.next_batch_entry_opt.is_none() { + self.batch_end_reached = true; + } + if self.next_db_entry_opt.is_none() { + self.next_db_entry_opt = match self.get_next_db_item() { + Some(Ok(entry)) => Some(entry), + Some(Err(e)) => return Some(Err(e)), + None => { + self.db_end_reached = true; + None + } + }; + } + + if self.batch_end_reached { + if self.db_end_reached { + None + } else { + // Return db item + Some(Ok(self + .next_db_entry_opt + .take() + .map(|(k, v)| (CowKB::O(k), CowVB::O(v))) + .expect("unreachable"))) + } + } else if self.db_end_reached { + // Return batch item + if let Some((k, v_opt)) = self.next_batch_entry_opt.take() { + if let Some(v) = v_opt { + Some(Ok((CowKB::B(k.as_ref()), CowVB::B(v.as_ref())))) + } else { + self.next() + } + } else { + // batch_end_reached = false + unreachable!() + } + } else if let Some((k_batch, v_batch_opt)) = self.next_batch_entry_opt.take() { + if let Some((k_db, v_db)) = self.next_db_entry_opt.take() { + if (!self.reverted && k_batch.as_ref() <= k_db.as_ref()) + || (self.reverted && k_batch.as_ref() >= k_db.as_ref()) + { + self.next_db_entry_opt = Some((k_db, v_db)); + // Return batch item + if let Some(v_batch) = v_batch_opt { + Some(Ok((CowKB::B(k_batch.as_ref()), CowVB::B(v_batch.as_ref())))) + } else { + self.next() + } + } else { + self.next_batch_entry_opt = Some((k_batch, v_batch_opt)); + // Return db item + Some(Ok((CowKB::O(k_db), CowVB::O(v_db)))) + } + } else { + // db_end_reached = false + unreachable!() + } + } else { + // batch_end_reached = false + unreachable!() + } + } +} + +impl<'b, BC: BackendCol> ReversableIterator for BackendTxIter<'b, BC> { + fn reverse(mut self) -> Self { + self.backend_iter = self.backend_iter.reverse(); + self.reverted = true; + self + } +} + +impl<'b, BC: BackendCol> BackendIter<CowKB<'b, BC::KeyBytes>, CowVB<'b, BC::ValueBytes>> + for BackendTxIter<'b, BC> +{ +} diff --git a/rust-libs/tools/kv_typed/tests/db_schema.rs b/rust-libs/tools/kv_typed/tests/db_schema.rs index c1998fa286f77b5f4adbf6b9f933b55a61abbae6..4b635aab689a0c0ed647c846c5045b823116d5bd 100644 --- a/rust-libs/tools/kv_typed/tests/db_schema.rs +++ b/rust-libs/tools/kv_typed/tests/db_schema.rs @@ -1,24 +1,24 @@ -#[cfg(feature = "memory_backend")] mod tests { use kv_typed::prelude::*; use smallvec::SmallVec; use std::fmt::Debug; + use unwrap::unwrap; - #[derive(Debug, PartialEq)] + #[derive(Clone, Debug, PartialEq)] pub struct VecU128(Vec<u128>); kv_typed::impl_value_for_vec_zc!(VecU128, u128); - #[derive(Debug, PartialEq)] + #[derive(Clone, Debug, PartialEq)] pub struct SVecU128(SmallVec<[u128; 4]>); kv_typed::impl_value_for_smallvec_zc!(SVecU128, u128, 4); use std::collections::BTreeSet; - #[derive(Debug, PartialEq)] + #[derive(Clone, Debug, PartialEq)] pub struct BTSetU128(BTreeSet<u128>); kv_typed::impl_value_for_btreeset_zc!(BTSetU128, u128); use std::collections::HashSet; - #[derive(Debug, PartialEq)] + #[derive(Clone, Debug, PartialEq)] pub struct HashSetU128(HashSet<u128>); kv_typed::impl_value_for_hashset_zc!(HashSetU128, u128); @@ -32,14 +32,33 @@ mod tests { ] ); - //#[maybe_async::test(not(feature = "async"), async(feature = "async", async_std::test))] #[test] - fn test_db_schema() -> KvResult<()> { + fn test_db_schema_lmdb() -> KvResult<()> { + let tmp_dir = unwrap!(tempdir::TempDir::new("kv_typed_lmdb")); + let db = TestV1Db::<kv_typed::backend::lmdb::Lmdb>::open( + kv_typed::backend::lmdb::LmdbConf::default().folder_path(tmp_dir.path().to_owned()), + )?; + + test_db_schema(&db) + } + + #[test] + fn test_db_schema_mem() -> KvResult<()> { let db = TestV1Db::<Mem>::open(MemConf::default())?; - #[cfg(feature = "subscription")] + test_db_schema(&db) + } + + //#[cfg(feature = "sled_backend")] + #[test] + fn test_db_schema_sled() -> KvResult<()> { + let db = TestV1Db::<Sled>::open(SledConf::default().temporary(true))?; + + test_db_schema(&db) + } + + fn test_db_schema<B: Backend>(db: &TestV1Db<B>) -> KvResult<()> { let (sender, recv) = kv_typed::channel::unbounded(); - #[cfg(feature = "subscription")] db.col_1().subscribe(sender)?; let db2 = db.clone(); @@ -47,18 +66,14 @@ mod tests { let handler = std::thread::spawn(move || db2.col_1_write().upsert(3, "toto".to_owned())); handler.join().expect("thread panic")?; - #[cfg(feature = "subscription")] - { - let expected_events: Events<Col1Event> = smallvec::smallvec![Col1Event::Upsert { - key: 3, - value: "toto".to_owned(), - }]; - #[allow(unused_parens)] - if let Ok(msg) = recv.recv() { - assert_eq!(msg.as_ref(), &expected_events,) - } else { - panic!("must be receive event") - } + let expected_events: Events<Col1Event> = smallvec::smallvec![Col1Event::Upsert { + key: 3, + value: "toto".to_owned(), + }]; + if let Ok(msg) = recv.recv() { + assert_eq!(msg.as_ref(), &expected_events,) + } else { + panic!("must be receive event") } assert_eq!(db.col_1().get(&3)?, Some("toto".to_owned()),); @@ -74,27 +89,33 @@ mod tests { db.col_1_write().upsert(5, "tutu".to_owned())?; - { - let mut iter = db.col_1().iter(..); - + db.col_1().iter(.., |mut iter| { assert_eq!(iter.next_res()?, Some((3, "toto".to_owned()))); assert_eq!(iter.next_res()?, Some((5, "tutu".to_owned()))); assert_eq!(iter.next_res()?, None); + Ok::<(), KvError>(()) + })?; - let mut iter = db.col_1().iter(..).values().reverse(); + db.col_1().iter(.., |it| { + let mut iter = it.values().reverse(); assert_eq!(iter.next_res()?, Some("tutu".to_owned())); assert_eq!(iter.next_res()?, Some("toto".to_owned())); assert_eq!(iter.next_res()?, None); - } + Ok::<(), KvError>(()) + })?; db.col_1_write().upsert(7, "titi".to_owned())?; - let mut iter = db.col_1().iter(..).values().reverse().step_by(2); + db.col_1().iter(.., |it| { + let mut iter = it.values().reverse().step_by(2); - assert_eq!(iter.next_res()?, Some("titi".to_owned())); - assert_eq!(iter.next_res()?, Some("toto".to_owned())); - assert_eq!(iter.next_res()?, None); + assert_eq!(iter.next_res()?, Some("titi".to_owned())); + assert_eq!(iter.next_res()?, Some("toto".to_owned())); + assert_eq!(iter.next_res()?, None); + + Ok::<(), KvError>(()) + })?; db.col_3_write().upsert(4, VecU128(vec![1, 2, 3]))?; db.col_3().get_ref_slice(&4, |numbers| { @@ -102,6 +123,7 @@ mod tests { Ok(()) })?; + // Test get_ref_slice use std::iter::FromIterator as _; db.col_4_write().upsert( 4, @@ -112,6 +134,79 @@ mod tests { Ok(()) })?; + // Test transactional + // A read tx should be opened when write tx not commited + let (s1, r1) = flume::bounded::<()>(0); + let (s2, r2) = flume::bounded::<()>(0); + let db_ro = db.get_ro_handler(); + let read_task = std::thread::spawn(move || { + r1.recv().expect("disconnected"); + (db_ro.col_3(), db_ro.col_4(), db_ro.col_2()).read(|(c3, c4, _c2)| { + c3.get_ref_slice(&4, |numbers| { + assert_eq!(numbers, &[1, 2, 3]); + Ok(()) + })?; + c3.iter(.., |it| { + let iter = it.keys(); + s2.send(()).expect("disconnected"); + assert_eq!(iter.collect::<KvResult<Vec<_>>>()?, vec![4]); + Ok::<(), KvError>(()) + })?; + c4.get_ref_slice(&4, |numbers| { + assert_eq!(numbers, &[1, 2, 3, 4]); + Ok(()) + })?; + Ok(()) + }) + }); + + let tres: KvResult<()> = (db.col_3_write(), db.col_4_write(), db.col_2_write()).write( + |(mut c3, mut c4, _c2)| { + s1.send(()).expect("disconnected"); + assert_eq!( + c3.iter(.., |it| it.keys().collect::<KvResult<Vec<_>>>())?, + vec![4] + ); + assert_eq!( + c3.iter(.., |it| it.values().collect::<KvResult<Vec<_>>>())?, + vec![VecU128(vec![1, 2, 3])] + ); + c3.upsert(42, VecU128(vec![5, 4, 6])); + assert_eq!( + c3.iter(.., |it| it.keys().collect::<KvResult<Vec<_>>>())?, + vec![4, 42] + ); + assert_eq!( + c3.iter(.., |it| it.reverse().keys().collect::<KvResult<Vec<_>>>())?, + vec![42, 4] + ); + c3.upsert(8, VecU128(vec![11, 12, 13])); + c3.remove(4); + assert_eq!( + c3.iter(.., |it| it.keys().collect::<KvResult<Vec<_>>>())?, + vec![8, 42] + ); + c3.iter(.., |it| { + let iter = it.reverse().keys(); + r2.recv().expect("disconnected"); + assert_eq!(iter.collect::<KvResult<Vec<_>>>()?, vec![42, 8]); + + Ok::<(), KvError>(()) + })?; + c4.upsert( + 4, + BTSetU128(BTreeSet::from_iter((&[7, 8, 6, 5]).iter().copied())), + ); + Ok(()) + }, + ); + tres?; + read_task.join().expect("read task panic")?; + + // Test clear() + db.col_4_write().clear()?; + assert_eq!(db.col_4().count()?, 0); + Ok(()) } } diff --git a/rust-libs/tools/kv_typed_code_gen/src/db_readable.rs b/rust-libs/tools/kv_typed_code_gen/src/db_readable.rs index 9773269a951f47c9ca1f2adf5ff71652bf4c4535..0c5977d8e8248c8fafaaac058cc0dd02600b4268 100644 --- a/rust-libs/tools/kv_typed_code_gen/src/db_readable.rs +++ b/rust-libs/tools/kv_typed_code_gen/src/db_readable.rs @@ -20,30 +20,24 @@ pub(crate) fn impl_db_readable( db: &Ident, db_ro: &Ident, db_readable: &Ident, - col_name_class: &[Ident], col_field: &[Ident], col_event_type: &[Ident], - col_key_type: &[Ident], - col_value_type: &[Ident], ) -> proc_macro2::TokenStream { quote! { pub trait #db_readable: Sized { type Backend: Backend; - #(type #col_name_class: DbCollectionRo<Event=#col_event_type, K=#col_key_type, V=#col_value_type>;)* - #(fn #col_field(&self) -> Self::#col_name_class;)* + #(fn #col_field(&self) -> &ColRo<<Self::Backend as Backend>::Col, #col_event_type>;)* } impl<B: Backend> #db_readable for #db<B> { type Backend = B; - #(type #col_name_class = ColRo<B::Col, #col_event_type>;)* - #(fn #col_field(&self) -> Self::#col_name_class { self.collections.#col_field.to_ro().clone() })* + #(fn #col_field(&self) -> &ColRo<B::Col, #col_event_type> { &self.collections.#col_field.to_ro() })* } impl<B: Backend> #db_readable for #db_ro<B>{ type Backend = B; - #(type #col_name_class = ColRo<B::Col, #col_event_type>;)* - #(fn #col_field(&self) -> Self::#col_name_class { self.collections.#col_field.clone() })* + #(fn #col_field(&self) -> &ColRo<B::Col, #col_event_type> { &self.collections.#col_field })* } } } diff --git a/rust-libs/tools/kv_typed_code_gen/src/lib.rs b/rust-libs/tools/kv_typed_code_gen/src/lib.rs index cb3712dd8e1c77f628695725adebce2710b2a3d8..f4306e21511fd0b01c865ac3cfb1d00ebb2a73cd 100644 --- a/rust-libs/tools/kv_typed_code_gen/src/lib.rs +++ b/rust-libs/tools/kv_typed_code_gen/src/lib.rs @@ -88,16 +88,7 @@ pub fn db_schema(input: TokenStream) -> TokenStream { /*let define_each_db_collection: Vec<proc_macro2::TokenStream> = collections.iter().map(define_db_collection).collect();*/ let db_readable = format_ident!("{}Readable", db); - let impl_db_readable = impl_db_readable( - &db, - &db_ro, - &db_readable, - &col_name_class, - &col_field, - &col_event_type, - &col_key_type, - &col_value_type, - ); + let impl_db_readable = impl_db_readable(&db, &db_ro, &db_readable, &col_field, &col_event_type); let db_writable = format_ident!("{}Writable", db); let impl_db_writable = impl_db_writable( &db, @@ -138,7 +129,7 @@ pub fn db_schema(input: TokenStream) -> TokenStream { type Backend = kv_typed::backend::mock::MockBackend; #(type #col_name_class = kv_typed::prelude::MockColRo<#col_event_type>;)* - #(fn #col_field(&self) -> kv_typed::prelude::MockColRo<#col_event_type>;)* + #(fn #col_field(&self) -> &kv_typed::prelude::MockColRo<#col_event_type>;)* } } // Inner module used to hide internals types that must not be exposed on public api