diff --git a/src/sakia/data/connectors/node.py b/src/sakia/data/connectors/node.py index 6c3f8bbb18984018055fe6236fc7eb3cdeb6cba1..75ec387bfff4db700be9a15312ee0a1921a60dd9 100644 --- a/src/sakia/data/connectors/node.py +++ b/src/sakia/data/connectors/node.py @@ -36,7 +36,7 @@ class NodeConnector(QObject): 'peer': None} self._connected = {'block': False, 'peer': False} - self._session = session + self.session = session self._refresh_counter = 1 self._logger = logging.getLogger('sakia') @@ -45,12 +45,8 @@ class NodeConnector(QObject): if ws: ws.cancel() - @property - def session(self): - return self._session - @classmethod - async def from_address(cls, currency, secured, address, port, session): + async def from_address(cls, currency, secured, address, port): """ Factory method to get a node from a given address :param str currency: The node currency. None if we don't know\ @@ -64,6 +60,7 @@ class NodeConnector(QObject): """ http_scheme = "https" if secured else "http" ws_scheme = "ws" if secured else "wss" + session = aiohttp.ClientSession() peer_data = await bma.network.peering(ConnectionHandler(http_scheme, ws_scheme, address, port, session)) peer = Peer.from_signed_raw("{0}{1}\n".format(peer_data['raw'], @@ -78,7 +75,7 @@ class NodeConnector(QObject): return cls(node, session) @classmethod - def from_peer(cls, currency, peer, session): + def from_peer(cls, currency, peer): """ Factory method to get a node from a peer document. :param str currency: The node currency. None if we don't know\ @@ -93,11 +90,11 @@ class NodeConnector(QObject): node = Node(peer.currency, peer.pubkey, peer.endpoints, peer.blockUID) logging.getLogger('sakia').debug("Node from peer : {:}".format(str(node))) - return cls(node, session) + return cls(node, None) async def safe_request(self, endpoint, request, req_args={}): try: - conn_handler = endpoint.conn_handler(self._session) + conn_handler = endpoint.conn_handler(self.session) data = await request(conn_handler, **req_args) return data except (ClientError, gaierror, TimeoutError, ConnectionRefusedError, DisconnectedError, ValueError) as e: @@ -108,6 +105,10 @@ class NodeConnector(QObject): self._logger.debug("Validation error : {0}".format(self.node.pubkey[:5])) self.node.state = Node.CORRUPTED + async def init_session(self): + if not self.session: + self.session = aiohttp.ClientSession() + async def close_ws(self): for ws in self._ws_tasks.values(): if ws: @@ -122,6 +123,7 @@ class NodeConnector(QObject): else: closed = True await asyncio.sleep(0) + await self.session.close() await asyncio.sleep(0) def refresh(self, manual=False): @@ -153,7 +155,7 @@ class NodeConnector(QObject): for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: if not self._connected['block']: try: - conn_handler = endpoint.conn_handler(self._session) + conn_handler = endpoint.conn_handler(self.session) ws_connection = bma.ws.block(conn_handler) async with ws_connection as ws: self._connected['block'] = True @@ -219,7 +221,7 @@ class NodeConnector(QObject): self.node.state = Node.ONLINE if not self.node.current_buid or self.node.current_buid.sha_hash != block_data['hash']: for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: - conn_handler = endpoint.conn_handler() + conn_handler = endpoint.conn_handler(self.session) self._logger.debug("Requesting {0}".format(conn_handler)) try: previous_block = await self.safe_request(endpoint, bma.blockchain.block, @@ -315,7 +317,7 @@ class NodeConnector(QObject): for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: if not self._connected['peer']: try: - conn_handler = endpoint.conn_handler(self._session) + conn_handler = endpoint.conn_handler(self.session) ws_connection = bma.ws.peer(conn_handler) async with ws_connection as ws: self._connected['peer'] = True diff --git a/src/sakia/data/processors/connections.py b/src/sakia/data/processors/connections.py index db1200523c84f41448dc3113d7bb9b818e58f3e8..b6d785e678b529ebbfb88c9764332e26d5b03355 100644 --- a/src/sakia/data/processors/connections.py +++ b/src/sakia/data/processors/connections.py @@ -32,5 +32,8 @@ class ConnectionsProcessor: def connections(self): return self._connections_repo.get_all() + def connections_to(self, currency): + return self._connections_repo.get_all(currency=currency) + def currencies(self): return self._connections_repo.get_currencies() diff --git a/src/sakia/services/blockchain.py b/src/sakia/services/blockchain.py index 9d8264ee9e447e7322767eec413ef80c3ac20fb1..1de1e7f8b546e8d2049c9e4a879594b2c2c8282c 100644 --- a/src/sakia/services/blockchain.py +++ b/src/sakia/services/blockchain.py @@ -1,6 +1,8 @@ from PyQt5.QtCore import QObject import math import logging +from duniterpy.api.errors import DuniterError +from sakia.errors import NoPeerAvailable class BlockchainService(QObject): @@ -32,12 +34,15 @@ class BlockchainService(QObject): """ Handle a new current block uid """ - with_identities = await self._blockchain_processor.new_blocks_with_identities(self.currency) - with_money = await self._blockchain_processor.new_blocks_with_money(self.currency) - blocks = await self._blockchain_processor.blocks(with_identities + with_money, self.currency) - await self._identities_service.handle_new_blocks(blocks) - await self._transactions_service.handle_new_blocks(blocks) - self.app.db.commit() + try: + with_identities = await self._blockchain_processor.new_blocks_with_identities(self.currency) + with_money = await self._blockchain_processor.new_blocks_with_money(self.currency) + blocks = await self._blockchain_processor.blocks(with_identities + with_money, self.currency) + await self._identities_service.handle_new_blocks(blocks) + await self._transactions_service.handle_new_blocks(blocks) + self.app.db.commit() + except (NoPeerAvailable, DuniterError) as e: + self._logger.debug(str(e)) def current_buid(self): return self._blockchain_processor.current_buid(self.currency) diff --git a/src/sakia/services/identities.py b/src/sakia/services/identities.py index e6c3f2c998b3b1eed16d370d6cf59e360efa2f1f..56ef48b3a1dadec806b26a43e685541cbac09808 100644 --- a/src/sakia/services/identities.py +++ b/src/sakia/services/identities.py @@ -53,7 +53,7 @@ class IdentitiesService(QObject): return blockchain_time - cert_time < parameters.sig_window * parameters.avg_gen_time def _get_connections_identities(self): - connections = self._connections_processor.connections(self.currency) + connections = self._connections_processor.connections_to(self.currency) identities = [] for c in connections: identities.append(self._identities_processor.get_identity(self.currency, c.pubkey)) diff --git a/src/sakia/services/network.py b/src/sakia/services/network.py index fd49ca190bff19b9e723610c784669743f173649..b5cc0a3fd0ab334a09a416bd2214aa3523e5c764 100644 --- a/src/sakia/services/network.py +++ b/src/sakia/services/network.py @@ -42,7 +42,6 @@ class NetworkService(QObject): self.currency = currency self._must_crawl = False self._block_found = self._processor.current_buid(self.currency) - self._client_session = session self._discovery_stack = [] self._blockchain_service = blockchain_service self._discovery_loop_task = None @@ -109,15 +108,8 @@ class NetworkService(QObject): self._logger.debug("Closing {0} websockets".format(len(close_tasks))) if len(close_tasks) > 0: await asyncio.wait(close_tasks, timeout=15) - if closing: - self._logger.debug("Closing client session") - await self._client_session.close() self._logger.debug("Closed") - @property - def session(self): - return self._client_session - def continue_crawling(self): return self._must_crawl @@ -174,6 +166,7 @@ class NetworkService(QObject): async def refresh_once(self): for connector in self._connectors: await asyncio.sleep(1) + await connector.init_session() connector.refresh(manual=True) async def discover_network(self): @@ -181,16 +174,13 @@ class NetworkService(QObject): Start crawling which never stops. To stop this crawling, call "stop_crawling" method. """ - session = aiohttp.ClientSession() - for connector in self._connectors: - connector.session = session - self._must_crawl = True first_loop = True asyncio.ensure_future(self.discovery_loop()) while self.continue_crawling(): for connector in self._connectors: if self.continue_crawling(): + await connector.init_session() connector.refresh() if not first_loop: await asyncio.sleep(15) @@ -211,8 +201,9 @@ class NetworkService(QObject): if self._processor.unknown_node(self.currency, peer.pubkey): self._logger.debug("New node found : {0}".format(peer.pubkey[:5])) try: - connector = NodeConnector.from_peer(self.currency, peer, self.session) + connector = NodeConnector.from_peer(self.currency, peer) self._processor.insert_node(connector.node) + await connector.init_session() connector.refresh(manual=True) self.add_connector(connector) self.nodes_changed.emit()