Skip to main content
Sign in
Snippets Groups Projects
Commit 49e91b39 authored by inso's avatar inso
Browse files

Big refactoring of threads to better handle their lifecycle

parent a2dcd465
Branches
Tags
No related merge requests found
Showing with 196 additions and 71 deletions
......@@ -15,10 +15,11 @@ import datetime
from PyQt5.QtCore import QObject, pyqtSignal
from . import config
from ..tools.exceptions import NameAlreadyExists, BadAccountFile
from .account import Account
from . import person
from .watching.monitor import Monitor
from .. import __version__
from ..tools.exceptions import NameAlreadyExists, BadAccountFile
class Application(QObject):
......@@ -41,6 +42,7 @@ class Application(QObject):
self.accounts = {}
self.default_account = ""
self.current_account = None
self.monitor = None
config.parse_arguments(argv)
self.load()
......@@ -97,9 +99,12 @@ class Application(QObject):
self.loading_progressed.emit(value, maximum)
if self.current_account is not None:
self.monitor.stop_watching()
self.save_cache(self.current_account)
account.loading_progressed.connect(progressing)
account.refresh_cache()
self.monitor = Monitor(account)
self.monitor.prepare_watching()
self.current_account = account
def load(self):
......
......
......@@ -4,6 +4,7 @@ Created on 1 févr. 2014
@author: inso
'''
from PyQt5.QtCore import QObject, pyqtSignal
from ucoinpy.api import bma
from ucoinpy.documents.block import Block
from ..tools.exceptions import NoPeerAvailable
......@@ -94,7 +95,7 @@ class Cache():
return self.data[cache_key]
class Community(object):
class Community(QObject):
'''
A community is a group of nodes using the same currency.
......@@ -102,6 +103,8 @@ class Community(object):
but nothing exists in ucoin to assert that a currency name is unique.
'''
new_block_mined = pyqtSignal()
def __init__(self, currency, network):
'''
Initialize community attributes with a currency and a network.
......@@ -112,6 +115,7 @@ class Community(object):
.. warning:: The community object should be created using its factory
class methods.
'''
super().__init__()
self.currency = currency
self._network = network
self._cache = Cache(self)
......
......
......@@ -17,6 +17,7 @@ class Network(QObject):
given community.
'''
nodes_changed = pyqtSignal()
stopped_perpetual_crawling = pyqtSignal()
def __init__(self, currency, nodes):
'''
......@@ -30,7 +31,8 @@ class Network(QObject):
self._nodes = nodes
for n in self._nodes:
n.changed.connect(self.nodes_changed)
self.must_crawl = False
self._must_crawl = False
self._is_perpetual = False
@classmethod
def create(cls, node):
......@@ -103,7 +105,13 @@ class Network(QObject):
'''
Stop network nodes crawling.
'''
self.must_crawl = False
self._must_crawl = False
def continue_crawling(self):
if self._is_perpetual:
return self._must_crawl
else:
return True
@property
def online_nodes(self):
......@@ -131,8 +139,8 @@ class Network(QObject):
Start crawling which never stops.
To stop this crawling, call "stop_crawling" method.
'''
self.must_crawl = True
while self.must_crawl:
self._must_crawl = True
while self.continue_crawling():
nodes = self.crawling(interval=10)
new_inlines = [n.endpoint.inline() for n in nodes]
......@@ -146,6 +154,7 @@ class Network(QObject):
self.nodes_changed.emit()
for n in self._nodes:
n.changed.connect(self.nodes_changed)
self.stopped_perpetual_crawling.emit()
def crawling(self, interval=0):
'''
......@@ -159,9 +168,10 @@ class Network(QObject):
logging.debug(traversed_pubkeys)
logging.debug("Peering : next to read : {0} : {1}".format(n.pubkey,
(n.pubkey not in traversed_pubkeys)))
if n.pubkey not in traversed_pubkeys:
if n.pubkey not in traversed_pubkeys and self.continue_crawling():
n.peering_traversal(nodes,
traversed_pubkeys, interval)
traversed_pubkeys, interval,
self.continue_crawling)
time.sleep(interval)
block_max = max([n.block for n in nodes])
......@@ -177,8 +187,5 @@ class Network(QObject):
pass
self._nodes.remove(node)
#TODO: Offline nodes for too long have to be removed
#TODO: Corrupted nodes should maybe be removed faster ?
logging.debug("Nodes found : {0}".format(nodes))
return nodes
......@@ -246,7 +246,8 @@ class Node(QObject):
self.changed.emit()
def peering_traversal(self, found_nodes,
traversed_pubkeys, interval):
traversed_pubkeys, interval,
continue_crawling):
logging.debug("Read {0} peering".format(self.pubkey))
traversed_pubkeys.append(self.pubkey)
self.refresh_state()
......@@ -254,6 +255,7 @@ class Node(QObject):
if self.pubkey not in [n.pubkey for n in found_nodes]:
# if node is corrupted remove it
if self.state != Node.CORRUPTED:
logging.debug("Found : {0} node".format(self.pubkey))
found_nodes.append(self)
try:
logging.debug(self.neighbours)
......@@ -266,9 +268,9 @@ class Node(QObject):
logging.debug(traversed_pubkeys)
logging.debug("Traversing : next to read : {0} : {1}".format(node.pubkey,
(node.pubkey not in traversed_pubkeys)))
if node.pubkey not in traversed_pubkeys:
if node.pubkey not in traversed_pubkeys and continue_crawling():
node.peering_traversal(found_nodes,
traversed_pubkeys, interval)
traversed_pubkeys, interval, continue_crawling())
time.sleep(interval)
except RequestException as e:
self._change_state(Node.OFFLINE)
......
......
......@@ -6,7 +6,7 @@ Created on 11 févr. 2014
import logging
import functools
import datetime
from ucoinpy.api import bma
from ucoinpy import PROTOCOL_VERSION
from ucoinpy.documents.certification import SelfCertification
......@@ -82,6 +82,7 @@ class Person(object):
:param str pubkey: The person pubkey
:param cache: The last returned values of the person properties.
'''
super().__init__()
self.uid = uid
self.pubkey = pubkey
self._cache = cache
......
......
......@@ -4,14 +4,14 @@ Created on 27 févr. 2015
@author: inso
'''
from PyQt5.QtCore import QObject, pyqtSlot, pyqtSignal
import logging
import time
from requests.exceptions import RequestException
from ...tools.exceptions import NoPeerAvailable
from .watcher import Watcher
class BlockchainWatcher(QObject):
class BlockchainWatcher(Watcher):
def __init__(self, account, community):
super().__init__()
self.account = account
......@@ -21,7 +21,6 @@ class BlockchainWatcher(QObject):
blockid = self.community.current_blockid()
self.last_block = blockid['number']
@pyqtSlot()
def watch(self):
while not self.exiting:
time.sleep(self.time_to_wait)
......@@ -29,18 +28,21 @@ class BlockchainWatcher(QObject):
blockid = self.community.current_blockid()
block_number = blockid['number']
if self.last_block != block_number:
if not self.exiting:
self.community.refresh_cache()
for w in self.account.wallets:
if not self.exiting:
w.refresh_cache(self.community)
logging.debug("New block, {0} mined in {1}".format(block_number,
self.community.currency))
self.new_block_mined.emit(block_number)
self.community.new_block_mined.emit(block_number)
self.last_block = block_number
except NoPeerAvailable:
return
pass
except RequestException as e:
self.connection_error.emit("Cannot check new block : {0}".format(str(e)))
self.error.emit("Cannot check new block : {0}".format(str(e)))
self.watching_stopped.emit()
new_block_mined = pyqtSignal(int)
connection_error = pyqtSignal(str)
\ No newline at end of file
def stop(self):
self.exiting = True
'''
Created on 18 mars 2015
@author: inso
'''
from PyQt5.QtCore import QThread
from .blockchain import BlockchainWatcher
from .persons import PersonsWatcher
from .network import NetworkWatcher
class Monitor(object):
'''
The monitor is managing watchers
'''
def __init__(self, account):
'''
Constructor
'''
self.account = account
self.threads_pool = []
self._blockchain_watchers = {}
self._network_watchers = {}
self._persons_watchers = {}
def blockchain_watcher(self, community):
return self._blockchain_watchers[community.name]
def network_watcher(self, community):
return self._network_watchers[community.name]
def persons_watcher(self, community):
return self._persons_watchers[community.name]
def connect_watcher_to_thread(self, watcher):
thread = QThread()
watcher.moveToThread(thread)
thread.started.connect(watcher.watch)
watcher.watching_stopped.connect(thread.exit)
thread.finished.connect(lambda: self.threads_pool.remove(thread))
thread.finished.connect(watcher.deleteLater)
thread.finished.connect(thread.deleteLater)
self.threads_pool.append(thread)
def prepare_watching(self):
for c in self.account.communities:
persons_watcher = PersonsWatcher(c)
self.connect_watcher_to_thread(persons_watcher)
self._persons_watchers[c.name] = persons_watcher
bc_watcher = BlockchainWatcher(self.account, c)
self.connect_watcher_to_thread(bc_watcher)
self._blockchain_watchers[c.name] = bc_watcher
network_watcher = NetworkWatcher(c)
self.connect_watcher_to_thread(network_watcher)
self._network_watchers[c.name] = network_watcher
def start_watching(self):
for thread in self.threads_pool:
thread.start()
def stop_watching(self):
for watcher in self._persons_watchers.values():
watcher.stop()
for watcher in self._blockchain_watchers.values():
watcher.stop()
for watcher in self._network_watchers.values():
watcher.stop()
......@@ -4,10 +4,10 @@ Created on 27 févr. 2015
@author: inso
'''
from PyQt5.QtCore import QObject, pyqtSlot
from .watcher import Watcher
class NetworkWatcher(QObject):
class NetworkWatcher(Watcher):
'''
This will crawl the network to always
have up to date informations about the nodes
......@@ -17,10 +17,10 @@ class NetworkWatcher(QObject):
super().__init__()
self.community = community
@pyqtSlot()
def watch(self):
self.community.network.stopped_perpetual_crawling.connect(self.watching_stopped)
self.community.network.start_perpetual_crawling()
@pyqtSlot()
def stop(self):
self.community.network.stop_crawling()
......@@ -4,34 +4,38 @@ Created on 27 févr. 2015
@author: inso
'''
from PyQt5.QtCore import QObject, pyqtSlot, pyqtSignal
from PyQt5.QtCore import pyqtSignal
from ..person import Person
from .watcher import Watcher
import logging
class PersonsWatcher(QObject):
class PersonsWatcher(Watcher):
'''
This will crawl the network to always
have up to date informations about the nodes
'''
person_changed = pyqtSignal(str)
end_watching = pyqtSignal()
def __init__(self, community):
super().__init__()
self.community = community
self.exiting = False
@pyqtSlot()
def watch(self):
logging.debug("Watching persons")
for p in Person._instances.values():
if not self.exiting:
for func in [Person.membership,
Person.is_member,
Person.certifiers_of,
Person.certified_by]:
if not self.exiting:
if p.reload(func, self.community):
logging.debug("Change detected on {0} about {1}".format(p.pubkey,
func.__name__))
self.person_changed.emit(p.pubkey)
self.end_watching.emit()
self.watching_stopped.emit()
def stop(self):
self.exiting = True
\ No newline at end of file
'''
Created on 20 mars 2015
@author: inso
'''
from PyQt5.QtCore import QObject, pyqtSlot, pyqtSignal
class Watcher(QObject):
watching_stopped = pyqtSignal()
error = pyqtSignal(str)
def __init__(self):
super().__init__()
@pyqtSlot()
def watch(self):
pass
@pyqtSlot()
def stop(self):
pass
......@@ -22,8 +22,8 @@ from ..tools.exceptions import MembershipNotFoundError
from ..core.wallet import Wallet
from ..core.person import Person
from ..core.transfer import Transfer
from ..core.watchers.blockchain import BlockchainWatcher
from ..core.watchers.persons import PersonsWatcher
from cutecoin.core.watching.blockchain import BlockchainWatcher
from cutecoin.core.watching.persons import PersonsWatcher
class CurrencyTabWidget(QWidget, Ui_CurrencyTabWidget):
......@@ -52,24 +52,11 @@ class CurrencyTabWidget(QWidget, Ui_CurrencyTabWidget):
self.tab_network = NetworkTabWidget(self.community)
self.bc_watcher = BlockchainWatcher(self.app.current_account,
community)
self.bc_watcher.new_block_mined.connect(self.refresh_block)
self.bc_watcher.connection_error.connect(self.display_error)
self.watcher_thread = QThread()
self.bc_watcher.moveToThread(self.watcher_thread)
self.watcher_thread.started.connect(self.bc_watcher.watch)
self.watcher_thread.start()
self.persons_watcher = PersonsWatcher(self.community)
self.persons_watcher.person_changed.connect(self.tab_community.refresh_person)
self.persons_watcher_thread = QThread()
self.persons_watcher.moveToThread(self.persons_watcher_thread)
self.persons_watcher_thread.started.connect(self.persons_watcher.watch)
self.persons_watcher.end_watching.connect(self.persons_watcher_thread.finished)
self.persons_watcher_thread.start()
self.community.new_block_mined.connect(self.refresh_block)
persons_watcher = self.app.monitor.persons_watcher(self.community)
persons_watcher.person_changed.connect(self.tab_community.refresh_person)
bc_watcher = self.app.monitor.blockchain_watcher(self.community)
bc_watcher.error.connect(self.display_error)
person = Person.lookup(self.app.current_account.pubkey, self.community)
try:
......@@ -287,11 +274,6 @@ QMessageBox.Ok | QMessageBox.Cancel)
self.status_label.setText("Connected : Block {0}"
.format(block_number))
def closeEvent(self, event):
super().closeEvent(event)
self.bc_watcher.deleteLater()
self.watcher_thread.deleteLater()
def dates_changed(self, datetime):
ts_from = self.date_from.dateTime().toTime_t()
ts_to = self.date_to.dateTime().toTime_t()
......
......
......@@ -123,6 +123,7 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.refresh()
self.busybar.hide()
self.app.disconnect()
self.app.monitor.start_watching()
@pyqtSlot(str)
def display_error(self, error):
......
......
......@@ -7,7 +7,7 @@ Created on 20 févr. 2015
import logging
from PyQt5.QtWidgets import QWidget
from PyQt5.QtCore import Qt, QThread
from ..core.watchers.network import NetworkWatcher
from cutecoin.core.watching.network import NetworkWatcher
from ..models.network import NetworkTableModel, NetworkFilterProxyModel
from ..gen_resources.network_tab_uic import Ui_NetworkTabWidget
......@@ -28,24 +28,8 @@ class NetworkTabWidget(QWidget, Ui_NetworkTabWidget):
proxy.setSourceModel(model)
self.table_network.setModel(proxy)
self.network_watcher = NetworkWatcher(community)
self.watcher_thread = QThread()
self.network_watcher.moveToThread(self.watcher_thread)
self.watcher_thread.started.connect(self.network_watcher.watch)
self.watcher_thread.finished.connect(self.network_watcher.stop)
self.watcher_thread.finished.connect(self.network_watcher.deleteLater)
self.watcher_thread.finished.connect(self.watcher_thread.deleteLater)
community.network.nodes_changed.connect(self.refresh_nodes)
def refresh_nodes(self):
self.table_network.model().sourceModel().modelReset.emit()
self.table_network.sortByColumn(0, Qt.AscendingOrder)
def closeEvent(self, event):
super().closeEvent(event)
self.watcher_thread.terminate()
def showEvent(self, event):
super().showEvent(event)
self.watcher_thread.start()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment