Skip to content
Snippets Groups Projects
Commit 2964ea71 authored by inso's avatar inso
Browse files

Do not refresh data older than 1 month

parent 0c0c6d26
No related branches found
No related tags found
No related merge requests found
Showing with 257 additions and 351 deletions
import attr
import hashlib
from duniterpy.documents import block_uid
from duniterpy.documents import block_uid, BlockUID
from duniterpy.documents import Transaction as TransactionDoc
from duniterpy.documents.transaction import reduce_base
from sakia.helpers import attrs_tuple_of_str
......@@ -9,6 +9,7 @@ import math
STOPLINE_HASH = hashlib.sha256("STOPLINE".encode("UTF-8")).hexdigest()
def parse_transaction_doc(tx_doc, pubkey, block_number, mediantime, txid):
"""
Parse a transaction
......@@ -73,6 +74,32 @@ def parse_transaction_doc(tx_doc, pubkey, block_number, mediantime, txid):
raw=tx_doc.signed_raw())
return transaction
STOPLINE_HASH = hashlib.sha256("STOPLINE".encode("UTF-8")).hexdigest()
def build_stopline(currency, pubkey, block_number, mediantime):
    """
    Build a placeholder "stopline" transaction marking ignored history.

    The entity carries the well-known STOPLINE_HASH so the UI can detect
    it and render an informative row instead of a real transfer.

    :param str currency: the currency of the connection
    :param str pubkey: the pubkey of the connection
    :param int block_number: the block number where the history was truncated
    :param int mediantime: the median time used as the line timestamp
    :return: a VALIDATED placeholder Transaction entity
    :rtype: Transaction
    """
    placeholder_blockstamp = BlockUID(block_number, BlockUID.empty().sha_hash)
    return Transaction(currency=currency,
                       pubkey=pubkey,
                       sha_hash=STOPLINE_HASH,
                       written_block=block_number,
                       blockstamp=placeholder_blockstamp,
                       timestamp=mediantime,
                       signatures="",
                       issuers="",
                       receivers="",
                       amount=0,
                       amount_base=0,
                       comment="",
                       txid=0,
                       state=Transaction.VALIDATED,
                       raw="")
@attr.s(hash=True)
class Transaction:
"""
......
......@@ -81,6 +81,15 @@ class BlockchainProcessor:
diff_time = diff_blocks * parameters.avg_gen_time
return current_time + diff_time
def block_number_30days_ago(self, currency, blockstamp):
    """
    Estimate the number of the block mined 30 days before the given
    blockstamp, using the currency's average block generation time.

    Used when refreshing data so that only the last 30 days are requested.

    :param str currency: the currency name
    :param duniterpy.documents.BlockUID blockstamp: the reference blockstamp
    :return: the estimated block number 30 days ago
    :rtype: int
    """
    seconds_in_month = 30 * 24 * 3600
    blocks_in_month = int(seconds_in_month / self.parameters(currency).avg_gen_time)
    return blockstamp.number - blocks_in_month
def current_buid(self, currency):
"""
......@@ -231,22 +240,6 @@ class BlockchainProcessor:
local_current_buid = self.current_buid(currency)
return sorted([b for b in with_money if b > local_current_buid.number])
async def next_blocks(self, start, filter, currency):
    """
    Fetch a batch of up to 100 blocks from the network and keep only the
    interesting ones.

    :param int start: the first block number of the requested batch
    :param filter: container of block numbers worth parsing
    :param str currency: the currency name
    :return: the parsed block documents kept from the batch
    :rtype: List[duniterpy.documents.Block]
    """
    # NOTE(review): `filter` shadows the builtin; the name is kept for
    # interface compatibility with existing callers.
    blocks_data = await self._bma_connector.get(currency, bma.blockchain.blocks,
                                                req_args={'count': 100,
                                                          'start': start})
    # keep the requested blocks plus the last block of the batch
    return [Block.from_signed_raw(data["raw"] + data["signature"] + "\n")
            for data in blocks_data
            if data['number'] in filter or data['number'] == start + 99]
async def initialize_blockchain(self, currency):
"""
Initialize blockchain for a given currency if no source exists locally
......@@ -353,14 +346,20 @@ class BlockchainProcessor:
except sqlite3.IntegrityError:
self._repo.update(blockchain)
def handle_new_blocks(self, currency, blocks):
async def handle_new_blocks(self, currency, network_blockstamp):
"""
Initialize blockchain for a given currency if no source exists locally
:param List[duniterpy.documents.Block] blocks
:param str currency:
:param BlockUID network_blockstamp: the blockstamp of the network
"""
self._logger.debug("Requesting current block")
try:
current_block = await self._bma_connector.get(currency, bma.blockchain.block,
req_args={'number': network_blockstamp.number})
signed_raw = "{0}{1}\n".format(current_block['raw'], current_block['signature'])
block = Block.from_signed_raw(signed_raw)
blockchain = self._repo.get_one(currency=currency)
for block in sorted(blocks):
if blockchain.current_buid < block.blockUID:
blockchain.current_buid = block.blockUID
blockchain.median_time = block.mediantime
blockchain.current_members_count = block.members_count
......@@ -377,6 +376,9 @@ class BlockchainProcessor:
blockchain.last_ud_base = block.unit_base
blockchain.last_ud_time = block.mediantime
self._repo.update(blockchain)
except errors.DuniterError as e:
if e.ucode != errors.NO_CURRENT_BLOCK:
raise
def remove_blockchain(self, currency):
    """
    Delete the locally stored blockchain data for the given currency.

    :param str currency: the currency name
    """
    blockchain = self._repo.get_one(currency=currency)
    self._repo.drop(blockchain)
......
......@@ -3,16 +3,16 @@ from sakia.data.entities import Transaction
from duniterpy.documents import Block
def _found_in_block(tx, block):
def _found_in_block(tx, sha_hash, block_number):
"""
Check if the transaction can be found in the blockchain
:param sakia.data.entities.Transaction tx: the transaction
:param duniterpy.documents.Block block: The block to check for the transaction
:param str sha_hash: The transaction sha_hash found in history
:param int block_number: The block_number where the tx was found
:return: True if the transaction was found
:rtype: bool
"""
for block_tx in block.transactions:
if block_tx.sha_hash == tx.sha_hash:
if sha_hash == tx.sha_hash:
return True
......@@ -49,15 +49,16 @@ def _is_locally_created(tx):
return tx.local
def _be_validated(tx, block):
def _be_validated(tx, sha_hash, block_number):
"""
Action when the transfer ins found in a block
:param sakia.data.entities.Transaction tx: the transaction
:param bool rollback: True if we are in a rollback procedure
:param duniterpy.documents.Block block: The block checked
:param str sha_hash: The tx sha_hash found in history
:param int block_number: The block_number where the tx was found
"""
tx.written_block = block.number
tx.written_block = block_number
def _drop(tx):
......@@ -81,7 +82,7 @@ states = {
(Transaction.TO_SEND, ()):
((_is_locally_created, _drop, Transaction.DROPPED),),
(Transaction.AWAITING, (Block,)):
(Transaction.AWAITING, (str, int,)):
((_found_in_block, _be_validated, Transaction.VALIDATED),),
(Transaction.REFUSED, ()):
......
......@@ -19,6 +19,7 @@ SELECT
AND transactions.ts >= ?
and transactions.ts <= ?
AND transactions.issuers LIKE "%{pubkey}%"
UNION ALL
SELECT
transactions.ts,
......@@ -35,7 +36,8 @@ SELECT
and transactions.pubkey = ?
AND transactions.ts >= ?
and transactions.ts <= ?
AND transactions.receivers LIKE "%{pubkey}%"
AND (transactions.receivers LIKE "%{pubkey}%"
OR transactions.sha_hash = ?)
UNION ALL
SELECT
dividends.timestamp as ts,
......@@ -61,7 +63,7 @@ PAGE_LENGTH = 50
class TxHistorySqlAdapter:
_conn = attr.ib() # :type sqlite3.Connection
def _transfers_and_dividends(self, currency, pubkey, ts_from, ts_to, offset=0, limit=1000,
def _transfers_and_dividends(self, currency, pubkey, ts_from, ts_to, stopline_hash, offset=0, limit=1000,
sort_by="currency", sort_order="ASC"):
"""
Get all transfers in the database on a given currency from or to a pubkey
......@@ -78,14 +80,14 @@ LIMIT {limit} OFFSET {offset}""").format(offset=offset,
pubkey=pubkey
)
c = self._conn.execute(request, (currency, pubkey, ts_from, ts_to,
currency, pubkey, ts_from, ts_to,
currency, pubkey, ts_from, ts_to, stopline_hash,
currency, pubkey, ts_from, ts_to))
datas = c.fetchall()
if datas:
return datas
return []
def _transfers_and_dividends_count(self, currency, pubkey, ts_from, ts_to):
def _transfers_and_dividends_count(self, currency, pubkey, ts_from, ts_to, stopline_hash):
"""
Get all transfers in the database on a given currency from or to a pubkey
......@@ -97,14 +99,14 @@ SELECT COUNT(*)
FROM (
""" + TX_HISTORY_REQUEST + ")").format(pubkey=pubkey)
c = self._conn.execute(request, (currency, pubkey, ts_from, ts_to,
currency, pubkey, ts_from, ts_to,
currency, pubkey, ts_from, ts_to, stopline_hash,
currency, pubkey, ts_from, ts_to))
datas = c.fetchone()
if datas:
return datas[0]
return 0
def transfers_and_dividends(self, currency, pubkey, page, ts_from, ts_to, sort_by, sort_order):
def transfers_and_dividends(self, currency, pubkey, page, ts_from, ts_to, stopline_hash, sort_by, sort_order):
"""
Get all transfers and dividends from or to a given pubkey
:param str currency:
......@@ -115,12 +117,12 @@ FROM (
:return: the list of Transaction entities
:rtype: List[sakia.data.entities.Transaction]
"""
return self._transfers_and_dividends(currency, pubkey, ts_from, ts_to,
return self._transfers_and_dividends(currency, pubkey, ts_from, ts_to, stopline_hash,
offset=page*PAGE_LENGTH,
limit=PAGE_LENGTH,
sort_by=sort_by, sort_order=sort_order)
def pages(self, currency, pubkey, ts_from, ts_to):
def pages(self, currency, pubkey, ts_from, ts_to, stopline_hash):
"""
Get all transfers and dividends from or to a given pubkey
:param str currency:
......@@ -131,7 +133,7 @@ FROM (
:return: the list of Transaction entities
:rtype: List[sakia.data.entities.Transaction]
"""
count = self._transfers_and_dividends_count(currency, pubkey, ts_from, ts_to)
count = self._transfers_and_dividends_count(currency, pubkey, ts_from, ts_to, stopline_hash)
return int(count / PAGE_LENGTH)
......@@ -9,6 +9,7 @@ from sakia.constants import MAX_CONFIRMATIONS
from sakia.data.processors import BlockchainProcessor
from .sql_adapter import TxHistorySqlAdapter
from sakia.data.repositories import TransactionsRepo, DividendsRepo
from sakia.data.entities.transaction import STOPLINE_HASH
class HistoryTableModel(QAbstractTableModel):
......@@ -89,7 +90,8 @@ class HistoryTableModel(QAbstractTableModel):
return self.sql_adapter.pages(self.app.currency,
self.connection.pubkey,
ts_from=self.ts_from,
ts_to=self.ts_to)
ts_to=self.ts_to,
stopline_hash=STOPLINE_HASH)
def pubkeys(self, row):
    """
    Return the pubkeys displayed on a given row of the history table.

    :param int row: the row index
    :return: the list of pubkeys (one per line in the cell)
    :rtype: list[str]
    """
    pubkey_col = HistoryTableModel.columns_types.index('pubkey')
    return self.transfers_data[row][pubkey_col].split('\n')
......@@ -104,6 +106,7 @@ class HistoryTableModel(QAbstractTableModel):
page=self.current_page,
ts_from=self.ts_from,
ts_to=self.ts_to,
stopline_hash=STOPLINE_HASH,
sort_by=HistoryTableModel.columns_to_sql[self.main_column_id],
sort_order= "ASC" if Qt.AscendingOrder else "DESC")
......@@ -122,6 +125,29 @@ class HistoryTableModel(QAbstractTableModel):
self.transfers_data[i] = self.data_received(transfer)
self.dataChanged.emit(self.index(i, 0), self.index(i, len(HistoryTableModel.columns_types)))
def data_stopline(self, transfer):
    """
    Convert a stopline placeholder transaction to a row of table data.

    :param sakia.data.entities.Transaction transfer: the stopline transaction
    :return: data as tuple
    """
    # The original implementation also built a "senders" list with
    # per-issuer identity lookups, but never used it in the returned
    # tuple — that dead code has been removed.
    return (transfer.timestamp, "", 0,
            self.tr("Transactions missing from history"), transfer.state, transfer.txid,
            transfer.issuers, transfer.written_block, transfer.sha_hash, transfer)
def data_received(self, transfer):
"""
Converts a transaction to table data
......@@ -199,7 +225,9 @@ class HistoryTableModel(QAbstractTableModel):
sha_hash=data[4])
if transfer.state != Transaction.DROPPED:
if data[2] < 0:
if data[4] == STOPLINE_HASH:
self.transfers_data.append(self.data_stopline(transfer))
elif data[2] < 0:
self.transfers_data.append(self.data_sent(transfer))
else:
self.transfers_data.append(self.data_received(transfer))
......@@ -240,6 +268,7 @@ class HistoryTableModel(QAbstractTableModel):
return QVariant()
source_data = self.transfers_data[row][col]
txhash_data = self.transfers_data[row][HistoryTableModel.columns_types.index('txhash')]
state_data = self.transfers_data[row][HistoryTableModel.columns_types.index('state')]
block_data = self.transfers_data[row][HistoryTableModel.columns_types.index('block_number')]
......@@ -252,6 +281,9 @@ class HistoryTableModel(QAbstractTableModel):
if col == HistoryTableModel.columns_types.index('pubkey'):
return "<p>" + source_data.replace('\n', "<br>") + "</p>"
if col == HistoryTableModel.columns_types.index('date'):
if txhash_data == STOPLINE_HASH:
return ""
else:
ts = self.blockchain_processor.adjusted_ts(self.connection.currency, source_data)
return QLocale.toString(
QLocale(),
......@@ -259,13 +291,20 @@ class HistoryTableModel(QAbstractTableModel):
QLocale.dateFormat(QLocale(), QLocale.ShortFormat)
) + " BAT"
if col == HistoryTableModel.columns_types.index('amount'):
if txhash_data == STOPLINE_HASH:
return ""
else:
amount = self.app.current_ref.instance(source_data, self.connection.currency,
self.app, block_data).diff_localized(False, False)
return amount
return source_data
if role == Qt.FontRole:
font = QFont()
if txhash_data == STOPLINE_HASH:
font.setItalic(True)
else:
if state_data == Transaction.AWAITING or \
(state_data == Transaction.VALIDATED and current_confirmations < MAX_CONFIRMATIONS):
font.setItalic(True)
......
......@@ -2,6 +2,7 @@ import asyncio
from PyQt5.QtCore import QObject
import math
import logging
from duniterpy.api import bma
from duniterpy.api.errors import DuniterError
from sakia.errors import NoPeerAvailable
......@@ -40,17 +41,6 @@ class BlockchainService(QObject):
def initialized(self):
return self._blockchain_processor.initialized(self.app.currency)
def handle_new_blocks(self, blocks):
    """
    Forward freshly received blocks to the blockchain processor.

    :param list[duniterpy.documents.Block] blocks: the received blocks
    """
    processor = self._blockchain_processor
    processor.handle_new_blocks(self.currency, blocks)
async def new_blocks(self, network_blockstamp):
    """
    Collect the numbers of blocks that still need to be parsed locally.

    :param duniterpy.documents.BlockUID network_blockstamp: the current
        network blockstamp
    :return: block numbers holding identities or money, plus the network
        head number when it is ahead of the local head
    :rtype: list[int]
    """
    with_identities = await self._blockchain_processor.new_blocks_with_identities(self.currency)
    with_money = await self._blockchain_processor.new_blocks_with_money(self.currency)
    numbers = [*with_identities, *with_money]
    if network_blockstamp > self.current_buid():
        numbers.append(network_blockstamp.number)
    return numbers
async def handle_blockchain_progress(self, network_blockstamp):
"""
Handle a new current block uid
......@@ -61,32 +51,33 @@ class BlockchainService(QObject):
try:
self._update_lock = True
self.app.refresh_started.emit()
block_numbers = await self.new_blocks(network_blockstamp)
while block_numbers:
start = self.current_buid().number
start = self._blockchain_processor.block_number_30days_ago(self.currency, network_blockstamp)
if self.current_buid().number > start:
start = self.current_buid().number + 1
else:
connections = self._connections_processor.connections_to(self.currency)
time_30days_ago = self._blockchain_processor.rounded_timestamp(self.currency, start)
self._transactions_service.insert_stopline(connections, start, time_30days_ago)
self._logger.debug("Parsing from {0}".format(start))
blocks = await self._blockchain_processor.next_blocks(start, block_numbers, self.currency)
if len(blocks) > 0:
connections = self._connections_processor.connections_to(self.currency)
identities = await self._identities_service.handle_new_blocks(blocks)
await self._identities_service.refresh()
changed_tx, new_tx, new_dividends = await self._transactions_service.handle_new_blocks(connections,
blocks)
destructions = await self._sources_service.refresh_sources(connections, new_tx, new_dividends)
self.handle_new_blocks(blocks)
start,
network_blockstamp.number)
await self._sources_service.refresh_sources(connections)
await self._blockchain_processor.handle_new_blocks(self.currency, network_blockstamp)
self.app.db.commit()
for tx in changed_tx:
self.app.transaction_state_changed.emit(tx)
for conn in new_tx:
for tx in new_tx[conn]:
self.app.new_transfer.emit(conn, tx)
for conn in destructions:
for tx in destructions[conn]:
self.app.new_transfer.emit(conn, tx)
for conn in new_dividends:
for ud in new_dividends[conn]:
self.app.new_dividend.emit(conn, ud)
self.app.new_blocks_handled.emit()
block_numbers = await self.new_blocks(network_blockstamp)
self.app.sources_refreshed.emit()
except (NoPeerAvailable, DuniterError) as e:
self._logger.debug(str(e))
......
......@@ -313,106 +313,6 @@ class IdentitiesService(QObject):
identities.append(idty)
return identities
def _parse_revocations(self, block):
"""
Parse revoked pubkeys found in a block and refresh local data
:param duniterpy.documents.Block block: the block received
:return: list of identities updated
"""
revoked = []
connections_identities = self._get_connections_identities()
for idty in connections_identities:
for rev in block.revoked:
if rev.pubkey == idty.pubkey:
idty.revoked_on = block.number
revoked.append(idty)
return revoked
def _parse_identities(self, block):
"""
Parse revoked pubkeys found in a block and refresh local data
:param duniterpy.documents.Block block: the block received
:return: list of identities updated
"""
identities = []
connections_identities = self._get_connections_identities()
for idty in connections_identities:
for idty_doc in block.identities:
if idty_doc.pubkey == idty.pubkey:
idty.written = True
identities.append(idty)
return identities
def _parse_memberships(self, block):
"""
Parse memberships pubkeys found in a block and refresh local data
:param duniterpy.documents.Block block: the block received
:return: list of pubkeys requiring a refresh of requirements
"""
need_refresh = []
connections_identities = self._get_connections_identities()
for ms in block.joiners + block.actives:
# we update every written identities known locally
for identity in connections_identities:
if ms.issuer == identity:
identity.membership_written_on = block.number
identity.membership_type = "IN"
identity.membership_buid = ms.membership_ts
identity.written = True
self._identities_processor.insert_or_update_identity(identity)
# If the identity was not member
# it can become one
if not identity.member:
need_refresh.append(identity)
for ms in block.leavers:
# we update every written identities known locally
for identity in connections_identities:
identity.membership_written_on = block.number
identity.membership_type = "OUT"
identity.membership_buid = ms.membership_ts
identity.written = True
self._identities_processor.insert_or_update_identity(identity)
# If the identity was a member
# it can stop to be one
if identity.member:
need_refresh.append(identity)
return need_refresh
async def _parse_certifications(self, block):
"""
Parse certified pubkeys found in a block and refresh local data
This method only creates certifications if one of both identities is
locally known as written.
This method returns the identities needing to be refreshed. These can only be
the identities which we already known as written before parsing this certification.
:param duniterpy.documents.Block block:
:return:
"""
connections_identities = self._get_connections_identities()
need_refresh = []
parameters = self._blockchain_processor.parameters(self.currency)
current_ts = self._blockchain_processor.time(self.currency)
for identity in connections_identities:
if self._certs_processor.drop_expired(identity, sig_validity=parameters.sig_validity,
sig_window=parameters.sig_window,
current_ts=current_ts):
need_refresh.append(identity)
for cert in block.certifications:
# if we have are a target or a source of the certification
if cert.pubkey_from == identity.pubkey or cert.pubkey_to in identity.pubkey:
identity.written = True
timestamp = self._blockchain_processor.rounded_timestamp(self.currency, cert.timestamp.number)
self._certs_processor.create_or_update_certification(self.currency, cert, timestamp, block.blockUID)
need_refresh.append(identity)
return need_refresh
async def load_requirements(self, identity):
"""
Refresh a given identity information
......@@ -427,7 +327,7 @@ class IdentitiesService(QObject):
if not identity.blockstamp or identity.blockstamp == block_uid(identity_data["meta"]["timestamp"]):
identity.uid = identity_data["uid"]
identity.blockstamp = block_uid(identity_data["meta"]["timestamp"])
identity.timestamp = await self._blockchain_processor.timestamp(self.currency, identity.blockstamp.number)
identity.timestamp = self._blockchain_processor.rounded_timestamp(self.currency, identity.blockstamp.number)
identity.outdistanced = identity_data["outdistanced"]
identity.written = identity_data["wasMember"]
identity.sentry = identity_data["isSentry"]
......@@ -447,28 +347,12 @@ class IdentitiesService(QObject):
self._logger.debug(str(e))
return identity
async def parse_block(self, block):
    """
    Parse a received block and refresh all local data derived from it.

    :param duniterpy.documents.Block block: the block to parse
    :return: the set of identities needing a requirements refresh
    :rtype: set
    """
    self._parse_revocations(block)
    need_refresh = (self._parse_identities(block)
                    + self._parse_memberships(block)
                    + await self._parse_certifications(block)
                    + self._parse_median_time(block))
    return set(need_refresh)
async def handle_new_blocks(self, blocks):
async def refresh(self):
"""
Handle new block received and refresh local data
:param duniterpy.documents.Block block: the received block
"""
need_refresh = []
for block in blocks:
need_refresh += await self.parse_block(block)
need_refresh = self._get_connections_identities()
refresh_futures = []
# for every identity for which we need a refresh, we gather
# requirements requests
......
......@@ -5,6 +5,7 @@ import random
from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, Qt
from duniterpy.api import errors
from duniterpy.documents import MalformedDocumentError
from duniterpy.documents.ws2p.heads import *
from duniterpy.documents.peer import BMAEndpoint
from duniterpy.key import VerifyingKey
......
......@@ -135,59 +135,31 @@ class SourcesServices(QObject):
self._transactions_processor.commit(destruction)
return destruction
async def refresh_sources_of_pubkey(self, pubkey, transactions, dividends, unit_base, log_stream=None,
progress=None):
async def refresh_sources_of_pubkey(self, pubkey):
"""
Refresh the sources for a given pubkey
:param str pubkey:
:param list[sakia.data.entities.Transaction] transactions:
:param list[sakia.data.entities.Dividend] dividends:
:param int unit_base: the unit base of the destruction. None to look for the past uds
:return: the destruction of sources
"""
tx_ud_data = {}
for tx in transactions:
try:
tx_ud_data[tx.written_block].append(tx)
except:
tx_ud_data[tx.written_block] = [tx]
for ud in dividends:
sources_data = await self._bma_connector.get(self.currency, bma.tx.sources, req_args={'pubkey': pubkey})
self._sources_processor.drop_all_of(self.currency, pubkey)
for i, s in enumerate(sources_data["sources"]):
conditions = pypeg2.parse(s["conditions"], Condition)
if conditions.left.pubkey == pubkey:
try:
tx_ud_data[ud.block_number].append(ud)
except:
tx_ud_data[ud.block_number] = [ud]
blocks = sorted(b for b in tx_ud_data.keys())
nb_tx_ud = len(transactions) + len(dividends)
udblocks = await self._bma_connector.get(self.currency, bma.blockchain.ud)
destructions = []
for block_number in blocks:
if log_stream:
log_stream("Parsing info ud/tx of {:}".format(block_number))
if progress:
progress(1/nb_tx_ud)
for data in tx_ud_data[block_number]:
if isinstance(data, Dividend):
self._parse_ud(pubkey, data)
if isinstance(data, Transaction):
self.parse_transaction_outputs(pubkey, data)
for data in tx_ud_data[block_number]:
if isinstance(data, Transaction):
self.parse_transaction_inputs(pubkey, data)
if not unit_base:
_, destruction_base = await self._blockchain_processor.ud_before(self.currency, block_number, udblocks)
else:
destruction_base = unit_base
destruction = await self.check_destruction(pubkey, block_number, destruction_base)
if destruction:
destructions.append(destruction)
return destructions
if conditions.left.pubkey == pubkey:
source = Source(currency=self.currency,
pubkey=pubkey,
identifier=s["identifier"],
type=s["type"],
noffset=s["noffset"],
amount=s["amount"],
base=s["base"])
self._sources_processor.insert(source)
except AttributeError as e:
self._logger.error(str(e))
async def refresh_sources(self, connections, transactions, dividends):
async def refresh_sources(self, connections):
"""
:param list[sakia.data.entities.Connection] connections:
......@@ -195,18 +167,13 @@ class SourcesServices(QObject):
:param dict[sakia.data.entities.Dividend] dividends:
:return: the destruction of sources
"""
destructions = {}
for conn in connections:
destructions[conn] = []
_, current_base = self._blockchain_processor.last_ud(self.currency)
# there can be bugs if the current base switch during the parsing of blocks
# but since it only happens every 23 years and that its only on accounts having less than 100
# this is acceptable I guess
destructions[conn] += await self.refresh_sources_of_pubkey(conn.pubkey, transactions[conn],
dividends[conn], current_base)
return destructions
await self.refresh_sources_of_pubkey(conn.pubkey)
def restore_sources(self, pubkey, tx):
"""
......
from PyQt5.QtCore import QObject
from sakia.data.entities.transaction import parse_transaction_doc, Transaction
from sakia.data.entities.transaction import parse_transaction_doc, Transaction, build_stopline
from duniterpy.documents import Transaction as TransactionDoc
from duniterpy.documents import SimpleTransaction, Block
from sakia.data.entities import Dividend
......@@ -51,66 +51,62 @@ class TransactionsService(QObject):
else:
logging.debug("Error during transfer parsing")
def _parse_block(self, connections, block_doc, txid):
def insert_stopline(self, connections, block_number, time):
for conn in connections:
self._transactions_processor.commit(build_stopline(conn.currency, conn.pubkey, block_number, time))
async def handle_new_blocks(self, connections, start, end):
"""
Parse a block
:param duniterpy.documents.Block block_doc: The block
:param int txid: Latest tx id
:return: The list of transfers sent
Refresh last transactions
:param list[duniterpy.documents.Block] blocks: The blocks containing data to parse
"""
self._logger.debug("Refresh transactions")
transfers_changed, new_transfers = await self.parse_transactions_history(connections, start, end)
new_dividends = await self.parse_dividends_history(connections, start, end, new_transfers)
return transfers_changed, new_transfers, new_dividends
async def parse_transactions_history(self, connections, start, end):
"""
Request transactions from the network to initialize data for a given pubkey
:param List[sakia.data.entities.Connection] connections: the list of connections found by tx parsing
:param int start: the first block
:param int end: the last block
"""
transfers_changed = []
new_transfers = {}
for connection in connections:
txid = 0
new_transfers[connection] = []
history_data = await self._bma_connector.get(self.currency, bma.tx.blocks,
req_args={'pubkey': connection.pubkey,
'start': start,
'end': end})
for tx_data in history_data["history"]["sent"]:
for tx in [t for t in self._transactions_processor.awaiting(self.currency)]:
if self._transactions_processor.run_state_transitions(tx, block_doc):
if self._transactions_processor.run_state_transitions(tx, tx_data["hash"], tx_data["block_number"]):
transfers_changed.append(tx)
self._logger.debug("New transaction validated : {0}".format(tx.sha_hash))
for conn in connections:
new_transactions = [t for t in block_doc.transactions
if not self._transactions_processor.find_by_hash(conn.pubkey, t.sha_hash)
and SimpleTransaction.is_simple(t)]
new_transfers[conn] = []
for (i, tx_doc) in enumerate(new_transactions):
tx = parse_transaction_doc(tx_doc, conn.pubkey, block_doc.blockUID.number, block_doc.mediantime, txid+i)
for tx_data in history_data["history"]["received"]:
tx_doc = TransactionDoc.from_bma_history(history_data["currency"], tx_data)
if not self._transactions_processor.find_by_hash(connection.pubkey, tx_doc.sha_hash) \
and SimpleTransaction.is_simple(tx_doc):
tx = parse_transaction_doc(tx_doc, connection.pubkey, tx_data["block_number"],
tx_data["time"], txid)
if tx:
new_transfers[conn].append(tx)
new_transfers[connection].append(tx)
self._transactions_processor.commit(tx)
else:
logging.debug("Error during transfer parsing")
return transfers_changed, new_transfers
async def handle_new_blocks(self, connections, blocks):
"""
Refresh last transactions
:param list[duniterpy.documents.Block] blocks: The blocks containing data to parse
"""
self._logger.debug("Refresh transactions")
transfers_changed = []
new_transfers = {}
txid = 0
for block in blocks:
changes, new_tx = self._parse_block(connections, block, txid)
txid += len(new_tx)
transfers_changed += changes
for conn in new_tx:
try:
new_transfers[conn] += new_tx[conn]
except KeyError:
new_transfers[conn] = new_tx[conn]
new_dividends = await self.parse_dividends_history(connections, blocks, new_transfers)
return transfers_changed, new_transfers, new_dividends
async def parse_dividends_history(self, connections, blocks, transactions):
async def parse_dividends_history(self, connections, start, end, transactions):
"""
Request transactions from the network to initialize data for a given pubkey
:param List[sakia.data.entities.Connection] connections: the list of connections found by tx parsing
:param List[duniterpy.documents.Block] blocks: the list of transactions found by tx parsing
:param List[sakia.data.entities.Transaction] transactions: the list of transactions found by tx parsing
"""
min_block_number = blocks[0].number
max_block_number = blocks[-1].number
dividends = {}
for connection in connections:
dividends[connection] = []
......@@ -124,7 +120,7 @@ class TransactionsService(QObject):
timestamp=ud_data["time"],
amount=ud_data["amount"],
base=ud_data["base"])
if max_block_number >= dividend.block_number >= min_block_number:
if start <= dividend.block_number <= end:
self._logger.debug("Dividend of block {0}".format(dividend.block_number))
block_numbers.append(dividend.block_number)
if self._dividends_processor.commit(dividend):
......@@ -135,10 +131,6 @@ class TransactionsService(QObject):
for input in txdoc.inputs:
# For each dividends inputs, if it is consumed (not present in ud history)
if input.source == "D" and input.origin_id == connection.pubkey and input.index not in block_numbers:
try:
# we try to get the block of the dividend
block = next((b for b in blocks if b.number == input.index))
except StopIteration:
block_data = await self._bma_connector.get(self.currency, bma.blockchain.block,
req_args={'number': input.index})
block = Block.from_signed_raw(block_data["raw"] + block_data["signature"] + "\n")
......
......@@ -13,8 +13,7 @@ async def test_send_more_than_40_sources(application_with_one_connection, fake_s
changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(connections,
new_blocks)
for conn in new_tx:
await application_with_one_connection.sources_service.refresh_sources_of_pubkey(bob.key.pubkey, new_tx[conn], new_ud[conn], None)
await application_with_one_connection.sources_service.refresh_sources_of_pubkey(bob.key.pubkey)
amount_before_send = application_with_one_connection.sources_service.amount(bob.key.pubkey)
bob_connection = application_with_one_connection.db.connections_repo.get_one(pubkey=bob.key.pubkey)
......
......@@ -13,9 +13,8 @@ async def test_receive_source(application_with_one_connection, fake_server_with_
fake_server_with_blockchain.forge.forge_block()
new_blocks = fake_server_with_blockchain.forge.blocks[-3:]
connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections()
changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(connections,
new_blocks)
await application_with_one_connection.sources_service.refresh_sources(connections, new_tx, new_ud)
await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks)
await application_with_one_connection.sources_service.refresh_sources(connections)
assert amount + 150 == application_with_one_connection.sources_service.amount(bob.key.pubkey)
await fake_server_with_blockchain.close()
......@@ -30,9 +29,8 @@ async def test_send_source(application_with_one_connection, fake_server_with_blo
fake_server_with_blockchain.forge.forge_block()
new_blocks = fake_server_with_blockchain.forge.blocks[-3:]
connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections()
changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(connections,
new_blocks)
await application_with_one_connection.sources_service.refresh_sources(connections, new_tx, new_ud)
await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks)
await application_with_one_connection.sources_service.refresh_sources(connections)
assert amount - 150 == application_with_one_connection.sources_service.amount(bob.key.pubkey)
await fake_server_with_blockchain.close()
......@@ -47,9 +45,9 @@ async def test_destruction(application_with_one_connection, fake_server_with_blo
fake_server_with_blockchain.forge.forge_block()
new_blocks = fake_server_with_blockchain.forge.blocks[-3:]
connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections()
changed_tx, new_tx, new_ud = await application_with_one_connection.transactions_service.handle_new_blocks(connections,
await application_with_one_connection.transactions_service.handle_new_blocks(connections,
new_blocks)
await application_with_one_connection.sources_service.refresh_sources(connections, new_tx, new_ud)
await application_with_one_connection.sources_service.refresh_sources(connections)
assert 0 == application_with_one_connection.sources_service.amount(bob.key.pubkey)
tx_after_parse = application_with_one_connection.transactions_service.transfers(bob.key.pubkey)
assert "Too low balance" in [t.comment for t in tx_after_parse]
......
......@@ -15,12 +15,13 @@ async def test_send_tx_then_validate(application_with_one_connection, fake_serve
assert len(tx_before_send) + 1 == len(tx_after_send)
assert tx_after_send[-1].state is Transaction.AWAITING
assert tx_after_send[-1].written_block == 0
start = fake_server_with_blockchain.forge.blocks[-1].number
fake_server_with_blockchain.forge.forge_block()
fake_server_with_blockchain.forge.forge_block()
fake_server_with_blockchain.forge.forge_block()
new_blocks = fake_server_with_blockchain.forge.blocks[-3:]
end = fake_server_with_blockchain.forge.blocks[-1].number
connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections()
await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks)
await application_with_one_connection.transactions_service.handle_new_blocks(connections, start, end)
tx_after_parse = application_with_one_connection.transactions_service.transfers(bob.key.pubkey)
assert tx_after_parse[-1].state is Transaction.VALIDATED
assert tx_after_parse[-1].written_block == fake_server_with_blockchain.forge.blocks[-3].number
......@@ -32,12 +33,13 @@ async def test_receive_tx(application_with_one_connection, fake_server_with_bloc
tx_before_send = application_with_one_connection.transactions_service.transfers(bob.key.pubkey)
fake_server_with_blockchain.forge.push(alice.send_money(10, fake_server_with_blockchain.forge.user_identities[alice.key.pubkey].sources, bob,
fake_server_with_blockchain.forge.blocks[-1].blockUID, "Test receive"))
start = fake_server_with_blockchain.forge.blocks[-1].number
fake_server_with_blockchain.forge.forge_block()
fake_server_with_blockchain.forge.forge_block()
fake_server_with_blockchain.forge.forge_block()
new_blocks = fake_server_with_blockchain.forge.blocks[-3:]
end = fake_server_with_blockchain.forge.blocks[-1].number
connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections()
await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks)
await application_with_one_connection.transactions_service.handle_new_blocks(connections, start, end)
tx_after_parse = application_with_one_connection.transactions_service.transfers(bob.key.pubkey)
assert tx_after_parse[-1].state is Transaction.VALIDATED
assert len(tx_before_send) + 1 == len(tx_after_parse)
......@@ -47,6 +49,7 @@ async def test_receive_tx(application_with_one_connection, fake_server_with_bloc
@pytest.mark.asyncio
async def test_issue_dividend(application_with_one_connection, fake_server_with_blockchain, bob):
dividends_before_send = application_with_one_connection.transactions_service.dividends(bob.key.pubkey)
start = fake_server_with_blockchain.forge.blocks[-1].number + 1
fake_server_with_blockchain.forge.forge_block()
fake_server_with_blockchain.forge.generate_dividend()
fake_server_with_blockchain.forge.forge_block()
......@@ -54,9 +57,9 @@ async def test_issue_dividend(application_with_one_connection, fake_server_with_
fake_server_with_blockchain.forge.generate_dividend()
fake_server_with_blockchain.forge.forge_block()
fake_server_with_blockchain.forge.forge_block()
new_blocks = fake_server_with_blockchain.forge.blocks[-5:]
end = fake_server_with_blockchain.forge.blocks[-1].number
connections = ConnectionsProcessor.instanciate(application_with_one_connection).connections()
await application_with_one_connection.transactions_service.handle_new_blocks(connections, new_blocks)
await application_with_one_connection.transactions_service.handle_new_blocks(connections, start, end)
dividends_after_parse = application_with_one_connection.transactions_service.dividends(bob.key.pubkey)
assert len(dividends_before_send) + 2 == len(dividends_after_parse)
await fake_server_with_blockchain.close()
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment