Skip to content
Snippets Groups Projects
Commit c229cb54 authored by inso's avatar inso
Browse files

Verify data against multiple nodes

parent d1e166b0
No related branches found
No related tags found
No related merge requests found
......@@ -5,4 +5,5 @@ git+https://github.com/hynek/attrs.git@master
git+https://github.com/duniter/duniter-python-api.git@dev
pytest
pytest-asyncio
dpath
git+https://github.com/Insoleet/mirage@master
......@@ -10,8 +10,92 @@ import asyncio
import random
import jsonschema
import attr
import dpath
import copy
def make_hash(o):
"""
Makes a hash from a dictionary, list, tuple or set to any level, that contains
only other hashable types (including any lists, tuples, sets, and
dictionaries).
"""
if isinstance(o, (set, tuple, list)):
return tuple([make_hash(e) for e in o])
elif not isinstance(o, dict):
return hash(o)
new_o = copy.deepcopy(o)
for k, v in new_o.items():
new_o[k] = make_hash(v)
return hash(tuple(frozenset(sorted(new_o.items()))))
def filter_endpoints(request, nodes):
def compare_versions(node, version):
if node.version and node.version != '':
try:
return parse_version(node.version) >= parse_version(version)
except TypeError:
return False
else:
return True
filters = {
bma.ud.history: lambda n: compare_versions(n, "0.11.0"),
bma.tx.history: lambda n: compare_versions(n, "0.11.0"),
bma.blockchain.membership: lambda n: compare_versions(n, "0.14")
}
if request in filters:
nodes = [n for n in nodes if filters[request](n)]
endpoints = []
for n in nodes:
endpoints += [e for e in n.endpoints if type(e) in (BMAEndpoint, SecuredBMAEndpoint)]
return endpoints
def _compare_json(first, second):
"""
Compare two json dicts
:param first: the first dictionnary
:param second: the second dictionnary
:return: True if the json dicts are the same
:rtype: bool
"""
def ordered(obj):
if isinstance(obj, dict):
try:
return sorted((k, ordered(v)) for k, v in obj.items())
except TypeError:
return obj
if isinstance(obj, list):
try:
return sorted(ordered(x) for x in obj)
except TypeError:
return obj
else:
return obj
return ordered(first) == ordered(second)
def _filter_blockchain_data(request, data):
"""
:param request:
:param dict data:
:return:
"""
include_only = {
bma.wot.lookup: "",
bma.tx.history: "/history/[send|received]"
}
if request in include_only:
data = dpath.util.search(data, include_only[request])
return data
@attr.s()
class BmaConnector:
"""
......@@ -21,51 +105,57 @@ class BmaConnector:
_user_parameters = attr.ib()
_logger = attr.ib(default=attr.Factory(lambda: logging.getLogger('sakia')))
def filter_endpoints(self, request, nodes):
def compare_versions(node, version):
if node.version and node.version != '':
try:
return parse_version(node.version) >= parse_version(version)
except TypeError:
return False
else:
return True
filters = {
bma.ud.history: lambda n: compare_versions(n, "0.11.0"),
bma.tx.history: lambda n: compare_versions(n, "0.11.0"),
bma.blockchain.membership: lambda n: compare_versions(n, "0.14")
}
if request in filters:
nodes = [n for n in nodes if filters[request](n)]
endpoints = []
for n in nodes:
endpoints += [e for e in n.endpoints if type(e) in (BMAEndpoint, SecuredBMAEndpoint)]
return endpoints
async def get(self, currency, request, req_args={}):
async def get(self, currency, request, req_args={}, verify=True):
"""
Start a request to the network but don't cache its result.
:param str currency: the currency requested
:param class request: A bma request class calling for data
:param dict req_args: Arguments to pass to the request constructor
:param dict get_args: Arguments to pass to the request __get__ method
:param bool verify: Verify returned value against multiple nodes
:return: The returned data
"""
endpoints = self.filter_endpoints(request, self._nodes_processor.synced_nodes(currency))
tries = 0
while tries < 3 and endpoints:
endpoint = random.choice(endpoints)
endpoints.remove(endpoint)
try:
self._logger.debug("Requesting {0} on endpoint {1}".format(str(request.__name__), str(endpoint)))
async with aiohttp.ClientSession() as session:
json_data = await request(endpoint.conn_handler(session, proxy=self._user_parameters.proxy()), **req_args)
return json_data
except (ClientError, ServerDisconnectedError, gaierror,
asyncio.TimeoutError, ValueError, jsonschema.ValidationError) as e:
self._logger.debug(str(e))
tries += 1
synced_nodes = self._nodes_processor.synced_nodes(currency)
nodes_generator = (n for n in synced_nodes)
answers = {}
answers_data = {}
nb_verification = min(max(1, 0.66*len(synced_nodes)), 10)
try:
# On chercher des reponses jusqu'a obtenir un nombre de noeuds d'accords de 1 à 66% des noeuds, max 10
while max([len(ans) for ans in answers.values()] + [0]) <= nb_verification:
node = next(nodes_generator)
endpoints = filter_endpoints(request, [node])
tries = 0
while tries < 3 and endpoints:
endpoint = random.choice(endpoints)
endpoints.remove(endpoint)
try:
self._logger.debug("Requesting {0} on endpoint {1}".format(str(request.__name__), str(endpoint)))
async with aiohttp.ClientSession() as session:
json_data = await request(endpoint.conn_handler(session, proxy=self._user_parameters.proxy()),
**req_args)
if verify:
filtered_data = _filter_blockchain_data(request, json_data)
data_hash = make_hash(filtered_data)
answers_data[data_hash] = json_data
if data_hash not in answers:
answers[data_hash] = [node]
else:
answers[data_hash].append(node)
break
else:
return json_data
except (ClientError, ServerDisconnectedError, gaierror,
asyncio.TimeoutError, ValueError, jsonschema.ValidationError) as e:
self._logger.debug(str(e))
tries += 1
except StopIteration:
pass
for dict_hash in answers:
if len(answers[dict_hash]) >= nb_verification:
return answers_data[dict_hash]
raise NoPeerAvailable("", len(endpoints))
async def broadcast(self, currency, request, req_args={}):
......@@ -82,7 +172,7 @@ class BmaConnector:
.. note:: If one node accept the requests (returns 200),
the broadcast should be considered accepted by the network.
"""
filtered_endpoints = self.filter_endpoints(request, self._nodes_processor.synced_nodes(currency))
filtered_endpoints = filter_endpoints(request, self._nodes_processor.synced_nodes(currency))
endpoints = random.sample(filtered_endpoints, 6) if len(filtered_endpoints) > 6 else filtered_endpoints
replies = []
......
......@@ -287,7 +287,7 @@ class NodeConnector(QObject):
try:
data = await self.safe_request(endpoint, bma.wot.lookup,
proxy=self._user_parameters.proxy(),
req_args={'search':self.node.pubkey})
req_args={'search': self.node.pubkey})
if not data:
continue
self.node.state = Node.ONLINE
......@@ -303,6 +303,7 @@ class NodeConnector(QObject):
if self.node.uid != uid:
self.node.uid = uid
self.identity_changed.emit()
break
except errors.DuniterError as e:
if e.ucode == errors.NO_MATCHING_IDENTITY:
self._logger.debug("UID not found : {0}".format(self.node.pubkey[:5]))
......@@ -310,6 +311,7 @@ class NodeConnector(QObject):
self._logger.debug("error in uid reply : {0}".format(self.node.pubkey[:5]))
self.node.state = Node.OFFLINE
self.identity_changed.emit()
break
else:
self._logger.debug("Could not connect to any BMA endpoint : {0}".format(self.node.pubkey[:5]))
self.node.state = Node.OFFLINE
......
......@@ -6,6 +6,7 @@ from ..connectors import BmaConnector
from duniterpy.api import bma
from duniterpy.documents import Transaction
import sqlite3
import asyncio
@attr.s
......@@ -66,6 +67,7 @@ class DividendsProcessor:
if input.source == "D" and input.origin_id == identity.pubkey and input.index not in block_numbers:
block = await self._bma_connector.get(identity.currency,
bma.blockchain.block, req_args={'number': input.index})
await asyncio.sleep(0.5)
dividend = Dividend(currency=identity.currency,
pubkey=identity.pubkey,
......
......@@ -67,7 +67,7 @@ class IdentitiesService(QObject):
"""
try:
search = await self._bma_connector.get(self.currency, bma.blockchain.memberships,
{'search': identity.pubkey})
req_args={'search': identity.pubkey})
blockstamp = BlockUID.empty()
membership_data = None
......@@ -98,7 +98,8 @@ class IdentitiesService(QObject):
"""
certifications = []
try:
data = await self._bma_connector.get(self.currency, bma.wot.certifiers_of, {'search': identity.pubkey})
data = await self._bma_connector.get(self.currency, bma.wot.certifiers_of,
{'search': identity.pubkey})
for certifier_data in data['certifications']:
cert = Certification(currency=self.currency,
certified=data["pubkey"],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment