From 3a76591ef319102a9ad5705ec72dcc251e2d0827 Mon Sep 17 00:00:00 2001 From: librelois <elois@ifee.fr> Date: Mon, 20 May 2019 21:28:36 +0200 Subject: [PATCH] [feat] ws2pv1: impl ws2pv1 requests --- lib/modules/ws2p-v1-legacy/Cargo.toml | 1 + lib/modules/ws2p-v1-legacy/src/constants.rs | 2 +- .../ws2p-v1-legacy/src/events/received.rs | 3 +- lib/modules/ws2p-v1-legacy/src/lib.rs | 255 +++++++----------- .../ws2p-v1-legacy/src/requests/received.rs | 13 +- .../ws2p-v1-legacy/src/requests/sent.rs | 9 +- .../ws2p-v1-legacy/src/responses/received.rs | 85 ++++++ lib/modules/ws2p-v1-legacy/src/serializer.rs | 20 -- .../ws2p-v1-legacy/src/serializers/block.rs | 92 +++++++ .../src/serializers/certification.rs | 32 +++ .../ws2p-v1-legacy/src/serializers/head.rs | 39 +++ .../src/serializers/identity.rs | 29 ++ .../src/serializers/membership.rs | 33 +++ .../ws2p-v1-legacy/src/serializers/mod.rs | 30 +++ .../ws2p-v1-legacy/src/serializers/revoked.rs | 25 ++ .../src/serializers/transaction.rs | 38 +++ .../src/ws_connections/handler.rs | 58 ++-- .../src/ws_connections/messages.rs | 135 +++++----- .../src/ws_connections/meta_datas.rs | 165 +++++++----- .../ws2p-v1-legacy/src/ws_connections/mod.rs | 14 +- .../src/ws_connections/requests/mod.rs | 217 +++++++++++++++ .../src/ws_connections/requests/received.rs | 70 +++++ .../src/ws_connections/requests/sent.rs | 44 ++- .../src/ws_connections/responses/mod.rs | 110 ++++++++ .../src/ws_connections/responses/received.rs | 88 ++++++ .../src/ws_connections/responses/sent.rs | 40 +++ 26 files changed, 1262 insertions(+), 385 deletions(-) delete mode 100644 lib/modules/ws2p-v1-legacy/src/serializer.rs create mode 100644 lib/modules/ws2p-v1-legacy/src/serializers/block.rs create mode 100644 lib/modules/ws2p-v1-legacy/src/serializers/certification.rs create mode 100644 lib/modules/ws2p-v1-legacy/src/serializers/head.rs create mode 100644 lib/modules/ws2p-v1-legacy/src/serializers/identity.rs create mode 100644 lib/modules/ws2p-v1-legacy/src/serializers/membership.rs create mode 100644 lib/modules/ws2p-v1-legacy/src/serializers/mod.rs create mode 100644 lib/modules/ws2p-v1-legacy/src/serializers/revoked.rs create mode 100644 lib/modules/ws2p-v1-legacy/src/serializers/transaction.rs create mode 100644 lib/modules/ws2p-v1-legacy/src/ws_connections/responses/mod.rs create mode 100644 lib/modules/ws2p-v1-legacy/src/ws_connections/responses/received.rs create mode 100644 lib/modules/ws2p-v1-legacy/src/ws_connections/responses/sent.rs diff --git a/lib/modules/ws2p-v1-legacy/Cargo.toml b/lib/modules/ws2p-v1-legacy/Cargo.toml index f89033ea..0fcddd90 100644 --- a/lib/modules/ws2p-v1-legacy/Cargo.toml +++ b/lib/modules/ws2p-v1-legacy/Cargo.toml @@ -29,6 +29,7 @@ serde = { version = "1.0.*", features = ["derive"] } serde_json = "1.0.*" structopt= "0.2.*" unwrap = "1.2.1" +uuid = { version = "0.7.4", features = ["serde", "v4"] } ws = "0.7.*" [features] diff --git a/lib/modules/ws2p-v1-legacy/src/constants.rs b/lib/modules/ws2p-v1-legacy/src/constants.rs index f9d92d6a..1eb189ed 100644 --- a/lib/modules/ws2p-v1-legacy/src/constants.rs +++ b/lib/modules/ws2p-v1-legacy/src/constants.rs @@ -37,7 +37,7 @@ pub static WS2P_DEFAULT_OUTCOMING_QUOTA: &'static usize = &10; pub static WS2P_NEGOTIATION_TIMEOUT: &'static u64 = &15; /// Maximum waiting time for a response to a request -//pub static WS2P_REQUEST_TIMEOUT : &'static u64 = &30; +pub static WS2P_V1_REQUESTS_TIMEOUT_IN_SECS: &'static u64 = &30; /// Maximum duration of inactivity of a connection (the connection will be closed after this delay) pub static WS2P_EXPIRE_TIMEOUT: &'static u64 = &120; diff --git a/lib/modules/ws2p-v1-legacy/src/events/received.rs b/lib/modules/ws2p-v1-legacy/src/events/received.rs index 2bed2025..6b137337 100644 --- a/lib/modules/ws2p-v1-legacy/src/events/received.rs +++ b/lib/modules/ws2p-v1-legacy/src/events/received.rs @@ -15,6 +15,7 @@ //! Sub-module managing events received from other durs modules +use crate::serializers::IntoWS2Pv1Json; use crate::*; use dubp_documents::Document; use durs_message::events::DursEvent; @@ -47,7 +48,7 @@ pub fn receive_event( NetworkEvent::ReceiveHeads(vec![unwrap!(ws2p_module.my_head.clone())]), ); // Send my head to all connections - let my_json_head = serializer::serialize_head(unwrap!(ws2p_module.my_head.clone())); + let my_json_head = unwrap!(ws2p_module.my_head.clone()).into_ws2p_v1_json(); trace!("Send my HEAD: {:#?}", my_json_head); let _results: Result<(), ws::Error> = ws2p_module .websockets diff --git a/lib/modules/ws2p-v1-legacy/src/lib.rs b/lib/modules/ws2p-v1-legacy/src/lib.rs index be1ce898..79076cca 100644 --- a/lib/modules/ws2p-v1-legacy/src/lib.rs +++ b/lib/modules/ws2p-v1-legacy/src/lib.rs @@ -15,6 +15,7 @@ //! WebSocketToPeer API for the Durs project. +#![allow(clippy::large_enum_variant)] #![deny( missing_debug_implementations, missing_copy_implementations, @@ -42,7 +43,7 @@ mod ok_message; pub mod parsers; mod requests; mod responses; -pub mod serializer; +pub mod serializers; mod subcommands; pub mod ws2p_db; pub mod ws_connections; @@ -51,11 +52,11 @@ use crate::ack_message::WS2PAckMessageV1; use crate::connect_message::WS2PConnectMessageV1; use crate::constants::*; use crate::ok_message::WS2POkMessageV1; -use crate::parsers::blocks::parse_json_block; use crate::requests::sent::send_dal_request; use crate::subcommands::WS2PSubCommands; use crate::ws2p_db::DbEndpoint; -use crate::ws_connections::messages::WS2PConnectionMessage; +use crate::ws_connections::messages::WS2Pv1Msg; +use crate::ws_connections::requests::{WS2Pv1ReqBody, WS2Pv1ReqFullId, WS2Pv1ReqId, WS2Pv1Request}; use crate::ws_connections::states::WS2PConnectionState; use crate::ws_connections::*; use dubp_documents::{Blockstamp, CurrencyName}; @@ -185,13 +186,17 @@ pub enum WS2PSignal { ConnectionEstablished(NodeFullId), NegociationTimeout(NodeFullId), Timeout(NodeFullId), - DalRequest(NodeFullId, ModuleReqId, serde_json::Value), + Request { + from: NodeFullId, + req_id: WS2Pv1ReqId, + body: WS2Pv1ReqBody, + }, PeerCard(NodeFullId, serde_json::Value, Vec<EndpointV1>), Heads(NodeFullId, Vec<NetworkHead>), Document(NodeFullId, BlockchainDocument), ReqResponse( - ModuleReqId, - OldNetworkRequest, + ModuleReqFullId, + WS2Pv1ReqBody, NodeFullId, serde_json::Value, ), @@ -227,8 +232,8 @@ pub struct WS2Pv1Module { pub my_head: Option<NetworkHead>, pub next_receiver: usize, pub node_id: NodeId, - pub requests_awaiting_response: - HashMap<ModuleReqId, (OldNetworkRequest, NodeFullId, SystemTime)>, + pub pending_received_requests: HashMap<ModuleReqId, WS2Pv1ReqFullId>, + pub requests_awaiting_response: HashMap<WS2Pv1ReqId, WS2Pv1PendingReqInfos>, pub router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>, pub soft_name: &'static str, pub soft_version: &'static str, @@ -238,6 +243,14 @@ pub struct WS2Pv1Module { pub uids_cache: HashMap<PubKey, String>, } +#[derive(Copy, Clone, Debug)] +pub struct WS2Pv1PendingReqInfos { + requester_module: ModuleReqFullId, + req_body: WS2Pv1ReqBody, + recipient_node: NodeFullId, + timestamp: SystemTime, +} + impl WS2Pv1Module { pub fn new( soft_meta_datas: &SoftwareMetaDatas<DuRsConf>, @@ -259,6 +272,7 @@ impl WS2Pv1Module { node_id: NodeId(soft_meta_datas.conf.my_node_id()), main_thread_channel: mpsc::channel(), next_receiver: 0, + pending_received_requests: HashMap::new(), ws2p_endpoints: HashMap::new(), websockets: HashMap::new(), requests_awaiting_response: HashMap::new(), @@ -273,7 +287,7 @@ impl WS2Pv1Module { #[derive(Debug)] pub enum WS2PThreadSignal { DursMsg(Box<DursMsg>), - WS2PConnectionMessage(WS2PConnectionMessage), + WS2Pv1Msg(WS2Pv1Msg), } #[derive(Copy, Clone, Debug)] @@ -390,14 +404,24 @@ impl DursModule<DuRsConf, DursMsg> for WS2Pv1Module { let mut conf = WS2PConf::default(); if global_conf.currency() == CurrencyName("g1-test".to_owned()) { - conf.sync_endpoints = vec![unwrap!(EndpointV1::parse_from_raw( - "WS2P 3eaab4c7 ts.gt.librelois.fr 443 /ws2p", - PubKey::Ed25519(unwrap!(ed25519::PublicKey::from_base58( - "CrznBiyq8G4RVUprH9jHmAw1n1iuzw8y9FdJbrESnaX7", - )),), - 0, - 0, - ))]; + conf.sync_endpoints = vec![ + unwrap!(EndpointV1::parse_from_raw( + "WS2P 3eaab4c7 ts.gt.librelois.fr 443 /ws2p", + PubKey::Ed25519(unwrap!(ed25519::PublicKey::from_base58( + "CrznBiyq8G4RVUprH9jHmAw1n1iuzw8y9FdJbrESnaX7", + )),), + 0, + 0, + )), + unwrap!(EndpointV1::parse_from_raw( + "WS2P 9c659296 localhost 10901", + PubKey::Ed25519(unwrap!(ed25519::PublicKey::from_base58( + "BFhFZv7p6kUxVyVStCD26fuDDGQop2h9nbGbaeD55Mkj", + )),), + 0, + 0, + )), + ]; } if let Some(module_user_conf) = module_user_conf.clone() { @@ -577,16 +601,15 @@ impl DursModule<DuRsConf, DursMsg> for WS2Pv1Module { // Start connect_to_know_endpoints(&mut ws2p_module); - ws2p_module.main_loop(start_time, soft_meta_datas); + ws2p_module.main_loop(start_time); Ok(()) } } impl WS2Pv1Module { - fn main_loop(mut self, start_time: SystemTime, soft_meta_datas: &SoftwareMetaDatas<DuRsConf>) { + fn main_loop(mut self, start_time: SystemTime) { // Initialize variables - let key_pair = self.key_pair; let mut last_ws2p_connecting_wave = SystemTime::now(); let mut last_ws2p_state_print = SystemTime::now(); let mut last_ws2p_endpoints_write = SystemTime::now(); @@ -600,8 +623,8 @@ impl WS2Pv1Module { .recv_timeout(Duration::from_millis(200)) { Ok(message) => match message { - WS2PThreadSignal::DursMsg(ref durs_mesage) => { - match *durs_mesage.deref() { + WS2PThreadSignal::DursMsg(durs_mesage) => { + match durs_mesage.deref() { DursMsg::Stop => break, DursMsg::Request { ref req_content, .. @@ -616,99 +639,40 @@ impl WS2Pv1Module { event_content, ), DursMsg::Response { - ref res_content, .. + req_id, + res_content, + .. } => { - if let DursResContent::BlockchainResponse(ref bc_res) = *res_content - { - match *bc_res.deref() { - BlockchainResponse::CurrentBlockstamp( - ref current_blockstamp_, - ) => { - debug!( - "WS2Pv1Module : receive DALResBc::CurrentBlockstamp({})", - self.current_blockstamp - ); - self.current_blockstamp = *current_blockstamp_; - if self.my_head.is_none() { - self.my_head = Some(heads::generate_my_head( - &key_pair, - NodeId(soft_meta_datas.conf.my_node_id()), - soft_meta_datas.soft_name, - soft_meta_datas.soft_version, - &self.current_blockstamp, - None, - )); - } - let event = - NetworkEvent::ReceiveHeads(vec![unwrap!(self - .my_head - .clone())]); - events::sent::send_network_event(&mut self, event); - } - BlockchainResponse::UIDs(ref uids) => { - // Add uids to heads - for head in self.heads_cache.values_mut() { - if let Some(uid_option) = uids.get(&head.pubkey()) { - if let Some(ref uid) = *uid_option { - head.set_uid(uid); - self.uids_cache - .insert(head.pubkey(), uid.to_string()); - } else { - self.uids_cache.remove(&head.pubkey()); - } - } - } - // Resent heads to other modules - let event = NetworkEvent::ReceiveHeads( - self.heads_cache.values().cloned().collect(), - ); - events::sent::send_network_event(&mut self, event); - // Resent to other modules connections that match receive uids - let events = self.ws2p_endpoints - .iter() - .filter_map(|(node_full_id, DbEndpoint { ep, state, .. })| { - if let Some(uid_option) = uids.get(&node_full_id.1) { - Some(NetworkEvent::ConnectionStateChange( - *node_full_id, - *state as u32, - uid_option.clone(), - ep.get_url(false, false) - .expect("Endpoint unreachable !"), - )) - } else { - None - } - }) - .collect(); - events::sent::send_network_events(&mut self, events); - } - _ => {} // Others BlockchainResponse variants - } - } + responses::received::receive_response( + &mut self, + *req_id, + res_content, + ); } _ => {} // Others DursMsg variants } } - WS2PThreadSignal::WS2PConnectionMessage(ws2p_conn_message) => { - match crate::ws_connections::messages::ws2p_conn_message_pretreatment( - &mut self, - ws2p_conn_message, + WS2PThreadSignal::WS2Pv1Msg(msg) => { + match crate::ws_connections::messages::ws2p_recv_message_pretreatment( + &mut self, msg, ) { WS2PSignal::NoConnection => { warn!("WS2PSignal::NoConnection"); } WS2PSignal::ConnectionEstablished(ws2p_full_id) => { - let req_id = + let module_req_id = ModuleReqId(self.requests_awaiting_response.len() as u32); let module_id = WS2Pv1Module::name(); debug!("WS2P: send req to: ({:?})", ws2p_full_id); let _current_request_result = ws_connections::requests::sent::send_request_to_specific_node( &mut self, + ModuleReqFullId(module_id, module_req_id), &ws2p_full_id, - &OldNetworkRequest::GetCurrent(ModuleReqFullId( - module_id, req_id, - )), + &WS2Pv1Request { + id: WS2Pv1ReqId::random(), + body: WS2Pv1ReqBody::GetCurrent, + }, ); if self.uids_cache.get(&ws2p_full_id.1).is_none() { send_dal_request( @@ -827,68 +791,24 @@ impl WS2Pv1Module { NetworkEvent::ReceiveDocuments(vec![network_doc]), ); } - WS2PSignal::ReqResponse(req_id, req, recipient_full_id, response) => { - match req { - OldNetworkRequest::GetCurrent(ref _req_id) => { - info!( - "WS2PSignal::ReceiveCurrent({}, {:?})", - req_id.0, req - ); - if let Some(block) = parse_json_block(&response) { - crate::responses::sent::send_network_req_response( - &self, - req.get_req_full_id().0, - req.get_req_full_id().1, - NetworkResponse::CurrentBlock( - ModuleReqFullId(WS2Pv1Module::name(), req_id), - recipient_full_id, - Box::new(block), - ), - ); - } - } - OldNetworkRequest::GetBlocks(ref _req_id, count, from) => { - info!( - "WS2PSignal::ReceiveChunk({}, {} blocks from {})", - req_id.0, count, from - ); - if response.is_array() { - let mut chunk = Vec::new(); - for json_block in unwrap!(response.as_array()) { - if let Some(block) = parse_json_block(json_block) { - chunk.push(block); - } else { - warn!("WS2Pv1Module: Error : fail to parse one json block !"); - } - } - debug!("Send chunk to followers : {}", from); - events::sent::send_network_event( - &mut self, - NetworkEvent::ReceiveBlocks(chunk), - ); - } - } - OldNetworkRequest::GetRequirementsPending( - _req_id, - min_cert, - ) => { - info!( - "WS2PSignal::ReceiveRequirementsPending({}, {})", - req_id.0, min_cert - ); - debug!("----------------------------------------"); - debug!("- BEGIN IDENTITIES PENDING -"); - debug!("----------------------------------------"); - debug!("{:#?}", response); - debug!("----------------------------------------"); - debug!("- END IDENTITIES PENDING -"); - debug!("----------------------------------------"); - } - _ => {} - } + WS2PSignal::Request { from, req_id, body } => { + ws_connections::requests::received::receive_ws2p_v1_request( + &mut self, from, req_id, body, + ); } + WS2PSignal::ReqResponse( + module_req_full_id, + ws2p_req_body, + recipient_full_id, + response, + ) => ws_connections::responses::received::receive_response( + &mut self, + module_req_full_id, + ws2p_req_body, + recipient_full_id, + response, + ), WS2PSignal::Empty => {} - _ => {} } } }, @@ -960,8 +880,9 @@ mod tests { use super::parsers::blocks::parse_json_block; use super::*; use crate::ws_connections::requests::sent::network_request_to_json; + use crate::ws_connections::requests::*; use dubp_documents::documents::block::BlockDocument; - use durs_module::DursModule; + use dubp_documents::BlockNumber; #[test] fn test_parse_json_block() { @@ -1131,13 +1052,19 @@ mod tests { #[test] fn ws2p_requests() { - let module_id = WS2Pv1Module::name(); - let request = - OldNetworkRequest::GetBlocks(ModuleReqFullId(module_id, ModuleReqId(58)), 50, 0); + let req_id_str = "fbcf0bfa-7e18-40cc-b300-5c797d27518e"; + let req_id = WS2Pv1ReqId::from_str(req_id_str).expect("fail to parse req_id"); + let request = WS2Pv1Request { + id: req_id, + body: WS2Pv1ReqBody::GetBlocks { + count: 50, + from_number: BlockNumber(0), + }, + }; assert_eq!( network_request_to_json(&request), json!({ - "reqId": format!("{:x}", 58), + "reqId": req_id_str, "body": { "name": "BLOCKS_CHUNK", "params": { @@ -1149,7 +1076,7 @@ mod tests { ); assert_eq!( network_request_to_json(&request).to_string(), - "{\"body\":{\"name\":\"BLOCKS_CHUNK\",\"params\":{\"count\":50,\"fromNumber\":0}},\"reqId\":\"3a\"}" + "{\"body\":{\"name\":\"BLOCKS_CHUNK\",\"params\":{\"count\":50,\"fromNumber\":0}},\"reqId\":\"fbcf0bfa-7e18-40cc-b300-5c797d27518e\"}" ); } diff --git a/lib/modules/ws2p-v1-legacy/src/requests/received.rs b/lib/modules/ws2p-v1-legacy/src/requests/received.rs index 838d2eff..fca0ebe7 100644 --- a/lib/modules/ws2p-v1-legacy/src/requests/received.rs +++ b/lib/modules/ws2p-v1-legacy/src/requests/received.rs @@ -16,15 +16,17 @@ //! Sub-module managing the inter-modules requests received. use crate::ws2p_db::DbEndpoint; +use crate::ws_connections::requests::{WS2Pv1ReqBody, WS2Pv1ReqId, WS2Pv1Request}; use crate::ws_connections::states::WS2PConnectionState; use crate::WS2Pv1Module; +use dubp_documents::BlockNumber; use durs_message::requests::DursReqContent; use durs_network::requests::OldNetworkRequest; pub fn receive_req(ws2p_module: &mut WS2Pv1Module, req_content: &DursReqContent) { if let DursReqContent::OldNetworkRequest(ref old_net_request) = *req_content { match *old_net_request { - OldNetworkRequest::GetBlocks(ref req_id, ref count, ref from) => { + OldNetworkRequest::GetBlocks(ref module_req_full_id, ref count, ref from) => { let mut receiver_index = 0; let mut real_receiver = None; for (ws2p_full_id, DbEndpoint { state, .. }) in ws2p_module.ws2p_endpoints.clone() { @@ -52,8 +54,15 @@ pub fn receive_req(ws2p_module: &mut WS2Pv1Module, req_content: &DursReqContent) let _blocks_request_result = crate::ws_connections::requests::sent::send_request_to_specific_node( ws2p_module, + *module_req_full_id, &real_receiver, - &OldNetworkRequest::GetBlocks(*req_id, *count, *from), + &WS2Pv1Request { + id: WS2Pv1ReqId::random(), + body: WS2Pv1ReqBody::GetBlocks { + count: *count, + from_number: BlockNumber(*from), + }, + }, ); } else { warn!("WS2P: not found peer to send request !"); diff --git a/lib/modules/ws2p-v1-legacy/src/requests/sent.rs b/lib/modules/ws2p-v1-legacy/src/requests/sent.rs index ad8963e5..624c8617 100644 --- a/lib/modules/ws2p-v1-legacy/src/requests/sent.rs +++ b/lib/modules/ws2p-v1-legacy/src/requests/sent.rs @@ -20,18 +20,23 @@ use durs_message::requests::{BlockchainRequest, DursReqContent}; use durs_message::*; use durs_module::{DursModule, ModuleReqId, ModuleRole, RouterThreadMessage}; -pub fn send_dal_request(ws2p_module: &mut WS2Pv1Module, req: &BlockchainRequest) { +pub fn send_dal_request(ws2p_module: &mut WS2Pv1Module, req: &BlockchainRequest) -> ModuleReqId { ws2p_module.count_dal_requests += 1; if ws2p_module.count_dal_requests == std::u32::MAX { ws2p_module.count_dal_requests = 0; } + + let req_id = ModuleReqId(ws2p_module.count_dal_requests); + ws2p_module .router_sender .send(RouterThreadMessage::ModuleMessage(DursMsg::Request { req_from: WS2Pv1Module::name(), req_to: ModuleRole::BlockchainDatas, - req_id: ModuleReqId(ws2p_module.count_dal_requests), + req_id, req_content: DursReqContent::BlockchainRequest(req.clone()), })) .expect("Fail to send message to router !"); + + req_id } diff --git a/lib/modules/ws2p-v1-legacy/src/responses/received.rs b/lib/modules/ws2p-v1-legacy/src/responses/received.rs index 64e0f487..7f64853b 100644 --- a/lib/modules/ws2p-v1-legacy/src/responses/received.rs +++ b/lib/modules/ws2p-v1-legacy/src/responses/received.rs @@ -14,3 +14,88 @@ // along with this program. If not, see <https://www.gnu.org/licenses/>. //! Sub-module managing the inter-modules responses received. + +use crate::ws_connections::responses::{WS2Pv1ReqRes, WS2Pv1ReqResBody}; +use crate::*; + +pub fn receive_response( + ws2p_module: &mut WS2Pv1Module, + req_id: ModuleReqId, + res_content: &DursResContent, +) { + if let DursResContent::BlockchainResponse(ref bc_res) = *res_content { + match *bc_res.deref() { + BlockchainResponse::CurrentBlockstamp(ref current_blockstamp_) => { + debug!( + "WS2Pv1Module : receive DALResBc::CurrentBlockstamp({})", + ws2p_module.current_blockstamp + ); + ws2p_module.current_blockstamp = *current_blockstamp_; + if ws2p_module.my_head.is_none() { + ws2p_module.my_head = Some(heads::generate_my_head( + &ws2p_module.key_pair, + ws2p_module.node_id, + ws2p_module.soft_name, + ws2p_module.soft_version, + &ws2p_module.current_blockstamp, + None, + )); + } + let event = NetworkEvent::ReceiveHeads(vec![unwrap!(ws2p_module.my_head.clone())]); + events::sent::send_network_event(ws2p_module, event); + } + BlockchainResponse::UIDs(ref uids) => { + // Add uids to heads + for head in ws2p_module.heads_cache.values_mut() { + if let Some(uid_option) = uids.get(&head.pubkey()) { + if let Some(ref uid) = *uid_option { + head.set_uid(uid); + ws2p_module + .uids_cache + .insert(head.pubkey(), uid.to_string()); + } else { + ws2p_module.uids_cache.remove(&head.pubkey()); + } + } + } + // Resent heads to other modules + let event = + NetworkEvent::ReceiveHeads(ws2p_module.heads_cache.values().cloned().collect()); + events::sent::send_network_event(ws2p_module, event); + // Resent to other modules connections that match receive uids + let events = ws2p_module + .ws2p_endpoints + .iter() + .filter_map(|(node_full_id, DbEndpoint { ep, state, .. })| { + if let Some(uid_option) = uids.get(&node_full_id.1) { + Some(NetworkEvent::ConnectionStateChange( + *node_full_id, + *state as u32, + uid_option.clone(), + ep.get_url(false, false).expect("Endpoint unreachable !"), + )) + } else { + None + } + }) + .collect(); + events::sent::send_network_events(ws2p_module, events); + } + BlockchainResponse::CurrentBlock(ref block_box, _blockstamp) => { + if let Some(ws2p_req_full_id) = + ws2p_module.pending_received_requests.remove(&req_id) + { + ws_connections::responses::sent::send_response( + ws2p_module, + ws2p_req_full_id.from, + WS2Pv1ReqRes { + req_id: ws2p_req_full_id.req_id, + body: WS2Pv1ReqResBody::GetCurrent(block_box.deref().clone()), + }, + ) + } + } + _ => {} // Others BlockchainResponse variants + } + } +} diff --git a/lib/modules/ws2p-v1-legacy/src/serializer.rs b/lib/modules/ws2p-v1-legacy/src/serializer.rs deleted file mode 100644 index afac981c..00000000 --- a/lib/modules/ws2p-v1-legacy/src/serializer.rs +++ /dev/null @@ -1,20 +0,0 @@ -use durs_common_tools::fatal_error; -use durs_network_documents::network_head::*; - -use std::ops::Deref; - -pub fn serialize_head(head: NetworkHead) -> serde_json::Value { - match head { - NetworkHead::V2(box_head_v2) => { - let head_v2 = box_head_v2.deref(); - json!({ - "message": head_v2.message.to_string(), - "sig": head_v2.sig.to_string(), - "messageV2": head_v2.message_v2.to_string(), - "sigV2": head_v2.sig_v2.to_string(), - "step": head_v2.step + 1 - }) - } - _ => fatal_error!("HEAD version not supported !"), - } -} diff --git a/lib/modules/ws2p-v1-legacy/src/serializers/block.rs b/lib/modules/ws2p-v1-legacy/src/serializers/block.rs new file mode 100644 index 00000000..a896d954 --- /dev/null +++ b/lib/modules/ws2p-v1-legacy/src/serializers/block.rs @@ -0,0 +1,92 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module that serialize BlockDocument into WS2Pv1 json format + +use super::IntoWS2Pv1Json; +use dubp_documents::documents::block::BlockDocumentStringified; + +impl IntoWS2Pv1Json for BlockDocumentStringified { + fn into_ws2p_v1_json(self) -> serde_json::Value { + let actives = self + .actives + .into_iter() + .map(IntoWS2Pv1Json::into_ws2p_v1_json) + .collect::<Vec<serde_json::Value>>(); + let certifications = self + .certifications + .into_iter() + .map(IntoWS2Pv1Json::into_ws2p_v1_json) + .collect::<Vec<serde_json::Value>>(); + let identities = self + .identities + .into_iter() + .map(IntoWS2Pv1Json::into_ws2p_v1_json) + .collect::<Vec<serde_json::Value>>(); + let joiners = self + .joiners + .into_iter() + .map(IntoWS2Pv1Json::into_ws2p_v1_json) + .collect::<Vec<serde_json::Value>>(); + let leavers = self + .leavers + .into_iter() + .map(IntoWS2Pv1Json::into_ws2p_v1_json) + .collect::<Vec<serde_json::Value>>(); + let revoked = self + .revoked + .into_iter() + .map(IntoWS2Pv1Json::into_ws2p_v1_json) + .collect::<Vec<serde_json::Value>>(); + let transactions = self + .transactions + .into_iter() + .map(IntoWS2Pv1Json::into_ws2p_v1_json) + .collect::<Vec<serde_json::Value>>(); + + json!( { + "actives": actives, + "certifications": certifications, + "currency": self.currency, + "dividend": null, + "excluded": self.excluded, + "fork": false, + "hash": self.hash, + "identities": identities, + "inner_hash": self.inner_hash, + "issuer": self.issuers[0], + "issuersCount": self.issuers_count, + "issuersFrame": self.issuers_frame, + "issuersFrameVar": self.issuers_frame_var, + "joiners": joiners, + "leavers": leavers, + "medianTime": self.median_time, + "membersCount": self.members_count, + "monetaryMass": self.monetary_mass, + "nonce": self.nonce, + "number": self.number, + "parameters": self.parameters.unwrap_or_else(|| "".to_owned()), + "powMin": self.pow_min, + "previousHash": self.previous_hash, + "previousIssuer": self.previous_issuer, + "revoked": revoked, + "signature": self.signatures[0], + "time": self.time, + "transactions": transactions, + "unitbase": self.unit_base, + "version": 10 + }) + } +} diff --git a/lib/modules/ws2p-v1-legacy/src/serializers/certification.rs b/lib/modules/ws2p-v1-legacy/src/serializers/certification.rs new file mode 100644 index 00000000..e8480765 --- /dev/null +++ b/lib/modules/ws2p-v1-legacy/src/serializers/certification.rs @@ -0,0 +1,32 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module that serialize CompactCertificationStringDocument into WS2Pv1 json format + +use super::IntoWS2Pv1Json; +use dubp_documents::documents::certification::CompactCertificationStringDocument; + +impl IntoWS2Pv1Json for CompactCertificationStringDocument { + fn into_ws2p_v1_json(self) -> serde_json::Value { + format!( + "{issuer}:{target}:{block_number}:{signature}", + issuer = self.issuer, + target = self.target, + block_number = self.block_number, + signature = self.signature, + ) + .into() + } +} diff --git a/lib/modules/ws2p-v1-legacy/src/serializers/head.rs b/lib/modules/ws2p-v1-legacy/src/serializers/head.rs new file mode 100644 index 00000000..4c149c55 --- /dev/null +++ b/lib/modules/ws2p-v1-legacy/src/serializers/head.rs @@ -0,0 +1,39 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module that serialize HEAD into WS2Pv1 json format + +use super::IntoWS2Pv1Json; +use durs_common_tools::fatal_error; +use durs_network_documents::network_head::*; +use std::ops::Deref; + +impl IntoWS2Pv1Json for NetworkHead { + fn into_ws2p_v1_json(self) -> serde_json::Value { + match self { + NetworkHead::V2(box_head_v2) => { + let head_v2 = box_head_v2.deref(); + json!({ + "message": head_v2.message.to_string(), + "sig": head_v2.sig.to_string(), + "messageV2": head_v2.message_v2.to_string(), + "sigV2": head_v2.sig_v2.to_string(), + "step": head_v2.step + 1 + }) + } + _ => fatal_error!("HEAD version not supported !"), + } + } +} diff --git a/lib/modules/ws2p-v1-legacy/src/serializers/identity.rs b/lib/modules/ws2p-v1-legacy/src/serializers/identity.rs new file mode 100644 index 00000000..9bafb30c --- /dev/null +++ b/lib/modules/ws2p-v1-legacy/src/serializers/identity.rs @@ -0,0 +1,29 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module that serialize IdentityStringDocument into WS2Pv1 json format + +use super::IntoWS2Pv1Json; +use dubp_documents::documents::identity::IdentityStringDocument; + +impl IntoWS2Pv1Json for IdentityStringDocument { + fn into_ws2p_v1_json(self) -> serde_json::Value { + format!( + "{}:{}:{}:{}", + self.issuer, self.signature, self.blockstamp, self.username + ) + .into() + } +} diff --git a/lib/modules/ws2p-v1-legacy/src/serializers/membership.rs b/lib/modules/ws2p-v1-legacy/src/serializers/membership.rs new file mode 100644 index 00000000..58076388 --- /dev/null +++ b/lib/modules/ws2p-v1-legacy/src/serializers/membership.rs @@ -0,0 +1,33 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module that serialize MembershipStringDocument into WS2Pv1 json format + +use super::IntoWS2Pv1Json; +use dubp_documents::documents::membership::MembershipStringDocument; + +impl IntoWS2Pv1Json for MembershipStringDocument { + fn into_ws2p_v1_json(self) -> serde_json::Value { + format!( + "{issuer}:{signature}:{blockstamp}:{idty_blockstamp}:{username}", + issuer = self.issuer, + signature = self.signature, + blockstamp = self.blockstamp, + idty_blockstamp = self.identity_blockstamp, + username = self.username, + ) + .into() + } +} diff --git a/lib/modules/ws2p-v1-legacy/src/serializers/mod.rs b/lib/modules/ws2p-v1-legacy/src/serializers/mod.rs new file mode 100644 index 00000000..19a9d304 --- /dev/null +++ b/lib/modules/ws2p-v1-legacy/src/serializers/mod.rs @@ -0,0 +1,30 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module that serialize into WS2Pv1 json format + +pub mod block; +pub mod certification; +pub mod head; +pub mod identity; +pub mod membership; +pub mod revoked; +pub mod transaction; + +/// Into WS2pv1 JSON format +pub trait IntoWS2Pv1Json { + /// Into WS2pv1 JSON format + fn into_ws2p_v1_json(self) -> serde_json::Value; +} diff --git a/lib/modules/ws2p-v1-legacy/src/serializers/revoked.rs b/lib/modules/ws2p-v1-legacy/src/serializers/revoked.rs new file mode 100644 index 00000000..f34974b2 --- /dev/null +++ b/lib/modules/ws2p-v1-legacy/src/serializers/revoked.rs @@ -0,0 +1,25 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module that serialize CompactRevocationStringDocument into WS2Pv1 json format + +use super::IntoWS2Pv1Json; +use dubp_documents::documents::revocation::CompactRevocationStringDocument; + +impl IntoWS2Pv1Json for CompactRevocationStringDocument { + fn into_ws2p_v1_json(self) -> serde_json::Value { + format!("{}:{}", self.issuer, self.signature,).into() + } +} diff --git a/lib/modules/ws2p-v1-legacy/src/serializers/transaction.rs b/lib/modules/ws2p-v1-legacy/src/serializers/transaction.rs new file mode 100644 index 00000000..72e9f3eb --- /dev/null +++ b/lib/modules/ws2p-v1-legacy/src/serializers/transaction.rs @@ -0,0 +1,38 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module that serialize TransactionDocument into WS2Pv1 json format + +use super::IntoWS2Pv1Json; +use dubp_documents::documents::transaction::TransactionDocumentStringified; + +impl IntoWS2Pv1Json for TransactionDocumentStringified { + fn into_ws2p_v1_json(self) -> serde_json::Value { + json!( { + "blockstamp": self.blockstamp, + "blockstampTime": 0, + "comment": self.comment, + "currency": self.currency, + "hash": self.hash, + "inputs": self.inputs, + "issuers": self.issuers, + "locktime": self.locktime, + "outputs": self.outputs, + "signatures": self.signatures, + "unlocks": self.unlocks, + "version": 10 + }) + } +} diff --git a/lib/modules/ws2p-v1-legacy/src/ws_connections/handler.rs b/lib/modules/ws2p-v1-legacy/src/ws_connections/handler.rs index d53b59ea..08422feb 100644 --- a/lib/modules/ws2p-v1-legacy/src/ws_connections/handler.rs +++ b/lib/modules/ws2p-v1-legacy/src/ws_connections/handler.rs @@ -104,12 +104,10 @@ impl Handler for Client { // Send ws::Sender to WS2PConductor let result = self .conductor_sender - .send(WS2PThreadSignal::WS2PConnectionMessage( - WS2PConnectionMessage( - self.conn_meta_datas.node_full_id(), - WS2PConnectionMessagePayload::WebsocketOk(WsSender(self.ws.clone())), - ), - )); + .send(WS2PThreadSignal::WS2Pv1Msg(WS2Pv1Msg { + from: self.conn_meta_datas.node_full_id(), + payload: WS2Pv1MsgPayload::WebsocketOk(WsSender(self.ws.clone())), + })); // If WS2PConductor is unrechable, close connection. if result.is_err() { debug!("Close ws2p connection because ws2p main thread is unrechable !"); @@ -150,21 +148,19 @@ impl Handler for Client { let s: String = msg .into_text() .expect("WS2P: Fail to convert message payload to String !"); - trace!("WS2P: receive mess: {}", s); + debug!("WS2P: receive mess: {}", s); let json_message: serde_json::Value = serde_json::from_str(&s) .expect("WS2P: Fail to convert string message ton json value !"); let result = self .conductor_sender - .send(WS2PThreadSignal::WS2PConnectionMessage( - WS2PConnectionMessage( - self.conn_meta_datas.node_full_id(), - self.conn_meta_datas.parse_and_check_incoming_message( - &self.currency, - self.key_pair, - &json_message, - ), + .send(WS2PThreadSignal::WS2Pv1Msg(WS2Pv1Msg { + from: self.conn_meta_datas.node_full_id(), + payload: self.conn_meta_datas.parse_and_check_incoming_message( + &self.currency, + self.key_pair, + &json_message, ), - )); + })); if result.is_err() { info!("Close ws2p connection because ws2p main thread is unrechable !"); self.ws.close(CloseCode::Normal)?; @@ -178,12 +174,10 @@ impl Handler for Client { if self.conn_meta_datas.state != WS2PConnectionState::Established { let _result = self.conductor_sender - .send(WS2PThreadSignal::WS2PConnectionMessage( - WS2PConnectionMessage( - self.conn_meta_datas.node_full_id(), - WS2PConnectionMessagePayload::NegociationTimeout, - ), - )); + .send(WS2PThreadSignal::WS2Pv1Msg(WS2Pv1Msg { + from: self.conn_meta_datas.node_full_id(), + payload: WS2Pv1MsgPayload::NegociationTimeout, + })); self.ws.close(CloseCode::Away) } else { Ok(()) @@ -192,12 +186,10 @@ impl Handler for Client { EXPIRE => { let _result = self .conductor_sender - .send(WS2PThreadSignal::WS2PConnectionMessage( - WS2PConnectionMessage( - self.conn_meta_datas.node_full_id(), - WS2PConnectionMessagePayload::Timeout, - ), - )); + .send(WS2PThreadSignal::WS2Pv1Msg(WS2Pv1Msg { + from: self.conn_meta_datas.node_full_id(), + payload: WS2Pv1MsgPayload::Timeout, + })); self.ws.close(CloseCode::Away) } _ => Ok(()), @@ -231,11 +223,9 @@ impl Handler for Client { } let _result = self .conductor_sender - .send(WS2PThreadSignal::WS2PConnectionMessage( - WS2PConnectionMessage( - self.conn_meta_datas.node_full_id(), - WS2PConnectionMessagePayload::Close, - ), - )); + .send(WS2PThreadSignal::WS2Pv1Msg(WS2Pv1Msg { + from: self.conn_meta_datas.node_full_id(), + payload: WS2Pv1MsgPayload::Close, + })); } } diff --git a/lib/modules/ws2p-v1-legacy/src/ws_connections/messages.rs b/lib/modules/ws2p-v1-legacy/src/ws_connections/messages.rs index 158b2530..994a5b08 100644 --- a/lib/modules/ws2p-v1-legacy/src/ws_connections/messages.rs +++ b/lib/modules/ws2p-v1-legacy/src/ws_connections/messages.rs @@ -16,16 +16,20 @@ //! Define ws2p connections messages. use super::*; +use crate::ws_connections::requests::WS2Pv1ReqBody; use durs_network_documents::NodeFullId; use ws::Message; #[derive(Debug)] -/// WS2Pv1 connection Message -pub struct WS2PConnectionMessage(pub NodeFullId, pub WS2PConnectionMessagePayload); +/// WS2Pv1 Message +pub struct WS2Pv1Msg { + pub from: NodeFullId, + pub payload: WS2Pv1MsgPayload, +} #[derive(Debug)] -/// WS2Pv1 connection Message payload -pub enum WS2PConnectionMessagePayload { +/// WS2Pv1 Message payload +pub enum WS2Pv1MsgPayload { FailOpenWS, WrongUrl, FailToSplitWS, @@ -36,11 +40,14 @@ pub enum WS2PConnectionMessagePayload { ValidConnectMessage(String, WS2PConnectionState), ValidAckMessage(String, WS2PConnectionState), ValidOk(WS2PConnectionState), - DalRequest(ModuleReqId, serde_json::Value), + Request { + req_id: WS2Pv1ReqId, + body: WS2Pv1ReqBody, + }, PeerCard(serde_json::Value, Vec<EndpointV1>), Heads(Vec<serde_json::Value>), Document(BlockchainDocument), - ReqResponse(ModuleReqId, serde_json::Value), + ReqResponse(WS2Pv1ReqId, serde_json::Value), InvalidMessage, WrongFormatMessage, UnknowMessage, @@ -66,15 +73,17 @@ pub fn generate_connect_message( ) } -pub fn ws2p_conn_message_pretreatment( +pub fn ws2p_recv_message_pretreatment( ws2p_module: &mut WS2Pv1Module, - message: WS2PConnectionMessage, + message: WS2Pv1Msg, ) -> WS2PSignal { - let ws2p_full_id = message.0; - match message.1 { - WS2PConnectionMessagePayload::WrongUrl - | WS2PConnectionMessagePayload::FailOpenWS - | WS2PConnectionMessagePayload::FailToSplitWS => { + check_timeout_requests(ws2p_module); + + let ws2p_full_id = message.from; + match message.payload { + WS2Pv1MsgPayload::WrongUrl + | WS2Pv1MsgPayload::FailOpenWS + | WS2Pv1MsgPayload::FailToSplitWS => { let dal_ep = ws2p_module .ws2p_endpoints .get_mut(&ws2p_full_id) @@ -83,14 +92,14 @@ pub fn ws2p_conn_message_pretreatment( dal_ep.last_check = durs_common_tools::fns::time::current_timestamp(); return WS2PSignal::WSError(ws2p_full_id); } - WS2PConnectionMessagePayload::TryToSendConnectMess => { + WS2Pv1MsgPayload::TryToSendConnectMess => { ws2p_module .ws2p_endpoints .get_mut(&ws2p_full_id) .expect("WS2P: Fail to get mut ep !") .state = WS2PConnectionState::TryToSendConnectMess; } - WS2PConnectionMessagePayload::FailSendConnectMess => { + WS2Pv1MsgPayload::FailSendConnectMess => { let dal_ep = ws2p_module .ws2p_endpoints .get_mut(&ws2p_full_id) @@ -98,10 +107,10 @@ pub fn ws2p_conn_message_pretreatment( dal_ep.state = WS2PConnectionState::Unreachable; dal_ep.last_check = durs_common_tools::fns::time::current_timestamp(); } - WS2PConnectionMessagePayload::WebsocketOk(sender) => { + WS2Pv1MsgPayload::WebsocketOk(sender) => { ws2p_module.websockets.insert(ws2p_full_id, sender); } - WS2PConnectionMessagePayload::ValidConnectMessage(response, new_con_state) => { + WS2Pv1MsgPayload::ValidConnectMessage(response, new_con_state) => { ws2p_module .ws2p_endpoints .get_mut(&ws2p_full_id) @@ -122,7 +131,7 @@ pub fn ws2p_conn_message_pretreatment( dal_ep.last_check = durs_common_tools::fns::time::current_timestamp(); } } - WS2PConnectionMessagePayload::ValidAckMessage(response, new_con_state) => { + WS2Pv1MsgPayload::ValidAckMessage(response, new_con_state) => { ws2p_module .ws2p_endpoints .get_mut(&ws2p_full_id) @@ -140,7 +149,7 @@ pub fn ws2p_conn_message_pretreatment( } } } - WS2PConnectionMessagePayload::ValidOk(new_con_state) => { + WS2Pv1MsgPayload::ValidOk(new_con_state) => { ws2p_module .ws2p_endpoints .get_mut(&ws2p_full_id) @@ -165,13 +174,17 @@ pub fn ws2p_conn_message_pretreatment( return signal; } - WS2PConnectionMessagePayload::DalRequest(req_id, req_body) => { - return WS2PSignal::DalRequest(ws2p_full_id, req_id, req_body); + WS2Pv1MsgPayload::Request { req_id, body } => { + return WS2PSignal::Request { + from: ws2p_full_id, + req_id, + body, + }; } - WS2PConnectionMessagePayload::PeerCard(body, ws2p_endpoints) => { + WS2Pv1MsgPayload::PeerCard(body, ws2p_endpoints) => { return WS2PSignal::PeerCard(ws2p_full_id, body, ws2p_endpoints); } - WS2PConnectionMessagePayload::Heads(heads) => { + WS2Pv1MsgPayload::Heads(heads) => { let mut applied_heads = Vec::with_capacity(heads.len()); for head in heads { if let Ok(head) = NetworkHead::from_json_value(&head) { @@ -191,24 +204,26 @@ pub fn ws2p_conn_message_pretreatment( } return WS2PSignal::Heads(ws2p_full_id, applied_heads); } - WS2PConnectionMessagePayload::Document(network_doc) => { + WS2Pv1MsgPayload::Document(network_doc) => { return WS2PSignal::Document(ws2p_full_id, network_doc); } - WS2PConnectionMessagePayload::ReqResponse(req_id, response) => { - if ws2p_module.requests_awaiting_response.len() > req_id.0 as usize { - if let Some((ref ws2p_request, ref recipient_fulld_id, ref _timestamp)) = - ws2p_module.requests_awaiting_response.remove(&req_id) - { - return WS2PSignal::ReqResponse( - req_id, - *ws2p_request, - *recipient_fulld_id, - response, - ); - } + WS2Pv1MsgPayload::ReqResponse(ws2p_req_id, response) => { + if let Some(WS2Pv1PendingReqInfos { + ref requester_module, + ref req_body, + ref recipient_node, + .. + }) = ws2p_module.requests_awaiting_response.remove(&ws2p_req_id) + { + return WS2PSignal::ReqResponse( + *requester_module, + *req_body, + *recipient_node, + response, + ); } } - WS2PConnectionMessagePayload::NegociationTimeout => { + WS2Pv1MsgPayload::NegociationTimeout => { match ws2p_module.ws2p_endpoints[&ws2p_full_id].state { WS2PConnectionState::AckMessOk | WS2PConnectionState::ConnectMessOk => { ws2p_module @@ -240,7 +255,7 @@ pub fn ws2p_conn_message_pretreatment( ); return WS2PSignal::NegociationTimeout(ws2p_full_id); } - WS2PConnectionMessagePayload::Timeout => { + WS2Pv1MsgPayload::Timeout => { close_connection( ws2p_module, &ws2p_full_id, @@ -248,15 +263,13 @@ pub fn ws2p_conn_message_pretreatment( ); return WS2PSignal::Timeout(ws2p_full_id); } - WS2PConnectionMessagePayload::UnknowMessage => { - warn!("WS2P : Receive Unknow Message from {}.", &ws2p_full_id.1) - } - WS2PConnectionMessagePayload::WrongFormatMessage => warn!( + WS2Pv1MsgPayload::UnknowMessage => {} + WS2Pv1MsgPayload::WrongFormatMessage => warn!( "WS2P : Receive Wrong Format Message from {}.", &ws2p_full_id.1 ), - WS2PConnectionMessagePayload::InvalidMessage => return WS2PSignal::Empty, - WS2PConnectionMessagePayload::Close => close_connection( + WS2Pv1MsgPayload::InvalidMessage => return WS2PSignal::Empty, + WS2Pv1MsgPayload::Close => close_connection( ws2p_module, &ws2p_full_id, WS2PCloseConnectionReason::AuthMessInvalidSig, @@ -266,26 +279,26 @@ pub fn ws2p_conn_message_pretreatment( if connections_count == 0 { return WS2PSignal::NoConnection; } + WS2PSignal::Empty +} + +fn check_timeout_requests(ws2p_module: &mut WS2Pv1Module) { // Detect timeout requests let mut requests_timeout = Vec::new(); - for &(ref req, ref ws2p_full_id, ref timestamp) in - ws2p_module.requests_awaiting_response.clone().values() - { - if unwrap!(SystemTime::now().duration_since(*timestamp)) > Duration::new(20, 0) { - requests_timeout.push(req.get_req_full_id()); - warn!("request timeout : {:?} (sent to {:?})", req, ws2p_full_id); + + for (ws2p_req_id, pending_req_infos) in ws2p_module.requests_awaiting_response.iter() { + if unwrap!(SystemTime::now().duration_since(pending_req_infos.timestamp)) + > Duration::from_secs(*WS2P_V1_REQUESTS_TIMEOUT_IN_SECS) + { + requests_timeout.push(*ws2p_req_id); + warn!( + "request timeout : {:?} (sent to {:?})", + pending_req_infos.req_body, pending_req_infos.recipient_node + ); } } - // Delete (and resend) timeout requests - for req_id in requests_timeout { - //let ws2p_endpoints = ws2p_module.ws2p_endpoints.clone(); - let _request_option = ws2p_module.requests_awaiting_response.remove(&req_id.1); - /*if let Some((request, _, _)) = request_option { - let _request_result = ws2p_module.send_request_to_specific_node( - &get_random_connection(&ws2p_endpoints), - &request, - ); - }*/ + // Delete timeout requests + for ws2p_req_id in requests_timeout { + let _request_option = ws2p_module.requests_awaiting_response.remove(&ws2p_req_id); } - WS2PSignal::Empty } diff --git a/lib/modules/ws2p-v1-legacy/src/ws_connections/meta_datas.rs b/lib/modules/ws2p-v1-legacy/src/ws_connections/meta_datas.rs index 33d16d32..51a98b82 100644 --- a/lib/modules/ws2p-v1-legacy/src/ws_connections/meta_datas.rs +++ b/lib/modules/ws2p-v1-legacy/src/ws_connections/meta_datas.rs @@ -15,15 +15,17 @@ //! WS2P connections meta datas. -use super::messages::WS2PConnectionMessagePayload; +use super::messages::WS2Pv1MsgPayload; use super::states::WS2PConnectionState; use crate::parsers::blocks::parse_json_block; +use crate::ws_connections::requests::{WS2Pv1ReqBody, WS2Pv1ReqId}; use crate::*; use dup_crypto::keys::*; -use durs_module::ModuleReqId; use durs_network::documents::BlockchainDocument; use durs_network_documents::network_endpoint::{ApiName, EndpointV1}; use durs_network_documents::NodeId; +use std::convert::TryFrom; + #[allow(deprecated)] #[derive(Debug, Clone)] pub struct WS2PConnectionMetaDatas { @@ -60,13 +62,13 @@ impl WS2PConnectionMetaDatas { &mut self, currency: &str, key_pair: KeyPairEnum, - m: &serde_json::Value, - ) -> WS2PConnectionMessagePayload { - if let Some(s) = m.get("auth") { + msg: &serde_json::Value, + ) -> WS2Pv1MsgPayload { + if let Some(s) = msg.get("auth") { if s.is_string() { match s.as_str().unwrap_or("") { "CONNECT" => { - let message = WS2PConnectMessageV1::parse(m, currency.to_string()) + let message = WS2PConnectMessageV1::parse(msg, currency.to_string()) .expect("Failed to parsing CONNECT Message !"); if message.verify() && message.pubkey == unwrap!(self.remote_pubkey) { match self.state { @@ -81,19 +83,19 @@ impl WS2PConnectionMetaDatas { signature: None, }; response.signature = Some(response.sign(key_pair)); - return WS2PConnectionMessagePayload::ValidConnectMessage( + return WS2Pv1MsgPayload::ValidConnectMessage( unwrap!(serde_json::to_string(&response)), self.state, ); } - _ => return WS2PConnectionMessagePayload::InvalidMessage, + _ => return WS2Pv1MsgPayload::InvalidMessage, } } else { warn!("The signature of message CONNECT is invalid !") } } "ACK" => { - let mut message = WS2PAckMessageV1::parse(m, currency.to_string()) + let mut message = WS2PAckMessageV1::parse(msg, currency.to_string()) .expect("Failed to parsing ACK Message !"); message.challenge = self.challenge.to_string(); if message.verify() { @@ -105,7 +107,7 @@ impl WS2PConnectionMetaDatas { WS2PConnectionState::OkMessOkWaitingAckMess => { WS2PConnectionState::Established } - _ => return WS2PConnectionMessagePayload::InvalidMessage, + _ => return WS2Pv1MsgPayload::InvalidMessage, }; let mut response = WS2POkMessageV1 { currency: currency.to_string(), @@ -114,7 +116,7 @@ impl WS2PConnectionMetaDatas { signature: None, }; response.signature = Some(response.sign(key_pair)); - return WS2PConnectionMessagePayload::ValidAckMessage( + return WS2Pv1MsgPayload::ValidAckMessage( unwrap!(serde_json::to_string(&response)), self.state, ); @@ -123,7 +125,7 @@ impl WS2PConnectionMetaDatas { } } "OK" => { - let mut message = WS2POkMessageV1::parse(m, currency.to_string()) + let mut message = WS2POkMessageV1::parse(msg, currency.to_string()) .expect("Failed to parsing OK Message !"); trace!("Received OK"); message.challenge = self.remote_challenge.to_string(); @@ -133,7 +135,7 @@ impl WS2PConnectionMetaDatas { match self.state { WS2PConnectionState::ConnectMessOk => { self.state = WS2PConnectionState::OkMessOkWaitingAckMess; - return WS2PConnectionMessagePayload::ValidOk(self.state); + return WS2Pv1MsgPayload::ValidOk(self.state); } WS2PConnectionState::AckMessOk => { info!( @@ -141,69 +143,83 @@ impl WS2PConnectionMetaDatas { self.remote_pubkey.expect("fail to get remote pubkey !") ); self.state = WS2PConnectionState::Established; - return WS2PConnectionMessagePayload::ValidOk(self.state); + return WS2Pv1MsgPayload::ValidOk(self.state); } _ => { warn!("WS2P Error : OK message not expected !"); - return WS2PConnectionMessagePayload::InvalidMessage; + return WS2Pv1MsgPayload::InvalidMessage; } } } else { warn!("The signature of message OK is invalid !"); - return WS2PConnectionMessagePayload::InvalidMessage; + return WS2Pv1MsgPayload::InvalidMessage; } } &_ => debug!("unknow message"), }; } }; - if let Some(req_id) = m.get("reqId") { + if let Some(req_id) = msg.get("reqId") { match req_id.as_str() { - Some(req_id) => match m.get("body") { + Some(req_id) => match msg.get("body") { Some(body) => { trace!("WS2P : Receive DAL Request from {}.", self.node_full_id()); - match u32::from_str_radix(req_id, 16) { - Ok(req_id) => { - return WS2PConnectionMessagePayload::DalRequest( - ModuleReqId(req_id), - body.clone(), + + let req_id = match WS2Pv1ReqId::from_str(req_id) { + Ok(req_id) => req_id, + Err(_) => { + warn!( + "WS2Pv1: receive invalid request: invalid req_id: '{}'", + req_id ); + return WS2Pv1MsgPayload::WrongFormatMessage; + } + }; + + match WS2Pv1ReqBody::try_from(body) { + Ok(body) => { + return WS2Pv1MsgPayload::Request { req_id, body }; + } + Err(_) => { + return WS2Pv1MsgPayload::WrongFormatMessage; } - Err(_) => return WS2PConnectionMessagePayload::WrongFormatMessage, } } None => { warn!("WS2P Error : invalid format : Request must contain a field body !"); - return WS2PConnectionMessagePayload::WrongFormatMessage; + return WS2Pv1MsgPayload::WrongFormatMessage; } }, None => { warn!("WS2P Error : invalid format : Request must contain a field body !"); - return WS2PConnectionMessagePayload::WrongFormatMessage; + return WS2Pv1MsgPayload::WrongFormatMessage; } } } - if let Some(req_id) = m.get("resId") { + if let Some(req_id) = msg.get("resId") { match req_id.as_str() { - Some(req_id_str) => match m.get("body") { - Some(body) => match u32::from_str_radix(req_id_str, 16) { + Some(req_id_str) => match msg.get("body") { + Some(body) => match WS2Pv1ReqId::from_str(req_id_str) { Ok(req_id) => { - return WS2PConnectionMessagePayload::ReqResponse( - ModuleReqId(req_id), - body.clone(), - ); + return WS2Pv1MsgPayload::ReqResponse(req_id, body.clone()); + } + Err(_) => { + return WS2Pv1MsgPayload::WrongFormatMessage; } - Err(_) => return WS2PConnectionMessagePayload::WrongFormatMessage, }, - None => match m.get("err") { + None => match msg.get("err") { Some(err) => warn!("Error in req : {:?}", err), - None => return WS2PConnectionMessagePayload::WrongFormatMessage, + None => { + return WS2Pv1MsgPayload::WrongFormatMessage; + } }, }, - None => return WS2PConnectionMessagePayload::WrongFormatMessage, + None => { + return WS2Pv1MsgPayload::WrongFormatMessage; + } } } - if let Some(body) = m.get("body") { + if let Some(body) = msg.get("body") { match body.get("name") { Some(s) => { if s.is_string() { @@ -211,27 +227,23 @@ impl WS2PConnectionMetaDatas { "BLOCK" => match body.get("block") { Some(block) => { if let Some(block_doc) = parse_json_block(&block) { - return WS2PConnectionMessagePayload::Document( + return WS2Pv1MsgPayload::Document( BlockchainDocument::Block(Box::new(block_doc)), ); } else { info!("WS2PSignal: receive invalid block (wrong format)."); }; } - None => return WS2PConnectionMessagePayload::WrongFormatMessage, + None => return WS2Pv1MsgPayload::WrongFormatMessage, }, "HEAD" => match body.get("heads") { Some(heads) => match heads.as_array() { Some(heads_array) => { - return WS2PConnectionMessagePayload::Heads( - heads_array.clone(), - ); - } - None => { - return WS2PConnectionMessagePayload::WrongFormatMessage + return WS2Pv1MsgPayload::Heads(heads_array.clone()); } + None => return WS2Pv1MsgPayload::WrongFormatMessage, }, - None => return WS2PConnectionMessagePayload::WrongFormatMessage, + None => return WS2Pv1MsgPayload::WrongFormatMessage, }, "PEER" => return self.parse_and_check_peer_message(body), "CERTIFICATION" => { @@ -239,33 +251,53 @@ impl WS2PConnectionMetaDatas { "WS2P : Receive CERTIFICATION from {}.", self.node_full_id() ); - /*return WS2PConnectionMessagePayload::Document( + /*return WS2Pv1MsgPayload::Document( BlockchainDocument::Certification(_) );*/ } - _ => { - /*trace!( - "WS2P : Receive Unknow Message from {}.", - self.node_full_id() + "IDENTITY" => { + trace!("WS2P : Receive IDENTITY from {}.", self.node_full_id()); + /*return WS2Pv1MsgPayload::Document( + BlockchainDocument::Identity(_) );*/ - return WS2PConnectionMessagePayload::UnknowMessage; + } + "MEMBERSHIP" => { + trace!("WS2P : Receive MEMBERSHIP from {}.", self.node_full_id()); + /*return WS2Pv1MsgPayload::Document( + BlockchainDocument::Membership(_) + );*/ + } + "TRANSACTION" => { + trace!("WS2P : Receive TRANSACTION from {}.", self.node_full_id()); + /*return WS2Pv1MsgPayload::Document( + BlockchainDocument::Transaction(_) + );*/ + } + name => { + warn!( + "WS2P : Receive unknown document name '{}' from '{}'.", + name, + self.node_full_id() + ); + return WS2Pv1MsgPayload::UnknowMessage; } }; } } None => { warn!("WS2P Error : invalid format : Body must contain a field name !"); - return WS2PConnectionMessagePayload::WrongFormatMessage; + return WS2Pv1MsgPayload::WrongFormatMessage; } } }; - WS2PConnectionMessagePayload::UnknowMessage + debug!( + "WS2P : Receive unknown message from '{}'.", + self.node_full_id() + ); + WS2Pv1MsgPayload::UnknowMessage } - pub fn parse_and_check_peer_message( - &mut self, - body: &serde_json::Value, - ) -> WS2PConnectionMessagePayload { + pub fn parse_and_check_peer_message(&mut self, body: &serde_json::Value) -> WS2Pv1MsgPayload { match body.get("peer") { Some(peer) => match peer.get("pubkey") { Some(raw_pubkey) => { @@ -287,22 +319,19 @@ impl WS2PConnectionMetaDatas { } } } - WS2PConnectionMessagePayload::PeerCard( - body.clone(), - ws2p_endpoints, - ) + WS2Pv1MsgPayload::PeerCard(body.clone(), ws2p_endpoints) } - None => WS2PConnectionMessagePayload::WrongFormatMessage, + None => WS2Pv1MsgPayload::WrongFormatMessage, }, - None => WS2PConnectionMessagePayload::WrongFormatMessage, + None => WS2Pv1MsgPayload::WrongFormatMessage, } } - Err(_) => WS2PConnectionMessagePayload::WrongFormatMessage, + Err(_) => WS2Pv1MsgPayload::WrongFormatMessage, } } - None => WS2PConnectionMessagePayload::WrongFormatMessage, + None => WS2Pv1MsgPayload::WrongFormatMessage, }, - None => WS2PConnectionMessagePayload::WrongFormatMessage, + None => WS2Pv1MsgPayload::WrongFormatMessage, } } } diff --git a/lib/modules/ws2p-v1-legacy/src/ws_connections/mod.rs b/lib/modules/ws2p-v1-legacy/src/ws_connections/mod.rs index 0e302806..eb35567f 100644 --- a/lib/modules/ws2p-v1-legacy/src/ws_connections/mod.rs +++ b/lib/modules/ws2p-v1-legacy/src/ws_connections/mod.rs @@ -19,11 +19,11 @@ pub mod handler; pub mod messages; mod meta_datas; pub mod requests; +pub mod responses; pub mod states; use crate::*; use dup_crypto::keys::*; -use durs_module::ModuleReqId; use durs_network::documents::BlockchainDocument; use durs_network_documents::network_endpoint::EndpointV1; use rand::Rng; @@ -170,19 +170,17 @@ pub fn close_connection( } pub fn get_random_connection<S: ::std::hash::BuildHasher>( - connections: &HashMap<NodeFullId, (EndpointV1, WS2PConnectionState), S>, + connections: HashSet<&NodeFullId, S>, ) -> NodeFullId { let mut rng = rand::thread_rng(); let mut loop_count = 0; loop { - for (ws2p_full_id, (_ep, state)) in &(*connections) { + for ws2p_full_id in &connections { if loop_count > 10 { - return *ws2p_full_id; + return **ws2p_full_id; } - if let WS2PConnectionState::Established = state { - if rng.gen::<bool>() { - return *ws2p_full_id; - } + if rng.gen::<bool>() { + return **ws2p_full_id; } } loop_count += 1; diff --git a/lib/modules/ws2p-v1-legacy/src/ws_connections/requests/mod.rs b/lib/modules/ws2p-v1-legacy/src/ws_connections/requests/mod.rs index 3dbfe5e9..0daccbb5 100644 --- a/lib/modules/ws2p-v1-legacy/src/ws_connections/requests/mod.rs +++ b/lib/modules/ws2p-v1-legacy/src/ws_connections/requests/mod.rs @@ -17,3 +17,220 @@ pub mod received; pub mod sent; + +use dubp_documents::BlockNumber; +use durs_network_documents::NodeFullId; +use serde::Serialize; +use std::convert::TryFrom; +use uuid::Uuid; + +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, Serialize)] +pub struct WS2Pv1ReqId(pub Uuid); + +impl WS2Pv1ReqId { + #[inline] + pub fn random() -> Self { + WS2Pv1ReqId(Uuid::new_v4()) + } + #[inline] + pub fn to_hyphenated_string(&self) -> String { + self.0.to_hyphenated().to_string() + } +} + +impl std::str::FromStr for WS2Pv1ReqId { + type Err = uuid::parser::ParseError; + + fn from_str(source: &str) -> Result<Self, Self::Err> { + Ok(WS2Pv1ReqId(Uuid::parse_str(source)?)) + } +} + +#[derive(Copy, Clone, Debug)] +pub struct WS2Pv1ReqFullId { + pub from: NodeFullId, + pub req_id: WS2Pv1ReqId, +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +/// WS2Pv1 requet +pub struct WS2Pv1Request { + pub id: WS2Pv1ReqId, + pub body: WS2Pv1ReqBody, +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +/// WS2Pv1 requets body +pub enum WS2Pv1ReqBody { + /// get current block + GetCurrent, + /// Get one block + GetBlock { + /// Block number + number: BlockNumber, + }, + /// Get a chunk of blocks + GetBlocks { + /// Number of blocks + count: u32, + /// First block number + from_number: BlockNumber, + }, + /// Get wot mempool + GetRequirementsPending { + /// The identities transmitted must have at least `minCert` certifications + min_cert: usize, + }, +} + +#[derive(Copy, Clone, Debug)] +pub struct WS2Pv1InvalidReqError; + +impl TryFrom<&serde_json::Value> for WS2Pv1ReqBody { + type Error = WS2Pv1InvalidReqError; + + fn try_from(json: &serde_json::Value) -> Result<WS2Pv1ReqBody, WS2Pv1InvalidReqError> { + let req_name = json.get("name").ok_or(WS2Pv1InvalidReqError)?; + match req_name.as_str().ok_or(WS2Pv1InvalidReqError)? { + "CURRENT" => Ok(WS2Pv1ReqBody::GetCurrent), + "BLOCK_BY_NUMBER" => { + let params = json + .get("params") + .ok_or(WS2Pv1InvalidReqError)? + .as_object() + .ok_or(WS2Pv1InvalidReqError)?; + let number = params + .get("number") + .ok_or(WS2Pv1InvalidReqError)? + .as_u64() + .ok_or(WS2Pv1InvalidReqError)?; + Ok(WS2Pv1ReqBody::GetBlock { + number: BlockNumber(u32::try_from(number).map_err(|_| WS2Pv1InvalidReqError)?), + }) + } + "BLOCKS_CHUNK" => { + let params = json + .get("params") + .ok_or(WS2Pv1InvalidReqError)? + .as_object() + .ok_or(WS2Pv1InvalidReqError)?; + let count = params + .get("count") + .ok_or(WS2Pv1InvalidReqError)? + .as_u64() + .ok_or(WS2Pv1InvalidReqError)?; + let from_number = params + .get("fromNumber") + .ok_or(WS2Pv1InvalidReqError)? + .as_u64() + .ok_or(WS2Pv1InvalidReqError)?; + Ok(WS2Pv1ReqBody::GetBlocks { + count: u32::try_from(count).map_err(|_| WS2Pv1InvalidReqError)?, + from_number: BlockNumber( + u32::try_from(from_number).map_err(|_| WS2Pv1InvalidReqError)?, + ), + }) + } + "WOT_REQUIREMENTS_OF_PENDING" => { + let params = json + .get("params") + .ok_or(WS2Pv1InvalidReqError)? + .as_object() + .ok_or(WS2Pv1InvalidReqError)?; + let min_cert = params + .get("minCert") + .ok_or(WS2Pv1InvalidReqError)? + .as_u64() + .ok_or(WS2Pv1InvalidReqError)?; + Ok(WS2Pv1ReqBody::GetRequirementsPending { + min_cert: usize::try_from(min_cert).map_err(|_| WS2Pv1InvalidReqError)?, + }) + } + _ => Err(WS2Pv1InvalidReqError), + } + } +} + +#[cfg(test)] +mod tests { + + use super::*; + use serde_json::json; + + #[test] + fn parse_ws2p_v1_req_get_current() -> Result<(), WS2Pv1InvalidReqError> { + let json_req_body = json!({ + "name": "CURRENT", + "params": {} + }); + + let parsed_req = WS2Pv1ReqBody::try_from(&json_req_body)?; + + assert_eq!(WS2Pv1ReqBody::GetCurrent, parsed_req); + + Ok(()) + } + + #[test] + fn parse_ws2p_v1_req_get_block() -> Result<(), WS2Pv1InvalidReqError> { + let json_req_body = json!({ + "name": "BLOCK_BY_NUMBER", + "params": { + "number": 42, + } + }); + + let parsed_req = WS2Pv1ReqBody::try_from(&json_req_body)?; + + assert_eq!( + WS2Pv1ReqBody::GetBlock { + number: BlockNumber(42), + }, + parsed_req + ); + + Ok(()) + } + + #[test] + fn parse_ws2p_v1_req_get_blocks() -> Result<(), WS2Pv1InvalidReqError> { + let json_req_body = json!({ + "name": "BLOCKS_CHUNK", + "params": { + "count": 50, + "fromNumber": 100, + } + }); + + let parsed_req = WS2Pv1ReqBody::try_from(&json_req_body)?; + + assert_eq!( + WS2Pv1ReqBody::GetBlocks { + count: 50, + from_number: BlockNumber(100), + }, + parsed_req + ); + + Ok(()) + } + + #[test] + fn parse_ws2p_v1_req_get_requirements_pending() -> Result<(), WS2Pv1InvalidReqError> { + let json_req_body = json!({ + "name": "WOT_REQUIREMENTS_OF_PENDING", + "params": { + "minCert": 3, + } + }); + + let parsed_req = WS2Pv1ReqBody::try_from(&json_req_body)?; + + assert_eq!( + WS2Pv1ReqBody::GetRequirementsPending { min_cert: 3 }, + parsed_req + ); + + Ok(()) + } +} diff --git a/lib/modules/ws2p-v1-legacy/src/ws_connections/requests/received.rs b/lib/modules/ws2p-v1-legacy/src/ws_connections/requests/received.rs index 8b137891..dc2294cd 100644 --- a/lib/modules/ws2p-v1-legacy/src/ws_connections/requests/received.rs +++ b/lib/modules/ws2p-v1-legacy/src/ws_connections/requests/received.rs @@ -1 +1,71 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. +//! Sub-module managing the WS2Pv1 requests received. + +use crate::requests::sent::send_dal_request; +use crate::ws_connections::requests::{WS2Pv1ReqBody, WS2Pv1ReqFullId, WS2Pv1ReqId}; +use crate::ws_connections::responses::{WS2Pv1ReqRes, WS2Pv1ReqResBody}; +use crate::WS2Pv1Module; +use durs_message::requests::BlockchainRequest; +use durs_network_documents::NodeFullId; + +pub fn receive_ws2p_v1_request( + ws2p_module: &mut WS2Pv1Module, + from: NodeFullId, + ws2p_req_id: WS2Pv1ReqId, + req_boby: WS2Pv1ReqBody, +) { + let module_req_id_opt = match req_boby { + WS2Pv1ReqBody::GetCurrent => Some(send_dal_request( + ws2p_module, + &BlockchainRequest::CurrentBlock, + )), + WS2Pv1ReqBody::GetBlock { number } => Some(send_dal_request( + ws2p_module, + &BlockchainRequest::BlockByNumber { + block_number: number, + }, + )), + WS2Pv1ReqBody::GetBlocks { from_number, count } => Some(send_dal_request( + ws2p_module, + &BlockchainRequest::Chunk { + first_block_number: from_number, + count, + }, + )), + WS2Pv1ReqBody::GetRequirementsPending { .. } => { + crate::ws_connections::responses::sent::send_response( + ws2p_module, + from, + WS2Pv1ReqRes { + req_id: ws2p_req_id, + body: WS2Pv1ReqResBody::GetRequirementsPending { identities: vec![] }, + }, + ); + None + } + }; + + if let Some(module_req_id) = module_req_id_opt { + ws2p_module.pending_received_requests.insert( + module_req_id, + WS2Pv1ReqFullId { + from, + req_id: ws2p_req_id, + }, + ); + } +} diff --git a/lib/modules/ws2p-v1-legacy/src/ws_connections/requests/sent.rs b/lib/modules/ws2p-v1-legacy/src/ws_connections/requests/sent.rs index b0f027e6..d208dbb7 100644 --- a/lib/modules/ws2p-v1-legacy/src/ws_connections/requests/sent.rs +++ b/lib/modules/ws2p-v1-legacy/src/ws_connections/requests/sent.rs @@ -15,25 +15,31 @@ //! Sub-module managing the WS2Pv1 requests sent. -use crate::WS2Pv1Module; -use durs_common_tools::fatal_error; -use durs_network::requests::OldNetworkRequest; +use super::{WS2Pv1ReqBody, WS2Pv1Request}; +use crate::{WS2Pv1Module, WS2Pv1PendingReqInfos}; +use durs_module::ModuleReqFullId; use durs_network_documents::NodeFullId; use std::time::SystemTime; use ws::Message; pub fn send_request_to_specific_node( ws2p_module: &mut WS2Pv1Module, + module_req_full_id: ModuleReqFullId, ws2p_full_id: &NodeFullId, - ws2p_request: &OldNetworkRequest, + ws2p_request: &WS2Pv1Request, ) -> ws::Result<()> { if let Some(ws) = ws2p_module.websockets.get_mut(ws2p_full_id) { let json_req = network_request_to_json(ws2p_request).to_string(); debug!("send request {} to {}", json_req, ws2p_full_id); ws.0.send(Message::text(json_req))?; ws2p_module.requests_awaiting_response.insert( - ws2p_request.get_req_id(), - (*ws2p_request, *ws2p_full_id, SystemTime::now()), + ws2p_request.id, + WS2Pv1PendingReqInfos { + req_body: ws2p_request.body, + requester_module: module_req_full_id, + recipient_node: *ws2p_full_id, + timestamp: SystemTime::now(), + }, ); } else { warn!("WS2P: Fail to get mut websocket !"); @@ -41,35 +47,25 @@ pub fn send_request_to_specific_node( Ok(()) } -pub fn network_request_to_json(request: &OldNetworkRequest) -> serde_json::Value { - let (request_id, request_type, request_params) = match *request { - OldNetworkRequest::GetCurrent(ref req_full_id) => (req_full_id.1, "CURRENT", json!({})), - OldNetworkRequest::GetBlocks(ref req_full_id, count, from_mumber) => ( - req_full_id.1, +pub fn network_request_to_json(request: &WS2Pv1Request) -> serde_json::Value { + let (request_type, request_params) = match request.body { + WS2Pv1ReqBody::GetCurrent => ("CURRENT", json!({})), + WS2Pv1ReqBody::GetBlock { ref number } => ("BLOCK_BY_NUMBER", json!({ "number": number })), + WS2Pv1ReqBody::GetBlocks { count, from_number } => ( "BLOCKS_CHUNK", json!({ "count": count, - "fromNumber": from_mumber + "fromNumber": from_number }), ), - OldNetworkRequest::GetRequirementsPending(ref req_full_id, min_cert) => ( - req_full_id.1, + WS2Pv1ReqBody::GetRequirementsPending { min_cert } => ( "WOT_REQUIREMENTS_OF_PENDING", json!({ "minCert": min_cert }), ), - OldNetworkRequest::GetConsensus(_) => { - fatal_error!("GetConsensus() request must be not convert to json !"); - } - OldNetworkRequest::GetHeadsCache(_) => { - fatal_error!("GetHeadsCache() request must be not convert to json !"); - } - OldNetworkRequest::GetEndpoints(_) => { - fatal_error!("GetEndpoints() request must be not convert to json !"); - } }; json!({ - "reqId": request_id, + "reqId": request.id.to_hyphenated_string(), "body" : { "name": request_type, "params": request_params diff --git a/lib/modules/ws2p-v1-legacy/src/ws_connections/responses/mod.rs b/lib/modules/ws2p-v1-legacy/src/ws_connections/responses/mod.rs new file mode 100644 index 00000000..d293c19a --- /dev/null +++ b/lib/modules/ws2p-v1-legacy/src/ws_connections/responses/mod.rs @@ -0,0 +1,110 @@ +// Copyright (C) 2018 The Duniter Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module managing the WS2Pv1 responses sent and received. + +pub mod received; +pub mod sent; + +use crate::serializers::IntoWS2Pv1Json; +use crate::ws_connections::requests::WS2Pv1ReqId; +use dubp_documents::documents::block::BlockDocument; +use dubp_documents::ToStringObject; +use dup_crypto::keys::PubKey; + +/// WS2Pv1 request response +#[derive(Clone, Debug)] +pub struct WS2Pv1ReqRes { + /// WS2Pv1 request id + pub req_id: WS2Pv1ReqId, + /// WS2Pv1 request response body + pub body: WS2Pv1ReqResBody, +} + +impl Into<serde_json::Value> for WS2Pv1ReqRes { + fn into(self) -> serde_json::Value { + let mut map = serde_json::map::Map::with_capacity(2); + map.insert( + "resId".to_owned(), + self.req_id.to_hyphenated_string().into(), + ); + map.insert("body".to_owned(), self.body.into()); + serde_json::Value::Object(map) + } +} + +/// WS2Pv1 request response body +#[derive(Clone, Debug)] +pub enum WS2Pv1ReqResBody { + /// Response to request getCurrent + GetCurrent(BlockDocument), + // Response to request getBlock + GetBlock(BlockDocument), + // Response to request getBlocks + GetBlocks(Vec<BlockDocument>), + // Response to request getRequirementsPending + GetRequirementsPending { + identities: Vec<WS2Pv1IdentityRequirementsPending>, + }, +} + +impl Into<serde_json::Value> for WS2Pv1ReqResBody { + fn into(self) -> serde_json::Value { + match self { + WS2Pv1ReqResBody::GetCurrent(block_doc) => { + block_doc.to_string_object().into_ws2p_v1_json() + } + WS2Pv1ReqResBody::GetBlock(block_doc) => { + block_doc.to_string_object().into_ws2p_v1_json() + } + WS2Pv1ReqResBody::GetBlocks(blocks) => serde_json::Value::Array( + blocks + .iter() + .map(ToStringObject::to_string_object) + .map(IntoWS2Pv1Json::into_ws2p_v1_json) + .collect(), + ), + WS2Pv1ReqResBody::GetRequirementsPending { .. } => { + let mut map = serde_json::map::Map::with_capacity(1); + map.insert("identities".to_owned(), serde_json::Value::Array(vec![])); + serde_json::Value::Object(map) + } + } + } +} + +/// WS2Pv1 Identity requirements pending +#[derive(Clone, Debug)] +pub struct WS2Pv1IdentityRequirementsPending { + pub certifications: Vec<WS2pv1CertificationPending>, + pub expired: bool, + pub is_sentry: bool, + pub membership_expires_in: u64, + pub membership_pending_expires_in: u64, + // Some fields missing ... +} + +/// WS2Pv1 Certification pending +#[derive(Copy, Clone, Debug)] +pub struct WS2pv1CertificationPending { + /// Expires in + pub expires_in: u64, + /// From + pub from: PubKey, + /// Timestamp + pub timestamp: u64, + /// To + pub to: PubKey, +} diff --git a/lib/modules/ws2p-v1-legacy/src/ws_connections/responses/received.rs b/lib/modules/ws2p-v1-legacy/src/ws_connections/responses/received.rs new file mode 100644 index 00000000..af6b10ff --- /dev/null +++ b/lib/modules/ws2p-v1-legacy/src/ws_connections/responses/received.rs @@ -0,0 +1,88 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module managing the WS2Pv1 responses received. + +use crate::parsers::blocks::parse_json_block; +use crate::*; +use durs_module::ModuleReqFullId; +use durs_network::requests::*; +use durs_network_documents::NodeFullId; + +pub fn receive_response( + ws2p_module: &mut WS2Pv1Module, + module_req_full_id: ModuleReqFullId, + ws2p_req_body: WS2Pv1ReqBody, + recipient_full_id: NodeFullId, + response: serde_json::Value, +) { + match ws2p_req_body { + WS2Pv1ReqBody::GetCurrent => { + info!( + "WS2PSignal::ReceiveCurrent({}, {:?})", + (module_req_full_id.1).0, + ws2p_req_body + ); + if let Some(block) = parse_json_block(&response) { + crate::responses::sent::send_network_req_response( + ws2p_module, + module_req_full_id.0, + module_req_full_id.1, + NetworkResponse::CurrentBlock( + ModuleReqFullId(WS2Pv1Module::name(), module_req_full_id.1), + recipient_full_id, + Box::new(block), + ), + ); + } + } + WS2Pv1ReqBody::GetBlocks { + from_number: from, .. + } => { + if response.is_array() { + let mut chunk = Vec::new(); + for json_block in unwrap!(response.as_array()) { + if let Some(block) = parse_json_block(json_block) { + chunk.push(block); + } else { + warn!("WS2Pv1Module: Error : fail to parse one json block !"); + } + } + info!( + "WS2PSignal::ReceiveChunk({}, {} blocks from {})", + (module_req_full_id.1).0, + chunk.len(), + from + ); + debug!("Send chunk to followers : {}", from); + events::sent::send_network_event(ws2p_module, NetworkEvent::ReceiveBlocks(chunk)); + } + } + WS2Pv1ReqBody::GetRequirementsPending { min_cert } => { + info!( + "WS2PSignal::ReceiveRequirementsPending({}, {})", + module_req_full_id.0, min_cert + ); + debug!("----------------------------------------"); + debug!("- BEGIN IDENTITIES PENDING -"); + debug!("----------------------------------------"); + debug!("{:#?}", response); + debug!("----------------------------------------"); + debug!("- END IDENTITIES PENDING -"); + debug!("----------------------------------------"); + } + _ => {} + } +} diff --git a/lib/modules/ws2p-v1-legacy/src/ws_connections/responses/sent.rs b/lib/modules/ws2p-v1-legacy/src/ws_connections/responses/sent.rs new file mode 100644 index 00000000..f4823410 --- /dev/null +++ b/lib/modules/ws2p-v1-legacy/src/ws_connections/responses/sent.rs @@ -0,0 +1,40 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module managing the WS2Pv1 responses sent. + +use crate::ws_connections::responses::WS2Pv1ReqRes; +use crate::WS2Pv1Module; +use durs_network_documents::NodeFullId; +use ws::{CloseCode, Message}; + +pub fn send_response( + ws2p_module: &mut WS2Pv1Module, + ws2p_req_from: NodeFullId, + response: WS2Pv1ReqRes, +) { + if let Some(ws_sender) = ws2p_module.websockets.get(&ws2p_req_from) { + let json_response: serde_json::Value = response.into(); + if ws_sender + .0 + .send(Message::text(json_response.to_string())) + .is_err() + { + let _ = ws_sender + .0 + .close_with_reason(CloseCode::Error, "Fail to send request response !"); + } + } +} -- GitLab