Skip to content
Snippets Groups Projects
Commit b9ea8da3 authored by Éloïs's avatar Éloïs
Browse files

Merge branch 'gva-batch' into 'dev'

[feat] gva: support batch requests

See merge request !1355
parents 235ddcef da32b48b
No related branches found
No related tags found
1 merge request!1355[feat] gva: support batch requests
......@@ -23,10 +23,12 @@ use std::{
time::Instant,
};
const COUNT_INTERVAL: usize = 40;
pub(super) const MAX_BATCH_SIZE: usize = 5;
const COUNT_INTERVAL: usize = 10;
const MIN_DURATION_INTERVAL: Duration = Duration::from_secs(20);
const LARGE_DURATION_INTERVAL: Duration = Duration::from_secs(180);
const REDUCED_COUNT_INTERVAL: usize = COUNT_INTERVAL - 5;
const REDUCED_COUNT_INTERVAL: usize = COUNT_INTERVAL / 2;
const MAX_BAN_COUNT: usize = 16;
const BAN_FORGET_MIN_DURATION: Duration = Duration::from_secs(180);
......
......@@ -28,7 +28,57 @@ impl std::fmt::Debug for BadRequest {
impl warp::reject::Reject for BadRequest {}
struct GraphQlResponse(async_graphql::Response);
struct GraphQlRequest {
inner: async_graphql::BatchRequest,
}
impl GraphQlRequest {
fn data<D: std::any::Any + Copy + Send + Sync>(self, data: D) -> Self {
match self.inner {
async_graphql::BatchRequest::Single(request) => {
Self::new(async_graphql::BatchRequest::Single(request.data(data)))
}
async_graphql::BatchRequest::Batch(requests) => {
Self::new(async_graphql::BatchRequest::Batch(
requests.into_iter().map(|req| req.data(data)).collect(),
))
}
}
}
#[allow(clippy::from_iter_instead_of_collect)]
async fn execute(self, schema: GvaSchema) -> async_graphql::BatchResponse {
use std::iter::FromIterator as _;
match self.inner {
async_graphql::BatchRequest::Single(request) => {
async_graphql::BatchResponse::Single(schema.execute(request).await)
}
async_graphql::BatchRequest::Batch(requests) => async_graphql::BatchResponse::Batch(
futures::stream::FuturesOrdered::from_iter(
requests
.into_iter()
.zip(std::iter::repeat(schema))
.map(|(request, schema)| async move { schema.execute(request).await }),
)
.collect()
.await,
),
}
}
fn len(&self) -> usize {
match &self.inner {
async_graphql::BatchRequest::Single(_) => 1,
async_graphql::BatchRequest::Batch(requests) => requests.len(),
}
}
fn new(inner: async_graphql::BatchRequest) -> Self {
Self { inner }
}
fn single(request: async_graphql::Request) -> Self {
Self::new(async_graphql::BatchRequest::Single(request))
}
}
struct GraphQlResponse(async_graphql::BatchResponse);
impl warp::reply::Reply for GraphQlResponse {
fn into_response(self) -> warp::reply::Response {
let mut resp = warp::reply::with_header(
......@@ -37,11 +87,25 @@ impl warp::reply::Reply for GraphQlResponse {
"application/json",
)
.into_response();
add_cache_control(&mut resp, &self.0);
add_cache_control_batch(&mut resp, &self.0);
resp
}
}
fn add_cache_control_batch(
http_resp: &mut warp::reply::Response,
batch_resp: &async_graphql::BatchResponse,
) {
match batch_resp {
async_graphql::BatchResponse::Single(resp) => add_cache_control(http_resp, resp),
async_graphql::BatchResponse::Batch(resps) => {
for resp in resps {
add_cache_control(http_resp, resp)
}
}
}
}
fn add_cache_control(http_resp: &mut warp::reply::Response, resp: &async_graphql::Response) {
if resp.is_ok() {
if let Some(cache_control) = resp.cache_control.value() {
......@@ -89,9 +153,13 @@ pub(crate) fn graphql(
if method == http::Method::GET {
let request: async_graphql::Request = serde_urlencoded::from_str(&query)
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
Ok::<_, Rejection>((schema, request.data(QueryContext { is_whitelisted })))
Ok::<_, Rejection>((
schema,
GraphQlRequest::single(request.data(QueryContext { is_whitelisted })),
))
} else {
let request = async_graphql::http::receive_body(
let batch_request = GraphQlRequest::new(
async_graphql::http::receive_batch_body(
content_type,
futures::TryStreamExt::map_err(body, |err| {
std::io::Error::new(std::io::ErrorKind::Other, err)
......@@ -104,8 +172,18 @@ pub(crate) fn graphql(
async_graphql::http::MultipartOptions::clone(&opts),
)
.await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
Ok::<_, Rejection>((schema, request.data(QueryContext { is_whitelisted })))
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?,
);
if is_whitelisted || batch_request.len() <= anti_spam::MAX_BATCH_SIZE {
Ok::<_, Rejection>((
schema,
batch_request.data(QueryContext { is_whitelisted }),
))
} else {
Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
r#"{ "error": "The batch contains too many requests" }"#,
))))
}
}
} else {
Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
......@@ -115,8 +193,8 @@ pub(crate) fn graphql(
},
)
.and_then(
|(schema, request): (GvaSchema, async_graphql::Request)| async move {
Ok::<_, Infallible>(GraphQlResponse(schema.execute(request).await))
|(schema, batch_requests): (GvaSchema, GraphQlRequest)| async move {
Ok::<_, Infallible>(GraphQlResponse(batch_requests.execute(schema).await))
},
)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment