Skip to content
Snippets Groups Projects
Commit f782e818 authored by inso's avatar inso
Browse files

Use WS2P to detect consensus

parent 53553e5a
No related branches found
No related tags found
No related merge requests found
...@@ -34,6 +34,7 @@ class NodeConnector(QObject): ...@@ -34,6 +34,7 @@ class NodeConnector(QObject):
error = pyqtSignal() error = pyqtSignal()
identity_changed = pyqtSignal() identity_changed = pyqtSignal()
neighbour_found = pyqtSignal(Peer) neighbour_found = pyqtSignal(Peer)
block_found = pyqtSignal(BlockUID)
FAILURE_THRESHOLD = 3 FAILURE_THRESHOLD = 3
...@@ -183,14 +184,14 @@ class NodeConnector(QObject): ...@@ -183,14 +184,14 @@ class NodeConnector(QObject):
if msg.type == aiohttp.WSMsgType.TEXT: if msg.type == aiohttp.WSMsgType.TEXT:
self._logger.debug("Received a block") self._logger.debug("Received a block")
block_data = bma.parse_text(msg.data, bma.ws.WS_BLOCk_SCHEMA) block_data = bma.parse_text(msg.data, bma.ws.WS_BLOCk_SCHEMA)
await self.refresh_block(block_data) self.block_found.emit(BlockUID(block_data['number'], block_data['hash']))
elif msg.type == aiohttp.WSMsgType.CLOSED: elif msg.type == aiohttp.WSMsgType.CLOSED:
break break
elif msg.type == aiohttp.WSMsgType.ERROR: elif msg.type == aiohttp.WSMsgType.ERROR:
break break
except (aiohttp.WSServerHandshakeError, ValueError) as e: except (aiohttp.WSServerHandshakeError, ValueError) as e:
self._logger.debug("Websocket block {0} : {1}".format(type(e).__name__, str(e))) self._logger.debug("Websocket block {0} : {1}".format(type(e).__name__, str(e)))
await self.request_current_block() self.handle_failure()
except (ClientError, gaierror, TimeoutError) as e: except (ClientError, gaierror, TimeoutError) as e:
self._logger.debug("{0} : {1}".format(str(e), self.node.pubkey[:5])) self._logger.debug("{0} : {1}".format(str(e), self.node.pubkey[:5]))
self.handle_failure() self.handle_failure()
...@@ -211,102 +212,6 @@ class NodeConnector(QObject): ...@@ -211,102 +212,6 @@ class NodeConnector(QObject):
self._connected['block'] = False self._connected['block'] = False
self._ws_tasks['block'] = None self._ws_tasks['block'] = None
async def request_current_block(self):
"""
Request a node on the HTTP GET interface
If an error occurs, the node is considered offline
"""
for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]:
try:
block_data = await self.safe_request(endpoint, bma.blockchain.current,
proxy=self._user_parameters.proxy())
if not block_data:
continue
await self.refresh_block(block_data)
return # Do not try any more endpoint
except errors.DuniterError as e:
if e.ucode == errors.BLOCK_NOT_FOUND:
self.node.previous_buid = BlockUID.empty()
self.handle_success()
else:
self.change_state_and_emit(Node.CORRUPTED)
self._logger.debug("Error in block reply : {0}".format(str(e)))
else:
if self.session.closed:
pass
else:
self._logger.debug("Could not connect to any BMA endpoint")
self.handle_failure()
async def refresh_block(self, block_data):
"""
Refresh the blocks of this node
:param dict block_data: The block data in json format
"""
if not self.node.current_buid or self.node.current_buid.sha_hash != block_data['hash']:
for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]:
conn_handler = next(endpoint.conn_handler(self.session,
proxy=self._user_parameters.proxy()))
self._logger.debug("Requesting {0}".format(conn_handler))
try:
previous_block = await self.safe_request(endpoint, bma.blockchain.block,
proxy=self._user_parameters.proxy(),
req_args={'number': self.node.current_buid.number})
if not previous_block:
continue
self.node.previous_buid = BlockUID(previous_block['number'], previous_block['hash'])
break # Do not try any more endpoint
except errors.DuniterError as e:
if e.ucode == errors.BLOCK_NOT_FOUND:
self.node.previous_buid = BlockUID.empty()
# we don't change state here
break
else:
self.change_state_and_emit(Node.CORRUPTED)
break
finally:
if self.node.current_buid != BlockUID(block_data['number'], block_data['hash']):
self.node.current_buid = BlockUID(block_data['number'], block_data['hash'])
self.node.current_ts = block_data['medianTime']
self._logger.debug("Changed block {0} -> {1}".format(self.node.current_buid.number,
block_data['number']))
self.changed.emit()
else:
if self.session.closed:
pass
else:
self._logger.debug("Could not connect to any BMA endpoint")
self.handle_failure()
else:
self.handle_success()
@asyncify
async def refresh_summary(self):
"""
Refresh the summary of this node
"""
for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]:
try:
summary_data = await self.safe_request(endpoint, bma.node.summary,
proxy=self._user_parameters.proxy())
if not summary_data:
continue
self.node.software = summary_data["duniter"]["software"]
self.node.version = summary_data["duniter"]["version"]
self.node.state = Node.ONLINE
self.identity_changed.emit()
return # Break endpoints loop
except errors.DuniterError as e:
self._logger.debug("Error in summary : {:}".format(str(e)))
self.handle_failure()
else:
if self.session.closed:
pass
else:
self._logger.debug("Could not connect to any BMA endpoint")
self.handle_failure()
async def connect_peers(self): async def connect_peers(self):
""" """
Connects to the peer websocket entry point Connects to the peer websocket entry point
...@@ -418,6 +323,28 @@ class NodeConnector(QObject): ...@@ -418,6 +323,28 @@ class NodeConnector(QObject):
else: else:
self._logger.debug("Incorrect leaf reply") self._logger.debug("Incorrect leaf reply")
async def request_ws2p_heads(self):
"""
Refresh the list of peers knew by this node
"""
for endpoint in [e for e in self.node.endpoints if isinstance(e, BMAEndpoint)]:
try:
heads_data = await self.safe_request(endpoint, bma.network.heads,
proxy=self._user_parameters.proxy())
if not heads_data:
continue
self.node.state = Node.ONLINE
return heads_data # Break endpoints loop
except errors.DuniterError as e:
self._logger.debug("Error in peers reply : {0}".format(str(e)))
self.handle_failure()
else:
if self.session.closed:
pass
else:
self._logger.debug("Could not connect to any BMA endpoint")
self.handle_failure()
def handle_success(self): def handle_success(self):
self.failure_count = 0 self.failure_count = 0
self.change_state_and_emit(Node.ONLINE) self.change_state_and_emit(Node.ONLINE)
......
...@@ -131,6 +131,24 @@ class NodesProcessor: ...@@ -131,6 +131,24 @@ class NodesProcessor:
ratio_synced = synced / total ratio_synced = synced / total
return ratio_synced return ratio_synced
def update_ws2p(self, currency, head):
"""
Update the peer of a node
:param str currency: the currency of the peer
:param head:
:return:
"""
node = self._repo.get_one(pubkey=head.pubkey, currency=currency)
if node:
if node.current_buid < head.blockstamp:
logging.debug("Update node : {0}".format(head.pubkey[:5]))
node.previous_buid = node.current_buid
node.current_buid = head.blockstamp
node.state = Node.ONLINE
self._repo.update(node)
return node, True
return node, False
def update_peer(self, currency, peer): def update_peer(self, currency, peer):
""" """
Update the peer of a node Update the peer of a node
......
...@@ -31,8 +31,7 @@ class NetworkFilterProxyModel(QSortFilterProxyModel): ...@@ -31,8 +31,7 @@ class NetworkFilterProxyModel(QSortFilterProxyModel):
left_data = int(left_data.split('\n')[0]) if left_data != '' else 0 left_data = int(left_data.split('\n')[0]) if left_data != '' else 0
right_data = int(right_data.split('\n')[0]) if right_data != '' else 0 right_data = int(right_data.split('\n')[0]) if right_data != '' else 0
if left.column() in (NetworkTableModel.columns_types.index('current_block'), if left.column() == NetworkTableModel.columns_types.index('current_block'):
NetworkTableModel.columns_types.index('current_time')):
left_data = int(left_data) if left_data != '' else 0 left_data = int(left_data) if left_data != '' else 0
right_data = int(right_data) if right_data != '' else 0 right_data = int(right_data) if right_data != '' else 0
if left_data == right_data: if left_data == right_data:
...@@ -89,14 +88,6 @@ class NetworkFilterProxyModel(QSortFilterProxyModel): ...@@ -89,14 +88,6 @@ class NetworkFilterProxyModel(QSortFilterProxyModel):
if index.column() == NetworkTableModel.columns_types.index('current_hash'): if index.column() == NetworkTableModel.columns_types.index('current_hash'):
return source_data[:10] return source_data[:10]
if index.column() == NetworkTableModel.columns_types.index('current_time') and source_data:
ts = self.blockchain_processor.adjusted_ts(self.app.currency, source_data)
return QLocale.toString(
QLocale(),
QDateTime.fromTime_t(ts),
QLocale.dateTimeFormat(QLocale(), QLocale.ShortFormat)
) + " BAT"
if role == Qt.TextAlignmentRole: if role == Qt.TextAlignmentRole:
if source_index.column() == NetworkTableModel.columns_types.index('address') \ if source_index.column() == NetworkTableModel.columns_types.index('address') \
or source_index.column() == self.sourceModel().columns_types.index('current_block'): or source_index.column() == self.sourceModel().columns_types.index('current_block'):
...@@ -129,7 +120,6 @@ class NetworkTableModel(QAbstractTableModel): ...@@ -129,7 +120,6 @@ class NetworkTableModel(QAbstractTableModel):
'api': QT_TRANSLATE_NOOP('NetworkTableModel', 'API'), 'api': QT_TRANSLATE_NOOP('NetworkTableModel', 'API'),
'current_block': QT_TRANSLATE_NOOP("NetworkTableModel", 'Block'), 'current_block': QT_TRANSLATE_NOOP("NetworkTableModel", 'Block'),
'current_hash': QT_TRANSLATE_NOOP("NetworkTableModel", 'Hash'), 'current_hash': QT_TRANSLATE_NOOP("NetworkTableModel", 'Hash'),
'current_time': QT_TRANSLATE_NOOP("NetworkTableModel", 'Time'),
'uid': QT_TRANSLATE_NOOP("NetworkTableModel", 'UID'), 'uid': QT_TRANSLATE_NOOP("NetworkTableModel", 'UID'),
'is_member': QT_TRANSLATE_NOOP("NetworkTableModel", 'Member'), 'is_member': QT_TRANSLATE_NOOP("NetworkTableModel", 'Member'),
'pubkey': QT_TRANSLATE_NOOP("NetworkTableModel", 'Pubkey'), 'pubkey': QT_TRANSLATE_NOOP("NetworkTableModel", 'Pubkey'),
...@@ -142,7 +132,6 @@ class NetworkTableModel(QAbstractTableModel): ...@@ -142,7 +132,6 @@ class NetworkTableModel(QAbstractTableModel):
'api', 'api',
'current_block', 'current_block',
'current_hash', 'current_hash',
'current_time',
'uid', 'uid',
'is_member', 'is_member',
'pubkey', 'pubkey',
...@@ -231,16 +220,16 @@ class NetworkTableModel(QAbstractTableModel): ...@@ -231,16 +220,16 @@ class NetworkTableModel(QAbstractTableModel):
api = "\n".join(api_list) api = "\n".join(api_list)
if node.current_buid: if node.current_buid:
number, block_hash, block_time = node.current_buid.number, node.current_buid.sha_hash, node.current_ts number, block_hash = node.current_buid.number, node.current_buid.sha_hash
else: else:
number, block_hash, block_time = "", "", "" number, block_hash = "", ""
state = node.state state = node.state
if not current_buid: if not current_buid:
current_buid = self.network_service.current_buid() current_buid = self.network_service.current_buid()
if node.state == Node.ONLINE and node.current_buid != current_buid: if node.state == Node.ONLINE and node.current_buid != current_buid:
state = NetworkTableModel.DESYNCED state = NetworkTableModel.DESYNCED
return (address, port, api, number, block_hash, block_time, node.uid, return (address, port, api, number, block_hash, node.uid,
node.member, node.pubkey, node.software, node.version, node.root, state, node.member, node.pubkey, node.software, node.version, node.root, state,
node) node)
......
import asyncio import asyncio
import logging import logging
import time import time
import random
from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, Qt from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, Qt
from duniterpy.api import errors from duniterpy.api import errors
from duniterpy.documents import BlockUID from duniterpy.documents.ws2p.heads import *
from duniterpy.documents.peer import BMAEndpoint
from duniterpy.key import VerifyingKey from duniterpy.key import VerifyingKey
from sakia.data.connectors import NodeConnector from sakia.data.connectors import NodeConnector
from sakia.data.entities import Node from sakia.data.entities import Node
...@@ -43,6 +45,7 @@ class NetworkService(QObject): ...@@ -43,6 +45,7 @@ class NetworkService(QObject):
self.add_connector(c) self.add_connector(c)
self.currency = currency self.currency = currency
self._must_crawl = False self._must_crawl = False
self._ws2p_heads_refreshing = False
self._block_found = self._processor.current_buid(self.currency) self._block_found = self._processor.current_buid(self.currency)
self._discovery_stack = [] self._discovery_stack = []
self._blockchain_service = blockchain_service self._blockchain_service = blockchain_service
...@@ -77,7 +80,14 @@ class NetworkService(QObject): ...@@ -77,7 +80,14 @@ class NetworkService(QObject):
""" """
connectors = [] connectors = []
for node in node_processor.nodes(currency): sample = []
for n in node_processor.online_nodes(currency):
for e in n.endpoints:
if isinstance(e, BMAEndpoint):
sample.append(n)
continue
for node in random.sample(sample, 6):
connectors.append(NodeConnector(node, app.parameters)) connectors.append(NodeConnector(node, app.parameters))
network = cls(app, currency, node_processor, connectors, blockchain_service, identities_service) network = cls(app, currency, node_processor, connectors, blockchain_service, identities_service)
return network return network
...@@ -125,6 +135,7 @@ class NetworkService(QObject): ...@@ -125,6 +135,7 @@ class NetworkService(QObject):
Add a nod to the network. Add a nod to the network.
""" """
self._connectors.append(node_connector) self._connectors.append(node_connector)
node_connector.block_found.connect(self.handle_new_block, type=Qt.UniqueConnection|Qt.QueuedConnection)
node_connector.changed.connect(self.handle_change, type=Qt.UniqueConnection|Qt.QueuedConnection) node_connector.changed.connect(self.handle_change, type=Qt.UniqueConnection|Qt.QueuedConnection)
node_connector.identity_changed.connect(self.handle_identity_change, type=Qt.UniqueConnection|Qt.QueuedConnection) node_connector.identity_changed.connect(self.handle_identity_change, type=Qt.UniqueConnection|Qt.QueuedConnection)
node_connector.neighbour_found.connect(self.handle_new_node, type=Qt.UniqueConnection|Qt.QueuedConnection) node_connector.neighbour_found.connect(self.handle_new_node, type=Qt.UniqueConnection|Qt.QueuedConnection)
...@@ -158,7 +169,7 @@ class NetworkService(QObject): ...@@ -158,7 +169,7 @@ class NetworkService(QObject):
self.node_removed.emit(connector.node) self.node_removed.emit(connector.node)
for connector in self._connectors: for connector in self._connectors:
if self.continue_crawling(): if self.continue_crawling() and len(self._connectors) <= 10:
await connector.init_session() await connector.init_session()
connector.refresh() connector.refresh()
if not first_loop: if not first_loop:
...@@ -188,21 +199,10 @@ class NetworkService(QObject): ...@@ -188,21 +199,10 @@ class NetworkService(QObject):
connector = NodeConnector.from_peer(self.currency, peer, self._app.parameters) connector = NodeConnector.from_peer(self.currency, peer, self._app.parameters)
node = connector.node node = connector.node
self._processor.insert_node(connector.node) self._processor.insert_node(connector.node)
await connector.init_session()
connector.refresh(manual=True)
self.add_connector(connector)
self.new_node_found.emit(node) self.new_node_found.emit(node)
except InvalidNodeCurrency as e: except InvalidNodeCurrency as e:
self._logger.debug(str(e)) self._logger.debug(str(e))
if node and updated and self._blockchain_service.initialized(): if node and updated and self._blockchain_service.initialized():
try:
connector = next(conn for conn in self._connectors if conn.node == node)
except StopIteration:
self._logger.warning("A node not associated to"
" a connector was encoutered : {:}"
.format(node.pubkey[:7]))
else:
connector.refresh_summary()
try: try:
identity = await self._identities_service.find_from_pubkey(node.pubkey) identity = await self._identities_service.find_from_pubkey(node.pubkey)
identity = await self._identities_service.load_requirements(identity) identity = await self._identities_service.load_requirements(identity)
...@@ -229,26 +229,57 @@ class NetworkService(QObject): ...@@ -229,26 +229,57 @@ class NetworkService(QObject):
self._processor.update_node(connector.node) self._processor.update_node(connector.node)
self.node_changed.emit(connector.node) self.node_changed.emit(connector.node)
def handle_change(self): def handle_new_block(self, block_uid):
node_connector = self.sender() if not self._ws2p_heads_refreshing:
self._processor.update_node(node_connector.node) self._ws2p_heads_refreshing = True
self.node_changed.emit(node_connector.node) if self.current_buid() != block_uid:
asyncio.async(self.check_ws2p_heads())
async def check_ws2p_heads(self):
await asyncio.sleep(5)
futures = []
for connector in self._connectors:
futures.append(connector.request_ws2p_heads())
responses = await asyncio.gather(*futures, return_exceptions=True)
ws2p_heads = {}
for r in responses:
if isinstance(r, errors.DuniterError):
self._logger.debug("Exception in responses : " + str(r))
continue
else:
if r:
for head_data in r["heads"]:
if "messageV2" in head_data:
head = HeadV2.from_inline(head_data["messageV2"], head_data["sigV2"])
else:
head = HeadV1.from_inline(head_data["messageV2"], head_data["sigV2"])
VerifyingKey(head.pubkey).verify_ws2p_head(head)
if head.pubkey in ws2p_heads:
if ws2p_heads[head.pubkey].blockstamp < head.blockstamp:
ws2p_heads[head.pubkey] = head
else:
ws2p_heads[head.pubkey] = head
for head in ws2p_heads.values():
node, updated = self._processor.update_ws2p(self.currency, head)
if node and updated:
self.node_changed.emit(node)
self._ws2p_heads_refreshing = False
if node_connector.node.state == Node.ONLINE:
current_buid = self._processor.current_buid(self.currency) current_buid = self._processor.current_buid(self.currency)
self._logger.debug("{0} -> {1}".format(self._block_found.sha_hash[:10], current_buid.sha_hash[:10])) self._logger.debug("{0} -> {1}".format(self._block_found.sha_hash[:10], current_buid.sha_hash[:10]))
if self._block_found.sha_hash != current_buid.sha_hash: if self._block_found.sha_hash != current_buid.sha_hash:
self._logger.debug("Latest block changed : {0}".format(current_buid.number)) self._logger.debug("Latest block changed : {0}".format(current_buid.number))
self.latest_block_changed.emit(current_buid) self.latest_block_changed.emit(current_buid)
# If new latest block is lower than the previously found one
# or if the previously found block is different locally
# than in the main chain, we declare a rollback
if current_buid < self._block_found \
or node_connector.node.previous_buid != self._block_found:
self._logger.debug("Start rollback")
self._block_found = current_buid
#TODO: self._blockchain_service.rollback()
else:
self._logger.debug("Start refresh") self._logger.debug("Start refresh")
self._block_found = current_buid self._block_found = current_buid
asyncio.ensure_future(self._blockchain_service.handle_blockchain_progress(self._block_found)) asyncio.ensure_future(self._blockchain_service.handle_blockchain_progress(self._block_found))
def handle_change(self):
node_connector = self.sender()
self._processor.update_node(node_connector.node)
self.node_changed.emit(node_connector.node)
\ No newline at end of file
  • inso @Insoleet

    mentioned in commit e768deac

    ·

    mentioned in commit e768deac

    Toggle commit list
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment