Select Git revision
functions.py 11.26 KiB
import base58
import json
import math
from adapters.duniter_v18.blocks import LevelDBBlocksRepository
from adapters.duniter_v18.certifications import LevelDBCertificationsRepository
from adapters.duniter_v18.identities import LevelDBIdentitiesRepository
from adapters.duniter_v18.memberships import LevelDBMembershipsRepository
from adapters.duniter_v18.wallets import LevelDBWalletsRepository
from adapters.duniter_v18.ud_value import LevelDBUDValueRepository
from lib.utility import load_json
from lib.utility_param import *
# Constant to estimate cert interval
CERT_PERIOD = b_days(5) # 5 days
# when iterating on blocks, log current block every NOTIF_INTERVAL
NOTIF_INTERVAL = 100000
def get_wallets_data(leveldb_path: str) -> tuple:
"""
Get wallets data,
return a tuple with wallets, total_money, ignored_money
:param leveldb_path: LevelDB folder path
:return:
"""
# Get wallets balances data
wallets_repository = LevelDBWalletsRepository(leveldb_path)
wallets = {}
total_money = 0 # counter
ignored_money = 0 # counter
for unlock_expression, wallet in wallets_repository:
balance = wallet["balance"]
if "&&" in unlock_expression:
print(f"⚠️ wallet {unlock_expression} ignored (balance {balance})")
ignored_money += balance
continue
pubkey = unlock_expression.split("(")[1].split(")")[0]
if balance == 0:
continue
wallets.update({pubkey: int(balance)})
total_money += balance
return wallets, total_money, ignored_money
def get_identities_and_wallets(start_timestamp: int, leveldb_path: str) -> tuple:
"""
Get identities with certifications and wallets with their balance
start_timestamp is the timestamp of the v2 genesis
used to estimate cert expiration in number of blocks
:param start_timestamp: Import start timestamp
:param leveldb_path: LevelDB folder path
:return:
"""
# initialize
identity_names = {}
identities = {}
treasury = 0
# Get last block info
ud_value_repository = LevelDBUDValueRepository(leveldb_path)
last_block = ud_value_repository.get_last()
initial_monetary_mass = last_block["mass"]
last_block_time = last_block["medianTime"]
print(" parse Identities...")
# Get leveldb indices as Dex data
memberships_repository = LevelDBMembershipsRepository(leveldb_path)
identities_repository = LevelDBIdentitiesRepository(leveldb_path)
certifications_repository = LevelDBCertificationsRepository(leveldb_path)
blocks_repository = LevelDBBlocksRepository(leveldb_path)
# Get wallets data
print(" parse Wallets...")
(wallets, total_money, ignored_money) = get_wallets_data(leveldb_path)
# add ignored money to treasury and check initial monetary mass
treasury += ignored_money # add ignored money to treasury
wallet_sum = total_money + ignored_money
missing_money = initial_monetary_mass - wallet_sum
if missing_money != 0:
print(
f"⚠️ initial monetary mass {initial_monetary_mass:,} does not equal wallet sum {wallet_sum:,}"
)
print(f"money on the wallets: {total_money:,}")
print(f"money from ignored sources: {ignored_money:,}")
print(f"missing money (added to treasury): {missing_money:,}")
# add missing money to treasury
treasury += missing_money
# FIXME get real treasury address
# wallets["5EYCAe5ijiYfyeZ2JJCGq56LmPyNRAKzpG4QkoQkkQNB5e6Z"] = treasury
# TODO make sure that index respects order of arrival
# Get identities names by pubkey
for pubkey, identity in identities_repository:
index = identity["wotb_id"] + 1
uid = identity["uid"]
is_member = identity["member"]
identity_names[pubkey] = uid
mindex_entry = memberships_repository.get(pubkey)
membership_expire_on = mindex_entry["expires_on"]
# add address and balance to identity
if pubkey not in wallets:
balance = 0
else:
balance = wallets[pubkey]
# remove identity from wallet
# (only let simple wallets)
del wallets[pubkey]
# fill in identity entry
identities[uid] = {
"index": index,
"owner_pubkey": pubkey,
"balance": balance,
"revoked": mindex_entry["revoked_on"] is not None,
"membership_expire_on": membership_expire_on if is_member else 0,
"membership_revokes_on": mindex_entry["revokes_on"],
"next_cert_issuable_on": 0, # initialized to zero, modified later
"certs_received": {},
}
# Generate identities Ğ1v2 genesis json bloc
# certs are stored per issuer in input file
# certs are stored per receiver in output file
print(" parse certifications...")
# get certifications updated to their last state
for i_pubkey, issuer in certifications_repository:
i_uid = identity_names[i_pubkey]
# get certifications updated to their last state
for cert in issuer["issued"]:
# if certification expired, skip silently
if cert["expired_on"] != 0:
continue
r_pubkey = cert["receiver"]
r_uid = identity_names[r_pubkey]
# get expiration of certification
# timestamp of cert creation
created_at = blocks_repository.get(cert["created_on"])["medianTime"]
# block of next issuable cert
next_issuable_on = created_at + CERT_PERIOD
# timestamp of cert expiration
cert_expire_at = cert["expires_on"]
cert_expire_on = cert_expire_at
if next_issuable_on > identities[i_uid]["next_cert_issuable_on"]:
identities[i_uid]["next_cert_issuable_on"] = next_issuable_on
# add received certification to identity
identities[r_uid]["certs_received"][i_uid] = cert_expire_on
return identities, wallets
def get_blocks(leveldb_path: str) -> list:
"""
Get blocks,
return a list of blocks
"""
# Get wallets balances data
blocks_repo = LevelDBBlocksRepository(leveldb_path)
blocks = []
for num, block in blocks_repo:
if num % NOTIF_INTERVAL == 0:
print(num)
noEvent = (
True
and not block.get("certifications")
and not block.get("transactions")
and not block.get("joiners") # TODO membership events
and not block.get("leavers") # TODO membership events
and not block.get("revoked") # TODO membership events
and not block.get("actives") # TODO membership events
and not block.get("excluded") # TODO membership events
)
sample = {
"height": block.get("number"),
"timestamp": block.get("medianTime"),
"hash": block.get("hash"),
"parentHash": block.get("previousHash"),
"validator": block.get("issuer"),
"version": block.get("version"),
"hasEvent": not noEvent,
}
blocks.append(sample)
return blocks
def get_tx(leveldb_path: str) -> list:
"""
Get tx,
return a list of tx
"""
# Get wallets balances data
blocks_repo = LevelDBBlocksRepository(leveldb_path)
txs = []
for num, block in blocks_repo:
if num % NOTIF_INTERVAL == 0:
print(num)
for tx in block.get("transactions"):
outputs = tx["outputs"]
issuers = tx["issuers"]
comment = tx["comment"]
timestamp = block["medianTime"]
issuers_count = len(issuers)
# loop on issuers. If multiple issuers, approximate amount
for issuer in issuers:
# loop on outputs
for output in outputs:
outputparts = output.split(":")
amount = int(outputparts[0])
receiver = outputparts[2]
if issuers_count > 1:
amount = math.floor(amount / issuers_count) # approximation
# ignore non trivial unlock sources
# https://git.duniter.org/tools/py-g1-migrator/-/issues/3
if "&&" in receiver or "||" in receiver:
print(num)
print("ignoring " + receiver)
continue
receiver = receiver.split("SIG(")[1].split(")")[0]
sample = {
"blockNumber": num,
"timestamp": timestamp,
"from": issuer,
"to": receiver,
"amount": amount,
"comment": comment,
}
# do not include outputs that go back to sender
if sample["from"] != sample["to"]:
txs.append(sample)
return txs
def get_cert(leveldb_path: str) -> list:
"""
Get cert,
return a list of cert, but does not tell if it is a new cert or a renewal
"""
# Get wallets balances data
blocks_repo = LevelDBBlocksRepository(leveldb_path)
certs = []
for num, block in blocks_repo:
if num % NOTIF_INTERVAL == 0:
print(num)
for cert in block.get("certifications"):
parts = cert.split(":")
issuer = parts[0]
receiver = parts[1]
block = parts[2]
sample = {
# block number of the document creation
"blockNumberCreated": int(block),
# block in which the certification is written
"blockNumberWritten": num,
"issuer": issuer,
"receiver": receiver,
}
certs.append(sample)
return certs
def get_cert_hist(leveldb_path: str) -> list:
"""
Get cert history,
returns a sorted list of cert event: Creation, Renewal, Removal
"""
# initialize
cert_events = []
identity_id = {}
identities_repository = LevelDBIdentitiesRepository(leveldb_path)
certifications_repository = LevelDBCertificationsRepository(leveldb_path)
# Get identities index by pubkey
for pubkey, identity in identities_repository:
index = identity["wotb_id"] + 1
identity_id[pubkey] = index
# Get certifications
for key, value in certifications_repository.index.__iter__():
issuer_pubkey = key.decode("utf-8")
issuer_val = json.loads(value)
for evt in issuer_val["issued"]:
evt["created_on_ref"] = 0
op = evt["op"]
issuer_id = identity_id[evt["issuer"]]
receiver_id = identity_id[evt["receiver"]]
writtenOn = evt["writtenOn"]
if op == "CREATE":
isReplay = evt.get("isReplay", False)
optype = "Renewal" if isReplay else "Creation"
elif op == "UPDATE":
optype = "Removal"
else:
print(evt)
sample = {
"blockNumber": writtenOn,
"issuer": issuer_id,
"receiver": receiver_id,
"type": optype,
}
cert_events.append(sample)
cert_events.sort(key=lambda x: x["blockNumber"])
return cert_events