Skip to content
Snippets Groups Projects
Commit 162ffdfb authored by Éloïs's avatar Éloïs
Browse files

Merge branch 'bca' into 'dev'

Bca

See merge request !1364
parents b5cd4abe e25e393d
No related branches found
No related tags found
1 merge request!1364Bca
Showing
with 1387 additions and 63 deletions
......@@ -25,6 +25,37 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e"
[[package]]
name = "aes"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "884391ef1066acaa41e766ba8f596341b96e93ce34f9a43e7d24bf0a0eaf0561"
dependencies = [
"aes-soft",
"aesni",
"cipher",
]
[[package]]
name = "aes-soft"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be14c7498ea50828a38d0e24a765ed2effe92a705885b57d029cd67d45744072"
dependencies = [
"cipher",
"opaque-debug 0.3.0",
]
[[package]]
name = "aesni"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea2e11f5e94c2f7d386164cc2aa1f97823fed6f259e486940a71c174dd01b0ce"
dependencies = [
"cipher",
"opaque-debug 0.3.0",
]
[[package]]
name = "ahash"
version = "0.3.8"
......@@ -83,6 +114,21 @@ dependencies = [
"syn",
]
[[package]]
name = "async-bincode"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a31c08aa335b3ab414d29bdefe1f4353408abf93f3db1e3e2cc78d3ec4f0d43"
dependencies = [
"bincode",
"byteorder",
"bytes 1.0.1",
"futures-core",
"futures-sink",
"serde",
"tokio",
]
[[package]]
name = "async-channel"
version = "1.5.1"
......@@ -302,6 +348,17 @@ dependencies = [
"syn",
]
[[package]]
name = "async_io_stream"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d5ad740b7193a31e80950ab7fece57c38d426fcd23a729d9d7f4cf15bb63f94"
dependencies = [
"futures",
"rustc_version 0.3.3",
"tokio",
]
[[package]]
name = "atomic-waker"
version = "1.0.0"
......@@ -547,7 +604,7 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b9434b9a5aa1450faa3f9cb14ea0e8c53bb5d2b3c1bfd1ab4fc03e9f33fbfb0"
dependencies = [
"rustc_version",
"rustc_version 0.2.3",
]
[[package]]
......@@ -603,6 +660,15 @@ dependencies = [
"envmnt",
]
[[package]]
name = "cipher"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12f8e7987cbd042a63249497f41aed09f8e65add917ea6566effbc56578d6801"
dependencies = [
"generic-array 0.14.4",
]
[[package]]
name = "clap"
version = "2.33.3"
......@@ -1083,6 +1149,40 @@ dependencies = [
"smallvec",
]
[[package]]
name = "duniter-bca"
version = "0.1.0"
dependencies = [
"anyhow",
"async-bincode",
"async_io_stream",
"bincode",
"dubp",
"duniter-bca-types",
"duniter-dbs",
"duniter-gva-db",
"duniter-gva-dbs-reader",
"duniter-mempools",
"fast-threadpool",
"futures",
"mockall",
"once_cell",
"smallvec",
"tokio",
"uninit",
]
[[package]]
name = "duniter-bca-types"
version = "0.1.0"
dependencies = [
"bincode",
"dubp",
"serde",
"smallvec",
"thiserror",
]
[[package]]
name = "duniter-conf"
version = "0.1.0"
......@@ -1165,7 +1265,9 @@ dependencies = [
"async-graphql",
"async-mutex",
"async-trait",
"bytes 1.0.1",
"dubp",
"duniter-bca",
"duniter-conf",
"duniter-dbs",
"duniter-gva-db",
......@@ -1376,12 +1478,16 @@ version = "0.49.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0be04829b31b18bacf5317001366d807e5fbd02085ee6348508c1299b5bcaf6c"
dependencies = [
"aes",
"arrayvec",
"base64 0.13.0",
"blake3",
"bs58 0.4.0",
"byteorder",
"cryptoxide",
"ed25519-bip32",
"getrandom 0.2.2",
"once_cell",
"ring",
"serde",
"thiserror",
......@@ -1389,6 +1495,15 @@ dependencies = [
"zeroize",
]
[[package]]
name = "ed25519-bip32"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8827180a2b511141fbe49141e50b31a8d542465e0fb572f81f36feea2addfe92"
dependencies = [
"cryptoxide",
]
[[package]]
name = "either"
version = "1.6.1"
......@@ -2305,7 +2420,7 @@ dependencies = [
"cslice",
"neon-build",
"neon-runtime",
"semver",
"semver 0.9.0",
]
[[package]]
......@@ -3127,7 +3242,16 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
dependencies = [
"semver",
"semver 0.9.0",
]
[[package]]
name = "rustc_version"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee"
dependencies = [
"semver 0.11.0",
]
[[package]]
......@@ -3181,7 +3305,16 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
dependencies = [
"semver-parser",
"semver-parser 0.7.0",
]
[[package]]
name = "semver"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6"
dependencies = [
"semver-parser 0.10.2",
]
[[package]]
......@@ -3190,6 +3323,15 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "semver-parser"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7"
dependencies = [
"pest",
]
[[package]]
name = "serde"
version = "1.0.124"
......@@ -3513,7 +3655,7 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6611ecf7fedefdb0f74d6194da1918f15c00ef97ae4bbd1f60a9c7ca2ae0eb14"
dependencies = [
"rustc_version",
"rustc_version 0.2.3",
"terminal_size",
]
......
......@@ -42,6 +42,8 @@ members = [
"rust-libs/duniter-module",
"rust-libs/duniter-server",
"rust-libs/modules/gva",
"rust-libs/modules/gva/bca",
"rust-libs/modules/gva/bca/types",
"rust-libs/modules/gva/dbs-reader",
"rust-libs/modules/gva/gql",
"rust-libs/modules/gva/indexer",
......
......@@ -54,6 +54,11 @@ license-files = [
{ path = "LICENSE", hash = 0xbd0eed23 },
]
[[licenses.exceptions]]
allow = ["Unlicense"]
name = "async_io_stream"
version = "0.3.1"
[sources]
unknown-registry = "deny"
unknown-git = "deny"
......@@ -41,7 +41,7 @@ declare_types! {
let gva_conf = rust_server_conf_stringified.gva;
let currency = rust_server_conf_stringified.currency;
let server_pubkey = if let Some(self_keypair_str) = rust_server_conf_stringified.self_keypair {
let self_key_pair = if let Some(self_keypair_str) = rust_server_conf_stringified.self_keypair {
into_neon_res(&mut cx, crate::crypto::keypair_from_expanded_base58_secret_key(&self_keypair_str))?
} else {
Ed25519KeyPair::generate_random().expect("fail to gen random keyypair")
......@@ -49,7 +49,7 @@ declare_types! {
let txs_mempool_size = rust_server_conf_stringified.txs_mempool_size as usize;
let conf = DuniterConf {
gva: gva_conf,
self_key_pair: server_pubkey,
self_key_pair,
txs_mempool_size
};
......
......@@ -11,7 +11,9 @@ arrayvec = "0.5.1"
async-graphql = "2.2.0"
async-mutex = "1.4.0"
async-trait = "0.1.41"
bytes = "1.0"
dubp = { version = "0.49.0", features = ["duniter"] }
duniter-bca = { path = "./bca" }
duniter-conf = { path = "../../duniter-conf" }
duniter-dbs = { path = "../../duniter-dbs" }
duniter-gva-db = { path = "./db" }
......
[package]
name = "duniter-bca"
version = "0.1.0"
authors = ["librelois <elois@duniter.org>"]
license = "AGPL-3.0"
edition = "2018"
[dependencies]
anyhow = "1.0.33"
async-bincode = "0.6.1"
async_io_stream = { version = "0.3.1", features = [ "tokio_io"] }
bincode = "1.3"
dubp = { version = "0.49.0", features = ["duniter"] }
duniter-bca-types = { path = "types", features = ["duniter"] }
duniter-dbs = { path = "../../../duniter-dbs" }
duniter-gva-db = { path = "../db" }
duniter-gva-dbs-reader = { path = "../dbs-reader" }
duniter-mempools = { path = "../../../duniter-mempools" }
fast-threadpool = "0.2.3"
futures = "0.3.6"
once_cell = "1.5"
smallvec = { version = "1.4.0", features = ["serde", "write"] }
tokio = { version = "1.2", features = ["macros", "rt-multi-thread"] }
uninit = "0.4.0"
[dev-dependencies]
duniter-dbs = { path = "../../../duniter-dbs", features = ["mem"] }
tokio = { version = "1.2", features = ["macros", "rt-multi-thread", "time"] }
mockall = "0.8.0"
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
mod last_blockstamp_out_of_fork_window;
mod members_count;
mod prepare_simple_payment;
mod send_txs;
use dubp::crypto::keys::KeyPair;
use crate::*;
#[derive(Debug, PartialEq)]
pub(super) struct ExecReqTypeError(pub(super) String);
impl<E> From<E> for ExecReqTypeError
where
E: ToString,
{
fn from(e: E) -> Self {
Self(e.to_string())
}
}
pub(super) async fn execute_req_type(
bca_executor: &BcaExecutor,
req_type: BcaReqTypeV0,
_is_whitelisted: bool,
) -> Result<BcaRespTypeV0, ExecReqTypeError> {
match req_type {
BcaReqTypeV0::LastBlockstampOutOfForkWindow => {
last_blockstamp_out_of_fork_window::exec_req_last_blockstamp_out_of_fork_window(
bca_executor,
)
.await
}
BcaReqTypeV0::MembersCount => members_count::exec_req_members_count(bca_executor).await,
BcaReqTypeV0::PrepareSimplePayment(params) => {
prepare_simple_payment::exec_req_prepare_simple_payment(bca_executor, params).await
}
BcaReqTypeV0::ProofServerPubkey { challenge } => Ok(BcaRespTypeV0::ProofServerPubkey {
challenge,
server_pubkey: bca_executor.self_keypair.public_key(),
sig: bca_executor
.self_keypair
.generate_signator()
.sign(&challenge),
}),
BcaReqTypeV0::Ping => Ok(BcaRespTypeV0::Pong),
BcaReqTypeV0::SendTxs(txs) => send_txs::send_txs(bca_executor, txs).await,
}
}
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
use dubp::common::prelude::*;
pub(super) async fn exec_req_last_blockstamp_out_of_fork_window(
bca_executor: &BcaExecutor,
) -> Result<BcaRespTypeV0, ExecReqTypeError> {
let dbs_reader = bca_executor.dbs_reader();
bca_executor
.dbs_pool
.execute(move |dbs| {
if let Some(current_block) = dbs_reader.get_current_block_meta(&dbs.cm_db)? {
let block_ref_number = if current_block.number < 101 {
0
} else {
current_block.number - 101
};
let block_ref_hash = dbs_reader
.block(&dbs.bc_db_ro, U32BE(block_ref_number))?
.expect("unreachable")
.hash;
Ok::<_, ExecReqTypeError>(BcaRespTypeV0::LastBlockstampOutOfForkWindow(
Blockstamp {
number: BlockNumber(block_ref_number),
hash: BlockHash(block_ref_hash),
},
))
} else {
Err("no blockchain".into())
}
})
.await?
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::*;
#[tokio::test]
async fn test_exec_req_last_blockstamp_out_of_fork_window_no_blockchain() {
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block_meta::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(None));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
let resp_res = exec_req_last_blockstamp_out_of_fork_window(&bca_executor).await;
assert_eq!(resp_res, Err(ExecReqTypeError("no blockchain".into())));
}
#[tokio::test]
async fn test_exec_req_last_blockstamp_out_of_fork_window_ok() -> Result<(), ExecReqTypeError> {
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block_meta::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(Some(BlockMetaV2::default())));
dbs_reader
.expect_block()
.times(1)
.returning(|_, _| Ok(Some(BlockMetaV2::default())));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
let resp = exec_req_last_blockstamp_out_of_fork_window(&bca_executor).await?;
assert_eq!(
resp,
BcaRespTypeV0::LastBlockstampOutOfForkWindow(Blockstamp::default())
);
Ok(())
}
}
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
use dubp::block::prelude::*;
pub(super) async fn exec_req_members_count(
bca_executor: &BcaExecutor,
) -> Result<BcaRespTypeV0, ExecReqTypeError> {
let dbs_reader = bca_executor.dbs_reader();
Ok(bca_executor
.dbs_pool
.execute(move |dbs| match dbs_reader.get_current_block(&dbs.cm_db) {
Ok(Some(current_block)) => {
BcaRespTypeV0::MembersCount(current_block.members_count() as u64)
}
Ok(None) => BcaRespTypeV0::Error("no blockchain".to_owned()),
Err(e) => BcaRespTypeV0::Error(e.to_string()),
})
.await?)
}
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software current_block_number: (), current_block_hash: (), inputs: (), inputs_sum: (): you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
use dubp::wallet::prelude::*;
use duniter_bca_types::prepare_payment::{PrepareSimplePayment, PrepareSimplePaymentResp};
pub(super) async fn exec_req_prepare_simple_payment(
bca_executor: &BcaExecutor,
params: PrepareSimplePayment,
) -> Result<BcaRespTypeV0, ExecReqTypeError> {
let mut amount = params.amount;
let issuer = params.issuer;
let dbs_reader = bca_executor.dbs_reader();
let (amount, block_ref_number, block_ref_hash, (inputs, inputs_sum)) = bca_executor
.dbs_pool
.execute(move |dbs| {
if let Some(current_block) = dbs_reader.get_current_block_meta(&dbs.cm_db)? {
let block_ref_number = if current_block.number < 101 {
0
} else {
current_block.number - 101
};
let block_ref_hash = dbs_reader
.block(&dbs.bc_db_ro, U32BE(block_ref_number))?
.expect("unreachable")
.hash;
let current_base = current_block.unit_base as i64;
if amount.base() > current_base {
Err("too long base".into())
} else {
while amount.base() < current_base {
amount = amount.increment_base();
}
Ok::<_, ExecReqTypeError>((
amount,
block_ref_number,
block_ref_hash,
dbs_reader.find_inputs(
&dbs.bc_db_ro,
&dbs.txs_mp_db,
amount,
&WalletScriptV10::single(WalletConditionV10::Sig(issuer)),
false,
)?,
))
}
} else {
Err("no blockchain".into())
}
})
.await??;
if inputs_sum < amount {
return Err("insufficient balance".into());
}
Ok(BcaRespTypeV0::PrepareSimplePayment(
PrepareSimplePaymentResp {
current_block_number: block_ref_number,
current_block_hash: block_ref_hash,
inputs,
inputs_sum,
},
))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::*;
#[tokio::test]
async fn test_exec_req_prepare_simple_payment_no_blockchain() {
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block_meta::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(None));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
let resp_res = exec_req_prepare_simple_payment(
&bca_executor,
PrepareSimplePayment {
issuer: PublicKey::default(),
amount: SourceAmount::new(42, 0),
},
)
.await;
assert_eq!(resp_res, Err(ExecReqTypeError("no blockchain".into())));
}
#[tokio::test]
async fn test_exec_req_prepare_simple_payment_too_long_base() {
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block_meta::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(Some(BlockMetaV2::default())));
dbs_reader
.expect_block()
.times(1)
.returning(|_, _| Ok(Some(BlockMetaV2::default())));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
let resp_res = exec_req_prepare_simple_payment(
&bca_executor,
PrepareSimplePayment {
issuer: PublicKey::default(),
amount: SourceAmount::new(42, 1),
},
)
.await;
assert_eq!(resp_res, Err(ExecReqTypeError("too long base".into())));
}
#[tokio::test]
async fn test_exec_req_prepare_simple_payment_insufficient_balance() {
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block_meta::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(Some(BlockMetaV2::default())));
dbs_reader
.expect_block()
.times(1)
.returning(|_, _| Ok(Some(BlockMetaV2::default())));
dbs_reader
.expect_find_inputs::<BcV2DbRo<FileBackend>, TxsMpV2Db<FileBackend>>()
.times(1)
.returning(|_, _, _, _, _| Ok((vec![], SourceAmount::default())));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
let resp_res = exec_req_prepare_simple_payment(
&bca_executor,
PrepareSimplePayment {
issuer: PublicKey::default(),
amount: SourceAmount::new(42, 0),
},
)
.await;
assert_eq!(
resp_res,
Err(ExecReqTypeError("insufficient balance".into()))
);
}
#[tokio::test]
async fn test_exec_req_prepare_simple_payment_ok() -> Result<(), ExecReqTypeError> {
let input = TransactionInputV10 {
amount: SourceAmount::with_base0(57),
id: SourceIdV10::Utxo(UtxoIdV10 {
tx_hash: Hash::default(),
output_index: 3,
}),
};
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block_meta::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(Some(BlockMetaV2::default())));
dbs_reader
.expect_block()
.times(1)
.returning(|_, _| Ok(Some(BlockMetaV2::default())));
dbs_reader
.expect_find_inputs::<BcV2DbRo<FileBackend>, TxsMpV2Db<FileBackend>>()
.times(1)
.returning(move |_, _, _, _, _| Ok((vec![input], SourceAmount::with_base0(57))));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
let resp = exec_req_prepare_simple_payment(
&bca_executor,
PrepareSimplePayment {
issuer: PublicKey::default(),
amount: SourceAmount::new(42, 0),
},
)
.await?;
assert_eq!(
resp,
BcaRespTypeV0::PrepareSimplePayment(PrepareSimplePaymentResp {
current_block_number: 0,
current_block_hash: Hash::default(),
inputs: vec![input],
inputs_sum: SourceAmount::with_base0(57),
})
);
Ok(())
}
}
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
use dubp::{crypto::keys::KeyPair, documents::transaction::TransactionDocumentTrait};
use duniter_bca_types::{
rejected_tx::{RejectedTx, RejectedTxReason},
Txs,
};
pub(super) async fn send_txs(
bca_executor: &BcaExecutor,
txs: Txs,
) -> Result<BcaRespTypeV0, ExecReqTypeError> {
let expected_currency = bca_executor.currency.clone();
let server_pubkey = bca_executor.self_keypair.public_key();
let txs_mempool = bca_executor.txs_mempool;
let mut rejected_txs = Vec::new();
for (i, tx) in txs.into_iter().enumerate() {
if let Err(e) = tx.verify(Some(&expected_currency)) {
rejected_txs.push(RejectedTx {
tx_index: i as u16,
reason: RejectedTxReason::InvalidTx(e.to_string()),
});
} else if let Err(rejected_tx) = bca_executor
.dbs_pool
.execute(move |dbs| {
txs_mempool
.add_pending_tx(&dbs.bc_db_ro, server_pubkey, &dbs.txs_mp_db, &tx)
.map_err(|e| RejectedTx {
tx_index: i as u16,
reason: match e {
duniter_mempools::TxMpError::Db(e) => {
RejectedTxReason::DbError(e.to_string())
}
duniter_mempools::TxMpError::Full => RejectedTxReason::MempoolFull,
duniter_mempools::TxMpError::TxAlreadyWritten => {
RejectedTxReason::TxAlreadyWritten
}
},
})
})
.await?
{
rejected_txs.push(rejected_tx);
}
}
Ok(BcaRespTypeV0::RejectedTxs(rejected_txs))
}
// Copyright (C) 2020 Éloïs req_id: (), resp_type: ()SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#![deny(
clippy::unwrap_used,
missing_copy_implementations,
trivial_casts,
trivial_numeric_casts,
unstable_features,
unused_import_braces
)]
mod exec_req_type;
const RESP_MIN_SIZE: usize = 64;
type RespBytes = SmallVec<[u8; RESP_MIN_SIZE]>;
use crate::exec_req_type::ExecReqTypeError;
use async_bincode::AsyncBincodeReader;
use async_io_stream::IoStream;
use bincode::Options as _;
use dubp::crypto::keys::{ed25519::Ed25519KeyPair, Signator};
use duniter_bca_types::{
bincode_opts, BcaReq, BcaReqExecError, BcaReqTypeV0, BcaResp, BcaRespTypeV0, BcaRespV0,
};
pub use duniter_dbs::kv_typed::prelude::*;
use duniter_dbs::{FileBackend, SharedDbs};
use futures::{prelude::stream::FuturesUnordered, StreamExt, TryStream, TryStreamExt};
use once_cell::sync::OnceCell;
use smallvec::SmallVec;
use tokio::task::JoinError;
#[cfg(test)]
use crate::tests::DbsReader;
#[cfg(not(test))]
use duniter_gva_dbs_reader::DbsReader;
static BCA_EXECUTOR: OnceCell<BcaExecutor> = OnceCell::new();
pub fn set_bca_executor(
currency: String,
dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>,
dbs_reader: DbsReader,
self_keypair: Ed25519KeyPair,
software_version: &'static str,
txs_mempool: duniter_mempools::TxsMempool,
) {
BCA_EXECUTOR
.set(BcaExecutor {
currency,
dbs_pool,
dbs_reader,
self_keypair,
software_version,
txs_mempool,
})
.unwrap_or_else(|_| panic!("BCA_EXECUTOR already set !"))
}
#[cfg(not(test))]
pub async fn execute<B, S>(query_body_stream: S, is_whitelisted: bool) -> Vec<u8>
where
B: AsRef<[u8]>,
S: 'static + TryStream<Ok = B, Error = std::io::Error> + Send + Unpin,
{
unsafe {
BCA_EXECUTOR
.get_unchecked()
.execute(query_body_stream, is_whitelisted)
.await
}
}
#[derive(Clone)]
struct BcaExecutor {
currency: String,
dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>,
dbs_reader: DbsReader,
self_keypair: Ed25519KeyPair,
software_version: &'static str,
txs_mempool: duniter_mempools::TxsMempool,
}
use uninit::extension_traits::VecCapacity;
impl BcaExecutor {
pub async fn execute<B, S>(&self, query_body_stream: S, is_whitelisted: bool) -> Vec<u8>
where
B: AsRef<[u8]>,
S: 'static + TryStream<Ok = B, Error = std::io::Error> + Send + Unpin,
{
let async_bincode_reader =
AsyncBincodeReader::<IoStream<S, B>, BcaReq>::from(IoStream::new(query_body_stream));
self.execute_inner(async_bincode_reader, is_whitelisted)
.await
.into_iter()
.fold(Vec::new(), |mut vec, elem| {
// Write resp len
let out = vec.reserve_uninit(4);
out.copy_from_slice(&u32::to_be_bytes(elem.len() as u32)[..]);
unsafe {
// # Safety
//
// - `.copy_from_slice()` contract guarantees initialization
// of `out`, which, in turn, from `reserve_uninit`'s contract,
// leads to the `vec` extra capacity having been initialized.
vec.set_len(vec.len() + 4);
}
// Write resp content
let out = vec.reserve_uninit(elem.len());
out.copy_from_slice(&elem[..]);
unsafe {
// # Safety
//
// - `.copy_from_slice()` contract guarantees initialization
// of `out`, which, in turn, from `reserve_uninit`'s contract,
// leads to the `vec` extra capacity having been initialized.
vec.set_len(vec.len() + elem.len());
}
vec
})
}
async fn execute_inner(
&self,
stream: impl TryStream<Ok = BcaReq, Error = bincode::Error>,
is_whitelisted: bool,
) -> Vec<RespBytes> {
match stream
.map_ok(|req| {
let self_clone = self.clone();
tokio::spawn(async move { self_clone.execute_req(req, is_whitelisted).await })
})
.try_collect::<FuturesUnordered<_>>()
.await
{
Ok(futures_unordered) => {
futures_unordered
.map(|req_res: Result<BcaResp, JoinError>| {
let resp = match req_res {
Ok(resp) => Ok(resp),
Err(e) => Err(if e.is_cancelled() {
BcaReqExecError::Cancelled
} else if e.is_panic() {
BcaReqExecError::Panic
} else {
BcaReqExecError::Unknown
}),
};
let mut resp_buffer = RespBytes::new();
bincode_opts()
.serialize_into(&mut resp_buffer, &resp)
.expect("unreachable");
resp_buffer
})
.collect()
.await
}
Err(e) => {
let req_res: Result<BcaResp, BcaReqExecError> =
Err(BcaReqExecError::InvalidReq(e.to_string()));
let mut resp_buffer = RespBytes::new();
bincode_opts()
.serialize_into(&mut resp_buffer, &req_res)
.expect("unreachable");
vec![resp_buffer]
}
}
}
#[inline(always)]
async fn execute_req(self, req: BcaReq, is_whitelisted: bool) -> BcaResp {
match req {
BcaReq::V0(req) => BcaResp::V0(BcaRespV0 {
req_id: req.req_id,
resp_type: match crate::exec_req_type::execute_req_type(
&self,
req.req_type,
is_whitelisted,
)
.await
{
Ok(resp_type) => resp_type,
Err(e) => BcaRespTypeV0::Error(e.0),
},
}),
_ => BcaResp::UnsupportedVersion,
}
}
}
#[cfg(not(test))]
impl BcaExecutor {
#[inline(always)]
pub fn dbs_reader(&self) -> DbsReader {
self.dbs_reader
}
}
#[cfg(test)]
mod tests {
use super::*;
pub use dubp::{
block::prelude::*,
crypto::{
hashs::Hash,
keys::{ed25519::PublicKey, KeyPair, Seed32},
},
documents::transaction::TransactionInputV10,
wallet::prelude::*,
};
pub use duniter_bca_types::BcaReqV0;
pub use duniter_dbs::databases::bc_v2::{BcV2DbReadable, BcV2DbRo};
pub use duniter_dbs::databases::cm_v1::{CmV1Db, CmV1DbReadable};
pub use duniter_dbs::databases::txs_mp_v2::{TxsMpV2Db, TxsMpV2DbReadable};
pub use duniter_dbs::BlockMetaV2;
pub use futures::TryStreamExt;
mockall::mock! {
pub DbsReader {
fn block(&self, bc_db: &BcV2DbRo<FileBackend>, number: U32BE) -> KvResult<Option<BlockMetaV2>>;
fn find_inputs<BcDb: 'static + BcV2DbReadable, TxsMpDb: 'static + TxsMpV2DbReadable>(
&self,
bc_db: &BcDb,
txs_mp_db: &TxsMpDb,
amount: SourceAmount,
script: &WalletScriptV10,
use_mempool_sources: bool,
) -> anyhow::Result<(Vec<TransactionInputV10>, SourceAmount)>;
fn get_current_block<CmDb: 'static + CmV1DbReadable>(
&self,
cm_db: &CmDb,
) -> KvResult<Option<DubpBlockV10>>;
fn get_current_block_meta<CmDb: 'static + CmV1DbReadable>(
&self,
cm_db: &CmDb,
) -> KvResult<Option<BlockMetaV2>>;
}
}
pub type DbsReader = duniter_dbs::kv_typed::prelude::Arc<MockDbsReader>;
impl BcaExecutor {
#[inline(always)]
pub fn dbs_reader(&self) -> DbsReader {
self.dbs_reader.clone()
}
}
pub(crate) fn create_bca_executor(mock_dbs_reader: MockDbsReader) -> KvResult<BcaExecutor> {
let dbs = SharedDbs::mem()?;
let threadpool =
fast_threadpool::ThreadPool::start(fast_threadpool::ThreadPoolConfig::low(), dbs);
Ok(BcaExecutor {
currency: "g1".to_owned(),
dbs_pool: threadpool.into_async_handler(),
dbs_reader: duniter_dbs::kv_typed::prelude::Arc::new(mock_dbs_reader),
self_keypair: Ed25519KeyPair::from_seed(
Seed32::random().expect("fail to gen random seed"),
),
software_version: "test",
txs_mempool: duniter_mempools::TxsMempool::new(10),
})
}
pub(crate) fn io_stream<B: AsRef<[u8]>>(
bytes: B,
) -> impl TryStream<Ok = B, Error = std::io::Error> {
futures::stream::iter(std::iter::once(Ok(bytes)))
}
#[tokio::test]
async fn test_one_req_ok() -> Result<(), bincode::Error> {
let req = BcaReq::V0(BcaReqV0 {
req_id: 42,
req_type: BcaReqTypeV0::MembersCount,
});
assert_eq!(bincode_opts().serialized_size(&req)?, 3);
let mut bytes = [0u8; 7];
bincode_opts().serialize_into(&mut bytes[4..], &req)?;
bytes[3] = 3;
use bincode::Options;
//println!("bytes_for_bincode={:?}", &bytes[4..]);
assert_eq!(req, bincode_opts().deserialize(&bytes[4..])?);
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(Some(DubpBlockV10::default())));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
//println!("bytes={:?}", bytes);
let bytes_res = bca_executor.execute(io_stream(bytes), false).await;
//println!("bytes_res={:?}", bytes_res);
let bca_res: Vec<Result<BcaResp, BcaReqExecError>> =
AsyncBincodeReader::<_, Result<BcaResp, BcaReqExecError>>::from(&bytes_res[..])
.try_collect::<Vec<_>>()
.await?;
assert_eq!(
bca_res,
vec![Ok(BcaResp::V0(BcaRespV0 {
req_id: 42,
resp_type: BcaRespTypeV0::MembersCount(0)
}))]
);
Ok(())
}
#[tokio::test]
async fn test_one_req_invalid() -> Result<(), bincode::Error> {
let req = BcaReq::V0(BcaReqV0 {
req_id: 42,
req_type: BcaReqTypeV0::MembersCount,
});
assert_eq!(bincode_opts().serialized_size(&req)?, 3);
let mut bytes = [0u8; 7];
bincode_opts().serialize_into(&mut bytes[4..], &req)?;
bytes[3] = 2;
use bincode::Options;
//println!("bytes_for_bincode={:?}", &bytes[4..]);
assert_eq!(req, bincode_opts().deserialize(&bytes[4..])?);
let bca_executor =
create_bca_executor(MockDbsReader::new()).expect("fail to create bca executor");
//println!("bytes={:?}", bytes);
let bytes_res = bca_executor.execute(io_stream(bytes), false).await;
//println!("bytes_res={:?}", bytes_res);
let bca_res: Vec<Result<BcaResp, BcaReqExecError>> =
AsyncBincodeReader::<_, Result<BcaResp, BcaReqExecError>>::from(&bytes_res[..])
.try_collect::<Vec<_>>()
.await?;
assert_eq!(
bca_res,
vec![Err(BcaReqExecError::InvalidReq(
"io error: unexpected end of file".to_owned()
))]
);
Ok(())
}
#[tokio::test]
async fn test_two_reqs_ok() -> Result<(), bincode::Error> {
let req1 = BcaReq::V0(BcaReqV0 {
req_id: 42,
req_type: BcaReqTypeV0::Ping,
});
assert_eq!(bincode_opts().serialized_size(&req1)?, 3);
let req2 = BcaReq::V0(BcaReqV0 {
req_id: 57,
req_type: BcaReqTypeV0::MembersCount,
});
assert_eq!(bincode_opts().serialized_size(&req2)?, 3);
let mut bytes = [0u8; 14];
bincode_opts().serialize_into(&mut bytes[4..], &req1)?;
bytes[3] = 3;
bincode_opts().serialize_into(&mut bytes[11..], &req2)?;
bytes[10] = 3;
let mut dbs_reader = MockDbsReader::new();
dbs_reader
.expect_get_current_block::<CmV1Db<MemSingleton>>()
.times(1)
.returning(|_| Ok(Some(DubpBlockV10::default())));
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
//println!("bytes={:?}", bytes);
let bytes_res = bca_executor.execute(io_stream(bytes), false).await;
//println!("bytes_res={:?}", bytes_res);
let bca_res: Vec<Result<BcaResp, BcaReqExecError>> =
AsyncBincodeReader::<_, Result<BcaResp, BcaReqExecError>>::from(&bytes_res[..])
.try_collect::<Vec<_>>()
.await?;
assert_eq!(
bca_res,
vec![
Ok(BcaResp::V0(BcaRespV0 {
req_id: 42,
resp_type: BcaRespTypeV0::Pong
})),
Ok(BcaResp::V0(BcaRespV0 {
req_id: 57,
resp_type: BcaRespTypeV0::MembersCount(0)
}))
]
);
Ok(())
}
}
[package]
name = "duniter-bca-types"
version = "0.1.0"
authors = ["librelois <elois@duniter.org>"]
license = "AGPL-3.0"
edition = "2018"
[dependencies]
bincode = "1.3"
dubp = { version = "0.49.0" }
serde = { version = "1.0.105", features = ["derive"] }
smallvec = { version = "1.4.0", features = ["serde"] }
thiserror = "1.0.20"
[features]
default = ["duniter"]
client = ["dubp/client"]
duniter = ["dubp/duniter"]
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#![deny(
clippy::unwrap_used,
missing_copy_implementations,
trivial_casts,
trivial_numeric_casts,
unstable_features,
unused_import_braces
)]
pub mod prepare_payment;
pub mod rejected_tx;
use crate::prepare_payment::{PrepareSimplePayment, PrepareSimplePaymentResp};
use bincode::Options as _;
use dubp::crypto::keys::ed25519::{PublicKey, Signature};
use dubp::wallet::prelude::*;
use dubp::{common::prelude::Blockstamp, crypto::hashs::Hash};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use thiserror::Error;
pub fn bincode_opts() -> impl bincode::Options {
bincode::options()
.with_limit(u32::max_value() as u64)
.allow_trailing_bytes()
}
// Request
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub enum BcaReq {
V0(BcaReqV0),
_V1,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub struct BcaReqV0 {
pub req_id: usize,
pub req_type: BcaReqTypeV0,
}
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub enum BcaReqTypeV0 {
LastBlockstampOutOfForkWindow,
MembersCount,
PrepareSimplePayment(PrepareSimplePayment),
ProofServerPubkey { challenge: [u8; 16] },
Ping,
SendTxs(Txs),
}
// Request types helpers
pub type Txs = SmallVec<[dubp::documents::transaction::TransactionDocumentV10; 1]>;
// Response
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub enum BcaResp {
V0(BcaRespV0),
UnsupportedVersion,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub struct BcaRespV0 {
pub req_id: usize,
pub resp_type: BcaRespTypeV0,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub enum BcaRespTypeV0 {
Error(String),
ProofServerPubkey {
challenge: [u8; 16],
server_pubkey: PublicKey,
sig: Signature,
},
LastBlockstampOutOfForkWindow(Blockstamp),
MembersCount(u64),
PrepareSimplePayment(PrepareSimplePaymentResp),
Pong,
RejectedTxs(Vec<rejected_tx::RejectedTx>),
}
// Result and error
pub type BcaResult = Result<BcaResp, BcaReqExecError>;
#[derive(Clone, Debug, Deserialize, Error, PartialEq, Eq, Serialize)]
pub enum BcaReqExecError {
#[error("task cancelled")]
Cancelled,
#[error("Invalid request: {0}")]
InvalidReq(String),
#[error("task panicked")]
Panic,
#[error("Unknown error")]
Unknown,
}
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
use dubp::documents::transaction::TransactionInputV10;
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub struct PrepareSimplePayment {
pub issuer: PublicKey,
pub amount: SourceAmount,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub struct PrepareSimplePaymentResp {
pub current_block_number: u32,
pub current_block_hash: Hash,
pub inputs: Vec<TransactionInputV10>,
pub inputs_sum: SourceAmount,
}
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub struct RejectedTx {
pub tx_index: u16,
pub reason: RejectedTxReason,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub enum RejectedTxReason {
DbError(String),
InvalidTx(String),
MempoolFull,
TxAlreadyWritten,
}
......@@ -28,10 +28,13 @@ mod warp_;
pub use duniter_conf::gva_conf::GvaConf;
use async_graphql::http::GraphQLPlaygroundConfig;
use dubp::common::crypto::keys::{ed25519::PublicKey, KeyPair as _};
use dubp::common::prelude::*;
use dubp::documents::transaction::TransactionDocumentV10;
use dubp::{block::DubpBlockV10, crypto::hashs::Hash};
use dubp::{
common::crypto::keys::{ed25519::PublicKey, KeyPair as _},
crypto::keys::ed25519::Ed25519KeyPair,
};
use duniter_conf::DuniterMode;
use duniter_dbs::databases::txs_mp_v2::TxsMpV2DbReadable;
use duniter_dbs::prelude::*;
......@@ -52,7 +55,7 @@ pub struct GvaModule {
gva_db_ro: &'static GvaV1DbRo<FileBackend>,
mempools: Mempools,
mode: DuniterMode,
self_pubkey: PublicKey,
self_keypair: Ed25519KeyPair,
software_version: &'static str,
}
......@@ -119,7 +122,7 @@ impl duniter_module::DuniterModule for GvaModule {
gva_db_ro: get_gva_db_ro(profile_path_opt),
mempools,
mode,
self_pubkey: conf.self_key_pair.public_key(),
self_keypair: conf.self_key_pair.clone(),
software_version,
},
endpoints,
......@@ -136,7 +139,7 @@ impl duniter_module::DuniterModule for GvaModule {
gva_db_ro,
mempools,
mode,
self_pubkey,
self_keypair,
software_version,
} = self;
......@@ -148,7 +151,7 @@ impl duniter_module::DuniterModule for GvaModule {
dbs_pool,
gva_db_ro,
mempools,
self_pubkey,
self_keypair,
software_version,
)
.await
......@@ -231,11 +234,20 @@ impl GvaModule {
dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>,
gva_db_ro: &'static GvaV1DbRo<FileBackend>,
mempools: Mempools,
self_pubkey: PublicKey,
self_keypair: Ed25519KeyPair,
software_version: &'static str,
) {
log::info!("GvaServer::start: conf={:?}", conf);
let schema = duniter_gva_gql::build_schema_with_data(
let self_pubkey = self_keypair.public_key();
duniter_bca::set_bca_executor(
currency.clone(),
dbs_pool.clone(),
duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro),
self_keypair,
software_version,
mempools.txs,
);
let gva_schema = duniter_gva_gql::build_schema_with_data(
duniter_gva_gql::GvaSchemaData {
dbs_reader: duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro),
dbs_pool,
......@@ -251,7 +263,7 @@ impl GvaModule {
let graphql_post = warp_::graphql(
&conf,
schema.clone(),
gva_schema.clone(),
async_graphql::http::MultipartOptions::default(),
);
......@@ -273,7 +285,7 @@ impl GvaModule {
let routes = graphql_playground
.or(graphql_post)
.or(warp_::graphql_ws(&conf, schema.clone()))
.or(warp_::graphql_ws(&conf, gva_schema.clone()))
.recover(|err: Rejection| async move {
if let Some(warp_::BadRequest(err)) = err.find() {
return Ok::<_, Infallible>(warp::reply::with_status(
......
......@@ -13,11 +13,18 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::net::{IpAddr, SocketAddr};
use std::{
net::{IpAddr, SocketAddr},
time::Duration,
};
use bytes::Bytes;
use crate::anti_spam::{AntiSpam, AntiSpamResponse};
use crate::*;
const MAX_BATCH_REQ_PROCESS_DURATION_IN_MILLIS: u64 = 5_000;
pub struct BadRequest(pub anyhow::Error);
impl std::fmt::Debug for BadRequest {
......@@ -28,6 +35,16 @@ impl std::fmt::Debug for BadRequest {
impl warp::reject::Reject for BadRequest {}
pub struct ReqExecTooLong;
impl std::fmt::Debug for ReqExecTooLong {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "server error: request execution too long")
}
}
impl warp::reject::Reject for ReqExecTooLong {}
struct GraphQlRequest {
inner: async_graphql::BatchRequest,
}
......@@ -78,19 +95,28 @@ impl GraphQlRequest {
}
}
struct GraphQlResponse(async_graphql::BatchResponse);
impl warp::reply::Reply for GraphQlResponse {
enum ServerResponse {
Bincode(Vec<u8>),
GraphQl(async_graphql::BatchResponse),
}
impl warp::reply::Reply for ServerResponse {
fn into_response(self) -> warp::reply::Response {
match self {
ServerResponse::Bincode(bytes) => bytes.into_response(),
ServerResponse::GraphQl(gql_batch_resp) => {
let mut resp = warp::reply::with_header(
warp::reply::json(&self.0),
warp::reply::json(&gql_batch_resp),
"content-type",
"application/json",
)
.into_response();
add_cache_control_batch(&mut resp, &self.0);
add_cache_control_batch(&mut resp, &gql_batch_resp);
resp
}
}
}
}
fn add_cache_control_batch(
http_resp: &mut warp::reply::Response,
......@@ -118,7 +144,7 @@ fn add_cache_control(http_resp: &mut warp::reply::Response, resp: &async_graphql
pub(crate) fn graphql(
conf: &GvaConf,
schema: GvaSchema,
gva_schema: GvaSchema,
opts: async_graphql::http::MultipartOptions,
) -> impl warp::Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
let anti_spam = AntiSpam::from(conf);
......@@ -130,19 +156,19 @@ pub(crate) fn graphql(
.and(warp::header::optional::<IpAddr>("X-Real-IP"))
.and(warp::header::optional::<String>("content-type"))
.and(warp::body::stream())
.and(warp::any().map(move || opts.clone()))
.and(warp::any().map(move || schema.clone()))
.and(warp::any().map(move || anti_spam.clone()))
.and(warp::any().map(move || gva_schema.clone()))
.and(warp::any().map(move || opts.clone()))
.and_then(
|method,
query: String,
remote_addr: Option<SocketAddr>,
x_real_ip: Option<IpAddr>,
content_type,
content_type: Option<String>,
body,
opts: Arc<async_graphql::http::MultipartOptions>,
schema,
anti_spam: AntiSpam| async move {
anti_spam: AntiSpam,
gva_schema: GvaSchema,
opts: Arc<async_graphql::http::MultipartOptions>| async move {
let AntiSpamResponse {
is_whitelisted,
is_ok,
......@@ -153,36 +179,39 @@ pub(crate) fn graphql(
if method == http::Method::GET {
let request: async_graphql::Request = serde_urlencoded::from_str(&query)
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
Ok::<_, Rejection>((
schema,
GraphQlRequest::single(request.data(QueryContext { is_whitelisted })),
Ok(ServerResponse::GraphQl(
GraphQlRequest::single(request.data(QueryContext { is_whitelisted }))
.execute(gva_schema)
.await,
))
} else {
let batch_request = GraphQlRequest::new(
async_graphql::http::receive_batch_body(
content_type,
futures::TryStreamExt::map_err(body, |err| {
let body_stream = futures::TryStreamExt::map_err(body, |err| {
std::io::Error::new(std::io::ErrorKind::Other, err)
})
.map_ok(|mut buf| {
let remaining = warp::Buf::remaining(&buf);
warp::Buf::copy_to_bytes(&mut buf, remaining)
})
.into_async_read(),
async_graphql::http::MultipartOptions::clone(&opts),
});
if content_type.as_deref() == Some("application/bincode") {
tokio::time::timeout(
Duration::from_millis(MAX_BATCH_REQ_PROCESS_DURATION_IN_MILLIS),
process_bincode_batch_queries(body_stream, is_whitelisted),
)
.await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?,
);
if is_whitelisted || batch_request.len() <= anti_spam::MAX_BATCH_SIZE {
Ok::<_, Rejection>((
schema,
batch_request.data(QueryContext { is_whitelisted }),
))
.map_err(|_| warp::reject::custom(ReqExecTooLong))?
} else {
Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
r#"{ "error": "The batch contains too many requests" }"#,
))))
tokio::time::timeout(
Duration::from_millis(MAX_BATCH_REQ_PROCESS_DURATION_IN_MILLIS),
process_json_batch_queries(
body_stream.into_async_read(),
content_type,
gva_schema,
is_whitelisted,
*opts,
),
)
.await
.map_err(|_| warp::reject::custom(ReqExecTooLong))?
}
}
} else {
......@@ -192,11 +221,45 @@ pub(crate) fn graphql(
}
},
)
.and_then(
|(schema, batch_requests): (GvaSchema, GraphQlRequest)| async move {
Ok::<_, Infallible>(GraphQlResponse(batch_requests.execute(schema).await))
},
}
async fn process_bincode_batch_queries(
body_reader: impl 'static + futures::TryStream<Ok = Bytes, Error = std::io::Error> + Send + Unpin,
is_whitelisted: bool,
) -> Result<ServerResponse, warp::Rejection> {
Ok(ServerResponse::Bincode(
duniter_bca::execute(body_reader, is_whitelisted).await,
))
}
async fn process_json_batch_queries(
body_reader: impl 'static + futures::AsyncRead + Send + Unpin,
content_type: Option<String>,
gva_schema: GvaSchema,
is_whitelisted: bool,
opts: async_graphql::http::MultipartOptions,
) -> Result<ServerResponse, warp::Rejection> {
let batch_request = GraphQlRequest::new(
async_graphql::http::receive_batch_body(
content_type,
body_reader,
async_graphql::http::MultipartOptions::clone(&opts),
)
.await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?,
);
if is_whitelisted || batch_request.len() <= anti_spam::MAX_BATCH_SIZE {
Ok(ServerResponse::GraphQl(
batch_request
.data(QueryContext { is_whitelisted })
.execute(gva_schema)
.await,
))
} else {
Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
r#"{ "error": "The batch contains too many requests" }"#,
))))
}
}
pub(crate) fn graphql_ws(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment