Commit c687d2a7 authored by Éloïs's avatar Éloïs

[ref] blockchain-dal: #162: migrate blocks collection in LMDB

parent 6298000d
Pipeline #6450 passed with stages
in 30 minutes and 9 seconds
This diff is collapsed.
......@@ -14,6 +14,7 @@ members = [
"lib/dubp/user-docs",
"lib/dubp/wot",
"lib/dunp/network-documents",
"lib/modules-lib/common-dal",
"lib/modules/blockchain/blockchain",
"lib/modules/blockchain/blockchain-dal",
"lib/modules/skeleton",
......
[package]
name = "durs-common-dal"
version = "0.1.0-a"
authors = ["librelois <elois@ifee.fr>"]
description = "Common Data Access Layer for Dunitrust project."
license = "AGPL-3.0"
edition = "2018"
[lib]
path = "src/lib.rs"
[dependencies]
bincode = "1.0.*"
durs-common-tools = { path = "../../tools/common-tools" }
fnv = "1.0.6"
log = "0.4.*"
rkv = "0.9.7"
rustbreak = {version = "2.0.0-rc3", features = ["bin_enc"]}
serde = { version = "1.0.*", features = ["derive"] }
serde_json = "1.0.*"
unwrap = "1.2.1"
[dev-dependencies]
[features]
// Copyright (C) 2017-2019 The AXIOM TEAM Association.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Common Datas Access Layer for Dunitrust project
//! Errors manadgment
use rustbreak::error::{RustbreakError, RustbreakErrorKind};
#[derive(Debug)]
/// Data Access Layer Error
pub enum DALError {
/// Abort write transaction
WriteAbort {
/// Reason of transaction abort
reason: String,
},
/// Error in write operation
WriteError,
/// Error in read operation
ReadError,
/// A database is corrupted, you have to reset the data completely
DBCorrupted,
/// Error with the file system
FileSystemError,
/// Serialization/Deserialization error
SerdeError(String),
/// Rkv store error
StoreError(rkv::error::StoreError),
/// Capturing a panic signal during a write operation
WritePanic,
/// Unknown error
UnknowError,
}
impl From<bincode::Error> for DALError {
fn from(e: bincode::Error) -> DALError {
DALError::SerdeError(format!("{}", e))
}
}
impl From<rkv::error::StoreError> for DALError {
fn from(e: rkv::error::StoreError) -> DALError {
DALError::StoreError(e)
}
}
impl<T> From<std::sync::PoisonError<T>> for DALError {
fn from(_: std::sync::PoisonError<T>) -> DALError {
DALError::DBCorrupted
}
}
impl From<RustbreakError> for DALError {
fn from(rust_break_error: RustbreakError) -> DALError {
match rust_break_error.kind() {
RustbreakErrorKind::Serialization => DALError::WriteError,
RustbreakErrorKind::Deserialization => DALError::ReadError,
RustbreakErrorKind::Poison => DALError::DBCorrupted,
RustbreakErrorKind::Backend => DALError::FileSystemError,
RustbreakErrorKind::WritePanic => DALError::WritePanic,
_ => DALError::UnknowError,
}
}
}
// Copyright (C) 2017-2019 The AXIOM TEAM Association.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define free structure database
use crate::errors::DALError;
use rustbreak::backend::{FileBackend, MemoryBackend};
use rustbreak::error::RustbreakError;
use rustbreak::{deser::Bincode, Database, FileDatabase, MemoryDatabase};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::default::Default;
use std::fmt::Debug;
use std::fs;
use std::panic::UnwindSafe;
use std::path::PathBuf;
/// Open free structured rustbreak memory database
pub fn open_free_struct_memory_db<
D: Serialize + DeserializeOwned + Debug + Default + Clone + Send,
>() -> Result<MemoryDatabase<D, Bincode>, DALError> {
let backend = MemoryBackend::new();
let db = MemoryDatabase::<D, Bincode>::from_parts(D::default(), backend, Bincode);
Ok(db)
}
/// Open free structured rustbreak file database
pub fn open_free_struct_file_db<
D: Serialize + DeserializeOwned + Debug + Default + Clone + Send,
>(
dbs_folder_path: &PathBuf,
db_file_name: &str,
) -> Result<FileDatabase<D, Bincode>, DALError> {
let mut db_path = dbs_folder_path.clone();
db_path.push(db_file_name);
let file_path = db_path.as_path();
if file_path.exists()
&& fs::metadata(file_path)
.expect("fail to get file size")
.len()
> 0
{
let backend = FileBackend::open(db_path.as_path())?;
let db = FileDatabase::<D, Bincode>::from_parts(D::default(), backend, Bincode);
db.load()?;
Ok(db)
} else {
Ok(FileDatabase::<D, Bincode>::from_path(
db_path.as_path(),
D::default(),
)?)
}
}
#[derive(Debug)]
/// Database
pub enum BinFreeStructDb<D: Serialize + DeserializeOwned + Debug + Default + Clone + Send> {
/// File database
File(Database<D, FileBackend, Bincode>),
/// Memory database
Mem(Database<D, MemoryBackend, Bincode>),
}
impl<D: Serialize + DeserializeOwned + Debug + Default + Clone + Send> BinFreeStructDb<D> {
/// Flush the data structure to the backend
pub fn save(&self) -> Result<(), RustbreakError> {
match *self {
BinFreeStructDb::File(ref file_db) => file_db.save(),
BinFreeStructDb::Mem(ref mem_db) => mem_db.save(),
}
}
/// Read lock the database and get write access to the Data container
/// This gives you a read-only lock on the database. You can have as many readers in parallel as you wish.
pub fn read<T, R>(&self, task: T) -> Result<R, RustbreakError>
where
T: FnOnce(&D) -> R,
{
match *self {
BinFreeStructDb::File(ref file_db) => file_db.read(task),
BinFreeStructDb::Mem(ref mem_db) => mem_db.read(task),
}
}
/// Write lock the database and get write access to the Data container
/// This gives you an exclusive lock on the memory object. Trying to open the database in writing will block if it is currently being written to.
pub fn write<T>(&self, task: T) -> Result<(), RustbreakError>
where
T: FnOnce(&mut D),
{
match *self {
BinFreeStructDb::File(ref file_db) => file_db.write(task),
BinFreeStructDb::Mem(ref mem_db) => mem_db.write(task),
}
}
/// Write lock the database and get write access to the Data container in a safe way (clone of the internal data is made).
pub fn write_safe<T>(&self, task: T) -> Result<(), RustbreakError>
where
T: FnOnce(&mut D) + UnwindSafe,
{
match *self {
BinFreeStructDb::File(ref file_db) => file_db.write_safe(task),
BinFreeStructDb::Mem(ref mem_db) => mem_db.write_safe(task),
}
}
/// Load the Data from the backend
pub fn load(&self) -> Result<(), RustbreakError> {
match *self {
BinFreeStructDb::File(ref file_db) => file_db.load(),
BinFreeStructDb::Mem(ref mem_db) => mem_db.load(),
}
}
}
// Copyright (C) 2017-2019 The AXIOM TEAM Association.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define Key-Value database
mod bin_kiv_db;
mod bin_kv_db;
mod file;
mod file_inner_trait;
mod mem;
pub use bin_kiv_db::FileKivDb;
pub use bin_kv_db::FileKvDb;
use crate::errors::DALError;
use durs_common_tools::fatal_error;
use file_inner_trait::FileKvDbInnerTrait;
use log::error;
use rkv::{EnvironmentFlags, IntegerStore, Manager, Rkv, SingleStore, StoreOptions, Value};
use rustbreak::{backend::MemoryBackend, deser::Bincode, MemoryDatabase};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::{Path, PathBuf};
/// Key-value Database with integer key
pub type BinKivDb<V> = KvDbStore<FileKivDb<V>>;
/// Key-value Database
pub type BinKvDb<K, V> = KvDbStore<FileKvDb<K, V>>;
// Key-value database reader
pub struct KvDbReader<'a> {
reader: Option<&'a rkv::Reader<'a>>,
}
/// Key-value database writer
pub struct KvDbWriter<'a> {
writer: Option<rkv::Writer<'a>>,
}
/// Key-value database
pub struct KvDb {
db: KvDbEnum,
}
/// Key-value database enum
enum KvDbEnum {
/// Key-value file Database
File(file::KvFileDb),
/// Key-value memory Database
Mem(mem::KvMemDb),
}
pub struct KvDbRO(KvDb);
impl KvDbRO {
// Try to clone read only handler
pub fn try_clone(&self) -> Result<KvDbRO, DALError> {
Ok(KvDbRO(self.0.try_clone()?))
}
}
impl KvDb {
/// Get read only handler
pub fn get_ro_handler(&self) -> Result<KvDbRO, DALError> {
Ok(KvDbRO(self.try_clone()?))
}
/// Open Key-value database
pub fn open_db(path: Option<&PathBuf>, schema: KbDbSchema) -> Result<KvDb, DALError> {
let db = if let Some(path) = path {
KvDbEnum::File(file::KvFileDb::open_db(path, &schema)?)
} else {
KvDbEnum::Mem(mem::KvMemDb::open_db(&schema)?)
};
Ok(KvDb { db })
}
/// Try to clone database handler
pub fn try_clone(&self) -> Result<KvDb, DALError> {
let db_clone = match &self.db {
KvDbEnum::File(fdb) => KvDbEnum::File(fdb.try_clone()?),
KvDbEnum::Mem(mdb) => KvDbEnum::Mem(mdb.clone()),
};
Ok(KvDb { db: db_clone })
}
}
/// Describe Key-Value database schema
#[derive(Debug, Clone)]
pub struct KbDbSchema {
stores: HashMap<String, KvDbStoreType>,
}
/// Key-value Database
pub enum KvDbStore<S>
where
S: FileKvDbTrait,
{
/// File database
File(S),
/// Memory database
Mem(MemoryDatabase<HashMap<<S as FileKvDbInnerTrait>::K, S::V>, Bincode>),
}
/// Key-value store type (store is like "table" in SGBD)
#[derive(Debug, Clone, Copy)]
pub enum KvDbStoreType {
/// Single valued map
Single,
/// Single valued map with integer key
SingleIntKey,
}
impl<S> KvDbStore<S>
where
S: FileKvDbTrait,
{
/// Get one value
pub fn get(
&self,
reader: &KvDbReader,
k: <S as FileKvDbInnerTrait>::K,
) -> Result<Option<S::V>, DALError> {
match self {
KvDbStore::File(file_kv_db) => file_kv_db.get(reader, k),
KvDbStore::Mem(mem_kv_db) => Ok(mem_kv_db.read(|datas| datas.get(&k).cloned())?),
}
}
/// Put one value
pub fn put(
&self,
writer: &mut KvDbWriter,
k: <S as FileKvDbInnerTrait>::K,
v: &S::V,
) -> Result<(), DALError> {
match self {
KvDbStore::File(file_kv_db) => file_kv_db.put(writer, k, v),
KvDbStore::Mem(mem_kv_db) => Ok(mem_kv_db.write(|datas| {
datas.insert(k, v.clone());
})?),
}
}
/// Delete one value
pub fn delete(
&self,
writer: &mut KvDbWriter,
k: <S as FileKvDbInnerTrait>::K,
) -> Result<(), DALError> {
match self {
KvDbStore::File(file_kv_db) => file_kv_db.delete(writer, k),
KvDbStore::Mem(mem_kv_db) => Ok(mem_kv_db.write(|datas| {
datas.remove(&k);
})?),
}
}
/// Open a Key-Value database
pub fn open(path: Option<&PathBuf>, collection_name: &str) -> Result<Self, DALError> {
if let Some(path) = path {
Ok(KvDbStore::File(S::open(path.as_path(), collection_name)?))
} else {
let backend = MemoryBackend::new();
Ok(KvDbStore::Mem(MemoryDatabase::<
HashMap<<S as FileKvDbInnerTrait>::K, S::V>,
Bincode,
>::from_parts(
HashMap::default(), backend, Bincode
)))
}
}
/// Read datas in transaction database
pub fn read<F, R>(&self, f: F) -> Result<R, DALError>
where
F: FnOnce(KvDbReader) -> Result<R, DALError>,
{
match self {
KvDbStore::File(file_kv_db) => file_kv_db.read(f),
KvDbStore::Mem(_) => f(KvDbReader { reader: None }),
}
}
/// Persist DB datas on disk
pub fn save(&self) -> Result<(), DALError> {
if let KvDbStore::File(file_kv_db) = self {
file_kv_db.save()
} else {
Ok(())
}
}
/// Write datas in database
/// /!\ The written data are visible to readers not persisted on the disk until a save() is performed.
pub fn write<F>(&self, f: F) -> Result<(), DALError>
where
F: FnOnce(KvDbWriter) -> Result<KvDbWriter, DALError>,
{
match self {
KvDbStore::File(file_kv_db) => file_kv_db.write(f),
KvDbStore::Mem(_) => {
f(KvDbWriter { writer: None })?;
Ok(())
}
}
}
}
pub trait FileKvDbTrait: FileKvDbInnerTrait {
/// Value
type V: 'static + Serialize + DeserializeOwned + Debug + Clone + Send;
/// Get one value
fn get(
&self,
reader: &KvDbReader,
k: <Self as FileKvDbInnerTrait>::K,
) -> Result<Option<Self::V>, DALError>;
/// Put one value
fn put(
&self,
writer: &mut KvDbWriter,
k: <Self as FileKvDbInnerTrait>::K,
v: &Self::V,
) -> Result<(), DALError> {
if let Some(ref mut writer) = writer.writer {
self.store_put(writer, k, &Value::Blob(&bincode::serialize(v)?[..]))?;
Ok(())
} else {
fatal_error!("Dev err: writer for file db must have a file writer");
}
}
/// Delete one value
fn delete(
&self,
writer: &mut KvDbWriter,
k: <Self as FileKvDbInnerTrait>::K,
) -> Result<(), DALError> {
if let Some(ref mut writer) = writer.writer {
self.store_delete(writer, k)?;
Ok(())
} else {
fatal_error!("Dev err: writer for file db must have a file writer");
}
}
/// Open a Key-Value database
fn open(path: &Path, collection_name: &str) -> Result<Self, DALError> {
let mut manager = Manager::singleton().write()?;
let mut env = Rkv::environment_builder();
env.set_flags(EnvironmentFlags::NO_SYNC)
.set_max_dbs(64)
.set_map_size(std::u32::MAX as usize);
let arc = manager.get_or_create(path, |path| Rkv::from_env(path, env))?;
Self::store_open(arc, collection_name)
}
/// Read datas in transaction database
fn read<F, D>(&self, f: F) -> Result<D, DALError>
where
F: FnOnce(KvDbReader) -> Result<D, DALError>,
{
Ok(f(KvDbReader {
reader: Some(&self.arc_clone().read()?.read()?),
})?)
}
/// Persist DB datas on disk
fn save(&self) -> Result<(), DALError> {
Ok(self.arc_clone().read()?.sync(true)?)
}
/// Write datas in database
/// /!\ The written data are visible to readers not persisted on the disk until a save() is performed.
fn write<F>(&self, f: F) -> Result<(), DALError>
where
F: FnOnce(KvDbWriter) -> Result<KvDbWriter, DALError>,
{
if let Some(writer) = f(KvDbWriter {
writer: Some(self.arc().read()?.write()?),
})?
.writer
{
writer.commit()?;
}
Ok(())
}
}
// Copyright (C) 2017-2019 The AXIOM TEAM Association.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define Key-Value database with integer key
use super::file_inner_trait::FileKvDbInnerTrait;
use super::{FileKvDbTrait, KvDbReader};
use crate::errors::DALError;
use rkv::{IntegerStore, Rkv, StoreError, StoreOptions, Value};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::{Arc, RwLock};
/// Key-Value persisted DB with integer key
pub struct FileKivDb<V>
where
V: 'static + Serialize + DeserializeOwned + Debug + Clone + Send,
{
arc: Arc<RwLock<Rkv>>,
store: IntegerStore<u32>,
//store_name: String,
phantom: PhantomData<V>,
}
impl<V> FileKvDbInnerTrait for FileKivDb<V>
where
V: 'static + Serialize + DeserializeOwned + Debug + Clone + Send,
{
type K = u32;
fn arc(&self) -> &Arc<RwLock<Rkv>> {
&self.arc
}
fn arc_clone(&self) -> Arc<RwLock<Rkv>> {
self.arc().clone()
}
fn store_open(arc: Arc<RwLock<Rkv>>, store_name: &str) -> Result<Self, DALError> {
let store = arc
.clone()
.read()?
.open_integer(store_name, StoreOptions::create())?;
Ok(FileKivDb {
arc,
store,
//store_name: collection_name.to_owned(),
phantom: PhantomData,
})
}
fn store_put(
&self,
writer: &mut rkv::Writer,
k: Self::K,
value: &Value,
) -> Result<(), StoreError> {
self.store.put(writer, k, value)
}
fn store_delete(&self, writer: &mut rkv::Writer, k: Self::K) -> Result<(), StoreError> {
self.store.delete(writer, k)
}
}
impl<V> FileKvDbTrait for FileKivDb<V>
where
V: 'static + Serialize + DeserializeOwned + Debug + Clone + Send,
{
type V = V;
fn get(&self, reader: &KvDbReader, k: u32) -> Result<Option<V>, DALError> {
if let Some(Value::Blob(v)) = self.store.get(
reader
.reader
.expect("Dev err: reader for file db must have a file reader"),
k,
)? {
Ok(Some(bincode::deserialize(&v)?))
} else {
Ok(None)
}
}
}
// Copyright (C) 2017-2019 The AXIOM TEAM Association.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Define Key-Value database
use super::file_inner_trait::FileKvDbInnerTrait;
use super::{FileKvDbTrait, KvDbReader};
use crate::errors::DALError;
use rkv::{Rkv, SingleStore, StoreError, StoreOptions, Value};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::{Arc, RwLock};
/// Key-Value persisted DB
pub struct FileKvDb<K, V>
where
K: 'static + AsRef<[u8]> + Clone + Debug + DeserializeOwned + Eq + Hash + Send + Serialize,
V: 'static + Serialize + DeserializeOwned + Debug + Clone + Send,
{
arc: Arc<RwLock<Rkv>>,
store: SingleStore,
//store_name: String,
phantom_key: PhantomData<K>,
phantom_value: PhantomData<V>,
}
impl<K, V> FileKvDbInnerTrait for FileKvDb<K, V>
where
K: 'static + AsRef<[u8]> + Clone + Debug + DeserializeOwned + Eq + Hash + Send + Serialize,
V: 'static + Serialize + DeserializeOwned + Debug + Clone + Send,
{
type K = K;
fn arc(&self) -> &Arc<RwLock<Rkv>> {
&self.arc
}
fn arc_clone(&self) -> Arc<RwLock<Rkv>> {
self.arc().clone()
}
fn store_open(arc: Arc<RwLock<Rkv>>, store_name: &str) -> Result<Self, DALError> {
let store = arc