Skip to content
Snippets Groups Projects
Commit 38ea4399 authored by inso's avatar inso
Browse files

Big work on network traversal stabilization

parent 5b98a826
No related branches found
No related tags found
No related merge requests found
......@@ -12,5 +12,8 @@
<file alias="wallet_icon">noun_29542_cc.svg</file>
<file alias="tx_icon">noun_63271_cc.svg</file>
<file alias="currency_icon">noun_43022_cc.svg</file>
<file alias="connected">connected.svg</file>
<file alias="weak_connect">weak_connect.svg</file>
<file alias="disconnected">disconnected.svg</file>
</qresource>
</RCC>
......@@ -104,8 +104,6 @@ class Community(QObject):
but nothing exists in ucoin to assert that a currency name is unique.
'''
new_block_mined = pyqtSignal(int)
def __init__(self, currency, network):
'''
Initialize community attributes with a currency and a network.
......@@ -120,8 +118,6 @@ class Community(QObject):
self.currency = currency
self._network = network
self._cache = Cache(self)
self._network.new_block_mined.connect(self.new_block_mined)
self._cache.refresh()
@classmethod
......@@ -280,15 +276,6 @@ class Community(QObject):
if '404' in e:
return 0
@property
def nodes(self):
'''
Get the known community nodes
:return: All community known nodes
'''
return self._network.all_nodes
@property
def network(self):
'''
......@@ -303,11 +290,10 @@ class Community(QObject):
Get a ratio of the synced nodes vs the rest
'''
synced = len(self._network.synced_nodes)
online = len(self._network.online_nodes)
total = len(self._network.all_nodes)
ratio_synced = synced * 2 / total
ratio_unsynced = (online - synced) / total
return (ratio_synced + ratio_unsynced) / 3
#online = len(self._network.online_nodes)
total = len(self._network.nodes)
ratio_synced = synced / total
return ratio_synced
@property
def parameters(self):
......@@ -410,9 +396,10 @@ class Community(QObject):
continue
else:
raise
except RequestException:
except RequestException as e:
logging.debug("Error : {1} : {0}".format(str(e),
str(request)))
continue
raise NoPeerAvailable(self.currency, len(nodes))
def post(self, request, req_args={}, post_args={}):
......
......@@ -8,10 +8,11 @@ from .node import Node
import logging
import time
from PyQt5.QtCore import QObject, pyqtSignal
from PyQt5.QtCore import pyqtSignal, pyqtSlot, QMutex, QCoreApplication
from ..watching.watcher import Watcher
class Network(QObject):
class Network(Watcher):
'''
A network is managing nodes polling and crawling of a
given community.
......@@ -28,10 +29,10 @@ class Network(QObject):
:param list nodes: The nodes of the network
'''
super().__init__()
self._nodes = []
self._mutex = QMutex()
self.currency = currency
self._nodes = nodes
for n in self._nodes:
n.changed.connect(self.nodes_changed)
self.nodes = nodes
self._must_crawl = False
self._is_perpetual = False
......@@ -45,12 +46,12 @@ class Network(QObject):
:param node: The first knew node of the network
'''
nodes = [node]
network = cls(node.currency, nodes, 0)
network = cls(node.currency, nodes)
nodes = network.crawling()
block_max = max([n.block for n in nodes])
for node in nodes:
node.check_sync(block_max)
network._nodes = nodes
network.nodes = nodes
network.latest_block = block_max
return network
......@@ -63,16 +64,15 @@ class Network(QObject):
'''
for data in json_data:
node = Node.from_json(self.currency, data)
self._nodes.append(node)
if node.pubkey not in [n.pubkey for n in self.nodes]:
self.add_node(node)
logging.debug("Loading : {:}".format(data['pubkey']))
for n in self._nodes:
for n in self.nodes:
try:
n.changed.disconnect()
except TypeError:
pass
self._nodes = self.crawling()
for n in self._nodes:
n.changed.connect(self.nodes_changed)
self.nodes = self.crawling()
@classmethod
def from_json(cls, currency, json_data):
......@@ -98,7 +98,7 @@ class Network(QObject):
:return: The network as a dict in json format.
'''
data = []
for node in self._nodes:
for node in self.nodes:
data.append(node.jsonify())
return data
......@@ -119,35 +119,67 @@ class Network(QObject):
'''
Get nodes which are in the ONLINE state.
'''
return [n for n in self._nodes if n.state == Node.ONLINE]
return [n for n in self.nodes if n.state == Node.ONLINE]
@property
def online_nodes(self):
'''
Get nodes which are in the ONLINE state.
'''
return [n for n in self._nodes if n.state in (Node.ONLINE, Node.DESYNCED)]
return [n for n in self.nodes if n.state in (Node.ONLINE, Node.DESYNCED)]
@property
def all_nodes(self):
def nodes(self):
'''
Get all knew nodes.
'''
return self._nodes.copy()
return self._nodes
@nodes.setter
def nodes(self, new_nodes):
'''
Set new nodes
'''
self._mutex.lock()
try:
for n in self.nodes:
try:
n.disconnect()
except TypeError:
logging.debug("Error disconnecting node {0}".format(n.pubkey[:5]))
self._nodes = []
for n in new_nodes:
self.add_node(n)
finally:
self._mutex.unlock()
@property
def latest_block(self):
'''
Get latest block known
'''
return max([n.block for n in self._nodes])
return max([n.block for n in self.nodes])
def add_nodes(self, node):
def add_node(self, node):
'''
Add a node to the network.
'''
self._nodes.append(node)
node.changed.connect(self.nodes_changed)
node.changed.connect(self.handle_change)
logging.debug("{:} connected".format(node.pubkey))
def moveToThread(self, thread):
for n in self.nodes:
n.moveToThread(thread)
super().moveToThread(thread)
def watch(self):
self.stopped_perpetual_crawling.connect(self.watching_stopped)
self.start_perpetual_crawling()
def stop(self):
self.stop_crawling()
def start_perpetual_crawling(self):
'''
......@@ -156,25 +188,47 @@ class Network(QObject):
'''
self._must_crawl = True
while self.continue_crawling():
latest_before_crawling = self.latest_block
nodes = self.crawling(interval=10)
new_inlines = [n.endpoint.inline() for n in nodes]
last_inlines = [n.endpoint.inline() for n in self._nodes]
last_inlines = [n.endpoint.inline() for n in self.nodes]
hash_new_nodes = hash(tuple(frozenset(sorted(new_inlines))))
hash_last_nodes = hash(tuple(frozenset(sorted(last_inlines))))
if hash_new_nodes != hash_last_nodes:
self._nodes = nodes
self.nodes_changed.emit()
for n in self._nodes:
n.changed.connect(self.nodes_changed)
self.nodes = nodes
self.handle_change()
if self.latest_block != latest_before_crawling:
self.block_mined.emit(self.latest_block)
self.stopped_perpetual_crawling.emit()
@pyqtSlot()
def handle_change(self):
node = self.sender()
logging.debug("Handle change")
block_max = max([n.block for n in self.nodes])
if node.state in (Node.ONLINE, Node.DESYNCED):
node.check_sync(block_max)
if self.latest_block != block_max:
logging.debug("New block found : {0}".format(block_max))
self.latest_block = block_max
self.new_block_mined.emit(self.latest_block)
if node.last_change + 3600 < time.time() and \
node.state in (Node.OFFLINE, Node.CORRUPTED):
try:
node.changed.disconnect()
except TypeError:
logging.debug("Error : {0} not connected".format(node.pubkey))
pass
self.nodes.remove(node)
logging.debug("Syncing : {0} : last changed {1} : unsynced : {2}".format(node.pubkey[:5],
node.last_change, time.time() - node.last_change))
self.nodes_changed.emit()
def crawling(self, interval=0):
'''
One network crawling.
......@@ -183,32 +237,17 @@ class Network(QObject):
'''
nodes = []
traversed_pubkeys = []
for n in self._nodes.copy():
knew_pubkeys = [n.pubkey for n in self.nodes]
for n in self.nodes:
logging.debug(traversed_pubkeys)
logging.debug("Peering : next to read : {0} : {1}".format(n.pubkey,
(n.pubkey not in traversed_pubkeys)))
if n.pubkey not in traversed_pubkeys and self.continue_crawling():
n.peering_traversal(nodes,
if self.continue_crawling():
n.peering_traversal(knew_pubkeys, nodes,
traversed_pubkeys, interval,
self.continue_crawling)
QCoreApplication.processEvents()
time.sleep(interval)
block_max = max([n.block for n in nodes])
for node in [n for n in nodes if n.state == Node.ONLINE]:
node.check_sync(block_max)
for node in nodes:
if node.last_change + 3600 < time.time() and \
node.state in (Node.OFFLINE, Node.CORRUPTED):
try:
node.changed.disconnect()
except TypeError:
logging.debug("Error : {0} not connected".format(node.pubkey))
pass
nodes.remove(node)
for node in nodes:
logging.debug("Syncing : {0} : last changed {1} : unsynced : {2}".format(node.pubkey[:5],
node.last_change, time.time() - node.last_change))
logging.debug("Nodes found : {0}".format(nodes))
return nodes
......@@ -42,7 +42,7 @@ class Node(QObject):
self._endpoints = endpoints
self._uid = uid
self._pubkey = pubkey
self._block = block
self.block = block
self._state = state
self._neighbours = []
self._currency = currency
......@@ -70,6 +70,7 @@ class Node(QObject):
node = cls(peer.currency, peer.endpoints, "", peer.pubkey, 0,
Node.ONLINE, time.time())
node.refresh_state()
logging.debug("Node from address : {:}".format(str(node)))
return node
@classmethod
......@@ -88,6 +89,7 @@ class Node(QObject):
node = cls(peer.currency, peer.endpoints, "", "", 0,
Node.ONLINE, time.time())
node.refresh_state()
logging.debug("Node from peer : {:}".format(str(node)))
return node
@classmethod
......@@ -121,9 +123,11 @@ class Node(QObject):
node = cls(currency, endpoints, uid, pubkey, 0,
state, last_change)
node.refresh_state()
logging.debug("Node from json : {:}".format(str(node)))
return node
def jsonify(self):
logging.debug("Saving node : {:}".format(str(self)))
data = {'pubkey': self._pubkey,
'uid': self._uid,
'currency': self._currency,
......@@ -147,6 +151,10 @@ class Node(QObject):
def block(self):
return self._block
@block.setter
def block(self, new_block):
self._block = new_block
@property
def state(self):
return self._state
......@@ -173,19 +181,20 @@ class Node(QObject):
val))
self._last_change = val
def _change_state(self, new_state):
@state.setter
def state(self, new_state):
logging.debug("{:} | Last state : {:} / new state : {:}".format(self.pubkey[:5],
self.state, new_state))
if self.state != new_state:
if self._state != new_state:
self.last_change = time.time()
self._state = new_state
def check_sync(self, block):
logging.debug("Check sync")
if self._block < block:
self._change_state(Node.DESYNCED)
if self.block < block:
self.state = Node.DESYNCED
else:
self._change_state(Node.ONLINE)
self.state = Node.ONLINE
def _request_uid(self):
uid = ""
......@@ -205,12 +214,19 @@ class Node(QObject):
uid = ""
return uid
def refresh_state(self):
def refresh_state(self, init=False):
logging.debug("Refresh state")
emit_change = False
try:
informations = bma.network.Peering(self.endpoint.conn_handler()).get()
node_pubkey = informations["pubkey"]
try:
block = bma.blockchain.Current(self.endpoint.conn_handler()).get()
block_number = block["number"]
except ValueError as e:
if '404' in e:
block_number = 0
peers_data = bma.network.peering.Peers(self.endpoint.conn_handler()).get()
neighbours = []
for p in peers_data:
......@@ -218,39 +234,49 @@ class Node(QObject):
p['value']['signature']))
neighbours.append(peer.endpoints)
block_number = block["number"]
node_pubkey = informations["pubkey"]
node_currency = informations["currency"]
node_uid = self._request_uid()
#If the nodes goes back online...
if self.state in (Node.OFFLINE, Node.CORRUPTED):
self._change_state(Node.ONLINE)
self.state = Node.ONLINE
logging.debug("Change : new state online")
emit_change = True
except ValueError as e:
if '404' in e:
block_number = 0
except RequestException:
self._change_state(Node.OFFLINE)
if self.state != Node.OFFLINE:
self.state = Node.OFFLINE
logging.debug("Change : new state offine")
emit_change = True
# If not is offline, do not refresh last data
if init:
self.block = block_number
self._pubkey = node_pubkey
self._uid = node_uid
self._neighbours = neighbours
if self.state != Node.OFFLINE:
# If not changed its currency, consider it corrupted
if node_currency != self._currency:
self._change_state(Node.CORRUPTED)
self.state = Node.CORRUPTED
logging.debug("Change : new state corrupted")
emit_change = True
else:
node_uid = self._request_uid()
if block_number != self._block:
self._block = block_number
if block_number != self.block:
logging.debug("Change : new block {0} -> {1}".format(self.block,
block_number))
self.block = block_number
logging.debug("Changed block {0} -> {1}".format(self.block,
block_number))
emit_change = True
if node_pubkey != self._pubkey:
logging.debug("Change : new pubkey {0} -> {1}".format(self._pubkey,
node_pubkey))
self._pubkey = node_pubkey
emit_change = True
if node_uid != self._uid:
logging.debug("Change : new uid")
self._uid = node_uid
emit_change = True
......@@ -262,12 +288,14 @@ class Node(QObject):
hash_last_neighbours = hash(tuple(frozenset(sorted(last_inlines))))
if hash_new_neighbours != hash_last_neighbours:
self._neighbours = neighbours
logging.debug("Change : new neighbours {0} -> {1}".format(last_inlines,
new_inlines))
emit_change = True
if emit_change:
self.changed.emit()
def peering_traversal(self, found_nodes,
def peering_traversal(self, knew_pubkeys, found_nodes,
traversed_pubkeys, interval,
continue_crawling):
logging.debug("Read {0} peering".format(self.pubkey))
......@@ -286,16 +314,17 @@ class Node(QObject):
peering = bma.network.Peering(e.conn_handler()).get()
peer = Peer.from_signed_raw("{0}{1}\n".format(peering['raw'],
peering['signature']))
if peer.pubkey not in traversed_pubkeys and \
peer.pubkey not in knew_pubkeys and continue_crawling():
node = Node.from_peer(self._currency, peer)
logging.debug(traversed_pubkeys)
logging.debug("Traversing : next to read : {0} : {1}".format(node.pubkey,
(node.pubkey not in traversed_pubkeys)))
if node.pubkey not in traversed_pubkeys and continue_crawling():
node.peering_traversal(found_nodes,
traversed_pubkeys, interval, continue_crawling)
time.sleep(interval)
except RequestException as e:
self._change_state(Node.OFFLINE)
self.state = Node.OFFLINE
def __str__(self):
return ','.join([str(self.pubkey), str(self.endpoint.server), str(self.endpoint.port), str(self.block),
......
......@@ -7,7 +7,6 @@ Created on 18 mars 2015
from PyQt5.QtCore import QThread, Qt
from .blockchain import BlockchainWatcher
from .persons import PersonsWatcher
from .network import NetworkWatcher
import logging
......@@ -30,7 +29,7 @@ class Monitor(object):
return self._blockchain_watchers[community.name]
def network_watcher(self, community):
return self._network_watchers[community.name]
return self._networks[community.name]
def persons_watcher(self, community):
return self._persons_watchers[community.name]
......@@ -53,9 +52,8 @@ class Monitor(object):
self.connect_watcher_to_thread(bc_watcher)
self._blockchain_watchers[c.name] = bc_watcher
network_watcher = NetworkWatcher(c)
self.connect_watcher_to_thread(network_watcher)
self._network_watchers[c.name] = network_watcher
self.connect_watcher_to_thread(c.network)
self._network_watchers[c.name] = c.network
def start_network_watchers(self):
for watcher in self._network_watchers.values():
......
......@@ -47,7 +47,7 @@ class CurrencyTabWidget(QWidget, Ui_CurrencyTabWidget):
self.tab_network = NetworkTabWidget(self.community)
self.community.new_block_mined.connect(self.refresh_block)
self.community.network.new_block_mined.connect(self.refresh_block)
self.community.network.nodes_changed.connect(self.refresh_status)
persons_watcher = self.app.monitor.persons_watcher(self.community)
persons_watcher.person_changed.connect(self.tab_community.refresh_person)
......@@ -151,14 +151,15 @@ class CurrencyTabWidget(QWidget, Ui_CurrencyTabWidget):
@pyqtSlot()
def refresh_status(self):
if self.community.network_quality() > 0.66:
icon = '<img src=":/icons/connected" width="12" height="12"/>'
text = "Connected : Block {0}".format(self.community.network.latest_block)
self.status_label.setText(text)
elif self.community.network_quality() > 0.33:
icon = '<img src=":/icons/weak_connect" width="12" height="12"/>'
text = "Connected (weak link) : Block {0}".format(self.community.network.latest_block)
self.status_label.setText(text)
else:
icon = '<img src=":/icons/disconnected" width="12" height="12"/>'
text = "Disconnected : Block {0}".format(self.community.network.latest_block)
self.status_label.setText(text)
self.status_label.setText("{0}{1}".format(icon, text))
def refresh_wallets(self):
if self.app.current_account:
......
......@@ -86,6 +86,7 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.combo_referential.currentTextChanged.connect(self.referential_changed)
self.status_label = QLabel("", self)
self.status_label.setTextFormat(Qt.RichText)
self.label_time = QLabel("", self)
......
......@@ -7,7 +7,6 @@ Created on 20 févr. 2015
import logging
from PyQt5.QtWidgets import QWidget
from PyQt5.QtCore import Qt, QThread
from cutecoin.core.watching.network import NetworkWatcher
from ..models.network import NetworkTableModel, NetworkFilterProxyModel
from ..gen_resources.network_tab_uic import Ui_NetworkTabWidget
......@@ -32,4 +31,5 @@ class NetworkTabWidget(QWidget, Ui_NetworkTabWidget):
community.network.nodes_changed.connect(self.refresh_nodes)
def refresh_nodes(self):
logging.debug("Refresh nodes")
self.table_network.model().sourceModel().modelReset.emit()
......@@ -79,7 +79,7 @@ class NetworkTableModel(QAbstractTableModel):
@property
def nodes(self):
return self.community.nodes
return self.community.network.nodes
def rowCount(self, parent):
return len(self.nodes)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment