diff --git a/src/sakia/data/connectors/node.py b/src/sakia/data/connectors/node.py index e8e9c7b7e8286beca2ce3edf2d5116286d0e837a..f859aacbc9d0e0ec5d36a5b921a9524abc53520b 100644 --- a/src/sakia/data/connectors/node.py +++ b/src/sakia/data/connectors/node.py @@ -73,7 +73,7 @@ class NodeConnector(QObject): conn_handler = endpoint.conn_handler() data = await request(conn_handler, **req_args).get(self._session, **get_args) return data - except (ClientError, gaierror, TimeoutError, DisconnectedError, ValueError) as e: + except (ClientError, gaierror, TimeoutError, ConnectionRefusedError, DisconnectedError, ValueError) as e: logging.debug("{0} : {1}".format(str(e), self.node.pubkey[:5])) self.node.state = Node.OFFLINE except jsonschema.ValidationError as e: diff --git a/src/sakia/data/processors/__init__.py b/src/sakia/data/processors/__init__.py index 462ae146b8924e925f1f786a876fd2f87f53de00..f3fd90b57c0353c81855616870ef930dfc0793fe 100644 --- a/src/sakia/data/processors/__init__.py +++ b/src/sakia/data/processors/__init__.py @@ -1,3 +1,3 @@ from .nodes import NodesProcessor -from .communities import CommunityProcessor -from .identities import IdentityProcessor +from .identities import IdentitiesProcessor +from .certifications import CertificationsProcessor \ No newline at end of file diff --git a/src/sakia/data/processors/certifications.py b/src/sakia/data/processors/certifications.py new file mode 100644 index 0000000000000000000000000000000000000000..a01ea4e0debb3ffe454104119601c895ffabe62e --- /dev/null +++ b/src/sakia/data/processors/certifications.py @@ -0,0 +1,22 @@ +import attr +from ..entities import Certification + + +@attr.s +class CertificationsProcessor: + _currency = attr.ib() # :type str + _repo = attr.ib() # :type sakia.data.repositories.CertificationsRepo + _bma_connector = attr.ib() # :type sakia.data.connectors.bma.BmaConnector + + def create_certification(self, cert, blockstamp): + """ + Creates a certification and insert it in the db + :param duniterpy.documents.Certification cert: + :param duniterpy.documents.BlockUID blockstamp: + :return: the instanciated certification + :rtype: sakia.data.entities.Certification + """ + cert = Certification(self._currency, cert.pubkey_from, cert.pubkey_to, cert.timestamp, + 0, cert.signatures[0], blockstamp) + self._repo.insert(cert) + return cert diff --git a/src/sakia/data/processors/identities.py b/src/sakia/data/processors/identities.py index 2348c3a3cabf7a5d171f9017367275b53e7a8a3b..defb3bf97964a45ee7a77186d70b160e9328f3e4 100644 --- a/src/sakia/data/processors/identities.py +++ b/src/sakia/data/processors/identities.py @@ -7,11 +7,12 @@ from sakia.errors import NoPeerAvailable @attr.s -class IdentityProcessor: - _repo = attr.ib() # :type sakia.data.repositories.IdentitiesRepo +class IdentitiesProcessor: + _currency = attr.ib() # :type str + _identities_repo = attr.ib() # :type sakia.data.repositories.IdentitiesRepo _bma_connector = attr.ib() # :type sakia.data.connectors.bma.BmaConnector - async def find_from_pubkey(self, currency, pubkey): + async def find_from_pubkey(self, pubkey): """ Get the list of identities corresponding to a pubkey from the network and the local db @@ -19,7 +20,7 @@ class IdentityProcessor: :param pubkey: :rtype: list[sakia.data.entities.Identity] """ - identities = self._repo.get_all(currency=currency, pubkey=pubkey) + identities = self._identities_repo.get_all(currency=self._currency, pubkey=pubkey) tries = 0 while tries < 3: try: @@ -29,15 +30,32 @@ class IdentityProcessor: if result["pubkey"] == pubkey: uids = result['uids'] for uid_data in uids: - identity = Identity(currency, pubkey) + identity = Identity(self._currency, pubkey) identity.uid = uid_data['uid'] identity.blockstamp = data['sigDate'] identity.signature = data['self'] if identity not in identities: identities.append(identity) - self._repo.insert(identity) + self._identities_repo.insert(identity) except (errors.DuniterError, asyncio.TimeoutError, ClientError) as e: tries += 1 except NoPeerAvailable: return identities return identities + + def get_written(self, pubkey): + """ + Get identities from a given certification document + :param str pubkey: the pubkey of the identity + + :rtype: sakia.data.entities.Identity + """ + return self._identities_repo.get_written(**{'currency': self._currency, 'pubkey': pubkey}) + + def update_identity(self, identity): + """ + Saves an identity state in the db + :param identity: + :return: + """ + self._identities_repo.update(identity) diff --git a/src/sakia/data/processors/nodes.py b/src/sakia/data/processors/nodes.py index d89a57ea9f2f176a81a82ae7476baf19d3844bbf..a85cd42a7d862701c622fb4e8a7b5e641aa3c793 100644 --- a/src/sakia/data/processors/nodes.py +++ b/src/sakia/data/processors/nodes.py @@ -35,6 +35,23 @@ class NodesProcessor: else: self._repo.insert(node) + def insert_node(self, node): + """ + Update node in the repository. + First involves basic checks about pubkey and primary key constraints. + + :param sakia.data.entities.Node node: the node to update + """ + self._repo.insert(node) + + def unknown_node(self, pubkey): + """ + Search for pubkey in the repository. + :param str pubkey: the pubkey to lookup + """ + other_node = self._repo.get_one(**{'currency': self._currency, 'pubkey': pubkey}) + return other_node is None + def nodes(self): """ Get all knew nodes. diff --git a/src/sakia/data/repositories/identities.py b/src/sakia/data/repositories/identities.py index 3af88b1a0e6f5b09e8a3b398e2b98356b57ab2b2..6563b20f85a319eda46a7f6606a655baa43ce396 100644 --- a/src/sakia/data/repositories/identities.py +++ b/src/sakia/data/repositories/identities.py @@ -1,5 +1,7 @@ import attr +from duniterpy.documents.block import BlockUID + from ..entities import Identity @@ -65,6 +67,40 @@ class IdentitiesRepo: if data: return Identity(*data) + def get_written(self, offset=0, limit=1000, sort_by="currency", sort_order="ASC", **search): + """ + Get an identity in the database written in the blockchain + and corresponding to the search + + :param dict search: the criterions of the lookup + :rtype: List[sakia.data.entities.Identity] + """ + with self._conn: + filters = [] + values = [] + for k, v in search.items(): + if isinstance(v, bool): + v = int(v) + filters.append("{k}=?".format(k=k)) + values.append(v) + + request = """SELECT * FROM identities WHERE {filters} + AND ms_written_on!="{empty_buid}" + ORDER BY {sort_by} {sort_order} + LIMIT {limit} OFFSET {offset}""" \ + .format(filters=" AND ".join(filters), + empty_buid=str(BlockUID.empty()), + offset=offset, + limit=limit, + sort_by=sort_by, + sort_order=sort_order + ) + c = self._conn.execute(request, tuple(values)) + datas = c.fetchall() + if datas: + return [Identity(*data) for data in datas] + return [] + def get_all(self, **search): """ Get all existing identity in the database corresponding to the search @@ -75,6 +111,8 @@ class IdentitiesRepo: filters = [] values = [] for k, v in search.items(): + if isinstance(v, bool): + v = int(v) filters.append("{k}=?".format(k=k)) values.append(v) diff --git a/src/sakia/services/__init__.py b/src/sakia/services/__init__.py index 391f3f96909275e47196b2f72237388596698ebd..95bd45c68fc1ca3dbb91e9fffe96c0d065bf6ff7 100644 --- a/src/sakia/services/__init__.py +++ b/src/sakia/services/__init__.py @@ -1 +1,2 @@ -from .network import NetworkService \ No newline at end of file +from .network import NetworkService +from .identities import IdentitiesService \ No newline at end of file diff --git a/src/sakia/services/identities.py b/src/sakia/services/identities.py new file mode 100644 index 0000000000000000000000000000000000000000..f78256b81cb4443118969ec77031e86868c852cc --- /dev/null +++ b/src/sakia/services/identities.py @@ -0,0 +1,135 @@ +from PyQt5.QtCore import QObject +import asyncio +from duniterpy.api import bma + + +class IdentitiesService(QObject): + """ + Identities service is managing new blocks received + to update data locally + """ + def __init__(self, currency, identities_processor, certs_processor, bma_connector): + """ + Constructor the identities service + + :param str currency: The currency name of the community + :param sakia.data.processors.IdentitiesProcessor identities_processor: the identities processor for given currency + :param sakia.data.processors.CertificationsProcessor certs_processor: the certifications processor for given currency + :param sakia.data.connectors.BmaConnector bma_connector: The connector to BMA API + """ + super().__init__() + self._identities_processor = identities_processor + self._certs_processor = certs_processor + self._bma_connector = bma_connector + self.currency = currency + + def _parse_revocations(self, block): + """ + Parse revoked pubkeys found in a block and refresh local data + + :param duniterpy.documents.Block block: the block received + :return: list of pubkeys updated + """ + revoked = set([]) + for rev in block.revoked: + revoked.add(rev.pubkey) + + for pubkey in revoked: + written = self._identities_processor.get_written(pubkey) + # we update every written identities known locally + if written: + written.revoked_on = block.blockUID + written.member = False + return revoked + + def _parse_memberships(self, block): + """ + Parse memberships pubkeys found in a block and refresh local data + + :param duniterpy.documents.Block block: the block received + :return: list of pubkeys requiring a refresh of requirements + """ + need_refresh = [] + for ms in block.joiners + block.actives: + written = self._identities_processor.get_written(ms.issuer) + # we update every written identities known locally + if written: + written.membership_written_on = block.blockUID + written.membership_type = "IN" + written.membership_buid = ms.membership_ts + self._identities_processor.update_identity(written) + # If the identity was not member + # it can become one + if not written.member: + need_refresh.append(written) + + for ms in block.leavers: + written = self._identities_processor.get_written(ms.issuer) + # we update every written identities known locally + if written: + written.membership_written_on = block.blockUID + written.membership_type = "OUT" + written.membership_buid = ms.membership_ts + self._identities_processor.update_identity(written) + # If the identity was not member + # it can stop to be one + if not written.member: + need_refresh.append(written) + return need_refresh + + def _parse_certifications(self, block): + """ + Parse certified pubkeys found in a block and refresh local data + :param duniterpy.documents.Block block: + :return: + """ + need_refresh = set([]) + for cert in block.certifications: + written = self._identities_processor.get_written(cert.pubkey_to) + # if we have locally a written identity matching the certification + if written or self._identities_processor.get_written(cert.pubkey_from): + self._certs_processor.create_certification(cert, block.blockUID) + # we update every written identities known locally + if written: + # A certification can change the requirements state + # of an identity + need_refresh.add(written) + return need_refresh + + async def refresh_requirements(self, identity): + """ + Refresh a given identity information + :param sakia.data.entities.Identity identity: + :return: + """ + requirements = await self._bma_connector.get(bma.wot.Requirements, get_args={'search': identity.pubkey}) + identity_data = requirements['identities'][0] + identity.uid = identity_data["uid"] + identity.blockstamp = identity["meta"]["timestamp"] + identity.member = identity["membershipExpiresIn"] > 0 and identity["outdistanced"] is False + self._identities_processor.update_identity(identity) + + def parse_block(self, block): + """ + Parse a block to refresh local data + :param block: + :return: + """ + self._parse_revocations(block) + need_refresh = [] + need_refresh += self._parse_memberships(block) + need_refresh += self._parse_certifications(block) + return set(need_refresh) + + async def handle_new_block(self, block): + """ + Handle new block received and refresh local data + :param duniterpy.documents.Block block: the received block + """ + need_refresh = self.parse_block(block) + refresh_futures = [] + # for every identity for which we need a refresh, we gather + # requirements requests + for identity in need_refresh: + refresh_futures.append(self.refresh_requirements(identity)) + await asyncio.gather(refresh_futures) diff --git a/src/sakia/services/network.py b/src/sakia/services/network.py index 374cf2356ae7f5fd5f3f95531ec4128ae23e2806..83e081df2cab518d18108e27e928c90dba12bea5 100644 --- a/src/sakia/services/network.py +++ b/src/sakia/services/network.py @@ -1,8 +1,3 @@ -""" -Created on 24 févr. 2015 - -@author: inso -""" from sakia.data.connectors import NodeConnector from sakia.data.entities import Node from sakia.errors import InvalidNodeCurrency @@ -11,11 +6,9 @@ import logging import time import asyncio from duniterpy.key import VerifyingKey -from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, QTimer +from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject from collections import Counter -MAX_CONFIRMATIONS = 6 - class NetworkService(QObject): """ @@ -27,18 +20,16 @@ class NetworkService(QObject): new_block_mined = pyqtSignal(int) blockchain_rollback = pyqtSignal(int) - def __init__(self, currency, repo, processor, connectors, session): + def __init__(self, currency, processor, connectors, session): """ Constructor of a network :param str currency: The currency name of the community - :param sakia.data.repositories.NodesRepository repo: the nodes repository :param sakia.data.processors.NodesProcessor processor: the nodes processor for given currency :param list connectors: The connectors to nodes of the network :param aiohttp.ClientSession session: The main aiohttp client session """ super().__init__() - self._repo = repo self._processor = processor self._connectors = [] for c in connectors: @@ -50,17 +41,19 @@ class NetworkService(QObject): self._discovery_stack = [] @classmethod - def create(cls, repo, processor, node_connector): + def create(cls, processor, node_connector): """ Create a new network with one knew node Crawls the nodes from the first node to build the community network - :param node_connector: The first connector of the network service + :param sakia.data.processors.NodeProcessor processor: The nodes processor + :param sakia.data.connectors.NodeConnector node_connector: The first connector of the network service + :return: """ connectors = [node_connector] - repo.insert(node_connector.node) - network = cls(node_connector.node.currency, repo, processor, connectors, node_connector.session) + processor.insert_node(node_connector.node) + network = cls(node_connector.node.currency, processor, connectors, node_connector.session) return network def start_coroutines(self): @@ -177,12 +170,11 @@ class NetworkService(QObject): try: await asyncio.sleep(1) peer = self._discovery_stack.pop() - pubkeys = [n.pubkey for n in self._processor.nodes()] - if peer.pubkey not in pubkeys: + if self._processor.unknown_node(peer.pubkey): logging.debug("New node found : {0}".format(peer.pubkey[:5])) try: connector = NodeConnector.from_peer(self.currency, peer, self.session) - self._repo.insert(connector.node) + self._processor.insert_node(connector.node) connector.refresh(manual=True) self.add_connector(connector) self.nodes_changed.emit() @@ -206,7 +198,7 @@ class NetworkService(QObject): @pyqtSlot() def handle_identity_change(self): connector = self.sender() - self._repo.update(connector.node) + self._processor.update_node(connector.node) self.nodes_changed.emit() @pyqtSlot() @@ -225,7 +217,7 @@ class NetworkService(QObject): if node_connector.node.state in (Node.ONLINE, Node.DESYNCED): self._check_nodes_sync() self.nodes_changed.emit() - self._repo.update(node_connector.node) + self._processor.update_node(node_connector.node) if node_connector.node.state == Node.ONLINE: current_buid = self._processor.current_buid() diff --git a/src/sakia/tests/technical/test_identities_service.py b/src/sakia/tests/technical/test_identities_service.py new file mode 100644 index 0000000000000000000000000000000000000000..f1ad4434abf2805908666efc732a811f2f7b2022 --- /dev/null +++ b/src/sakia/tests/technical/test_identities_service.py @@ -0,0 +1,38 @@ +import unittest +import sqlite3 +from duniterpy.documents import BlockUID, Block +from sakia.tests.mocks.bma.nice_blockchain import bma_blockchain_0 +from sakia.tests import QuamashTest +from sakia.services import IdentitiesService +from sakia.data.repositories import CertificationsRepo, IdentitiesRepo, MetaDatabase +from sakia.data.processors import CertificationsProcessor, IdentitiesProcessor + + +class TestIdentitiesService(unittest.TestCase, QuamashTest): + def setUp(self): + self.setUpQuamash() + sqlite3.register_adapter(BlockUID, str) + sqlite3.register_adapter(bool, int) + sqlite3.register_adapter(list, lambda ls: '\n'.join([str(v) for v in ls])) + sqlite3.register_adapter(tuple, lambda ls: '\n'.join([str(v) for v in ls])) + sqlite3.register_converter("BOOLEAN", lambda v: bool(int(v))) + self.con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES) + + def tearDown(self): + self.tearDownQuamash() + + def test_new_block_with_unknown_identities(self): + meta_repo = MetaDatabase(self.con) + meta_repo.prepare() + meta_repo.upgrade_database() + identities_repo = IdentitiesRepo(self.con) + certs_repo = CertificationsRepo(self.con) + identities_processor = IdentitiesProcessor("testcurrency", identities_repo, None) + certs_processor = CertificationsProcessor("testcurrency", certs_repo, None) + identities_service = IdentitiesService("testcurrency", identities_processor, certs_processor, None) + block = Block.from_signed_raw("{0}{1}\n".format(bma_blockchain_0["raw"], bma_blockchain_0["signature"])) + identities_service.parse_block(block) + self.assertEqual(identities_processor.get_written("8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU"), []) + self.assertEqual(identities_processor.get_written("HnFcSms8jzwngtVomTTnzudZx7SHUQY8sVE1y8yBmULk"), []) + self.assertEqual(identities_processor.get_written("BMAVuMDcGhYAV4wA27DL1VXX2ZARZGJYaMwpf7DJFMYH"), []) + self.assertEqual(identities_processor.get_written("37qBxM4hLV2jfyYo2bNzAjkeLngLr2r7G2HpdpKieVxw"), []) diff --git a/src/sakia/tests/technical/test_network_service.py b/src/sakia/tests/technical/test_network_service.py index e86ec44f06366580302a574aded703eb496dd7bc..6c37c9399f2a9c59580bc82253aa26ff31da2d26 100644 --- a/src/sakia/tests/technical/test_network_service.py +++ b/src/sakia/tests/technical/test_network_service.py @@ -38,7 +38,7 @@ class TestNetworkService(unittest.TestCase, QuamashTest): node_connector = NodeConnector.from_peer(peer_document.currency, peer_document, session) processor = NodesProcessor(peer_document.currency, nodes_repo) - network_service = NetworkService.create(nodes_repo, processor, node_connector) + network_service = NetworkService.create(processor, node_connector) network_service._must_crawl = True asyncio.ensure_future(network_service.discovery_loop()) diff --git a/src/sakia/tests/unit/core/test_bma_access.py b/src/sakia/tests/unit/core/test_bma_access.py deleted file mode 100644 index b4a3257b403e5eb7447d3e61aee186e07435f295..0000000000000000000000000000000000000000 --- a/src/sakia/tests/unit/core/test_bma_access.py +++ /dev/null @@ -1,49 +0,0 @@ -import unittest -from unittest.mock import Mock -import time -from PyQt5.QtCore import QLocale -from sakia.core.registry.identities import Identity, IdentitiesRegistry, LocalState, BlockchainState - -from sakia.tests.mocks.bma import nice_blockchain, corrupted -from sakia.tests import QuamashTest -from sakia.core import Application, Community -from sakia.core.net import Network, Node -from duniterpy.documents.peer import Peer -from sakia.core.net.api.bma.access import BmaAccess - - -class TestBmaAccess(unittest.TestCase, QuamashTest): - def setUp(self): - self.setUpQuamash() - QLocale.setDefault(QLocale("en_GB")) - self.identities_registry = IdentitiesRegistry() - - self.application = Application(self.qapplication, self.lp, self.identities_registry) - self.application.preferences['notifications'] = False - - self.peer = Peer.from_signed_raw("""Version: 2 -Type: Peer -Currency: meta_brouzouf -PublicKey: 8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU -Block: 48698-000005E0F228038E4DDD4F6CA4ACB01EC88FBAF8 -Endpoints: -BASIC_MERKLED_API duniter.inso.ovh 80 -82o1sNCh1bLpUXU6nacbK48HBcA9Eu2sPkL1/3c2GtDPxBUZd2U2sb7DxwJ54n6ce9G0Oy7nd1hCxN3fS0oADw== -""") - self.node = Node(self.peer, - "", "HnFcSms8jzwngtVomTTnzudZx7SHUQY8sVE1y8yBmULk", - None, Node.ONLINE, - time.time(), {}, "duniter", "0.12.0", 0, Mock("aiohttp.ClientSession")) - self.network = Network.create(self.node) - self.bma_access = BmaAccess.create(self.network) - self.community = Community("test_currency", self.network, self.bma_access) - - def tearDown(self): - self.tearDownQuamash() - - def test_compare_json_with_nonetype(self): - res = self.bma_access._compare_json({}, corrupted.bma_null_data) - self.assertFalse(res) - - def test_filter_nodes(self): - pass#TODO