diff --git a/src/sakia/data/connectors/node.py b/src/sakia/data/connectors/node.py index b191be7ebbf96c1fb14ab5554df9d39d369c6385..ec079eac5d70d1be71121d994c01d71dc374d449 100644 --- a/src/sakia/data/connectors/node.py +++ b/src/sakia/data/connectors/node.py @@ -17,6 +17,15 @@ from sakia.errors import InvalidNodeCurrency from ..entities.node import Node +class NodeConnectorLoggerAdapter(logging.LoggerAdapter): + """ + This example adapter expects the passed in dict-like object to have a + 'connid' key, whose value in brackets is prepended to the log message. + """ + def process(self, msg, kwargs): + return '[%s] %s' % (self.extra['pubkey'][:5], msg), kwargs + + class NodeConnector(QObject): """ A node is a peer send from the client point of view. @@ -38,7 +47,8 @@ class NodeConnector(QObject): 'peer': False} self._user_parameters = user_parameters self.session = session - self._logger = logging.getLogger('sakia') + self._raw_logger = logging.getLogger('sakia') + self._logger = NodeConnectorLoggerAdapter(self._raw_logger, {'pubkey': self.node.pubkey}) def __del__(self): for ws in self._ws_tasks.values(): @@ -69,7 +79,7 @@ class NodeConnector(QObject): if currency and peer.currency != currency: raise InvalidNodeCurrency(currency, peer.currency) - node = Node(peer.currency, peer.pubkey, peer.endpoints, peer.blockUID) + node = Node(peer.currency, peer.pubkey, peer.endpoints, peer.blockUID, last_state_change=time.time()) logging.getLogger('sakia').debug("Node from address : {:}".format(str(node))) return cls(node, user_parameters, session=session) @@ -87,7 +97,7 @@ class NodeConnector(QObject): if currency and peer.currency != currency: raise InvalidNodeCurrency(currency, peer.currency) - node = Node(peer.currency, peer.pubkey, peer.endpoints, peer.blockUID) + node = Node(peer.currency, peer.pubkey, peer.endpoints, peer.blockUID, last_state_change=time.time()) logging.getLogger('sakia').debug("Node from peer : {:}".format(str(node))) return cls(node, user_parameters, session=None) @@ -98,11 +108,10 @@ class NodeConnector(QObject): data = await request(conn_handler, **req_args) return data except (ClientError, gaierror, TimeoutError, ConnectionRefusedError, ValueError) as e: - self._logger.debug("{0} : {1}".format(str(e), self.node.pubkey[:5])) + self._logger.debug("{0}".format(str(e))) self.change_state_and_emit(Node.OFFLINE) except jsonschema.ValidationError as e: self._logger.debug(str(e)) - self._logger.debug("Validation error : {0}".format(self.node.pubkey[:5])) self.change_state_and_emit(Node.CORRUPTED) async def init_session(self): @@ -151,11 +160,10 @@ class NodeConnector(QObject): ws_connection = bma.ws.block(conn_handler) async with ws_connection as ws: self._connected['block'] = True - self._logger.debug("Connected successfully to block ws : {0}" - .format(self.node.pubkey[:5])) + self._logger.debug("Connected successfully to block ws") async for msg in ws: if msg.tp == aiohttp.WSMsgType.TEXT: - self._logger.debug("Received a block : {0}".format(self.node.pubkey[:5])) + self._logger.debug("Received a block") block_data = bma.parse_text(msg.data, bma.ws.WS_BLOCk_SCHEMA) await self.refresh_block(block_data) elif msg.tp == aiohttp.WSMsgType.CLOSED: @@ -163,15 +171,14 @@ class NodeConnector(QObject): elif msg.tp == aiohttp.WSMsgType.ERROR: break except (aiohttp.WSServerHandshakeError, ValueError) as e: - self._logger.debug("Websocket block {0} : {1} - {2}" - .format(type(e).__name__, str(e), self.node.pubkey[:5])) + self._logger.debug("Websocket block {0} : {1}".format(type(e).__name__, str(e))) await self.request_current_block() except (ClientError, gaierror, TimeoutError) as e: self._logger.debug("{0} : {1}".format(str(e), self.node.pubkey[:5])) self.change_state_and_emit(Node.OFFLINE) except jsonschema.ValidationError as e: self._logger.debug(str(e)) - self._logger.debug("Validation error : {0}".format(self.node.pubkey[:5])) + self._logger.debug("Validation error") self.change_state_and_emit(Node.CORRUPTED) finally: self._connected['block'] = False @@ -196,9 +203,9 @@ class NodeConnector(QObject): self.change_state_and_emit(Node.ONLINE) else: self.change_state_and_emit(Node.CORRUPTED) - self._logger.debug("Error in block reply of {0} : {1}}".format(self.node.pubkey[:5], str(e))) + self._logger.debug("Error in block reply : {0}".format(str(e))) else: - self._logger.debug("Could not connect to any BMA endpoint : {0}".format(self.node.pubkey[:5])) + self._logger.debug("Could not connect to any BMA endpoint") self.change_state_and_emit(Node.OFFLINE) async def refresh_block(self, block_data): @@ -237,7 +244,7 @@ class NodeConnector(QObject): block_data['number'])) self.changed.emit() else: - self._logger.debug("Could not connect to any BMA endpoint : {0}".format(self.node.pubkey[:5])) + self._logger.debug("Could not connect to any BMA endpoint") self.change_state_and_emit(Node.OFFLINE) else: self.change_state_and_emit(Node.ONLINE) @@ -259,10 +266,10 @@ class NodeConnector(QObject): self.identity_changed.emit() return # Break endpoints loop except errors.DuniterError as e: - self._logger.debug("Error in summary of {0} : {1}".format(self.node.pubkey[:5], str(e))) + self._logger.debug("Error in summary : {:}".format(str(e))) self.change_state_and_emit(Node.OFFLINE) else: - self._logger.debug("Could not connect to any BMA endpoint : {0}".format(self.node.pubkey[:5])) + self._logger.debug("Could not connect to any BMA endpoint") self.change_state_and_emit(Node.OFFLINE) async def connect_peers(self): @@ -278,10 +285,10 @@ class NodeConnector(QObject): ws_connection = bma.ws.peer(conn_handler) async with ws_connection as ws: self._connected['peer'] = True - self._logger.debug("Connected successfully to peer ws : {0}".format(self.node.pubkey[:5])) + self._logger.debug("Connected successfully to peer ws") async for msg in ws: if msg.tp == aiohttp.WSMsgType.TEXT: - self._logger.debug("Received a peer : {0}".format(self.node.pubkey[:5])) + self._logger.debug("Received a peer") peer_data = bma.parse_text(msg.data, bma.ws.WS_PEER_SCHEMA) self.refresh_peer_data(peer_data) elif msg.tp == aiohttp.WSMsgType.CLOSED: @@ -289,15 +296,14 @@ class NodeConnector(QObject): elif msg.tp == aiohttp.WSMsgType.ERROR: break except (aiohttp.WSServerHandshakeError, ValueError) as e: - self._logger.debug("Websocket peer {0} : {1} - {2}" - .format(type(e).__name__, str(e), self.node.pubkey[:5])) + self._logger.debug("Websocket peer {0} : {1}" + .format(type(e).__name__, str(e))) await self.request_peers() except (ClientError, gaierror, TimeoutError) as e: - self._logger.debug("{0} : {1}".format(str(e), self.node.pubkey[:5])) + self._logger.debug("{0}".format(str(e))) self.change_state_and_emit(Node.OFFLINE) except jsonschema.ValidationError as e: self._logger.debug(str(e)) - self._logger.debug("Validation error : {0}".format(self.node.pubkey[:5])) self.change_state_and_emit(Node.CORRUPTED) finally: self._connected['peer'] = False @@ -328,9 +334,8 @@ class NodeConnector(QObject): break self.refresh_peer_data(leaf_data['leaf']['value']) except (AttributeError, ValueError, errors.DuniterError) as e: - self._logger.debug("{pubkey} : Incorrect peer data in {leaf}" - .format(pubkey=self.node.pubkey[:5], - leaf=leaf_hash)) + self._logger.debug("Incorrect peer data in {leaf}" + .format(leaf=leaf_hash)) self.change_state_and_emit(Node.OFFLINE) else: self.node.merkle_peers_root = peers_data['root'] @@ -340,7 +345,7 @@ class NodeConnector(QObject): self._logger.debug("Error in peers reply : {0}".format(str(e))) self.change_state_and_emit(Node.OFFLINE) else: - self._logger.debug("Could not connect to any BMA endpoint : {0}".format(self.node.pubkey[:5])) + self._logger.debug("Could not connect to any BMA endpoint") self.change_state_and_emit(Node.OFFLINE) def refresh_peer_data(self, peer_data): @@ -356,9 +361,6 @@ class NodeConnector(QObject): self._logger.debug("Incorrect leaf reply") def change_state_and_emit(self, new_state): - if self.node.state in (Node.CORRUPTED, Node.OFFLINE): - self.error.emit() - if self.node.state != new_state: self.node.last_state_change = time.time() self.node.state = new_state