From 02a24c84391f54a69e840db34e19fdacdbe0724a Mon Sep 17 00:00:00 2001 From: librelois <c@elo.tf> Date: Wed, 17 Mar 2021 03:06:22 +0100 Subject: [PATCH] [feat] gva: add subset bca --- Cargo.lock | 100 +++++ Cargo.toml | 2 + rust-libs/modules/gva/Cargo.toml | 2 + rust-libs/modules/gva/bca/Cargo.toml | 27 ++ .../modules/gva/bca/src/exec_req_type.rs | 45 ++ .../bca/src/exec_req_type/members_count.rs | 33 ++ .../exec_req_type/prepare_simple_payment.rs | 188 +++++++++ rust-libs/modules/gva/bca/src/lib.rs | 385 ++++++++++++++++++ rust-libs/modules/gva/bca/src/utils.rs | 33 ++ rust-libs/modules/gva/bca/types/Cargo.toml | 17 + rust-libs/modules/gva/bca/types/src/lib.rs | 81 ++++ .../gva/bca/types/src/prepare_payment.rs | 31 ++ rust-libs/modules/gva/src/lib.rs | 10 +- rust-libs/modules/gva/src/warp_.rs | 156 ++++--- 14 files changed, 1060 insertions(+), 50 deletions(-) create mode 100644 rust-libs/modules/gva/bca/Cargo.toml create mode 100644 rust-libs/modules/gva/bca/src/exec_req_type.rs create mode 100644 rust-libs/modules/gva/bca/src/exec_req_type/members_count.rs create mode 100644 rust-libs/modules/gva/bca/src/exec_req_type/prepare_simple_payment.rs create mode 100644 rust-libs/modules/gva/bca/src/lib.rs create mode 100644 rust-libs/modules/gva/bca/src/utils.rs create mode 100644 rust-libs/modules/gva/bca/types/Cargo.toml create mode 100644 rust-libs/modules/gva/bca/types/src/lib.rs create mode 100644 rust-libs/modules/gva/bca/types/src/prepare_payment.rs diff --git a/Cargo.lock b/Cargo.lock index b1d85f162..3c59dfb2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -25,6 +25,37 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e" +[[package]] +name = "aes" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "884391ef1066acaa41e766ba8f596341b96e93ce34f9a43e7d24bf0a0eaf0561" +dependencies = [ + "aes-soft", + "aesni", + "cipher", +] + +[[package]] +name = "aes-soft" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be14c7498ea50828a38d0e24a765ed2effe92a705885b57d029cd67d45744072" +dependencies = [ + "cipher", + "opaque-debug 0.3.0", +] + +[[package]] +name = "aesni" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea2e11f5e94c2f7d386164cc2aa1f97823fed6f259e486940a71c174dd01b0ce" +dependencies = [ + "cipher", + "opaque-debug 0.3.0", +] + [[package]] name = "ahash" version = "0.3.8" @@ -83,6 +114,21 @@ dependencies = [ "syn", ] +[[package]] +name = "async-bincode" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a31c08aa335b3ab414d29bdefe1f4353408abf93f3db1e3e2cc78d3ec4f0d43" +dependencies = [ + "bincode", + "byteorder", + "bytes 1.0.1", + "futures-core", + "futures-sink", + "serde", + "tokio", +] + [[package]] name = "async-channel" version = "1.5.1" @@ -603,6 +649,15 @@ dependencies = [ "envmnt", ] +[[package]] +name = "cipher" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12f8e7987cbd042a63249497f41aed09f8e65add917ea6566effbc56578d6801" +dependencies = [ + "generic-array 0.14.4", +] + [[package]] name = "clap" version = "2.33.3" @@ -1083,6 +1138,36 @@ dependencies = [ "smallvec", ] +[[package]] +name = "duniter-bca" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-bincode", + "bincode", + "dubp", + "duniter-bca-types", + "duniter-dbs", + "duniter-gva-db", + "duniter-gva-dbs-reader", + "fast-threadpool", + "futures", + "mockall", + "once_cell", + "smallvec", + "tokio", + "uninit", +] + +[[package]] +name = "duniter-bca-types" +version = "0.1.0" +dependencies = [ + "dubp", + "serde", + "smallvec", +] + [[package]] name = "duniter-conf" version = "0.1.0" @@ -1165,7 +1250,9 @@ dependencies = [ "async-graphql", "async-mutex", "async-trait", + "bytes 1.0.1", "dubp", + "duniter-bca", "duniter-conf", "duniter-dbs", "duniter-gva-db", @@ -1376,12 +1463,16 @@ version = "0.49.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0be04829b31b18bacf5317001366d807e5fbd02085ee6348508c1299b5bcaf6c" dependencies = [ + "aes", + "arrayvec", "base64 0.13.0", "blake3", "bs58 0.4.0", "byteorder", "cryptoxide", + "ed25519-bip32", "getrandom 0.2.2", + "once_cell", "ring", "serde", "thiserror", @@ -1389,6 +1480,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "ed25519-bip32" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8827180a2b511141fbe49141e50b31a8d542465e0fb572f81f36feea2addfe92" +dependencies = [ + "cryptoxide", +] + [[package]] name = "either" version = "1.6.1" diff --git a/Cargo.toml b/Cargo.toml index 0813c6d13..f88ec5eae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,8 @@ members = [ "rust-libs/duniter-module", "rust-libs/duniter-server", "rust-libs/modules/gva", + "rust-libs/modules/gva/bca", + "rust-libs/modules/gva/bca/types", "rust-libs/modules/gva/dbs-reader", "rust-libs/modules/gva/gql", "rust-libs/modules/gva/indexer", diff --git a/rust-libs/modules/gva/Cargo.toml b/rust-libs/modules/gva/Cargo.toml index fefd481af..1a29df632 100644 --- a/rust-libs/modules/gva/Cargo.toml +++ b/rust-libs/modules/gva/Cargo.toml @@ -11,7 +11,9 @@ arrayvec = "0.5.1" async-graphql = "2.2.0" async-mutex = "1.4.0" async-trait = "0.1.41" +bytes = "1.0" dubp = { version = "0.49.0", features = ["duniter"] } +duniter-bca = { path = "./bca" } duniter-conf = { path = "../../duniter-conf" } duniter-dbs = { path = "../../duniter-dbs" } duniter-gva-db = { path = "./db" } diff --git a/rust-libs/modules/gva/bca/Cargo.toml b/rust-libs/modules/gva/bca/Cargo.toml new file mode 100644 index 000000000..79b5e05d8 --- /dev/null +++ b/rust-libs/modules/gva/bca/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "duniter-bca" +version = "0.1.0" +authors = ["librelois <elois@duniter.org>"] +license = "AGPL-3.0" +edition = "2018" + +[dependencies] +anyhow = "1.0.33" +async-bincode = "0.6.1" +bincode = "1.3" +dubp = { version = "0.49.0", features = ["duniter"] } +duniter-bca-types = { path = "types", features = ["duniter"] } +duniter-dbs = { path = "../../../duniter-dbs" } +duniter-gva-db = { path = "../db" } +duniter-gva-dbs-reader = { path = "../dbs-reader" } +fast-threadpool = "0.2.3" +futures = "0.3.6" +once_cell = "1.5" +smallvec = { version = "1.4.0", features = ["serde", "write"] } +tokio = { version = "1.2", features = ["macros", "rt-multi-thread"] } +uninit = "0.4.0" + +[dev-dependencies] +duniter-dbs = { path = "../../../duniter-dbs", features = ["mem"] } +tokio = { version = "1.2", features = ["macros", "rt-multi-thread", "time"] } +mockall = "0.8.0" diff --git a/rust-libs/modules/gva/bca/src/exec_req_type.rs b/rust-libs/modules/gva/bca/src/exec_req_type.rs new file mode 100644 index 000000000..ea3a3a193 --- /dev/null +++ b/rust-libs/modules/gva/bca/src/exec_req_type.rs @@ -0,0 +1,45 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +mod members_count; +mod prepare_simple_payment; + +use crate::*; + +#[derive(Debug, PartialEq)] +pub(super) struct ExecReqTypeError(pub(super) String); + +impl<E> From<E> for ExecReqTypeError +where + E: ToString, +{ + fn from(e: E) -> Self { + Self(e.to_string()) + } +} + +pub(super) async fn execute_req_type( + bca_executor: &BcaExecutor, + req_type: BcaReqTypeV0, + _is_whitelisted: bool, +) -> Result<BcaRespTypeV0, ExecReqTypeError> { + match req_type { + BcaReqTypeV0::MembersCount => members_count::exec_req_members_count(bca_executor).await, + BcaReqTypeV0::PrepareSimplePayment(params) => { + prepare_simple_payment::exec_req_prepare_simple_payment(bca_executor, params).await + } + BcaReqTypeV0::Ping => Ok(BcaRespTypeV0::Pong), + } +} diff --git a/rust-libs/modules/gva/bca/src/exec_req_type/members_count.rs b/rust-libs/modules/gva/bca/src/exec_req_type/members_count.rs new file mode 100644 index 000000000..07ebc2d99 --- /dev/null +++ b/rust-libs/modules/gva/bca/src/exec_req_type/members_count.rs @@ -0,0 +1,33 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::*; +use dubp::block::prelude::*; + +pub(super) async fn exec_req_members_count( + bca_executor: &BcaExecutor, +) -> Result<BcaRespTypeV0, ExecReqTypeError> { + let dbs_reader = bca_executor.dbs_reader(); + Ok(bca_executor + .dbs_pool + .execute(move |dbs| match dbs_reader.get_current_block(&dbs.cm_db) { + Ok(Some(current_block)) => { + BcaRespTypeV0::MembersCount(current_block.members_count() as u64) + } + Ok(None) => BcaRespTypeV0::Error("no blockchain".to_owned()), + Err(e) => BcaRespTypeV0::Error(e.to_string()), + }) + .await?) +} diff --git a/rust-libs/modules/gva/bca/src/exec_req_type/prepare_simple_payment.rs b/rust-libs/modules/gva/bca/src/exec_req_type/prepare_simple_payment.rs new file mode 100644 index 000000000..89bfe33ce --- /dev/null +++ b/rust-libs/modules/gva/bca/src/exec_req_type/prepare_simple_payment.rs @@ -0,0 +1,188 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software current_block_number: (), current_block_hash: (), inputs: (), inputs_sum: (): you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::*; +use dubp::wallet::prelude::*; +use duniter_bca_types::prepare_payment::{PrepareSimplePayment, PrepareSimplePaymentResp}; + +pub(super) async fn exec_req_prepare_simple_payment( + bca_executor: &BcaExecutor, + params: PrepareSimplePayment, +) -> Result<BcaRespTypeV0, ExecReqTypeError> { + let mut amount = params.amount; + let issuer = params.issuer; + + let dbs_reader = bca_executor.dbs_reader(); + let (amount, current_block, (inputs, inputs_sum)) = bca_executor + .dbs_pool + .execute(move |dbs| { + if let Some(current_block) = dbs_reader.get_current_block_meta(&dbs.cm_db)? { + let current_base = current_block.unit_base as i64; + if amount.base() > current_base { + Err("too long base".into()) + } else { + while amount.base() < current_base { + amount = amount.increment_base(); + } + Ok::<_, ExecReqTypeError>(( + amount, + current_block, + dbs_reader.find_inputs( + &dbs.bc_db_ro, + &dbs.txs_mp_db, + amount, + &WalletScriptV10::single(WalletConditionV10::Sig(issuer)), + false, + )?, + )) + } + } else { + Err("no blockchain".into()) + } + }) + .await??; + + if inputs_sum < amount { + return Err("insufficient balance".into()); + } + + Ok(BcaRespTypeV0::PrepareSimplePayment( + PrepareSimplePaymentResp { + current_block_number: current_block.number, + current_block_hash: current_block.hash, + inputs, + inputs_sum, + }, + )) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tests::*; + + #[tokio::test] + async fn test_exec_req_prepare_simple_payment_no_blockchain() { + let mut dbs_reader = MockDbsReader::new(); + dbs_reader + .expect_get_current_block_meta::<CmV1Db<MemSingleton>>() + .times(1) + .returning(|_| Ok(None)); + let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + + let resp_res = exec_req_prepare_simple_payment( + &bca_executor, + PrepareSimplePayment { + issuer: PublicKey::default(), + amount: SourceAmount::new(42, 0), + }, + ) + .await; + + assert_eq!(resp_res, Err(ExecReqTypeError("no blockchain".into()))); + } + + #[tokio::test] + async fn test_exec_req_prepare_simple_payment_too_long_base() { + let mut dbs_reader = MockDbsReader::new(); + dbs_reader + .expect_get_current_block_meta::<CmV1Db<MemSingleton>>() + .times(1) + .returning(|_| Ok(Some(BlockMetaV2::default()))); + let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + + let resp_res = exec_req_prepare_simple_payment( + &bca_executor, + PrepareSimplePayment { + issuer: PublicKey::default(), + amount: SourceAmount::new(42, 1), + }, + ) + .await; + + assert_eq!(resp_res, Err(ExecReqTypeError("too long base".into()))); + } + + #[tokio::test] + async fn test_exec_req_prepare_simple_payment_insufficient_balance() { + let mut dbs_reader = MockDbsReader::new(); + dbs_reader + .expect_get_current_block_meta::<CmV1Db<MemSingleton>>() + .times(1) + .returning(|_| Ok(Some(BlockMetaV2::default()))); + dbs_reader + .expect_find_inputs::<BcV2DbRo<FileBackend>, TxsMpV2Db<FileBackend>>() + .times(1) + .returning(|_, _, _, _, _| Ok((vec![], SourceAmount::default()))); + let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + + let resp_res = exec_req_prepare_simple_payment( + &bca_executor, + PrepareSimplePayment { + issuer: PublicKey::default(), + amount: SourceAmount::new(42, 0), + }, + ) + .await; + + assert_eq!( + resp_res, + Err(ExecReqTypeError("insufficient balance".into())) + ); + } + + #[tokio::test] + async fn test_exec_req_prepare_simple_payment_ok() -> Result<(), ExecReqTypeError> { + let input = TransactionInputV10 { + amount: SourceAmount::with_base0(57), + id: SourceIdV10::Utxo(UtxoIdV10 { + tx_hash: Hash::default(), + output_index: 3, + }), + }; + + let mut dbs_reader = MockDbsReader::new(); + dbs_reader + .expect_get_current_block_meta::<CmV1Db<MemSingleton>>() + .times(1) + .returning(|_| Ok(Some(BlockMetaV2::default()))); + dbs_reader + .expect_find_inputs::<BcV2DbRo<FileBackend>, TxsMpV2Db<FileBackend>>() + .times(1) + .returning(move |_, _, _, _, _| Ok((vec![input], SourceAmount::with_base0(57)))); + let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + + let resp = exec_req_prepare_simple_payment( + &bca_executor, + PrepareSimplePayment { + issuer: PublicKey::default(), + amount: SourceAmount::new(42, 0), + }, + ) + .await?; + + assert_eq!( + resp, + BcaRespTypeV0::PrepareSimplePayment(PrepareSimplePaymentResp { + current_block_number: 0, + current_block_hash: Hash::default(), + inputs: vec![input], + inputs_sum: SourceAmount::with_base0(57), + }) + ); + + Ok(()) + } +} diff --git a/rust-libs/modules/gva/bca/src/lib.rs b/rust-libs/modules/gva/bca/src/lib.rs new file mode 100644 index 000000000..0d34ff6c7 --- /dev/null +++ b/rust-libs/modules/gva/bca/src/lib.rs @@ -0,0 +1,385 @@ +// Copyright (C) 2020 Éloïs req_id: (), resp_type: ()SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +#![deny( + clippy::unwrap_used, + missing_copy_implementations, + trivial_casts, + trivial_numeric_casts, + unstable_features, + unused_import_braces +)] + +mod exec_req_type; +mod utils; + +const RESP_MIN_SIZE: usize = 64; +type RespBytes = SmallVec<[u8; RESP_MIN_SIZE]>; + +use crate::exec_req_type::ExecReqTypeError; +use crate::utils::AsyncReader; +use async_bincode::AsyncBincodeReader; +use bincode::Options as _; +use duniter_bca_types::{BcaReq, BcaReqTypeV0, BcaResp, BcaRespTypeV0, BcaRespV0, ReqExecError}; +use duniter_dbs::{FileBackend, SharedDbs}; +use futures::{prelude::stream::FuturesUnordered, StreamExt, TryStream, TryStreamExt}; +use once_cell::sync::OnceCell; +use smallvec::SmallVec; +use tokio::task::JoinError; + +#[cfg(test)] +use crate::tests::DbsReader; +#[cfg(not(test))] +use duniter_gva_dbs_reader::DbsReader; + +static BCA_EXECUTOR: OnceCell<BcaExecutor> = OnceCell::new(); + +pub fn set_bca_executor( + dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>, + dbs_reader: DbsReader, +) { + BCA_EXECUTOR + .set(BcaExecutor { + dbs_pool, + dbs_reader, + }) + .unwrap_or_else(|_| panic!("BCA_EXECUTOR already set !")) +} + +#[cfg(not(test))] +pub async fn execute<R: 'static + futures::AsyncRead + Send + Unpin>( + query_body_reader: R, + is_whitelisted: bool, +) -> Vec<u8> { + unsafe { + BCA_EXECUTOR + .get_unchecked() + .execute(query_body_reader, is_whitelisted) + .await + } +} + +#[derive(Clone)] +struct BcaExecutor { + dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>, + dbs_reader: DbsReader, +} +use uninit::extension_traits::VecCapacity; +impl BcaExecutor { + pub async fn execute<R: futures::AsyncRead + Send + Unpin>( + &self, + query_body_reader: R, + is_whitelisted: bool, + ) -> Vec<u8> { + let async_bincode_reader = + AsyncBincodeReader::<AsyncReader<R>, BcaReq>::from(AsyncReader(query_body_reader)); + self.execute_inner(async_bincode_reader, is_whitelisted) + .await + .into_iter() + .fold(Vec::new(), |mut vec, elem| { + // Write resp len + let out = vec.reserve_uninit(4); + out.copy_from_slice(&u32::to_be_bytes(elem.len() as u32)[..]); + unsafe { + // # Safety + // + // - `.copy_from_slice()` contract guarantees initialization + // of `out`, which, in turn, from `reserve_uninit`'s contract, + // leads to the `vec` extra capacity having been initialized. + vec.set_len(vec.len() + 4); + } + + // Write resp content + let out = vec.reserve_uninit(elem.len()); + out.copy_from_slice(&elem[..]); + unsafe { + // # Safety + // + // - `.copy_from_slice()` contract guarantees initialization + // of `out`, which, in turn, from `reserve_uninit`'s contract, + // leads to the `vec` extra capacity having been initialized. + vec.set_len(vec.len() + elem.len()); + } + vec + }) + } + async fn execute_inner( + &self, + stream: impl TryStream<Ok = BcaReq, Error = bincode::Error>, + is_whitelisted: bool, + ) -> Vec<RespBytes> { + match stream + .map_ok(|req| { + let self_clone = self.clone(); + tokio::spawn(async move { self_clone.execute_req(req, is_whitelisted).await }) + }) + .try_collect::<FuturesUnordered<_>>() + .await + { + Ok(futures_unordered) => { + futures_unordered + .map(|req_res: Result<BcaResp, JoinError>| { + let resp = match req_res { + Ok(resp) => Ok(resp), + Err(e) => Err(if e.is_cancelled() { + ReqExecError::Cancelled + } else if e.is_panic() { + ReqExecError::Panic + } else { + ReqExecError::Unknown + }), + }; + let mut resp_buffer = RespBytes::new(); + bincode_opts() + .serialize_into(&mut resp_buffer, &resp) + .expect("unreachable"); + resp_buffer + }) + .collect() + .await + } + Err(e) => { + let req_res: Result<BcaResp, ReqExecError> = + Err(ReqExecError::InvalidReq(e.to_string())); + let mut resp_buffer = RespBytes::new(); + bincode_opts() + .serialize_into(&mut resp_buffer, &req_res) + .expect("unreachable"); + vec![resp_buffer] + } + } + } + + #[inline(always)] + async fn execute_req(self, req: BcaReq, is_whitelisted: bool) -> BcaResp { + match req { + BcaReq::V0(req) => BcaResp::V0(BcaRespV0 { + req_id: req.req_id, + resp_type: match crate::exec_req_type::execute_req_type( + &self, + req.req_type, + is_whitelisted, + ) + .await + { + Ok(resp_type) => resp_type, + Err(e) => BcaRespTypeV0::Error(e.0), + }, + }), + _ => BcaResp::UnsupportedVersion, + } + } +} + +#[cfg(not(test))] +impl BcaExecutor { + #[inline(always)] + pub fn dbs_reader(&self) -> DbsReader { + self.dbs_reader + } +} + +pub(crate) fn bincode_opts() -> impl bincode::Options { + bincode::options() + .with_limit(u32::max_value() as u64) + .allow_trailing_bytes() +} + +#[cfg(test)] +mod tests { + use super::*; + pub use dubp::block::prelude::*; + pub use dubp::crypto::hashs::Hash; + pub use dubp::crypto::keys::ed25519::PublicKey; + pub use dubp::documents::transaction::TransactionInputV10; + pub use dubp::wallet::prelude::*; + pub use duniter_bca_types::BcaReqV0; + pub use duniter_dbs::databases::bc_v2::{BcV2DbReadable, BcV2DbRo}; + pub use duniter_dbs::databases::cm_v1::{CmV1Db, CmV1DbReadable}; + pub use duniter_dbs::databases::txs_mp_v2::{TxsMpV2Db, TxsMpV2DbReadable}; + pub use duniter_dbs::kv_typed::prelude::*; + pub use duniter_dbs::BlockMetaV2; + pub use futures::TryStreamExt; + + mockall::mock! { + pub DbsReader { + fn find_inputs<BcDb: 'static + BcV2DbReadable, TxsMpDb: 'static + TxsMpV2DbReadable>( + &self, + bc_db: &BcDb, + txs_mp_db: &TxsMpDb, + amount: SourceAmount, + script: &WalletScriptV10, + use_mempool_sources: bool, + ) -> anyhow::Result<(Vec<TransactionInputV10>, SourceAmount)>; + fn get_current_block<CmDb: 'static + CmV1DbReadable>( + &self, + cm_db: &CmDb, + ) -> KvResult<Option<DubpBlockV10>>; + fn get_current_block_meta<CmDb: 'static + CmV1DbReadable>( + &self, + cm_db: &CmDb, + ) -> KvResult<Option<BlockMetaV2>>; + } + } + + pub type DbsReader = duniter_dbs::kv_typed::prelude::Arc<MockDbsReader>; + + impl BcaExecutor { + #[inline(always)] + pub fn dbs_reader(&self) -> DbsReader { + self.dbs_reader.clone() + } + } + + pub(crate) fn create_bca_executor(mock_dbs_reader: MockDbsReader) -> KvResult<BcaExecutor> { + let dbs = SharedDbs::mem()?; + let threadpool = + fast_threadpool::ThreadPool::start(fast_threadpool::ThreadPoolConfig::low(), dbs); + Ok(BcaExecutor { + dbs_pool: threadpool.into_async_handler(), + dbs_reader: duniter_dbs::kv_typed::prelude::Arc::new(mock_dbs_reader), + }) + } + + #[tokio::test] + async fn test_one_req_ok() -> Result<(), bincode::Error> { + let req = BcaReq::V0(BcaReqV0 { + req_id: 42, + req_type: BcaReqTypeV0::MembersCount, + }); + assert_eq!(bincode_opts().serialized_size(&req)?, 3); + let mut bytes = [0u8; 7]; + + bincode_opts().serialize_into(&mut bytes[4..], &req)?; + bytes[3] = 3; + + use bincode::Options; + //println!("bytes_for_bincode={:?}", &bytes[4..]); + assert_eq!(req, bincode_opts().deserialize(&bytes[4..])?); + + let mut dbs_reader = MockDbsReader::new(); + dbs_reader + .expect_get_current_block::<CmV1Db<MemSingleton>>() + .times(1) + .returning(|_| Ok(Some(DubpBlockV10::default()))); + let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + + //println!("bytes={:?}", bytes); + let bytes_res = bca_executor.execute::<&[u8]>(&bytes[..], false).await; + //println!("bytes_res={:?}", bytes_res); + let bca_res: Vec<Result<BcaResp, ReqExecError>> = + AsyncBincodeReader::<_, Result<BcaResp, ReqExecError>>::from(&bytes_res[..]) + .try_collect::<Vec<_>>() + .await?; + + assert_eq!( + bca_res, + vec![Ok(BcaResp::V0(BcaRespV0 { + req_id: 42, + resp_type: BcaRespTypeV0::MembersCount(0) + }))] + ); + + Ok(()) + } + + #[tokio::test] + async fn test_one_req_invalid() -> Result<(), bincode::Error> { + let req = BcaReq::V0(BcaReqV0 { + req_id: 42, + req_type: BcaReqTypeV0::MembersCount, + }); + assert_eq!(bincode_opts().serialized_size(&req)?, 3); + let mut bytes = [0u8; 7]; + + bincode_opts().serialize_into(&mut bytes[4..], &req)?; + bytes[3] = 2; + + use bincode::Options; + //println!("bytes_for_bincode={:?}", &bytes[4..]); + assert_eq!(req, bincode_opts().deserialize(&bytes[4..])?); + + let bca_executor = + create_bca_executor(MockDbsReader::new()).expect("fail to create bca executor"); + + //println!("bytes={:?}", bytes); + let bytes_res = bca_executor.execute::<&[u8]>(&bytes[..], false).await; + //println!("bytes_res={:?}", bytes_res); + let bca_res: Vec<Result<BcaResp, ReqExecError>> = + AsyncBincodeReader::<_, Result<BcaResp, ReqExecError>>::from(&bytes_res[..]) + .try_collect::<Vec<_>>() + .await?; + + assert_eq!( + bca_res, + vec![Err(ReqExecError::InvalidReq( + "io error: unexpected end of file".to_owned() + ))] + ); + + Ok(()) + } + + #[tokio::test] + async fn test_two_reqs_ok() -> Result<(), bincode::Error> { + let req1 = BcaReq::V0(BcaReqV0 { + req_id: 42, + req_type: BcaReqTypeV0::Ping, + }); + assert_eq!(bincode_opts().serialized_size(&req1)?, 3); + let req2 = BcaReq::V0(BcaReqV0 { + req_id: 57, + req_type: BcaReqTypeV0::MembersCount, + }); + assert_eq!(bincode_opts().serialized_size(&req2)?, 3); + + let mut bytes = [0u8; 14]; + bincode_opts().serialize_into(&mut bytes[4..], &req1)?; + bytes[3] = 3; + bincode_opts().serialize_into(&mut bytes[11..], &req2)?; + bytes[10] = 3; + + let mut dbs_reader = MockDbsReader::new(); + dbs_reader + .expect_get_current_block::<CmV1Db<MemSingleton>>() + .times(1) + .returning(|_| Ok(Some(DubpBlockV10::default()))); + let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); + + //println!("bytes={:?}", bytes); + let bytes_res = bca_executor.execute::<&[u8]>(&bytes[..], false).await; + //println!("bytes_res={:?}", bytes_res); + let bca_res: Vec<Result<BcaResp, ReqExecError>> = + AsyncBincodeReader::<_, Result<BcaResp, ReqExecError>>::from(&bytes_res[..]) + .try_collect::<Vec<_>>() + .await?; + + assert_eq!( + bca_res, + vec![ + Ok(BcaResp::V0(BcaRespV0 { + req_id: 42, + resp_type: BcaRespTypeV0::Pong + })), + Ok(BcaResp::V0(BcaRespV0 { + req_id: 57, + resp_type: BcaRespTypeV0::MembersCount(0) + })) + ] + ); + + Ok(()) + } +} diff --git a/rust-libs/modules/gva/bca/src/utils.rs b/rust-libs/modules/gva/bca/src/utils.rs new file mode 100644 index 000000000..88a4e9b55 --- /dev/null +++ b/rust-libs/modules/gva/bca/src/utils.rs @@ -0,0 +1,33 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +pub(crate) struct AsyncReader<R: futures::AsyncRead + Unpin>(pub(crate) R); + +impl<R: futures::AsyncRead + Unpin> tokio::io::AsyncRead for AsyncReader<R> { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll<std::io::Result<()>> { + match <R as futures::AsyncRead>::poll_read( + std::pin::Pin::new(&mut self.0), + cx, + buf.initialize_unfilled(), + ) { + std::task::Poll::Ready(res) => std::task::Poll::Ready(res.map(|n| buf.advance(n))), + std::task::Poll::Pending => std::task::Poll::Pending, + } + } +} diff --git a/rust-libs/modules/gva/bca/types/Cargo.toml b/rust-libs/modules/gva/bca/types/Cargo.toml new file mode 100644 index 000000000..1502f7869 --- /dev/null +++ b/rust-libs/modules/gva/bca/types/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "duniter-bca-types" +version = "0.1.0" +authors = ["librelois <elois@duniter.org>"] +license = "AGPL-3.0" +edition = "2018" + +[dependencies] +dubp = { version = "0.49.0" } +serde = { version = "1.0.105", features = ["derive"] } +smallvec = { version = "1.4.0", features = ["serde"] } + +[features] +default = ["duniter"] + +client = ["dubp/client"] +duniter = ["dubp/duniter"] diff --git a/rust-libs/modules/gva/bca/types/src/lib.rs b/rust-libs/modules/gva/bca/types/src/lib.rs new file mode 100644 index 000000000..b9cef49b3 --- /dev/null +++ b/rust-libs/modules/gva/bca/types/src/lib.rs @@ -0,0 +1,81 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +#![deny( + clippy::unwrap_used, + missing_copy_implementations, + trivial_casts, + trivial_numeric_casts, + unstable_features, + unused_import_braces +)] + +pub mod prepare_payment; + +use crate::prepare_payment::{PrepareSimplePayment, PrepareSimplePaymentResp}; +use dubp::crypto::hashs::Hash; +use dubp::crypto::keys::ed25519::PublicKey; +use dubp::wallet::prelude::*; +use serde::{Deserialize, Serialize}; +//use smallvec::SmallVec; + +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub enum BcaReq { + V0(BcaReqV0), + _V1, +} + +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub struct BcaReqV0 { + pub req_id: usize, + pub req_type: BcaReqTypeV0, +} + +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub enum BcaReqTypeV0 { + MembersCount, + PrepareSimplePayment(PrepareSimplePayment), + Ping, +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub enum BcaResp { + V0(BcaRespV0), + UnsupportedVersion, +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub struct BcaRespV0 { + pub req_id: usize, + pub resp_type: BcaRespTypeV0, +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub enum BcaRespTypeV0 { + Error(String), + MembersCount(u64), + PrepareSimplePayment(PrepareSimplePaymentResp), + Pong, +} + +pub type BcaResult = Vec<Result<BcaResp, ReqExecError>>; + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub enum ReqExecError { + Cancelled, + InvalidReq(String), + Panic, + Unknown, +} diff --git a/rust-libs/modules/gva/bca/types/src/prepare_payment.rs b/rust-libs/modules/gva/bca/types/src/prepare_payment.rs new file mode 100644 index 000000000..d35c6f204 --- /dev/null +++ b/rust-libs/modules/gva/bca/types/src/prepare_payment.rs @@ -0,0 +1,31 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::*; +use dubp::documents::transaction::TransactionInputV10; + +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub struct PrepareSimplePayment { + pub issuer: PublicKey, + pub amount: SourceAmount, +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub struct PrepareSimplePaymentResp { + pub current_block_number: u32, + pub current_block_hash: Hash, + pub inputs: Vec<TransactionInputV10>, + pub inputs_sum: SourceAmount, +} diff --git a/rust-libs/modules/gva/src/lib.rs b/rust-libs/modules/gva/src/lib.rs index 0a06e10c8..fbc44ce7e 100644 --- a/rust-libs/modules/gva/src/lib.rs +++ b/rust-libs/modules/gva/src/lib.rs @@ -235,7 +235,11 @@ impl GvaModule { software_version: &'static str, ) { log::info!("GvaServer::start: conf={:?}", conf); - let schema = duniter_gva_gql::build_schema_with_data( + duniter_bca::set_bca_executor( + dbs_pool.clone(), + duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro), + ); + let gva_schema = duniter_gva_gql::build_schema_with_data( duniter_gva_gql::GvaSchemaData { dbs_reader: duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro), dbs_pool, @@ -251,7 +255,7 @@ impl GvaModule { let graphql_post = warp_::graphql( &conf, - schema.clone(), + gva_schema.clone(), async_graphql::http::MultipartOptions::default(), ); @@ -273,7 +277,7 @@ impl GvaModule { let routes = graphql_playground .or(graphql_post) - .or(warp_::graphql_ws(&conf, schema.clone())) + .or(warp_::graphql_ws(&conf, gva_schema.clone())) .recover(|err: Rejection| async move { if let Some(warp_::BadRequest(err)) = err.find() { return Ok::<_, Infallible>(warp::reply::with_status( diff --git a/rust-libs/modules/gva/src/warp_.rs b/rust-libs/modules/gva/src/warp_.rs index 4e4d9aae5..0758d843d 100644 --- a/rust-libs/modules/gva/src/warp_.rs +++ b/rust-libs/modules/gva/src/warp_.rs @@ -13,11 +13,16 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -use std::net::{IpAddr, SocketAddr}; +use std::{ + net::{IpAddr, SocketAddr}, + time::Duration, +}; use crate::anti_spam::{AntiSpam, AntiSpamResponse}; use crate::*; +const MAX_BATCH_REQ_PROCESS_DURATION_IN_MILLIS: u64 = 5_000; + pub struct BadRequest(pub anyhow::Error); impl std::fmt::Debug for BadRequest { @@ -28,6 +33,16 @@ impl std::fmt::Debug for BadRequest { impl warp::reject::Reject for BadRequest {} +pub struct ReqExecTooLong; + +impl std::fmt::Debug for ReqExecTooLong { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "server error: request execution too long") + } +} + +impl warp::reject::Reject for ReqExecTooLong {} + struct GraphQlRequest { inner: async_graphql::BatchRequest, } @@ -78,17 +93,26 @@ impl GraphQlRequest { } } -struct GraphQlResponse(async_graphql::BatchResponse); -impl warp::reply::Reply for GraphQlResponse { +enum ServerResponse { + Bincode(Vec<u8>), + GraphQl(async_graphql::BatchResponse), +} + +impl warp::reply::Reply for ServerResponse { fn into_response(self) -> warp::reply::Response { - let mut resp = warp::reply::with_header( - warp::reply::json(&self.0), - "content-type", - "application/json", - ) - .into_response(); - add_cache_control_batch(&mut resp, &self.0); - resp + match self { + ServerResponse::Bincode(bytes) => bytes.into_response(), + ServerResponse::GraphQl(gql_batch_resp) => { + let mut resp = warp::reply::with_header( + warp::reply::json(&gql_batch_resp), + "content-type", + "application/json", + ) + .into_response(); + add_cache_control_batch(&mut resp, &gql_batch_resp); + resp + } + } } } @@ -118,7 +142,7 @@ fn add_cache_control(http_resp: &mut warp::reply::Response, resp: &async_graphql pub(crate) fn graphql( conf: &GvaConf, - schema: GvaSchema, + gva_schema: GvaSchema, opts: async_graphql::http::MultipartOptions, ) -> impl warp::Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone { let anti_spam = AntiSpam::from(conf); @@ -130,19 +154,19 @@ pub(crate) fn graphql( .and(warp::header::optional::<IpAddr>("X-Real-IP")) .and(warp::header::optional::<String>("content-type")) .and(warp::body::stream()) - .and(warp::any().map(move || opts.clone())) - .and(warp::any().map(move || schema.clone())) .and(warp::any().map(move || anti_spam.clone())) + .and(warp::any().map(move || gva_schema.clone())) + .and(warp::any().map(move || opts.clone())) .and_then( |method, query: String, remote_addr: Option<SocketAddr>, x_real_ip: Option<IpAddr>, - content_type, + content_type: Option<String>, body, - opts: Arc<async_graphql::http::MultipartOptions>, - schema, - anti_spam: AntiSpam| async move { + anti_spam: AntiSpam, + gva_schema: GvaSchema, + opts: Arc<async_graphql::http::MultipartOptions>| async move { let AntiSpamResponse { is_whitelisted, is_ok, @@ -153,36 +177,40 @@ pub(crate) fn graphql( if method == http::Method::GET { let request: async_graphql::Request = serde_urlencoded::from_str(&query) .map_err(|err| warp::reject::custom(BadRequest(err.into())))?; - Ok::<_, Rejection>(( - schema, - GraphQlRequest::single(request.data(QueryContext { is_whitelisted })), + Ok(ServerResponse::GraphQl( + GraphQlRequest::single(request.data(QueryContext { is_whitelisted })) + .execute(gva_schema) + .await, )) } else { - let batch_request = GraphQlRequest::new( - async_graphql::http::receive_batch_body( - content_type, - futures::TryStreamExt::map_err(body, |err| { - std::io::Error::new(std::io::ErrorKind::Other, err) - }) - .map_ok(|mut buf| { - let remaining = warp::Buf::remaining(&buf); - warp::Buf::copy_to_bytes(&mut buf, remaining) - }) - .into_async_read(), - async_graphql::http::MultipartOptions::clone(&opts), + let body_reader = futures::TryStreamExt::map_err(body, |err| { + std::io::Error::new(std::io::ErrorKind::Other, err) + }) + .map_ok(|mut buf| { + let remaining = warp::Buf::remaining(&buf); + warp::Buf::copy_to_bytes(&mut buf, remaining) + }) + .into_async_read(); + if content_type.as_deref() == Some("application/bincode") { + tokio::time::timeout( + Duration::from_millis(MAX_BATCH_REQ_PROCESS_DURATION_IN_MILLIS), + process_bincode_batch_queries(body_reader, is_whitelisted), ) .await - .map_err(|err| warp::reject::custom(BadRequest(err.into())))?, - ); - if is_whitelisted || batch_request.len() <= anti_spam::MAX_BATCH_SIZE { - Ok::<_, Rejection>(( - schema, - batch_request.data(QueryContext { is_whitelisted }), - )) + .map_err(|_| warp::reject::custom(ReqExecTooLong))? } else { - Err(warp::reject::custom(BadRequest(anyhow::Error::msg( - r#"{ "error": "The batch contains too many requests" }"#, - )))) + tokio::time::timeout( + Duration::from_millis(MAX_BATCH_REQ_PROCESS_DURATION_IN_MILLIS), + process_json_batch_queries( + body_reader, + content_type, + gva_schema, + is_whitelisted, + *opts, + ), + ) + .await + .map_err(|_| warp::reject::custom(ReqExecTooLong))? } } } else { @@ -192,11 +220,45 @@ pub(crate) fn graphql( } }, ) - .and_then( - |(schema, batch_requests): (GvaSchema, GraphQlRequest)| async move { - Ok::<_, Infallible>(GraphQlResponse(batch_requests.execute(schema).await)) - }, +} + +async fn process_bincode_batch_queries( + body_reader: impl 'static + futures::AsyncRead + Send + Unpin, + is_whitelisted: bool, +) -> Result<ServerResponse, warp::Rejection> { + Ok(ServerResponse::Bincode( + duniter_bca::execute(body_reader, is_whitelisted).await, + )) +} + +async fn process_json_batch_queries( + body_reader: impl 'static + futures::AsyncRead + Send + Unpin, + content_type: Option<String>, + gva_schema: GvaSchema, + is_whitelisted: bool, + opts: async_graphql::http::MultipartOptions, +) -> Result<ServerResponse, warp::Rejection> { + let batch_request = GraphQlRequest::new( + async_graphql::http::receive_batch_body( + content_type, + body_reader, + async_graphql::http::MultipartOptions::clone(&opts), ) + .await + .map_err(|err| warp::reject::custom(BadRequest(err.into())))?, + ); + if is_whitelisted || batch_request.len() <= anti_spam::MAX_BATCH_SIZE { + Ok(ServerResponse::GraphQl( + batch_request + .data(QueryContext { is_whitelisted }) + .execute(gva_schema) + .await, + )) + } else { + Err(warp::reject::custom(BadRequest(anyhow::Error::msg( + r#"{ "error": "The batch contains too many requests" }"#, + )))) + } } pub(crate) fn graphql_ws( -- GitLab