Skip to content
Snippets Groups Projects
Commit 02a24c84 authored by Éloïs's avatar Éloïs
Browse files

[feat] gva: add subset bca

parent b5cd4abe
Branches
No related tags found
1 merge request!1364Bca
Showing
with 1060 additions and 50 deletions
......@@ -25,6 +25,37 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e"
[[package]]
name = "aes"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "884391ef1066acaa41e766ba8f596341b96e93ce34f9a43e7d24bf0a0eaf0561"
dependencies = [
"aes-soft",
"aesni",
"cipher",
]
[[package]]
name = "aes-soft"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be14c7498ea50828a38d0e24a765ed2effe92a705885b57d029cd67d45744072"
dependencies = [
"cipher",
"opaque-debug 0.3.0",
]
[[package]]
name = "aesni"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea2e11f5e94c2f7d386164cc2aa1f97823fed6f259e486940a71c174dd01b0ce"
dependencies = [
"cipher",
"opaque-debug 0.3.0",
]
[[package]]
name = "ahash"
version = "0.3.8"
......@@ -83,6 +114,21 @@ dependencies = [
"syn",
]
[[package]]
name = "async-bincode"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a31c08aa335b3ab414d29bdefe1f4353408abf93f3db1e3e2cc78d3ec4f0d43"
dependencies = [
"bincode",
"byteorder",
"bytes 1.0.1",
"futures-core",
"futures-sink",
"serde",
"tokio",
]
[[package]]
name = "async-channel"
version = "1.5.1"
......@@ -603,6 +649,15 @@ dependencies = [
"envmnt",
]
[[package]]
name = "cipher"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12f8e7987cbd042a63249497f41aed09f8e65add917ea6566effbc56578d6801"
dependencies = [
"generic-array 0.14.4",
]
[[package]]
name = "clap"
version = "2.33.3"
......@@ -1083,6 +1138,36 @@ dependencies = [
"smallvec",
]
[[package]]
name = "duniter-bca"
version = "0.1.0"
dependencies = [
"anyhow",
"async-bincode",
"bincode",
"dubp",
"duniter-bca-types",
"duniter-dbs",
"duniter-gva-db",
"duniter-gva-dbs-reader",
"fast-threadpool",
"futures",
"mockall",
"once_cell",
"smallvec",
"tokio",
"uninit",
]
[[package]]
name = "duniter-bca-types"
version = "0.1.0"
dependencies = [
"dubp",
"serde",
"smallvec",
]
[[package]]
name = "duniter-conf"
version = "0.1.0"
......@@ -1165,7 +1250,9 @@ dependencies = [
"async-graphql",
"async-mutex",
"async-trait",
"bytes 1.0.1",
"dubp",
"duniter-bca",
"duniter-conf",
"duniter-dbs",
"duniter-gva-db",
......@@ -1376,12 +1463,16 @@ version = "0.49.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0be04829b31b18bacf5317001366d807e5fbd02085ee6348508c1299b5bcaf6c"
dependencies = [
"aes",
"arrayvec",
"base64 0.13.0",
"blake3",
"bs58 0.4.0",
"byteorder",
"cryptoxide",
"ed25519-bip32",
"getrandom 0.2.2",
"once_cell",
"ring",
"serde",
"thiserror",
......@@ -1389,6 +1480,15 @@ dependencies = [
"zeroize",
]
[[package]]
name = "ed25519-bip32"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8827180a2b511141fbe49141e50b31a8d542465e0fb572f81f36feea2addfe92"
dependencies = [
"cryptoxide",
]
[[package]]
name = "either"
version = "1.6.1"
......
......@@ -42,6 +42,8 @@ members = [
"rust-libs/duniter-module",
"rust-libs/duniter-server",
"rust-libs/modules/gva",
"rust-libs/modules/gva/bca",
"rust-libs/modules/gva/bca/types",
"rust-libs/modules/gva/dbs-reader",
"rust-libs/modules/gva/gql",
"rust-libs/modules/gva/indexer",
......
......@@ -11,7 +11,9 @@ arrayvec = "0.5.1"
async-graphql = "2.2.0"
async-mutex = "1.4.0"
async-trait = "0.1.41"
bytes = "1.0"
dubp = { version = "0.49.0", features = ["duniter"] }
duniter-bca = { path = "./bca" }
duniter-conf = { path = "../../duniter-conf" }
duniter-dbs = { path = "../../duniter-dbs" }
duniter-gva-db = { path = "./db" }
......
[package]
name = "duniter-bca"
version = "0.1.0"
authors = ["librelois <elois@duniter.org>"]
license = "AGPL-3.0"
edition = "2018"
[dependencies]
anyhow = "1.0.33"
async-bincode = "0.6.1"
bincode = "1.3"
dubp = { version = "0.49.0", features = ["duniter"] }
duniter-bca-types = { path = "types", features = ["duniter"] }
duniter-dbs = { path = "../../../duniter-dbs" }
duniter-gva-db = { path = "../db" }
duniter-gva-dbs-reader = { path = "../dbs-reader" }
fast-threadpool = "0.2.3"
futures = "0.3.6"
once_cell = "1.5"
smallvec = { version = "1.4.0", features = ["serde", "write"] }
tokio = { version = "1.2", features = ["macros", "rt-multi-thread"] }
uninit = "0.4.0"
[dev-dependencies]
duniter-dbs = { path = "../../../duniter-dbs", features = ["mem"] }
tokio = { version = "1.2", features = ["macros", "rt-multi-thread", "time"] }
mockall = "0.8.0"
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
mod members_count;
mod prepare_simple_payment;
use crate::*;
#[derive(Debug, PartialEq)]
pub(super) struct ExecReqTypeError(pub(super) String);
impl<E> From<E> for ExecReqTypeError
where
E: ToString,
{
fn from(e: E) -> Self {
Self(e.to_string())
}
}
pub(super) async fn execute_req_type(
bca_executor: &BcaExecutor,
req_type: BcaReqTypeV0,
_is_whitelisted: bool,
) -> Result<BcaRespTypeV0, ExecReqTypeError> {
match req_type {
BcaReqTypeV0::MembersCount => members_count::exec_req_members_count(bca_executor).await,
BcaReqTypeV0::PrepareSimplePayment(params) => {
prepare_simple_payment::exec_req_prepare_simple_payment(bca_executor, params).await
}
BcaReqTypeV0::Ping => Ok(BcaRespTypeV0::Pong),
}
}
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
use dubp::block::prelude::*;
pub(super) async fn exec_req_members_count(
bca_executor: &BcaExecutor,
) -> Result<BcaRespTypeV0, ExecReqTypeError> {
let dbs_reader = bca_executor.dbs_reader();
Ok(bca_executor
.dbs_pool
.execute(move |dbs| match dbs_reader.get_current_block(&dbs.cm_db) {
Ok(Some(current_block)) => {
BcaRespTypeV0::MembersCount(current_block.members_count() as u64)
}
Ok(None) => BcaRespTypeV0::Error("no blockchain".to_owned()),
Err(e) => BcaRespTypeV0::Error(e.to_string()),
})
.await?)
}
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software current_block_number: (), current_block_hash: (), inputs: (), inputs_sum: (): you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
use dubp::wallet::prelude::*;
use duniter_bca_types::prepare_payment::{PrepareSimplePayment, PrepareSimplePaymentResp};
pub(super) async fn exec_req_prepare_simple_payment(
bca_executor: &BcaExecutor,
params: PrepareSimplePayment,
) -> Result<BcaRespTypeV0, ExecReqTypeError> {
let mut amount = params.amount;
let issuer = params.issuer;
let dbs_reader = bca_executor.dbs_reader();
let (amount, current_block, (inputs, inputs_sum)) = bca_executor
.dbs_pool
.execute(move |dbs| {
if let Some(current_block) = dbs_reader.get_current_block_meta(&dbs.cm_db)? {
let current_base = current_block.unit_base as i64;
if amount.base() > current_base {
Err("too long base".into())
} else {
while amount.base() < current_base {
amount = amount.increment_base();
}
Ok::<_, ExecReqTypeError>((
amount,
current_block,
dbs_reader.find_inputs(
&dbs.bc_db_ro,
&dbs.txs_mp_db,
amount,
&WalletScriptV10::single(WalletConditionV10::Sig(issuer)),
false,
)?,
))
}
} else {
Err("no blockchain".into())
}
})
.await??;
if inputs_sum < amount {
return Err("insufficient balance".into());
}
Ok(BcaRespTypeV0::PrepareSimplePayment(
PrepareSimplePaymentResp {
current_block_number: current_block.number,
current_block_hash: current_block.hash,
inputs,
inputs_sum,
},
))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::*;
#[tokio::test]
async fn test_exec_req_prepare_simple_payment_no_blockchain() {
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block_meta::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(None));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
let resp_res = exec_req_prepare_simple_payment(
&bca_executor,
PrepareSimplePayment {
issuer: PublicKey::default(),
amount: SourceAmount::new(42, 0),
},
)
.await;
assert_eq!(resp_res, Err(ExecReqTypeError("no blockchain".into())));
}
#[tokio::test]
async fn test_exec_req_prepare_simple_payment_too_long_base() {
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block_meta::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(Some(BlockMetaV2::default())));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
let resp_res = exec_req_prepare_simple_payment(
&bca_executor,
PrepareSimplePayment {
issuer: PublicKey::default(),
amount: SourceAmount::new(42, 1),
},
)
.await;
assert_eq!(resp_res, Err(ExecReqTypeError("too long base".into())));
}
#[tokio::test]
async fn test_exec_req_prepare_simple_payment_insufficient_balance() {
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block_meta::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(Some(BlockMetaV2::default())));
dbs_reader
.expect_find_inputs::<BcV2DbRo<FileBackend>, TxsMpV2Db<FileBackend>>()
.times(1)
.returning(|_, _, _, _, _| Ok((vec![], SourceAmount::default())));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
let resp_res = exec_req_prepare_simple_payment(
&bca_executor,
PrepareSimplePayment {
issuer: PublicKey::default(),
amount: SourceAmount::new(42, 0),
},
)
.await;
assert_eq!(
resp_res,
Err(ExecReqTypeError("insufficient balance".into()))
);
}
#[tokio::test]
async fn test_exec_req_prepare_simple_payment_ok() -> Result<(), ExecReqTypeError> {
let input = TransactionInputV10 {
amount: SourceAmount::with_base0(57),
id: SourceIdV10::Utxo(UtxoIdV10 {
tx_hash: Hash::default(),
output_index: 3,
}),
};
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block_meta::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(Some(BlockMetaV2::default())));
dbs_reader
.expect_find_inputs::<BcV2DbRo<FileBackend>, TxsMpV2Db<FileBackend>>()
.times(1)
.returning(move |_, _, _, _, _| Ok((vec![input], SourceAmount::with_base0(57))));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
let resp = exec_req_prepare_simple_payment(
&bca_executor,
PrepareSimplePayment {
issuer: PublicKey::default(),
amount: SourceAmount::new(42, 0),
},
)
.await?;
assert_eq!(
resp,
BcaRespTypeV0::PrepareSimplePayment(PrepareSimplePaymentResp {
current_block_number: 0,
current_block_hash: Hash::default(),
inputs: vec![input],
inputs_sum: SourceAmount::with_base0(57),
})
);
Ok(())
}
}
// Copyright (C) 2020 Éloïs req_id: (), resp_type: ()SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#![deny(
clippy::unwrap_used,
missing_copy_implementations,
trivial_casts,
trivial_numeric_casts,
unstable_features,
unused_import_braces
)]
mod exec_req_type;
mod utils;
const RESP_MIN_SIZE: usize = 64;
type RespBytes = SmallVec<[u8; RESP_MIN_SIZE]>;
use crate::exec_req_type::ExecReqTypeError;
use crate::utils::AsyncReader;
use async_bincode::AsyncBincodeReader;
use bincode::Options as _;
use duniter_bca_types::{BcaReq, BcaReqTypeV0, BcaResp, BcaRespTypeV0, BcaRespV0, ReqExecError};
use duniter_dbs::{FileBackend, SharedDbs};
use futures::{prelude::stream::FuturesUnordered, StreamExt, TryStream, TryStreamExt};
use once_cell::sync::OnceCell;
use smallvec::SmallVec;
use tokio::task::JoinError;
#[cfg(test)]
use crate::tests::DbsReader;
#[cfg(not(test))]
use duniter_gva_dbs_reader::DbsReader;
static BCA_EXECUTOR: OnceCell<BcaExecutor> = OnceCell::new();
pub fn set_bca_executor(
dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>,
dbs_reader: DbsReader,
) {
BCA_EXECUTOR
.set(BcaExecutor {
dbs_pool,
dbs_reader,
})
.unwrap_or_else(|_| panic!("BCA_EXECUTOR already set !"))
}
#[cfg(not(test))]
pub async fn execute<R: 'static + futures::AsyncRead + Send + Unpin>(
query_body_reader: R,
is_whitelisted: bool,
) -> Vec<u8> {
unsafe {
BCA_EXECUTOR
.get_unchecked()
.execute(query_body_reader, is_whitelisted)
.await
}
}
#[derive(Clone)]
struct BcaExecutor {
dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>,
dbs_reader: DbsReader,
}
use uninit::extension_traits::VecCapacity;
impl BcaExecutor {
pub async fn execute<R: futures::AsyncRead + Send + Unpin>(
&self,
query_body_reader: R,
is_whitelisted: bool,
) -> Vec<u8> {
let async_bincode_reader =
AsyncBincodeReader::<AsyncReader<R>, BcaReq>::from(AsyncReader(query_body_reader));
self.execute_inner(async_bincode_reader, is_whitelisted)
.await
.into_iter()
.fold(Vec::new(), |mut vec, elem| {
// Write resp len
let out = vec.reserve_uninit(4);
out.copy_from_slice(&u32::to_be_bytes(elem.len() as u32)[..]);
unsafe {
// # Safety
//
// - `.copy_from_slice()` contract guarantees initialization
// of `out`, which, in turn, from `reserve_uninit`'s contract,
// leads to the `vec` extra capacity having been initialized.
vec.set_len(vec.len() + 4);
}
// Write resp content
let out = vec.reserve_uninit(elem.len());
out.copy_from_slice(&elem[..]);
unsafe {
// # Safety
//
// - `.copy_from_slice()` contract guarantees initialization
// of `out`, which, in turn, from `reserve_uninit`'s contract,
// leads to the `vec` extra capacity having been initialized.
vec.set_len(vec.len() + elem.len());
}
vec
})
}
async fn execute_inner(
&self,
stream: impl TryStream<Ok = BcaReq, Error = bincode::Error>,
is_whitelisted: bool,
) -> Vec<RespBytes> {
match stream
.map_ok(|req| {
let self_clone = self.clone();
tokio::spawn(async move { self_clone.execute_req(req, is_whitelisted).await })
})
.try_collect::<FuturesUnordered<_>>()
.await
{
Ok(futures_unordered) => {
futures_unordered
.map(|req_res: Result<BcaResp, JoinError>| {
let resp = match req_res {
Ok(resp) => Ok(resp),
Err(e) => Err(if e.is_cancelled() {
ReqExecError::Cancelled
} else if e.is_panic() {
ReqExecError::Panic
} else {
ReqExecError::Unknown
}),
};
let mut resp_buffer = RespBytes::new();
bincode_opts()
.serialize_into(&mut resp_buffer, &resp)
.expect("unreachable");
resp_buffer
})
.collect()
.await
}
Err(e) => {
let req_res: Result<BcaResp, ReqExecError> =
Err(ReqExecError::InvalidReq(e.to_string()));
let mut resp_buffer = RespBytes::new();
bincode_opts()
.serialize_into(&mut resp_buffer, &req_res)
.expect("unreachable");
vec![resp_buffer]
}
}
}
#[inline(always)]
async fn execute_req(self, req: BcaReq, is_whitelisted: bool) -> BcaResp {
match req {
BcaReq::V0(req) => BcaResp::V0(BcaRespV0 {
req_id: req.req_id,
resp_type: match crate::exec_req_type::execute_req_type(
&self,
req.req_type,
is_whitelisted,
)
.await
{
Ok(resp_type) => resp_type,
Err(e) => BcaRespTypeV0::Error(e.0),
},
}),
_ => BcaResp::UnsupportedVersion,
}
}
}
#[cfg(not(test))]
impl BcaExecutor {
#[inline(always)]
pub fn dbs_reader(&self) -> DbsReader {
self.dbs_reader
}
}
pub(crate) fn bincode_opts() -> impl bincode::Options {
bincode::options()
.with_limit(u32::max_value() as u64)
.allow_trailing_bytes()
}
#[cfg(test)]
mod tests {
use super::*;
pub use dubp::block::prelude::*;
pub use dubp::crypto::hashs::Hash;
pub use dubp::crypto::keys::ed25519::PublicKey;
pub use dubp::documents::transaction::TransactionInputV10;
pub use dubp::wallet::prelude::*;
pub use duniter_bca_types::BcaReqV0;
pub use duniter_dbs::databases::bc_v2::{BcV2DbReadable, BcV2DbRo};
pub use duniter_dbs::databases::cm_v1::{CmV1Db, CmV1DbReadable};
pub use duniter_dbs::databases::txs_mp_v2::{TxsMpV2Db, TxsMpV2DbReadable};
pub use duniter_dbs::kv_typed::prelude::*;
pub use duniter_dbs::BlockMetaV2;
pub use futures::TryStreamExt;
mockall::mock! {
pub DbsReader {
fn find_inputs<BcDb: 'static + BcV2DbReadable, TxsMpDb: 'static + TxsMpV2DbReadable>(
&self,
bc_db: &BcDb,
txs_mp_db: &TxsMpDb,
amount: SourceAmount,
script: &WalletScriptV10,
use_mempool_sources: bool,
) -> anyhow::Result<(Vec<TransactionInputV10>, SourceAmount)>;
fn get_current_block<CmDb: 'static + CmV1DbReadable>(
&self,
cm_db: &CmDb,
) -> KvResult<Option<DubpBlockV10>>;
fn get_current_block_meta<CmDb: 'static + CmV1DbReadable>(
&self,
cm_db: &CmDb,
) -> KvResult<Option<BlockMetaV2>>;
}
}
pub type DbsReader = duniter_dbs::kv_typed::prelude::Arc<MockDbsReader>;
impl BcaExecutor {
#[inline(always)]
pub fn dbs_reader(&self) -> DbsReader {
self.dbs_reader.clone()
}
}
pub(crate) fn create_bca_executor(mock_dbs_reader: MockDbsReader) -> KvResult<BcaExecutor> {
let dbs = SharedDbs::mem()?;
let threadpool =
fast_threadpool::ThreadPool::start(fast_threadpool::ThreadPoolConfig::low(), dbs);
Ok(BcaExecutor {
dbs_pool: threadpool.into_async_handler(),
dbs_reader: duniter_dbs::kv_typed::prelude::Arc::new(mock_dbs_reader),
})
}
#[tokio::test]
async fn test_one_req_ok() -> Result<(), bincode::Error> {
let req = BcaReq::V0(BcaReqV0 {
req_id: 42,
req_type: BcaReqTypeV0::MembersCount,
});
assert_eq!(bincode_opts().serialized_size(&req)?, 3);
let mut bytes = [0u8; 7];
bincode_opts().serialize_into(&mut bytes[4..], &req)?;
bytes[3] = 3;
use bincode::Options;
//println!("bytes_for_bincode={:?}", &bytes[4..]);
assert_eq!(req, bincode_opts().deserialize(&bytes[4..])?);
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(Some(DubpBlockV10::default())));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
//println!("bytes={:?}", bytes);
let bytes_res = bca_executor.execute::<&[u8]>(&bytes[..], false).await;
//println!("bytes_res={:?}", bytes_res);
let bca_res: Vec<Result<BcaResp, ReqExecError>> =
AsyncBincodeReader::<_, Result<BcaResp, ReqExecError>>::from(&bytes_res[..])
.try_collect::<Vec<_>>()
.await?;
assert_eq!(
bca_res,
vec![Ok(BcaResp::V0(BcaRespV0 {
req_id: 42,
resp_type: BcaRespTypeV0::MembersCount(0)
}))]
);
Ok(())
}
#[tokio::test]
async fn test_one_req_invalid() -> Result<(), bincode::Error> {
let req = BcaReq::V0(BcaReqV0 {
req_id: 42,
req_type: BcaReqTypeV0::MembersCount,
});
assert_eq!(bincode_opts().serialized_size(&req)?, 3);
let mut bytes = [0u8; 7];
bincode_opts().serialize_into(&mut bytes[4..], &req)?;
bytes[3] = 2;
use bincode::Options;
//println!("bytes_for_bincode={:?}", &bytes[4..]);
assert_eq!(req, bincode_opts().deserialize(&bytes[4..])?);
let bca_executor =
create_bca_executor(MockDbsReader::new()).expect("fail to create bca executor");
//println!("bytes={:?}", bytes);
let bytes_res = bca_executor.execute::<&[u8]>(&bytes[..], false).await;
//println!("bytes_res={:?}", bytes_res);
let bca_res: Vec<Result<BcaResp, ReqExecError>> =
AsyncBincodeReader::<_, Result<BcaResp, ReqExecError>>::from(&bytes_res[..])
.try_collect::<Vec<_>>()
.await?;
assert_eq!(
bca_res,
vec![Err(ReqExecError::InvalidReq(
"io error: unexpected end of file".to_owned()
))]
);
Ok(())
}
#[tokio::test]
async fn test_two_reqs_ok() -> Result<(), bincode::Error> {
let req1 = BcaReq::V0(BcaReqV0 {
req_id: 42,
req_type: BcaReqTypeV0::Ping,
});
assert_eq!(bincode_opts().serialized_size(&req1)?, 3);
let req2 = BcaReq::V0(BcaReqV0 {
req_id: 57,
req_type: BcaReqTypeV0::MembersCount,
});
assert_eq!(bincode_opts().serialized_size(&req2)?, 3);
let mut bytes = [0u8; 14];
bincode_opts().serialize_into(&mut bytes[4..], &req1)?;
bytes[3] = 3;
bincode_opts().serialize_into(&mut bytes[11..], &req2)?;
bytes[10] = 3;
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(Some(DubpBlockV10::default())));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
//println!("bytes={:?}", bytes);
let bytes_res = bca_executor.execute::<&[u8]>(&bytes[..], false).await;
//println!("bytes_res={:?}", bytes_res);
let bca_res: Vec<Result<BcaResp, ReqExecError>> =
AsyncBincodeReader::<_, Result<BcaResp, ReqExecError>>::from(&bytes_res[..])
.try_collect::<Vec<_>>()
.await?;
assert_eq!(
bca_res,
vec![
Ok(BcaResp::V0(BcaRespV0 {
req_id: 42,
resp_type: BcaRespTypeV0::Pong
})),
Ok(BcaResp::V0(BcaRespV0 {
req_id: 57,
resp_type: BcaRespTypeV0::MembersCount(0)
}))
]
);
Ok(())
}
}
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
pub(crate) struct AsyncReader<R: futures::AsyncRead + Unpin>(pub(crate) R);
impl<R: futures::AsyncRead + Unpin> tokio::io::AsyncRead for AsyncReader<R> {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
match <R as futures::AsyncRead>::poll_read(
std::pin::Pin::new(&mut self.0),
cx,
buf.initialize_unfilled(),
) {
std::task::Poll::Ready(res) => std::task::Poll::Ready(res.map(|n| buf.advance(n))),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
[package]
name = "duniter-bca-types"
version = "0.1.0"
authors = ["librelois <elois@duniter.org>"]
license = "AGPL-3.0"
edition = "2018"
[dependencies]
dubp = { version = "0.49.0" }
serde = { version = "1.0.105", features = ["derive"] }
smallvec = { version = "1.4.0", features = ["serde"] }
[features]
default = ["duniter"]
client = ["dubp/client"]
duniter = ["dubp/duniter"]
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#![deny(
clippy::unwrap_used,
missing_copy_implementations,
trivial_casts,
trivial_numeric_casts,
unstable_features,
unused_import_braces
)]
pub mod prepare_payment;
use crate::prepare_payment::{PrepareSimplePayment, PrepareSimplePaymentResp};
use dubp::crypto::hashs::Hash;
use dubp::crypto::keys::ed25519::PublicKey;
use dubp::wallet::prelude::*;
use serde::{Deserialize, Serialize};
//use smallvec::SmallVec;
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub enum BcaReq {
V0(BcaReqV0),
_V1,
}
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub struct BcaReqV0 {
pub req_id: usize,
pub req_type: BcaReqTypeV0,
}
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub enum BcaReqTypeV0 {
MembersCount,
PrepareSimplePayment(PrepareSimplePayment),
Ping,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub enum BcaResp {
V0(BcaRespV0),
UnsupportedVersion,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub struct BcaRespV0 {
pub req_id: usize,
pub resp_type: BcaRespTypeV0,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub enum BcaRespTypeV0 {
Error(String),
MembersCount(u64),
PrepareSimplePayment(PrepareSimplePaymentResp),
Pong,
}
pub type BcaResult = Vec<Result<BcaResp, ReqExecError>>;
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub enum ReqExecError {
Cancelled,
InvalidReq(String),
Panic,
Unknown,
}
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
use dubp::documents::transaction::TransactionInputV10;
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub struct PrepareSimplePayment {
pub issuer: PublicKey,
pub amount: SourceAmount,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub struct PrepareSimplePaymentResp {
pub current_block_number: u32,
pub current_block_hash: Hash,
pub inputs: Vec<TransactionInputV10>,
pub inputs_sum: SourceAmount,
}
......@@ -235,7 +235,11 @@ impl GvaModule {
software_version: &'static str,
) {
log::info!("GvaServer::start: conf={:?}", conf);
let schema = duniter_gva_gql::build_schema_with_data(
duniter_bca::set_bca_executor(
dbs_pool.clone(),
duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro),
);
let gva_schema = duniter_gva_gql::build_schema_with_data(
duniter_gva_gql::GvaSchemaData {
dbs_reader: duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro),
dbs_pool,
......@@ -251,7 +255,7 @@ impl GvaModule {
let graphql_post = warp_::graphql(
&conf,
schema.clone(),
gva_schema.clone(),
async_graphql::http::MultipartOptions::default(),
);
......@@ -273,7 +277,7 @@ impl GvaModule {
let routes = graphql_playground
.or(graphql_post)
.or(warp_::graphql_ws(&conf, schema.clone()))
.or(warp_::graphql_ws(&conf, gva_schema.clone()))
.recover(|err: Rejection| async move {
if let Some(warp_::BadRequest(err)) = err.find() {
return Ok::<_, Infallible>(warp::reply::with_status(
......
......@@ -13,11 +13,16 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::net::{IpAddr, SocketAddr};
use std::{
net::{IpAddr, SocketAddr},
time::Duration,
};
use crate::anti_spam::{AntiSpam, AntiSpamResponse};
use crate::*;
const MAX_BATCH_REQ_PROCESS_DURATION_IN_MILLIS: u64 = 5_000;
pub struct BadRequest(pub anyhow::Error);
impl std::fmt::Debug for BadRequest {
......@@ -28,6 +33,16 @@ impl std::fmt::Debug for BadRequest {
impl warp::reject::Reject for BadRequest {}
pub struct ReqExecTooLong;
impl std::fmt::Debug for ReqExecTooLong {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "server error: request execution too long")
}
}
impl warp::reject::Reject for ReqExecTooLong {}
struct GraphQlRequest {
inner: async_graphql::BatchRequest,
}
......@@ -78,19 +93,28 @@ impl GraphQlRequest {
}
}
struct GraphQlResponse(async_graphql::BatchResponse);
impl warp::reply::Reply for GraphQlResponse {
enum ServerResponse {
Bincode(Vec<u8>),
GraphQl(async_graphql::BatchResponse),
}
impl warp::reply::Reply for ServerResponse {
fn into_response(self) -> warp::reply::Response {
match self {
ServerResponse::Bincode(bytes) => bytes.into_response(),
ServerResponse::GraphQl(gql_batch_resp) => {
let mut resp = warp::reply::with_header(
warp::reply::json(&self.0),
warp::reply::json(&gql_batch_resp),
"content-type",
"application/json",
)
.into_response();
add_cache_control_batch(&mut resp, &self.0);
add_cache_control_batch(&mut resp, &gql_batch_resp);
resp
}
}
}
}
fn add_cache_control_batch(
http_resp: &mut warp::reply::Response,
......@@ -118,7 +142,7 @@ fn add_cache_control(http_resp: &mut warp::reply::Response, resp: &async_graphql
pub(crate) fn graphql(
conf: &GvaConf,
schema: GvaSchema,
gva_schema: GvaSchema,
opts: async_graphql::http::MultipartOptions,
) -> impl warp::Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
let anti_spam = AntiSpam::from(conf);
......@@ -130,19 +154,19 @@ pub(crate) fn graphql(
.and(warp::header::optional::<IpAddr>("X-Real-IP"))
.and(warp::header::optional::<String>("content-type"))
.and(warp::body::stream())
.and(warp::any().map(move || opts.clone()))
.and(warp::any().map(move || schema.clone()))
.and(warp::any().map(move || anti_spam.clone()))
.and(warp::any().map(move || gva_schema.clone()))
.and(warp::any().map(move || opts.clone()))
.and_then(
|method,
query: String,
remote_addr: Option<SocketAddr>,
x_real_ip: Option<IpAddr>,
content_type,
content_type: Option<String>,
body,
opts: Arc<async_graphql::http::MultipartOptions>,
schema,
anti_spam: AntiSpam| async move {
anti_spam: AntiSpam,
gva_schema: GvaSchema,
opts: Arc<async_graphql::http::MultipartOptions>| async move {
let AntiSpamResponse {
is_whitelisted,
is_ok,
......@@ -153,36 +177,40 @@ pub(crate) fn graphql(
if method == http::Method::GET {
let request: async_graphql::Request = serde_urlencoded::from_str(&query)
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
Ok::<_, Rejection>((
schema,
GraphQlRequest::single(request.data(QueryContext { is_whitelisted })),
Ok(ServerResponse::GraphQl(
GraphQlRequest::single(request.data(QueryContext { is_whitelisted }))
.execute(gva_schema)
.await,
))
} else {
let batch_request = GraphQlRequest::new(
async_graphql::http::receive_batch_body(
content_type,
futures::TryStreamExt::map_err(body, |err| {
let body_reader = futures::TryStreamExt::map_err(body, |err| {
std::io::Error::new(std::io::ErrorKind::Other, err)
})
.map_ok(|mut buf| {
let remaining = warp::Buf::remaining(&buf);
warp::Buf::copy_to_bytes(&mut buf, remaining)
})
.into_async_read(),
async_graphql::http::MultipartOptions::clone(&opts),
.into_async_read();
if content_type.as_deref() == Some("application/bincode") {
tokio::time::timeout(
Duration::from_millis(MAX_BATCH_REQ_PROCESS_DURATION_IN_MILLIS),
process_bincode_batch_queries(body_reader, is_whitelisted),
)
.await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?,
);
if is_whitelisted || batch_request.len() <= anti_spam::MAX_BATCH_SIZE {
Ok::<_, Rejection>((
schema,
batch_request.data(QueryContext { is_whitelisted }),
))
.map_err(|_| warp::reject::custom(ReqExecTooLong))?
} else {
Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
r#"{ "error": "The batch contains too many requests" }"#,
))))
tokio::time::timeout(
Duration::from_millis(MAX_BATCH_REQ_PROCESS_DURATION_IN_MILLIS),
process_json_batch_queries(
body_reader,
content_type,
gva_schema,
is_whitelisted,
*opts,
),
)
.await
.map_err(|_| warp::reject::custom(ReqExecTooLong))?
}
}
} else {
......@@ -192,11 +220,45 @@ pub(crate) fn graphql(
}
},
)
.and_then(
|(schema, batch_requests): (GvaSchema, GraphQlRequest)| async move {
Ok::<_, Infallible>(GraphQlResponse(batch_requests.execute(schema).await))
},
}
async fn process_bincode_batch_queries(
body_reader: impl 'static + futures::AsyncRead + Send + Unpin,
is_whitelisted: bool,
) -> Result<ServerResponse, warp::Rejection> {
Ok(ServerResponse::Bincode(
duniter_bca::execute(body_reader, is_whitelisted).await,
))
}
async fn process_json_batch_queries(
body_reader: impl 'static + futures::AsyncRead + Send + Unpin,
content_type: Option<String>,
gva_schema: GvaSchema,
is_whitelisted: bool,
opts: async_graphql::http::MultipartOptions,
) -> Result<ServerResponse, warp::Rejection> {
let batch_request = GraphQlRequest::new(
async_graphql::http::receive_batch_body(
content_type,
body_reader,
async_graphql::http::MultipartOptions::clone(&opts),
)
.await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?,
);
if is_whitelisted || batch_request.len() <= anti_spam::MAX_BATCH_SIZE {
Ok(ServerResponse::GraphQl(
batch_request
.data(QueryContext { is_whitelisted })
.execute(gva_schema)
.await,
))
} else {
Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
r#"{ "error": "The batch contains too many requests" }"#,
))))
}
}
pub(crate) fn graphql_ws(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment