Skip to content
Snippets Groups Projects
Select Git revision
  • 9e044f7025b062a8056eff7b4ec4d67c0c3b38a7
  • dev default protected
  • vainamoinen197-transactiondocument-replace-vec-fields-by-smallvec-2
  • dvermd/200-keypairs-dewif
  • elois/wot
  • jawaka/155-dbex-add-dump-fork-tree-command
  • elois/195-bcdbwriteop
  • elois/deps-crypto
  • elois/gva-monetary-mass
  • elois/191-sled
  • elois/195
  • ji_emme/gva-humantimefield
  • 184-gva-rename-commontime-field-to-blockchaintime
  • ji_emme/182-gva-implement-block-meta-data
  • ji_emme/rml14
  • hugo/151-ws2pv2-sync
  • ji_emme/181-gva-implement-identity-request
  • ji_emme/89-implement-client-api-gva-graphql-verification-api
  • logo
  • test-juniper-from-schema
  • elois/exemple-gva-global-context
  • v0.2.0-a4 protected
  • v0.2.0-a2 protected
  • v0.2.0-a protected
  • v0.1.1-a1 protected
  • documents/v0.10.0-b1 protected
  • crypto/v0.4.0-b1 protected
  • crypto/v0.3.0-b3 protected
  • crypto/v0.3.0-b2 protected
  • crypto/v0.3.0-b1 protected
  • wot/v0.8.0-a0.9 protected
  • wot/v0.8.0-a0.8 protected
  • 0.1.0-a0.1 protected
  • v0.0.1-a0.12 protected
  • v0.0.1-a0.11 protected
  • v0.0.1-a0.10 protected
  • v0.0.1-a0.9 protected
  • v0.0.1-a0.8 protected
  • v0.0.1-a0.7 protected
  • v0.0.1-a0.6 protected
  • v0.0.1-a0.5 protected
41 results

datas.rs

Blame
  • datas.rs 20.40 KiB
    //  Copyright (C) 2018  The Duniter Project Developers.
    //
    // This program is free software: you can redistribute it and/or modify
    // it under the terms of the GNU Affero General Public License as
    // published by the Free Software Foundation, either version 3 of the
    // License, or (at your option) any later version.
    //
    // This program is distributed in the hope that it will be useful,
    // but WITHOUT ANY WARRANTY; without even the implied warranty of
    // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    // GNU Affero General Public License for more details.
    //
    // You should have received a copy of the GNU Affero General Public License
    // along with this program.  If not, see <https://www.gnu.org/licenses/>.
    
    use crate::*;
    use dup_crypto::keys::*;
    use durs_message::requests::BlockchainRequest;
    use durs_message::*;
    use durs_network_documents::network_endpoint::*;
    use durs_network_documents::network_head::*;
    use durs_network_documents::*;
    use std::collections::HashSet;
    use std::sync::mpsc;
    
    /// State owned by the WS2P module: router link, identity, configuration,
    /// known endpoints, open websockets and pending requests.
    #[derive(Debug)]
    pub struct WS2PModuleDatas {
        /// Sender used to push requests, responses and events to the router thread.
        pub router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>,
        /// Currency name; `None` until it is set. Must be set before a connection
        /// is opened (the connection thread panics otherwise).
        pub currency: Option<String>,
        /// Node key pair; `None` until it is set. Must be set before a connection
        /// is opened (the connection thread panics otherwise).
        pub key_pair: Option<KeyPairEnum>,
        /// WS2P configuration (its `outcoming_quota` bounds outgoing connections).
        pub conf: WS2PConf,
        /// `true` when the crate is compiled with the `ssl` feature. When `false`,
        /// endpoints on port 443 are skipped by `connect_to_know_endpoints`.
        pub ssl: bool,
        /// Identifier of this node, as given to `new`.
        pub node_id: NodeId,
        /// Channel of thread signals; the sender half is cloned and handed to every
        /// connection thread.
        // NOTE(review): the receiver half is presumably polled by the module main
        // loop — not visible in this file, confirm.
        pub main_thread_channel: (
            mpsc::Sender<WS2PThreadSignal>,
            mpsc::Receiver<WS2PThreadSignal>,
        ),
        /// Known endpoints with their current connection state, keyed by node full id.
        pub ws2p_endpoints: HashMap<NodeFullId, (EndpointV1, WS2PConnectionState)>,
        /// Websocket senders of the currently open connections, keyed by node full id.
        pub websockets: HashMap<NodeFullId, WsSender>,
        /// Requests sent to a remote node and not answered yet: the request, the node
        /// it was sent to and the time it was sent, keyed by request id.
        pub requests_awaiting_response:
            HashMap<ModuleReqId, (OldNetworkRequest, NodeFullId, SystemTime)>,
        /// Heads cache updated by `NetworkHead::apply` when heads are received.
        pub heads_cache: HashMap<NodeFullId, NetworkHead>,
        /// Head of this node, if any; received heads with the same node full id are ignored.
        pub my_head: Option<NetworkHead>,
        /// Cache associating a public key with a string.
        // NOTE(review): presumably the identity uid of each key; not used in this
        // file — confirm against callers.
        pub uids_cache: HashMap<PubKey, String>,
        /// Counter used as request id by `send_dal_request`.
        pub count_dal_requests: u32,
    }
    
    /// Tell whether this build was compiled with the `ssl` cargo feature.
    #[inline]
    fn ssl() -> bool {
        cfg!(feature = "ssl")
    }
    
    impl WS2PModuleDatas {
        pub fn new(
            router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>,
            conf: WS2PConf,
            node_id: NodeId,
        ) -> WS2PModuleDatas {
            WS2PModuleDatas {
                router_sender,
                key_pair: None,
                currency: None,
                conf,
                ssl: ssl(),
                node_id,
                main_thread_channel: mpsc::channel(),
                ws2p_endpoints: HashMap::new(),
                websockets: HashMap::new(),
                requests_awaiting_response: HashMap::new(),
                heads_cache: HashMap::new(),
                my_head: None,
                uids_cache: HashMap::new(),
                count_dal_requests: 0,
            }
        }
        /// Open the endpoints database at `db_path`, creating it if needed.
        ///
        /// The `endpoints` table is created whenever it is missing, not only when the
        /// file itself is missing: a file left without its table (interrupted first
        /// run, or a file created between an existence check and the open) would
        /// otherwise never get its schema.
        ///
        /// # Errors
        ///
        /// Returns the `sqlite::Error` raised while opening the file or creating the table.
        pub fn open_db(db_path: &PathBuf) -> Result<sqlite::Connection, sqlite::Error> {
            let conn = sqlite::open(db_path.as_path())?;
            // `IF NOT EXISTS` makes this a no-op on an already initialized database.
            conn.execute(
                "CREATE TABLE IF NOT EXISTS endpoints (hash_full_id TEXT, status INTEGER, node_id INTEGER, pubkey TEXT,
                    api INTEGER, version INTEGER, endpoint TEXT, last_check INTEGER);",
            )?;
            Ok(conn)
        }
        /// Send a blockchain request to the router, addressed to the
        /// `ModuleRole::BlockchainDatas` role.
        ///
        /// The request id is the internal counter, incremented first and reset to
        /// zero when it reaches `u32::MAX`.
        ///
        /// # Panics
        ///
        /// Panics if the message cannot be sent to the router.
        pub fn send_dal_request(&mut self, req: &BlockchainRequest) {
            let next_id = self.count_dal_requests + 1;
            self.count_dal_requests = if next_id == std::u32::MAX {
                0
            } else {
                next_id
            };
            let request = DursMsg::Request {
                req_from: WS2PModule::name(),
                req_to: ModuleRole::BlockchainDatas,
                req_id: ModuleReqId(self.count_dal_requests),
                req_content: DursReqContent::BlockchainRequest(req.clone()),
            };
            self.router_sender
                .send(RouterThreadMessage::ModuleMessage(request))
                .expect("Fail to send message to router !");
        }
        /// Send, through the router, the network `response` to request `req_id` back
        /// to the module `requester`.
        ///
        /// # Panics
        ///
        /// Panics if the message cannot be sent to the router.
        pub fn send_network_req_response(
            &self,
            requester: ModuleStaticName,
            req_id: ModuleReqId,
            response: NetworkResponse,
        ) {
            let reply = DursMsg::Response {
                res_from: WS2PModule::name(),
                res_to: requester,
                req_id,
                res_content: DursResContent::NetworkResponse(response),
            };
            self.router_sender
                .send(RouterThreadMessage::ModuleMessage(reply))
                .expect("Fail to send message to router !");
        }
        /// Publish a network event to the router, tagged with the matching `ModuleEvent`.
        ///
        /// For `ReceiveDocuments` the tag is chosen from the first document only; an
        /// empty document list is not published at all.
        ///
        /// # Panics
        ///
        /// Panics if the event cannot be sent to the router.
        pub fn send_network_event(&self, event: &NetworkEvent) {
            let event_type = match event {
                NetworkEvent::ConnectionStateChange(_, _, _, _) => {
                    ModuleEvent::ConnectionsChangeNodeNetwork
                }
                NetworkEvent::ReceiveBlocks(_) => ModuleEvent::NewBlockFromNetwork,
                NetworkEvent::ReceiveDocuments(network_docs) => match network_docs.first() {
                    Some(BlockchainDocument::Block(_)) => ModuleEvent::NewBlockFromNetwork,
                    Some(BlockchainDocument::Transaction(_)) => ModuleEvent::NewTxFromNetwork,
                    Some(_) => ModuleEvent::NewWotDocFromNetwork,
                    // Nothing received: nothing to announce
                    None => return,
                },
                NetworkEvent::ReceiveHeads(_) => ModuleEvent::NewValidHeadFromNetwork,
                NetworkEvent::ReceivePeers(_) => ModuleEvent::NewValidPeerFromNodeNetwork,
                NetworkEvent::NewSelfPeer(_) => ModuleEvent::NewSelfPeer,
            };
            let message = DursMsg::Event {
                event_type,
                event_content: DursEvent::NetworkEvent(event.clone()),
            };
            self.router_sender
                .send(RouterThreadMessage::ModuleMessage(message))
                .expect("Fail to send network event to router !");
        }
        /// Count the known endpoints whose connection state is `Established`.
        ///
        /// The endpoints map is read in place; no copy of it is made.
        fn count_established_connections(&self) -> usize {
            self.ws2p_endpoints
                .values()
                .filter(|endpoint| {
                    if let WS2PConnectionState::Established = endpoint.1 {
                        true
                    } else {
                        false
                    }
                })
                .count()
        }
        /// Try to open connections to the known endpoints until the outgoing quota
        /// is reached.
        ///
        /// At most one endpoint per issuer is selected, except for endpoints issued
        /// by this node's own public key, which are all eligible. Endpoints in state
        /// `NeverTry`, `Close` or `Denial` are tried first; endpoints in any other
        /// non-established state are used only once the former are exhausted.
        /// Endpoints on port 443 are skipped when the `ssl` feature is not compiled in.
        ///
        /// If no key pair is set, no endpoint is treated as "own"; connection threads
        /// spawned in that case fail on their own.
        pub fn connect_to_know_endpoints(&mut self) {
            info!("WS2P: connect to know endpoints...");
            // Resolved once, and without panicking when the key pair is not set yet.
            let own_pubkey = self.key_pair.map(|key_pair| key_pair.public_key());
            let mut count_established_connections = 0;
            let mut pubkeys = HashSet::new();
            let mut reachable_endpoints = Vec::new();
            let mut unreachable_endpoints = Vec::new();
            // Iterate by reference: only the selected endpoints are cloned.
            for (ep, state) in self.ws2p_endpoints.values() {
                if own_pubkey == Some(ep.issuer) || !pubkeys.contains(&ep.issuer) {
                    match state {
                        WS2PConnectionState::Established => count_established_connections += 1,
                        WS2PConnectionState::NeverTry
                        | WS2PConnectionState::Close
                        | WS2PConnectionState::Denial => {
                            pubkeys.insert(ep.issuer);
                            reachable_endpoints.push(ep.clone());
                        }
                        _ => {
                            pubkeys.insert(ep.issuer);
                            unreachable_endpoints.push(ep.clone());
                        }
                    }
                }
            }
            // Saturate at zero: there may already be more established connections
            // than the quota allows, and a plain subtraction would underflow.
            let mut free_outcoming_rooms = self
                .conf
                .outcoming_quota
                .saturating_sub(count_established_connections);
            while free_outcoming_rooms > 0 {
                let ep = match reachable_endpoints
                    .pop()
                    .or_else(|| unreachable_endpoints.pop())
                {
                    Some(ep) => ep,
                    None => break,
                };
                // Port 443 needs TLS, unavailable without the `ssl` feature.
                if !self.ssl && ep.port == 443 {
                    continue;
                }
                self.connect_to_without_checking_quotas(&ep);
                free_outcoming_rooms -= 1;
            }
        }
        /// Register `endpoint` (if it is not known yet), reset its connection state
        /// to `NeverTry`, then connect to it if the outgoing quota is not reached.
        ///
        /// # Panics
        ///
        /// Panics if the node full id of the endpoint cannot be computed.
        pub fn connect_to(&mut self, endpoint: &EndpointV1) {
            let node_full_id = endpoint
                .node_full_id()
                .expect("WS2P: Fail to get ep.node_full_id() !");
            // Single lookup: insert the endpoint when unknown; in both cases the
            // state ends up as `NeverTry`.
            self.ws2p_endpoints
                .entry(node_full_id)
                .or_insert_with(|| (endpoint.clone(), WS2PConnectionState::NeverTry))
                .1 = WS2PConnectionState::NeverTry;
            if self.conf.outcoming_quota > self.count_established_connections() {
                self.connect_to_without_checking_quotas(endpoint);
            }
        }
        /// Close the websocket of `ws2p_full_id` (if one is open) and forget it.
        ///
        /// For every reason except `NegociationTimeout`, the endpoint state is also
        /// set to `Close`; on a negotiation timeout the state is left untouched.
        ///
        /// # Panics
        ///
        /// Panics if the endpoint is unknown and the reason is not `NegociationTimeout`.
        pub fn close_connection(
            &mut self,
            ws2p_full_id: &NodeFullId,
            reason: WS2PCloseConnectionReason,
        ) {
            let mark_endpoint_closed = match reason {
                WS2PCloseConnectionReason::NegociationTimeout => false,
                WS2PCloseConnectionReason::AuthMessInvalidSig
                | WS2PCloseConnectionReason::Timeout
                | WS2PCloseConnectionReason::WsError
                | WS2PCloseConnectionReason::Unknow => true,
            };
            if mark_endpoint_closed {
                let endpoint = self
                    .ws2p_endpoints
                    .get_mut(ws2p_full_id)
                    .expect("Failure : attempt to delete a non-existent connection !");
                endpoint.1 = WS2PConnectionState::Close;
            }
            // Take the sender out of the map, then close it; a close failure is ignored.
            if let Some(websocket) = self.websockets.remove(ws2p_full_id) {
                let _result = websocket.0.close(ws::CloseCode::Normal);
            }
        }
        /// Apply a message coming from a connection thread to the module state and
        /// translate it into the signal the caller has to handle.
        ///
        /// Depending on the payload, this updates the state of the emitting endpoint,
        /// registers or closes its websocket, forwards negotiation answers on the
        /// websocket, or hands received data (requests, peer cards, heads, documents,
        /// responses) back to the caller.
        ///
        /// Payloads that do not return early end with a housekeeping pass:
        /// `WS2PSignal::NoConnection` is returned when no websocket is open;
        /// otherwise requests pending for more than 20 seconds are dropped (they are
        /// not resent) and `WS2PSignal::Empty` is returned.
        ///
        /// # Panics
        ///
        /// Panics if the emitting endpoint is unknown for a payload that updates its
        /// state, or if a valid ACK message has to be answered while no websocket is
        /// registered for the endpoint.
        pub fn ws2p_conn_message_pretreatment(
            &mut self,
            message: WS2PConnectionMessage,
        ) -> WS2PSignal {
            let ws2p_full_id = message.0;
            match message.1 {
                WS2PConnectionMessagePayload::WrongUrl
                | WS2PConnectionMessagePayload::FailOpenWS
                | WS2PConnectionMessagePayload::FailToSplitWS => {
                    self.ws2p_endpoints
                        .get_mut(&ws2p_full_id)
                        .expect("WS2P: Fail to get mut ep !")
                        .1 = WS2PConnectionState::WSError;
                    return WS2PSignal::WSError(ws2p_full_id);
                }
                WS2PConnectionMessagePayload::TryToSendConnectMess => {
                    self.ws2p_endpoints
                        .get_mut(&ws2p_full_id)
                        .expect("WS2P: Fail to get mut ep !")
                        .1 = WS2PConnectionState::TryToSendConnectMess;
                }
                WS2PConnectionMessagePayload::FailSendConnectMess => {
                    self.ws2p_endpoints
                        .get_mut(&ws2p_full_id)
                        .expect("WS2P: Fail to mut ep !")
                        .1 = WS2PConnectionState::Unreachable;
                }
                WS2PConnectionMessagePayload::WebsocketOk(sender) => {
                    self.websockets.insert(ws2p_full_id, sender);
                }
                WS2PConnectionMessagePayload::ValidConnectMessage(response, new_con_state) => {
                    // NOTE(review): `new_con_state` is stored and then immediately
                    // overwritten by `ConnectMessOk`; kept as is — confirm which of
                    // the two states is the intended one.
                    self.ws2p_endpoints
                        .get_mut(&ws2p_full_id)
                        .expect("WS2P: Fail to get mut ep !")
                        .1 = new_con_state;
                    self.ws2p_endpoints
                        .get_mut(&ws2p_full_id)
                        .expect("Endpoint don't exist !")
                        .1 = WS2PConnectionState::ConnectMessOk;
                    debug!("Send: {:#?}", response);
                    if let Some(websocket) = self.websockets.get_mut(&ws2p_full_id) {
                        if websocket.0.send(Message::text(response)).is_err() {
                            return WS2PSignal::WSError(ws2p_full_id);
                        }
                    } else {
                        // Connection closed by remote peer
                        self.ws2p_endpoints
                            .get_mut(&ws2p_full_id)
                            .expect("Endpoint don't exist !")
                            .1 = WS2PConnectionState::Close;
                    }
                }
                WS2PConnectionMessagePayload::ValidAckMessage(response, new_con_state) => {
                    self.ws2p_endpoints
                        .get_mut(&ws2p_full_id)
                        .expect("WS2P: Fail to get mut ep !")
                        .1 = new_con_state;
                    // The answer is only sent when the new state is `AckMessOk`.
                    if let WS2PConnectionState::AckMessOk = self.ws2p_endpoints[&ws2p_full_id].1 {
                        debug!("Send: {:#?}", response);
                        if let Some(websocket) = self.websockets.get_mut(&ws2p_full_id) {
                            if websocket.0.send(Message::text(response)).is_err() {
                                return WS2PSignal::WSError(ws2p_full_id);
                            }
                        } else {
                            panic!("Fatal error : no websocket for {} !", ws2p_full_id);
                        }
                    }
                }
                WS2PConnectionMessagePayload::ValidOk(new_con_state) => {
                    self.ws2p_endpoints
                        .get_mut(&ws2p_full_id)
                        .expect("WS2P: Fail to get mut ep !")
                        .1 = new_con_state;
                    match self.ws2p_endpoints[&ws2p_full_id].1 {
                        WS2PConnectionState::OkMessOkWaitingAckMess => {}
                        WS2PConnectionState::Established => {
                            return WS2PSignal::ConnectionEstablished(ws2p_full_id);
                        }
                        _ => {
                            // Any other state after an OK message is unexpected:
                            // drop the connection.
                            self.close_connection(&ws2p_full_id, WS2PCloseConnectionReason::Unknow);
                            return WS2PSignal::Empty;
                        }
                    }
                }
                WS2PConnectionMessagePayload::DalRequest(req_id, req_body) => {
                    return WS2PSignal::DalRequest(ws2p_full_id, req_id, req_body);
                }
                WS2PConnectionMessagePayload::PeerCard(body, ws2p_endpoints) => {
                    return WS2PSignal::PeerCard(ws2p_full_id, body, ws2p_endpoints);
                }
                WS2PConnectionMessagePayload::Heads(heads) => {
                    // Keep only the heads that parse, verify, do not describe this
                    // node and are accepted by the heads cache.
                    let mut applied_heads = Vec::with_capacity(heads.len());
                    for head in heads {
                        if let Ok(head) = NetworkHead::from_json_value(&head) {
                            if head.verify()
                                && (self.my_head.is_none()
                                    || head.node_full_id()
                                        != self
                                            .my_head
                                            .clone()
                                            .expect("WS2P: Fail to clone my_head")
                                            .node_full_id())
                                && head.apply(&mut self.heads_cache)
                            {
                                applied_heads.push(head);
                            }
                        }
                    }
                    return WS2PSignal::Heads(ws2p_full_id, applied_heads);
                }
                WS2PConnectionMessagePayload::Document(network_doc) => {
                    return WS2PSignal::Document(ws2p_full_id, network_doc);
                }
                WS2PConnectionMessagePayload::ReqResponse(req_id, response) => {
                    // Pending requests are keyed by request id, so a successful
                    // `remove` is the whole check. Comparing the map length with the
                    // id (as was done before) discards valid responses as soon as
                    // ids are not smaller than the number of pending requests.
                    if let Some((ws2p_request, recipient_full_id, _timestamp)) =
                        self.requests_awaiting_response.remove(&req_id)
                    {
                        return WS2PSignal::ReqResponse(
                            req_id,
                            ws2p_request,
                            recipient_full_id,
                            response,
                        );
                    }
                }
                WS2PConnectionMessagePayload::NegociationTimeout => {
                    // The state recorded depends on how far the negotiation went.
                    match self.ws2p_endpoints[&ws2p_full_id].1 {
                        WS2PConnectionState::AckMessOk | WS2PConnectionState::ConnectMessOk => {
                            self.ws2p_endpoints
                                .get_mut(&ws2p_full_id)
                                .expect("WS2P: Fail to get mut ep !")
                                .1 = WS2PConnectionState::Denial
                        }
                        WS2PConnectionState::WaitingConnectMess => {
                            self.ws2p_endpoints
                                .get_mut(&ws2p_full_id)
                                .expect("WS2P: Fail to get mut ep !")
                                .1 = WS2PConnectionState::NoResponse
                        }
                        _ => {
                            self.ws2p_endpoints
                                .get_mut(&ws2p_full_id)
                                .expect("WS2P: Fail to get mut ep !")
                                .1 = WS2PConnectionState::Unreachable
                        }
                    }
                    self.close_connection(
                        &ws2p_full_id,
                        WS2PCloseConnectionReason::NegociationTimeout,
                    );
                    return WS2PSignal::NegociationTimeout(ws2p_full_id);
                }
                WS2PConnectionMessagePayload::Timeout => {
                    self.close_connection(&ws2p_full_id, WS2PCloseConnectionReason::Timeout);
                    return WS2PSignal::Timeout(ws2p_full_id);
                }
                WS2PConnectionMessagePayload::UnknowMessage => {
                    warn!("WS2P : Receive Unknow Message from {}.", &ws2p_full_id.1)
                }
                WS2PConnectionMessagePayload::WrongFormatMessage => warn!(
                    "WS2P : Receive Wrong Format Message from {}.",
                    &ws2p_full_id.1
                ),
                WS2PConnectionMessagePayload::InvalidMessage => return WS2PSignal::Empty,
                WS2PConnectionMessagePayload::Close => {
                    if self.websockets.contains_key(&ws2p_full_id) {
                        self.close_connection(
                            &ws2p_full_id,
                            WS2PCloseConnectionReason::AuthMessInvalidSig,
                        )
                    }
                }
            }
            if self.websockets.is_empty() {
                return WS2PSignal::NoConnection;
            }
            // Detect timeout requests (read in place, the map is not copied)
            let mut requests_timeout = Vec::new();
            for &(ref req, ref _ws2p_full_id, ref timestamp) in
                self.requests_awaiting_response.values()
            {
                // `elapsed` fails when the system clock moved backwards since the
                // request was sent; such a request is simply not expired yet.
                let is_timed_out = timestamp
                    .elapsed()
                    .map(|elapsed| elapsed > Duration::new(20, 0))
                    .unwrap_or(false);
                if is_timed_out {
                    requests_timeout.push(req.get_req_full_id());
                    warn!("request timeout : {:?}", req);
                }
            }
            // Delete timeout requests (they are not resent)
            for req_id in requests_timeout {
                let _request_option = self.requests_awaiting_response.remove(&req_id.1);
            }
            WS2PSignal::Empty
        }
    
        /// Send `ws2p_request`, serialized as JSON text, on the websocket of
        /// `receiver_ws2p_full_id`, and record it as awaiting a response.
        ///
        /// The request is recorded (with the receiver and the current time) only
        /// when the send succeeded.
        ///
        /// # Errors
        ///
        /// Returns the websocket error if the message cannot be sent.
        ///
        /// # Panics
        ///
        /// Panics if no websocket is registered for `receiver_ws2p_full_id`.
        pub fn send_request_to_specific_node(
            &mut self,
            receiver_ws2p_full_id: &NodeFullId,
            ws2p_request: &OldNetworkRequest,
        ) -> ws::Result<()> {
            let websocket = self
                .websockets
                .get_mut(receiver_ws2p_full_id)
                .expect("WS2P: Fail to get mut websocket !");
            let request_text = network_request_to_json(ws2p_request).to_string();
            websocket.0.send(Message::text(request_text))?;

            let sent_at = SystemTime::now();
            self.requests_awaiting_response.insert(
                ws2p_request.get_req_id(),
                (*ws2p_request, *receiver_ws2p_full_id, sent_at),
            );
            debug!(
                "send request {} to {}",
                network_request_to_json(ws2p_request).to_string(),
                receiver_ws2p_full_id
            );
            Ok(())
        }
    
        /// Spawn a detached thread that connects to `endpoint`, whatever the number
        /// of connections already established.
        ///
        /// The thread receives its own copies of the endpoint, the thread-signal
        /// sender, the currency and the key pair. It panics if the currency or the
        /// key pair is not set; the result of the connection attempt is ignored.
        fn connect_to_without_checking_quotas(&mut self, endpoint: &EndpointV1) {
            let target_endpoint = endpoint.clone();
            let signal_sender = self.main_thread_channel.0.clone();
            let currency = self.currency.clone();
            let key_pair = self.key_pair;
            thread::spawn(move || {
                let currency = currency.expect("WS2PError : No currency !");
                let key_pair = key_pair.expect("WS2PError : No key_pair !");
                let _result =
                    connect_to_ws2p_endpoint(&target_endpoint, &signal_sender, &currency, key_pair);
            });
        }
    }