Skip to content
Snippets Groups Projects
Select Git revision
  • c8d178fd8981995f7077dfc910d00a00367e757f
  • master default protected
  • feat/1/increase-antispam-limits
  • duniter-v2s-issue-123-industrialize-releases
  • fix/1/test-cgeek
  • fix/1/add_get_transactions_for_bma
6 results

lib.rs

Blame
  • lib.rs 16.50 KiB
    //  Copyright (C) 2020 Éloïs SANCHEZ.
    //
    // This program is free software: you can redistribute it and/or modify
    // it under the terms of the GNU Affero General Public License as
    // published by the Free Software Foundation, either version 3 of the
    // License, or (at your option) any later version.
    //
    // This program is distributed in the hope that it will be useful,
    // but WITHOUT ANY WARRANTY; without even the implied warranty of
    // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    // GNU Affero General Public License for more details.
    //
    // You should have received a copy of the GNU Affero General Public License
    // along with this program.  If not, see <https://www.gnu.org/licenses/>.
    
    #![deny(
        clippy::unwrap_used,
        missing_copy_implementations,
        trivial_casts,
        trivial_numeric_casts,
        unstable_features,
        unused_import_braces
    )]
    
    mod anti_spam;
    mod warp_;
    
    use async_graphql::http::GraphQLPlaygroundConfig;
    use duniter_core::common::{currency_params::CurrencyParameters, prelude::*};
    use duniter_core::dbs::databases::txs_mp_v2::TxsMpV2DbReadable;
    use duniter_core::dbs::prelude::*;
    use duniter_core::dbs::{kv_typed::prelude::*, FileBackend};
    use duniter_core::documents::transaction::TransactionDocumentV10;
    use duniter_core::global::AsyncAccessor;
    use duniter_core::mempools::Mempools;
    #[cfg(not(test))]
    use duniter_core::module::public_ips::get_public_ips;
    use duniter_core::{block::DubpBlockV10, crypto::hashs::Hash};
    use duniter_core::{
        common::crypto::keys::{ed25519::PublicKey, KeyPair as _},
        crypto::keys::ed25519::Ed25519KeyPair,
    };
    use duniter_core::{conf::DuniterMode, module::Endpoint};
    use duniter_gva_conf::GvaConf;
    use duniter_gva_db::*;
    use duniter_gva_gql::{GvaSchema, QueryContext};
    use duniter_gva_indexer::{get_gva_db_ro, get_gva_db_rw};
    use futures::{StreamExt, TryStreamExt};
    use std::{
        convert::Infallible,
        path::{Path, PathBuf},
    };
    #[cfg(test)]
    use tests::get_public_ips;
    use warp::{http::Response as HttpResponse, Filter as _, Rejection};
    
    const PLAYGROUND_SUB_PATH: &str = "playground";
    const SUBSCRIPTION_SUB_PATH: &str = "subscription";
    
    /// State of the GVA module: assembled by `init`, then consumed by `start`.
    #[derive(Debug)]
    pub struct GvaModule {
        /// GVA configuration; its `enabled` flag gates endpoint generation and server start-up.
        conf: GvaConf,
        /// Currency name, handed to the BDA executor and to the GraphQL server metadata.
        currency: String,
        /// Asynchronous handle on the thread pool that owns the shared databases.
        dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>,
        /// Read-only GVA database, obtained through `get_gva_db_ro`.
        gva_db_ro: &'static GvaV1DbRo<FileBackend>,
        /// Mempools; only the `txs` mempool is used when the server is started.
        mempools: Mempools,
        /// Execution mode; the server is only started in `DuniterMode::Start`.
        mode: DuniterMode,
        /// Optional profile path, forwarded to the BDA executor.
        profile_path_opt: Option<PathBuf>,
        /// Keypair cloned from the core configuration; its public key is exposed in the
        /// GraphQL server metadata.
        self_keypair: Ed25519KeyPair,
        /// Software version string, forwarded to the BDA executor and the server metadata.
        software_version: &'static str,
    }
    
    #[async_trait::async_trait]
    impl duniter_core::module::DuniterModule for GvaModule {
        const INDEX_BLOCKS: bool = true;
        const MODULE_NAME: &'static str = "gva";
    
        type Conf = GvaConf;
    
        /// Indexes `block` into the GVA database.
        ///
        /// The read-write GVA database handle is resolved from `profile_path_opt`. When a
        /// profile path is available, the blocks-chunk step runs first and any error it
        /// returns is propagated before the main indexing step is attempted.
        fn apply_block(
            block: &DubpBlockV10,
            _conf: &duniter_core::conf::DuniterCoreConf,
            currency_params: CurrencyParameters,
            profile_path_opt: Option<&Path>,
        ) -> KvResult<()> {
            let gva_db = get_gva_db_rw(profile_path_opt);

            // The chunk step takes the profile path as an argument, so it is skipped
            // when no profile path was given.
            if let Some(profile_path) = profile_path_opt {
                duniter_gva_indexer::apply_block_blocks_chunk(&block, &gva_db, profile_path)?;
            }

            duniter_gva_indexer::apply_block(&block, currency_params, gva_db)
        }
        /// Reverts the indexing of `block` from the GVA database.
        ///
        /// The read-write GVA database handle is resolved from `profile_path_opt`. When a
        /// profile path is available, the blocks-chunk revert runs first and any error it
        /// returns is propagated before the main revert step is attempted.
        fn revert_block(
            block: &DubpBlockV10,
            _conf: &duniter_core::conf::DuniterCoreConf,
            currency_params: CurrencyParameters,
            profile_path_opt: Option<&Path>,
        ) -> KvResult<()> {
            let gva_db = get_gva_db_rw(profile_path_opt);

            // The chunk step takes the profile path as an argument, so it is skipped
            // when no profile path was given.
            if let Some(profile_path) = profile_path_opt {
                duniter_gva_indexer::revert_block_blocks_chunk(&block, &gva_db, profile_path)?;
            }

            duniter_gva_indexer::revert_block(&block, currency_params, gva_db)
        }
        /// Builds the module state together with the endpoints it advertises.
        ///
        /// # Errors
        ///
        /// Propagates the error of `gen_endpoints`; in that case the read-only GVA database
        /// is never requested.
        async fn init(
            conf: Self::Conf,
            core_conf: &duniter_core::conf::DuniterCoreConf,
            currency: &str,
            dbs_pool: &fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>,
            mempools: Mempools,
            mode: duniter_core::conf::DuniterMode,
            profile_path_opt: Option<&Path>,
            software_version: &'static str,
        ) -> anyhow::Result<(Self, Vec<duniter_core::module::Endpoint>)> {
            let self_keypair = core_conf.self_key_pair.clone();
            let endpoints = Self::gen_endpoints(&conf).await?;

            // Borrowed inputs are turned into owned values so the module can outlive them.
            let module = GvaModule {
                conf,
                currency: String::from(currency),
                dbs_pool: dbs_pool.clone(),
                gva_db_ro: get_gva_db_ro(profile_path_opt),
                mempools,
                mode,
                profile_path_opt: profile_path_opt.map(Path::to_path_buf),
                self_keypair,
                software_version,
            };

            Ok((module, endpoints))
        }
    
        /// Runs the GVA server when the module is allowed to start.
        ///
        /// Nothing is started when the `DUNITER_JS_TESTS` environment variable equals `yes`,
        /// when the mode is not `DuniterMode::Start`, or when GVA is disabled in the
        /// configuration. In every case the function returns `Ok(())`.
        async fn start(self) -> anyhow::Result<()> {
            // Do not start GVA server on js tests
            let running_js_tests = std::env::var_os("DUNITER_JS_TESTS") == Some("yes".into());
            if running_js_tests {
                return Ok(());
            }

            let GvaModule {
                conf,
                currency,
                dbs_pool,
                gva_db_ro,
                mempools,
                mode,
                profile_path_opt,
                self_keypair,
                software_version,
            } = self;

            if matches!(mode, DuniterMode::Start) && conf.enabled {
                GvaModule::start_inner(
                    conf,
                    currency,
                    dbs_pool,
                    gva_db_ro,
                    mempools,
                    profile_path_opt,
                    self_keypair,
                    software_version,
                )
                .await;
            }

            Ok(())
        }
        // Needed for BMA only, will be removed when the migration is complete.
        /// Fetches the transaction history of `pubkey` in the shape expected by BMA.
        ///
        /// Written transactions (`sent` and `received`) are flattened into
        /// `(tx, written_block, written_time)` tuples; `sending` and `pending` are passed
        /// through unchanged. The result is always `Some` on success.
        ///
        /// # Panics
        ///
        /// Panics with "dbs pool disconnected" if the thread pool cannot execute the query.
        fn get_transactions_history_for_bma(
            dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<SharedDbs<FileBackend>>,
            profile_path_opt: Option<&Path>,
            pubkey: PublicKey,
        ) -> KvResult<Option<duniter_core::module::TxsHistoryForBma>> {
            let gva_db = get_gva_db_ro(profile_path_opt);
            let history = dbs_pool
                .execute(move |dbs| {
                    duniter_gva_dbs_reader::txs_history::get_transactions_history_for_bma(
                        gva_db,
                        &dbs.txs_mp_db,
                        pubkey,
                    )
                })
                .expect("dbs pool disconnected")?;

            // Shared by `sent` and `received`: both hold the same record type.
            let into_bma_tuple = |GvaTxDbV1 {
                                      tx,
                                      written_block,
                                      written_time,
                                  }| (tx, written_block, written_time);

            Ok(Some(duniter_core::module::TxsHistoryForBma {
                sent: history.sent.into_iter().map(into_bma_tuple).collect(),
                received: history.received.into_iter().map(into_bma_tuple).collect(),
                sending: history.sending,
                pending: history.pending,
            }))
        }
        // Needed for BMA only, will be removed when the migration is complete.
        /// Looks a transaction up by its hash.
        ///
        /// The transactions mempool is searched first; a hit there is returned without a
        /// block number. Otherwise the GVA database is searched and a hit is returned with
        /// the number of the block it was written in. `Ok(None)` means neither store has it.
        ///
        /// # Panics
        ///
        /// Panics with "dbs pool disconnected" if the thread pool cannot execute the query.
        fn get_tx_by_hash(
            dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<SharedDbs<FileBackend>>,
            hash: Hash,
            profile_path_opt: Option<&Path>,
        ) -> KvResult<Option<(TransactionDocumentV10, Option<BlockNumber>)>> {
            let gva_db = get_gva_db_ro(profile_path_opt);
            dbs_pool
                .execute(move |dbs| {
                    let key = duniter_core::dbs::HashKeyV2(hash);

                    // Not yet written transaction: there is no block number to report.
                    if let Some(pending_tx) = dbs.txs_mp_db.txs().get(&key)? {
                        return Ok(Some((pending_tx.doc, None)));
                    }

                    let written_tx = gva_db.txs().get(&key)?;
                    Ok(written_tx.map(|tx_db| (tx_db.tx, Some(tx_db.written_block.number))))
                })
                .expect("dbs pool disconnected")
        }
    }
    
    impl GvaModule {
        #[allow(clippy::too_many_arguments)]
        async fn start_inner(
            conf: GvaConf,
            currency: String,
            dbs_pool: fast_threadpool::ThreadPoolAsyncHandler<SharedDbs<FileBackend>>,
            gva_db_ro: &'static GvaV1DbRo<FileBackend>,
            mempools: Mempools,
            profile_path_opt: Option<PathBuf>,
            self_keypair: Ed25519KeyPair,
            software_version: &'static str,
        ) {
            log::info!("GvaServer::start: conf={:?}", conf);
    
            // Create BdaExecutor and GvaSchema
            let self_pubkey = self_keypair.public_key();
            duniter_bda::set_bda_executor(
                currency.clone(),
                AsyncAccessor::new(),
                dbs_pool.clone(),
                duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro),
                profile_path_opt,
                self_keypair,
                software_version,
                mempools.txs,
            );
            let gva_schema = duniter_gva_gql::build_schema_with_data(
                duniter_gva_gql::GvaSchemaData {
                    cm_accessor: AsyncAccessor::new(),
                    dbs_reader: duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro),
                    dbs_pool,
                    server_meta_data: duniter_gva_gql::ServerMetaData {
                        currency,
                        self_pubkey,
                        software_version,
                    },
                    txs_mempool: mempools.txs,
                },
                true,
            );
    
            // Create warp server routes
            let gva_route = warp_::gva_route(
                &conf,
                gva_schema.clone(),
                async_graphql::http::MultipartOptions::default(),
            );
            let gva_playground_route = warp_::gva_playground_route(&conf);
            let gva_subscription_route = warp_::gva_subscription_route(&conf, gva_schema.clone());
    
            // Define recover function
            let recover_func = |err: Rejection| async move {
                if let Some(warp_::BadRequest(err)) = err.find() {
                    return Ok::<_, Infallible>(warp::reply::with_status(
                        err.to_string(),
                        http::StatusCode::BAD_REQUEST,
                    ));
                }
    
                Ok(warp::reply::with_status(
                    "INTERNAL_SERVER_ERROR".to_string(),
                    http::StatusCode::INTERNAL_SERVER_ERROR,
                ))
            };
    
            // Start warp server
            if conf.playground {
                Self::run_warp_server(
                    &conf,
                    gva_route
                        .or(gva_subscription_route)
                        .or(gva_playground_route)
                        .recover(recover_func),
                )
                .await
            } else {
                Self::run_warp_server(
                    &conf,
                    gva_route.or(gva_subscription_route).recover(recover_func),
                )
                .await
            }
        }
    
        /// Serves `routes` on the configured IPv4 address and, when one is configured, on
        /// the IPv6 address as well (both on `conf.port`).
        ///
        /// The listening addresses are logged before serving and a warning is logged once
        /// the server(s) have stopped.
        async fn run_warp_server<F>(conf: &GvaConf, routes: F)
        where
            F: warp::Filter<Error = Infallible> + Clone + Send + Sync + 'static,
            F::Extract: warp::Reply,
        {
            log::info!(
                "GVA server listen on http://{}:{}/{}",
                conf.ip4,
                conf.port,
                &conf.path
            );

            match conf.ip6 {
                Some(ip6) => {
                    log::info!(
                        "GVA server listen on http://[{}]:{}/{}",
                        ip6,
                        conf.port,
                        &conf.path
                    );
                    // Both listeners are driven together; this completes when both have ended.
                    let ip4_server = warp::serve(routes.clone()).run((conf.ip4, conf.port));
                    let ip6_server = warp::serve(routes).run((ip6, conf.port));
                    futures::future::join(ip4_server, ip6_server).await;
                }
                None => {
                    warp::serve(routes).run((conf.ip4, conf.port)).await;
                }
            }

            log::warn!("GVA server stopped");
        }
    
        /// Builds the list of endpoints advertised for this GVA server.
        ///
        /// Returns an empty list when GVA is disabled. Otherwise a single endpoint of the
        /// form `GVA [S ]<host> <port> <path>` is produced; the `S ` marker is present when
        /// the remote port is 443 or when `remote_tls` is `Some(true)`.
        ///
        /// The host is `conf.remote_host` when configured. Otherwise it is derived from the
        /// detected public IPs: the IPv4 address, the bracketed IPv6 address, or both
        /// separated by a space.
        ///
        /// # Errors
        ///
        /// Fails when `remote_host` is not configured and no public IP was found.
        async fn gen_endpoints(conf: &GvaConf) -> anyhow::Result<Vec<Endpoint>> {
            if !conf.enabled {
                return Ok(Vec::new());
            }

            // An explicitly configured host takes precedence over detected public IPs.
            let remote_host = match conf.remote_host.clone() {
                Some(configured_host) => configured_host,
                None => {
                    let public_ips = get_public_ips().await;
                    match (public_ips.public_ip4_opt, public_ips.public_ip6_opt) {
                        (Some(ip4), Some(ip6)) => format!("{} [{}]", ip4, ip6),
                        (Some(ip4), None) => ip4.to_string(),
                        (None, Some(ip6)) => format!("[{}]", ip6),
                        (None, None) => {
                            return Err(anyhow::Error::msg(
                                "Fail to found public IPs, please configure remote_host manually",
                            ));
                        }
                    }
                }
            };

            let remote_port = conf.get_remote_port();
            let tls_marker = if remote_port == 443 || conf.remote_tls.unwrap_or_default() {
                "S "
            } else {
                ""
            };

            Ok(vec![format!(
                "GVA {}{} {} {}",
                tls_marker,
                remote_host,
                remote_port,
                conf.get_remote_path(),
            )])
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
        use duniter_core::mempools::Mempools;
        use duniter_core::module::DuniterModule;
        use duniter_core::{conf::DuniterCoreConf, module::public_ips::PublicIPs};
        use fast_threadpool::{ThreadPool, ThreadPoolConfig};
        use std::net::{Ipv4Addr, Ipv6Addr};
        use unwrap::unwrap;
    
        static PUBLIC_IPS_MOCK: async_mutex::Mutex<Option<PublicIPs>> = async_mutex::Mutex::new(None);
    
        /// Test replacement for the real public-IP lookup.
        ///
        /// Returns the value currently stored in `PUBLIC_IPS_MOCK`, or a `PublicIPs` with
        /// no address at all when nothing has been stored yet.
        pub async fn get_public_ips() -> PublicIPs {
            let no_public_ips = PublicIPs {
                public_ip4_opt: None,
                public_ip6_opt: None,
            };
            // Copy the value out so the lock is released at the end of this statement.
            let mocked_ips: Option<PublicIPs> = *PUBLIC_IPS_MOCK.lock().await;
            mocked_ips.unwrap_or(no_public_ips)
        }
    
        /// Stores `public_ips` in the mock, then runs `GvaModule::gen_endpoints` with `conf`.
        async fn test_gen_endpoints(
            conf: &GvaConf,
            public_ips: PublicIPs,
        ) -> anyhow::Result<Vec<Endpoint>> {
            // The lock guard is a temporary: it is released before `gen_endpoints` reads the mock.
            *PUBLIC_IPS_MOCK.lock().await = Some(public_ips);
            GvaModule::gen_endpoints(conf).await
        }
    
        /// Checks the endpoint generated for every combination of detected public IPs
        /// when no remote host is configured.
        #[tokio::test]
        async fn gen_endpoints() -> anyhow::Result<()> {
            let conf = GvaConf {
                enabled: true,
                ..Default::default()
            };
            let ip4 = Some(Ipv4Addr::UNSPECIFIED);
            let ip6 = Some(Ipv6Addr::UNSPECIFIED);

            // (detected ip4, detected ip6, expected endpoint)
            let success_cases = vec![
                // ip4 and ip6 find
                (ip4, ip6, "GVA 0.0.0.0 [::] 30901 gva"),
                // only ip4 find
                (ip4, None, "GVA 0.0.0.0 30901 gva"),
                // only ip6 find
                (None, ip6, "GVA [::] 30901 gva"),
            ];
            for (public_ip4_opt, public_ip6_opt, expected) in success_cases {
                let endpoints = test_gen_endpoints(
                    &conf,
                    PublicIPs {
                        public_ip4_opt,
                        public_ip6_opt,
                    },
                )
                .await?;
                assert_eq!(endpoints, vec![expected.to_owned()]);
            }

            // No ips find
            let without_ips = test_gen_endpoints(
                &conf,
                PublicIPs {
                    public_ip4_opt: None,
                    public_ip6_opt: None,
                },
            )
            .await;
            assert!(without_ips.is_err());

            Ok(())
        }
    
        /// Manual helper (ignored by default): initialises the module on in-memory
        /// databases with the default configuration and calls `start` on it.
        #[tokio::test]
        #[ignore]
        async fn launch_mem_gva() -> anyhow::Result<()> {
            let dbs = unwrap!(SharedDbs::mem());
            let threadpool = ThreadPool::start(ThreadPoolConfig::default(), dbs);
            let dbs_pool = threadpool.into_async_handler();

            let (gva_module, _) = GvaModule::init(
                GvaConf::default(),
                &DuniterCoreConf::default(),
                "",
                &dbs_pool,
                Mempools::default(),
                duniter_core::conf::DuniterMode::Start,
                None,
                "test",
            )
            .await?;
            gva_module.start().await?;

            Ok(())
        }
    }