From 1ea7e3dc9fb1acfa30560f3effc023ec545377c2 Mon Sep 17 00:00:00 2001 From: vtexier <vit@free.fr> Date: Mon, 16 Mar 2020 10:56:53 +0100 Subject: [PATCH] [fix] fix Unclosed client session error Enhance use of aiohttp.session. Cleanup network class --- src/sakia/data/connectors/bma.py | 209 +++++++++++---------- src/sakia/data/connectors/node.py | 303 ++++++++++++++---------------- src/sakia/services/network.py | 22 --- 3 files changed, 252 insertions(+), 282 deletions(-) diff --git a/src/sakia/data/connectors/bma.py b/src/sakia/data/connectors/bma.py index 67fe8ae1..5f360c92 100644 --- a/src/sakia/data/connectors/bma.py +++ b/src/sakia/data/connectors/bma.py @@ -3,7 +3,7 @@ import aiohttp from aiohttp import ClientError from duniterpy.api import client, bma, errors from duniterpy.api.endpoint import BMAEndpoint, SecuredBMAEndpoint -from duniterpy.api.client import parse_error +from duniterpy.api.client import parse_error, Client from sakia.errors import NoPeerAvailable from pkg_resources import parse_version from socket import gaierror @@ -171,7 +171,7 @@ class BmaConnector: _user_parameters = attr.ib() _logger = attr.ib(default=attr.Factory(lambda: logging.getLogger("sakia"))) - async def _verified_request(self, node, request): + async def _verified_request(self, node, session, request): try: res = await request self._nodes_processor.handle_success(node) @@ -183,9 +183,11 @@ class BmaConnector: else: return e except BaseException as e: - self._logger.debug(str(e)) + self._logger.debug(type(e)) self._nodes_processor.handle_failure(node) return e + finally: + await session.close() async def verified_get(self, currency, request, req_args): # If no node is known as a member, lookup synced nodes as a fallback @@ -197,81 +199,84 @@ class BmaConnector: answers_data = {} nb_verification = min(max(1, 0.66 * len(synced_nodes)), 3) # We try to find agreeing nodes from one 1 to 66% of nodes, max 10 - session = aiohttp.ClientSession() - try: - while ( - max([len(nodes) for nodes in answers.values()] + [0]) <= nb_verification - ): - futures = [] + while max([len(nodes) for nodes in answers.values()] + [0]) <= nb_verification: + futures = [] - try: - for i in range(0, int(nb_verification * 1.4) + 1): - node = next(nodes_generator) - endpoints = filter_endpoints(request, [node]) - if not endpoints: - continue - endpoint = random.choice(endpoints) - self._logger.debug( - "Requesting {0} on endpoint {1}".format( - str(request.__name__), str(endpoint) - ) - ) - # create client - _client = client.Client( - endpoint, session, proxy=self._user_parameters.proxy() - ) - futures.append( - self._verified_request(node, _client(request, **req_args)) - ) - if random_offline_node: - node = random_offline_node[0] - endpoints = filter_endpoints(request, [node]) - if not endpoints: - continue - endpoint = random.choice(endpoints) - self._logger.debug( - "Requesting {0} on endpoint {1}".format( - str(request.__name__), str(endpoint) - ) + try: + for i in range(0, int(nb_verification * 1.4) + 1): + node = next(nodes_generator) + endpoints = filter_endpoints(request, [node]) + if not endpoints: + continue + endpoint = random.choice(endpoints) + self._logger.debug( + "Requesting {0} on endpoint {1}".format( + str(request.__name__), str(endpoint) ) - # create client - _client = client.Client( - endpoint, session, proxy=self._user_parameters.proxy() + ) + # create client + _client = client.Client( + endpoint, None, proxy=self._user_parameters.proxy() + ) + futures.append( + self._verified_request( + node, _client.session, _client(request, **req_args) ) - futures.append( - self._verified_request(node, _client(request, **req_args)) + ) + if random_offline_node: + node = random_offline_node[0] + endpoints = filter_endpoints(request, [node]) + if not endpoints: + continue + endpoint = random.choice(endpoints) + self._logger.debug( + "Requesting {0} on endpoint {1}".format( + str(request.__name__), str(endpoint) ) - except StopIteration: - # When no more node is available, we go out of the while loop - break - finally: - # Everytime we go out of the while loop, we gather the futures - if futures: - responses = await asyncio.gather( - *futures, return_exceptions=True + ) + # create client + _client = client.Client( + endpoint, None, proxy=self._user_parameters.proxy() + ) + futures.append( + self._verified_request( + node, _client.session, _client(request, **req_args) ) - for r in responses: - if isinstance(r, errors.DuniterError): - if r.ucode == errors.HTTP_LIMITATION: - self._logger.debug( - "Exception in responses: " + r.message - ) - continue - else: - data_hash = hash(r.ucode) - elif isinstance(r, BaseException): - self._logger.debug("Exception in responses: " + str(r)) + ) + except StopIteration: + # When no more node is available, we go out of the while loop + break + finally: + # Everytime we go out of the while loop, we gather the futures + if futures: + responses = await asyncio.gather(*futures, return_exceptions=True) + for r in responses: + if isinstance(r, errors.DuniterError): + if r.ucode == errors.HTTP_LIMITATION: + self._logger.debug( + "Exception in responses: " + r.message + ) continue else: - filtered_data = _filter_data(request, r) - data_hash = make_hash(filtered_data) - answers_data[data_hash] = r - if data_hash not in answers: - answers[data_hash] = [node] + data_hash = hash(r.ucode) + elif isinstance(r, BaseException): + if str(r).strip() == "": + self._logger.debug( + "Exception in responses: {}".format(type(r)) + ) else: - answers[data_hash].append(node) - finally: - await session.close() + self._logger.debug( + "Exception in responses: {}".format(str(r)) + ) + continue + else: + filtered_data = _filter_data(request, r) + data_hash = make_hash(filtered_data) + answers_data[data_hash] = r + if data_hash not in answers: + answers[data_hash] = [node] + else: + answers[data_hash].append(node) if len(answers_data) > 0: if request is bma.wot.lookup: @@ -287,34 +292,38 @@ class BmaConnector: ) tries = 0 while tries < 3 and endpoints: - endpoint = random.choice(endpoints) - endpoints.remove(endpoint) - try: - self._logger.debug( - "Requesting {0} on endpoint {1}".format( - str(request.__name__), str(endpoint) + async with aiohttp.ClientSession() as session: + endpoint = random.choice(endpoints) + endpoints.remove(endpoint) + try: + self._logger.debug( + "Requesting {0} on endpoint {1}".format( + str(request.__name__), str(endpoint) + ) + ) + _client = client.Client( + endpoint, session, proxy=self._user_parameters.proxy() ) - ) - _client = client.Client(endpoint, proxy=self._user_parameters.proxy()) - return await _client(request, **req_args) - except errors.DuniterError as e: - if e.ucode == errors.HTTP_LIMITATION: + return await _client(request, **req_args) + except errors.DuniterError as e: + if e.ucode == errors.HTTP_LIMITATION: + self._logger.debug(str(e)) + tries += 1 + else: + raise + except ( + ClientError, + gaierror, + asyncio.TimeoutError, + ValueError, + jsonschema.ValidationError, + ) as e: self._logger.debug(str(e)) tries += 1 - else: - raise - except ( - ClientError, - gaierror, - asyncio.TimeoutError, - ValueError, - jsonschema.ValidationError, - ) as e: - self._logger.debug(str(e)) - tries += 1 - except AttributeError as e: - if ("feed_appdata", "do_handshake") in str(e): - self._logger.debug(str(e)) + except AttributeError as e: + if "feed_appdata" in str(e) or "do_handshake" in str(e): + self._logger.debug(str(e)) + raise NoPeerAvailable("", len(endpoints)) async def get(self, currency, request, req_args={}, verify=True): @@ -355,16 +364,16 @@ class BmaConnector: replies = [] if len(endpoints) > 0: - async with aiohttp.ClientSession() as session: - for endpoint in endpoints: + for endpoint in endpoints: + async with aiohttp.ClientSession() as session: self._logger.debug("Trying to connect to: " + str(endpoint)) _client = client.Client( - endpoint, proxy=self._user_parameters.proxy() + endpoint, session, proxy=self._user_parameters.proxy() ) reply = asyncio.ensure_future(_client(request, **req_args)) replies.append(reply) - result = await asyncio.gather(*replies, return_exceptions=True) - return tuple(result) + result = await asyncio.gather(*replies, return_exceptions=True) + return tuple(result) else: raise NoPeerAvailable("", len(endpoints)) diff --git a/src/sakia/data/connectors/node.py b/src/sakia/data/connectors/node.py index 35aaed69..78c71167 100644 --- a/src/sakia/data/connectors/node.py +++ b/src/sakia/data/connectors/node.py @@ -54,9 +54,6 @@ class NodeConnector(QObject): self._ws_tasks = {"block": None, "peer": None} self._connected = {"block": False, "peer": False} self._user_parameters = user_parameters - if not session: - session = aiohttp.ClientSession() - self.session = session self._raw_logger = logging.getLogger("sakia") self._logger = NodeConnectorLoggerAdapter( self._raw_logger, {"pubkey": self.node.pubkey} @@ -80,10 +77,11 @@ class NodeConnector(QObject): :rtype: sakia.core.net.Node """ endpoint = get_bma_endpoint_from_server_address(address, port, secured) - # Create Client from endpoint string in Duniter format - client = Client(endpoint, proxy=user_parameters.proxy()) - peer_data = client(bma.network.peering) + async with aiohttp.ClientSession() as session: + # Create Client from endpoint string in Duniter format + client = Client(endpoint, session, proxy=user_parameters.proxy()) + peer_data = client(bma.network.peering) peer = Peer.from_signed_raw( "{0}{1}\n".format(peer_data["raw"], peer_data["signature"]) @@ -130,41 +128,35 @@ class NodeConnector(QObject): return cls(node, user_parameters, session=None) async def safe_request(self, endpoint, request, proxy, req_args={}): - try: - client = Client(endpoint, self.session, proxy) - data = await client(request, **req_args) - return data - except errors.DuniterError as e: - if e.ucode == 1006: - self._logger.debug("{0}".format(str(e))) - else: - raise - except ( - ClientError, - gaierror, - TimeoutError, - ConnectionRefusedError, - ValueError, - ) as e: - self._logger.debug("{:}:{:}".format(str(e.__class__.__name__), str(e))) - self.handle_failure() - except jsonschema.ValidationError as e: - self._logger.debug("{:}:{:}".format(str(e.__class__.__name__), str(e))) - self.handle_failure(weight=3) - except RuntimeError: - if self.session.closed: - pass - else: - raise - except AttributeError as e: - if ("feed_appdata", "do_handshake") in str(e): - self._logger.debug(str(e)) - else: - raise - - async def init_session(self): - if not self.session: - self.session = aiohttp.ClientSession() + async with aiohttp.ClientSession() as session: + try: + client = Client(endpoint, session, proxy) + data = await client(request, **req_args) + return data + except errors.DuniterError as e: + if e.ucode == 1006: + self._logger.debug("{0}".format(str(e))) + else: + raise + except ( + ClientError, + gaierror, + TimeoutError, + ConnectionRefusedError, + ValueError, + ) as e: + self._logger.debug("{:}:{:}".format(str(e.__class__.__name__), str(e))) + self.handle_failure() + except jsonschema.ValidationError as e: + self._logger.debug("{:}:{:}".format(str(e.__class__.__name__), str(e))) + self.handle_failure(weight=3) + except RuntimeError as e: + self._logger.error(str(e)) + except AttributeError as e: + if "feed_appdata" in str(e) or "do_handshake" in str(e): + self._logger.debug(str(e)) + else: + raise async def close_ws(self): for ws in self._ws_tasks.values(): @@ -179,7 +171,6 @@ class NodeConnector(QObject): else: closed = True await asyncio.sleep(0) - await self.session.close() await asyncio.sleep(0) def refresh(self, manual=False): @@ -205,61 +196,58 @@ class NodeConnector(QObject): """ for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: if not self._connected["block"]: - try: - client = Client( - endpoint, self.session, self._user_parameters.proxy() - ) - - # Create Web Socket connection on block path (async method) - ws = await client(bma.ws.block) # Type: WSConnection - self._connected["block"] = True - self._logger.debug("Connected successfully to block ws") - - loop = True - # Iterate on each message received... - while loop: - # Wait and capture next message - try: - block_data = await ws.receive_json() - jsonschema.validate(block_data, bma.ws.WS_BLOCK_SCHEMA) - self._logger.debug("Received a block") - self.block_found.emit( - BlockUID(block_data["number"], block_data["hash"]) - ) - except TypeError as exception: - self._logger.debug(exception) - self.handle_failure() - break - - # Close session - await client.close() - - except (aiohttp.WSServerHandshakeError, ValueError) as e: - self._logger.debug( - "Websocket block {0}: {1}".format(type(e).__name__, str(e)) - ) - self.handle_failure() - except (ClientError, gaierror, TimeoutError) as e: - self._logger.debug("{0}: {1}".format(str(e), self.node.pubkey[:5])) - self.handle_failure() - except jsonschema.ValidationError as e: - self._logger.debug( - "{:}:{:}".format(str(e.__class__.__name__), str(e)) - ) - self.handle_failure(weight=3) - except RuntimeError: - if self.session.closed: - pass - else: - raise - except AttributeError as e: - if ("feed_appdata", "do_handshake") in str(e): - self._logger.debug(str(e)) - else: - raise - finally: - self._connected["block"] = False - self._ws_tasks["block"] = None + async with aiohttp.ClientSession() as session: + try: + client = Client( + endpoint, session, self._user_parameters.proxy() + ) + + # Create Web Socket connection on block path (async method) + ws = await client(bma.ws.block) # Type: WSConnection + self._connected["block"] = True + self._logger.debug("Connected successfully to block ws") + + loop = True + # Iterate on each message received... + while loop: + # Wait and capture next message + try: + block_data = await ws.receive_json() + jsonschema.validate(block_data, bma.ws.WS_BLOCK_SCHEMA) + self._logger.debug("Received a block") + self.block_found.emit( + BlockUID(block_data["number"], block_data["hash"]) + ) + except TypeError as exception: + self._logger.debug(exception) + self.handle_failure() + break + + except (aiohttp.WSServerHandshakeError, ValueError) as e: + self._logger.debug( + "Websocket block {0}: {1}".format(type(e).__name__, str(e)) + ) + self.handle_failure() + except (ClientError, gaierror, TimeoutError) as e: + self._logger.debug( + "{0}: {1}".format(str(e), self.node.pubkey[:5]) + ) + self.handle_failure() + except jsonschema.ValidationError as e: + self._logger.debug( + "{:}:{:}".format(str(e.__class__.__name__), str(e)) + ) + self.handle_failure(weight=3) + except RuntimeError as e: + self._logger.error(str(e)) + except AttributeError as e: + if "feed_appdata" in str(e) or "do_handshake" in str(e): + self._logger.debug(str(e)) + else: + raise + finally: + self._connected["block"] = False + self._ws_tasks["block"] = None async def connect_peers(self): """ @@ -268,65 +256,64 @@ class NodeConnector(QObject): """ for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: if not self._connected["peer"]: - try: - client = Client( - endpoint, self.session, self._user_parameters.proxy() - ) - - # Create Web Socket connection on peer path (async method) - ws = await client(bma.ws.peer) # Type: WSConnection - self._connected["peer"] = True - self._logger.debug("Connected successfully to peer ws") - - loop = True - # Iterate on each message received... - while loop: - try: - # Wait and capture next message - peer_data = await ws.receive_json() - jsonschema.validate(peer_data, bma.ws.WS_PEER_SCHEMA) - self._logger.debug("Received a peer") - self.refresh_peer_data(peer_data) - except TypeError as exception: - self._logger.debug(exception) - break - - # Close session - await client.close() - - except (aiohttp.WSServerHandshakeError, ValueError) as e: - self._logger.debug( - "Websocket peer {0}: {1}".format(type(e).__name__, str(e)) - ) - await self.request_peers() - except (ClientError, gaierror, TimeoutError) as e: - self._logger.debug( - "{:}:{:}".format(str(e.__class__.__name__), str(e)) - ) - self.handle_failure() - except jsonschema.ValidationError as e: - self._logger.debug( - "{:}:{:}".format(str(e.__class__.__name__), str(e)) - ) - self.handle_failure(weight=3) - except RuntimeError: - if self.session.closed: - pass - else: - raise - except AttributeError as e: - if ("feed_appdata", "do_handshake") in str(e): - self._logger.debug(str(e)) - else: - raise - finally: - self._connected["peer"] = False - self._ws_tasks["peer"] = None + async with aiohttp.ClientSession() as session: + try: + client = Client( + endpoint, session, self._user_parameters.proxy() + ) + + # Create Web Socket connection on peer path (async method) + ws = await client(bma.ws.peer) # Type: WSConnection + self._connected["peer"] = True + self._logger.debug("Connected successfully to peer ws") + + loop = True + # Iterate on each message received... + while loop: + try: + # Wait and capture next message + peer_data = await ws.receive_json() + jsonschema.validate(peer_data, bma.ws.WS_PEER_SCHEMA) + self._logger.debug("Received a peer") + self.refresh_peer_data(peer_data) + except TypeError as exception: + self._logger.debug(exception) + break + + # Close session + await client.close() + + except (aiohttp.WSServerHandshakeError, ValueError) as e: + self._logger.debug( + "Websocket peer {0}: {1}".format(type(e).__name__, str(e)) + ) + await self.request_peers() + except (ClientError, gaierror, TimeoutError) as e: + self._logger.debug( + "{:}:{:}".format(str(e.__class__.__name__), str(e)) + ) + self.handle_failure() + except jsonschema.ValidationError as e: + self._logger.debug( + "{:}:{:}".format(str(e.__class__.__name__), str(e)) + ) + self.handle_failure(weight=3) + except RuntimeError as e: + self._logger.error(str(e)) + except AttributeError as e: + if "feed_appdata" in str(e) or "do_handshake" in str(e): + self._logger.debug(str(e)) + else: + raise + finally: + self._connected["peer"] = False + self._ws_tasks["peer"] = None async def request_peers(self): """ Refresh the list of peers knew by this node """ + found_peer_data = False for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: try: peers_data = await self.safe_request( @@ -354,8 +341,9 @@ class NodeConnector(QObject): if not leaf_data: break self.refresh_peer_data(leaf_data["leaf"]["value"]) + found_peer_data = True except (AttributeError, ValueError) as e: - if ("feed_appdata", "do_handshake") in str(e): + if "feed_appdata" in str(e) or "do_handshake" in str(e): self._logger.debug(str(e)) else: self._logger.debug( @@ -386,9 +374,7 @@ class NodeConnector(QObject): self._logger.debug("Error in peers reply: {0}".format(str(e))) self.handle_failure() else: - if self.session.closed: - pass - else: + if not found_peer_data: self._logger.debug("Could not connect to any BMA endpoint") self.handle_failure() @@ -428,11 +414,8 @@ class NodeConnector(QObject): self._logger.debug("Error in peers reply: {0}".format(str(e))) self.handle_failure() else: - if self.session.closed: - pass - else: - self._logger.debug("Could not connect to any BMA endpoint") - self.handle_failure() + self._logger.debug("Could not connect to any BMA endpoint") + self.handle_failure() def handle_success(self): self.success.emit() diff --git a/src/sakia/services/network.py b/src/sakia/services/network.py index 4ffe1ebe..21b638b1 100644 --- a/src/sakia/services/network.py +++ b/src/sakia/services/network.py @@ -61,27 +61,6 @@ class NetworkService(QObject): self._identities_service = identities_service self._discovery_loop_task = None - @classmethod - def create(cls, node_processor, node_connector): - """ - Create a new network with one knew node - Crawls the nodes from the first node to build the - community network - - :param sakia.data.processors.NodeProcessor node_processor: The nodes processor - :param sakia.data.connectors.NodeConnector node_connector: The first connector of the network service - :return: - """ - connectors = [node_connector] - node_processor.insert_node(node_connector.node) - network = cls( - node_connector.node.currency, - node_processor, - connectors, - node_connector.session, - ) - return network - @classmethod def load( cls, app, currency, node_processor, blockchain_service, identities_service @@ -176,7 +155,6 @@ class NetworkService(QObject): async def refresh_once(self): for connector in self._connectors: await asyncio.sleep(1) - await connector.init_session() connector.refresh(manual=True) async def discover_network(self): -- GitLab