From 6f6e78809fd8b68d0e64672c3a0cf11cf304b0fe Mon Sep 17 00:00:00 2001 From: inso <insomniak.fr@gmaiL.com> Date: Tue, 13 Mar 2018 20:05:45 +0100 Subject: [PATCH] Faster WoT loading --- src/sakia/data/connectors/bma.py | 7 ++--- src/sakia/data/graphs/base_graph.py | 47 ++++++---------------------- src/sakia/data/graphs/wot_graph.py | 17 +++++----- src/sakia/data/repositories/nodes.py | 24 ++++++++------ src/sakia/services/identities.py | 40 ++++++++++++++++------- src/sakia/services/network.py | 44 +++++++++++++------------- 6 files changed, 84 insertions(+), 95 deletions(-) diff --git a/src/sakia/data/connectors/bma.py b/src/sakia/data/connectors/bma.py index 9aa648f8..071bd1b0 100644 --- a/src/sakia/data/connectors/bma.py +++ b/src/sakia/data/connectors/bma.py @@ -180,10 +180,8 @@ class BmaConnector: return e async def verified_get(self, currency, request, req_args): - synced_nodes = self._nodes_processor.synced_members_nodes(currency) - if not synced_nodes: - # If no node is known as a member, lookup synced nodes as a fallback - synced_nodes = self._nodes_processor.synced_nodes(currency) + # If no node is known as a member, lookup synced nodes as a fallback + synced_nodes = self._nodes_processor.synced_nodes(currency) nodes_generator = (n for n in synced_nodes) answers = {} answers_data = {} @@ -241,7 +239,6 @@ class BmaConnector: else: return _best_answer(answers, answers_data, nb_verification) - nodes = self._nodes_processor.nodes(currency) raise NoPeerAvailable("", len(synced_nodes)) async def simple_get(self, currency, request, req_args): diff --git a/src/sakia/data/graphs/base_graph.py b/src/sakia/data/graphs/base_graph.py index 63456e26..f6355963 100644 --- a/src/sakia/data/graphs/base_graph.py +++ b/src/sakia/data/graphs/base_graph.py @@ -51,7 +51,7 @@ class BaseGraph(QObject): else: return EdgeStatus.STRONG - async def node_status(self, node_identity): + def node_status(self, node_identity): """ Return the status of the node depending :param sakia.core.registry.Identity node_identity: The identity of the node @@ -60,23 +60,6 @@ class BaseGraph(QObject): """ # new node node_status = NodeStatus.NEUTRAL - node_identity = await self.identities_service.load_requirements(node_identity) - if node_identity.pubkey in self._connections_processor.pubkeys(): - node_status += NodeStatus.HIGHLIGHTED - if node_identity.member is False: - node_status += NodeStatus.OUT - return node_status - - def offline_node_status(self, node_identity): - """ - Return the status of the node depending on its requirements. No network request. - :param sakia.core.registry.Identity node_identity: The identity of the node - :param sakia.core.registry.Identity account_identity: The identity of the account displayed - :return: HIGHLIGHTED if node_identity is account_identity and OUT if the node_identity is not a member - :rtype: sakia.core.graph.constants.NodeStatus - """ - # new node - node_status = NodeStatus.NEUTRAL if node_identity.pubkey in self._connections_processor.pubkeys(): node_status += NodeStatus.HIGHLIGHTED if node_identity.member is False: @@ -167,7 +150,7 @@ class BaseGraph(QObject): for certification in tuple(certifier_list): certifier = self.identities_service.get_identity(certification.certifier) if certifier: - node_status = self.offline_node_status(certifier) + node_status = self.node_status(certifier) self.add_certifier_node(certifier, identity, certification, node_status) def add_offline_certified_list(self, certified_list, identity): @@ -181,10 +164,10 @@ class BaseGraph(QObject): for certification in tuple(certified_list): certified = self.identities_service.get_identity(certification.certified) if certified: - node_status = self.offline_node_status(certified) + node_status = self.node_status(certified) self.add_certified_node(identity, certified, certification, node_status) - async def add_certifier_list(self, certifier_list, identity): + def add_certifier_list(self, certifier_list, identity): """ Add list of certifiers to graph :param List[sakia.data.entities.Certification] certifier_list: List of certified from api @@ -193,21 +176,16 @@ class BaseGraph(QObject): """ try: # add certifiers of uid - for certification in tuple(certifier_list): + for certification in certifier_list: certifier = self.identities_service.get_identity(certification.certifier) - futures = [self.node_status(certifier)] - if not certifier: - futures.append(self.identities_service.find_from_pubkey(certification.certifier)) - results = await asyncio.gather(*futures) - node_status = results[0] if not certifier: - certifier = results[1] - self.identities_service.insert_or_update_identity(certifier) + certifier = certifier_list[certification] + node_status = self.node_status(certifier) self.add_certifier_node(certifier, identity, certification, node_status) except NoPeerAvailable as e: logging.debug(str(e)) - async def add_certified_list(self, certified_list, identity): + def add_certified_list(self, certified_list, identity): """ Add list of certified from api to graph :param List[sakia.data.entities.Certification] certified_list: List of certified from api @@ -218,14 +196,9 @@ class BaseGraph(QObject): # add certified by uid for certification in tuple(certified_list): certified = self.identities_service.get_identity(certification.certified) - futures = [self.node_status(certified)] - if not certified: - futures.append(self.identities_service.find_from_pubkey(certification.certified)) - results = await asyncio.gather(*futures) - node_status = results[0] if not certified: - certified = results[1] - self.identities_service.insert_or_update_identity(certified) + certified = certified_list[certification] + node_status = self.node_status(certified) self.add_certified_node(identity, certified, certification, node_status) except NoPeerAvailable as e: diff --git a/src/sakia/data/graphs/wot_graph.py b/src/sakia/data/graphs/wot_graph.py index 58f9b81c..5c94e63e 100644 --- a/src/sakia/data/graphs/wot_graph.py +++ b/src/sakia/data/graphs/wot_graph.py @@ -18,30 +18,27 @@ class WoTGraph(BaseGraph): async def initialize(self, center_identity): self.nx_graph.clear() - node_status = await self.node_status(center_identity) + node_status = self.node_status(center_identity) self.add_identity(center_identity, node_status) # create Identity from node metadata - certifier_coro = asyncio.ensure_future(self.identities_service.load_certifiers_of(center_identity)) - certified_coro = asyncio.ensure_future(self.identities_service.load_certified_by(center_identity)) + certifier_coro = self.identities_service.load_certifiers_of(center_identity) + certified_coro = self.identities_service.load_certified_by(center_identity) certifier_list, certified_list = await asyncio.gather(*[certifier_coro, certified_coro]) - certified_list, certified_list = await self.identities_service.load_certs_in_lookup(center_identity, + certifier_list, certified_list = await self.identities_service.load_certs_in_lookup(center_identity, certifier_list, certified_list) # populate graph with certifiers-of - certifier_coro = asyncio.ensure_future(self.add_certifier_list(certifier_list, center_identity)) + self.add_certifier_list(certifier_list, center_identity) # populate graph with certified-by - certified_coro = asyncio.ensure_future(self.add_certified_list(certified_list, center_identity)) - - await asyncio.gather(*[certifier_coro, certified_coro], return_exceptions=True) - await asyncio.sleep(0) + self.add_certified_list(certified_list, center_identity) def offline_init(self, center_identity): - node_status = self.offline_node_status(center_identity) + node_status = self.node_status(center_identity) self.add_identity(center_identity, node_status) diff --git a/src/sakia/data/repositories/nodes.py b/src/sakia/data/repositories/nodes.py index ee02104c..c670a81d 100644 --- a/src/sakia/data/repositories/nodes.py +++ b/src/sakia/data/repositories/nodes.py @@ -151,16 +151,22 @@ class NodesRepo: return [] def current_buid(self, currency): - c = self._conn.execute("""SELECT `current_buid`, - COUNT(`current_buid`) AS `value_occurrence` - FROM `nodes` - WHERE member == 1 AND currency == ? - GROUP BY `current_buid` - ORDER BY `value_occurrence` DESC - LIMIT 1;""", (currency,)) + c = self._conn.execute("""SELECT COUNT(`uid`) + FROM `nodes` + WHERE member == 1 AND currency == ? + LIMIT 1;""", (currency,)) data = c.fetchone() - if data: - return block_uid(data[0]) + if data and data[0] > 3: + c = self._conn.execute("""SELECT `current_buid`, + COUNT(`current_buid`) AS `value_occurrence` + FROM `nodes` + WHERE member == 1 AND currency == ? + GROUP BY `current_buid` + ORDER BY `value_occurrence` DESC + LIMIT 1;""", (currency,)) + data = c.fetchone() + if data: + return block_uid(data[0]) else: c = self._conn.execute("""SELECT `current_buid`, COUNT(`current_buid`) AS `value_occurrence` diff --git a/src/sakia/services/identities.py b/src/sakia/services/identities.py index 197f6757..f4888ce1 100644 --- a/src/sakia/services/identities.py +++ b/src/sakia/services/identities.py @@ -3,7 +3,7 @@ import asyncio from duniterpy.api import bma, errors from duniterpy.documents import BlockUID, block_uid from sakia.errors import NoPeerAvailable -from sakia.data.entities import Certification +from sakia.data.entities import Certification, Identity import logging @@ -119,8 +119,8 @@ class IdentitiesService(QObject): async def load_certs_in_lookup(self, identity, certifiers, certified): """ :param sakia.data.entities.Identity identity: the identity - :param list[sakia.data.entities.Certification] certifiers: the list of certifiers got in /wot/certifiers-of - :param list[sakia.data.entities.Certification] certified: the list of certified got in /wot/certified-by + :param dict[sakia.data.entities.Certification] certifiers: the list of certifiers got in /wot/certifiers-of + :param dict[sakia.data.entities.Certification] certified: the list of certified got in /wot/certified-by """ try: lookup_data = await self._bma_connector.get(self.currency, bma.wot.lookup, @@ -137,10 +137,14 @@ class IdentitiesService(QObject): block=other_data["meta"]["block_number"], timestamp=0, signature=other_data['signature']) + certifier = Identity(currency=self.currency, + pubkey=other_data["pubkey"], + uid=other_data["uids"][0], + member=other_data["isMember"]) if cert not in certifiers: cert.timestamp = await self._blockchain_processor.timestamp(self.currency, cert.block) - certifiers.append(cert) + certifiers[cert] = certifier # We save connections pubkeys if self.is_identity_of_connection(identity): self._certs_processor.insert_or_update_certification(cert) @@ -151,8 +155,12 @@ class IdentitiesService(QObject): block=signed_data["cert_time"]["block"], timestamp=0, signature=signed_data['signature']) + certified_idty = Identity(currency=self.currency, + pubkey=signed_data["pubkey"], + uid=signed_data["uid"], + member=signed_data["isMember"]) if cert not in certified: - certified.append(cert) + certified[cert] = certified_idty # We save connections pubkeys if self.is_identity_of_connection(identity): cert.timestamp = await self._blockchain_processor.timestamp(self.currency, @@ -170,7 +178,7 @@ class IdentitiesService(QObject): It does nothing if the identity is already written and updated with blockchain lookups :param sakia.data.entities.Identity identity: the identity """ - certifications = [] + certifications = {} try: data = await self._bma_connector.get(self.currency, bma.wot.certifiers_of, {'search': identity.pubkey}) @@ -181,9 +189,13 @@ class IdentitiesService(QObject): block=certifier_data["cert_time"]["block"], timestamp=certifier_data["cert_time"]["medianTime"], signature=certifier_data['signature']) + certifier = Identity(currency=self.currency, + pubkey=certifier_data["pubkey"], + uid=certifier_data["uid"], + member=certifier_data["isMember"]) if certifier_data['written']: cert.written_on = certifier_data['written']['number'] - certifications.append(cert) + certifications[cert] = certifier # We save connections pubkeys if identity.pubkey in self._connections_processor.pubkeys(): self._certs_processor.insert_or_update_certification(cert) @@ -207,7 +219,7 @@ class IdentitiesService(QObject): It does nothing if the identity is already written and updated with blockchain lookups :param sakia.data.entities.Identity identity: the identity """ - certifications = [] + certifications = {} try: data = await self._bma_connector.get(self.currency, bma.wot.certified_by, {'search': identity.pubkey}) for certified_data in data['certifications']: @@ -217,9 +229,13 @@ class IdentitiesService(QObject): block=certified_data["cert_time"]["block"], timestamp=certified_data["cert_time"]["medianTime"], signature=certified_data['signature']) + certified = Identity(currency=self.currency, + pubkey=certified_data["pubkey"], + uid=certified_data["uid"], + member=certified_data["isMember"]) if certified_data['written']: cert.written_on = certified_data['written']['number'] - certifications.append(cert) + certifications[cert] = certified # We save connections pubkeys if identity.pubkey in self._connections_processor.pubkeys(): self._certs_processor.insert_or_update_certification(cert) @@ -241,8 +257,8 @@ class IdentitiesService(QObject): """ Initialize certifications to and from a given identity :param sakia.data.entities.Identity identity: - :param function log_stream: Logger function - :param function progress: Progress function for progress bar + :param callable log_stream: Logger function + :param callable progress: Progress function for progress bar """ if log_stream: log_stream("Requesting certifiers of data") @@ -260,7 +276,7 @@ class IdentitiesService(QObject): log_stream("Requesting identities of certifications") identities = [] i = 0 - nb_certs = len(certified + certifiers) + nb_certs = len(certified.keys() + certifiers.keys()) for cert in certifiers: if log_stream: log_stream("Requesting identity... {0}/{1}".format(i, nb_certs)) diff --git a/src/sakia/services/network.py b/src/sakia/services/network.py index eae35138..f4e10d75 100644 --- a/src/sakia/services/network.py +++ b/src/sakia/services/network.py @@ -199,33 +199,33 @@ class NetworkService(QObject): await asyncio.sleep(2) else: node, updated = self._processor.update_peer(self.currency, peer) - if peer.blockUID.number + 2400 > self.current_buid().number: - if not node: - self._logger.debug("New node found : {0}".format(peer.pubkey[:5])) - try: - connector = NodeConnector.from_peer(self.currency, peer, self._app.parameters) - node = connector.node - self._processor.insert_node(connector.node) - self.new_node_found.emit(node) - except InvalidNodeCurrency as e: - self._logger.debug(str(e)) - if self._blockchain_service.initialized(): - try: - identity = await self._identities_service.find_from_pubkey(node.pubkey) - identity = await self._identities_service.load_requirements(identity) - node.member = identity.member - node.uid = identity.uid - self._processor.update_node(node) - self._processor.handle_success(node) - self.node_changed.emit(node) - except errors.DuniterError as e: - self._logger.error(e.message) + if not node: + self._logger.debug("New node found : {0}".format(peer.pubkey[:5])) + try: + connector = NodeConnector.from_peer(self.currency, peer, self._app.parameters) + node = connector.node + self._processor.insert_node(connector.node) + self.new_node_found.emit(node) + except InvalidNodeCurrency as e: + self._logger.debug(str(e)) + if self._blockchain_service.initialized(): + try: + identity = await self._identities_service.find_from_pubkey(node.pubkey) + identity = await self._identities_service.load_requirements(identity) + node.member = identity.member + node.uid = identity.uid + self._processor.update_node(node) + self._processor.handle_success(node) + self.node_changed.emit(node) + except errors.DuniterError as e: + self._logger.error(e.message) def handle_new_node(self, peer): key = VerifyingKey(peer.pubkey) if key.verify_document(peer): if len(self._discovery_stack) < 1000 \ - and peer.signatures[0] not in [p.signatures[0] for p in self._discovery_stack]: + and peer.blockUID.number + 2400 > self.current_buid().number \ + and peer.signatures not in [p.signatures[0] for p in self._discovery_stack]: self._logger.debug("Stacking new peer document : {0}".format(peer.pubkey)) self._discovery_stack.append(peer) else: -- GitLab