From 744c66c6d6f70290a6c444021dc3e3e6610e474a Mon Sep 17 00:00:00 2001
From: inso <inso@tuta.io>
Date: Tue, 24 Jul 2018 20:32:35 +0200
Subject: [PATCH] Speed up Sakia by computing average timestamps
---
src/sakia/data/entities/transaction.py | 4 ++-
src/sakia/data/processors/blockchain.py | 9 +++++
src/sakia/data/processors/dividends.py | 36 +++++++++----------
src/sakia/data/processors/transactions.py | 15 +++++---
.../gui/dialogs/connection_cfg/controller.py | 3 +-
src/sakia/gui/dialogs/connection_cfg/model.py | 2 +-
.../gui/navigation/graphs/wot/controller.py | 1 -
src/sakia/services/blockchain.py | 2 --
src/sakia/services/identities.py | 20 ++++-------
src/sakia/services/network.py | 23 ++++++------
10 files changed, 63 insertions(+), 52 deletions(-)
diff --git a/src/sakia/data/entities/transaction.py b/src/sakia/data/entities/transaction.py
index 332ae3ae..d6fa67b1 100644
--- a/src/sakia/data/entities/transaction.py
+++ b/src/sakia/data/entities/transaction.py
@@ -1,4 +1,5 @@
import attr
+import hashlib
from duniterpy.documents import block_uid
from duniterpy.documents import Transaction as TransactionDoc
from duniterpy.documents.transaction import reduce_base
@@ -6,6 +7,8 @@ from sakia.helpers import attrs_tuple_of_str
import math
+STOPLINE_HASH = hashlib.sha256("Stopline").hexdigest()
+
def parse_transaction_doc(tx_doc, pubkey, block_number, mediantime, txid):
"""
Parse a transaction
@@ -70,7 +73,6 @@ def parse_transaction_doc(tx_doc, pubkey, block_number, mediantime, txid):
raw=tx_doc.signed_raw())
return transaction
-
@attr.s(hash=True)
class Transaction:
"""
diff --git a/src/sakia/data/processors/blockchain.py b/src/sakia/data/processors/blockchain.py
index 6660229d..4df38e27 100644
--- a/src/sakia/data/processors/blockchain.py
+++ b/src/sakia/data/processors/blockchain.py
@@ -73,6 +73,15 @@ class BlockchainProcessor:
raise
return 0
+ def rounded_timestamp(self, currency, block_number):
+ parameters = self.parameters(currency)
+ current_time = self.time(currency)
+ current_block = self.current_buid(currency)
+ diff_blocks = block_number - current_block.number
+ diff_time = diff_blocks * parameters.avg_gen_time
+ return current_time + diff_time
+
+
def current_buid(self, currency):
"""
Get the local current blockuid
diff --git a/src/sakia/data/processors/dividends.py b/src/sakia/data/processors/dividends.py
index 908a5a5b..fbbc35f3 100644
--- a/src/sakia/data/processors/dividends.py
+++ b/src/sakia/data/processors/dividends.py
@@ -47,32 +47,33 @@ class DividendsProcessor:
:param function progress: progress callback
"""
blockchain = self._blockchain_repo.get_one(currency=connection.currency)
+ avg_blocks_per_month = int(30 * 24 * 3600 / blockchain.parameters.avg_gen_time)
+ start = blockchain.current_buid.number - avg_blocks_per_month
history_data = await self._bma_connector.get(connection.currency, bma.ud.history,
req_args={'pubkey': connection.pubkey})
- log_stream("Found {0} available dividends".format(len(history_data["history"]["history"])))
block_numbers = []
dividends = []
- nb_ud_tx = len(history_data["history"]["history"]) + len(transactions)
for ud_data in history_data["history"]["history"]:
- dividend = Dividend(currency=connection.currency,
- pubkey=connection.pubkey,
- block_number=ud_data["block_number"],
- timestamp=ud_data["time"],
- amount=ud_data["amount"],
- base=ud_data["base"])
- log_stream("Dividend of block {0}".format(dividend.block_number))
- progress(1/nb_ud_tx)
- block_numbers.append(dividend.block_number)
- try:
- dividends.append(dividend)
- self._repo.insert(dividend)
- except sqlite3.IntegrityError:
- log_stream("Dividend already registered in database")
+ if ud_data["block_number"] > start:
+ dividend = Dividend(currency=connection.currency,
+ pubkey=connection.pubkey,
+ block_number=ud_data["block_number"],
+ timestamp=ud_data["time"],
+ amount=ud_data["amount"],
+ base=ud_data["base"])
+ log_stream("Dividend of block {0}".format(dividend.block_number))
+ block_numbers.append(dividend.block_number)
+ try:
+ dividends.append(dividend)
+ self._repo.insert(dividend)
+ except sqlite3.IntegrityError:
+ log_stream("Dividend already registered in database")
for tx in transactions:
txdoc = Transaction.from_signed_raw(tx.raw)
for input in txdoc.inputs:
- if input.source == "D" and input.origin_id == connection.pubkey and input.index not in block_numbers:
+ if input.source == "D" and input.origin_id == connection.pubkey \
+ and input.index not in block_numbers and input.index > start:
diff_blocks = blockchain.current_buid.number - input.index
ud_mediantime = blockchain.median_time - diff_blocks*blockchain.parameters.avg_gen_time
dividend = Dividend(currency=connection.currency,
@@ -82,7 +83,6 @@ class DividendsProcessor:
amount=input.amount,
base=input.base)
log_stream("Dividend of block {0}".format(dividend.block_number))
- progress(1/nb_ud_tx)
try:
dividends.append(dividend)
self._repo.insert(dividend)
diff --git a/src/sakia/data/processors/transactions.py b/src/sakia/data/processors/transactions.py
index 821b0936..f131aad8 100644
--- a/src/sakia/data/processors/transactions.py
+++ b/src/sakia/data/processors/transactions.py
@@ -13,7 +13,8 @@ from duniterpy.documents import Transaction as TransactionDoc
@attr.s
class TransactionsProcessor:
- _repo = attr.ib() # :type sakia.data.repositories.SourcesRepo
+ _repo = attr.ib() # :type sakia.data.repositories.TransactionsRepo
+ _blockchain_repo = attr.ib()
_bma_connector = attr.ib() # :type sakia.data.connectors.bma.BmaConnector
_table_states = attr.ib(default=attr.Factory(dict))
_logger = attr.ib(default=attr.Factory(lambda: logging.getLogger('sakia')))
@@ -24,7 +25,7 @@ class TransactionsProcessor:
Instanciate a blockchain processor
:param sakia.app.Application app: the app
"""
- return cls(app.db.transactions_repo,
+ return cls(app.db.transactions_repo, app.db.blockchains_repo,
BmaConnector(NodesProcessor(app.db.nodes_repo), app.parameters))
def next_txid(self, currency, block_number):
@@ -132,8 +133,14 @@ class TransactionsProcessor:
:param function log_stream:
:param function progress: progress callback
"""
- history_data = await self._bma_connector.get(connection.currency, bma.tx.history,
- req_args={'pubkey': connection.pubkey})
+ blockchain = self._blockchain_repo.get_one(currency=connection.currency)
+ avg_blocks_per_month = int(30 * 24 * 3600 / blockchain.parameters.avg_gen_time)
+ start = blockchain.current_buid.number - avg_blocks_per_month
+ end = blockchain.current_buid.number
+ history_data = await self._bma_connector.get(connection.currency, bma.tx.blocks,
+ req_args={'pubkey': connection.pubkey,
+ 'start': start,
+ 'end': end})
txid = 0
nb_tx = len(history_data["history"]["sent"]) + len(history_data["history"]["received"])
log_stream("Found {0} transactions".format(nb_tx))
diff --git a/src/sakia/gui/dialogs/connection_cfg/controller.py b/src/sakia/gui/dialogs/connection_cfg/controller.py
index f5bb0b89..37711009 100644
--- a/src/sakia/gui/dialogs/connection_cfg/controller.py
+++ b/src/sakia/gui/dialogs/connection_cfg/controller.py
@@ -212,8 +212,7 @@ class ConnectionConfigController(QObject):
progress=self.view.progress)
self.view.set_step(5)
- await self.model.initialize_sources(transactions, dividends,
- log_stream=self.view.stream_log,
+ await self.model.initialize_sources(log_stream=self.view.stream_log,
progress=self.view.progress)
self.view.set_step(6)
diff --git a/src/sakia/gui/dialogs/connection_cfg/model.py b/src/sakia/gui/dialogs/connection_cfg/model.py
index 95f320c7..081e584c 100644
--- a/src/sakia/gui/dialogs/connection_cfg/model.py
+++ b/src/sakia/gui/dialogs/connection_cfg/model.py
@@ -73,7 +73,7 @@ class ConnectionConfigModel(QObject):
blockchain_processor = BlockchainProcessor.instanciate(self.app)
await blockchain_processor.initialize_blockchain(self.app.currency)
- async def initialize_sources(self, transactions, dividends, log_stream, progress):
+ async def initialize_sources(self, log_stream, progress):
"""
Download sources information locally
:param function log_stream: a method to log data in the screen
diff --git a/src/sakia/gui/navigation/graphs/wot/controller.py b/src/sakia/gui/navigation/graphs/wot/controller.py
index 362f8f38..7634a95f 100644
--- a/src/sakia/gui/navigation/graphs/wot/controller.py
+++ b/src/sakia/gui/navigation/graphs/wot/controller.py
@@ -21,7 +21,6 @@ class WotController(BaseGraphController):
"""
super().__init__(parent, view, model, password_asker)
self.set_scene(view.scene())
- self.reset()
@classmethod
def create(cls, parent, app, blockchain_service, identities_service):
diff --git a/src/sakia/services/blockchain.py b/src/sakia/services/blockchain.py
index 3b2345d4..db95eaf3 100644
--- a/src/sakia/services/blockchain.py
+++ b/src/sakia/services/blockchain.py
@@ -85,8 +85,6 @@ class BlockchainService(QObject):
for conn in new_dividends:
for ud in new_dividends[conn]:
self.app.new_dividend.emit(conn, ud)
- for idty in identities:
- self.app.identity_changed.emit(idty)
self.app.new_blocks_handled.emit()
block_numbers = await self.new_blocks(network_blockstamp)
self.app.sources_refreshed.emit()
diff --git a/src/sakia/services/identities.py b/src/sakia/services/identities.py
index bc9b51f3..43f1808e 100644
--- a/src/sakia/services/identities.py
+++ b/src/sakia/services/identities.py
@@ -141,7 +141,7 @@ class IdentitiesService(QObject):
uid=other_data["uids"][0],
member=other_data["isMember"])
if cert not in certifiers:
- cert.timestamp = await self._blockchain_processor.timestamp(self.currency,
+ cert.timestamp = self._blockchain_processor.rounded_timestamp(self.currency,
cert.block)
certifiers[cert] = certifier
# We save connections pubkeys
@@ -162,7 +162,7 @@ class IdentitiesService(QObject):
certified[cert] = certified_idty
# We save connections pubkeys
if self.is_identity_of_connection(identity):
- cert.timestamp = await self._blockchain_processor.timestamp(self.currency,
+ cert.timestamp = self._blockchain_processor.rounded_timestamp(self.currency,
cert.block)
self._certs_processor.insert_or_update_certification(cert)
except errors.DuniterError as e:
@@ -282,11 +282,8 @@ class IdentitiesService(QObject):
i += 1
if progress:
progress(1/nb_certs)
- certifier = self.get_identity(cert.certifier)
- if not certifier:
- certifier = await self.find_from_pubkey(cert.certifier)
- certifier = await self.load_requirements(certifier)
- identities.append(certifier)
+ if not self.get_identity(cert.certifier):
+ identities.append(certifiers[cert])
for cert in certified:
if log_stream:
@@ -294,11 +291,8 @@ class IdentitiesService(QObject):
i += 1
if progress:
progress(1/nb_certs)
- certified = self.get_identity(cert.certified)
- if not certified:
- certified = await self.find_from_pubkey(cert.certified)
- certified = await self.load_requirements(certified)
- identities.append(certified)
+ if not self.get_identity(cert.certified):
+ identities.append(certified[cert])
if log_stream:
log_stream("Commiting identities...")
for idty in identities:
@@ -413,7 +407,7 @@ class IdentitiesService(QObject):
# if we have are a target or a source of the certification
if cert.pubkey_from == identity.pubkey or cert.pubkey_to in identity.pubkey:
identity.written = True
- timestamp = await self._blockchain_processor.timestamp(self.currency, cert.timestamp.number)
+ timestamp = self._blockchain_processor.rounded_timestamp(self.currency, cert.timestamp.number)
self._certs_processor.create_or_update_certification(self.currency, cert, timestamp, block.blockUID)
need_refresh.append(identity)
diff --git a/src/sakia/services/network.py b/src/sakia/services/network.py
index 60d8ded2..4c097a98 100644
--- a/src/sakia/services/network.py
+++ b/src/sakia/services/network.py
@@ -264,17 +264,20 @@ class NetworkService(QObject):
else:
if r:
for head_data in r["heads"]:
- if "messageV2" in head_data:
- head, _ = HeadV2.from_inline(head_data["messageV2"], head_data["sigV2"])
- else:
- head, _ = HeadV1.from_inline(head_data["message"], head_data["sig"])
-
- VerifyingKey(head.pubkey).verify_ws2p_head(head)
- if head.pubkey in ws2p_heads:
- if ws2p_heads[head.pubkey].blockstamp < head.blockstamp:
+ try:
+ if "messageV2" in head_data:
+ head, _ = HeadV2.from_inline(head_data["messageV2"], head_data["sigV2"])
+ else:
+ head, _ = HeadV1.from_inline(head_data["message"], head_data["sig"])
+
+ VerifyingKey(head.pubkey).verify_ws2p_head(head)
+ if head.pubkey in ws2p_heads:
+ if ws2p_heads[head.pubkey].blockstamp < head.blockstamp:
+ ws2p_heads[head.pubkey] = head
+ else:
ws2p_heads[head.pubkey] = head
- else:
- ws2p_heads[head.pubkey] = head
+ except MalformedDocumentError as e:
+ self._logger.error(str(e))
for head in ws2p_heads.values():
node, updated = self._processor.update_ws2p(self.currency, head)
--
GitLab