Skip to content
Snippets Groups Projects
Commit c7537c3f authored by Hugo Trentesaux's avatar Hugo Trentesaux
Browse files

[feat] ws2p: implement ack and ok messages

parent 54ca7aaa
Branches
Tags
1 merge request!147Resolve "WS2Pv2: Implement ACK and OK messages processing"
Showing
with 322 additions and 170 deletions
...@@ -553,6 +553,7 @@ dependencies = [ ...@@ -553,6 +553,7 @@ dependencies = [
"bincode 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "bincode 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"dubp-documents 0.12.0", "dubp-documents 0.12.0",
"dup-crypto 0.6.0", "dup-crypto 0.6.0",
"durs-common-tests-tools 0.1.0",
"durs-common-tools 0.1.0", "durs-common-tools 0.1.0",
"durs-conf 0.1.0-a0.1", "durs-conf 0.1.0-a0.1",
"durs-message 0.1.0-a0.1", "durs-message 0.1.0-a0.1",
......
...@@ -80,7 +80,7 @@ impl WS2PMessage { ...@@ -80,7 +80,7 @@ impl WS2PMessage {
pub fn parse_and_check_bin_message(bin_msg: &[u8]) -> Result<WS2PMessage, WS2PMessageError> { pub fn parse_and_check_bin_message(bin_msg: &[u8]) -> Result<WS2PMessage, WS2PMessageError> {
let msg: WS2PMessage = bincode::deserialize(&bin_msg)?; let msg: WS2PMessage = bincode::deserialize(&bin_msg)?;
let hash = msg.hash(); let hash = msg.hash();
//println!("DEBUG: parse_and_check_bin_message: hash={:?}", hash); //debug!("parse_and_check_bin_message: hash={:?}", hash);
// Compute hash len // Compute hash len
let hash_len = 33; let hash_len = 33;
// Compute signature len // Compute signature len
......
...@@ -105,7 +105,9 @@ mod tests { ...@@ -105,7 +105,9 @@ mod tests {
#[test] #[test]
fn test_ws2p_message_ack() { fn test_ws2p_message_ack() {
test_ws2p_message(WS2Pv2MessagePayload::Ack(Hash::random())); test_ws2p_message(WS2Pv2MessagePayload::Ack {
challenge: Hash::random(),
});
} }
#[test] #[test]
......
...@@ -38,7 +38,10 @@ pub enum WS2Pv2MessagePayload { ...@@ -38,7 +38,10 @@ pub enum WS2Pv2MessagePayload {
/// CONNECT message /// CONNECT message
Connect(Box<WS2Pv2ConnectMsg>), Connect(Box<WS2Pv2ConnectMsg>),
/// ACK message /// ACK message
Ack(Hash), Ack {
/// Hash previously sent in CONNECT message
challenge: Hash,
},
/// SECRET_FLAGS Message /// SECRET_FLAGS Message
SecretFlags(WS2Pv2SecretFlagsMsg), SecretFlags(WS2Pv2SecretFlagsMsg),
/// OK Message /// OK Message
......
...@@ -29,5 +29,8 @@ serde_json = "1.0.*" ...@@ -29,5 +29,8 @@ serde_json = "1.0.*"
structopt= "0.2.*" structopt= "0.2.*"
ws = { version = "0.7.*", features = ["permessage-deflate"] } ws = { version = "0.7.*", features = ["permessage-deflate"] }
[dev-dependencies]
durs-common-tests-tools = { path = "../../../tests-tools/common-tests-tools" }
[features] [features]
ssl = ["ws/ssl"] ssl = ["ws/ssl"]
// Copyright (C) 2018 The Durs Project Developers.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Process WS2P ACK message.
use crate::controllers::handler::*;
use crate::controllers::*;
use dup_crypto::hashs::Hash;
use durs_common_tools::fatal_error;
use durs_ws2p_messages::v2::ok::WS2Pv2OkMsg;
use ws::CloseCode;
/// Process WS2pv2 ACK Message
pub fn process_ws2p_v2_ack_msg(
handler: &mut Ws2pConnectionHandler, // handler contains original challenge
ack_msg_challenge: Hash,
) {
debug!("Receive ACK message !");
match handler.conn_datas.state {
WS2PConnectionState::OkMsgOkWaitingAckMsg => {
// already sent ack message and received ok response
process(handler, ack_msg_challenge, WS2PConnectionState::Established);
}
WS2PConnectionState::ConnectMessOk => {
// ack message not yet sent
process(handler, ack_msg_challenge, WS2PConnectionState::AckMsgOk);
}
_ => {
let _ = handler
.ws
.0
.close_with_reason(CloseCode::Invalid, "Unexpected ACK message !");
}
}
}
#[inline]
// process and apply given status in case of success
fn process(
handler: &mut Ws2pConnectionHandler,
ack_msg_challenge: Hash,
success_status: WS2PConnectionState,
) {
if handler.conn_datas.challenge != ack_msg_challenge {
handler.update_status(WS2PConnectionState::Denial);
} else {
handler.update_status(success_status);
send_ok_msg(handler);
}
}
// send ok message
fn send_ok_msg(handler: &mut Ws2pConnectionHandler) {
// generate empty Ok message
let ok_msg: WS2Pv2OkMsg = Default::default();
// Encapsulate and binarize OK message
if let Ok((_, bin_ok_msg)) = WS2Pv2Message::encapsulate_payload(
handler.currency.clone(),
handler.local_node.my_node_id,
handler.local_node.my_key_pair,
WS2Pv2MessagePayload::Ok(ok_msg),
) {
// Send Ok Message
match handler.ws.0.send(Message::binary(bin_ok_msg)) {
Ok(()) => {}
Err(_) => {
handler.conn_datas.state = WS2PConnectionState::Unreachable;
let _ = handler
.ws
.0
.close_with_reason(CloseCode::Error, "Fail to send Ok message !");
}
}
} else {
fatal_error!("Dev error: Fail to sign own ok message !");
}
}
...@@ -13,10 +13,11 @@ ...@@ -13,10 +13,11 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Process WS2P CONNECT mesage. //! Process WS2P CONNECT message.
use crate::controllers::handler::*; use crate::controllers::handler::*;
use crate::controllers::*; use crate::controllers::*;
use durs_common_tools::fatal_error;
use durs_network_documents::NodeFullId; use durs_network_documents::NodeFullId;
use durs_ws2p_messages::v2::connect::WS2Pv2ConnectMsg; use durs_ws2p_messages::v2::connect::WS2Pv2ConnectMsg;
use ws::CloseCode; use ws::CloseCode;
...@@ -29,7 +30,7 @@ pub fn process_ws2p_v2_connect_msg( ...@@ -29,7 +30,7 @@ pub fn process_ws2p_v2_connect_msg(
remote_full_id: NodeFullId, remote_full_id: NodeFullId,
connect_msg: &WS2Pv2ConnectMsg, connect_msg: &WS2Pv2ConnectMsg,
) { ) {
println!("DEBUG: Receive CONNECT message !"); debug!("Receive CONNECT message !");
// Get remote node datas // Get remote node datas
let remote_challenge = connect_msg.challenge; let remote_challenge = connect_msg.challenge;
...@@ -38,7 +39,7 @@ pub fn process_ws2p_v2_connect_msg( ...@@ -38,7 +39,7 @@ pub fn process_ws2p_v2_connect_msg(
peer_card: None, peer_card: None,
current_blockstamp: None, current_blockstamp: None,
}; };
if let WS2PConnectionState::WaitingConnectMess = handler.conn_datas.state { if let WS2PConnectionState::WaitingConnectMsg = handler.conn_datas.state {
// Check remote node datas // Check remote node datas
if let WS2Pv2ConnectType::Incoming = handler.conn_datas.connect_type { if let WS2Pv2ConnectType::Incoming = handler.conn_datas.connect_type {
handler.conn_datas.remote_full_id = Some(remote_full_id); handler.conn_datas.remote_full_id = Some(remote_full_id);
...@@ -97,25 +98,26 @@ pub fn process_ws2p_v2_connect_msg( ...@@ -97,25 +98,26 @@ pub fn process_ws2p_v2_connect_msg(
handler.send_new_conn_state_to_service(); handler.send_new_conn_state_to_service();
// Encapsulate and binarize ACK message // Encapsulate and binarize ACK message
let (_, bin_ack_msg) = WS2Pv2Message::encapsulate_payload( if let Ok((_, bin_ack_msg)) = WS2Pv2Message::encapsulate_payload(
handler.currency.clone(), handler.currency.clone(),
handler.local_node.my_node_id, handler.local_node.my_node_id,
handler.local_node.my_key_pair, handler.local_node.my_key_pair,
WS2Pv2MessagePayload::Ack(remote_challenge), WS2Pv2MessagePayload::Ack {
) challenge: remote_challenge,
.expect("WS2P : Fail to sign own ack message !"); },
// Send ACk Message ) {
match handler.ws.0.send(Message::binary(bin_ack_msg)) { // Send Ack Message
Ok(()) => { if let Ok(()) = handler.ws.0.send(Message::binary(bin_ack_msg)) {
// Update state // Update state
handler.conn_datas.state = WS2PConnectionState::ConnectMessOk; handler.conn_datas.state = WS2PConnectionState::ConnectMessOk;
} } else {
Err(_) => {
handler.conn_datas.state = WS2PConnectionState::Unreachable; handler.conn_datas.state = WS2PConnectionState::Unreachable;
let _ = handler let _ = handler
.ws .ws
.0 .0
.close_with_reason(CloseCode::Error, "Fail to send ACk message !"); .close_with_reason(CloseCode::Error, "Fail to send Ack message !");
} }
} else {
fatal_error!("Dev error: Fail to sign own ack message !")
} }
} }
...@@ -15,17 +15,17 @@ ...@@ -15,17 +15,17 @@
//! WS2P connection handler. //! WS2P connection handler.
pub mod ack_msg;
pub mod connect_msg; pub mod connect_msg;
pub mod ok_msg;
use crate::constants::*; use crate::constants::*;
use crate::controllers::*; use crate::controllers::*;
use crate::services::Ws2pServiceSender;
use crate::services::*; use crate::services::*;
use ws::{util::Token, CloseCode, /*Frame,*/ Handler, Handshake, Message};
//use dup_crypto::keys::KeyPairEnum;
use dubp_documents::CurrencyName; use dubp_documents::CurrencyName;
use durs_common_tools::fatal_error;
use durs_network_documents::NodeFullId; use durs_network_documents::NodeFullId;
//use durs_ws2p_messages::v2::api_features::WS2PFeatures;
use crate::services::Ws2pServiceSender;
use durs_ws2p_messages::v2::connect::generate_connect_message; use durs_ws2p_messages::v2::connect::generate_connect_message;
use durs_ws2p_messages::v2::payload_container::WS2Pv2MessagePayload; use durs_ws2p_messages::v2::payload_container::WS2Pv2MessagePayload;
use durs_ws2p_messages::v2::WS2Pv2Message; use durs_ws2p_messages::v2::WS2Pv2Message;
...@@ -35,6 +35,7 @@ use std::ops::Deref; ...@@ -35,6 +35,7 @@ use std::ops::Deref;
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use ws::{util::Token, CloseCode, Handler, Handshake, Message};
const CONNECT: Token = Token(1); const CONNECT: Token = Token(1);
const EXPIRE: Token = Token(2); const EXPIRE: Token = Token(2);
...@@ -77,7 +78,8 @@ impl Ws2pConnectionHandler { ...@@ -77,7 +78,8 @@ impl Ws2pConnectionHandler {
) = mpsc::channel(); ) = mpsc::channel();
// Send controller sender to service // Send controller sender to service
println!("DEBUG: Send controller sender to service"); debug!("Send controller sender to service");
service_sender.send(Ws2pServiceSender::ControllerSender(sender))?; service_sender.send(Ws2pServiceSender::ControllerSender(sender))?;
Ok(Ws2pConnectionHandler { Ok(Ws2pConnectionHandler {
...@@ -103,6 +105,11 @@ impl Ws2pConnectionHandler { ...@@ -103,6 +105,11 @@ impl Ws2pConnectionHandler {
)) ))
.expect("WS2p Service unreacheable !"); .expect("WS2p Service unreacheable !");
} }
#[inline]
fn update_status(&mut self, status: WS2PConnectionState) {
self.conn_datas.state = status;
self.send_new_conn_state_to_service();
}
} }
fn print_opt_addr(addr: Option<SocketAddr>) -> String { fn print_opt_addr(addr: Option<SocketAddr>) -> String {
...@@ -121,14 +128,13 @@ impl Handler for Ws2pConnectionHandler { ...@@ -121,14 +128,13 @@ impl Handler for Ws2pConnectionHandler {
// Handler state or reject the connection based on the details of the Request // Handler state or reject the connection based on the details of the Request
// or Response, such as by checking cookies or Auth headers. // or Response, such as by checking cookies or Auth headers.
fn on_open(&mut self, handshake: Handshake) -> ws::Result<()> { fn on_open(&mut self, handshake: Handshake) -> ws::Result<()> {
#[cfg(test)] debug!(
println!( "open websocket from {}",
"TESTS: open websocket from {}",
print_opt_addr(handshake.peer_addr) print_opt_addr(handshake.peer_addr)
); );
// Update connection state // Update connection state
self.conn_datas.state = WS2PConnectionState::TryToSendConnectMess; self.conn_datas.state = WS2PConnectionState::TryToSendConnectMsg;
self.send_new_conn_state_to_service(); self.send_new_conn_state_to_service();
// Generate connect message // Generate connect message
...@@ -140,14 +146,12 @@ impl Handler for Ws2pConnectionHandler { ...@@ -140,14 +146,12 @@ impl Handler for Ws2pConnectionHandler {
); );
// Encapsulate and binarize connect message // Encapsulate and binarize connect message
let (_ws2p_full_msg, bin_connect_msg) = WS2Pv2Message::encapsulate_payload( if let Ok((_ws2p_full_msg, bin_connect_msg)) = WS2Pv2Message::encapsulate_payload(
self.currency.clone(), self.currency.clone(),
self.local_node.my_node_id, self.local_node.my_node_id,
self.local_node.my_key_pair, self.local_node.my_key_pair,
WS2Pv2MessagePayload::Connect(Box::new(connect_msg)), WS2Pv2MessagePayload::Connect(Box::new(connect_msg)),
) ) {
.expect("WS2P : Fail to sign own connect message !");
// Start negociation timeouts // Start negociation timeouts
self.ws.0.timeout(*WS2P_NEGOTIATION_TIMEOUT, CONNECT)?; self.ws.0.timeout(*WS2P_NEGOTIATION_TIMEOUT, CONNECT)?;
// Start expire timeout // Start expire timeout
...@@ -159,8 +163,8 @@ impl Handler for Ws2pConnectionHandler { ...@@ -159,8 +163,8 @@ impl Handler for Ws2pConnectionHandler {
match self.ws.0.send(Message::binary(bin_connect_msg)) { match self.ws.0.send(Message::binary(bin_connect_msg)) {
Ok(()) => { Ok(()) => {
// Update state // Update state
if let WS2PConnectionState::TryToSendConnectMess = self.conn_datas.state { if let WS2PConnectionState::TryToSendConnectMsg = self.conn_datas.state {
self.conn_datas.state = WS2PConnectionState::WaitingConnectMess; self.conn_datas.state = WS2PConnectionState::WaitingConnectMsg;
self.send_new_conn_state_to_service(); self.send_new_conn_state_to_service();
} }
// Log // Log
...@@ -168,9 +172,8 @@ impl Handler for Ws2pConnectionHandler { ...@@ -168,9 +172,8 @@ impl Handler for Ws2pConnectionHandler {
"Send CONNECT message to {}", "Send CONNECT message to {}",
print_opt_addr(handshake.peer_addr) print_opt_addr(handshake.peer_addr)
); );
#[cfg(test)] debug!(
println!( "Succesfully send CONNECT message to {}",
"TESTS: Succesfully send CONNECT message to {}",
print_opt_addr(handshake.peer_addr) print_opt_addr(handshake.peer_addr)
); );
} }
...@@ -181,9 +184,8 @@ impl Handler for Ws2pConnectionHandler { ...@@ -181,9 +184,8 @@ impl Handler for Ws2pConnectionHandler {
print_opt_addr(handshake.peer_addr), print_opt_addr(handshake.peer_addr),
e e
); );
#[cfg(test)] debug!(
println!( "Fail send CONNECT message to {}",
"TESTS: Fail send CONNECT message to {}",
print_opt_addr(handshake.peer_addr) print_opt_addr(handshake.peer_addr)
); );
let _ = self let _ = self
...@@ -192,25 +194,10 @@ impl Handler for Ws2pConnectionHandler { ...@@ -192,25 +194,10 @@ impl Handler for Ws2pConnectionHandler {
.close_with_reason(CloseCode::Error, "Fail to send CONNECT message !"); .close_with_reason(CloseCode::Error, "Fail to send CONNECT message !");
} }
} }
/*
// Send ws::Sender to WS2PConductor
let result = self
.conductor_sender
.send(WS2PThreadSignal::WS2PConnectionMessage(
WS2PConnectionMessage(
self.conn_datas.node_full_id(),
WS2PConnectionMessagePayload::WebsocketOk(WsSender(self.ws.clone())),
),
));
// If WS2PConductor is unrechable, close connection.
if result.is_err() {
debug!("Close ws2p connection because ws2p main thread is unrechable !");
self.ws.close(CloseCode::Normal)
} else {
// Send CONNECT Message
self.ws.send(self.connect_message.clone())
}*/
Ok(()) Ok(())
} else {
fatal_error!("Dev error: Fail to sign own connect message !");
}
} }
// `on_message` is roughly equivalent to the Handler closure. It takes a `Message` // `on_message` is roughly equivalent to the Handler closure. It takes a `Message`
...@@ -243,7 +230,7 @@ impl Handler for Ws2pConnectionHandler { ...@@ -243,7 +230,7 @@ impl Handler for Ws2pConnectionHandler {
self.conn_datas.last_mess_time = SystemTime::now(); self.conn_datas.last_mess_time = SystemTime::now();
if msg.is_binary() { if msg.is_binary() {
println!("DEBUG: Receive new message there is not a spam !"); debug!("Receive new message there is not a spam !");
match WS2PMessage::parse_and_check_bin_message(&msg.into_data()) { match WS2PMessage::parse_and_check_bin_message(&msg.into_data()) {
Ok(valid_msg) => match valid_msg { Ok(valid_msg) => match valid_msg {
WS2PMessage::V2(msg_v2) => { WS2PMessage::V2(msg_v2) => {
...@@ -260,9 +247,17 @@ impl Handler for Ws2pConnectionHandler { ...@@ -260,9 +247,17 @@ impl Handler for Ws2pConnectionHandler {
connect_msg, connect_msg,
); );
} }
WS2Pv2MessagePayload::Ack(_) => {} WS2Pv2MessagePayload::Ack {
challenge: ack_msg_challenge,
} => {
// Process ack message
ack_msg::process_ws2p_v2_ack_msg(self, ack_msg_challenge);
}
WS2Pv2MessagePayload::SecretFlags(_) => {} WS2Pv2MessagePayload::SecretFlags(_) => {}
WS2Pv2MessagePayload::Ok(_) => {} WS2Pv2MessagePayload::Ok(_) => {
// Process ok message
ok_msg::process_ws2p_v2_ok_msg(self);
}
WS2Pv2MessagePayload::Ko(_) => {} WS2Pv2MessagePayload::Ko(_) => {}
_ => { _ => {
if let WS2PConnectionState::Established = self.conn_datas.state { if let WS2PConnectionState::Established = self.conn_datas.state {
...@@ -278,7 +273,7 @@ impl Handler for Ws2pConnectionHandler { ...@@ -278,7 +273,7 @@ impl Handler for Ws2pConnectionHandler {
} }
}, },
Err(ws2p_msg_err) => { Err(ws2p_msg_err) => {
println!("DEBUG: Message is invalid : {:?}", ws2p_msg_err); warn!("Message is invalid : {:?}", ws2p_msg_err);
self.count_invalid_msgs += 1; self.count_invalid_msgs += 1;
if self.count_invalid_msgs >= *WS2P_INVALID_MSGS_LIMIT { if self.count_invalid_msgs >= *WS2P_INVALID_MSGS_LIMIT {
let _ = self.ws.0.close_with_reason( let _ = self.ws.0.close_with_reason(
......
// Copyright (C) 2018 The Durs Project Developers.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Process WS2P OK message.
use crate::controllers::handler::*;
use crate::controllers::*;
use ws::CloseCode;
/// Process WS2pv2 OK Message
pub fn process_ws2p_v2_ok_msg(handler: &mut Ws2pConnectionHandler) {
debug!("Receive OK message !");
match handler.conn_datas.state {
WS2PConnectionState::ConnectMessOk => {
handler.update_status(WS2PConnectionState::OkMsgOkWaitingAckMsg);
}
WS2PConnectionState::AckMsgOk => {
handler.update_status(WS2PConnectionState::Established);
}
_ => {
let _ = handler
.ws
.0
.close_with_reason(CloseCode::Invalid, "Unexpected OK message !");
}
}
}
...@@ -39,11 +39,10 @@ pub fn listen_on_ws2p_v2_endpoint( ...@@ -39,11 +39,10 @@ pub fn listen_on_ws2p_v2_endpoint(
// Log // Log
info!("Listen on {} ...", ws_url); info!("Listen on {} ...", ws_url);
println!("DEBUG: call function listen({}) ...", ws_url);
// Connect to websocket // Connect to websocket
listen(ws_url, move |ws| { listen(ws_url, move |ws| {
println!("DEBUG: Listen on host:port..."); info!("Listen on host:port...");
DeflateBuilder::new().build( DeflateBuilder::new().build(
Ws2pConnectionHandler::try_new( Ws2pConnectionHandler::try_new(
WsSender(ws), WsSender(ws),
......
...@@ -60,11 +60,11 @@ pub enum WS2PConnectionState { ...@@ -60,11 +60,11 @@ pub enum WS2PConnectionState {
/// Websocket error /// Websocket error
WSError, WSError,
/// Try to send connect message /// Try to send connect message
TryToSendConnectMess, TryToSendConnectMsg,
/// Endpoint unreachable /// Endpoint unreachable
Unreachable, Unreachable,
/// Waiting connect message /// Waiting connect message
WaitingConnectMess, WaitingConnectMsg,
/// No response /// No response
NoResponse, NoResponse,
/// Negociation timeout /// Negociation timeout
...@@ -72,14 +72,14 @@ pub enum WS2PConnectionState { ...@@ -72,14 +72,14 @@ pub enum WS2PConnectionState {
/// Receive valid connect message /// Receive valid connect message
ConnectMessOk, ConnectMessOk,
/// Receive valid OK message but wait ACK message /// Receive valid OK message but wait ACK message
OkMessOkWaitingAckMess, OkMsgOkWaitingAckMsg,
/// Receive valid ACK message /// Receive valid ACK message
AckMessOk, AckMsgOk,
/// Connection denial (maybe due to many different reasons : receive wrong message, wrong format, wrong signature, etc) /// Connection denial (maybe due to many different reasons : receive wrong message, wrong format, wrong signature, etc)
Denial, Denial,
/// Connection closed /// Connection closed
Close, Close,
/// Connection succesfully established /// Connection successfully established
Established, Established,
} }
......
...@@ -15,11 +15,11 @@ ...@@ -15,11 +15,11 @@
//! WS2P outgoing connections controllers. //! WS2P outgoing connections controllers.
use dubp_documents::CurrencyName;
//use durs_module::ModuleReqId;
use crate::controllers::handler::Ws2pConnectionHandler; use crate::controllers::handler::Ws2pConnectionHandler;
use crate::controllers::*; use crate::controllers::*;
use crate::services::*; use crate::services::*;
use dubp_documents::CurrencyName;
use durs_common_tools::fatal_error;
use durs_network_documents::network_endpoint::EndpointEnum; use durs_network_documents::network_endpoint::EndpointEnum;
use durs_network_documents::NodeFullId; use durs_network_documents::NodeFullId;
use ws::connect; use ws::connect;
...@@ -47,19 +47,18 @@ pub fn connect_to_ws2p_v2_endpoint( ...@@ -47,19 +47,18 @@ pub fn connect_to_ws2p_v2_endpoint(
// Log // Log
info!("Try connection to {} ...", ws_url); info!("Try connection to {} ...", ws_url);
println!("DEBUG: Try connection to {} ...", ws_url);
// Connect to websocket // Connect to websocket
connect(ws_url, move |ws| { connect(ws_url, move |ws| {
DeflateBuilder::new().build( match Ws2pConnectionHandler::try_new(
Ws2pConnectionHandler::try_new(
WsSender(ws), WsSender(ws),
service_sender.clone(), service_sender.clone(),
currency.clone(), currency.clone(),
self_node.clone(), self_node.clone(),
conn_meta_datas.clone(), conn_meta_datas.clone(),
) ) {
.expect("WS2P Service unrechable"), Ok(handler) => DeflateBuilder::new().build(handler),
) Err(_e) => fatal_error!("WS2P Service unreachable"),
}
}) })
} }
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
use dubp_documents::CurrencyName; use dubp_documents::CurrencyName;
use dup_crypto::keys::KeyPair; use dup_crypto::keys::KeyPair;
use dup_crypto::keys::*; use dup_crypto::keys::*;
use durs_common_tests_tools::logger::init_logger_stdout;
use durs_network_documents::network_endpoint::*; use durs_network_documents::network_endpoint::*;
use durs_network_documents::*; use durs_network_documents::*;
use durs_ws2p::controllers::incoming_connections::*; use durs_ws2p::controllers::incoming_connections::*;
...@@ -36,15 +37,15 @@ pub fn currency() -> CurrencyName { ...@@ -36,15 +37,15 @@ pub fn currency() -> CurrencyName {
pub fn keypair1() -> ed25519::KeyPair { pub fn keypair1() -> ed25519::KeyPair {
ed25519::KeyPairFromSaltedPasswordGenerator::with_default_parameters().generate( ed25519::KeyPairFromSaltedPasswordGenerator::with_default_parameters().generate(
"JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWoLkjrUhHV".as_bytes(), "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWoLkjrUhHV1".as_bytes(),
"JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWoLkjrUhHV_".as_bytes(), "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWoLkjrUhHV1_".as_bytes(),
) )
} }
pub fn keypair2() -> ed25519::KeyPair { pub fn keypair2() -> ed25519::KeyPair {
ed25519::KeyPairFromSaltedPasswordGenerator::with_default_parameters().generate( ed25519::KeyPairFromSaltedPasswordGenerator::with_default_parameters().generate(
"JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWdLkjrUhHV".as_bytes(), "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWdLkjrUhHV2".as_bytes(),
"JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWdLkjrUhHV_".as_bytes(), "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWdLkjrUhHV2_".as_bytes(),
) )
} }
...@@ -68,6 +69,11 @@ fn client_node() -> MySelfWs2pNode { ...@@ -68,6 +69,11 @@ fn client_node() -> MySelfWs2pNode {
#[test] #[test]
#[cfg(unix)] #[cfg(unix)]
fn test_connection_negociation() { fn test_connection_negociation() {
init_logger_stdout();
// ===== initialization =====
// client and server are initialized and launched in separate threads
let server_node = server_node(); let server_node = server_node();
let client_node = client_node(); let client_node = client_node();
...@@ -116,70 +122,81 @@ fn test_connection_negociation() { ...@@ -116,70 +122,81 @@ fn test_connection_negociation() {
) )
}); });
// Listen client service channel : we must receive controller sender // ===== opening connection =====
if let Ok(Ws2pServiceSender::ControllerSender(_)) = client_service_channel // we must get Ws2pServiceSender::ControllerSender from the client and server threads (but we ignore them)
.1 // we also test that the statuses match expected ones
.recv_timeout(Duration::from_millis(*TIMEOUT_IN_MS))
{ let _client_controller = get_controller(&client_service_channel.1);
} else { let _server_controller = get_controller(&server_service_channel.1);
panic!("Not receive client controller sender");
} // TryToSendConnectMsg
let state = get_state(&client_service_channel.1); // client
// Listen client service channel : we must receive status TryToSendConnectMess assert!(state == WS2PConnectionState::TryToSendConnectMsg);
test_expected_states( let state = get_state(&server_service_channel.1); // server
&client_service_channel.1, assert!(state == WS2PConnectionState::TryToSendConnectMsg);
vec![WS2PConnectionState::TryToSendConnectMess],
); // WaitingConnectMsg
let state = get_state(&client_service_channel.1); // client
// Listen client service channel : we must receive status WaitingConnectMess assert!(state == WS2PConnectionState::WaitingConnectMsg);
test_expected_states( let state = get_state(&server_service_channel.1); // server
&client_service_channel.1, assert!(state == WS2PConnectionState::WaitingConnectMsg);
vec![WS2PConnectionState::WaitingConnectMess],
// ConnectMessOk
let state = get_state(&client_service_channel.1); // client
assert!(state == WS2PConnectionState::ConnectMessOk);
let state = get_state(&server_service_channel.1); // server
assert!(state == WS2PConnectionState::ConnectMessOk);
// Ack message
let state_1 = get_state(&client_service_channel.1); // client
let state_2 = get_state(&server_service_channel.1); // server
println!("state_1: {:?}", &state_1);
println!("state_2: {:?}", &state_2);
assert!(
// client faster
( state_1 == WS2PConnectionState::OkMsgOkWaitingAckMsg &&
state_2 == WS2PConnectionState::AckMsgOk ) ||
// server faster
( state_1 == WS2PConnectionState::AckMsgOk &&
state_2 == WS2PConnectionState::OkMsgOkWaitingAckMsg ) ||
// ack messages received at the same time
( state_1 == WS2PConnectionState::AckMsgOk &&
state_2 == WS2PConnectionState::AckMsgOk )
); );
// Listen server service channel : we must receive controller sender // Established
if let Ok(Ws2pServiceSender::ControllerSender(_)) = server_service_channel let state = get_state(&client_service_channel.1); // client
.1 assert!(state == WS2PConnectionState::Established);
.recv_timeout(Duration::from_millis(*TIMEOUT_IN_MS)) let state = get_state(&server_service_channel.1); // server
{ assert!(state == WS2PConnectionState::Established);
} else {
panic!("Not receive server controller sender");
} }
// Listen server service channel : we must receive status TryToSendConnectMess // === functions used in above test ===
test_expected_states(
&server_service_channel.1,
vec![WS2PConnectionState::TryToSendConnectMess],
);
// Listen server service channel : we must receive status WaitingConnectMess // get the state in a receiver
test_expected_states( fn get_state(service_receiver: &mpsc::Receiver<Ws2pServiceSender>) -> WS2PConnectionState {
&server_service_channel.1,
vec![WS2PConnectionState::WaitingConnectMess],
);
// Listen server service channel : we must receive status ConnectMessOk
test_expected_states(
&server_service_channel.1,
vec![WS2PConnectionState::ConnectMessOk],
);
}
fn test_expected_states(
service_receiver: &mpsc::Receiver<Ws2pServiceSender>,
expected_states: Vec<WS2PConnectionState>,
) -> WS2PConnectionState {
if let Ws2pServiceSender::ChangeConnectionState(_, new_state) = service_receiver if let Ws2pServiceSender::ChangeConnectionState(_, new_state) = service_receiver
.recv_timeout(Duration::from_millis(*TIMEOUT_IN_MS)) .recv_timeout(Duration::from_millis(*TIMEOUT_IN_MS))
.expect("Receive nothing from controller :") .expect("Receive nothing from controller :")
{ {
for expected_state in expected_states {
if new_state == expected_state {
return new_state; return new_state;
} else {
panic!("Expect signal ChangeConnectionState, receive other !");
} }
} }
panic!("Receive unexpected state: {:?} !", new_state);
// get the controller from the thread
fn get_controller(
service_receiver: &mpsc::Receiver<Ws2pServiceSender>,
) -> mpsc::Sender<Ws2pControllerOrder> {
// we must receive controller sender
if let Ok(Ws2pServiceSender::ControllerSender(controller)) =
service_receiver.recv_timeout(Duration::from_millis(*TIMEOUT_IN_MS))
{
return controller;
} else { } else {
panic!("Expect signal ChangeConnectionState, receive other !"); panic!("Not receive client controller sender");
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment