From c7537c3f747fc9d5949f16f7796ed86c5bd65e50 Mon Sep 17 00:00:00 2001 From: Hugo Trentesaux Date: Tue, 14 May 2019 19:31:29 +0200 Subject: [PATCH] [feat] ws2p: implement ack and ok messages --- Cargo.lock | 1 + lib/modules/ws2p/ws2p-messages/lib.rs | 2 +- lib/modules/ws2p/ws2p-messages/v2/mod.rs | 4 +- .../ws2p-messages/v2/payload_container.rs | 5 +- lib/modules/ws2p/ws2p/Cargo.toml | 5 +- .../ws2p/src/controllers/handler/ack_msg.rs | 91 +++++++++++ .../src/controllers/handler/connect_msg.rs | 28 ++-- .../ws2p/ws2p/src/controllers/handler/mod.rs | 147 +++++++++--------- .../ws2p/src/controllers/handler/ok_msg.rs | 40 +++++ .../controllers/incoming_connections/mod.rs | 3 +- lib/modules/ws2p/ws2p/src/controllers/mod.rs | 10 +- .../controllers/outgoing_connections/mod.rs | 25 ++- .../ws2p/ws2p/tests/connection_negociation.rs | 131 +++++++++------- 13 files changed, 322 insertions(+), 170 deletions(-) create mode 100644 lib/modules/ws2p/ws2p/src/controllers/handler/ack_msg.rs create mode 100644 lib/modules/ws2p/ws2p/src/controllers/handler/ok_msg.rs diff --git a/Cargo.lock b/Cargo.lock index 7ca56d07..ea1da06d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -553,6 +553,7 @@ dependencies = [ "bincode 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "dubp-documents 0.12.0", "dup-crypto 0.6.0", + "durs-common-tests-tools 0.1.0", "durs-common-tools 0.1.0", "durs-conf 0.1.0-a0.1", "durs-message 0.1.0-a0.1", diff --git a/lib/modules/ws2p/ws2p-messages/lib.rs b/lib/modules/ws2p/ws2p-messages/lib.rs index 8f439c3c..3ca69abf 100644 --- a/lib/modules/ws2p/ws2p-messages/lib.rs +++ b/lib/modules/ws2p/ws2p-messages/lib.rs @@ -80,7 +80,7 @@ impl WS2PMessage { pub fn parse_and_check_bin_message(bin_msg: &[u8]) -> Result { let msg: WS2PMessage = bincode::deserialize(&bin_msg)?; let hash = msg.hash(); - //println!("DEBUG: parse_and_check_bin_message: hash={:?}", hash); + //debug!("parse_and_check_bin_message: hash={:?}", hash); // Compute hash len let hash_len = 33; // Compute signature len diff --git a/lib/modules/ws2p/ws2p-messages/v2/mod.rs b/lib/modules/ws2p/ws2p-messages/v2/mod.rs index fc3f21d2..412d3345 100644 --- a/lib/modules/ws2p/ws2p-messages/v2/mod.rs +++ b/lib/modules/ws2p/ws2p-messages/v2/mod.rs @@ -105,7 +105,9 @@ mod tests { #[test] fn test_ws2p_message_ack() { - test_ws2p_message(WS2Pv2MessagePayload::Ack(Hash::random())); + test_ws2p_message(WS2Pv2MessagePayload::Ack { + challenge: Hash::random(), + }); } #[test] diff --git a/lib/modules/ws2p/ws2p-messages/v2/payload_container.rs b/lib/modules/ws2p/ws2p-messages/v2/payload_container.rs index 92bce766..8da91177 100644 --- a/lib/modules/ws2p/ws2p-messages/v2/payload_container.rs +++ b/lib/modules/ws2p/ws2p-messages/v2/payload_container.rs @@ -38,7 +38,10 @@ pub enum WS2Pv2MessagePayload { /// CONNECT message Connect(Box), /// ACK message - Ack(Hash), + Ack { + /// Hash previously sent in CONNECT message + challenge: Hash, + }, /// SECRET_FLAGS Message SecretFlags(WS2Pv2SecretFlagsMsg), /// OK Message diff --git a/lib/modules/ws2p/ws2p/Cargo.toml b/lib/modules/ws2p/ws2p/Cargo.toml index c35642da..b368910e 100644 --- a/lib/modules/ws2p/ws2p/Cargo.toml +++ b/lib/modules/ws2p/ws2p/Cargo.toml @@ -29,5 +29,8 @@ serde_json = "1.0.*" structopt= "0.2.*" ws = { version = "0.7.*", features = ["permessage-deflate"] } +[dev-dependencies] +durs-common-tests-tools = { path = "../../../tests-tools/common-tests-tools" } + [features] -ssl = ["ws/ssl"] \ No newline at end of file +ssl = ["ws/ssl"] diff --git a/lib/modules/ws2p/ws2p/src/controllers/handler/ack_msg.rs b/lib/modules/ws2p/ws2p/src/controllers/handler/ack_msg.rs new file mode 100644 index 00000000..fa3e948b --- /dev/null +++ b/lib/modules/ws2p/ws2p/src/controllers/handler/ack_msg.rs @@ -0,0 +1,91 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +//! Process WS2P ACK message. + +use crate::controllers::handler::*; +use crate::controllers::*; +use dup_crypto::hashs::Hash; +use durs_common_tools::fatal_error; +use durs_ws2p_messages::v2::ok::WS2Pv2OkMsg; +use ws::CloseCode; + +/// Process WS2pv2 ACK Message +pub fn process_ws2p_v2_ack_msg( + handler: &mut Ws2pConnectionHandler, // handler contains original challenge + ack_msg_challenge: Hash, +) { + debug!("Receive ACK message !"); + + match handler.conn_datas.state { + WS2PConnectionState::OkMsgOkWaitingAckMsg => { + // already sent ack message and received ok response + process(handler, ack_msg_challenge, WS2PConnectionState::Established); + } + WS2PConnectionState::ConnectMessOk => { + // ack message not yet sent + process(handler, ack_msg_challenge, WS2PConnectionState::AckMsgOk); + } + _ => { + let _ = handler + .ws + .0 + .close_with_reason(CloseCode::Invalid, "Unexpected ACK message !"); + } + } +} + +#[inline] +// process and apply given status in case of success +fn process( + handler: &mut Ws2pConnectionHandler, + ack_msg_challenge: Hash, + success_status: WS2PConnectionState, +) { + if handler.conn_datas.challenge != ack_msg_challenge { + handler.update_status(WS2PConnectionState::Denial); + } else { + handler.update_status(success_status); + send_ok_msg(handler); + } +} + +// send ok message +fn send_ok_msg(handler: &mut Ws2pConnectionHandler) { + // generate empty Ok message + let ok_msg: WS2Pv2OkMsg = Default::default(); + + // Encapsulate and binarize OK message + if let Ok((_, bin_ok_msg)) = WS2Pv2Message::encapsulate_payload( + handler.currency.clone(), + handler.local_node.my_node_id, + handler.local_node.my_key_pair, + WS2Pv2MessagePayload::Ok(ok_msg), + ) { + // Send Ok Message + match handler.ws.0.send(Message::binary(bin_ok_msg)) { + Ok(()) => {} + Err(_) => { + handler.conn_datas.state = WS2PConnectionState::Unreachable; + let _ = handler + .ws + .0 + .close_with_reason(CloseCode::Error, "Fail to send Ok message !"); + } + } + } else { + fatal_error!("Dev error: Fail to sign own ok message !"); + } +} diff --git a/lib/modules/ws2p/ws2p/src/controllers/handler/connect_msg.rs b/lib/modules/ws2p/ws2p/src/controllers/handler/connect_msg.rs index 7ba8205d..e1ce0056 100644 --- a/lib/modules/ws2p/ws2p/src/controllers/handler/connect_msg.rs +++ b/lib/modules/ws2p/ws2p/src/controllers/handler/connect_msg.rs @@ -13,10 +13,11 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -//! Process WS2P CONNECT mesage. +//! Process WS2P CONNECT message. use crate::controllers::handler::*; use crate::controllers::*; +use durs_common_tools::fatal_error; use durs_network_documents::NodeFullId; use durs_ws2p_messages::v2::connect::WS2Pv2ConnectMsg; use ws::CloseCode; @@ -29,7 +30,7 @@ pub fn process_ws2p_v2_connect_msg( remote_full_id: NodeFullId, connect_msg: &WS2Pv2ConnectMsg, ) { - println!("DEBUG: Receive CONNECT message !"); + debug!("Receive CONNECT message !"); // Get remote node datas let remote_challenge = connect_msg.challenge; @@ -38,7 +39,7 @@ pub fn process_ws2p_v2_connect_msg( peer_card: None, current_blockstamp: None, }; - if let WS2PConnectionState::WaitingConnectMess = handler.conn_datas.state { + if let WS2PConnectionState::WaitingConnectMsg = handler.conn_datas.state { // Check remote node datas if let WS2Pv2ConnectType::Incoming = handler.conn_datas.connect_type { handler.conn_datas.remote_full_id = Some(remote_full_id); @@ -97,25 +98,26 @@ pub fn process_ws2p_v2_connect_msg( handler.send_new_conn_state_to_service(); // Encapsulate and binarize ACK message - let (_, bin_ack_msg) = WS2Pv2Message::encapsulate_payload( + if let Ok((_, bin_ack_msg)) = WS2Pv2Message::encapsulate_payload( handler.currency.clone(), handler.local_node.my_node_id, handler.local_node.my_key_pair, - WS2Pv2MessagePayload::Ack(remote_challenge), - ) - .expect("WS2P : Fail to sign own ack message !"); - // Send ACk Message - match handler.ws.0.send(Message::binary(bin_ack_msg)) { - Ok(()) => { + WS2Pv2MessagePayload::Ack { + challenge: remote_challenge, + }, + ) { + // Send Ack Message + if let Ok(()) = handler.ws.0.send(Message::binary(bin_ack_msg)) { // Update state handler.conn_datas.state = WS2PConnectionState::ConnectMessOk; - } - Err(_) => { + } else { handler.conn_datas.state = WS2PConnectionState::Unreachable; let _ = handler .ws .0 - .close_with_reason(CloseCode::Error, "Fail to send ACk message !"); + .close_with_reason(CloseCode::Error, "Fail to send Ack message !"); } + } else { + fatal_error!("Dev error: Fail to sign own ack message !") } } diff --git a/lib/modules/ws2p/ws2p/src/controllers/handler/mod.rs b/lib/modules/ws2p/ws2p/src/controllers/handler/mod.rs index 45e9fc97..a30a4cef 100644 --- a/lib/modules/ws2p/ws2p/src/controllers/handler/mod.rs +++ b/lib/modules/ws2p/ws2p/src/controllers/handler/mod.rs @@ -15,17 +15,17 @@ //! WS2P connection handler. +pub mod ack_msg; pub mod connect_msg; +pub mod ok_msg; use crate::constants::*; use crate::controllers::*; +use crate::services::Ws2pServiceSender; use crate::services::*; -use ws::{util::Token, CloseCode, /*Frame,*/ Handler, Handshake, Message}; -//use dup_crypto::keys::KeyPairEnum; use dubp_documents::CurrencyName; +use durs_common_tools::fatal_error; use durs_network_documents::NodeFullId; -//use durs_ws2p_messages::v2::api_features::WS2PFeatures; -use crate::services::Ws2pServiceSender; use durs_ws2p_messages::v2::connect::generate_connect_message; use durs_ws2p_messages::v2::payload_container::WS2Pv2MessagePayload; use durs_ws2p_messages::v2::WS2Pv2Message; @@ -35,6 +35,7 @@ use std::ops::Deref; use std::sync::mpsc; use std::thread; use std::time::Duration; +use ws::{util::Token, CloseCode, Handler, Handshake, Message}; const CONNECT: Token = Token(1); const EXPIRE: Token = Token(2); @@ -77,7 +78,8 @@ impl Ws2pConnectionHandler { ) = mpsc::channel(); // Send controller sender to service - println!("DEBUG: Send controller sender to service"); + debug!("Send controller sender to service"); + service_sender.send(Ws2pServiceSender::ControllerSender(sender))?; Ok(Ws2pConnectionHandler { @@ -103,6 +105,11 @@ impl Ws2pConnectionHandler { )) .expect("WS2p Service unreacheable !"); } + #[inline] + fn update_status(&mut self, status: WS2PConnectionState) { + self.conn_datas.state = status; + self.send_new_conn_state_to_service(); + } } fn print_opt_addr(addr: Option) -> String { @@ -121,14 +128,13 @@ impl Handler for Ws2pConnectionHandler { // Handler state or reject the connection based on the details of the Request // or Response, such as by checking cookies or Auth headers. fn on_open(&mut self, handshake: Handshake) -> ws::Result<()> { - #[cfg(test)] - println!( - "TESTS: open websocket from {}", + debug!( + "open websocket from {}", print_opt_addr(handshake.peer_addr) ); // Update connection state - self.conn_datas.state = WS2PConnectionState::TryToSendConnectMess; + self.conn_datas.state = WS2PConnectionState::TryToSendConnectMsg; self.send_new_conn_state_to_service(); // Generate connect message @@ -140,77 +146,58 @@ impl Handler for Ws2pConnectionHandler { ); // Encapsulate and binarize connect message - let (_ws2p_full_msg, bin_connect_msg) = WS2Pv2Message::encapsulate_payload( + if let Ok((_ws2p_full_msg, bin_connect_msg)) = WS2Pv2Message::encapsulate_payload( self.currency.clone(), self.local_node.my_node_id, self.local_node.my_key_pair, WS2Pv2MessagePayload::Connect(Box::new(connect_msg)), - ) - .expect("WS2P : Fail to sign own connect message !"); - - // Start negociation timeouts - self.ws.0.timeout(*WS2P_NEGOTIATION_TIMEOUT, CONNECT)?; - // Start expire timeout - self.ws - .0 - .timeout(*WS2P_EXPIRE_TIMEOUT_IN_SECS * 1_000, EXPIRE)?; + ) { + // Start negociation timeouts + self.ws.0.timeout(*WS2P_NEGOTIATION_TIMEOUT, CONNECT)?; + // Start expire timeout + self.ws + .0 + .timeout(*WS2P_EXPIRE_TIMEOUT_IN_SECS * 1_000, EXPIRE)?; - // Send connect message - match self.ws.0.send(Message::binary(bin_connect_msg)) { - Ok(()) => { - // Update state - if let WS2PConnectionState::TryToSendConnectMess = self.conn_datas.state { - self.conn_datas.state = WS2PConnectionState::WaitingConnectMess; - self.send_new_conn_state_to_service(); + // Send connect message + match self.ws.0.send(Message::binary(bin_connect_msg)) { + Ok(()) => { + // Update state + if let WS2PConnectionState::TryToSendConnectMsg = self.conn_datas.state { + self.conn_datas.state = WS2PConnectionState::WaitingConnectMsg; + self.send_new_conn_state_to_service(); + } + // Log + info!( + "Send CONNECT message to {}", + print_opt_addr(handshake.peer_addr) + ); + debug!( + "Succesfully send CONNECT message to {}", + print_opt_addr(handshake.peer_addr) + ); + } + Err(e) => { + self.conn_datas.state = WS2PConnectionState::Unreachable; + warn!( + "Fail to send CONNECT message to {} : {}", + print_opt_addr(handshake.peer_addr), + e + ); + debug!( + "Fail send CONNECT message to {}", + print_opt_addr(handshake.peer_addr) + ); + let _ = self + .ws + .0 + .close_with_reason(CloseCode::Error, "Fail to send CONNECT message !"); } - // Log - info!( - "Send CONNECT message to {}", - print_opt_addr(handshake.peer_addr) - ); - #[cfg(test)] - println!( - "TESTS: Succesfully send CONNECT message to {}", - print_opt_addr(handshake.peer_addr) - ); - } - Err(e) => { - self.conn_datas.state = WS2PConnectionState::Unreachable; - warn!( - "Fail to send CONNECT message to {} : {}", - print_opt_addr(handshake.peer_addr), - e - ); - #[cfg(test)] - println!( - "TESTS: Fail send CONNECT message to {}", - print_opt_addr(handshake.peer_addr) - ); - let _ = self - .ws - .0 - .close_with_reason(CloseCode::Error, "Fail to send CONNECT message !"); } - } - /* - // Send ws::Sender to WS2PConductor - let result = self - .conductor_sender - .send(WS2PThreadSignal::WS2PConnectionMessage( - WS2PConnectionMessage( - self.conn_datas.node_full_id(), - WS2PConnectionMessagePayload::WebsocketOk(WsSender(self.ws.clone())), - ), - )); - // If WS2PConductor is unrechable, close connection. - if result.is_err() { - debug!("Close ws2p connection because ws2p main thread is unrechable !"); - self.ws.close(CloseCode::Normal) + Ok(()) } else { - // Send CONNECT Message - self.ws.send(self.connect_message.clone()) - }*/ - Ok(()) + fatal_error!("Dev error: Fail to sign own connect message !"); + } } // `on_message` is roughly equivalent to the Handler closure. It takes a `Message` @@ -243,7 +230,7 @@ impl Handler for Ws2pConnectionHandler { self.conn_datas.last_mess_time = SystemTime::now(); if msg.is_binary() { - println!("DEBUG: Receive new message there is not a spam !"); + debug!("Receive new message there is not a spam !"); match WS2PMessage::parse_and_check_bin_message(&msg.into_data()) { Ok(valid_msg) => match valid_msg { WS2PMessage::V2(msg_v2) => { @@ -260,9 +247,17 @@ impl Handler for Ws2pConnectionHandler { connect_msg, ); } - WS2Pv2MessagePayload::Ack(_) => {} + WS2Pv2MessagePayload::Ack { + challenge: ack_msg_challenge, + } => { + // Process ack message + ack_msg::process_ws2p_v2_ack_msg(self, ack_msg_challenge); + } WS2Pv2MessagePayload::SecretFlags(_) => {} - WS2Pv2MessagePayload::Ok(_) => {} + WS2Pv2MessagePayload::Ok(_) => { + // Process ok message + ok_msg::process_ws2p_v2_ok_msg(self); + } WS2Pv2MessagePayload::Ko(_) => {} _ => { if let WS2PConnectionState::Established = self.conn_datas.state { @@ -278,7 +273,7 @@ impl Handler for Ws2pConnectionHandler { } }, Err(ws2p_msg_err) => { - println!("DEBUG: Message is invalid : {:?}", ws2p_msg_err); + warn!("Message is invalid : {:?}", ws2p_msg_err); self.count_invalid_msgs += 1; if self.count_invalid_msgs >= *WS2P_INVALID_MSGS_LIMIT { let _ = self.ws.0.close_with_reason( diff --git a/lib/modules/ws2p/ws2p/src/controllers/handler/ok_msg.rs b/lib/modules/ws2p/ws2p/src/controllers/handler/ok_msg.rs new file mode 100644 index 00000000..a59b596b --- /dev/null +++ b/lib/modules/ws2p/ws2p/src/controllers/handler/ok_msg.rs @@ -0,0 +1,40 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +//! Process WS2P OK message. + +use crate::controllers::handler::*; +use crate::controllers::*; +use ws::CloseCode; + +/// Process WS2pv2 OK Message +pub fn process_ws2p_v2_ok_msg(handler: &mut Ws2pConnectionHandler) { + debug!("Receive OK message !"); + + match handler.conn_datas.state { + WS2PConnectionState::ConnectMessOk => { + handler.update_status(WS2PConnectionState::OkMsgOkWaitingAckMsg); + } + WS2PConnectionState::AckMsgOk => { + handler.update_status(WS2PConnectionState::Established); + } + _ => { + let _ = handler + .ws + .0 + .close_with_reason(CloseCode::Invalid, "Unexpected OK message !"); + } + } +} diff --git a/lib/modules/ws2p/ws2p/src/controllers/incoming_connections/mod.rs b/lib/modules/ws2p/ws2p/src/controllers/incoming_connections/mod.rs index 702e9729..81797f1c 100644 --- a/lib/modules/ws2p/ws2p/src/controllers/incoming_connections/mod.rs +++ b/lib/modules/ws2p/ws2p/src/controllers/incoming_connections/mod.rs @@ -39,11 +39,10 @@ pub fn listen_on_ws2p_v2_endpoint( // Log info!("Listen on {} ...", ws_url); - println!("DEBUG: call function listen({}) ...", ws_url); // Connect to websocket listen(ws_url, move |ws| { - println!("DEBUG: Listen on host:port..."); + info!("Listen on host:port..."); DeflateBuilder::new().build( Ws2pConnectionHandler::try_new( WsSender(ws), diff --git a/lib/modules/ws2p/ws2p/src/controllers/mod.rs b/lib/modules/ws2p/ws2p/src/controllers/mod.rs index 602c7731..778a4e48 100644 --- a/lib/modules/ws2p/ws2p/src/controllers/mod.rs +++ b/lib/modules/ws2p/ws2p/src/controllers/mod.rs @@ -60,11 +60,11 @@ pub enum WS2PConnectionState { /// Websocket error WSError, /// Try to send connect message - TryToSendConnectMess, + TryToSendConnectMsg, /// Endpoint unreachable Unreachable, /// Waiting connect message - WaitingConnectMess, + WaitingConnectMsg, /// No response NoResponse, /// Negociation timeout @@ -72,14 +72,14 @@ pub enum WS2PConnectionState { /// Receive valid connect message ConnectMessOk, /// Receive valid OK message but wait ACK message - OkMessOkWaitingAckMess, + OkMsgOkWaitingAckMsg, /// Receive valid ACK message - AckMessOk, + AckMsgOk, /// Connection denial (maybe due to many different reasons : receive wrong message, wrong format, wrong signature, etc) Denial, /// Connection closed Close, - /// Connection succesfully established + /// Connection successfully established Established, } diff --git a/lib/modules/ws2p/ws2p/src/controllers/outgoing_connections/mod.rs b/lib/modules/ws2p/ws2p/src/controllers/outgoing_connections/mod.rs index dd186d3e..ec854a85 100644 --- a/lib/modules/ws2p/ws2p/src/controllers/outgoing_connections/mod.rs +++ b/lib/modules/ws2p/ws2p/src/controllers/outgoing_connections/mod.rs @@ -15,11 +15,11 @@ //! WS2P outgoing connections controllers. -use dubp_documents::CurrencyName; -//use durs_module::ModuleReqId; use crate::controllers::handler::Ws2pConnectionHandler; use crate::controllers::*; use crate::services::*; +use dubp_documents::CurrencyName; +use durs_common_tools::fatal_error; use durs_network_documents::network_endpoint::EndpointEnum; use durs_network_documents::NodeFullId; use ws::connect; @@ -47,19 +47,18 @@ pub fn connect_to_ws2p_v2_endpoint( // Log info!("Try connection to {} ...", ws_url); - println!("DEBUG: Try connection to {} ...", ws_url); // Connect to websocket connect(ws_url, move |ws| { - DeflateBuilder::new().build( - Ws2pConnectionHandler::try_new( - WsSender(ws), - service_sender.clone(), - currency.clone(), - self_node.clone(), - conn_meta_datas.clone(), - ) - .expect("WS2P Service unrechable"), - ) + match Ws2pConnectionHandler::try_new( + WsSender(ws), + service_sender.clone(), + currency.clone(), + self_node.clone(), + conn_meta_datas.clone(), + ) { + Ok(handler) => DeflateBuilder::new().build(handler), + Err(_e) => fatal_error!("WS2P Service unreachable"), + } }) } diff --git a/lib/modules/ws2p/ws2p/tests/connection_negociation.rs b/lib/modules/ws2p/ws2p/tests/connection_negociation.rs index e5443a3c..7006a1b2 100644 --- a/lib/modules/ws2p/ws2p/tests/connection_negociation.rs +++ b/lib/modules/ws2p/ws2p/tests/connection_negociation.rs @@ -16,6 +16,7 @@ use dubp_documents::CurrencyName; use dup_crypto::keys::KeyPair; use dup_crypto::keys::*; +use durs_common_tests_tools::logger::init_logger_stdout; use durs_network_documents::network_endpoint::*; use durs_network_documents::*; use durs_ws2p::controllers::incoming_connections::*; @@ -36,15 +37,15 @@ pub fn currency() -> CurrencyName { pub fn keypair1() -> ed25519::KeyPair { ed25519::KeyPairFromSaltedPasswordGenerator::with_default_parameters().generate( - "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWoLkjrUhHV".as_bytes(), - "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWoLkjrUhHV_".as_bytes(), + "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWoLkjrUhHV1".as_bytes(), + "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWoLkjrUhHV1_".as_bytes(), ) } pub fn keypair2() -> ed25519::KeyPair { ed25519::KeyPairFromSaltedPasswordGenerator::with_default_parameters().generate( - "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWdLkjrUhHV".as_bytes(), - "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWdLkjrUhHV_".as_bytes(), + "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWdLkjrUhHV2".as_bytes(), + "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWdLkjrUhHV2_".as_bytes(), ) } @@ -68,6 +69,11 @@ fn client_node() -> MySelfWs2pNode { #[test] #[cfg(unix)] fn test_connection_negociation() { + init_logger_stdout(); + + // ===== initialization ===== + // client and server are initialized and launched in separate threads + let server_node = server_node(); let client_node = client_node(); @@ -116,70 +122,81 @@ fn test_connection_negociation() { ) }); - // Listen client service channel : we must receive controller sender - if let Ok(Ws2pServiceSender::ControllerSender(_)) = client_service_channel - .1 - .recv_timeout(Duration::from_millis(*TIMEOUT_IN_MS)) - { - } else { - panic!("Not receive client controller sender"); - } - - // Listen client service channel : we must receive status TryToSendConnectMess - test_expected_states( - &client_service_channel.1, - vec![WS2PConnectionState::TryToSendConnectMess], + // ===== opening connection ===== + // we must get Ws2pServiceSender::ControllerSender from the client and server threads (but we ignore them) + // we also test that the statuses match expected ones + + let _client_controller = get_controller(&client_service_channel.1); + let _server_controller = get_controller(&server_service_channel.1); + + // TryToSendConnectMsg + let state = get_state(&client_service_channel.1); // client + assert!(state == WS2PConnectionState::TryToSendConnectMsg); + let state = get_state(&server_service_channel.1); // server + assert!(state == WS2PConnectionState::TryToSendConnectMsg); + + // WaitingConnectMsg + let state = get_state(&client_service_channel.1); // client + assert!(state == WS2PConnectionState::WaitingConnectMsg); + let state = get_state(&server_service_channel.1); // server + assert!(state == WS2PConnectionState::WaitingConnectMsg); + + // ConnectMessOk + let state = get_state(&client_service_channel.1); // client + assert!(state == WS2PConnectionState::ConnectMessOk); + let state = get_state(&server_service_channel.1); // server + assert!(state == WS2PConnectionState::ConnectMessOk); + + // Ack message + let state_1 = get_state(&client_service_channel.1); // client + let state_2 = get_state(&server_service_channel.1); // server + + println!("state_1: {:?}", &state_1); + println!("state_2: {:?}", &state_2); + + assert!( + // client faster + ( state_1 == WS2PConnectionState::OkMsgOkWaitingAckMsg && + state_2 == WS2PConnectionState::AckMsgOk ) || + // server faster + ( state_1 == WS2PConnectionState::AckMsgOk && + state_2 == WS2PConnectionState::OkMsgOkWaitingAckMsg ) || + // ack messages received at the same time + ( state_1 == WS2PConnectionState::AckMsgOk && + state_2 == WS2PConnectionState::AckMsgOk ) ); - // Listen client service channel : we must receive status WaitingConnectMess - test_expected_states( - &client_service_channel.1, - vec![WS2PConnectionState::WaitingConnectMess], - ); + // Established + let state = get_state(&client_service_channel.1); // client + assert!(state == WS2PConnectionState::Established); + let state = get_state(&server_service_channel.1); // server + assert!(state == WS2PConnectionState::Established); +} - // Listen server service channel : we must receive controller sender - if let Ok(Ws2pServiceSender::ControllerSender(_)) = server_service_channel - .1 +// === functions used in above test === + +// get the state in a receiver +fn get_state(service_receiver: &mpsc::Receiver) -> WS2PConnectionState { + if let Ws2pServiceSender::ChangeConnectionState(_, new_state) = service_receiver .recv_timeout(Duration::from_millis(*TIMEOUT_IN_MS)) + .expect("Receive nothing from controller :") { + return new_state; } else { - panic!("Not receive server controller sender"); + panic!("Expect signal ChangeConnectionState, receive other !"); } - - // Listen server service channel : we must receive status TryToSendConnectMess - test_expected_states( - &server_service_channel.1, - vec![WS2PConnectionState::TryToSendConnectMess], - ); - - // Listen server service channel : we must receive status WaitingConnectMess - test_expected_states( - &server_service_channel.1, - vec![WS2PConnectionState::WaitingConnectMess], - ); - - // Listen server service channel : we must receive status ConnectMessOk - test_expected_states( - &server_service_channel.1, - vec![WS2PConnectionState::ConnectMessOk], - ); } -fn test_expected_states( +// get the controller from the thread +fn get_controller( service_receiver: &mpsc::Receiver, - expected_states: Vec, -) -> WS2PConnectionState { - if let Ws2pServiceSender::ChangeConnectionState(_, new_state) = service_receiver - .recv_timeout(Duration::from_millis(*TIMEOUT_IN_MS)) - .expect("Receive nothing from controller :") +) -> mpsc::Sender { + // we must receive controller sender + if let Ok(Ws2pServiceSender::ControllerSender(controller)) = + service_receiver.recv_timeout(Duration::from_millis(*TIMEOUT_IN_MS)) { - for expected_state in expected_states { - if new_state == expected_state { - return new_state; - } - } - panic!("Receive unexpected state: {:?} !", new_state); + return controller; } else { - panic!("Expect signal ChangeConnectionState, receive other !"); + panic!("Not receive client controller sender"); } } -- 2.22.0