diff --git a/Cargo.lock b/Cargo.lock index b966b136042bca1ae6d779cf00b96d657c0cf539..3ea57d5a4a3e2d4681b24ba4172b73e85d63f0ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -510,10 +510,6 @@ dependencies = [ "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "maplit 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)", - "pbr 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "prettytable-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rayon 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "unwrap 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -573,7 +569,7 @@ dependencies = [ "num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "pbr 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "prettytable-rs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/lib/dubp/block-doc/src/block/v10.rs b/lib/dubp/block-doc/src/block/v10.rs index 497c9d47621807ec801d638e381787fcfbcc597c..cbf00fbfd867a79e0e7aac104d2755b1fe61a6de 100644 --- a/lib/dubp/block-doc/src/block/v10.rs +++ b/lib/dubp/block-doc/src/block/v10.rs @@ -35,43 +35,8 @@ use dubp_user_docs::documents::transaction::{TransactionDocument, TransactionDoc use dup_crypto::hashs::Hash; use dup_crypto::keys::*; use durs_common_tools::fatal_error; -use std::ops::Deref; use unwrap::unwrap; -/// Store a transaction document or just its hash. -#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] -pub enum TxDocOrTxHash { - /// Transaction document - TxDoc(Box<TransactionDocument>), - /// transaction hash - TxHash(Hash), -} - -impl TxDocOrTxHash { - /// Lightens the TxDocOrTxHash (for example to store it while minimizing the space required) - /// lightening consists in transforming the document by its hash. - pub fn reduce(&self) -> TxDocOrTxHash { - if let TxDocOrTxHash::TxDoc(ref tx_doc) = self { - let tx_doc = tx_doc.deref(); - if let Some(ref hash) = tx_doc.get_hash_opt() { - TxDocOrTxHash::TxHash(*hash) - } else { - TxDocOrTxHash::TxHash(tx_doc.compute_hash()) - } - } else { - self.clone() - } - } - /// Get TxDoc variant - pub fn unwrap_doc(&self) -> TransactionDocument { - if let TxDocOrTxHash::TxDoc(ref tx_doc) = self { - tx_doc.deref().clone() - } else { - fatal_error!("Try to unwrap_doc() in a TxHash() variant of TxDocOrTxHash !") - } - } -} - /// Wrap a Block document. /// /// Must be created by parsing a text document or using a builder. 
@@ -135,7 +100,7 @@ pub struct BlockDocumentV10 { /// Certifications pub certifications: Vec<TextDocumentFormat<CertificationDocumentV10>>, /// Transactions - pub transactions: Vec<TxDocOrTxHash>, + pub transactions: Vec<TransactionDocument>, } impl BlockDocumentTrait for BlockDocumentV10 { @@ -206,11 +171,9 @@ impl BlockDocumentTrait for BlockDocumentV10 { certifications_str.push_str(&certification.as_compact_text()); } let mut transactions_str = String::from(""); - for transaction in self.transactions.clone() { - if let TxDocOrTxHash::TxDoc(transaction) = transaction { - transactions_str.push_str("\n"); - transactions_str.push_str(&transaction.deref().generate_compact_text()); - } + for transaction in &self.transactions { + transactions_str.push_str("\n"); + transactions_str.push_str(&transaction.generate_compact_text()); } let mut dividend_str = String::from(""); if let Some(dividend) = self.dividend { @@ -610,12 +573,7 @@ impl ToStringObject for BlockDocumentV10 { transactions: self .transactions .iter() - .map(|tx_doc_or_tx_hash| match tx_doc_or_tx_hash { - TxDocOrTxHash::TxDoc(tx_doc) => tx_doc.to_string_object(), - TxDocOrTxHash::TxHash(_) => { - fatal_error!("Try to stringify block without their tx documents") - } - }) + .map(|tx_doc| tx_doc.to_string_object()) .collect(), } } @@ -766,7 +724,7 @@ a9PHPuSfw7jW8FRQHXFsGi/bnLjbtDnTYvEVgUC9u0WlR7GVofa+Xb+l5iy6NwuEXiwvueAkf08wPVY8 revoked: Vec::new(), excluded: Vec::new(), certifications: vec![TextDocumentFormat::Complete(cert1)], - transactions: vec![TxDocOrTxHash::TxDoc(Box::new(tx1)), TxDocOrTxHash::TxDoc(Box::new(tx2))], + transactions: vec![tx1, tx2], }; // test inner_hash computation block.generate_inner_hash(); @@ -948,7 +906,7 @@ nxr4exGrt16jteN9ZX3XZPP9l+X0OUbZ1o/QjE1hbWQNtVU3HhH9SJoEvNj2iVl3gCRr9u2OA9uj9vCy revoked: vec![], excluded: vec![], certifications: vec![], - transactions: vec![TxDocOrTxHash::TxDoc(Box::new(tx1)), TxDocOrTxHash::TxDoc(Box::new(tx2))], + transactions: vec![tx1, tx2], }; // test inner_hash computation block.generate_inner_hash(); diff --git a/lib/dubp/block-doc/src/parser.rs b/lib/dubp/block-doc/src/parser.rs index 83855d976c8e31401b0213d7297294e1e567fe89..e85bffa269123c24fa7d65457bdf910511b553d4 100644 --- a/lib/dubp/block-doc/src/parser.rs +++ b/lib/dubp/block-doc/src/parser.rs @@ -15,11 +15,12 @@ //! Parsers for block. -use crate::block::{v10::TxDocOrTxHash, BlockDocument, BlockDocumentV10}; +use crate::block::{BlockDocument, BlockDocumentV10}; use dubp_common_doc::{BlockHash, BlockNumber}; use dubp_currency_params::genesis_block_params::v10::BlockV10Parameters; use dubp_currency_params::CurrencyName; use dubp_user_docs::documents::membership::v10::MembershipType; +use dubp_user_docs::documents::transaction::TransactionDocument; use dubp_user_docs::parsers::{serde_json_value_to_pest_json_value, DefaultHasher}; use dup_crypto::bases::BaseConvertionError; use dup_crypto::hashs::Hash; @@ -134,8 +135,7 @@ pub fn parse_json_block(json_block: &JSONValue<DefaultHasher>) -> Result<BlockDo })? 
.iter() .map(|tx| dubp_user_docs::parsers::transactions::parse_json_transaction(tx)) - .map(|tx_result| tx_result.map(|tx_doc| TxDocOrTxHash::TxDoc(Box::new(tx_doc)))) - .collect::<Result<Vec<TxDocOrTxHash>, Error>>()?, + .collect::<Result<Vec<TransactionDocument>, Error>>()?, })) } @@ -353,7 +353,7 @@ mod tests { revoked: vec![], excluded: vec![], certifications: vec![], - transactions: vec![TxDocOrTxHash::TxDoc(Box::new(dubp_user_docs_tests_tools::mocks::tx::first_g1_tx_doc()))], + transactions: vec![dubp_user_docs_tests_tools::mocks::tx::first_g1_tx_doc()], }); assert_eq!( expected_block, diff --git a/lib/dubp/indexes/src/sindex.rs b/lib/dubp/indexes/src/sindex.rs index 2e5e23d084b5918381e3a08157c17a07ed27f673..58568bdb69211f7c70478e9de310e06ae39ba39b 100644 --- a/lib/dubp/indexes/src/sindex.rs +++ b/lib/dubp/indexes/src/sindex.rs @@ -23,10 +23,21 @@ use dup_crypto::hashs::Hash; use dup_crypto::keys::PubKey; use serde::{Deserialize, Serialize}; +const UTXO_ID_SIZE: usize = 36; + #[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] /// Unique identifier for Unused tx output v10 pub struct UniqueIdUTXOv10(pub Hash, pub OutputIndex); +impl Into<Vec<u8>> for UniqueIdUTXOv10 { + fn into(self) -> Vec<u8> { + let mut buffer = Vec::with_capacity(UTXO_ID_SIZE); + buffer.append(&mut (self.0).0.to_vec()); + buffer.append(&mut (self.1).0.to_be_bytes().to_vec()); + buffer + } +} + #[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] /// Index of a V10 source pub enum SourceUniqueIdV10 { diff --git a/lib/modules-lib/bc-db-reader/src/constants.rs b/lib/modules-lib/bc-db-reader/src/constants.rs index 3849292bf097b6bfc0498d966f18c42fc585edf3..0ca5f166d8ebb25c0e0c07b6a624499648fccdbb 100644 --- a/lib/modules-lib/bc-db-reader/src/constants.rs +++ b/lib/modules-lib/bc-db-reader/src/constants.rs @@ -39,3 +39,13 @@ pub static WOT_ID_INDEX: &str = "wii"; /// Identities (WotId, DbIdentity) pub static IDENTITIES: &str = "idty"; + +/// Unused universal dividends +pub static DIVIDENDS: &str = "du"; + +/// Unused Transaction Output (UniqueIdUTXOv10, TransactionOutput) +pub static UTXOS: &str = "utxo"; + +/// Consumed UTXOs (BlockNumber, UTXO) +/// Used only to revert a block +pub static CONSUMED_UTXOS: &str = "cutxo"; diff --git a/lib/modules-lib/bc-db-reader/src/current_meta_datas.rs b/lib/modules-lib/bc-db-reader/src/current_meta_datas.rs index d1d1a76336b59bbf151e1bb1259da0792461501f..a57b7155d9e9244afb3dd85f6842301918dc778d 100644 --- a/lib/modules-lib/bc-db-reader/src/current_meta_datas.rs +++ b/lib/modules-lib/bc-db-reader/src/current_meta_datas.rs @@ -37,7 +37,7 @@ pub enum CurrentMetaDataKey { /// Fork tree ForkTree, /// Greatest wot id - GreatestWotId, + NextWotId, } impl CurrentMetaDataKey { @@ -49,7 +49,7 @@ impl CurrentMetaDataKey { Self::CurrentBlockstamp => 2, Self::CurrentBlockchainTime => 3, Self::ForkTree => 4, - Self::GreatestWotId => 5, + Self::NextWotId => 5, } } } @@ -91,24 +91,31 @@ pub fn get_currency_name<DB: DbReadable>(db: &DB) -> Result<Option<CurrencyName> } /// Get current blockstamp +#[inline] pub fn get_current_blockstamp<DB: DbReadable>(db: &DB) -> Result<Option<Blockstamp>, DbError> { - db.read(|r| { - if let Some(v) = db - .get_int_store(CURRENT_METAS_DATAS) - .get(r, CurrentMetaDataKey::CurrentBlockstamp.to_u32())? 
- { - if let DbValue::Blob(current_blockstamp_bytes) = v { - Ok(Some( - Blockstamp::from_bytes(current_blockstamp_bytes) - .map_err(|_| DbError::DBCorrupted)?, - )) - } else { - Err(DbError::DBCorrupted) - } + db.read(|r| get_current_blockstamp_(db, r)) +} + +/// Get current blockstamp +pub fn get_current_blockstamp_<DB: DbReadable, R: Reader>( + db: &DB, + r: &R, +) -> Result<Option<Blockstamp>, DbError> { + if let Some(v) = db + .get_int_store(CURRENT_METAS_DATAS) + .get(r, CurrentMetaDataKey::CurrentBlockstamp.to_u32())? + { + if let DbValue::Blob(current_blockstamp_bytes) = v { + Ok(Some( + Blockstamp::from_bytes(current_blockstamp_bytes) + .map_err(|_| DbError::DBCorrupted)?, + )) } else { - Ok(None) + Err(DbError::DBCorrupted) } - }) + } else { + Ok(None) + } } /// Get current common time (also named "blockchain time") @@ -148,7 +155,7 @@ pub fn get_fork_tree<DB: DbReadable>(db: &DB) -> Result<ForkTree, DbError> { pub fn get_greatest_wot_id_<DB: DbReadable, R: Reader>(db: &DB, r: &R) -> Result<WotId, DbError> { if let Some(v) = db .get_int_store(CURRENT_METAS_DATAS) - .get(r, CurrentMetaDataKey::GreatestWotId.to_u32())? + .get(r, CurrentMetaDataKey::NextWotId.to_u32())? { if let DbValue::U64(greatest_wot_id) = v { Ok(WotId(greatest_wot_id as usize)) diff --git a/lib/modules-lib/bc-db-reader/src/indexes.rs b/lib/modules-lib/bc-db-reader/src/indexes.rs index 5dcfd4bd1bbc69a31a82df8396da2788ca7d22cd..c3331138fe20bc98abda9efc1027d23ff9a59e6a 100644 --- a/lib/modules-lib/bc-db-reader/src/indexes.rs +++ b/lib/modules-lib/bc-db-reader/src/indexes.rs @@ -15,7 +15,6 @@ //! Blockchain stored indexes: definition and read requests. -pub mod balance; pub mod certs; pub mod identities; pub mod sources; diff --git a/lib/modules-lib/bc-db-reader/src/indexes/balance.rs b/lib/modules-lib/bc-db-reader/src/indexes/balance.rs deleted file mode 100644 index 02442d1b2c82d1b86f7d14e471a1d4a4e6e2138c..0000000000000000000000000000000000000000 --- a/lib/modules-lib/bc-db-reader/src/indexes/balance.rs +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright (C) 2017-2019 The AXIOM TEAM Association. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <https://www.gnu.org/licenses/>. - -//! Balances stored index. - -use super::sources::*; -use crate::BalancesV10Datas; -use dubp_user_docs::documents::transaction::UTXOConditionsGroup; -use durs_dbs_tools::{BinFreeStructDb, DbError}; - -/// Get address balance -pub fn get_address_balance( - balances_db: &BinFreeStructDb<BalancesV10Datas>, - address: &UTXOConditionsGroup, -) -> Result<Option<SourceAmount>, DbError> { - Ok(balances_db.read(|db| { - if let Some(balance_and_utxos) = db.get(address) { - Some(balance_and_utxos.0) - } else { - None - } - })?) 
-} diff --git a/lib/modules-lib/bc-db-reader/src/indexes/identities.rs b/lib/modules-lib/bc-db-reader/src/indexes/identities.rs index b83841a69213e0214a7e8892a8859b9925e042f0..9b008a8ab4bf77188c66024bfd8cac372b01cdf6 100644 --- a/lib/modules-lib/bc-db-reader/src/indexes/identities.rs +++ b/lib/modules-lib/bc-db-reader/src/indexes/identities.rs @@ -187,14 +187,14 @@ pub fn get_uid<DB: DbReadable>(db: &DB, pubkey: &PubKey) -> Result<Option<String Ok(get_identity_by_pubkey(db, pubkey)?.map(|db_idty| db_idty.idty_doc.username().to_owned())) } -/// Get pubkey from uid -pub fn get_pubkey_from_uid<DB: DbReadable>(db: &DB, uid: &str) -> Result<Option<PubKey>, DbError> { +/// Get wot id from uid +pub fn get_wot_id_from_uid<DB: DbReadable>(db: &DB, uid: &str) -> Result<Option<WotId>, DbError> { db.read(|r| { let greatest_wot_id = crate::current_meta_datas::get_greatest_wot_id_(db, r)?; for wot_id in 0..=greatest_wot_id.0 { if let Some(db_idty) = get_identity_by_wot_id_(db, r, WotId(wot_id))? { if db_idty.idty_doc.username() == uid { - return Ok(Some(db_idty.idty_doc.issuers()[0])); + return Ok(Some(WotId(wot_id))); } } } @@ -323,7 +323,7 @@ mod test { db.write(|mut w| { db.get_int_store(CURRENT_METAS_DATAS).put( w.as_mut(), - CurrentMetaDataKey::GreatestWotId.to_u32(), + CurrentMetaDataKey::NextWotId.to_u32(), &DbValue::U64(wot_id), )?; Ok(w) diff --git a/lib/modules-lib/bc-db-reader/src/indexes/sources.rs b/lib/modules-lib/bc-db-reader/src/indexes/sources.rs index db855fbee316721da8032669022d2f16dc10bc9d..5aade1616619fc3b8f5999ddab4999ebcb7507eb 100644 --- a/lib/modules-lib/bc-db-reader/src/indexes/sources.rs +++ b/lib/modules-lib/bc-db-reader/src/indexes/sources.rs @@ -15,12 +15,17 @@ //! Sources stored index. +use crate::constants::UTXOS; +use crate::*; +use dubp_common_doc::BlockNumber; use dubp_indexes::sindex::UniqueIdUTXOv10; use dubp_user_docs::documents::transaction::*; use durs_common_tools::fatal_error; +use durs_dbs_tools::DbError; use log::error; use serde::{Deserialize, Serialize}; use std::cmp::Ordering; +use std::collections::HashMap; use std::ops::{Add, Sub}; #[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, PartialEq, PartialOrd, Serialize)] @@ -79,12 +84,9 @@ impl Sub for SourceAmount { } } -/// UTXO content V10 -pub type UTXOContentV10 = TransactionOutput; - #[derive(Debug, Clone, Deserialize, Serialize)] /// V10 Unused Transaction Output -pub struct UTXOV10(pub UniqueIdUTXOv10, pub UTXOContentV10); +pub struct UTXOV10(pub UniqueIdUTXOv10, pub TransactionOutput); impl UTXOV10 { /// UTXO conditions @@ -122,3 +124,45 @@ impl UTXO { } } } + +/// Get utxo v10 +pub fn get_utxo_v10<DB: DbReadable>( + db: &DB, + utxo_id: UniqueIdUTXOv10, +) -> Result<Option<TransactionOutput>, DbError> { + let utxo_id_bytes: Vec<u8> = utxo_id.into(); + db.read(|r| { + if let Some(v) = db.get_store(UTXOS).get(r, &utxo_id_bytes)? { + Ok(Some(DB::from_db_value(v)?)) + } else { + Ok(None) + } + }) +} + +/// Get utxo v10 +pub fn get_utxo_v10_<DB: DbReadable, R: Reader>( + db: &DB, + r: &R, + utxo_id: UniqueIdUTXOv10, +) -> Result<Option<TransactionOutput>, DbError> { + let utxo_id_bytes: Vec<u8> = utxo_id.into(); + if let Some(v) = db.get_store(UTXOS).get(r, &utxo_id_bytes)? 
{ + Ok(Some(DB::from_db_value(v)?)) + } else { + Ok(None) + } +} + +/// Get block consumed sources +pub fn get_block_consumed_sources_<DB: DbReadable, R: Reader>( + db: &DB, + r: &R, + block_number: BlockNumber, +) -> Result<Option<HashMap<UniqueIdUTXOv10, TransactionOutput>>, DbError> { + if let Some(v) = db.get_int_store(CONSUMED_UTXOS).get(r, block_number.0)? { + Ok(Some(DB::from_db_value(v)?)) + } else { + Ok(None) + } +} diff --git a/lib/modules-lib/bc-db-reader/src/lib.rs b/lib/modules-lib/bc-db-reader/src/lib.rs index a4b58e28bdaaad291df8cac8d41bd904c59f822a..3d824cccce67b43c2891e21fe6a9685fccae6de9 100644 --- a/lib/modules-lib/bc-db-reader/src/lib.rs +++ b/lib/modules-lib/bc-db-reader/src/lib.rs @@ -56,6 +56,9 @@ pub fn bc_db_schema() -> KvFileDbSchema { ORPHAN_BLOCKSTAMP.to_owned() => KvFileDbStoreType::Single, IDENTITIES.to_owned() => KvFileDbStoreType::SingleIntKey, WOT_ID_INDEX.to_owned() => KvFileDbStoreType::Single, + DIVIDENDS.to_owned() => KvFileDbStoreType::Multi, + UTXOS.to_owned() => KvFileDbStoreType::Single, + CONSUMED_UTXOS.to_owned() => KvFileDbStoreType::SingleIntKey, ], } } @@ -76,15 +79,6 @@ pub type CertsExpirV10Datas = fnv::FnvHashMap< std::collections::HashSet<(durs_wot::WotId, durs_wot::WotId)>, >; -/// V10 Balances accounts -pub type BalancesV10Datas = std::collections::HashMap< - dubp_user_docs::documents::transaction::UTXOConditionsGroup, - ( - crate::indexes::sources::SourceAmount, - std::collections::HashSet<dubp_indexes::sindex::UniqueIdUTXOv10>, - ), ->; - #[cfg(test)] pub mod tests { diff --git a/lib/modules/blockchain/bc-db-writer/src/blocks.rs b/lib/modules/blockchain/bc-db-writer/src/blocks.rs index 621deb58d861a1c5abf37920216166f08f9f6eaf..38246e68920bc205310afa870ce213bfd7ffb261 100644 --- a/lib/modules/blockchain/bc-db-writer/src/blocks.rs +++ b/lib/modules/blockchain/bc-db-writer/src/blocks.rs @@ -31,6 +31,7 @@ use unwrap::unwrap; /// Insert new head Block in databases pub fn insert_new_head_block( db: &Db, + w: &mut DbWriter, fork_tree: Option<&mut ForkTree>, dal_block: DbBlock, ) -> Result<(), DbError> { @@ -38,66 +39,61 @@ pub fn insert_new_head_block( let bin_dal_block = durs_dbs_tools::to_bytes(&dal_block)?; let new_current_blockstamp_bytes: Vec<u8> = dal_block.blockstamp().into(); - // Open write transaction - db.write(|mut w| { - let current_meta_datas_store = db.get_int_store(CURRENT_METAS_DATAS); - let main_blocks_store = db.get_int_store(MAIN_BLOCKS); - let fork_blocks_store = db.get_store(FORK_BLOCKS); + let current_meta_datas_store = db.get_int_store(CURRENT_METAS_DATAS); + let main_blocks_store = db.get_int_store(MAIN_BLOCKS); + let fork_blocks_store = db.get_store(FORK_BLOCKS); - // Insert block in MAIN_BLOCKS store - main_blocks_store.put( - w.as_mut(), - *dal_block.block.number(), - &Db::db_value(&bin_dal_block)?, - )?; + // Insert block in MAIN_BLOCKS store + main_blocks_store.put( + w.as_mut(), + *dal_block.block.number(), + &Db::db_value(&bin_dal_block)?, + )?; - // Update current blockstamp - current_meta_datas_store.put( - w.as_mut(), - CurrentMetaDataKey::CurrentBlockstamp.to_u32(), - &DbValue::Blob(&new_current_blockstamp_bytes), - )?; - // Update current common time (also named "blockchain time") - current_meta_datas_store.put( + // Update current blockstamp + current_meta_datas_store.put( + w.as_mut(), + CurrentMetaDataKey::CurrentBlockstamp.to_u32(), + &DbValue::Blob(&new_current_blockstamp_bytes), + )?; + // Update current common time (also named "blockchain time") + current_meta_datas_store.put( + w.as_mut(), + 
CurrentMetaDataKey::CurrentBlockchainTime.to_u32(), + &DbValue::U64(dal_block.block.common_time()), + )?; + + if let Some(fork_tree) = fork_tree { + // Insert head block in fork tree + let removed_blockstamps = + crate::blocks::fork_tree::insert_new_head_block(fork_tree, dal_block.blockstamp())?; + // Insert head block in ForkBlocks + let blockstamp_bytes: Vec<u8> = dal_block.blockstamp().into(); + fork_blocks_store.put( w.as_mut(), - CurrentMetaDataKey::CurrentBlockchainTime.to_u32(), - &DbValue::U64(dal_block.block.common_time()), + &blockstamp_bytes, + &Db::db_value(&bin_dal_block)?, )?; - - if let Some(fork_tree) = fork_tree { - // Insert head block in fork tree - let removed_blockstamps = - crate::blocks::fork_tree::insert_new_head_block(fork_tree, dal_block.blockstamp())?; - // Insert head block in ForkBlocks - let blockstamp_bytes: Vec<u8> = dal_block.blockstamp().into(); - fork_blocks_store.put( - w.as_mut(), - &blockstamp_bytes, - &Db::db_value(&bin_dal_block)?, - )?; - // Remove too old blocks - for blockstamp in removed_blockstamps { - let blockstamp_bytes: Vec<u8> = blockstamp.into(); - fork_blocks_store.delete(w.as_mut(), &blockstamp_bytes)?; - } + // Remove too old blocks + for blockstamp in removed_blockstamps { + let blockstamp_bytes: Vec<u8> = blockstamp.into(); + fork_blocks_store.delete(w.as_mut(), &blockstamp_bytes)?; } - - Ok(w) - }) + } + Ok(()) } /// Remove a block in local blockchain storage -pub fn remove_block(db: &Db, block_number: BlockNumber) -> Result<(), DbError> { - db.write(|mut w| { - db.get_int_store(MAIN_BLOCKS) - .delete(w.as_mut(), block_number.0)?; - Ok(w) - }) +pub fn remove_block(db: &Db, w: &mut DbWriter, block_number: BlockNumber) -> Result<(), DbError> { + db.get_int_store(MAIN_BLOCKS) + .delete(w.as_mut(), block_number.0)?; + Ok(()) } /// Insert new fork Block in databases pub fn insert_new_fork_block( db: &Db, + w: &mut DbWriter, fork_tree: &mut ForkTree, dal_block: DbBlock, ) -> Result<bool, DbError> { @@ -109,50 +105,43 @@ pub fn insert_new_fork_block( unwrap!(dal_block.block.previous_hash()), )? { // Insert fork block FORK_BLOCKS - db.write(|mut w| { - db.get_store(FORK_BLOCKS).put( - w.as_mut(), - &blockstamp_bytes, - &Db::db_value(&bin_dal_block)?, - )?; - Ok(w) - })?; + db.get_store(FORK_BLOCKS).put( + w.as_mut(), + &blockstamp_bytes, + &Db::db_value(&bin_dal_block)?, + )?; // As long as orphan blocks can succeed the last inserted block, they are inserted for stackable_block in durs_bc_db_reader::blocks::get_stackables_blocks(db, dal_block.blockstamp())? { - let _ = insert_new_fork_block(db, fork_tree, stackable_block); + let _ = insert_new_fork_block(db, w, fork_tree, stackable_block); } Ok(true) } else { // Insert block in OrphanBlocks store let previous_blockstamp_bytes: Vec<u8> = dal_block.previous_blockstamp().into(); - db.write(|mut w| { - let orphan_blockstamps_store = db.get_store(ORPHAN_BLOCKSTAMP); - let mut orphan_blockstamps = if let Some(v) = - orphan_blockstamps_store.get(w.as_ref(), &previous_blockstamp_bytes)? - { - Db::from_db_value::<Vec<Blockstamp>>(v)? 
- } else { - vec![] - }; - orphan_blockstamps.push(dal_block.blockstamp()); - orphan_blockstamps_store.put( - w.as_mut(), - &previous_blockstamp_bytes, - &DbValue::Blob(&durs_dbs_tools::to_bytes(&orphan_blockstamps)?), - )?; - // Insert orphan block in FORK_BLOCKS - db.get_store(FORK_BLOCKS).put( - w.as_mut(), - &blockstamp_bytes, - &Db::db_value(&bin_dal_block)?, - )?; - // Commit - Ok(w) - })?; + let orphan_blockstamps_store = db.get_store(ORPHAN_BLOCKSTAMP); + let mut orphan_blockstamps = if let Some(v) = + orphan_blockstamps_store.get(w.as_ref(), &previous_blockstamp_bytes)? + { + Db::from_db_value::<Vec<Blockstamp>>(v)? + } else { + vec![] + }; + orphan_blockstamps.push(dal_block.blockstamp()); + orphan_blockstamps_store.put( + w.as_mut(), + &previous_blockstamp_bytes, + &DbValue::Blob(&durs_dbs_tools::to_bytes(&orphan_blockstamps)?), + )?; + // Insert orphan block in FORK_BLOCKS + db.get_store(FORK_BLOCKS).put( + w.as_mut(), + &blockstamp_bytes, + &Db::db_value(&bin_dal_block)?, + )?; Ok(false) } } diff --git a/lib/modules/blockchain/bc-db-writer/src/blocks/fork_tree.rs b/lib/modules/blockchain/bc-db-writer/src/blocks/fork_tree.rs index 9835fdfb3d5b62d0b707bd91a0cdfbdc195ba698..30f41d92a66a64c9abfb6519af5f55016d9593a0 100644 --- a/lib/modules/blockchain/bc-db-writer/src/blocks/fork_tree.rs +++ b/lib/modules/blockchain/bc-db-writer/src/blocks/fork_tree.rs @@ -22,16 +22,14 @@ use durs_bc_db_reader::constants::*; use durs_bc_db_reader::current_meta_datas::CurrentMetaDataKey; /// SAve fork tree -pub fn save_fork_tree(db: &Db, fork_tree: &ForkTree) -> Result<(), DbError> { +pub fn save_fork_tree(db: &Db, w: &mut DbWriter, fork_tree: &ForkTree) -> Result<(), DbError> { let bin_fork_tree = durs_dbs_tools::to_bytes(&fork_tree)?; - db.write(|mut w| { - db.get_int_store(CURRENT_METAS_DATAS).put( - w.as_mut(), - CurrentMetaDataKey::ForkTree.to_u32(), - &Db::db_value(&bin_fork_tree)?, - )?; - Ok(w) - }) + db.get_int_store(CURRENT_METAS_DATAS).put( + w.as_mut(), + CurrentMetaDataKey::ForkTree.to_u32(), + &Db::db_value(&bin_fork_tree)?, + )?; + Ok(()) } /// Insert new head Block in fork tree, diff --git a/lib/modules/blockchain/bc-db-writer/src/indexes/certs.rs b/lib/modules/blockchain/bc-db-writer/src/indexes/certs.rs index 487c9be013b2efee673808199fb351558ecea207..e1aa5ee797e7ed61bac1ec05fca2b3a5efae2913 100644 --- a/lib/modules/blockchain/bc-db-writer/src/indexes/certs.rs +++ b/lib/modules/blockchain/bc-db-writer/src/indexes/certs.rs @@ -15,7 +15,7 @@ //! Certifications stored indexes: write requests. 
-use crate::{BinFreeStructDb, Db, DbError}; +use crate::{BinFreeStructDb, Db, DbError, DbWriter}; use dubp_common_doc::BlockNumber; use dubp_currency_params::CurrencyParameters; use dubp_user_docs::documents::certification::CompactCertificationDocumentV10; @@ -28,6 +28,7 @@ use durs_wot::WotId; pub fn write_certification( currency_params: &CurrencyParameters, db: &Db, + w: &mut DbWriter, certs_db: &BinFreeStructDb<CertsExpirV10Datas>, source: WotId, target: WotId, @@ -44,14 +45,11 @@ pub fn write_certification( .push(written_timestamp + currency_params.sig_period); // Write new identity datas let bin_member_datas = durs_dbs_tools::to_bytes(&member_datas)?; - db.write(|mut w| { - db.get_int_store(IDENTITIES).put( - w.as_mut(), - source.0 as u32, - &DbValue::Blob(&bin_member_datas), - )?; - Ok(w) - })?; + db.get_int_store(IDENTITIES).put( + w.as_mut(), + source.0 as u32, + &DbValue::Blob(&bin_member_datas), + )?; // Add cert in certs_db certs_db.write(|db| { let mut created_certs = db.get(&created_block_id).cloned().unwrap_or_default(); @@ -64,6 +62,7 @@ pub fn write_certification( /// Revert writtent certification pub fn revert_write_cert( db: &Db, + w: &mut DbWriter, certs_db: &BinFreeStructDb<CertsExpirV10Datas>, compact_doc: CompactCertificationDocumentV10, source: WotId, @@ -79,20 +78,17 @@ pub fn revert_write_cert( db.insert(compact_doc.block_number, certs); })?; // Pop last cert_chainable_on - db.write(|mut w| { - let identities_store = db.get_int_store(IDENTITIES); - if let Some(v) = identities_store.get(w.as_ref(), source.0 as u32)? { - let mut member_datas = Db::from_db_value::<DbIdentity>(v)?; - member_datas.cert_chainable_on.pop(); - let bin_member_datas = durs_dbs_tools::to_bytes(&member_datas)?; - identities_store.put( - w.as_mut(), - source.0 as u32, - &DbValue::Blob(&bin_member_datas), - )? - } - Ok(w) - })?; + let identities_store = db.get_int_store(IDENTITIES); + if let Some(v) = identities_store.get(w.as_ref(), source.0 as u32)? { + let mut member_datas = Db::from_db_value::<DbIdentity>(v)?; + member_datas.cert_chainable_on.pop(); + let bin_member_datas = durs_dbs_tools::to_bytes(&member_datas)?; + identities_store.put( + w.as_mut(), + source.0 as u32, + &DbValue::Blob(&bin_member_datas), + )? 
+ } Ok(()) } diff --git a/lib/modules/blockchain/bc-db-writer/src/indexes/dividends.rs b/lib/modules/blockchain/bc-db-writer/src/indexes/dividends.rs index 7a2a819d71abd32b722c9d517c4511f5963ff8d6..de1f1d05bf0fd954fdb26f6c69f5251adfd47313 100644 --- a/lib/modules/blockchain/bc-db-writer/src/indexes/dividends.rs +++ b/lib/modules/blockchain/bc-db-writer/src/indexes/dividends.rs @@ -17,16 +17,15 @@ use crate::*; use dubp_common_doc::BlockNumber; -use dubp_user_docs::documents::transaction::*; use dup_crypto::keys::PubKey; +use durs_bc_db_reader::constants::DIVIDENDS; use durs_bc_db_reader::indexes::sources::SourceAmount; -use durs_bc_db_reader::BalancesV10Datas; -use std::collections::{HashMap, HashSet}; +use durs_bc_db_reader::DbValue; /// Apply UD creation in databases pub fn create_du( - du_db: &BinFreeStructDb<UDsV10Datas>, - balances_db: &BinFreeStructDb<BalancesV10Datas>, + db: &Db, + w: &mut DbWriter, du_amount: &SourceAmount, du_block_id: BlockNumber, members: &[PubKey], @@ -37,54 +36,21 @@ pub fn create_du( du_amount, du_block_id.0, members, revert ); // Insert/Remove UD sources in UDsV10DB - du_db.write(|db| { - for pubkey in members { - let mut pubkey_dus = db.get(&pubkey).cloned().unwrap_or_default(); - if revert { - pubkey_dus.remove(&du_block_id); - } else { - pubkey_dus.insert(du_block_id); - } - db.insert(*pubkey, pubkey_dus); + for pubkey in members { + let pubkey_bytes = pubkey.to_bytes_vector(); + if revert { + db.get_multi_store(DIVIDENDS).delete( + w.as_mut(), + &pubkey_bytes, + &DbValue::U64(u64::from(du_block_id.0)), + )?; + } else { + db.get_multi_store(DIVIDENDS).put( + w.as_mut(), + &pubkey_bytes, + &DbValue::U64(u64::from(du_block_id.0)), + )?; } - })?; - // Get members balances - let members_balances: HashMap<PubKey, (SourceAmount, HashSet<UniqueIdUTXOv10>)> = - balances_db.read(|db| { - let mut members_balances = HashMap::new(); - for pubkey in members { - members_balances.insert( - *pubkey, - db.get(&UTXOConditionsGroup::Single( - TransactionOutputCondition::Sig(*pubkey), - )) - .cloned() - .unwrap_or_default(), - ); - } - members_balances - })?; - // Increase/Decrease members balance - let members_balances: Vec<(PubKey, (SourceAmount, HashSet<UniqueIdUTXOv10>))> = - members_balances - .iter() - .map(|(pubkey, (balance, utxos_indexs))| { - let new_balance = if revert { - *balance - *du_amount - } else { - *balance + *du_amount - }; - (*pubkey, (new_balance, utxos_indexs.clone())) - }) - .collect(); - // Write new members balance - balances_db.write(|db| { - for (pubkey, (balance, utxos_indexs)) in members_balances { - db.insert( - UTXOConditionsGroup::Single(TransactionOutputCondition::Sig(pubkey)), - (balance, utxos_indexs), - ); - } - })?; + } Ok(()) } diff --git a/lib/modules/blockchain/bc-db-writer/src/indexes/identities.rs b/lib/modules/blockchain/bc-db-writer/src/indexes/identities.rs index 8f0ecd81176843bf1191bef96dcce0b5185a9ce2..a15634c6517efa4197deeba34a3057f01da9b361 100644 --- a/lib/modules/blockchain/bc-db-writer/src/indexes/identities.rs +++ b/lib/modules/blockchain/bc-db-writer/src/indexes/identities.rs @@ -15,7 +15,7 @@ //! Identities stored indexes: write requests. 
-use crate::{BinFreeStructDb, Db, DbError, MsExpirV10Datas}; +use crate::{BinFreeStructDb, Db, DbError, DbWriter, MsExpirV10Datas}; use dubp_common_doc::traits::Document; use dubp_common_doc::{BlockNumber, Blockstamp}; use dubp_currency_params::CurrencyParameters; @@ -23,6 +23,7 @@ use dubp_user_docs::documents::identity::IdentityDocumentV10; use dup_crypto::keys::PubKey; use dup_crypto::keys::PublicKey; use durs_bc_db_reader::constants::*; +use durs_bc_db_reader::current_meta_datas::CurrentMetaDataKey; use durs_bc_db_reader::indexes::identities::get_wot_id_; use durs_bc_db_reader::indexes::identities::{DbIdentity, DbIdentityState}; use durs_bc_db_reader::{DbReadable, DbValue}; @@ -32,6 +33,7 @@ use durs_wot::WotId; /// Remove identity from databases pub fn revert_create_identity( db: &Db, + w: &mut DbWriter, ms_db: &BinFreeStructDb<MsExpirV10Datas>, pubkey: &PubKey, ) -> Result<(), DbError> { @@ -47,25 +49,40 @@ pub fn revert_create_identity( db.insert(dal_idty.ms_created_block_id, memberships); })?; // Remove identity - db.write(|mut w| { - let pubkey_bytes = dal_idty.idty_doc.issuers()[0].to_bytes_vector(); - if let Some(DbValue::U64(wot_id)) = - db.get_store(WOT_ID_INDEX).get(w.as_ref(), &pubkey_bytes)? - { - db.get_int_store(IDENTITIES) - .delete(w.as_mut(), wot_id as u32)?; - db.get_store(WOT_ID_INDEX) - .delete(w.as_mut(), &pubkey_bytes)?; - } - Ok(w) - })?; + let pubkey_bytes = dal_idty.idty_doc.issuers()[0].to_bytes_vector(); + if let Some(DbValue::U64(wot_id)) = db.get_store(WOT_ID_INDEX).get(w.as_ref(), &pubkey_bytes)? { + db.get_int_store(IDENTITIES) + .delete(w.as_mut(), wot_id as u32)?; + db.get_store(WOT_ID_INDEX) + .delete(w.as_mut(), &pubkey_bytes)?; + } Ok(()) } +/// Create WotId +pub fn create_wot_id(db: &Db, w: &mut DbWriter) -> Result<WotId, DbError> { + let next_wot_id = if let Some(DbValue::U64(next_wot_id)) = db + .get_int_store(CURRENT_METAS_DATAS) + .get(w.as_ref(), CurrentMetaDataKey::NextWotId.to_u32())? 
+ { + next_wot_id + } else { + 0u64 + }; + + db.get_int_store(CURRENT_METAS_DATAS).put( + w.as_mut(), + CurrentMetaDataKey::NextWotId.to_u32(), + &DbValue::U64(next_wot_id + 1), + )?; + Ok(WotId(next_wot_id as usize)) +} + /// Write identity in databases pub fn create_identity( currency_params: &CurrencyParameters, db: &Db, + w: &mut DbWriter, ms_db: &BinFreeStructDb<MsExpirV10Datas>, idty_doc: &IdentityDocumentV10, ms_created_block_id: BlockNumber, @@ -89,16 +106,13 @@ pub fn create_identity( }; // Write Identity let bin_idty = durs_dbs_tools::to_bytes(&idty)?; - db.write(|mut w| { - db.get_store(WOT_ID_INDEX).put( - w.as_mut(), - &idty.idty_doc.issuers()[0].to_bytes_vector(), - &DbValue::U64(wot_id.0 as u64), - )?; - db.get_int_store(IDENTITIES) - .put(w.as_mut(), wot_id.0 as u32, &DbValue::Blob(&bin_idty))?; - Ok(w) - })?; + db.get_store(WOT_ID_INDEX).put( + w.as_mut(), + &idty.idty_doc.issuers()[0].to_bytes_vector(), + &DbValue::U64(wot_id.0 as u64), + )?; + db.get_int_store(IDENTITIES) + .put(w.as_mut(), wot_id.0 as u32, &DbValue::Blob(&bin_idty))?; // Write membership ms_db.write(|db| { let mut memberships = db.get(&ms_created_block_id).cloned().unwrap_or_default(); @@ -111,6 +125,7 @@ pub fn create_identity( /// Apply "exclude identity" event pub fn exclude_identity( db: &Db, + w: &mut DbWriter, pubkey: &PubKey, exclusion_blockstamp: &Blockstamp, revert: bool, @@ -140,24 +155,19 @@ pub fn exclude_identity( }; // Write new identity datas let bin_idty = durs_dbs_tools::to_bytes(&idty_datas)?; - db.write(|mut w| { - if let Some(wot_id) = get_wot_id_(db, w.as_ref(), &pubkey)? { - db.get_int_store(IDENTITIES).put( - w.as_mut(), - wot_id.0 as u32, - &DbValue::Blob(&bin_idty), - )?; - Ok(w) - } else { - Err(DbError::DBCorrupted) - } - })?; - Ok(()) + if let Some(wot_id) = get_wot_id_(db, w.as_ref(), &pubkey)? { + db.get_int_store(IDENTITIES) + .put(w.as_mut(), wot_id.0 as u32, &DbValue::Blob(&bin_idty))?; + Ok(()) + } else { + Err(DbError::DBCorrupted) + } } /// Apply "revoke identity" event pub fn revoke_identity( db: &Db, + w: &mut DbWriter, pubkey: &PubKey, renewal_blockstamp: &Blockstamp, explicit: bool, @@ -201,25 +211,20 @@ pub fn revoke_identity( // Update idty let bin_idty = durs_dbs_tools::to_bytes(&member_datas)?; - db.write(|mut w| { - if let Some(wot_id) = get_wot_id_(db, w.as_ref(), &pubkey)? { - db.get_int_store(IDENTITIES).put( - w.as_mut(), - wot_id.0 as u32, - &DbValue::Blob(&bin_idty), - )?; - Ok(w) - } else { - Err(DbError::DBCorrupted) - } - })?; - Ok(()) + if let Some(wot_id) = get_wot_id_(db, w.as_ref(), &pubkey)? 
{ + db.get_int_store(IDENTITIES) + .put(w.as_mut(), wot_id.0 as u32, &DbValue::Blob(&bin_idty))?; + Ok(()) + } else { + Err(DbError::DBCorrupted) + } } /// Apply "renewal identity" event in databases pub fn renewal_identity( currency_params: &CurrencyParameters, db: &Db, + w: &mut DbWriter, ms_db: &BinFreeStructDb<MsExpirV10Datas>, idty_wot_id: WotId, renewal_timestamp: u64, @@ -269,14 +274,11 @@ pub fn renewal_identity( } // Write new identity datas let bin_idty = durs_dbs_tools::to_bytes(&idty_datas)?; - db.write(|mut w| { - db.get_int_store(IDENTITIES).put( - w.as_mut(), - idty_wot_id.0 as u32, - &DbValue::Blob(&bin_idty), - )?; - Ok(w) - })?; + db.get_int_store(IDENTITIES).put( + w.as_mut(), + idty_wot_id.0 as u32, + &DbValue::Blob(&bin_idty), + )?; // Update MsExpirV10DB ms_db.write(|db| { let mut memberships = db.get(&ms_created_block_id).cloned().unwrap_or_default(); @@ -287,14 +289,12 @@ pub fn renewal_identity( } /// Remove identity from databases -pub fn remove_identity(db: &Db, pubkey: PubKey) -> Result<(), DbError> { - db.write(|mut w| { - if let Some(wot_id) = get_wot_id_(db, w.as_ref(), &pubkey)? { - db.get_int_store(IDENTITIES) - .delete(w.as_mut(), wot_id.0 as u32)?; - Ok(w) - } else { - Err(DbError::DBCorrupted) - } - }) +pub fn remove_identity(db: &Db, w: &mut DbWriter, pubkey: PubKey) -> Result<(), DbError> { + if let Some(wot_id) = get_wot_id_(db, w.as_ref(), &pubkey)? { + db.get_int_store(IDENTITIES) + .delete(w.as_mut(), wot_id.0 as u32)?; + Ok(()) + } else { + Err(DbError::DBCorrupted) + } } diff --git a/lib/modules/blockchain/bc-db-writer/src/indexes/transactions.rs b/lib/modules/blockchain/bc-db-writer/src/indexes/transactions.rs index c6fdeb764e82c62d954987b80a1602360576c07c..7df63fa75e3f7e2912bffadd9954b487c99960e4 100644 --- a/lib/modules/blockchain/bc-db-writer/src/indexes/transactions.rs +++ b/lib/modules/blockchain/bc-db-writer/src/indexes/transactions.rs @@ -16,11 +16,13 @@ //! Transactions stored indexes: write requests. 
use dubp_user_docs::documents::transaction::*; +use durs_bc_db_reader::constants::*; +use durs_bc_db_reader::DbValue; use durs_common_tools::fatal_error; use crate::*; use dubp_indexes::sindex::{SourceUniqueIdV10, UniqueIdUTXOv10}; -use durs_bc_db_reader::indexes::sources::{SourceAmount, UTXOV10}; +use durs_bc_db_reader::indexes::sources::UTXOV10; #[derive(Debug)] /// Transaction error @@ -37,27 +39,19 @@ impl From<DbError> for TxError { } } -#[derive(Debug, Clone, Deserialize, Serialize)] -/// DB Transaction V10 -pub struct DbTxV10 { - /// Transaction document - pub tx_doc: TransactionDocument, - /// Index of sources destroyed by this transaction - pub sources_destroyed: HashSet<UniqueIdUTXOv10>, -} - /// Apply transaction backwards -pub fn revert_tx( - blockstamp: &Blockstamp, - dbs: &CurrencyV10DBs, - dal_tx: &DbTxV10, +pub fn revert_tx<S: std::hash::BuildHasher>( + db: &Db, + w: &mut DbWriter, + tx_doc: &TransactionDocument, + block_consumed_sources: &mut HashMap<UniqueIdUTXOv10, TransactionOutput, S>, ) -> Result<(), DbError> { - let mut tx_doc = dal_tx.tx_doc.clone(); - let tx_hash = tx_doc.get_hash(); - let sources_destroyed = &dal_tx.sources_destroyed; + let tx_hash = tx_doc + .get_hash_opt() + .unwrap_or_else(|| tx_doc.compute_hash()); - // Index consumed utxos - let consumed_utxos: Vec<UTXOV10> = tx_doc + // Index created utxos + let created_utxos: Vec<UTXOV10> = tx_doc .get_outputs() .iter() .enumerate() @@ -68,309 +62,130 @@ pub fn revert_tx( ) }) .collect(); - // Recalculate balance of consumed adress - let new_balances_consumed_adress = dbs.balances_db.read(|db| { - let mut new_balances_consumed_adress: HashMap< - UTXOConditionsGroup, - (SourceAmount, HashSet<UniqueIdUTXOv10>), - > = HashMap::new(); - for source in &consumed_utxos { - let source_amount = source.get_amount(); - let conditions = source.get_conditions(); - let (balance, new_sources_index) = if let Some((balance, sources_index)) = - new_balances_consumed_adress.get(&conditions) - { - let mut new_sources_index = sources_index.clone(); - new_sources_index.remove(&source.0); - (*balance, new_sources_index) - } else if let Some((balance, sources_index)) = db.get(&conditions) { - let mut new_sources_index = sources_index.clone(); - new_sources_index.remove(&source.0); - (*balance, new_sources_index) - } else { - fatal_error!("Fail to revert tx : an output conditions don't exist in BalancesDB.") - }; - let new_balance = if balance >= source_amount { - balance - source_amount - } else { - fatal_error!("Fail to revert tx : an output revert cause negative balance.") - }; - new_balances_consumed_adress.insert(conditions, (new_balance, new_sources_index)); - } - new_balances_consumed_adress - })?; - // Remove consumed UTXOs - dbs.utxos_db.write(|db| { - for utxo_v10 in consumed_utxos { - db.remove(&utxo_v10.0); - } - })?; - // Write new balance of consumed adress - dbs.balances_db.write(|db| { - for (conditions, (balance, sources_index)) in new_balances_consumed_adress { - db.insert(conditions, (balance, sources_index)); - } - })?; - // Complete sources_destroyed - let sources_destroyed: HashMap<UTXOConditionsGroup, Vec<(UniqueIdUTXOv10, SourceAmount)>> = - if !sources_destroyed.is_empty() { - dbs.tx_db.read(|db| { - let mut sources_destroyed_completed = HashMap::new(); - for s_index in sources_destroyed { - let tx_output = db - .get(&s_index.0) - .expect("Not find tx") - .tx_doc - .get_outputs()[(s_index.1).0] - .clone(); - let mut sources_destroyed_for_same_address: Vec<( - UniqueIdUTXOv10, - SourceAmount, - )> = 
sources_destroyed_completed - .get(&tx_output.conditions.conditions) - .cloned() - .unwrap_or_default(); - sources_destroyed_for_same_address - .push((*s_index, SourceAmount(tx_output.amount, tx_output.base))); - sources_destroyed_completed.insert( - tx_output.conditions.conditions, - sources_destroyed_for_same_address, - ); - } - sources_destroyed_completed - })? - } else { - HashMap::with_capacity(0) - }; - // Index recreated sources - let recreated_sources: HashMap<SourceUniqueIdV10, SourceAmount> = tx_doc + // Remove created UTXOs + for utxo_v10 in created_utxos { + let utxo_id_bytes: Vec<u8> = utxo_v10.0.into(); + db.get_store(UTXOS).delete(w.as_mut(), &utxo_id_bytes)?; + } + // Index consumed sources + let consumed_sources_ids: HashSet<SourceUniqueIdV10> = tx_doc .get_inputs() .iter() .map(|input| match *input { - TransactionInput::D(tx_amout, tx_amout_base, pubkey, block_id) => ( - SourceUniqueIdV10::UD(pubkey, block_id), - SourceAmount(tx_amout, tx_amout_base), - ), - TransactionInput::T(tx_amout, tx_amout_base, hash, tx_index) => ( - SourceUniqueIdV10::UTXO(UniqueIdUTXOv10(hash, tx_index)), - SourceAmount(tx_amout, tx_amout_base), - ), - }) - .collect(); - // Find adress of recreated sources - let recreated_adress: HashMap<UTXOConditionsGroup, (SourceAmount, HashSet<UniqueIdUTXOv10>)> = - dbs.utxos_db.read(|db| { - let mut recreated_adress: HashMap< - UTXOConditionsGroup, - (SourceAmount, HashSet<UniqueIdUTXOv10>), - > = HashMap::new(); - for (source_index, source_amount) in &recreated_sources { - if let SourceUniqueIdV10::UTXO(utxo_index) = source_index { - // Get utxo - let utxo = db.get(&utxo_index).unwrap_or_else(|| { - fatal_error!( - "ApplyBLockError {} : unknow UTXO in inputs : {:?} !", - blockstamp, - utxo_index - ) - }); - // Get utxo conditions(=address) - let conditions = &utxo.conditions.conditions; - // Calculate new balances datas for "conditions" address - let (mut balance, mut utxos_index) = recreated_adress - .get(conditions) - .cloned() - .unwrap_or_default(); - balance = balance + *source_amount; - utxos_index.insert(*utxo_index); - // Write new balances datas for "conditions" address - recreated_adress.insert(conditions.clone(), (balance, utxos_index)); - } else if let SourceUniqueIdV10::UD(pubkey, _block_id) = source_index { - let address = - UTXOConditionsGroup::Single(TransactionOutputCondition::Sig(*pubkey)); - let (mut balance, utxos_index) = - recreated_adress.get(&address).cloned().unwrap_or_default(); - balance = balance + *source_amount; - recreated_adress.insert(address, (balance, utxos_index)); - } + TransactionInput::D(_tx_amout, _tx_amout_base, pubkey, block_id) => { + SourceUniqueIdV10::UD(pubkey, block_id) } - recreated_adress - })?; - // Recalculate balance of recreated adress - let new_balances_recreated_adress = dbs.balances_db.read(|db| { - let mut new_balances_recreated_adress = Vec::new(); - for (conditions, (amount_recreated, adress_recreated_sources)) in recreated_adress { - let (mut balance, mut sources_indexs) = - if let Some((balance, sources_indexs)) = db.get(&conditions) { - (*balance, sources_indexs.clone()) - } else { - (SourceAmount::default(), HashSet::new()) - }; - // Apply recreated sources (inputs) - balance = balance + amount_recreated; - for s_index in adress_recreated_sources { - sources_indexs.insert(s_index); + TransactionInput::T(_tx_amout, _tx_amout_base, hash, tx_index) => { + SourceUniqueIdV10::UTXO(UniqueIdUTXOv10(hash, tx_index)) } - // Recreate destroy sources - if let Some(address_sources_destroyed) = 
sources_destroyed.get(&conditions) { - for (utxo_index, s_amout) in address_sources_destroyed { - balance = balance + *s_amout; - sources_indexs.insert(*utxo_index); - } + }) + .collect(); + // Recreate consumed sources + for s_index in consumed_sources_ids { + if let SourceUniqueIdV10::UTXO(utxo_id) = s_index { + if let Some(utxo_content) = block_consumed_sources.remove(&utxo_id) { + let utxo_id_bytes: Vec<u8> = utxo_id.into(); + let utxo_content_bytes = durs_dbs_tools::to_bytes(&utxo_content)?; + db.get_store(UTXOS).put( + w.as_mut(), + &utxo_id_bytes, + &DbValue::Blob(&utxo_content_bytes[..]), + )?; + } else { + fatal_error!( + "Revert invalid block: utxo {:?} not found in block.", + utxo_id + ); } - new_balances_recreated_adress.push((conditions.clone(), (balance, sources_indexs))); - } - new_balances_recreated_adress - })?; - // Write new balance of recreated adress - dbs.balances_db.write(|db| { - for (conditions, (balance, sources_index)) in new_balances_recreated_adress { - db.insert(conditions, (balance, sources_index)); - } - })?; - // Recreate recreated sources - for s_index in recreated_sources.keys() { - if let SourceUniqueIdV10::UTXO(utxo_index) = s_index { - let utxo_content = dbs.tx_db.read(|db| { - db.get(&utxo_index.0) - .expect("Fatal error : not found Source TX of this utxo !") - .tx_doc - .get_outputs()[(utxo_index.1).0] - .clone() - })?; - dbs.utxos_db.write(|db| { - db.insert(*utxo_index, utxo_content); - })?; } else if let SourceUniqueIdV10::UD(pubkey, block_id) = s_index { - let mut pubkey_dus: HashSet<BlockNumber> = dbs - .du_db - .read(|db| db.get(&pubkey).cloned().unwrap_or_default())?; - pubkey_dus.insert(*block_id); - dbs.du_db.write(|db| { - db.insert(*pubkey, pubkey_dus); - })?; + db.get_multi_store(DIVIDENDS).put( + w.as_mut(), + &pubkey.to_bytes_vector(), + &DbValue::U64(u64::from(block_id.0)), + )?; } } Ok(()) } -/// Apply and write transaction in databases +/// Apply and write transaction pub fn apply_and_write_tx( - blockstamp: &Blockstamp, - dbs: &CurrencyV10DBs, + db: &Db, + w: &mut DbWriter, tx_doc: &TransactionDocument, + in_fork_window: bool, ) -> Result<(), DbError> { - let mut tx_doc = tx_doc.clone(); - let tx_hash = tx_doc.get_hash(); - let mut sources_destroyed = HashSet::new(); + let tx_hash = tx_doc + .get_hash_opt() + .unwrap_or_else(|| tx_doc.compute_hash()); // Index consumed sources - let consumed_sources: HashMap<SourceUniqueIdV10, SourceAmount> = tx_doc + let consumed_sources_ids: HashSet<SourceUniqueIdV10> = tx_doc .get_inputs() .iter() .map(|input| match *input { - TransactionInput::D(tx_amout, tx_amout_base, pubkey, block_id) => ( - SourceUniqueIdV10::UD(pubkey, block_id), - SourceAmount(tx_amout, tx_amout_base), - ), - TransactionInput::T(tx_amout, tx_amout_base, hash, tx_index) => ( - SourceUniqueIdV10::UTXO(UniqueIdUTXOv10(hash, tx_index)), - SourceAmount(tx_amout, tx_amout_base), - ), + TransactionInput::D(_tx_amout, _tx_amout_base, pubkey, block_id) => { + SourceUniqueIdV10::UD(pubkey, block_id) + } + TransactionInput::T(_tx_amout, _tx_amout_base, hash, tx_index) => { + SourceUniqueIdV10::UTXO(UniqueIdUTXOv10(hash, tx_index)) + } }) .collect(); - // Find adress of consumed sources - let consumed_adress: HashMap<UTXOConditionsGroup, (SourceAmount, HashSet<UniqueIdUTXOv10>)> = - dbs.utxos_db.read(|db| { - let mut consumed_adress: HashMap< - UTXOConditionsGroup, - (SourceAmount, HashSet<UniqueIdUTXOv10>), - > = HashMap::new(); - for (source_index, source_amount) in &consumed_sources { - if let SourceUniqueIdV10::UTXO(utxo_index) = 
source_index { - // Get utxo - let utxo = db.get(&utxo_index).unwrap_or_else(|| { - debug!("apply_tx=\"{:#?}\"", tx_doc); - fatal_error!( - "ApplyBLockError {} : unknow UTXO in inputs : {:?} !", - blockstamp, - utxo_index - ) - }); - // Get utxo conditions(=address) - let conditions = &utxo.conditions.conditions; - // Calculate new balances datas for "conditions" address - let (mut balance, mut utxos_index) = - consumed_adress.get(conditions).cloned().unwrap_or_default(); - balance = balance + *source_amount; - utxos_index.insert(*utxo_index); - // Write new balances datas for "conditions" address - consumed_adress.insert(conditions.clone(), (balance, utxos_index)); - } else if let SourceUniqueIdV10::UD(pubkey, _block_id) = source_index { - let address = - UTXOConditionsGroup::Single(TransactionOutputCondition::Sig(*pubkey)); - let (mut balance, utxos_index) = - consumed_adress.get(&address).cloned().unwrap_or_default(); - balance = balance + *source_amount; - consumed_adress.insert(address, (balance, utxos_index)); - } - } - consumed_adress - })?; - // Recalculate balance of consumed adress - let new_balances_consumed_adress = dbs.balances_db.read(|db| { - let mut new_balances_consumed_adress = Vec::new(); - for (conditions, (amount_consumed, adress_consumed_sources)) in consumed_adress { - if let Some((balance, sources)) = db.get(&conditions) { - let mut new_balance = *balance - amount_consumed; - if (new_balance.1 == TxBase(0) && new_balance.0 < TxAmount(100)) - || (new_balance.1 == TxBase(1) && new_balance.0 < TxAmount(10)) { - sources_destroyed = sources.union(&sources_destroyed).cloned().collect(); - new_balance = SourceAmount(TxAmount(0), new_balance.1); + if in_fork_window { + // Persist consumed sources (for future revert) + let consumed_sources = consumed_sources_ids + .iter() + .filter_map(|source_id| { + if let SourceUniqueIdV10::UTXO(utxo_id) = source_id { + Some(utxo_id) + } else { + None } - let mut new_sources_index = sources.clone(); - for source in adress_consumed_sources { - new_sources_index.remove(&source); + }) + .map(|utxo_id| { + let utxo_id_bytes: Vec<u8> = (*utxo_id).into(); + if let Some(value) = db.get_store(UTXOS).get(w.as_ref(), &utxo_id_bytes)? { + let utxo_content: TransactionOutput = Db::from_db_value(value)?; + Ok((*utxo_id, utxo_content)) + } else { + fatal_error!("Try to persist unexist consumed source."); } - new_balances_consumed_adress - .push((conditions.clone(), (new_balance, new_sources_index))); - } else { - fatal_error!("Apply Tx : try to consume a source, but the owner address is not found in balances db : {:?}", conditions) - } - } - new_balances_consumed_adress - })?; - // Write new balance of consumed adress - dbs.balances_db.write(|db| { - for (conditions, (balance, sources_index)) in new_balances_consumed_adress { - db.insert(conditions, (balance, sources_index)); - } - })?; + }) + .collect::<Result<HashMap<UniqueIdUTXOv10, TransactionOutput>, DbError>>()?; + let consumed_sources_bytes = durs_dbs_tools::to_bytes(&consumed_sources)?; + let block_number = + durs_bc_db_reader::current_meta_datas::get_current_blockstamp_(db, w.as_ref())? 
+ .unwrap_or_default() + .id; + db.get_int_store(CONSUMED_UTXOS).put( + w.as_mut(), + block_number.0, + &DbValue::Blob(&consumed_sources_bytes[..]), + )?; + } // Remove consumed sources - for source_index in consumed_sources.keys() { - if let SourceUniqueIdV10::UTXO(utxo_index) = source_index { - dbs.utxos_db.write(|db| { - db.remove(utxo_index); - })?; - } else if let SourceUniqueIdV10::UD(pubkey, block_id) = source_index { - let mut pubkey_dus: HashSet<BlockNumber> = dbs - .du_db - .read(|db| db.get(&pubkey).cloned().unwrap_or_default())?; - pubkey_dus.remove(block_id); - dbs.du_db.write(|db| { - db.insert(*pubkey, pubkey_dus); - })?; + for source_id in consumed_sources_ids { + if let SourceUniqueIdV10::UTXO(utxo_id) = source_id { + let uxtx_id_bytes: Vec<u8> = utxo_id.into(); + db.get_store(UTXOS) + .delete(w.as_mut(), uxtx_id_bytes) + .map_err(|e| { + warn!("Fail to delete UTXO({:?}).", utxo_id); + e + })?; + } else if let SourceUniqueIdV10::UD(pubkey, block_id) = source_id { + db.get_multi_store(DIVIDENDS) + .delete( + w.as_mut(), + &pubkey.to_bytes_vector(), + &DbValue::U64(u64::from(block_id.0)), + ) + .map_err(|e| { + warn!("Fail to delete UD({}-#{}).", pubkey, block_id); + e + })?; } } - // Index created sources - /*let mut created_utxos: Vec<UTXOV10> = Vec::new(); - let mut output_index = 0; - for output in tx_doc.get_outputs() { - created_utxos.push(UTXOV10( - UniqueIdUTXOv10(tx_hash, TxIndex(output_index)), - output.clone(), - )); - output_index += 1; - }*/ let created_utxos: Vec<UTXOV10> = tx_doc .get_outputs() .iter() @@ -382,58 +197,16 @@ pub fn apply_and_write_tx( ) }) .collect(); - // Recalculate balance of supplied adress - let new_balances_supplied_adress = dbs.balances_db.read(|db| { - let mut new_balances_supplied_adress: HashMap< - UTXOConditionsGroup, - (SourceAmount, HashSet<UniqueIdUTXOv10>), - > = HashMap::new(); - for source in &created_utxos { - let source_amount = source.get_amount(); - let conditions = source.get_conditions(); - let (balance, new_sources_index) = if let Some((balance, sources_index)) = - new_balances_supplied_adress.get(&conditions) - { - let mut new_sources_index = sources_index.clone(); - new_sources_index.insert(source.0); - (*balance, new_sources_index) - } else if let Some((balance, sources_index)) = db.get(&conditions) { - let mut new_sources_index = sources_index.clone(); - new_sources_index.insert(source.0); - (*balance, new_sources_index) - } else { - let mut new_sources_index = HashSet::new(); - new_sources_index.insert(source.0); - (SourceAmount::default(), new_sources_index) - }; - new_balances_supplied_adress - .insert(conditions, (balance + source_amount, new_sources_index)); - } - new_balances_supplied_adress - })?; // Insert created UTXOs - dbs.utxos_db.write(|db| { - for utxo_v10 in created_utxos { - db.insert(utxo_v10.0, utxo_v10.1); - } - })?; - // Write new balance of supplied adress - dbs.balances_db.write(|db| { - for (conditions, (balance, sources_index)) in new_balances_supplied_adress { - db.insert(conditions, (balance, sources_index)); - } - })?; - // Write tx - tx_doc.reduce(); - dbs.tx_db.write(|db| { - db.insert( - tx_hash, - DbTxV10 { - tx_doc, - sources_destroyed, - }, - ); - })?; + for utxo_v10 in created_utxos { + let utxo_id_bytes: Vec<u8> = utxo_v10.0.into(); + let utxo_value_bytes = durs_dbs_tools::to_bytes(&utxo_v10.1)?; + db.get_store(UTXOS).put( + w.as_mut(), + utxo_id_bytes, + &DbValue::Blob(&utxo_value_bytes[..]), + )?; + } Ok(()) } @@ -441,8 +214,10 @@ pub fn apply_and_write_tx( mod tests { use 
super::*; use dubp_common_doc::traits::{Document, DocumentBuilder}; + use dubp_common_doc::BlockHash; + use durs_bc_db_reader::current_meta_datas::CurrentMetaDataKey; + use durs_bc_db_reader::indexes::sources::SourceAmount; use std::str::FromStr; - use unwrap::unwrap; fn build_first_tx_of_g1() -> TransactionDocument { let pubkey = PubKey::Ed25519( @@ -485,7 +260,7 @@ mod tests { } #[test] - fn apply_and_revert_one_tx() { + fn apply_and_revert_one_tx() -> Result<(), DbError> { // Get document of first g1 transaction let tx_doc = build_first_tx_of_g1(); assert_eq!(tx_doc.verify_signatures(), Ok(())); @@ -494,119 +269,63 @@ mod tests { ed25519::PublicKey::from_base58("Com8rJukCozHZyFao6AheSsfDQdPApxQRnz7QYFf64mm") .unwrap(), ); - // Open currencys_db in memory mode - let currency_dbs = CurrencyV10DBs::open(None); + // Open blockchain DB + let db = crate::tests::open_tmp_db()?; // Create first g1 UD for cgeek and tortue - crate::indexes::dividends::create_du( - ¤cy_dbs.du_db, - ¤cy_dbs.balances_db, - &SourceAmount(TxAmount(1000), TxBase(0)), - BlockNumber(1), - &vec![tx_doc.issuers()[0], tortue_pubkey], - false, - ) - .expect("Fail to create first g1 UD !"); - // Check members balance - let cgeek_new_balance = currency_dbs - .balances_db - .read(|db| { - db.get(&UTXOConditionsGroup::Single( - TransactionOutputCondition::Sig(tx_doc.issuers()[0]), - )) - .cloned() - }) - .expect("Fail to read cgeek new balance") - .expect("Error : cgeek is not referenced in balances_db !"); - assert_eq!(cgeek_new_balance.0, SourceAmount(TxAmount(1000), TxBase(0))); - let tortue_new_balance = currency_dbs - .balances_db - .read(|db| { - db.get(&UTXOConditionsGroup::Single( - TransactionOutputCondition::Sig(tortue_pubkey), - )) - .cloned() - }) - .expect("Fail to read receiver new balance") - .expect("Error : receiver is not referenced in balances_db !"); - assert_eq!( - tortue_new_balance.0, - SourceAmount(TxAmount(1000), TxBase(0)) - ); - // Apply first g1 transaction - let blockstamp = unwrap!(Blockstamp::from_string( - "52-000057D4B29AF6DADB16F841F19C54C00EB244CECA9C8F2D4839D54E5F91451C" - )); - apply_and_write_tx(&blockstamp, ¤cy_dbs, &tx_doc).expect("Fail to apply first g1 tx"); - // Check issuer new balance - let cgeek_new_balance = currency_dbs - .balances_db - .read(|db| { - db.get(&UTXOConditionsGroup::Single( - TransactionOutputCondition::Sig(tx_doc.issuers()[0]), - )) - .cloned() - }) - .expect("Fail to read cgeek new balance") - .expect("Error : cgeek is not referenced in balances_db !"); - assert_eq!(cgeek_new_balance.0, SourceAmount(TxAmount(999), TxBase(0))); + db.write(|mut w| { + crate::indexes::dividends::create_du( + &db, + &mut w, + &SourceAmount(TxAmount(1000), TxBase(0)), + BlockNumber(1), + &vec![tx_doc.issuers()[0], tortue_pubkey], + false, + )?; + Ok(w) + })?; - // Check receiver new balance - let receiver_new_balance = currency_dbs - .balances_db - .read(|db| { - db.get(&UTXOConditionsGroup::Single( - TransactionOutputCondition::Sig(tortue_pubkey), - )) - .cloned() - }) - .expect("Fail to read receiver new balance") - .expect("Error : receiver is not referenced in balances_db !"); - assert_eq!( - receiver_new_balance.0, - SourceAmount(TxAmount(1001), TxBase(0)) - ); + db.write(|mut w| { + // Update current blockstamp + let new_current_blockstamp_bytes: Vec<u8> = Blockstamp { + id: BlockNumber(52), + hash: BlockHash(Hash::default()), + } + .into(); + db.get_int_store(CURRENT_METAS_DATAS).put( + w.as_mut(), + CurrentMetaDataKey::CurrentBlockstamp.to_u32(), + 
&DbValue::Blob(&new_current_blockstamp_bytes), + )?; + // Apply first g1 transaction + apply_and_write_tx(&db, &mut w, &tx_doc, true)?; + Ok(w) + })?; + // Check new UTXOS + // TODO + //db.get_store(UTXOS).iter_start()? + let count_utxos = db.read(|r| Ok(db.get_store(UTXOS).iter_start(r)?.count()))?; + assert_eq!(2, count_utxos); // Revert first g1 tx - let blockstamp = unwrap!(Blockstamp::from_string( - "52-000057D4B29AF6DADB16F841F19C54C00EB244CECA9C8F2D4839D54E5F91451C" - )); - revert_tx( - &blockstamp, - ¤cy_dbs, - &DbTxV10 { - tx_doc: tx_doc.clone(), - sources_destroyed: HashSet::with_capacity(0), - }, - ) - .expect("Fail to revert first g1 tx"); + db.write(|mut w| { + if let Some(mut block_consumed_sources_opt) = + durs_bc_db_reader::indexes::sources::get_block_consumed_sources_( + &db, + w.as_ref(), + BlockNumber(52), + )? + { + revert_tx(&db, &mut w, &tx_doc, &mut block_consumed_sources_opt)?; + } else { + panic!(dbg!("No block consumed sources")); + } + Ok(w) + })?; - // Check issuer new balance - let cgeek_new_balance = currency_dbs - .balances_db - .read(|db| { - db.get(&UTXOConditionsGroup::Single( - TransactionOutputCondition::Sig(tx_doc.issuers()[0]), - )) - .cloned() - }) - .expect("Fail to read cgeek new balance") - .expect("Error : cgeek is not referenced in balances_db !"); - assert_eq!(cgeek_new_balance.0, SourceAmount(TxAmount(1000), TxBase(0))); + // UTXOS must be empty + let count_utxos = db.read(|r| Ok(db.get_store(UTXOS).iter_start(r)?.count()))?; + assert_eq!(0, count_utxos); - // Check receiver new balance - let receiver_new_balance = currency_dbs - .balances_db - .read(|db| { - db.get(&UTXOConditionsGroup::Single( - TransactionOutputCondition::Sig(tortue_pubkey), - )) - .cloned() - }) - .expect("Fail to read receiver new balance") - .expect("Error : receiver is not referenced in balances_db !"); - assert_eq!( - receiver_new_balance.0, - SourceAmount(TxAmount(1000), TxBase(0)) - ); + Ok(()) } } diff --git a/lib/modules/blockchain/bc-db-writer/src/lib.rs b/lib/modules/blockchain/bc-db-writer/src/lib.rs index ac01201637d815432a355eb86295701bfdccd082..0978d03b83d469f7954a2322b4f0abf50541a35f 100644 --- a/lib/modules/blockchain/bc-db-writer/src/lib.rs +++ b/lib/modules/blockchain/bc-db-writer/src/lib.rs @@ -29,8 +29,6 @@ #[macro_use] extern crate log; -#[macro_use] -extern crate serde_derive; pub mod blocks; pub mod indexes; @@ -38,25 +36,21 @@ pub mod writers; pub use durs_dbs_tools::kv_db::{ KvFileDbHandler, KvFileDbRead as DbReadable, KvFileDbRoHandler, KvFileDbSchema, - KvFileDbStoreType, KvFileDbValue, + KvFileDbStoreType, KvFileDbValue, KvFileDbWriter as DbWriter, }; pub use durs_dbs_tools::{ open_free_struct_db, open_free_struct_file_db, open_free_struct_memory_db, }; pub use durs_dbs_tools::{BinFreeStructDb, DbError}; -use crate::indexes::transactions::DbTxV10; use dubp_common_doc::{BlockNumber, Blockstamp}; use dubp_indexes::sindex::UniqueIdUTXOv10; use dubp_user_docs::documents::transaction::*; use dup_crypto::hashs::Hash; use dup_crypto::keys::*; -use durs_bc_db_reader::indexes::sources::UTXOContentV10; -use durs_bc_db_reader::{BalancesV10Datas, CertsExpirV10Datas}; -use durs_common_tools::fatal_error; +use durs_bc_db_reader::CertsExpirV10Datas; use durs_wot::data::{rusty::RustyWebOfTrust, WotId}; use fnv::FnvHashMap; -use serde::Serialize; use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; @@ -72,10 +66,6 @@ pub type ForksTreeV10Datas = durs_bc_db_reader::blocks::fork_tree::ForkTree; pub type WotDB = RustyWebOfTrust; /// Memberships 
sorted by created block pub type MsExpirV10Datas = FnvHashMap<BlockNumber, HashSet<WotId>>; -/// V10 Transactions indexed by their hashs -pub type TxV10Datas = HashMap<Hash, DbTxV10>; -/// V10 Unused Transaction Output (=sources) -pub type UTXOsV10Datas = HashMap<UniqueIdUTXOv10, UTXOContentV10>; /// V10 UDs sources pub type UDsV10Datas = HashMap<PubKey, HashSet<BlockNumber>>; @@ -127,55 +117,6 @@ impl WotsV10DBs { } } -#[derive(Debug)] -/// Set of databases storing currency information -pub struct CurrencyV10DBs { - /// Store all UD sources - pub du_db: BinFreeStructDb<UDsV10Datas>, - /// Store all Transactions - pub tx_db: BinFreeStructDb<TxV10Datas>, - /// Store all UTXOs - pub utxos_db: BinFreeStructDb<UTXOsV10Datas>, - /// Store balances of all address (and theirs UTXOs indexs) - pub balances_db: BinFreeStructDb<BalancesV10Datas>, -} - -impl CurrencyV10DBs { - /// Open currency databases from their respective files - pub fn open(db_path: Option<&PathBuf>) -> CurrencyV10DBs { - CurrencyV10DBs { - du_db: open_free_struct_db::<UDsV10Datas>(db_path, "du.db") - .expect("Fail to open UDsV10DB"), - tx_db: open_free_struct_db::<TxV10Datas>(db_path, "tx.db") - .unwrap_or_else(|_| fatal_error!("Fail to open TxV10DB")), - utxos_db: open_free_struct_db::<UTXOsV10Datas>(db_path, "sources.db") - .expect("Fail to open UTXOsV10DB"), - balances_db: open_free_struct_db::<BalancesV10Datas>(db_path, "balances.db") - .expect("Fail to open BalancesV10DB"), - } - } - /// Save currency databases in their respective files - pub fn save_dbs(&self, tx: bool, du: bool) { - if tx { - info!("BC-DB-WRITER: Save CurrencyV10DBs."); - self.tx_db - .save() - .expect("Fatal error : fail to save LocalBlockchainV10DB !"); - self.utxos_db - .save() - .expect("Fatal error : fail to save UTXOsV10DB !"); - self.balances_db - .save() - .expect("Fatal error : fail to save BalancesV10DB !"); - } - if du { - self.du_db - .save() - .expect("Fatal error : fail to save UDsV10DB !"); - } - } -} - /*#[derive(Debug, Clone)] pub struct WotStats { pub block_number: u32, diff --git a/lib/modules/blockchain/bc-db-writer/src/writers/mod.rs b/lib/modules/blockchain/bc-db-writer/src/writers/mod.rs index 86e98a4af827349ad3dc7a6072091adfc64358a0..82ea3bb48ee009d9e6e846b9e1aed6207b9416ff 100644 --- a/lib/modules/blockchain/bc-db-writer/src/writers/mod.rs +++ b/lib/modules/blockchain/bc-db-writer/src/writers/mod.rs @@ -13,7 +13,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -//! DELETION IN PROGRESS +//! Blockchain DB writers /// DELETION IN PROGRESS pub mod requests; diff --git a/lib/modules/blockchain/bc-db-writer/src/writers/requests.rs b/lib/modules/blockchain/bc-db-writer/src/writers/requests.rs index 2f7fa57020feab3740a7a83fbef75531052facbc..1f635870dd74ead5dcff9d4a77798ad7a610e893 100644 --- a/lib/modules/blockchain/bc-db-writer/src/writers/requests.rs +++ b/lib/modules/blockchain/bc-db-writer/src/writers/requests.rs @@ -13,7 +13,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. 
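// The hunks below change every `*DBsWriteQuery::apply` to take the database
// handle plus an explicit write transaction (`&Db`, `&mut DbWriter`) instead
// of the old `CurrencyV10DBs` collection handles. A minimal sketch of the
// resulting calling convention, assuming the `Db::write` closure API and the
// `CurrencyDBsWriteQuery::apply` signature introduced by this patch; the
// helper name is hypothetical and this block is a sketch, not part of the patch.
fn apply_currency_queries_sketch(
    db: &Db,
    queries: Vec<CurrencyDBsWriteQuery>,
) -> Result<(), DbError> {
    db.write(|mut w| {
        for query in &queries {
            // `None`: no consumed-sources map (only `RevertTx` needs one);
            // `true`: the block is inside the fork window.
            query.apply(db, &mut w, None, true)?;
        }
        // Returning `Ok(w)` commits every write of the batch at once;
        // returning an error aborts them all.
        Ok(w)
    })
}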
-use crate::indexes::transactions::DbTxV10; use crate::*; use dubp_block_doc::block::{BlockDocument, BlockDocumentTrait}; use dubp_common_doc::Blockstamp; @@ -59,6 +58,7 @@ impl BlocksDBsWriteQuery { pub fn apply( self, db: &Db, + w: &mut DbWriter, fork_tree: &mut ForkTree, fork_window_size: usize, sync_target: Option<Blockstamp>, @@ -71,14 +71,14 @@ impl BlocksDBsWriteQuery { || dal_block.blockstamp().id.0 + fork_window_size as u32 >= sync_target.expect("safe unwrap").id.0 { - crate::blocks::insert_new_head_block(db, Some(fork_tree), dal_block)?; + crate::blocks::insert_new_head_block(db, w, Some(fork_tree), dal_block)?; } else { - crate::blocks::insert_new_head_block(db, None, dal_block)?; + crate::blocks::insert_new_head_block(db, w, None, dal_block)?; } } BlocksDBsWriteQuery::RevertBlock(dal_block) => { trace!("BlocksDBsWriteQuery::WriteBlock..."); - crate::blocks::remove_block(db, dal_block.block.number())?; + crate::blocks::remove_block(db, w, dal_block.block.number())?; trace!("BlocksDBsWriteQuery::WriteBlock...finish"); } } @@ -125,10 +125,11 @@ impl WotsDBsWriteQuery { /// Apply WotsDBsWriteQuery pub fn apply( &self, + db: &Db, + w: &mut DbWriter, _blockstamp: &Blockstamp, currency_params: &CurrencyParameters, databases: &WotsV10DBs, - db: &Db, ) -> Result<(), DbError> { match *self { WotsDBsWriteQuery::CreateIdentity( @@ -141,6 +142,7 @@ impl WotsDBsWriteQuery { crate::indexes::identities::create_identity( currency_params, &db, + w, &databases.ms_db, idty_doc.deref(), *ms_created_block_id, @@ -150,7 +152,12 @@ impl WotsDBsWriteQuery { )?; } WotsDBsWriteQuery::RevertCreateIdentity(ref pubkey) => { - crate::indexes::identities::revert_create_identity(&db, &databases.ms_db, pubkey)?; + crate::indexes::identities::revert_create_identity( + &db, + w, + &databases.ms_db, + pubkey, + )?; } WotsDBsWriteQuery::RenewalIdentity( _, @@ -162,6 +169,7 @@ impl WotsDBsWriteQuery { crate::indexes::identities::renewal_identity( currency_params, &db, + w, &databases.ms_db, *idty_wot_id, *current_bc_time, @@ -179,6 +187,7 @@ impl WotsDBsWriteQuery { crate::indexes::identities::renewal_identity( currency_params, &db, + w, &databases.ms_db, *idty_wot_id, *current_bc_time, @@ -187,19 +196,19 @@ impl WotsDBsWriteQuery { )?; } WotsDBsWriteQuery::ExcludeIdentity(ref pubkey, ref blockstamp) => { - crate::indexes::identities::exclude_identity(&db, pubkey, blockstamp, false)?; + crate::indexes::identities::exclude_identity(&db, w, pubkey, blockstamp, false)?; } WotsDBsWriteQuery::RevertExcludeIdentity(ref pubkey, ref blockstamp) => { - crate::indexes::identities::exclude_identity(&db, pubkey, blockstamp, true)?; + crate::indexes::identities::exclude_identity(&db, w, pubkey, blockstamp, true)?; } WotsDBsWriteQuery::RevokeIdentity(ref pubkey, ref blockstamp, ref explicit) => { crate::indexes::identities::revoke_identity( - &db, pubkey, blockstamp, *explicit, false, + &db, w, pubkey, blockstamp, *explicit, false, )?; } WotsDBsWriteQuery::RevertRevokeIdentity(ref pubkey, ref blockstamp, ref explicit) => { crate::indexes::identities::revoke_identity( - &db, pubkey, blockstamp, *explicit, true, + &db, w, pubkey, blockstamp, *explicit, true, )?; } WotsDBsWriteQuery::CreateCert( @@ -213,6 +222,7 @@ impl WotsDBsWriteQuery { crate::indexes::certs::write_certification( currency_params, &db, + w, &databases.certs_db, *source, *target, @@ -225,6 +235,7 @@ impl WotsDBsWriteQuery { trace!("WotsDBsWriteQuery::CreateCert..."); crate::indexes::certs::revert_write_cert( &db, + w, &databases.certs_db, *compact_doc, *source, @@ 
-254,7 +265,7 @@ pub enum CurrencyDBsWriteQuery { /// Write transaction WriteTx(Box<TransactionDocument>), /// Revert transaction - RevertTx(Box<DbTxV10>), + RevertTx(Box<TransactionDocument>), /// Create dividend CreateUD(SourceAmount, BlockNumber, Vec<PubKey>), /// Revert dividend @@ -265,39 +276,39 @@ impl CurrencyDBsWriteQuery { /// Apply CurrencyDBsWriteQuery pub fn apply( &self, - blockstamp: &Blockstamp, - databases: &CurrencyV10DBs, + db: &Db, + w: &mut DbWriter, + block_consumed_sources_opt: Option<&mut HashMap<UniqueIdUTXOv10, TransactionOutput>>, + in_fork_window: bool, ) -> Result<(), DbError> { match *self { CurrencyDBsWriteQuery::WriteTx(ref tx_doc) => { crate::indexes::transactions::apply_and_write_tx( - blockstamp, - &databases, + db, + w, tx_doc.deref(), + in_fork_window, )?; } - CurrencyDBsWriteQuery::RevertTx(ref dal_tx) => { - crate::indexes::transactions::revert_tx(blockstamp, &databases, dal_tx.deref())?; + CurrencyDBsWriteQuery::RevertTx(ref tx_doc) => { + if let Some(block_consumed_sources) = block_consumed_sources_opt { + crate::indexes::transactions::revert_tx( + db, + w, + tx_doc.deref(), + block_consumed_sources, + )?; + } else { + durs_common_tools::fatal_error!( + "Try to revert tx without block_consumed_sources." + ) + } } CurrencyDBsWriteQuery::CreateUD(ref du_amount, ref block_id, ref members) => { - crate::indexes::dividends::create_du( - &databases.du_db, - &databases.balances_db, - du_amount, - *block_id, - members, - false, - )?; + crate::indexes::dividends::create_du(db, w, du_amount, *block_id, members, false)?; } CurrencyDBsWriteQuery::RevertUD(ref du_amount, ref block_id, ref members) => { - crate::indexes::dividends::create_du( - &databases.du_db, - &databases.balances_db, - du_amount, - *block_id, - members, - true, - )?; + crate::indexes::dividends::create_du(db, w, du_amount, *block_id, members, true)?; } } Ok(()) diff --git a/lib/modules/blockchain/blockchain/src/dbex.rs b/lib/modules/blockchain/blockchain/src/dbex.rs index a25e09c35c1540e3b0bfbfa4a3f6b676863a0de7..0d777fe60834a117db593e78935d2310d580e26e 100644 --- a/lib/modules/blockchain/blockchain/src/dbex.rs +++ b/lib/modules/blockchain/blockchain/src/dbex.rs @@ -18,7 +18,6 @@ use crate::*; use dubp_block_doc::block::BlockDocumentTrait; use dubp_common_doc::BlockNumber; -use dubp_user_docs::documents::transaction::*; use dup_crypto::keys::*; use durs_bc_db_reader::BcDbRo; use durs_wot::data::rusty::RustyWebOfTrust; @@ -204,18 +203,19 @@ pub fn dbex_fork_tree(profile_path: PathBuf, _csv: bool) { } /// Execute DbExTxQuery -pub fn dbex_tx(profile_path: PathBuf, _csv: bool, query: &DbExTxQuery) { +pub fn dbex_tx(profile_path: PathBuf, _csv: bool, _query: &DbExTxQuery) { // Get db path - let db_path = durs_conf::get_blockchain_db_path(profile_path.clone()); + let _db_path = durs_conf::get_blockchain_db_path(profile_path.clone()); - // Open DB + unimplemented!(); + + /*// Open DB let load_db_begin = SystemTime::now(); let db = if let Some(db) = open_bc_db_ro(profile_path) { db } else { return; }; - let currency_databases = CurrencyV10DBs::open(Some(&db_path)); let load_dbs_duration = SystemTime::now() .duration_since(load_db_begin) .expect("duration_since error !"); @@ -260,7 +260,7 @@ pub fn dbex_tx(profile_path: PathBuf, _csv: bool, query: &DbExTxQuery) { "Request processed in {}.{:06} seconds.", req_process_duration.as_secs(), req_process_duration.subsec_micros() - ); + );*/ } /// Execute DbExWotQuery diff --git a/lib/modules/blockchain/blockchain/src/dubp/apply/mod.rs 
b/lib/modules/blockchain/blockchain/src/dubp/apply/mod.rs index 551f3bddb7dc9a941edcad099794d3bd4281073b..5d26d601dbe186b89d2428176e32c3771726ebf5 100644 --- a/lib/modules/blockchain/blockchain/src/dubp/apply/mod.rs +++ b/lib/modules/blockchain/blockchain/src/dubp/apply/mod.rs @@ -21,9 +21,10 @@ use dubp_common_doc::BlockNumber; use dubp_user_docs::documents::transaction::{TxAmount, TxBase}; use dup_crypto::keys::*; use durs_bc_db_reader::blocks::DbBlock; +use durs_bc_db_reader::indexes::sources::get_block_consumed_sources_; use durs_bc_db_reader::indexes::sources::SourceAmount; use durs_bc_db_writer::writers::requests::*; -use durs_bc_db_writer::BinFreeStructDb; +use durs_bc_db_writer::{BinFreeStructDb, Db, DbError, DbWriter}; use durs_common_tools::fatal_error; use durs_wot::data::NewLinkResult; use durs_wot::{WebOfTrust, WotId}; @@ -47,6 +48,8 @@ pub enum ApplyValidBlockError { #[inline] pub fn apply_valid_block<W: WebOfTrust>( + db: &Db, + w: &mut DbWriter, block: BlockDocument, wot_index: &mut HashMap<PubKey, WotId>, wot_db: &BinFreeStructDb<W>, @@ -54,12 +57,14 @@ pub fn apply_valid_block<W: WebOfTrust>( ) -> Result<ValidBlockApplyReqs, ApplyValidBlockError> { match block { BlockDocument::V10(block_v10) => { - apply_valid_block_v10(block_v10, wot_index, wot_db, expire_certs) + apply_valid_block_v10(db, w, block_v10, wot_index, wot_db, expire_certs) } } } pub fn apply_valid_block_v10<W: WebOfTrust>( + db: &Db, + w: &mut DbWriter, mut block: BlockDocumentV10, wot_index: &mut HashMap<PubKey, WotId>, wot_db: &BinFreeStructDb<W>, @@ -80,11 +85,8 @@ pub fn apply_valid_block_v10<W: WebOfTrust>( let pubkey = joiner.issuers()[0]; if let Some(idty_doc) = identities.get(&pubkey) { // Newcomer - let wot_id = WotId( - wot_db - .read(WebOfTrust::size) - .expect("Fatal error : fail to read WotDB !"), - ); + let wot_id = durs_bc_db_writer::indexes::identities::create_wot_id(db, w) + .expect("Fatal error : fail to create WotId !"); wot_db .write(|db| { db.add_node(); @@ -231,7 +233,7 @@ pub fn apply_valid_block_v10<W: WebOfTrust>( } for tx in &block.transactions { - currency_dbs_requests.push(CurrencyDBsWriteQuery::WriteTx(Box::new(tx.unwrap_doc()))); + currency_dbs_requests.push(CurrencyDBsWriteQuery::WriteTx(Box::new(tx.clone()))); } /*// Calculate the state of the wot @@ -295,3 +297,18 @@ pub fn apply_valid_block_v10<W: WebOfTrust>( currency_dbs_requests, )) } + +/// Execute currency queries +#[inline] +pub fn exec_currency_queries( + db: &Db, + w: &mut DbWriter, + block_number: BlockNumber, + currency_queries: Vec<CurrencyDBsWriteQuery>, +) -> Result<(), DbError> { + let mut block_consumed_sources = get_block_consumed_sources_(db, w.as_ref(), block_number)?; + for query in ¤cy_queries { + query.apply(db, w, block_consumed_sources.as_mut(), true)?; + } + Ok(()) +} diff --git a/lib/modules/blockchain/blockchain/src/dubp/mod.rs b/lib/modules/blockchain/blockchain/src/dubp/mod.rs index 13c3b485896959ef89920ef2768cd082265beb6d..14060ec9b6e2cfcfa28e06ab1d7999eebd6a3166 100644 --- a/lib/modules/blockchain/blockchain/src/dubp/mod.rs +++ b/lib/modules/blockchain/blockchain/src/dubp/mod.rs @@ -65,11 +65,13 @@ impl From<ApplyValidBlockError> for BlockError { pub fn check_and_apply_block( bc: &mut BlockchainModule, + db: &Db, + w: &mut DbWriter, block_doc: BlockDocument, ) -> Result<CheckAndApplyBlockReturn, BlockError> { // Get BlockDocument && check if already have block let already_have_block = durs_bc_db_reader::blocks::already_have_block( - &bc.db, + db, block_doc.blockstamp(), 
block_doc.previous_hash(), )?; @@ -97,7 +99,7 @@ pub fn check_and_apply_block( // Verify block validity (check all protocol rule, very long !) verify_block_validity( &block_doc, - &bc.db, + db, &bc.wot_databases.certs_db, &bc.wot_index, &bc.wot_databases.wot_db, @@ -117,6 +119,8 @@ pub fn check_and_apply_block( } Ok(CheckAndApplyBlockReturn::ValidMainBlock(apply_valid_block( + db, + w, block_doc, &mut bc.wot_index, &bc.wot_databases.wot_db, @@ -138,7 +142,7 @@ pub fn check_and_apply_block( expire_certs: None, }; - if durs_bc_db_writer::blocks::insert_new_fork_block(&bc.db, &mut bc.fork_tree, dal_block) + if durs_bc_db_writer::blocks::insert_new_fork_block(&db, w, &mut bc.fork_tree, dal_block) .expect("durs_bc_db_writer::writers::block::insert_new_fork_block() : DbError") { Ok(CheckAndApplyBlockReturn::ForkBlock) diff --git a/lib/modules/blockchain/blockchain/src/dunp/receiver.rs b/lib/modules/blockchain/blockchain/src/dunp/receiver.rs index 0bba39410b63eaaebdc97318439a2feb2ca15cf8..0776a10c37974c6e69aa482d1762792676bba5a1 100644 --- a/lib/modules/blockchain/blockchain/src/dunp/receiver.rs +++ b/lib/modules/blockchain/blockchain/src/dunp/receiver.rs @@ -16,6 +16,7 @@ //! Sub-module managing the reception of messages from the inter-node network layer //! (received by the intermediaries of events transmitted by the network module). +use crate::dubp::apply::exec_currency_queries; use crate::*; use dubp_common_doc::traits::Document; use dubp_user_docs::documents::UserDocumentDUBP; @@ -35,112 +36,122 @@ pub fn receive_user_documents(_bc: &mut BlockchainModule, network_documents: &[U pub fn receive_blocks(bc: &mut BlockchainModule, blocks: Vec<BlockDocument>) { debug!("BlockchainModule : receive_blocks({})", blocks.len()); - let mut save_blocks_dbs = false; + let mut save_dbs = false; let mut save_wots_dbs = false; - let mut save_currency_dbs = false; let mut first_orphan = true; for block in blocks.into_iter() { let blockstamp = block.blockstamp(); - match check_and_apply_block(bc, block) { - Ok(check_block_return) => match check_block_return { - CheckAndApplyBlockReturn::ValidMainBlock(ValidBlockApplyReqs( - bc_db_query, - wot_dbs_queries, - tx_dbs_queries, - )) => { - let new_current_block = bc_db_query.get_block_doc_copy(); - bc.current_blockstamp = new_current_block.blockstamp(); - // Apply db requests - bc_db_query - .apply( - &bc.db, + + // Open write db transaction + let db = bc.take_db(); + db.write(|mut w| { + match check_and_apply_block(bc, &db, &mut w, block) { + Ok(check_block_return) => match check_block_return { + CheckAndApplyBlockReturn::ValidMainBlock(ValidBlockApplyReqs( + bc_db_query, + wot_dbs_queries, + tx_dbs_queries, + )) => { + let new_current_block = bc_db_query.get_block_doc_copy(); + bc.current_blockstamp = new_current_block.blockstamp(); + + // Apply db requests + bc_db_query.apply( + &db, + &mut w, &mut bc.fork_tree, unwrap!(bc.currency_params).fork_window_size, None, - ) - .expect("Fatal error : Fail to apply DBWriteRequest !"); - for query in &wot_dbs_queries { - query - .apply( - &blockstamp, - &unwrap!(bc.currency_params), - &bc.wot_databases, - &bc.db, - ) - .expect("Fatal error : Fail to apply WotsDBsWriteRequest !"); + )?; + for query in &wot_dbs_queries { + query + .apply( + &db, + &mut w, + &blockstamp, + &unwrap!(bc.currency_params), + &bc.wot_databases, + ) + .expect("Fatal error : Fail to apply WotsDBsWriteRequest !"); + } + exec_currency_queries(&db, &mut w, blockstamp.id, tx_dbs_queries)?; + if !wot_dbs_queries.is_empty() { + save_wots_dbs = true; + } + 
durs_bc_db_writer::blocks::fork_tree::save_fork_tree( + &db, + &mut w, + &bc.fork_tree, + )?; + save_dbs = true; + events::sent::send_event( + bc, + &BlockchainEvent::StackUpValidBlock(Box::new(new_current_block)), + ); } - for query in &tx_dbs_queries { - query - .apply(&blockstamp, &bc.currency_databases) - .expect("Fatal error : Fail to apply CurrencyDBsWriteRequest !"); + CheckAndApplyBlockReturn::ForkBlock => { + info!("blockchain: new fork block(#{})", blockstamp); + if let Ok(Some(new_bc_branch)) = fork_algo::fork_resolution_algo( + &db, + &bc.fork_tree, + unwrap!(bc.currency_params).fork_window_size, + bc.current_blockstamp, + &bc.invalid_forks, + ) { + info!("blockchain: apply_rollback({:?})", new_bc_branch); + rollback::apply_rollback(bc, new_bc_branch); + } } - save_blocks_dbs = true; - if !wot_dbs_queries.is_empty() { - save_wots_dbs = true; + CheckAndApplyBlockReturn::OrphanBlock => { + if first_orphan { + first_orphan = false; + debug!("blockchain: new orphan block(#{})", blockstamp); + crate::requests::sent::request_orphan_previous(bc, blockstamp); + } } - if !tx_dbs_queries.is_empty() { - save_currency_dbs = true; + }, + Err(e) => match e { + BlockError::VerifyBlockHashError(_) | BlockError::InvalidBlock(_) => { + warn!("InvalidBlock(#{})", blockstamp.id.0); + crate::events::sent::send_event( + bc, + &BlockchainEvent::RefusedBlock(blockstamp), + ); } - events::sent::send_event( - bc, - &BlockchainEvent::StackUpValidBlock(Box::new(new_current_block)), - ); - } - CheckAndApplyBlockReturn::ForkBlock => { - info!("blockchain: new fork block(#{})", blockstamp); - if let Ok(Some(new_bc_branch)) = fork_algo::fork_resolution_algo( - &bc.db, - &bc.fork_tree, - unwrap!(bc.currency_params).fork_window_size, - bc.current_blockstamp, - &bc.invalid_forks, - ) { - info!("blockchain: apply_rollback({:?})", new_bc_branch); - rollback::apply_rollback(bc, new_bc_branch); + BlockError::ApplyValidBlockError(e2) => { + error!("ApplyValidBlockError(#{}): {:?}", blockstamp, e2); + crate::events::sent::send_event( + bc, + &BlockchainEvent::RefusedBlock(blockstamp), + ); } - } - CheckAndApplyBlockReturn::OrphanBlock => { - if first_orphan { - first_orphan = false; - debug!("blockchain: new orphan block(#{})", blockstamp); - crate::requests::sent::request_orphan_previous(bc, blockstamp); + BlockError::DbError(e2) => { + error!("BlockError::DbError(#{}): {:?}", blockstamp, e2); + crate::events::sent::send_event( + bc, + &BlockchainEvent::RefusedBlock(blockstamp), + ); } - } - }, - Err(e) => match e { - BlockError::VerifyBlockHashError(_) | BlockError::InvalidBlock(_) => { - warn!("InvalidBlock(#{})", blockstamp.id.0); - crate::events::sent::send_event(bc, &BlockchainEvent::RefusedBlock(blockstamp)); - } - BlockError::ApplyValidBlockError(e2) => { - error!("ApplyValidBlockError(#{}): {:?}", blockstamp, e2); - crate::events::sent::send_event(bc, &BlockchainEvent::RefusedBlock(blockstamp)); - } - BlockError::DbError(e2) => { - error!("BlockError::DbError(#{}): {:?}", blockstamp, e2); - crate::events::sent::send_event(bc, &BlockchainEvent::RefusedBlock(blockstamp)); - } - BlockError::AlreadyHaveBlock => { - debug!("AlreadyHaveBlock(#{})", blockstamp.id); - } - BlockError::BlockOrOutForkWindow => { - debug!("BlockOrOutForkWindow(#{})", blockstamp); - } - }, - } + BlockError::AlreadyHaveBlock => { + debug!("AlreadyHaveBlock(#{})", blockstamp.id); + } + BlockError::BlockOrOutForkWindow => { + debug!("BlockOrOutForkWindow(#{})", blockstamp); + } + }, + } + Ok(w) + }) + .unwrap_or_else(|_| fatal_error!("Fail 
to check or apply block: {}.", blockstamp)); + bc.db = Some(db); } // Save databases - if save_blocks_dbs { - durs_bc_db_writer::blocks::fork_tree::save_fork_tree(&bc.db, &bc.fork_tree) - .unwrap_or_else(|_| fatal_error!("DB corrupted, please reset data.")); - bc.db + if save_dbs { + bc.db() .save() .unwrap_or_else(|_| fatal_error!("DB corrupted, please reset data.")); } if save_wots_dbs { bc.wot_databases.save_dbs(); } - if save_currency_dbs { - bc.currency_databases.save_dbs(true, true); - } } diff --git a/lib/modules/blockchain/blockchain/src/fork/fork_algo.rs b/lib/modules/blockchain/blockchain/src/fork/fork_algo.rs index 0ab1c8395baa43cfcfff6324cc19e5c797e675b0..f1b8911fcc8e7e9c91ae2829507c5c10862a7576 100644 --- a/lib/modules/blockchain/blockchain/src/fork/fork_algo.rs +++ b/lib/modules/blockchain/blockchain/src/fork/fork_algo.rs @@ -118,16 +118,20 @@ mod tests { ); // Insert mock blocks in forks_dbs - for block in &main_branch { - durs_bc_db_writer::blocks::insert_new_head_block( - &db, - Some(&mut fork_tree), - DbBlock { - block: block.clone(), - expire_certs: None, - }, - )?; - } + db.write(|mut w| { + for block in &main_branch { + durs_bc_db_writer::blocks::insert_new_head_block( + &db, + &mut w, + Some(&mut fork_tree), + DbBlock { + block: block.clone(), + expire_certs: None, + }, + )?; + } + Ok(w) + })?; // Local blockchain must contain at least `fork_window_size +2` blocks assert!(durs_bc_db_reader::blocks::get_block_in_local_blockchain( @@ -184,23 +188,27 @@ mod tests { id: BlockNumber(fork_point.number().0 + 4), hash: BlockHash(dup_crypto_tests_tools::mocks::hash('A')), }; - assert_eq!( - true, - durs_bc_db_writer::blocks::insert_new_fork_block( - &db, - &mut fork_tree, - DbBlock { - block: BlockDocument::V10( - dubp_user_docs_tests_tools::mocks::gen_empty_timed_block_v10( - determining_blockstamp, - *ADVANCE_TIME, - dup_crypto_tests_tools::mocks::hash('A'), - ) - ), - expire_certs: None, - }, - )?, - ); + db.write(|mut w| { + assert_eq!( + true, + durs_bc_db_writer::blocks::insert_new_fork_block( + &db, + &mut w, + &mut fork_tree, + DbBlock { + block: BlockDocument::V10( + dubp_user_docs_tests_tools::mocks::gen_empty_timed_block_v10( + determining_blockstamp, + *ADVANCE_TIME, + dup_crypto_tests_tools::mocks::hash('A'), + ) + ), + expire_certs: None, + }, + )?, + ); + Ok(w) + })?; // Must fork assert_eq!( @@ -262,20 +270,22 @@ mod tests { fork_tree: &mut ForkTree, blocks: &[BlockDocument], ) -> Result<(), DbError> { - for block in blocks { - assert_eq!( - true, - durs_bc_db_writer::blocks::insert_new_fork_block( - db, - fork_tree, - DbBlock { - block: block.clone(), - expire_certs: None, - }, - )?, - ); - } - - Ok(()) + db.write(|mut w| { + for block in blocks { + assert_eq!( + true, + durs_bc_db_writer::blocks::insert_new_fork_block( + db, + &mut w, + fork_tree, + DbBlock { + block: block.clone(), + expire_certs: None, + }, + )?, + ); + } + Ok(w) + }) } } diff --git a/lib/modules/blockchain/blockchain/src/fork/revert_block.rs b/lib/modules/blockchain/blockchain/src/fork/revert_block.rs index 9f9002cc222c0062aebf672e3c77bb712f57e889..81b18c3d6fa97bebc2c83f7db34b1198c413e0de 100644 --- a/lib/modules/blockchain/blockchain/src/fork/revert_block.rs +++ b/lib/modules/blockchain/blockchain/src/fork/revert_block.rs @@ -15,7 +15,6 @@ //! Sub-module that applies a block backwards. 
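// With transactions now stored in full inside the block document, the revert
// path below no longer needs `TxV10Datas`: revert queries are built directly
// from `block.transactions`, newest first. A minimal sketch of that step,
// assuming the `CurrencyDBsWriteQuery::RevertTx(Box<TransactionDocument>)`
// variant introduced by this patch; the helper name is hypothetical.
fn build_revert_tx_queries_sketch(block: &BlockDocumentV10) -> Vec<CurrencyDBsWriteQuery> {
    block
        .transactions
        .iter()
        .rev() // undo transactions in reverse application order
        .map(|tx_doc| CurrencyDBsWriteQuery::RevertTx(Box::new(tx_doc.clone())))
        .collect()
}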
-use dubp_block_doc::block::v10::TxDocOrTxHash; use dubp_block_doc::block::{BlockDocument, BlockDocumentTrait, BlockDocumentV10}; use dubp_common_doc::traits::Document; use dubp_common_doc::{BlockNumber, Blockstamp}; @@ -23,9 +22,8 @@ use dubp_user_docs::documents::transaction::{TxAmount, TxBase}; use dup_crypto::keys::*; use durs_bc_db_reader::blocks::DbBlock; use durs_bc_db_reader::indexes::sources::SourceAmount; -use durs_bc_db_writer::indexes::transactions::DbTxV10; use durs_bc_db_writer::writers::requests::*; -use durs_bc_db_writer::{BinFreeStructDb, DbError, TxV10Datas}; +use durs_bc_db_writer::{BinFreeStructDb, DbError}; use durs_common_tools::fatal_error; use durs_wot::data::{NewLinkResult, RemLinkResult}; use durs_wot::{WebOfTrust, WotId}; @@ -59,7 +57,6 @@ pub fn revert_block<W: WebOfTrust>( dal_block: DbBlock, wot_index: &mut HashMap<PubKey, WotId>, wot_db: &BinFreeStructDb<W>, - txs_db: &BinFreeStructDb<TxV10Datas>, ) -> Result<ValidBlockRevertReqs, RevertValidBlockError> { match dal_block.block { BlockDocument::V10(block_v10) => revert_block_v10( @@ -67,7 +64,6 @@ pub fn revert_block<W: WebOfTrust>( unwrap!(dal_block.expire_certs), wot_index, wot_db, - txs_db, ), } } @@ -77,27 +73,7 @@ pub fn revert_block_v10<W: WebOfTrust>( expire_certs: HashMap<(WotId, WotId), BlockNumber>, wot_index: &mut HashMap<PubKey, WotId>, wot_db: &BinFreeStructDb<W>, - txs_db: &BinFreeStructDb<TxV10Datas>, ) -> Result<ValidBlockRevertReqs, RevertValidBlockError> { - // Get transactions - let dal_txs: Vec<DbTxV10> = block - .transactions - .iter() - .map(|tx_enum| match *tx_enum { - TxDocOrTxHash::TxHash(tx_hash) => tx_hash, - TxDocOrTxHash::TxDoc(ref tx_doc) => tx_doc.get_hash_opt().unwrap_or_else(|| { - fatal_error!("Dev error: The hash of a transaction should never be deleted."); - }), - }) - .map(|tx_hash| { - if let Ok(Some(tx)) = txs_db.read(|db| db.get(&tx_hash).cloned()) { - tx - } else { - fatal_error!("revert_block(): tx {} not found !", tx_hash); - } - }) - .collect(); - // Revert reduce block block.generate_inner_hash(); debug!("blockchain: revert_valid_block({})", block.blockstamp()); @@ -105,8 +81,8 @@ pub fn revert_block_v10<W: WebOfTrust>( // REVERT_CURRENCY_EVENTS let mut currency_dbs_requests = Vec::new(); // Revert transactions - for dal_tx in dal_txs.iter().rev() { - currency_dbs_requests.push(CurrencyDBsWriteQuery::RevertTx(Box::new(dal_tx.clone()))); + for tx_doc in block.transactions.iter().rev() { + currency_dbs_requests.push(CurrencyDBsWriteQuery::RevertTx(Box::new(tx_doc.clone()))); } // Revert UD if let Some(du_amount) = block.dividend { diff --git a/lib/modules/blockchain/blockchain/src/fork/rollback.rs b/lib/modules/blockchain/blockchain/src/fork/rollback.rs index e1614e8b9b785617dc663a3985bc16506790e84d..0808c8ee8e1951a806ceea7c124aeb9d6dc6ae80 100644 --- a/lib/modules/blockchain/blockchain/src/fork/rollback.rs +++ b/lib/modules/blockchain/blockchain/src/fork/rollback.rs @@ -13,6 +13,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. 
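// The rollback below now runs entirely inside one write transaction:
// returning `Ok(w)` from the `db.write` closure commits the whole
// revert-and-reapply sequence, while returning `DbError::WriteAbort`
// discards every pending write, so an invalid new branch leaves the
// on-disk state untouched. A minimal sketch of that commit-or-abort
// pattern, assuming the `Db::write` API used in this patch; the function
// and flag names are hypothetical and the revert/apply work is elided.
fn commit_or_abort_sketch(db: &Db, new_branch_is_valid: bool) -> Result<(), DbError> {
    let result = db.write(|w| {
        // ... revert the old branch and re-apply the new one with `w` ...
        if new_branch_is_valid {
            Ok(w) // commit all pending writes
        } else {
            Err(DbError::WriteAbort {
                reason: "Abort rollback: new branch is invalid.".to_owned(),
            })
        }
    });
    match result {
        Ok(()) => Ok(()),                          // changes are committed
        Err(DbError::WriteAbort { .. }) => Ok(()), // nothing was written
        Err(e) => Err(e),                          // real database failure
    }
}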
+use crate::dubp::apply::exec_currency_queries; use crate::fork::revert_block::ValidBlockRevertReqs; use crate::*; use dubp_common_doc::traits::Document; @@ -28,144 +29,154 @@ pub fn apply_rollback(bc: &mut BlockchainModule, new_bc_branch: Vec<Blockstamp>) let old_current_blockstamp = bc.current_blockstamp; let last_common_block_number = new_bc_branch[0].id.0 - 1; - // Rollback (revert old branch) - while bc.current_blockstamp.id.0 > last_common_block_number { - if let Some(dal_block) = - durs_bc_db_reader::blocks::get_fork_block(&bc.db, bc.current_blockstamp).unwrap_or_else( - |_| { - fatal_error!("revert block {} fail !", bc.current_blockstamp); - }, - ) - { - let blockstamp = dal_block.block.blockstamp(); - debug!("try to revert block #{}", blockstamp); - let ValidBlockRevertReqs { - new_current_blockstamp, - block_query, - wot_queries, - currency_queries, - } = super::revert_block::revert_block( - dal_block, - &mut bc.wot_index, - &bc.wot_databases.wot_db, - &bc.currency_databases.tx_db, - ) - .unwrap_or_else(|_| { - fatal_error!("revert block {} fail !", bc.current_blockstamp); - }); - // Update current blockstamp - bc.current_blockstamp = new_current_blockstamp; - // Apply db requests - block_query - .apply( - &bc.db, - &mut bc.fork_tree, - unwrap!(bc.currency_params).fork_window_size, - None, - ) - .expect("Fatal error : Fail to apply DBWriteRequest !"); - for query in &wot_queries { - query - .apply( - &blockstamp, - &unwrap!(bc.currency_params), - &bc.wot_databases, - &bc.db, - ) - .expect("Fatal error : Fail to apply WotsDBsWriteRequest !"); - } - for query in ¤cy_queries { - query - .apply(&blockstamp, &bc.currency_databases) - .expect("Fatal error : Fail to apply CurrencyDBsWriteRequest !"); - } - debug!("Successfully revert block #{}", blockstamp); - } else { - fatal_error!("apply_rollback(): Not found current block in forks blocks DB !"); - } - } - - // Apply new branch - let mut new_branch_is_valid = true; + // Open write db transaction + let db = bc.take_db(); let mut new_branch_blocks = Vec::with_capacity(new_bc_branch.len()); - for blockstamp in &new_bc_branch { - if let Ok(Some(dal_block)) = durs_bc_db_reader::blocks::get_fork_block(&bc.db, *blockstamp) - { - new_branch_blocks.push(dal_block.clone()); - if let Ok(CheckAndApplyBlockReturn::ValidMainBlock(ValidBlockApplyReqs( - bc_db_query, - wot_dbs_queries, - tx_dbs_queries, - ))) = check_and_apply_block(bc, dal_block.block) + let db_tx_result = db.write(|mut w| { + // Rollback (revert old branch) + while bc.current_blockstamp.id.0 > last_common_block_number { + if let Some(dal_block) = + durs_bc_db_reader::blocks::get_fork_block(&db, bc.current_blockstamp) + .unwrap_or_else(|_| { + fatal_error!("revert block {} fail !", bc.current_blockstamp); + }) { - bc.current_blockstamp = *blockstamp; + let blockstamp = dal_block.block.blockstamp(); + debug!("try to revert block #{}", blockstamp); + let ValidBlockRevertReqs { + new_current_blockstamp, + block_query, + wot_queries, + currency_queries, + } = super::revert_block::revert_block( + dal_block, + &mut bc.wot_index, + &bc.wot_databases.wot_db, + ) + .unwrap_or_else(|_| { + fatal_error!("revert block {} fail !", bc.current_blockstamp); + }); + // Update current blockstamp + bc.current_blockstamp = new_current_blockstamp; // Apply db requests - bc_db_query + block_query .apply( - &bc.db, + &db, + &mut w, &mut bc.fork_tree, unwrap!(bc.currency_params).fork_window_size, None, ) .expect("Fatal error : Fail to apply DBWriteRequest !"); - for query in &wot_dbs_queries { + for query 
in &wot_queries { query .apply( + &db, + &mut w, &blockstamp, &unwrap!(bc.currency_params), &bc.wot_databases, - &bc.db, ) .expect("Fatal error : Fail to apply WotsDBsWriteRequest !"); } - for query in &tx_dbs_queries { - query - .apply(&blockstamp, &bc.currency_databases) - .expect("Fatal error : Fail to apply CurrencyDBsWriteRequest !"); + exec_currency_queries(&db, &mut w, blockstamp.id, currency_queries)?; + + debug!("Successfully revert block #{}", blockstamp); + } else { + fatal_error!("apply_rollback(): Not found current block in forks blocks DB !"); + } + } + + // Apply new branch + let mut new_branch_is_valid = true; + for blockstamp in &new_bc_branch { + if let Ok(Some(dal_block)) = durs_bc_db_reader::blocks::get_fork_block(&db, *blockstamp) + { + new_branch_blocks.push(dal_block.clone()); + if let Ok(CheckAndApplyBlockReturn::ValidMainBlock(ValidBlockApplyReqs( + bc_db_query, + wot_dbs_queries, + tx_dbs_queries, + ))) = check_and_apply_block(bc, &db, &mut w, dal_block.block) + { + bc.current_blockstamp = *blockstamp; + // Apply db requests + + bc_db_query + .apply( + &db, + &mut w, + &mut bc.fork_tree, + unwrap!(bc.currency_params).fork_window_size, + None, + ) + .expect("Fatal error : Fail to apply DBWriteRequest !"); + for query in &wot_dbs_queries { + query + .apply( + &db, + &mut w, + &blockstamp, + &unwrap!(bc.currency_params), + &bc.wot_databases, + ) + .expect("Fatal error : Fail to apply WotsDBsWriteRequest !"); + } + exec_currency_queries(&db, &mut w, blockstamp.id, tx_dbs_queries)?; + } else { + new_branch_is_valid = false; + bc.invalid_forks.insert(*blockstamp); + break; } } else { - new_branch_is_valid = false; - bc.invalid_forks.insert(*blockstamp); - break; + fatal_error!( + "apply_rollback(): Fail to get block {} on new branch in forks blocks DB !", + blockstamp + ); } - } else { - fatal_error!( - "apply_rollback(): Fail to get block {} on new branch in forks blocks DB !", - blockstamp - ); } - } - if new_branch_is_valid { - // update main branch in fork tree - if let Err(err) = durs_bc_db_writer::blocks::fork_tree::change_main_branch( - &bc.db, - &mut bc.fork_tree, - old_current_blockstamp, - bc.current_blockstamp, - ) { - fatal_error!("DbError: ForksDB: {:?}", err); + if new_branch_is_valid { + // update main branch in fork tree + if let Err(err) = durs_bc_db_writer::blocks::fork_tree::change_main_branch( + &db, + &mut bc.fork_tree, + old_current_blockstamp, + bc.current_blockstamp, + ) { + fatal_error!("DbError: ForksDB: {:?}", err); + } + durs_bc_db_writer::blocks::fork_tree::save_fork_tree(&db, &mut w, &bc.fork_tree)?; + + Ok(w) + } else { + Err(DbError::WriteAbort { + reason: "Abort rollback: new branch is invalid.".to_owned(), + }) } + }); + bc.db = Some(db); - // save dbs - durs_bc_db_writer::blocks::fork_tree::save_fork_tree(&bc.db, &bc.fork_tree) - .unwrap_or_else(|_| fatal_error!("DB corrupted, please reset data.")); - bc.db - .save() - .unwrap_or_else(|_| fatal_error!("DB corrupted, please reset data.")); - bc.wot_databases.save_dbs(); - bc.currency_databases.save_dbs(true, true); - // Send events stackUpValidBlock - for db_block in new_branch_blocks { - events::sent::send_event( - bc, - &BlockchainEvent::StackUpValidBlock(Box::new(db_block.block)), - ) + match db_tx_result { + Ok(()) => { + // Save db + bc.wot_databases.save_dbs(); + bc.db() + .save() + .unwrap_or_else(|_| fatal_error!("DB corrupted, please reset data.")); + // Send events stackUpValidBlock + for db_block in new_branch_blocks { + events::sent::send_event( + bc, + 
&BlockchainEvent::StackUpValidBlock(Box::new(db_block.block)), + ) + } + } + Err(DbError::WriteAbort { .. }) => { + // reload dbs + let dbs_path = durs_conf::get_blockchain_db_path(bc.profile_path.clone()); + bc.wot_databases = WotsV10DBs::open(Some(&dbs_path)); } - } else { - // reload dbs - let dbs_path = durs_conf::get_blockchain_db_path(bc.profile_path.clone()); - bc.wot_databases = WotsV10DBs::open(Some(&dbs_path)); - bc.currency_databases = CurrencyV10DBs::open(Some(&dbs_path)); + Err(e) => fatal_error!("Fatal error : Fail to write rollback in DB: {:?} !", e), } } diff --git a/lib/modules/blockchain/blockchain/src/fork/stackable_blocks.rs b/lib/modules/blockchain/blockchain/src/fork/stackable_blocks.rs index 2e2ee31b82ffac20a47ce1ae748d310cf2cbb1d7..0248b0fea1eaace7a6076731a68c78712efff828 100644 --- a/lib/modules/blockchain/blockchain/src/fork/stackable_blocks.rs +++ b/lib/modules/blockchain/blockchain/src/fork/stackable_blocks.rs @@ -15,26 +15,32 @@ //! Sub-module that finds and applies the orphaned blocks that have become stackable on the local blockchain. +use crate::dubp::apply::exec_currency_queries; use crate::*; use dubp_block_doc::block::BlockDocumentTrait; use dubp_common_doc::traits::Document; use unwrap::unwrap; pub fn apply_stackable_blocks(bc: &mut BlockchainModule) { - 'blockchain: loop { + 'blocks: loop { let stackable_blocks = - durs_bc_db_reader::blocks::get_stackables_blocks(&bc.db, bc.current_blockstamp) + durs_bc_db_reader::blocks::get_stackables_blocks(bc.db(), bc.current_blockstamp) .expect("Fatal error : Fail to read ForksDB !"); + if stackable_blocks.is_empty() { - break 'blockchain; - } else { - for stackable_block in stackable_blocks { - debug!("stackable_block({})", stackable_block.block.number()); + break 'blocks; + } - let stackable_block_number = stackable_block.block.number(); - let stackable_block_blockstamp = stackable_block.block.blockstamp(); + for stackable_block in stackable_blocks { + debug!("stackable_block({})", stackable_block.block.number()); - match check_and_apply_block(bc, stackable_block.block) { + let stackable_block_number = stackable_block.block.number(); + let stackable_block_blockstamp = stackable_block.block.blockstamp(); + + // Apply db requests + let db = bc.take_db(); + let db_write_result = db.write(|mut w| { + match check_and_apply_block(bc, &db, &mut w, stackable_block.block) { Ok(CheckAndApplyBlockReturn::ValidMainBlock(ValidBlockApplyReqs( bc_db_query, wot_dbs_queries, @@ -42,30 +48,35 @@ pub fn apply_stackable_blocks(bc: &mut BlockchainModule) { ))) => { let new_current_block = bc_db_query.get_block_doc_copy(); let blockstamp = new_current_block.blockstamp(); - // Apply db requests + bc_db_query .apply( - &bc.db, + &db, + &mut w, &mut bc.fork_tree, unwrap!(bc.currency_params).fork_window_size, None, ) - .expect("Fatal error : Fail to apply DBWriteRequest !"); + .expect("DB error : Fail to apply block query !"); for query in &wot_dbs_queries { query .apply( + &db, + &mut w, &blockstamp, &unwrap!(bc.currency_params), &bc.wot_databases, - &bc.db, ) - .expect("Fatal error : Fail to apply WotsDBsWriteRequest !"); - } - for query in &tx_dbs_queries { - query - .apply(&blockstamp, &bc.currency_databases) - .expect("Fatal error : Fail to apply CurrencyDBsWriteRequest !"); + .expect("DB error : Fail to apply wot queries !"); } + exec_currency_queries(&db, &mut w, blockstamp.id, tx_dbs_queries) + .expect("DB error : Fail to apply currency queries !"); + durs_bc_db_writer::blocks::fork_tree::save_fork_tree( + &db, + &mut w, + 
&bc.fork_tree, + ) + .expect("DB error : Fail to save fork tree !"); debug!("success to stackable_block({})", stackable_block_number); bc.current_blockstamp = stackable_block_blockstamp; @@ -73,7 +84,6 @@ pub fn apply_stackable_blocks(bc: &mut BlockchainModule) { bc, &BlockchainEvent::StackUpValidBlock(Box::new(new_current_block)), ); - continue 'blockchain; } Ok(re) => warn!( "fail to stackable_block({}) : {:?}", @@ -84,16 +94,26 @@ pub fn apply_stackable_blocks(bc: &mut BlockchainModule) { stackable_block_number, e ), } + Ok(w) + }); + bc.db = Some(db); + match db_write_result { + Ok(()) => continue 'blocks, + Err(e) => { + debug!( + "Invalid stackable block {}: {:?}", + stackable_block_blockstamp, e + ); + } } - // Save databases - durs_bc_db_writer::blocks::fork_tree::save_fork_tree(&bc.db, &bc.fork_tree) - .unwrap_or_else(|_| fatal_error!("DB corrupted, please reset data.")); - bc.db - .save() - .unwrap_or_else(|_| fatal_error!("DB corrupted, please reset data.")); - bc.wot_databases.save_dbs(); - bc.currency_databases.save_dbs(true, true); - break 'blockchain; } + + // If we reach this point, it is that none of the stackable blocks are valid + break 'blocks; } + // Save database + bc.db() + .save() + .unwrap_or_else(|_| fatal_error!("DB corrupted, please reset data.")); + bc.wot_databases.save_dbs(); } diff --git a/lib/modules/blockchain/blockchain/src/lib.rs b/lib/modules/blockchain/blockchain/src/lib.rs index 8a857c9ddb653c62c94813f26dd27788be5a527d..fc3e46e69046bb1bc6ca686805122b80ca8026d6 100644 --- a/lib/modules/blockchain/blockchain/src/lib.rs +++ b/lib/modules/blockchain/blockchain/src/lib.rs @@ -97,15 +97,13 @@ pub struct BlockchainModule { /// Currency pub currency: Option<CurrencyName>, /// Database - pub db: Db, + pub db: Option<Db>, /// Fork tree pub fork_tree: ForkTree, /// Wot index pub wot_index: HashMap<PubKey, WotId>, /// Wots Databases pub wot_databases: WotsV10DBs, - /// Currency databases - currency_databases: CurrencyV10DBs, /// Currency parameters pub currency_params: Option<CurrencyParameters>, /// Current blockstamp @@ -193,7 +191,6 @@ impl BlockchainModule { let fork_tree = durs_bc_db_reader::current_meta_datas::get_fork_tree(&db) .unwrap_or_else(|_| fatal_error!("Fail to get fork tree.")); let wot_databases = WotsV10DBs::open(Some(&dbs_path)); - let currency_databases = CurrencyV10DBs::open(Some(&dbs_path)); // Get current blockstamp let current_blockstamp = durs_bc_db_reader::current_meta_datas::get_current_blockstamp(&db) @@ -225,11 +222,10 @@ impl BlockchainModule { currency_params, current_blockstamp, consensus: Blockstamp::default(), - db, + db: Some(db), fork_tree, wot_index, wot_databases, - currency_databases, pending_block: None, invalid_forks: HashSet::new(), pending_network_requests: HashMap::new(), @@ -275,6 +271,22 @@ impl BlockchainModule { self.main_loop(blockchain_receiver); } } + /// Take blockchain database + #[inline] + pub fn take_db(&mut self) -> Db { + self.db + .take() + .unwrap_or_else(|| fatal_error!("Dev error: none bc db.")) + } + /// Reference to blockchain database + #[inline] + pub fn db(&self) -> &Db { + if let Some(ref db) = self.db { + db + } else { + fatal_error!("Dev error: none bc db.") + } + } /// Start blockchain main loop pub fn main_loop(&mut self, blockchain_receiver: &mpsc::Receiver<DursMsg>) { diff --git a/lib/modules/blockchain/blockchain/src/requests/received.rs b/lib/modules/blockchain/blockchain/src/requests/received.rs index 197fd26c2af33c76d1cec8c0220d8f235c2f5887..1de8ba259f1a4a9d5f894248bfe2e49f1782e4c8 
100644 --- a/lib/modules/blockchain/blockchain/src/requests/received.rs +++ b/lib/modules/blockchain/blockchain/src/requests/received.rs @@ -38,7 +38,7 @@ pub fn receive_req( debug!("BlockchainModule : receive BlockchainRequest::CurrentBlock()"); if let Ok(block_opt) = durs_bc_db_reader::blocks::get_block_in_local_blockchain( - &bc.db, + bc.db(), bc.current_blockstamp.id, ) { if let Some(block) = block_opt { @@ -71,7 +71,7 @@ pub fn receive_req( ); if let Ok(block_opt) = - durs_bc_db_reader::blocks::get_block_in_local_blockchain(&bc.db, block_number) + durs_bc_db_reader::blocks::get_block_in_local_blockchain(bc.db(), block_number) { if let Some(block) = block_opt { debug!( @@ -106,7 +106,7 @@ pub fn receive_req( ); if let Ok(blocks) = durs_bc_db_reader::blocks::get_blocks_in_local_blockchain( - &bc.db, + bc.db(), first_block_number, count, ) { @@ -145,7 +145,7 @@ pub fn receive_req( .map(|p| { ( p, - durs_bc_db_reader::indexes::identities::get_uid(&bc.db, &p) + durs_bc_db_reader::indexes::identities::get_uid(bc.db(), &p) .expect("Fatal error : get_uid : Fail to read WotV10DB !"), ) }) diff --git a/lib/modules/blockchain/blockchain/src/sync/apply/blocks_worker.rs b/lib/modules/blockchain/blockchain/src/sync/apply/blocks_worker.rs index 9b94c146fddbfe05e3940f8a8666f6a6f567cc55..684857e4e75ed39b2a2decefbf5b034ff7c0e821 100644 --- a/lib/modules/blockchain/blockchain/src/sync/apply/blocks_worker.rs +++ b/lib/modules/blockchain/blockchain/src/sync/apply/blocks_worker.rs @@ -49,12 +49,16 @@ pub fn execute( all_wait_duration += SystemTime::now().duration_since(wait_begin).unwrap(); // Apply db request - req.apply( - &db, - &mut fork_tree, - fork_window_size, - Some(target_blockstamp), - ) + db.write(|mut w| { + req.apply( + &db, + &mut w, + &mut fork_tree, + fork_window_size, + Some(target_blockstamp), + )?; + Ok(w) + }) .expect("Fatal error : Fail to apply DBWriteRequest !"); chunk_index += 1; @@ -80,18 +84,19 @@ pub fn execute( // Increment progress bar (last chunk) apply_pb.inc(); - // Save blockchain, and fork databases + // Save fork tree println!(); println!("Write indexs in files..."); - info!("Save blockchain and forks databases in files..."); - durs_bc_db_writer::blocks::fork_tree::save_fork_tree(&db, &fork_tree) - .unwrap_or_else(|_| fatal_error!("DB corrupted, please reset data.")); - db.save() - .unwrap_or_else(|_| fatal_error!("DB corrupted, please reset data.")); + info!("Save db..."); + db.write(|mut w| { + durs_bc_db_writer::blocks::fork_tree::save_fork_tree(&db, &mut w, &fork_tree)?; + Ok(w) + }) + .unwrap_or_else(|_| fatal_error!("DB corrupted, please reset data.")); // Send finish signal sender_sync_thread - .send(MessForSyncThread::ApplyFinish()) + .send(MessForSyncThread::ApplyFinish(Some(db))) .expect("Fatal error : sync_thread unrechable !"); let blocks_job_duration = SystemTime::now().duration_since(blocks_job_begin).unwrap() - all_wait_duration; diff --git a/lib/modules/blockchain/blockchain/src/sync/apply/mod.rs b/lib/modules/blockchain/blockchain/src/sync/apply/mod.rs index e0d7b6a07e25addeff481a21981267832319e085..b50c9aaf7bd1f91473db319773d264defd463789 100644 --- a/lib/modules/blockchain/blockchain/src/sync/apply/mod.rs +++ b/lib/modules/blockchain/blockchain/src/sync/apply/mod.rs @@ -19,8 +19,9 @@ pub mod wot_worker; use crate::dubp; use crate::dubp::apply::apply_valid_block; -use crate::dubp::apply::ValidBlockApplyReqs; +use crate::dubp::apply::{ApplyValidBlockError, ValidBlockApplyReqs}; use crate::sync::SyncJobsMess; +use crate::Db; use 
dubp_block_doc::block::{BlockDocument, BlockDocumentTrait}; use dubp_common_doc::traits::Document; use dubp_common_doc::{BlockNumber, Blockstamp}; @@ -58,6 +59,7 @@ pub struct BlockApplicator { pub blocks_not_expiring: VecDeque<u64>, pub last_block_expiring: isize, // databases + pub db: Option<Db>, pub wot_index: HashMap<PubKey, WotId>, pub wot_databases: WotsV10DBs, pub certs_db: BinFreeStructDb<CertsExpirV10Datas>, @@ -104,13 +106,26 @@ impl BlockApplicator { // Apply block let apply_valid_block_begin = SystemTime::now(); + let mut apply_valid_block_result = Err(ApplyValidBlockError::DBsCorrupted); + if let Some(db) = self.db.take() { + db.write(|mut w| { + apply_valid_block_result = apply_valid_block::<RustyWebOfTrust>( + &db, + &mut w, + block_doc, + &mut self.wot_index, + &self.wot_databases.wot_db, + &expire_certs, + ); + Ok(w) + }) + .expect("Fail to apply valid block."); + self.db = Some(db); + } else { + fatal_error!("Dev error: BlockApplicator must have DB.") + } if let Ok(ValidBlockApplyReqs(block_req, wot_db_reqs, currency_db_reqs)) = - apply_valid_block::<RustyWebOfTrust>( - block_doc, - &mut self.wot_index, - &self.wot_databases.wot_db, - &expire_certs, - ) + apply_valid_block_result { self.all_apply_valid_block_duration += SystemTime::now() .duration_since(apply_valid_block_begin) @@ -154,13 +169,19 @@ impl BlockApplicator { "Fail to communicate with tx worker thread, please reset data & resync !", ) } + + // In fork window ? + let fork_window_size = unwrap!(self.currency_params).fork_window_size as u32; + let in_fork_window = self.target_blockstamp.id.0 < fork_window_size + || self.current_blockstamp.id.0 > self.target_blockstamp.id.0 - fork_window_size; + // Send blocks and wot requests to wot worker thread for req in currency_db_reqs { self.sender_tx_thread - .send(SyncJobsMess::CurrencyDBsWriteQuery( - self.current_blockstamp, - req.clone(), - )) + .send(SyncJobsMess::CurrencyDBsWriteQuery { + in_fork_window, + req: req.clone(), + }) .expect( "Fail to communicate with tx worker thread, please reset data & resync !", ); diff --git a/lib/modules/blockchain/blockchain/src/sync/apply/txs_worker.rs b/lib/modules/blockchain/blockchain/src/sync/apply/txs_worker.rs index 09a0dcb20e75fe9c434ff271df045f682cfaaa11..e49686651ac8063234d0b1b1c75726a715150fcd 100644 --- a/lib/modules/blockchain/blockchain/src/sync/apply/txs_worker.rs +++ b/lib/modules/blockchain/blockchain/src/sync/apply/txs_worker.rs @@ -27,25 +27,29 @@ pub fn execute( let tx_job_begin = SystemTime::now(); // Open databases let db_path = durs_conf::get_blockchain_db_path(profile_path); - let databases = CurrencyV10DBs::open(Some(&db_path)); + let db = open_db(db_path.as_path()).expect("Fail to open blockchain DB."); // Listen db requets let mut all_wait_duration = Duration::from_millis(0); let mut wait_begin = SystemTime::now(); - while let Ok(SyncJobsMess::CurrencyDBsWriteQuery(blockstamp, req)) = recv.recv() { + while let Ok(SyncJobsMess::CurrencyDBsWriteQuery { + in_fork_window, + req, + }) = recv.recv() + { all_wait_duration += SystemTime::now().duration_since(wait_begin).unwrap(); // Apply db request - req.apply(&blockstamp, &databases) - .expect("Fatal error : Fail to apply DBWriteRequest !"); + db.write(|mut w| { + req.apply(&db, &mut w, None, in_fork_window)?; + Ok(w) + }) + .expect("Fatal error : Fail to apply DBWriteRequest !"); wait_begin = SystemTime::now(); } - // Save tx, utxo, du and balances databases - info!("Save tx and sources database in file..."); - databases.save_dbs(true, true); // Send finish 
signal sender_sync_thread - .send(MessForSyncThread::ApplyFinish()) + .send(MessForSyncThread::ApplyFinish(None)) .expect("Fatal error : sync_thread unrechable !"); let tx_job_duration = SystemTime::now().duration_since(tx_job_begin).unwrap() - all_wait_duration; diff --git a/lib/modules/blockchain/blockchain/src/sync/apply/wot_worker.rs b/lib/modules/blockchain/blockchain/src/sync/apply/wot_worker.rs index 5435ec9a0ebefb171637b25b494a8f0eaef949a0..8c78e91a9ae7e64576e585875acfa43e6528550d 100644 --- a/lib/modules/blockchain/blockchain/src/sync/apply/wot_worker.rs +++ b/lib/modules/blockchain/blockchain/src/sync/apply/wot_worker.rs @@ -37,9 +37,19 @@ pub fn execute( while let Ok(mess) = recv.recv() { all_wait_duration += SystemTime::now().duration_since(wait_begin).unwrap(); match mess { - SyncJobsMess::WotsDBsWriteQuery(blockstamp, currency_params, req) => req - .apply(&blockstamp, ¤cy_params.deref(), &databases, &db) - .expect("Fatal error : Fail to apply DBWriteRequest !"), + SyncJobsMess::WotsDBsWriteQuery(blockstamp, currency_params, req) => { + db.write(|mut w| { + req.apply( + &db, + &mut w, + &blockstamp, + ¤cy_params.deref(), + &databases, + )?; + Ok(w) + }) + .expect("Fatal error : Fail to apply DBWriteRequest !"); + } SyncJobsMess::End => break, _ => {} } @@ -51,7 +61,7 @@ pub fn execute( // Send finish signal sender_sync_thread - .send(MessForSyncThread::ApplyFinish()) + .send(MessForSyncThread::ApplyFinish(None)) .expect("Fatal error : sync_thread unrechable !"); let wot_job_duration = SystemTime::now().duration_since(wot_job_begin).unwrap() - all_wait_duration; diff --git a/lib/modules/blockchain/blockchain/src/sync/mod.rs b/lib/modules/blockchain/blockchain/src/sync/mod.rs index 22508f607bea13fed1b64df7e7fc4db97853e869..6d8e7d5d76eff80047ac8b9792a521455311b1fb 100644 --- a/lib/modules/blockchain/blockchain/src/sync/mod.rs +++ b/lib/modules/blockchain/blockchain/src/sync/mod.rs @@ -48,13 +48,13 @@ pub struct BlockHeader { pub issuer: PubKey, } -#[derive(Debug, Clone)] +#[derive(Debug)] /// Message for main sync thread pub enum MessForSyncThread { Target(CurrencyName, Blockstamp), BlockDocument(BlockDocument), DownloadFinish(), - ApplyFinish(), + ApplyFinish(Option<Db>), } #[derive(Debug)] @@ -63,7 +63,10 @@ pub enum SyncJobsMess { ForkWindowSize(usize), // informs block worker of fork window size BlocksDBsWriteQuery(BlocksDBsWriteQuery), WotsDBsWriteQuery(Blockstamp, Box<CurrencyParameters>, WotsDBsWriteQuery), - CurrencyDBsWriteQuery(Blockstamp, CurrencyDBsWriteQuery), + CurrencyDBsWriteQuery { + in_fork_window: bool, + req: CurrencyDBsWriteQuery, + }, End, } @@ -263,6 +266,7 @@ pub fn local_sync<DC: DursConfTrait>( // Open databases let dbs_path = durs_conf::get_blockchain_db_path(profile_path.clone()); + let db = open_db(dbs_path.as_path()).expect("Fail to open blockchain DB."); let certs_db = BinFreeStructDb::Mem( open_free_struct_memory_db::<CertsExpirV10Datas>().expect("Fail to create memory certs_db"), ); @@ -273,6 +277,7 @@ pub fn local_sync<DC: DursConfTrait>( currency, currency_params: None, dbs_path, + db: Some(db), verif_inner_hash: !unsafe_mode, target_blockstamp, current_blockstamp, @@ -379,15 +384,29 @@ pub fn local_sync<DC: DursConfTrait>( // Wait recv two finish signals let mut wait_jobs = *NB_SYNC_JOBS - 1; + let mut db = None; while wait_jobs > 0 { match recv_sync_thread.recv() { - Ok(MessForSyncThread::ApplyFinish()) => wait_jobs -= 1, + Ok(MessForSyncThread::ApplyFinish(db_opt)) => { + if db.is_none() { + db = db_opt; + } + wait_jobs -= 1; + } Ok(_) => 
thread::sleep(Duration::from_millis(50)), Err(_) => wait_jobs -= 1, } } info!("All sync jobs finish."); + // Save blockchain DB + if let Some(db) = db { + db.save() + .unwrap_or_else(|_| fatal_error!("DB corrupted, please reset data.")); + } else { + fatal_error!("Dev error: sync workers didn't return the DB.") + } + // Log sync duration debug!("certs_count={}", block_applicator.certs_count); let sync_duration = SystemTime::now().duration_since(sync_start_time).unwrap(); diff --git a/lib/modules/ws2p-v1-legacy/src/ws_connections/handler.rs b/lib/modules/ws2p-v1-legacy/src/ws_connections/handler.rs index ff7d51baeb465f58479e8ad0c5d2a5c4135bbe3d..342fcdca79d7de62ec4f43d73ef13f2a8905216b 100644 --- a/lib/modules/ws2p-v1-legacy/src/ws_connections/handler.rs +++ b/lib/modules/ws2p-v1-legacy/src/ws_connections/handler.rs @@ -159,7 +159,7 @@ impl Handler for Client { let s: String = msg .into_text() .expect("WS2P: Fail to convert message payload to String !"); - debug!("WS2P: receive mess: {}", s); + trace!("WS2P: receive mess: {}", s); let json_message: serde_json::Value = serde_json::from_str(&s) .expect("WS2P: Fail to convert string message ton json value !"); let result = self diff --git a/lib/tools/dbs-tools/src/kv_db/file.rs b/lib/tools/dbs-tools/src/kv_db/file.rs index ea96f19be4b205926a440228ae6aa2b727c72d53..b810704a7f6914fb1d5e56ea473d4eb213a0c190 100644 --- a/lib/tools/dbs-tools/src/kv_db/file.rs +++ b/lib/tools/dbs-tools/src/kv_db/file.rs @@ -48,6 +48,7 @@ impl<'a> AsMut<rkv::Writer<'a>> for KvFileDbWriter<'a> { } /// Key-value file Database handler +#[derive(Debug)] pub struct KvFileDbHandler { arc: Arc<RwLock<Rkv>>, path: PathBuf, @@ -202,6 +203,17 @@ pub enum KvFileDbStore { MultiIntKey(super::MultiIntegerStore<u32>), } +impl Debug for KvFileDbStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Single(_) => write!(f, "KvFileDbStore::Single ()"), + Self::SingleIntKey(_) => write!(f, "KvFileDbStore::SingleIntKey ()"), + Self::Multi(_) => write!(f, "KvFileDbStore::Multi ()"), + Self::MultiIntKey(_) => write!(f, "KvFileDbStore::MultiIntKey ()"), + } + } +} + impl KvFileDbRead for KvFileDbHandler { #[inline] fn from_db_value<T: DeserializeOwned>(v: Value) -> Result<T, DbError> {