From c18abc35441fdab19b574ad29d35f50db60d85d5 Mon Sep 17 00:00:00 2001 From: Inso <insomniak.fr@gmail.com> Date: Thu, 28 Jan 2016 08:38:00 +0100 Subject: [PATCH] Some details in the algorithm with websockets --- src/sakia/core/net/network.py | 8 ++- src/sakia/core/net/node.py | 122 ++++++++++++++++++++++++---------- src/sakia/tools/decorators.py | 3 +- 3 files changed, 95 insertions(+), 38 deletions(-) diff --git a/src/sakia/core/net/network.py b/src/sakia/core/net/network.py index 4ba45bff..36516cf1 100644 --- a/src/sakia/core/net/network.py +++ b/src/sakia/core/net/network.py @@ -136,6 +136,8 @@ class Network(QObject): Stop network nodes crawling. """ self._must_crawl = False + for node in self.nodes: + node.close_ws() def continue_crawling(self): return self._must_crawl @@ -325,11 +327,15 @@ class Network(QObject): To stop this crawling, call "stop_crawling" method. """ self._must_crawl = True + first_loop = True while self.continue_crawling(): for node in self.nodes: if self.continue_crawling(): node.refresh() - await asyncio.sleep(15) + if not first_loop: + await asyncio.sleep(15) + first_loop = False + logging.debug("End of network discovery") @pyqtSlot(Peer, str) diff --git a/src/sakia/core/net/node.py b/src/sakia/core/net/node.py index 37fdce22..93738eda 100644 --- a/src/sakia/core/net/node.py +++ b/src/sakia/core/net/node.py @@ -64,9 +64,9 @@ class Node(QObject): self._software = software self._version = version self._fork_window = fork_window - self._refresh_counter = 0 - self._ws_opened = {'block': False, - 'peer': False} + self._refresh_counter = 1 + self._ws_connection = {'block': None, + 'peer': None} @classmethod async def from_address(cls, currency, address, port): @@ -203,6 +203,11 @@ class Node(QObject): } return data + def close_ws(self): + for ws in self._ws_connection.values(): + if ws: + asyncio.wait_for(ws.close(), timeout=15) + @property def pubkey(self): return self._pubkey @@ -302,7 +307,7 @@ class Node(QObject): :param bool manual: True if the refresh was manually initiated """ self.connect_current_block() - self.refresh_peers() + self.connect_peers() if self._refresh_counter % 20 == 0 or manual: self.refresh_informations() @@ -318,14 +323,17 @@ class Node(QObject): Connects to the websocket entry point of the node If the connection fails, it tries the fallback mode on HTTP GET """ - if not self._ws_opened['block']: + if not self._ws_connection['block']: try: conn_handler = self.endpoint.conn_handler() - self._ws_opened['block'] = True block_websocket = bma.websocket.Block(conn_handler) - async with block_websocket.connect() as ws: - async for msg in ws: + self._ws_connection['block'] = block_websocket.connect() + await self.request_current_block() + async with self._ws_connection['block']: + logging.debug("Connected successfully to block ws : {0}".format(self.pubkey[:5])) + async for msg in self._ws_connection['block']: if msg.tp == aiohttp.MsgType.text: + logging.debug("Received a block : {0}".format(self.pubkey[:5])) block_data = block_websocket.parse(msg.data) await self.refresh_block(block_data) elif msg.tp == aiohttp.MsgType.closed: @@ -333,10 +341,16 @@ class Node(QObject): elif msg.tp == aiohttp.MsgType.error: break except (WSServerHandshakeError, WSClientDisconnectedError, ClientResponseError) as e: - logging.debug("Websocket error : {0}".format(str(e))) - self.request_current_block() + logging.debug("Websocket block {0} : {1} - {2}".format(type(e).__name__, str(e), self.pubkey[:5])) + await self.request_current_block() + except (ClientError, gaierror, TimeoutError, DisconnectedError) as e: + logging.debug("{0} : {1}".format(str(e), self.pubkey[:5])) + self.state = Node.OFFLINE + except jsonschema.ValidationError: + logging.debug("Validation error : {0}".format(self.pubkey[:5])) + self.state = Node.CORRUPTED finally: - self._ws_opened['block'] = False + self._ws_connection['block'] = None async def request_current_block(self): """ @@ -353,14 +367,14 @@ class Node(QObject): self.set_block(None) else: self.state = Node.OFFLINE - logging.debug("Error in block reply : {0}".format(self.pubkey)) + logging.debug("Error in block reply : {0}".format(self.pubkey[:5])) logging.debug(str(e)) self.changed.emit() except (ClientError, gaierror, TimeoutError, DisconnectedError) as e: - logging.debug("{0} : {1}".format(str(e), self.pubkey)) + logging.debug("{0} : {1}".format(str(e), self.pubkey[:5])) self.state = Node.OFFLINE except jsonschema.ValidationError: - logging.debug("Validation error : {0}".format(self.pubkey)) + logging.debug("Validation error : {0}".format(self.pubkey[:5])) self.state = Node.CORRUPTED async def refresh_block(self, block_data): @@ -384,14 +398,14 @@ class Node(QObject): self.main_chain_previous_block = None else: self.state = Node.OFFLINE - logging.debug("Error in previous block reply : {0}".format(self.pubkey)) + logging.debug("Error in previous block reply : {0}".format(self.pubkey[:5])) logging.debug(str(e)) self.changed.emit() except (ClientError, gaierror, TimeoutError, DisconnectedError) as e: - logging.debug("{0} : {1}".format(str(e), self.pubkey)) + logging.debug("{0} : {1}".format(str(e), self.pubkey[:5])) self.state = Node.OFFLINE except jsonschema.ValidationError: - logging.debug("Validation error : {0}".format(self.pubkey)) + logging.debug("Validation error : {0}".format(self.pubkey[:5])) self.state = Node.CORRUPTED finally: self.set_block(block_data) @@ -431,10 +445,10 @@ class Node(QObject): self.state = Node.OFFLINE self.changed.emit() except (ClientError, gaierror, TimeoutError, DisconnectedError) as e: - logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey)) + logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey[:5])) self.state = Node.OFFLINE except jsonschema.ValidationError: - logging.debug("Validation error : {0}".format(self.pubkey)) + logging.debug("Validation error : {0}".format(self.pubkey[:5])) self.state = Node.CORRUPTED @asyncify @@ -458,10 +472,10 @@ class Node(QObject): self.state = Node.OFFLINE self.changed.emit() except (ClientError, gaierror, TimeoutError, DisconnectedError) as e: - logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey)) + logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey[:5])) self.state = Node.OFFLINE except jsonschema.ValidationError: - logging.debug("Validation error : {0}".format(self.pubkey)) + logging.debug("Validation error : {0}".format(self.pubkey[:5])) self.state = Node.CORRUPTED @asyncify @@ -487,20 +501,53 @@ class Node(QObject): self.identity_changed.emit() except ValueError as e: if '404' in str(e): - logging.debug("UID not found : {0}".format(self.pubkey)) + logging.debug("UID not found : {0}".format(self.pubkey[:5])) else: - logging.debug("error in uid reply : {0}".format(self.pubkey)) + logging.debug("error in uid reply : {0}".format(self.pubkey[:5])) self.state = Node.OFFLINE self.identity_changed.emit() except (ClientError, gaierror, TimeoutError, DisconnectedError) as e: - logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey)) + logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey[:5])) self.state = Node.OFFLINE except jsonschema.ValidationError: - logging.debug("Validation error : {0}".format(self.pubkey)) + logging.debug("Validation error : {0}".format(self.pubkey[:5])) self.state = Node.CORRUPTED @asyncify - async def refresh_peers(self): + async def connect_peers(self): + """ + Connects to the peer websocket entry point + If the connection fails, it tries the fallback mode on HTTP GET + """ + if not self._ws_connection['peer']: + try: + conn_handler = self.endpoint.conn_handler() + peer_websocket = bma.websocket.Peer(conn_handler) + self._ws_connection['peer'] = peer_websocket.connect() + async with self._ws_connection['peer']: + logging.debug("Connected successfully to peer ws : {0}".format(self.pubkey[:5])) + async for msg in self._ws_connection['peer']: + if msg.tp == aiohttp.MsgType.text: + logging.debug("Received a peer : {0}".format(self.pubkey[:5])) + peer_data = peer_websocket.parse(msg.data) + await self.refresh_peer_data(peer_data) + elif msg.tp == aiohttp.MsgType.closed: + break + elif msg.tp == aiohttp.MsgType.error: + break + except (WSServerHandshakeError, WSClientDisconnectedError, ClientResponseError) as e: + logging.debug("Websocket peer {0} : {1} - {2}".format(type(e).__name__, str(e), self.pubkey[:5])) + await self.request_peers() + except (ClientError, gaierror, TimeoutError, DisconnectedError) as e: + logging.debug("{0} : {1}".format(str(e), self.pubkey[:5])) + self.state = Node.OFFLINE + except jsonschema.ValidationError: + logging.debug("Validation error : {0}".format(self.pubkey[:5])) + self.state = Node.CORRUPTED + finally: + self._ws_connection['peer'] = None + + async def request_peers(self): """ Refresh the list of peers knew by this node """ @@ -515,24 +562,17 @@ class Node(QObject): for leaf_hash in leaves: try: leaf_data = await bma.network.peering.Peers(conn_handler).get(leaf=leaf_hash) - if "raw" in leaf_data['leaf']['value']: - str_doc = "{0}{1}\n".format(leaf_data['leaf']['value']['raw'], - leaf_data['leaf']['value']['signature']) - peer_doc = Peer.from_signed_raw(str_doc) - pubkey = leaf_data['leaf']['value']['pubkey'] - self.neighbour_found.emit(peer_doc, pubkey) - else: - logging.debug("Incorrect leaf reply") + self.refresh_peer_data(leaf_data['leaf']['value']) except (AttributeError, ValueError) as e: logging.debug("{pubkey} : Incorrect peer data in {leaf}".format(pubkey=self.pubkey[:5], leaf=leaf_hash)) self.state = Node.OFFLINE self.changed.emit() except (ClientError, gaierror, TimeoutError, DisconnectedError) as e: - logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey)) + logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey[:5])) self.state = Node.OFFLINE except jsonschema.ValidationError: - logging.debug("Validation error : {0}".format(self.pubkey)) + logging.debug("Validation error : {0}".format(self.pubkey[:5])) self.state = Node.CORRUPTED self._last_merkle = {'root' : peers_data['root'], 'leaves': peers_data['leaves']} @@ -547,6 +587,16 @@ class Node(QObject): logging.debug("Validation error : {0}".format(self.pubkey)) self.state = Node.CORRUPTED + def refresh_peer_data(self, peer_data): + if "raw" in peer_data: + str_doc = "{0}{1}\n".format(peer_data['raw'], + peer_data['signature']) + peer_doc = Peer.from_signed_raw(str_doc) + pubkey = peer_data['pubkey'] + self.neighbour_found.emit(peer_doc, pubkey) + else: + logging.debug("Incorrect leaf reply") + def __str__(self): return ','.join([str(self.pubkey), str(self.endpoint.server), str(self.endpoint.ipv4), str(self.endpoint.port), diff --git a/src/sakia/tools/decorators.py b/src/sakia/tools/decorators.py index 99080ce8..9dc30fad 100644 --- a/src/sakia/tools/decorators.py +++ b/src/sakia/tools/decorators.py @@ -17,7 +17,8 @@ def once_at_a_time(fn): try: args[0].__tasks.pop(fn.__name__) except KeyError: - logging.debug("Task already removed") + pass + #logging.debug("Task already removed") if getattr(args[0], "__tasks", None) is None: setattr(args[0], "__tasks", {}) -- GitLab