Skip to content
Snippets Groups Projects
Commit 3d1783c7 authored by Éloïs's avatar Éloïs
Browse files

[ref] gva:bca use async_io_stream instead of manual util conversion code

parent b7f09da2
No related branches found
No related tags found
1 merge request!1364Bca
......@@ -348,6 +348,17 @@ dependencies = [
"syn",
]
[[package]]
name = "async_io_stream"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d5ad740b7193a31e80950ab7fece57c38d426fcd23a729d9d7f4cf15bb63f94"
dependencies = [
"futures",
"rustc_version 0.3.3",
"tokio",
]
[[package]]
name = "atomic-waker"
version = "1.0.0"
......@@ -593,7 +604,7 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b9434b9a5aa1450faa3f9cb14ea0e8c53bb5d2b3c1bfd1ab4fc03e9f33fbfb0"
dependencies = [
"rustc_version",
"rustc_version 0.2.3",
]
[[package]]
......@@ -1144,6 +1155,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-bincode",
"async_io_stream",
"bincode",
"dubp",
"duniter-bca-types",
......@@ -2406,7 +2418,7 @@ dependencies = [
"cslice",
"neon-build",
"neon-runtime",
"semver",
"semver 0.9.0",
]
[[package]]
......@@ -3228,7 +3240,16 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
dependencies = [
"semver",
"semver 0.9.0",
]
[[package]]
name = "rustc_version"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee"
dependencies = [
"semver 0.11.0",
]
[[package]]
......@@ -3282,7 +3303,16 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
dependencies = [
"semver-parser",
"semver-parser 0.7.0",
]
[[package]]
name = "semver"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6"
dependencies = [
"semver-parser 0.10.2",
]
[[package]]
......@@ -3291,6 +3321,15 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "semver-parser"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7"
dependencies = [
"pest",
]
[[package]]
name = "serde"
version = "1.0.124"
......@@ -3614,7 +3653,7 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6611ecf7fedefdb0f74d6194da1918f15c00ef97ae4bbd1f60a9c7ca2ae0eb14"
dependencies = [
"rustc_version",
"rustc_version 0.2.3",
"terminal_size",
]
......
......@@ -8,6 +8,7 @@ edition = "2018"
[dependencies]
anyhow = "1.0.33"
async-bincode = "0.6.1"
async_io_stream = { version = "0.3.1", features = [ "tokio_io"] }
bincode = "1.3"
dubp = { version = "0.49.0", features = ["duniter"] }
duniter-bca-types = { path = "types", features = ["duniter"] }
......
......@@ -23,14 +23,13 @@
)]
mod exec_req_type;
mod utils;
const RESP_MIN_SIZE: usize = 64;
type RespBytes = SmallVec<[u8; RESP_MIN_SIZE]>;
use crate::exec_req_type::ExecReqTypeError;
use crate::utils::AsyncReader;
use async_bincode::AsyncBincodeReader;
use async_io_stream::IoStream;
use bincode::Options as _;
use dubp::crypto::keys::{ed25519::Ed25519KeyPair, Signator};
use duniter_bca_types::{
......@@ -68,14 +67,15 @@ pub fn set_bca_executor(
}
#[cfg(not(test))]
pub async fn execute<R: 'static + futures::AsyncRead + Send + Unpin>(
query_body_reader: R,
is_whitelisted: bool,
) -> Vec<u8> {
pub async fn execute<B, S>(query_body_stream: S, is_whitelisted: bool) -> Vec<u8>
where
B: AsRef<[u8]>,
S: 'static + TryStream<Ok = B, Error = std::io::Error> + Send + Unpin,
{
unsafe {
BCA_EXECUTOR
.get_unchecked()
.execute(query_body_reader, is_whitelisted)
.execute(query_body_stream, is_whitelisted)
.await
}
}
......@@ -90,13 +90,13 @@ struct BcaExecutor {
}
use uninit::extension_traits::VecCapacity;
impl BcaExecutor {
pub async fn execute<R: futures::AsyncRead + Send + Unpin>(
&self,
query_body_reader: R,
is_whitelisted: bool,
) -> Vec<u8> {
pub async fn execute<B, S>(&self, query_body_stream: S, is_whitelisted: bool) -> Vec<u8>
where
B: AsRef<[u8]>,
S: 'static + TryStream<Ok = B, Error = std::io::Error> + Send + Unpin,
{
let async_bincode_reader =
AsyncBincodeReader::<AsyncReader<R>, BcaReq>::from(AsyncReader(query_body_reader));
AsyncBincodeReader::<IoStream<S, B>, BcaReq>::from(IoStream::new(query_body_stream));
self.execute_inner(async_bincode_reader, is_whitelisted)
.await
.into_iter()
......@@ -268,6 +268,12 @@ mod tests {
})
}
pub(crate) fn io_stream<B: AsRef<[u8]>>(
bytes: B,
) -> impl TryStream<Ok = B, Error = std::io::Error> {
futures::stream::iter(std::iter::once(Ok(bytes)))
}
#[tokio::test]
async fn test_one_req_ok() -> Result<(), bincode::Error> {
let req = BcaReq::V0(BcaReqV0 {
......@@ -292,7 +298,7 @@ mod tests {
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
//println!("bytes={:?}", bytes);
let bytes_res = bca_executor.execute::<&[u8]>(&bytes[..], false).await;
let bytes_res = bca_executor.execute(io_stream(bytes), false).await;
//println!("bytes_res={:?}", bytes_res);
let bca_res: Vec<Result<BcaResp, BcaReqExecError>> =
AsyncBincodeReader::<_, Result<BcaResp, BcaReqExecError>>::from(&bytes_res[..])
......@@ -330,7 +336,7 @@ mod tests {
create_bca_executor(MockDbsReader::new()).expect("fail to create bca executor");
//println!("bytes={:?}", bytes);
let bytes_res = bca_executor.execute::<&[u8]>(&bytes[..], false).await;
let bytes_res = bca_executor.execute(io_stream(bytes), false).await;
//println!("bytes_res={:?}", bytes_res);
let bca_res: Vec<Result<BcaResp, BcaReqExecError>> =
AsyncBincodeReader::<_, Result<BcaResp, BcaReqExecError>>::from(&bytes_res[..])
......@@ -374,7 +380,7 @@ mod tests {
let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor");
//println!("bytes={:?}", bytes);
let bytes_res = bca_executor.execute::<&[u8]>(&bytes[..], false).await;
let bytes_res = bca_executor.execute(io_stream(bytes), false).await;
//println!("bytes_res={:?}", bytes_res);
let bca_res: Vec<Result<BcaResp, BcaReqExecError>> =
AsyncBincodeReader::<_, Result<BcaResp, BcaReqExecError>>::from(&bytes_res[..])
......
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
pub(crate) struct AsyncReader<R: futures::AsyncRead + Unpin>(pub(crate) R);
impl<R: futures::AsyncRead + Unpin> tokio::io::AsyncRead for AsyncReader<R> {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
match <R as futures::AsyncRead>::poll_read(
std::pin::Pin::new(&mut self.0),
cx,
buf.initialize_unfilled(),
) {
std::task::Poll::Ready(res) => std::task::Poll::Ready(res.map(|n| buf.advance(n))),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
......@@ -84,7 +84,7 @@ pub enum BcaRespTypeV0 {
Pong,
}
pub type BcaResult = Vec<Result<BcaResp, BcaReqExecError>>;
pub type BcaResult = Result<BcaResp, BcaReqExecError>;
#[derive(Clone, Debug, Deserialize, Error, PartialEq, Eq, Serialize)]
pub enum BcaReqExecError {
......
......@@ -18,6 +18,8 @@ use std::{
time::Duration,
};
use bytes::Bytes;
use crate::anti_spam::{AntiSpam, AntiSpamResponse};
use crate::*;
......@@ -183,18 +185,17 @@ pub(crate) fn graphql(
.await,
))
} else {
let body_reader = futures::TryStreamExt::map_err(body, |err| {
let body_stream = futures::TryStreamExt::map_err(body, |err| {
std::io::Error::new(std::io::ErrorKind::Other, err)
})
.map_ok(|mut buf| {
let remaining = warp::Buf::remaining(&buf);
warp::Buf::copy_to_bytes(&mut buf, remaining)
})
.into_async_read();
});
if content_type.as_deref() == Some("application/bincode") {
tokio::time::timeout(
Duration::from_millis(MAX_BATCH_REQ_PROCESS_DURATION_IN_MILLIS),
process_bincode_batch_queries(body_reader, is_whitelisted),
process_bincode_batch_queries(body_stream, is_whitelisted),
)
.await
.map_err(|_| warp::reject::custom(ReqExecTooLong))?
......@@ -202,7 +203,7 @@ pub(crate) fn graphql(
tokio::time::timeout(
Duration::from_millis(MAX_BATCH_REQ_PROCESS_DURATION_IN_MILLIS),
process_json_batch_queries(
body_reader,
body_stream.into_async_read(),
content_type,
gva_schema,
is_whitelisted,
......@@ -223,7 +224,7 @@ pub(crate) fn graphql(
}
async fn process_bincode_batch_queries(
body_reader: impl 'static + futures::AsyncRead + Send + Unpin,
body_reader: impl 'static + futures::TryStream<Ok = Bytes, Error = std::io::Error> + Send + Unpin,
is_whitelisted: bool,
) -> Result<ServerResponse, warp::Rejection> {
Ok(ServerResponse::Bincode(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment