Skip to content
Snippets Groups Projects
Commit 3a9c7831 authored by inso's avatar inso
Browse files

Processors and Connectors

parent c2b91d8f
No related branches found
No related tags found
No related merge requests found
from PyQt5.QtCore import QObject, pyqtSlot
from duniterpy.api import bma
from duniterpy.api import errors
from .....tools.exceptions import NoPeerAvailable
from ..... import __version__
import logging
from aiohttp.errors import ClientError, ServerDisconnectedError
import asyncio
import random
from socket import gaierror
import jsonschema
from pkg_resources import parse_version
import copy
class BmaAccess(QObject):
"""
This class is used to access BMA API.
"""
__saved_requests = [str(bma.blockchain.Block), str(bma.blockchain.Parameters)]
def __init__(self, data, network):
"""
Constructor of a network
:param dict data: The data present in this cache
:param sakia.core.net.network.Network network: The network used to connect
"""
super().__init__()
self._data = data
self._rollback_to = None
self._pending_requests = {}
self._network = network
@classmethod
def create(cls, network):
"""
Initialize a new BMAAccess object with empty data.
:param sakia.core.net.network.Network network:
:return: A new BmaAccess object
:rtype: sakia.core.net.api.bma.access.BmaAccess
"""
return cls({}, network)
@property
def data(self):
return self._data.copy()
def load_from_json(self, json_data):
"""
Put data in the cache from json datas.
:param dict data: The cache in json format
"""
data = {}
for entry in json_data['entries']:
key = entry['key']
cache_key = (key[0], key[1], key[2], key[3], key[4])
data[cache_key] = entry['value']
self._data = data
self._rollback_to = json_data['rollback']
def jsonify(self):
"""
Get the cache in json format
:return: The cache as a dict in json format
"""
data = {k: self._data[k] for k in self._data.keys()}
entries = []
for d in data:
entries.append({'key': d,
'value': data[d]})
return {'rollback': self._rollback_to,
'entries': entries}
@staticmethod
def _gen_cache_key(request, req_args, get_args):
return (str(request),
str(tuple(frozenset(sorted(req_args.keys())))),
str(tuple(frozenset(sorted(req_args.values())))),
str(tuple(frozenset(sorted(get_args.keys())))),
str(tuple(frozenset(sorted(get_args.values())))))
def _compare_json(self, first, second):
"""
Compare two json dicts
:param first: the first dictionnary
:param second: the second dictionnary
:return: True if the json dicts are the same
:rtype: bool
"""
def ordered(obj):
if isinstance(obj, dict):
try:
return sorted((k, ordered(v)) for k, v in obj.items())
except TypeError:
return obj
if isinstance(obj, list):
try:
return sorted(ordered(x) for x in obj)
except TypeError:
return obj
else:
return obj
return ordered(first) == ordered(second)
def _get_from_cache(self, request, req_args, get_args):
"""
Get data from the cache
:param request: The requested data
:param cache_key: The key
:rtype: tuple[bool, dict]
"""
cache_key = BmaAccess._gen_cache_key(request, req_args, get_args)
if cache_key in self._data.keys():
cached_data = self._data[cache_key]
need_reload = True
# If we detected a rollback
# We reload if we don't know if this block changed or not
if self._rollback_to:
if request is bma.blockchain.Block:
if get_args["number"] >= self._rollback_to:
need_reload = True
if request is bma.blockchain.Parameters and self._rollback_to == 0:
need_reload = True
elif str(request) in BmaAccess.__saved_requests \
or cached_data['metadata']['block_hash'] == self._network.current_blockUID.sha_hash:
need_reload = False
ret_data = copy.deepcopy(cached_data['value'])
else:
need_reload = True
ret_data = None
return need_reload, ret_data
def _update_rollback(self, request, req_args, get_args, data):
"""
Update the rollback
If the request is a bma/blockchain/Block, we check if
the hash answered is the same as our hash, in which case,
we know that the rollback didn't reset blocks before this one
:param class request: A bma request class calling for data
:param dict req_args: Arguments to pass to the request constructor
:param dict get_args: Arguments to pass to the request __get__ method
:param dict data: Json data got from the blockchain
"""
if self._rollback_to and request is bma.blockchain.Block:
if get_args['number'] >= self._rollback_to:
cache_key = BmaAccess._gen_cache_key(request, req_args, get_args)
if cache_key in self._data and self._data[cache_key]['value']['hash'] == data['hash']:
self._rollback_to = get_args['number']
def _update_cache(self, request, req_args, get_args, data):
"""
Update data in cache and returns True if cached data changed
:param class request: A bma request class calling for data
:param dict req_args: Arguments to pass to the request constructor
:param dict get_args: Arguments to pass to the request __get__ method
:param dict data: Json data to save in cache
:return: True if data changed
:rtype: bool
"""
self._update_rollback(request, req_args, get_args, data)
cache_key = BmaAccess._gen_cache_key(request, req_args, get_args)
if cache_key not in self._data:
self._data[cache_key] = {'metadata': {},
'value': {}}
self._data[cache_key]['metadata']['block_number'] = self._network.current_blockUID.number
self._data[cache_key]['metadata']['block_hash'] = self._network.current_blockUID.sha_hash
self._data[cache_key]['metadata']['sakia_version'] = __version__
if not self._compare_json(self._data[cache_key]['value'], data):
self._data[cache_key]['value'] = copy.deepcopy(data)
return True
return False
def _invalidate_cache(self, post_request):
"""
Invalidate data depending on posted request
:param class post_request: The posted request
"""
invalidated = {bma.wot.Add: bma.wot.Lookup}
if post_request in invalidated:
invalidated_cache = self._data.copy()
for data in self._data:
if data[0] == str(invalidated[post_request]):
invalidated_cache.pop(data)
self._data = invalidated_cache
def rollback(self):
"""
When a rollback is detected, we move the rollback cursor to 0
"""
self._rollback_to = 0
def filter_nodes(self, request, nodes):
def compare_versions(node, version):
if node.version and node.version != '':
try:
return parse_version(node.version) >= parse_version(version)
except TypeError:
return False
else:
return True
filters = {
bma.ud.History: lambda n: compare_versions(n, "0.11.0"),
bma.tx.History: lambda n: compare_versions(n, "0.11.0"),
bma.blockchain.Membership: lambda n: compare_versions(n, "0.14")
}
if request in filters:
return [n for n in nodes if filters[request](n)]
else:
return nodes
async def future_request(self, request, req_args={}, get_args={}):
"""
Start a request to the network and returns a future.
:param class request: A bma request class calling for data
:param dict req_args: Arguments to pass to the request constructor
:param dict get_args: Arguments to pass to the request __get__ method
:return: The future data
:rtype: dict
"""
data = self._get_from_cache(request, req_args, get_args)
need_reload = data[0]
json_data = data[1]
nodes = self.filter_nodes(request, self._network.synced_nodes)
if need_reload and len(nodes) > 0:
tries = 0
while tries < 3:
node = random.choice(nodes)
conn_handler = node.endpoint.conn_handler()
req = request(conn_handler, **req_args)
try:
json_data = await req.get(**get_args, session=self._network.session)
self._update_cache(request, req_args, get_args, json_data)
return json_data
except (ClientError, ServerDisconnectedError, gaierror, asyncio.TimeoutError, ValueError) as e:
tries += 1
except jsonschema.ValidationError as e:
logging.debug(str(e))
tries += 1
if len(nodes) == 0 or json_data is None:
raise NoPeerAvailable("", len(nodes))
return json_data
async def simple_request(self, request, req_args={}, get_args={}):
"""
Start a request to the network but don't cache its result.
:param class request: A bma request class calling for data
:param dict req_args: Arguments to pass to the request constructor
:param dict get_args: Arguments to pass to the request __get__ method
:return: The returned data
"""
nodes = self.filter_nodes(request, self._network.synced_nodes)
if len(nodes) > 0:
node = random.choice(nodes)
req = request(node.endpoint.conn_handler(), **req_args)
tries = 0
json_data = None
while tries < 3:
try:
json_data = await req.get(**get_args, session=self._network.session)
return json_data
except (ClientError, ServerDisconnectedError, gaierror, asyncio.TimeoutError, ValueError) as e:
tries += 1
#except jsonschema.ValidationError as e:
# logging.debug(str(e))
# tries += 1
if len(nodes) == 0 or not json_data:
raise NoPeerAvailable("", len(nodes))
return json_data
async def broadcast(self, request, req_args={}, post_args={}):
"""
Broadcast data to a network.
Sends the data to all knew nodes.
:param request: A duniterpy bma request class
:param req_args: Arguments to pass to the request constructor
:param post_args: Arguments to pass to the request __post__ method
:return: All nodes replies
:rtype: tuple of aiohttp replies
.. note:: If one node accept the requests (returns 200),
the broadcast should be considered accepted by the network.
"""
nodes = random.sample(self._network.synced_nodes, 6) \
if len(self._network.synced_nodes) > 6 \
else self._network.synced_nodes
replies = []
if len(nodes) > 0:
for node in nodes:
logging.debug("Trying to connect to : " + node.pubkey)
conn_handler = node.endpoint.conn_handler()
req = request(conn_handler, **req_args)
reply = asyncio.ensure_future(req.post(**post_args, session=self._network.session))
replies.append(reply)
self._invalidate_cache(request)
else:
raise NoPeerAvailable("", len(nodes))
try:
result = await asyncio.gather(*replies)
return tuple(result)
except (ClientError, ServerDisconnectedError, gaierror, asyncio.TimeoutError, ValueError) as e:
pass
return ()
from duniterpy.api import bma
import logging
from aiohttp.errors import ClientError, ServerDisconnectedError
import asyncio
import random
from socket import gaierror
import jsonschema
from pkg_resources import parse_version
import attr
from sakia.errors import NoPeerAvailable
@attr.s
class BmaConnector:
"""
This class is used to access BMA API.
"""
__saved_requests = [str(bma.blockchain.Block), str(bma.blockchain.Parameters)]
_nodes_processor = attr.ib()
def filter_nodes(self, request, nodes):
def compare_versions(node, version):
if node.version and node.version != '':
try:
return parse_version(node.version) >= parse_version(version)
except TypeError:
return False
else:
return True
filters = {
bma.ud.History: lambda n: compare_versions(n, "0.11.0"),
bma.tx.History: lambda n: compare_versions(n, "0.11.0"),
bma.blockchain.Membership: lambda n: compare_versions(n, "0.14")
}
if request in filters:
return [n for n in nodes if filters[request](n)]
else:
return nodes
async def get(self, request, req_args={}, get_args={}):
"""
Start a request to the network but don't cache its result.
:param class request: A bma request class calling for data
:param dict req_args: Arguments to pass to the request constructor
:param dict get_args: Arguments to pass to the request __get__ method
:return: The returned data
"""
nodes = self.filter_nodes(request, self._network.synced_nodes)
if len(nodes) > 0:
tries = 0
json_data = None
while tries < 3:
node = random.choice(nodes)
nodes.pop(node)
req = request(node.endpoint.conn_handler(), **req_args)
try:
json_data = await req.get(**get_args, session=self._network.session)
return json_data
except (ClientError, ServerDisconnectedError, gaierror,
asyncio.TimeoutError, ValueError, jsonschema.ValidationError) as e:
logging.debug(str(e))
tries += 1
if len(nodes) == 0 or not json_data:
raise NoPeerAvailable("", len(nodes))
return json_data
async def broadcast(self, request, req_args={}, post_args={}):
"""
Broadcast data to a network.
Sends the data to all knew nodes.
:param request: A duniterpy bma request class
:param req_args: Arguments to pass to the request constructor
:param post_args: Arguments to pass to the request __post__ method
:return: All nodes replies
:rtype: tuple of aiohttp replies
.. note:: If one node accept the requests (returns 200),
the broadcast should be considered accepted by the network.
"""
nodes = random.sample(self._network.synced_nodes, 6) \
if len(self._network.synced_nodes) > 6 \
else self._network.synced_nodes
replies = []
if len(nodes) > 0:
for node in nodes:
logging.debug("Trying to connect to : " + node.pubkey)
conn_handler = node.endpoint.conn_handler()
req = request(conn_handler, **req_args)
reply = asyncio.ensure_future(req.post(**post_args, session=self._network.session))
replies.append(reply)
self._invalidate_cache(request)
else:
raise NoPeerAvailable("", len(nodes))
try:
result = await asyncio.gather(*replies)
return tuple(result)
except (ClientError, ServerDisconnectedError, gaierror, asyncio.TimeoutError, ValueError) as e:
logging.debug(str(e))
return ()
import attr
from duniterpy.documents import block_uid
from duniterpy.documents import block_uid, BlockUID
@attr.s(slots=True)
@attr.s()
class Identity:
currency = attr.ib(convert=str)
pubkey = attr.ib(convert=str)
uid = attr.ib(convert=str)
blockstamp = attr.ib(convert=block_uid)
signature = attr.ib(convert=str)
timestamp = attr.ib(convert=int)
revoked = attr.ib(validator=attr.validators.instance_of(bool))
member = attr.ib(validator=attr.validators.instance_of(bool))
membership_buid = attr.ib(convert=block_uid)
membership_timestamp = attr.ib(convert=int)
uid = attr.ib(convert=str, default="")
blockstamp = attr.ib(convert=block_uid, default=BlockUID.empty())
signature = attr.ib(convert=str, default="", cmp=False)
timestamp = attr.ib(convert=int, default=0, cmp=False)
written = attr.ib(validator=attr.validators.instance_of(bool), default=False, cmp=False)
revoked = attr.ib(validator=attr.validators.instance_of(bool), default=False, cmp=False)
member = attr.ib(validator=attr.validators.instance_of(bool), default=False, cmp=False)
membership_buid = attr.ib(convert=block_uid, default=BlockUID.empty(), cmp=False)
membership_timestamp = attr.ib(convert=int, default=0, cmp=False)
import attr
from ..entities import Identity
from duniterpy.api import bma, errors
import asyncio
from aiohttp.errors import ClientError
from sakia.errors import NoPeerAvailable
@attr.s
class IdentityProcessor:
_repo = attr.ib() # :type sakia.data.repositories.IdentitiesRepo
_bma_connector = attr.ib() # :type sakia.data.connectors.bma.BmaConnector
async def find_from_pubkey(self, currency, pubkey):
"""
Get the list of identities corresponding to a pubkey
from the network and the local db
:param currency:
:param pubkey:
:rtype: list[sakia.data.entities.Identity]
"""
identities = self._repo.get_all(currency=currency, pubkey=pubkey)
tries = 0
while tries < 3:
try:
data = await self._bma_connector.get(bma.wot.Lookup,
req_args={'search': pubkey})
for result in data['results']:
if result["pubkey"] == pubkey:
uids = result['uids']
for uid_data in uids:
identity = Identity(currency, pubkey)
identity.uid = uid_data['uid']
identity.blockstamp = data['sigDate']
identity.signature = data['self']
if identity not in identities:
identities.append(identity)
self._repo.insert(identity)
except (errors.DuniterError, asyncio.TimeoutError, ClientError) as e:
tries += 1
except NoPeerAvailable:
return identities
return identities
......@@ -7,6 +7,7 @@ class IdentitiesRepo:
"""The repository for Identities entities.
"""
_conn = attr.ib() # :type sqlite3.Connection
_primary_keys = (Identity.currency, Identity.pubkey, Identity.uid, Identity.blockstamp)
def insert(self, identity):
"""
......@@ -14,7 +15,10 @@ class IdentitiesRepo:
:param sakia.data.entities.Identity identity: the identity to commit
"""
with self._conn:
self._conn.execute("INSERT INTO identities VALUES (?,?,?,?,?,?,?,?,?,?)", attr.astuple(identity))
identity_tuple = attr.astuple(identity)
values = ",".join(['?']*len(identity_tuple))
self._conn.execute("INSERT INTO identities "
"VALUES ({0})".format(values), identity_tuple)
def update(self, identity):
"""
......@@ -22,10 +26,13 @@ class IdentitiesRepo:
:param sakia.data.entities.Identity identity: the identity to update
"""
with self._conn:
updated_fields = attr.astuple(identity, filter=attr.filters.exclude(*IdentitiesRepo._primary_keys))
where_fields = attr.astuple(identity, filter=attr.filters.include(*IdentitiesRepo._primary_keys))
self._conn.execute("UPDATE identities SET "
"signature=?, "
"ts=?,"
"revoked=?"
"written=?,"
"revoked=?,"
"member=?,"
"ms_buid=?,"
"ms_timestamp=?"
......@@ -33,10 +40,7 @@ class IdentitiesRepo:
"currency=? AND "
"pubkey=? AND "
"uid=? AND "
"blockstamp=?", attr.astuple(identity)[4:] + (identity.currency,
identity.pubkey,
identity.uid,
identity.blockstamp)
"blockstamp=?", updated_fields + where_fields
)
def get_one(self, **search):
......@@ -82,11 +86,15 @@ class IdentitiesRepo:
return [Identity(*data) for data in datas]
return []
def drop(self, currency, pubkey):
def drop(self, identity):
"""
Drop an existing identity from the database
:param str currency:
:param str pubkey:
:param sakia.data.entities.Identity identity: the identity to update
"""
with self._conn:
self._conn.execute("DELETE FROM identities WHERE currency=? AND pubkey=?", (currency, pubkey))
\ No newline at end of file
where_fields = attr.astuple(identity, filter=attr.filters.include(*IdentitiesRepo._primary_keys))
self._conn.execute("DELETE FROM identities WHERE "
"currency=? AND "
"pubkey=? AND "
"uid=? AND "
"blockstamp=?", where_fields)
......@@ -50,6 +50,7 @@ class MetaDatabase:
"blockstamp varchar(100),"
"signature varchar(100),"
"ts int,"
"written boolean,"
"revoked boolean,"
"member boolean,"
"ms_buid varchar(100),"
......
"""
Created on 9 févr. 2014
@author: inso
"""
class Error(Exception):
def __init__(self, message):
"""
Constructor
"""
self.message = "Error : " + message
def __str__(self):
return self.message
class NotEnoughChangeError(Error):
"""
Exception raised when trying to send money but user
is missing change
"""
def __init__(self, available, currency, nb_inputs, requested):
"""
Constructor
"""
super() .__init__(
"Only {0} {1} available in {2} sources, needs {3}"
.format(available,
currency,
nb_inputs,
requested))
class NoPeerAvailable(Error):
"""
Exception raised when a community doesn't have any
peer available.
"""
def __init__(self, currency, nbpeers):
"""
Constructor
"""
super() .__init__(
"No peer answered in {0} community ({1} peers available)"
.format(currency, nbpeers))
......@@ -26,6 +26,8 @@ class TestIdentitiesRepo(unittest.TestCase):
"H41/8OGV2W4CLKbE35kk5t1HJQsb3jEM0/QGLUf80CwJvGZf3HvVCcNtHPUFoUBKEDQO9mPK3KJkqOoxHpqHCw==",
1473108382,
False,
False,
False,
None,
0))
identity = identities_repo.get_one(currency="testcurrency",
......@@ -44,7 +46,7 @@ class TestIdentitiesRepo(unittest.TestCase):
self.assertEqual(identity.member, False)
self.assertEqual(identity.membership_buid, BlockUID.empty())
self.assertEqual(identity.membership_timestamp, 0)
identities_repo.drop("testcurrency", "7Aqw6Efa9EzE7gtsc8SveLLrM7gm6NEGoywSv4FJx6pZ")
identities_repo.drop(identity)
identity = identities_repo.get_one(currency="testcurrency",
pubkey="7Aqw6Efa9EzE7gtsc8SveLLrM7gm6NEGoywSv4FJx6pZ",
uid="john",
......@@ -64,6 +66,8 @@ class TestIdentitiesRepo(unittest.TestCase):
"H41/8OGV2W4CLKbE35kk5t1HJQsb3jEM0/QGLUf80CwJvGZf3HvVCcNtHPUFoUBKEDQO9mPK3KJkqOoxHpqHCw==",
1473108382,
False,
False,
False,
None,
0))
identities_repo.insert(Identity("testcurrency", "FADxcH5LmXGmGFgdixSes6nWnC4Vb4pRUBYT81zQRhjn",
......@@ -72,6 +76,8 @@ class TestIdentitiesRepo(unittest.TestCase):
"H41/8OGV2W4CLKbE35kk5t1HJQsb3jEM0/QGLUf80CwJvGZf3HvVCcNtHPUFoUBKEDQO9mPK3KJkqOoxHpqHCw==",
1455433535,
False,
False,
False,
None,
0))
identities = identities_repo.get_all(currency="testcurrency")
......@@ -90,6 +96,8 @@ class TestIdentitiesRepo(unittest.TestCase):
"H41/8OGV2W4CLKbE35kk5t1HJQsb3jEM0/QGLUf80CwJvGZf3HvVCcNtHPUFoUBKEDQO9mPK3KJkqOoxHpqHCw==",
1473108382,
False,
False,
False,
None,
0)
identities_repo.insert(identity)
......@@ -98,7 +106,3 @@ class TestIdentitiesRepo(unittest.TestCase):
identity2 = identities_repo.get_one(currency="testcurrency",
pubkey="7Aqw6Efa9EzE7gtsc8SveLLrM7gm6NEGoywSv4FJx6pZ")
self.assertTrue(identity2.member)
"""
Created on 9 févr. 2014
@author: inso
"""
class Error(Exception):
def __init__(self, message):
"""
Constructor
"""
self.message = "Error : " + message
def __str__(self):
return self.message
class NotMemberOfCommunityError(Error):
"""
Exception raised when adding a community the account is not a member of
"""
def __init__(self, account, community):
"""
Constructor
"""
super() \
.__init__(account + " is not a member of " + community)
class LookupFailureError(Error):
"""
Exception raised when looking for a person in a community
who isnt present in key list
"""
def __init__(self, value, community):
"""
Constructor
"""
super() .__init__(
"Person looked by {0} in {1} not found ".format(value, community))
class MembershipNotFoundError(Error):
"""
Exception raised when looking for a person in a community
who isnt present in key list
"""
def __init__(self, value, community):
"""
Constructor
"""
super() .__init__(
"Membership searched by " +
value +
" in " +
community +
" not found ")
class AlgorithmNotImplemented(Error):
"""
Exception raised when a coin uses an algorithm not known
"""
def __init__(self, algo_name):
"""
Constructor
"""
super() \
.__init__("Algorithm " + algo_name + " not implemented.")
class KeyAlreadyUsed(Error):
"""
Exception raised trying to add an account using
a key already used for another account.
"""
def __init__(self, new_account, keyid, found_account):
"""
Constructor
"""
super() .__init__(
"""Cannot add account {0} :
the key {1} is already used by {2}""".format(new_account,
keyid,
found_account)
)
class NameAlreadyExists(Error):
"""
Exception raised trying to add an account using
a key already used for another account.
"""
def __init__(self, account_name):
"""
Constructor
"""
super() .__init__(
"Cannot add account " +
account_name +
" the name already exists")
class BadAccountFile(Error):
"""
Exception raised trying to add an account using
a key already used for another account.
"""
def __init__(self, path):
"""
Constructor
"""
super() .__init__(
"File " + path + " is not an account file")
class NotEnoughMoneyError(Error):
"""
Exception raised trying to add an account using
a key already used for another account.
"""
def __init__(self, available, currency, nb_inputs, requested):
"""
Constructor
"""
super() .__init__(
"Only {0} {1} available in {2} sources, needs {3}"
.format(available,
currency,
nb_inputs,
requested))
class NoPeerAvailable(Error):
"""
Exception raised when a community doesn't have any
peer available.
"""
def __init__(self, currency, nbpeers):
"""
Constructor
"""
super() .__init__(
"No peer answered in {0} community ({1} peers available)"
.format(currency, nbpeers))
class InvalidNodeCurrency(Error):
"""
Exception raised when a node doesn't use the intended currency
"""
def __init__(self, currency, node_currency):
"""
Constructor
"""
super() .__init__(
"Node is working for {0} currency, but should be {1}"
.format(node_currency, currency))
class ContactAlreadyExists(Error):
"""
Exception raised when a community doesn't have any
peer available.
"""
def __init__(self, new_contact, already_contact):
"""
Constructor
"""
super() .__init__(
"Cannot add {0}, he/she has the same pubkey as {1} contact"
.format(new_contact, already_contact))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment