From 3df2c503139ec400eab0c66c65c5e0a8b91a4c78 Mon Sep 17 00:00:00 2001 From: inso <insomniak.fr@gmaiL.com> Date: Wed, 14 Mar 2018 08:22:55 +0100 Subject: [PATCH] Fixes in circuit breaker pattern --- src/sakia/data/connectors/bma.py | 6 ++++++ src/sakia/data/processors/nodes.py | 11 +++++++++-- src/sakia/data/repositories/nodes.py | 11 +++++++++++ src/sakia/services/network.py | 2 +- 4 files changed, 27 insertions(+), 3 deletions(-) diff --git a/src/sakia/data/connectors/bma.py b/src/sakia/data/connectors/bma.py index 071bd1b0..456f7d82 100644 --- a/src/sakia/data/connectors/bma.py +++ b/src/sakia/data/connectors/bma.py @@ -182,6 +182,8 @@ class BmaConnector: async def verified_get(self, currency, request, req_args): # If no node is known as a member, lookup synced nodes as a fallback synced_nodes = self._nodes_processor.synced_nodes(currency) + offline_nodes = self._nodes_processor.offline_synced_nodes(currency) + random_offline_node = random.sample(offline_nodes, min(1, len(offline_nodes))) nodes_generator = (n for n in synced_nodes) answers = {} answers_data = {} @@ -205,6 +207,10 @@ class BmaConnector: futures.append(self._verified_request(node, request(next( endpoint.conn_handler(session, proxy=self._user_parameters.proxy())), **req_args))) + if random_offline_node: + futures.append(self._verified_request(random_offline_node[0], request(next( + endpoint.conn_handler(session, proxy=self._user_parameters.proxy())), + **req_args))) except StopIteration: # When no more node is available, we go out of the while loop break diff --git a/src/sakia/data/processors/nodes.py b/src/sakia/data/processors/nodes.py index 1e334fc4..feb2db0b 100644 --- a/src/sakia/data/processors/nodes.py +++ b/src/sakia/data/processors/nodes.py @@ -40,6 +40,13 @@ class NodesProcessor: current_buid = self._repo.current_buid(currency=currency) return self._repo.get_synced_nodes(currency, current_buid) + def offline_synced_nodes(self, currency): + """ + Get nodes which are in the ONLINE state. + """ + current_buid = self._repo.current_buid(currency=currency) + return self._repo.get_offline_synced_nodes(currency, current_buid) + def synced_members_nodes(self, currency): """ Get nodes which are in the ONLINE state. @@ -168,13 +175,13 @@ class NodesProcessor: def handle_success(self, node): if not node.online(): node.last_state_change = time.time() - node.state = node.state - 1 + node.state = max(0, node.state - 1) self.update_node(node) def handle_failure(self, node, weight=1): if node.state + weight > Node.FAILURE_THRESHOLD and node.online(): node.last_state_change = time.time() - node.state += weight + node.state = min(5, node.state + weight) self.update_node(node) def drop_all(self, currency): diff --git a/src/sakia/data/repositories/nodes.py b/src/sakia/data/repositories/nodes.py index c670a81d..d09ff704 100644 --- a/src/sakia/data/repositories/nodes.py +++ b/src/sakia/data/repositories/nodes.py @@ -150,6 +150,17 @@ class NodesRepo: return [Node(*data) for data in datas] return [] + def get_offline_synced_nodes(self, currency, current_buid): + c = self._conn.execute("SELECT * FROM nodes " + "WHERE currency == ? " + "AND state > ?" + "AND current_buid == ?", + (currency, Node.FAILURE_THRESHOLD, current_buid)) + datas = c.fetchall() + if datas: + return [Node(*data) for data in datas] + return [] + def current_buid(self, currency): c = self._conn.execute("""SELECT COUNT(`uid`) FROM `nodes` diff --git a/src/sakia/services/network.py b/src/sakia/services/network.py index f4e10d75..60d8ded2 100644 --- a/src/sakia/services/network.py +++ b/src/sakia/services/network.py @@ -209,13 +209,13 @@ class NetworkService(QObject): except InvalidNodeCurrency as e: self._logger.debug(str(e)) if self._blockchain_service.initialized(): + self._processor.handle_success(node) try: identity = await self._identities_service.find_from_pubkey(node.pubkey) identity = await self._identities_service.load_requirements(identity) node.member = identity.member node.uid = identity.uid self._processor.update_node(node) - self._processor.handle_success(node) self.node_changed.emit(node) except errors.DuniterError as e: self._logger.error(e.message) -- GitLab