Commit fe7f21d6 authored by Hugo Trentesaux's avatar Hugo Trentesaux

[feat] ws2p: send sync info and close connection

parent df6983e4
Pipeline #6119 failed with stages
in 15 minutes and 48 seconds
......@@ -74,9 +74,9 @@ pub enum OldNetworkRequestError {
#[derive(Debug, Copy, Clone)]
/// Enum of different possible requests to the network module
pub enum NetworkRequest {
/// Get given block
/// Get the block number x
GetBlock(BlockNumber),
/// Get a blocks chunk from specified node
/// Get a blocks chunk from specified block number
GetBlocks(BlockNumber, u32),
/// Get pending wot documents from specified node
GetRequirementsPending {
......
......@@ -16,7 +16,7 @@
// use dup_crypto::hashs::Hash;
use dubp_documents::BlockHash;
use dubp_documents::Blockstamp;
use durs_network_documents::network_peer::PeerCardV11;
use durs_network_documents::network_peer::PeerCard;
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] // Copy
/// WS2Pv2SyncInfo
......@@ -28,5 +28,5 @@ pub struct WS2Pv2SyncInfo {
/// Hash of the last block of each chunk (i.e. blocks whose number equal -1 modulo chunk_size)
pub milestones: Vec<BlockHash>,
/// Peer Cards allows node to request chunks to other nodes
pub peer_cards: Vec<PeerCardV11>,
pub peer_cards: Vec<PeerCard>,
}
......@@ -34,7 +34,7 @@ use std::sync::mpsc::{Receiver, SendError, Sender};
use std::time::SystemTime;
use unwrap::unwrap;
#[derive(Copy, Clone, Debug, Hash)]
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
/// WS2P Controller unique identitier
pub enum WS2PControllerId {
/// Client controller
......
......@@ -22,37 +22,39 @@ use dubp_documents::Blockstamp;
use durs_common_tools::fatal_error;
use durs_message::events::*;
/// process event from other module
pub fn process(orchestrator: &mut WS2POrchestrator, event_content: DursEvent) {
match event_content {
DursEvent::BlockchainEvent(blockchain_event) => {
match *blockchain_event {
// new block has been added to local blockchain
BlockchainEvent::StackUpValidBlock(block) => {
match *block {
BlockDocument::V10(block) => {
// updates current blockstamp
orchestrator.current_blockstamp = Some(Blockstamp {
id: block.number,
hash: block.hash.unwrap(),
});
// updates current milestones if necessary
if block.number.0 % CHUNK_SIZE == CHUNK_SIZE - 1 {
// TODO check that milestones correspond to what is expected
if let Some(milestones) = &mut orchestrator.milestones {
milestones.push(block.hash.unwrap());
} else {
fatal_error!(
"can not stack block while milestones are not set"
);
impl WS2POrchestrator {
/// process event from other module
pub fn process_event(self: &mut Self, event_content: DursEvent) {
match event_content {
DursEvent::BlockchainEvent(blockchain_event) => {
match *blockchain_event {
// new block has been added to local blockchain
BlockchainEvent::StackUpValidBlock(block) => {
match *block {
BlockDocument::V10(block) => {
// updates current blockstamp
self.current_blockstamp = Some(Blockstamp {
id: block.number,
hash: block.hash.unwrap(),
});
// updates current milestones if necessary
if block.number.0 % CHUNK_SIZE == CHUNK_SIZE - 1 {
// TODO check that milestones correspond to what is expected
if let Some(milestones) = &mut self.milestones {
milestones.push(block.hash.unwrap());
} else {
fatal_error!(
"can not stack block while milestones are not set"
);
}
}
}
}
}
_ => warn!("received unexpeced blockchain event"),
}
_ => warn!("received unexpeced blockchain event"),
}
_ => warn!("received unexpeced event"),
}
_ => warn!("received unexpeced event"),
}
}
......@@ -25,11 +25,11 @@ pub mod outgoing;
// use std::time::Duration;
// use std::time::SystemTime;
// use dup_crypto::hashs::Hash;
// use crate::responses;
// use crate::events;
// use crate::orchestrator;
use crate::controller::{WS2PControllerEvent, WS2PControllerId, WebsocketActionOrder};
use crate::events;
use crate::orchestrator;
use crate::orchestrator::outgoing::{ServiceMsg, WS2POutgoingOrchestrator};
use crate::responses;
use crate::websocket::WebsocketTrait;
use crate::MySelfWs2pNode;
use dubp_documents::BlockHash;
......@@ -40,7 +40,7 @@ use durs_message::DursMsg;
use durs_module::ModuleMessage;
use durs_module::*;
use durs_network_documents::network_endpoint::*;
use durs_network_documents::network_peer::PeerCardV11;
use durs_network_documents::network_peer::*;
use durs_network_documents::NodeFullId;
use maplit::hashset;
use std::collections::HashMap;
......@@ -72,19 +72,19 @@ pub enum OrchestratorMsg<M: ModuleMessage> {
#[derive(Debug)]
pub struct WS2POrchestrator {
/// module static name used for router requests
pub static_name: ModuleStaticName,
static_name: ModuleStaticName,
/// current blockstamp
pub current_blockstamp: Option<Blockstamp>,
/// milestones: hash of the last block of each chunk
pub milestones: Option<Vec<BlockHash>>,
/// List of known peers
pub peers: HashMap<NodeFullId, PeerCardV11>,
peers: HashMap<NodeFullId, PeerCard>,
/// the channel used to send message to router
pub router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>,
/// the channel used to send message to orchestrator
router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>,
/// the channel used to send message to orchestrator itself
pub sender: mpsc::Sender<OrchestratorMsg<DursMsg>>,
/// the channel used to receive message
pub receiver: mpsc::Receiver<OrchestratorMsg<DursMsg>>,
receiver: mpsc::Receiver<OrchestratorMsg<DursMsg>>,
/// channel used to give orders to outgoing orchestrator
pub outgoing_sender: mpsc::Sender<ServiceMsg>,
}
......@@ -105,7 +105,7 @@ impl WS2POrchestrator {
router_sender.clone(),
module_sender,
module_receiver,
static_name.clone(),
static_name,
api_name,
);
......@@ -131,7 +131,7 @@ impl WS2POrchestrator {
outgoing_sender: outgoing_orchestrator.sender.clone(),
};
return (outgoing_orchestrator, orchestrator);
(outgoing_orchestrator, orchestrator)
}
/// main loop of orchestrator
pub fn main_loop(&mut self) {
......@@ -154,7 +154,7 @@ impl WS2POrchestrator {
event,
..
} => {
orchestrator::event::process(self, event);
self.process_controller_event(event);
}
// messages from other modules wrapped into an orchestator message
OrchestratorMsg::ModuleMessage(mod_msg) => match mod_msg {
......@@ -163,7 +163,7 @@ impl WS2POrchestrator {
// event_type,
event_content,
..
} => { events::process(self, event_content); }
} => { self.process_event(event_content); }
DursMsg::Request {
// req_from,
// req_to,
......@@ -178,7 +178,7 @@ impl WS2POrchestrator {
res_content,
..
} => {
responses::process(self, res_content);
self.process_response(res_content);
},
DursMsg::Stop => {
break;
......@@ -272,5 +272,5 @@ fn start_proxy_thread(
}
});
return (proxy_sender_clone, proxy_receiver);
(proxy_sender_clone, proxy_receiver)
}
......@@ -20,55 +20,55 @@ use crate::controller::*;
use crate::orchestrator::outgoing::ServiceMsg;
use crate::orchestrator::WS2POrchestrator;
use durs_common_tools::fatal_error;
use durs_network_documents::network_peer::PeerCardV11;
use durs_network_documents::network_peer::*;
use durs_ws2p_messages::v2::connect::WS2Pv2ConnectType;
use durs_ws2p_messages::v2::sync_info::WS2Pv2SyncInfo;
/// process controllers events
pub fn process(orchestrator: &mut WS2POrchestrator, event: WS2PControllerEvent) {
match event {
WS2PControllerEvent::NewConnEstablished {
conn_type,
remote_full_id,
} => match conn_type {
WS2Pv2ConnectType::Sync { .. } => {
if let Some(target_blockstamp) = orchestrator.current_blockstamp {
if let Some(milestones) = &orchestrator.milestones {
if orchestrator
.outgoing_sender
.send(ServiceMsg::SendSyncInfo {
remote_full_id,
sync_info: WS2Pv2SyncInfo {
chunk_size: *CHUNK_SIZE,
target_blockstamp,
milestones: milestones.to_vec(),
peer_cards: orchestrator
.peers
.values()
.cloned()
.collect::<Vec<PeerCardV11>>(),
},
})
.is_ok()
{
debug!("sent sync info to outgoing orchestrator");
impl WS2POrchestrator {
/// process controllers events
pub fn process_controller_event(self: &mut Self, event: WS2PControllerEvent) {
match event {
WS2PControllerEvent::NewConnEstablished {
conn_type,
remote_full_id,
} => match conn_type {
WS2Pv2ConnectType::Sync { .. } => {
if let Some(target_blockstamp) = self.current_blockstamp {
if let Some(milestones) = &self.milestones {
let peer_cards =
self.peers.values().cloned().collect::<Vec<PeerCard>>();
if self
.outgoing_sender
.send(ServiceMsg::SendSyncInfo {
remote_full_id,
sync_info: WS2Pv2SyncInfo {
chunk_size: *CHUNK_SIZE,
target_blockstamp,
milestones: milestones.to_vec(),
peer_cards,
},
})
.is_ok()
{
debug!("sent sync info to outgoing orchestrator");
} else {
error!("can not join outgoing orchestrator");
}
} else {
error!("can not join outgoing orchestrator");
fatal_error!("can not send sync info if milestones are not known");
}
} else {
fatal_error!("can not send sync info if milestones are not known");
fatal_error!("can not send sync info if current blockstamp is not known");
}
} else {
fatal_error!("can not send sync info if current blockstamp is not known");
}
}
WS2Pv2ConnectType::Incoming => {}
WS2Pv2ConnectType::OutgoingClient => {}
WS2Pv2ConnectType::OutgoingServer => {}
WS2Pv2ConnectType::SyncAskChunk(_blockstamp) => {}
WS2Pv2ConnectType::SyncSendChunks => {}
},
WS2PControllerEvent::RecvValidMsg { .. } => {}
WS2PControllerEvent::StateChange { .. } => {}
WS2Pv2ConnectType::Incoming => {}
WS2Pv2ConnectType::OutgoingClient => {}
WS2Pv2ConnectType::OutgoingServer => {}
WS2Pv2ConnectType::SyncAskChunk(_blockstamp) => {}
WS2Pv2ConnectType::SyncSendChunks => {}
},
WS2PControllerEvent::RecvValidMsg { .. } => {}
WS2PControllerEvent::StateChange { .. } => {}
}
}
}
......@@ -21,9 +21,7 @@ use crate::constants::WS2P_DEFAULT_OUTCOMING_QUOTA;
use crate::controller::WS2PControllerId;
use crate::controller::WebsocketActionOrder;
use crate::orchestrator::OrchestratorMsg;
use crate::websocket::WebsocketAction;
use crate::websocket::WebsocketTrait;
use crate::websocket::WsError;
use crate::websocket::*;
use crate::MySelfWs2pNode;
use core::fmt::Debug;
use dubp_documents::Blockstamp;
......@@ -36,7 +34,9 @@ use durs_network::cli::sync::SyncOpt;
use durs_network_documents::network_endpoint::EndpointEnum;
use durs_network_documents::network_head::NetworkHead;
use durs_network_documents::{NodeFullId, NodeId};
use durs_ws2p_messages::v2::payload_container::*;
use durs_ws2p_messages::v2::sync_info::WS2Pv2SyncInfo;
use durs_ws2p_messages::v2::*;
use std::collections::HashMap;
use std::net::ToSocketAddrs;
use std::sync::mpsc;
......@@ -67,7 +67,7 @@ pub enum ServiceMsg {
/// Data allowing the service to manage an outgoing connection
pub struct OutgoingConnection {
/// Endpoint
pub endpoint: EndpointEnum,
pub endpoint: Option<EndpointEnum>,
/// Controller channel
pub controller: mpsc::Sender<WebsocketActionOrder>,
}
......@@ -85,33 +85,36 @@ pub struct EndpointInError {
/// Outgoing connection management service
pub struct WS2POutgoingOrchestrator<WS: WebsocketTrait> {
/// websocket connector
pub websocket_connector: WS,
websocket_connector: WS,
/// module static name used for router requests
pub static_name: ModuleStaticName,
static_name: ModuleStaticName,
/// current blockstamp
pub current_blockstamp: Option<Blockstamp>,
current_blockstamp: Option<Blockstamp>,
/// Currency Name TODO migrate to self_node only
pub currency: CurrencyName,
currency: CurrencyName,
/// Local node datas
pub self_node: MySelfWs2pNode,
self_node: MySelfWs2pNode,
/// Outgoing connections quota
pub quota: usize,
quota: usize,
/// Index hashmap linking NodeFullId to WS2PControllerId
/// None key corresponds to the node whose id is not known : the synchronisation node
index: HashMap<Option<NodeFullId>, WS2PControllerId>,
/// List of known current blockstamps of other nodes
pub heads: HashMap<NodeFullId, NetworkHead>,
heads: HashMap<NodeFullId, NetworkHead>,
/// List of established connections
pub connections: HashMap<NodeFullId, OutgoingConnection>,
connections: HashMap<WS2PControllerId, OutgoingConnection>,
/// List of endpoinds whose last connection attempt failed
pub endpoints_in_error: HashMap<NodeFullId, EndpointInError>,
endpoints_in_error: HashMap<NodeFullId, EndpointInError>,
/// List of endpoints that have never been contacted
pub never_try_endpoints: Vec<EndpointEnum>,
never_try_endpoints: Vec<EndpointEnum>,
/// the channel used to send message to router
pub router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>,
/// Service sender
router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>,
/// Service sender (self sender)
pub sender: mpsc::Sender<ServiceMsg>,
/// Service receiver
pub receiver: mpsc::Receiver<ServiceMsg>,
receiver: mpsc::Receiver<ServiceMsg>,
/// Orchestrator sender used to send messages back to the main orchestrator
pub orchestrator_sender: mpsc::Sender<OrchestratorMsg<DursMsg>>,
orchestrator_sender: mpsc::Sender<OrchestratorMsg<DursMsg>>,
}
impl<WS: WebsocketTrait> WS2POutgoingOrchestrator<WS> {
......@@ -138,6 +141,7 @@ impl<WS: WebsocketTrait> WS2POutgoingOrchestrator<WS> {
current_blockstamp: None,
currency,
quota: *WS2P_DEFAULT_OUTCOMING_QUOTA,
index: HashMap::new(),
heads: HashMap::new(),
connections: HashMap::with_capacity(*WS2P_DEFAULT_OUTCOMING_QUOTA),
endpoints_in_error: HashMap::new(),
......@@ -198,19 +202,20 @@ impl<WS: WebsocketTrait> WS2POutgoingOrchestrator<WS> {
// Request CurrentBlockstamp and Milestones (always necessary)
self.request_current_blockstamp();
self.request_milestones();
// TODO local peer with generate_self_peer
// TODO retreive local peer on creation
// if in sync mode
if let Some(sync_opt) = sync_opt {
// open connection
if let Ok(_) = self
if self
.websocket_connector
.exec(WebsocketAction::ConnectToUrl {
url: sync_opt.source.unwrap(),
})
.is_ok()
{
} else {
fatal_error!("could not connect");
error!("could not connect");
}
}
......@@ -219,24 +224,78 @@ impl<WS: WebsocketTrait> WS2POutgoingOrchestrator<WS> {
// process incoming messages
while let Ok(msg) = self.receiver.recv_timeout(Duration::from_millis(10)) {
match msg {
ServiceMsg::NewController {
// id, sender
..
} => {
// self.connections.insert(
// id.get,
// OutgoingConnection {
// endpoint: None,
// controller: sender,
// },
// );
ServiceMsg::NewController { id, sender } => {
// adds the connection to local hashmap
self.connections.insert(
id,
OutgoingConnection {
endpoint: None,
controller: sender,
},
);
// maps the controller id to the "unknown" key
self.index.insert(None, id);
}
ServiceMsg::SendSyncInfo {
// remote_full_id,
// sync_info,
..
} => {}
_ => {}
remote_full_id,
sync_info,
} => match self.connections.get(&WS2PControllerId::Outgoing {
expected_remote_full_id: Some(remote_full_id),
}) {
Some(outgoing_connection) => {
// Encapsulate and binarize Sync_Info message
if let Ok((_ws2p_full_msg, bin_connect_msg)) =
WS2Pv2Message::encapsulate_payload(
self.currency.clone(),
self.self_node.my_node_id,
self.self_node.my_key_pair,
WS2Pv2MessagePayload::SyncInfo(sync_info),
)
{
// send the sync info message order to the controller
if outgoing_connection
.controller
.send(WebsocketActionOrder {
ws_action: WebsocketAction::SendMessage {
msg: WebsocketMessage::Bin(bin_connect_msg),
},
new_state_if_success: None,
new_state_if_fail: WS2PConnectionState::Unreachable,
})
.is_ok()
{
} else {
warn!("can not contact controller");
}
// decide wether to close the connection or to send the chunks
// close
if outgoing_connection
.controller
.send(WebsocketActionOrder {
ws_action: WebsocketAction::CloseConnection {
reason: Some("can not send chunks now".to_string()),
},
new_state_if_success: Some(WS2PConnectionState::Close),
new_state_if_fail: WS2PConnectionState::Close,
})
.is_ok()
{
} else {
warn!("can not contact controller");
}
} else {
fatal_error!("Dev error: Fail to sign own connect message !");
}
}
None => {
warn!("no outgoing connection corresponding to this id");
}
},
_ => {
unimplemented!();
}
}
}
......
......@@ -20,33 +20,35 @@ use crate::orchestrator::outgoing::ServiceMsg;
use crate::orchestrator::WS2POrchestrator;
use durs_message::responses::*;
/// process response from other module
pub fn process(orchestrator: &mut WS2POrchestrator, res_content: DursResContent) {
match res_content {
DursResContent::BlockchainResponse(blockchain_response) => match blockchain_response {
BlockchainResponse::CurrentBlockstamp(blockstamp) => {
// updates orchestrator's current blockstamp
orchestrator.current_blockstamp = Some(blockstamp);
// also informs outgoing orchestrator
if orchestrator
.outgoing_sender
.send(ServiceMsg::CurrentBlockstamp(blockstamp))
.is_ok()
{
} else {
error!("can not join outgoing orchestrator");
impl WS2POrchestrator {
/// process response from other module
pub fn process_response(self: &mut Self, res_content: DursResContent) {
match res_content {
DursResContent::BlockchainResponse(blockchain_response) => match blockchain_response {
BlockchainResponse::CurrentBlockstamp(blockstamp) => {
// updates orchestrator's current blockstamp
self.current_blockstamp = Some(blockstamp);
// also informs outgoing orchestrator
if self
.outgoing_sender
.send(ServiceMsg::CurrentBlockstamp(blockstamp))
.is_ok()
{
} else {
error!("can not join outgoing orchestrator");
}
}
}
BlockchainResponse::Milestones(milestones) => {
// updates orchestrator milestones
orchestrator.milestones = Some(milestones);
}
BlockchainResponse::Milestones(milestones) => {
// updates orchestrator milestones
self.milestones = Some(milestones);
}
_ => {
warn!("unexpected response");
}
},
_ => {
warn!("unexpected response");
}
},
_ => {
warn!("unexpected response");
}
}
}
......@@ -32,7 +32,7 @@ use std::sync::mpsc;
#[derive(Clone, Debug)]
/// Websocket message
pub enum WebsocketMessage {
/// Bnary message
/// Binary message
Bin(Vec<u8>),
/// String message
Str(String),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment