diff --git a/src/sakia/tests/mocks/bma/nice_blockchain.py b/src/sakia/tests/mocks/bma/nice_blockchain.py index 1d012287b6c7e2ac7ef289b0f747af68b0db21c2..91ef3a050f73bdfc03648144b3b73c8be5eab51a 100644 --- a/src/sakia/tests/mocks/bma/nice_blockchain.py +++ b/src/sakia/tests/mocks/bma/nice_blockchain.py @@ -40,12 +40,12 @@ bma_membership_john = { "memberships": [ { - "version": 1, "currency": "test_currency", "membership": "IN", "blockNumber": 0, - "blockHash": "DA39A3EE5E6B4B0D3255BFEF95601890AFD80709" + "blockHash": "DA39A3EE5E6B4B0D3255BFEF95601890AFD80709", + "written": 10000 } ] } diff --git a/src/sakia/tests/unit/core/test_identity.py b/src/sakia/tests/unit/core/test_identity.py index 26186f7a190b651fc46b6afb32fc6e3333a82019..0702b5902e8c2198ccd5d765cb8a7605ad48f6ce 100644 --- a/src/sakia/tests/unit/core/test_identity.py +++ b/src/sakia/tests/unit/core/test_identity.py @@ -1,96 +1,82 @@ -import sys import unittest -import logging -import time -import asyncio -import json -from aiohttp import web +from asynctest import Mock, CoroutineMock from PyQt5.QtCore import QLocale -from sakia.core.registry.identities import Identity, IdentitiesRegistry, LocalState, BlockchainState +from sakia.core.registry.identities import Identity, LocalState, BlockchainState from sakia.tests.mocks.bma import nice_blockchain, corrupted from sakia.tests import QuamashTest -from sakia.core import Application, Community -from sakia.core.net import Network, Node -from ucoinpy.documents.peer import BMAEndpoint -from sakia.core.net.api.bma.access import BmaAccess +from ucoinpy.api import bma from sakia.tools.exceptions import MembershipNotFoundError -from ucoinpy.api.bma import API class TestIdentity(unittest.TestCase, QuamashTest): def setUp(self): self.setUpQuamash() QLocale.setDefault(QLocale("en_GB")) - self.identities_registry = IdentitiesRegistry() - - self.application = Application(self.qapplication, self.lp, self.identities_registry) - self.application.preferences['notifications'] = False - - self.mock_nice_blockchain = nice_blockchain.get_mock(self.lp) - self.node = Node(self.mock_nice_blockchain.peer(), - "", "HnFcSms8jzwngtVomTTnzudZx7SHUQY8sVE1y8yBmULk", - None, Node.ONLINE, - time.time(), {}, "ucoin", "0.12.0", 0) - self.network = Network.create(self.node) - self.bma_access = BmaAccess.create(self.network) - self.community = Community("test_currency", self.network, self.bma_access) + self.identities_registry = Mock(spec='sakia.core.registry.IdentitiesRegistry') + self.community = Mock(spec='sakia.core.Community') + self.community.name = "test_brouzouf" + self.community.bma_access = Mock(spec='sakia.core.net.api.bma.BmaAccess') def tearDown(self): self.tearDownQuamash() def test_identity_certifiers_of(self): - time.sleep(1) + def bma_access(request, *args): + if request is bma.wot.CertifiersOf: + return nice_blockchain.bma_certifiers_of_john + if request is bma.wot.Lookup: + return nice_blockchain.bma_lookup_john + if request is bma.blockchain.Block: + return nice_blockchain.bma_blockchain_current + identity = Identity("john", "7Aqw6Efa9EzE7gtsc8SveLLrM7gm6NEGoywSv4FJx6pZ", 1441130831, LocalState.COMPLETED, BlockchainState.VALIDATED) + id_doe = Identity("doe", "FADxcH5LmXGmGFgdixSes6nWnC4Vb4pRUBYT81zQRhjn", 1441230831, + LocalState.COMPLETED, BlockchainState.VALIDATED) + self.community.bma_access.future_request = CoroutineMock(side_effect=bma_access) + self.identities_registry.from_handled_data = Mock(return_value=id_doe) async def exec_test(): - srv, port, url = await self.mock_nice_blockchain.create_server() - self.addCleanup(srv.close) certifiers = await identity.certifiers_of(self.identities_registry, self.community) - self.assertEqual(len(certifiers), 1) + self.assertEqual(len(certifiers), 2) self.assertEqual(certifiers[0]['identity'].uid, "doe") - # Force one more loop turn - await asyncio.sleep(0) + self.assertEqual(certifiers[1]['identity'].uid, "doe") self.lp.run_until_complete(exec_test()) def test_identity_membership(self): - time.sleep(1) + def bma_access(request, *args): + if request is bma.blockchain.Membership: + return nice_blockchain.bma_membership_john + identity = Identity("john", "7Aqw6Efa9EzE7gtsc8SveLLrM7gm6NEGoywSv4FJx6pZ", 1441130831, LocalState.COMPLETED, BlockchainState.VALIDATED) + self.community.bma_access.future_request = CoroutineMock(side_effect=bma_access) + async def exec_test(): - srv, port, url = await self.mock_nice_blockchain.create_server() - self.addCleanup(srv.close) ms = await identity.membership(self.community) self.assertEqual(ms["blockNumber"], 0) self.assertEqual(ms["blockHash"], "DA39A3EE5E6B4B0D3255BFEF95601890AFD80709") self.assertEqual(ms["membership"], "IN") self.assertEqual(ms["currency"], "test_currency") + self.assertEqual(ms["written"], 10000) self.lp.run_until_complete(exec_test()) def test_identity_corrupted_membership(self): - mock = corrupted.get_mock(self.lp) - time.sleep(1) - node = Node(mock.peer(), - "", "HnFcSms8jzwngtVomTTnzudZx7SHUQY8sVE1y8yBmULk", - None, Node.ONLINE, - time.time(), {}, "ucoin", "0.12.0", 0) - network = Network.create(node) - bma_access = BmaAccess.create(network) - community = Community("test_currency", network, bma_access) + def bma_access(request, *args): + if request is bma.blockchain.Membership: + return corrupted.bma_memberships_empty_array identity = Identity("john", "7Aqw6Efa9EzE7gtsc8SveLLrM7gm6NEGoywSv4FJx6pZ", 1441130831, LocalState.COMPLETED, BlockchainState.VALIDATED) - + self.community.bma_access.future_request = CoroutineMock(side_effect=bma_access) async def exec_test(): - srv, port, url = await mock.create_server() - self.addCleanup(srv.close) with self.assertRaises(MembershipNotFoundError): - await identity.membership(community) + await identity.membership(self.community) self.lp.run_until_complete(exec_test())