diff --git a/src/sakia/data/connectors/bma.py b/src/sakia/data/connectors/bma.py index 6515f5d0727ca3598316c66e3c07aa7939844483..1d91c91967dc15f83b4c055fc813fb4b489432bf 100644 --- a/src/sakia/data/connectors/bma.py +++ b/src/sakia/data/connectors/bma.py @@ -163,6 +163,14 @@ class BmaConnector: _user_parameters = attr.ib() _logger = attr.ib(default=attr.Factory(lambda: logging.getLogger('sakia'))) + async def _verified_request(self, node, request): + try: + return await request + except BaseException as e: + self._logger.debug(str(e)) + self._nodes_processor.handle_failure(node) + return e + async def verified_get(self, currency, request, req_args): synced_nodes = self._nodes_processor.synced_members_nodes(currency) if not synced_nodes: @@ -171,7 +179,7 @@ class BmaConnector: nodes_generator = (n for n in synced_nodes) answers = {} answers_data = {} - nb_verification = min(max(1, 0.66 * len(synced_nodes)), 10) + nb_verification = min(max(1, 0.66 * len(synced_nodes)), 6) # We try to find agreeing nodes from one 1 to 66% of nodes, max 10 session = aiohttp.ClientSession() filtered_data = {} @@ -180,7 +188,7 @@ class BmaConnector: futures = [] try: - for i in range(0, int(nb_verification)+1): + for i in range(0, int(nb_verification*1.4)+1): node = next(nodes_generator) endpoints = filter_endpoints(request, [node]) if not endpoints: @@ -188,9 +196,9 @@ class BmaConnector: endpoint = random.choice(endpoints) self._logger.debug( "Requesting {0} on endpoint {1}".format(str(request.__name__), str(endpoint))) - futures.append(request(next( + futures.append(self._verified_request(node, request(next( endpoint.conn_handler(session, proxy=self._user_parameters.proxy())), - **req_args)) + **req_args))) except StopIteration: # When no more node is available, we go out of the while loop break diff --git a/src/sakia/data/connectors/node.py b/src/sakia/data/connectors/node.py index 4f1068b4e25503e983ea3484796d57bab46bbb0b..e31a63c49a628820ce5fd1812bc1b9a7d76ddfea 100644 --- a/src/sakia/data/connectors/node.py +++ b/src/sakia/data/connectors/node.py @@ -31,12 +31,11 @@ class NodeConnector(QObject): A node is a peer send from the client point of view. """ changed = pyqtSignal() - error = pyqtSignal() + success = pyqtSignal() + failure = pyqtSignal(int) identity_changed = pyqtSignal() neighbour_found = pyqtSignal(Peer) block_found = pyqtSignal(BlockUID) - - FAILURE_THRESHOLD = 3 def __init__(self, node, user_parameters, session=None): """ @@ -101,7 +100,8 @@ class NodeConnector(QObject): if currency and peer.currency != currency: raise InvalidNodeCurrency(currency, peer.currency) - node = Node(peer.currency, peer.pubkey, peer.endpoints, peer.blockUID, last_state_change=time.time()) + node = Node(peer.currency, peer.pubkey, peer.endpoints, peer.blockUID, + current_buid=peer.blockUID, last_state_change=time.time()) logging.getLogger('sakia').debug("Node from peer : {:}".format(str(node))) return cls(node, user_parameters, session=None) @@ -121,7 +121,7 @@ class NodeConnector(QObject): self.handle_failure() except jsonschema.ValidationError as e: self._logger.debug("{:}:{:}".format(str(e.__class__.__name__), str(e))) - self.change_state_and_emit(Node.CORRUPTED) + self.handle_failure(weight=3) except RuntimeError: if self.session.closed: pass @@ -197,7 +197,7 @@ class NodeConnector(QObject): self.handle_failure() except jsonschema.ValidationError as e: self._logger.debug("{:}:{:}".format(str(e.__class__.__name__), str(e))) - self.change_state_and_emit(Node.CORRUPTED) + self.handle_failure(weight=3) except RuntimeError: if self.session.closed: pass @@ -270,7 +270,6 @@ class NodeConnector(QObject): proxy=self._user_parameters.proxy()) if not peers_data: continue - self.node.state = Node.ONLINE if peers_data['root'] != self.node.merkle_peers_root: leaves = [leaf for leaf in peers_data['leaves'] if leaf not in self.node.merkle_peers_leaves] @@ -333,7 +332,7 @@ class NodeConnector(QObject): proxy=self._user_parameters.proxy()) if not heads_data: continue - self.node.state = Node.ONLINE + self.handle_success() return heads_data # Break endpoints loop except errors.DuniterError as e: self._logger.debug("Error in peers reply : {0}".format(str(e))) @@ -344,19 +343,9 @@ class NodeConnector(QObject): else: self._logger.debug("Could not connect to any BMA endpoint") self.handle_failure() - + def handle_success(self): - self.failure_count = 0 - self.change_state_and_emit(Node.ONLINE) - - def handle_failure(self, weight=1): - self.failure_count += weight - if self.failure_count > NodeConnector.FAILURE_THRESHOLD: - self.change_state_and_emit(Node.OFFLINE) + self.success.emit() - def change_state_and_emit(self, new_state): - if self.node.state != new_state: - self._logger.debug("Changing state {0} > {1}".format(self.node.state, new_state)) - self.node.last_state_change = time.time() - self.node.state = new_state - self.changed.emit() + def handle_failure(self, weight=1): + self.failure.emit(weight) diff --git a/src/sakia/data/entities/node.py b/src/sakia/data/entities/node.py index e1e560d5626c32c25db82d70a0c4259982533354..dc9c91f17c6c2836c8573dfd1a4df034aa2989c1 100644 --- a/src/sakia/data/entities/node.py +++ b/src/sakia/data/entities/node.py @@ -27,16 +27,16 @@ class Node: """ A node can have multiple states : - - ONLINE : The node is available for requests - - OFFLINE: The node is disconnected + - ONLINE <= 3: The node is available for requests + - OFFLINE > 3: The node is disconnected - DESYNCED : The node is online but is desynced from the network - - CORRUPTED : The node is corrupted, some weird behaviour is going on """ MERKLE_EMPTY_ROOT = "01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b" - ONLINE = 1 - OFFLINE = 2 - CORRUPTED = 4 + FAILURE_THRESHOLD = 3 + + def online(self): + return self.state <= Node.FAILURE_THRESHOLD # The currency handled by this node currency = attr.ib(convert=str) @@ -55,7 +55,7 @@ class Node: # The previous block uid in /blockchain/current previous_buid = attr.ib(convert=block_uid, cmp=False, default=None, hash=False) # The state of the node in Sakia - state = attr.ib(convert=int, cmp=False, default=OFFLINE, hash=False) + state = attr.ib(convert=int, cmp=False, default=0, hash=False) # The version of the software in /node/summary software = attr.ib(convert=str, cmp=False, default="", hash=False) # The version of the software in /node/summary diff --git a/src/sakia/data/processors/nodes.py b/src/sakia/data/processors/nodes.py index 4cd71993dd6b7797e1acfa5518aa2f1356964d2e..a128b9308a750c9ce61277b264c4d9678fdb21c5 100644 --- a/src/sakia/data/processors/nodes.py +++ b/src/sakia/data/processors/nodes.py @@ -4,6 +4,7 @@ from sakia.constants import ROOT_SERVERS from ..entities import Node from duniterpy.documents import BlockUID, endpoint import logging +import time @attr.s @@ -21,7 +22,7 @@ class NodesProcessor: pubkey=pubkey, endpoints=ROOT_SERVERS[currency]["nodes"][pubkey], peer_blockstamp=BlockUID.empty(), - state=Node.ONLINE) + state=0) self._repo.insert(node) def current_buid(self, currency): @@ -37,20 +38,20 @@ class NodesProcessor: Get nodes which are in the ONLINE state. """ current_buid = self._repo.current_buid(currency=currency) - return self._repo.get_all(currency=currency, state=Node.ONLINE, current_buid=current_buid) + return self._repo.get_synced_nodes(currency, current_buid) def synced_members_nodes(self, currency): """ Get nodes which are in the ONLINE state. """ current_buid = self._repo.current_buid(currency=currency) - return self._repo.get_all(currency=currency, state=Node.ONLINE, member=True, current_buid=current_buid) + return self._repo.get_synced_members_nodes(currency, current_buid) def online_nodes(self, currency): """ Get nodes which are in the ONLINE state. """ - return self._repo.get_all(currency=currency, state=Node.ONLINE) + return self._repo.get_online_nodes(currency) def delete_node(self, node): self._repo.drop(node) @@ -144,7 +145,7 @@ class NodesProcessor: logging.debug("Update node : {0}".format(head.pubkey[:5])) node.previous_buid = node.current_buid node.current_buid = head.blockstamp - node.state = Node.ONLINE + node.state = 0 self._repo.update(node) return node, True return node, False @@ -165,6 +166,18 @@ class NodesProcessor: return node, True return node, False + def handle_success(self, node): + if not node.online(): + node.last_state_change = time.time() + node.state = 0 + self.update_node(node) + + def handle_failure(self, node, weight=1): + if node.state + weight > Node.FAILURE_THRESHOLD and node.online(): + node.last_state_change = time.time() + node.state += weight + self.update_node(node) + def drop_all(self, currency): nodes = self._repo.get_all() for n in nodes: diff --git a/src/sakia/data/repositories/nodes.py b/src/sakia/data/repositories/nodes.py index 7ff4a0ec0004928d530ffef8d6c1cb0dc2c21183..3d537093b0dcdb3774d7160e53c131966a32a8f2 100644 --- a/src/sakia/data/repositories/nodes.py +++ b/src/sakia/data/repositories/nodes.py @@ -111,14 +111,61 @@ class NodesRepo: WHERE currency=? AND pubkey=?""", where_fields) + def get_offline_nodes(self, currency): + c = self._conn.execute("SELECT * FROM nodes WHERE currency == ? AND state > ?;", + (currency, Node.FAILURE_THRESHOLD)) + datas = c.fetchall() + if datas: + return [Node(*data) for data in datas] + return [] + + def get_synced_nodes(self, currency, current_buid): + c = self._conn.execute("SELECT * FROM nodes " + "WHERE currency == ? " + "AND state <= ?" + "AND current_buid == ?", + (currency, Node.FAILURE_THRESHOLD, current_buid)) + datas = c.fetchall() + if datas: + return [Node(*data) for data in datas] + return [] + + def get_synced_members_nodes(self, currency, current_buid): + c = self._conn.execute("SELECT * FROM nodes " + "WHERE currency == ? " + "AND state <= ?" + "AND current_buid == ?" + "AND member == 1", + (currency, Node.FAILURE_THRESHOLD, current_buid)) + datas = c.fetchall() + if datas: + return [Node(*data) for data in datas] + return [] + + def get_online_nodes(self, currency): + c = self._conn.execute("SELECT * FROM nodes WHERE currency == ? AND state <= ?;", + (currency, Node.FAILURE_THRESHOLD)) + datas = c.fetchall() + if datas: + return [Node(*data) for data in datas] + return [] + def current_buid(self, currency): - c = self._conn.execute("""SELECT `current_buid`, + req = """SELECT `current_buid`, COUNT(`current_buid`) AS `value_occurrence` FROM `nodes` WHERE state == 1 AND member == 1 AND currency == ? GROUP BY `current_buid` ORDER BY `value_occurrence` DESC - LIMIT 1;""", (currency,)) + LIMIT 1;""" + + c = self._conn.execute("""SELECT `current_buid`, + COUNT(`current_buid`) AS `value_occurrence` + FROM `nodes` + WHERE state <= ? AND member == 1 AND currency == ? + GROUP BY `current_buid` + ORDER BY `value_occurrence` DESC + LIMIT 1;""", (Node.FAILURE_THRESHOLD, currency,)) data = c.fetchone() if data: return block_uid(data[0]) @@ -126,10 +173,10 @@ class NodesRepo: c = self._conn.execute("""SELECT `current_buid`, COUNT(`current_buid`) AS `value_occurrence` FROM `nodes` - WHERE state == 1 AND currency == ? + WHERE state <= ? AND currency == ? GROUP BY `current_buid` ORDER BY `value_occurrence` DESC - LIMIT 1;""", (currency,)) + LIMIT 1;""", (Node.FAILURE_THRESHOLD, currency,)) data = c.fetchone() if data: return block_uid(data[0]) diff --git a/src/sakia/gui/navigation/network/table_model.py b/src/sakia/gui/navigation/network/table_model.py index 2681f4c7bce1ea587734cf18ba09836346a517c5..97fbb93c7b25595e96a68a9e43b71adb118c45b1 100644 --- a/src/sakia/gui/navigation/network/table_model.py +++ b/src/sakia/gui/navigation/network/table_model.py @@ -142,26 +142,25 @@ class NetworkTableModel(QAbstractTableModel): 'node' ) + ONLINE = 0 + OFFLINE = 1 DESYNCED = 3 node_colors = { - Node.ONLINE: QColor('#99ff99'), - Node.OFFLINE: QColor('#ff9999'), - DESYNCED: QColor('#ffbd81'), - Node.CORRUPTED: QColor(Qt.lightGray) + ONLINE: QColor('#99ff99'), + OFFLINE: QColor('#ff9999'), + DESYNCED: QColor('#ffbd81') } node_icons = { - Node.ONLINE: ":/icons/synchronized", - Node.OFFLINE: ":/icons/offline", - DESYNCED: ":/icons/forked", - Node.CORRUPTED: ":/icons/corrupted" + ONLINE: ":/icons/synchronized", + OFFLINE: ":/icons/offline", + DESYNCED: ":/icons/forked" } node_states = { - Node.ONLINE: lambda: QT_TRANSLATE_NOOP("NetworkTableModel", 'Online'), - Node.OFFLINE: lambda: QT_TRANSLATE_NOOP("NetworkTableModel", 'Offline'), - DESYNCED: lambda: QT_TRANSLATE_NOOP("NetworkTableModel", 'Unsynchronized'), - Node.CORRUPTED: lambda: QT_TRANSLATE_NOOP("NetworkTableModel", 'Corrupted') + ONLINE: lambda: QT_TRANSLATE_NOOP("NetworkTableModel", 'Online'), + OFFLINE: lambda: QT_TRANSLATE_NOOP("NetworkTableModel", 'Offline'), + DESYNCED: lambda: QT_TRANSLATE_NOOP("NetworkTableModel", 'Unsynchronized') } def __init__(self, network_service, parent=None): @@ -226,7 +225,11 @@ class NetworkTableModel(QAbstractTableModel): state = node.state if not current_buid: current_buid = self.network_service.current_buid() - if node.state == Node.ONLINE and node.current_buid != current_buid: + if node.online(): + state = NetworkTableModel.ONLINE + else: + state = NetworkTableModel.OFFLINE + if node.online() and node.current_buid != current_buid: state = NetworkTableModel.DESYNCED return (address, port, api, number, block_hash, node.uid, diff --git a/src/sakia/services/network.py b/src/sakia/services/network.py index 9b6a401827a899958f098b207b2601f86fd3d7d4..cce6991db41759e5d2aa7b5422aa40dc504d5175 100644 --- a/src/sakia/services/network.py +++ b/src/sakia/services/network.py @@ -159,22 +159,17 @@ class NetworkService(QObject): self.refresh_once() while self.continue_crawling(): if not first_loop: - for connector in self._connectors: - if connector.node.state in (Node.OFFLINE, Node.CORRUPTED) \ - and connector.node.last_state_change + 3600 < time.time(): - await connector.close_ws() - connector.disconnect() + for node in self._processor.nodes(self.currency): + if node.state > Node.FAILURE_THRESHOLD and node.last_state_change + 3600 < time.time(): + for connector in self._connectors: + if connector.node.pubkey == node.pubkey: + await connector.close_ws() + connector.disconnect() + self._connectors.remove(connector) self._processor.delete_node(connector.node) - self._connectors.remove(connector) self.node_removed.emit(connector.node) - for connector in self._connectors: - if self.continue_crawling() and len(self._connectors) <= 10: - await connector.init_session() - connector.refresh() - if not first_loop: - await asyncio.sleep(15) - + self.run_ws2p_check() first_loop = False await asyncio.sleep(15) @@ -193,25 +188,26 @@ class NetworkService(QObject): await asyncio.sleep(2) else: node, updated = self._processor.update_peer(self.currency, peer) - if not node and peer.blockUID.number + 2400 > self.current_buid().number: - self._logger.debug("New node found : {0}".format(peer.pubkey[:5])) - try: - connector = NodeConnector.from_peer(self.currency, peer, self._app.parameters) - node = connector.node - self._processor.insert_node(connector.node) - self.new_node_found.emit(node) - except InvalidNodeCurrency as e: - self._logger.debug(str(e)) - if node and updated and self._blockchain_service.initialized(): - try: - identity = await self._identities_service.find_from_pubkey(node.pubkey) - identity = await self._identities_service.load_requirements(identity) - node.member = identity.member - node.uid = identity.uid - self._processor.update_node(node) - self.node_changed.emit(node) - except errors.DuniterError as e: - self._logger.error(e.message) + if peer.blockUID.number + 2400 > self.current_buid().number: + if not node: + self._logger.debug("New node found : {0}".format(peer.pubkey[:5])) + try: + connector = NodeConnector.from_peer(self.currency, peer, self._app.parameters) + node = connector.node + self._processor.insert_node(connector.node) + self.new_node_found.emit(node) + except InvalidNodeCurrency as e: + self._logger.debug(str(e)) + if self._blockchain_service.initialized(): + try: + identity = await self._identities_service.find_from_pubkey(node.pubkey) + identity = await self._identities_service.load_requirements(identity) + node.member = identity.member + node.uid = identity.uid + self._processor.update_node(node) + self.node_changed.emit(node) + except errors.DuniterError as e: + self._logger.error(e.message) def handle_new_node(self, peer): key = VerifyingKey(peer.pubkey) @@ -230,10 +226,13 @@ class NetworkService(QObject): self.node_changed.emit(connector.node) def handle_new_block(self, block_uid): + if self.current_buid() != block_uid: + self.run_ws2p_check() + + def run_ws2p_check(self): if not self._ws2p_heads_refreshing: self._ws2p_heads_refreshing = True - if self.current_buid() != block_uid: - asyncio.async(self.check_ws2p_heads()) + asyncio.async(self.check_ws2p_heads()) async def check_ws2p_heads(self): await asyncio.sleep(5) @@ -248,13 +247,15 @@ class NetworkService(QObject): if isinstance(r, errors.DuniterError): self._logger.debug("Exception in responses : " + str(r)) continue + elif isinstance(r, BaseException): + self._logger.debug("Exception in responses : " + str(r)) else: if r: for head_data in r["heads"]: if "messageV2" in head_data: - head = HeadV2.from_inline(head_data["messageV2"], head_data["sigV2"]) + head, _ = HeadV2.from_inline(head_data["messageV2"], head_data["sigV2"]) else: - head = HeadV1.from_inline(head_data["messageV2"], head_data["sigV2"]) + head, _ = HeadV1.from_inline(head_data["message"], head_data["sig"]) VerifyingKey(head.pubkey).verify_ws2p_head(head) if head.pubkey in ws2p_heads: @@ -282,4 +283,14 @@ class NetworkService(QObject): def handle_change(self): node_connector = self.sender() self._processor.update_node(node_connector.node) - self.node_changed.emit(node_connector.node) \ No newline at end of file + self.node_changed.emit(node_connector.node) + + def handle_success(self): + node_connector = self.sender() + self._processor.handle_success(node_connector.node) + self.changed.emit() + + def handle_failure(self, weight=1): + node_connector = self.sender() + self._processor.handle_failure(node_connector.node, weight) + self.changed.emit() diff --git a/tests/unit/data/test_nodes_repo.py b/tests/unit/data/test_nodes_repo.py index ac83284f2442f52db2fd4be24cf4ff86fc36f85d..e06d11bf1a33e199da0498e5a3d1a8f68ec0b3fd 100644 --- a/tests/unit/data/test_nodes_repo.py +++ b/tests/unit/data/test_nodes_repo.py @@ -15,7 +15,7 @@ UNKNOWNAPI some useless information""", current_buid="15-76543400E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", current_ts=12376543345, previous_buid="14-AEFFCB00E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", - state=Node.ONLINE, + state=0, software="duniter", version="0.30.17") nodes_repo.insert(inserted) @@ -28,7 +28,7 @@ UNKNOWNAPI some useless information""", assert node.endpoints[2] == UnknownEndpoint("UNKNOWNAPI", ["some", "useless", "information"]) assert node.previous_buid == block_uid("14-AEFFCB00E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67") assert node.current_buid == block_uid("15-76543400E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67") - assert node.state == Node.ONLINE + assert node.state == 0 assert node.software == "duniter" assert node.version == "0.30.17" assert node.merkle_peers_root == Node.MERKLE_EMPTY_ROOT @@ -51,7 +51,7 @@ UNKNOWNAPI some useless information""", "15-76543400E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", 12376543345, "14-AEFFCB00E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", - Node.ONLINE, + 0, "duniter", "0.30.17")) nodes_repo.insert(Node("testcurrency", @@ -62,7 +62,7 @@ UNKNOWNAPI some useless information""", "18-76543400E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", 12376543345, "12-AEFFCB00E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", - Node.ONLINE, + 0, "duniter", "0.30.2a5")) nodes = nodes_repo.get_all(currency="testcurrency") @@ -83,7 +83,7 @@ UNKNOWNAPI some useless information""", "15-76543400E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", 12376543345, "14-AEFFCB00E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", - Node.ONLINE, + 0, "duniter") nodes_repo.insert(node) node.previous_buid = node.current_buid