From 2964ea71c73bf279be44cd41e907acbe4adbd319 Mon Sep 17 00:00:00 2001 From: inso <insomniak.fr@gmaiL.com> Date: Wed, 8 Aug 2018 21:11:55 +0200 Subject: [PATCH] Do not refresh data older than 1 month --- src/sakia/data/entities/transaction.py | 29 ++++- src/sakia/data/processors/blockchain.py | 76 +++++------ src/sakia/data/processors/tx_lifecycle.py | 19 +-- .../gui/navigation/txhistory/sql_adapter.py | 20 +-- .../gui/navigation/txhistory/table_model.py | 75 ++++++++--- src/sakia/services/blockchain.py | 65 ++++------ src/sakia/services/identities.py | 122 +----------------- src/sakia/services/network.py | 1 + src/sakia/services/sources.py | 73 +++-------- src/sakia/services/transactions.py | 96 +++++++------- tests/technical/test_documents_service.py | 3 +- tests/technical/test_sources_service.py | 14 +- tests/technical/test_transactions_service.py | 15 ++- 13 files changed, 257 insertions(+), 351 deletions(-) diff --git a/src/sakia/data/entities/transaction.py b/src/sakia/data/entities/transaction.py index 1d02ce1a..9db652d8 100644 --- a/src/sakia/data/entities/transaction.py +++ b/src/sakia/data/entities/transaction.py @@ -1,6 +1,6 @@ import attr import hashlib -from duniterpy.documents import block_uid +from duniterpy.documents import block_uid, BlockUID from duniterpy.documents import Transaction as TransactionDoc from duniterpy.documents.transaction import reduce_base from sakia.helpers import attrs_tuple_of_str @@ -9,6 +9,7 @@ import math STOPLINE_HASH = hashlib.sha256("STOPLINE".encode("UTF-8")).hexdigest() + def parse_transaction_doc(tx_doc, pubkey, block_number, mediantime, txid): """ Parse a transaction @@ -73,6 +74,32 @@ def parse_transaction_doc(tx_doc, pubkey, block_number, mediantime, txid): raw=tx_doc.signed_raw()) return transaction + +STOPLINE_HASH = hashlib.sha256("STOPLINE".encode("UTF-8")).hexdigest() + + +def build_stopline(currency, pubkey, block_number, mediantime): + """ + Used to insert a line of ignored tx in the history + """ + transaction = Transaction(currency=currency, + pubkey=pubkey, + sha_hash=STOPLINE_HASH, + written_block=block_number, + blockstamp=BlockUID(block_number, BlockUID.empty().sha_hash), + timestamp=mediantime, + signatures="", + issuers="", + receivers="", + amount=0, + amount_base=0, + comment="", + txid=0, + state=Transaction.VALIDATED, + raw="") + return transaction + + @attr.s(hash=True) class Transaction: """ diff --git a/src/sakia/data/processors/blockchain.py b/src/sakia/data/processors/blockchain.py index 4df38e27..bd838009 100644 --- a/src/sakia/data/processors/blockchain.py +++ b/src/sakia/data/processors/blockchain.py @@ -81,6 +81,15 @@ class BlockchainProcessor: diff_time = diff_blocks * parameters.avg_gen_time return current_time + diff_time + def block_number_30days_ago(self, currency, blockstamp): + """ + When refreshing data, we only request the last 30 days + This method computes the block number 30 days ago + :param currency: + :return: + """ + avg_blocks_per_month = int(30 * 24 * 3600 / self.parameters(currency).avg_gen_time) + return blockstamp.number - avg_blocks_per_month def current_buid(self, currency): """ @@ -231,22 +240,6 @@ class BlockchainProcessor: local_current_buid = self.current_buid(currency) return sorted([b for b in with_money if b > local_current_buid.number]) - async def next_blocks(self, start, filter, currency): - """ - Get blocks from the network - :param List[int] numbers: list of blocks numbers to get - :return: the list of block documents - :rtype: List[duniterpy.documents.Block] - """ - blocks = [] - blocks_data = await self._bma_connector.get(currency, bma.blockchain.blocks, req_args={'count': 100, - 'start': start}) - for data in blocks_data: - if data['number'] in filter or data['number'] == start+99: - blocks.append(Block.from_signed_raw(data["raw"] + data["signature"] + "\n")) - - return blocks - async def initialize_blockchain(self, currency): """ Initialize blockchain for a given currency if no source exists locally @@ -353,30 +346,39 @@ class BlockchainProcessor: except sqlite3.IntegrityError: self._repo.update(blockchain) - def handle_new_blocks(self, currency, blocks): + async def handle_new_blocks(self, currency, network_blockstamp): """ Initialize blockchain for a given currency if no source exists locally - :param List[duniterpy.documents.Block] blocks + :param str currency: + :param BlockUID network_blockstamp: the blockstamp of the network """ - blockchain = self._repo.get_one(currency=currency) - for block in sorted(blocks): - if blockchain.current_buid < block.blockUID: - blockchain.current_buid = block.blockUID - blockchain.median_time = block.mediantime - blockchain.current_members_count = block.members_count - if block.ud: - blockchain.current_mass += (block.ud * 10**block.unit_base) * block.members_count - if block.ud and blockchain.last_ud_time + blockchain.parameters.dt_reeval >= block.mediantime: - blockchain.previous_mass = blockchain.last_mass - blockchain.previous_members_count = blockchain.last_members_count - blockchain.previous_ud = blockchain.last_ud - blockchain.previous_ud_base = blockchain.last_ud_base - blockchain.previous_ud_time = blockchain.last_ud_time - blockchain.last_members_count = block.members_count - blockchain.last_ud = block.ud - blockchain.last_ud_base = block.unit_base - blockchain.last_ud_time = block.mediantime - self._repo.update(blockchain) + + self._logger.debug("Requesting current block") + try: + current_block = await self._bma_connector.get(currency, bma.blockchain.block, + req_args={'number': network_blockstamp.number}) + signed_raw = "{0}{1}\n".format(current_block['raw'], current_block['signature']) + block = Block.from_signed_raw(signed_raw) + blockchain = self._repo.get_one(currency=currency) + blockchain.current_buid = block.blockUID + blockchain.median_time = block.mediantime + blockchain.current_members_count = block.members_count + if block.ud: + blockchain.current_mass += (block.ud * 10**block.unit_base) * block.members_count + if block.ud and blockchain.last_ud_time + blockchain.parameters.dt_reeval >= block.mediantime: + blockchain.previous_mass = blockchain.last_mass + blockchain.previous_members_count = blockchain.last_members_count + blockchain.previous_ud = blockchain.last_ud + blockchain.previous_ud_base = blockchain.last_ud_base + blockchain.previous_ud_time = blockchain.last_ud_time + blockchain.last_members_count = block.members_count + blockchain.last_ud = block.ud + blockchain.last_ud_base = block.unit_base + blockchain.last_ud_time = block.mediantime + self._repo.update(blockchain) + except errors.DuniterError as e: + if e.ucode != errors.NO_CURRENT_BLOCK: + raise def remove_blockchain(self, currency): self._repo.drop(self._repo.get_one(currency=currency)) diff --git a/src/sakia/data/processors/tx_lifecycle.py b/src/sakia/data/processors/tx_lifecycle.py index 8076f355..d80ac4ee 100644 --- a/src/sakia/data/processors/tx_lifecycle.py +++ b/src/sakia/data/processors/tx_lifecycle.py @@ -3,17 +3,17 @@ from sakia.data.entities import Transaction from duniterpy.documents import Block -def _found_in_block(tx, block): +def _found_in_block(tx, sha_hash, block_number): """ Check if the transaction can be found in the blockchain :param sakia.data.entities.Transaction tx: the transaction - :param duniterpy.documents.Block block: The block to check for the transaction + :param str sha_hash: The transaction sha_hash found in history + :param int block_number: The block_number where the tx was found :return: True if the transaction was found :rtype: bool """ - for block_tx in block.transactions: - if block_tx.sha_hash == tx.sha_hash: - return True + if sha_hash == tx.sha_hash: + return True def _broadcast_success(tx, ret_codes): @@ -49,15 +49,16 @@ def _is_locally_created(tx): return tx.local -def _be_validated(tx, block): +def _be_validated(tx, sha_hash, block_number): """ Action when the transfer ins found in a block :param sakia.data.entities.Transaction tx: the transaction :param bool rollback: True if we are in a rollback procedure - :param duniterpy.documents.Block block: The block checked + :param str sha_hash: The tx sha_hash found in history + :param int block_number: The block_number where the tx was found """ - tx.written_block = block.number + tx.written_block = block_number def _drop(tx): @@ -81,7 +82,7 @@ states = { (Transaction.TO_SEND, ()): ((_is_locally_created, _drop, Transaction.DROPPED),), - (Transaction.AWAITING, (Block,)): + (Transaction.AWAITING, (str, int,)): ((_found_in_block, _be_validated, Transaction.VALIDATED),), (Transaction.REFUSED, ()): diff --git a/src/sakia/gui/navigation/txhistory/sql_adapter.py b/src/sakia/gui/navigation/txhistory/sql_adapter.py index 5bcfbd11..6a05210a 100644 --- a/src/sakia/gui/navigation/txhistory/sql_adapter.py +++ b/src/sakia/gui/navigation/txhistory/sql_adapter.py @@ -19,6 +19,7 @@ SELECT AND transactions.ts >= ? and transactions.ts <= ? AND transactions.issuers LIKE "%{pubkey}%" + UNION ALL SELECT transactions.ts, @@ -35,7 +36,8 @@ SELECT and transactions.pubkey = ? AND transactions.ts >= ? and transactions.ts <= ? - AND transactions.receivers LIKE "%{pubkey}%" + AND (transactions.receivers LIKE "%{pubkey}%" + OR transactions.sha_hash = ?) UNION ALL SELECT dividends.timestamp as ts, @@ -61,7 +63,7 @@ PAGE_LENGTH = 50 class TxHistorySqlAdapter: _conn = attr.ib() # :type sqlite3.Connection - def _transfers_and_dividends(self, currency, pubkey, ts_from, ts_to, offset=0, limit=1000, + def _transfers_and_dividends(self, currency, pubkey, ts_from, ts_to, stopline_hash, offset=0, limit=1000, sort_by="currency", sort_order="ASC"): """ Get all transfers in the database on a given currency from or to a pubkey @@ -78,14 +80,14 @@ LIMIT {limit} OFFSET {offset}""").format(offset=offset, pubkey=pubkey ) c = self._conn.execute(request, (currency, pubkey, ts_from, ts_to, - currency, pubkey, ts_from, ts_to, + currency, pubkey, ts_from, ts_to, stopline_hash, currency, pubkey, ts_from, ts_to)) datas = c.fetchall() if datas: return datas return [] - def _transfers_and_dividends_count(self, currency, pubkey, ts_from, ts_to): + def _transfers_and_dividends_count(self, currency, pubkey, ts_from, ts_to, stopline_hash): """ Get all transfers in the database on a given currency from or to a pubkey @@ -97,14 +99,14 @@ SELECT COUNT(*) FROM ( """ + TX_HISTORY_REQUEST + ")").format(pubkey=pubkey) c = self._conn.execute(request, (currency, pubkey, ts_from, ts_to, - currency, pubkey, ts_from, ts_to, + currency, pubkey, ts_from, ts_to, stopline_hash, currency, pubkey, ts_from, ts_to)) datas = c.fetchone() if datas: return datas[0] return 0 - def transfers_and_dividends(self, currency, pubkey, page, ts_from, ts_to, sort_by, sort_order): + def transfers_and_dividends(self, currency, pubkey, page, ts_from, ts_to, stopline_hash, sort_by, sort_order): """ Get all transfers and dividends from or to a given pubkey :param str currency: @@ -115,12 +117,12 @@ FROM ( :return: the list of Transaction entities :rtype: List[sakia.data.entities.Transaction] """ - return self._transfers_and_dividends(currency, pubkey, ts_from, ts_to, + return self._transfers_and_dividends(currency, pubkey, ts_from, ts_to, stopline_hash, offset=page*PAGE_LENGTH, limit=PAGE_LENGTH, sort_by=sort_by, sort_order=sort_order) - def pages(self, currency, pubkey, ts_from, ts_to): + def pages(self, currency, pubkey, ts_from, ts_to, stopline_hash): """ Get all transfers and dividends from or to a given pubkey :param str currency: @@ -131,7 +133,7 @@ FROM ( :return: the list of Transaction entities :rtype: List[sakia.data.entities.Transaction] """ - count = self._transfers_and_dividends_count(currency, pubkey, ts_from, ts_to) + count = self._transfers_and_dividends_count(currency, pubkey, ts_from, ts_to, stopline_hash) return int(count / PAGE_LENGTH) diff --git a/src/sakia/gui/navigation/txhistory/table_model.py b/src/sakia/gui/navigation/txhistory/table_model.py index 5d5087ac..f6ebb545 100644 --- a/src/sakia/gui/navigation/txhistory/table_model.py +++ b/src/sakia/gui/navigation/txhistory/table_model.py @@ -9,6 +9,7 @@ from sakia.constants import MAX_CONFIRMATIONS from sakia.data.processors import BlockchainProcessor from .sql_adapter import TxHistorySqlAdapter from sakia.data.repositories import TransactionsRepo, DividendsRepo +from sakia.data.entities.transaction import STOPLINE_HASH class HistoryTableModel(QAbstractTableModel): @@ -89,7 +90,8 @@ class HistoryTableModel(QAbstractTableModel): return self.sql_adapter.pages(self.app.currency, self.connection.pubkey, ts_from=self.ts_from, - ts_to=self.ts_to) + ts_to=self.ts_to, + stopline_hash=STOPLINE_HASH) def pubkeys(self, row): return self.transfers_data[row][HistoryTableModel.columns_types.index('pubkey')].split('\n') @@ -104,6 +106,7 @@ class HistoryTableModel(QAbstractTableModel): page=self.current_page, ts_from=self.ts_from, ts_to=self.ts_to, + stopline_hash=STOPLINE_HASH, sort_by=HistoryTableModel.columns_to_sql[self.main_column_id], sort_order= "ASC" if Qt.AscendingOrder else "DESC") @@ -122,6 +125,29 @@ class HistoryTableModel(QAbstractTableModel): self.transfers_data[i] = self.data_received(transfer) self.dataChanged.emit(self.index(i, 0), self.index(i, len(HistoryTableModel.columns_types))) + def data_stopline(self, transfer): + """ + Converts a transaction to table data + :param sakia.data.entities.Transaction transfer: the transaction + :return: data as tuple + """ + block_number = transfer.written_block + + senders = [] + for issuer in transfer.issuers: + identity = self.identities_service.get_identity(issuer) + if identity: + senders.append(issuer + " (" + identity.uid + ")") + else: + senders.append(issuer) + + date_ts = transfer.timestamp + txid = transfer.txid + + return (date_ts, "", 0, + self.tr("Transactions missing from history"), transfer.state, txid, + transfer.issuers, block_number, transfer.sha_hash, transfer) + def data_received(self, transfer): """ Converts a transaction to table data @@ -199,7 +225,9 @@ class HistoryTableModel(QAbstractTableModel): sha_hash=data[4]) if transfer.state != Transaction.DROPPED: - if data[2] < 0: + if data[4] == STOPLINE_HASH: + self.transfers_data.append(self.data_stopline(transfer)) + elif data[2] < 0: self.transfers_data.append(self.data_sent(transfer)) else: self.transfers_data.append(self.data_received(transfer)) @@ -240,6 +268,7 @@ class HistoryTableModel(QAbstractTableModel): return QVariant() source_data = self.transfers_data[row][col] + txhash_data = self.transfers_data[row][HistoryTableModel.columns_types.index('txhash')] state_data = self.transfers_data[row][HistoryTableModel.columns_types.index('state')] block_data = self.transfers_data[row][HistoryTableModel.columns_types.index('block_number')] @@ -252,29 +281,39 @@ class HistoryTableModel(QAbstractTableModel): if col == HistoryTableModel.columns_types.index('pubkey'): return "<p>" + source_data.replace('\n', "<br>") + "</p>" if col == HistoryTableModel.columns_types.index('date'): - ts = self.blockchain_processor.adjusted_ts(self.connection.currency, source_data) - return QLocale.toString( - QLocale(), - QDateTime.fromTime_t(ts).date(), - QLocale.dateFormat(QLocale(), QLocale.ShortFormat) - ) + " BAT" + if txhash_data == STOPLINE_HASH: + return "" + else: + ts = self.blockchain_processor.adjusted_ts(self.connection.currency, source_data) + return QLocale.toString( + QLocale(), + QDateTime.fromTime_t(ts).date(), + QLocale.dateFormat(QLocale(), QLocale.ShortFormat) + ) + " BAT" if col == HistoryTableModel.columns_types.index('amount'): - amount = self.app.current_ref.instance(source_data, self.connection.currency, - self.app, block_data).diff_localized(False, False) - return amount + if txhash_data == STOPLINE_HASH: + return "" + else: + amount = self.app.current_ref.instance(source_data, self.connection.currency, + self.app, block_data).diff_localized(False, False) + return amount + return source_data if role == Qt.FontRole: font = QFont() - if state_data == Transaction.AWAITING or \ - (state_data == Transaction.VALIDATED and current_confirmations < MAX_CONFIRMATIONS): + if txhash_data == STOPLINE_HASH: font.setItalic(True) - elif state_data == Transaction.REFUSED: - font.setItalic(True) - elif state_data == Transaction.TO_SEND: - font.setBold(True) else: - font.setItalic(False) + if state_data == Transaction.AWAITING or \ + (state_data == Transaction.VALIDATED and current_confirmations < MAX_CONFIRMATIONS): + font.setItalic(True) + elif state_data == Transaction.REFUSED: + font.setItalic(True) + elif state_data == Transaction.TO_SEND: + font.setBold(True) + else: + font.setItalic(False) return font if role == Qt.ForegroundRole: diff --git a/src/sakia/services/blockchain.py b/src/sakia/services/blockchain.py index db95eaf3..dabef132 100644 --- a/src/sakia/services/blockchain.py +++ b/src/sakia/services/blockchain.py @@ -2,6 +2,7 @@ import asyncio from PyQt5.QtCore import QObject import math import logging +from duniterpy.api import bma from duniterpy.api.errors import DuniterError from sakia.errors import NoPeerAvailable @@ -40,17 +41,6 @@ class BlockchainService(QObject): def initialized(self): return self._blockchain_processor.initialized(self.app.currency) - def handle_new_blocks(self, blocks): - self._blockchain_processor.handle_new_blocks(self.currency, blocks) - - async def new_blocks(self, network_blockstamp): - with_identities = await self._blockchain_processor.new_blocks_with_identities(self.currency) - with_money = await self._blockchain_processor.new_blocks_with_money(self.currency) - block_numbers = with_identities + with_money - if network_blockstamp > self.current_buid(): - block_numbers += [network_blockstamp.number] - return block_numbers - async def handle_blockchain_progress(self, network_blockstamp): """ Handle a new current block uid @@ -61,32 +51,33 @@ class BlockchainService(QObject): try: self._update_lock = True self.app.refresh_started.emit() - block_numbers = await self.new_blocks(network_blockstamp) - while block_numbers: - start = self.current_buid().number - self._logger.debug("Parsing from {0}".format(start)) - blocks = await self._blockchain_processor.next_blocks(start, block_numbers, self.currency) - if len(blocks) > 0: - connections = self._connections_processor.connections_to(self.currency) - identities = await self._identities_service.handle_new_blocks(blocks) - changed_tx, new_tx, new_dividends = await self._transactions_service.handle_new_blocks(connections, - blocks) - destructions = await self._sources_service.refresh_sources(connections, new_tx, new_dividends) - self.handle_new_blocks(blocks) - self.app.db.commit() - for tx in changed_tx: - self.app.transaction_state_changed.emit(tx) - for conn in new_tx: - for tx in new_tx[conn]: - self.app.new_transfer.emit(conn, tx) - for conn in destructions: - for tx in destructions[conn]: - self.app.new_transfer.emit(conn, tx) - for conn in new_dividends: - for ud in new_dividends[conn]: - self.app.new_dividend.emit(conn, ud) - self.app.new_blocks_handled.emit() - block_numbers = await self.new_blocks(network_blockstamp) + start = self._blockchain_processor.block_number_30days_ago(self.currency, network_blockstamp) + if self.current_buid().number > start: + start = self.current_buid().number + 1 + else: + connections = self._connections_processor.connections_to(self.currency) + time_30days_ago = self._blockchain_processor.rounded_timestamp(self.currency, start) + self._transactions_service.insert_stopline(connections, start, time_30days_ago) + self._logger.debug("Parsing from {0}".format(start)) + connections = self._connections_processor.connections_to(self.currency) + await self._identities_service.refresh() + changed_tx, new_tx, new_dividends = await self._transactions_service.handle_new_blocks(connections, + start, + network_blockstamp.number) + await self._sources_service.refresh_sources(connections) + + await self._blockchain_processor.handle_new_blocks(self.currency, network_blockstamp) + + self.app.db.commit() + for tx in changed_tx: + self.app.transaction_state_changed.emit(tx) + for conn in new_tx: + for tx in new_tx[conn]: + self.app.new_transfer.emit(conn, tx) + for conn in new_dividends: + for ud in new_dividends[conn]: + self.app.new_dividend.emit(conn, ud) + self.app.new_blocks_handled.emit() self.app.sources_refreshed.emit() except (NoPeerAvailable, DuniterError) as e: self._logger.debug(str(e)) diff --git a/src/sakia/services/identities.py b/src/sakia/services/identities.py index 43f1808e..b35e22f0 100644 --- a/src/sakia/services/identities.py +++ b/src/sakia/services/identities.py @@ -313,106 +313,6 @@ class IdentitiesService(QObject): identities.append(idty) return identities - def _parse_revocations(self, block): - """ - Parse revoked pubkeys found in a block and refresh local data - - :param duniterpy.documents.Block block: the block received - :return: list of identities updated - """ - revoked = [] - connections_identities = self._get_connections_identities() - for idty in connections_identities: - for rev in block.revoked: - if rev.pubkey == idty.pubkey: - idty.revoked_on = block.number - revoked.append(idty) - return revoked - - def _parse_identities(self, block): - """ - Parse revoked pubkeys found in a block and refresh local data - - :param duniterpy.documents.Block block: the block received - :return: list of identities updated - """ - identities = [] - connections_identities = self._get_connections_identities() - for idty in connections_identities: - for idty_doc in block.identities: - if idty_doc.pubkey == idty.pubkey: - idty.written = True - identities.append(idty) - return identities - - def _parse_memberships(self, block): - """ - Parse memberships pubkeys found in a block and refresh local data - - :param duniterpy.documents.Block block: the block received - :return: list of pubkeys requiring a refresh of requirements - """ - need_refresh = [] - connections_identities = self._get_connections_identities() - for ms in block.joiners + block.actives: - # we update every written identities known locally - for identity in connections_identities: - if ms.issuer == identity: - identity.membership_written_on = block.number - identity.membership_type = "IN" - identity.membership_buid = ms.membership_ts - identity.written = True - self._identities_processor.insert_or_update_identity(identity) - # If the identity was not member - # it can become one - if not identity.member: - need_refresh.append(identity) - - for ms in block.leavers: - # we update every written identities known locally - for identity in connections_identities: - identity.membership_written_on = block.number - identity.membership_type = "OUT" - identity.membership_buid = ms.membership_ts - identity.written = True - self._identities_processor.insert_or_update_identity(identity) - # If the identity was a member - # it can stop to be one - if identity.member: - need_refresh.append(identity) - - return need_refresh - - async def _parse_certifications(self, block): - """ - Parse certified pubkeys found in a block and refresh local data - This method only creates certifications if one of both identities is - locally known as written. - This method returns the identities needing to be refreshed. These can only be - the identities which we already known as written before parsing this certification. - :param duniterpy.documents.Block block: - :return: - """ - connections_identities = self._get_connections_identities() - need_refresh = [] - parameters = self._blockchain_processor.parameters(self.currency) - current_ts = self._blockchain_processor.time(self.currency) - for identity in connections_identities: - if self._certs_processor.drop_expired(identity, sig_validity=parameters.sig_validity, - sig_window=parameters.sig_window, - current_ts=current_ts): - need_refresh.append(identity) - - for cert in block.certifications: - # if we have are a target or a source of the certification - if cert.pubkey_from == identity.pubkey or cert.pubkey_to in identity.pubkey: - identity.written = True - timestamp = self._blockchain_processor.rounded_timestamp(self.currency, cert.timestamp.number) - self._certs_processor.create_or_update_certification(self.currency, cert, timestamp, block.blockUID) - need_refresh.append(identity) - - return need_refresh - async def load_requirements(self, identity): """ Refresh a given identity information @@ -427,7 +327,7 @@ class IdentitiesService(QObject): if not identity.blockstamp or identity.blockstamp == block_uid(identity_data["meta"]["timestamp"]): identity.uid = identity_data["uid"] identity.blockstamp = block_uid(identity_data["meta"]["timestamp"]) - identity.timestamp = await self._blockchain_processor.timestamp(self.currency, identity.blockstamp.number) + identity.timestamp = self._blockchain_processor.rounded_timestamp(self.currency, identity.blockstamp.number) identity.outdistanced = identity_data["outdistanced"] identity.written = identity_data["wasMember"] identity.sentry = identity_data["isSentry"] @@ -447,28 +347,12 @@ class IdentitiesService(QObject): self._logger.debug(str(e)) return identity - async def parse_block(self, block): - """ - Parse a block to refresh local data - :param block: - :return: - """ - self._parse_revocations(block) - need_refresh = [] - need_refresh += self._parse_identities(block) - need_refresh += self._parse_memberships(block) - need_refresh += await self._parse_certifications(block) - need_refresh += self._parse_median_time(block) - return set(need_refresh) - - async def handle_new_blocks(self, blocks): + async def refresh(self): """ Handle new block received and refresh local data :param duniterpy.documents.Block block: the received block """ - need_refresh = [] - for block in blocks: - need_refresh += await self.parse_block(block) + need_refresh = self._get_connections_identities() refresh_futures = [] # for every identity for which we need a refresh, we gather # requirements requests diff --git a/src/sakia/services/network.py b/src/sakia/services/network.py index 4c097a98..dc35159e 100644 --- a/src/sakia/services/network.py +++ b/src/sakia/services/network.py @@ -5,6 +5,7 @@ import random from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, Qt from duniterpy.api import errors +from duniterpy.documents import MalformedDocumentError from duniterpy.documents.ws2p.heads import * from duniterpy.documents.peer import BMAEndpoint from duniterpy.key import VerifyingKey diff --git a/src/sakia/services/sources.py b/src/sakia/services/sources.py index 040ed6ba..be0a2db9 100644 --- a/src/sakia/services/sources.py +++ b/src/sakia/services/sources.py @@ -135,59 +135,31 @@ class SourcesServices(QObject): self._transactions_processor.commit(destruction) return destruction - async def refresh_sources_of_pubkey(self, pubkey, transactions, dividends, unit_base, log_stream=None, - progress=None): + async def refresh_sources_of_pubkey(self, pubkey): """ Refresh the sources for a given pubkey :param str pubkey: - :param list[sakia.data.entities.Transaction] transactions: - :param list[sakia.data.entities.Dividend] dividends: - :param int unit_base: the unit base of the destruction. None to look for the past uds :return: the destruction of sources """ - tx_ud_data = {} - for tx in transactions: - try: - tx_ud_data[tx.written_block].append(tx) - except: - tx_ud_data[tx.written_block] = [tx] - for ud in dividends: - try: - tx_ud_data[ud.block_number].append(ud) - except: - tx_ud_data[ud.block_number] = [ud] - - blocks = sorted(b for b in tx_ud_data.keys()) - - nb_tx_ud = len(transactions) + len(dividends) - udblocks = await self._bma_connector.get(self.currency, bma.blockchain.ud) - destructions = [] - for block_number in blocks: - if log_stream: - log_stream("Parsing info ud/tx of {:}".format(block_number)) - if progress: - progress(1/nb_tx_ud) - - for data in tx_ud_data[block_number]: - if isinstance(data, Dividend): - self._parse_ud(pubkey, data) - if isinstance(data, Transaction): - self.parse_transaction_outputs(pubkey, data) - - for data in tx_ud_data[block_number]: - if isinstance(data, Transaction): - self.parse_transaction_inputs(pubkey, data) - - if not unit_base: - _, destruction_base = await self._blockchain_processor.ud_before(self.currency, block_number, udblocks) - else: - destruction_base = unit_base - destruction = await self.check_destruction(pubkey, block_number, destruction_base) - if destruction: - destructions.append(destruction) - return destructions + sources_data = await self._bma_connector.get(self.currency, bma.tx.sources, req_args={'pubkey': pubkey}) + self._sources_processor.drop_all_of(self.currency, pubkey) + for i, s in enumerate(sources_data["sources"]): + conditions = pypeg2.parse(s["conditions"], Condition) + if conditions.left.pubkey == pubkey: + try: + if conditions.left.pubkey == pubkey: + source = Source(currency=self.currency, + pubkey=pubkey, + identifier=s["identifier"], + type=s["type"], + noffset=s["noffset"], + amount=s["amount"], + base=s["base"]) + self._sources_processor.insert(source) + except AttributeError as e: + self._logger.error(str(e)) - async def refresh_sources(self, connections, transactions, dividends): + async def refresh_sources(self, connections): """ :param list[sakia.data.entities.Connection] connections: @@ -195,18 +167,13 @@ class SourcesServices(QObject): :param dict[sakia.data.entities.Dividend] dividends: :return: the destruction of sources """ - destructions = {} for conn in connections: - destructions[conn] = [] _, current_base = self._blockchain_processor.last_ud(self.currency) # there can be bugs if the current base switch during the parsing of blocks # but since it only happens every 23 years and that its only on accounts having less than 100 # this is acceptable I guess - destructions[conn] += await self.refresh_sources_of_pubkey(conn.pubkey, transactions[conn], - dividends[conn], current_base) - - return destructions + await self.refresh_sources_of_pubkey(conn.pubkey) def restore_sources(self, pubkey, tx): """ diff --git a/src/sakia/services/transactions.py b/src/sakia/services/transactions.py index ec966df0..c865061b 100644 --- a/src/sakia/services/transactions.py +++ b/src/sakia/services/transactions.py @@ -1,5 +1,5 @@ from PyQt5.QtCore import QObject -from sakia.data.entities.transaction import parse_transaction_doc, Transaction +from sakia.data.entities.transaction import parse_transaction_doc, Transaction, build_stopline from duniterpy.documents import Transaction as TransactionDoc from duniterpy.documents import SimpleTransaction, Block from sakia.data.entities import Dividend @@ -51,66 +51,62 @@ class TransactionsService(QObject): else: logging.debug("Error during transfer parsing") - def _parse_block(self, connections, block_doc, txid): - """ - Parse a block - :param duniterpy.documents.Block block_doc: The block - :param int txid: Latest tx id - :return: The list of transfers sent - """ - transfers_changed = [] - new_transfers = {} - for tx in [t for t in self._transactions_processor.awaiting(self.currency)]: - if self._transactions_processor.run_state_transitions(tx, block_doc): - transfers_changed.append(tx) - self._logger.debug("New transaction validated : {0}".format(tx.sha_hash)) + def insert_stopline(self, connections, block_number, time): for conn in connections: - new_transactions = [t for t in block_doc.transactions - if not self._transactions_processor.find_by_hash(conn.pubkey, t.sha_hash) - and SimpleTransaction.is_simple(t)] - - new_transfers[conn] = [] - for (i, tx_doc) in enumerate(new_transactions): - tx = parse_transaction_doc(tx_doc, conn.pubkey, block_doc.blockUID.number, block_doc.mediantime, txid+i) - if tx: - new_transfers[conn].append(tx) - self._transactions_processor.commit(tx) - else: - logging.debug("Error during transfer parsing") - - return transfers_changed, new_transfers + self._transactions_processor.commit(build_stopline(conn.currency, conn.pubkey, block_number, time)) - async def handle_new_blocks(self, connections, blocks): + async def handle_new_blocks(self, connections, start, end): """ Refresh last transactions :param list[duniterpy.documents.Block] blocks: The blocks containing data to parse """ self._logger.debug("Refresh transactions") + transfers_changed, new_transfers = await self.parse_transactions_history(connections, start, end) + new_dividends = await self.parse_dividends_history(connections, start, end, new_transfers) + return transfers_changed, new_transfers, new_dividends + + async def parse_transactions_history(self, connections, start, end): + """ + Request transactions from the network to initialize data for a given pubkey + :param List[sakia.data.entities.Connection] connections: the list of connections found by tx parsing + :param int start: the first block + :param int end: the last block + """ transfers_changed = [] new_transfers = {} - txid = 0 - for block in blocks: - changes, new_tx = self._parse_block(connections, block, txid) - txid += len(new_tx) - transfers_changed += changes - for conn in new_tx: - try: - new_transfers[conn] += new_tx[conn] - except KeyError: - new_transfers[conn] = new_tx[conn] - new_dividends = await self.parse_dividends_history(connections, blocks, new_transfers) - return transfers_changed, new_transfers, new_dividends + for connection in connections: + txid = 0 + new_transfers[connection] = [] + history_data = await self._bma_connector.get(self.currency, bma.tx.blocks, + req_args={'pubkey': connection.pubkey, + 'start': start, + 'end': end}) + for tx_data in history_data["history"]["sent"]: + for tx in [t for t in self._transactions_processor.awaiting(self.currency)]: + if self._transactions_processor.run_state_transitions(tx, tx_data["hash"], tx_data["block_number"]): + transfers_changed.append(tx) + self._logger.debug("New transaction validated : {0}".format(tx.sha_hash)) + for tx_data in history_data["history"]["received"]: + tx_doc = TransactionDoc.from_bma_history(history_data["currency"], tx_data) + if not self._transactions_processor.find_by_hash(connection.pubkey, tx_doc.sha_hash) \ + and SimpleTransaction.is_simple(tx_doc): + tx = parse_transaction_doc(tx_doc, connection.pubkey, tx_data["block_number"], + tx_data["time"], txid) + if tx: + new_transfers[connection].append(tx) + self._transactions_processor.commit(tx) + else: + logging.debug("Error during transfer parsing") + return transfers_changed, new_transfers - async def parse_dividends_history(self, connections, blocks, transactions): + async def parse_dividends_history(self, connections, start, end, transactions): """ Request transactions from the network to initialize data for a given pubkey :param List[sakia.data.entities.Connection] connections: the list of connections found by tx parsing :param List[duniterpy.documents.Block] blocks: the list of transactions found by tx parsing :param List[sakia.data.entities.Transaction] transactions: the list of transactions found by tx parsing """ - min_block_number = blocks[0].number - max_block_number = blocks[-1].number dividends = {} for connection in connections: dividends[connection] = [] @@ -124,7 +120,7 @@ class TransactionsService(QObject): timestamp=ud_data["time"], amount=ud_data["amount"], base=ud_data["base"]) - if max_block_number >= dividend.block_number >= min_block_number: + if start <= dividend.block_number <= end: self._logger.debug("Dividend of block {0}".format(dividend.block_number)) block_numbers.append(dividend.block_number) if self._dividends_processor.commit(dividend): @@ -135,13 +131,9 @@ class TransactionsService(QObject): for input in txdoc.inputs: # For each dividends inputs, if it is consumed (not present in ud history) if input.source == "D" and input.origin_id == connection.pubkey and input.index not in block_numbers: - try: - # we try to get the block of the dividend - block = next((b for b in blocks if b.number == input.index)) - except StopIteration: - block_data = await self._bma_connector.get(self.currency, bma.blockchain.block, - req_args={'number': input.index}) - block = Block.from_signed_raw(block_data["raw"] + block_data["signature"] + "\n") + block_data = await self._bma_connector.get(self.currency, bma.blockchain.block, + req_args={'number': input.index}) + block = Block.from_signed_raw(block_data["raw"] + block_data["signature"] + "\n") dividend = Dividend(currency=self.currency, pubkey=connection.pubkey, block_number=input.index, diff --git a/tests/technical/test_documents_service.py b/tests/technical/test_documents_service.py index 2d39b543..3718f050 100644 --- a/tests/technical/test_documents_service.py +++ b/tests/technical/test_documents_service.py @@ -13,8 +13,7 @@ async def test_send_more_than_40_sources(application_with_one_connection, fake_s changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks) - for conn in new_tx: - await application_with_one_connection.sources_service.refresh_sources_of_pubkey(bob.key.pubkey, new_tx[conn], new_ud[conn], None) + await application_with_one_connection.sources_service.refresh_sources_of_pubkey(bob.key.pubkey) amount_before_send = application_with_one_connection.sources_service.amount(bob.key.pubkey) bob_connection = application_with_one_connection.db.connections_repo.get_one(pubkey=bob.key.pubkey) diff --git a/tests/technical/test_sources_service.py b/tests/technical/test_sources_service.py index 1b1ad862..c1ad6a8f 100644 --- a/tests/technical/test_sources_service.py +++ b/tests/technical/test_sources_service.py @@ -13,9 +13,8 @@ async def test_receive_source(application_with_one_connection, fake_server_with_ fake_server_with_blockchain.forge.forge_block() new_blocks = fake_server_with_blockchain.forge.blocks[-3:] connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections() - changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(connections, - new_blocks) - await application_with_one_connection.sources_service.refresh_sources(connections, new_tx, new_ud) + await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks) + await application_with_one_connection.sources_service.refresh_sources(connections) assert amount + 150 == application_with_one_connection.sources_service.amount(bob.key.pubkey) await fake_server_with_blockchain.close() @@ -30,9 +29,8 @@ async def test_send_source(application_with_one_connection, fake_server_with_blo fake_server_with_blockchain.forge.forge_block() new_blocks = fake_server_with_blockchain.forge.blocks[-3:] connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections() - changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(connections, - new_blocks) - await application_with_one_connection.sources_service.refresh_sources(connections, new_tx, new_ud) + await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks) + await application_with_one_connection.sources_service.refresh_sources(connections) assert amount - 150 == application_with_one_connection.sources_service.amount(bob.key.pubkey) await fake_server_with_blockchain.close() @@ -47,9 +45,9 @@ async def test_destruction(application_with_one_connection, fake_server_with_blo fake_server_with_blockchain.forge.forge_block() new_blocks = fake_server_with_blockchain.forge.blocks[-3:] connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections() - changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(connections, + await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks) - await application_with_one_connection.sources_service.refresh_sources(connections, new_tx, new_ud) + await application_with_one_connection.sources_service.refresh_sources(connections) assert 0 == application_with_one_connection.sources_service.amount(bob.key.pubkey) tx_after_parse = application_with_one_connection.transactions_service.transfers(bob.key.pubkey) assert "Too low balance" in [t.comment for t in tx_after_parse] diff --git a/tests/technical/test_transactions_service.py b/tests/technical/test_transactions_service.py index f55a1c91..0c25f3ff 100644 --- a/tests/technical/test_transactions_service.py +++ b/tests/technical/test_transactions_service.py @@ -15,12 +15,13 @@ async def test_send_tx_then_validate(application_with_one_connection, fake_serve assert len(tx_before_send) + 1 == len(tx_after_send) assert tx_after_send[-1].state is Transaction.AWAITING assert tx_after_send[-1].written_block == 0 + start = fake_server_with_blockchain.forge.blocks[-1].number fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block() - new_blocks = fake_server_with_blockchain.forge.blocks[-3:] + end = fake_server_with_blockchain.forge.blocks[-1].number connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections() - await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks) + await application_with_one_connection.transactions_service.handle_new_blocks(connections, start, end) tx_after_parse = application_with_one_connection.transactions_service.transfers(bob.key.pubkey) assert tx_after_parse[-1].state is Transaction.VALIDATED assert tx_after_parse[-1].written_block == fake_server_with_blockchain.forge.blocks[-3].number @@ -32,12 +33,13 @@ async def test_receive_tx(application_with_one_connection, fake_server_with_bloc tx_before_send = application_with_one_connection.transactions_service.transfers(bob.key.pubkey) fake_server_with_blockchain.forge.push(alice.send_money(10, fake_server_with_blockchain.forge.user_identities[alice.key.pubkey].sources, bob, fake_server_with_blockchain.forge.blocks[-1].blockUID, "Test receive")) + start = fake_server_with_blockchain.forge.blocks[-1].number fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block() - new_blocks = fake_server_with_blockchain.forge.blocks[-3:] + end = fake_server_with_blockchain.forge.blocks[-1].number connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections() - await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks) + await application_with_one_connection.transactions_service.handle_new_blocks(connections, start, end) tx_after_parse = application_with_one_connection.transactions_service.transfers(bob.key.pubkey) assert tx_after_parse[-1].state is Transaction.VALIDATED assert len(tx_before_send) + 1 == len(tx_after_parse) @@ -47,6 +49,7 @@ async def test_receive_tx(application_with_one_connection, fake_server_with_bloc @pytest.mark.asyncio async def test_issue_dividend(application_with_one_connection, fake_server_with_blockchain, bob): dividends_before_send = application_with_one_connection.transactions_service.dividends(bob.key.pubkey) + start = fake_server_with_blockchain.forge.blocks[-1].number + 1 fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.generate_dividend() fake_server_with_blockchain.forge.forge_block() @@ -54,9 +57,9 @@ async def test_issue_dividend(application_with_one_connection, fake_server_with_ fake_server_with_blockchain.forge.generate_dividend() fake_server_with_blockchain.forge.forge_block() fake_server_with_blockchain.forge.forge_block() - new_blocks = fake_server_with_blockchain.forge.blocks[-5:] + end = fake_server_with_blockchain.forge.blocks[-1].number connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections() - await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks) + await application_with_one_connection.transactions_service.handle_new_blocks(connections, start, end) dividends_after_parse = application_with_one_connection.transactions_service.dividends(bob.key.pubkey) assert len(dividends_before_send) + 2 == len(dividends_after_parse) await fake_server_with_blockchain.close() -- GitLab