diff --git a/Cargo.toml b/Cargo.toml index c15c5c204603cb3cb5c7f97307a3d9e63fb01b19..c05aacdb169ab23d9d89044380491456f1b1a784 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ "lib/modules/tui", "lib/modules/ws2p-v1-legacy", "lib/modules/ws2p/ws2p", - "lib/modules/ws2p/ws2p-messages", + "lib/modules/ws2p/ws2p-protocol", "lib/tests-tools/crypto-tests-tools", "lib/tests-tools/documents-tests-tools", "lib/tests-tools/common-tests-tools", diff --git a/lib/modules/ws2p/ws2p-messages/v2/connect.rs b/lib/modules/ws2p/ws2p-messages/v2/connect.rs index 71ce3d80f5203fc9689ab6cec1bcc30c806cb33d..aa085fb57ea02b197173a76d26dd2789e977127d 100644 --- a/lib/modules/ws2p/ws2p-messages/v2/connect.rs +++ b/lib/modules/ws2p/ws2p-messages/v2/connect.rs @@ -48,15 +48,22 @@ impl WS2PConnectFlags { pub fn res_sync_chunk(&self) -> bool { 0b1111_1011 | self.0[0] == 255u8 } + /// Check flag CLIENT + pub fn client(&self) -> bool { + 0b1111_0111 | self.0[0] == 255u8 + } } impl From<WS2Pv2ConnectType> for WS2PConnectFlags { fn from(connect_type: WS2Pv2ConnectType) -> Self { match connect_type { - WS2Pv2ConnectType::Classic | WS2Pv2ConnectType::Incoming => WS2PConnectFlags(vec![]), + WS2Pv2ConnectType::Incoming | WS2Pv2ConnectType::OutgoingServer => { + WS2PConnectFlags(vec![]) + } + WS2Pv2ConnectType::OutgoingClient => WS2PConnectFlags(vec![8u8]), WS2Pv2ConnectType::Sync(_) => WS2PConnectFlags(vec![1u8]), - WS2Pv2ConnectType::AskChunk(_) => WS2PConnectFlags(vec![3u8]), - WS2Pv2ConnectType::SendChunks => WS2PConnectFlags(vec![5u8]), + WS2Pv2ConnectType::SyncAskChunk(_) => WS2PConnectFlags(vec![3u8]), + WS2Pv2ConnectType::SyncSendChunks => WS2PConnectFlags(vec![5u8]), } } } @@ -64,16 +71,18 @@ impl From<WS2Pv2ConnectType> for WS2PConnectFlags { #[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)] /// WS2Pv2ConnectType pub enum WS2Pv2ConnectType { - /// Classic outgoing connection - Classic, /// Incoming connection Incoming, + /// Client outgoing connection + OutgoingClient, + /// Server outgoing connection + OutgoingServer, /// Sync outgoing connection (from blockstamp, or from genesis block if blockstamp is none) Sync(Option<Blockstamp>), /// Sync outgoing connection to request chunk - AskChunk(Blockstamp), + SyncAskChunk(Blockstamp), /// Sync outgoing connection to send chunk - SendChunks, + SyncSendChunks, } impl WS2Pv2ConnectType { @@ -84,14 +93,14 @@ impl WS2Pv2ConnectType { ) -> WS2Pv2ConnectType { if !flags.is_empty() && flags.sync() { if flags.ask_sync_chunk() && blockstamp.is_some() { - WS2Pv2ConnectType::AskChunk(blockstamp.expect("safe unwrap")) + WS2Pv2ConnectType::SyncAskChunk(blockstamp.expect("safe unwrap")) } else if flags.res_sync_chunk() { - WS2Pv2ConnectType::SendChunks + WS2Pv2ConnectType::SyncSendChunks } else { WS2Pv2ConnectType::Sync(blockstamp) } } else { - WS2Pv2ConnectType::Classic + WS2Pv2ConnectType::OutgoingServer } } } @@ -135,7 +144,7 @@ pub fn generate_connect_message( challenge: Hash, peer_card: Option<PeerCardV11>, ) -> WS2Pv2ConnectMsg { - let chunkstamp = if let WS2Pv2ConnectType::AskChunk(chunkstamp) = connect_type { + let chunkstamp = if let WS2Pv2ConnectType::SyncAskChunk(chunkstamp) = connect_type { Some(chunkstamp) } else { None diff --git a/lib/modules/ws2p/ws2p-messages/v2/secret_flags.rs b/lib/modules/ws2p/ws2p-messages/v2/secret_flags.rs index 113dbce3ec136ea3c8db39f9cf7ac91d08cc7653..68e44fdf43c5d54ba3d333b81250231f81fe44c5 100644 --- a/lib/modules/ws2p/ws2p-messages/v2/secret_flags.rs +++ b/lib/modules/ws2p/ws2p-messages/v2/secret_flags.rs @@ -33,14 +33,15 @@ impl WS2Pv2SecretFlags { pub fn _low_flow_demand(&self) -> bool { self.0[0] | 0b1111_1110 == 255u8 } - /// Check flag MEMBER_PUBKEY - pub fn member_pubkey(&self) -> bool { - self.0[0] | 0b1111_1101 == 255u8 - } - /// Check flag MEMBER_PROOF - pub fn member_proof(&self) -> bool { - self.0[0] | 0b1111_1011 == 255u8 - } +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +/// Member proof +pub struct MemberProof { + /// Member pubkey + pub pubkey: PubKey, + /// Proof that the sender node is a member (Signature of the challenge send by other node in their CONNECT message.) + pub sig: Sig, } #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] @@ -48,17 +49,14 @@ impl WS2Pv2SecretFlags { pub struct WS2Pv2SecretFlagsMsg { /// Secret flags pub secret_flags: WS2Pv2SecretFlags, - /// - pub member_pubkey: Option<PubKey>, - /// Proof that the sender node is a member (Signature of the challenge send by other node in their CONNECT message.) - pub member_proof: Option<Sig>, + /// Member proof + pub member_proof: Option<MemberProof>, } impl Default for WS2Pv2SecretFlagsMsg { fn default() -> Self { WS2Pv2SecretFlagsMsg { secret_flags: WS2Pv2SecretFlags(vec![]), - member_pubkey: None, member_proof: None, } } @@ -75,9 +73,11 @@ mod tests { let keypair1 = keypair1(); let challenge = Hash::random(); let msg = WS2Pv2SecretFlagsMsg { - secret_flags: WS2Pv2SecretFlags(vec![6u8]), - member_pubkey: Some(PubKey::Ed25519(keypair1.public_key())), - member_proof: Some(Sig::Ed25519(keypair1.private_key().sign(&challenge.0))), + secret_flags: WS2Pv2SecretFlags(vec![]), + member_proof: Some(MemberProof { + pubkey: PubKey::Ed25519(keypair1.public_key()), + sig: Sig::Ed25519(keypair1.private_key().sign(&challenge.0)), + }), }; test_ws2p_message(WS2Pv2MessagePayload::SecretFlags(msg)); } diff --git a/lib/modules/ws2p/ws2p-protocol/Cargo.toml b/lib/modules/ws2p/ws2p-protocol/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..abd5bd695992c4cb5e87ab31632eae139b2cb82e --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "durs-ws2p-protocol" +version = "0.1.0-a0.1" +authors = ["librelois <elois@ifee.fr>"] +description = "WebSocketToPeer V2+ API Protocol." +license = "AGPL-3.0" +edition = "2018" + +[lib] +path = "src/lib.rs" + +[dependencies] +dup-crypto = { path = "../../../tools/crypto" } +durs-common-tools = { path = "../../../tools/common-tools" } +dubp-documents= { path = "../../../tools/documents" } +durs-module = { path = "../../../core/module" } +durs-network-documents = { path = "../../../tools/network-documents" } +durs-ws2p-messages = { path = "../ws2p-messages" } +failure = "0.1.5" +log = "0.4.*" +serde = "1.0.*" +serde_derive = "1.0.*" +unwrap = "1.2.1" + +[dev-dependencies] +pretty_assertions = "0.5.1" + +[features] \ No newline at end of file diff --git a/lib/modules/ws2p/ws2p-protocol/src/connection_state.rs b/lib/modules/ws2p/ws2p-protocol/src/connection_state.rs new file mode 100644 index 0000000000000000000000000000000000000000..8f73ee2e606d0a61306018ae20745e223e945ce8 --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/src/connection_state.rs @@ -0,0 +1,53 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub module that define WS2P connection state. + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +/// WS2P connection state +pub enum WS2PConnectionState { + /// Never try to establish this connection + NeverTry, + /// Try to open websocket + TryToOpenWS, + /// Websocket error + WSError, + /// Try to send connect message + TryToSendConnectMsg, + /// Endpoint unreachable + Unreachable, + /// Waiting connect message + WaitingConnectMsg, + /// No response + NoResponse, + /// Negociation timeout + NegociationTimeout, + /// Receive valid connect message + ConnectMessOk, + /// Receive valid OK message but wait ACK message + OkMsgOkWaitingAckMsg, + /// Receive valid ACK message + AckMsgOk, + /// Receive valid SECRET_FLAGS message but wait ACK message + SecretFlagsOkWaitingAckMsg, + /// Receive valid SECRET_FLAGS message + SecretFlagsOk, + /// Connection denial (maybe due to many different reasons : receive wrong message, wrong format, wrong signature, etc) + Denial, + /// Connection closed + Close, + /// Connection successfully established + Established, +} diff --git a/lib/modules/ws2p/ws2p-protocol/src/constants.rs b/lib/modules/ws2p/ws2p-protocol/src/constants.rs new file mode 100644 index 0000000000000000000000000000000000000000..ce200fea0c0b682a72b5f16f5e736b26d20ba440 --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/src/constants.rs @@ -0,0 +1,34 @@ +// Copyright (C) 2018 The Durs Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! WS2P V2+ Protocol constants + +/// Connection negociation timeout +pub static WS2P_NEGOTIATION_TIMEOUT_IN_SECS: &'static u64 = &15; + +/// Conection expiration timeout +pub static WS2P_EXPIRE_TIMEOUT_IN_SECS: &'static u64 = &120; + +/// Interval between 2 messages from which it''s perhaps a spam (in milliseconds) +pub static WS2P_SPAM_INTERVAL_IN_MILLI_SECS: &'static u64 = &80; + +/// Number of consecutive closed messages from which messages will be considered as spam. +pub static WS2P_SPAM_LIMIT: &'static usize = &6; + +/// Rest time in a situation of proven spam +pub static WS2P_SPAM_SLEEP_TIME_IN_SEC: &'static u64 = &100; + +/// Number of invalid messages tolerated +pub static WS2P_INVALID_MSGS_LIMIT: &'static usize = &5; diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller.rs b/lib/modules/ws2p/ws2p-protocol/src/controller.rs new file mode 100644 index 0000000000000000000000000000000000000000..82befe1ea99dcbd261761003e58762b713aa4733 --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/src/controller.rs @@ -0,0 +1,255 @@ +// Copyright (C) 2018 The Duniter Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! WebSocketToPeer V2+ API Protocol. +//! Controller manage one WS2P connection. + +pub mod meta_datas; +mod on_message; +mod on_open; + +use self::meta_datas::WS2PControllerMetaDatas; +use crate::connection_state::WS2PConnectionState; +use crate::constants; +use crate::orchestrator::OrchestratorMsg; +use crate::websocket::{WebsocketAction, WebsocketIncomingEvent}; +use durs_module::ModuleMessage; +use durs_network_documents::NodeFullId; +use durs_ws2p_messages::v2::connect::WS2Pv2ConnectType; +use durs_ws2p_messages::WS2PMessage; +use failure::Fail; +use std::sync::mpsc::{Receiver, SendError, Sender}; +use std::time::SystemTime; +use unwrap::unwrap; + +#[derive(Copy, Clone, Debug, Hash)] +/// WS2P Controller unique identitier +pub enum WS2PControllerId { + /// Client controller + Client { + /// Expected remote node full id + expected_remote_full_id: Option<NodeFullId>, + }, + /// Server Incoming controller + Incoming, + /// Server outgoing controller + Outgoing { + /// Expected remote node full id + expected_remote_full_id: Option<NodeFullId>, + }, +} + +impl WS2PControllerId { + /// Get expected remote node full id + pub fn expected_remote_full_id(&self) -> Option<NodeFullId> { + match self { + WS2PControllerId::Client { + expected_remote_full_id, + } + | WS2PControllerId::Outgoing { + expected_remote_full_id, + } => *expected_remote_full_id, + WS2PControllerId::Incoming => None, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +/// Event transmitted to the orchestrator +pub enum WS2PControllerEvent { + /// New connection established + NewConnEstablished { + /// Connection type + conn_type: WS2Pv2ConnectType, + /// Remote node full id + remote_full_id: NodeFullId, + }, + /// Connection state change + StateChange { + /// New connection state + new_state: WS2PConnectionState, + }, + /// The controller only reports a message if it cannot process it entirely on its own. + /// For example, connection negotiation messages are not sent back. + RecvValidMsg { + /// WS2P Message + ws2p_msg: WS2PMessage, + }, +} + +#[derive(Debug)] +/// WS2P Controller +pub struct WS2PController<M: ModuleMessage> { + /// Controller id + pub id: WS2PControllerId, + /// Orchestrator sender + pub orchestrator_sender: Sender<OrchestratorMsg<M>>, + /// Controller meta datas + pub meta_datas: WS2PControllerMetaDatas, + /// Controller receiver + pub receiver: Receiver<WebsocketActionOrder>, +} + +#[derive(Copy, Clone, Debug, Fail)] +/// WS2P Controller process error +pub enum WS2PControllerProcessError { + /// Orchestrator unreacheable + #[fail(display = "WS2P Orchestrator unreachable")] + OrchestratorUnreacheable, +} + +/// Websocket action order +#[derive(Clone, Debug)] +pub struct WebsocketActionOrder { + /// Websocket actio, + pub ws_action: WebsocketAction, + /// New state if action success + pub new_state_if_success: Option<WS2PConnectionState>, + /// New state if action fail + pub new_state_if_fail: WS2PConnectionState, +} + +impl WebsocketActionOrder { + /// Close connection + #[inline] + pub fn close() -> Self { + WebsocketActionOrder::close_with_reason(None) + } + /// Close connection with reason + #[inline] + pub fn close_with_reason(reason: Option<String>) -> Self { + WebsocketActionOrder { + ws_action: WebsocketAction::CloseConnection { reason }, + new_state_if_success: Some(WS2PConnectionState::Close), + new_state_if_fail: WS2PConnectionState::Unreachable, + } + } +} + +impl<M: ModuleMessage> WS2PController<M> { + /// Check timeouts + pub fn check_timeouts(&mut self) -> Option<WebsocketActionOrder> { + let now = SystemTime::now(); + + if self.meta_datas.state == WS2PConnectionState::Established { + if unwrap!(now.duration_since(self.meta_datas.last_mess_time)).as_secs() + > *constants::WS2P_EXPIRE_TIMEOUT_IN_SECS + { + Some(WebsocketActionOrder { + ws_action: WebsocketAction::CloseConnection { + reason: Some("Closing due to inactivity.".to_owned()), + }, + new_state_if_success: Some(WS2PConnectionState::Close), + new_state_if_fail: WS2PConnectionState::Unreachable, + }) + } else { + None + } + } else if unwrap!(now.duration_since(self.meta_datas.creation_time)).as_secs() + > *constants::WS2P_NEGOTIATION_TIMEOUT_IN_SECS + { + Some(WebsocketActionOrder { + ws_action: WebsocketAction::CloseConnection { + reason: Some("Negociation timeout.".to_owned()), + }, + new_state_if_success: Some(WS2PConnectionState::Close), + new_state_if_fail: WS2PConnectionState::Unreachable, + }) + } else { + None + } + } + + /// Try to instanciate new controller + pub fn try_new( + id: WS2PControllerId, + meta_datas: WS2PControllerMetaDatas, + orchestrator_sender: Sender<OrchestratorMsg<M>>, + ) -> Result<WS2PController<M>, SendError<OrchestratorMsg<M>>> { + let (sender, receiver) = std::sync::mpsc::channel(); + + orchestrator_sender.send(OrchestratorMsg::ControllerSender(sender))?; + + Ok(WS2PController { + id, + meta_datas, + orchestrator_sender, + receiver, + }) + } + + /// Get orchestrator sender + pub fn get_pending_ws_actions(&self) -> Vec<WebsocketActionOrder> { + let mut ws_actions = Vec::new(); + + while let Ok(ws_action) = self.receiver.recv() { + ws_actions.push(ws_action); + } + + ws_actions + } + + /// Process a websocket incoming event + pub fn process( + &mut self, + event: WebsocketIncomingEvent, + ) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { + match event { + WebsocketIncomingEvent::OnOpen { remote_addr } => on_open::process(self, remote_addr), + WebsocketIncomingEvent::OnMessage { msg } => on_message::process(self, msg), + WebsocketIncomingEvent::OnClose { close_code, reason } => { + let remote_str = if let Some(remote_node) = &self.meta_datas.remote_node { + remote_node.remote_full_id.to_string() + } else { + "unknow".to_owned() + }; + log::warn!( + "Connection with remote '{}' closed (close_code={}, reason={}).", + remote_str, + close_code, + reason.unwrap_or_else(|| "".to_owned()) + ); + self.update_conn_state(WS2PConnectionState::Close)?; + Ok(None) + } + } + } + + fn send_event(&mut self, event: WS2PControllerEvent) -> Result<(), WS2PControllerProcessError> { + if self + .orchestrator_sender + .send(OrchestratorMsg::ControllerEvent { + controller_id: self.id, + event, + }) + .is_err() + && self.meta_datas.state != WS2PConnectionState::Close + { + Err(WS2PControllerProcessError::OrchestratorUnreacheable) + } else { + Ok(()) + } + } + + #[inline] + /// Update connection state + pub fn update_conn_state( + &mut self, + new_state: WS2PConnectionState, + ) -> Result<(), WS2PControllerProcessError> { + self.meta_datas.state = new_state; + self.send_event(WS2PControllerEvent::StateChange { new_state }) + } +} diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/meta_datas.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/meta_datas.rs new file mode 100644 index 0000000000000000000000000000000000000000..3550f756f80794de7447fc3dc7e27170551a01a7 --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/meta_datas.rs @@ -0,0 +1,96 @@ +// Copyright (C) 2018 The Duniter Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub module define WS2P controller meta datas + +use crate::connection_state::WS2PConnectionState; +use crate::MySelfWs2pNode; +use dubp_documents::{Blockstamp, CurrencyName}; +use dup_crypto::hashs::Hash; +use durs_network_documents::network_peer::PeerCardV11; +use durs_network_documents::NodeFullId; +use durs_ws2p_messages::v2::api_features::WS2PFeatures; +use durs_ws2p_messages::v2::connect::WS2Pv2ConnectType; +use std::time::SystemTime; + +#[derive(Debug, Clone)] +/// WS2p Connection meta datas +pub struct WS2PControllerMetaDatas { + /// Local challenge + pub challenge: Hash, + /// connect type + pub connect_type: WS2Pv2ConnectType, + /// Count invalid messages + pub count_invalid_msgs: usize, + /// Currency name + pub currency: CurrencyName, + /// Controller creation time + pub creation_time: SystemTime, + /// Connection features + pub features: Option<WS2PFeatures>, + /// Timestamp of last received message + pub last_mess_time: SystemTime, + /// Local node properties + pub local_node: MySelfWs2pNode, + /// Remote connect type + pub remote_connect_type: Option<WS2Pv2ConnectType>, + /// Remote node datas + pub remote_node: Option<Ws2pRemoteNodeDatas>, + /// Indicator required for the anti-spam mechanism + pub spam_interval: bool, + /// Indicator required for the anti-spam mechanism + pub spam_counter: usize, + /// Connection state + pub state: WS2PConnectionState, +} + +impl WS2PControllerMetaDatas { + /// Instanciate new WS2PControllerMetaDatas + pub fn new( + challenge: Hash, + connect_type: WS2Pv2ConnectType, + currency: CurrencyName, + local_node: MySelfWs2pNode, + ) -> Self { + WS2PControllerMetaDatas { + challenge, + connect_type, + count_invalid_msgs: 0, + currency, + creation_time: SystemTime::now(), + features: None, + last_mess_time: SystemTime::now(), + local_node, + remote_connect_type: None, + remote_node: None, + spam_interval: false, + spam_counter: 0, + state: WS2PConnectionState::TryToOpenWS, + } + } +} + +#[derive(Debug, Clone)] +/// WS2P remote node datas +pub struct Ws2pRemoteNodeDatas { + /// Remote challenge + pub challenge: Hash, + /// Remote current blockstamp + pub current_blockstamp: Option<Blockstamp>, + /// Remote peer card + pub peer_card: Option<PeerCardV11>, + /// Remote full id + pub remote_full_id: NodeFullId, +} diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message.rs new file mode 100644 index 0000000000000000000000000000000000000000..b0e5fcda5a2efba72c1107356830c0a64dc7fcc2 --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message.rs @@ -0,0 +1,157 @@ +// Copyright (C) 2018 The Duniter Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Controller process event ws message received + +mod ack_msg; +mod connect_msg; +mod ok_msg; +mod secret_flags; + +use super::{WS2PController, WS2PControllerProcessError, WebsocketActionOrder}; +use crate::connection_state::WS2PConnectionState; +use crate::constants; +use crate::controller::WS2PControllerEvent; +use crate::websocket::{WebsocketAction, WebsocketMessage}; +use durs_common_tools::fatal_error; +use durs_module::ModuleMessage; +use durs_network_documents::NodeFullId; +use durs_ws2p_messages::v2::payload_container::WS2Pv2MessagePayload; +use durs_ws2p_messages::WS2PMessage; +use log::error; +use std::ops::Deref; +use std::thread; +use std::time::{Duration, SystemTime}; + +pub fn process<M: ModuleMessage>( + controller: &mut WS2PController<M>, + msg: WebsocketMessage, +) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { + // Update last_mess_time + controller.meta_datas.last_mess_time = SystemTime::now(); + + // Spam ? + if SystemTime::now() + .duration_since(controller.meta_datas.last_mess_time) + .unwrap() + > Duration::new(*constants::WS2P_SPAM_INTERVAL_IN_MILLI_SECS, 0) + { + if controller.meta_datas.spam_interval { + controller.meta_datas.spam_counter += 1; + } else { + controller.meta_datas.spam_interval = true; + controller.meta_datas.spam_counter = 2; + } + } else { + controller.meta_datas.spam_interval = false; + controller.meta_datas.spam_counter = 0; + } + // Spam ? + if controller.meta_datas.spam_counter >= *constants::WS2P_SPAM_LIMIT { + thread::sleep(Duration::from_millis( + *constants::WS2P_SPAM_SLEEP_TIME_IN_SEC, + )); + controller.meta_datas.last_mess_time = SystemTime::now(); + return Ok(None); + } + + if let WebsocketMessage::Bin(bin_msg) = msg { + log::debug!("Receive new bin message there is not a spam !"); + match WS2PMessage::parse_and_check_bin_message(&bin_msg) { + Ok(valid_msg) => match valid_msg { + WS2PMessage::V2(ref msg_v2) => { + match msg_v2.payload { + WS2Pv2MessagePayload::Connect(ref box_connect_msg) => { + let connect_msg = box_connect_msg.deref(); + // Get remote node id + let remote_full_id = + NodeFullId(msg_v2.issuer_node_id, msg_v2.issuer_pubkey); + // Process connect message + connect_msg::process_ws2p_v2p_connect_msg( + controller, + remote_full_id, + connect_msg, + ) + } + WS2Pv2MessagePayload::Ack { + challenge: ack_msg_challenge, + } => { + // Process ack message + ack_msg::process_ws2p_v2p_ack_msg(controller, ack_msg_challenge) + } + WS2Pv2MessagePayload::SecretFlags(ref secret_flags) => { + secret_flags::process_ws2p_v2p_secret_flags_msg( + controller, + secret_flags, + ) + } + WS2Pv2MessagePayload::Ok(_) => { + // Process ok message + ok_msg::process_ws2p_v2p_ok_msg(controller) + } + WS2Pv2MessagePayload::Ko(_) => Ok(close_with_reason( + "Receive Ko message !", + WS2PConnectionState::Denial, + )), + _ => { + if let WS2PConnectionState::Established = controller.meta_datas.state { + controller + .send_event(WS2PControllerEvent::RecvValidMsg { + ws2p_msg: valid_msg, + }) + .map(|_| None) + } else { + Ok(close_with_reason( + "Receive datas message on negociation !", + WS2PConnectionState::Denial, + )) + } + } + } + } + WS2PMessage::_V0 | WS2PMessage::_V1 => { + fatal_error!("Dev error: must not use WS2PMessage version < 2 in WS2Pv2+ !") + } + }, + Err(ws2p_msg_err) => { + log::warn!("Message is invalid : {:?}", ws2p_msg_err); + controller.meta_datas.count_invalid_msgs += 1; + if controller.meta_datas.count_invalid_msgs >= *constants::WS2P_INVALID_MSGS_LIMIT { + Ok(close_with_reason( + "Receive several invalid messages !", + WS2PConnectionState::Denial, + )) + } else { + Ok(None) + } + } + } + } else { + Ok(close_with_reason( + "Receive str message !", + WS2PConnectionState::Denial, + )) + } +} + +fn close_with_reason(reason: &str, new_state: WS2PConnectionState) -> Option<WebsocketActionOrder> { + Some(WebsocketActionOrder { + ws_action: WebsocketAction::CloseConnection { + reason: Some(reason.to_owned()), + }, + new_state_if_success: Some(new_state), + new_state_if_fail: new_state, + }) +} diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ack_msg.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ack_msg.rs new file mode 100644 index 0000000000000000000000000000000000000000..cc829e0032a8e05e8170bd33befa827d27a67921 --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ack_msg.rs @@ -0,0 +1,98 @@ +// Copyright (C) 2018 The Duniter Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module process reception of ACK message + +use crate::connection_state::WS2PConnectionState; +use crate::controller::{WS2PController, WS2PControllerProcessError, WebsocketActionOrder}; +use crate::websocket::{WebsocketAction, WebsocketMessage}; +use dup_crypto::hashs::Hash; +use durs_common_tools::fatal_error; +use durs_module::ModuleMessage; +use durs_ws2p_messages::v2::ok::WS2Pv2OkMsg; +use durs_ws2p_messages::v2::payload_container::WS2Pv2MessagePayload; +use durs_ws2p_messages::v2::WS2Pv2Message; +use log::error; + +/// Process WS2P v2+ ACK Message +pub fn process_ws2p_v2p_ack_msg<M: ModuleMessage>( + controller: &mut WS2PController<M>, // controller contains original challenge + ack_msg_challenge: Hash, +) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { + log::debug!("Receive ACK message !"); + + match controller.meta_datas.state { + WS2PConnectionState::OkMsgOkWaitingAckMsg => { + // already sent ack message and received ok response + process( + controller, + ack_msg_challenge, + WS2PConnectionState::Established, + ) + } + WS2PConnectionState::ConnectMessOk => { + // ack message not yet sent + process(controller, ack_msg_challenge, WS2PConnectionState::AckMsgOk) + } + _ => Ok(super::close_with_reason( + "Unexpected ACK message !", + WS2PConnectionState::Denial, + )), + } +} + +#[inline] +// process and apply given status in case of success +fn process<M: ModuleMessage>( + controller: &mut WS2PController<M>, + ack_msg_challenge: Hash, + success_status: WS2PConnectionState, +) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { + if controller.meta_datas.challenge != ack_msg_challenge { + controller + .update_conn_state(WS2PConnectionState::Denial) + .map(|_| None) + } else { + Ok(Some(send_ok_msg(controller, success_status))) + } +} + +// send ok message +fn send_ok_msg<M: ModuleMessage>( + controller: &mut WS2PController<M>, + success_status: WS2PConnectionState, +) -> WebsocketActionOrder { + // generate empty Ok message + let ok_msg = WS2Pv2OkMsg::default(); + + // Encapsulate and binarize OK message + if let Ok((_, bin_ok_msg)) = WS2Pv2Message::encapsulate_payload( + controller.meta_datas.currency.clone(), + controller.meta_datas.local_node.my_node_id, + controller.meta_datas.local_node.my_key_pair, + WS2Pv2MessagePayload::Ok(ok_msg), + ) { + // Order the sending of a OK message + WebsocketActionOrder { + ws_action: WebsocketAction::SendMessage { + msg: WebsocketMessage::Bin(bin_ok_msg), + }, + new_state_if_success: Some(success_status), + new_state_if_fail: WS2PConnectionState::Unreachable, + } + } else { + fatal_error!("Dev error: Fail to sign own ok message !"); + } +} diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/connect_msg.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/connect_msg.rs new file mode 100644 index 0000000000000000000000000000000000000000..857f625621dad2795983ff8f3bf99676b69934d4 --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/connect_msg.rs @@ -0,0 +1,120 @@ +// Copyright (C) 2018 The Duniter Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module process reception of CONNECT message + +use crate::connection_state::WS2PConnectionState; +use crate::controller::meta_datas::Ws2pRemoteNodeDatas; +use crate::controller::{WS2PController, WS2PControllerProcessError, WebsocketActionOrder}; +use crate::websocket::{WebsocketAction, WebsocketMessage}; +use durs_common_tools::fatal_error; +use durs_module::ModuleMessage; +use durs_network_documents::NodeFullId; +use durs_ws2p_messages::v2::connect::{WS2Pv2ConnectMsg, WS2Pv2ConnectType}; +use durs_ws2p_messages::v2::payload_container::WS2Pv2MessagePayload; +use durs_ws2p_messages::v2::WS2Pv2Message; +use log::error; +use unwrap::unwrap; + +/// Process WS2P v2+ CONNECT Message +pub fn process_ws2p_v2p_connect_msg<M: ModuleMessage>( + controller: &mut WS2PController<M>, + remote_full_id: NodeFullId, + connect_msg: &WS2Pv2ConnectMsg, +) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { + log::debug!("Receive CONNECT message !"); + + // Get remote node datas + let remote_challenge = connect_msg.challenge; + let remote_node_datas = Ws2pRemoteNodeDatas { + challenge: connect_msg.challenge, + current_blockstamp: None, + peer_card: None, + remote_full_id, + }; + + if let WS2PConnectionState::WaitingConnectMsg = controller.meta_datas.state { + log::info!( + "TMP DEBUG: my_connect_type={:?}", + controller.meta_datas.connect_type + ); + // Check remote node datas + if let WS2Pv2ConnectType::Incoming = controller.meta_datas.connect_type { + controller.meta_datas.remote_node = Some(remote_node_datas); + // Get remote_connect_type + controller.meta_datas.remote_connect_type = Some(WS2Pv2ConnectType::from_flags( + &connect_msg.flags_queries, + connect_msg.chunkstamp, + )); + } else { + let expected_full_id = unwrap!(controller.id.expected_remote_full_id()); + log::info!("TMP DEBUG: remote_full_id={}", remote_full_id); + log::info!("TMP DEBUG: expected_full_id={}", expected_full_id); + if remote_full_id == expected_full_id { + controller.meta_datas.remote_node = Some(remote_node_datas); + } else { + return Ok(super::close_with_reason( + "Unexpected PUBKEY or NODE_ID !", + WS2PConnectionState::Denial, + )); + } + // Flags not allowed from incoming node + if !connect_msg.flags_queries.is_empty() { + super::close_with_reason( + "Unexpected CONNECT FLAGS from incoming node. !", + WS2PConnectionState::Denial, + ); + } + // Get remote_connect_type + controller.meta_datas.remote_connect_type = Some(WS2Pv2ConnectType::Incoming); + } + } else { + super::close_with_reason("Unexpected CONNECT message !", WS2PConnectionState::Denial); + } + + // Check features compatibility + match controller + .meta_datas + .local_node + .my_features + .check_features_compatibility(&connect_msg.api_features) + { + Ok(merged_features) => controller.meta_datas.features = Some(merged_features), + Err(_) => { + super::close_with_reason("Unsupported features !", WS2PConnectionState::Denial); + } + } + + // Encapsulate and binarize ACK message + if let Ok((_, bin_ack_msg)) = WS2Pv2Message::encapsulate_payload( + controller.meta_datas.currency.clone(), + controller.meta_datas.local_node.my_node_id, + controller.meta_datas.local_node.my_key_pair, + WS2Pv2MessagePayload::Ack { + challenge: remote_challenge, + }, + ) { + // Order the sending of a OK message + Ok(Some(WebsocketActionOrder { + ws_action: WebsocketAction::SendMessage { + msg: WebsocketMessage::Bin(bin_ack_msg), + }, + new_state_if_success: Some(WS2PConnectionState::ConnectMessOk), + new_state_if_fail: WS2PConnectionState::Unreachable, + })) + } else { + fatal_error!("Dev error: Fail to sign own ack message !") + } +} diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ok_msg.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ok_msg.rs new file mode 100644 index 0000000000000000000000000000000000000000..37985be741bf129682074ba938db249aa99364fe --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/ok_msg.rs @@ -0,0 +1,60 @@ +// Copyright (C) 2018 The Duniter Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module process reception of OK message + +use crate::connection_state::WS2PConnectionState; +use crate::controller::{ + WS2PController, WS2PControllerEvent, WS2PControllerProcessError, WebsocketActionOrder, +}; +use durs_common_tools::fatal_error; +use durs_module::ModuleMessage; +use durs_ws2p_messages::v2::connect::WS2Pv2ConnectType; +use log::error; +use unwrap::unwrap; + +/// Process WS2P v2+ OK Message +pub fn process_ws2p_v2p_ok_msg<M: ModuleMessage>( + controller: &mut WS2PController<M>, +) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { + log::debug!("Receive OK message !"); + + match controller.meta_datas.state { + WS2PConnectionState::ConnectMessOk | WS2PConnectionState::SecretFlagsOkWaitingAckMsg => { + controller.update_conn_state(WS2PConnectionState::OkMsgOkWaitingAckMsg)?; + Ok(None) + } + WS2PConnectionState::AckMsgOk | WS2PConnectionState::SecretFlagsOk => { + controller.meta_datas.state = WS2PConnectionState::Established; + controller.send_event(WS2PControllerEvent::NewConnEstablished { + conn_type: if controller.meta_datas.connect_type != WS2Pv2ConnectType::Incoming { + controller.meta_datas.connect_type + } else { + unwrap!(controller.meta_datas.remote_connect_type) + }, + remote_full_id: if let Some(ref remote_node) = controller.meta_datas.remote_node { + remote_node.remote_full_id + } else { + fatal_error!("remote_node must be valued in process_ws2p_v2p_ok_msg() !") + }, + })?; + Ok(None) + } + _ => Ok(super::close_with_reason( + "Unexpected OK message !", + WS2PConnectionState::Denial, + )), + } +} diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/secret_flags.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/secret_flags.rs new file mode 100644 index 0000000000000000000000000000000000000000..331e71d793616094598eeb81da354d45b8cf01d7 --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/on_message/secret_flags.rs @@ -0,0 +1,59 @@ +// Copyright (C) 2018 The Duniter Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Sub-module process reception of SECRET_FLAGS message + +use crate::connection_state::WS2PConnectionState; +use crate::controller::{WS2PController, WS2PControllerProcessError, WebsocketActionOrder}; +//use durs_common_tools::fatal_error; +use durs_module::ModuleMessage; +//use durs_ws2p_messages::v2::connect::WS2Pv2ConnectType; +use durs_ws2p_messages::v2::secret_flags::WS2Pv2SecretFlagsMsg; +//use log::error; +//use unwrap::unwrap; + +pub fn process_ws2p_v2p_secret_flags_msg<M: ModuleMessage>( + controller: &mut WS2PController<M>, + secret_flags: &WS2Pv2SecretFlagsMsg, +) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { + // SECRET_FLAGS informations must never be logged in prod + #[cfg(test)] + log::debug!("Receive SECRET_FLAGS message !"); + + match controller.meta_datas.state { + WS2PConnectionState::ConnectMessOk => process( + controller, + secret_flags, + WS2PConnectionState::SecretFlagsOkWaitingAckMsg, + ) + .map(|_| None), + WS2PConnectionState::AckMsgOk => { + process(controller, secret_flags, WS2PConnectionState::SecretFlagsOk).map(|_| None) + } + _ => Ok(super::close_with_reason( + "Unexpected SECRET_FLAGS message !", + WS2PConnectionState::Denial, + )), + } +} + +fn process<M: ModuleMessage>( + controller: &mut WS2PController<M>, + _secret_flags: &WS2Pv2SecretFlagsMsg, + success_state: WS2PConnectionState, +) -> Result<(), WS2PControllerProcessError> { + // TODO .. traitement des secrets flags + controller.update_conn_state(success_state) +} diff --git a/lib/modules/ws2p/ws2p-protocol/src/controller/on_open.rs b/lib/modules/ws2p/ws2p-protocol/src/controller/on_open.rs new file mode 100644 index 0000000000000000000000000000000000000000..c7991ed604bce1597fba27768e35d6e3946e2b8f --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/src/controller/on_open.rs @@ -0,0 +1,72 @@ +// Copyright (C) 2018 The Duniter Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Controller process event ws connection opened + +use super::{ + WS2PConnectionState, WS2PController, WS2PControllerProcessError, WebsocketActionOrder, +}; +use crate::websocket::{WebsocketAction, WebsocketMessage}; +use durs_common_tools::fatal_error; +use durs_module::ModuleMessage; +use durs_ws2p_messages::v2::connect::generate_connect_message; +use durs_ws2p_messages::v2::payload_container::WS2Pv2MessagePayload; +use durs_ws2p_messages::v2::WS2Pv2Message; +use log::error; +use std::net::SocketAddr; + +pub fn process<M: ModuleMessage>( + controller: &mut WS2PController<M>, + remote_addr_opt: Option<SocketAddr>, +) -> Result<Option<WebsocketActionOrder>, WS2PControllerProcessError> { + log::debug!("open websocket from {}", print_opt_addr(remote_addr_opt)); + + // Update connection state + controller.update_conn_state(WS2PConnectionState::TryToSendConnectMsg)?; + + // Generate connect message + let connect_msg = generate_connect_message( + controller.meta_datas.connect_type, + controller.meta_datas.local_node.my_features.clone(), + controller.meta_datas.challenge, + None, + ); + + // Encapsulate and binarize connect message + if let Ok((_ws2p_full_msg, bin_connect_msg)) = WS2Pv2Message::encapsulate_payload( + controller.meta_datas.currency.clone(), + controller.meta_datas.local_node.my_node_id, + controller.meta_datas.local_node.my_key_pair, + WS2Pv2MessagePayload::Connect(Box::new(connect_msg)), + ) { + // Order the sending of a CONNECT message + Ok(Some(WebsocketActionOrder { + ws_action: WebsocketAction::SendMessage { + msg: WebsocketMessage::Bin(bin_connect_msg), + }, + new_state_if_success: Some(WS2PConnectionState::WaitingConnectMsg), + new_state_if_fail: WS2PConnectionState::Unreachable, + })) + } else { + fatal_error!("Dev error: Fail to sign own connect message !") + } +} + +fn print_opt_addr(addr: Option<SocketAddr>) -> String { + match addr { + Some(addr) => format!("{}", addr), + None => String::from(""), + } +} diff --git a/lib/modules/ws2p/ws2p-protocol/src/lib.rs b/lib/modules/ws2p/ws2p-protocol/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..9ce88c506e89bf0d8b68a3e57c1bc5ec35c34041 --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/src/lib.rs @@ -0,0 +1,57 @@ +// Copyright (C) 2018 The Duniter Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! WebSocketToPeer V2+ API Protocol. + +#![allow(clippy::large_enum_variant)] +#![deny( + missing_docs, + missing_debug_implementations, + missing_copy_implementations, + trivial_casts, + trivial_numeric_casts, + unsafe_code, + unstable_features, + unused_import_braces, + unused_qualifications +)] + +pub mod connection_state; +pub mod constants; +pub mod controller; +pub mod orchestrator; +pub mod websocket; + +use dup_crypto::keys::{KeyPair, KeyPairEnum}; +use durs_network_documents::{NodeFullId, NodeId}; +use durs_ws2p_messages::v2::api_features::WS2PFeatures; + +/// Store self WS2P properties +#[derive(Debug, Clone, PartialEq)] +pub struct MySelfWs2pNode { + /// Local node id + pub my_node_id: NodeId, + /// Local network keypair + pub my_key_pair: KeyPairEnum, + /// Local node WWS2PFeatures + pub my_features: WS2PFeatures, +} + +impl MySelfWs2pNode { + /// Get self node full id + pub fn get_full_id(&self) -> NodeFullId { + NodeFullId(self.my_node_id, self.my_key_pair.public_key()) + } +} diff --git a/lib/modules/ws2p/ws2p-protocol/src/orchestrator.rs b/lib/modules/ws2p/ws2p-protocol/src/orchestrator.rs new file mode 100644 index 0000000000000000000000000000000000000000..177a9da802ea0355e1b9661e1bd391de5438460b --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/src/orchestrator.rs @@ -0,0 +1,38 @@ +// Copyright (C) 2018 The Duniter Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! WebSocketToPeer V2+ API Protocol. +//! Orchestrator manage WS2P Node. + +use std::sync::mpsc::Sender; + +use crate::controller::{WS2PControllerEvent, WS2PControllerId, WebsocketActionOrder}; +use durs_module::ModuleMessage; + +/// Orchestrator message +#[derive(Debug)] +pub enum OrchestratorMsg<M: ModuleMessage> { + /// Controller sender + ControllerSender(Sender<WebsocketActionOrder>), + /// Controller event + ControllerEvent { + /// Controller unique identifier + controller_id: WS2PControllerId, + /// Controller event + event: WS2PControllerEvent, + }, + /// Module message + ModuleMessage(M), +} diff --git a/lib/modules/ws2p/ws2p-protocol/src/websocket.rs b/lib/modules/ws2p/ws2p-protocol/src/websocket.rs new file mode 100644 index 0000000000000000000000000000000000000000..e9b641e4e3d7ab46d880e42a34424d8670531eb2 --- /dev/null +++ b/lib/modules/ws2p/ws2p-protocol/src/websocket.rs @@ -0,0 +1,70 @@ +// Copyright (C) 2018 The Duniter Project Developers. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! WebSocketToPeer V2+ API Protocol. +//! Define websocket message, event and action + +use std::net::SocketAddr; + +#[derive(Clone, Debug)] +/// Websocket message +pub enum WebsocketMessage { + /// Bnary message + Bin(Vec<u8>), + /// String message + Str(String), +} + +#[derive(Clone, Debug)] +/// Websocket incoming event +pub enum WebsocketIncomingEvent { + /// Connection opening + OnOpen { + /// Remote addr + remote_addr: Option<SocketAddr>, + }, + /// Receive message + OnMessage { + /// Message content + msg: WebsocketMessage, + }, + /// Connection closed + OnClose { + /// Close code + close_code: u16, + /// Close reason + reason: Option<String>, + }, +} + +#[derive(Clone, Debug)] +/// Websocket action +pub enum WebsocketAction { + /// Connect to websocket url + ConnectTo { + /// Websocket url + url: String, + }, + /// Send message in websocket + SendMessage { + /// message content + msg: WebsocketMessage, + }, + /// Close connection + CloseConnection { + /// Give a reason for the remote + reason: Option<String>, + }, +} diff --git a/lib/modules/ws2p/ws2p/src/controllers/handler/connect_msg.rs b/lib/modules/ws2p/ws2p/src/controllers/handler/connect_msg.rs deleted file mode 100644 index e1ce00566a5901d78f163de0b07449bacad1619c..0000000000000000000000000000000000000000 --- a/lib/modules/ws2p/ws2p/src/controllers/handler/connect_msg.rs +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright (C) 2018 The Durs Project Developers. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <https://www.gnu.org/licenses/>. - -//! Process WS2P CONNECT message. - -use crate::controllers::handler::*; -use crate::controllers::*; -use durs_common_tools::fatal_error; -use durs_network_documents::NodeFullId; -use durs_ws2p_messages::v2::connect::WS2Pv2ConnectMsg; -use ws::CloseCode; -//use services::Ws2pServiceSender; -//use std::sync::mpsc; - -/// Process WS2pv2 CONNECT Message -pub fn process_ws2p_v2_connect_msg( - handler: &mut Ws2pConnectionHandler, - remote_full_id: NodeFullId, - connect_msg: &WS2Pv2ConnectMsg, -) { - debug!("Receive CONNECT message !"); - - // Get remote node datas - let remote_challenge = connect_msg.challenge; - let remote_node_datas = Ws2pRemoteNodeDatas { - challenge: connect_msg.challenge, - peer_card: None, - current_blockstamp: None, - }; - if let WS2PConnectionState::WaitingConnectMsg = handler.conn_datas.state { - // Check remote node datas - if let WS2Pv2ConnectType::Incoming = handler.conn_datas.connect_type { - handler.conn_datas.remote_full_id = Some(remote_full_id); - handler.conn_datas.remote_datas = Some(remote_node_datas); - // Get remote_connect_type - handler.conn_datas.remote_connect_type = Some(WS2Pv2ConnectType::from_flags( - &connect_msg.flags_queries, - connect_msg.chunkstamp, - )); - } else { - let expected_full_id = handler - .conn_datas - .remote_full_id - .expect("Outcoming connection must have expected remote node full id !"); - if remote_full_id == expected_full_id { - handler.conn_datas.remote_datas = Some(remote_node_datas); - } else { - let _ = handler - .ws - .0 - .close_with_reason(CloseCode::Invalid, "Unexpected PUBKEY or NODE_ID !"); - } - // Flags not allowed from incoming node - if !connect_msg.flags_queries.is_empty() { - let _ = handler.ws.0.close_with_reason( - CloseCode::Invalid, - "Unexpected CONNECT FLAGS from incoming node. !", - ); - } - // Get remote_connect_type - handler.conn_datas.remote_connect_type = Some(WS2Pv2ConnectType::Incoming); - } - } else { - let _ = handler - .ws - .0 - .close_with_reason(CloseCode::Invalid, "Unexpected CONNECT message !"); - } - // Check features compatibility - match handler - .local_node - .my_features - .check_features_compatibility(&connect_msg.api_features) - { - Ok(merged_features) => handler.conn_datas.features = Some(merged_features), - Err(_) => { - let _ = handler - .ws - .0 - .close_with_reason(CloseCode::Unsupported, "Unsupported features !"); - } - } - - // Update Status to ConnectMessOk - handler.conn_datas.state = WS2PConnectionState::ConnectMessOk; - handler.send_new_conn_state_to_service(); - - // Encapsulate and binarize ACK message - if let Ok((_, bin_ack_msg)) = WS2Pv2Message::encapsulate_payload( - handler.currency.clone(), - handler.local_node.my_node_id, - handler.local_node.my_key_pair, - WS2Pv2MessagePayload::Ack { - challenge: remote_challenge, - }, - ) { - // Send Ack Message - if let Ok(()) = handler.ws.0.send(Message::binary(bin_ack_msg)) { - // Update state - handler.conn_datas.state = WS2PConnectionState::ConnectMessOk; - } else { - handler.conn_datas.state = WS2PConnectionState::Unreachable; - let _ = handler - .ws - .0 - .close_with_reason(CloseCode::Error, "Fail to send Ack message !"); - } - } else { - fatal_error!("Dev error: Fail to sign own ack message !") - } -} diff --git a/lib/modules/ws2p/ws2p/src/controllers/incoming_connections/mod.rs b/lib/modules/ws2p/ws2p/src/controllers/incoming_connections/mod.rs deleted file mode 100644 index 81797f1c1014b4e63a118d6be3554aa3afcb760c..0000000000000000000000000000000000000000 --- a/lib/modules/ws2p/ws2p/src/controllers/incoming_connections/mod.rs +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright (C) 2018 The Durs Project Developers. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <https://www.gnu.org/licenses/>. - -//! WS2P incoming connections controllers. - -use dubp_documents::CurrencyName; -//use durs_module::ModuleReqId; -use crate::controllers::handler::Ws2pConnectionHandler; -use crate::controllers::*; -use crate::services::*; -use ws::deflate::DeflateBuilder; -use ws::listen; -//use durs_network::*; -use durs_ws2p_messages::v2::connect::WS2Pv2ConnectType; -use std::sync::mpsc; - -/// Listen on WSPv2 host:port -pub fn listen_on_ws2p_v2_endpoint( - currency: &CurrencyName, - service_sender: mpsc::Sender<Ws2pServiceSender>, - self_node: MySelfWs2pNode, - host: &str, - port: u16, -) -> ws::Result<()> { - // Get endpoint url - let ws_url = format!("{}:{}", host, port); - - // Log - info!("Listen on {} ...", ws_url); - - // Connect to websocket - listen(ws_url, move |ws| { - info!("Listen on host:port..."); - DeflateBuilder::new().build( - Ws2pConnectionHandler::try_new( - WsSender(ws), - service_sender.clone(), - currency.clone(), - self_node.clone(), - Ws2pConnectionDatas::new(WS2Pv2ConnectType::Incoming), - ) - .expect("WS2P Service unrechable"), - ) - }) -} - -#[cfg(test)] -mod tests { - use super::*; - use dup_crypto::keys::*; - use std::thread; - use std::time::Duration; - - pub fn _keypair1() -> ed25519::KeyPair { - ed25519::KeyPairFromSaltedPasswordGenerator::with_default_parameters().generate( - "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWoLkjrUhHV".as_bytes(), - "JhxtHB7UcsDbA9wMSyMKXUzBZUQvqVyB32KwzS9SWoLkjrUhHV_".as_bytes(), - ) - } - - //#[test] - fn _listen_on_localhost() { - // create service channel - let (service_sender, _service_receiver): ( - mpsc::Sender<Ws2pServiceSender>, - mpsc::Receiver<Ws2pServiceSender>, - ) = mpsc::channel(); - - thread::spawn(move || { - let result = listen_on_ws2p_v2_endpoint( - &CurrencyName(String::from("default")), - service_sender, - MySelfWs2pNode { - my_node_id: NodeId(1), - my_key_pair: KeyPairEnum::Ed25519(_keypair1()), - my_features: WS2PFeatures(vec![5u8]), - }, - "localhost", - 10899, - ); - if let Err(e) = result { - panic!("Listen error: {}", e); - } - }); - - thread::sleep(Duration::from_secs(10)); - - // Force to print stdout - assert!(false); - } -}