Skip to main content
Sign in
Snippets Groups Projects
Select Git revision
  • d268303a5e4772e2f4ef4819dab260df87c9d83c
  • dev default protected
  • vainamoinen197-transactiondocument-replace-vec-fields-by-smallvec-2
  • dvermd/200-keypairs-dewif
  • elois/wot
  • jawaka/155-dbex-add-dump-fork-tree-command
  • elois/195-bcdbwriteop
  • elois/deps-crypto
  • elois/gva-monetary-mass
  • elois/191-sled
  • elois/195
  • ji_emme/gva-humantimefield
  • 184-gva-rename-commontime-field-to-blockchaintime
  • ji_emme/182-gva-implement-block-meta-data
  • ji_emme/rml14
  • hugo/151-ws2pv2-sync
  • ji_emme/181-gva-implement-identity-request
  • ji_emme/89-implement-client-api-gva-graphql-verification-api
  • logo
  • test-juniper-from-schema
  • elois/exemple-gva-global-context
  • v0.2.0-a4 protected
  • v0.2.0-a2 protected
  • v0.2.0-a protected
  • v0.1.1-a1 protected
  • documents/v0.10.0-b1 protected
  • crypto/v0.4.0-b1 protected
  • crypto/v0.3.0-b3 protected
  • crypto/v0.3.0-b2 protected
  • crypto/v0.3.0-b1 protected
  • wot/v0.8.0-a0.9 protected
  • wot/v0.8.0-a0.8 protected
  • 0.1.0-a0.1 protected
  • v0.0.1-a0.12 protected
  • v0.0.1-a0.11 protected
  • v0.0.1-a0.10 protected
  • v0.0.1-a0.9 protected
  • v0.0.1-a0.8 protected
  • v0.0.1-a0.7 protected
  • v0.0.1-a0.6 protected
  • v0.0.1-a0.5 protected
41 results

lib.rs

Blame
  • lib.rs 53.81 KiB
    //  Copyright (C) 2018  The Duniter Project Developers.
    //
    // This program is free software: you can redistribute it and/or modify
    // it under the terms of the GNU Affero General Public License as
    // published by the Free Software Foundation, either version 3 of the
    // License, or (at your option) any later version.
    //
    // This program is distributed in the hope that it will be useful,
    // but WITHOUT ANY WARRANTY; without even the implied warranty of
    // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    // GNU Affero General Public License for more details.
    //
    // You should have received a copy of the GNU Affero General Public License
    // along with this program.  If not, see <https://www.gnu.org/licenses/>.
    
    //! Crate containing the WS2P network module of Duniter-rust (configuration, signals and module entry points).
    
    #![cfg_attr(feature = "strict", deny(warnings))]
    #![cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
    #![deny(
        missing_debug_implementations,
        missing_copy_implementations,
        trivial_casts,
        unsafe_code,
        unstable_features,
        unused_import_braces,
        unused_qualifications
    )]
    #![recursion_limit = "256"]
    
    #[macro_use]
    extern crate lazy_static;
    #[macro_use]
    extern crate log;
    #[macro_use]
    extern crate serde_derive;
    #[macro_use]
    extern crate serde_json;
    
    extern crate duniter_conf;
    extern crate duniter_crypto;
    extern crate duniter_dal;
    extern crate duniter_documents;
    extern crate duniter_message;
    extern crate duniter_module;
    extern crate duniter_network;
    extern crate rand;
    extern crate sqlite;
    extern crate ws;
    
    mod ack_message;
    mod connect_message;
    pub mod constants;
    mod datas;
    mod heads;
    mod ok_message;
    pub mod parsers;
    pub mod serializer;
    pub mod ws2p_connection;
    pub mod ws2p_db;
    pub mod ws2p_requests;
    
    use ack_message::WS2PAckMessageV1;
    use connect_message::WS2PConnectMessageV1;
    use constants::*;
    use datas::*;
    use duniter_conf::DuRsConf;
    use duniter_crypto::keys::*;
    use duniter_dal::dal_event::DALEvent;
    use duniter_dal::dal_requests::{DALReqBlockchain, DALRequest, DALResBlockchain, DALResponse};
    use duniter_documents::Blockstamp;
    use duniter_message::DuniterMessage;
    use duniter_module::*;
    use duniter_network::network_endpoint::*;
    use duniter_network::network_head::*;
    use duniter_network::*;
    use ok_message::WS2POkMessageV1;
    use parsers::blocks::parse_json_block;
    use std::collections::HashMap;
    use std::ops::Deref;
    use std::path::PathBuf;
    use std::sync::mpsc;
    use std::thread;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};
    use ws::Message;
    use ws2p_connection::*;
    use ws2p_requests::network_request_to_json;
    
    #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
    /// Configuration of the WS2P module.
    pub struct WS2PConf {
        /// Limit on the number of outgoing connections.
        pub outcoming_quota: usize,
        /// Default WS2P endpoints provided by the configuration file.
        pub sync_endpoints: Vec<NetworkEndpoint>,
    }
    
    impl Default for WS2PConf {
        fn default() -> Self {
            WS2PConf {
                outcoming_quota: *WS2P_DEFAULT_OUTCOMING_QUOTA,
                sync_endpoints: vec![
                    NetworkEndpoint::parse_from_raw(
                        "WS2P c1c39a0a g1-monit.librelois.fr 443 /ws2p",
                        PubKey::Ed25519(
                            ed25519::PublicKey::from_base58(
                                "D9D2zaJoWYWveii1JRYLVK3J4Z7ZH3QczoKrnQeiM6mx",
                            ).unwrap(),
                        ),
                        0,
                        0,
                    ).unwrap(),
                    NetworkEndpoint::parse_from_raw(
                        "WS2P b48824f0 g1.monnaielibreoccitanie.org 443 /ws2p",
                        PubKey::Ed25519(
                            ed25519::PublicKey::from_base58(
                                "7v2J4badvfWQ6qwRdCwhhJfAsmKwoxRUNpJHiJHj7zef",
                            ).unwrap(),
                        ),
                        0,
                        0,
                    ).unwrap(),
                ],
            }
        }
    }
    
    #[derive(Debug)]
    /// Stores a signal received from the network (after message treatment).
    pub enum WS2PSignal {
        /// A websocket error was received from a connection. `NodeFullId` stores the identifier of the connection.
        WSError(NodeFullId),
        /// A new connection is successfully established with `NodeFullId`.
        ConnectionEstablished(NodeFullId),
        /// Negotiation with `NodeFullId` timed out (reading based on the variant name — TODO confirm).
        NegociationTimeout(NodeFullId),
        /// The connection with `NodeFullId` timed out (reading based on the variant name — TODO confirm).
        Timeout(NodeFullId),
        /// Presumably a request for the DAL received from `NodeFullId`, with its request id
        /// and JSON payload — TODO confirm against the producer of this signal.
        DalRequest(NodeFullId, ModuleReqId, serde_json::Value),
        /// A peer card associated with `NodeFullId`: a JSON value and a list of endpoints
        /// (presumably the raw card and the endpoints it declares — TODO confirm).
        PeerCard(NodeFullId, serde_json::Value, Vec<NetworkEndpoint>),
        /// Network heads associated with `NodeFullId` (presumably received from it — TODO confirm).
        Heads(NodeFullId, Vec<NetworkHead>),
        /// A network document associated with `NodeFullId` (presumably received from it — TODO confirm).
        Document(NodeFullId, NetworkDocument),
        /// Presumably the JSON response to a previously sent `NetworkRequest`, with the request id
        /// and the responding node — TODO confirm.
        ReqResponse(ModuleReqId, NetworkRequest, NodeFullId, serde_json::Value),
        /// No information to report (reading based on the variant name — TODO confirm).
        Empty,
        /// No matching connection (reading based on the variant name — TODO confirm).
        NoConnection,
    }
    
    #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
    /// Reasons why a network consensus could not be determined.
    /// NOTE(review): meanings below are inferred from the variant names — confirm against the code that returns them.
    pub enum NetworkConsensusError {
        /// Not enough data to decide; the `usize` is presumably the amount of data that was available — TODO confirm.
        InsufficientData(usize),
        /// The network appears to be forked.
        Fork,
    }
    
    #[derive(Debug)]
    /// Errors that can be reported when sending a request.
    /// NOTE(review): meanings below are inferred from the variant names and payload types — confirm against the sender code.
    pub enum SendRequestError {
        /// The request is of a type that must not be transmitted.
        RequestTypeMustNotBeTransmitted(),
        /// One or more websocket errors occurred; the meaning of the `usize` is not visible here — TODO document.
        WSError(usize, Vec<ws::Error>),
    }
    
    #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
    /// WS2P module type. It holds no data; the `NetworkModule` and `DuniterModule`
    /// traits are implemented on it in this file.
    pub struct WS2PModule {}
    
    #[derive(Debug)]
    /// Signal carried by the WS2P main thread channel.
    pub enum WS2PThreadSignal {
        /// A boxed `DuniterMessage` (the proxy thread started by the module relays messages in this form).
        DuniterMessage(Box<DuniterMessage>),
        /// A message related to a WS2P connection (presumably emitted by the connection handlers — TODO confirm).
        WS2PConnectionMessage(WS2PConnectionMessage),
    }
    
    /// Common interface of WS2P messages: parsing from JSON, raw serialization,
    /// signature and verification.
    pub trait WS2PMessage: Sized {
        /// Tries to build a message from the JSON value `v` for the given `currency`.
        /// Returns `None` when no message can be built (presumably on malformed input — TODO confirm).
        fn parse(v: &serde_json::Value, currency: String) -> Option<Self>;
        /// Returns the raw text form of the message; this is the text that `sign` signs.
        fn to_raw(&self) -> String;
        /// Signs the bytes of `to_raw()` with `key_pair` and returns the signature.
        fn sign(&self, key_pair: KeyPairEnum) -> Sig {
            key_pair.sign(self.to_raw().as_bytes())
        }
        /// Returns whether the message is valid (presumably a signature check — TODO confirm in implementors).
        fn verify(&self) -> bool;
        //fn parse_and_verify(v: serde_json::Value, currency: String) -> bool;
    }
    
    impl Default for WS2PModule {
        fn default() -> WS2PModule {
            WS2PModule {}
        }
    }
    
    impl NetworkModule<DuRsConf, DuniterMessage> for WS2PModule {
        /// Blockchain synchronization entry point — not yet implemented.
        ///
        /// Every argument is currently ignored: the function only prints a notice
        /// on stdout and returns `Ok(())`.
        fn sync(
            _soft_meta_datas: &SoftwareMetaDatas<DuRsConf>,
            _keys: RequiredKeysContent,
            _conf: WS2PConf,
            _main_sender: mpsc::Sender<RooterThreadMessage<DuniterMessage>>,
            _sync_endpoint: SyncEndpoint,
        ) -> Result<(), ModuleInitError> {
            println!("Download blockchain from network...");
            println!("Error : not yet implemented !");
            Ok(())
        }
    }
    
    impl DuniterModule<DuRsConf, DuniterMessage> for WS2PModule {
        type ModuleConf = WS2PConf;
    
        /// Returns the identifier of this module: `ws2p`.
        fn id() -> ModuleId {
            let module_name = "ws2p".to_owned();
            ModuleId(module_name)
        }
        /// Returns the priority of this module: `Essential`.
        fn priority() -> ModulePriority {
            ModulePriority::Essential()
        }
        /// Declares which keys this module requires: the network key pair.
        fn ask_required_keys() -> RequiredKeys {
            RequiredKeys::NetworkKeyPair()
        }
        fn start(
            soft_meta_datas: &SoftwareMetaDatas<DuRsConf>,
            keys: RequiredKeysContent,
            conf: WS2PConf,
            rooter_sender: mpsc::Sender<RooterThreadMessage<DuniterMessage>>,
            load_conf_only: bool,
        ) -> Result<(), ModuleInitError> {
            // Get start time
            let start_time = SystemTime::now();
    
            // Define WS2PModuleDatas
            let mut ws2p_module = WS2PModuleDatas {
                followers: Vec::new(),
                key_pair: None,
                currency: None,
                conf,
                node_id: NodeUUID(soft_meta_datas.conf.my_node_id()),
                main_thread_channel: mpsc::channel(),
                ws2p_endpoints: HashMap::new(),
                websockets: HashMap::new(),
                requests_awaiting_response: HashMap::new(),
                heads_cache: HashMap::new(),
                my_head: None,
                uids_cache: HashMap::new(),
            };
    
            // load conf
            let key_pair = match keys {
                RequiredKeysContent::NetworkKeyPair(key_pair) => key_pair,
                _ => panic!("WS2PModule fatal error at load_conf() : keys != NetworkKeyPair"),
            };
            let mut ws2p_endpoints = HashMap::new();
            for ep in ws2p_module.conf.sync_endpoints.clone() {
                ws2p_endpoints.insert(
                    ep.node_full_id()
                        .expect("Fail to get endpoint node_full_id"),
                    (ep.clone(), WS2PConnectionState::Close),
                );
                info!("Load sync endpoint {}", ep.raw());
            }
            ws2p_module.key_pair = Some(key_pair);
            ws2p_module.currency = Some(soft_meta_datas.conf.currency().to_string());
            ws2p_module.ws2p_endpoints = ws2p_endpoints;
    
            // Create ws2p main thread channel
            let ws2p_sender_clone = ws2p_module.main_thread_channel.0.clone();
    
            // Create proxy channel
            let (proxy_sender, proxy_receiver): (
                mpsc::Sender<DuniterMessage>,
                mpsc::Receiver<DuniterMessage>,
            ) = mpsc::channel();
            let proxy_sender_clone = proxy_sender.clone();
    
            // Launch a proxy thread that transform DuniterMessage to WS2PThreadSignal(DuniterMessage)
            thread::spawn(move || {
                // Send proxy sender to main
                rooter_sender
                    .send(RooterThreadMessage::ModuleSender(proxy_sender_clone))
                    .expect("Fatal error : ws2p module fail to send is sender channel !");
                debug!("Send ws2p sender to main thread.");
                loop {
                    match proxy_receiver.recv() {
                        Ok(message) => {
                            ws2p_sender_clone
                                .send(WS2PThreadSignal::DuniterMessage(Box::new(message.clone())))
                                .expect(
                                    "Fatal error : fail to relay DuniterMessage to ws2p main thread !",
                                );
                            if let DuniterMessage::Stop() = message {
                                break;
                            };
                        }
                        Err(e) => panic!(format!("{}", e)),
                    }
                }
            });
    
            // open ws2p bdd
            let mut db_path =
                duniter_conf::datas_path(&soft_meta_datas.profile, &soft_meta_datas.conf.currency());
            db_path.push("ws2p.db");
            let db = WS2PModuleDatas::open_db(&db_path).expect("Fatal error : fail to open WS2P DB !");
    
            // Get ws2p endpoints in BDD
            let mut count = 0;
            let dal_enpoints =
                ws2p_db::get_endpoints_for_api(&db, &NetworkEndpointApi(String::from("WS2P")));
            for ep in dal_enpoints {
                if ep.api() == NetworkEndpointApi(String::from("WS2P"))
                    && (cfg!(feature = "ssl") || ep.port() != 443)
                {
                    count += 1;
                    ws2p_module.ws2p_endpoints.insert(
                        ep.node_full_id()
                            .expect("WS2P: Fail to get ep.node_full_id() !"),
                        (ep.clone(), WS2PConnectionState::from(ep.status())),
                    );
                }
            }
            info!("Load {} endpoints from bdd !", count);
    
            // Stop here in load_conf_only mode
            if load_conf_only {
                return Ok(());
            }
    
            // Initialize variables
            let mut last_ws2p_connecting_wave = SystemTime::now();
            let mut last_ws2p_connections_print = SystemTime::now();
            let mut endpoints_to_update_status: HashMap<NodeFullId, SystemTime> = HashMap::new();
            let mut last_identities_request = UNIX_EPOCH;
            let mut current_blockstamp = Blockstamp::default();
            let mut next_receiver = 0;
    
            // Start
            ws2p_module.connect_to_know_endpoints();
            loop {
                match ws2p_module
                    .main_thread_channel
                    .1
                    .recv_timeout(Duration::from_millis(200))
                {
                    Ok(message) => match message {
                        WS2PThreadSignal::DuniterMessage(ref duniter_mesage) => {
                            match *duniter_mesage.deref() {
                                DuniterMessage::Stop() => break,
                                DuniterMessage::Followers(ref new_followers) => {
                                    info!("WS2P module receive followers !");
                                    for new_follower in new_followers {
                                        debug!("WS2PModule : push one follower.");
                                        ws2p_module.followers.push(new_follower.clone());
                                        if current_blockstamp == Blockstamp::default() {
                                            // Request local current blockstamp
                                            ws2p_module.send_dal_request(
                                                &DALRequest::BlockchainRequest(
                                                    DALReqBlockchain::CurrentBlock(ModuleReqFullId(
                                                        WS2PModule::id(),
                                                        ModuleReqId(0),
                                                    )),
                                                ),
                                            );
                                        } else {
                                            if ws2p_module.my_head.is_none() {
                                                ws2p_module.my_head = Some(heads::generate_my_head(
                                                    &key_pair,
                                                    NodeUUID(soft_meta_datas.conf.my_node_id()),
                                                    soft_meta_datas.soft_name,
                                                    soft_meta_datas.soft_version,
                                                    &current_blockstamp,
                                                    None,
                                                ));
                                            }
                                            ws2p_module.send_network_event(
                                                &NetworkEvent::ReceiveHeads(vec![
                                                    ws2p_module.my_head.clone().unwrap(),
                                                ]),
                                            );
                                        }
                                    }
                                }
                                DuniterMessage::NetworkRequest(ref request) => match *request {
                                    NetworkRequest::GetBlocks(
                                        ref req_id,
                                        ref receiver,
                                        ref count,
                                        ref from,
                                    ) => {
                                        if *receiver == NodeFullId::default() {
                                            let mut receiver_index = 0;
                                            let mut real_receiver = NodeFullId::default();
                                            for (ws2p_full_id, (_ep, state)) in
                                                ws2p_module.ws2p_endpoints.clone()
                                            {
                                                if let WS2PConnectionState::Established = state {
                                                    if receiver_index == next_receiver {
                                                        real_receiver = ws2p_full_id;
                                                        break;
                                                    }
                                                    receiver_index += 1;
                                                }
                                            }
                                            if real_receiver == NodeFullId::default() {
                                                next_receiver = 0;
                                                for (ws2p_full_id, (_ep, state)) in
                                                    ws2p_module.ws2p_endpoints.clone()
                                                {
                                                    if let WS2PConnectionState::Established = state {
                                                        real_receiver = ws2p_full_id;
                                                        break;
                                                    }
                                                }
                                            } else {
                                                next_receiver += 1;
                                            }
                                            if real_receiver != NodeFullId::default() {
                                                let _blocks_request_result = ws2p_module
                                                    .send_request_to_specific_node(
                                                        &real_receiver,
                                                        &NetworkRequest::GetBlocks(
                                                            req_id.clone(),
                                                            *receiver,
                                                            *count,
                                                            *from,
                                                        ),
                                                    );
                                            }
                                        } else {
                                            let _blocks_request_result = ws2p_module
                                                .send_request_to_specific_node(
                                                    &receiver,
                                                    &NetworkRequest::GetBlocks(
                                                        req_id.clone(),
                                                        *receiver,
                                                        *count,
                                                        *from,
                                                    ),
                                                );
                                        }
                                    }
                                    NetworkRequest::GetEndpoints(ref _request) => {}
                                    _ => {}
                                },
                                DuniterMessage::DALEvent(ref dal_event) => match *dal_event {
                                    DALEvent::StackUpValidBlock(ref _block, ref blockstamp) => {
                                        current_blockstamp = *blockstamp;
                                        debug!(
                                            "WS2PModule : current_blockstamp = {}",
                                            current_blockstamp
                                        );
                                        ws2p_module.my_head = Some(heads::generate_my_head(
                                            &key_pair,
                                            NodeUUID(soft_meta_datas.conf.my_node_id()),
                                            soft_meta_datas.soft_name,
                                            soft_meta_datas.soft_version,
                                            &current_blockstamp,
                                            None,
                                        ));
                                        ws2p_module.send_network_event(&NetworkEvent::ReceiveHeads(
                                            vec![ws2p_module.my_head.clone().unwrap()],
                                        ));
                                        // Send my head to all connections
                                        let my_json_head = serializer::serialize_head(
                                            ws2p_module.my_head.clone().unwrap(),
                                        );
                                        trace!("Send my HEAD: {:#?}", my_json_head);
                                        let _results: Result<
                                            (),
                                            ws::Error,
                                        > = ws2p_module
                                            .websockets
                                            .iter_mut()
                                            .map(|ws| {
                                                (ws.1).0.send(Message::text(
                                                    json!({
                                                    "name": "HEAD",
                                                    "body": {
                                                        "heads": [my_json_head]
                                                    }
                                                }).to_string(),
                                                ))
                                            })
                                            .collect();
                                    }
                                    DALEvent::RevertBlocks(ref _blocks) => {}
                                    _ => {}
                                },
                                DuniterMessage::DALResponse(ref dal_res) => match *dal_res.deref() {
                                    DALResponse::Blockchain(ref dal_res_bc) => {
                                        match *dal_res_bc.deref() {
                                            DALResBlockchain::CurrentBlock(
                                                ref _requester_full_id,
                                                ref current_block,
                                                ref current_blockstamp_,
                                            ) => {
                                                let _current_block = current_block.deref();
                                                debug!(
                                                    "WS2PModule : receive DALResBc::CurrentBlock({})",
                                                    current_blockstamp
                                                );
                                                current_blockstamp = *current_blockstamp_;
                                                if ws2p_module.my_head.is_none() {
                                                    ws2p_module.my_head =
                                                        Some(heads::generate_my_head(
                                                            &key_pair,
                                                            NodeUUID(soft_meta_datas.conf.my_node_id()),
                                                            soft_meta_datas.soft_name,
                                                            soft_meta_datas.soft_version,
                                                            &current_blockstamp,
                                                            None,
                                                        ));
                                                }
                                                ws2p_module.send_network_event(
                                                    &NetworkEvent::ReceiveHeads(vec![
                                                        ws2p_module.my_head.clone().unwrap(),
                                                    ]),
                                                );
                                            }
                                            DALResBlockchain::UIDs(ref uids) => {
                                                // Add uids to heads
                                                for head in ws2p_module.heads_cache.values_mut() {
                                                    if let Some(uid_option) = uids.get(&head.pubkey()) {
                                                        if let Some(ref uid) = *uid_option {
                                                            head.set_uid(uid);
                                                            ws2p_module
                                                                .uids_cache
                                                                .insert(head.pubkey(), uid.to_string());
                                                        } else {
                                                            ws2p_module
                                                                .uids_cache
                                                                .remove(&head.pubkey());
                                                        }
                                                    }
                                                }
                                                // Resent heads to other modules
                                                ws2p_module.send_network_event(
                                                    &NetworkEvent::ReceiveHeads(
                                                        ws2p_module
                                                            .heads_cache
                                                            .values()
                                                            .cloned()
                                                            .collect(),
                                                    ),
                                                );
                                                // Resent to other modules connections that match receive uids
                                                for (node_full_id, (ep, conn_state)) in
                                                    &ws2p_module.ws2p_endpoints
                                                {
                                                    if let Some(uid_option) = uids.get(&node_full_id.1)
                                                    {
                                                        ws2p_module.send_network_event(
                                                            &NetworkEvent::ConnectionStateChange(
                                                                *node_full_id,
                                                                *conn_state as u32,
                                                                uid_option.clone(),
                                                                ep.get_url(false),
                                                            ),
                                                        );
                                                    }
                                                }
                                            }
                                            _ => {}
                                        }
                                    }
                                    DALResponse::Pendings(_, _) => {}
                                },
                                _ => {}
                            }
                        }
                        WS2PThreadSignal::WS2PConnectionMessage(ws2p_conn_message) => match ws2p_module
                            .ws2p_conn_message_pretreatment(ws2p_conn_message)
                        {
                            WS2PSignal::NoConnection => {
                                warn!("WS2PSignal::NoConnection");
                            }
                            WS2PSignal::ConnectionEstablished(ws2p_full_id) => {
                                let req_id =
                                    ModuleReqId(ws2p_module.requests_awaiting_response.len() as u32);
                                let module_id = WS2PModule::id();
                                let _current_request_result = ws2p_module
                                    .send_request_to_specific_node(
                                        &ws2p_full_id,
                                        &NetworkRequest::GetCurrent(
                                            ModuleReqFullId(module_id, req_id),
                                            ws2p_full_id,
                                        ),
                                    );
                                if ws2p_module.uids_cache.get(&ws2p_full_id.1).is_none() {
                                    ws2p_module.send_dal_request(&DALRequest::BlockchainRequest(
                                        DALReqBlockchain::UIDs(vec![ws2p_full_id.1]),
                                    ));
                                }
                                ws2p_module.send_network_event(&NetworkEvent::ConnectionStateChange(
                                    ws2p_full_id,
                                    WS2PConnectionState::Established as u32,
                                    ws2p_module.uids_cache.get(&ws2p_full_id.1).cloned(),
                                    ws2p_module.ws2p_endpoints[&ws2p_full_id].0.get_url(false),
                                ));
                            }
                            WS2PSignal::WSError(ws2p_full_id) => {
                                endpoints_to_update_status.insert(ws2p_full_id, SystemTime::now());
                                ws2p_module.send_network_event(&NetworkEvent::ConnectionStateChange(
                                    ws2p_full_id,
                                    WS2PConnectionState::WSError as u32,
                                    ws2p_module.uids_cache.get(&ws2p_full_id.1).cloned(),
                                    ws2p_module.ws2p_endpoints[&ws2p_full_id].0.get_url(false),
                                ));
                            }
                            WS2PSignal::NegociationTimeout(ws2p_full_id) => {
                                endpoints_to_update_status.insert(ws2p_full_id, SystemTime::now());
                                ws2p_module.send_network_event(&NetworkEvent::ConnectionStateChange(
                                    ws2p_full_id,
                                    WS2PConnectionState::Denial as u32,
                                    ws2p_module.uids_cache.get(&ws2p_full_id.1).cloned(),
                                    ws2p_module.ws2p_endpoints[&ws2p_full_id].0.get_url(false),
                                ));
                            }
                            WS2PSignal::Timeout(ws2p_full_id) => {
                                endpoints_to_update_status.insert(ws2p_full_id, SystemTime::now());
                                ws2p_module.send_network_event(&NetworkEvent::ConnectionStateChange(
                                    ws2p_full_id,
                                    WS2PConnectionState::Close as u32,
                                    ws2p_module.uids_cache.get(&ws2p_full_id.1).cloned(),
                                    ws2p_module.ws2p_endpoints[&ws2p_full_id].0.get_url(false),
                                ));
                            }
                            WS2PSignal::PeerCard(_ws2p_full_id, _peer_card, ws2p_endpoints) => {
                                //trace!("WS2PSignal::PeerCard({})", ws2p_full_id);
                                //ws2p_module.send_network_event(NetworkEvent::ReceivePeers(_));
                                for ep in ws2p_endpoints {
                                    match ws2p_module.ws2p_endpoints.get(
                                        &ep.node_full_id()
                                            .expect("WS2P: Fail to get ep.node_full_id() !"),
                                    ) {
                                        Some(_) => {}
                                        None => {
                                            if let Some(_api) =
                                                ws2p_db::string_to_api(&ep.api().0.clone())
                                            {
                                                endpoints_to_update_status.insert(
                                                    ep.node_full_id().expect(
                                                        "WS2P: Fail to get ep.node_full_id() !",
                                                    ),
                                                    SystemTime::now(),
                                                );
                                            }
                                            if cfg!(feature = "ssl") || ep.port() != 443 {
                                                ws2p_module.connect_to(&ep);
                                            }
                                        }
                                    };
                                }
                            }
                            WS2PSignal::Heads(ws2p_full_id, heads) => {
                                trace!("WS2PSignal::Heads({}, {:?})", ws2p_full_id, heads.len());
                                ws2p_module.send_dal_request(&DALRequest::BlockchainRequest(
                                    DALReqBlockchain::UIDs(heads.iter().map(|h| h.pubkey()).collect()),
                                ));
                                ws2p_module.send_network_event(&NetworkEvent::ReceiveHeads(
                                    heads
                                        .iter()
                                        .map(|head| {
                                            let mut new_head = head.clone();
                                            if let Some(uid) =
                                                ws2p_module.uids_cache.get(&head.pubkey())
                                            {
                                                new_head.set_uid(uid);
                                            }
                                            new_head
                                        })
                                        .collect(),
                                ));
                            }
                            WS2PSignal::Document(ws2p_full_id, network_doc) => {
                                trace!("WS2PSignal::Document({})", ws2p_full_id);
                                ws2p_module.send_network_event(&NetworkEvent::ReceiveDocuments(vec![
                                    network_doc,
                                ]));
                            }
                            WS2PSignal::ReqResponse(req_id, req, recipient_full_id, response) => {
                                match req {
                                    NetworkRequest::GetCurrent(ref _req_id, _receiver) => {
                                        info!(
                                            "WS2PSignal::ReceiveCurrent({}, {:?}, {:#?})",
                                            req_id.0, req, response
                                        );
                                        if let Some(block) = parse_json_block(&response) {
                                            ws2p_module.send_network_event(&NetworkEvent::ReqResponse(
                                                Box::new(NetworkResponse::CurrentBlock(
                                                    ModuleReqFullId(WS2PModule::id(), req_id),
                                                    recipient_full_id,
                                                    Box::new(block),
                                                )),
                                            ));
                                        }
                                    }
                                    NetworkRequest::GetBlocks(ref _req_id, _receiver, _count, from) => {
                                        info!("WS2PSignal::ReceiveChunk({}, {:?})", req_id.0, req);
                                        if response.is_array() {
                                            let mut chunk = Vec::new();
                                            for json_block in response.as_array().unwrap() {
                                                if let Some(block) = parse_json_block(json_block) {
                                                    chunk.push(NetworkDocument::Block(block));
                                                } else {
                                                    warn!("WS2PModule: Error : fail to parse one json block !");
                                                }
                                            }
                                            debug!("Send chunk to followers : {}", from);
                                            ws2p_module.send_network_event(
                                                &NetworkEvent::ReceiveDocuments(chunk),
                                            );
                                        }
                                    }
                                    NetworkRequest::GetRequirementsPending(
                                        _req_id,
                                        _receiver,
                                        min_cert,
                                    ) => {
                                        info!(
                                            "WS2PSignal::ReceiveRequirementsPending({}, {})",
                                            req_id.0, min_cert
                                        );
                                        debug!("----------------------------------------");
                                        debug!("-      BEGIN IDENTITIES PENDING        -");
                                        debug!("----------------------------------------");
                                        debug!("{:#?}", response);
                                        debug!("----------------------------------------");
                                        debug!("-       END IDENTITIES PENDING         -");
                                        debug!("----------------------------------------");
                                    }
                                    _ => {}
                                }
                            }
                            WS2PSignal::Empty => {}
                            _ => {}
                        },
                    },
                    Err(e) => match e {
                        mpsc::RecvTimeoutError::Disconnected => {
                            panic!("Disconnected ws2p module !");
                        }
                        mpsc::RecvTimeoutError::Timeout => {}
                    },
                }
                if SystemTime::now()
                    .duration_since(last_ws2p_connections_print)
                    .unwrap() > Duration::new(5, 0)
                {
                    last_ws2p_connections_print = SystemTime::now();
                    let mut connected_nodes = Vec::new();
                    for (k, (_ep, state)) in ws2p_module.ws2p_endpoints.clone() {
                        if let WS2PConnectionState::Established = state {
                            connected_nodes.push(k);
                        }
                    }
                    // Print network consensus
                    match ws2p_module.get_network_consensus() {
                        Ok(consensus_blockstamp) => {
                            debug!(
                                "WS2PModule : get_network_consensus() = {:?}",
                                consensus_blockstamp
                            );
                            if current_blockstamp.id.0 < (consensus_blockstamp.id.0 + 2) {
                                warn!("We probably are in a fork branch !");
                            }
                        }
                        Err(e) => warn!("{:?}", e),
                    }
                    // Print current_blockstamp
                    info!(
                        "WS2PModule : current_blockstamp() = {:?}",
                        current_blockstamp
                    );
                    // New WS2P connection wave
                    if connected_nodes.len() < ws2p_module.conf.clone().outcoming_quota
                        && (SystemTime::now()
                            .duration_since(last_ws2p_connecting_wave)
                            .unwrap()
                            > Duration::new(*WS2P_OUTCOMING_INTERVAL, 0)
                            || (SystemTime::now()
                                .duration_since(last_ws2p_connecting_wave)
                                .unwrap()
                                > Duration::new(*WS2P_OUTCOMING_INTERVAL_AT_STARTUP, 0)
                                && SystemTime::now().duration_since(start_time).unwrap()
                                    < Duration::new(*WS2P_OUTCOMING_INTERVAL, 0)))
                    {
                        last_ws2p_connecting_wave = SystemTime::now();
                        info!("Connected to know endpoints...");
                        ws2p_module.connect_to_know_endpoints();
                    }
                    /*// Request blocks from network
                    if SystemTime::now()
                        .duration_since(last_blocks_request)
                        .unwrap() > Duration::new(*BLOCKS_REQUEST_INTERVAL, 0)
                        && SystemTime::now().duration_since(start_time).unwrap() > Duration::new(10, 0)
                    {
                        let mut request_blocks_from = current_blockstamp.id.0;
                        if request_blocks_from > 0 {
                            request_blocks_from += 1;
                        }
                        info!("get chunks from all connections...");
                        let module_id = WS2PModule::id();
                        let _blocks_request_result =
                            ws2p_module.send_request_to_all_connections(&NetworkRequest::GetBlocks(
                                ModuleReqFullId(module_id, ModuleReqId(0 as u32)),
                                NodeFullId::default(),
                                50,
                                request_blocks_from,
                            ));
                        last_blocks_request = SystemTime::now();
                    }*/
                    // Request pending_identities from network
                    if SystemTime::now()
                        .duration_since(last_identities_request)
                        .unwrap()
                        > Duration::new(*PENDING_IDENTITIES_REQUEST_INTERVAL, 0)
                        && SystemTime::now().duration_since(start_time).unwrap() > Duration::new(10, 0)
                    {
                        /*info!("get pending_identities from all connections...");
                                        let _blocks_request_result = ws2p_module.send_request_to_all_connections(
                                            &NetworkRequest::GetRequirementsPending(ModuleReqId(0 as u32), 5),
                                        );*/
                        last_identities_request = SystemTime::now();
                    }
                    // Write pending endpoints
                    for (ep_full_id, received_time) in endpoints_to_update_status.clone() {
                        if SystemTime::now().duration_since(received_time).unwrap()
                            > Duration::new(*DURATION_BEFORE_RECORDING_ENDPOINT, 0)
                        {
                            if let Some(&(ref ep, ref state)) =
                                ws2p_module.ws2p_endpoints.get(&ep_full_id)
                            {
                                /*let dal_endpoint = duniter_dal::endpoint::DALEndpoint::new(
                                                    state.clone() as u32,
                                                    ep.node_uuid().unwrap().0,
                                                    ep.pubkey(),
                                                    duniter_dal::endpoint::string_to_api(&ep.api().0).unwrap(),
                                                    1,
                                                    ep.to_string(),
                                                    received_time.duration_since(UNIX_EPOCH).unwrap(),
                                                );*/
                                ws2p_db::write_endpoint(
                                    &db,
                                    &ep,
                                    state.to_u32(),
                                    SystemTime::now()
                                        .duration_since(UNIX_EPOCH)
                                        .unwrap()
                                        .as_secs(),
                                );
                            }
                            endpoints_to_update_status.remove(&ep_full_id);
                        } else {
                            info!(
                                "Write {} endpoint in {} secs.",
                                ep_full_id,
                                *DURATION_BEFORE_RECORDING_ENDPOINT
                                    - SystemTime::now()
                                        .duration_since(received_time)
                                        .unwrap()
                                        .as_secs()
                            );
                        }
                    }
                    // ..
                }
            }
            Ok(())
        }
    }
    
    #[cfg(test)]
    mod tests {
        extern crate duniter_conf;
        extern crate duniter_crypto;
        extern crate duniter_dal;
        extern crate duniter_documents;
        extern crate duniter_message;
        extern crate duniter_module;
        extern crate duniter_network;
    
        use super::parsers::blocks::parse_json_block;
        use super::*;
        use duniter_crypto::keys::PublicKey;
        use duniter_documents::blockchain::v10::documents::BlockDocument;
        use duniter_module::DuniterModule;
        use duniter_network::network_endpoint::{NetworkEndpoint, NetworkEndpointApi};
        use duniter_network::NetworkBlock;
        use std::fs;
        use std::path::PathBuf;
        use std::time::{SystemTime, UNIX_EPOCH};
    
        /// Parses the JSON form of block number 109966 of the `g1` currency and checks both
        /// the inner hash carried by the parsed document and the hash recomputed from it.
        #[test]
        fn test_parse_json_block() {
            // Reference values: they are the "inner_hash" and "hash" fields of the JSON below.
            let expected_inner_hash =
                "61F02B1A6AE2E4B9A1FD66CE673258B4B21C0076795571EE3C9DC440DD06C46C";
            let expected_hash = "000000EF5B2AA849F4C3AF3D35E1284EA1F34A9F617EA806CE8371619023DC74";

            let json_block = json!({
                "fork": false,
                "version": 10,
                "nonce": 10500000059239 as u64,
                "number": 109966,
                "powMin": 88,
                "time": 1523300656,
                "medianTime": 1523295259,
                "membersCount": 933,
                "monetaryMass": 146881563,
                "unitbase": 0,
                "issuersCount": 44,
                "issuersFrame": 221,
                "issuersFrameVar": 0,
                "currency": "g1",
                "issuer": "GRBPV3Y7PQnB9LaZhSGuS3BqBJbSHyibzYq65kTh1nQ4",
                "signature": "GCg2Lti3TdxWlhA8JF8pRI+dRQ0XZVtcC4BqO/COTpjTQFdWG6qmUNVvdeYCtR/lu1JQe3N/IhrbyV6L/6I+Cg==",
                "hash": "000000EF5B2AA849F4C3AF3D35E1284EA1F34A9F617EA806CE8371619023DC74",
                "parameters": "",
                "previousHash": "000004C00602F8A27AE078DE6351C0DDA1EA0974A78D2BEFA7DFBE7B7C3146FD",
                "previousIssuer": "5SwfQubSat5SunNafCsunEGTY93nVM4kLSsuprNqQb6S",
                "inner_hash": "61F02B1A6AE2E4B9A1FD66CE673258B4B21C0076795571EE3C9DC440DD06C46C",
                "dividend": null,
                "identities": [],
                "joiners": [],
                "actives": [],
                "leavers": [],
                "revoked": [],
                "excluded": [],
                "certifications": [
                    "Hm5qjaNuHogNRdGZ4vgnLA9DMZVUu5YWzVup5mubuxCc:8AmdBsimcLziXaCS4AcVUfPx7rkjeic7482dLbBkuZw6:109964:yHKBGMeuxyIqFb295gVNK6neRC+U0tmsX1Zed3TLjS3ZZHYYycE1piLcYsTKll4ifNVp6rm+hd/CLdHYB+29CA==",
                    "BncjgJeFpGsMCCsUfzNLEexjsbuX3V2mg9P67ov2LkwK:DyBUBNpzpfvjtwYYSaVMM6ST6t2DNg3NCE9CU9bRQFhF:105864:cJEGW9WxJwlMA2+4LNAK4YieyseUy1WIkFh1YLYD+JJtJEoCSnIQRXzhiAoRpGaj0bRz8sTpwI6PRkuVoDJJDQ=="
                ],
                "transactions": [
                    {
                        "version": 10,
                        "currency": "g1",
                        "locktime": 0,
                        "hash": "80FE1E83DC4D0B722CA5F8363EFC6A3E29071032EBB71C1E0DF8D4FEA589C698",
                        "blockstamp": "109964-00000168105D4A8A8BC8C0DC70033F45ABE472782C75A7F2074D0F4D4A3B7B2B",
                        "blockstampTime": 0,
                        "issuers": [
                            "6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT"
                        ],
                        "inputs": [
                            "1001:0:D:6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT:98284",
                            "1001:0:D:6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT:98519",
                            "1001:0:D:6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT:98779",
                            "1001:0:D:6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT:99054",
                            "1001:0:D:6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT:99326",
                            "1001:0:D:6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT:99599",
                            "1001:0:D:6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT:99884",
                            "1001:0:D:6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT:100174",
                            "1001:0:D:6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT:100469",
                            "1001:0:D:6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT:100746",
                            "1001:0:D:6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT:101036",
                            "1001:0:D:6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT:101327"
                        ],
                        "outputs": [
                            "12000:0:SIG(HmH5beJqKGMeotcQUrSW7Wo5tKvAksHmfYXfiSQ9EbWz)",
                            "12:0:SIG(6PiqcuUWhyiBF3Lgcht8c1yfk6gMfQzcUc46CqrJfeLT)"
                        ],
                        "unlocks": [
                            "0:SIG(0)",
                            "1:SIG(0)",
                            "2:SIG(0)",
                            "3:SIG(0)",
                            "4:SIG(0)",
                            "5:SIG(0)",
                            "6:SIG(0)",
                            "7:SIG(0)",
                            "8:SIG(0)",
                            "9:SIG(0)",
                            "10:SIG(0)",
                            "11:SIG(0)"
                        ],
                        "signatures": [
                            "MZxoKxYgwufh/s5mwLCsYEZXtIsP1hEKCyAzLipJsvCbR9xj7wXUw0C/ahwvZfBtR7+QVPIfLmwYEol1JcHjDw=="
                        ],
                        "comment": "Adhesion 2018"
                    },
                    {
                        "version": 10,
                        "currency": "g1",
                        "locktime": 0,
                        "hash": "B80507412B35BD5EB437AE0D3EB97E60E3A4974F5CDEA1AF7E2127C0E943481F",
                        "blockstamp": "109964-00000168105D4A8A8BC8C0DC70033F45ABE472782C75A7F2074D0F4D4A3B7B2B",
                        "blockstampTime": 0,
                        "issuers": [
                            "8gundJEbfm73Kx3jjw8YivJyz8qD2igjf6baCBLFCxPU"
                        ],
                        "inputs": [
                            "1001:0:D:8gundJEbfm73Kx3jjw8YivJyz8qD2igjf6baCBLFCxPU:91560",
                            "1001:0:D:8gundJEbfm73Kx3jjw8YivJyz8qD2igjf6baCBLFCxPU:91850",
                            "1001:0:D:8gundJEbfm73Kx3jjw8YivJyz8qD2igjf6baCBLFCxPU:92111",
                            "1001:0:D:8gundJEbfm73Kx3jjw8YivJyz8qD2igjf6baCBLFCxPU:92385",
                            "1001:0:D:8gundJEbfm73Kx3jjw8YivJyz8qD2igjf6baCBLFCxPU:92635"
                        ],
                        "outputs": [
                            "5000:0:SIG(BzHnbec1Gov7dLSt1EzJS7vikoQCECeuvZs4wamZAcT1)",
                            "5:0:SIG(8gundJEbfm73Kx3jjw8YivJyz8qD2igjf6baCBLFCxPU)"
                        ],
                        "unlocks": [
                            "0:SIG(0)",
                            "1:SIG(0)",
                            "2:SIG(0)",
                            "3:SIG(0)",
                            "4:SIG(0)"
                        ],
                        "signatures": [
                            "A+ukwRvLWs1gZQ0KAqAnknEgmRQHdrnOvNuBx/WZqje17BAPrVxSxKpqwU6MiajU+ppigsYp6Bu0FdPf/tGnCQ=="
                        ],
                        "comment": ""
                    },
                    {
                        "version": 10,
                        "currency": "g1",
                        "locktime": 0,
                        "hash": "D8970E6629C0381A78534EEDD86803E9215A7EC4C494BAEA79EB19425F9B4D31",
                        "blockstamp": "109964-00000168105D4A8A8BC8C0DC70033F45ABE472782C75A7F2074D0F4D4A3B7B2B",
                        "blockstampTime": 0,
                        "issuers": [
                            "FnSXE7QyBfs4ozoYAt5NEewWhHEPorf38cNXu3kX9xsg"
                        ],
                        "inputs": [
                            "1000:0:D:FnSXE7QyBfs4ozoYAt5NEewWhHEPorf38cNXu3kX9xsg:36597",
                            "1000:0:D:FnSXE7QyBfs4ozoYAt5NEewWhHEPorf38cNXu3kX9xsg:36880",
                            "1000:0:D:FnSXE7QyBfs4ozoYAt5NEewWhHEPorf38cNXu3kX9xsg:37082"
                        ],
                        "outputs": [
                            "3000:0:SIG(BBC8Rnh4CWN1wBrPLevK7GRFFVDVw7Lu24YNMUmhqoHU)"
                        ],
                        "unlocks": [
                            "0:SIG(0)",
                            "1:SIG(0)",
                            "2:SIG(0)"
                        ],
                        "signatures": [
                            "OpiF/oQfIigOeAtsteukU0w9FPSELE+BVTxhmsQ8bEeYGlwovG2VF8ZFiJkLLPi6vFuKgwzULJfjNGd97twZCw=="
                        ],
                        "comment": "1 billet pour une seance.pour un chouette film"
                    }
                ],
            });

            // Parse first, then unwrap the v10 payload; any other variant is a test failure.
            let parsed_block =
                parse_json_block(&json_block).expect("Fail to parse test json block !");
            let mut block: BlockDocument = match parsed_block {
                NetworkBlock::V10(network_block_v10) => network_block_v10.uncompleted_block_doc,
                _ => panic!("Test block must be a v10 block !"),
            };

            // The inner hash must already be present on the parsed document.
            let inner_hash_hex = block
                .inner_hash
                .expect("Try to get inner_hash of an uncompleted or reduce block !")
                .to_hex();
            assert_eq!(inner_hash_hex, expected_inner_hash);

            // The block hash is only checked after asking the document to compute it.
            block.compute_hash();
            let hash_hex = block
                .hash
                .expect("Try to get hash of an uncompleted or reduce block !")
                .0
                .to_hex();
            assert_eq!(hash_hex, expected_hash);
        }
    
        /// Writes a WS2P endpoint to a fresh endpoints database, reads it back, then rewrites
        /// it with another status and checks that the stored endpoint reflects the update.
        ///
        /// The database lives in a `test.db` file in the current working directory; it is
        /// removed both before the test (stale file from an aborted run) and after it.
        #[test]
        fn endpoint_db_tests() {
            let test_db_path = PathBuf::from("test.db");
            // A leftover file from a previous failed run would pollute the assertions below.
            if test_db_path.as_path().exists() {
                fs::remove_file(&test_db_path).unwrap();
            }
            let db = WS2PModuleDatas::open_db(&test_db_path).unwrap();

            let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();

            let mut endpoint = NetworkEndpoint::parse_from_raw(
                "WS2P cb06a19b g1.imirhil.fr 53012 /",
                PubKey::Ed25519(
                    ed25519::PublicKey::from_base58("5gJYnQp8v7bWwk7EWRoL8vCLof1r3y9c6VDdnGSM1GLv")
                        .unwrap(),
                ),
                1,
                current_time.as_secs(),
            ).expect("Fail to parse test endpoint !");

            // Initial write with status 1, then read back.
            ws2p_db::write_endpoint(&db, &endpoint, 1, current_time.as_secs());
            let mut written_endpoints =
                ws2p_db::get_endpoints_for_api(&db, &NetworkEndpointApi(String::from("WS2P")));
            assert_eq!(endpoint, written_endpoints.pop().unwrap());

            // Test status update
            endpoint.set_status(3);
            ws2p_db::write_endpoint(&db, &endpoint, 3, current_time.as_secs());
            let mut written_endpoints =
                ws2p_db::get_endpoints_for_api(&db, &NetworkEndpointApi(String::from("WS2P")));
            assert_eq!(endpoint, written_endpoints.pop().unwrap());

            // Do not leave the database file behind in the working directory. The handle is
            // dropped first so the file is no longer open when it is deleted. Removal is
            // best-effort: a cleanup failure must not turn a passing test into a failing one.
            drop(db);
            let _ = fs::remove_file(&test_db_path);
        }
    
        /// Checks the JSON built by `network_request_to_json` for a `GetBlocks` request,
        /// both as a JSON value and as its serialized string.
        #[test]
        fn ws2p_requests() {
            let blocks_request = NetworkRequest::GetBlocks(
                ModuleReqFullId(WS2PModule::id(), ModuleReqId(58)),
                NodeFullId::default(),
                50,
                0,
            );

            // Value-level comparison: "reqId" is expected as the lowercase hex form of 58.
            let expected_json = json!({
                "reqId": format!("{:x}", 58),
                "body": {
                    "name": "BLOCKS_CHUNK",
                    "params": {
                        "count": 50,
                        "fromNumber": 0
                    }
                }
            });
            assert_eq!(network_request_to_json(&blocks_request), expected_json);

            // String-level comparison: pins the exact serialized text, key order included.
            let expected_raw = "{\"body\":{\"name\":\"BLOCKS_CHUNK\",\"params\":{\"count\":50,\"fromNumber\":0}},\"reqId\":\"3a\"}";
            assert_eq!(
                network_request_to_json(&blocks_request).to_string(),
                expected_raw
            );
        }
    
        /// Parses a JSON head carrying both a v1 and a v2 message, expects it to come out
        /// as a `NetworkHead::V2`, and checks its `message` field and its verification.
        #[test]
        fn ws2p_parse_head() {
            let head_json = json!({
                "message": "WS2POTMIC:HEAD:1:D9D2zaJoWYWveii1JRYLVK3J4Z7ZH3QczoKrnQeiM6mx:104512-0000051B9CE9C1CA89F269375A6751FB88B9E88DE47A36506057E5BFBCFBB276:c1c39a0a:duniter:1.6.21:3",
                "sig": "trtK9GXvTdfND995ohWEderpO3NkIqi1X6mBeVvMcaHckq+lIGqjWvJ9t9Vccz5t+VGaSmGUihDl4q6eldIYBw==",
                "messageV2": "WS2POTMIC:HEAD:2:D9D2zaJoWYWveii1JRYLVK3J4Z7ZH3QczoKrnQeiM6mx:104512-0000051B9CE9C1CA89F269375A6751FB88B9E88DE47A36506057E5BFBCFBB276:c1c39a0a:duniter:1.6.21:3:25:22",
                "sigV2": "x6ehPMuYjGY+z7wEGnJGyMBxMKUdu01RWaF0b0XCtoVjg67cCvT4H0V/Qcxn4bAGqzy5ux2fA7NiI+81bBnqDw==",
                "step": 0
            });
            // The `message` field of the parsed head must render back to the "message" input.
            let expected_message = String::from("WS2POTMIC:HEAD:1:D9D2zaJoWYWveii1JRYLVK3J4Z7ZH3QczoKrnQeiM6mx:104512-0000051B9CE9C1CA89F269375A6751FB88B9E88DE47A36506057E5BFBCFBB276:c1c39a0a:duniter:1.6.21:3");

            // Counts V2 heads actually seen, so a failed parse or another variant fails the
            // final assertion instead of silently skipping the checks.
            let mut v2_heads_seen = 0;
            if let Some(parsed_head) = NetworkHead::from_json_value(&head_json) {
                if let NetworkHead::V2(ref head_v2) = parsed_head {
                    v2_heads_seen += 1;
                    assert_eq!(head_v2.message.to_string(), expected_message);
                }
                assert_eq!(parsed_head.verify(), true);
            }
            assert_eq!(v2_heads_seen, 1);
        }
    }