diff --git a/requirements.txt b/requirements.txt index 75ee25c58bd5cd81d6ca2abee42b9aec27ecc9bd..24c1ad02f0722b8dbd9854932a82947511203d73 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ git+https://github.com/hynek/attrs.git@master git+https://github.com/duniter/duniter-python-api.git@dev pytest pytest-asyncio +dpath git+https://github.com/Insoleet/mirage@master diff --git a/src/sakia/data/connectors/bma.py b/src/sakia/data/connectors/bma.py index 6058a14570ea2a4ece9ce57356998ec6061b7483..a7e4ab63e3bb84477c8388471f8f993b85663b22 100644 --- a/src/sakia/data/connectors/bma.py +++ b/src/sakia/data/connectors/bma.py @@ -10,8 +10,92 @@ import asyncio import random import jsonschema import attr +import dpath +import copy +def make_hash(o): + """ + Makes a hash from a dictionary, list, tuple or set to any level, that contains + only other hashable types (including any lists, tuples, sets, and + dictionaries). + """ + + if isinstance(o, (set, tuple, list)): + return tuple([make_hash(e) for e in o]) + elif not isinstance(o, dict): + return hash(o) + + new_o = copy.deepcopy(o) + for k, v in new_o.items(): + new_o[k] = make_hash(v) + + return hash(tuple(frozenset(sorted(new_o.items())))) + + +def filter_endpoints(request, nodes): + def compare_versions(node, version): + if node.version and node.version != '': + try: + return parse_version(node.version) >= parse_version(version) + except TypeError: + return False + else: + return True + filters = { + bma.ud.history: lambda n: compare_versions(n, "0.11.0"), + bma.tx.history: lambda n: compare_versions(n, "0.11.0"), + bma.blockchain.membership: lambda n: compare_versions(n, "0.14") + } + if request in filters: + nodes = [n for n in nodes if filters[request](n)] + endpoints = [] + for n in nodes: + endpoints += [e for e in n.endpoints if type(e) in (BMAEndpoint, SecuredBMAEndpoint)] + return endpoints + + +def _compare_json(first, second): + """ + Compare two json dicts + :param first: the first dictionnary + :param second: the second dictionnary + :return: True if the json dicts are the same + :rtype: bool + """ + + def ordered(obj): + if isinstance(obj, dict): + try: + return sorted((k, ordered(v)) for k, v in obj.items()) + except TypeError: + return obj + if isinstance(obj, list): + try: + return sorted(ordered(x) for x in obj) + except TypeError: + return obj + else: + return obj + + return ordered(first) == ordered(second) + + +def _filter_blockchain_data(request, data): + """ + + :param request: + :param dict data: + :return: + """ + include_only = { + bma.wot.lookup: "", + bma.tx.history: "/history/[send|received]" + } + if request in include_only: + data = dpath.util.search(data, include_only[request]) + return data + @attr.s() class BmaConnector: """ @@ -21,51 +105,57 @@ class BmaConnector: _user_parameters = attr.ib() _logger = attr.ib(default=attr.Factory(lambda: logging.getLogger('sakia'))) - def filter_endpoints(self, request, nodes): - def compare_versions(node, version): - if node.version and node.version != '': - try: - return parse_version(node.version) >= parse_version(version) - except TypeError: - return False - else: - return True - filters = { - bma.ud.history: lambda n: compare_versions(n, "0.11.0"), - bma.tx.history: lambda n: compare_versions(n, "0.11.0"), - bma.blockchain.membership: lambda n: compare_versions(n, "0.14") - } - if request in filters: - nodes = [n for n in nodes if filters[request](n)] - endpoints = [] - for n in nodes: - endpoints += [e for e in n.endpoints if type(e) in (BMAEndpoint, SecuredBMAEndpoint)] - return endpoints - - async def get(self, currency, request, req_args={}): + async def get(self, currency, request, req_args={}, verify=True): """ Start a request to the network but don't cache its result. :param str currency: the currency requested :param class request: A bma request class calling for data :param dict req_args: Arguments to pass to the request constructor - :param dict get_args: Arguments to pass to the request __get__ method + :param bool verify: Verify returned value against multiple nodes :return: The returned data """ - endpoints = self.filter_endpoints(request, self._nodes_processor.synced_nodes(currency)) - tries = 0 - while tries < 3 and endpoints: - endpoint = random.choice(endpoints) - endpoints.remove(endpoint) - try: - self._logger.debug("Requesting {0} on endpoint {1}".format(str(request.__name__), str(endpoint))) - async with aiohttp.ClientSession() as session: - json_data = await request(endpoint.conn_handler(session, proxy=self._user_parameters.proxy()), **req_args) - return json_data - except (ClientError, ServerDisconnectedError, gaierror, - asyncio.TimeoutError, ValueError, jsonschema.ValidationError) as e: - self._logger.debug(str(e)) - tries += 1 + synced_nodes = self._nodes_processor.synced_nodes(currency) + nodes_generator = (n for n in synced_nodes) + answers = {} + answers_data = {} + nb_verification = min(max(1, 0.66*len(synced_nodes)), 10) + try: + # On chercher des reponses jusqu'a obtenir un nombre de noeuds d'accords de 1 Ã 66% des noeuds, max 10 + while max([len(ans) for ans in answers.values()] + [0]) <= nb_verification: + node = next(nodes_generator) + endpoints = filter_endpoints(request, [node]) + tries = 0 + while tries < 3 and endpoints: + endpoint = random.choice(endpoints) + endpoints.remove(endpoint) + try: + self._logger.debug("Requesting {0} on endpoint {1}".format(str(request.__name__), str(endpoint))) + async with aiohttp.ClientSession() as session: + json_data = await request(endpoint.conn_handler(session, proxy=self._user_parameters.proxy()), + **req_args) + if verify: + filtered_data = _filter_blockchain_data(request, json_data) + data_hash = make_hash(filtered_data) + answers_data[data_hash] = json_data + if data_hash not in answers: + answers[data_hash] = [node] + else: + answers[data_hash].append(node) + break + else: + return json_data + except (ClientError, ServerDisconnectedError, gaierror, + asyncio.TimeoutError, ValueError, jsonschema.ValidationError) as e: + self._logger.debug(str(e)) + tries += 1 + except StopIteration: + pass + + for dict_hash in answers: + if len(answers[dict_hash]) >= nb_verification: + return answers_data[dict_hash] + raise NoPeerAvailable("", len(endpoints)) async def broadcast(self, currency, request, req_args={}): @@ -82,7 +172,7 @@ class BmaConnector: .. note:: If one node accept the requests (returns 200), the broadcast should be considered accepted by the network. """ - filtered_endpoints = self.filter_endpoints(request, self._nodes_processor.synced_nodes(currency)) + filtered_endpoints = filter_endpoints(request, self._nodes_processor.synced_nodes(currency)) endpoints = random.sample(filtered_endpoints, 6) if len(filtered_endpoints) > 6 else filtered_endpoints replies = [] diff --git a/src/sakia/data/connectors/node.py b/src/sakia/data/connectors/node.py index 34cce487f7c4858496c9bef78951d1be1ca99f23..21dda26115a64dbd63915ba00426a4890ca433fc 100644 --- a/src/sakia/data/connectors/node.py +++ b/src/sakia/data/connectors/node.py @@ -287,7 +287,7 @@ class NodeConnector(QObject): try: data = await self.safe_request(endpoint, bma.wot.lookup, proxy=self._user_parameters.proxy(), - req_args={'search':self.node.pubkey}) + req_args={'search': self.node.pubkey}) if not data: continue self.node.state = Node.ONLINE @@ -303,6 +303,7 @@ class NodeConnector(QObject): if self.node.uid != uid: self.node.uid = uid self.identity_changed.emit() + break except errors.DuniterError as e: if e.ucode == errors.NO_MATCHING_IDENTITY: self._logger.debug("UID not found : {0}".format(self.node.pubkey[:5])) @@ -310,6 +311,7 @@ class NodeConnector(QObject): self._logger.debug("error in uid reply : {0}".format(self.node.pubkey[:5])) self.node.state = Node.OFFLINE self.identity_changed.emit() + break else: self._logger.debug("Could not connect to any BMA endpoint : {0}".format(self.node.pubkey[:5])) self.node.state = Node.OFFLINE diff --git a/src/sakia/data/processors/dividends.py b/src/sakia/data/processors/dividends.py index d10f7c7a3b3610fc73cca1eefabdafdd91e7b3f0..d2fddcac70a28646696e9d1bcb8071b19ab262d9 100644 --- a/src/sakia/data/processors/dividends.py +++ b/src/sakia/data/processors/dividends.py @@ -6,6 +6,7 @@ from ..connectors import BmaConnector from duniterpy.api import bma from duniterpy.documents import Transaction import sqlite3 +import asyncio @attr.s @@ -66,6 +67,7 @@ class DividendsProcessor: if input.source == "D" and input.origin_id == identity.pubkey and input.index not in block_numbers: block = await self._bma_connector.get(identity.currency, bma.blockchain.block, req_args={'number': input.index}) + await asyncio.sleep(0.5) dividend = Dividend(currency=identity.currency, pubkey=identity.pubkey, diff --git a/src/sakia/services/identities.py b/src/sakia/services/identities.py index dffeb88c9da22779e86c79425cb0efbbc912a878..b43be8256bc90fa39864a1c1407f9aef898fcc4b 100644 --- a/src/sakia/services/identities.py +++ b/src/sakia/services/identities.py @@ -67,7 +67,7 @@ class IdentitiesService(QObject): """ try: search = await self._bma_connector.get(self.currency, bma.blockchain.memberships, - {'search': identity.pubkey}) + req_args={'search': identity.pubkey}) blockstamp = BlockUID.empty() membership_data = None @@ -98,7 +98,8 @@ class IdentitiesService(QObject): """ certifications = [] try: - data = await self._bma_connector.get(self.currency, bma.wot.certifiers_of, {'search': identity.pubkey}) + data = await self._bma_connector.get(self.currency, bma.wot.certifiers_of, + {'search': identity.pubkey}) for certifier_data in data['certifications']: cert = Certification(currency=self.currency, certified=data["pubkey"],