Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
Loading items

Target

Select target project
  • nodes/rust/modules/duniter-gva
  • tuxmain/duniter-gva
  • aya/duniter-gva
3 results
Select Git revision
Loading items
Show changes
......@@ -37,7 +37,9 @@ pub fn build_schema_with_data(data: GvaSchemaData, logger: bool) -> GvaSchema {
mutations::MutationRoot::default(),
subscriptions::SubscriptionRoot::default(),
)
.data(data);
.data(data)
.validation_mode(async_graphql::ValidationMode::Fast)
.limit_depth(10);
if logger {
builder = builder.extension(async_graphql::extensions::Logger);
}
......
......@@ -18,8 +18,6 @@ use duniter_gva_dbs_reader::blocks_chunks::{CHUNK_FILE_EXT, CHUNK_FILE_PREFIX};
use flate2::write::ZlibEncoder;
use flate2::Compression;
const CHUNK_SIZE: u32 = 4_096;
pub fn apply_block_blocks_chunk<B: Backend>(
block: &DubpBlockV10,
gva_db: &GvaV1Db<B>,
......@@ -33,7 +31,7 @@ pub fn apply_block_blocks_chunk<B: Backend>(
GvaBlockDbV1(DubpBlock::V10(block.clone())),
)?;
if (block_number + 1) % CHUNK_SIZE == 0 {
if (block_number + 1) % BLOCKS_CHUNK_SIZE == 0 {
let current_chunk: Vec<GvaBlockDbV1> = gva_db
.current_blocks_chunk()
.iter(.., |it| it.values().collect::<Result<Vec<_>, _>>())?;
......@@ -41,7 +39,7 @@ pub fn apply_block_blocks_chunk<B: Backend>(
.serialize(&current_chunk)
.map_err(|e| KvError::DeserError(e.into()))?;
let chunk_hash = Hash::compute_blake3(current_chunk_bin.as_ref());
let chunk_index = U32BE(block_number / CHUNK_SIZE);
let chunk_index = U32BE(block_number / BLOCKS_CHUNK_SIZE);
gva_db
.blocks_chunk_hash_write()
.upsert(chunk_index, HashDb(chunk_hash))?;
......@@ -66,9 +64,9 @@ pub fn revert_block_blocks_chunk<B: Backend>(
let block_number = block.number().0;
let chunks_folder_path = profile_path.join("data/gva_v1_blocks_chunks");
gva_db.write(|mut db| {
if (block_number + 1) % CHUNK_SIZE == 0 {
if (block_number + 1) % BLOCKS_CHUNK_SIZE == 0 {
// Uncompress last compressed chunk and replace it in current chunk
let chunk_index = U32BE(block_number / CHUNK_SIZE);
let chunk_index = U32BE(block_number / BLOCKS_CHUNK_SIZE);
if let Some(current_chunk_bin) =
duniter_gva_dbs_reader::blocks_chunks::read_compressed_chunk(
chunk_index.0,
......@@ -81,7 +79,7 @@ pub fn revert_block_blocks_chunk<B: Backend>(
let current_chunk: Vec<GvaBlockDbV1> = bincode_db()
.deserialize(current_chunk_bin.as_ref())
.map_err(|e| KvError::DeserError(e.into()))?;
let current_chunk_begin = block_number - CHUNK_SIZE + 1;
let current_chunk_begin = block_number - BLOCKS_CHUNK_SIZE + 1;
for (i, block) in current_chunk.into_iter().enumerate() {
db.current_blocks_chunk
.upsert(U32BE(current_chunk_begin + i as u32), block);
......
......@@ -80,12 +80,13 @@ fn get_next_wot_id() -> WotId {
next_wot_id_.add_assign(1);
next_wot_id
} else {
NEXT_WOT_ID = Some(if let Some(main_wot) = MAIN_WOT.get() {
let next_wot_id = if let Some(main_wot) = MAIN_WOT.get() {
main_wot.read().size()
} else {
0
});
0
};
NEXT_WOT_ID = Some(next_wot_id + 1);
next_wot_id
}
})
}
......
......@@ -46,11 +46,17 @@ use duniter_gva_db::*;
use duniter_gva_gql::{GvaSchema, QueryContext};
use duniter_gva_indexer::{get_gva_db_ro, get_gva_db_rw};
use futures::{StreamExt, TryStreamExt};
use std::{convert::Infallible, path::Path};
use std::{
convert::Infallible,
path::{Path, PathBuf},
};
#[cfg(test)]
use tests::get_public_ips;
use warp::{http::Response as HttpResponse, Filter as _, Rejection};
const PLAYGROUND_SUB_PATH: &str = "playground";
const SUBSCRIPTION_SUB_PATH: &str = "subscription";
#[derive(Debug)]
pub struct GvaModule {
conf: GvaConf,
......@@ -59,6 +65,7 @@ pub struct GvaModule {
gva_db_ro: &'static GvaV1DbRo<FileBackend>,
mempools: Mempools,
mode: DuniterMode,
profile_path_opt: Option<PathBuf>,
self_keypair: Ed25519KeyPair,
software_version: &'static str,
}
......@@ -122,6 +129,7 @@ impl duniter_core::module::DuniterModule for GvaModule {
gva_db_ro: get_gva_db_ro(profile_path_opt),
mempools,
mode,
profile_path_opt: profile_path_opt.map(ToOwned::to_owned),
self_keypair,
software_version,
},
......@@ -139,6 +147,7 @@ impl duniter_core::module::DuniterModule for GvaModule {
gva_db_ro,
mempools,
mode,
profile_path_opt,
self_keypair,
software_version,
} = self;
......@@ -151,6 +160,7 @@ impl duniter_core::module::DuniterModule for GvaModule {
dbs_pool,
gva_db_ro,
mempools,
profile_path_opt,
self_keypair,
software_version,
)
......@@ -232,12 +242,14 @@ impl duniter_core::module::DuniterModule for GvaModule {
}
impl GvaModule {
#[allow(clippy::too_many_arguments)]
async fn start_inner(
conf: GvaConf,
currency: String,
dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>,
gva_db_ro: &'static GvaV1DbRo<FileBackend>,
mempools: Mempools,
profile_path_opt: Option<PathBuf>,
self_keypair: Ed25519KeyPair,
software_version: &'static str,
) {
......@@ -250,6 +262,7 @@ impl GvaModule {
AsyncAccessor::new(),
dbs_pool.clone(),
duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro),
profile_path_opt,
self_keypair,
software_version,
mempools.txs,
......@@ -270,32 +283,16 @@ impl GvaModule {
);
// Create warp server routes
let graphql_post = warp_::graphql(
let gva_route = warp_::gva_route(
&conf,
gva_schema.clone(),
async_graphql::http::MultipartOptions::default(),
);
let gva_playground_route = warp_::gva_playground_route(&conf);
let gva_subscription_route = warp_::gva_subscription_route(&conf, gva_schema.clone());
let conf_clone = conf.clone();
let graphql_playground =
warp::path::path(conf.path.clone())
.and(warp::get())
.map(move || {
HttpResponse::builder()
.header("content-type", "text/html")
.body(async_graphql::http::playground_source(
GraphQLPlaygroundConfig::new(&format!("/{}", &conf_clone.path))
.subscription_endpoint(&format!(
"/{}",
&conf_clone.subscriptions_path,
)),
))
});
let routes = graphql_playground
.or(graphql_post)
.or(warp_::graphql_ws(&conf, gva_schema.clone()))
.recover(|err: Rejection| async move {
// Define recover function
let recover_func = |err: Rejection| async move {
if let Some(warp_::BadRequest(err)) = err.find() {
return Ok::<_, Infallible>(warp::reply::with_status(
err.to_string(),
......@@ -307,9 +304,32 @@ impl GvaModule {
"INTERNAL_SERVER_ERROR".to_string(),
http::StatusCode::INTERNAL_SERVER_ERROR,
))
});
};
// Start warp server
if conf.playground {
Self::run_warp_server(
&conf,
gva_route
.or(gva_subscription_route)
.or(gva_playground_route)
.recover(recover_func),
)
.await
} else {
Self::run_warp_server(
&conf,
gva_route.or(gva_subscription_route).recover(recover_func),
)
.await
}
}
async fn run_warp_server<F>(conf: &GvaConf, routes: F)
where
F: warp::Filter<Error = Infallible> + Clone + Send + Sync + 'static,
F::Extract: warp::Reply,
{
log::info!(
"GVA server listen on http://{}:{}/{}",
conf.ip4,
......@@ -318,7 +338,7 @@ impl GvaModule {
);
if let Some(ip6) = conf.ip6 {
log::info!(
"GVA server listen on http://{}:{}/{}",
"GVA server listen on http://[{}]:{}/{}",
ip6,
conf.port,
&conf.path
......@@ -375,17 +395,6 @@ impl GvaModule {
remote_port,
conf.get_remote_path(),
));
endpoints.push(format!(
"GVASUB {}{} {} {}",
if remote_port == 443 || conf.remote_tls.unwrap_or_default() {
"S "
} else {
""
},
remote_hosh,
remote_port,
conf.get_remote_subscriptions_path(),
));
}
Ok(endpoints)
}
......@@ -435,13 +444,7 @@ mod tests {
},
)
.await?;
assert_eq!(
endpoints,
vec![
"GVA 0.0.0.0 [::] 30901 gva".to_owned(),
"GVASUB 0.0.0.0 [::] 30901 gva-sub".to_owned()
]
);
assert_eq!(endpoints, vec!["GVA 0.0.0.0 [::] 30901 gva".to_owned(),]);
// only ip4 find
let endpoints = test_gen_endpoints(
......@@ -452,13 +455,7 @@ mod tests {
},
)
.await?;
assert_eq!(
endpoints,
vec![
"GVA 0.0.0.0 30901 gva".to_owned(),
"GVASUB 0.0.0.0 30901 gva-sub".to_owned()
]
);
assert_eq!(endpoints, vec!["GVA 0.0.0.0 30901 gva".to_owned(),]);
// only ip6 find
let endpoints = test_gen_endpoints(
......@@ -469,13 +466,7 @@ mod tests {
},
)
.await?;
assert_eq!(
endpoints,
vec![
"GVA [::] 30901 gva".to_owned(),
"GVASUB [::] 30901 gva-sub".to_owned()
]
);
assert_eq!(endpoints, vec!["GVA [::] 30901 gva".to_owned(),]);
// No ips find
assert!(test_gen_endpoints(
......
......@@ -142,7 +142,25 @@ fn add_cache_control(http_resp: &mut warp::reply::Response, resp: &async_graphql
}
}
pub(crate) fn graphql(
pub(crate) fn gva_playground_route(
conf: &GvaConf,
) -> impl warp::Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
let gva_path = conf.path.clone();
warp::path::path(gva_path.clone())
.and(warp::path::path(PLAYGROUND_SUB_PATH))
.and(warp::get())
.map(move || {
HttpResponse::builder()
.header("content-type", "text/html")
.body(async_graphql::http::playground_source(
GraphQLPlaygroundConfig::new(&format!("/{}", &gva_path)).subscription_endpoint(
&format!("/{}/{}", &gva_path, SUBSCRIPTION_SUB_PATH),
),
))
})
}
pub(crate) fn gva_route(
conf: &GvaConf,
gva_schema: GvaSchema,
opts: async_graphql::http::MultipartOptions,
......@@ -150,6 +168,7 @@ pub(crate) fn graphql(
let anti_spam = AntiSpam::from(conf);
let opts = Arc::new(opts);
warp::path::path(conf.path.clone())
.and(warp::path::end())
.and(warp::method())
.and(warp::query::raw().or(warp::any().map(String::new)).unify())
.and(warp::addr::remote())
......@@ -223,51 +242,13 @@ pub(crate) fn graphql(
)
}
async fn process_bincode_batch_queries(
body_reader: impl 'static + futures::TryStream<Ok = Bytes, Error = std::io::Error> + Send + Unpin,
is_whitelisted: bool,
) -> Result<ServerResponse, warp::Rejection> {
Ok(ServerResponse::Bincode(
duniter_bda::execute(body_reader, is_whitelisted).await,
))
}
async fn process_json_batch_queries(
body_reader: impl 'static + futures::AsyncRead + Send + Unpin,
content_type: Option<String>,
gva_schema: GvaSchema,
is_whitelisted: bool,
opts: async_graphql::http::MultipartOptions,
) -> Result<ServerResponse, warp::Rejection> {
let batch_request = GraphQlRequest::new(
async_graphql::http::receive_batch_body(
content_type,
body_reader,
async_graphql::http::MultipartOptions::clone(&opts),
)
.await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?,
);
if is_whitelisted || batch_request.len() <= anti_spam::MAX_BATCH_SIZE {
Ok(ServerResponse::GraphQl(
batch_request
.data(QueryContext { is_whitelisted })
.execute(gva_schema)
.await,
))
} else {
Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
r#"{ "error": "The batch contains too many requests" }"#,
))))
}
}
pub(crate) fn graphql_ws(
pub(crate) fn gva_subscription_route(
conf: &GvaConf,
schema: GvaSchema,
) -> impl warp::Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
let anti_spam = AntiSpam::from(conf);
warp::path::path(conf.subscriptions_path.clone())
warp::path::path(conf.path.clone())
.and(warp::path::path(SUBSCRIPTION_SUB_PATH))
.and(warp::addr::remote())
.and(warp::header::optional::<IpAddr>("X-Real-IP"))
.and(warp::ws())
......@@ -326,3 +307,42 @@ pub(crate) fn graphql_ws(
)))
})
}
async fn process_bincode_batch_queries(
body_reader: impl 'static + futures::TryStream<Ok = Bytes, Error = std::io::Error> + Send + Unpin,
is_whitelisted: bool,
) -> Result<ServerResponse, warp::Rejection> {
Ok(ServerResponse::Bincode(
duniter_bda::execute(body_reader, is_whitelisted).await,
))
}
async fn process_json_batch_queries(
body_reader: impl 'static + futures::AsyncRead + Send + Unpin,
content_type: Option<String>,
gva_schema: GvaSchema,
is_whitelisted: bool,
opts: async_graphql::http::MultipartOptions,
) -> Result<ServerResponse, warp::Rejection> {
let batch_request = GraphQlRequest::new(
async_graphql::http::receive_batch_body(
content_type,
body_reader,
async_graphql::http::MultipartOptions::clone(&opts),
)
.await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?,
);
if is_whitelisted || batch_request.len() <= anti_spam::MAX_BATCH_SIZE {
Ok(ServerResponse::GraphQl(
batch_request
.data(QueryContext { is_whitelisted })
.execute(gva_schema)
.await,
))
} else {
Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
r#"{ "error": "The batch contains too many requests" }"#,
))))
}
}