diff --git a/src/sakia/core/community.py b/src/sakia/core/community.py index 40d0474bdb0a16a80958aab6b46f0fc2e47e6601..cb640069c0b07166b999beb2347d9ea5cefc54d7 100644 --- a/src/sakia/core/community.py +++ b/src/sakia/core/community.py @@ -10,10 +10,10 @@ import math from PyQt5.QtCore import QObject -from ..tools.exceptions import NoPeerAvailable -from .net.network import Network +from sakia.errors import NoPeerAvailable +from sakia.data.processors import NodesProcessor from duniterpy.api import bma, errors -from .net.api.bma.access import BmaAccess +from sakia.data.connectors import BmaConnector class Community(QObject): @@ -23,21 +23,21 @@ class Community(QObject): .. warning:: The currency name is supposed to be unique in sakia but nothing exists in duniter to assert that a currency name is unique. """ - def __init__(self, currency, network, bma_access): + def __init__(self, currency, nodes_processor, bma_connector): """ Initialize community attributes with a currency and a network. :param str currency: The currency name of the community. - :param sakia.core.net.network.Network network: The network of the community - :param sakia.core.net.api.bma.access.BmaAccess bma_access: The BMA Access object + :param sakia.data.processors.NodesProcessor nodes_processor: The network of the community + :param sakia.data.connectors.BmaConnector bma_connector: The BMA connector object .. warning:: The community object should be created using its factory class methods. """ super().__init__() self.currency = currency - self._network = network - self._bma_access = bma_access + self._nodes_processor = nodes_processor + self._bma_connector = bma_connector @classmethod def create(cls, node): diff --git a/src/sakia/core/net/__init__.py b/src/sakia/core/net/__init__.py deleted file mode 100644 index 6ea6364fb9f0854431d8352f088cf781026b1a1d..0000000000000000000000000000000000000000 --- a/src/sakia/core/net/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .node import Node -from .network import Network \ No newline at end of file diff --git a/src/sakia/core/net/api/bma/__init__.py b/src/sakia/core/net/api/bma/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/src/sakia/core/net/network.py b/src/sakia/core/net/network.py deleted file mode 100644 index d28357f19db7823c2ef187a840953e7c25e17bc0..0000000000000000000000000000000000000000 --- a/src/sakia/core/net/network.py +++ /dev/null @@ -1,441 +0,0 @@ -""" -Created on 24 févr. 2015 - -@author: inso -""" -from .node import Node -from ...tools.exceptions import InvalidNodeCurrency -from ...tools.decorators import asyncify -import logging -import aiohttp -import time -import asyncio -from duniterpy.documents import Peer, Block, BlockUID, MalformedDocumentError -from duniterpy.key import VerifyingKey -from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, QTimer -from collections import Counter - -MAX_CONFIRMATIONS = 6 - - -class Network(QObject): - """ - A network is managing nodes polling and crawling of a - given community. - """ - nodes_changed = pyqtSignal() - root_nodes_changed = pyqtSignal() - new_block_mined = pyqtSignal(int) - blockchain_rollback = pyqtSignal(int) - - def __init__(self, currency, nodes, session): - """ - Constructor of a network - - :param str currency: The currency name of the community - :param list nodes: The root nodes of the network - """ - super().__init__() - self._root_nodes = nodes - self._nodes = [] - for n in nodes: - self.add_node(n) - self.currency = currency - self._must_crawl = False - self._block_found = self.current_blockUID - self._timer = QTimer() - self._client_session = session - self._discovery_stack = [] - - @classmethod - def create(cls, node): - """ - Create a new network with one knew node - Crawls the nodes from the first node to build the - community network - - :param node: The first knew node of the network - """ - nodes = [node] - network = cls(node.currency, nodes, node.session) - return network - - def merge_with_json(self, json_data, file_version): - """ - We merge with knew nodes when we - last stopped sakia - - :param dict json_data: Nodes in json format - :param NormalizedVersion file_version: The node version - """ - for data in json_data: - try: - node = Node.from_json(self.currency, data, file_version, self.session) - if node.pubkey not in [n.pubkey for n in self.nodes]: - self.add_node(node) - logging.debug("Loading : {:}".format(data['pubkey'])) - else: - other_node = [n for n in self.nodes if n.pubkey == node.pubkey][0] - other_node._uid = node.uid - other_node._version = node.version - other_node._software = node.software - other_node._peer = node.peer - switch = False - if other_node.block and node.block: - if other_node.block['hash'] != node.block['hash']: - switch = True - else: - switch = True - if switch: - other_node.set_block(node.block) - other_node.last_change = node.last_change - other_node.state = node.state - except MalformedDocumentError: - logging.debug("Could not load node {0}".format(data)) - - @classmethod - def from_json(cls, currency, json_data, file_version): - """ - Load a network from a configured community - - :param str currency: The currency name of a community - :param dict json_data: A json_data view of a network - :param NormalizedVersion file_version: the version of the json file - """ - session = aiohttp.ClientSession() - nodes = [] - for data in json_data: - try: - node = Node.from_json(currency, data, file_version, session) - nodes.append(node) - except MalformedDocumentError: - logging.debug("Could not load node {0}".format(data)) - network = cls(currency, nodes, session) - return network - - def jsonify(self): - """ - Get the network in json format. - - :return: The network as a dict in json format. - """ - data = [] - for node in self.nodes: - data.append(node.jsonify()) - return data - - @property - def quality(self): - """ - Get a ratio of the synced nodes vs the rest - """ - synced = len(self.synced_nodes) - total = len(self.nodes) - if total == 0: - ratio_synced = 0 - else: - ratio_synced = synced / total - return ratio_synced - - def start_coroutines(self): - """ - Start network nodes crawling - :return: - """ - asyncio.ensure_future(self.discover_network()) - - async def stop_coroutines(self, closing=False): - """ - Stop network nodes crawling. - """ - self._must_crawl = False - close_tasks = [] - logging.debug("Start closing") - for node in self.nodes: - close_tasks.append(asyncio.ensure_future(node.close_ws())) - logging.debug("Closing {0} websockets".format(len(close_tasks))) - if len(close_tasks) > 0: - await asyncio.wait(close_tasks, timeout=15) - if closing: - logging.debug("Closing client session") - await self._client_session.close() - logging.debug("Closed") - - @property - def session(self): - return self._client_session - - def continue_crawling(self): - return self._must_crawl - - @property - def synced_nodes(self): - """ - Get nodes which are in the ONLINE state. - """ - return [n for n in self.nodes if n.state == Node.ONLINE] - - @property - def online_nodes(self): - """ - Get nodes which are in the ONLINE state. - """ - return [n for n in self.nodes if n.state in (Node.ONLINE, Node.DESYNCED)] - - @property - def nodes(self): - """ - Get all knew nodes. - """ - return self._nodes - - @property - def root_nodes(self): - """ - Get root nodes. - """ - return self._root_nodes - - @property - def current_blockUID(self): - """ - Get the latest block considered valid - It is the most frequent last block of every known nodes - """ - blocks = [n.block for n in self.synced_nodes if n.block] - if len(blocks) > 0: - return BlockUID(blocks[0]['number'], blocks[0]['hash']) - else: - return BlockUID.empty() - - def _check_nodes_sync(self): - """ - Check nodes sync with the following rules : - 1 : The block of the majority - 2 : The more last different issuers - 3 : The more difficulty - 4 : The biggest number or timestamp - """ - # rule number 1 : block of the majority - blocks = [n.block['hash'] for n in self.online_nodes if n.block] - blocks_occurences = Counter(blocks) - blocks_by_occurences = {} - for key, value in blocks_occurences.items(): - the_block = [n.block for n in self.online_nodes if n.block and n.block['hash'] == key][0] - if value not in blocks_by_occurences: - blocks_by_occurences[value] = [the_block] - else: - blocks_by_occurences[value].append(the_block) - - if len(blocks_by_occurences) == 0: - for n in [n for n in self.online_nodes if n.state in (Node.ONLINE, Node.DESYNCED)]: - n.state = Node.ONLINE - return - - most_present = max(blocks_by_occurences.keys()) - - if len(blocks_by_occurences[most_present]) > 1: - # rule number 2 : more last different issuers - # not possible atm - blocks_by_issuers = blocks_by_occurences.copy() - most_issuers = max(blocks_by_issuers.keys()) - if len(blocks_by_issuers[most_issuers]) > 1: - # rule number 3 : biggest PowMin - blocks_by_powmin = {} - for block in blocks_by_issuers[most_issuers]: - if block['powMin'] in blocks_by_powmin: - blocks_by_powmin[block['powMin']].append(block) - else: - blocks_by_powmin[block['powMin']] = [block] - bigger_powmin = max(blocks_by_powmin.keys()) - if len(blocks_by_powmin[bigger_powmin]) > 1: - # rule number 3 : latest timestamp - blocks_by_ts = {} - for block in blocks_by_powmin[bigger_powmin]: - blocks_by_ts[block['time']] = block - latest_ts = max(blocks_by_ts.keys()) - synced_block_hash = blocks_by_ts[latest_ts]['hash'] - else: - synced_block_hash = blocks_by_powmin[bigger_powmin][0]['hash'] - else: - synced_block_hash = blocks_by_issuers[most_issuers][0]['hash'] - else: - synced_block_hash = blocks_by_occurences[most_present][0]['hash'] - - for n in self.online_nodes: - if n.block and n.block['hash'] == synced_block_hash: - n.state = Node.ONLINE - else: - n.state = Node.DESYNCED - - def _check_nodes_unique(self): - """ - Check that all nodes are unique by them pubkeys - """ - pubkeys = set() - unique_nodes = [] - for n in self.nodes: - if n.pubkey not in pubkeys: - unique_nodes.append(n) - pubkeys.add(n.pubkey) - - self._nodes = unique_nodes - - def confirmations(self, block_number): - """ - Get the number of confirmations of a data - :param int block_number: The block number of the data - :return: the number of confirmations of a data - :rtype: int - """ - if block_number: - if block_number > self.current_blockUID.number: - raise ValueError("Could not compute confirmations : data block number is after current block") - return self.current_blockUID.number - block_number + 1 - else: - return 0 - - def add_node(self, node): - """ - Add a nod to the network. - """ - self._nodes.append(node) - node.changed.connect(self.handle_change) - node.error.connect(self.handle_error) - node.identity_changed.connect(self.handle_identity_change) - node.neighbour_found.connect(self.handle_new_node) - logging.debug("{:} connected".format(node.pubkey[:5])) - - def add_root_node(self, node): - """ - Add a node to the root nodes list - """ - self._root_nodes.append(node) - self.root_nodes_changed.emit() - - def remove_root_node(self, node): - """ - Remove a node from the root nodes list - """ - self._root_nodes.remove(node) - self.root_nodes_changed.emit() - - def is_root_node(self, node): - """ - Check if this node is in the root nodes - """ - return node in self._root_nodes - - def root_node_index(self, index): - """ - Get the index of a root node from its index - in all nodes list - """ - node = self.nodes[index] - return self._root_nodes.index(node) - - @asyncify - async def refresh_once(self): - for node in self._nodes: - await asyncio.sleep(1) - node.refresh(manual=True) - - async def discover_network(self): - """ - Start crawling which never stops. - To stop this crawling, call "stop_crawling" method. - """ - self._must_crawl = True - first_loop = True - asyncio.ensure_future(self.pop_discovery_stack()) - while self.continue_crawling(): - for node in self.nodes: - if self.continue_crawling(): - node.refresh() - if not first_loop: - await asyncio.sleep(15) - first_loop = False - await asyncio.sleep(15) - - logging.debug("End of network discovery") - - async def pop_discovery_stack(self): - """ - Handle poping of nodes in discovery stack - :return: - """ - while self.continue_crawling(): - try: - await asyncio.sleep(1) - peer = self._discovery_stack.pop() - pubkeys = [n.pubkey for n in self.nodes] - if peer.pubkey not in pubkeys: - logging.debug("New node found : {0}".format(peer.pubkey[:5])) - try: - node = Node.from_peer(self.currency, peer, self.session) - node.refresh(manual=True) - self.add_node(node) - self.nodes_changed.emit() - except InvalidNodeCurrency as e: - logging.debug(str(e)) - else: - node = [n for n in self.nodes if n.pubkey == peer.pubkey][0] - if node.peer.blockUID.number < peer.blockUID.number: - logging.debug("Update node : {0}".format(peer.pubkey[:5])) - node.peer = peer - except IndexError: - await asyncio.sleep(2) - - def handle_new_node(self, peer): - key = VerifyingKey(peer.pubkey) - if key.verify_document(peer): - if len(self._discovery_stack) < 1000 \ - and peer.signatures[0] not in [p.signatures[0] for p in self._discovery_stack]: - logging.debug("Stacking new peer document : {0}".format(peer.pubkey)) - self._discovery_stack.append(peer) - else: - logging.debug("Wrong document received : {0}".format(peer.signed_raw())) - - @pyqtSlot() - def handle_identity_change(self): - node = self.sender() - self._check_nodes_unique() - if node in self._root_nodes: - self.root_nodes_changed.emit() - self.nodes_changed.emit() - - @pyqtSlot() - def handle_error(self): - node = self.sender() - if node.state in (Node.OFFLINE, Node.CORRUPTED) and \ - node.last_change + 3600 < time.time(): - node.disconnect() - self.nodes.remove(node) - self.nodes_changed.emit() - - @pyqtSlot() - def handle_change(self): - node = self.sender() - - if node.state in (Node.ONLINE, Node.DESYNCED): - self._check_nodes_sync() - self._check_nodes_unique() - self.nodes_changed.emit() - - if node.state == Node.ONLINE: - logging.debug("{0} -> {1}".format(self._block_found.sha_hash[:10], self.current_blockUID.sha_hash[:10])) - if self._block_found.sha_hash != self.current_blockUID.sha_hash: - logging.debug("Latest block changed : {0}".format(self.current_blockUID.number)) - # If new latest block is lower than the previously found one - # or if the previously found block is different locally - # than in the main chain, we declare a rollback - if self._block_found.number and \ - self.current_blockUID.number <= self._block_found.number \ - or node.main_chain_previous_block and \ - node.main_chain_previous_block['hash'] != self._block_found.sha_hash: - - self._block_found = self.current_blockUID - self.blockchain_rollback.emit(self.current_blockUID.number) - else: - self._block_found = self.current_blockUID - self.new_block_mined.emit(self.current_blockUID.number) diff --git a/src/sakia/core/net/node.py b/src/sakia/core/net/node.py deleted file mode 100644 index 44b5195932d88b40e4b13278219ebc02018ba378..0000000000000000000000000000000000000000 --- a/src/sakia/core/net/node.py +++ /dev/null @@ -1,643 +0,0 @@ -""" -Created on 21 févr. 2015 - -@author: inso -""" - -from duniterpy.documents.peer import Peer, Endpoint, BMAEndpoint -from duniterpy.documents import Block, BlockUID, MalformedDocumentError -from ...tools.exceptions import InvalidNodeCurrency -from ...tools.decorators import asyncify -from duniterpy.api import bma, errors -from duniterpy.api.bma import ConnectionHandler - -from aiohttp.errors import WSClientDisconnectedError, WSServerHandshakeError, ClientResponseError -from aiohttp.errors import ClientError, DisconnectedError -from asyncio import TimeoutError -import logging -import time -import jsonschema -import asyncio -import aiohttp -from pkg_resources import parse_version -from socket import gaierror - -from PyQt5.QtCore import QObject, pyqtSignal - - -class Node(QObject): - """ - A node is a peer send from the client point of view. - This node can have multiple states : - - ONLINE : The node is available for requests - - OFFLINE: The node is disconnected - - DESYNCED : The node is online but is desynced from the network - - CORRUPTED : The node is corrupted, some weird behaviour is going on - """ - - ONLINE = 1 - OFFLINE = 2 - DESYNCED = 3 - CORRUPTED = 4 - - changed = pyqtSignal() - error = pyqtSignal() - identity_changed = pyqtSignal() - neighbour_found = pyqtSignal(Peer) - - def __init__(self, peer, uid, pubkey, block, - state, last_change, last_merkle, - software, version, fork_window, - session): - """ - Constructor - """ - super().__init__() - self._peer = peer - self._uid = uid - self._pubkey = pubkey - self._block = block - self.main_chain_previous_block = None - self._state = state - self._neighbours = [] - self._last_change = last_change - self._last_merkle = last_merkle - self._software = software - self._version = version - self._fork_window = fork_window - self._refresh_counter = 19 - self._ws_tasks = {'block': None, - 'peer': None} - self._connected = {'block': False, - 'peer': False} - self._session = session - - def __del__(self): - for ws in self._ws_tasks.values(): - if ws: - ws.cancel() - - @classmethod - async def from_address(cls, currency, address, port, session): - """ - Factory method to get a node from a given address - - :param str currency: The node currency. None if we don't know\ - the currency it should have, for example if its the first one we add - :param str address: The node address - :param int port: The node port - :return: A new node - :rtype: sakia.core.net.Node - """ - peer_data = await bma.network.Peering(ConnectionHandler(address, port)).get(session) - - peer = Peer.from_signed_raw("{0}{1}\n".format(peer_data['raw'], - peer_data['signature'])) - - if currency is not None: - if peer.currency != currency: - raise InvalidNodeCurrency(peer.currency, currency) - - node = cls(peer, - "", peer.pubkey, None, Node.ONLINE, time.time(), - {'root': "", 'leaves': []}, "", "", 0, session) - logging.debug("Node from address : {:}".format(str(node))) - return node - - @classmethod - def from_peer(cls, currency, peer, session): - """ - Factory method to get a node from a peer document. - - :param str currency: The node currency. None if we don't know\ - the currency it should have, for example if its the first one we add - :param peer: The peer document - :return: A new node - :rtype: sakia.core.net.Node - """ - if currency is not None: - if peer.currency != currency: - raise InvalidNodeCurrency(peer.currency, currency) - - node = cls(peer, "", peer.pubkey, None, - Node.OFFLINE, time.time(), - {'root': "", 'leaves': []}, - "", "", 0, session) - logging.debug("Node from peer : {:}".format(str(node))) - return node - - @classmethod - def from_json(cls, currency, data, file_version, session): - """ - Loads a node from json data - - :param str currency: the currency of the community - :param dict data: the json data of the node - :param NormalizedVersion file_version: the version of the file - :return: A new node - :rtype: Node - """ - endpoints = [] - uid = "" - pubkey = "" - software = "" - version = "" - fork_window = 0 - block = None - last_change = time.time() - state = Node.OFFLINE - if 'uid' in data: - uid = data['uid'] - - if 'pubkey' in data: - pubkey = data['pubkey'] - - if 'last_change' in data: - last_change = data['last_change'] - - if 'block' in data: - block = data['block'] - - if 'state' in data: - state = data['state'] - - if 'software' in data: - software = data['software'] - - if 'version' in data: - version = data['version'] - - if 'fork_window' in data: - fork_window = data['fork_window'] - - if parse_version("0.11") <= file_version < parse_version("0.12dev1") : - for endpoint_data in data['endpoints']: - endpoints.append(Endpoint.from_inline(endpoint_data)) - - if currency in data: - currency = data['currency'] - - peer = Peer(2, currency, pubkey, BlockUID(0, Block.Empty_Hash), endpoints, "SOMEFAKESIGNATURE") - else: - peer = Peer.from_signed_raw(data['peer']) - - node = cls(peer, uid, pubkey, block, - state, last_change, - {'root': "", 'leaves': []}, - software, version, fork_window, session) - - logging.debug("Node from json : {:}".format(str(node))) - return node - - def jsonify_root_node(self): - logging.debug("Saving root node : {:}".format(str(self))) - data = {'pubkey': self._pubkey, - 'uid': self._uid, - 'peer': self._peer.signed_raw()} - return data - - def jsonify(self): - logging.debug("Saving node : {:}".format(str(self))) - data = {'pubkey': self._pubkey, - 'uid': self._uid, - 'peer': self._peer.signed_raw(), - 'state': self._state, - 'last_change': self._last_change, - 'block': self.block, - 'software': self._software, - 'version': self._version, - 'fork_window': self._fork_window - } - return data - - async def close_ws(self): - for ws in self._ws_tasks.values(): - if ws: - ws.cancel() - await asyncio.sleep(0) - closed = False - while not closed: - for ws in self._ws_tasks.values(): - if ws: - closed = False - break - else: - closed = True - await asyncio.sleep(0) - await asyncio.sleep(0) - - @property - def session(self): - return self._session - - @property - def pubkey(self): - return self._pubkey - - @property - def endpoint(self) -> BMAEndpoint: - return next((e for e in self._peer.endpoints if type(e) is BMAEndpoint)) - - @property - def block(self): - return self._block - - def set_block(self, block): - self._block = block - - @property - def state(self): - return self._state - - @property - def currency(self): - return self._peer.currency - - @property - def neighbours(self): - return self._neighbours - - @property - def uid(self): - return self._uid - - @property - def last_change(self): - return self._last_change - - @property - def software(self): - return self._software - - @property - def peer(self): - return self._peer - - @peer.setter - def peer(self, new_peer): - if self._peer != new_peer: - self._peer = new_peer - self.changed.emit() - - @software.setter - def software(self, new_soft): - if self._software != new_soft: - self._software = new_soft - self.changed.emit() - - @property - def version(self): - return self._version - - @version.setter - def version(self, new_version): - if self._version != new_version: - self._version = new_version - self.changed.emit() - - @last_change.setter - def last_change(self, val): - #logging.debug("{:} | Changed state : {:}".format(self.pubkey[:5], - # val)) - self._last_change = val - - @state.setter - def state(self, new_state): - #logging.debug("{:} | Last state : {:} / new state : {:}".format(self.pubkey[:5], - # self.state, new_state)) - - if self._state != new_state: - self.last_change = time.time() - self._state = new_state - self.changed.emit() - if new_state in (Node.OFFLINE, Node.ONLINE): - self.error.emit() - - @property - def fork_window(self): - return self._fork_window - - @fork_window.setter - def fork_window(self, new_fork_window): - if self._fork_window != new_fork_window: - self._fork_window = new_fork_window - self.changed.emit() - - def refresh(self, manual=False): - """ - Refresh all data of this node - :param bool manual: True if the refresh was manually initiated - """ - if not self._ws_tasks['block']: - self._ws_tasks['block'] = asyncio.ensure_future(self.connect_current_block()) - - if not self._ws_tasks['peer']: - self._ws_tasks['peer'] = asyncio.ensure_future(self.connect_peers()) - - if manual: - asyncio.ensure_future(self.request_peers()) - - if self._refresh_counter % 20 == 0 or manual: - self.refresh_informations() - self.refresh_uid() - self.refresh_summary() - self._refresh_counter = self._refresh_counter if manual else 1 - else: - self._refresh_counter += 1 - - async def connect_current_block(self): - """ - Connects to the websocket entry point of the node - If the connection fails, it tries the fallback mode on HTTP GET - """ - if not self._connected['block']: - try: - conn_handler = self.endpoint.conn_handler() - block_websocket = bma.ws.Block(conn_handler) - ws_connection = block_websocket.connect(self._session) - async with ws_connection as ws: - self._connected['block'] = True - logging.debug("Connected successfully to block ws : {0}".format(self.pubkey[:5])) - async for msg in ws: - if msg.tp == aiohttp.MsgType.text: - logging.debug("Received a block : {0}".format(self.pubkey[:5])) - block_data = block_websocket.parse_text(msg.data) - await self.refresh_block(block_data) - elif msg.tp == aiohttp.MsgType.closed: - break - elif msg.tp == aiohttp.MsgType.error: - break - except (WSServerHandshakeError, WSClientDisconnectedError, ClientResponseError, ValueError) as e: - logging.debug("Websocket block {0} : {1} - {2}".format(type(e).__name__, str(e), self.pubkey[:5])) - await self.request_current_block() - except (ClientError, gaierror, TimeoutError, DisconnectedError) as e: - logging.debug("{0} : {1}".format(str(e), self.pubkey[:5])) - self.state = Node.OFFLINE - except jsonschema.ValidationError as e: - logging.debug(str(e)) - logging.debug("Validation error : {0}".format(self.pubkey[:5])) - self.state = Node.CORRUPTED - finally: - self._connected['block'] = False - self._ws_tasks['block'] = None - - async def request_current_block(self): - """ - Request a node on the HTTP GET interface - If an error occurs, the node is considered offline - """ - try: - conn_handler = self.endpoint.conn_handler() - block_data = await bma.blockchain.Current(conn_handler).get(self._session) - await self.refresh_block(block_data) - except errors.DuniterError as e: - if e.ucode == errors.BLOCK_NOT_FOUND: - self.main_chain_previous_block = None - self.set_block(None) - else: - self.state = Node.OFFLINE - logging.debug("Error in block reply : {0}".format(self.pubkey[:5])) - logging.debug(str(e)) - self.changed.emit() - except (ClientError, gaierror, TimeoutError, DisconnectedError, ValueError) as e: - logging.debug("{0} : {1}".format(str(e), self.pubkey[:5])) - self.state = Node.OFFLINE - except jsonschema.ValidationError as e: - logging.debug(str(e)) - logging.debug("Validation error : {0}".format(self.pubkey[:5])) - self.state = Node.CORRUPTED - - async def refresh_block(self, block_data): - """ - Refresh the blocks of this node - :param dict block_data: The block data in json format - """ - conn_handler = self.endpoint.conn_handler() - - logging.debug("Requesting {0}".format(conn_handler)) - block_hash = block_data['hash'] - self.state = Node.ONLINE - - if not self.block or block_hash != self.block['hash']: - try: - if self.block: - self.main_chain_previous_block = await bma.blockchain.Block(conn_handler, - self.block['number']).get(self._session) - except errors.DuniterError as e: - if e.ucode == errors.BLOCK_NOT_FOUND: - self.main_chain_previous_block = None - else: - self.state = Node.OFFLINE - logging.debug("Error in previous block reply : {0}".format(self.pubkey[:5])) - logging.debug(str(e)) - self.changed.emit() - except (ClientError, gaierror, TimeoutError, DisconnectedError, ValueError) as e: - logging.debug("{0} : {1}".format(str(e), self.pubkey[:5])) - self.state = Node.OFFLINE - except jsonschema.ValidationError as e: - logging.debug(str(e)) - logging.debug("Validation error : {0}".format(self.pubkey[:5])) - self.state = Node.CORRUPTED - finally: - self.set_block(block_data) - logging.debug("Changed block {0} -> {1}".format(self.block['number'], - block_data['number'])) - self.changed.emit() - - @asyncify - async def refresh_informations(self): - """ - Refresh basic information (pubkey and currency) - """ - conn_handler = self.endpoint.conn_handler() - - try: - peering_data = await bma.network.Peering(conn_handler).get(self._session) - node_pubkey = peering_data["pubkey"] - node_currency = peering_data["currency"] - self.state = Node.ONLINE - - if peering_data['raw'] != self.peer.raw(): - peer = Peer.from_signed_raw("{0}{1}\n".format(peering_data['raw'], peering_data['signature'])) - if peer.blockUID.number > peer.blockUID.number: - self.peer = Peer.from_signed_raw("{0}{1}\n".format(peering_data['raw'], peering_data['signature'])) - - if node_pubkey != self.pubkey: - self._pubkey = node_pubkey - self.identity_changed.emit() - - if node_currency != self.currency: - self.state = Node.CORRUPTED - logging.debug("Change : new state corrupted") - self.changed.emit() - - except errors.DuniterError as e: - if e.ucode == errors.PEER_NOT_FOUND: - logging.debug("Error in peering reply : {0}".format(str(e))) - self.state = Node.OFFLINE - self.changed.emit() - except (ClientError, gaierror, TimeoutError, DisconnectedError, ValueError) as e: - logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey[:5])) - self.state = Node.OFFLINE - except (MalformedDocumentError, jsonschema.ValidationError) as e: - logging.debug(str(e)) - logging.debug("Validation error : {0}".format(self.pubkey[:5])) - self.state = Node.CORRUPTED - - @asyncify - async def refresh_summary(self): - """ - Refresh the summary of this node - """ - conn_handler = self.endpoint.conn_handler() - - try: - summary_data = await bma.node.Summary(conn_handler).get(self._session) - self.software = summary_data["duniter"]["software"] - self.version = summary_data["duniter"]["version"] - self.state = Node.ONLINE - if "forkWindowSize" in summary_data["duniter"]: - self.fork_window = summary_data["duniter"]["forkWindowSize"] - else: - self.fork_window = 0 - except (ClientError, gaierror, TimeoutError, DisconnectedError, ValueError) as e: - logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey[:5])) - self.state = Node.OFFLINE - except jsonschema.ValidationError as e: - logging.debug(str(e)) - logging.debug("Validation error : {0}".format(self.pubkey[:5])) - self.state = Node.CORRUPTED - - @asyncify - async def refresh_uid(self): - """ - Refresh the node UID - """ - conn_handler = self.endpoint.conn_handler() - try: - data = await bma.wot.Lookup(conn_handler, self.pubkey).get(self._session) - self.state = Node.ONLINE - timestamp = BlockUID.empty() - uid = "" - for result in data['results']: - if result["pubkey"] == self.pubkey: - uids = result['uids'] - for uid in uids: - if BlockUID.from_str(uid["meta"]["timestamp"]) >= timestamp: - timestamp = uid["meta"]["timestamp"] - uid = uid["uid"] - if self._uid != uid: - self._uid = uid - self.identity_changed.emit() - except errors.DuniterError as e: - if e.ucode == errors.NO_MATCHING_IDENTITY: - logging.debug("UID not found : {0}".format(self.pubkey[:5])) - else: - logging.debug("error in uid reply : {0}".format(self.pubkey[:5])) - self.state = Node.OFFLINE - self.identity_changed.emit() - except (ClientError, gaierror, TimeoutError, DisconnectedError, ValueError) as e: - logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey[:5])) - self.state = Node.OFFLINE - except jsonschema.ValidationError as e: - logging.debug(str(e)) - logging.debug("Validation error : {0}".format(self.pubkey[:5])) - self.state = Node.CORRUPTED - - async def connect_peers(self): - """ - Connects to the peer websocket entry point - If the connection fails, it tries the fallback mode on HTTP GET - """ - if not self._connected['peer']: - try: - conn_handler = self.endpoint.conn_handler() - peer_websocket = bma.ws.Peer(conn_handler) - ws_connection = peer_websocket.connect(self._session) - async with ws_connection as ws: - self._connected['peer'] = True - logging.debug("Connected successfully to peer ws : {0}".format(self.pubkey[:5])) - async for msg in ws: - if msg.tp == aiohttp.MsgType.text: - logging.debug("Received a peer : {0}".format(self.pubkey[:5])) - peer_data = peer_websocket.parse_text(msg.data) - self.refresh_peer_data(peer_data) - elif msg.tp == aiohttp.MsgType.closed: - break - elif msg.tp == aiohttp.MsgType.error: - break - except (WSServerHandshakeError, WSClientDisconnectedError, ClientResponseError, ValueError) as e: - logging.debug("Websocket peer {0} : {1} - {2}".format(type(e).__name__, str(e), self.pubkey[:5])) - await self.request_peers() - except (ClientError, gaierror, TimeoutError, DisconnectedError) as e: - logging.debug("{0} : {1}".format(str(e), self.pubkey[:5])) - self.state = Node.OFFLINE - except jsonschema.ValidationError as e: - logging.debug(str(e)) - logging.debug("Validation error : {0}".format(self.pubkey[:5])) - self.state = Node.CORRUPTED - finally: - self._connected['peer'] = False - self._ws_tasks['peer'] = None - - async def request_peers(self): - """ - Refresh the list of peers knew by this node - """ - conn_handler = self.endpoint.conn_handler() - - try: - peers_data = await bma.network.peering.Peers(conn_handler).get(leaves='true', session=self._session) - self.state = Node.ONLINE - if peers_data['root'] != self._last_merkle['root']: - leaves = [leaf for leaf in peers_data['leaves'] - if leaf not in self._last_merkle['leaves']] - for leaf_hash in leaves: - try: - leaf_data = await bma.network.peering.Peers(conn_handler).get(leaf=leaf_hash, - session=self._session) - self.refresh_peer_data(leaf_data['leaf']['value']) - except (AttributeError, ValueError, errors.DuniterError) as e: - logging.debug("{pubkey} : Incorrect peer data in {leaf}".format(pubkey=self.pubkey[:5], - leaf=leaf_hash)) - self.state = Node.OFFLINE - self.changed.emit() - except (ClientError, gaierror, TimeoutError, DisconnectedError, ValueError) as e: - logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey[:5])) - self.state = Node.OFFLINE - except jsonschema.ValidationError as e: - logging.debug(str(e)) - logging.debug("Validation error : {0}".format(self.pubkey[:5])) - self.state = Node.CORRUPTED - self._last_merkle = {'root' : peers_data['root'], - 'leaves': peers_data['leaves']} - except errors.DuniterError as e: - if e.ucode == errors.PEER_NOT_FOUND: - logging.debug("Error in peers reply") - self.state = Node.OFFLINE - self.changed.emit() - except (ClientError, gaierror, TimeoutError, DisconnectedError) as e: - logging.debug("{0} : {1}".format(type(e).__name__, self.pubkey)) - self.state = Node.OFFLINE - except jsonschema.ValidationError as e: - logging.debug(str(e)) - logging.debug("Validation error : {0}".format(self.pubkey)) - self.state = Node.CORRUPTED - - def refresh_peer_data(self, peer_data): - if "raw" in peer_data: - try: - str_doc = "{0}{1}\n".format(peer_data['raw'], - peer_data['signature']) - peer_doc = Peer.from_signed_raw(str_doc) - self.neighbour_found.emit(peer_doc) - except MalformedDocumentError as e: - logging.debug(str(e)) - else: - logging.debug("Incorrect leaf reply") - - def __str__(self): - return ','.join([str(self.pubkey), str(self.endpoint.server), - str(self.endpoint.ipv4), str(self.endpoint.port), - str(self.block['number'] if self.block else "None"), - str(self.currency), str(self.state), str(self.neighbours)]) diff --git a/src/sakia/data/connectors/__init__.py b/src/sakia/data/connectors/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..50431678702ffa85b431cf7b1cab28a519252f7d 100644 --- a/src/sakia/data/connectors/__init__.py +++ b/src/sakia/data/connectors/__init__.py @@ -0,0 +1,2 @@ +from .node import NodeConnector +from .bma import BmaConnector diff --git a/src/sakia/data/connectors/bma.py b/src/sakia/data/connectors/bma.py index acb272a6f0aed815d5ce1a8d84c02a0d87135b98..c7e4415b2802c4e29cebab1ec82e17b6d6efc659 100644 --- a/src/sakia/data/connectors/bma.py +++ b/src/sakia/data/connectors/bma.py @@ -48,7 +48,7 @@ class BmaConnector: :param dict get_args: Arguments to pass to the request __get__ method :return: The returned data """ - nodes = self.filter_nodes(request, self._network.synced_nodes) + nodes = self.filter_nodes(request, self._nodes_processor.synced_nodes) if len(nodes) > 0: tries = 0 json_data = None @@ -81,9 +81,9 @@ class BmaConnector: .. note:: If one node accept the requests (returns 200), the broadcast should be considered accepted by the network. """ - nodes = random.sample(self._network.synced_nodes, 6) \ - if len(self._network.synced_nodes) > 6 \ - else self._network.synced_nodes + nodes = random.sample(self._nodes_processor.synced_nodes, 6) \ + if len(self._nodes_processor.synced_nodes) > 6 \ + else self._nodes_processor.synced_nodes replies = [] if len(nodes) > 0: for node in nodes: @@ -92,7 +92,6 @@ class BmaConnector: req = request(conn_handler, **req_args) reply = asyncio.ensure_future(req.post(**post_args, session=self._network.session)) replies.append(reply) - self._invalidate_cache(request) else: raise NoPeerAvailable("", len(nodes)) diff --git a/src/sakia/data/connectors/node.py b/src/sakia/data/connectors/node.py new file mode 100644 index 0000000000000000000000000000000000000000..e8e9c7b7e8286beca2ce3edf2d5116286d0e837a --- /dev/null +++ b/src/sakia/data/connectors/node.py @@ -0,0 +1,366 @@ +from duniterpy.documents.peer import Peer +from duniterpy.documents import BlockUID, MalformedDocumentError, BMAEndpoint, block_uid +from ...tools.decorators import asyncify +from duniterpy.api import bma, errors + +from aiohttp.errors import WSClientDisconnectedError, WSServerHandshakeError, ClientResponseError +from aiohttp.errors import ClientError, DisconnectedError +from asyncio import TimeoutError +import logging +import jsonschema +import asyncio +import aiohttp +from socket import gaierror + +from PyQt5.QtCore import QObject, pyqtSignal + +from sakia.errors import InvalidNodeCurrency +from ..entities.node import Node + + +class NodeConnector(QObject): + """ + A node is a peer send from the client point of view. + """ + changed = pyqtSignal() + error = pyqtSignal() + identity_changed = pyqtSignal() + neighbour_found = pyqtSignal(Peer) + + def __init__(self, node, session): + """ + Constructor + """ + super().__init__() + self.node = node + self._ws_tasks = {'block': None, + 'peer': None} + self._connected = {'block': False, + 'peer': False} + self._session = session + self._refresh_counter = 1 + + def __del__(self): + for ws in self._ws_tasks.values(): + if ws: + ws.cancel() + + @property + def session(self): + return self._session + + @classmethod + def from_peer(cls, currency, peer, session): + """ + Factory method to get a node from a peer document. + :param str currency: The node currency. None if we don't know\ + the currency it should have, for example if its the first one we add + :param peer: The peer document + :return: A new node + :rtype: sakia.core.net.Node + """ + if currency is not None: + if peer.currency != currency: + raise InvalidNodeCurrency(peer.currency, currency) + + node = Node(peer.currency, peer.pubkey, peer.endpoints, peer.blockUID) + logging.debug("Node from peer : {:}".format(str(node))) + + return cls(node, session) + + async def _safe_request(self, endpoint, request, req_args={}, get_args={}): + try: + conn_handler = endpoint.conn_handler() + data = await request(conn_handler, **req_args).get(self._session, **get_args) + return data + except (ClientError, gaierror, TimeoutError, DisconnectedError, ValueError) as e: + logging.debug("{0} : {1}".format(str(e), self.node.pubkey[:5])) + self.node.state = Node.OFFLINE + except jsonschema.ValidationError as e: + logging.debug(str(e)) + logging.debug("Validation error : {0}".format(self.node.pubkey[:5])) + self.node.state = Node.CORRUPTED + + async def close_ws(self): + for ws in self._ws_tasks.values(): + if ws: + ws.cancel() + await asyncio.sleep(0) + closed = False + while not closed: + for ws in self._ws_tasks.values(): + if ws: + closed = False + break + else: + closed = True + await asyncio.sleep(0) + await asyncio.sleep(0) + + def refresh(self, manual=False): + """ + Refresh all data of this node + :param bool manual: True if the refresh was manually initiated + """ + if not self._ws_tasks['block']: + self._ws_tasks['block'] = asyncio.ensure_future(self.connect_current_block()) + + if not self._ws_tasks['peer']: + self._ws_tasks['peer'] = asyncio.ensure_future(self.connect_peers()) + + if manual: + asyncio.ensure_future(self.request_peers()) + + if self._refresh_counter % 20 == 0 or manual: + self.refresh_uid() + self.refresh_summary() + self._refresh_counter = self._refresh_counter if manual else 1 + else: + self._refresh_counter += 1 + + async def connect_current_block(self): + """ + Connects to the websocket entry point of the node + If the connection fails, it tries the fallback mode on HTTP GET + """ + for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: + if not self._connected['block']: + try: + conn_handler = endpoint.conn_handler() + block_websocket = bma.ws.Block(conn_handler) + ws_connection = block_websocket.connect(self._session) + async with ws_connection as ws: + self._connected['block'] = True + logging.debug("Connected successfully to block ws : {0}" + .format(self.node.pubkey[:5])) + async for msg in ws: + if msg.tp == aiohttp.MsgType.text: + logging.debug("Received a block : {0}".format(self.node.pubkey[:5])) + block_data = block_websocket.parse_text(msg.data) + await self.refresh_block(block_data) + elif msg.tp == aiohttp.MsgType.closed: + break + elif msg.tp == aiohttp.MsgType.error: + break + except (WSServerHandshakeError, WSClientDisconnectedError, + ClientResponseError, ValueError) as e: + logging.debug("Websocket block {0} : {1} - {2}" + .format(type(e).__name__, str(e), self.node.pubkey[:5])) + await self.request_current_block() + except (ClientError, gaierror, TimeoutError, DisconnectedError) as e: + logging.debug("{0} : {1}".format(str(e), self.node.pubkey[:5])) + self.node.state = Node.OFFLINE + except jsonschema.ValidationError as e: + logging.debug(str(e)) + logging.debug("Validation error : {0}".format(self.node.pubkey[:5])) + self.node.state = Node.CORRUPTED + finally: + self.changed.emit() + self._connected['block'] = False + self._ws_tasks['block'] = None + + async def request_current_block(self): + """ + Request a node on the HTTP GET interface + If an error occurs, the node is considered offline + """ + for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: + try: + block_data = await self._safe_request(endpoint, bma.blockchain.Current, self._session) + await self.refresh_block(block_data) + return # Do not try any more endpoint + except errors.DuniterError as e: + if e.ucode == errors.BLOCK_NOT_FOUND: + self.node.previous_buid = BlockUID.empty() + else: + self.node.state = Node.OFFLINE + logging.debug("Error in block reply of {0} : {1}}".format(self.node.pubkey[:5], str(e))) + finally: + self.changed.emit() + else: + logging.debug("Could not connect to any BMA endpoint : {0}".format(self.node.pubkey[:5])) + self.node.state = Node.OFFLINE + self.changed.emit() + + async def refresh_block(self, block_data): + """ + Refresh the blocks of this node + :param dict block_data: The block data in json format + """ + self.node.state = Node.ONLINE + if not self.node.current_buid or self.node.current_buid.sha_hash != block_data['hash']: + for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: + conn_handler = endpoint.conn_handler() + logging.debug("Requesting {0}".format(conn_handler)) + try: + previous_block = await self._safe_request(endpoint, bma.blockchain.Block, + req_args={'number': self.node.current_buid.number}) + self.node.previous_buid = BlockUID(previous_block['number'], previous_block['hash']) + return # Do not try any more endpoint + except errors.DuniterError as e: + if e.ucode == errors.BLOCK_NOT_FOUND: + self.node.previous_buid = BlockUID.empty() + self.node.current_buid = BlockUID.empty() + else: + self.node.state = Node.OFFLINE + logging.debug("Error in previous block reply of {0} : {1}".format(self.node.pubkey[:5], str(e))) + finally: + self.node.current_buid = BlockUID(block_data['number'], block_data['hash']) + logging.debug("Changed block {0} -> {1}".format(self.node.current_buid.number, + block_data['number'])) + self.changed.emit() + else: + logging.debug("Could not connect to any BMA endpoint : {0}".format(self.node.pubkey[:5])) + self.node.state = Node.OFFLINE + self.changed.emit() + + @asyncify + async def refresh_summary(self): + """ + Refresh the summary of this node + """ + for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: + try: + summary_data = await self._safe_request(endpoint, bma.node.Summary) + self.node.software = summary_data["duniter"]["software"] + self.node.version = summary_data["duniter"]["version"] + self.node.state = Node.ONLINE + return # Break endpoints loop + except errors.DuniterError as e: + self.node.state = Node.OFFLINE + logging.debug("Error in summary of {0} : {1}".format(self.node.pubkey[:5], str(e))) + finally: + self.changed.emit() + else: + logging.debug("Could not connect to any BMA endpoint : {0}".format(self.node.pubkey[:5])) + self.node.state = Node.OFFLINE + self.changed.emit() + + @asyncify + async def refresh_uid(self): + """ + Refresh the node UID + """ + for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: + try: + data = await self._safe_request(endpoint, bma.wot.Lookup, + req_args={'search':self.node.pubkey}, + get_args={}) + self.node.state = Node.ONLINE + timestamp = BlockUID.empty() + uid = "" + for result in data['results']: + if result["pubkey"] == self.node.pubkey: + uids = result['uids'] + for uid in uids: + if BlockUID.from_str(uid["meta"]["timestamp"]) >= timestamp: + timestamp = uid["meta"]["timestamp"] + uid = uid["uid"] + if self.node.uid != uid: + self.node.uid = uid + self.identity_changed.emit() + except errors.DuniterError as e: + if e.ucode == errors.NO_MATCHING_IDENTITY: + logging.debug("UID not found : {0}".format(self.node.pubkey[:5])) + else: + logging.debug("error in uid reply : {0}".format(self.node.pubkey[:5])) + self.node.state = Node.OFFLINE + self.identity_changed.emit() + else: + logging.debug("Could not connect to any BMA endpoint : {0}".format(self.node.pubkey[:5])) + self.node.state = Node.OFFLINE + self.changed.emit() + + async def connect_peers(self): + """ + Connects to the peer websocket entry point + If the connection fails, it tries the fallback mode on HTTP GET + """ + for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: + if not self._connected['peer']: + try: + conn_handler = endpoint.conn_handler() + peer_websocket = bma.ws.Peer(conn_handler) + ws_connection = peer_websocket.connect(self._session) + async with ws_connection as ws: + self._connected['peer'] = True + logging.debug("Connected successfully to peer ws : {0}".format(self.node.pubkey[:5])) + async for msg in ws: + if msg.tp == aiohttp.MsgType.text: + logging.debug("Received a peer : {0}".format(self.node.pubkey[:5])) + peer_data = peer_websocket.parse_text(msg.data) + self.refresh_peer_data(peer_data) + elif msg.tp == aiohttp.MsgType.closed: + break + elif msg.tp == aiohttp.MsgType.error: + break + except (WSServerHandshakeError, WSClientDisconnectedError, + ClientResponseError, ValueError) as e: + logging.debug("Websocket peer {0} : {1} - {2}" + .format(type(e).__name__, str(e), self.node.pubkey[:5])) + await self.request_peers() + except (ClientError, gaierror, TimeoutError, DisconnectedError) as e: + logging.debug("{0} : {1}".format(str(e), self.node.pubkey[:5])) + self.node.state = Node.OFFLINE + except jsonschema.ValidationError as e: + logging.debug(str(e)) + logging.debug("Validation error : {0}".format(self.node.pubkey[:5])) + self.node.state = Node.CORRUPTED + finally: + self._connected['peer'] = False + self._ws_tasks['peer'] = None + self.changed.emit() + else: + logging.debug("Could not connect to any BMA endpoint : {0}".format(self.node.pubkey[:5])) + self.node.state = Node.OFFLINE + self.changed.emit() + + async def request_peers(self): + """ + Refresh the list of peers knew by this node + """ + for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]: + try: + peers_data = await self._safe_request(endpoint, bma.network.peering.Peers, + get_args={'leaves': 'true'}) + self.node.state = Node.ONLINE + if peers_data['root'] != self.node.merkle_peers_root: + leaves = [leaf for leaf in peers_data['leaves'] + if leaf not in self.node.merkle_peers_leaves] + for leaf_hash in leaves: + try: + leaf_data = await self._safe_request(endpoint, + bma.network.peering.Peers, + get_args={'leaf': leaf_hash}) + self.refresh_peer_data(leaf_data['leaf']['value']) + except (AttributeError, ValueError, errors.DuniterError) as e: + logging.debug("{pubkey} : Incorrect peer data in {leaf}" + .format(pubkey=self.node.pubkey[:5], + leaf=leaf_hash)) + self.node.state = Node.OFFLINE + finally: + self.changed.emit() + self.node.merkle_peers_root = tuple(peers_data['root']) + self.node.merkle_peers_leaves = tuple(peers_data['leaves']) + return # Break endpoints loop + except errors.DuniterError as e: + logging.debug("Error in peers reply : {0}".format(str(e))) + self.node.state = Node.OFFLINE + finally: + self.changed.emit() + else: + logging.debug("Could not connect to any BMA endpoint : {0}".format(self.node.pubkey[:5])) + self.node.state = Node.OFFLINE + self.changed.emit() + + def refresh_peer_data(self, peer_data): + if "raw" in peer_data: + try: + str_doc = "{0}{1}\n".format(peer_data['raw'], + peer_data['signature']) + peer_doc = Peer.from_signed_raw(str_doc) + self.neighbour_found.emit(peer_doc) + except MalformedDocumentError as e: + logging.debug(str(e)) + else: + logging.debug("Incorrect leaf reply") diff --git a/src/sakia/data/entities/node.py b/src/sakia/data/entities/node.py index 56eb3229f8eba72bc13292dca01f8ee06c91137b..27126a3a250fae70be1d460abd5f9a6c9bc6d9fd 100644 --- a/src/sakia/data/entities/node.py +++ b/src/sakia/data/entities/node.py @@ -1,11 +1,13 @@ import attr -import json from duniterpy.documents import block_uid, endpoint -def _list_of_endpoints(value): - if isinstance(value, list): +def _tuple_of_endpoints(value): + if isinstance(value, tuple): return value + elif isinstance(value, list): + l = [endpoint(e) for e in value] + return tuple(l) elif isinstance(value, str): list_of_str = value.split('\n') conv = [] @@ -16,29 +18,60 @@ def _list_of_endpoints(value): raise TypeError("Can't convert {0} to list of endpoints".format(value)) -def _merkle_nodes(value): - if isinstance(value, dict): - return value - elif isinstance(value, str): - return json.loads(value) - else: - raise TypeError("Can't convert {0} to merkle nodes tree".format(value)) +def _tuple_of_hashes(ls): + if isinstance(ls, tuple): + return ls + elif isinstance(ls, list): + return tuple([str(a) for a in ls]) + elif isinstance(ls, str): + if ls: # if string is not empty + return tuple([str(a) for a in ls.split('\n')]) + else: + return tuple() @attr.s() class Node: + """ + + A node can have multiple states : + - ONLINE : The node is available for requests + - OFFLINE: The node is disconnected + - DESYNCED : The node is online but is desynced from the network + - CORRUPTED : The node is corrupted, some weird behaviour is going on + """ + MERKLE_EMPTY_ROOT = "01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b" + ONLINE = 1 OFFLINE = 2 DESYNCED = 3 CORRUPTED = 4 + # The currency handled by this node currency = attr.ib(convert=str) + # The pubkey of the node pubkey = attr.ib(convert=str) - endpoints = attr.ib(convert=_list_of_endpoints, cmp=False) - current_buid = attr.ib(convert=block_uid, cmp=False) - previous_buid = attr.ib(convert=block_uid, cmp=False) - state = attr.ib(convert=int, cmp=False) - software = attr.ib(convert=str, cmp=False) - version = attr.ib(convert=str, cmp=False) - merkle_nodes = attr.ib(convert=_merkle_nodes, cmp=False) + # The endpoints of the node, in a list of Endpoint objects format + endpoints = attr.ib(convert=_tuple_of_endpoints, cmp=False) + # The previous block uid in /blockchain/current + peer_blockstamp = attr.ib(convert=block_uid, cmp=False) + # The uid of the owner of node + uid = attr.ib(convert=str, cmp=False, default="") + # The current block uid in /blockchain/current + current_buid = attr.ib(convert=block_uid, cmp=False, default=None) + # The previous block uid in /blockchain/current + previous_buid = attr.ib(convert=block_uid, cmp=False, default=None) + # The state of the node in Sakia + state = attr.ib(convert=int, cmp=False, default=OFFLINE) + # The version of the software in /node/summary + software = attr.ib(convert=str, cmp=False, default="") + # The version of the software in /node/summary + version = attr.ib(convert=str, cmp=False, default="") + # Root of the merkle peers tree, default = sha256 of empty string + merkle_peers_root = attr.ib(convert=str, cmp=False, + default=MERKLE_EMPTY_ROOT) + # Leaves of the merkle peers tree + merkle_peers_leaves = attr.ib(convert=_tuple_of_hashes, cmp=False, default=tuple()) + # Define if this node is a root node in Sakia + root = attr.ib(convert=bool, cmp=False, default=False) diff --git a/src/sakia/data/processors/__init__.py b/src/sakia/data/processors/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..462ae146b8924e925f1f786a876fd2f87f53de00 100644 --- a/src/sakia/data/processors/__init__.py +++ b/src/sakia/data/processors/__init__.py @@ -0,0 +1,3 @@ +from .nodes import NodesProcessor +from .communities import CommunityProcessor +from .identities import IdentityProcessor diff --git a/src/sakia/data/processors/nodes.py b/src/sakia/data/processors/nodes.py new file mode 100644 index 0000000000000000000000000000000000000000..d89a57ea9f2f176a81a82ae7476baf19d3844bbf --- /dev/null +++ b/src/sakia/data/processors/nodes.py @@ -0,0 +1,84 @@ +import attr +from ..entities import Node +from duniterpy.documents import BlockUID, endpoint +import logging + + +@attr.s +class NodesProcessor: + _currency = attr.ib(convert=str) + _repo = attr.ib() # :type sakia.data.repositories.NodesRepo + + def synced_nodes(self): + """ + Get nodes which are in the ONLINE state. + """ + return self._repo.get_all(**{'currency': self._currency, 'state': Node.ONLINE}) + + def online_nodes(self): + """ + Get nodes which are in the ONLINE state. + """ + return self._repo.get_all(**{'currency': self._currency, 'state': Node.ONLINE}) + \ + self._repo.get_all(**{'currency': self._currency, 'state': Node.DESYNCED}) + + def update_node(self, node): + """ + Update node in the repository. + First involves basic checks about pubkey and primary key constraints. + + :param sakia.data.entities.Node node: the node to update + """ + other_node = self._repo.get_one(**{'currency': self._currency, 'pubkey': node.pubkey}) + if other_node: + self._repo.update(node) + else: + self._repo.insert(node) + + def nodes(self): + """ + Get all knew nodes. + """ + return self._repo.get_all(**{'currency': self._currency}) + + def root_nodes(self): + """ + Get root nodes. + """ + return self._repo.get_all(**{'currency': self._currency, 'root': True}) + + def current_buid(self): + """ + Get the latest block considered valid + It is the most frequent last block of every known nodes + """ + blocks_uids = [n.current_buid for n in self.synced_nodes()] + if len(blocks_uids) > 0: + return blocks_uids[0] + else: + return BlockUID.empty() + + def quality(self): + """ + Get a ratio of the synced nodes vs the rest + """ + synced = len(self.synced_nodes()) + total = len(self.nodes()) + if total == 0: + ratio_synced = 0 + else: + ratio_synced = synced / total + return ratio_synced + + def update_peer(self, peer): + """ + Update the peer of a node + :param peer: + :return: + """ + node = self._repo.get_one(**{'pubkey': peer.pubkey, 'currency': self._currency}) + if node.peer_blockstamp < peer.blockUID: + logging.debug("Update node : {0}".format(peer.pubkey[:5])) + node.endpoints = tuple(peer.endpoints) + node.peer_blockstamp = peer.blockUID + self._repo.update(node) diff --git a/src/sakia/data/repositories/meta.sql b/src/sakia/data/repositories/meta.sql index f30c866d442537f026272a069cc02fd8dab6076f..4c9f3e1bea1367d2b0d2ffbe90b0e1c15388543a 100644 --- a/src/sakia/data/repositories/meta.sql +++ b/src/sakia/data/repositories/meta.sql @@ -77,14 +77,18 @@ CREATE TABLE IF NOT EXISTS transactions( -- NODES TABLE CREATE TABLE IF NOT EXISTS nodes( - currency VARCHAR(30), - pubkey VARCHAR(50), - endpoints TEXT, - current_buid VARCHAR(100), - previous_buid VARCHAR(100), - state INT, - software VARCHAR(100), - version VARCHAR(50), - merkle_nodes TEXT, + currency VARCHAR(30), + pubkey VARCHAR(50), + endpoints TEXT, + peer_buid VARCHAR(100), + uid VARCHAR(50), + current_buid VARCHAR(100), + previous_buid VARCHAR(100), + state INT, + software VARCHAR(100), + version VARCHAR(50), + merkle_peers_root VARCHAR(50), + merkle_peers_leaves TEXT, + root BOOLEAN, PRIMARY KEY (currency, pubkey) ); diff --git a/src/sakia/data/repositories/nodes.py b/src/sakia/data/repositories/nodes.py index 4a239864eeede8a9c3a68e3fec26f6107c24b630..d8770584041cc1452cae1369035c89059e59eae3 100644 --- a/src/sakia/data/repositories/nodes.py +++ b/src/sakia/data/repositories/nodes.py @@ -1,5 +1,4 @@ import attr -import json from ..entities import Node @@ -18,8 +17,6 @@ class NodesRepo: """ with self._conn: node_tuple = attr.astuple(node, tuple_factory=list) - node_tuple[2] = '\n'.join([e.inline() for e in node_tuple[2]]) - node_tuple[8] = json.dumps(node_tuple[8]) values = ",".join(['?'] * len(node_tuple)) self._conn.execute("INSERT INTO nodes VALUES ({0})".format(values), node_tuple) @@ -31,18 +28,20 @@ class NodesRepo: with self._conn: updated_fields = attr.astuple(node, tuple_factory=list, filter=attr.filters.exclude(*NodesRepo._primary_keys)) - updated_fields[0] = '\n'.join([e.inline() for e in updated_fields[0]]) - updated_fields[6] = json.dumps(updated_fields[6]) where_fields = attr.astuple(node, tuple_factory=list, filter=attr.filters.include(*NodesRepo._primary_keys)) self._conn.execute("""UPDATE nodes SET endpoints=?, + peer_buid=?, + uid=?, current_buid=?, previous_buid=?, state=?, software=?, version=?, - merkle_nodes=? + merkle_peers_root=?, + merkle_peers_leaves=?, + root=? WHERE currency=? AND pubkey=?""", @@ -58,6 +57,8 @@ class NodesRepo: filters = [] values = [] for k, v in search.items(): + if isinstance(v, bool): + v = int(v) filters.append("{k}=?".format(k=k)) values.append(v) @@ -78,7 +79,10 @@ class NodesRepo: filters = [] values = [] for k, v in search.items(): - value = v + if isinstance(v, bool): + value = int(v) + else: + value = v filters.append("{key} = ?".format(key=k)) values.append(value) diff --git a/src/sakia/errors.py b/src/sakia/errors.py index 9b8104d1d4ddb95de34b27c937b2f7cf9b6bc253..801e47dc5764ae7167520f919eb3383debe150ba 100644 --- a/src/sakia/errors.py +++ b/src/sakia/errors.py @@ -48,3 +48,16 @@ class NoPeerAvailable(Error): super() .__init__( "No peer answered in {0} community ({1} peers available)" .format(currency, nbpeers)) + + +class InvalidNodeCurrency(Error): + """ + Exception raised when a node doesn't use the intended currency + """ + def __init__(self, currency, node_currency): + """ + Constructor + """ + super() .__init__( + "Node is working for {0} currency, but should be {1}" + .format(node_currency, currency)) diff --git a/src/sakia/services/__init__.py b/src/sakia/services/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..391f3f96909275e47196b2f72237388596698ebd --- /dev/null +++ b/src/sakia/services/__init__.py @@ -0,0 +1 @@ +from .network import NetworkService \ No newline at end of file diff --git a/src/sakia/services/network.py b/src/sakia/services/network.py new file mode 100644 index 0000000000000000000000000000000000000000..374cf2356ae7f5fd5f3f95531ec4128ae23e2806 --- /dev/null +++ b/src/sakia/services/network.py @@ -0,0 +1,244 @@ +""" +Created on 24 févr. 2015 + +@author: inso +""" +from sakia.data.connectors import NodeConnector +from sakia.data.entities import Node +from sakia.errors import InvalidNodeCurrency +from sakia.tools.decorators import asyncify +import logging +import time +import asyncio +from duniterpy.key import VerifyingKey +from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, QTimer +from collections import Counter + +MAX_CONFIRMATIONS = 6 + + +class NetworkService(QObject): + """ + A network is managing nodes polling and crawling of a + given community. + """ + nodes_changed = pyqtSignal() + root_nodes_changed = pyqtSignal() + new_block_mined = pyqtSignal(int) + blockchain_rollback = pyqtSignal(int) + + def __init__(self, currency, repo, processor, connectors, session): + """ + Constructor of a network + + :param str currency: The currency name of the community + :param sakia.data.repositories.NodesRepository repo: the nodes repository + :param sakia.data.processors.NodesProcessor processor: the nodes processor for given currency + :param list connectors: The connectors to nodes of the network + :param aiohttp.ClientSession session: The main aiohttp client session + """ + super().__init__() + self._repo = repo + self._processor = processor + self._connectors = [] + for c in connectors: + self.add_connector(c) + self.currency = currency + self._must_crawl = False + self._block_found = self._processor.current_buid() + self._client_session = session + self._discovery_stack = [] + + @classmethod + def create(cls, repo, processor, node_connector): + """ + Create a new network with one knew node + Crawls the nodes from the first node to build the + community network + + :param node_connector: The first connector of the network service + """ + connectors = [node_connector] + repo.insert(node_connector.node) + network = cls(node_connector.node.currency, repo, processor, connectors, node_connector.session) + return network + + def start_coroutines(self): + """ + Start network nodes crawling + :return: + """ + asyncio.ensure_future(self.discover_network()) + + async def stop_coroutines(self, closing=False): + """ + Stop network nodes crawling. + """ + self._must_crawl = False + close_tasks = [] + logging.debug("Start closing") + for connector in self._connectors: + close_tasks.append(asyncio.ensure_future(connector.close_ws())) + logging.debug("Closing {0} websockets".format(len(close_tasks))) + if len(close_tasks) > 0: + await asyncio.wait(close_tasks, timeout=15) + if closing: + logging.debug("Closing client session") + await self._client_session.close() + logging.debug("Closed") + + @property + def session(self): + return self._client_session + + def continue_crawling(self): + return self._must_crawl + + def _check_nodes_sync(self): + """ + Check nodes sync with the following rules : + 1 : The block of the majority + 2 : The more last different issuers + 3 : The more difficulty + 4 : The biggest number or timestamp + """ + online_nodes = self._processor.online_nodes() + # rule number 1 : block of the majority + blocks = [n.current_buid.sha_hash for n in online_nodes if n.current_buid.sha_hash] + blocks_occurences = Counter(blocks) + blocks_by_occurences = {} + for key, value in blocks_occurences.items(): + the_block = [n.current_buid.sha_hash + for n in online_nodes if n.current_buid.sha_hash == key][0] + if value not in blocks_by_occurences: + blocks_by_occurences[value] = [the_block] + else: + blocks_by_occurences[value].append(the_block) + + if len(blocks_by_occurences) == 0: + for n in [n for n in online_nodes if n.state in (Node.ONLINE, Node.DESYNCED)]: + n.state = Node.ONLINE + self._processor.update_node(n) + return + + most_present = max(blocks_by_occurences.keys()) + + synced_block_hash = blocks_by_occurences[most_present][0] + + for n in online_nodes: + if n.current_buid.sha_hash == synced_block_hash: + n.state = Node.ONLINE + else: + n.state = Node.DESYNCED + self._processor.update_node(n) + + def add_connector(self, node_connector): + """ + Add a nod to the network. + """ + self._connectors.append(node_connector) + node_connector.changed.connect(self.handle_change) + node_connector.error.connect(self.handle_error) + node_connector.identity_changed.connect(self.handle_identity_change) + node_connector.neighbour_found.connect(self.handle_new_node) + logging.debug("{:} connected".format(node_connector.node.pubkey[:5])) + + @asyncify + async def refresh_once(self): + for connector in self._connectors: + await asyncio.sleep(1) + connector.refresh(manual=True) + + async def discover_network(self): + """ + Start crawling which never stops. + To stop this crawling, call "stop_crawling" method. + """ + self._must_crawl = True + first_loop = True + asyncio.ensure_future(self.discovery_loop()) + while self.continue_crawling(): + for connector in self._connectors: + if self.continue_crawling(): + connector.refresh() + if not first_loop: + await asyncio.sleep(15) + first_loop = False + await asyncio.sleep(15) + + logging.debug("End of network discovery") + + async def discovery_loop(self): + """ + Handle poping of nodes in discovery stack + :return: + """ + while self.continue_crawling(): + try: + await asyncio.sleep(1) + peer = self._discovery_stack.pop() + pubkeys = [n.pubkey for n in self._processor.nodes()] + if peer.pubkey not in pubkeys: + logging.debug("New node found : {0}".format(peer.pubkey[:5])) + try: + connector = NodeConnector.from_peer(self.currency, peer, self.session) + self._repo.insert(connector.node) + connector.refresh(manual=True) + self.add_connector(connector) + self.nodes_changed.emit() + except InvalidNodeCurrency as e: + logging.debug(str(e)) + else: + self._processor.update_peer(peer) + except IndexError: + await asyncio.sleep(2) + + def handle_new_node(self, peer): + key = VerifyingKey(peer.pubkey) + if key.verify_document(peer): + if len(self._discovery_stack) < 1000 \ + and peer.signatures[0] not in [p.signatures[0] for p in self._discovery_stack]: + logging.debug("Stacking new peer document : {0}".format(peer.pubkey)) + self._discovery_stack.append(peer) + else: + logging.debug("Wrong document received : {0}".format(peer.signed_raw())) + + @pyqtSlot() + def handle_identity_change(self): + connector = self.sender() + self._repo.update(connector.node) + self.nodes_changed.emit() + + @pyqtSlot() + def handle_error(self): + node = self.sender() + if node.state in (Node.OFFLINE, Node.CORRUPTED) and \ + node.last_change + 3600 < time.time(): + node.disconnect() + self.nodes.remove(node) + self.nodes_changed.emit() + + @pyqtSlot() + def handle_change(self): + node_connector = self.sender() + + if node_connector.node.state in (Node.ONLINE, Node.DESYNCED): + self._check_nodes_sync() + self.nodes_changed.emit() + self._repo.update(node_connector.node) + + if node_connector.node.state == Node.ONLINE: + current_buid = self._processor.current_buid() + logging.debug("{0} -> {1}".format(self._block_found.sha_hash[:10], current_buid.sha_hash[:10])) + if self._block_found.sha_hash != current_buid.sha_hash: + logging.debug("Latest block changed : {0}".format(current_buid.number)) + # If new latest block is lower than the previously found one + # or if the previously found block is different locally + # than in the main chain, we declare a rollback + if current_buid <= self._block_found \ + or node_connector.node.previous_buid != self._block_found: + self._block_found = current_buid + self.blockchain_rollback.emit(current_buid.number) + else: + self._block_found = current_buid + self.new_block_mined.emit(current_buid.number) diff --git a/src/sakia/core/net/api/__init__.py b/src/sakia/tests/technical/__init__.py similarity index 100% rename from src/sakia/core/net/api/__init__.py rename to src/sakia/tests/technical/__init__.py diff --git a/src/sakia/tests/technical/test_network_service.py b/src/sakia/tests/technical/test_network_service.py new file mode 100644 index 0000000000000000000000000000000000000000..e86ec44f06366580302a574aded703eb496dd7bc --- /dev/null +++ b/src/sakia/tests/technical/test_network_service.py @@ -0,0 +1,52 @@ +import asyncio +import unittest +import sqlite3 +import aiohttp +from duniterpy.documents import BlockUID, Peer +from sakia.tests import QuamashTest +from sakia.services import NetworkService +from sakia.data.connectors import NodeConnector +from sakia.data.repositories import NodesRepo, MetaDatabase +from sakia.data.processors import NodesProcessor + + +class TestNetworkService(unittest.TestCase, QuamashTest): + def setUp(self): + self.setUpQuamash() + sqlite3.register_adapter(BlockUID, str) + sqlite3.register_adapter(bool, int) + sqlite3.register_adapter(list, lambda ls: '\n'.join([str(v) for v in ls])) + sqlite3.register_adapter(tuple, lambda ls: '\n'.join([str(v) for v in ls])) + sqlite3.register_converter("BOOLEAN", lambda v: bool(int(v))) + self.con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES) + + def tearDown(self): + self.tearDownQuamash() + + def test_network_discovering(self): + meta_repo = MetaDatabase(self.con) + meta_repo.prepare() + meta_repo.upgrade_database() + nodes_repo = NodesRepo(self.con) + + async def exec_test(): + with aiohttp.ClientSession() as session: + peering = await session.get("http://peer.duniter.org:8999/network/peering") + self.assertEqual(peering.status, 200) + data = await peering.json() + peer_document = Peer.from_signed_raw("{0}{1}\n".format(data["raw"], data["signature"])) + node_connector = NodeConnector.from_peer(peer_document.currency, peer_document, session) + + processor = NodesProcessor(peer_document.currency, nodes_repo) + network_service = NetworkService.create(nodes_repo, processor, node_connector) + + network_service._must_crawl = True + asyncio.ensure_future(network_service.discovery_loop()) + network_service.refresh_once() + await asyncio.sleep(20) + self.assertGreater(len(processor.nodes()), 1) + await asyncio.sleep(20) + self.assertGreater(len(processor.nodes()), 2) + await network_service.stop_coroutines(True) + + self.lp.run_until_complete(exec_test()) diff --git a/src/sakia/tests/unit/core/test_network.py b/src/sakia/tests/unit/core/test_network.py deleted file mode 100644 index 175062c5f3255ca4d711aceb8efd80405cb172d0..0000000000000000000000000000000000000000 --- a/src/sakia/tests/unit/core/test_network.py +++ /dev/null @@ -1,17 +0,0 @@ -import aiohttp -import unittest -from unittest.mock import PropertyMock -from asynctest import Mock, patch -from duniterpy.documents.block import BlockUID -from PyQt5.QtCore import QLocale -from sakia.core.net import Network -from sakia.tests import QuamashTest - - -class TestCommunity(unittest.TestCase, QuamashTest): - def setUp(self): - self.setUpQuamash() - QLocale.setDefault(QLocale("en_GB")) - - def tearDown(self): - self.tearDownQuamash() diff --git a/src/sakia/tests/unit/core/test_node.py b/src/sakia/tests/unit/core/test_node.py deleted file mode 100644 index fd07ca043bd73ee0fc14ee9bcb4741439655a825..0000000000000000000000000000000000000000 --- a/src/sakia/tests/unit/core/test_node.py +++ /dev/null @@ -1,104 +0,0 @@ -import unittest -from unittest.mock import Mock -from asynctest import CoroutineMock, patch -from duniterpy.documents import Peer, BlockUID -from PyQt5.QtCore import QLocale -from sakia.core.net import Node -from sakia.tests import QuamashTest -from sakia.tests.mocks.bma import nice_blockchain -from pkg_resources import parse_version - - -class TestNode(unittest.TestCase, QuamashTest): - def setUp(self): - self.setUpQuamash() - QLocale.setDefault(QLocale("en_GB")) - - def tearDown(self): - self.tearDownQuamash() - - def test_from_peer(self): - peer = Peer.from_signed_raw("""Version: 2 -Type: Peer -Currency: meta_brouzouf -PublicKey: 8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU -Block: 48698-000005E0F228038E4DDD4F6CA4ACB01EC88FBAF8 -Endpoints: -BASIC_MERKLED_API duniter.inso.ovh 80 -82o1sNCh1bLpUXU6nacbK48HBcA9Eu2sPkL1/3c2GtDPxBUZd2U2sb7DxwJ54n6ce9G0Oy7nd1hCxN3fS0oADw== -""") - node = Node.from_peer('meta_brouzouf', peer, Mock("aiohttp.ClientSession")) - self.assertEqual(node.pubkey, "8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU") - self.assertEqual(node.endpoint.inline(), "BASIC_MERKLED_API duniter.inso.ovh 80") - self.assertEqual(node.currency, "meta_brouzouf") - - @patch('duniterpy.api.bma.network.Peering') - def test_from_address(self, peering): - peering.return_value.get = CoroutineMock(return_value={ - "version": 2, - "currency": "meta_brouzouf", - "endpoints": [ - "BASIC_MERKLED_API duniter.inso.ovh 80" - ], - "block": "48698-000005E0F228038E4DDD4F6CA4ACB01EC88FBAF8", - "signature": "82o1sNCh1bLpUXU6nacbK48HBcA9Eu2sPkL1/3c2GtDPxBUZd2U2sb7DxwJ54n6ce9G0Oy7nd1hCxN3fS0oADw==", - "raw": "Version: 2\nType: Peer\nCurrency: meta_brouzouf\nPublicKey: 8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU\nBlock: 48698-000005E0F228038E4DDD4F6CA4ACB01EC88FBAF8\nEndpoints:\nBASIC_MERKLED_API duniter.inso.ovh 80\n", - "pubkey": "8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU" - }) - - async def exec_test(): - node = await Node.from_address("meta_brouzouf", "127.0.0.1", 9000, Mock("aiohttp.ClientSession")) - self.assertEqual(node.pubkey, "8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU") - self.assertEqual(node.endpoint.inline(), "BASIC_MERKLED_API duniter.inso.ovh 80") - self.assertEqual(node.currency, "meta_brouzouf") - - self.lp.run_until_complete(exec_test()) - - def test_from_json_to_json(self): - json_data = {"version": "0.12.0", "state": 1, "fork_window": 0, "uid": "inso", - "block": nice_blockchain.bma_blockchain_current, - "peer": """Version: 2 -Type: Peer -Currency: meta_brouzouf -PublicKey: 8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU -Block: 48698-000005E0F228038E4DDD4F6CA4ACB01EC88FBAF8 -Endpoints: -BASIC_MERKLED_API duniter.inso.ovh 80 -82o1sNCh1bLpUXU6nacbK48HBcA9Eu2sPkL1/3c2GtDPxBUZd2U2sb7DxwJ54n6ce9G0Oy7nd1hCxN3fS0oADw== -""", - "pubkey": "8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU", - "last_change": 1448199706.6561477, "software": "duniter"} - node = Node.from_json("meta_brouzouf", json_data, parse_version('0.12.0'), Mock("aiohttp.ClientSession")) - self.assertEqual(node.version, "0.12.0") - self.assertEqual(node.state, 1) - self.assertEqual(node.fork_window, 0) - self.assertEqual(node.uid, "inso") - self.assertEqual(node.block, nice_blockchain.bma_blockchain_current) - self.assertEqual(node.endpoint.inline(), "BASIC_MERKLED_API duniter.inso.ovh 80") - self.assertEqual(node.pubkey, "8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU") - self.assertEqual(node.last_change, 1448199706.6561477) - self.assertEqual(node.currency, "meta_brouzouf") - self.assertEqual(node.peer.pubkey, "8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU") - self.assertEqual(node.peer.blockUID.number, 48698) - self.assertEqual(node.peer.blockUID.sha_hash, "000005E0F228038E4DDD4F6CA4ACB01EC88FBAF8") - - result = node.jsonify() - for key in result: - self.assertEqual(result[key], json_data[key], "Error with key {0}".format(key)) - - def test_jsonify_root_node(self): - peer = Peer.from_signed_raw("""Version: 2 -Type: Peer -Currency: meta_brouzouf -PublicKey: 8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU -Block: 48698-000005E0F228038E4DDD4F6CA4ACB01EC88FBAF8 -Endpoints: -BASIC_MERKLED_API duniter.inso.ovh 80 -82o1sNCh1bLpUXU6nacbK48HBcA9Eu2sPkL1/3c2GtDPxBUZd2U2sb7DxwJ54n6ce9G0Oy7nd1hCxN3fS0oADw== -""") - node = Node(peer, "inso", "8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU", nice_blockchain.bma_blockchain_current, - Node.ONLINE, 1111111111, {}, "duniter", "0.12", 0, Mock("aiohttp.ClientSession")) - result = node.jsonify_root_node() - self.assertEqual(result['pubkey'], "8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU") - self.assertEqual(result['uid'], "inso") - self.assertEqual(result['peer'], peer.signed_raw()) \ No newline at end of file diff --git a/src/sakia/tests/unit/data/test_node_connector.py b/src/sakia/tests/unit/data/test_node_connector.py new file mode 100644 index 0000000000000000000000000000000000000000..de9b3574c0815d0c3ddbbad59c64bca9f4cf3152 --- /dev/null +++ b/src/sakia/tests/unit/data/test_node_connector.py @@ -0,0 +1,30 @@ +import unittest +from unittest.mock import Mock +from duniterpy.documents import Peer +from PyQt5.QtCore import QLocale +from sakia.data.connectors import NodeConnector +from sakia.tests import QuamashTest + + +class TestNodeConnector(unittest.TestCase, QuamashTest): + def setUp(self): + self.setUpQuamash() + QLocale.setDefault(QLocale("en_GB")) + + def tearDown(self): + self.tearDownQuamash() + + def test_from_peer(self): + peer = Peer.from_signed_raw("""Version: 2 +Type: Peer +Currency: meta_brouzouf +PublicKey: 8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU +Block: 48698-000005E0F228038E4DDD4F6CA4ACB01EC88FBAF8 +Endpoints: +BASIC_MERKLED_API duniter.inso.ovh 80 +82o1sNCh1bLpUXU6nacbK48HBcA9Eu2sPkL1/3c2GtDPxBUZd2U2sb7DxwJ54n6ce9G0Oy7nd1hCxN3fS0oADw== +""") + connector = NodeConnector.from_peer('meta_brouzouf', peer, Mock("aiohttp.ClientSession")) + self.assertEqual(connector.node.pubkey, "8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU") + self.assertEqual(connector.node.endpoints[0].inline(), "BASIC_MERKLED_API duniter.inso.ovh 80") + self.assertEqual(connector.node.currency, "meta_brouzouf") diff --git a/src/sakia/tests/unit/data/test_nodes_repo.py b/src/sakia/tests/unit/data/test_nodes_repo.py index 7c71910ce65562d6854105bf0f94a6139d219409..23dda28ae7b90e0f00b8d91a1624f6bd5dcd6fdc 100644 --- a/src/sakia/tests/unit/data/test_nodes_repo.py +++ b/src/sakia/tests/unit/data/test_nodes_repo.py @@ -9,6 +9,8 @@ class TestNodesRepo(unittest.TestCase): def setUp(self): sqlite3.register_adapter(BlockUID, str) sqlite3.register_adapter(bool, int) + sqlite3.register_adapter(list, lambda ls: '\n'.join([str(v) for v in ls])) + sqlite3.register_adapter(tuple, lambda ls: '\n'.join([str(v) for v in ls])) sqlite3.register_converter("BOOLEAN", lambda v: bool(int(v))) self.con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES) @@ -20,17 +22,19 @@ class TestNodesRepo(unittest.TestCase): meta_repo.prepare() meta_repo.upgrade_database() nodes_repo = NodesRepo(self.con) - nodes_repo.insert(Node("testcurrency", - "7Aqw6Efa9EzE7gtsc8SveLLrM7gm6NEGoywSv4FJx6pZ", - """BASIC_MERKLED_API test-net.duniter.fr 13.222.11.22 9201 + inserted = Node("testcurrency", + "7Aqw6Efa9EzE7gtsc8SveLLrM7gm6NEGoywSv4FJx6pZ", + """BASIC_MERKLED_API test-net.duniter.fr 13.222.11.22 9201 BASIC_MERKLED_API testnet.duniter.org 80 UNKNOWNAPI some useless information""", - "15-76543400E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", - "14-AEFFCB00E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", - Node.ONLINE, - "duniter", - "0.30.17", - {})) + BlockUID.empty(), + "doe", + "15-76543400E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", + "14-AEFFCB00E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", + Node.ONLINE, + "duniter", + "0.30.17") + nodes_repo.insert(inserted) node = nodes_repo.get_one(currency="testcurrency", pubkey="7Aqw6Efa9EzE7gtsc8SveLLrM7gm6NEGoywSv4FJx6pZ") self.assertEqual(node.currency, "testcurrency") @@ -43,7 +47,8 @@ UNKNOWNAPI some useless information""", self.assertEqual(node.state, Node.ONLINE) self.assertEqual(node.software, "duniter") self.assertEqual(node.version, "0.30.17") - self.assertEqual(node.merkle_nodes, {}) + self.assertEqual(node.merkle_peers_root, Node.MERKLE_EMPTY_ROOT) + self.assertEqual(node.merkle_peers_leaves, tuple()) nodes_repo.drop(node) node = nodes_repo.get_one(pubkey="7Aqw6Efa9EzE7gtsc8SveLLrM7gm6NEGoywSv4FJx6pZ") @@ -59,21 +64,23 @@ UNKNOWNAPI some useless information""", """BASIC_MERKLED_API test-net.duniter.fr 13.222.11.22 9201 BASIC_MERKLED_API testnet.duniter.org 80 UNKNOWNAPI some useless information""", + BlockUID.empty(), + "doe", "15-76543400E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", "14-AEFFCB00E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", Node.ONLINE, "duniter", - "0.30.17", - {})) + "0.30.17")) nodes_repo.insert(Node("testcurrency", "FADxcH5LmXGmGFgdixSes6nWnC4Vb4pRUBYT81zQRhjn", "BASIC_MERKLED_API test-net.duniter.org 22.22.22.22 9201", + BlockUID.empty(), + "doe", "18-76543400E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", "12-AEFFCB00E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", Node.ONLINE, "duniter", - "0.30.2a5", - {})) + "0.30.2a5")) nodes = nodes_repo.get_all(currency="testcurrency") self.assertIn("testcurrency", [t.currency for t in nodes]) self.assertIn("7Aqw6Efa9EzE7gtsc8SveLLrM7gm6NEGoywSv4FJx6pZ", [n.pubkey for n in nodes]) @@ -89,11 +96,12 @@ UNKNOWNAPI some useless information""", """BASIC_MERKLED_API test-net.duniter.fr 13.222.11.22 9201 BASIC_MERKLED_API testnet.duniter.org 80 UNKNOWNAPI some useless information""", + BlockUID.empty(), + "doe", "15-76543400E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", "14-AEFFCB00E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67", Node.ONLINE, - "duniter", - "0.30.17", {}) + "duniter") nodes_repo.insert(node) node.previous_buid = node.current_buid node.current_buid = "16-77543400E78B56CC21FB1DDC6CBAB24E0FACC9A798F5ED8736EA007F38617D67"