From 6ce6036433108f4d9bcfa34fc9be2e2b2788e271 Mon Sep 17 00:00:00 2001 From: librelois <c@elo.tf> Date: Sun, 21 Mar 2021 00:36:23 +0100 Subject: [PATCH] [feat] server: create global cache --- Cargo.lock | 38 +++- Cargo.toml | 1 + neon/native/src/server.rs | 1 + rust-bins/duniter-dbex/src/migrate.rs | 2 +- rust-libs/duniter-bc-reader/src/lib.rs | 10 +- rust-libs/duniter-dbs-write-ops/Cargo.toml | 2 + .../duniter-dbs-write-ops/src/apply_block.rs | 30 ++- rust-libs/duniter-dbs-write-ops/src/cm.rs | 51 +++-- rust-libs/duniter-dbs-write-ops/src/lib.rs | 2 +- rust-libs/duniter-dbs/src/databases/cm_v1.rs | 9 +- rust-libs/duniter-global/Cargo.toml | 18 ++ rust-libs/duniter-global/src/lib.rs | 155 ++++++++++++++ rust-libs/duniter-module/Cargo.toml | 2 + rust-libs/duniter-module/src/lib.rs | 12 +- rust-libs/duniter-server/Cargo.toml | 4 +- rust-libs/duniter-server/src/fill_cm.rs | 47 +++++ .../src/legacy/block_indexer.rs | 2 + rust-libs/duniter-server/src/legacy/dunp.rs | 11 +- .../duniter-server/src/legacy/txs_mempool.rs | 18 +- rust-libs/duniter-server/src/lib.rs | 58 +++--- rust-libs/modules/gva/Cargo.toml | 1 + rust-libs/modules/gva/bca/Cargo.toml | 3 + .../modules/gva/bca/src/exec_req_type.rs | 9 + .../gva/bca/src/exec_req_type/balances.rs | 39 ++++ .../last_blockstamp_out_of_fork_window.rs | 52 +++-- .../bca/src/exec_req_type/members_count.rs | 43 ++-- .../exec_req_type/prepare_simple_payment.rs | 110 +++++----- .../gva/bca/src/exec_req_type/utxos.rs | 58 ++++++ rust-libs/modules/gva/bca/src/lib.rs | 49 +++-- rust-libs/modules/gva/bca/types/Cargo.toml | 1 + rust-libs/modules/gva/bca/types/src/amount.rs | 46 ++++ rust-libs/modules/gva/bca/types/src/lib.rs | 39 +++- .../gva/bca/types/src/prepare_payment.rs | 4 +- .../gva/bca/types/src/utxo.rs} | 18 +- rust-libs/modules/gva/dbs-reader/Cargo.toml | 1 + .../gva/dbs-reader/src/current_frame.rs | 23 +- rust-libs/modules/gva/dbs-reader/src/lib.rs | 43 ++-- rust-libs/modules/gva/dbs-reader/src/utxos.rs | 54 ++--- rust-libs/modules/gva/gql/Cargo.toml | 2 + rust-libs/modules/gva/gql/src/lib.rs | 12 +- rust-libs/modules/gva/gql/src/queries.rs | 15 +- .../gva/gql/src/queries/account_balance.rs | 4 +- .../modules/gva/gql/src/queries/block.rs | 4 +- .../gva/gql/src/queries/current_block.rs | 34 ++- .../gva/gql/src/queries/current_frame.rs | 42 +++- .../gql/src/queries/first_utxos_of_scripts.rs | 2 +- .../modules/gva/gql/src/queries/gen_tx.rs | 196 +++++++++--------- rust-libs/modules/gva/gql/src/queries/idty.rs | 2 +- .../gva/gql/src/queries/txs_history.rs | 2 +- rust-libs/modules/gva/gql/src/queries/uds.rs | 134 ++++++------ .../gva/gql/src/queries/utxos_of_script.rs | 93 +++++---- rust-libs/modules/gva/gql/src/schema.rs | 8 + .../gva/gql/src/subscriptions/new_blocks.rs | 82 ++------ rust-libs/modules/gva/src/lib.rs | 3 + 54 files changed, 1089 insertions(+), 612 deletions(-) create mode 100644 rust-libs/duniter-global/Cargo.toml create mode 100644 rust-libs/duniter-global/src/lib.rs create mode 100644 rust-libs/duniter-server/src/fill_cm.rs create mode 100644 rust-libs/modules/gva/bca/src/exec_req_type/balances.rs create mode 100644 rust-libs/modules/gva/bca/src/exec_req_type/utxos.rs create mode 100644 rust-libs/modules/gva/bca/types/src/amount.rs rename rust-libs/{duniter-server/src/fill_cm_db.rs => modules/gva/bca/types/src/utxo.rs} (62%) diff --git a/Cargo.lock b/Cargo.lock index f0b0019f2..1494e7ae0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,6 +103,9 @@ name = "arrayvec" version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +dependencies = [ + "serde", +] [[package]] name = "async-attributes" @@ -282,6 +285,16 @@ dependencies = [ "futures-micro", ] +[[package]] +name = "async-rwlock" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "261803dcc39ba9e72760ba6e16d0199b1eef9fc44e81bffabbebb9f5aea3906c" +dependencies = [ + "async-mutex", + "event-listener", +] + [[package]] name = "async-std" version = "1.6.5" @@ -1154,12 +1167,14 @@ name = "duniter-bca" version = "0.1.0" dependencies = [ "anyhow", + "arrayvec", "async-bincode", "async_io_stream", "bincode", "dubp", "duniter-bca-types", "duniter-dbs", + "duniter-global", "duniter-gva-db", "duniter-gva-dbs-reader", "duniter-mempools", @@ -1176,6 +1191,7 @@ dependencies = [ name = "duniter-bca-types" version = "0.1.0" dependencies = [ + "arrayvec", "bincode", "dubp", "serde", @@ -1249,13 +1265,28 @@ dependencies = [ "chrono", "dubp", "duniter-dbs", + "duniter-global", "fast-threadpool", + "flume", "log", "maplit", "resiter", "serde_json", ] +[[package]] +name = "duniter-global" +version = "1.8.1" +dependencies = [ + "async-rwlock", + "dubp", + "duniter-dbs", + "flume", + "mockall", + "once_cell", + "tokio", +] + [[package]] name = "duniter-gva" version = "0.1.0" @@ -1270,6 +1301,7 @@ dependencies = [ "duniter-bca", "duniter-conf", "duniter-dbs", + "duniter-global", "duniter-gva-db", "duniter-gva-dbs-reader", "duniter-gva-gql", @@ -1315,6 +1347,7 @@ dependencies = [ "anyhow", "arrayvec", "dubp", + "duniter-bca-types", "duniter-dbs", "duniter-gva-db", "maplit", @@ -1336,6 +1369,7 @@ dependencies = [ "duniter-bc-reader", "duniter-conf", "duniter-dbs", + "duniter-global", "duniter-gva-db", "duniter-gva-dbs-reader", "duniter-mempools", @@ -1425,8 +1459,10 @@ dependencies = [ "dubp", "duniter-conf", "duniter-dbs", + "duniter-global", "duniter-mempools", "fast-threadpool", + "log", "paste", "tokio", ] @@ -1442,6 +1478,7 @@ dependencies = [ "duniter-conf", "duniter-dbs", "duniter-dbs-write-ops", + "duniter-global", "duniter-gva", "duniter-mempools", "duniter-module", @@ -1450,7 +1487,6 @@ dependencies = [ "log", "paste", "resiter", - "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f88ec5eae..aa040bba0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ members = [ "rust-libs/duniter-mempools", "rust-libs/duniter-module", "rust-libs/duniter-server", + "rust-libs/duniter-global", "rust-libs/modules/gva", "rust-libs/modules/gva/bca", "rust-libs/modules/gva/bca/types", diff --git a/neon/native/src/server.rs b/neon/native/src/server.rs index 438687493..4036b565e 100644 --- a/neon/native/src/server.rs +++ b/neon/native/src/server.rs @@ -142,6 +142,7 @@ declare_types! { let server = this.borrow(&guard); server.server.get_self_endpoints() }.map(|endpoints| { + log::info!("TMP DEBUG get_self_endpoints={:?}", endpoints); let js_array = JsArray::new(&mut cx, endpoints.len() as u32); for (i, ep) in endpoints.iter().enumerate() { let js_string = cx.string(ep); diff --git a/rust-bins/duniter-dbex/src/migrate.rs b/rust-bins/duniter-dbex/src/migrate.rs index e3e613a58..a3312ae1a 100644 --- a/rust-bins/duniter-dbex/src/migrate.rs +++ b/rust-bins/duniter-dbex/src/migrate.rs @@ -119,7 +119,7 @@ fn migrate_inner( }) .expect("gva:apply_chunk: dbs pool disconnected"); current = Some(duniter_dbs_write_ops::apply_block::apply_chunk( - bc_db, current, &dbs_pool, chunk, + bc_db, current, &dbs_pool, chunk, None, )?); gva_handle .join() diff --git a/rust-libs/duniter-bc-reader/src/lib.rs b/rust-libs/duniter-bc-reader/src/lib.rs index a164f56f4..68ae0c41f 100644 --- a/rust-libs/duniter-bc-reader/src/lib.rs +++ b/rust-libs/duniter-bc-reader/src/lib.rs @@ -23,15 +23,7 @@ )] use dubp::crypto::hashs::Hash; -use duniter_dbs::{ - databases::{bc_v2::BcV2DbReadable, cm_v1::CmV1DbReadable}, - kv_typed::prelude::*, - BlockMetaV2, HashKeyV2, -}; - -pub fn get_current_block_meta<CmDb: CmV1DbReadable>(cm_db: &CmDb) -> KvResult<Option<BlockMetaV2>> { - cm_db.current_block_meta().get(&()) -} +use duniter_dbs::{databases::bc_v2::BcV2DbReadable, kv_typed::prelude::*, HashKeyV2}; pub fn tx_exist<BcDb: BcV2DbReadable>(bc_db_ro: &BcDb, hash: Hash) -> KvResult<bool> { Ok(bc_db_ro.txs_hashs().contains_key(&HashKeyV2(hash))?) diff --git a/rust-libs/duniter-dbs-write-ops/Cargo.toml b/rust-libs/duniter-dbs-write-ops/Cargo.toml index f5002147d..81f6e117c 100644 --- a/rust-libs/duniter-dbs-write-ops/Cargo.toml +++ b/rust-libs/duniter-dbs-write-ops/Cargo.toml @@ -15,7 +15,9 @@ path = "src/lib.rs" chrono = "0.4.19" dubp = { version = "0.50.0", features = ["duniter"] } duniter-dbs = { path = "../duniter-dbs" } +duniter-global = { path = "../duniter-global" } fast-threadpool = "0.2.3" +flume = "0.10" log = "0.4.11" resiter = "0.4.0" diff --git a/rust-libs/duniter-dbs-write-ops/src/apply_block.rs b/rust-libs/duniter-dbs-write-ops/src/apply_block.rs index 83d89c68b..7d7e4511c 100644 --- a/rust-libs/duniter-dbs-write-ops/src/apply_block.rs +++ b/rust-libs/duniter-dbs-write-ops/src/apply_block.rs @@ -20,11 +20,12 @@ pub fn apply_block( block: Arc<DubpBlockV10>, current_opt: Option<BlockMetaV2>, dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<SharedDbs<FileBackend>>, + global_sender: &flume::Sender<GlobalBackGroundTaskMsg>, throw_chainability: bool, ) -> KvResult<BlockMetaV2> { if let Some(current) = current_opt { if block.number().0 == current.number + 1 { - apply_block_inner(bc_db, dbs_pool, block) + apply_block_inner(bc_db, dbs_pool, block, global_sender) } else if throw_chainability { Err(KvError::Custom( format!( @@ -38,7 +39,7 @@ pub fn apply_block( Ok(current) } } else if block.number() == BlockNumber(0) { - apply_block_inner(bc_db, dbs_pool, block) + apply_block_inner(bc_db, dbs_pool, block, global_sender) } else { Err(KvError::Custom( "Try to apply non genesis block on empty blockchain".into(), @@ -52,9 +53,10 @@ pub fn apply_chunk( current_opt: Option<BlockMetaV2>, dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<SharedDbs<FileBackend>>, blocks: Arc<[DubpBlockV10]>, + global_sender: Option<&flume::Sender<GlobalBackGroundTaskMsg>>, ) -> KvResult<BlockMetaV2> { verify_chunk_chainability(current_opt, &blocks)?; - apply_chunk_inner(bc_db, dbs_pool, blocks) + apply_chunk_inner(bc_db, dbs_pool, blocks, global_sender) } fn verify_chunk_chainability( @@ -105,17 +107,13 @@ fn apply_block_inner( bc_db: &BcV2Db<FileBackend>, dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<SharedDbs<FileBackend>>, block: Arc<DubpBlockV10>, + global_sender: &flume::Sender<GlobalBackGroundTaskMsg>, ) -> KvResult<BlockMetaV2> { let block_for_cm = Arc::clone(&block); let block_for_txs_mp = Arc::clone(&block); // Cm - let cm_handle = dbs_pool - .launch(move |dbs| { - crate::cm::apply_block(&block_for_cm, &dbs.cm_db)?; - Ok::<_, KvError>(()) - }) - .expect("dbs pool disconnected"); + crate::cm::update_current_meta(&block_for_cm, &global_sender); //TxsMp let txs_mp_handle = dbs_pool @@ -128,7 +126,6 @@ fn apply_block_inner( // Bc let new_current = crate::bc::apply_block(bc_db, &block)?; - cm_handle.join().expect("dbs pool disconnected")?; txs_mp_handle.join().expect("dbs pool disconnected")?; Ok(new_current) @@ -138,18 +135,16 @@ fn apply_chunk_inner( bc_db: &BcV2Db<FileBackend>, dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<SharedDbs<FileBackend>>, blocks: Arc<[DubpBlockV10]>, + global_sender: Option<&flume::Sender<GlobalBackGroundTaskMsg>>, ) -> KvResult<BlockMetaV2> { let blocks_len = blocks.len(); - let blocks_for_cm = Arc::clone(&blocks); let blocks_for_txs_mp = Arc::clone(&blocks); // Cm - let cm_handle = dbs_pool - .launch(move |dbs| { - let chunk_len = blocks_for_cm.len(); - crate::cm::apply_block(&blocks_for_cm.deref()[chunk_len - 1], &dbs.cm_db) - }) - .expect("dbs pool disconnected"); + if let Some(global_sender) = global_sender { + let chunk_len = blocks.len(); + crate::cm::update_current_meta(&&blocks.deref()[chunk_len - 1], &global_sender); + } //TxsMp //log::info!("apply_chunk: launch txs_mp job..."); @@ -169,7 +164,6 @@ fn apply_chunk_inner( } let current_block = crate::bc::apply_block(bc_db, &blocks[blocks_len - 1])?; - cm_handle.join().expect("dbs pool disconnected")?; txs_mp_handle .join() .expect("txs_mp_recv: dbs pool disconnected")?; diff --git a/rust-libs/duniter-dbs-write-ops/src/cm.rs b/rust-libs/duniter-dbs-write-ops/src/cm.rs index 1cc1e75a7..5d9a61a27 100644 --- a/rust-libs/duniter-dbs-write-ops/src/cm.rs +++ b/rust-libs/duniter-dbs-write-ops/src/cm.rs @@ -14,27 +14,38 @@ // along with this program. If not, see <https://www.gnu.org/licenses/>. use crate::*; -use duniter_dbs::databases::bc_v2::BcV2DbReadable; -use duniter_dbs::BlockDbV2; -pub fn init(bc_db: &BcV2Db<FileBackend>, cm_db: &CmV1Db<MemSingleton>) -> KvResult<()> { - if let Some(current_block_meta) = bc_db - .blocks_meta() - .iter_rev(.., |it| it.values().next_res())? - { - cm_db - .current_block_meta_write() - .upsert((), current_block_meta) - } else { - Ok(()) - } +#[inline(always)] +pub(crate) fn update_current_meta( + block: &DubpBlockV10, + global_sender: &flume::Sender<GlobalBackGroundTaskMsg>, +) { + let current_block_meta = block_to_block_meta(block); + global_sender + .send(GlobalBackGroundTaskMsg::NewCurrentBlock(current_block_meta)) + .expect("global task disconnected"); } -pub fn apply_block(block: &DubpBlockV10, cm_db: &CmV1Db<MemSingleton>) -> KvResult<()> { - let block_meta = BlockMetaV2::from(block); - cm_db.current_block_meta_write().upsert((), block_meta)?; - cm_db - .current_block_write() - .upsert((), BlockDbV2(block.clone()))?; - Ok(()) +fn block_to_block_meta(block: &DubpBlockV10) -> BlockMetaV2 { + BlockMetaV2 { + version: 10, + number: block.number().0, + hash: block.hash().0, + signature: block.signature(), + inner_hash: block.inner_hash(), + previous_hash: block.previous_hash(), + issuer: block.issuer(), + previous_issuer: dubp::crypto::keys::ed25519::PublicKey::default(), + time: block.local_time(), + pow_min: block.pow_min() as u32, + members_count: block.members_count() as u64, + issuers_count: block.issuers_count() as u32, + issuers_frame: block.issuers_frame() as u64, + issuers_frame_var: 0, + median_time: block.common_time(), + nonce: block.nonce(), + monetary_mass: block.monetary_mass(), + unit_base: block.unit_base() as u32, + dividend: block.dividend(), + } } diff --git a/rust-libs/duniter-dbs-write-ops/src/lib.rs b/rust-libs/duniter-dbs-write-ops/src/lib.rs index 84410e92e..b112bce01 100644 --- a/rust-libs/duniter-dbs-write-ops/src/lib.rs +++ b/rust-libs/duniter-dbs-write-ops/src/lib.rs @@ -40,13 +40,13 @@ use dubp::wallet::prelude::*; use duniter_dbs::{ databases::{ bc_v2::BcV2Db, - cm_v1::{CmV1Db, CmV1DbWritable}, txs_mp_v2::{TxsMpV2Db, TxsMpV2DbReadable, TxsMpV2DbWritable}, }, kv_typed::prelude::*, BlockMetaV2, FileBackend, HashKeyV2, PendingTxDbV2, PubKeyKeyV2, PubKeyValV2, SharedDbs, SourceAmountValV2, UtxoValV2, WalletConditionsV2, }; +use duniter_global::GlobalBackGroundTaskMsg; use resiter::filter_map::FilterMap; use resiter::flatten::Flatten; use resiter::map::Map; diff --git a/rust-libs/duniter-dbs/src/databases/cm_v1.rs b/rust-libs/duniter-dbs/src/databases/cm_v1.rs index e2bab4056..4cea86827 100644 --- a/rust-libs/duniter-dbs/src/databases/cm_v1.rs +++ b/rust-libs/duniter-dbs/src/databases/cm_v1.rs @@ -15,11 +15,4 @@ use crate::*; -db_schema!( - CmV1, - [ - ["self_peer_old", SelfPeerOld, (), PeerCardDbV1], - ["current_block_meta", CurrentBlockMeta, (), BlockMetaV2], - ["current_block", CurrentBlock, (), BlockDbV2], - ] -); +db_schema!(CmV1, [["current_block", CurrentBlock, (), BlockDbV2],]); diff --git a/rust-libs/duniter-global/Cargo.toml b/rust-libs/duniter-global/Cargo.toml new file mode 100644 index 000000000..55ee8946c --- /dev/null +++ b/rust-libs/duniter-global/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "duniter-global" +version = "1.8.1" +authors = ["librelois <elois@duniter.org>"] +license = "AGPL-3.0" +edition = "2018" + +[dependencies] +async-rwlock = "1.3.0" +dubp = { version = "0.50.0", features = ["duniter"] } +duniter-dbs = { path = "../duniter-dbs" } +flume = "0.10" +mockall = { version = "0.9", optional = true } +once_cell = "1.5" +tokio = { version = "1.2", features = ["io-util", "rt-multi-thread"] } + +[features] +mock = ["mockall"] diff --git a/rust-libs/duniter-global/src/lib.rs b/rust-libs/duniter-global/src/lib.rs new file mode 100644 index 000000000..ddfd94679 --- /dev/null +++ b/rust-libs/duniter-global/src/lib.rs @@ -0,0 +1,155 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +#![deny( + clippy::unwrap_used, + missing_copy_implementations, + trivial_casts, + trivial_numeric_casts, + unstable_features, + unused_import_braces +)] + +pub use tokio; + +use async_rwlock::RwLock; +use dubp::wallet::prelude::SourceAmount; +use duniter_dbs::BlockMetaV2; +use once_cell::sync::OnceCell; +use std::ops::Deref; + +pub static SELF_ENDPOINTS: RwLock<Option<Vec<String>>> = RwLock::new(None); + +static ASYNC_RUNTIME: OnceCell<tokio::runtime::Runtime> = OnceCell::new(); +static CURRENT_META: RwLock<Option<CurrentMeta>> = RwLock::new(None); +static SELF_PEER_OLD: RwLock<Option<duniter_dbs::PeerCardDbV1>> = RwLock::new(None); + +#[derive(Clone, Copy, Debug, Default)] +pub struct CurrentMeta { + pub current_ud: SourceAmount, + pub current_block_meta: BlockMetaV2, +} + +#[derive(Clone, Debug)] +pub enum GlobalBackGroundTaskMsg { + InitCurrentMeta(CurrentMeta), + NewCurrentBlock(BlockMetaV2), + GetSelfEndpoints(flume::Sender<Option<Vec<String>>>), + SetSelfPeerOld(duniter_dbs::PeerCardDbV1), +} + +pub async fn start_global_background_task(recv: flume::Receiver<GlobalBackGroundTaskMsg>) { + tokio::spawn(async move { + while let Ok(msg) = recv.recv_async().await { + match msg { + GlobalBackGroundTaskMsg::InitCurrentMeta(current_meta) => { + let mut write_guard = CURRENT_META.write().await; + write_guard.replace(current_meta); + } + GlobalBackGroundTaskMsg::NewCurrentBlock(current_block_meta) => { + let upgradable_read_guard = CURRENT_META.upgradable_read().await; + let new_current_meta = if let Some(dividend) = current_block_meta.dividend { + CurrentMeta { + current_ud: dividend, + current_block_meta, + } + } else if let Some(current_meta) = upgradable_read_guard.deref() { + CurrentMeta { + current_ud: current_meta.current_ud, + current_block_meta, + } + } else { + CurrentMeta { + current_ud: SourceAmount::ZERO, + current_block_meta, + } + }; + let mut write_guard = + async_rwlock::RwLockUpgradableReadGuard::upgrade(upgradable_read_guard) + .await; + write_guard.replace(new_current_meta); + } + GlobalBackGroundTaskMsg::GetSelfEndpoints(sender) => { + let read_guard = SELF_ENDPOINTS.read().await; + let _ = sender.send_async(read_guard.deref().clone()).await; + } + GlobalBackGroundTaskMsg::SetSelfPeerOld(self_peer_old) => { + let mut write_guard = SELF_PEER_OLD.write().await; + write_guard.replace(self_peer_old); + } + } + } + }); +} + +pub fn get_async_runtime() -> &'static tokio::runtime::Runtime { + ASYNC_RUNTIME.get_or_init(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("fail to build tokio runtime") + }) +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct AsyncAccessor; + +impl AsyncAccessor { + pub fn new() -> Self { + AsyncAccessor + } + pub async fn get_current_meta<D: 'static, F: 'static + FnOnce(&CurrentMeta) -> D>( + &self, + f: F, + ) -> Option<D> { + let read_guard = CURRENT_META.read().await; + if let Some(current_meta) = read_guard.deref() { + Some(f(current_meta)) + } else { + None + } + } + pub async fn get_self_peer_old< + D: 'static, + F: 'static + FnOnce(&duniter_dbs::PeerCardDbV1) -> D, + >( + &self, + f: F, + ) -> Option<D> { + let read_guard = SELF_PEER_OLD.read().await; + if let Some(self_peer_old) = read_guard.deref() { + Some(f(self_peer_old)) + } else { + None + } + } +} + +#[cfg(feature = "mock")] +mockall::mock! { + pub AsyncAccessor { + pub async fn get_current_meta<D: 'static, F: 'static + FnOnce(&CurrentMeta) -> D>( + &self, + f: F, + ) -> Option<D>; + pub async fn get_self_peer_old< + D: 'static, + F: 'static + FnOnce(&duniter_dbs::PeerCardDbV1) -> D, + >( + &self, + f: F, + ) -> Option<D>; + } +} diff --git a/rust-libs/duniter-module/Cargo.toml b/rust-libs/duniter-module/Cargo.toml index 480b97660..681c269e7 100644 --- a/rust-libs/duniter-module/Cargo.toml +++ b/rust-libs/duniter-module/Cargo.toml @@ -11,8 +11,10 @@ async-trait = "0.1.41" dubp = { version = "0.50.0", features = ["duniter"] } duniter-conf = { path = "../duniter-conf" } duniter-dbs = { path = "../duniter-dbs" } +duniter-global = { path = "../duniter-global" } duniter-mempools = { path = "../duniter-mempools" } fast-threadpool = "0.2.3" +log = "0.4" [dev-dependencies] duniter-dbs = { path = "../duniter-dbs", features = ["mem"] } diff --git a/rust-libs/duniter-module/src/lib.rs b/rust-libs/duniter-module/src/lib.rs index 9cf718835..79060eaba 100644 --- a/rust-libs/duniter-module/src/lib.rs +++ b/rust-libs/duniter-module/src/lib.rs @@ -216,16 +216,8 @@ macro_rules! plug_duniter_modules { all_endpoints.append(&mut endpoints); )* - let self_peer = duniter_dbs::PeerCardDbV1 { - version: 10, - currency, - endpoints: all_endpoints, - ..Default::default() - }; - - use duniter_dbs::databases::cm_v1::CmV1DbWritable as _; - use duniter_dbs::kv_typed::prelude::DbCollectionRw as _; - dbs_pool.execute(|dbs| dbs.cm_db.self_peer_old_write().upsert((), self_peer)).await?.context("fail to save self peer card")?; + log::info!("TMP DEBUG SELF_ENDPOINTS={:?}", all_endpoints); + duniter_global::SELF_ENDPOINTS.write().await.replace(all_endpoints); $( let [<$M:snake _handle>] = tokio::spawn([<$M:snake>].start()); diff --git a/rust-libs/duniter-server/Cargo.toml b/rust-libs/duniter-server/Cargo.toml index bb43f4deb..9189f1844 100644 --- a/rust-libs/duniter-server/Cargo.toml +++ b/rust-libs/duniter-server/Cargo.toml @@ -10,9 +10,10 @@ anyhow = "1.0.34" cfg-if = "1.0.0" dubp = { version = "0.50.0", features = ["duniter"] } duniter-conf = { path = "../duniter-conf" } -duniter-dbs = { path = "../duniter-dbs" } duniter-bc-reader = { path = "../duniter-bc-reader" } +duniter-dbs = { path = "../duniter-dbs" } duniter-dbs-write-ops = { path = "../duniter-dbs-write-ops" } +duniter-global = { path = "../duniter-global" } duniter-gva = { path = "../modules/gva", optional = true } duniter-mempools = { path = "../duniter-mempools" } duniter-module = { path = "../duniter-module" } @@ -21,7 +22,6 @@ flume = "0.10.0" log = "0.4.11" paste = "1.0.2" resiter = "0.4.0" -tokio = { version = "1.2", features = ["io-util", "rt-multi-thread"] } [features] default = ["gva"] diff --git a/rust-libs/duniter-server/src/fill_cm.rs b/rust-libs/duniter-server/src/fill_cm.rs new file mode 100644 index 000000000..90b7aa96d --- /dev/null +++ b/rust-libs/duniter-server/src/fill_cm.rs @@ -0,0 +1,47 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::*; +use dubp::wallet::prelude::SourceAmount; +use duniter_dbs::databases::bc_v2::BcV2DbReadable; +use duniter_global::{CurrentMeta, GlobalBackGroundTaskMsg}; + +pub(super) fn fill_and_get_current_meta<BcDb: BcV2DbReadable>( + bc_db_ro: &BcDb, + global_sender: &flume::Sender<GlobalBackGroundTaskMsg>, +) -> anyhow::Result<Option<BlockMetaV2>> { + if let Some(current_block_meta) = bc_db_ro + .blocks_meta() + .iter_rev(.., |it| it.values().next_res())? + { + if let Some(current_ud) = bc_db_ro + .uds_reval() + .iter_rev(.., |it| it.values().map_ok(|v| v.0).next_res())? + { + global_sender.send(GlobalBackGroundTaskMsg::InitCurrentMeta(CurrentMeta { + current_ud, + current_block_meta, + }))?; + } else { + global_sender.send(GlobalBackGroundTaskMsg::InitCurrentMeta(CurrentMeta { + current_ud: SourceAmount::ZERO, + current_block_meta, + }))?; + } + Ok(Some(current_block_meta)) + } else { + Ok(None) + } +} diff --git a/rust-libs/duniter-server/src/legacy/block_indexer.rs b/rust-libs/duniter-server/src/legacy/block_indexer.rs index 73d3a79f0..a50a6ab0a 100644 --- a/rust-libs/duniter-server/src/legacy/block_indexer.rs +++ b/rust-libs/duniter-server/src/legacy/block_indexer.rs @@ -25,6 +25,7 @@ impl DuniterServer { block.clone(), self.current, &self.dbs_pool, + &self.global_sender, false, )?); apply_block_modules(block, Arc::new(self.conf.clone()), &self.dbs_pool, None) @@ -43,6 +44,7 @@ impl DuniterServer { self.current, &self.dbs_pool, blocks.clone(), + Some(&self.global_sender), )?); apply_chunk_of_blocks_modules(blocks, Arc::new(self.conf.clone()), &self.dbs_pool, None) } diff --git a/rust-libs/duniter-server/src/legacy/dunp.rs b/rust-libs/duniter-server/src/legacy/dunp.rs index 3433c9112..f77f3e1c3 100644 --- a/rust-libs/duniter-server/src/legacy/dunp.rs +++ b/rust-libs/duniter-server/src/legacy/dunp.rs @@ -66,14 +66,9 @@ impl DuniterServer { .map_err(|e| e.into()) } pub fn update_self_peer(&self, new_peer_card: PeerCardDbV1) { - self.dbs_pool - .execute(move |dbs| { - dbs.cm_db - .self_peer_old_write() - .upsert((), new_peer_card) - .expect("fail to write on memory db") - }) - .expect("dbs pool disconnected") + self.global_sender + .send(GlobalBackGroundTaskMsg::SetSelfPeerOld(new_peer_card)) + .expect("global task disconnected"); } } diff --git a/rust-libs/duniter-server/src/legacy/txs_mempool.rs b/rust-libs/duniter-server/src/legacy/txs_mempool.rs index 528a45b60..be7229fe4 100644 --- a/rust-libs/duniter-server/src/legacy/txs_mempool.rs +++ b/rust-libs/duniter-server/src/legacy/txs_mempool.rs @@ -41,12 +41,18 @@ impl DuniterServer { .expect("dbs pool disconnected") } pub fn get_self_endpoints(&self) -> anyhow::Result<Vec<Endpoint>> { - if let Some(self_peer) = self - .dbs_pool - .execute(|dbs| dbs.cm_db.self_peer_old().get(&()))? - .context("fail to get self endpoints")? - { - Ok(self_peer.endpoints) + // Do not get rust endpoints on js tests + if std::env::var_os("DUNITER_JS_TESTS") != Some("yes".into()) { + let (sender, recv) = flume::bounded(1); + loop { + self.global_sender + .send(GlobalBackGroundTaskMsg::GetSelfEndpoints(sender.clone()))?; + if let Some(self_endpoints) = recv.recv()? { + break Ok(self_endpoints); + } else { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + } } else { Ok(vec![]) } diff --git a/rust-libs/duniter-server/src/lib.rs b/rust-libs/duniter-server/src/lib.rs index 202335877..beb1d852d 100644 --- a/rust-libs/duniter-server/src/lib.rs +++ b/rust-libs/duniter-server/src/lib.rs @@ -22,7 +22,7 @@ unused_import_braces )] -mod fill_cm_db; +mod fill_cm; mod legacy; pub use duniter_conf::{gva_conf::GvaConf, DuniterConf, DuniterMode}; @@ -41,19 +41,16 @@ use dubp::{ block::prelude::*, common::crypto::hashs::Hash, documents_parser::prelude::FromStringObject, }; use duniter_dbs::{ - databases::{ - bc_v2::BcV2Db, - cm_v1::{CmV1DbReadable, CmV1DbWritable}, - txs_mp_v2::TxsMpV2DbReadable, - }, + databases::{bc_v2::BcV2Db, txs_mp_v2::TxsMpV2DbReadable}, kv_typed::prelude::*, PendingTxDbV2, PubKeyKeyV2, }; use duniter_dbs::{prelude::*, BlockMetaV2, FileBackend}; +use duniter_global::{tokio, GlobalBackGroundTaskMsg}; use duniter_mempools::{Mempools, TxMpError, TxsMempool}; use duniter_module::{plug_duniter_modules, Endpoint, TxsHistoryForBma}; use fast_threadpool::ThreadPoolConfig; -use resiter::filter::Filter; +use resiter::{filter::Filter, map::Map}; use std::{ collections::BTreeMap, path::{Path, PathBuf}, @@ -73,6 +70,7 @@ pub struct DuniterServer { conf: DuniterConf, current: Option<BlockMetaV2>, dbs_pool: fast_threadpool::ThreadPoolSyncHandler<SharedDbs<FileBackend>>, + global_sender: flume::Sender<GlobalBackGroundTaskMsg>, pending_txs_subscriber: flume::Receiver<Arc<Events<duniter_dbs::databases::txs_mp_v2::TxsEvent>>>, profile_path_opt: Option<PathBuf>, @@ -98,10 +96,14 @@ impl DuniterServer { log::info!("open duniter databases..."); let (bc_db, shared_dbs) = duniter_dbs::open_dbs(profile_path_opt)?; shared_dbs.dunp_db.heads_old_write().clear()?; // Clear WS2Pv1 HEADs - duniter_dbs_write_ops::cm::init(&bc_db, &shared_dbs.cm_db)?; + + // Create channel with global async task + let (global_sender, global_recv) = flume::unbounded(); + + // Fill and get current meta + let current = fill_cm::fill_and_get_current_meta(&bc_db, &global_sender)?; log::info!("Databases successfully opened."); - let current = duniter_bc_reader::get_current_block_meta(&shared_dbs.cm_db) - .context("Fail to get current")?; + if let Some(current) = current { log::info!("Current block: #{}-{}", current.number, current.hash); } else { @@ -120,20 +122,19 @@ impl DuniterServer { let threadpool = fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), shared_dbs.clone()); - // Fill CmV1Db - fill_cm_db::fill_current_meta_db(&shared_dbs)?; - - if conf.gva.is_some() { - log::info!("start duniter modules..."); - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build()?; - let conf_clone = conf.clone(); - let profile_path_opt_clone = profile_path_opt.map(ToOwned::to_owned); - let threadpool_async_handler = threadpool.async_handler(); - std::thread::spawn(move || { - runtime - .block_on(start_duniter_modules( + // Start async runtime + let conf_clone = conf.clone(); + let profile_path_opt_clone = profile_path_opt.map(ToOwned::to_owned); + let threadpool_async_handler = threadpool.async_handler(); + std::thread::spawn(move || { + duniter_global::get_async_runtime().block_on(async { + // Start global background task + duniter_global::start_global_background_task(global_recv).await; + + // Start duniter modules + if conf_clone.gva.is_some() { + log::info!("start duniter modules..."); + start_duniter_modules( &conf_clone, currency, threadpool_async_handler, @@ -141,16 +142,19 @@ impl DuniterServer { duniter_mode, profile_path_opt_clone, software_version, - )) - .context("Fail to start duniter modules") + ) + .await + .expect("Fail to start duniter modules"); + } }); - } + }); Ok(DuniterServer { bc_db, conf, current, dbs_pool: threadpool.into_sync_handler(), + global_sender, pending_txs_subscriber, profile_path_opt: profile_path_opt.map(ToOwned::to_owned), shared_dbs, diff --git a/rust-libs/modules/gva/Cargo.toml b/rust-libs/modules/gva/Cargo.toml index 5e74b7db7..5052cfb58 100644 --- a/rust-libs/modules/gva/Cargo.toml +++ b/rust-libs/modules/gva/Cargo.toml @@ -20,6 +20,7 @@ duniter-gva-db = { path = "./db" } duniter-gva-dbs-reader = { path = "./dbs-reader" } duniter-gva-indexer = { path = "./indexer" } duniter-gva-gql = { path = "./gql" } +duniter-global = { path = "../../duniter-global" } duniter-mempools = { path = "../../duniter-mempools" } duniter-module = { path = "../../duniter-module" } fast-threadpool = "0.2.3" diff --git a/rust-libs/modules/gva/bca/Cargo.toml b/rust-libs/modules/gva/bca/Cargo.toml index 4fcf042c1..273e60712 100644 --- a/rust-libs/modules/gva/bca/Cargo.toml +++ b/rust-libs/modules/gva/bca/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] anyhow = "1.0.33" +arrayvec = { version = "0.5.1", features = ["serde"] } async-bincode = "0.6.1" async_io_stream = { version = "0.3.1", features = [ "tokio_io"] } bincode = "1.3" @@ -15,6 +16,7 @@ duniter-bca-types = { path = "types", features = ["duniter"] } duniter-dbs = { path = "../../../duniter-dbs" } duniter-gva-db = { path = "../db" } duniter-gva-dbs-reader = { path = "../dbs-reader" } +duniter-global = { path = "../../../duniter-global" } duniter-mempools = { path = "../../../duniter-mempools" } fast-threadpool = "0.2.3" futures = "0.3.6" @@ -26,5 +28,6 @@ uninit = "0.4.0" [dev-dependencies] duniter-dbs = { path = "../../../duniter-dbs", features = ["mem"] } duniter-gva-dbs-reader = { path = "../dbs-reader", features = ["mock"] } +duniter-global = { path = "../../../duniter-global", features = ["mock"] } tokio = { version = "1.2", features = ["macros", "rt-multi-thread", "time"] } mockall = "0.9.1" diff --git a/rust-libs/modules/gva/bca/src/exec_req_type.rs b/rust-libs/modules/gva/bca/src/exec_req_type.rs index 08f1e1f21..b5b75d083 100644 --- a/rust-libs/modules/gva/bca/src/exec_req_type.rs +++ b/rust-libs/modules/gva/bca/src/exec_req_type.rs @@ -13,10 +13,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. +mod balances; mod last_blockstamp_out_of_fork_window; mod members_count; mod prepare_simple_payment; mod send_txs; +mod utxos; use dubp::crypto::keys::KeyPair; @@ -40,6 +42,13 @@ pub(super) async fn execute_req_type( _is_whitelisted: bool, ) -> Result<BcaRespTypeV0, ExecReqTypeError> { match req_type { + BcaReqTypeV0::BalancesOfPubkeys(pubkeys) => { + balances::exec_req_balances_of_pubkeys(bca_executor, pubkeys).await + } + BcaReqTypeV0::FirstUtxosOfPubkeys { + amount_target_opt, + pubkeys, + } => utxos::exec_req_first_utxos_of_pubkeys(bca_executor, amount_target_opt, pubkeys).await, BcaReqTypeV0::LastBlockstampOutOfForkWindow => { last_blockstamp_out_of_fork_window::exec_req_last_blockstamp_out_of_fork_window( bca_executor, diff --git a/rust-libs/modules/gva/bca/src/exec_req_type/balances.rs b/rust-libs/modules/gva/bca/src/exec_req_type/balances.rs new file mode 100644 index 000000000..7a468df84 --- /dev/null +++ b/rust-libs/modules/gva/bca/src/exec_req_type/balances.rs @@ -0,0 +1,39 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::*; +use dubp::{crypto::keys::ed25519::PublicKey, wallet::prelude::WalletScriptV10}; + +pub(super) async fn exec_req_balances_of_pubkeys( + bca_executor: &BcaExecutor, + pubkeys: ArrayVec<[PublicKey; 16]>, +) -> Result<BcaRespTypeV0, ExecReqTypeError> { + let dbs_reader = bca_executor.dbs_reader(); + Ok(BcaRespTypeV0::Balances( + bca_executor + .dbs_pool + .execute(move |_| { + pubkeys + .into_iter() + .map(|pubkey| { + dbs_reader + .get_account_balance(&WalletScriptV10::single_sig(pubkey)) + .map(|balance_opt| balance_opt.map(|balance| balance.0)) + }) + .collect::<Result<ArrayVec<_>, _>>() + }) + .await??, + )) +} diff --git a/rust-libs/modules/gva/bca/src/exec_req_type/last_blockstamp_out_of_fork_window.rs b/rust-libs/modules/gva/bca/src/exec_req_type/last_blockstamp_out_of_fork_window.rs index c3be288e5..41529563e 100644 --- a/rust-libs/modules/gva/bca/src/exec_req_type/last_blockstamp_out_of_fork_window.rs +++ b/rust-libs/modules/gva/bca/src/exec_req_type/last_blockstamp_out_of_fork_window.rs @@ -19,15 +19,19 @@ use dubp::common::prelude::*; pub(super) async fn exec_req_last_blockstamp_out_of_fork_window( bca_executor: &BcaExecutor, ) -> Result<BcaRespTypeV0, ExecReqTypeError> { - let dbs_reader = bca_executor.dbs_reader(); - bca_executor - .dbs_pool - .execute(move |dbs| { - if let Some(current_block) = dbs_reader.get_current_block_meta(&dbs.cm_db)? { - let block_ref_number = if current_block.number < 101 { + if let Some(current_block_number) = bca_executor + .cm_accessor + .get_current_meta(|cm| cm.current_block_meta.number) + .await + { + let dbs_reader = bca_executor.dbs_reader(); + bca_executor + .dbs_pool + .execute(move |dbs| { + let block_ref_number = if current_block_number < 101 { 0 } else { - current_block.number - 101 + current_block_number - 101 }; let block_ref_hash = dbs_reader .block(&dbs.bc_db_ro, U32BE(block_ref_number))? @@ -39,11 +43,11 @@ pub(super) async fn exec_req_last_blockstamp_out_of_fork_window( hash: BlockHash(block_ref_hash), }, )) - } else { - Err("no blockchain".into()) - } - }) - .await? + }) + .await? + } else { + Err("no blockchain".into()) + } } #[cfg(test)] @@ -53,12 +57,14 @@ mod tests { #[tokio::test] async fn test_exec_req_last_blockstamp_out_of_fork_window_no_blockchain() { - let mut dbs_reader = MockDbsReader::new(); - dbs_reader - .expect_get_current_block_meta::<CmV1Db<MemSingleton>>() + let mut cm_mock = MockAsyncAccessor::new(); + cm_mock + .expect_get_current_meta::<u32>() .times(1) - .returning(|_| Ok(None)); - let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + .returning(|_| None); + let dbs_reader = MockDbsReader::new(); + let bca_executor = + create_bca_executor(cm_mock, dbs_reader).expect("fail to create bca executor"); let resp_res = exec_req_last_blockstamp_out_of_fork_window(&bca_executor).await; @@ -67,17 +73,19 @@ mod tests { #[tokio::test] async fn test_exec_req_last_blockstamp_out_of_fork_window_ok() -> Result<(), ExecReqTypeError> { - let mut dbs_reader = MockDbsReader::new(); - dbs_reader - .expect_get_current_block_meta::<CmV1Db<MemSingleton>>() + let mut cm_mock = MockAsyncAccessor::new(); + cm_mock + .expect_get_current_meta::<u32>() .times(1) - .returning(|_| Ok(Some(BlockMetaV2::default()))); + .returning(|f| Some(f(&CurrentMeta::default()))); + let mut dbs_reader = MockDbsReader::new(); dbs_reader .expect_block() .times(1) .returning(|_, _| Ok(Some(BlockMetaV2::default()))); - let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + let bca_executor = + create_bca_executor(cm_mock, dbs_reader).expect("fail to create bca executor"); let resp = exec_req_last_blockstamp_out_of_fork_window(&bca_executor).await?; diff --git a/rust-libs/modules/gva/bca/src/exec_req_type/members_count.rs b/rust-libs/modules/gva/bca/src/exec_req_type/members_count.rs index 07ebc2d99..71b85c6e3 100644 --- a/rust-libs/modules/gva/bca/src/exec_req_type/members_count.rs +++ b/rust-libs/modules/gva/bca/src/exec_req_type/members_count.rs @@ -14,20 +14,39 @@ // along with this program. If not, see <https://www.gnu.org/licenses/>. use crate::*; -use dubp::block::prelude::*; pub(super) async fn exec_req_members_count( bca_executor: &BcaExecutor, ) -> Result<BcaRespTypeV0, ExecReqTypeError> { - let dbs_reader = bca_executor.dbs_reader(); - Ok(bca_executor - .dbs_pool - .execute(move |dbs| match dbs_reader.get_current_block(&dbs.cm_db) { - Ok(Some(current_block)) => { - BcaRespTypeV0::MembersCount(current_block.members_count() as u64) - } - Ok(None) => BcaRespTypeV0::Error("no blockchain".to_owned()), - Err(e) => BcaRespTypeV0::Error(e.to_string()), - }) - .await?) + if let Some(members_count) = bca_executor + .cm_accessor + .get_current_meta(|cm| cm.current_block_meta.members_count) + .await + { + Ok(BcaRespTypeV0::MembersCount(members_count)) + } else { + Err("no blockchain".into()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tests::*; + + #[tokio::test] + async fn test_exec_req_members_count() { + let mut cm_mock = MockAsyncAccessor::new(); + cm_mock + .expect_get_current_meta::<u64>() + .times(1) + .returning(|f| Some(f(&CurrentMeta::default()))); + let dbs_reader = MockDbsReader::new(); + let bca_executor = + create_bca_executor(cm_mock, dbs_reader).expect("fail to create bca executor"); + + let resp_res = exec_req_members_count(&bca_executor).await; + + assert_eq!(resp_res, Ok(BcaRespTypeV0::MembersCount(0))); + } } diff --git a/rust-libs/modules/gva/bca/src/exec_req_type/prepare_simple_payment.rs b/rust-libs/modules/gva/bca/src/exec_req_type/prepare_simple_payment.rs index 825be48be..58b8f94e8 100644 --- a/rust-libs/modules/gva/bca/src/exec_req_type/prepare_simple_payment.rs +++ b/rust-libs/modules/gva/bca/src/exec_req_type/prepare_simple_payment.rs @@ -21,24 +21,26 @@ pub(super) async fn exec_req_prepare_simple_payment( bca_executor: &BcaExecutor, params: PrepareSimplePayment, ) -> Result<BcaRespTypeV0, ExecReqTypeError> { - let mut amount = params.amount; let issuer = params.issuer; - let dbs_reader = bca_executor.dbs_reader(); - let (amount, block_ref_number, block_ref_hash, (inputs, inputs_sum)) = bca_executor - .dbs_pool - .execute(move |dbs| { - if let Some(current_block) = dbs_reader.get_current_block_meta(&dbs.cm_db)? { - let block_ref_number = if current_block.number < 101 { + if let Some(current_meta) = bca_executor.cm_accessor.get_current_meta(|cm| *cm).await { + let current_block_meta = current_meta.current_block_meta; + let current_ud = current_meta.current_ud; + let dbs_reader = bca_executor.dbs_reader(); + let (amount, block_ref_number, block_ref_hash, (inputs, inputs_sum)) = bca_executor + .dbs_pool + .execute(move |dbs| { + let mut amount = params.amount.to_cents(current_ud); + let block_ref_number = if current_block_meta.number < 101 { 0 } else { - current_block.number - 101 + current_block_meta.number - 101 }; let block_ref_hash = dbs_reader .block(&dbs.bc_db_ro, U32BE(block_ref_number))? .expect("unreachable") .hash; - let current_base = current_block.unit_base as i64; + let current_base = current_block_meta.unit_base as i64; if amount.base() > current_base { Err("too long base".into()) @@ -59,24 +61,24 @@ pub(super) async fn exec_req_prepare_simple_payment( )?, )) } - } else { - Err("no blockchain".into()) - } - }) - .await??; - - if inputs_sum < amount { - return Err("insufficient balance".into()); + }) + .await??; + + if inputs_sum < amount { + return Err("insufficient balance".into()); + } + + Ok(BcaRespTypeV0::PrepareSimplePayment( + PrepareSimplePaymentResp { + current_block_number: block_ref_number, + current_block_hash: block_ref_hash, + inputs, + inputs_sum, + }, + )) + } else { + Err("no blockchain".into()) } - - Ok(BcaRespTypeV0::PrepareSimplePayment( - PrepareSimplePaymentResp { - current_block_number: block_ref_number, - current_block_hash: block_ref_hash, - inputs, - inputs_sum, - }, - )) } #[cfg(test)] @@ -86,18 +88,20 @@ mod tests { #[tokio::test] async fn test_exec_req_prepare_simple_payment_no_blockchain() { - let mut dbs_reader = MockDbsReader::new(); - dbs_reader - .expect_get_current_block_meta::<CmV1Db<MemSingleton>>() + let mut mock_cm = MockAsyncAccessor::new(); + mock_cm + .expect_get_current_meta::<CurrentMeta>() .times(1) - .returning(|_| Ok(None)); - let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + .returning(|_| None); + let dbs_reader = MockDbsReader::new(); + let bca_executor = + create_bca_executor(mock_cm, dbs_reader).expect("fail to create bca executor"); let resp_res = exec_req_prepare_simple_payment( &bca_executor, PrepareSimplePayment { issuer: PublicKey::default(), - amount: SourceAmount::new(42, 0), + amount: Amount::Cents(SourceAmount::new(42, 0)), }, ) .await; @@ -107,22 +111,24 @@ mod tests { #[tokio::test] async fn test_exec_req_prepare_simple_payment_too_long_base() { - let mut dbs_reader = MockDbsReader::new(); - dbs_reader - .expect_get_current_block_meta::<CmV1Db<MemSingleton>>() + let mut mock_cm = MockAsyncAccessor::new(); + mock_cm + .expect_get_current_meta::<CurrentMeta>() .times(1) - .returning(|_| Ok(Some(BlockMetaV2::default()))); + .returning(|f| Some(f(&CurrentMeta::default()))); + let mut dbs_reader = MockDbsReader::new(); dbs_reader .expect_block() .times(1) .returning(|_, _| Ok(Some(BlockMetaV2::default()))); - let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + let bca_executor = + create_bca_executor(mock_cm, dbs_reader).expect("fail to create bca executor"); let resp_res = exec_req_prepare_simple_payment( &bca_executor, PrepareSimplePayment { issuer: PublicKey::default(), - amount: SourceAmount::new(42, 1), + amount: Amount::Cents(SourceAmount::new(42, 1)), }, ) .await; @@ -132,11 +138,12 @@ mod tests { #[tokio::test] async fn test_exec_req_prepare_simple_payment_insufficient_balance() { - let mut dbs_reader = MockDbsReader::new(); - dbs_reader - .expect_get_current_block_meta::<CmV1Db<MemSingleton>>() + let mut mock_cm = MockAsyncAccessor::new(); + mock_cm + .expect_get_current_meta::<CurrentMeta>() .times(1) - .returning(|_| Ok(Some(BlockMetaV2::default()))); + .returning(|f| Some(f(&CurrentMeta::default()))); + let mut dbs_reader = MockDbsReader::new(); dbs_reader .expect_block() .times(1) @@ -145,13 +152,14 @@ mod tests { .expect_find_inputs::<TxsMpV2Db<FileBackend>>() .times(1) .returning(|_, _, _, _, _| Ok((vec![], SourceAmount::default()))); - let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + let bca_executor = + create_bca_executor(mock_cm, dbs_reader).expect("fail to create bca executor"); let resp_res = exec_req_prepare_simple_payment( &bca_executor, PrepareSimplePayment { issuer: PublicKey::default(), - amount: SourceAmount::new(42, 0), + amount: Amount::Cents(SourceAmount::new(42, 0)), }, ) .await; @@ -172,11 +180,12 @@ mod tests { }), }; - let mut dbs_reader = MockDbsReader::new(); - dbs_reader - .expect_get_current_block_meta::<CmV1Db<MemSingleton>>() + let mut mock_cm = MockAsyncAccessor::new(); + mock_cm + .expect_get_current_meta::<CurrentMeta>() .times(1) - .returning(|_| Ok(Some(BlockMetaV2::default()))); + .returning(|f| Some(f(&CurrentMeta::default()))); + let mut dbs_reader = MockDbsReader::new(); dbs_reader .expect_block() .times(1) @@ -185,13 +194,14 @@ mod tests { .expect_find_inputs::<TxsMpV2Db<FileBackend>>() .times(1) .returning(move |_, _, _, _, _| Ok((vec![input], SourceAmount::with_base0(57)))); - let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + let bca_executor = + create_bca_executor(mock_cm, dbs_reader).expect("fail to create bca executor"); let resp = exec_req_prepare_simple_payment( &bca_executor, PrepareSimplePayment { issuer: PublicKey::default(), - amount: SourceAmount::new(42, 0), + amount: Amount::Cents(SourceAmount::new(42, 0)), }, ) .await?; diff --git a/rust-libs/modules/gva/bca/src/exec_req_type/utxos.rs b/rust-libs/modules/gva/bca/src/exec_req_type/utxos.rs new file mode 100644 index 000000000..249ff1f48 --- /dev/null +++ b/rust-libs/modules/gva/bca/src/exec_req_type/utxos.rs @@ -0,0 +1,58 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::*; +use dubp::{crypto::keys::ed25519::PublicKey, wallet::prelude::WalletScriptV10}; + +pub(super) async fn exec_req_first_utxos_of_pubkeys( + bca_executor: &BcaExecutor, + amount_target_opt: Option<Amount>, + pubkeys: ArrayVec<[PublicKey; 16]>, +) -> Result<BcaRespTypeV0, ExecReqTypeError> { + if let Some(current_ud) = bca_executor + .cm_accessor + .get_current_meta(|cm| cm.current_ud) + .await + { + let dbs_reader = bca_executor.dbs_reader(); + let scripts: ArrayVec<[WalletScriptV10; 16]> = pubkeys + .into_iter() + .map(WalletScriptV10::single_sig) + .collect(); + if let Some(amount_target) = amount_target_opt { + Ok(BcaRespTypeV0::FirstUtxosOfPubkeys( + bca_executor + .dbs_pool + .execute(move |_| { + Ok::<_, ExecReqTypeError>(dbs_reader.first_scripts_utxos( + Some(amount_target.to_cents(current_ud)), + 40, + &scripts, + )?) + }) + .await??, + )) + } else { + Ok(BcaRespTypeV0::FirstUtxosOfPubkeys( + bca_executor + .dbs_pool + .execute(move |_| dbs_reader.first_scripts_utxos(None, 40, &scripts)) + .await??, + )) + } + } else { + Err("no blockchain".into()) + } +} diff --git a/rust-libs/modules/gva/bca/src/lib.rs b/rust-libs/modules/gva/bca/src/lib.rs index 1e202b7af..b1a1dc1a5 100644 --- a/rust-libs/modules/gva/bca/src/lib.rs +++ b/rust-libs/modules/gva/bca/src/lib.rs @@ -24,20 +24,28 @@ mod exec_req_type; +const MAX_BATCH_SIZE: usize = 10; const RESP_MIN_SIZE: usize = 64; type RespBytes = SmallVec<[u8; RESP_MIN_SIZE]>; use crate::exec_req_type::ExecReqTypeError; +#[cfg(test)] +use crate::tests::AsyncAccessor; +use arrayvec::ArrayVec; use async_bincode::AsyncBincodeReader; use async_io_stream::IoStream; use bincode::Options as _; use dubp::crypto::keys::{ed25519::Ed25519KeyPair, Signator}; use duniter_bca_types::{ - bincode_opts, BcaReq, BcaReqExecError, BcaReqTypeV0, BcaResp, BcaRespTypeV0, BcaRespV0, + amount::Amount, bincode_opts, BcaReq, BcaReqExecError, BcaReqTypeV0, BcaResp, BcaRespTypeV0, + BcaRespV0, }; pub use duniter_dbs::kv_typed::prelude::*; use duniter_dbs::{FileBackend, SharedDbs}; +#[cfg(not(test))] +use duniter_global::AsyncAccessor; use duniter_gva_dbs_reader::DbsReader; + use futures::{prelude::stream::FuturesUnordered, StreamExt, TryStream, TryStreamExt}; use once_cell::sync::OnceCell; use smallvec::SmallVec; @@ -52,6 +60,7 @@ static BCA_EXECUTOR: OnceCell<BcaExecutor> = OnceCell::new(); pub fn set_bca_executor( currency: String, + cm_accessor: AsyncAccessor, dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>, dbs_reader: DbsReaderImpl, self_keypair: Ed25519KeyPair, @@ -61,6 +70,7 @@ pub fn set_bca_executor( BCA_EXECUTOR .set(BcaExecutor { currency, + cm_accessor, dbs_pool, dbs_reader, self_keypair, @@ -86,6 +96,7 @@ where #[derive(Clone)] struct BcaExecutor { + cm_accessor: AsyncAccessor, currency: String, dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>, dbs_reader: DbsReaderImpl, @@ -132,6 +143,7 @@ impl BcaExecutor { vec }) } + async fn execute_inner( &self, stream: impl TryStream<Ok = BcaReq, Error = bincode::Error>, @@ -142,6 +154,7 @@ impl BcaExecutor { let self_clone = self.clone(); tokio::spawn(async move { self_clone.execute_req(req, is_whitelisted).await }) }) + .take(MAX_BATCH_SIZE) .try_collect::<FuturesUnordered<_>>() .await { @@ -225,9 +238,11 @@ mod tests { pub use duniter_dbs::databases::cm_v1::{CmV1Db, CmV1DbReadable}; pub use duniter_dbs::databases::txs_mp_v2::{TxsMpV2Db, TxsMpV2DbReadable}; pub use duniter_dbs::BlockMetaV2; + pub use duniter_global::{CurrentMeta, MockAsyncAccessor}; pub use duniter_gva_dbs_reader::MockDbsReader; pub use futures::TryStreamExt; + pub type AsyncAccessor = duniter_dbs::kv_typed::prelude::Arc<MockAsyncAccessor>; pub type DbsReaderImpl = duniter_dbs::kv_typed::prelude::Arc<MockDbsReader>; impl BcaExecutor { @@ -237,11 +252,15 @@ mod tests { } } - pub(crate) fn create_bca_executor(mock_dbs_reader: MockDbsReader) -> KvResult<BcaExecutor> { + pub(crate) fn create_bca_executor( + mock_cm: MockAsyncAccessor, + mock_dbs_reader: MockDbsReader, + ) -> KvResult<BcaExecutor> { let dbs = SharedDbs::mem()?; let threadpool = fast_threadpool::ThreadPool::start(fast_threadpool::ThreadPoolConfig::low(), dbs); Ok(BcaExecutor { + cm_accessor: duniter_dbs::kv_typed::prelude::Arc::new(mock_cm), currency: "g1".to_owned(), dbs_pool: threadpool.into_async_handler(), dbs_reader: duniter_dbs::kv_typed::prelude::Arc::new(mock_dbs_reader), @@ -275,12 +294,13 @@ mod tests { //println!("bytes_for_bincode={:?}", &bytes[4..]); assert_eq!(req, bincode_opts().deserialize(&bytes[4..])?); - let mut dbs_reader = MockDbsReader::new(); - dbs_reader - .expect_get_current_block::<CmV1Db<MemSingleton>>() + let mut mock_cm = MockAsyncAccessor::new(); + mock_cm + .expect_get_current_meta::<u64>() .times(1) - .returning(|_| Ok(Some(DubpBlockV10::default()))); - let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + .returning(|f| Some(f(&CurrentMeta::default()))); + let bca_executor = create_bca_executor(mock_cm, MockDbsReader::new()) + .expect("fail to create bca executor"); //println!("bytes={:?}", bytes); let bytes_res = bca_executor.execute(io_stream(bytes), false).await; @@ -317,8 +337,8 @@ mod tests { //println!("bytes_for_bincode={:?}", &bytes[4..]); assert_eq!(req, bincode_opts().deserialize(&bytes[4..])?); - let bca_executor = - create_bca_executor(MockDbsReader::new()).expect("fail to create bca executor"); + let bca_executor = create_bca_executor(MockAsyncAccessor::new(), MockDbsReader::new()) + .expect("fail to create bca executor"); //println!("bytes={:?}", bytes); let bytes_res = bca_executor.execute(io_stream(bytes), false).await; @@ -357,12 +377,13 @@ mod tests { bincode_opts().serialize_into(&mut bytes[11..], &req2)?; bytes[10] = 3; - let mut dbs_reader = MockDbsReader::new(); - dbs_reader - .expect_get_current_block::<CmV1Db<MemSingleton>>() + let mut mock_cm = MockAsyncAccessor::new(); + mock_cm + .expect_get_current_meta::<u64>() .times(1) - .returning(|_| Ok(Some(DubpBlockV10::default()))); - let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + .returning(|f| Some(f(&CurrentMeta::default()))); + let bca_executor = create_bca_executor(mock_cm, MockDbsReader::new()) + .expect("fail to create bca executor"); //println!("bytes={:?}", bytes); let bytes_res = bca_executor.execute(io_stream(bytes), false).await; diff --git a/rust-libs/modules/gva/bca/types/Cargo.toml b/rust-libs/modules/gva/bca/types/Cargo.toml index 37df065a5..5c5a8e833 100644 --- a/rust-libs/modules/gva/bca/types/Cargo.toml +++ b/rust-libs/modules/gva/bca/types/Cargo.toml @@ -6,6 +6,7 @@ license = "AGPL-3.0" edition = "2018" [dependencies] +arrayvec = { version = "0.5.1", features = ["serde"] } bincode = "1.3" dubp = { version = "0.50.0" } serde = { version = "1.0.105", features = ["derive"] } diff --git a/rust-libs/modules/gva/bca/types/src/amount.rs b/rust-libs/modules/gva/bca/types/src/amount.rs new file mode 100644 index 000000000..1682a3141 --- /dev/null +++ b/rust-libs/modules/gva/bca/types/src/amount.rs @@ -0,0 +1,46 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::*; + +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)] +pub enum Amount { + Cents(SourceAmount), + Uds(f64), +} + +impl Default for Amount { + fn default() -> Self { + Self::Cents(SourceAmount::ZERO) + } +} + +impl Amount { + pub fn to_cents(self, ud_amount: SourceAmount) -> SourceAmount { + match self { + Amount::Cents(sa) => sa, + Amount::Uds(f64_) => { + if !f64_.is_finite() || f64_ <= 0f64 { + SourceAmount::ZERO + } else { + SourceAmount::new( + f64::round(ud_amount.amount() as f64 * f64_) as i64, + ud_amount.base(), + ) + } + } + } + } +} diff --git a/rust-libs/modules/gva/bca/types/src/lib.rs b/rust-libs/modules/gva/bca/types/src/lib.rs index 2d5e9d9cd..8c9f84501 100644 --- a/rust-libs/modules/gva/bca/types/src/lib.rs +++ b/rust-libs/modules/gva/bca/types/src/lib.rs @@ -22,11 +22,16 @@ unused_import_braces )] +pub mod amount; pub mod prepare_payment; pub mod rejected_tx; +pub mod utxo; +use crate::amount::Amount; use crate::prepare_payment::{PrepareSimplePayment, PrepareSimplePaymentResp}; +use crate::utxo::Utxo; +use arrayvec::ArrayVec; use bincode::Options as _; use dubp::crypto::keys::ed25519::{PublicKey, Signature}; use dubp::wallet::prelude::*; @@ -35,33 +40,38 @@ use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use thiserror::Error; -pub fn bincode_opts() -> impl bincode::Options { - bincode::options() - .with_limit(u32::max_value() as u64) - .allow_trailing_bytes() -} +// Constants + +pub const MAX_FIRST_UTXOS: usize = 40; // Request -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub enum BcaReq { V0(BcaReqV0), _V1, } -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct BcaReqV0 { pub req_id: usize, pub req_type: BcaReqTypeV0, } #[allow(clippy::large_enum_variant)] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub enum BcaReqTypeV0 { + BalancesOfPubkeys(ArrayVec<[PublicKey; 16]>), + FirstUtxosOfPubkeys { + amount_target_opt: Option<Amount>, + pubkeys: ArrayVec<[PublicKey; 16]>, + }, LastBlockstampOutOfForkWindow, MembersCount, PrepareSimplePayment(PrepareSimplePayment), - ProofServerPubkey { challenge: [u8; 16] }, + ProofServerPubkey { + challenge: [u8; 16], + }, Ping, SendTxs(Txs), } @@ -84,9 +94,12 @@ pub struct BcaRespV0 { pub resp_type: BcaRespTypeV0, } +#[allow(clippy::large_enum_variant)] #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] pub enum BcaRespTypeV0 { Error(String), + Balances(ArrayVec<[Option<SourceAmount>; 16]>), + FirstUtxosOfPubkeys(Vec<ArrayVec<[Utxo; MAX_FIRST_UTXOS]>>), ProofServerPubkey { challenge: [u8; 16], server_pubkey: PublicKey, @@ -114,3 +127,11 @@ pub enum BcaReqExecError { #[error("Unknown error")] Unknown, } + +// Bincode configuration + +pub fn bincode_opts() -> impl bincode::Options { + bincode::options() + .with_limit(u32::max_value() as u64) + .allow_trailing_bytes() +} diff --git a/rust-libs/modules/gva/bca/types/src/prepare_payment.rs b/rust-libs/modules/gva/bca/types/src/prepare_payment.rs index d35c6f204..b815c4ae8 100644 --- a/rust-libs/modules/gva/bca/types/src/prepare_payment.rs +++ b/rust-libs/modules/gva/bca/types/src/prepare_payment.rs @@ -16,10 +16,10 @@ use crate::*; use dubp::documents::transaction::TransactionInputV10; -#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)] +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)] pub struct PrepareSimplePayment { pub issuer: PublicKey, - pub amount: SourceAmount, + pub amount: Amount, } #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] diff --git a/rust-libs/duniter-server/src/fill_cm_db.rs b/rust-libs/modules/gva/bca/types/src/utxo.rs similarity index 62% rename from rust-libs/duniter-server/src/fill_cm_db.rs rename to rust-libs/modules/gva/bca/types/src/utxo.rs index dfa4c3390..d7716a808 100644 --- a/rust-libs/duniter-server/src/fill_cm_db.rs +++ b/rust-libs/modules/gva/bca/types/src/utxo.rs @@ -14,18 +14,10 @@ // along with this program. If not, see <https://www.gnu.org/licenses/>. use crate::*; -use duniter_dbs::databases::bc_v2::BcV2DbReadable; -pub(super) fn fill_current_meta_db(shared_dbs: &SharedDbs<FileBackend>) -> anyhow::Result<()> { - if let Some(current_block_meta) = shared_dbs - .bc_db_ro - .blocks_meta() - .iter_rev(.., |it| it.values().next_res())? - { - shared_dbs - .cm_db - .current_block_meta_write() - .upsert((), current_block_meta)?; - } - Ok(()) +#[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq, Eq, Serialize)] +pub struct Utxo { + pub amount: SourceAmount, + pub tx_hash: Hash, + pub output_index: u8, } diff --git a/rust-libs/modules/gva/dbs-reader/Cargo.toml b/rust-libs/modules/gva/dbs-reader/Cargo.toml index 2d5c78ac7..517c759c0 100644 --- a/rust-libs/modules/gva/dbs-reader/Cargo.toml +++ b/rust-libs/modules/gva/dbs-reader/Cargo.toml @@ -17,6 +17,7 @@ mock = ["mockall"] [dependencies] anyhow = "1.0.34" arrayvec = "0.5.1" +duniter-bca-types = { path = "../bca/types" } duniter-dbs = { path = "../../../duniter-dbs" } duniter-gva-db = { path = "../db" } dubp = { version = "0.50.0", features = ["duniter"] } diff --git a/rust-libs/modules/gva/dbs-reader/src/current_frame.rs b/rust-libs/modules/gva/dbs-reader/src/current_frame.rs index cecf4e2c9..4ca4c48dc 100644 --- a/rust-libs/modules/gva/dbs-reader/src/current_frame.rs +++ b/rust-libs/modules/gva/dbs-reader/src/current_frame.rs @@ -18,23 +18,16 @@ use duniter_dbs::BlockMetaV2; use crate::*; impl DbsReaderImpl { - pub(super) fn get_current_frame_< - BcDb: 'static + BcV2DbReadable, - CmDb: 'static + CmV1DbReadable, - >( + pub(super) fn get_current_frame_<BcDb: 'static + BcV2DbReadable>( &self, bc_db: &BcDb, - cm_db: &CmDb, + current_block_meta: &BlockMetaV2, ) -> anyhow::Result<Vec<BlockMetaV2>> { - if let Some(current_block_meta) = self.get_current_block_meta(cm_db)? { - let issuers_frame = current_block_meta.issuers_frame; - let start = U32BE(current_block_meta.number + 1 - issuers_frame as u32); - bc_db - .blocks_meta() - .iter_rev(start.., |it| it.values().collect::<KvResult<_>>()) - .map_err(Into::into) - } else { - Ok(Vec::with_capacity(0)) - } + let issuers_frame = current_block_meta.issuers_frame; + let start = U32BE(current_block_meta.number + 1 - issuers_frame as u32); + bc_db + .blocks_meta() + .iter_rev(start.., |it| it.values().collect::<KvResult<_>>()) + .map_err(Into::into) } } diff --git a/rust-libs/modules/gva/dbs-reader/src/lib.rs b/rust-libs/modules/gva/dbs-reader/src/lib.rs index 5e94d2296..385a8daaa 100644 --- a/rust-libs/modules/gva/dbs-reader/src/lib.rs +++ b/rust-libs/modules/gva/dbs-reader/src/lib.rs @@ -32,6 +32,7 @@ pub mod uds_of_pubkey; pub mod utxos; pub use crate::pagination::{PageInfo, PagedData}; +pub use duniter_bca_types::MAX_FIRST_UTXOS; use crate::pagination::{has_next_page, has_previous_page}; use arrayvec::ArrayVec; @@ -39,6 +40,7 @@ use dubp::common::crypto::keys::ed25519::PublicKey; use dubp::documents::transaction::TransactionDocumentV10; use dubp::{block::DubpBlockV10, common::crypto::hashs::Hash}; use dubp::{common::prelude::BlockNumber, wallet::prelude::*}; +use duniter_bca_types::utxo::Utxo; use duniter_dbs::FileBackend; use duniter_dbs::{ databases::{ @@ -103,9 +105,10 @@ pub trait DbsReader { ) -> anyhow::Result<PagedData<utxos::UtxosWithSum>>; fn first_scripts_utxos( &self, + amount_target_opt: Option<SourceAmount>, first: usize, scripts: &[WalletScriptV10], - ) -> anyhow::Result<Vec<arrayvec::ArrayVec<[utxos::Utxo; utxos::MAX_FIRST_UTXOS]>>>; + ) -> anyhow::Result<Vec<arrayvec::ArrayVec<[Utxo; MAX_FIRST_UTXOS]>>>; fn get_account_balance( &self, account_script: &WalletScriptV10, @@ -115,19 +118,11 @@ pub trait DbsReader { &self, cm_db: &CmDb, ) -> KvResult<Option<DubpBlockV10>>; - fn get_current_block_meta<CmDb: 'static + CmV1DbReadable>( - &self, - cm_db: &CmDb, - ) -> KvResult<Option<BlockMetaV2>>; - fn get_current_frame<BcDb: 'static + BcV2DbReadable, CmDb: 'static + CmV1DbReadable>( + fn get_current_frame<BcDb: 'static + BcV2DbReadable>( &self, bc_db: &BcDb, - cm_db: &CmDb, - ) -> anyhow::Result<Vec<duniter_dbs::BlockMetaV2>>; - fn get_current_ud<BcDb: 'static + BcV2DbReadable>( - &self, - bc_db: &BcDb, - ) -> KvResult<Option<SourceAmount>>; + current_block_meta: &BlockMetaV2, + ) -> anyhow::Result<Vec<BlockMetaV2>>; fn get_txs_history_bc_received( &self, from: Option<u64>, @@ -217,10 +212,11 @@ impl DbsReader for DbsReaderImpl { fn first_scripts_utxos( &self, + amount_target_opt: Option<SourceAmount>, first: usize, scripts: &[WalletScriptV10], - ) -> anyhow::Result<Vec<ArrayVec<[utxos::Utxo; utxos::MAX_FIRST_UTXOS]>>> { - self.first_scripts_utxos_(first, scripts) + ) -> anyhow::Result<Vec<ArrayVec<[Utxo; MAX_FIRST_UTXOS]>>> { + self.first_scripts_utxos_(amount_target_opt, first, scripts) } fn get_account_balance( @@ -247,25 +243,12 @@ impl DbsReader for DbsReaderImpl { Ok(cm_db.current_block().get(&())?.map(|db_block| db_block.0)) } - fn get_current_block_meta<CmDb: CmV1DbReadable>( - &self, - cm_db: &CmDb, - ) -> KvResult<Option<BlockMetaV2>> { - cm_db.current_block_meta().get(&()) - } - - fn get_current_frame<BcDb: 'static + BcV2DbReadable, CmDb: 'static + CmV1DbReadable>( + fn get_current_frame<BcDb: 'static + BcV2DbReadable>( &self, bc_db: &BcDb, - cm_db: &CmDb, + current_block_meta: &BlockMetaV2, ) -> anyhow::Result<Vec<BlockMetaV2>> { - self.get_current_frame_(bc_db, cm_db) - } - - fn get_current_ud<BcDb: BcV2DbReadable>(&self, bc_db: &BcDb) -> KvResult<Option<SourceAmount>> { - bc_db - .uds_reval() - .iter_rev(.., |it| it.values().map_ok(|v| v.0).next_res()) + self.get_current_frame_(bc_db, current_block_meta) } fn get_txs_history_bc_received( diff --git a/rust-libs/modules/gva/dbs-reader/src/utxos.rs b/rust-libs/modules/gva/dbs-reader/src/utxos.rs index 2437507b6..ab19edcc6 100644 --- a/rust-libs/modules/gva/dbs-reader/src/utxos.rs +++ b/rust-libs/modules/gva/dbs-reader/src/utxos.rs @@ -18,8 +18,6 @@ use duniter_dbs::SourceAmountValV2; use crate::*; -pub const MAX_FIRST_UTXOS: usize = 40; - #[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)] pub struct UtxoCursor { pub block_number: BlockNumber, @@ -60,13 +58,6 @@ impl FromStr for UtxoCursor { } } -#[derive(Clone, Copy, Debug, Default, PartialEq)] -pub struct Utxo { - pub amount: SourceAmount, - pub tx_hash: Hash, - pub output_index: u8, -} - #[derive(Debug, Default)] pub struct UtxosWithSum { pub utxos: Vec<(UtxoCursor, SourceAmount)>, @@ -180,25 +171,38 @@ impl DbsReaderImpl { } pub(super) fn first_scripts_utxos_( &self, + amount_target_opt: Option<SourceAmount>, first: usize, scripts: &[WalletScriptV10], ) -> anyhow::Result<Vec<ArrayVec<[Utxo; MAX_FIRST_UTXOS]>>> { - Ok(scripts - .iter() - .map(|script| { - let (k_min, k_max) = - GvaUtxoIdDbV1::script_interval(Hash::compute(script.to_string().as_bytes())); - self.0.gva_utxos().iter(k_min..k_max, |it| { - it.take(first) - .map_ok(|(k, v)| Utxo { - amount: v.0, - tx_hash: k.get_tx_hash(), - output_index: k.get_output_index(), - }) - .collect::<KvResult<_>>() - }) + let iter = scripts.iter().map(|script| { + let (k_min, k_max) = + GvaUtxoIdDbV1::script_interval(Hash::compute(script.to_string().as_bytes())); + self.0.gva_utxos().iter(k_min..k_max, |it| { + it.take(first) + .map_ok(|(k, v)| Utxo { + amount: v.0, + tx_hash: k.get_tx_hash(), + output_index: k.get_output_index(), + }) + .collect::<KvResult<_>>() }) - .collect::<KvResult<Vec<_>>>()?) + }); + if let Some(amount_target) = amount_target_opt { + let mut sum = SourceAmount::ZERO; + Ok(iter + .take_while(|utxos_res: &KvResult<ArrayVec<[Utxo; MAX_FIRST_UTXOS]>>| { + if let Ok(utxos) = utxos_res { + sum = sum + utxos.iter().map(|utxo| utxo.amount).sum(); + sum <= amount_target + } else { + true + } + }) + .collect::<KvResult<Vec<_>>>()?) + } else { + Ok(iter.collect::<KvResult<Vec<_>>>()?) + } } } @@ -324,7 +328,7 @@ mod tests { )?; assert_eq!( - db_reader.first_scripts_utxos(2, &[script, script2])?, + db_reader.first_scripts_utxos(None, 2, &[script, script2])?, vec![ [ Utxo { diff --git a/rust-libs/modules/gva/gql/Cargo.toml b/rust-libs/modules/gva/gql/Cargo.toml index 30539cd35..4fa2457a5 100644 --- a/rust-libs/modules/gva/gql/Cargo.toml +++ b/rust-libs/modules/gva/gql/Cargo.toml @@ -16,6 +16,7 @@ duniter-dbs = { path = "../../../duniter-dbs" } duniter-bc-reader = { path = "../../../duniter-bc-reader" } duniter-gva-db = { path = "../db" } duniter-gva-dbs-reader = { path = "../dbs-reader" } +duniter-global = { path = "../../../duniter-global" } duniter-mempools = { path = "../../../duniter-mempools" } duniter-module = { path = "../../../duniter-module" } fast-threadpool = "0.2.3" @@ -28,6 +29,7 @@ serde = { version = "1.0.105", features = ["derive"] } [dev-dependencies] duniter-dbs = { path = "../../../duniter-dbs", features = ["mem"] } duniter-gva-dbs-reader = { path = "../dbs-reader", features = ["mock"] } +duniter-global = { path = "../../../duniter-global", features = ["mock"] } mockall = "0.9.1" serde_json = "1.0.53" tokio = { version = "1.2", features = ["macros", "rt-multi-thread", "time"] } diff --git a/rust-libs/modules/gva/gql/src/lib.rs b/rust-libs/modules/gva/gql/src/lib.rs index 37676ab87..9858e81c7 100644 --- a/rust-libs/modules/gva/gql/src/lib.rs +++ b/rust-libs/modules/gva/gql/src/lib.rs @@ -48,6 +48,8 @@ use crate::inputs_validators::TxCommentValidator; use crate::pagination::Pagination; use crate::scalars::{PkOrScriptGva, PubKeyGva}; #[cfg(test)] +use crate::tests::AsyncAccessor; +#[cfg(test)] use crate::tests::DbsReaderImpl; use async_graphql::connection::{Connection, Edge, EmptyFields}; use async_graphql::validators::{IntGreaterThan, IntRange, ListMaxLength, ListMinLength}; @@ -61,6 +63,8 @@ use dubp::wallet::prelude::*; use duniter_dbs::databases::txs_mp_v2::TxsMpV2DbReadable; use duniter_dbs::prelude::*; use duniter_dbs::{kv_typed::prelude::*, FileBackend}; +#[cfg(not(test))] +use duniter_global::AsyncAccessor; use duniter_gva_dbs_reader::pagination::PageInfo; use duniter_gva_dbs_reader::DbsReader; #[cfg(not(test))] @@ -84,18 +88,24 @@ pub struct ServerMetaData { #[cfg(test)] mod tests { + pub use duniter_global::{CurrentMeta, MockAsyncAccessor}; pub use duniter_gva_dbs_reader::MockDbsReader; use super::*; use fast_threadpool::ThreadPoolConfig; + pub type AsyncAccessor = duniter_dbs::kv_typed::prelude::Arc<MockAsyncAccessor>; pub type DbsReaderImpl = duniter_dbs::kv_typed::prelude::Arc<MockDbsReader>; - pub(crate) fn create_schema(dbs_ops: MockDbsReader) -> KvResult<GvaSchema> { + pub(crate) fn create_schema( + mock_cm: MockAsyncAccessor, + dbs_ops: MockDbsReader, + ) -> KvResult<GvaSchema> { let dbs = SharedDbs::mem()?; let threadpool = fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), dbs); Ok(schema::build_schema_with_data( schema::GvaSchemaData { + cm_accessor: Arc::new(mock_cm), dbs_pool: threadpool.into_async_handler(), dbs_reader: Arc::new(dbs_ops), server_meta_data: ServerMetaData { diff --git a/rust-libs/modules/gva/gql/src/queries.rs b/rust-libs/modules/gva/gql/src/queries.rs index b90118298..20de06c01 100644 --- a/rust-libs/modules/gva/gql/src/queries.rs +++ b/rust-libs/modules/gva/gql/src/queries.rs @@ -25,7 +25,6 @@ pub mod uds; pub mod utxos_of_script; use crate::*; -use duniter_dbs::databases::cm_v1::CmV1DbReadable as _; #[derive(async_graphql::MergedObject, Default)] pub struct QueryRoot( @@ -60,11 +59,15 @@ impl Node { ) -> async_graphql::Result<Option<PeerCardGva>> { let data = ctx.data::<GvaSchemaData>()?; - Ok(data - .dbs_pool - .execute(move |dbs| dbs.cm_db.self_peer_old().get(&())) - .await?? - .map(Into::into)) + if let Some(self_peer_old) = data + .cm_accessor() + .get_self_peer_old(|self_peer_old| self_peer_old.clone()) + .await + { + Ok(Some(PeerCardGva::from(self_peer_old))) + } else { + Ok(None) + } } /// Software async fn software(&self) -> &'static str { diff --git a/rust-libs/modules/gva/gql/src/queries/account_balance.rs b/rust-libs/modules/gva/gql/src/queries/account_balance.rs index 57e5f0fd2..9f25cdeca 100644 --- a/rust-libs/modules/gva/gql/src/queries/account_balance.rs +++ b/rust-libs/modules/gva/gql/src/queries/account_balance.rs @@ -92,7 +92,7 @@ mod tests { }) .times(1) .returning(|_| Ok(Some(SourceAmountValV2(SourceAmount::with_base0(38))))); - let schema = create_schema(dbs_reader)?; + let schema = create_schema(MockAsyncAccessor::new(), dbs_reader)?; assert_eq!( exec_graphql_request( &schema, @@ -123,7 +123,7 @@ mod tests { }) .times(1) .returning(|_| Ok(Some(SourceAmountValV2(SourceAmount::with_base0(38))))); - let schema = create_schema(dbs_reader)?; + let schema = create_schema(MockAsyncAccessor::new(), dbs_reader)?; assert_eq!( exec_graphql_request( &schema, diff --git a/rust-libs/modules/gva/gql/src/queries/block.rs b/rust-libs/modules/gva/gql/src/queries/block.rs index 3c0c60743..1d09cb664 100644 --- a/rust-libs/modules/gva/gql/src/queries/block.rs +++ b/rust-libs/modules/gva/gql/src/queries/block.rs @@ -83,7 +83,7 @@ mod tests { .withf(|_, s| s.0 == 0) .times(1) .returning(|_, _| Ok(Some(BlockMetaV2::default()))); - let schema = create_schema(dbs_reader)?; + let schema = create_schema(MockAsyncAccessor::new(), dbs_reader)?; assert_eq!( exec_graphql_request(&schema, r#"{ blockByNumber(number: 0) {number} }"#).await?, serde_json::json!({ @@ -112,7 +112,7 @@ mod tests { has_previous_page: false, }) }); - let schema = create_schema(dbs_reader)?; + let schema = create_schema(MockAsyncAccessor::new(), dbs_reader)?; assert_eq!( exec_graphql_request( &schema, diff --git a/rust-libs/modules/gva/gql/src/queries/current_block.rs b/rust-libs/modules/gva/gql/src/queries/current_block.rs index a6d9ff13b..100706dbd 100644 --- a/rust-libs/modules/gva/gql/src/queries/current_block.rs +++ b/rust-libs/modules/gva/gql/src/queries/current_block.rs @@ -23,37 +23,33 @@ impl CurrentBlockQuery { async fn current_block( &self, ctx: &async_graphql::Context<'_>, - ) -> async_graphql::Result<Block> { + ) -> async_graphql::Result<BlockMeta> { let data = ctx.data::<GvaSchemaData>()?; - let dbs_reader = data.dbs_reader(); - data.dbs_pool - .execute(move |dbs| { - if let Some(current_block) = dbs_reader.get_current_block(&dbs.cm_db)? { - Ok(Block::from(¤t_block)) - } else { - Err(async_graphql::Error::new("no blockchain")) - } - }) - .await? + if let Some(current_block_meta) = data + .cm_accessor() + .get_current_meta(|cm| cm.current_block_meta) + .await + { + Ok(current_block_meta.into()) + } else { + Err(async_graphql::Error::new("no blockchain")) + } } } #[cfg(test)] mod tests { - use super::*; use crate::tests::*; - use dubp::block::DubpBlockV10; - use duniter_dbs::databases::cm_v1::CmV1Db; #[tokio::test] async fn query_current_block() -> anyhow::Result<()> { - let mut dbs_reader = MockDbsReader::new(); - dbs_reader - .expect_get_current_block::<CmV1Db<MemSingleton>>() + let mut mock_cm = MockAsyncAccessor::new(); + mock_cm + .expect_get_current_meta::<duniter_dbs::BlockMetaV2>() .times(1) - .returning(|_| Ok(Some(DubpBlockV10::default()))); - let schema = create_schema(dbs_reader)?; + .returning(|f| Some(f(&CurrentMeta::default()))); + let schema = create_schema(mock_cm, MockDbsReader::new())?; assert_eq!( exec_graphql_request(&schema, r#"{ currentBlock {nonce} }"#).await?, serde_json::json!({ diff --git a/rust-libs/modules/gva/gql/src/queries/current_frame.rs b/rust-libs/modules/gva/gql/src/queries/current_frame.rs index 3027ce6f2..a5ba2584a 100644 --- a/rust-libs/modules/gva/gql/src/queries/current_frame.rs +++ b/rust-libs/modules/gva/gql/src/queries/current_frame.rs @@ -27,13 +27,23 @@ impl CurrentFrameQuery { let data = ctx.data::<GvaSchemaData>()?; let dbs_reader = data.dbs_reader(); - Ok(data - .dbs_pool - .execute(move |dbs| dbs_reader.get_current_frame(&dbs.bc_db_ro, &dbs.cm_db)) - .await?? - .into_iter() - .map(Into::into) - .collect()) + if let Some(current_block_meta) = data + .cm_accessor() + .get_current_meta(|cm| cm.current_block_meta) + .await + { + Ok(data + .dbs_pool + .execute(move |dbs| { + dbs_reader.get_current_frame(&dbs.bc_db_ro, ¤t_block_meta) + }) + .await?? + .into_iter() + .map(Into::into) + .collect()) + } else { + Ok(vec![]) + } } } @@ -42,21 +52,33 @@ mod tests { use super::*; use crate::tests::*; use duniter_dbs::databases::bc_v2::BcV2DbRo; - use duniter_dbs::databases::cm_v1::CmV1Db; use duniter_dbs::BlockMetaV2; #[tokio::test] async fn query_current_frame() -> anyhow::Result<()> { + let mut mock_cm = MockAsyncAccessor::new(); + mock_cm + .expect_get_current_meta::<BlockMetaV2>() + .times(1) + .returning(|f| { + Some(f(&CurrentMeta { + current_block_meta: BlockMetaV2 { + issuers_frame: 1, + ..Default::default() + }, + ..Default::default() + })) + }); let mut dbs_reader = MockDbsReader::new(); dbs_reader - .expect_get_current_frame::<BcV2DbRo<FileBackend>, CmV1Db<MemSingleton>>() + .expect_get_current_frame::<BcV2DbRo<FileBackend>>() .times(1) .returning(|_, _| { Ok(vec![BlockMetaV2 { ..Default::default() }]) }); - let schema = create_schema(dbs_reader)?; + let schema = create_schema(mock_cm, dbs_reader)?; assert_eq!( exec_graphql_request(&schema, r#"{ currentFrame {nonce} }"#).await?, serde_json::json!({ diff --git a/rust-libs/modules/gva/gql/src/queries/first_utxos_of_scripts.rs b/rust-libs/modules/gva/gql/src/queries/first_utxos_of_scripts.rs index 168fdc43f..a8d3c1bd2 100644 --- a/rust-libs/modules/gva/gql/src/queries/first_utxos_of_scripts.rs +++ b/rust-libs/modules/gva/gql/src/queries/first_utxos_of_scripts.rs @@ -42,7 +42,7 @@ impl FirstUtxosQuery { let utxos_matrice: Vec<arrayvec::ArrayVec<_>> = data .dbs_pool - .execute(move |_| db_reader.first_scripts_utxos(first as usize, &scripts)) + .execute(move |_| db_reader.first_scripts_utxos(None, first as usize, &scripts)) .await??; Ok(utxos_matrice diff --git a/rust-libs/modules/gva/gql/src/queries/gen_tx.rs b/rust-libs/modules/gva/gql/src/queries/gen_tx.rs index 1cd022ffc..8762966a8 100644 --- a/rust-libs/modules/gva/gql/src/queries/gen_tx.rs +++ b/rust-libs/modules/gva/gql/src/queries/gen_tx.rs @@ -113,50 +113,50 @@ impl GenTxsQuery { let db_reader = data.dbs_reader(); let currency = data.server_meta_data.currency.clone(); - let (current_block, (inputs, inputs_sum)) = data - .dbs_pool - .execute(move |dbs| { - if let Some(current_block) = duniter_bc_reader::get_current_block_meta(&dbs.cm_db)? - { - Ok(( - current_block, - db_reader.find_inputs( - &dbs.bc_db_ro, - &dbs.txs_mp_db, - SourceAmount::new(amount as i64, current_block.unit_base as i64), - &WalletScriptV10::single(WalletConditionV10::Sig(issuer)), - use_mempool_sources, - )?, - )) - } else { - Err(anyhow::Error::msg("no blockchain")) - } - }) - .await??; + if let Some(current_block_meta) = data + .cm_accessor + .get_current_meta(|cm| cm.current_block_meta) + .await + { + let (inputs, inputs_sum) = data + .dbs_pool + .execute(move |dbs| { + db_reader.find_inputs( + &dbs.bc_db_ro, + &dbs.txs_mp_db, + SourceAmount::new(amount as i64, current_block_meta.unit_base as i64), + &WalletScriptV10::single(WalletConditionV10::Sig(issuer)), + use_mempool_sources, + ) + }) + .await??; - let amount = SourceAmount::new(amount as i64, current_block.unit_base as i64); + let amount = SourceAmount::new(amount as i64, current_block_meta.unit_base as i64); - if inputs_sum < amount { - return Err(async_graphql::Error::new("insufficient balance")); - } + if inputs_sum < amount { + return Err(async_graphql::Error::new("insufficient balance")); + } - let current_blockstamp = Blockstamp { - number: BlockNumber(current_block.number), - hash: BlockHash(current_block.hash), - }; + let current_blockstamp = Blockstamp { + number: BlockNumber(current_block_meta.number), + hash: BlockHash(current_block_meta.hash), + }; - Ok(TransactionDocumentV10::generate_simple_txs( - current_blockstamp, - currency, - (inputs, inputs_sum), - issuer, - recipient, - (amount, comment), - cash_back_address.map(|pubkey_gva| pubkey_gva.0), - ) - .into_iter() - .map(|tx| tx.as_text().to_owned()) - .collect()) + Ok(TransactionDocumentV10::generate_simple_txs( + current_blockstamp, + currency, + (inputs, inputs_sum), + issuer, + recipient, + (amount, comment), + cash_back_address.map(|pubkey_gva| pubkey_gva.0), + ) + .into_iter() + .map(|tx| tx.as_text().to_owned()) + .collect()) + } else { + Err(async_graphql::Error::new("no blockchain")) + } } /// Generate complex transaction document async fn gen_complex_tx( @@ -191,11 +191,14 @@ impl GenTxsQuery { let db_reader = data.dbs_reader(); let currency = data.server_meta_data.currency.clone(); - let (current_block, issuers_inputs_with_sum) = data - .dbs_pool - .execute(move |dbs| { - if let Some(current_block) = duniter_bc_reader::get_current_block_meta(&dbs.cm_db)? - { + if let Some(current_block_meta) = data + .cm_accessor + .get_current_meta(|cm| cm.current_block_meta) + .await + { + let issuers_inputs_with_sum = data + .dbs_pool + .execute(move |dbs| { let mut issuers_inputs_with_sum = Vec::new(); for issuer in issuers { issuers_inputs_with_sum.push(( @@ -204,7 +207,7 @@ impl GenTxsQuery { &dbs.txs_mp_db, SourceAmount::new( issuer.amount as i64, - current_block.unit_base as i64, + current_block_meta.unit_base as i64, ), &issuer.script, use_mempool_sources, @@ -212,62 +215,63 @@ impl GenTxsQuery { issuer, )); } - Ok((current_block, issuers_inputs_with_sum)) - } else { - Err(anyhow::Error::msg("no blockchain")) - } - }) - .await??; + Ok::<_, anyhow::Error>(issuers_inputs_with_sum) + }) + .await??; - for ((_inputs, inputs_sum), issuer) in &issuers_inputs_with_sum { - let amount = SourceAmount::new(issuer.amount as i64, current_block.unit_base as i64); - if *inputs_sum < amount { - return Err(async_graphql::Error::new(format!( - "Insufficient balance for issuer {}", - issuer.script.to_string() - ))); + for ((_inputs, inputs_sum), issuer) in &issuers_inputs_with_sum { + let amount = + SourceAmount::new(issuer.amount as i64, current_block_meta.unit_base as i64); + if *inputs_sum < amount { + return Err(async_graphql::Error::new(format!( + "Insufficient balance for issuer {}", + issuer.script.to_string() + ))); + } } - } - let current_blockstamp = Blockstamp { - number: BlockNumber(current_block.number), - hash: BlockHash(current_block.hash), - }; - let base = current_block.unit_base as i64; - - let (final_tx_opt, changes_txs) = TransactionDocV10ComplexGen { - blockstamp: current_blockstamp, - currency, - issuers: issuers_inputs_with_sum - .into_iter() - .map(|((inputs, inputs_sum), issuer)| TxV10ComplexIssuer { - amount: SourceAmount::new(issuer.amount as i64, base), - codes: issuer.codes, - inputs, - inputs_sum, - script: issuer.script, - signers: issuer.signers, - }) - .collect(), - recipients: recipients - .into_iter() - .map(|TxRecipientTyped { amount, script }| { - (SourceAmount::new(amount as i64, base), script) - }) - .collect(), - user_comment: comment, - } - .gen()?; + let current_blockstamp = Blockstamp { + number: BlockNumber(current_block_meta.number), + hash: BlockHash(current_block_meta.hash), + }; + let base = current_block_meta.unit_base as i64; - if let Some(final_tx) = final_tx_opt { - Ok(RawTxOrChanges::FinalTx(final_tx.as_text().to_owned())) - } else { - Ok(RawTxOrChanges::Changes( - changes_txs + let (final_tx_opt, changes_txs) = TransactionDocV10ComplexGen { + blockstamp: current_blockstamp, + currency, + issuers: issuers_inputs_with_sum .into_iter() - .map(|tx| tx.as_text().to_owned()) + .map(|((inputs, inputs_sum), issuer)| TxV10ComplexIssuer { + amount: SourceAmount::new(issuer.amount as i64, base), + codes: issuer.codes, + inputs, + inputs_sum, + script: issuer.script, + signers: issuer.signers, + }) .collect(), - )) + recipients: recipients + .into_iter() + .map(|TxRecipientTyped { amount, script }| { + (SourceAmount::new(amount as i64, base), script) + }) + .collect(), + user_comment: comment, + } + .gen()?; + + if let Some(final_tx) = final_tx_opt { + Ok(RawTxOrChanges::FinalTx(final_tx.as_text().to_owned())) + } else { + Ok(RawTxOrChanges::Changes( + changes_txs + .into_iter() + .map(|tx| tx.as_text().to_owned()) + .collect(), + )) + } + } else { + Err(async_graphql::Error::new("no blockchain")) } } } diff --git a/rust-libs/modules/gva/gql/src/queries/idty.rs b/rust-libs/modules/gva/gql/src/queries/idty.rs index 49e603d22..ac0090e8c 100644 --- a/rust-libs/modules/gva/gql/src/queries/idty.rs +++ b/rust-libs/modules/gva/gql/src/queries/idty.rs @@ -60,7 +60,7 @@ mod tests { username: String::from("JohnDoe"), })) }); - let schema = create_schema(dbs_reader)?; + let schema = create_schema(MockAsyncAccessor::new(), dbs_reader)?; assert_eq!( exec_graphql_request( &schema, diff --git a/rust-libs/modules/gva/gql/src/queries/txs_history.rs b/rust-libs/modules/gva/gql/src/queries/txs_history.rs index e1b250d76..3cf5bfe93 100644 --- a/rust-libs/modules/gva/gql/src/queries/txs_history.rs +++ b/rust-libs/modules/gva/gql/src/queries/txs_history.rs @@ -299,7 +299,7 @@ mod tests { has_next_page: false, }) }); - let schema = create_schema(dbs_reader)?; + let schema = create_schema(MockAsyncAccessor::new(), dbs_reader)?; assert_eq!( exec_graphql_request( &schema, diff --git a/rust-libs/modules/gva/gql/src/queries/uds.rs b/rust-libs/modules/gva/gql/src/queries/uds.rs index 4b92132db..a3eab34c1 100644 --- a/rust-libs/modules/gva/gql/src/queries/uds.rs +++ b/rust-libs/modules/gva/gql/src/queries/uds.rs @@ -28,16 +28,17 @@ impl UdsQuery { ctx: &async_graphql::Context<'_>, ) -> async_graphql::Result<Option<CurrentUdGva>> { let data = ctx.data::<GvaSchemaData>()?; - let dbs_reader = data.dbs_reader(); - Ok(data - .dbs_pool - .execute(move |dbs| dbs_reader.get_current_ud(&dbs.bc_db_ro)) - .await?? - .map(|sa| CurrentUdGva { - amount: sa.amount(), - base: sa.base(), - })) + Ok( + if let Some(current_ud) = data.cm_accessor.get_current_meta(|cm| cm.current_ud).await { + Some(CurrentUdGva { + amount: current_ud.amount(), + base: current_ud.base(), + }) + } else { + None + }, + ) } /// Universal dividends issued by a public key #[allow(clippy::clippy::too_many_arguments)] @@ -55,18 +56,21 @@ impl UdsQuery { let data = ctx.data::<GvaSchemaData>()?; let dbs_reader = data.dbs_reader(); - let ( - PagedData { - data: UdsWithSum { uds, sum }, - has_previous_page, - has_next_page, - }, - times, - ) = data - .dbs_pool - .execute(move |dbs| { - if let Some(current_block) = duniter_bc_reader::get_current_block_meta(&dbs.cm_db)? - { + if let Some(current_base) = data + .cm_accessor + .get_current_meta(|cm| cm.current_block_meta.unit_base) + .await + { + let ( + PagedData { + data: UdsWithSum { uds, sum }, + has_previous_page, + has_next_page, + }, + times, + ) = data + .dbs_pool + .execute(move |dbs| { let paged_data = match filter { UdsFilter::All => { dbs_reader.all_uds_of_pubkey(&dbs.bc_db_ro, pubkey.0, pagination) @@ -76,9 +80,7 @@ impl UdsQuery { pubkey.0, pagination, None, - amount.map(|amount| { - SourceAmount::new(amount, current_block.unit_base as i64) - }), + amount.map(|amount| SourceAmount::new(amount, current_base as i64)), ), }?; @@ -87,45 +89,45 @@ impl UdsQuery { times.push(dbs_reader.get_blockchain_time(*bn)?); } Ok::<_, anyhow::Error>((paged_data, times)) - } else { - Err(anyhow::Error::msg("no blockchain")) - } - }) - .await??; + }) + .await??; - let mut conn = Connection::with_additional_fields( - has_previous_page, - has_next_page, - AggregateSum { - aggregate: Sum { - sum: AmountWithBase { - amount: sum.amount() as i32, - base: sum.base() as i32, + let mut conn = Connection::with_additional_fields( + has_previous_page, + has_next_page, + AggregateSum { + aggregate: Sum { + sum: AmountWithBase { + amount: sum.amount() as i32, + base: sum.base() as i32, + }, }, }, - }, - ); - let uds_timed = - uds.into_iter() - .zip(times.into_iter()) - .map(|((bn, sa), blockchain_time)| { - Edge::new( - bn.0.to_string(), - UdGva { - amount: sa.amount(), - base: sa.base(), - issuer: pubkey, - block_number: bn.0, - blockchain_time, - }, - ) - }); - if pagination.order() { - conn.append(uds_timed); + ); + let uds_timed = + uds.into_iter() + .zip(times.into_iter()) + .map(|((bn, sa), blockchain_time)| { + Edge::new( + bn.0.to_string(), + UdGva { + amount: sa.amount(), + base: sa.base(), + issuer: pubkey, + block_number: bn.0, + blockchain_time, + }, + ) + }); + if pagination.order() { + conn.append(uds_timed); + } else { + conn.append(uds_timed.rev()); + } + Ok(conn) } else { - conn.append(uds_timed.rev()); + Err(async_graphql::Error::new("no blockchain")) } - Ok(conn) } /// Universal dividends revaluations async fn uds_reval( @@ -157,13 +159,17 @@ mod tests { #[tokio::test] async fn query_current_ud() -> anyhow::Result<()> { - let mut dbs_reader = MockDbsReader::new(); - use duniter_dbs::databases::bc_v2::BcV2DbRo; - dbs_reader - .expect_get_current_ud::<BcV2DbRo<FileBackend>>() + let mut mock_cm = MockAsyncAccessor::new(); + mock_cm + .expect_get_current_meta::<SourceAmount>() .times(1) - .returning(|_| Ok(Some(SourceAmount::with_base0(100)))); - let schema = create_schema(dbs_reader)?; + .returning(|f| { + Some(f(&CurrentMeta { + current_ud: SourceAmount::with_base0(100), + ..Default::default() + })) + }); + let schema = create_schema(mock_cm, MockDbsReader::new())?; assert_eq!( exec_graphql_request(&schema, r#"{ currentUd {amount} }"#).await?, serde_json::json!({ diff --git a/rust-libs/modules/gva/gql/src/queries/utxos_of_script.rs b/rust-libs/modules/gva/gql/src/queries/utxos_of_script.rs index f9b9b3c7d..c3a3039e5 100644 --- a/rust-libs/modules/gva/gql/src/queries/utxos_of_script.rs +++ b/rust-libs/modules/gva/gql/src/queries/utxos_of_script.rs @@ -37,25 +37,26 @@ impl UtxosQuery { let pagination = Pagination::convert_to_page_info(pagination, *is_whitelisted)?; let data = ctx.data::<GvaSchemaData>()?; + let cm_accessor = data.cm_accessor(); let db_reader = data.dbs_reader(); - let ( - PagedData { - data: UtxosWithSum { utxos, sum }, - has_previous_page, - has_next_page, - }, - times, - ) = data - .dbs_pool - .execute(move |dbs| { - if let Some(current_block) = duniter_bc_reader::get_current_block_meta(&dbs.cm_db)? - { + if let Some(current_base) = cm_accessor + .get_current_meta(|cm| cm.current_block_meta.unit_base) + .await + { + let ( + PagedData { + data: UtxosWithSum { utxos, sum }, + has_previous_page, + has_next_page, + }, + times, + ) = data + .dbs_pool + .execute(move |dbs| { let paged_data = db_reader.find_script_utxos( &dbs.txs_mp_db, - amount.map(|amount| { - SourceAmount::new(amount, current_block.unit_base as i64) - }), + amount.map(|amount| SourceAmount::new(amount, current_base as i64)), pagination, &script.0, )?; @@ -64,39 +65,39 @@ impl UtxosQuery { times.push(db_reader.get_blockchain_time(*block_number)?); } Ok::<_, anyhow::Error>((paged_data, times)) - } else { - Err(anyhow::Error::msg("no blockchain")) - } - }) - .await??; + }) + .await??; - let mut conn = Connection::with_additional_fields( - has_previous_page, - has_next_page, - AggregateSum { - aggregate: Sum { - sum: AmountWithBase { - amount: sum.amount() as i32, - base: sum.base() as i32, + let mut conn = Connection::with_additional_fields( + has_previous_page, + has_next_page, + AggregateSum { + aggregate: Sum { + sum: AmountWithBase { + amount: sum.amount() as i32, + base: sum.base() as i32, + }, }, }, - }, - ); - conn.append(utxos.into_iter().zip(times.into_iter()).map( - |((utxo_cursor, source_amount), blockchain_time)| { - Edge::new( - utxo_cursor.to_string(), - UtxoTimedGva { - amount: source_amount.amount(), - base: source_amount.base(), - tx_hash: utxo_cursor.tx_hash.to_hex(), - output_index: utxo_cursor.output_index as u32, - written_block: utxo_cursor.block_number.0, - written_time: blockchain_time, - }, - ) - }, - )); - Ok(conn) + ); + conn.append(utxos.into_iter().zip(times.into_iter()).map( + |((utxo_cursor, source_amount), blockchain_time)| { + Edge::new( + utxo_cursor.to_string(), + UtxoTimedGva { + amount: source_amount.amount(), + base: source_amount.base(), + tx_hash: utxo_cursor.tx_hash.to_hex(), + output_index: utxo_cursor.output_index as u32, + written_block: utxo_cursor.block_number.0, + written_time: blockchain_time, + }, + ) + }, + )); + Ok(conn) + } else { + Err(async_graphql::Error::new("no blockchain")) + } } } diff --git a/rust-libs/modules/gva/gql/src/schema.rs b/rust-libs/modules/gva/gql/src/schema.rs index 6ef5830de..c5ab4f6de 100644 --- a/rust-libs/modules/gva/gql/src/schema.rs +++ b/rust-libs/modules/gva/gql/src/schema.rs @@ -45,6 +45,7 @@ pub fn build_schema_with_data(data: GvaSchemaData, logger: bool) -> GvaSchema { } pub struct GvaSchemaData { + pub cm_accessor: AsyncAccessor, pub dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>, pub dbs_reader: DbsReaderImpl, pub server_meta_data: ServerMetaData, @@ -53,6 +54,10 @@ pub struct GvaSchemaData { #[cfg(not(test))] impl GvaSchemaData { + #[inline(always)] + pub fn cm_accessor(&self) -> AsyncAccessor { + self.cm_accessor + } #[inline(always)] pub fn dbs_reader(&self) -> DbsReaderImpl { self.dbs_reader @@ -60,6 +65,9 @@ impl GvaSchemaData { } #[cfg(test)] impl GvaSchemaData { + pub fn cm_accessor(&self) -> AsyncAccessor { + self.cm_accessor.clone() + } pub fn dbs_reader(&self) -> DbsReaderImpl { self.dbs_reader.clone() } diff --git a/rust-libs/modules/gva/gql/src/subscriptions/new_blocks.rs b/rust-libs/modules/gva/gql/src/subscriptions/new_blocks.rs index e6e2a53f1..aa6c56e34 100644 --- a/rust-libs/modules/gva/gql/src/subscriptions/new_blocks.rs +++ b/rust-libs/modules/gva/gql/src/subscriptions/new_blocks.rs @@ -15,8 +15,7 @@ use super::create_subscription; use crate::*; -use duniter_dbs::databases::cm_v1::{CmV1DbReadable, CurrentBlockEvent, CurrentBlockMetaEvent}; -use futures::future::Either; +use duniter_dbs::databases::cm_v1::{CmV1DbReadable, CurrentBlockEvent}; #[derive(Clone, Copy, Default)] pub struct NewBlocksSubscription; @@ -27,63 +26,26 @@ impl NewBlocksSubscription { &self, ctx: &async_graphql::Context<'_>, ) -> impl Stream<Item = async_graphql::Result<Vec<Block>>> { - let meta_only = !(ctx.look_ahead().field("identities").exists() - || ctx.look_ahead().field("joiners").exists() - || ctx.look_ahead().field("actives").exists() - || ctx.look_ahead().field("leavers").exists() - || ctx.look_ahead().field("revoked").exists() - || ctx.look_ahead().field("excluded").exists() - || ctx.look_ahead().field("certifications").exists() - || ctx.look_ahead().field("transactions").exists()); - if meta_only { - Either::Left( - create_subscription( - ctx, - |dbs| dbs.cm_db.current_block_meta(), - |events| { - let mut blocks = Vec::new(); - for event in events.deref() { - if let CurrentBlockMetaEvent::Upsert { - value: ref block_meta, - .. - } = event - { - blocks.push(Block::from(block_meta)); - } - } - if blocks.is_empty() { - futures::future::ready(None) - } else { - futures::future::ready(Some(Ok(blocks))) - } - }, - ) - .await, - ) - } else { - Either::Right( - create_subscription( - ctx, - |dbs| dbs.cm_db.current_block(), - |events| { - let mut blocks = Vec::new(); - for event in events.deref() { - if let CurrentBlockEvent::Upsert { - value: ref block, .. - } = event - { - blocks.push(Block::from(&block.0)); - } - } - if blocks.is_empty() { - futures::future::ready(None) - } else { - futures::future::ready(Some(Ok(blocks))) - } - }, - ) - .await, - ) - } + create_subscription( + ctx, + |dbs| dbs.cm_db.current_block(), + |events| { + let mut blocks = Vec::new(); + for event in events.deref() { + if let CurrentBlockEvent::Upsert { + value: ref block, .. + } = event + { + blocks.push(Block::from(&block.0)); + } + } + if blocks.is_empty() { + futures::future::ready(None) + } else { + futures::future::ready(Some(Ok(blocks))) + } + }, + ) + .await } } diff --git a/rust-libs/modules/gva/src/lib.rs b/rust-libs/modules/gva/src/lib.rs index 5317ccebf..b637c342e 100644 --- a/rust-libs/modules/gva/src/lib.rs +++ b/rust-libs/modules/gva/src/lib.rs @@ -39,6 +39,7 @@ use duniter_conf::DuniterMode; use duniter_dbs::databases::txs_mp_v2::TxsMpV2DbReadable; use duniter_dbs::prelude::*; use duniter_dbs::{kv_typed::prelude::*, FileBackend}; +use duniter_global::AsyncAccessor; use duniter_gva_db::*; use duniter_gva_gql::{GvaSchema, QueryContext}; use duniter_gva_indexer::{get_gva_db_ro, get_gva_db_rw}; @@ -243,6 +244,7 @@ impl GvaModule { let self_pubkey = self_keypair.public_key(); duniter_bca::set_bca_executor( currency.clone(), + AsyncAccessor::new(), dbs_pool.clone(), duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro), self_keypair, @@ -251,6 +253,7 @@ impl GvaModule { ); let gva_schema = duniter_gva_gql::build_schema_with_data( duniter_gva_gql::GvaSchemaData { + cm_accessor: AsyncAccessor::new(), dbs_reader: duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro), dbs_pool, server_meta_data: duniter_gva_gql::ServerMetaData { -- GitLab