Skip to content
Snippets Groups Projects
Select Git revision
  • 3224796067fd9162422de89e9ccc7aa2a251bbbe
  • master default
  • release_0.1.53
  • 0.1.15
  • 0.1.14
  • 0.1.13
  • 0.1.11
  • 0.1.10
  • 0.1.9
9 results

_ud.py

Blame
  • functions.py 11.26 KiB
    import base58
    import json
    import math
    
    from adapters.duniter_v18.blocks import LevelDBBlocksRepository
    from adapters.duniter_v18.certifications import LevelDBCertificationsRepository
    from adapters.duniter_v18.identities import LevelDBIdentitiesRepository
    from adapters.duniter_v18.memberships import LevelDBMembershipsRepository
    from adapters.duniter_v18.wallets import LevelDBWalletsRepository
    from adapters.duniter_v18.ud_value import LevelDBUDValueRepository
    from lib.utility import load_json
    from lib.utility_param import *
    
    # Constant to estimate cert interval: it is added to the creation time of a
    # certification to estimate when its issuer may certify again
    # NOTE(review): b_days is not defined in this file (presumably provided by
    # the lib.utility_param star import) - confirm the unit of its result
    CERT_PERIOD = b_days(5)  # 5 days

    # when iterating on blocks, print the current block number every
    # NOTIF_INTERVAL blocks (progress indicator)
    NOTIF_INTERVAL = 100000
    
    
    def get_wallets_data(leveldb_path: str) -> tuple:
        """
        Read every wallet of the LevelDB wallets index and sum up the money.

        Wallets whose unlock expression contains "&&" are not kept: their
        balance is reported on stdout and counted as ignored money.
        Wallets with a zero balance are dropped.

        :param leveldb_path: LevelDB folder path
        :return: tuple (wallets, total_money, ignored_money) where wallets maps
            a pubkey to its integer balance
        """
        balance_by_pubkey = {}
        kept_amount = 0  # money found on the wallets that are kept
        skipped_amount = 0  # money found on the ignored wallets

        for unlock_expression, wallet in LevelDBWalletsRepository(leveldb_path):
            balance = wallet["balance"]

            if "&&" not in unlock_expression:
                # the pubkey is what sits between the first "(" and the next ")"
                after_paren = unlock_expression.split("(")[1]
                pubkey = after_paren.split(")")[0]

                if balance != 0:
                    balance_by_pubkey[pubkey] = int(balance)
                    kept_amount += balance
            else:
                print(f"⚠️ wallet {unlock_expression} ignored (balance {balance})")
                skipped_amount += balance

        return balance_by_pubkey, kept_amount, skipped_amount
    
    
    def get_identities_and_wallets(start_timestamp: int, leveldb_path: str) -> tuple:
        """
        Get identities with certifications and wallets with their balance

        The balance of a wallet owned by an identity is moved into the identity
        entry, so the returned wallets only hold the remaining simple wallets.
        Certifications are stored per issuer in the input and per receiver in
        the output.

        :param start_timestamp: Import start timestamp (timestamp of the v2
            genesis). Not used by the current implementation; kept so that
            existing callers keep working.
        :param leveldb_path: LevelDB folder path
        :return: tuple (identities, wallets): identities maps a uid to its
            entry dict, wallets maps a pubkey to its integer balance
        :raises KeyError: if a certification refers to a pubkey that is absent
            from the identities index
        """
        # initialize
        identity_names = {}  # pubkey -> uid
        identities = {}  # uid -> identity entry
        treasury = 0

        # Get last block info (only the monetary mass is needed here)
        ud_value_repository = LevelDBUDValueRepository(leveldb_path)
        initial_monetary_mass = ud_value_repository.get_last()["mass"]

        print("    parse Identities...")
        # Open the LevelDB indices
        memberships_repository = LevelDBMembershipsRepository(leveldb_path)
        identities_repository = LevelDBIdentitiesRepository(leveldb_path)
        certifications_repository = LevelDBCertificationsRepository(leveldb_path)
        blocks_repository = LevelDBBlocksRepository(leveldb_path)

        # Get wallets data
        print("    parse Wallets...")
        wallets, total_money, ignored_money = get_wallets_data(leveldb_path)
        # add ignored money to treasury and check initial monetary mass
        treasury += ignored_money
        wallet_sum = total_money + ignored_money
        missing_money = initial_monetary_mass - wallet_sum
        if missing_money != 0:
            print(
                f"⚠️ initial monetary mass {initial_monetary_mass:,} does not equal wallet sum {wallet_sum:,}"
            )
            print(f"money on the wallets: {total_money:,}")
            print(f"money from ignored sources: {ignored_money:,}")
            print(f"missing money (added to treasury): {missing_money:,}")
            # add missing money to treasury
            treasury += missing_money
        # FIXME get real treasury address
        # (until then the treasury amount is computed but not written anywhere)
        # wallets["5EYCAe5ijiYfyeZ2JJCGq56LmPyNRAKzpG4QkoQkkQNB5e6Z"] = treasury

        # TODO make sure that index respects order of arrival
        # Build one entry per identity and remember its name by pubkey
        for pubkey, identity in identities_repository:
            index = identity["wotb_id"] + 1
            uid = identity["uid"]
            is_member = identity["member"]
            identity_names[pubkey] = uid
            mindex_entry = memberships_repository.get(pubkey)
            membership_expire_on = mindex_entry["expires_on"]

            # move the balance of the identity's wallet (if any) into the
            # identity, leaving only simple wallets in `wallets`
            balance = wallets.pop(pubkey, 0)

            # fill in identity entry
            identities[uid] = {
                "index": index,
                "owner_pubkey": pubkey,
                "balance": balance,
                "revoked": mindex_entry["revoked_on"] is not None,
                "membership_expire_on": membership_expire_on if is_member else 0,
                "membership_revokes_on": mindex_entry["revokes_on"],
                "next_cert_issuable_on": 0,  # initialized to zero, modified later
                "certs_received": {},
            }

        # Generate identities Ğ1v2 genesis json bloc
        # certs are stored per issuer in input file
        # certs are stored per receiver in output file
        print("    parse certifications...")
        for i_pubkey, issuer in certifications_repository:
            i_uid = identity_names[i_pubkey]
            issuer_identity = identities[i_uid]

            # get certifications updated to their last state
            for cert in issuer["issued"]:
                # if certification expired, skip silently
                if cert["expired_on"] != 0:
                    continue

                r_uid = identity_names[cert["receiver"]]

                # median time of the block the cert was created on
                created_at = blocks_repository.get(cert["created_on"])["medianTime"]
                # estimate of when the issuer may certify again
                # NOTE(review): created_at is a block median time while the unit
                # of CERT_PERIOD depends on b_days - confirm both units agree
                next_issuable_on = created_at + CERT_PERIOD

                # keep the latest estimate over all the certs of this issuer
                if next_issuable_on > issuer_identity["next_cert_issuable_on"]:
                    issuer_identity["next_cert_issuable_on"] = next_issuable_on

                # add received certification (with its expiration) to identity
                identities[r_uid]["certs_received"][i_uid] = cert["expires_on"]

        return identities, wallets
    
    
    def get_blocks(leveldb_path: str) -> list:
        """
        Read all blocks of the LevelDB blocks index and summarize each of them.

        The current block number is printed every NOTIF_INTERVAL blocks.

        :param leveldb_path: LevelDB folder path
        :return: list of dicts with the keys height, timestamp, hash,
            parentHash, validator, version and hasEvent
        """
        # a block "has an event" as soon as one of these fields is non-empty
        # TODO membership events (joiners, leavers, revoked, actives, excluded)
        event_fields = (
            "certifications",
            "transactions",
            "joiners",
            "leavers",
            "revoked",
            "actives",
            "excluded",
        )

        summaries = []
        for num, block in LevelDBBlocksRepository(leveldb_path):
            if num % NOTIF_INTERVAL == 0:
                print(num)

            has_event = any(block.get(field) for field in event_fields)
            summaries.append(
                {
                    "height": block.get("number"),
                    "timestamp": block.get("medianTime"),
                    "hash": block.get("hash"),
                    "parentHash": block.get("previousHash"),
                    "validator": block.get("issuer"),
                    "version": block.get("version"),
                    "hasEvent": has_event,
                }
            )

        return summaries
    
    
    def get_tx(leveldb_path: str) -> list:
        """
        Get tx,
        return a list of tx

        Each transaction output becomes one entry per issuer. When a
        transaction has several issuers, the output amount is split evenly
        between them (integer division), which is only an approximation.
        Outputs going back to their issuer are left out, as well as outputs
        whose unlock condition is not a single SIG(pubkey).

        :param leveldb_path: LevelDB folder path
        :return: list of dicts with the keys blockNumber, timestamp, from, to,
            amount and comment
        """
        blocks_repo = LevelDBBlocksRepository(leveldb_path)
        txs = []
        for num, block in blocks_repo:
            if num % NOTIF_INTERVAL == 0:
                print(num)
            # a block without a "transactions" entry simply carries no tx
            for tx in block.get("transactions") or []:
                issuers = tx["issuers"]
                comment = tx["comment"]
                timestamp = block["medianTime"]
                issuers_count = len(issuers)

                # parse the outputs once per transaction, not once per issuer
                transfers = []
                for output in tx["outputs"]:
                    outputparts = output.split(":")
                    # NOTE(review): outputparts[1] (presumably the amount base)
                    # is not applied - confirm all amounts are in base 0
                    amount = int(outputparts[0])
                    receiver = outputparts[2]
                    if issuers_count > 1:
                        # approximation; integer division avoids float rounding
                        amount = amount // issuers_count
                    # ignore non trivial unlock sources
                    # https://git.duniter.org/tools/py-g1-migrator/-/issues/3
                    # (a condition without any SIG cannot be mapped to a pubkey)
                    if "&&" in receiver or "||" in receiver or "SIG(" not in receiver:
                        print(num)
                        print("ignoring " + receiver)
                        continue
                    receiver = receiver.split("SIG(")[1].split(")")[0]
                    transfers.append((receiver, amount))

                # loop on issuers. If multiple issuers, amount was approximated
                for issuer in issuers:
                    for receiver, amount in transfers:
                        # do not include outputs that go back to sender
                        if issuer == receiver:
                            continue
                        txs.append(
                            {
                                "blockNumber": num,
                                "timestamp": timestamp,
                                "from": issuer,
                                "to": receiver,
                                "amount": amount,
                                "comment": comment,
                            }
                        )
        return txs
    
    
    def get_cert(leveldb_path: str) -> list:
        """
        Get cert,
        return a list of cert, but does not tell if it is a new cert or a renewal

        Each certification is read from the "certifications" entry of a block,
        as a ":"-separated string whose first three fields are the issuer, the
        receiver and the block number of the document creation.

        :param leveldb_path: LevelDB folder path
        :return: list of dicts with the keys blockNumberCreated,
            blockNumberWritten, issuer and receiver
        """
        blocks_repo = LevelDBBlocksRepository(leveldb_path)
        certs = []
        for num, block in blocks_repo:
            if num % NOTIF_INTERVAL == 0:
                print(num)
            # a block without a "certifications" entry simply carries no cert
            for cert in block.get("certifications") or []:
                parts = cert.split(":")
                issuer = parts[0]
                receiver = parts[1]
                # named so that it does not rebind the `block` loop variable
                created_on = parts[2]
                certs.append(
                    {
                        # block number of the document creation
                        "blockNumberCreated": int(created_on),
                        # block in which the certification is written
                        "blockNumberWritten": num,
                        "issuer": issuer,
                        "receiver": receiver,
                    }
                )
        return certs
    
    
    def get_cert_hist(leveldb_path: str) -> list:
        """
        Get cert history,
        returns a sorted list of cert event: Creation, Renewal, Removal

        Events are sorted by the block number they were written on. Issuers and
        receivers are given by identity index (wotb_id + 1), not by pubkey.
        Events whose "op" is neither CREATE nor UPDATE are printed and skipped.

        :param leveldb_path: LevelDB folder path
        :return: list of dicts with the keys blockNumber, issuer, receiver
            and type
        :raises KeyError: if an event refers to a pubkey that is absent from
            the identities index
        """
        # initialize
        cert_events = []
        identity_id = {}

        identities_repository = LevelDBIdentitiesRepository(leveldb_path)
        certifications_repository = LevelDBCertificationsRepository(leveldb_path)

        # Get identities index by pubkey
        for pubkey, identity in identities_repository:
            identity_id[pubkey] = identity["wotb_id"] + 1

        # Get certifications: iterate on the raw index to see every event,
        # each value being the JSON document of one issuer
        for _key, value in certifications_repository.index:
            issuer_val = json.loads(value)
            for evt in issuer_val["issued"]:
                op = evt["op"]
                if op == "CREATE":
                    optype = "Renewal" if evt.get("isReplay", False) else "Creation"
                elif op == "UPDATE":
                    optype = "Removal"
                else:
                    # unknown operation: report it and skip it, instead of
                    # recording it with the type of the previous event
                    print(evt)
                    continue
                cert_events.append(
                    {
                        "blockNumber": evt["writtenOn"],
                        "issuer": identity_id[evt["issuer"]],
                        "receiver": identity_id[evt["receiver"]],
                        "type": optype,
                    }
                )
        cert_events.sort(key=lambda x: x["blockNumber"])
        return cert_events