Skip to content
Snippets Groups Projects
Select Git revision
  • d4912f02c2ef5ae228e9cd79b1fad941c08ea8a9
  • master default protected
  • hugo-add-cert-emission-block
  • duniter-v2s-issue-125-industrialize-releases
  • tuxmain/gdev
5 results

functions.py

  • functions.py 5.19 KiB
    import json, csv
    from symbol import parameters
    import sys
    from urllib.request import urlopen
    from substrateinterface import Keypair, KeypairType
    from currency_parameters import *
    from bs4 import BeautifulSoup
    
    def get_g1stats_data():
        f = urlopen("https://g1-stats.axiom-team.fr/data/forbes.json")
        return json.load(f)
    
    def get_datajune_certs():
        list_certs = {}
        certsIndex = urlopen("https://files.datajune.coinduf.eu/graphs.lg").read().decode()
        
        soup = BeautifulSoup(certsIndex, features="lxml")
        for chunks in soup.find_all('a'):
            pass
        lastChunk = int(chunks.string.split('.')[0])
        
        print('Getting certifications in progress, this could take ~5 minutes...')
        for i in progressbar(range(1, lastChunk)):
            # print(i)
            paddedNbr = str(i).rjust(4, '0')
            graphData = urlopen("https://files.datajune.coinduf.eu/graphs.lg/" + paddedNbr + ".lg")
            
            for j in graphData.readlines():
                data = j.decode()
                if 'simplegraph' in data: continue
                
                cert_data = data.replace('\n', '').split(',')
                issuer = int(cert_data[0])
                receiver = int(cert_data[1])
                
                if receiver not in list_certs: list_certs.update({receiver: []})
                if issuer not in list_certs[receiver]: list_certs[receiver].append(issuer)
        
        list_certs_json = json.dumps(list_certs).encode()
        list_certs_json_file = open('certs.json', 'wb')
        list_certs_json_file.write(list_certs_json)
        # print(list_certs[1])
        # sys.exit(0)
        
        return list_certs
    
    def get_datajune_certs_local():
        f = open('certs.json')
        return json.load(f)
            
    def username_to_number(username, datajune_usernames):
        if username in datajune_usernames:
            return datajune_usernames.index(username) + 1
        else:
            return 9999999
    
    def number_to_username(number, datajune_usernames):
        return datajune_usernames[number - 1]
    
    
    def get_datajune_usernames():
        return urlopen("https://files.datajune.coinduf.eu/global/pseudos.txt").read().decode().split()
    
    
    
    def v1PubkeyToV2Address(pubkey):
        # get incomplete Substrate keypair (only public key) from public key bytes
        keypair = Keypair(public_key=pubkey, ss58_format=42, crypto_type=KeypairType.ED25519)
    
        # return V2 address
        return keypair.ss58_address
    
    def get_genesis_parameters():
        return {
            'genesis_certs_expire_on': GENESIS_CERTS_EXPIRE_ON,
            'genesis_certs_min_received': GENESIS_CERTS_MIN_RECEIVED,
            'genesis_memberships_expire_on': GENESIS_MEMBERSHIPS_EXPIRE_ON,
            'genesis_smith_certs_expire_on': GENESIS_SMITH_CERTS_EXPIRE_ON,
            'genesis_smith_certs_min_received': GENESIS_SMITH_CERTS_MIN_RECEIVED,
            'genesis_smith_memberships_expire_on': GENESIS_SMITH_MEMBERSHIPS_EXPIRE_ON,
        }
    
    def get_parameters():
        return {
            'babe_epoch_duration': BABE_EPOCH_DURATION,
            'cert_period': CERT_PERIOD,
            'cert_max_by_issuer': CERT_MAX_BY_ISSUER,
            'cert_min_received_cert_to_issue_cert': CERT_MIN_RECEIVED_CERT_TO_ISSUE_CERT,
            'cert_validity_period': CERT_VALIDITY_PERIOD,
            'idty_confirm_period': IDTY_CONFIRM_PERIOD,
            'idty_creation_period': IDTY_CREATION_PERIOD,
            'membership_period': MEMBERSHIP_PERIOD,
            'pending_membership_period': PENDING_MEMBERSHIP_PERIOD,
            'ud_creation_period': UD_CREATION_PERIOD,
            'ud_reeval_period': UD_REEVAL_PERIOD,
            'smith_cert_period': SMITH_CERT_PERIOD,
            'smith_cert_max_by_issuer': SMITH_CERT_MAX_BY_ISSUER,
            'smith_cert_min_received_cert_to_issue_cert': SMITH_CERT_MIN_RECEIVED_CERT_TO_ISSUE_CERT,
            'smith_cert_validity_period': SMITH_CERT_VALIDITY_PERIOD,
            'smith_membership_period': SMITH_MEMBERSHIP_PERIOD,
            'smith_pending_membership_period': SMITH_PENDING_MEMBERSHIP_PERIOD,
            'smiths_wot_first_cert_issuable_on': SMITHS_WOT_FIRST_CERT_ISSUABLE_ON,
            'smiths_wot_min_cert_for_membership': SMITHS_WOT_MIN_CERT_FOR_MEMBERSHIP,
            'wot_first_cert_issuable_on': WOT_FIRST_CERT_ISSUABLE_ON,
            'wot_min_cert_for_create_idty_right': WOT_MIN_CERT_FOR_CREATE_IDTY_RIGHT,
            'wot_min_cert_for_membership': WOT_MIN_CERT_FOR_MEMBERSHIP,
        }
    
    def get_smiths():
        smiths = {}
        smiths.update({'poka': {'certs': ['elois', 'HugoTrentesaux', 'vit', 'tuxmain']}})
        smiths.update({'elois': {'certs': ['HugoTrentesaux', 'vit', 'tuxmain', 'poka']}})
        smiths.update({'HugoTrentesaux': {'certs': ['elois', 'vit', 'tuxmain', 'poka']}})
        smiths.update({'vit': {'certs': ['elois', 'HugoTrentesaux', 'tuxmain', 'poka']}})
        smiths.update({'tuxmain': {'certs': ['elois', 'HugoTrentesaux', 'vit', 'poka']}})
        smiths.update({'1000i100': {'certs': ['elois', 'HugoTrentesaux', 'vit', 'poka']}})
        return smiths
    
    def get_technical_committee():
        return ['poka', 'elois', 'HugoTrentesaux', 'vit', 'tuxmain']
    
    import sys
    def progressbar(it, prefix="", size=60, out=sys.stdout): # Python3.3+
        count = len(it)
        def show(j):
            x = int(size*j/count)
            print("{}[{}{}] {}/{}".format(prefix, "#"*x, "."*(size-x), j, count), 
                    end='\r', file=out, flush=True)
        show(0)
        for i, item in enumerate(it):
            yield item
            show(i+1)
        print("\n", flush=True, file=out)