Skip to content
Snippets Groups Projects
Commit 180aa1c0 authored by inso's avatar inso
Browse files

Working on implementation

parent 0c268169
No related branches found
No related tags found
No related merge requests found
......@@ -3,10 +3,11 @@ import os
import unittest
import subprocess
import time
import shlex
cmd = 'python -m pretenders.server.server --host 0.0.0.0 --port 50000'
cmd = 'python -m pretenders.server.server --host 127.0.0.1 --port 50000'
p = subprocess.Popen(cmd)
p = subprocess.Popen(shlex.split(cmd))
time.sleep(2)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'lib')))
......
......@@ -52,6 +52,7 @@ class StepPageInit(Step):
@pyqtSlot()
def check_node(self):
logging.debug("Check node")
asyncio.async(self.coroutine_check_node())
def is_valid(self):
......
......@@ -83,7 +83,7 @@ bma_peering = b"""{
def get_mock():
# Assume a running server
# Initialise the mock client and clear all responses
mock = HTTPMock('localhost', 50000)
mock = HTTPMock('127.0.0.1', 50000)
mock.when('GET /network/peering')\
.reply(body=bma_peering,
......
......@@ -16,11 +16,40 @@ from cutecoin.core.app import Application
from cutecoin.core.account import Account
from cutecoin.tests import get_application
class ProcessAddCommunity(unittest.TestCase):
test_instance = None
@staticmethod
def async_exception_handler(loop, context):
"""
An exception handler which exists the program if the exception
was not catch
:param loop: the asyncio loop
:param context: the exception context
"""
message = context.get('message')
if not message:
message = 'Unhandled exception in event loop'
try:
exception = context['exception']
except KeyError:
exc_info = False
else:
exc_info = (type(exception), exception, exception.__traceback__)
log_lines = [message]
for key in [k for k in sorted(context) if k not in {'message', 'exception'}]:
log_lines.append('{}: {!r}'.format(key, context[key]))
unittest.TestCase.fail(ProcessAddCommunity.test_instance, '\n'.join(log_lines))
def setUp(self):
ProcessAddCommunity.test_instance = self
self.qapplication = get_application()
QLocale.setDefault(QLocale("en_GB"))
self.lp = quamash.QEventLoop(self.qapplication)
self.lp.set_exception_handler(ProcessAddCommunity.async_exception_handler)
asyncio.set_event_loop(self.lp)
self.application = Application(self.qapplication, self.lp, None, None)
......@@ -35,21 +64,24 @@ class ProcessAddCommunity(unittest.TestCase):
asyncio.set_event_loop(None)
def test_add_community_empty_blockchain(self):
asyncio.set_event_loop(self.lp)
self.lp.run_forever()
mock = new_blockchain.get_mock()
self.process_community = ProcessConfigureCommunity(self.application, self.account, None, self.password_asker)
self.process_community = ProcessConfigureCommunity(self.application,
self.account, None,
self.password_asker)
QTest.mouseClick(self.process_community.lineedit_add_address, Qt.LeftButton)
QTest.keyClicks(self.process_community.lineedit_add_address, "localhost")
QTest.keyClicks(self.process_community.lineedit_add_address, "127.0.0.1")
QTest.mouseDClick(self.process_community.spinbox_add_port, Qt.LeftButton)
self.process_community.spinbox_add_port.setValue(50000)
self.assertEqual(self.process_community.lineedit_add_address.text(), "localhost")
self.assertEqual(self.process_community.lineedit_add_address.text(), "127.0.0.1")
self.assertEqual(self.process_community.spinbox_add_port.value(), 50000)
QTest.mouseClick(self.process_community.button_checknode, Qt.LeftButton)
time.sleep(1)
self.lp.run_forever()
QTest.qSleep(10000)
self.assertEqual(mock.get_request(0).method, 'GET')
self.assertEqual(mock.get_request(0).url, '/network/peering')
self.assertEqual(self.process_community.button_checknode.text(), "Ok !")
if __name__ == '__main__':
logging.basicConfig( stream=sys.stderr )
logging.getLogger().setLevel( logging.DEBUG )
unittest.main()
_application_ = []
def get_application():
"""Get the singleton QApplication"""
from PyQt5.QtWidgets import QApplication
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment