diff --git a/src/sakia/app.py b/src/sakia/app.py index 2d080f1f11b0754388e3e22b598f52eb06a9ee0e..de2ccd0823e9e624ae3c9a4e6e70b72f8cd7ec37 100644 --- a/src/sakia/app.py +++ b/src/sakia/app.py @@ -125,7 +125,8 @@ class Application(QObject): bma_connector) self.sources_service = SourcesServices(self.currency, sources_processor, - connections_processor, bma_connector) + connections_processor, transactions_processor, + blockchain_processor, bma_connector) self.blockchain_service = BlockchainService(self, self.currency, blockchain_processor, bma_connector, self.identities_service, diff --git a/src/sakia/data/entities/source.py b/src/sakia/data/entities/source.py index 96beeb2cf4a37e695d24f4269675ceefc6f030a4..870340ad7fd18495e375224a0af0793a193838bd 100644 --- a/src/sakia/data/entities/source.py +++ b/src/sakia/data/entities/source.py @@ -8,5 +8,5 @@ class Source: identifier = attr.ib(convert=str) noffset = attr.ib(convert=int) type = attr.ib(convert=str, validator=lambda i, a, s: s == 'T' or s == 'D') - amount = attr.ib(convert=int, cmp=False, hash=False) - base = attr.ib(convert=int, cmp=False, hash=False) + amount = attr.ib(convert=int, hash=False) + base = attr.ib(convert=int, hash=False) diff --git a/src/sakia/data/entities/user_parameters.py b/src/sakia/data/entities/user_parameters.py index b6e79736a2bd7d2d3b307877fc5adbfc70801f78..cceffb653d3a497a0a8dfe355be85d40f798c1a3 100644 --- a/src/sakia/data/entities/user_parameters.py +++ b/src/sakia/data/entities/user_parameters.py @@ -7,7 +7,7 @@ class UserParameters: The user parameters entity """ profile_name = attr.ib(convert=str, default="Default Profile") - lang = attr.ib(convert=str, default="en_US") + lang = attr.ib(convert=str, default="en") referential = attr.ib(convert=int, default=0) expert_mode = attr.ib(convert=bool, default=False) digits_after_comma = attr.ib(convert=int, default=2) diff --git a/src/sakia/data/processors/blockchain.py b/src/sakia/data/processors/blockchain.py index 3bdae2283be861a53be07566cfba4c1f31686120..2839319650c18b750244b5df94469bdb28643f4d 100644 --- a/src/sakia/data/processors/blockchain.py +++ b/src/sakia/data/processors/blockchain.py @@ -29,6 +29,24 @@ class BlockchainProcessor: def initialized(self, currency): return self._repo.get_one(currency=currency) is not None + async def ud_before(self, currency, block_number): + try: + udblocks = await self._bma_connector.get(currency, bma.blockchain.ud) + blocks = udblocks['result']['blocks'] + ud_block_number = next(b for b in blocks if b <= block_number) + block = await self._bma_connector.get(currency, bma.blockchain.block, {'number': ud_block_number}) + return block['dividend'], block['unitbase'] + except StopIteration: + self._logger.debug("No dividend generated before {0}".format(block_number)) + except NoPeerAvailable as e: + self._logger.debug(str(e)) + except errors.DuniterError as e: + if e.ucode == errors.BLOCK_NOT_FOUND: + self._logger.debug(str(e)) + else: + raise + return 0, 0 + async def timestamp(self, currency, block_number): try: block = await self._bma_connector.get(currency, bma.blockchain.block, {'number': block_number}) diff --git a/src/sakia/data/processors/dividends.py b/src/sakia/data/processors/dividends.py index a9d7e975ae4504f8a1701b2f43bace7777a64979..3da268237c8a2cf55550b7be6e576aa3ceee67e6 100644 --- a/src/sakia/data/processors/dividends.py +++ b/src/sakia/data/processors/dividends.py @@ -47,6 +47,7 @@ class DividendsProcessor: req_args={'pubkey': connection.pubkey}) log_stream("Found {0} available dividends".format(len(history_data["history"]["history"]))) block_numbers = [] + dividends = [] for ud_data in history_data["history"]["history"]: dividend = Dividend(currency=connection.currency, pubkey=connection.pubkey, @@ -57,6 +58,7 @@ class DividendsProcessor: log_stream("Dividend of block {0}".format(dividend.block_number)) block_numbers.append(dividend.block_number) try: + dividends.append(dividend) self._repo.insert(dividend) except sqlite3.IntegrityError: log_stream("Dividend already registered in database") @@ -77,9 +79,11 @@ class DividendsProcessor: base=block["unitbase"]) log_stream("Dividend of block {0}".format(dividend.block_number)) try: + dividends.append(dividend) self._repo.insert(dividend) except sqlite3.IntegrityError: log_stream("Dividend already registered in database") + return dividends def dividends(self, currency, pubkey): return self._repo.get_all(currency=currency, pubkey=pubkey) diff --git a/src/sakia/data/processors/sources.py b/src/sakia/data/processors/sources.py index deaa41a70748fe82d313ead463a807ab23b57abf..c93986d99923fa88cd6246751b397ac77ecbe5de 100644 --- a/src/sakia/data/processors/sources.py +++ b/src/sakia/data/processors/sources.py @@ -82,5 +82,17 @@ class SourcesProcessor: for s in sources: self._repo.drop(s) + def insert(self, source): + try: + self._repo.insert(source) + except sqlite3.IntegrityError: + self._logger.debug("Source already exist : {0}".format(source)) + + def drop(self, source): + try: + self._repo.drop(source) + except sqlite3.IntegrityError: + self._logger.debug("Source already dropped : {0}".format(source)) + def drop_all_of(self, currency, pubkey): self._repo.drop_all(currency=currency, pubkey=pubkey) diff --git a/src/sakia/data/processors/transactions.py b/src/sakia/data/processors/transactions.py index ce921673821d326bed54700b9adf42f09d1e9b36..0705b6fcecfbc57a8a02722a5f81e4a523cdc76c 100644 --- a/src/sakia/data/processors/transactions.py +++ b/src/sakia/data/processors/transactions.py @@ -145,8 +145,8 @@ class TransactionsProcessor: try: tx = parse_transaction_doc(sent, connection.pubkey, sent_data["block_number"], sent_data["time"], txid) - self._repo.insert(tx) transactions.append(tx) + self._repo.insert(tx) except sqlite3.IntegrityError: log_stream("Transaction already registered in database") await asyncio.sleep(0) diff --git a/src/sakia/data/repositories/sources.py b/src/sakia/data/repositories/sources.py index bd5266cf27688cc0810caddc8c4b8e746a0a5b9a..58ac6c7b24b1292b95a16561f3a42840b837a56c 100644 --- a/src/sakia/data/repositories/sources.py +++ b/src/sakia/data/repositories/sources.py @@ -8,7 +8,7 @@ class SourcesRepo: """The repository for Communities entities. """ _conn = attr.ib() # :type sqlite3.Connection - _primary_keys = (Source.identifier,) + _primary_keys = (Source.currency, Source.pubkey, Source.identifier, Source.noffset) def insert(self, source): """ @@ -67,7 +67,10 @@ class SourcesRepo: where_fields = attr.astuple(source, filter=attr.filters.include(*SourcesRepo._primary_keys)) self._conn.execute("""DELETE FROM sources WHERE - identifier=?""", where_fields) + currency=? AND + pubkey=? AND + identifier=? AND + noffset=?""", where_fields) def drop_all(self, **filter): filters = [] diff --git a/src/sakia/gui/dialogs/connection_cfg/controller.py b/src/sakia/gui/dialogs/connection_cfg/controller.py index 4c26d0d3eed5f6f3e9522fe3af7478b9e306441b..b1993af403a65aa17b9f7f49cf0e958d2045e4c8 100644 --- a/src/sakia/gui/dialogs/connection_cfg/controller.py +++ b/src/sakia/gui/dialogs/connection_cfg/controller.py @@ -175,10 +175,10 @@ class ConnectionConfigController(QObject): self.view.stream_log("Initializing transactions history...") transactions = await self.model.initialize_transactions(self.model.connection, log_stream=self.view.stream_log) self.view.stream_log("Initializing dividends history...") - await self.model.initialize_dividends(self.model.connection, transactions, log_stream=self.view.stream_log) + dividends = await self.model.initialize_dividends(self.model.connection, transactions, log_stream=self.view.stream_log) self.view.progress_bar.setValue(3) - await self.model.initialize_sources(self.view.stream_log) + await self.model.initialize_sources(transactions, dividends, self.view.stream_log) self._logger.debug("Validate changes") self.model.insert_or_update_connection() diff --git a/src/sakia/gui/dialogs/connection_cfg/model.py b/src/sakia/gui/dialogs/connection_cfg/model.py index 14b83a69f06f5abe62a9a34c1090fc257aec03cb..317334086540730b4242d55ea061b645c6af6b3d 100644 --- a/src/sakia/gui/dialogs/connection_cfg/model.py +++ b/src/sakia/gui/dialogs/connection_cfg/model.py @@ -57,14 +57,15 @@ class ConnectionConfigModel(QObject): blockchain_processor = BlockchainProcessor.instanciate(self.app) await blockchain_processor.initialize_blockchain(self.app.currency, log_stream) - async def initialize_sources(self, log_stream): + async def initialize_sources(self, transactions, dividends, log_stream): """ Download sources information locally :param function log_stream: a method to log data in the screen :return: """ - sources_processor = SourcesProcessor.instanciate(self.app) - await sources_processor.initialize_sources(self.app.currency, self.connection.pubkey, log_stream) + log_stream("Parsing sources...") + await self.app.sources_service.refresh_sources_of_pubkey(self.connection.pubkey, transactions, dividends, None) + log_stream("Sources parsed succefully !") async def initialize_identity(self, identity, log_stream): """ diff --git a/src/sakia/gui/navigation/txhistory/controller.py b/src/sakia/gui/navigation/txhistory/controller.py index 78f4d501c5e7ae485e672b674312fbc721cab1d1..36b46be799a4440222c5041496881afa3b40db23 100644 --- a/src/sakia/gui/navigation/txhistory/controller.py +++ b/src/sakia/gui/navigation/txhistory/controller.py @@ -6,7 +6,7 @@ from PyQt5.QtGui import QCursor from sakia.decorators import asyncify from sakia.gui.widgets import toast from sakia.gui.widgets.context_menu import ContextMenu -from sakia.data.entities import Identity +from sakia.data.entities import Identity, Transaction from .model import TxHistoryModel from .view import TxHistoryView @@ -73,11 +73,12 @@ class TxHistoryController(QObject): valid, identity, transfer = self.model.table_data(index) if valid: if identity is None: - if transfer.issuer != self.model.connection.pubkey: - pubkey = transfer.issuer - else: - pubkey = next(transfer.receiver) - identity = Identity(currency=transfer.currency, pubkey=pubkey) + if isinstance(transfer, Transaction): + if transfer.issuer != self.model.connection.pubkey: + pubkey = transfer.issuer + else: + pubkey = transfer.receiver + identity = Identity(currency=transfer.currency, pubkey=pubkey) menu = ContextMenu.from_data(self.view, self.model.app, self.model.connection, (identity, transfer)) menu.view_identity_in_wot.connect(self.view_in_wot) diff --git a/src/sakia/services/blockchain.py b/src/sakia/services/blockchain.py index 81880d64c0dc51f41bd8d886ea71f459ac8fb43d..bd5b75be95574bc3751873ec02c749ffa6562a2f 100644 --- a/src/sakia/services/blockchain.py +++ b/src/sakia/services/blockchain.py @@ -66,6 +66,7 @@ class BlockchainService(QObject): if len(blocks) > 0: identities = await self._identities_service.handle_new_blocks(blocks) changed_tx, new_tx, new_dividends = await self._transactions_service.handle_new_blocks(blocks) + new_tx += await self._sources_service.refresh_sources(new_tx, new_dividends) self.handle_new_blocks(blocks) self.app.db.commit() for tx in changed_tx: @@ -78,7 +79,6 @@ class BlockchainService(QObject): self.app.identity_changed.emit(idty) self.app.new_blocks_handled.emit() block_numbers = await self.new_blocks(network_blockstamp) - await self._sources_service.refresh_sources() self.app.sources_refreshed.emit() except (NoPeerAvailable, DuniterError) as e: self._logger.debug(str(e)) diff --git a/src/sakia/services/sources.py b/src/sakia/services/sources.py index fe082a28252da85dd09e59041c64f3e3f80c8939..796979ed9200075125ad39bb463e5397f9678d3a 100644 --- a/src/sakia/services/sources.py +++ b/src/sakia/services/sources.py @@ -1,7 +1,10 @@ from PyQt5.QtCore import QObject from duniterpy.api import bma, errors +from duniterpy.documents import Transaction as TransactionDoc +from duniterpy.documents import BlockUID import logging -from sakia.data.entities import Source +from sakia.data.entities import Source, Transaction +import hashlib class SourcesServices(QObject): @@ -9,18 +12,23 @@ class SourcesServices(QObject): Source service is managing sources received to update data locally """ - def __init__(self, currency, sources_processor, connections_processor, bma_connector): + def __init__(self, currency, sources_processor, connections_processor, + transactions_processor, blockchain_processor, bma_connector): """ Constructor the identities service :param str currency: The currency name of the community :param sakia.data.processors.SourcesProcessor sources_processor: the sources processor for given currency :param sakia.data.processors.ConnectionsProcessor connections_processor: the connections processor + :param sakia.data.processors.TransactionsProcessor transactions_processor: the transactions processor + :param sakia.data.processors.BlockchainProcessor blockchain_processor: the blockchain processor :param sakia.data.connectors.BmaConnector bma_connector: The connector to BMA API """ super().__init__() self._sources_processor = sources_processor self._connections_processor = connections_processor + self._transactions_processor = transactions_processor + self._blockchain_processor = blockchain_processor self._bma_connector = bma_connector self.currency = currency self._logger = logging.getLogger('sakia') @@ -28,20 +36,139 @@ class SourcesServices(QObject): def amount(self, pubkey): return self._sources_processor.amount(self.currency, pubkey) - async def refresh_sources(self): + def _parse_tx(self, pubkey, transaction): + """ + Parse a transaction + :param sakia.data.entities.Transaction transaction: + """ + txdoc = TransactionDoc.from_signed_raw(transaction.raw) + for offset, output in enumerate(txdoc.outputs): + if output.conditions.left.pubkey == pubkey: + source = Source(currency=self.currency, + pubkey=pubkey, + identifier=txdoc.sha_hash, + type='T', + noffset=offset, + amount=output.amount, + base=output.base) + self._sources_processor.insert(source) + for index, input in enumerate(txdoc.inputs): + source = Source(currency=self.currency, + pubkey=txdoc.issuers[0], + identifier=input.origin_id, + type=input.source, + noffset=input.index, + amount=input.amount, + base=input.base) + if source.pubkey == pubkey: + self._sources_processor.drop(source) + + def _parse_ud(self, pubkey, dividend): + """ + :param str pubkey: + :param sakia.data.entities.Dividend dividend: + :return: + """ + source = Source(currency=self.currency, + pubkey=pubkey, + identifier=pubkey, + type='D', + noffset=dividend.block_number, + amount=dividend.amount, + base=dividend.base) + self._sources_processor.insert(source) + + async def check_destruction(self, pubkey, block_number, unit_base): + amount = self._sources_processor.amount(self.currency, pubkey) + if amount < 100 * 10 ** unit_base: + if self._sources_processor.available(self.currency, pubkey): + self._sources_processor.drop_all_of(self.currency, pubkey) + timestamp = await self._blockchain_processor.timestamp(self.currency, block_number) + next_txid = self._transactions_processor.next_txid(self.currency, block_number) + sha_identifier = hashlib.sha256("Destruction{0}{1}{2}".format(block_number, pubkey, amount).encode("ascii")).hexdigest().upper() + destruction = Transaction(currency=self.currency, + sha_hash=sha_identifier, + written_block=block_number, + blockstamp=BlockUID.empty(), + timestamp=timestamp, + signature="", + issuer=pubkey, + receiver="", + amount=amount, + amount_base=0, + comment="Too low balance", + txid=next_txid, + state=Transaction.VALIDATED, + local=True, + raw="") + self._transactions_processor.commit(destruction) + return destruction + + async def refresh_sources_of_pubkey(self, pubkey, transactions, dividends, unit_base): + """ + Refresh the sources for a given pubkey + :param str pubkey: + :param list[sakia.data.entities.Transaction] transactions: + :param list[sakia.data.entities.Dividend] dividends: + :param int unit_base: the unit base of the destruction. None to look for the past uds + :return: the destruction of sources + """ + sorted_tx = (s for s in sorted(transactions, key=lambda t: t.written_block)) + sorted_ud = (u for u in sorted(dividends, key=lambda d: d.block_number)) + try: + tx = next(sorted_tx) + block_number = max(tx.written_block, 0) + except StopIteration: + tx = None + block_number = 0 + try: + ud = next(sorted_ud) + block_number = min(block_number, ud.block_number) + except StopIteration: + ud = None + + destructions = [] + while tx or ud: + if tx and tx.written_block == block_number: + self._parse_tx(pubkey, tx) + try: + tx = next(sorted_tx) + except StopIteration: + tx = None + if ud and ud.block_number == block_number: + self._parse_ud(pubkey, ud) + try: + ud = next(sorted_ud) + except StopIteration: + ud = None + if not unit_base: + _, destruction_base = await self._blockchain_processor.ud_before(self.currency, block_number) + else: + destruction_base = unit_base + destruction = await self.check_destruction(pubkey, block_number, destruction_base) + if destruction: + destructions.append(destruction) + if ud and tx: + block_number = min(ud.block_number, tx.written_block) + elif ud: + block_number = ud.block_number + elif tx: + block_number = tx.written_block + return destructions + + async def refresh_sources(self, transactions, dividends): + """ + + :param list[sakia.data.entities.Transaction] transactions: + :param list[sakia.data.entities.Dividend] dividends: + :return: the destruction of sources + """ connections_pubkeys = [c.pubkey for c in self._connections_processor.connections_to(self.currency)] + destructions = [] for pubkey in connections_pubkeys: - sources_data = await self._bma_connector.get(self.currency, bma.tx.sources, - req_args={'pubkey': pubkey}) - - self._logger.debug("Found {0} sources".format(len(sources_data['sources']))) - self._sources_processor.drop_all_of(currency=self.currency, pubkey=pubkey) - for i, s in enumerate(sources_data['sources']): - source = Source(currency=self.currency, pubkey=pubkey, - identifier=s['identifier'], - type=s['type'], - noffset=s['noffset'], - amount=s['amount'], - base=s['base']) - self._sources_processor.commit(source) - self._logger.debug("{0}/{1} sources".format(i, len(sources_data['sources']))) + _, current_base = self._blockchain_processor.last_ud(self.currency) + # there can be bugs if the current base switch during the parsing of blocks + # but since it only happens every 23 years and that its only on accounts having less than 100 + # this is acceptable I guess + destructions += await self.refresh_sources_of_pubkey(pubkey, transactions, dividends, current_base) + return destructions diff --git a/tests/technical/test_sources_service.py b/tests/technical/test_sources_service.py new file mode 100644 index 0000000000000000000000000000000000000000..dc6711ec583d2b5bf0e046db144d9059bcfb3ece --- /dev/null +++ b/tests/technical/test_sources_service.py @@ -0,0 +1,51 @@ +import pytest +from sakia.data.entities import Transaction + + +@pytest.mark.asyncio +async def test_receive_source(application_with_one_connection, fake_server, bob, alice): + amount = application_with_one_connection.sources_service.amount(bob.key.pubkey) + fake_server.forge.push(alice.send_money(150, fake_server.forge.user_identities[alice.key.pubkey].sources, bob, + fake_server.forge.blocks[-1].blockUID, "Test receive")) + fake_server.forge.forge_block() + fake_server.forge.forge_block() + fake_server.forge.forge_block() + new_blocks = fake_server.forge.blocks[-3:] + changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(new_blocks) + await application_with_one_connection.sources_service.refresh_sources(new_tx, new_ud) + assert amount + 150 == application_with_one_connection.sources_service.amount(bob.key.pubkey) + await fake_server.close() + + +@pytest.mark.asyncio +async def test_send_source(application_with_one_connection, fake_server, bob, alice): + amount = application_with_one_connection.sources_service.amount(bob.key.pubkey) + fake_server.forge.push(bob.send_money(150, fake_server.forge.user_identities[bob.key.pubkey].sources, alice, + fake_server.forge.blocks[-1].blockUID, "Test receive")) + fake_server.forge.forge_block() + fake_server.forge.forge_block() + fake_server.forge.forge_block() + new_blocks = fake_server.forge.blocks[-3:] + changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(new_blocks) + await application_with_one_connection.sources_service.refresh_sources(new_tx, new_ud) + assert amount - 150 == application_with_one_connection.sources_service.amount(bob.key.pubkey) + await fake_server.close() + + + +@pytest.mark.asyncio +async def test_destruction(application_with_one_connection, fake_server, bob, alice): + amount = application_with_one_connection.sources_service.amount(bob.key.pubkey) + fake_server.forge.push(bob.send_money(amount - 80, fake_server.forge.user_identities[bob.key.pubkey].sources, alice, + fake_server.forge.blocks[-1].blockUID, "Test receive")) + fake_server.forge.forge_block() + fake_server.forge.forge_block() + fake_server.forge.forge_block() + new_blocks = fake_server.forge.blocks[-3:] + changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(new_blocks) + await application_with_one_connection.sources_service.refresh_sources(new_tx, new_ud) + assert 0 == application_with_one_connection.sources_service.amount(bob.key.pubkey) + tx_after_parse = application_with_one_connection.transactions_service.transfers(bob.key.pubkey) + assert tx_after_parse[-1].comment == "Too low balance" + await fake_server.close() +