diff --git a/src/sakia/data/connectors/node.py b/src/sakia/data/connectors/node.py index ee20a010514f3b119b1d0d659e2bd0827ab81bd7..4f1068b4e25503e983ea3484796d57bab46bbb0b 100644 --- a/src/sakia/data/connectors/node.py +++ b/src/sakia/data/connectors/node.py @@ -34,6 +34,7 @@ class NodeConnector(QObject): error = pyqtSignal() identity_changed = pyqtSignal() neighbour_found = pyqtSignal(Peer) + block_found = pyqtSignal(BlockUID) FAILURE_THRESHOLD = 3 @@ -183,14 +184,14 @@ class NodeConnector(QObject): if msg.type == aiohttp.WSMsgType.TEXT: self._logger.debug("Received a block") block_data = bma.parse_text(msg.data, bma.ws.WS_BLOCk_SCHEMA) - await self.refresh_block(block_data) + self.block_found.emit(BlockUID(block_data['number'], block_data['hash'])) elif msg.type == aiohttp.WSMsgType.CLOSED: break elif msg.type == aiohttp.WSMsgType.ERROR: break except (aiohttp.WSServerHandshakeError, ValueError) as e: self._logger.debug("Websocket block {0} : {1}".format(type(e).__name__, str(e))) - await self.request_current_block() + self.handle_failure() except (ClientError, gaierror, TimeoutError) as e: self._logger.debug("{0} : {1}".format(str(e), self.node.pubkey[:5])) self.handle_failure() @@ -211,102 +212,6 @@ class NodeConnector(QObject): self._connected['block'] = False self._ws_tasks['block'] = None - async def request_current_block(self): - """ - Request a node on the HTTP GET interface - If an error occurs, the node is considered offline - """ - for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: - try: - block_data = await self.safe_request(endpoint, bma.blockchain.current, - proxy=self._user_parameters.proxy()) - if not block_data: - continue - await self.refresh_block(block_data) - return # Do not try any more endpoint - except errors.DuniterError as e: - if e.ucode == errors.BLOCK_NOT_FOUND: - self.node.previous_buid = BlockUID.empty() - self.handle_success() - else: - self.change_state_and_emit(Node.CORRUPTED) - self._logger.debug("Error in block reply : {0}".format(str(e))) - else: - if self.session.closed: - pass - else: - self._logger.debug("Could not connect to any BMA endpoint") - self.handle_failure() - - async def refresh_block(self, block_data): - """ - Refresh the blocks of this node - :param dict block_data: The block data in json format - """ - if not self.node.current_buid or self.node.current_buid.sha_hash != block_data['hash']: - for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: - conn_handler = next(endpoint.conn_handler(self.session, - proxy=self._user_parameters.proxy())) - self._logger.debug("Requesting {0}".format(conn_handler)) - try: - previous_block = await self.safe_request(endpoint, bma.blockchain.block, - proxy=self._user_parameters.proxy(), - req_args={'number': self.node.current_buid.number}) - if not previous_block: - continue - self.node.previous_buid = BlockUID(previous_block['number'], previous_block['hash']) - break # Do not try any more endpoint - except errors.DuniterError as e: - if e.ucode == errors.BLOCK_NOT_FOUND: - self.node.previous_buid = BlockUID.empty() - # we don't change state here - break - else: - self.change_state_and_emit(Node.CORRUPTED) - break - - finally: - if self.node.current_buid != BlockUID(block_data['number'], block_data['hash']): - self.node.current_buid = BlockUID(block_data['number'], block_data['hash']) - self.node.current_ts = block_data['medianTime'] - self._logger.debug("Changed block {0} -> {1}".format(self.node.current_buid.number, - block_data['number'])) - self.changed.emit() - else: - if self.session.closed: - pass - else: - self._logger.debug("Could not connect to any BMA endpoint") - self.handle_failure() - else: - self.handle_success() - - @asyncify - async def refresh_summary(self): - """ - Refresh the summary of this node - """ - for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: - try: - summary_data = await self.safe_request(endpoint, bma.node.summary, - proxy=self._user_parameters.proxy()) - if not summary_data: - continue - self.node.software = summary_data["duniter"]["software"] - self.node.version = summary_data["duniter"]["version"] - self.node.state = Node.ONLINE - self.identity_changed.emit() - return # Break endpoints loop - except errors.DuniterError as e: - self._logger.debug("Error in summary : {:}".format(str(e))) - self.handle_failure() - else: - if self.session.closed: - pass - else: - self._logger.debug("Could not connect to any BMA endpoint") - self.handle_failure() - async def connect_peers(self): """ Connects to the peer websocket entry point @@ -417,6 +322,28 @@ class NodeConnector(QObject): self._logger.debug("{:}:{:}".format(str(e.__class__.__name__), str(e))) else: self._logger.debug("Incorrect leaf reply") + + async def request_ws2p_heads(self): + """ + Refresh the list of peers knew by this node + """ + for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: + try: + heads_data = await self.safe_request(endpoint, bma.network.heads, + proxy=self._user_parameters.proxy()) + if not heads_data: + continue + self.node.state = Node.ONLINE + return heads_data # Break endpoints loop + except errors.DuniterError as e: + self._logger.debug("Error in peers reply : {0}".format(str(e))) + self.handle_failure() + else: + if self.session.closed: + pass + else: + self._logger.debug("Could not connect to any BMA endpoint") + self.handle_failure() def handle_success(self): self.failure_count = 0 diff --git a/src/sakia/data/processors/nodes.py b/src/sakia/data/processors/nodes.py index d8bcf4e94a64ea99b18bf1f6f74d635cf2055790..4cd71993dd6b7797e1acfa5518aa2f1356964d2e 100644 --- a/src/sakia/data/processors/nodes.py +++ b/src/sakia/data/processors/nodes.py @@ -131,6 +131,24 @@ class NodesProcessor: ratio_synced = synced / total return ratio_synced + def update_ws2p(self, currency, head): + """ + Update the peer of a node + :param str currency: the currency of the peer + :param head: + :return: + """ + node = self._repo.get_one(pubkey=head.pubkey, currency=currency) + if node: + if node.current_buid < head.blockstamp: + logging.debug("Update node : {0}".format(head.pubkey[:5])) + node.previous_buid = node.current_buid + node.current_buid = head.blockstamp + node.state = Node.ONLINE + self._repo.update(node) + return node, True + return node, False + def update_peer(self, currency, peer): """ Update the peer of a node diff --git a/src/sakia/gui/navigation/network/table_model.py b/src/sakia/gui/navigation/network/table_model.py index 30594f64347ba420b76043a9d5876633de208505..2681f4c7bce1ea587734cf18ba09836346a517c5 100644 --- a/src/sakia/gui/navigation/network/table_model.py +++ b/src/sakia/gui/navigation/network/table_model.py @@ -31,8 +31,7 @@ class NetworkFilterProxyModel(QSortFilterProxyModel): left_data = int(left_data.split('\n')[0]) if left_data != '' else 0 right_data = int(right_data.split('\n')[0]) if right_data != '' else 0 - if left.column() in (NetworkTableModel.columns_types.index('current_block'), - NetworkTableModel.columns_types.index('current_time')): + if left.column() == NetworkTableModel.columns_types.index('current_block'): left_data = int(left_data) if left_data != '' else 0 right_data = int(right_data) if right_data != '' else 0 if left_data == right_data: @@ -89,14 +88,6 @@ class NetworkFilterProxyModel(QSortFilterProxyModel): if index.column() == NetworkTableModel.columns_types.index('current_hash'): return source_data[:10] - if index.column() == NetworkTableModel.columns_types.index('current_time') and source_data: - ts = self.blockchain_processor.adjusted_ts(self.app.currency, source_data) - return QLocale.toString( - QLocale(), - QDateTime.fromTime_t(ts), - QLocale.dateTimeFormat(QLocale(), QLocale.ShortFormat) - ) + " BAT" - if role == Qt.TextAlignmentRole: if source_index.column() == NetworkTableModel.columns_types.index('address') \ or source_index.column() == self.sourceModel().columns_types.index('current_block'): @@ -129,7 +120,6 @@ class NetworkTableModel(QAbstractTableModel): 'api': QT_TRANSLATE_NOOP('NetworkTableModel', 'API'), 'current_block': QT_TRANSLATE_NOOP("NetworkTableModel", 'Block'), 'current_hash': QT_TRANSLATE_NOOP("NetworkTableModel", 'Hash'), - 'current_time': QT_TRANSLATE_NOOP("NetworkTableModel", 'Time'), 'uid': QT_TRANSLATE_NOOP("NetworkTableModel", 'UID'), 'is_member': QT_TRANSLATE_NOOP("NetworkTableModel", 'Member'), 'pubkey': QT_TRANSLATE_NOOP("NetworkTableModel", 'Pubkey'), @@ -142,7 +132,6 @@ class NetworkTableModel(QAbstractTableModel): 'api', 'current_block', 'current_hash', - 'current_time', 'uid', 'is_member', 'pubkey', @@ -231,16 +220,16 @@ class NetworkTableModel(QAbstractTableModel): api = "\n".join(api_list) if node.current_buid: - number, block_hash, block_time = node.current_buid.number, node.current_buid.sha_hash, node.current_ts + number, block_hash = node.current_buid.number, node.current_buid.sha_hash else: - number, block_hash, block_time = "", "", "" + number, block_hash = "", "" state = node.state if not current_buid: current_buid = self.network_service.current_buid() if node.state == Node.ONLINE and node.current_buid != current_buid: state = NetworkTableModel.DESYNCED - return (address, port, api, number, block_hash, block_time, node.uid, + return (address, port, api, number, block_hash, node.uid, node.member, node.pubkey, node.software, node.version, node.root, state, node) diff --git a/src/sakia/services/network.py b/src/sakia/services/network.py index ae8fcacf31927d5afc9b4bf313790e9dda2fbdbe..9b6a401827a899958f098b207b2601f86fd3d7d4 100644 --- a/src/sakia/services/network.py +++ b/src/sakia/services/network.py @@ -1,10 +1,12 @@ import asyncio import logging import time +import random from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, Qt from duniterpy.api import errors -from duniterpy.documents import BlockUID +from duniterpy.documents.ws2p.heads import * +from duniterpy.documents.peer import BMAEndpoint from duniterpy.key import VerifyingKey from sakia.data.connectors import NodeConnector from sakia.data.entities import Node @@ -43,6 +45,7 @@ class NetworkService(QObject): self.add_connector(c) self.currency = currency self._must_crawl = False + self._ws2p_heads_refreshing = False self._block_found = self._processor.current_buid(self.currency) self._discovery_stack = [] self._blockchain_service = blockchain_service @@ -77,7 +80,14 @@ class NetworkService(QObject): """ connectors = [] - for node in node_processor.nodes(currency): + sample = [] + for n in node_processor.online_nodes(currency): + for e in n.endpoints: + if isinstance(e, BMAEndpoint): + sample.append(n) + continue + + for node in random.sample(sample, 6): connectors.append(NodeConnector(node, app.parameters)) network = cls(app, currency, node_processor, connectors, blockchain_service, identities_service) return network @@ -125,6 +135,7 @@ class NetworkService(QObject): Add a nod to the network. """ self._connectors.append(node_connector) + node_connector.block_found.connect(self.handle_new_block, type=Qt.UniqueConnection|Qt.QueuedConnection) node_connector.changed.connect(self.handle_change, type=Qt.UniqueConnection|Qt.QueuedConnection) node_connector.identity_changed.connect(self.handle_identity_change, type=Qt.UniqueConnection|Qt.QueuedConnection) node_connector.neighbour_found.connect(self.handle_new_node, type=Qt.UniqueConnection|Qt.QueuedConnection) @@ -158,7 +169,7 @@ class NetworkService(QObject): self.node_removed.emit(connector.node) for connector in self._connectors: - if self.continue_crawling(): + if self.continue_crawling() and len(self._connectors) <= 10: await connector.init_session() connector.refresh() if not first_loop: @@ -188,30 +199,19 @@ class NetworkService(QObject): connector = NodeConnector.from_peer(self.currency, peer, self._app.parameters) node = connector.node self._processor.insert_node(connector.node) - await connector.init_session() - connector.refresh(manual=True) - self.add_connector(connector) self.new_node_found.emit(node) except InvalidNodeCurrency as e: self._logger.debug(str(e)) if node and updated and self._blockchain_service.initialized(): try: - connector = next(conn for conn in self._connectors if conn.node == node) - except StopIteration: - self._logger.warning("A node not associated to" - " a connector was encoutered : {:}" - .format(node.pubkey[:7])) - else: - connector.refresh_summary() - try: - identity = await self._identities_service.find_from_pubkey(node.pubkey) - identity = await self._identities_service.load_requirements(identity) - node.member = identity.member - node.uid = identity.uid - self._processor.update_node(node) - self.node_changed.emit(node) - except errors.DuniterError as e: - self._logger.error(e.message) + identity = await self._identities_service.find_from_pubkey(node.pubkey) + identity = await self._identities_service.load_requirements(identity) + node.member = identity.member + node.uid = identity.uid + self._processor.update_node(node) + self.node_changed.emit(node) + except errors.DuniterError as e: + self._logger.error(e.message) def handle_new_node(self, peer): key = VerifyingKey(peer.pubkey) @@ -229,26 +229,57 @@ class NetworkService(QObject): self._processor.update_node(connector.node) self.node_changed.emit(connector.node) + def handle_new_block(self, block_uid): + if not self._ws2p_heads_refreshing: + self._ws2p_heads_refreshing = True + if self.current_buid() != block_uid: + asyncio.async(self.check_ws2p_heads()) + + async def check_ws2p_heads(self): + await asyncio.sleep(5) + futures = [] + for connector in self._connectors: + futures.append(connector.request_ws2p_heads()) + + responses = await asyncio.gather(*futures, return_exceptions=True) + + ws2p_heads = {} + for r in responses: + if isinstance(r, errors.DuniterError): + self._logger.debug("Exception in responses : " + str(r)) + continue + else: + if r: + for head_data in r["heads"]: + if "messageV2" in head_data: + head = HeadV2.from_inline(head_data["messageV2"], head_data["sigV2"]) + else: + head = HeadV1.from_inline(head_data["messageV2"], head_data["sigV2"]) + + VerifyingKey(head.pubkey).verify_ws2p_head(head) + if head.pubkey in ws2p_heads: + if ws2p_heads[head.pubkey].blockstamp < head.blockstamp: + ws2p_heads[head.pubkey] = head + else: + ws2p_heads[head.pubkey] = head + + for head in ws2p_heads.values(): + node, updated = self._processor.update_ws2p(self.currency, head) + if node and updated: + self.node_changed.emit(node) + + self._ws2p_heads_refreshing = False + + current_buid = self._processor.current_buid(self.currency) + self._logger.debug("{0} -> {1}".format(self._block_found.sha_hash[:10], current_buid.sha_hash[:10])) + if self._block_found.sha_hash != current_buid.sha_hash: + self._logger.debug("Latest block changed : {0}".format(current_buid.number)) + self.latest_block_changed.emit(current_buid) + self._logger.debug("Start refresh") + self._block_found = current_buid + asyncio.ensure_future(self._blockchain_service.handle_blockchain_progress(self._block_found)) + def handle_change(self): node_connector = self.sender() self._processor.update_node(node_connector.node) - self.node_changed.emit(node_connector.node) - - if node_connector.node.state == Node.ONLINE: - current_buid = self._processor.current_buid(self.currency) - self._logger.debug("{0} -> {1}".format(self._block_found.sha_hash[:10], current_buid.sha_hash[:10])) - if self._block_found.sha_hash != current_buid.sha_hash: - self._logger.debug("Latest block changed : {0}".format(current_buid.number)) - self.latest_block_changed.emit(current_buid) - # If new latest block is lower than the previously found one - # or if the previously found block is different locally - # than in the main chain, we declare a rollback - if current_buid < self._block_found \ - or node_connector.node.previous_buid != self._block_found: - self._logger.debug("Start rollback") - self._block_found = current_buid - #TODO: self._blockchain_service.rollback() - else: - self._logger.debug("Start refresh") - self._block_found = current_buid - asyncio.ensure_future(self._blockchain_service.handle_blockchain_progress(self._block_found)) + self.node_changed.emit(node_connector.node) \ No newline at end of file