Skip to content
Snippets Groups Projects
Commit 1ea7e3dc authored by Vincent Texier's avatar Vincent Texier
Browse files

[fix] fix Unclosed client session error

Enhance use of aiohttp.session.
Cleanup network class
parent ca076736
No related branches found
No related tags found
1 merge request!777Dev
...@@ -3,7 +3,7 @@ import aiohttp ...@@ -3,7 +3,7 @@ import aiohttp
from aiohttp import ClientError from aiohttp import ClientError
from duniterpy.api import client, bma, errors from duniterpy.api import client, bma, errors
from duniterpy.api.endpoint import BMAEndpoint, SecuredBMAEndpoint from duniterpy.api.endpoint import BMAEndpoint, SecuredBMAEndpoint
from duniterpy.api.client import parse_error from duniterpy.api.client import parse_error, Client
from sakia.errors import NoPeerAvailable from sakia.errors import NoPeerAvailable
from pkg_resources import parse_version from pkg_resources import parse_version
from socket import gaierror from socket import gaierror
...@@ -171,7 +171,7 @@ class BmaConnector: ...@@ -171,7 +171,7 @@ class BmaConnector:
_user_parameters = attr.ib() _user_parameters = attr.ib()
_logger = attr.ib(default=attr.Factory(lambda: logging.getLogger("sakia"))) _logger = attr.ib(default=attr.Factory(lambda: logging.getLogger("sakia")))
async def _verified_request(self, node, request): async def _verified_request(self, node, session, request):
try: try:
res = await request res = await request
self._nodes_processor.handle_success(node) self._nodes_processor.handle_success(node)
...@@ -183,9 +183,11 @@ class BmaConnector: ...@@ -183,9 +183,11 @@ class BmaConnector:
else: else:
return e return e
except BaseException as e: except BaseException as e:
self._logger.debug(str(e)) self._logger.debug(type(e))
self._nodes_processor.handle_failure(node) self._nodes_processor.handle_failure(node)
return e return e
finally:
await session.close()
async def verified_get(self, currency, request, req_args): async def verified_get(self, currency, request, req_args):
# If no node is known as a member, lookup synced nodes as a fallback # If no node is known as a member, lookup synced nodes as a fallback
...@@ -197,11 +199,7 @@ class BmaConnector: ...@@ -197,11 +199,7 @@ class BmaConnector:
answers_data = {} answers_data = {}
nb_verification = min(max(1, 0.66 * len(synced_nodes)), 3) nb_verification = min(max(1, 0.66 * len(synced_nodes)), 3)
# We try to find agreeing nodes from one 1 to 66% of nodes, max 10 # We try to find agreeing nodes from one 1 to 66% of nodes, max 10
session = aiohttp.ClientSession() while max([len(nodes) for nodes in answers.values()] + [0]) <= nb_verification:
try:
while (
max([len(nodes) for nodes in answers.values()] + [0]) <= nb_verification
):
futures = [] futures = []
try: try:
...@@ -218,10 +216,12 @@ class BmaConnector: ...@@ -218,10 +216,12 @@ class BmaConnector:
) )
# create client # create client
_client = client.Client( _client = client.Client(
endpoint, session, proxy=self._user_parameters.proxy() endpoint, None, proxy=self._user_parameters.proxy()
) )
futures.append( futures.append(
self._verified_request(node, _client(request, **req_args)) self._verified_request(
node, _client.session, _client(request, **req_args)
)
) )
if random_offline_node: if random_offline_node:
node = random_offline_node[0] node = random_offline_node[0]
...@@ -236,10 +236,12 @@ class BmaConnector: ...@@ -236,10 +236,12 @@ class BmaConnector:
) )
# create client # create client
_client = client.Client( _client = client.Client(
endpoint, session, proxy=self._user_parameters.proxy() endpoint, None, proxy=self._user_parameters.proxy()
) )
futures.append( futures.append(
self._verified_request(node, _client(request, **req_args)) self._verified_request(
node, _client.session, _client(request, **req_args)
)
) )
except StopIteration: except StopIteration:
# When no more node is available, we go out of the while loop # When no more node is available, we go out of the while loop
...@@ -247,9 +249,7 @@ class BmaConnector: ...@@ -247,9 +249,7 @@ class BmaConnector:
finally: finally:
# Everytime we go out of the while loop, we gather the futures # Everytime we go out of the while loop, we gather the futures
if futures: if futures:
responses = await asyncio.gather( responses = await asyncio.gather(*futures, return_exceptions=True)
*futures, return_exceptions=True
)
for r in responses: for r in responses:
if isinstance(r, errors.DuniterError): if isinstance(r, errors.DuniterError):
if r.ucode == errors.HTTP_LIMITATION: if r.ucode == errors.HTTP_LIMITATION:
...@@ -260,7 +260,14 @@ class BmaConnector: ...@@ -260,7 +260,14 @@ class BmaConnector:
else: else:
data_hash = hash(r.ucode) data_hash = hash(r.ucode)
elif isinstance(r, BaseException): elif isinstance(r, BaseException):
self._logger.debug("Exception in responses: " + str(r)) if str(r).strip() == "":
self._logger.debug(
"Exception in responses: {}".format(type(r))
)
else:
self._logger.debug(
"Exception in responses: {}".format(str(r))
)
continue continue
else: else:
filtered_data = _filter_data(request, r) filtered_data = _filter_data(request, r)
...@@ -270,8 +277,6 @@ class BmaConnector: ...@@ -270,8 +277,6 @@ class BmaConnector:
answers[data_hash] = [node] answers[data_hash] = [node]
else: else:
answers[data_hash].append(node) answers[data_hash].append(node)
finally:
await session.close()
if len(answers_data) > 0: if len(answers_data) > 0:
if request is bma.wot.lookup: if request is bma.wot.lookup:
...@@ -287,6 +292,7 @@ class BmaConnector: ...@@ -287,6 +292,7 @@ class BmaConnector:
) )
tries = 0 tries = 0
while tries < 3 and endpoints: while tries < 3 and endpoints:
async with aiohttp.ClientSession() as session:
endpoint = random.choice(endpoints) endpoint = random.choice(endpoints)
endpoints.remove(endpoint) endpoints.remove(endpoint)
try: try:
...@@ -295,7 +301,9 @@ class BmaConnector: ...@@ -295,7 +301,9 @@ class BmaConnector:
str(request.__name__), str(endpoint) str(request.__name__), str(endpoint)
) )
) )
_client = client.Client(endpoint, proxy=self._user_parameters.proxy()) _client = client.Client(
endpoint, session, proxy=self._user_parameters.proxy()
)
return await _client(request, **req_args) return await _client(request, **req_args)
except errors.DuniterError as e: except errors.DuniterError as e:
if e.ucode == errors.HTTP_LIMITATION: if e.ucode == errors.HTTP_LIMITATION:
...@@ -313,8 +321,9 @@ class BmaConnector: ...@@ -313,8 +321,9 @@ class BmaConnector:
self._logger.debug(str(e)) self._logger.debug(str(e))
tries += 1 tries += 1
except AttributeError as e: except AttributeError as e:
if ("feed_appdata", "do_handshake") in str(e): if "feed_appdata" in str(e) or "do_handshake" in str(e):
self._logger.debug(str(e)) self._logger.debug(str(e))
raise NoPeerAvailable("", len(endpoints)) raise NoPeerAvailable("", len(endpoints))
async def get(self, currency, request, req_args={}, verify=True): async def get(self, currency, request, req_args={}, verify=True):
...@@ -355,11 +364,11 @@ class BmaConnector: ...@@ -355,11 +364,11 @@ class BmaConnector:
replies = [] replies = []
if len(endpoints) > 0: if len(endpoints) > 0:
async with aiohttp.ClientSession() as session:
for endpoint in endpoints: for endpoint in endpoints:
async with aiohttp.ClientSession() as session:
self._logger.debug("Trying to connect to: " + str(endpoint)) self._logger.debug("Trying to connect to: " + str(endpoint))
_client = client.Client( _client = client.Client(
endpoint, proxy=self._user_parameters.proxy() endpoint, session, proxy=self._user_parameters.proxy()
) )
reply = asyncio.ensure_future(_client(request, **req_args)) reply = asyncio.ensure_future(_client(request, **req_args))
replies.append(reply) replies.append(reply)
......
...@@ -54,9 +54,6 @@ class NodeConnector(QObject): ...@@ -54,9 +54,6 @@ class NodeConnector(QObject):
self._ws_tasks = {"block": None, "peer": None} self._ws_tasks = {"block": None, "peer": None}
self._connected = {"block": False, "peer": False} self._connected = {"block": False, "peer": False}
self._user_parameters = user_parameters self._user_parameters = user_parameters
if not session:
session = aiohttp.ClientSession()
self.session = session
self._raw_logger = logging.getLogger("sakia") self._raw_logger = logging.getLogger("sakia")
self._logger = NodeConnectorLoggerAdapter( self._logger = NodeConnectorLoggerAdapter(
self._raw_logger, {"pubkey": self.node.pubkey} self._raw_logger, {"pubkey": self.node.pubkey}
...@@ -80,9 +77,10 @@ class NodeConnector(QObject): ...@@ -80,9 +77,10 @@ class NodeConnector(QObject):
:rtype: sakia.core.net.Node :rtype: sakia.core.net.Node
""" """
endpoint = get_bma_endpoint_from_server_address(address, port, secured) endpoint = get_bma_endpoint_from_server_address(address, port, secured)
# Create Client from endpoint string in Duniter format
client = Client(endpoint, proxy=user_parameters.proxy())
async with aiohttp.ClientSession() as session:
# Create Client from endpoint string in Duniter format
client = Client(endpoint, session, proxy=user_parameters.proxy())
peer_data = client(bma.network.peering) peer_data = client(bma.network.peering)
peer = Peer.from_signed_raw( peer = Peer.from_signed_raw(
...@@ -130,8 +128,9 @@ class NodeConnector(QObject): ...@@ -130,8 +128,9 @@ class NodeConnector(QObject):
return cls(node, user_parameters, session=None) return cls(node, user_parameters, session=None)
async def safe_request(self, endpoint, request, proxy, req_args={}): async def safe_request(self, endpoint, request, proxy, req_args={}):
async with aiohttp.ClientSession() as session:
try: try:
client = Client(endpoint, self.session, proxy) client = Client(endpoint, session, proxy)
data = await client(request, **req_args) data = await client(request, **req_args)
return data return data
except errors.DuniterError as e: except errors.DuniterError as e:
...@@ -151,21 +150,14 @@ class NodeConnector(QObject): ...@@ -151,21 +150,14 @@ class NodeConnector(QObject):
except jsonschema.ValidationError as e: except jsonschema.ValidationError as e:
self._logger.debug("{:}:{:}".format(str(e.__class__.__name__), str(e))) self._logger.debug("{:}:{:}".format(str(e.__class__.__name__), str(e)))
self.handle_failure(weight=3) self.handle_failure(weight=3)
except RuntimeError: except RuntimeError as e:
if self.session.closed: self._logger.error(str(e))
pass
else:
raise
except AttributeError as e: except AttributeError as e:
if ("feed_appdata", "do_handshake") in str(e): if "feed_appdata" in str(e) or "do_handshake" in str(e):
self._logger.debug(str(e)) self._logger.debug(str(e))
else: else:
raise raise
async def init_session(self):
if not self.session:
self.session = aiohttp.ClientSession()
async def close_ws(self): async def close_ws(self):
for ws in self._ws_tasks.values(): for ws in self._ws_tasks.values():
if ws: if ws:
...@@ -179,7 +171,6 @@ class NodeConnector(QObject): ...@@ -179,7 +171,6 @@ class NodeConnector(QObject):
else: else:
closed = True closed = True
await asyncio.sleep(0) await asyncio.sleep(0)
await self.session.close()
await asyncio.sleep(0) await asyncio.sleep(0)
def refresh(self, manual=False): def refresh(self, manual=False):
...@@ -205,9 +196,10 @@ class NodeConnector(QObject): ...@@ -205,9 +196,10 @@ class NodeConnector(QObject):
""" """
for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]:
if not self._connected["block"]: if not self._connected["block"]:
async with aiohttp.ClientSession() as session:
try: try:
client = Client( client = Client(
endpoint, self.session, self._user_parameters.proxy() endpoint, session, self._user_parameters.proxy()
) )
# Create Web Socket connection on block path (async method) # Create Web Socket connection on block path (async method)
...@@ -231,29 +223,25 @@ class NodeConnector(QObject): ...@@ -231,29 +223,25 @@ class NodeConnector(QObject):
self.handle_failure() self.handle_failure()
break break
# Close session
await client.close()
except (aiohttp.WSServerHandshakeError, ValueError) as e: except (aiohttp.WSServerHandshakeError, ValueError) as e:
self._logger.debug( self._logger.debug(
"Websocket block {0}: {1}".format(type(e).__name__, str(e)) "Websocket block {0}: {1}".format(type(e).__name__, str(e))
) )
self.handle_failure() self.handle_failure()
except (ClientError, gaierror, TimeoutError) as e: except (ClientError, gaierror, TimeoutError) as e:
self._logger.debug("{0}: {1}".format(str(e), self.node.pubkey[:5])) self._logger.debug(
"{0}: {1}".format(str(e), self.node.pubkey[:5])
)
self.handle_failure() self.handle_failure()
except jsonschema.ValidationError as e: except jsonschema.ValidationError as e:
self._logger.debug( self._logger.debug(
"{:}:{:}".format(str(e.__class__.__name__), str(e)) "{:}:{:}".format(str(e.__class__.__name__), str(e))
) )
self.handle_failure(weight=3) self.handle_failure(weight=3)
except RuntimeError: except RuntimeError as e:
if self.session.closed: self._logger.error(str(e))
pass
else:
raise
except AttributeError as e: except AttributeError as e:
if ("feed_appdata", "do_handshake") in str(e): if "feed_appdata" in str(e) or "do_handshake" in str(e):
self._logger.debug(str(e)) self._logger.debug(str(e))
else: else:
raise raise
...@@ -268,9 +256,10 @@ class NodeConnector(QObject): ...@@ -268,9 +256,10 @@ class NodeConnector(QObject):
""" """
for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]:
if not self._connected["peer"]: if not self._connected["peer"]:
async with aiohttp.ClientSession() as session:
try: try:
client = Client( client = Client(
endpoint, self.session, self._user_parameters.proxy() endpoint, session, self._user_parameters.proxy()
) )
# Create Web Socket connection on peer path (async method) # Create Web Socket connection on peer path (async method)
...@@ -309,13 +298,10 @@ class NodeConnector(QObject): ...@@ -309,13 +298,10 @@ class NodeConnector(QObject):
"{:}:{:}".format(str(e.__class__.__name__), str(e)) "{:}:{:}".format(str(e.__class__.__name__), str(e))
) )
self.handle_failure(weight=3) self.handle_failure(weight=3)
except RuntimeError: except RuntimeError as e:
if self.session.closed: self._logger.error(str(e))
pass
else:
raise
except AttributeError as e: except AttributeError as e:
if ("feed_appdata", "do_handshake") in str(e): if "feed_appdata" in str(e) or "do_handshake" in str(e):
self._logger.debug(str(e)) self._logger.debug(str(e))
else: else:
raise raise
...@@ -327,6 +313,7 @@ class NodeConnector(QObject): ...@@ -327,6 +313,7 @@ class NodeConnector(QObject):
""" """
Refresh the list of peers knew by this node Refresh the list of peers knew by this node
""" """
found_peer_data = False
for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]:
try: try:
peers_data = await self.safe_request( peers_data = await self.safe_request(
...@@ -354,8 +341,9 @@ class NodeConnector(QObject): ...@@ -354,8 +341,9 @@ class NodeConnector(QObject):
if not leaf_data: if not leaf_data:
break break
self.refresh_peer_data(leaf_data["leaf"]["value"]) self.refresh_peer_data(leaf_data["leaf"]["value"])
found_peer_data = True
except (AttributeError, ValueError) as e: except (AttributeError, ValueError) as e:
if ("feed_appdata", "do_handshake") in str(e): if "feed_appdata" in str(e) or "do_handshake" in str(e):
self._logger.debug(str(e)) self._logger.debug(str(e))
else: else:
self._logger.debug( self._logger.debug(
...@@ -386,9 +374,7 @@ class NodeConnector(QObject): ...@@ -386,9 +374,7 @@ class NodeConnector(QObject):
self._logger.debug("Error in peers reply: {0}".format(str(e))) self._logger.debug("Error in peers reply: {0}".format(str(e)))
self.handle_failure() self.handle_failure()
else: else:
if self.session.closed: if not found_peer_data:
pass
else:
self._logger.debug("Could not connect to any BMA endpoint") self._logger.debug("Could not connect to any BMA endpoint")
self.handle_failure() self.handle_failure()
...@@ -427,9 +413,6 @@ class NodeConnector(QObject): ...@@ -427,9 +413,6 @@ class NodeConnector(QObject):
except errors.DuniterError as e: except errors.DuniterError as e:
self._logger.debug("Error in peers reply: {0}".format(str(e))) self._logger.debug("Error in peers reply: {0}".format(str(e)))
self.handle_failure() self.handle_failure()
else:
if self.session.closed:
pass
else: else:
self._logger.debug("Could not connect to any BMA endpoint") self._logger.debug("Could not connect to any BMA endpoint")
self.handle_failure() self.handle_failure()
......
...@@ -61,27 +61,6 @@ class NetworkService(QObject): ...@@ -61,27 +61,6 @@ class NetworkService(QObject):
self._identities_service = identities_service self._identities_service = identities_service
self._discovery_loop_task = None self._discovery_loop_task = None
@classmethod
def create(cls, node_processor, node_connector):
"""
Create a new network with one knew node
Crawls the nodes from the first node to build the
community network
:param sakia.data.processors.NodeProcessor node_processor: The nodes processor
:param sakia.data.connectors.NodeConnector node_connector: The first connector of the network service
:return:
"""
connectors = [node_connector]
node_processor.insert_node(node_connector.node)
network = cls(
node_connector.node.currency,
node_processor,
connectors,
node_connector.session,
)
return network
@classmethod @classmethod
def load( def load(
cls, app, currency, node_processor, blockchain_service, identities_service cls, app, currency, node_processor, blockchain_service, identities_service
...@@ -176,7 +155,6 @@ class NetworkService(QObject): ...@@ -176,7 +155,6 @@ class NetworkService(QObject):
async def refresh_once(self): async def refresh_once(self):
for connector in self._connectors: for connector in self._connectors:
await asyncio.sleep(1) await asyncio.sleep(1)
await connector.init_session()
connector.refresh(manual=True) connector.refresh(manual=True)
async def discover_network(self): async def discover_network(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment