diff --git a/Cargo.lock b/Cargo.lock index 85b4ccabf77e056cce2667351c489ca4ddf57f8f..e3f1970cdce36ec79b13c0d001d9794398461677 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -783,7 +783,6 @@ dependencies = [ "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "maplit 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", @@ -796,7 +795,6 @@ name = "durs-ws2p-messages" version = "0.3.0-dev" dependencies = [ "bincode 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "byteorder 1.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "dubp-block-doc 0.1.0", "dubp-common-doc 0.1.0", "dubp-currency-params 0.2.0", @@ -806,10 +804,9 @@ dependencies = [ "durs-network-documents 0.4.0", "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "pkstl 0.1.0", "pretty_assertions 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -830,7 +827,7 @@ dependencies = [ "durs-ws2p-messages 0.3.0-dev", "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "maplit 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "pkstl 0.1.0", "pretty_assertions 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1087,17 +1084,6 @@ name = "libc" version = "0.2.62" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "libz-sys" -version = "1.0.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "cc 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)", - "pkg-config 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", - "vcpkg 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "log" version = "0.4.8" @@ -2270,8 +2256,6 @@ dependencies = [ "byteorder 1.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)", - "libz-sys 1.0.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", "mio-extras 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2366,7 +2350,6 @@ dependencies = [ "checksum lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bc5729f27f159ddd61f4df6228e827e86643d4d3e7c32183cb30a1c08f604a14" "checksum lazycell 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b294d6fa9ee409a054354afc4352b0b9ef7ca222c69b8812cbea9e7d2bf3783f" "checksum libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)" = "34fcd2c08d2f832f376f4173a231990fa5aef4e99fb569867318a227ef4c06ba" -"checksum libz-sys 1.0.25 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb5e43362e38e2bca2fd5f5134c4d4564a23a5c28e9b95411652021a8675ebe" "checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" "checksum maplit 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "08cbb6b4fef96b6d77bfc40ec491b1690c779e77b05cd9f07f787ed376fd4c43" "checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" diff --git a/lib/crypto/src/keys/bin_signable.rs b/lib/crypto/src/keys/bin_signable.rs deleted file mode 100644 index 7ae96fb108f110564b45cc299a3610e5cfd2135d..0000000000000000000000000000000000000000 --- a/lib/crypto/src/keys/bin_signable.rs +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright (C) 2017-2019 The AXIOM TEAM Association. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <https://www.gnu.org/licenses/>. - -//! Generic code for signing data in binary format - -use super::*; -use serde::{Deserialize, Serialize}; - -/// Signatureable in binary format -pub trait BinSignable<'de>: Serialize + Deserialize<'de> { - /// Return entity issuer pubkey - fn issuer_pubkey(&self) -> PubKey; - /// Return signature - fn signature(&self) -> Option<Sig>; - /// Change signature - fn set_signature(&mut self, _signature: Sig); - /// Get binary datas without signature - fn get_bin_without_sig(&self) -> Result<Vec<u8>, failure::Error>; - /// Add signature to bin datas - fn add_sig_to_bin_datas(&self, bin_datas: &mut Vec<u8>); - /// Sign entity with a signator - fn sign(&mut self, signator: &SignatorEnum) -> Result<Vec<u8>, SignError> { - if self.signature().is_some() { - return Err(SignError::AlreadySign); - } - match self.issuer_pubkey() { - PubKey::Ed25519(_) => { - let mut bin_msg = self - .get_bin_without_sig() - .map_err(|e| SignError::SerdeError(e.to_string()))?; - let sig = signator.sign(&bin_msg); - self.set_signature(sig); - self.add_sig_to_bin_datas(&mut bin_msg); - Ok(bin_msg) - } - _ => Err(SignError::WrongAlgo), - } - } - /// Check signature of entity - fn verify(&self) -> Result<(), SigError> { - if let Some(signature) = self.signature() { - match self.issuer_pubkey() { - PubKey::Ed25519(pubkey) => match signature { - Sig::Ed25519(sig) => { - let signed_part: Vec<u8> = self - .get_bin_without_sig() - .map_err(|e| SigError::SerdeError(format!("{}", e)))?; - pubkey.verify(&signed_part, &sig) - /* - if pubkey.verify(&signed_part, &sig) { - Ok(()) - } else { - Err(SigError::InvalidSig()) - } - */ - } - _ => Err(SigError::NotSameAlgo), - }, - _ => Err(SigError::NotSameAlgo), - } - } else { - Err(SigError::NotSig) - } - } -} - -#[cfg(test)] -mod tests { - - use super::*; - use bincode; - - #[derive(Deserialize, Serialize)] - struct BinSignableTestImpl { - datas: Vec<u8>, - issuer: PubKey, - sig: Option<Sig>, - } - - impl BinSignable<'_> for BinSignableTestImpl { - #[inline] - fn add_sig_to_bin_datas(&self, bin_datas: &mut Vec<u8>) { - bin_datas - .extend_from_slice(&bincode::serialize(&self.sig).expect("Fail to binarize sig !")); - } - #[inline] - fn get_bin_without_sig(&self) -> Result<Vec<u8>, failure::Error> { - let mut bin_msg = bincode::serialize(&self)?; - let sig_size = bincode::serialized_size(&self.signature())?; - let bin_msg_len = bin_msg.len(); - bin_msg.truncate(bin_msg_len - (sig_size as usize)); - Ok(bin_msg) - } - fn issuer_pubkey(&self) -> PubKey { - self.issuer - } - fn signature(&self) -> Option<Sig> { - self.sig - } - fn set_signature(&mut self, new_signature: Sig) { - self.sig = Some(new_signature); - } - } - - #[test] - fn test_bin_signable() { - let key_pair = ed25519::KeyPairFromSeed32Generator::generate(Seed32::new([ - 0u8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, - 10, 11, 12, 13, 14, 15, - ])); - - let signator = SignatorEnum::Ed25519( - key_pair - .generate_signator() - .expect("fail to generate signator !"), - ); - - let mut bin_signable_datas = BinSignableTestImpl { - datas: vec![0, 1, 2, 3], - issuer: PubKey::Ed25519(key_pair.pubkey), - sig: None, - }; - - assert_eq!(Err(SigError::NotSig), bin_signable_datas.verify()); - - let _bin_msg = bin_signable_datas - .sign(&signator) - .expect("Fail to sign datas !"); - - assert_eq!( - Err(SignError::AlreadySign), - bin_signable_datas.sign(&signator) - ); - - assert_eq!(Ok(()), bin_signable_datas.verify()) - } -} diff --git a/lib/crypto/src/keys/mod.rs b/lib/crypto/src/keys/mod.rs index 34096c08ad3be06d9995edc648c10dee0b13dbcc..bcb527efad5146a482fb91690367ff8b8042b6a0 100644 --- a/lib/crypto/src/keys/mod.rs +++ b/lib/crypto/src/keys/mod.rs @@ -48,7 +48,6 @@ //! `ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/` //! with `=` as padding character. -pub mod bin_signable; pub mod ed25519; pub mod text_signable; diff --git a/lib/modules/blockchain/blockchain/src/lib.rs b/lib/modules/blockchain/blockchain/src/lib.rs index 91b00fdd1100745f7b6156f1feed2ed82a8c08e4..c5df482b203a67727b0ad9b3cd0a178d97f8d88f 100644 --- a/lib/modules/blockchain/blockchain/src/lib.rs +++ b/lib/modules/blockchain/blockchain/src/lib.rs @@ -270,7 +270,7 @@ impl BlockchainModule { } if let Some(_sync_opts) = sync_opts { - // TODO ... + // TODO HUGO... // take into account sync options } else { // Start main loop diff --git a/lib/modules/ws2p/ws2p-messages/Cargo.toml b/lib/modules/ws2p/ws2p-messages/Cargo.toml index c73692cfd1ad859b6513c2cf685aa4a3512e32fa..d932ab5efbe5a926c0a67cb18f8cbb1f70b739d2 100644 --- a/lib/modules/ws2p/ws2p-messages/Cargo.toml +++ b/lib/modules/ws2p/ws2p-messages/Cargo.toml @@ -10,8 +10,6 @@ edition = "2018" path = "lib.rs" [dependencies] -bincode = "1.0.*" -byteorder = "1.2.3" dubp-common-doc = { path = "../../../dubp/common-doc"} #, version = "0.1.0" } dubp-block-doc = { path = "../../../dubp/block-doc"} #, version = "0.1.0" } dubp-currency-params = { path = "../../../dubp/currency-params" } @@ -21,11 +19,11 @@ durs-common-tools = { path = "../../../tools/common-tools" } dup-crypto = { path = "../../../crypto" } failure = "0.1.5" log = "0.4.*" -serde = "1.0.*" -serde_derive = "1.0.*" -serde_json = "1.0.*" +pkstl = { path = "../../../tools/pkstl", features = ["bin","cbor"] } +serde = { version = "1.0.*", features = ["derive"] } [dev-dependencies] +bincode = "1.0.1" pretty_assertions = "0.5.1" -[features] \ No newline at end of file +[features] diff --git a/lib/modules/ws2p/ws2p-messages/lib.rs b/lib/modules/ws2p/ws2p-messages/lib.rs index 206f8b24e1428995a715d78ddd8954c593286562..4d474a9180cd5e275e480b4219d829cc13608475 100644 --- a/lib/modules/ws2p/ws2p-messages/lib.rs +++ b/lib/modules/ws2p/ws2p-messages/lib.rs @@ -31,19 +31,13 @@ #[macro_use] extern crate pretty_assertions;*/ -#[macro_use] -extern crate serde_derive; -#[macro_use] -extern crate log; - /// WS2Pv2 Messages pub mod v2; use crate::v2::WS2Pv2Message; -use dup_crypto::hashs::Hash; -use dup_crypto::keys::bin_signable::BinSignable; use dup_crypto::keys::*; -use durs_common_tools::fatal_error; +use pkstl::{IncomingMessage, SecureLayer}; +use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] /// WS2Pv2Message @@ -59,88 +53,68 @@ pub enum WS2PMessage { /// Enumerate errors can happen when parsing and checking messages #[derive(Debug)] pub enum WS2PMessageError { - /// Error at deserialization - DeserError(bincode::Error), /// Invalid hash InvalidHash, + /// Secure transport layer error + SecureLayerError(pkstl::Error), /// Invalid signature SigError(SigError), + /// Unexpected empty message + UnexpectedEmpty, } -impl From<bincode::Error> for WS2PMessageError { - fn from(e: bincode::Error) -> Self { - WS2PMessageError::DeserError(e) +impl From<pkstl::Error> for WS2PMessageError { + fn from(e: pkstl::Error) -> Self { + WS2PMessageError::SecureLayerError(e) } } impl WS2PMessage { - /// Get signature length - pub fn sig_len(&self) -> usize { - match self.signature() { - None => 1, // 1 byte: option - Some(Sig::Ed25519(_)) => 69, // 1 byte: option, 4 bytes: algorithm, 64 bytes: signature - Some(Sig::Schnorr()) => fatal_error!("Schnorr algo not yet implemented !"), - } - } - - /// Get message hash - pub fn hash(&self, bin_msg: &[u8]) -> Hash { - match *self { - WS2PMessage::V2(_) => Hash::compute(&bin_msg[0..(bin_msg.len() - self.sig_len())]), - WS2PMessage::_V0 | WS2PMessage::_V1 => { - fatal_error!("Dev error: must not use WS2PMessage version < 2 in WS2Pv2+ !") - } - } - } - /// Parse and check bin message - pub fn parse_and_check_bin_message(bin_msg: &[u8]) -> Result<WS2PMessage, WS2PMessageError> { - let msg: WS2PMessage = bincode::deserialize(&bin_msg)?; - match msg.verify() { - Ok(()) => Ok(msg), - Err(e) => Err(WS2PMessageError::SigError(e)), - } - } -} - -impl<'de> BinSignable<'de> for WS2PMessage { - #[inline] - fn add_sig_to_bin_datas(&self, bin_datas: &mut Vec<u8>) { - bin_datas.extend_from_slice( - &bincode::serialize(&self.signature()).expect("Fail to binarize sig !"), - ); - } - #[inline] - fn get_bin_without_sig(&self) -> Result<Vec<u8>, failure::Error> { - let mut bin_msg = bincode::serialize(&self)?; - let sig_size = bincode::serialized_size(&self.signature())?; - let bin_msg_len = bin_msg.len(); - bin_msg.truncate(bin_msg_len - (sig_size as usize)); - Ok(bin_msg) - } - fn issuer_pubkey(&self) -> PubKey { - match *self { - WS2PMessage::V2(ref msg_v2) => msg_v2.issuer_pubkey, - WS2PMessage::_V0 | WS2PMessage::_V1 => { - fatal_error!("Dev error: must not use WS2PMessage version < 2 in WS2Pv2+ !") - } - } - } - fn signature(&self) -> Option<Sig> { - match *self { - WS2PMessage::V2(ref msg_v2) => msg_v2.signature, - WS2PMessage::_V0 | WS2PMessage::_V1 => { - fatal_error!("Dev error: must not use WS2PMessage version < 2 in WS2Pv2+ !") - } - } - } - fn set_signature(&mut self, signature: Sig) { - match *self { - WS2PMessage::V2(ref mut msg_v2) => msg_v2.signature = Some(signature), - WS2PMessage::_V0 | WS2PMessage::_V1 => { - fatal_error!("Dev error: must not use WS2PMessage version < 2 in WS2Pv2+ !") + pub fn parse_and_check_incoming_bin_message( + secure_transport_layer: &mut SecureLayer, + bin_msg: &[u8], + ) -> Result<(Vec<WS2PMessage>, Option<PubKey>), WS2PMessageError> { + let incoming_msgs = secure_transport_layer.read::<WS2PMessage>(bin_msg)?; + + let mut incoming_ws2p_msgs = Vec::with_capacity(incoming_msgs.len()); + let mut remote_pubkey = None; + for incoming_msg in incoming_msgs { + match incoming_msg { + IncomingMessage::Connect { + custom_datas: ws2p_msg_opt, + peer_sig_public_key, + } => { + if let Some(ws2p_msg) = ws2p_msg_opt { + let mut pubkey_buffer = [0u8; 32]; + pubkey_buffer.copy_from_slice(&peer_sig_public_key[..]); + remote_pubkey = Some(PubKey::Ed25519(ed25519::PublicKey(pubkey_buffer))); + incoming_ws2p_msgs.push(ws2p_msg); + } else { + return Err(WS2PMessageError::UnexpectedEmpty); + } + } + IncomingMessage::Ack { + custom_datas: ws2p_msg_opt, + } => { + if let Some(ws2p_msg) = ws2p_msg_opt { + incoming_ws2p_msgs.push(ws2p_msg); + } else { + return Err(WS2PMessageError::UnexpectedEmpty); + } + } + IncomingMessage::Message { + datas: ws2p_msg_opt, + } => { + if let Some(ws2p_msg) = ws2p_msg_opt { + incoming_ws2p_msgs.push(ws2p_msg); + } else { + return Err(WS2PMessageError::UnexpectedEmpty); + } + } } } + Ok((incoming_ws2p_msgs, remote_pubkey)) } } @@ -154,7 +128,6 @@ mod tests { use dubp_common_doc::{BlockNumber, Blockstamp}; use dubp_currency_params::CurrencyName; use dubp_user_docs::documents::certification::*; - use dup_crypto::keys::bin_signable::BinSignable; use dup_crypto::keys::*; use durs_network_documents::network_endpoint::*; use durs_network_documents::network_peer::*; @@ -210,42 +183,22 @@ mod tests { } pub fn test_ws2p_message(payload: WS2Pv2MessagePayload) { - let keypair1 = keypair1(); - let signator = - SignatorEnum::Ed25519(keypair1.generate_signator().expect("fail to gen signator")); - let mut ws2p_message = WS2PMessage::V2(WS2Pv2Message { + let ws2p_message = WS2PMessage::V2(WS2Pv2Message { currency_name: CurrencyName(String::from("g1")), issuer_node_id: NodeId(0), - issuer_pubkey: PubKey::Ed25519(keypair1.public_key()), payload, - signature: None, }); - let sign_result = ws2p_message.sign(&signator); - if let Ok(bin_msg) = sign_result { - // Test binarization - assert_eq!( - serialize(&ws2p_message).expect("Fail to serialize WS2Pv2Message !"), - bin_msg - ); - // Test sign - ws2p_message - .verify() - .expect("WS2Pv2Message : Invalid signature !"); - // Test debinarization - let debinarization_result: Result<WS2PMessage, bincode::Error> = deserialize(&bin_msg); - if let Ok(ws2p_message2) = debinarization_result { - assert_eq!(ws2p_message, ws2p_message2); - } else { - panic!( - "Fail to debinarize ws2p_message : {:?}", - debinarization_result.err().unwrap() - ); - } + // Test binarization and debinarization + let bin_msg = serialize(&ws2p_message).expect("Fail to serialize WS2Pv2Message !"); + // Test debinarization + let debinarization_result: Result<WS2PMessage, bincode::Error> = deserialize(&bin_msg); + if let Ok(ws2p_message2) = debinarization_result { + assert_eq!(ws2p_message, ws2p_message2); } else { panic!( - "Fail to sign ws2p_message : {:?}", - sign_result.err().unwrap() + "Fail to debinarize ws2p_message : {:?}", + debinarization_result.err().unwrap() ); } } diff --git a/lib/modules/ws2p/ws2p-messages/v2/api_features.rs b/lib/modules/ws2p/ws2p-messages/v2/api_features.rs index e37ad4d12d9a34f73da9a5a01b24dc7d8f5cdc27..36a341c8bbbc9c66171b938d4d03ad246256425e 100644 --- a/lib/modules/ws2p/ws2p-messages/v2/api_features.rs +++ b/lib/modules/ws2p/ws2p-messages/v2/api_features.rs @@ -13,6 +13,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. +use serde::{Deserialize, Serialize}; + #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] /// WS2PFeatures pub struct WS2PFeatures(pub [u8; 4]); diff --git a/lib/modules/ws2p/ws2p-messages/v2/connect.rs b/lib/modules/ws2p/ws2p-messages/v2/connect.rs index 2a0c826925027718ac667b8da492f2a93dc6fd71..1dd082de24030412a95e06fc7f7b6db770a69a71 100644 --- a/lib/modules/ws2p/ws2p-messages/v2/connect.rs +++ b/lib/modules/ws2p/ws2p-messages/v2/connect.rs @@ -18,6 +18,7 @@ use super::api_features::WS2PFeatures; use dubp_common_doc::blockstamp::Blockstamp; use dup_crypto::hashs::Hash; use durs_network_documents::network_peer::PeerCardV11; +use serde::{Deserialize, Serialize}; /// WS2P v2 connect message min size pub static CONNECT_MSG_MIN_SIZE: &usize = &36; @@ -31,7 +32,7 @@ pub enum WS2Pv2ConnectType { OutgoingClient, /// Server outgoing connection OutgoingServer, - /// Sync connection (from blockstamp, or from genesis block if blockstamp is none) + /// Sync outgoing connection (from blockstamp, or from genesis block if blockstamp is none) Sync { /// block from which the sync should start from_blockstamp: Option<Blockstamp>, @@ -105,6 +106,7 @@ mod tests { use crate::tests::*; use dubp_common_doc::Blockstamp; use dup_crypto::keys::text_signable::TextSignable; + use dup_crypto::keys::*; #[test] fn test_ws2p_message_connect() { diff --git a/lib/modules/ws2p/ws2p-messages/v2/mod.rs b/lib/modules/ws2p/ws2p-messages/v2/mod.rs index c848790e10a8278a8b50ab57600901582250f3c8..8f89374dcde7a66298f0c241aa4251d74bfc3b21 100644 --- a/lib/modules/ws2p/ws2p-messages/v2/mod.rs +++ b/lib/modules/ws2p/ws2p-messages/v2/mod.rs @@ -33,9 +33,9 @@ pub mod sync_info; use crate::v2::payload_container::*; use crate::WS2PMessage; use dubp_currency_params::CurrencyName; -use dup_crypto::keys::bin_signable::BinSignable; -use dup_crypto::keys::*; use durs_network_documents::NodeId; +use serde::{Deserialize, Serialize}; +use std::io::BufWriter; /// WS2P v2 message metadata size pub static WS2P_V2_MESSAGE_METADATA_SIZE: &usize = &144; @@ -47,12 +47,8 @@ pub struct WS2Pv2Message { pub currency_name: CurrencyName, /// Issuer NodeId pub issuer_node_id: NodeId, - /// Issuer plublic key - pub issuer_pubkey: PubKey, /// Message payload pub payload: WS2Pv2MessagePayload, - /// Signature - pub signature: Option<Sig>, } impl WS2Pv2Message { @@ -61,21 +57,48 @@ impl WS2Pv2Message { /// Encapsulate message payload pub fn encapsulate_payload( + sl: &mut pkstl::SecureLayer, currency_name: CurrencyName, issuer_node_id: NodeId, - issuer_signator: &SignatorEnum, payload: WS2Pv2MessagePayload, - ) -> Result<(WS2PMessage, Vec<u8>), SignError> { - let mut msg = WS2PMessage::V2(WS2Pv2Message { - currency_name, - issuer_node_id, - issuer_pubkey: issuer_signator.public_key(), - payload, - signature: None, - }); - match msg.sign(issuer_signator) { - Ok(bin_msg) => Ok((msg, bin_msg)), - Err(e) => Err(e), + ) -> Result<Vec<u8>, pkstl::Error> { + match payload { + WS2Pv2MessagePayload::Connect(_) => { + let msg = WS2PMessage::V2(WS2Pv2Message { + currency_name, + issuer_node_id, + payload, + }); + let mut buffer = BufWriter::new(Vec::with_capacity(1_024)); + sl.write_connect_msg(Some(&msg), &mut buffer)?; + Ok(buffer + .into_inner() + .expect("Vec buffer must be always flushed")) + } + WS2Pv2MessagePayload::Ack { .. } => { + let msg = WS2PMessage::V2(WS2Pv2Message { + currency_name, + issuer_node_id, + payload, + }); + let mut buffer = BufWriter::new(Vec::with_capacity(1_024)); + sl.write_ack_msg(Some(&msg), &mut buffer)?; + Ok(buffer + .into_inner() + .expect("Vec buffer must be always flushed")) + } + _ => { + let msg = WS2PMessage::V2(WS2Pv2Message { + currency_name, + issuer_node_id, + payload, + }); + let mut buffer = BufWriter::new(Vec::with_capacity(1_024)); + sl.write(&msg, &mut buffer)?; + Ok(buffer + .into_inner() + .expect("Vec buffer must be always flushed")) + } } } } @@ -86,6 +109,7 @@ mod tests { use crate::tests::*; use dup_crypto::hashs::Hash; use dup_crypto::keys::text_signable::TextSignable; + use dup_crypto::keys::*; #[test] fn test_ws2p_message_ack() { diff --git a/lib/modules/ws2p/ws2p-messages/v2/ok.rs b/lib/modules/ws2p/ws2p-messages/v2/ok.rs index 6ec077f19d98697b85b49773d03bf2430afedcd0..8d3b48343595951ec903d63293bc92106a4c427f 100644 --- a/lib/modules/ws2p/ws2p-messages/v2/ok.rs +++ b/lib/modules/ws2p/ws2p-messages/v2/ok.rs @@ -13,6 +13,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. +use serde::{Deserialize, Serialize}; use std::num::NonZeroU16; #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] diff --git a/lib/modules/ws2p/ws2p-messages/v2/payload_container.rs b/lib/modules/ws2p/ws2p-messages/v2/payload_container.rs index 5bfe5eac3a0f8067e6075df2f50276ec3962fa45..8524307d45374f1c432b8de04b94e147bc05f92b 100644 --- a/lib/modules/ws2p/ws2p-messages/v2/payload_container.rs +++ b/lib/modules/ws2p/ws2p-messages/v2/payload_container.rs @@ -29,6 +29,7 @@ use dup_crypto::hashs::Hash; use durs_network_documents::network_head_v2::NetworkHeadV2; use durs_network_documents::network_head_v3::NetworkHeadV3; use durs_network_documents::network_peer::PeerCardV11; +use serde::{Deserialize, Serialize}; /// WS2P v2 message payload metadata size pub static WS2P_V2_MESSAGE_PAYLOAD_METADATA_SIZE: &usize = &8; diff --git a/lib/modules/ws2p/ws2p-messages/v2/req_responses.rs b/lib/modules/ws2p/ws2p-messages/v2/req_responses.rs index 3bdfd5753c198ed107a2c31af179cd29829d2ca8..f57d1a18542ee31c9ff7729d893701acab2a187a 100644 --- a/lib/modules/ws2p/ws2p-messages/v2/req_responses.rs +++ b/lib/modules/ws2p/ws2p-messages/v2/req_responses.rs @@ -19,6 +19,7 @@ use dubp_user_docs::documents::certification::CompactCertificationDocumentV10; use dubp_user_docs::documents::identity::v10::CompactIdentityDocumentV10; use dubp_user_docs::documents::membership::v10::CompactPoolMembershipDoc; use dup_crypto::hashs::Hash; +use serde::{Deserialize, Serialize}; use std::str; /// WS2Pv2 request response diff --git a/lib/modules/ws2p/ws2p-messages/v2/requests.rs b/lib/modules/ws2p/ws2p-messages/v2/requests.rs index f28f85037e68105f29ed294b3774e577dbcc23e3..7bc3f32b2e5b4da62eb5c72173c6ba7c063e11f7 100644 --- a/lib/modules/ws2p/ws2p-messages/v2/requests.rs +++ b/lib/modules/ws2p/ws2p-messages/v2/requests.rs @@ -15,6 +15,7 @@ use dubp_common_doc::blockstamp::Blockstamp; use dubp_common_doc::BlockNumber; +use serde::{Deserialize, Serialize}; /// WS2Pv2Request #[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)] diff --git a/lib/modules/ws2p/ws2p-messages/v2/secret_flags.rs b/lib/modules/ws2p/ws2p-messages/v2/secret_flags.rs index ae967aa795bc5971827eef00f8c533fd063e4202..3849b980120d57295be5fd2f6f4e370a33bf0d56 100644 --- a/lib/modules/ws2p/ws2p-messages/v2/secret_flags.rs +++ b/lib/modules/ws2p/ws2p-messages/v2/secret_flags.rs @@ -14,6 +14,7 @@ // along with this program. If not, see <https://www.gnu.org/licenses/>. use dup_crypto::keys::*; +use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] /// WS2Pv2SecretFlags diff --git a/lib/modules/ws2p/ws2p-messages/v2/sync_info.rs b/lib/modules/ws2p/ws2p-messages/v2/sync_info.rs index d6f734c3dad552a6d7f1aec2d6cadafc1ab03b86..c35e031b79c90211fa4ac03c4dae32dc1bf3fa9b 100644 --- a/lib/modules/ws2p/ws2p-messages/v2/sync_info.rs +++ b/lib/modules/ws2p/ws2p-messages/v2/sync_info.rs @@ -17,6 +17,7 @@ use dubp_common_doc::BlockHash; use dubp_common_doc::Blockstamp; use durs_network_documents::network_peer::PeerCard; +use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] // Copy /// WS2Pv2SyncInfo diff --git a/lib/modules/ws2p/ws2p-protocol/Cargo.toml b/lib/modules/ws2p/ws2p-protocol/Cargo.toml index a9a75c4eec971959cfd8cd9b46fe24024e9efb16..bf63736d357b8342743205df5be583d0a6629ae9 100644 --- a/lib/modules/ws2p/ws2p-protocol/Cargo.toml +++ b/lib/modules/ws2p/ws2p-protocol/Cargo.toml @@ -25,10 +25,10 @@ durs-message= { path = "../../../core/message" } durs-network= { path = "../../../core/network" } failure = "0.1.5" log = "0.4.*" +pkstl = { path = "../../../tools/pkstl", features = ["bin","cbor"] } serde = "1.0.*" serde_derive = "1.0.*" unwrap = "1.2.1" -maplit = "1.0.1" [dev-dependencies] pretty_assertions = "0.5.1" diff --git a/lib/modules/ws2p/ws2p-protocol/src/constants.rs b/lib/modules/ws2p/ws2p-protocol/src/constants.rs index d5d95e25179252ffdafaf8dd132fdaa535dbf54a..5cccc702dae1405b2ccdcda8bd7e948a0e3c7d36 100644 --- a/lib/modules/ws2p/ws2p-protocol/src/constants.rs +++ b/lib/modules/ws2p/ws2p-protocol/src/constants.rs @@ -31,22 +31,22 @@ pub static WS2P_SPAM_LIMIT: &usize = &6; pub static WS2P_SPAM_SLEEP_TIME_IN_SEC: &u64 = &100; /// Number of invalid messages tolerated -pub static WS2P_INVALID_MSGS_LIMIT: &'static usize = &5; +pub static WS2P_INVALID_MSGS_LIMIT: &usize = &5; /// Number of simultaneous outgoing connection -pub static WS2P_DEFAULT_OUTCOMING_QUOTA: &'static usize = &10; +pub static WS2P_DEFAULT_OUTCOMING_QUOTA: &usize = &10; -/*pub static WS2P_OUTCOMING_INTERVAL_AT_STARTUP: &'static u64 = &75; -pub static WS2P_OUTCOMING_INTERVAL: &'static u64 = &300;*/ +/*pub static WS2P_OUTCOMING_INTERVAL_AT_STARTUP: &u64 = &75; +pub static WS2P_OUTCOMING_INTERVAL: &u64 = &300;*/ /// Timeout before receiving message from service -pub static WS2P_RECV_SERVICE_FREQ_IN_MS: &'static u64 = &1_000; +pub static WS2P_RECV_SERVICE_FREQ_IN_MS: &u64 = &1_000; /* -pub static WS2P_REQUEST_TIMEOUT: &'static u64 = &30_000; -pub static DURATION_BEFORE_RECORDING_ENDPOINT: &'static u64 = &180; -pub static BLOCKS_REQUEST_INTERVAL: &'static u64 = &60; -pub static PENDING_IDENTITIES_REQUEST_INTERVAL: &'static u64 = &40; +pub static WS2P_REQUEST_TIMEOUT: &u64 = &30_000; +pub static DURATION_BEFORE_RECORDING_ENDPOINT: &u64 = &180; +pub static BLOCKS_REQUEST_INTERVAL: &u64 = &60; +pub static PENDING_IDENTITIES_REQUEST_INTERVAL: &u64 = &40; */ /// Chunk size (in blocks), can differ with blockchain chunk size -pub static CHUNK_SIZE: &'static u32 = &500; +pub static CHUNK_SIZE: &u32 = &500; diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/mod.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/mod.rs index d222eb24d6a81df0bf9055b6020df0ff6e7387e3..ee7c5d1e5b2192e3f2e7fa367651e9e93f2e9613 100644 --- a/lib/modules/ws2p/ws2p-protocol/src/controller/mod.rs +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/mod.rs @@ -25,12 +25,13 @@ use crate::connection_state::WS2PConnectionState; use crate::constants; use crate::orchestrator::OrchestratorMsg; use crate::websocket::{WebsocketAction, WebsocketIncomingEvent}; +use dup_crypto::keys::*; use durs_module::ModuleMessage; use durs_network_documents::NodeFullId; use durs_ws2p_messages::v2::connect::WS2Pv2ConnectType; use durs_ws2p_messages::WS2PMessage; use failure::Fail; -use std::sync::mpsc::{Receiver, SendError, Sender}; +use std::sync::mpsc::{Receiver, Sender}; use std::time::SystemTime; use unwrap::unwrap; @@ -66,7 +67,7 @@ impl WS2PControllerId { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] /// Event transmitted to the orchestrator pub enum WS2PControllerEvent { /// New connection established @@ -75,6 +76,8 @@ pub enum WS2PControllerEvent { conn_type: WS2Pv2ConnectType, /// Remote node full id remote_full_id: NodeFullId, + /// Connection secure layer + secure_layer: pkstl::SecureLayer, }, /// Connection state change StateChange { @@ -89,8 +92,8 @@ pub enum WS2PControllerEvent { }, } -#[derive(Debug)] /// WS2P Controller +#[derive(Debug)] pub struct WS2PController<M: ModuleMessage> { /// Controller id pub id: WS2PControllerId, @@ -100,14 +103,28 @@ pub struct WS2PController<M: ModuleMessage> { pub meta_datas: WS2PControllerMetaDatas, /// Controller receiver pub receiver: Receiver<WebsocketActionOrder>, + /// Connection Security Layer + pub secure_layer: pkstl::SecureLayer, } -#[derive(Copy, Clone, Debug, Fail)] +#[derive(Debug, Fail)] /// WS2P Controller process error pub enum WS2PControllerProcessError { /// Orchestrator unreacheable #[fail(display = "WS2P Orchestrator unreachable")] OrchestratorUnreacheable, + /// Receive connect message without public key + #[fail(display = "Receive connect message without public key")] + ReceiveConnectMsgWithoutPubKey, + /// Security error + #[fail(display = "Security error: {:?}", _0)] + SecurityError(pkstl::Error), +} + +impl From<pkstl::Error> for WS2PControllerProcessError { + fn from(e: pkstl::Error) -> Self { + WS2PControllerProcessError::SecurityError(e) + } } /// Websocket action order @@ -177,16 +194,35 @@ impl<M: ModuleMessage> WS2PController<M> { id: WS2PControllerId, meta_datas: WS2PControllerMetaDatas, orchestrator_sender: Sender<OrchestratorMsg<M>>, - ) -> Result<WS2PController<M>, SendError<OrchestratorMsg<M>>> { + ) -> Result<WS2PController<M>, WS2PControllerProcessError> { let (sender, receiver) = std::sync::mpsc::channel(); - orchestrator_sender.send(OrchestratorMsg::ControllerSender { id, sender })?; + orchestrator_sender + .send(OrchestratorMsg::ControllerSender { id, sender }) + .map_err(|_| WS2PControllerProcessError::OrchestratorUnreacheable)?; + + let mut my_key_pair_seed_buffer = [0u8; 32]; + my_key_pair_seed_buffer.copy_from_slice(meta_datas.local_node.my_key_pair.seed().as_ref()); + + let secure_layer = pkstl::SecureLayer::create( + pkstl::SecureLayerConfig { + message_format: pkstl::MessageFormat::Cbor, + ..pkstl::SecureLayerConfig::default() + }, + Some(pkstl::Seed32::new(my_key_pair_seed_buffer)), + if let Some(ref remote_node) = meta_datas.remote_node { + Some(remote_node.remote_full_id.1.to_bytes_vector()) + } else { + None + }, + )?; Ok(WS2PController { id, meta_datas, orchestrator_sender, receiver, + secure_layer, }) } @@ -205,7 +241,7 @@ impl<M: ModuleMessage> WS2PController<M> { pub fn process( &mut self, event: WebsocketIncomingEvent, - ) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { + ) -> Result<Vec<WebsocketActionOrder>, WS2PControllerProcessError> { match event { WebsocketIncomingEvent::OnOpen { remote_addr } => on_open::process(self, remote_addr), WebsocketIncomingEvent::OnMessage { msg } => on_message::process(self, msg), @@ -222,7 +258,7 @@ impl<M: ModuleMessage> WS2PController<M> { reason.unwrap_or_else(|| "".to_owned()) ); self.update_conn_state(WS2PConnectionState::Close)?; - Ok(None) + Ok(vec![]) } } } diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message.rs index e89760f790492b20c153279aab64ceb0cbb43b1d..7f619e2be59911b9ef4c93282e82cd731f3167db 100644 --- a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message.rs +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message.rs @@ -38,7 +38,7 @@ use std::time::{Duration, SystemTime}; pub fn process<M: ModuleMessage>( controller: &mut WS2PController<M>, msg: WebsocketMessage, -) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { +) -> Result<Vec<WebsocketActionOrder>, WS2PControllerProcessError> { // Update last_mess_time controller.meta_datas.last_mess_time = SystemTime::now(); @@ -64,67 +64,90 @@ pub fn process<M: ModuleMessage>( *constants::WS2P_SPAM_SLEEP_TIME_IN_SEC, )); controller.meta_datas.last_mess_time = SystemTime::now(); - return Ok(None); + return Ok(vec![]); } if let WebsocketMessage::Bin(bin_msg) = msg { log::debug!("Receive new bin message there is not a spam !"); - match WS2PMessage::parse_and_check_bin_message(&bin_msg) { - Ok(valid_msg) => match valid_msg { - WS2PMessage::V2(ref msg_v2) => { - match msg_v2.payload { - WS2Pv2MessagePayload::Connect(ref box_connect_msg) => { - let connect_msg = box_connect_msg.deref(); - // Get remote node id - let remote_full_id = - NodeFullId(msg_v2.issuer_node_id, msg_v2.issuer_pubkey); - // Process connect message - connect_msg::process_ws2p_v2p_connect_msg( - controller, - remote_full_id, - connect_msg, - ) - } - WS2Pv2MessagePayload::Ack { - challenge: ack_msg_challenge, - } => { - // Process ack message - ack_msg::process_ws2p_v2p_ack_msg(controller, ack_msg_challenge) - } - WS2Pv2MessagePayload::SecretFlags(ref secret_flags) => { - secret_flags::process_ws2p_v2p_secret_flags_msg( - controller, - secret_flags, - ) - } - WS2Pv2MessagePayload::Ok(_) => { - // Process ok message - ok_msg::process_ws2p_v2p_ok_msg(controller) - } - WS2Pv2MessagePayload::Ko(_) => Ok(close_with_reason( - "Receive Ko message !", - WS2PConnectionState::Denial, - )), - _ => { - if let WS2PConnectionState::Established = controller.meta_datas.state { - controller - .send_event(WS2PControllerEvent::RecvValidMsg { - ws2p_msg: valid_msg, - }) - .map(|_| None) - } else { - Ok(close_with_reason( - "Receive datas message on negociation !", + match WS2PMessage::parse_and_check_incoming_bin_message( + &mut controller.secure_layer, + &bin_msg, + ) { + Ok((valid_msgs, remote_pubkey_opt)) => { + let mut orders: Vec<WebsocketActionOrder> = Vec::new(); + for valid_msg in valid_msgs { + match valid_msg { + WS2PMessage::V2(ref msg_v2) => match msg_v2.payload { + WS2Pv2MessagePayload::Connect(ref box_connect_msg) => { + let connect_msg = box_connect_msg.deref(); + + // Get remote node id + let remote_full_id = if let Some(remote_pubkey) = remote_pubkey_opt + { + NodeFullId(msg_v2.issuer_node_id, remote_pubkey) + } else { + return Err( + WS2PControllerProcessError::ReceiveConnectMsgWithoutPubKey, + ); + }; + + // Process connect message + orders.append(&mut connect_msg::process_ws2p_v2p_connect_msg( + controller, + remote_full_id, + connect_msg, + )?); + } + WS2Pv2MessagePayload::Ack { + challenge: ack_msg_challenge, + } => { + // Process ack message + orders.append(&mut ack_msg::process_ws2p_v2p_ack_msg( + controller, + ack_msg_challenge, + )?); + } + WS2Pv2MessagePayload::SecretFlags(ref secret_flags) => { + // Process secret flags + orders.append( + &mut secret_flags::process_ws2p_v2p_secret_flags_msg( + controller, + secret_flags, + )?, + ); + } + WS2Pv2MessagePayload::Ok(_) => { + // Process ok message + orders.append(&mut ok_msg::process_ws2p_v2p_ok_msg(controller)?); + } + WS2Pv2MessagePayload::Ko(_) => { + return Ok(close_with_reason( + "Receive Ko message !", WS2PConnectionState::Denial, )) } - } + _ => { + if let WS2PConnectionState::Established = + controller.meta_datas.state + { + controller.send_event(WS2PControllerEvent::RecvValidMsg { + ws2p_msg: valid_msg, + })?; + } else { + return Ok(close_with_reason( + "Receive datas message on negociation !", + WS2PConnectionState::Denial, + )); + } + } + }, + WS2PMessage::_V0 | WS2PMessage::_V1 => fatal_error!( + "Dev error: must not use WS2PMessage version < 2 in WS2Pv2+ !" + ), } } - WS2PMessage::_V0 | WS2PMessage::_V1 => { - fatal_error!("Dev error: must not use WS2PMessage version < 2 in WS2Pv2+ !") - } - }, + Ok(orders) + } Err(ws2p_msg_err) => { log::warn!("Message is invalid : {:?}", ws2p_msg_err); controller.meta_datas.count_invalid_msgs += 1; @@ -134,7 +157,7 @@ pub fn process<M: ModuleMessage>( WS2PConnectionState::Denial, )) } else { - Ok(None) + Ok(vec![]) } } } @@ -146,12 +169,12 @@ pub fn process<M: ModuleMessage>( } } -fn close_with_reason(reason: &str, new_state: WS2PConnectionState) -> Option<WebsocketActionOrder> { - Some(WebsocketActionOrder { +fn close_with_reason(reason: &str, new_state: WS2PConnectionState) -> Vec<WebsocketActionOrder> { + vec![WebsocketActionOrder { ws_action: WebsocketAction::CloseConnection { reason: Some(reason.to_owned()), }, new_state_if_success: Some(new_state), new_state_if_fail: new_state, - }) + }] } diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ack_msg.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ack_msg.rs index c987782caa9b23418bca9d922ad993c217e824b7..7943f1d5d3148ceb5b60a35f1f95dc13abd24d0a 100644 --- a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ack_msg.rs +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ack_msg.rs @@ -30,7 +30,7 @@ use log::error; pub fn process_ws2p_v2p_ack_msg<M: ModuleMessage>( controller: &mut WS2PController<M>, // controller contains original challenge ack_msg_challenge: Hash, -) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { +) -> Result<Vec<WebsocketActionOrder>, WS2PControllerProcessError> { log::debug!("Receive ACK message !"); match controller.meta_datas.state { @@ -59,13 +59,13 @@ fn process<M: ModuleMessage>( controller: &mut WS2PController<M>, ack_msg_challenge: Hash, success_status: WS2PConnectionState, -) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { +) -> Result<Vec<WebsocketActionOrder>, WS2PControllerProcessError> { if controller.meta_datas.challenge != ack_msg_challenge { controller .update_conn_state(WS2PConnectionState::Denial) - .map(|_| None) + .map(|_| vec![]) } else { - Ok(Some(send_ok_msg(controller, success_status))) + Ok(vec![send_ok_msg(controller, success_status)]) } } @@ -78,10 +78,10 @@ fn send_ok_msg<M: ModuleMessage>( let ok_msg = WS2Pv2OkMsg::default(); // Encapsulate and binarize OK message - if let Ok((_, bin_ok_msg)) = WS2Pv2Message::encapsulate_payload( + if let Ok(bin_ok_msg) = WS2Pv2Message::encapsulate_payload( + &mut controller.secure_layer, controller.meta_datas.currency.clone(), controller.meta_datas.local_node.my_node_id, - &controller.meta_datas.signator, WS2Pv2MessagePayload::Ok(ok_msg), ) { // Order the sending of a OK message diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/connect_msg.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/connect_msg.rs index e5a3791e68c96c0dc19c7bef9d5d9f10ab3091c6..b9879e7a89e7a58db51add34f9e98aeca7378ef7 100644 --- a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/connect_msg.rs +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/connect_msg.rs @@ -33,7 +33,7 @@ pub fn process_ws2p_v2p_connect_msg<M: ModuleMessage>( controller: &mut WS2PController<M>, remote_full_id: NodeFullId, connect_msg: &WS2Pv2ConnectMsg, -) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { +) -> Result<Vec<WebsocketActionOrder>, WS2PControllerProcessError> { log::debug!("Receive CONNECT message !"); // Get remote node datas @@ -82,22 +82,22 @@ pub fn process_ws2p_v2p_connect_msg<M: ModuleMessage>( } // Encapsulate and binarize ACK message - if let Ok((_, bin_ack_msg)) = WS2Pv2Message::encapsulate_payload( + if let Ok(bin_ack_msg) = WS2Pv2Message::encapsulate_payload( + &mut controller.secure_layer, controller.meta_datas.currency.clone(), controller.meta_datas.local_node.my_node_id, - &controller.meta_datas.signator, WS2Pv2MessagePayload::Ack { challenge: remote_challenge, }, ) { // Order the sending of a OK message - Ok(Some(WebsocketActionOrder { + Ok(vec![WebsocketActionOrder { ws_action: WebsocketAction::SendMessage { msg: WebsocketMessage::Bin(bin_ack_msg), }, new_state_if_success: Some(WS2PConnectionState::ConnectMessOk), new_state_if_fail: WS2PConnectionState::Unreachable, - })) + }]) } else { fatal_error!("Dev error: Fail to sign own ack message !") } diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ok_msg.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ok_msg.rs index 6dd2ee4aa8559d8cfb6b5591577a0698385f1be6..d727f77f4773c42cb8393379f501714f3519a217 100644 --- a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ok_msg.rs +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ok_msg.rs @@ -21,36 +21,36 @@ use crate::controller::{ }; use durs_common_tools::fatal_error; use durs_module::ModuleMessage; -use durs_ws2p_messages::v2::connect::WS2Pv2ConnectType; use log::error; -use unwrap::unwrap; /// Process WS2P v2+ OK Message pub fn process_ws2p_v2p_ok_msg<M: ModuleMessage>( controller: &mut WS2PController<M>, -) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { +) -> Result<Vec<WebsocketActionOrder>, WS2PControllerProcessError> { log::debug!("Receive OK message !"); match controller.meta_datas.state { WS2PConnectionState::ConnectMessOk | WS2PConnectionState::SecretFlagsOkWaitingAckMsg => { controller.update_conn_state(WS2PConnectionState::OkMsgOkWaitingAckMsg)?; - Ok(None) + Ok(vec![]) } WS2PConnectionState::AckMsgOk | WS2PConnectionState::SecretFlagsOk => { controller.meta_datas.state = WS2PConnectionState::Established; + let secure_layer = controller.secure_layer.try_clone()?; controller.send_event(WS2PControllerEvent::NewConnEstablished { - conn_type: if controller.meta_datas.connect_type != WS2Pv2ConnectType::Incoming { - controller.meta_datas.connect_type - } else { - unwrap!(controller.meta_datas.remote_connect_type) - }, + conn_type: controller.meta_datas.connect_type, /*if controller.id != WS2PControllerId::Incoming { + controller.meta_datas.connect_type + } else { + unwrap!(controller.meta_datas.remote_connect_type) + }*/ remote_full_id: if let Some(ref remote_node) = controller.meta_datas.remote_node { remote_node.remote_full_id } else { fatal_error!("remote_node must be valued in process_ws2p_v2p_ok_msg() !") }, + secure_layer, })?; - Ok(None) + Ok(vec![]) } _ => Ok(super::close_with_reason( "Unexpected OK message !", diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/secret_flags.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/secret_flags.rs index a85b5dc830703c24fb6371142bb1ae6c300e5cb7..a6d6b93f327705da9f41cf6e10a79530e7810e9b 100644 --- a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/secret_flags.rs +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/secret_flags.rs @@ -27,7 +27,7 @@ use durs_ws2p_messages::v2::secret_flags::WS2Pv2SecretFlagsMsg; pub fn process_ws2p_v2p_secret_flags_msg<M: ModuleMessage>( controller: &mut WS2PController<M>, secret_flags: &WS2Pv2SecretFlagsMsg, -) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { +) -> Result<Vec<WebsocketActionOrder>, WS2PControllerProcessError> { // SECRET_FLAGS informations must never be logged in prod #[cfg(test)] log::debug!("Receive SECRET_FLAGS message !"); @@ -38,9 +38,9 @@ pub fn process_ws2p_v2p_secret_flags_msg<M: ModuleMessage>( secret_flags, WS2PConnectionState::SecretFlagsOkWaitingAckMsg, ) - .map(|_| None), + .map(|_| vec![]), WS2PConnectionState::AckMsgOk => { - process(controller, secret_flags, WS2PConnectionState::SecretFlagsOk).map(|_| None) + process(controller, secret_flags, WS2PConnectionState::SecretFlagsOk).map(|_| vec![]) } _ => Ok(super::close_with_reason( "Unexpected SECRET_FLAGS message !", @@ -54,6 +54,6 @@ fn process<M: ModuleMessage>( _secret_flags: &WS2Pv2SecretFlagsMsg, success_state: WS2PConnectionState, ) -> Result<(), WS2PControllerProcessError> { - // TODO .. traitement des secrets flags + // TODO HUGO .. traitement des secrets flags controller.update_conn_state(success_state) } diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/on_open.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/on_open.rs index ca8c4d21dfb85194a58ebf675bf657a8f9336642..3ddf2ce9c8cc0e98273adf5e96207c8f60c9dd67 100644 --- a/lib/modules/ws2p/ws2p-protocol/src/controller/on_open.rs +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/on_open.rs @@ -30,7 +30,7 @@ use std::net::SocketAddr; pub fn process<M: ModuleMessage>( controller: &mut WS2PController<M>, remote_addr_opt: Option<SocketAddr>, -) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { +) -> Result<Vec<WebsocketActionOrder>, WS2PControllerProcessError> { log::debug!("open websocket from {}", print_opt_addr(remote_addr_opt)); // Update connection state @@ -45,20 +45,20 @@ pub fn process<M: ModuleMessage>( ); // Encapsulate and binarize connect message - if let Ok((_ws2p_full_msg, bin_connect_msg)) = WS2Pv2Message::encapsulate_payload( + if let Ok(bin_connect_msg) = WS2Pv2Message::encapsulate_payload( + &mut controller.secure_layer, controller.meta_datas.currency.clone(), controller.meta_datas.local_node.my_node_id, - &controller.meta_datas.signator, WS2Pv2MessagePayload::Connect(Box::new(connect_msg)), ) { // Order the sending of a CONNECT message - Ok(Some(WebsocketActionOrder { + Ok(vec![WebsocketActionOrder { ws_action: WebsocketAction::SendMessage { msg: WebsocketMessage::Bin(bin_connect_msg), }, new_state_if_success: Some(WS2PConnectionState::WaitingConnectMsg), new_state_if_fail: WS2PConnectionState::Unreachable, - })) + }]) } else { fatal_error!("Dev error: Fail to sign own connect message !") } diff --git a/lib/modules/ws2p/ws2p-protocol/src/orchestrator.rs b/lib/modules/ws2p/ws2p-protocol/src/orchestrator.rs index 009649e165316ab59319a27a3ef042b8809b5232..8ad10d40c8395f7573ec9f39c8f36aeadbe392db 100644 --- a/lib/modules/ws2p/ws2p-protocol/src/orchestrator.rs +++ b/lib/modules/ws2p/ws2p-protocol/src/orchestrator.rs @@ -36,14 +36,11 @@ use dubp_common_doc::BlockHash; use dubp_common_doc::Blockstamp; use dubp_common_doc::CurrencyName; use dup_crypto::keys::{KeyPair, SignatorEnum}; -use durs_common_tools::fatal_error; use durs_message::DursMsg; use durs_module::ModuleMessage; use durs_module::*; -use durs_network_documents::network_endpoint::*; use durs_network_documents::network_peer::*; use durs_network_documents::NodeFullId; -use maplit::hashset; use std::collections::HashMap; use std::sync::mpsc; use std::thread; @@ -69,6 +66,13 @@ pub enum OrchestratorMsg<M: ModuleMessage> { ModuleMessage(M), } +/// Data allowing the service to manage an incoming connection +#[derive(Debug)] +pub struct IncomingConnection { + /// Connection Security Layer + pub secure_layer: pkstl::SecureLayer, +} + /// Main orchestrator #[derive(Debug)] pub struct WS2POrchestrator { @@ -76,6 +80,8 @@ pub struct WS2POrchestrator { static_name: ModuleStaticName, /// current blockstamp pub current_blockstamp: Option<Blockstamp>, + /// List of established incoming connections + pub incoming_connections: HashMap<NodeFullId, IncomingConnection>, /// milestones: hash of the last block of each chunk pub milestones: Option<Vec<BlockHash>>, /// List of known peers @@ -98,26 +104,24 @@ impl WS2POrchestrator { pub fn new_pair<WS: WebsocketTrait>( static_name: ModuleStaticName, router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>, - module_sender: mpsc::Sender<DursMsg>, module_receiver: mpsc::Receiver<DursMsg>, - api_name: ApiName, self_node: MySelfWs2pNode, self_signator: SignatorEnum, ) -> (WS2POutgoingOrchestrator<WS>, WS2POrchestrator) { + // Create main orchestrator channel + let (orchestrator_sender, orchestrator_receiver) = mpsc::channel(); + // starts proxy thread - let (orchestrator_sender, orchestrator_receiver) = start_proxy_thread( - router_sender.clone(), - module_sender, - module_receiver, - static_name, - api_name, - ); + start_proxy_thread(orchestrator_sender.clone(), module_receiver); + + // Start ws server + // TODO ESZ // Regenerate self_signator for main orchestrator let self_signator_2 = self_node .my_key_pair .generate_signator() - .expect("corrupted keypair"); // TODO handle error + .expect("corrupted keypair"); // TODO HUGO handle error // OUTGOING: initialize the outgoing orchestrator let outgoing_orchestrator = WS2POutgoingOrchestrator::<WS>::new( @@ -134,6 +138,7 @@ impl WS2POrchestrator { let orchestrator = WS2POrchestrator { static_name, current_blockstamp: None, + incoming_connections: HashMap::new(), milestones: None, peers: HashMap::new(), router_sender, @@ -162,11 +167,10 @@ impl WS2POrchestrator { } } OrchestratorMsg::ControllerEvent { - // controller_id, + controller_id, event, - .. } => { - self.process_controller_event(event); + event::process_controller_event(self, controller_id, event); } // messages from other modules wrapped into an orchestator message OrchestratorMsg::ModuleMessage(mod_msg) => match mod_msg { @@ -209,80 +213,39 @@ impl WS2POrchestrator { // the proxy thread role is to convert DursMsg to OrchestratorMsg fn start_proxy_thread( - // sender for router - router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>, - // sender of ws2p module - module_sender: mpsc::Sender<DursMsg>, + // sender for ws2p orchestrator + orchestrator_sender: mpsc::Sender<OrchestratorMsg<DursMsg>>, // receiver of ws2p module module_receiver: mpsc::Receiver<DursMsg>, - static_name: ModuleStaticName, - api_name: ApiName, -) -> ( - mpsc::Sender<OrchestratorMsg<DursMsg>>, - mpsc::Receiver<OrchestratorMsg<DursMsg>>, ) { - // Create proxy channel - let (proxy_sender, proxy_receiver): ( - mpsc::Sender<OrchestratorMsg<DursMsg>>, - mpsc::Receiver<OrchestratorMsg<DursMsg>>, - ) = mpsc::channel(); - - // clone these - let proxy_sender_clone = proxy_sender.clone(); - // Launch a proxy thread that transform DursMsg to OrchestratorMsg thread::spawn(move || { - // register to router - if router_sender - .send(RouterThreadMessage::ModuleRegistration { - static_name, - sender: module_sender, - roles: vec![ModuleRole::InterNodesNetwork], - events_subscription: vec![ - ModuleEvent::NewValidBlock, - ModuleEvent::NewWotDocInPool, - ModuleEvent::NewTxinPool, - ], - reserved_apis_parts: vec![ApiPart { - name: api_name, - versions: hashset![ApiVersion(2)], - }], - endpoints: vec![], - }) - .is_ok() - { - debug!("Send ws2p sender to main thread."); - loop { - match module_receiver.recv() { - Ok(message) => { - // stop flag - let stop = if let DursMsg::Stop = message { - true - } else { - false - }; - // forward message to orchestrator - match proxy_sender.send(OrchestratorMsg::ModuleMessage(message)) { - Ok(_) => { - if stop { - break; - }; - } - Err(_) => debug!( - "ws2p proxy : failed to relay DursMsg to Orchestrator main thread !" - ), + loop { + match module_receiver.recv() { + Ok(message) => { + // stop flag + let stop = if let DursMsg::Stop = message { + true + } else { + false + }; + // forward message to orchestrator + match orchestrator_sender.send(OrchestratorMsg::ModuleMessage(message)) { + Ok(_) => { + if stop { + break; + }; } - } - Err(e) => { - warn!("{}", e); - break; + Err(_) => debug!( + "ws2p proxy : failed to relay DursMsg to Orchestrator main thread !" + ), } } + Err(e) => { + warn!("{}", e); + break; + } } - } else { - fatal_error!("ws2p module failed to send registration to router !"); } }); - - (proxy_sender_clone, proxy_receiver) } diff --git a/lib/modules/ws2p/ws2p-protocol/src/orchestrator/event.rs b/lib/modules/ws2p/ws2p-protocol/src/orchestrator/event.rs index aedab0d4bc4d374d35d67af7b511d75589c93173..6b54f368d5a2117aa30c6027fe0813087633202e 100644 --- a/lib/modules/ws2p/ws2p-protocol/src/orchestrator/event.rs +++ b/lib/modules/ws2p/ws2p-protocol/src/orchestrator/event.rs @@ -18,57 +18,82 @@ use crate::constants::CHUNK_SIZE; use crate::controller::*; use crate::orchestrator::outgoing::ServiceMsg; -use crate::orchestrator::WS2POrchestrator; +use crate::orchestrator::{IncomingConnection, WS2POrchestrator}; use durs_common_tools::fatal_error; use durs_network_documents::network_peer::*; use durs_ws2p_messages::v2::connect::WS2Pv2ConnectType; use durs_ws2p_messages::v2::sync_info::WS2Pv2SyncInfo; -impl WS2POrchestrator { - /// process controllers events - pub fn process_controller_event(self: &mut Self, event: WS2PControllerEvent) { - match event { - WS2PControllerEvent::NewConnEstablished { - conn_type, - remote_full_id, - } => match conn_type { - WS2Pv2ConnectType::Sync { .. } => { - if let Some(target_blockstamp) = self.current_blockstamp { - if let Some(milestones) = &self.milestones { - let peer_cards = - self.peers.values().cloned().collect::<Vec<PeerCard>>(); - if self - .outgoing_sender - .send(ServiceMsg::SendSyncInfo { - remote_full_id, - sync_info: WS2Pv2SyncInfo { - chunk_size: *CHUNK_SIZE, - target_blockstamp, - milestones: milestones.to_vec(), - peer_cards, - }, - }) - .is_ok() - { - debug!("sent sync info to outgoing orchestrator"); +/// process controllers events +pub fn process_controller_event( + ws2p_orchestrator: &mut WS2POrchestrator, + controller_id: WS2PControllerId, + event: WS2PControllerEvent, +) { + match event { + WS2PControllerEvent::NewConnEstablished { + conn_type, + remote_full_id, + secure_layer, + } => match controller_id { + WS2PControllerId::Incoming => { + ws2p_orchestrator + .incoming_connections + .insert(remote_full_id, IncomingConnection { secure_layer }); + match conn_type { + WS2Pv2ConnectType::Sync { .. } => { + if let Some(target_blockstamp) = ws2p_orchestrator.current_blockstamp { + if let Some(milestones) = &ws2p_orchestrator.milestones { + let peer_cards = ws2p_orchestrator + .peers + .values() + .cloned() + .collect::<Vec<PeerCard>>(); + let _sync_info = WS2Pv2SyncInfo { + chunk_size: *CHUNK_SIZE, + target_blockstamp, + milestones: milestones.to_vec(), + peer_cards, + }; + // TODO send sync info to remote node + + // TODO send chunks or close connection } else { - error!("can not join outgoing orchestrator"); + fatal_error!("can not send sync info if milestones are not known"); } } else { - fatal_error!("can not send sync info if milestones are not known"); + fatal_error!( + "can not send sync info if current blockstamp is not known" + ); } - } else { - fatal_error!("can not send sync info if current blockstamp is not known"); } + WS2Pv2ConnectType::OutgoingClient => unimplemented!(), + WS2Pv2ConnectType::OutgoingServer => unimplemented!(), + WS2Pv2ConnectType::SyncAskChunk(_blockstamp) => unimplemented!(), + WS2Pv2ConnectType::SyncSendChunks => unimplemented!(), + WS2Pv2ConnectType::Incoming => unreachable!(), + } + } + WS2PControllerId::Client { .. } => { + unimplemented!(); + } + WS2PControllerId::Outgoing { .. } => { + if ws2p_orchestrator + .outgoing_sender + .send(ServiceMsg::NewEstablishedConnection { + controller_id, + secure_layer, + }) + .is_ok() + { + debug!("sent secure layer outgoing orchestrator"); + } else { + error!("can not join outgoing orchestrator"); } - WS2Pv2ConnectType::Incoming => {} - WS2Pv2ConnectType::OutgoingClient => {} - WS2Pv2ConnectType::OutgoingServer => {} - WS2Pv2ConnectType::SyncAskChunk(_blockstamp) => {} - WS2Pv2ConnectType::SyncSendChunks => {} - }, - WS2PControllerEvent::RecvValidMsg { .. } => {} - WS2PControllerEvent::StateChange { .. } => {} - } + unimplemented!(); + } + }, + WS2PControllerEvent::RecvValidMsg { .. } => {} + WS2PControllerEvent::StateChange { .. } => {} } } diff --git a/lib/modules/ws2p/ws2p-protocol/src/orchestrator/outgoing.rs b/lib/modules/ws2p/ws2p-protocol/src/orchestrator/outgoing.rs index d431a9760f1e51eb5575c346ee53d1e1ff64b22a..00aabdf0f7e42a56486ca151e6d4bc246bc078e8 100644 --- a/lib/modules/ws2p/ws2p-protocol/src/orchestrator/outgoing.rs +++ b/lib/modules/ws2p/ws2p-protocol/src/orchestrator/outgoing.rs @@ -27,7 +27,6 @@ use core::fmt::Debug; use dubp_common_doc::Blockstamp; use dubp_common_doc::CurrencyName; use dup_crypto::keys::SignatorEnum; -use durs_common_tools::fatal_error; use durs_message::requests::*; use durs_message::*; use durs_module::*; @@ -44,7 +43,7 @@ use std::sync::mpsc; use std::time::Duration; /// messages used to give orders or signals to outgoing orchestrator -#[derive(Clone, Debug)] // Copy +#[derive(Debug)] pub enum ServiceMsg { /// current block changed CurrentBlockstamp(Blockstamp), @@ -52,6 +51,8 @@ pub enum ServiceMsg { SendSyncInfo { /// node to which we want to send info remote_full_id: NodeFullId, + /// Connection secure layer + secure_layer: pkstl::SecureLayer, /// info to send sync_info: WS2Pv2SyncInfo, }, @@ -62,15 +63,43 @@ pub enum ServiceMsg { /// controller sender sender: mpsc::Sender<WebsocketActionOrder>, }, + /// got new established connection + NewEstablishedConnection { + /// controller id + controller_id: WS2PControllerId, + /// connection secure layer + secure_layer: pkstl::SecureLayer, + }, } -#[derive(Debug, Clone)] /// Data allowing the service to manage an outgoing connection +#[derive(Debug)] pub struct OutgoingConnection { /// Endpoint pub endpoint: Option<EndpointEnum>, /// Controller channel pub controller: mpsc::Sender<WebsocketActionOrder>, + /// Connection Security Layer + pub secure_layer: Option<pkstl::SecureLayer>, +} + +impl OutgoingConnection { + // close outgoing connection + fn close(&self) { + if self + .controller + .send(WebsocketActionOrder { + ws_action: WebsocketAction::CloseConnection { + reason: Some("can not send chunks now".to_string()), + }, + new_state_if_success: Some(WS2PConnectionState::Close), + new_state_if_fail: WS2PConnectionState::Close, + }) + .is_err() + { + warn!("can not contact controller"); + } + } } #[derive(Debug, Copy, Clone)] @@ -236,27 +265,40 @@ impl<WS: WebsocketTrait> WS2POutgoingOrchestrator<WS> { OutgoingConnection { endpoint: None, controller: sender, + secure_layer: None, }, ); // maps the controller id to the "unknown" key self.index.insert(None, id); } + ServiceMsg::NewEstablishedConnection { + controller_id, + secure_layer, + } => { + // Add the secure layer to the outgoing connection + if let Some(ref mut outgoing_connection) = + self.connections.get_mut(&controller_id) + { + outgoing_connection.secure_layer = Some(secure_layer); + } else { + debug!("receive signal NewEstablishedConnection for a connection thaht not exist in OutgoingOrchestrator"); + } + } ServiceMsg::SendSyncInfo { remote_full_id, + mut secure_layer, sync_info, - } => match self.connections.get(&WS2PControllerId::Outgoing { + } => match self.connections.get_mut(&WS2PControllerId::Outgoing { expected_remote_full_id: Some(remote_full_id), }) { - Some(outgoing_connection) => { + Some(ref mut outgoing_connection) => { // Encapsulate and binarize Sync_Info message - if let Ok((_ws2p_full_msg, bin_connect_msg)) = - WS2Pv2Message::encapsulate_payload( - self.currency.clone(), - self.self_node.my_node_id, - &self.self_signator, - WS2Pv2MessagePayload::SyncInfo(sync_info), - ) - { + if let Ok(bin_connect_msg) = WS2Pv2Message::encapsulate_payload( + &mut secure_layer, + self.currency.clone(), + self.self_node.my_node_id, + WS2Pv2MessagePayload::SyncInfo(sync_info), + ) { // send the sync info message order to the controller if outgoing_connection .controller @@ -272,27 +314,19 @@ impl<WS: WebsocketTrait> WS2POutgoingOrchestrator<WS> { } else { warn!("can not contact controller"); } - - // decide wether to close the connection or to send the chunks - - // close - if outgoing_connection - .controller - .send(WebsocketActionOrder { - ws_action: WebsocketAction::CloseConnection { - reason: Some("can not send chunks now".to_string()), - }, - new_state_if_success: Some(WS2PConnectionState::Close), - new_state_if_fail: WS2PConnectionState::Close, - }) - .is_ok() - { - } else { - warn!("can not contact controller"); - } } else { - fatal_error!("Dev error: Fail to sign own connect message !"); + // Security error, close the connection and ban pubkey and ip for a few minutes + outgoing_connection.close(); + // TODO ban pubkey and ip for a few minutes } + + // Get connection secure layer + outgoing_connection.secure_layer = Some(secure_layer); + + // decide wether to close the connection or to send the chunks + + // close + outgoing_connection.close(); } None => { warn!("no outgoing connection corresponding to this id"); diff --git a/lib/modules/ws2p/ws2p-protocol/src/websocket.rs b/lib/modules/ws2p/ws2p-protocol/src/websocket.rs index cd1e3c204c1fa0dfe0cfa14104a1ccde7e3e771f..4e0354f6c38180cae6116584a70523c8ea3c26ad 100644 --- a/lib/modules/ws2p/ws2p-protocol/src/websocket.rs +++ b/lib/modules/ws2p/ws2p-protocol/src/websocket.rs @@ -63,10 +63,6 @@ pub enum WebsocketIncomingEvent { #[derive(Clone, Debug)] /// Websocket action pub enum WebsocketAction { - // /// Listen on address - // Listen { - // addr: impl ToSocketAddrs + Debug, // TODO how can I do this ? - // }, /// Connect to websocket url ConnectToUrl { /// Websocket url diff --git a/lib/modules/ws2p/ws2p/Cargo.toml b/lib/modules/ws2p/ws2p/Cargo.toml index e8ded95095619f91a16d5e4432d5895d7ced77a2..df70eb98abd10a5b64492222be12e3225ba72293 100644 --- a/lib/modules/ws2p/ws2p/Cargo.toml +++ b/lib/modules/ws2p/ws2p/Cargo.toml @@ -10,7 +10,6 @@ edition = "2018" path = "src/lib.rs" [dependencies] -rand = "0.5.*" bincode = "1.0.*" dubp-common-doc = { path = "../../../dubp/common-doc"} #, version = "0.1.0" } dubp-currency-params = { path = "../../../dubp/currency-params" } @@ -31,7 +30,7 @@ serde = "1.0.*" serde_derive = "1.0.*" serde_json = "1.0.*" structopt= "0.2.*" -ws = { version = "0.9.*", features = ["permessage-deflate"] } +ws = "0.9.*" [dev-dependencies] durs-common-tests-tools = { path = "../../../tests-tools/common-tests-tools" } diff --git a/lib/modules/ws2p/ws2p/src/constants.rs b/lib/modules/ws2p/ws2p/src/constants.rs index 9531921b8e8934b96c88ca91c6148a4aa720fb1f..90d419cce6550b2707cca386a2f0a3b1cf092276 100644 --- a/lib/modules/ws2p/ws2p/src/constants.rs +++ b/lib/modules/ws2p/ws2p/src/constants.rs @@ -13,5 +13,5 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -pub static API_NAME: &'static str = "WS2P"; -pub static MODULE_NAME: &'static str = "ws2p"; +pub static API_NAME: &str = "WS2P"; +pub static MODULE_NAME: &str = "ws2p"; diff --git a/lib/modules/ws2p/ws2p/src/controllers/handler.rs b/lib/modules/ws2p/ws2p/src/controllers/handler.rs index 3cfe999d2cb1bde868841db1fc270650126348c3..74221228c5cdc7bb18cddb4f0ebd8214876051c2 100644 --- a/lib/modules/ws2p/ws2p/src/controllers/handler.rs +++ b/lib/modules/ws2p/ws2p/src/controllers/handler.rs @@ -129,17 +129,16 @@ impl Handler for Ws2pConnectionHandler { match self.controller.process(WebsocketIncomingEvent::OnOpen { remote_addr: self.remote_addr_opt, }) { - Ok(ws_action_order_opt) => { + Ok(ws_action_orders) => { // Start RECV_SERVICE timeout self.ws .0 .timeout(*WS2P_RECV_SERVICE_FREQ_IN_MS, RECV_SERVICE)?; - // Execute websocket action order - if let Some(ws_action_order) = ws_action_order_opt { - self.exec_ws_action(ws_action_order) - } else { - Ok(()) + // Execute websocket action orders + for ws_action_order in ws_action_orders { + self.exec_ws_action(ws_action_order)?; } + Ok(()) } Err(e) => self.exec_ws_action(WebsocketActionOrder { ws_action: WebsocketAction::CloseConnection { @@ -162,12 +161,12 @@ impl Handler for Ws2pConnectionHandler { .controller .process(WebsocketIncomingEvent::OnMessage { msg }) { - Ok(ws_action_order_opt) => { - if let Some(ws_action_order) = ws_action_order_opt { - self.exec_ws_action(ws_action_order) - } else { - Ok(()) + Ok(ws_action_orders) => { + // Execute websocket action orders + for ws_action_order in ws_action_orders { + self.exec_ws_action(ws_action_order)?; } + Ok(()) } Err(e) => self.exec_ws_action(WebsocketActionOrder { ws_action: WebsocketAction::CloseConnection { diff --git a/lib/modules/ws2p/ws2p/src/controllers/incoming_connections.rs b/lib/modules/ws2p/ws2p/src/controllers/incoming_connections.rs index d3e306d1cc82e071105fe281edf9a09a893f04c8..efbe750549c55584c736dc9e03c9d537eea1f24a 100644 --- a/lib/modules/ws2p/ws2p/src/controllers/incoming_connections.rs +++ b/lib/modules/ws2p/ws2p/src/controllers/incoming_connections.rs @@ -28,7 +28,6 @@ use durs_ws2p_protocol::MySelfWs2pNode; use std::fmt::Debug; use std::net::ToSocketAddrs; use std::sync::mpsc; -use ws::deflate::DeflateBuilder; use ws::listen; /// Listen on WSPv2 host:port @@ -50,11 +49,11 @@ pub fn listen_on_ws2p_v2_endpoint<A: ToSocketAddrs + Debug>( ), orchestrator_sender.clone(), ) { - Ok(controller) => DeflateBuilder::new().build(Ws2pConnectionHandler { + Ok(controller) => Ws2pConnectionHandler { ws: WsSender(ws), remote_addr_opt: None, controller, - }), + }, Err(_e) => fatal_error!("WS2P Orchestrator unreachable"), } }) diff --git a/lib/modules/ws2p/ws2p/src/controllers/outgoing_connections.rs b/lib/modules/ws2p/ws2p/src/controllers/outgoing_connections.rs index 182d9fd3249da13a6a8b68d24b9e55d402fd6af6..f84c589fb17b6249abfd421c1e6abf6fe19974a8 100644 --- a/lib/modules/ws2p/ws2p/src/controllers/outgoing_connections.rs +++ b/lib/modules/ws2p/ws2p/src/controllers/outgoing_connections.rs @@ -27,7 +27,6 @@ use durs_ws2p_protocol::controller::{WS2PController, WS2PControllerId}; use durs_ws2p_protocol::orchestrator::OrchestratorMsg; use durs_ws2p_protocol::MySelfWs2pNode; use ws::connect; -use ws::deflate::DeflateBuilder; //use durs_network::*; use durs_ws2p_messages::v2::connect::WS2Pv2ConnectType; use std::sync::mpsc; @@ -60,11 +59,11 @@ pub fn connect_to_ws2p_v2_endpoint( ), orchestrator_sender.clone(), ) { - Ok(controller) => DeflateBuilder::new().build(Ws2pConnectionHandler { + Ok(controller) => Ws2pConnectionHandler { ws: WsSender(ws), remote_addr_opt: None, controller, - }), + }, Err(_e) => fatal_error!("WS2P Service unreachable"), } }) @@ -94,11 +93,11 @@ pub fn connect_to_url( ), orchestrator_sender.clone(), ) { - Ok(controller) => DeflateBuilder::new().build(Ws2pConnectionHandler { + Ok(controller) => Ws2pConnectionHandler { ws: WsSender(ws), remote_addr_opt: None, controller, - }), + }, Err(_e) => fatal_error!("WS2P Service unreachable"), } }) diff --git a/lib/modules/ws2p/ws2p/src/lib.rs b/lib/modules/ws2p/ws2p/src/lib.rs index b9717929ec97494d1031a11cdde51dbab678095b..7d879d562f9ba83ac7590160442421435be0bcb6 100644 --- a/lib/modules/ws2p/ws2p/src/lib.rs +++ b/lib/modules/ws2p/ws2p/src/lib.rs @@ -59,8 +59,9 @@ use durs_ws2p_messages::v2::api_features::WS2PFeatures; use durs_ws2p_protocol::constants as protocol_constants; use durs_ws2p_protocol::orchestrator::outgoing::WS2POutgoingOrchestrator; use durs_ws2p_protocol::orchestrator::WS2POrchestrator; +//use durs_ws2p_protocol::websocket::WebsocketTrait; use durs_ws2p_protocol::MySelfWs2pNode; -use rand::Rng; +use maplit::hashset; use std::sync::mpsc; use std::thread; @@ -149,13 +150,19 @@ impl ApiModule<DuRsConf, DursMsg> for WS2PModule { impl NetworkModule<DuRsConf, DursMsg> for WS2PModule { // --- SYNC --- fn sync( - _soft_meta_datas: &SoftwareMetaDatas<DuRsConf>, + soft_meta_datas: &SoftwareMetaDatas<DuRsConf>, keys: RequiredKeysContent, - _conf: WS2PConf, + conf: WS2PConf, router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>, sync_params: SyncOpt, ) -> Result<(), SyncError> { - if let Ok((mut outgoing_orchestrator, mut orchestrator)) = init(keys, router_sender) { + if let Ok((mut outgoing_orchestrator, mut orchestrator)) = init( + conf, + keys, + router_sender, + soft_meta_datas, + Some(sync_params.clone()), + ) { // starts listening module messages thread::spawn(move || { debug!("started orchestrator thread"); @@ -187,6 +194,7 @@ impl DursModule<DuRsConf, DursMsg> for WS2PModule { type ModuleConf = WS2PConf; type ModuleOpt = WS2POpt; + #[inline] fn name() -> ModuleStaticName { ModuleStaticName(constants::MODULE_NAME) } @@ -229,12 +237,14 @@ impl DursModule<DuRsConf, DursMsg> for WS2PModule { } // --- START --- fn start( - _soft_meta_datas: &SoftwareMetaDatas<DuRsConf>, + soft_meta_datas: &SoftwareMetaDatas<DuRsConf>, keys: RequiredKeysContent, - _conf: WS2PConf, + conf: WS2PConf, router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>, ) -> Result<(), failure::Error> { - if let Ok((mut outgoing_orchestrator, mut orchestrator)) = init(keys, router_sender) { + if let Ok((mut outgoing_orchestrator, mut orchestrator)) = + init(conf, keys, router_sender, soft_meta_datas, None) + { // starts listening module messages thread::spawn(move || { orchestrator.main_loop(); @@ -251,10 +261,13 @@ impl DursModule<DuRsConf, DursMsg> for WS2PModule { } } -// initialize orchestrators +// initialize WS2Pv2 module orchestrators fn init( + _conf: WS2PConf, keys: RequiredKeysContent, router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>, + soft_meta_datas: &SoftwareMetaDatas<DuRsConf>, + sync_params_opt: Option<SyncOpt>, ) -> Result< ( WS2POutgoingOrchestrator<WebsocketConnector>, @@ -269,22 +282,57 @@ fn init( return Err(WS2PError::UnexpectedKeys.into()); }; + // Get node id + let node_id = NodeId(soft_meta_datas.conf.my_node_id()); + // Create module channel let (module_sender, module_receiver): (mpsc::Sender<DursMsg>, mpsc::Receiver<DursMsg>) = mpsc::channel(); - // generate node id - let node_id = NodeId(rand::thread_rng().gen::<u32>()); + // Register module to router + if router_sender + .send(RouterThreadMessage::ModuleRegistration { + static_name: WS2PModule::name(), + sender: module_sender, + roles: vec![ModuleRole::InterNodesNetwork], + events_subscription: vec![ + ModuleEvent::NewValidBlock, + ModuleEvent::NewWotDocInPool, + ModuleEvent::NewTxinPool, + ], + reserved_apis_parts: vec![ApiPart { + name: ApiName(constants::API_NAME.to_owned()), + versions: hashset![ApiVersion(2)], + }], + endpoints: vec![], + }) + .is_ok() + { + debug!("Send ws2p sender to main thread."); + } else { + fatal_error!("Fail to register WS2P module to router"); + } + + // Get currency name + let mut currency = CurrencyName(durs_conf::constants::DEFAULT_CURRENCY.to_owned()); + if let Some(sync_params) = sync_params_opt { + if let Some(currency_str) = sync_params.currency { + currency = CurrencyName(currency_str); + } + } else { + // Wait currency signal + // TODO HUGO get currency name + } // generate self signator - let self_signator = key_pair.generate_signator().expect("corrupted keypair"); // TODO handle error + let self_signator = key_pair.generate_signator().expect("corrupted keypair"); // TODO HUGO handle error // generate peer card let peer_card = if let Ok(peer_card) = generate_peer::_generate_self_peer( - CurrencyName("mok".to_string()), // TODO get currency name + currency.clone(), &self_signator, node_id, - BlockNumber(0), // yet unknown TODO replace when known + BlockNumber(0), // yet unknown TODO HUGO replace when known vec![], ) { peer_card @@ -292,7 +340,7 @@ fn init( fatal_error!("can not sign self peer"); }; - // init self node + // generate self node let self_node = MySelfWs2pNode::new( node_id, key_pair, @@ -300,13 +348,20 @@ fn init( PeerCard::V11(peer_card), ); + // Create and start ws server + /*if sync_params_opt.is_none() { + WebsocketConnector::new( + currency.clone(), + self_node.clone(), + orchestrator_sender, + ) + }*/ + // initialize the Orchestrator pair let (outgoing_orchestrator, orchestrator) = WS2POrchestrator::new_pair::<WebsocketConnector>( WS2PModule::name(), router_sender, - module_sender, module_receiver, - ApiName(constants::API_NAME.to_owned()), self_node.clone(), self_signator, ); diff --git a/lib/modules/ws2p/ws2p/tests/connection_negociation.rs b/lib/modules/ws2p/ws2p/tests/connection_negociation.rs index 897583d492a20181cebd78d696322fdf3dec4011..a2a5114a47fa1159682e5acdab26c83d149807d9 100644 --- a/lib/modules/ws2p/ws2p/tests/connection_negociation.rs +++ b/lib/modules/ws2p/ws2p/tests/connection_negociation.rs @@ -102,7 +102,7 @@ fn client_node() -> MySelfWs2pNode { } } -//#[ignore] +#[ignore] #[test] #[cfg(unix)] fn test_connection_negociation_denial() { @@ -231,7 +231,7 @@ fn test_connection_negociation_success() { }); // ===== opening connection ===== - // we must get Ws2pServiceSender::ControllerSender from the client and server threads (but we ignore them) + // we must get Ws2PServiceSender::ControllerSender from the client and server threads (but we ignore them) // we also test that the statuses match expected ones let _client_controller = get_controller(&client_service_channel.1); @@ -275,35 +275,44 @@ fn test_connection_negociation_success() { ); // Established for client - expected_event( + expected_event_new_conn_established( &client_service_channel.1, - WS2PControllerEvent::NewConnEstablished { - conn_type: WS2Pv2ConnectType::OutgoingServer, - remote_full_id: server_node.get_full_id(), - }, + WS2Pv2ConnectType::OutgoingServer, + server_node.get_full_id(), ); // Established for server - expected_event( + expected_event_new_conn_established( &server_service_channel.1, - WS2PControllerEvent::NewConnEstablished { - conn_type: WS2Pv2ConnectType::OutgoingServer, - remote_full_id: client_node.get_full_id(), - }, + WS2Pv2ConnectType::Incoming, + client_node.get_full_id(), ); } // === functions used in above test === // Get established event in a receiver -fn expected_event( +fn expected_event_new_conn_established( orchestrator_receiver: &mpsc::Receiver<OrchestratorMsg<DursMsg>>, - expected_event: WS2PControllerEvent, + expected_conn_type: WS2Pv2ConnectType, + expected_remote_full_id: NodeFullId, ) { match orchestrator_receiver .recv_timeout(Duration::from_millis(*TIMEOUT_IN_MS)) .expect("Receive nothing from controller :") { - OrchestratorMsg::ControllerEvent { event, .. } => assert_eq!(expected_event, event), + OrchestratorMsg::ControllerEvent { event, .. } => { + if let WS2PControllerEvent::NewConnEstablished { + conn_type, + remote_full_id, + .. + } = event + { + assert_eq!(expected_conn_type, conn_type); + assert_eq!(expected_remote_full_id, remote_full_id); + } else { + panic!("Expect event NewConnEstablished, receive '{:?}' !", event); + } + } other => panic!("Expect signal ControllerEvent, receive '{:?}' !", other), } }