Skip to content
Snippets Groups Projects
Commit 3df2c503 authored by inso's avatar inso
Browse files

Fixes in circuit breaker pattern

parent 36bd4459
No related branches found
No related tags found
No related merge requests found
...@@ -182,6 +182,8 @@ class BmaConnector: ...@@ -182,6 +182,8 @@ class BmaConnector:
async def verified_get(self, currency, request, req_args): async def verified_get(self, currency, request, req_args):
# If no node is known as a member, lookup synced nodes as a fallback # If no node is known as a member, lookup synced nodes as a fallback
synced_nodes = self._nodes_processor.synced_nodes(currency) synced_nodes = self._nodes_processor.synced_nodes(currency)
offline_nodes = self._nodes_processor.offline_synced_nodes(currency)
random_offline_node = random.sample(offline_nodes, min(1, len(offline_nodes)))
nodes_generator = (n for n in synced_nodes) nodes_generator = (n for n in synced_nodes)
answers = {} answers = {}
answers_data = {} answers_data = {}
...@@ -205,6 +207,10 @@ class BmaConnector: ...@@ -205,6 +207,10 @@ class BmaConnector:
futures.append(self._verified_request(node, request(next( futures.append(self._verified_request(node, request(next(
endpoint.conn_handler(session, proxy=self._user_parameters.proxy())), endpoint.conn_handler(session, proxy=self._user_parameters.proxy())),
**req_args))) **req_args)))
if random_offline_node:
futures.append(self._verified_request(random_offline_node[0], request(next(
endpoint.conn_handler(session, proxy=self._user_parameters.proxy())),
**req_args)))
except StopIteration: except StopIteration:
# When no more node is available, we go out of the while loop # When no more node is available, we go out of the while loop
break break
......
...@@ -40,6 +40,13 @@ class NodesProcessor: ...@@ -40,6 +40,13 @@ class NodesProcessor:
current_buid = self._repo.current_buid(currency=currency) current_buid = self._repo.current_buid(currency=currency)
return self._repo.get_synced_nodes(currency, current_buid) return self._repo.get_synced_nodes(currency, current_buid)
def offline_synced_nodes(self, currency):
"""
Get nodes which are in the ONLINE state.
"""
current_buid = self._repo.current_buid(currency=currency)
return self._repo.get_offline_synced_nodes(currency, current_buid)
def synced_members_nodes(self, currency): def synced_members_nodes(self, currency):
""" """
Get nodes which are in the ONLINE state. Get nodes which are in the ONLINE state.
...@@ -168,13 +175,13 @@ class NodesProcessor: ...@@ -168,13 +175,13 @@ class NodesProcessor:
def handle_success(self, node): def handle_success(self, node):
if not node.online(): if not node.online():
node.last_state_change = time.time() node.last_state_change = time.time()
node.state = node.state - 1 node.state = max(0, node.state - 1)
self.update_node(node) self.update_node(node)
def handle_failure(self, node, weight=1): def handle_failure(self, node, weight=1):
if node.state + weight > Node.FAILURE_THRESHOLD and node.online(): if node.state + weight > Node.FAILURE_THRESHOLD and node.online():
node.last_state_change = time.time() node.last_state_change = time.time()
node.state += weight node.state = min(5, node.state + weight)
self.update_node(node) self.update_node(node)
def drop_all(self, currency): def drop_all(self, currency):
......
...@@ -150,6 +150,17 @@ class NodesRepo: ...@@ -150,6 +150,17 @@ class NodesRepo:
return [Node(*data) for data in datas] return [Node(*data) for data in datas]
return [] return []
def get_offline_synced_nodes(self, currency, current_buid):
c = self._conn.execute("SELECT * FROM nodes "
"WHERE currency == ? "
"AND state > ?"
"AND current_buid == ?",
(currency, Node.FAILURE_THRESHOLD, current_buid))
datas = c.fetchall()
if datas:
return [Node(*data) for data in datas]
return []
def current_buid(self, currency): def current_buid(self, currency):
c = self._conn.execute("""SELECT COUNT(`uid`) c = self._conn.execute("""SELECT COUNT(`uid`)
FROM `nodes` FROM `nodes`
......
...@@ -209,13 +209,13 @@ class NetworkService(QObject): ...@@ -209,13 +209,13 @@ class NetworkService(QObject):
except InvalidNodeCurrency as e: except InvalidNodeCurrency as e:
self._logger.debug(str(e)) self._logger.debug(str(e))
if self._blockchain_service.initialized(): if self._blockchain_service.initialized():
self._processor.handle_success(node)
try: try:
identity = await self._identities_service.find_from_pubkey(node.pubkey) identity = await self._identities_service.find_from_pubkey(node.pubkey)
identity = await self._identities_service.load_requirements(identity) identity = await self._identities_service.load_requirements(identity)
node.member = identity.member node.member = identity.member
node.uid = identity.uid node.uid = identity.uid
self._processor.update_node(node) self._processor.update_node(node)
self._processor.handle_success(node)
self.node_changed.emit(node) self.node_changed.emit(node)
except errors.DuniterError as e: except errors.DuniterError as e:
self._logger.error(e.message) self._logger.error(e.message)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment