Skip to content
Snippets Groups Projects
Commit 629b5be0 authored by inso's avatar inso
Browse files

Fix issue #687 : Conns changed during refresh

parent 73683d9c
No related branches found
No related tags found
No related merge requests found
...@@ -131,7 +131,8 @@ class Application(QObject): ...@@ -131,7 +131,8 @@ class Application(QObject):
connections_processor, transactions_processor, connections_processor, transactions_processor,
blockchain_processor, bma_connector) blockchain_processor, bma_connector)
self.blockchain_service = BlockchainService(self, self.currency, blockchain_processor, bma_connector, self.blockchain_service = BlockchainService(self, self.currency, blockchain_processor, connections_processor,
bma_connector,
self.identities_service, self.identities_service,
self.transactions_service, self.transactions_service,
self.sources_service) self.sources_service)
......
...@@ -11,7 +11,7 @@ class BlockchainService(QObject): ...@@ -11,7 +11,7 @@ class BlockchainService(QObject):
Blockchain service is managing new blocks received Blockchain service is managing new blocks received
to update data locally to update data locally
""" """
def __init__(self, app, currency, blockchain_processor, bma_connector, def __init__(self, app, currency, blockchain_processor, connections_processor, bma_connector,
identities_service, transactions_service, sources_service): identities_service, transactions_service, sources_service):
""" """
Constructor the identities service Constructor the identities service
...@@ -19,6 +19,7 @@ class BlockchainService(QObject): ...@@ -19,6 +19,7 @@ class BlockchainService(QObject):
:param sakia.app.Application app: Sakia application :param sakia.app.Application app: Sakia application
:param str currency: The currency name of the community :param str currency: The currency name of the community
:param sakia.data.processors.BlockchainProcessor blockchain_processor: the blockchain processor for given currency :param sakia.data.processors.BlockchainProcessor blockchain_processor: the blockchain processor for given currency
:param sakia.data.processors.ConnectionsProcessor connections_processor: the connections processor
:param sakia.data.connectors.BmaConnector bma_connector: The connector to BMA API :param sakia.data.connectors.BmaConnector bma_connector: The connector to BMA API
:param sakia.services.IdentitiesService identities_service: The identities service :param sakia.services.IdentitiesService identities_service: The identities service
:param sakia.services.TransactionsService transactions_service: The transactions service :param sakia.services.TransactionsService transactions_service: The transactions service
...@@ -27,6 +28,7 @@ class BlockchainService(QObject): ...@@ -27,6 +28,7 @@ class BlockchainService(QObject):
super().__init__() super().__init__()
self.app = app self.app = app
self._blockchain_processor = blockchain_processor self._blockchain_processor = blockchain_processor
self._connections_processor = connections_processor
self._bma_connector = bma_connector self._bma_connector = bma_connector
self.currency = currency self.currency = currency
self._identities_service = identities_service self._identities_service = identities_service
...@@ -64,9 +66,11 @@ class BlockchainService(QObject): ...@@ -64,9 +66,11 @@ class BlockchainService(QObject):
self._logger.debug("Parsing from {0}".format(start)) self._logger.debug("Parsing from {0}".format(start))
blocks = await self._blockchain_processor.next_blocks(start, block_numbers, self.currency) blocks = await self._blockchain_processor.next_blocks(start, block_numbers, self.currency)
if len(blocks) > 0: if len(blocks) > 0:
connections = self._connections_processor.connections_to(self.currency)
identities = await self._identities_service.handle_new_blocks(blocks) identities = await self._identities_service.handle_new_blocks(blocks)
changed_tx, new_tx, new_dividends = await self._transactions_service.handle_new_blocks(blocks) changed_tx, new_tx, new_dividends = await self._transactions_service.handle_new_blocks(connections,
destructions = await self._sources_service.refresh_sources(new_tx, new_dividends) blocks)
destructions = await self._sources_service.refresh_sources(connections, new_tx, new_dividends)
self.handle_new_blocks(blocks) self.handle_new_blocks(blocks)
self.app.db.commit() self.app.db.commit()
for tx in changed_tx: for tx in changed_tx:
......
...@@ -159,14 +159,14 @@ class SourcesServices(QObject): ...@@ -159,14 +159,14 @@ class SourcesServices(QObject):
block_number = tx.written_block block_number = tx.written_block
return destructions return destructions
async def refresh_sources(self, transactions, dividends): async def refresh_sources(self, connections, transactions, dividends):
""" """
:param list[sakia.data.entities.Transaction] transactions: :param list[sakia.data.entities.Connection] connections:
:param list[sakia.data.entities.Dividend] dividends: :param dict[sakia.data.entities.Transaction] transactions:
:param dict[sakia.data.entities.Dividend] dividends:
:return: the destruction of sources :return: the destruction of sources
""" """
connections = self._connections_processor.connections_to(self.currency)
destructions = {} destructions = {}
for conn in connections: for conn in connections:
destructions[conn] = [] destructions[conn] = []
......
...@@ -51,7 +51,7 @@ class TransactionsService(QObject): ...@@ -51,7 +51,7 @@ class TransactionsService(QObject):
else: else:
logging.debug("Error during transfer parsing") logging.debug("Error during transfer parsing")
def _parse_block(self, block_doc, txid): def _parse_block(self, connections, block_doc, txid):
""" """
Parse a block Parse a block
:param duniterpy.documents.Block block_doc: The block :param duniterpy.documents.Block block_doc: The block
...@@ -64,7 +64,6 @@ class TransactionsService(QObject): ...@@ -64,7 +64,6 @@ class TransactionsService(QObject):
if self._transactions_processor.run_state_transitions(tx, block_doc): if self._transactions_processor.run_state_transitions(tx, block_doc):
transfers_changed.append(tx) transfers_changed.append(tx)
self._logger.debug("New transaction validated : {0}".format(tx.sha_hash)) self._logger.debug("New transaction validated : {0}".format(tx.sha_hash))
connections = self._connections_processor.connections_to(self.currency)
for conn in connections: for conn in connections:
new_transactions = [t for t in block_doc.transactions new_transactions = [t for t in block_doc.transactions
if not self._transactions_processor.find_by_hash(conn.pubkey, t.sha_hash) if not self._transactions_processor.find_by_hash(conn.pubkey, t.sha_hash)
...@@ -81,7 +80,7 @@ class TransactionsService(QObject): ...@@ -81,7 +80,7 @@ class TransactionsService(QObject):
return transfers_changed, new_transfers return transfers_changed, new_transfers
async def handle_new_blocks(self, blocks): async def handle_new_blocks(self, connections, blocks):
""" """
Refresh last transactions Refresh last transactions
...@@ -92,7 +91,7 @@ class TransactionsService(QObject): ...@@ -92,7 +91,7 @@ class TransactionsService(QObject):
new_transfers = {} new_transfers = {}
txid = 0 txid = 0
for block in blocks: for block in blocks:
changes, new_tx = self._parse_block(block, txid) changes, new_tx = self._parse_block(connections, block, txid)
txid += len(new_tx) txid += len(new_tx)
transfers_changed += changes transfers_changed += changes
for conn in new_tx: for conn in new_tx:
...@@ -100,16 +99,16 @@ class TransactionsService(QObject): ...@@ -100,16 +99,16 @@ class TransactionsService(QObject):
new_transfers[conn] += new_tx[conn] new_transfers[conn] += new_tx[conn]
except KeyError: except KeyError:
new_transfers[conn] = new_tx[conn] new_transfers[conn] = new_tx[conn]
new_dividends = await self.parse_dividends_history(blocks, new_transfers) new_dividends = await self.parse_dividends_history(connections, blocks, new_transfers)
return transfers_changed, new_transfers, new_dividends return transfers_changed, new_transfers, new_dividends
async def parse_dividends_history(self, blocks, transactions): async def parse_dividends_history(self, connections, blocks, transactions):
""" """
Request transactions from the network to initialize data for a given pubkey Request transactions from the network to initialize data for a given pubkey
:param List[sakia.data.entities.Connection] connections: the list of connections found by tx parsing
:param List[duniterpy.documents.Block] blocks: the list of transactions found by tx parsing :param List[duniterpy.documents.Block] blocks: the list of transactions found by tx parsing
:param List[sakia.data.entities.Transaction] transactions: the list of transactions found by tx parsing :param List[sakia.data.entities.Transaction] transactions: the list of transactions found by tx parsing
""" """
connections = self._connections_processor.connections_to(self.currency)
min_block_number = blocks[0].number min_block_number = blocks[0].number
max_block_number = blocks[-1].number max_block_number = blocks[-1].number
dividends = {} dividends = {}
......
import pytest import pytest
from sakia.data.entities import Transaction from sakia.data.processors import ConnectionsProcessor
@pytest.mark.asyncio @pytest.mark.asyncio
...@@ -9,7 +9,9 @@ async def test_send_more_than_40_sources(application_with_one_connection, fake_s ...@@ -9,7 +9,9 @@ async def test_send_more_than_40_sources(application_with_one_connection, fake_s
fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block()
new_blocks = fake_server_with_blockchain.forge.blocks[-60:] new_blocks = fake_server_with_blockchain.forge.blocks[-60:]
changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(new_blocks) connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections()
changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(connections,
new_blocks)
for conn in new_tx: for conn in new_tx:
await application_with_one_connection.sources_service.refresh_sources_of_pubkey(bob.key.pubkey, new_tx[conn], new_ud[conn], None) await application_with_one_connection.sources_service.refresh_sources_of_pubkey(bob.key.pubkey, new_tx[conn], new_ud[conn], None)
......
import pytest import pytest
from sakia.data.entities import Transaction from sakia.data.entities import Transaction
from sakia.data.processors import TransactionsProcessor from sakia.data.processors import TransactionsProcessor, ConnectionsProcessor
@pytest.mark.asyncio @pytest.mark.asyncio
...@@ -12,8 +12,10 @@ async def test_receive_source(application_with_one_connection, fake_server_with_ ...@@ -12,8 +12,10 @@ async def test_receive_source(application_with_one_connection, fake_server_with_
fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block()
fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block()
new_blocks = fake_server_with_blockchain.forge.blocks[-3:] new_blocks = fake_server_with_blockchain.forge.blocks[-3:]
changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(new_blocks) connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections()
await application_with_one_connection.sources_service.refresh_sources(new_tx, new_ud) changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(connections,
new_blocks)
await application_with_one_connection.sources_service.refresh_sources(connections, new_tx, new_ud)
assert amount + 150 == application_with_one_connection.sources_service.amount(bob.key.pubkey) assert amount + 150 == application_with_one_connection.sources_service.amount(bob.key.pubkey)
await fake_server_with_blockchain.close() await fake_server_with_blockchain.close()
...@@ -27,8 +29,10 @@ async def test_send_source(application_with_one_connection, fake_server_with_blo ...@@ -27,8 +29,10 @@ async def test_send_source(application_with_one_connection, fake_server_with_blo
fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block()
fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block()
new_blocks = fake_server_with_blockchain.forge.blocks[-3:] new_blocks = fake_server_with_blockchain.forge.blocks[-3:]
changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(new_blocks) connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections()
await application_with_one_connection.sources_service.refresh_sources(new_tx, new_ud) changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(connections,
new_blocks)
await application_with_one_connection.sources_service.refresh_sources(connections, new_tx, new_ud)
assert amount - 150 == application_with_one_connection.sources_service.amount(bob.key.pubkey) assert amount - 150 == application_with_one_connection.sources_service.amount(bob.key.pubkey)
await fake_server_with_blockchain.close() await fake_server_with_blockchain.close()
...@@ -42,8 +46,10 @@ async def test_destruction(application_with_one_connection, fake_server_with_blo ...@@ -42,8 +46,10 @@ async def test_destruction(application_with_one_connection, fake_server_with_blo
fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block()
fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block()
new_blocks = fake_server_with_blockchain.forge.blocks[-3:] new_blocks = fake_server_with_blockchain.forge.blocks[-3:]
changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(new_blocks) connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections()
await application_with_one_connection.sources_service.refresh_sources(new_tx, new_ud) changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(connections,
new_blocks)
await application_with_one_connection.sources_service.refresh_sources(connections, new_tx, new_ud)
assert 0 == application_with_one_connection.sources_service.amount(bob.key.pubkey) assert 0 == application_with_one_connection.sources_service.amount(bob.key.pubkey)
tx_after_parse = application_with_one_connection.transactions_service.transfers(bob.key.pubkey) tx_after_parse = application_with_one_connection.transactions_service.transfers(bob.key.pubkey)
assert "Too low balance" in [t.comment for t in tx_after_parse] assert "Too low balance" in [t.comment for t in tx_after_parse]
......
import pytest import pytest
from sakia.data.entities import Transaction from sakia.data.entities import Transaction
from sakia.data.processors import ConnectionsProcessor
@pytest.mark.asyncio @pytest.mark.asyncio
...@@ -18,7 +19,8 @@ async def test_send_tx_then_validate(application_with_one_connection, fake_serve ...@@ -18,7 +19,8 @@ async def test_send_tx_then_validate(application_with_one_connection, fake_serve
fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block()
fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block()
new_blocks = fake_server_with_blockchain.forge.blocks[-3:] new_blocks = fake_server_with_blockchain.forge.blocks[-3:]
await application_with_one_connection.transactions_service.handle_new_blocks(new_blocks) connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections()
await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks)
tx_after_parse = application_with_one_connection.transactions_service.transfers(bob.key.pubkey) tx_after_parse = application_with_one_connection.transactions_service.transfers(bob.key.pubkey)
assert tx_after_parse[-1].state is Transaction.VALIDATED assert tx_after_parse[-1].state is Transaction.VALIDATED
assert tx_after_parse[-1].written_block == fake_server_with_blockchain.forge.blocks[-3].number assert tx_after_parse[-1].written_block == fake_server_with_blockchain.forge.blocks[-3].number
...@@ -34,7 +36,8 @@ async def test_receive_tx(application_with_one_connection, fake_server_with_bloc ...@@ -34,7 +36,8 @@ async def test_receive_tx(application_with_one_connection, fake_server_with_bloc
fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block()
fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block()
new_blocks = fake_server_with_blockchain.forge.blocks[-3:] new_blocks = fake_server_with_blockchain.forge.blocks[-3:]
await application_with_one_connection.transactions_service.handle_new_blocks(new_blocks) connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections()
await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks)
tx_after_parse = application_with_one_connection.transactions_service.transfers(bob.key.pubkey) tx_after_parse = application_with_one_connection.transactions_service.transfers(bob.key.pubkey)
assert tx_after_parse[-1].state is Transaction.VALIDATED assert tx_after_parse[-1].state is Transaction.VALIDATED
assert len(tx_before_send) + 1 == len(tx_after_parse) assert len(tx_before_send) + 1 == len(tx_after_parse)
...@@ -52,7 +55,8 @@ async def test_issue_dividend(application_with_one_connection, fake_server_with_ ...@@ -52,7 +55,8 @@ async def test_issue_dividend(application_with_one_connection, fake_server_with_
fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block()
fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block()
new_blocks = fake_server_with_blockchain.forge.blocks[-5:] new_blocks = fake_server_with_blockchain.forge.blocks[-5:]
await application_with_one_connection.transactions_service.handle_new_blocks(new_blocks) connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections()
await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks)
dividends_after_parse = application_with_one_connection.transactions_service.dividends(bob.key.pubkey) dividends_after_parse = application_with_one_connection.transactions_service.dividends(bob.key.pubkey)
assert len(dividends_before_send) + 2 == len(dividends_after_parse) assert len(dividends_before_send) + 2 == len(dividends_after_parse)
await fake_server_with_blockchain.close() await fake_server_with_blockchain.close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment