Skip to content
Snippets Groups Projects
Commit 6f6e7880 authored by inso's avatar inso
Browse files

Faster WoT loading

parent 02c1c6b4
No related branches found
No related tags found
No related merge requests found
...@@ -180,10 +180,8 @@ class BmaConnector: ...@@ -180,10 +180,8 @@ class BmaConnector:
return e return e
async def verified_get(self, currency, request, req_args): async def verified_get(self, currency, request, req_args):
synced_nodes = self._nodes_processor.synced_members_nodes(currency) # If no node is known as a member, lookup synced nodes as a fallback
if not synced_nodes: synced_nodes = self._nodes_processor.synced_nodes(currency)
# If no node is known as a member, lookup synced nodes as a fallback
synced_nodes = self._nodes_processor.synced_nodes(currency)
nodes_generator = (n for n in synced_nodes) nodes_generator = (n for n in synced_nodes)
answers = {} answers = {}
answers_data = {} answers_data = {}
...@@ -241,7 +239,6 @@ class BmaConnector: ...@@ -241,7 +239,6 @@ class BmaConnector:
else: else:
return _best_answer(answers, answers_data, nb_verification) return _best_answer(answers, answers_data, nb_verification)
nodes = self._nodes_processor.nodes(currency)
raise NoPeerAvailable("", len(synced_nodes)) raise NoPeerAvailable("", len(synced_nodes))
async def simple_get(self, currency, request, req_args): async def simple_get(self, currency, request, req_args):
......
...@@ -51,7 +51,7 @@ class BaseGraph(QObject): ...@@ -51,7 +51,7 @@ class BaseGraph(QObject):
else: else:
return EdgeStatus.STRONG return EdgeStatus.STRONG
async def node_status(self, node_identity): def node_status(self, node_identity):
""" """
Return the status of the node depending Return the status of the node depending
:param sakia.core.registry.Identity node_identity: The identity of the node :param sakia.core.registry.Identity node_identity: The identity of the node
...@@ -60,23 +60,6 @@ class BaseGraph(QObject): ...@@ -60,23 +60,6 @@ class BaseGraph(QObject):
""" """
# new node # new node
node_status = NodeStatus.NEUTRAL node_status = NodeStatus.NEUTRAL
node_identity = await self.identities_service.load_requirements(node_identity)
if node_identity.pubkey in self._connections_processor.pubkeys():
node_status += NodeStatus.HIGHLIGHTED
if node_identity.member is False:
node_status += NodeStatus.OUT
return node_status
def offline_node_status(self, node_identity):
"""
Return the status of the node depending on its requirements. No network request.
:param sakia.core.registry.Identity node_identity: The identity of the node
:param sakia.core.registry.Identity account_identity: The identity of the account displayed
:return: HIGHLIGHTED if node_identity is account_identity and OUT if the node_identity is not a member
:rtype: sakia.core.graph.constants.NodeStatus
"""
# new node
node_status = NodeStatus.NEUTRAL
if node_identity.pubkey in self._connections_processor.pubkeys(): if node_identity.pubkey in self._connections_processor.pubkeys():
node_status += NodeStatus.HIGHLIGHTED node_status += NodeStatus.HIGHLIGHTED
if node_identity.member is False: if node_identity.member is False:
...@@ -167,7 +150,7 @@ class BaseGraph(QObject): ...@@ -167,7 +150,7 @@ class BaseGraph(QObject):
for certification in tuple(certifier_list): for certification in tuple(certifier_list):
certifier = self.identities_service.get_identity(certification.certifier) certifier = self.identities_service.get_identity(certification.certifier)
if certifier: if certifier:
node_status = self.offline_node_status(certifier) node_status = self.node_status(certifier)
self.add_certifier_node(certifier, identity, certification, node_status) self.add_certifier_node(certifier, identity, certification, node_status)
def add_offline_certified_list(self, certified_list, identity): def add_offline_certified_list(self, certified_list, identity):
...@@ -181,10 +164,10 @@ class BaseGraph(QObject): ...@@ -181,10 +164,10 @@ class BaseGraph(QObject):
for certification in tuple(certified_list): for certification in tuple(certified_list):
certified = self.identities_service.get_identity(certification.certified) certified = self.identities_service.get_identity(certification.certified)
if certified: if certified:
node_status = self.offline_node_status(certified) node_status = self.node_status(certified)
self.add_certified_node(identity, certified, certification, node_status) self.add_certified_node(identity, certified, certification, node_status)
async def add_certifier_list(self, certifier_list, identity): def add_certifier_list(self, certifier_list, identity):
""" """
Add list of certifiers to graph Add list of certifiers to graph
:param List[sakia.data.entities.Certification] certifier_list: List of certified from api :param List[sakia.data.entities.Certification] certifier_list: List of certified from api
...@@ -193,21 +176,16 @@ class BaseGraph(QObject): ...@@ -193,21 +176,16 @@ class BaseGraph(QObject):
""" """
try: try:
# add certifiers of uid # add certifiers of uid
for certification in tuple(certifier_list): for certification in certifier_list:
certifier = self.identities_service.get_identity(certification.certifier) certifier = self.identities_service.get_identity(certification.certifier)
futures = [self.node_status(certifier)]
if not certifier:
futures.append(self.identities_service.find_from_pubkey(certification.certifier))
results = await asyncio.gather(*futures)
node_status = results[0]
if not certifier: if not certifier:
certifier = results[1] certifier = certifier_list[certification]
self.identities_service.insert_or_update_identity(certifier) node_status = self.node_status(certifier)
self.add_certifier_node(certifier, identity, certification, node_status) self.add_certifier_node(certifier, identity, certification, node_status)
except NoPeerAvailable as e: except NoPeerAvailable as e:
logging.debug(str(e)) logging.debug(str(e))
async def add_certified_list(self, certified_list, identity): def add_certified_list(self, certified_list, identity):
""" """
Add list of certified from api to graph Add list of certified from api to graph
:param List[sakia.data.entities.Certification] certified_list: List of certified from api :param List[sakia.data.entities.Certification] certified_list: List of certified from api
...@@ -218,14 +196,9 @@ class BaseGraph(QObject): ...@@ -218,14 +196,9 @@ class BaseGraph(QObject):
# add certified by uid # add certified by uid
for certification in tuple(certified_list): for certification in tuple(certified_list):
certified = self.identities_service.get_identity(certification.certified) certified = self.identities_service.get_identity(certification.certified)
futures = [self.node_status(certified)]
if not certified:
futures.append(self.identities_service.find_from_pubkey(certification.certified))
results = await asyncio.gather(*futures)
node_status = results[0]
if not certified: if not certified:
certified = results[1] certified = certified_list[certification]
self.identities_service.insert_or_update_identity(certified) node_status = self.node_status(certified)
self.add_certified_node(identity, certified, certification, node_status) self.add_certified_node(identity, certified, certification, node_status)
except NoPeerAvailable as e: except NoPeerAvailable as e:
......
...@@ -18,30 +18,27 @@ class WoTGraph(BaseGraph): ...@@ -18,30 +18,27 @@ class WoTGraph(BaseGraph):
async def initialize(self, center_identity): async def initialize(self, center_identity):
self.nx_graph.clear() self.nx_graph.clear()
node_status = await self.node_status(center_identity) node_status = self.node_status(center_identity)
self.add_identity(center_identity, node_status) self.add_identity(center_identity, node_status)
# create Identity from node metadata # create Identity from node metadata
certifier_coro = asyncio.ensure_future(self.identities_service.load_certifiers_of(center_identity)) certifier_coro = self.identities_service.load_certifiers_of(center_identity)
certified_coro = asyncio.ensure_future(self.identities_service.load_certified_by(center_identity)) certified_coro = self.identities_service.load_certified_by(center_identity)
certifier_list, certified_list = await asyncio.gather(*[certifier_coro, certified_coro]) certifier_list, certified_list = await asyncio.gather(*[certifier_coro, certified_coro])
certified_list, certified_list = await self.identities_service.load_certs_in_lookup(center_identity, certifier_list, certified_list = await self.identities_service.load_certs_in_lookup(center_identity,
certifier_list, certifier_list,
certified_list) certified_list)
# populate graph with certifiers-of # populate graph with certifiers-of
certifier_coro = asyncio.ensure_future(self.add_certifier_list(certifier_list, center_identity)) self.add_certifier_list(certifier_list, center_identity)
# populate graph with certified-by # populate graph with certified-by
certified_coro = asyncio.ensure_future(self.add_certified_list(certified_list, center_identity)) self.add_certified_list(certified_list, center_identity)
await asyncio.gather(*[certifier_coro, certified_coro], return_exceptions=True)
await asyncio.sleep(0)
def offline_init(self, center_identity): def offline_init(self, center_identity):
node_status = self.offline_node_status(center_identity) node_status = self.node_status(center_identity)
self.add_identity(center_identity, node_status) self.add_identity(center_identity, node_status)
......
...@@ -151,16 +151,22 @@ class NodesRepo: ...@@ -151,16 +151,22 @@ class NodesRepo:
return [] return []
def current_buid(self, currency): def current_buid(self, currency):
c = self._conn.execute("""SELECT `current_buid`, c = self._conn.execute("""SELECT COUNT(`uid`)
COUNT(`current_buid`) AS `value_occurrence` FROM `nodes`
FROM `nodes` WHERE member == 1 AND currency == ?
WHERE member == 1 AND currency == ? LIMIT 1;""", (currency,))
GROUP BY `current_buid`
ORDER BY `value_occurrence` DESC
LIMIT 1;""", (currency,))
data = c.fetchone() data = c.fetchone()
if data: if data and data[0] > 3:
return block_uid(data[0]) c = self._conn.execute("""SELECT `current_buid`,
COUNT(`current_buid`) AS `value_occurrence`
FROM `nodes`
WHERE member == 1 AND currency == ?
GROUP BY `current_buid`
ORDER BY `value_occurrence` DESC
LIMIT 1;""", (currency,))
data = c.fetchone()
if data:
return block_uid(data[0])
else: else:
c = self._conn.execute("""SELECT `current_buid`, c = self._conn.execute("""SELECT `current_buid`,
COUNT(`current_buid`) AS `value_occurrence` COUNT(`current_buid`) AS `value_occurrence`
......
...@@ -3,7 +3,7 @@ import asyncio ...@@ -3,7 +3,7 @@ import asyncio
from duniterpy.api import bma, errors from duniterpy.api import bma, errors
from duniterpy.documents import BlockUID, block_uid from duniterpy.documents import BlockUID, block_uid
from sakia.errors import NoPeerAvailable from sakia.errors import NoPeerAvailable
from sakia.data.entities import Certification from sakia.data.entities import Certification, Identity
import logging import logging
...@@ -119,8 +119,8 @@ class IdentitiesService(QObject): ...@@ -119,8 +119,8 @@ class IdentitiesService(QObject):
async def load_certs_in_lookup(self, identity, certifiers, certified): async def load_certs_in_lookup(self, identity, certifiers, certified):
""" """
:param sakia.data.entities.Identity identity: the identity :param sakia.data.entities.Identity identity: the identity
:param list[sakia.data.entities.Certification] certifiers: the list of certifiers got in /wot/certifiers-of :param dict[sakia.data.entities.Certification] certifiers: the list of certifiers got in /wot/certifiers-of
:param list[sakia.data.entities.Certification] certified: the list of certified got in /wot/certified-by :param dict[sakia.data.entities.Certification] certified: the list of certified got in /wot/certified-by
""" """
try: try:
lookup_data = await self._bma_connector.get(self.currency, bma.wot.lookup, lookup_data = await self._bma_connector.get(self.currency, bma.wot.lookup,
...@@ -137,10 +137,14 @@ class IdentitiesService(QObject): ...@@ -137,10 +137,14 @@ class IdentitiesService(QObject):
block=other_data["meta"]["block_number"], block=other_data["meta"]["block_number"],
timestamp=0, timestamp=0,
signature=other_data['signature']) signature=other_data['signature'])
certifier = Identity(currency=self.currency,
pubkey=other_data["pubkey"],
uid=other_data["uids"][0],
member=other_data["isMember"])
if cert not in certifiers: if cert not in certifiers:
cert.timestamp = await self._blockchain_processor.timestamp(self.currency, cert.timestamp = await self._blockchain_processor.timestamp(self.currency,
cert.block) cert.block)
certifiers.append(cert) certifiers[cert] = certifier
# We save connections pubkeys # We save connections pubkeys
if self.is_identity_of_connection(identity): if self.is_identity_of_connection(identity):
self._certs_processor.insert_or_update_certification(cert) self._certs_processor.insert_or_update_certification(cert)
...@@ -151,8 +155,12 @@ class IdentitiesService(QObject): ...@@ -151,8 +155,12 @@ class IdentitiesService(QObject):
block=signed_data["cert_time"]["block"], block=signed_data["cert_time"]["block"],
timestamp=0, timestamp=0,
signature=signed_data['signature']) signature=signed_data['signature'])
certified_idty = Identity(currency=self.currency,
pubkey=signed_data["pubkey"],
uid=signed_data["uid"],
member=signed_data["isMember"])
if cert not in certified: if cert not in certified:
certified.append(cert) certified[cert] = certified_idty
# We save connections pubkeys # We save connections pubkeys
if self.is_identity_of_connection(identity): if self.is_identity_of_connection(identity):
cert.timestamp = await self._blockchain_processor.timestamp(self.currency, cert.timestamp = await self._blockchain_processor.timestamp(self.currency,
...@@ -170,7 +178,7 @@ class IdentitiesService(QObject): ...@@ -170,7 +178,7 @@ class IdentitiesService(QObject):
It does nothing if the identity is already written and updated with blockchain lookups It does nothing if the identity is already written and updated with blockchain lookups
:param sakia.data.entities.Identity identity: the identity :param sakia.data.entities.Identity identity: the identity
""" """
certifications = [] certifications = {}
try: try:
data = await self._bma_connector.get(self.currency, bma.wot.certifiers_of, data = await self._bma_connector.get(self.currency, bma.wot.certifiers_of,
{'search': identity.pubkey}) {'search': identity.pubkey})
...@@ -181,9 +189,13 @@ class IdentitiesService(QObject): ...@@ -181,9 +189,13 @@ class IdentitiesService(QObject):
block=certifier_data["cert_time"]["block"], block=certifier_data["cert_time"]["block"],
timestamp=certifier_data["cert_time"]["medianTime"], timestamp=certifier_data["cert_time"]["medianTime"],
signature=certifier_data['signature']) signature=certifier_data['signature'])
certifier = Identity(currency=self.currency,
pubkey=certifier_data["pubkey"],
uid=certifier_data["uid"],
member=certifier_data["isMember"])
if certifier_data['written']: if certifier_data['written']:
cert.written_on = certifier_data['written']['number'] cert.written_on = certifier_data['written']['number']
certifications.append(cert) certifications[cert] = certifier
# We save connections pubkeys # We save connections pubkeys
if identity.pubkey in self._connections_processor.pubkeys(): if identity.pubkey in self._connections_processor.pubkeys():
self._certs_processor.insert_or_update_certification(cert) self._certs_processor.insert_or_update_certification(cert)
...@@ -207,7 +219,7 @@ class IdentitiesService(QObject): ...@@ -207,7 +219,7 @@ class IdentitiesService(QObject):
It does nothing if the identity is already written and updated with blockchain lookups It does nothing if the identity is already written and updated with blockchain lookups
:param sakia.data.entities.Identity identity: the identity :param sakia.data.entities.Identity identity: the identity
""" """
certifications = [] certifications = {}
try: try:
data = await self._bma_connector.get(self.currency, bma.wot.certified_by, {'search': identity.pubkey}) data = await self._bma_connector.get(self.currency, bma.wot.certified_by, {'search': identity.pubkey})
for certified_data in data['certifications']: for certified_data in data['certifications']:
...@@ -217,9 +229,13 @@ class IdentitiesService(QObject): ...@@ -217,9 +229,13 @@ class IdentitiesService(QObject):
block=certified_data["cert_time"]["block"], block=certified_data["cert_time"]["block"],
timestamp=certified_data["cert_time"]["medianTime"], timestamp=certified_data["cert_time"]["medianTime"],
signature=certified_data['signature']) signature=certified_data['signature'])
certified = Identity(currency=self.currency,
pubkey=certified_data["pubkey"],
uid=certified_data["uid"],
member=certified_data["isMember"])
if certified_data['written']: if certified_data['written']:
cert.written_on = certified_data['written']['number'] cert.written_on = certified_data['written']['number']
certifications.append(cert) certifications[cert] = certified
# We save connections pubkeys # We save connections pubkeys
if identity.pubkey in self._connections_processor.pubkeys(): if identity.pubkey in self._connections_processor.pubkeys():
self._certs_processor.insert_or_update_certification(cert) self._certs_processor.insert_or_update_certification(cert)
...@@ -241,8 +257,8 @@ class IdentitiesService(QObject): ...@@ -241,8 +257,8 @@ class IdentitiesService(QObject):
""" """
Initialize certifications to and from a given identity Initialize certifications to and from a given identity
:param sakia.data.entities.Identity identity: :param sakia.data.entities.Identity identity:
:param function log_stream: Logger function :param callable log_stream: Logger function
:param function progress: Progress function for progress bar :param callable progress: Progress function for progress bar
""" """
if log_stream: if log_stream:
log_stream("Requesting certifiers of data") log_stream("Requesting certifiers of data")
...@@ -260,7 +276,7 @@ class IdentitiesService(QObject): ...@@ -260,7 +276,7 @@ class IdentitiesService(QObject):
log_stream("Requesting identities of certifications") log_stream("Requesting identities of certifications")
identities = [] identities = []
i = 0 i = 0
nb_certs = len(certified + certifiers) nb_certs = len(certified.keys() + certifiers.keys())
for cert in certifiers: for cert in certifiers:
if log_stream: if log_stream:
log_stream("Requesting identity... {0}/{1}".format(i, nb_certs)) log_stream("Requesting identity... {0}/{1}".format(i, nb_certs))
......
...@@ -199,33 +199,33 @@ class NetworkService(QObject): ...@@ -199,33 +199,33 @@ class NetworkService(QObject):
await asyncio.sleep(2) await asyncio.sleep(2)
else: else:
node, updated = self._processor.update_peer(self.currency, peer) node, updated = self._processor.update_peer(self.currency, peer)
if peer.blockUID.number + 2400 > self.current_buid().number: if not node:
if not node: self._logger.debug("New node found : {0}".format(peer.pubkey[:5]))
self._logger.debug("New node found : {0}".format(peer.pubkey[:5])) try:
try: connector = NodeConnector.from_peer(self.currency, peer, self._app.parameters)
connector = NodeConnector.from_peer(self.currency, peer, self._app.parameters) node = connector.node
node = connector.node self._processor.insert_node(connector.node)
self._processor.insert_node(connector.node) self.new_node_found.emit(node)
self.new_node_found.emit(node) except InvalidNodeCurrency as e:
except InvalidNodeCurrency as e: self._logger.debug(str(e))
self._logger.debug(str(e)) if self._blockchain_service.initialized():
if self._blockchain_service.initialized(): try:
try: identity = await self._identities_service.find_from_pubkey(node.pubkey)
identity = await self._identities_service.find_from_pubkey(node.pubkey) identity = await self._identities_service.load_requirements(identity)
identity = await self._identities_service.load_requirements(identity) node.member = identity.member
node.member = identity.member node.uid = identity.uid
node.uid = identity.uid self._processor.update_node(node)
self._processor.update_node(node) self._processor.handle_success(node)
self._processor.handle_success(node) self.node_changed.emit(node)
self.node_changed.emit(node) except errors.DuniterError as e:
except errors.DuniterError as e: self._logger.error(e.message)
self._logger.error(e.message)
def handle_new_node(self, peer): def handle_new_node(self, peer):
key = VerifyingKey(peer.pubkey) key = VerifyingKey(peer.pubkey)
if key.verify_document(peer): if key.verify_document(peer):
if len(self._discovery_stack) < 1000 \ if len(self._discovery_stack) < 1000 \
and peer.signatures[0] not in [p.signatures[0] for p in self._discovery_stack]: and peer.blockUID.number + 2400 > self.current_buid().number \
and peer.signatures not in [p.signatures[0] for p in self._discovery_stack]:
self._logger.debug("Stacking new peer document : {0}".format(peer.pubkey)) self._logger.debug("Stacking new peer document : {0}".format(peer.pubkey))
self._discovery_stack.append(peer) self._discovery_stack.append(peer)
else: else:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment