Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • duniter-v2s-issue-123-industrialize-releases
  • feat/1/increase-antispam-limits
  • fix/1/add_get_transactions_for_bma
  • fix/1/test-cgeek
  • master
5 results

Target

Select target project
  • nodes/rust/modules/duniter-gva
  • tuxmain/duniter-gva
  • aya/duniter-gva
3 results
Select Git revision
  • certifications
  • edition_cleanup
  • idty_by_username
  • master
4 results
Show changes
...@@ -37,7 +37,9 @@ pub fn build_schema_with_data(data: GvaSchemaData, logger: bool) -> GvaSchema { ...@@ -37,7 +37,9 @@ pub fn build_schema_with_data(data: GvaSchemaData, logger: bool) -> GvaSchema {
mutations::MutationRoot::default(), mutations::MutationRoot::default(),
subscriptions::SubscriptionRoot::default(), subscriptions::SubscriptionRoot::default(),
) )
.data(data); .data(data)
.validation_mode(async_graphql::ValidationMode::Fast)
.limit_depth(10);
if logger { if logger {
builder = builder.extension(async_graphql::extensions::Logger); builder = builder.extension(async_graphql::extensions::Logger);
} }
......
...@@ -18,8 +18,6 @@ use duniter_gva_dbs_reader::blocks_chunks::{CHUNK_FILE_EXT, CHUNK_FILE_PREFIX}; ...@@ -18,8 +18,6 @@ use duniter_gva_dbs_reader::blocks_chunks::{CHUNK_FILE_EXT, CHUNK_FILE_PREFIX};
use flate2::write::ZlibEncoder; use flate2::write::ZlibEncoder;
use flate2::Compression; use flate2::Compression;
const CHUNK_SIZE: u32 = 4_096;
pub fn apply_block_blocks_chunk<B: Backend>( pub fn apply_block_blocks_chunk<B: Backend>(
block: &DubpBlockV10, block: &DubpBlockV10,
gva_db: &GvaV1Db<B>, gva_db: &GvaV1Db<B>,
...@@ -33,7 +31,7 @@ pub fn apply_block_blocks_chunk<B: Backend>( ...@@ -33,7 +31,7 @@ pub fn apply_block_blocks_chunk<B: Backend>(
GvaBlockDbV1(DubpBlock::V10(block.clone())), GvaBlockDbV1(DubpBlock::V10(block.clone())),
)?; )?;
if (block_number + 1) % CHUNK_SIZE == 0 { if (block_number + 1) % BLOCKS_CHUNK_SIZE == 0 {
let current_chunk: Vec<GvaBlockDbV1> = gva_db let current_chunk: Vec<GvaBlockDbV1> = gva_db
.current_blocks_chunk() .current_blocks_chunk()
.iter(.., |it| it.values().collect::<Result<Vec<_>, _>>())?; .iter(.., |it| it.values().collect::<Result<Vec<_>, _>>())?;
...@@ -41,7 +39,7 @@ pub fn apply_block_blocks_chunk<B: Backend>( ...@@ -41,7 +39,7 @@ pub fn apply_block_blocks_chunk<B: Backend>(
.serialize(&current_chunk) .serialize(&current_chunk)
.map_err(|e| KvError::DeserError(e.into()))?; .map_err(|e| KvError::DeserError(e.into()))?;
let chunk_hash = Hash::compute_blake3(current_chunk_bin.as_ref()); let chunk_hash = Hash::compute_blake3(current_chunk_bin.as_ref());
let chunk_index = U32BE(block_number / CHUNK_SIZE); let chunk_index = U32BE(block_number / BLOCKS_CHUNK_SIZE);
gva_db gva_db
.blocks_chunk_hash_write() .blocks_chunk_hash_write()
.upsert(chunk_index, HashDb(chunk_hash))?; .upsert(chunk_index, HashDb(chunk_hash))?;
...@@ -66,9 +64,9 @@ pub fn revert_block_blocks_chunk<B: Backend>( ...@@ -66,9 +64,9 @@ pub fn revert_block_blocks_chunk<B: Backend>(
let block_number = block.number().0; let block_number = block.number().0;
let chunks_folder_path = profile_path.join("data/gva_v1_blocks_chunks"); let chunks_folder_path = profile_path.join("data/gva_v1_blocks_chunks");
gva_db.write(|mut db| { gva_db.write(|mut db| {
if (block_number + 1) % CHUNK_SIZE == 0 { if (block_number + 1) % BLOCKS_CHUNK_SIZE == 0 {
// Uncompress last compressed chunk and replace it in current chunk // Uncompress last compressed chunk and replace it in current chunk
let chunk_index = U32BE(block_number / CHUNK_SIZE); let chunk_index = U32BE(block_number / BLOCKS_CHUNK_SIZE);
if let Some(current_chunk_bin) = if let Some(current_chunk_bin) =
duniter_gva_dbs_reader::blocks_chunks::read_compressed_chunk( duniter_gva_dbs_reader::blocks_chunks::read_compressed_chunk(
chunk_index.0, chunk_index.0,
...@@ -81,7 +79,7 @@ pub fn revert_block_blocks_chunk<B: Backend>( ...@@ -81,7 +79,7 @@ pub fn revert_block_blocks_chunk<B: Backend>(
let current_chunk: Vec<GvaBlockDbV1> = bincode_db() let current_chunk: Vec<GvaBlockDbV1> = bincode_db()
.deserialize(current_chunk_bin.as_ref()) .deserialize(current_chunk_bin.as_ref())
.map_err(|e| KvError::DeserError(e.into()))?; .map_err(|e| KvError::DeserError(e.into()))?;
let current_chunk_begin = block_number - CHUNK_SIZE + 1; let current_chunk_begin = block_number - BLOCKS_CHUNK_SIZE + 1;
for (i, block) in current_chunk.into_iter().enumerate() { for (i, block) in current_chunk.into_iter().enumerate() {
db.current_blocks_chunk db.current_blocks_chunk
.upsert(U32BE(current_chunk_begin + i as u32), block); .upsert(U32BE(current_chunk_begin + i as u32), block);
......
...@@ -80,12 +80,13 @@ fn get_next_wot_id() -> WotId { ...@@ -80,12 +80,13 @@ fn get_next_wot_id() -> WotId {
next_wot_id_.add_assign(1); next_wot_id_.add_assign(1);
next_wot_id next_wot_id
} else { } else {
NEXT_WOT_ID = Some(if let Some(main_wot) = MAIN_WOT.get() { let next_wot_id = if let Some(main_wot) = MAIN_WOT.get() {
main_wot.read().size() main_wot.read().size()
} else { } else {
0 0
}); };
0 NEXT_WOT_ID = Some(next_wot_id + 1);
next_wot_id
} }
}) })
} }
......
...@@ -46,11 +46,17 @@ use duniter_gva_db::*; ...@@ -46,11 +46,17 @@ use duniter_gva_db::*;
use duniter_gva_gql::{GvaSchema, QueryContext}; use duniter_gva_gql::{GvaSchema, QueryContext};
use duniter_gva_indexer::{get_gva_db_ro, get_gva_db_rw}; use duniter_gva_indexer::{get_gva_db_ro, get_gva_db_rw};
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
use std::{convert::Infallible, path::Path}; use std::{
convert::Infallible,
path::{Path, PathBuf},
};
#[cfg(test)] #[cfg(test)]
use tests::get_public_ips; use tests::get_public_ips;
use warp::{http::Response as HttpResponse, Filter as _, Rejection}; use warp::{http::Response as HttpResponse, Filter as _, Rejection};
const PLAYGROUND_SUB_PATH: &str = "playground";
const SUBSCRIPTION_SUB_PATH: &str = "subscription";
#[derive(Debug)] #[derive(Debug)]
pub struct GvaModule { pub struct GvaModule {
conf: GvaConf, conf: GvaConf,
...@@ -59,6 +65,7 @@ pub struct GvaModule { ...@@ -59,6 +65,7 @@ pub struct GvaModule {
gva_db_ro: &'static GvaV1DbRo<FileBackend>, gva_db_ro: &'static GvaV1DbRo<FileBackend>,
mempools: Mempools, mempools: Mempools,
mode: DuniterMode, mode: DuniterMode,
profile_path_opt: Option<PathBuf>,
self_keypair: Ed25519KeyPair, self_keypair: Ed25519KeyPair,
software_version: &'static str, software_version: &'static str,
} }
...@@ -122,6 +129,7 @@ impl duniter_core::module::DuniterModule for GvaModule { ...@@ -122,6 +129,7 @@ impl duniter_core::module::DuniterModule for GvaModule {
gva_db_ro: get_gva_db_ro(profile_path_opt), gva_db_ro: get_gva_db_ro(profile_path_opt),
mempools, mempools,
mode, mode,
profile_path_opt: profile_path_opt.map(ToOwned::to_owned),
self_keypair, self_keypair,
software_version, software_version,
}, },
...@@ -139,6 +147,7 @@ impl duniter_core::module::DuniterModule for GvaModule { ...@@ -139,6 +147,7 @@ impl duniter_core::module::DuniterModule for GvaModule {
gva_db_ro, gva_db_ro,
mempools, mempools,
mode, mode,
profile_path_opt,
self_keypair, self_keypair,
software_version, software_version,
} = self; } = self;
...@@ -151,6 +160,7 @@ impl duniter_core::module::DuniterModule for GvaModule { ...@@ -151,6 +160,7 @@ impl duniter_core::module::DuniterModule for GvaModule {
dbs_pool, dbs_pool,
gva_db_ro, gva_db_ro,
mempools, mempools,
profile_path_opt,
self_keypair, self_keypair,
software_version, software_version,
) )
...@@ -232,12 +242,14 @@ impl duniter_core::module::DuniterModule for GvaModule { ...@@ -232,12 +242,14 @@ impl duniter_core::module::DuniterModule for GvaModule {
} }
impl GvaModule { impl GvaModule {
#[allow(clippy::too_many_arguments)]
async fn start_inner( async fn start_inner(
conf: GvaConf, conf: GvaConf,
currency: String, currency: String,
dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>, dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>,
gva_db_ro: &'static GvaV1DbRo<FileBackend>, gva_db_ro: &'static GvaV1DbRo<FileBackend>,
mempools: Mempools, mempools: Mempools,
profile_path_opt: Option<PathBuf>,
self_keypair: Ed25519KeyPair, self_keypair: Ed25519KeyPair,
software_version: &'static str, software_version: &'static str,
) { ) {
...@@ -250,6 +262,7 @@ impl GvaModule { ...@@ -250,6 +262,7 @@ impl GvaModule {
AsyncAccessor::new(), AsyncAccessor::new(),
dbs_pool.clone(), dbs_pool.clone(),
duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro), duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro),
profile_path_opt,
self_keypair, self_keypair,
software_version, software_version,
mempools.txs, mempools.txs,
...@@ -270,32 +283,16 @@ impl GvaModule { ...@@ -270,32 +283,16 @@ impl GvaModule {
); );
// Create warp server routes // Create warp server routes
let graphql_post = warp_::graphql( let gva_route = warp_::gva_route(
&conf, &conf,
gva_schema.clone(), gva_schema.clone(),
async_graphql::http::MultipartOptions::default(), async_graphql::http::MultipartOptions::default(),
); );
let gva_playground_route = warp_::gva_playground_route(&conf);
let gva_subscription_route = warp_::gva_subscription_route(&conf, gva_schema.clone());
let conf_clone = conf.clone(); // Define recover function
let graphql_playground = let recover_func = |err: Rejection| async move {
warp::path::path(conf.path.clone())
.and(warp::get())
.map(move || {
HttpResponse::builder()
.header("content-type", "text/html")
.body(async_graphql::http::playground_source(
GraphQLPlaygroundConfig::new(&format!("/{}", &conf_clone.path))
.subscription_endpoint(&format!(
"/{}",
&conf_clone.subscriptions_path,
)),
))
});
let routes = graphql_playground
.or(graphql_post)
.or(warp_::graphql_ws(&conf, gva_schema.clone()))
.recover(|err: Rejection| async move {
if let Some(warp_::BadRequest(err)) = err.find() { if let Some(warp_::BadRequest(err)) = err.find() {
return Ok::<_, Infallible>(warp::reply::with_status( return Ok::<_, Infallible>(warp::reply::with_status(
err.to_string(), err.to_string(),
...@@ -307,9 +304,32 @@ impl GvaModule { ...@@ -307,9 +304,32 @@ impl GvaModule {
"INTERNAL_SERVER_ERROR".to_string(), "INTERNAL_SERVER_ERROR".to_string(),
http::StatusCode::INTERNAL_SERVER_ERROR, http::StatusCode::INTERNAL_SERVER_ERROR,
)) ))
}); };
// Start warp server // Start warp server
if conf.playground {
Self::run_warp_server(
&conf,
gva_route
.or(gva_subscription_route)
.or(gva_playground_route)
.recover(recover_func),
)
.await
} else {
Self::run_warp_server(
&conf,
gva_route.or(gva_subscription_route).recover(recover_func),
)
.await
}
}
async fn run_warp_server<F>(conf: &GvaConf, routes: F)
where
F: warp::Filter<Error = Infallible> + Clone + Send + Sync + 'static,
F::Extract: warp::Reply,
{
log::info!( log::info!(
"GVA server listen on http://{}:{}/{}", "GVA server listen on http://{}:{}/{}",
conf.ip4, conf.ip4,
...@@ -318,7 +338,7 @@ impl GvaModule { ...@@ -318,7 +338,7 @@ impl GvaModule {
); );
if let Some(ip6) = conf.ip6 { if let Some(ip6) = conf.ip6 {
log::info!( log::info!(
"GVA server listen on http://{}:{}/{}", "GVA server listen on http://[{}]:{}/{}",
ip6, ip6,
conf.port, conf.port,
&conf.path &conf.path
...@@ -375,17 +395,6 @@ impl GvaModule { ...@@ -375,17 +395,6 @@ impl GvaModule {
remote_port, remote_port,
conf.get_remote_path(), conf.get_remote_path(),
)); ));
endpoints.push(format!(
"GVASUB {}{} {} {}",
if remote_port == 443 || conf.remote_tls.unwrap_or_default() {
"S "
} else {
""
},
remote_hosh,
remote_port,
conf.get_remote_subscriptions_path(),
));
} }
Ok(endpoints) Ok(endpoints)
} }
...@@ -435,13 +444,7 @@ mod tests { ...@@ -435,13 +444,7 @@ mod tests {
}, },
) )
.await?; .await?;
assert_eq!( assert_eq!(endpoints, vec!["GVA 0.0.0.0 [::] 30901 gva".to_owned(),]);
endpoints,
vec![
"GVA 0.0.0.0 [::] 30901 gva".to_owned(),
"GVASUB 0.0.0.0 [::] 30901 gva-sub".to_owned()
]
);
// only ip4 find // only ip4 find
let endpoints = test_gen_endpoints( let endpoints = test_gen_endpoints(
...@@ -452,13 +455,7 @@ mod tests { ...@@ -452,13 +455,7 @@ mod tests {
}, },
) )
.await?; .await?;
assert_eq!( assert_eq!(endpoints, vec!["GVA 0.0.0.0 30901 gva".to_owned(),]);
endpoints,
vec![
"GVA 0.0.0.0 30901 gva".to_owned(),
"GVASUB 0.0.0.0 30901 gva-sub".to_owned()
]
);
// only ip6 find // only ip6 find
let endpoints = test_gen_endpoints( let endpoints = test_gen_endpoints(
...@@ -469,13 +466,7 @@ mod tests { ...@@ -469,13 +466,7 @@ mod tests {
}, },
) )
.await?; .await?;
assert_eq!( assert_eq!(endpoints, vec!["GVA [::] 30901 gva".to_owned(),]);
endpoints,
vec![
"GVA [::] 30901 gva".to_owned(),
"GVASUB [::] 30901 gva-sub".to_owned()
]
);
// No ips find // No ips find
assert!(test_gen_endpoints( assert!(test_gen_endpoints(
......
...@@ -142,7 +142,25 @@ fn add_cache_control(http_resp: &mut warp::reply::Response, resp: &async_graphql ...@@ -142,7 +142,25 @@ fn add_cache_control(http_resp: &mut warp::reply::Response, resp: &async_graphql
} }
} }
pub(crate) fn graphql( pub(crate) fn gva_playground_route(
conf: &GvaConf,
) -> impl warp::Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
let gva_path = conf.path.clone();
warp::path::path(gva_path.clone())
.and(warp::path::path(PLAYGROUND_SUB_PATH))
.and(warp::get())
.map(move || {
HttpResponse::builder()
.header("content-type", "text/html")
.body(async_graphql::http::playground_source(
GraphQLPlaygroundConfig::new(&format!("/{}", &gva_path)).subscription_endpoint(
&format!("/{}/{}", &gva_path, SUBSCRIPTION_SUB_PATH),
),
))
})
}
pub(crate) fn gva_route(
conf: &GvaConf, conf: &GvaConf,
gva_schema: GvaSchema, gva_schema: GvaSchema,
opts: async_graphql::http::MultipartOptions, opts: async_graphql::http::MultipartOptions,
...@@ -150,6 +168,7 @@ pub(crate) fn graphql( ...@@ -150,6 +168,7 @@ pub(crate) fn graphql(
let anti_spam = AntiSpam::from(conf); let anti_spam = AntiSpam::from(conf);
let opts = Arc::new(opts); let opts = Arc::new(opts);
warp::path::path(conf.path.clone()) warp::path::path(conf.path.clone())
.and(warp::path::end())
.and(warp::method()) .and(warp::method())
.and(warp::query::raw().or(warp::any().map(String::new)).unify()) .and(warp::query::raw().or(warp::any().map(String::new)).unify())
.and(warp::addr::remote()) .and(warp::addr::remote())
...@@ -223,51 +242,13 @@ pub(crate) fn graphql( ...@@ -223,51 +242,13 @@ pub(crate) fn graphql(
) )
} }
async fn process_bincode_batch_queries( pub(crate) fn gva_subscription_route(
body_reader: impl 'static + futures::TryStream<Ok = Bytes, Error = std::io::Error> + Send + Unpin,
is_whitelisted: bool,
) -> Result<ServerResponse, warp::Rejection> {
Ok(ServerResponse::Bincode(
duniter_bda::execute(body_reader, is_whitelisted).await,
))
}
async fn process_json_batch_queries(
body_reader: impl 'static + futures::AsyncRead + Send + Unpin,
content_type: Option<String>,
gva_schema: GvaSchema,
is_whitelisted: bool,
opts: async_graphql::http::MultipartOptions,
) -> Result<ServerResponse, warp::Rejection> {
let batch_request = GraphQlRequest::new(
async_graphql::http::receive_batch_body(
content_type,
body_reader,
async_graphql::http::MultipartOptions::clone(&opts),
)
.await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?,
);
if is_whitelisted || batch_request.len() <= anti_spam::MAX_BATCH_SIZE {
Ok(ServerResponse::GraphQl(
batch_request
.data(QueryContext { is_whitelisted })
.execute(gva_schema)
.await,
))
} else {
Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
r#"{ "error": "The batch contains too many requests" }"#,
))))
}
}
pub(crate) fn graphql_ws(
conf: &GvaConf, conf: &GvaConf,
schema: GvaSchema, schema: GvaSchema,
) -> impl warp::Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone { ) -> impl warp::Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
let anti_spam = AntiSpam::from(conf); let anti_spam = AntiSpam::from(conf);
warp::path::path(conf.subscriptions_path.clone()) warp::path::path(conf.path.clone())
.and(warp::path::path(SUBSCRIPTION_SUB_PATH))
.and(warp::addr::remote()) .and(warp::addr::remote())
.and(warp::header::optional::<IpAddr>("X-Real-IP")) .and(warp::header::optional::<IpAddr>("X-Real-IP"))
.and(warp::ws()) .and(warp::ws())
...@@ -326,3 +307,42 @@ pub(crate) fn graphql_ws( ...@@ -326,3 +307,42 @@ pub(crate) fn graphql_ws(
))) )))
}) })
} }
async fn process_bincode_batch_queries(
body_reader: impl 'static + futures::TryStream<Ok = Bytes, Error = std::io::Error> + Send + Unpin,
is_whitelisted: bool,
) -> Result<ServerResponse, warp::Rejection> {
Ok(ServerResponse::Bincode(
duniter_bda::execute(body_reader, is_whitelisted).await,
))
}
async fn process_json_batch_queries(
body_reader: impl 'static + futures::AsyncRead + Send + Unpin,
content_type: Option<String>,
gva_schema: GvaSchema,
is_whitelisted: bool,
opts: async_graphql::http::MultipartOptions,
) -> Result<ServerResponse, warp::Rejection> {
let batch_request = GraphQlRequest::new(
async_graphql::http::receive_batch_body(
content_type,
body_reader,
async_graphql::http::MultipartOptions::clone(&opts),
)
.await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?,
);
if is_whitelisted || batch_request.len() <= anti_spam::MAX_BATCH_SIZE {
Ok(ServerResponse::GraphQl(
batch_request
.data(QueryContext { is_whitelisted })
.execute(gva_schema)
.await,
))
} else {
Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
r#"{ "error": "The batch contains too many requests" }"#,
))))
}
}