// datas.rs
// Copyright (C) 2018 The Duniter Project Developers.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
use dup_crypto::keys::*;
use durs_message::requests::BlockchainRequest;
use durs_message::*;
use durs_network_documents::network_endpoint::*;
use durs_network_documents::network_head::*;
use durs_network_documents::*;
use std::collections::HashSet;
use std::sync::mpsc;
/// Mutable state of the WS2P module: router channel, local node identity,
/// known endpoints, open websockets and requests awaiting a response.
#[derive(Debug)]
pub struct WS2PModuleDatas {
/// Sender used to push requests, responses and events to the router thread.
pub router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>,
/// Currency name. `None` after `new`; it must be set before a connection is
/// opened (the connection thread panics otherwise).
pub currency: Option<String>,
/// Key pair of the local node. `None` after `new`; it must be set before
/// connecting to endpoints.
pub key_pair: Option<KeyPairEnum>,
/// WS2P configuration; `outcoming_quota` is read from it to bound the
/// number of outgoing connections.
pub conf: WS2PConf,
/// Whether the crate was built with the `ssl` feature. When `false`,
/// endpoints on port 443 are skipped by `connect_to_know_endpoints`.
pub ssl: bool,
/// Node identifier supplied to `new`.
pub node_id: NodeId,
/// Channel pair created in `new`; the sender half is cloned and handed to
/// every connection thread.
pub main_thread_channel: (
mpsc::Sender<WS2PThreadSignal>,
mpsc::Receiver<WS2PThreadSignal>,
),
/// Known endpoints and their current connection state, keyed by node full id.
pub ws2p_endpoints: HashMap<NodeFullId, (EndpointV1, WS2PConnectionState)>,
/// Websocket senders registered on `WebsocketOk` and removed by
/// `close_connection`, keyed by node full id.
pub websockets: HashMap<NodeFullId, WsSender>,
/// Requests sent with `send_request_to_specific_node` and not answered yet:
/// the request, its recipient and the time it was sent.
pub requests_awaiting_response:
HashMap<ModuleReqId, (OldNetworkRequest, NodeFullId, SystemTime)>,
/// Cache handed to `NetworkHead::apply` for every valid head received.
pub heads_cache: HashMap<NodeFullId, NetworkHead>,
/// Head of the local node, if known; received heads carrying the same node
/// full id are ignored.
pub my_head: Option<NetworkHead>,
/// NOTE(review): presumably maps public keys to user identifiers; not used
/// in the methods visible here, confirm against callers.
pub uids_cache: HashMap<PubKey, String>,
/// Counter incremented by `send_dal_request`; its value becomes the id of
/// the request being sent.
pub count_dal_requests: u32,
}
/// Tells whether this build was compiled with the `ssl` cargo feature.
#[inline]
fn ssl() -> bool {
    cfg!(feature = "ssl")
}
impl WS2PModuleDatas {
/// Builds the initial module state.
///
/// Currency, key pair and own head start as `None`, every map starts empty
/// and the DAL request counter starts at zero. `ssl` reflects whether the
/// `ssl` feature was compiled in.
pub fn new(
    router_sender: mpsc::Sender<RouterThreadMessage<DursMsg>>,
    conf: WS2PConf,
    node_id: NodeId,
) -> WS2PModuleDatas {
    // Channel through which connection threads report back to the main thread.
    let main_thread_channel = mpsc::channel();
    let ssl_enabled = ssl();

    WS2PModuleDatas {
        // Values supplied by the caller.
        router_sender,
        conf,
        node_id,
        // Identity, not known yet.
        currency: None,
        key_pair: None,
        my_head: None,
        // Runtime state.
        ssl: ssl_enabled,
        main_thread_channel,
        ws2p_endpoints: HashMap::new(),
        websockets: HashMap::new(),
        requests_awaiting_response: HashMap::new(),
        heads_cache: HashMap::new(),
        uids_cache: HashMap::new(),
        count_dal_requests: 0,
    }
}
/// Opens the SQLite database at `db_path` and makes sure the `endpoints`
/// table exists.
///
/// The table is created with `IF NOT EXISTS` on every call instead of only
/// when the file is absent: a database file left without the table (for
/// example after an interrupted first run) is repaired, and there is no race
/// between an existence check and the actual open.
///
/// # Errors
///
/// Returns the `sqlite::Error` raised while opening the database or while
/// creating the table.
pub fn open_db(db_path: &PathBuf) -> Result<sqlite::Connection, sqlite::Error> {
    let conn = sqlite::open(db_path.as_path())?;
    conn.execute(
        "CREATE TABLE IF NOT EXISTS endpoints (hash_full_id TEXT, status INTEGER, node_id INTEGER, pubkey TEXT,
        api INTEGER, version INTEGER, endpoint TEXT, last_check INTEGER);",
    )?;
    Ok(conn)
}
/// Sends a blockchain request to the `BlockchainDatas` role through the router.
///
/// The request id is the freshly incremented `count_dal_requests`; the
/// counter restarts from zero when it reaches `u32::MAX`.
///
/// # Panics
///
/// Panics if the router channel is closed.
pub fn send_dal_request(&mut self, req: &BlockchainRequest) {
    let next_id = self.count_dal_requests + 1;
    self.count_dal_requests = if next_id == std::u32::MAX {
        0
    } else {
        next_id
    };

    let request = DursMsg::Request {
        req_from: WS2PModule::name(),
        req_to: ModuleRole::BlockchainDatas,
        req_id: ModuleReqId(self.count_dal_requests),
        req_content: DursReqContent::BlockchainRequest(req.clone()),
    };
    self.router_sender
        .send(RouterThreadMessage::ModuleMessage(request))
        .expect("Fail to send message to router !");
}
/// Sends `response` back to the module `requester` through the router, as
/// the answer to request `req_id`.
///
/// # Panics
///
/// Panics if the router channel is closed.
pub fn send_network_req_response(
    &self,
    requester: ModuleStaticName,
    req_id: ModuleReqId,
    response: NetworkResponse,
) {
    let answer = DursMsg::Response {
        res_from: WS2PModule::name(),
        res_to: requester,
        req_id,
        res_content: DursResContent::NetworkResponse(response),
    };
    self.router_sender
        .send(RouterThreadMessage::ModuleMessage(answer))
        .expect("Fail to send message to router !");
}
/// Publishes a network event to the router, tagged with the matching
/// `ModuleEvent` type.
///
/// For `ReceiveDocuments` the event type is derived from the first document
/// only; an empty document list is dropped without sending anything.
///
/// # Panics
///
/// Panics if the router channel is closed.
pub fn send_network_event(&self, event: &NetworkEvent) {
    let event_type = match event {
        NetworkEvent::ConnectionStateChange(..) => ModuleEvent::ConnectionsChangeNodeNetwork,
        NetworkEvent::ReceiveBlocks(_) => ModuleEvent::NewBlockFromNetwork,
        NetworkEvent::ReceiveDocuments(network_docs) => {
            // Nothing to announce for an empty batch.
            if network_docs.is_empty() {
                return;
            }
            match network_docs[0] {
                BlockchainDocument::Block(_) => ModuleEvent::NewBlockFromNetwork,
                BlockchainDocument::Transaction(_) => ModuleEvent::NewTxFromNetwork,
                _ => ModuleEvent::NewWotDocFromNetwork,
            }
        }
        NetworkEvent::ReceiveHeads(_) => ModuleEvent::NewValidHeadFromNetwork,
        NetworkEvent::ReceivePeers(_) => ModuleEvent::NewValidPeerFromNodeNetwork,
        NetworkEvent::NewSelfPeer(_) => ModuleEvent::NewSelfPeer,
    };

    let message = DursMsg::Event {
        event_type,
        event_content: DursEvent::NetworkEvent(event.clone()),
    };
    self.router_sender
        .send(RouterThreadMessage::ModuleMessage(message))
        .expect("Fail to send network event to router !");
}
/// Returns the number of known endpoints whose connection state is
/// `Established`.
///
/// The endpoint map is walked by reference; no endpoint is cloned.
fn count_established_connections(&self) -> usize {
    self.ws2p_endpoints
        .values()
        .filter(|entry| {
            if let WS2PConnectionState::Established = entry.1 {
                true
            } else {
                false
            }
        })
        .count()
}
/// Opens outgoing connections to known endpoints until the outgoing quota
/// is filled.
///
/// At most one endpoint per issuer is selected, except for endpoints issued
/// by the local key, which are never de-duplicated. Endpoints in state
/// `NeverTry`, `Close` or `Denial` are collected separately from all other
/// non-established ones, and that first group is exhausted before the second
/// is tried. Without the `ssl` feature, endpoints on port 443 are skipped
/// and do not consume a slot.
///
/// The number of free slots is computed with a saturating subtraction, so
/// having more established connections than `outcoming_quota` yields zero
/// free slots instead of an integer underflow.
///
/// # Panics
///
/// Panics if there is at least one known endpoint while `key_pair` is `None`.
pub fn connect_to_know_endpoints(&mut self) {
    info!("WS2P: connect to know endpoints...");
    let mut count_established_connections = 0;
    let mut pubkeys = HashSet::new();
    let mut reachable_endpoints = Vec::new();
    let mut unreachable_endpoints = Vec::new();
    // Computed once; only unwrapped when there is an endpoint to examine.
    let my_pubkey = self.key_pair.map(|key_pair| key_pair.public_key());

    // Iterate by reference and clone only the endpoints that are kept.
    for (ep, state) in self.ws2p_endpoints.values() {
        let own_pubkey =
            my_pubkey.expect("WS2P: key_pair must be set before connecting to endpoints !");
        if ep.issuer == own_pubkey || !pubkeys.contains(&ep.issuer) {
            match *state {
                WS2PConnectionState::Established => count_established_connections += 1,
                WS2PConnectionState::NeverTry
                | WS2PConnectionState::Close
                | WS2PConnectionState::Denial => {
                    pubkeys.insert(ep.issuer);
                    reachable_endpoints.push(ep.clone());
                }
                _ => {
                    pubkeys.insert(ep.issuer);
                    unreachable_endpoints.push(ep.clone());
                }
            }
        }
    }

    let mut free_outcoming_rooms = self
        .conf
        .outcoming_quota
        .saturating_sub(count_established_connections);
    while free_outcoming_rooms > 0 {
        // Prefer the first group; fall back to the second once it is empty.
        let ep = match reachable_endpoints
            .pop()
            .or_else(|| unreachable_endpoints.pop())
        {
            Some(ep) => ep,
            None => break,
        };
        if !self.ssl && ep.port == 443 {
            continue;
        }
        self.connect_to_without_checking_quotas(&ep);
        free_outcoming_rooms -= 1;
    }
}
/// Registers `endpoint` and connects to it if the outgoing quota allows.
///
/// An unknown endpoint is inserted; an already known one keeps its stored
/// endpoint data. In both cases its state is (re)set to `NeverTry`. The
/// connection is attempted only while the number of established connections
/// is below `outcoming_quota`.
///
/// # Panics
///
/// Panics if the node full id of `endpoint` cannot be computed.
pub fn connect_to(&mut self, endpoint: &EndpointV1) {
    let node_full_id = endpoint
        .node_full_id()
        .expect("WS2P: Fail to get ep.node_full_id() !");

    // Single lookup: insert the endpoint if absent, then reset the state.
    self.ws2p_endpoints
        .entry(node_full_id)
        .or_insert_with(|| (endpoint.clone(), WS2PConnectionState::NeverTry))
        .1 = WS2PConnectionState::NeverTry;

    if self.conf.outcoming_quota > self.count_established_connections() {
        self.connect_to_without_checking_quotas(endpoint);
    }
}
/// Closes the websocket of `ws2p_full_id` (if one is registered) and forgets it.
///
/// A `NegociationTimeout` leaves the endpoint state untouched; every other
/// reason marks the endpoint as `Close`. The result of the websocket close
/// call is ignored.
///
/// # Panics
///
/// Panics if the reason is not `NegociationTimeout` and the endpoint is unknown.
pub fn close_connection(
    &mut self,
    ws2p_full_id: &NodeFullId,
    reason: WS2PCloseConnectionReason,
) {
    match reason {
        WS2PCloseConnectionReason::NegociationTimeout => {}
        WS2PCloseConnectionReason::AuthMessInvalidSig
        | WS2PCloseConnectionReason::Timeout
        | WS2PCloseConnectionReason::WsError
        | WS2PCloseConnectionReason::Unknow => {
            let endpoint = self
                .ws2p_endpoints
                .get_mut(ws2p_full_id)
                .expect("Failure : attempt to delete a non-existent connection !");
            endpoint.1 = WS2PConnectionState::Close;
        }
    }

    // Take the sender out of the map, then ask it to close.
    if let Some(websocket) = self.websockets.remove(ws2p_full_id) {
        let _result = websocket.0.close(ws::CloseCode::Normal);
    }
}
/// Applies a message received from a connection thread to the module state
/// and returns the signal the caller has to handle.
///
/// Negotiation messages update the state of the endpoint (and may send the
/// negotiation response on its websocket); payload messages (DAL requests,
/// peer cards, heads, documents, request responses) are turned into the
/// corresponding `WS2PSignal` and returned immediately.
///
/// When the message does not produce a signal of its own, the function
/// returns `WS2PSignal::NoConnection` if no websocket is registered any
/// more. Otherwise it drops the pending requests older than 20 seconds and
/// returns `WS2PSignal::Empty`.
///
/// # Panics
///
/// Panics if a negotiation message refers to an endpoint that is not in
/// `ws2p_endpoints`, or if a `ValidAckMessage` leading to the `AckMessOk`
/// state arrives for a node that has no registered websocket.
pub fn ws2p_conn_message_pretreatment(&mut self, message: WS2PConnectionMessage) -> WS2PSignal {
    let ws2p_full_id = message.0;
    match message.1 {
        WS2PConnectionMessagePayload::WrongUrl
        | WS2PConnectionMessagePayload::FailOpenWS
        | WS2PConnectionMessagePayload::FailToSplitWS => {
            self.ws2p_endpoints
                .get_mut(&ws2p_full_id)
                .expect("WS2P: Fail to get mut ep !")
                .1 = WS2PConnectionState::WSError;
            return WS2PSignal::WSError(ws2p_full_id);
        }
        WS2PConnectionMessagePayload::TryToSendConnectMess => {
            self.ws2p_endpoints
                .get_mut(&ws2p_full_id)
                .expect("WS2P: Fail to get mut ep !")
                .1 = WS2PConnectionState::TryToSendConnectMess;
        }
        WS2PConnectionMessagePayload::FailSendConnectMess => {
            self.ws2p_endpoints
                .get_mut(&ws2p_full_id)
                .expect("WS2P: Fail to mut ep !")
                .1 = WS2PConnectionState::Unreachable;
        }
        WS2PConnectionMessagePayload::WebsocketOk(sender) => {
            self.websockets.insert(ws2p_full_id, sender);
        }
        WS2PConnectionMessagePayload::ValidConnectMessage(response, new_con_state) => {
            // NOTE(review): `new_con_state` is stored and then immediately
            // overwritten with `ConnectMessOk`; kept as-is, confirm intent.
            self.ws2p_endpoints
                .get_mut(&ws2p_full_id)
                .expect("WS2P: Fail to get mut ep !")
                .1 = new_con_state;
            self.ws2p_endpoints
                .get_mut(&ws2p_full_id)
                .expect("Endpoint don't exist !")
                .1 = WS2PConnectionState::ConnectMessOk;
            debug!("Send: {:#?}", response);
            if let Some(websocket) = self.websockets.get_mut(&ws2p_full_id) {
                if websocket.0.send(Message::text(response)).is_err() {
                    return WS2PSignal::WSError(ws2p_full_id);
                }
            } else {
                // Connection closed by remote peer
                self.ws2p_endpoints
                    .get_mut(&ws2p_full_id)
                    .expect("Endpoint don't exist !")
                    .1 = WS2PConnectionState::Close;
            }
        }
        WS2PConnectionMessagePayload::ValidAckMessage(response, new_con_state) => {
            self.ws2p_endpoints
                .get_mut(&ws2p_full_id)
                .expect("WS2P: Fail to get mut ep !")
                .1 = new_con_state;
            if let WS2PConnectionState::AckMessOk = self.ws2p_endpoints[&ws2p_full_id].1 {
                debug!("Send: {:#?}", response);
                if let Some(websocket) = self.websockets.get_mut(&ws2p_full_id) {
                    if websocket.0.send(Message::text(response)).is_err() {
                        return WS2PSignal::WSError(ws2p_full_id);
                    }
                } else {
                    panic!("Fatal error : no websocket for {} !", ws2p_full_id);
                }
            }
        }
        WS2PConnectionMessagePayload::ValidOk(new_con_state) => {
            self.ws2p_endpoints
                .get_mut(&ws2p_full_id)
                .expect("WS2P: Fail to get mut ep !")
                .1 = new_con_state;
            match self.ws2p_endpoints[&ws2p_full_id].1 {
                WS2PConnectionState::OkMessOkWaitingAckMess => {}
                WS2PConnectionState::Established => {
                    return WS2PSignal::ConnectionEstablished(ws2p_full_id);
                }
                _ => {
                    self.close_connection(&ws2p_full_id, WS2PCloseConnectionReason::Unknow);
                    return WS2PSignal::Empty;
                }
            }
        }
        WS2PConnectionMessagePayload::DalRequest(req_id, req_body) => {
            return WS2PSignal::DalRequest(ws2p_full_id, req_id, req_body);
        }
        WS2PConnectionMessagePayload::PeerCard(body, ws2p_endpoints) => {
            return WS2PSignal::PeerCard(ws2p_full_id, body, ws2p_endpoints);
        }
        WS2PConnectionMessagePayload::Heads(heads) => {
            let mut applied_heads = Vec::with_capacity(heads.len());
            for head in heads {
                if let Ok(head) = NetworkHead::from_json_value(&head) {
                    // Keep heads that are valid, are not our own and are
                    // accepted by the heads cache.
                    if head.verify()
                        && self.my_head.as_ref().map_or(true, |my_head| {
                            head.node_full_id() != my_head.node_full_id()
                        })
                        && head.apply(&mut self.heads_cache)
                    {
                        applied_heads.push(head);
                    }
                }
            }
            return WS2PSignal::Heads(ws2p_full_id, applied_heads);
        }
        WS2PConnectionMessagePayload::Document(network_doc) => {
            return WS2PSignal::Document(ws2p_full_id, network_doc);
        }
        WS2PConnectionMessagePayload::ReqResponse(req_id, response) => {
            // NOTE(review): this guard compares the number of pending
            // requests with the numeric request id; it looks like a leftover
            // from an index-based container. Kept as-is, confirm intent.
            if self.requests_awaiting_response.len() > req_id.0 as usize {
                if let Some((ws2p_request, recipient_full_id, _timestamp)) =
                    self.requests_awaiting_response.remove(&req_id)
                {
                    return WS2PSignal::ReqResponse(
                        req_id,
                        ws2p_request,
                        recipient_full_id,
                        response,
                    );
                }
            }
        }
        WS2PConnectionMessagePayload::NegociationTimeout => {
            match self.ws2p_endpoints[&ws2p_full_id].1 {
                WS2PConnectionState::AckMessOk | WS2PConnectionState::ConnectMessOk => {
                    self.ws2p_endpoints
                        .get_mut(&ws2p_full_id)
                        .expect("WS2P: Fail to get mut ep !")
                        .1 = WS2PConnectionState::Denial
                }
                WS2PConnectionState::WaitingConnectMess => {
                    self.ws2p_endpoints
                        .get_mut(&ws2p_full_id)
                        .expect("WS2P: Fail to get mut ep !")
                        .1 = WS2PConnectionState::NoResponse
                }
                _ => {
                    self.ws2p_endpoints
                        .get_mut(&ws2p_full_id)
                        .expect("WS2P: Fail to get mut ep !")
                        .1 = WS2PConnectionState::Unreachable
                }
            }
            self.close_connection(&ws2p_full_id, WS2PCloseConnectionReason::NegociationTimeout);
            return WS2PSignal::NegociationTimeout(ws2p_full_id);
        }
        WS2PConnectionMessagePayload::Timeout => {
            self.close_connection(&ws2p_full_id, WS2PCloseConnectionReason::Timeout);
            return WS2PSignal::Timeout(ws2p_full_id);
        }
        WS2PConnectionMessagePayload::UnknowMessage => {
            warn!("WS2P : Receive Unknow Message from {}.", &ws2p_full_id.1)
        }
        WS2PConnectionMessagePayload::WrongFormatMessage => warn!(
            "WS2P : Receive Wrong Format Message from {}.",
            &ws2p_full_id.1
        ),
        WS2PConnectionMessagePayload::InvalidMessage => return WS2PSignal::Empty,
        WS2PConnectionMessagePayload::Close => {
            if self.websockets.contains_key(&ws2p_full_id) {
                self.close_connection(
                    &ws2p_full_id,
                    WS2PCloseConnectionReason::AuthMessInvalidSig,
                )
            }
        }
    }

    if self.websockets.is_empty() {
        return WS2PSignal::NoConnection;
    }

    // Detect timed-out requests. `elapsed()` returns an error when the
    // system clock moved backwards since the request was sent; such a
    // request is simply not considered timed out, instead of panicking.
    let request_timeout = Duration::new(20, 0);
    let mut requests_timeout = Vec::new();
    for &(ref req, ref _ws2p_full_id, ref timestamp) in self.requests_awaiting_response.values()
    {
        let timed_out = timestamp
            .elapsed()
            .map(|elapsed| elapsed > request_timeout)
            .unwrap_or(false);
        if timed_out {
            requests_timeout.push(req.get_req_full_id());
            warn!("request timeout : {:?}", req);
        }
    }

    // Drop the timed-out requests; they are not re-sent.
    for req_id in requests_timeout {
        let _request_option = self.requests_awaiting_response.remove(&req_id.1);
    }
    WS2PSignal::Empty
}
/// Sends `ws2p_request`, serialized as JSON text, on the websocket of
/// `receiver_ws2p_full_id` and records it as awaiting a response.
///
/// The request is recorded (with the current time) only after the send
/// succeeded.
///
/// # Errors
///
/// Returns the websocket error if the message could not be sent.
///
/// # Panics
///
/// Panics if no websocket is registered for `receiver_ws2p_full_id`.
pub fn send_request_to_specific_node(
    &mut self,
    receiver_ws2p_full_id: &NodeFullId,
    ws2p_request: &OldNetworkRequest,
) -> ws::Result<()> {
    let websocket = self
        .websockets
        .get_mut(receiver_ws2p_full_id)
        .expect("WS2P: Fail to get mut websocket !");
    let payload = network_request_to_json(ws2p_request).to_string();
    websocket.0.send(Message::text(payload))?;

    let pending = (*ws2p_request, *receiver_ws2p_full_id, SystemTime::now());
    self.requests_awaiting_response
        .insert(ws2p_request.get_req_id(), pending);

    debug!(
        "send request {} to {}",
        network_request_to_json(ws2p_request).to_string(),
        receiver_ws2p_full_id
    );
    Ok(())
}
/// Spawns a detached thread that connects to `endpoint`, without looking at
/// the outgoing quota.
///
/// The thread receives its own copies of the endpoint, the main-thread
/// sender, the currency and the key pair. The result of the connection
/// attempt is ignored here.
///
/// If `currency` or `key_pair` is `None`, the panic happens inside the
/// spawned thread, not in the caller.
fn connect_to_without_checking_quotas(&mut self, endpoint: &EndpointV1) {
    let endpoint_copy = endpoint.clone();
    let conductor_sender_copy = self.main_thread_channel.0.clone();
    let currency_copy = self.currency.clone();
    let key_pair_copy = self.key_pair;
    thread::spawn(move || {
        let _result = connect_to_ws2p_endpoint(
            &endpoint_copy,
            &conductor_sender_copy,
            &currency_copy.expect("WS2PError : No currency !"),
            key_pair_copy.expect("WS2PError : No key_pair !"),
        );
    });
}
}