diff --git a/Cargo.lock b/Cargo.lock index 8a925ab9a54f3ab31fbdedafd3c23997159842bd..f9ec290968ec6de3070b3bea1cf4d7f70a351aae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -348,6 +348,17 @@ dependencies = [ "syn", ] +[[package]] +name = "async_io_stream" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d5ad740b7193a31e80950ab7fece57c38d426fcd23a729d9d7f4cf15bb63f94" +dependencies = [ + "futures", + "rustc_version 0.3.3", + "tokio", +] + [[package]] name = "atomic-waker" version = "1.0.0" @@ -593,7 +604,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b9434b9a5aa1450faa3f9cb14ea0e8c53bb5d2b3c1bfd1ab4fc03e9f33fbfb0" dependencies = [ - "rustc_version", + "rustc_version 0.2.3", ] [[package]] @@ -1144,6 +1155,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-bincode", + "async_io_stream", "bincode", "dubp", "duniter-bca-types", @@ -2406,7 +2418,7 @@ dependencies = [ "cslice", "neon-build", "neon-runtime", - "semver", + "semver 0.9.0", ] [[package]] @@ -3228,7 +3240,16 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" dependencies = [ - "semver", + "semver 0.9.0", +] + +[[package]] +name = "rustc_version" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee" +dependencies = [ + "semver 0.11.0", ] [[package]] @@ -3282,7 +3303,16 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" dependencies = [ - "semver-parser", + "semver-parser 0.7.0", +] + +[[package]] +name = "semver" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6" +dependencies = [ + "semver-parser 0.10.2", ] [[package]] @@ -3291,6 +3321,15 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" +[[package]] +name = "semver-parser" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7" +dependencies = [ + "pest", +] + [[package]] name = "serde" version = "1.0.124" @@ -3614,7 +3653,7 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6611ecf7fedefdb0f74d6194da1918f15c00ef97ae4bbd1f60a9c7ca2ae0eb14" dependencies = [ - "rustc_version", + "rustc_version 0.2.3", "terminal_size", ] diff --git a/rust-libs/modules/gva/bca/Cargo.toml b/rust-libs/modules/gva/bca/Cargo.toml index 79b5e05d877fea325ccce4fe020e5f8eaef833da..f4f6b1749eba6012060814430a98ec4f9ebee5fe 100644 --- a/rust-libs/modules/gva/bca/Cargo.toml +++ b/rust-libs/modules/gva/bca/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" [dependencies] anyhow = "1.0.33" async-bincode = "0.6.1" +async_io_stream = { version = "0.3.1", features = [ "tokio_io"] } bincode = "1.3" dubp = { version = "0.49.0", features = ["duniter"] } duniter-bca-types = { path = "types", features = ["duniter"] } diff --git a/rust-libs/modules/gva/bca/src/lib.rs b/rust-libs/modules/gva/bca/src/lib.rs index e615dbd1135f8c9b4ee2075fd1a606baca4b846a..91a8ba1b6bf5f3602cc6148c9a6c1900f868f11c 100644 --- a/rust-libs/modules/gva/bca/src/lib.rs +++ b/rust-libs/modules/gva/bca/src/lib.rs @@ -23,14 +23,13 @@ )] mod exec_req_type; -mod utils; const RESP_MIN_SIZE: usize = 64; type RespBytes = SmallVec<[u8; RESP_MIN_SIZE]>; use crate::exec_req_type::ExecReqTypeError; -use crate::utils::AsyncReader; use async_bincode::AsyncBincodeReader; +use async_io_stream::IoStream; use bincode::Options as _; use dubp::crypto::keys::{ed25519::Ed25519KeyPair, Signator}; use duniter_bca_types::{ @@ -68,14 +67,15 @@ pub fn set_bca_executor( } #[cfg(not(test))] -pub async fn execute<R: 'static + futures::AsyncRead + Send + Unpin>( - query_body_reader: R, - is_whitelisted: bool, -) -> Vec<u8> { +pub async fn execute<B, S>(query_body_stream: S, is_whitelisted: bool) -> Vec<u8> +where + B: AsRef<[u8]>, + S: 'static + TryStream<Ok = B, Error = std::io::Error> + Send + Unpin, +{ unsafe { BCA_EXECUTOR .get_unchecked() - .execute(query_body_reader, is_whitelisted) + .execute(query_body_stream, is_whitelisted) .await } } @@ -90,13 +90,13 @@ struct BcaExecutor { } use uninit::extension_traits::VecCapacity; impl BcaExecutor { - pub async fn execute<R: futures::AsyncRead + Send + Unpin>( - &self, - query_body_reader: R, - is_whitelisted: bool, - ) -> Vec<u8> { + pub async fn execute<B, S>(&self, query_body_stream: S, is_whitelisted: bool) -> Vec<u8> + where + B: AsRef<[u8]>, + S: 'static + TryStream<Ok = B, Error = std::io::Error> + Send + Unpin, + { let async_bincode_reader = - AsyncBincodeReader::<AsyncReader<R>, BcaReq>::from(AsyncReader(query_body_reader)); + AsyncBincodeReader::<IoStream<S, B>, BcaReq>::from(IoStream::new(query_body_stream)); self.execute_inner(async_bincode_reader, is_whitelisted) .await .into_iter() @@ -268,6 +268,12 @@ mod tests { }) } + pub(crate) fn io_stream<B: AsRef<[u8]>>( + bytes: B, + ) -> impl TryStream<Ok = B, Error = std::io::Error> { + futures::stream::iter(std::iter::once(Ok(bytes))) + } + #[tokio::test] async fn test_one_req_ok() -> Result<(), bincode::Error> { let req = BcaReq::V0(BcaReqV0 { @@ -292,7 +298,7 @@ mod tests { let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); //println!("bytes={:?}", bytes); - let bytes_res = bca_executor.execute::<&[u8]>(&bytes[..], false).await; + let bytes_res = bca_executor.execute(io_stream(bytes), false).await; //println!("bytes_res={:?}", bytes_res); let bca_res: Vec<Result<BcaResp, BcaReqExecError>> = AsyncBincodeReader::<_, Result<BcaResp, BcaReqExecError>>::from(&bytes_res[..]) @@ -330,7 +336,7 @@ mod tests { create_bca_executor(MockDbsReader::new()).expect("fail to create bca executor"); //println!("bytes={:?}", bytes); - let bytes_res = bca_executor.execute::<&[u8]>(&bytes[..], false).await; + let bytes_res = bca_executor.execute(io_stream(bytes), false).await; //println!("bytes_res={:?}", bytes_res); let bca_res: Vec<Result<BcaResp, BcaReqExecError>> = AsyncBincodeReader::<_, Result<BcaResp, BcaReqExecError>>::from(&bytes_res[..]) @@ -374,7 +380,7 @@ mod tests { let bca_executor = create_bca_executor(dbs_reader).expect("fail to create bca executor"); //println!("bytes={:?}", bytes); - let bytes_res = bca_executor.execute::<&[u8]>(&bytes[..], false).await; + let bytes_res = bca_executor.execute(io_stream(bytes), false).await; //println!("bytes_res={:?}", bytes_res); let bca_res: Vec<Result<BcaResp, BcaReqExecError>> = AsyncBincodeReader::<_, Result<BcaResp, BcaReqExecError>>::from(&bytes_res[..]) diff --git a/rust-libs/modules/gva/bca/src/utils.rs b/rust-libs/modules/gva/bca/src/utils.rs deleted file mode 100644 index 88a4e9b556de5222477be11ee250f80e713a271f..0000000000000000000000000000000000000000 --- a/rust-libs/modules/gva/bca/src/utils.rs +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (C) 2020 Éloïs SANCHEZ. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <https://www.gnu.org/licenses/>. - -pub(crate) struct AsyncReader<R: futures::AsyncRead + Unpin>(pub(crate) R); - -impl<R: futures::AsyncRead + Unpin> tokio::io::AsyncRead for AsyncReader<R> { - fn poll_read( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll<std::io::Result<()>> { - match <R as futures::AsyncRead>::poll_read( - std::pin::Pin::new(&mut self.0), - cx, - buf.initialize_unfilled(), - ) { - std::task::Poll::Ready(res) => std::task::Poll::Ready(res.map(|n| buf.advance(n))), - std::task::Poll::Pending => std::task::Poll::Pending, - } - } -} diff --git a/rust-libs/modules/gva/bca/types/src/lib.rs b/rust-libs/modules/gva/bca/types/src/lib.rs index fd00de4f3a4a8debda71f02fdc0b49dcc7b1aa3d..7e71520856449e195019e4e6788cb656642f1c99 100644 --- a/rust-libs/modules/gva/bca/types/src/lib.rs +++ b/rust-libs/modules/gva/bca/types/src/lib.rs @@ -84,7 +84,7 @@ pub enum BcaRespTypeV0 { Pong, } -pub type BcaResult = Vec<Result<BcaResp, BcaReqExecError>>; +pub type BcaResult = Result<BcaResp, BcaReqExecError>; #[derive(Clone, Debug, Deserialize, Error, PartialEq, Eq, Serialize)] pub enum BcaReqExecError { diff --git a/rust-libs/modules/gva/src/warp_.rs b/rust-libs/modules/gva/src/warp_.rs index 0758d843dc0651b5cb2166d8bccca3957fc2b0b9..2673a34fc87a3e555092c2f8f48acee001ee91c1 100644 --- a/rust-libs/modules/gva/src/warp_.rs +++ b/rust-libs/modules/gva/src/warp_.rs @@ -18,6 +18,8 @@ use std::{ time::Duration, }; +use bytes::Bytes; + use crate::anti_spam::{AntiSpam, AntiSpamResponse}; use crate::*; @@ -183,18 +185,17 @@ pub(crate) fn graphql( .await, )) } else { - let body_reader = futures::TryStreamExt::map_err(body, |err| { + let body_stream = futures::TryStreamExt::map_err(body, |err| { std::io::Error::new(std::io::ErrorKind::Other, err) }) .map_ok(|mut buf| { let remaining = warp::Buf::remaining(&buf); warp::Buf::copy_to_bytes(&mut buf, remaining) - }) - .into_async_read(); + }); if content_type.as_deref() == Some("application/bincode") { tokio::time::timeout( Duration::from_millis(MAX_BATCH_REQ_PROCESS_DURATION_IN_MILLIS), - process_bincode_batch_queries(body_reader, is_whitelisted), + process_bincode_batch_queries(body_stream, is_whitelisted), ) .await .map_err(|_| warp::reject::custom(ReqExecTooLong))? @@ -202,7 +203,7 @@ pub(crate) fn graphql( tokio::time::timeout( Duration::from_millis(MAX_BATCH_REQ_PROCESS_DURATION_IN_MILLIS), process_json_batch_queries( - body_reader, + body_stream.into_async_read(), content_type, gva_schema, is_whitelisted, @@ -223,7 +224,7 @@ pub(crate) fn graphql( } async fn process_bincode_batch_queries( - body_reader: impl 'static + futures::AsyncRead + Send + Unpin, + body_reader: impl 'static + futures::TryStream<Ok = Bytes, Error = std::io::Error> + Send + Unpin, is_whitelisted: bool, ) -> Result<ServerResponse, warp::Rejection> { Ok(ServerResponse::Bincode(