Commit 83078779 authored by Moul's avatar Moul
Browse files

[enh] #94: apply Black on every files.

parent 159d4572
......@@ -5,7 +5,7 @@ from sys import exit
from silkaj.network_tools import check_port, best_node, EndPoint
from silkaj.cli_tools import manage_cmd
if __name__ == '__main__':
if __name__ == "__main__":
ep = EndPoint().ep
if not check_port(ep["port"]):
exit(1)
......
......@@ -10,40 +10,45 @@ from re import compile, search
def auth_method(cli_args):
if cli_args.contains_switches('auth-seed'):
if cli_args.contains_switches("auth-seed"):
return auth_by_seed()
if cli_args.contains_switches('auth-file'):
if cli_args.contains_switches("auth-file"):
return auth_by_auth_file(cli_args)
if cli_args.contains_switches('auth-wif'):
if cli_args.contains_switches("auth-wif"):
return auth_by_wif()
else:
return auth_by_scrypt(cli_args)
def generate_auth_file(cli_args):
if cli_args.contains_definitions('file'):
file = cli_args.get_definition('file')
if cli_args.contains_definitions("file"):
file = cli_args.get_definition("file")
else:
file = "authfile"
seed = auth_method(cli_args)
with open(file, "w") as f:
f.write(seed)
print("Authentication file 'authfile' generated and stored in current\
folder for following public key:", get_publickey_from_seed(seed))
print(
"Authentication file 'authfile' generated and stored in current\
folder for following public key:",
get_publickey_from_seed(seed),
)
def auth_by_auth_file(cli_args):
if cli_args.contains_definitions('file'):
file = cli_args.get_definition('file')
if cli_args.contains_definitions("file"):
file = cli_args.get_definition("file")
else:
file = "authfile"
if not path.isfile(file):
message_exit("Error: the file \"" + file + "\" does not exist")
message_exit('Error: the file "' + file + '" does not exist')
with open(file) as f:
filetxt = f.read()
regex_seed = compile('^[0-9a-fA-F]{64}$')
regex_gannonce = compile('^pub: [1-9A-HJ-NP-Za-km-z]{43,44}\nsec: [1-9A-HJ-NP-Za-km-z]{88,90}.*$')
regex_seed = compile("^[0-9a-fA-F]{64}$")
regex_gannonce = compile(
"^pub: [1-9A-HJ-NP-Za-km-z]{43,44}\nsec: [1-9A-HJ-NP-Za-km-z]{88,90}.*$"
)
# Seed Format
if search(regex_seed, filetxt):
seed = filetxt[0:64]
......@@ -58,7 +63,7 @@ def auth_by_auth_file(cli_args):
def auth_by_seed():
seed = input("Please enter your seed on hex format: ")
regex = compile('^[0-9a-fA-F]{64}$')
regex = compile("^[0-9a-fA-F]{64}$")
if not search(regex, seed):
message_exit("Error: the format of the seed is invalid")
return seed
......@@ -68,8 +73,16 @@ def auth_by_scrypt(cli_args):
salt = getpass("Please enter your Scrypt Salt (Secret identifier): ")
password = getpass("Please enter your Scrypt password (masked): ")
if cli_args.contains_definitions('n') and cli_args.contains_definitions('r') and cli_args.contains_definitions('p'):
n, r, p = cli_args.get_definition('n'), cli_args.get_definition('r'), cli_args.get_definition('p')
if (
cli_args.contains_definitions("n")
and cli_args.contains_definitions("r")
and cli_args.contains_definitions("p")
):
n, r, p = (
cli_args.get_definition("n"),
cli_args.get_definition("r"),
cli_args.get_definition("p"),
)
if n.isnumeric() and r.isnumeric() and p.isnumeric():
n, r, p = int(n), int(r), int(p)
if n <= 0 or n > 65536 or r <= 0 or r > 512 or p <= 0 or p > 32:
......@@ -87,18 +100,17 @@ def auth_by_scrypt(cli_args):
def auth_by_wif():
wif = input("Please enter your WIF or Encrypted WIF address: ")
regex = compile('^[1-9A-HJ-NP-Za-km-z]*$')
regex = compile("^[1-9A-HJ-NP-Za-km-z]*$")
if not search(regex, wif):
message_exit("Error: the format of WIF is invalid")
wif_bytes = b58_decode(wif)
fi = wif_bytes[0:1]
if fi == b'\x01':
if fi == b"\x01":
return get_seed_from_wifv1(wif)
elif fi == b'\x02':
password = getpass("Please enter the " +
"password of WIF (masked): ")
elif fi == b"\x02":
password = getpass("Please enter the " + "password of WIF (masked): ")
return get_seed_from_ewifv1(wif, password)
message_exit("Error: the format of WIF is invalid or unknown")
......@@ -111,7 +123,7 @@ def get_seed_from_scrypt(salt, password, N=4096, r=16, p=1):
def get_seed_from_wifv1(wif):
regex = compile('^[1-9A-HJ-NP-Za-km-z]*$')
regex = compile("^[1-9A-HJ-NP-Za-km-z]*$")
if not search(regex, wif):
message_exit("Error: the format of WIF is invalid")
......@@ -124,13 +136,13 @@ def get_seed_from_wifv1(wif):
seed = wif_bytes[1:-2]
seed_fi = wif_bytes[0:-2]
if fi != b'\x01':
if fi != b"\x01":
message_exit("Error: It's not a WIF format")
# checksum control
checksum = nacl.hash.sha256(
nacl.hash.sha256(seed_fi, encoding.RawEncoder),
encoding.RawEncoder)[0:2]
nacl.hash.sha256(seed_fi, encoding.RawEncoder), encoding.RawEncoder
)[0:2]
if checksum_from_wif != checksum:
message_exit("Error: bad checksum of the WIF")
......@@ -139,7 +151,7 @@ def get_seed_from_wifv1(wif):
def get_seed_from_ewifv1(ewif, password):
regex = compile('^[1-9A-HJ-NP-Za-km-z]*$')
regex = compile("^[1-9A-HJ-NP-Za-km-z]*$")
if not search(regex, ewif):
message_exit("Error: the format of EWIF is invalid")
......@@ -154,13 +166,13 @@ def get_seed_from_ewifv1(ewif, password):
encryptedhalf1 = wif_bytes[5:21]
encryptedhalf2 = wif_bytes[21:37]
if fi != b'\x02':
if fi != b"\x02":
message_exit("Error: It's not a EWIF format")
# Checksum Control
checksum = nacl.hash.sha256(
nacl.hash.sha256(wif_no_checksum, encoding.RawEncoder),
encoding.RawEncoder)[0:2]
nacl.hash.sha256(wif_no_checksum, encoding.RawEncoder), encoding.RawEncoder
)[0:2]
if checksum_from_ewif != checksum:
message_exit("Error: bad checksum of EWIF address")
......@@ -178,15 +190,16 @@ def get_seed_from_ewifv1(ewif, password):
# XOR
seed1 = xor_bytes(decryptedhalf1, derivedhalf1[0:16])
seed2 = xor_bytes(decryptedhalf2, derivedhalf1[16:32])
seed = seed1+seed2
seed = seed1 + seed2
seedhex = encoding.HexEncoder.encode(seed).decode("utf-8")
# Password Control
salt_from_seed = nacl.hash.sha256(
nacl.hash.sha256(
b58_decode(get_publickey_from_seed(seedhex)),
encoding.RawEncoder),
encoding.RawEncoder)[0:4]
nacl.hash.sha256(
b58_decode(get_publickey_from_seed(seedhex)), encoding.RawEncoder
),
encoding.RawEncoder,
)[0:4]
if salt_from_seed != salt:
message_exit("Error: bad Password of EWIF address")
......
......@@ -6,8 +6,7 @@ from silkaj.crypto_tools import get_publickey_from_seed, sign_document_from_seed
from silkaj.tools import message_exit
from silkaj.network_tools import post_request, HeadBlock
from silkaj.license import license_approval
from silkaj.wot import is_member,\
get_uid_from_pubkey, get_informations_for_identity
from silkaj.wot import is_member, get_uid_from_pubkey, get_informations_for_identity
def send_certification(cli_args):
......@@ -34,9 +33,13 @@ def send_certification(cli_args):
message_exit("Identity already certified by " + issuer_id)
# Certification confirmation
if not certification_confirmation(issuer_id, issuer_pubkey, id_to_certify, main_id_to_certify):
if not certification_confirmation(
issuer_id, issuer_pubkey, id_to_certify, main_id_to_certify
):
return
cert_doc = generate_certification_document(issuer_pubkey, id_to_certify, main_id_to_certify)
cert_doc = generate_certification_document(
issuer_pubkey, id_to_certify, main_id_to_certify
)
cert_doc += sign_document_from_seed(cert_doc, seed) + "\n"
# Send certification document
......@@ -44,24 +47,49 @@ def send_certification(cli_args):
print("Certification successfully sent.")
def certification_confirmation(issuer_id, issuer_pubkey, id_to_certify, main_id_to_certify):
def certification_confirmation(
issuer_id, issuer_pubkey, id_to_certify, main_id_to_certify
):
cert = list()
cert.append(["Cert", "From", "–>", "To"])
cert.append(["ID", issuer_id, "–>", main_id_to_certify["uid"]])
cert.append(["Pubkey", issuer_pubkey, "–>", id_to_certify["pubkey"]])
if input(tabulate(cert, tablefmt="fancy_grid") +
"\nDo you confirm sending this certification? [yes/no]: ") == "yes":
if (
input(
tabulate(cert, tablefmt="fancy_grid")
+ "\nDo you confirm sending this certification? [yes/no]: "
)
== "yes"
):
return True
def generate_certification_document(issuer_pubkey, id_to_certify, main_id_to_certify):
head_block = HeadBlock().head_block
return "Version: 10\n\
return (
"Version: 10\n\
Type: Certification\n\
Currency: " + head_block["currency"] + "\n\
Issuer: " + issuer_pubkey + "\n\
IdtyIssuer: " + id_to_certify["pubkey"] + "\n\
IdtyUniqueID: " + main_id_to_certify["uid"] + "\n\
IdtyTimestamp: " + main_id_to_certify["meta"]["timestamp"] + "\n\
IdtySignature: " + main_id_to_certify["self"] + "\n\
CertTimestamp: " + str(head_block["number"]) + "-" + head_block["hash"] + "\n"
Currency: "
+ head_block["currency"]
+ "\n\
Issuer: "
+ issuer_pubkey
+ "\n\
IdtyIssuer: "
+ id_to_certify["pubkey"]
+ "\n\
IdtyUniqueID: "
+ main_id_to_certify["uid"]
+ "\n\
IdtyTimestamp: "
+ main_id_to_certify["meta"]["timestamp"]
+ "\n\
IdtySignature: "
+ main_id_to_certify["self"]
+ "\n\
CertTimestamp: "
+ str(head_block["number"])
+ "-"
+ head_block["hash"]
+ "\n"
)
......@@ -5,26 +5,47 @@ from commandlines import Command
from silkaj.tx import send_transaction
from silkaj.money import cmd_amount
from silkaj.cert import send_certification
from silkaj.commands import currency_info, difficulties, set_network_sort_keys,\
network_info, argos_info, list_issuers
from silkaj.commands import (
currency_info,
difficulties,
set_network_sort_keys,
network_info,
argos_info,
list_issuers,
)
from silkaj.tools import message_exit
from silkaj.network_tools import get_request
from silkaj.wot import received_sent_certifications, id_pubkey_correspondence
from silkaj.auth import generate_auth_file
from silkaj.license import display_license
from silkaj.constants import SILKAJ_VERSION, G1_SYMBOL, GTEST_SYMBOL, G1_DEFAULT_ENDPOINT, G1_TEST_DEFAULT_ENDPOINT
from silkaj.constants import (
SILKAJ_VERSION,
G1_SYMBOL,
GTEST_SYMBOL,
G1_DEFAULT_ENDPOINT,
G1_TEST_DEFAULT_ENDPOINT,
)
def usage():
message_exit("Silkaj: command line client for Duniter currencies\
message_exit(
"Silkaj: command line client for Duniter currencies\
\n\nhelp: -h, --help, --usage \
\nversion: -v, --version \
\nabout: display informations about the programm\
\n \
\nEndpoint:\
\nDefault endpoint will reach " + G1_SYMBOL + " currency with `https://" + G1_DEFAULT_ENDPOINT[0] + "` endpoint.\
\nDefault endpoint will reach "
+ G1_SYMBOL
+ " currency with `https://"
+ G1_DEFAULT_ENDPOINT[0]
+ "` endpoint.\
\nUse one of these options at the end of the command:\
\n - `--gtest` to reach " + GTEST_SYMBOL + " currency with `https://" + G1_TEST_DEFAULT_ENDPOINT[0] + "` endpoint\
\n - `--gtest` to reach "
+ GTEST_SYMBOL
+ " currency with `https://"
+ G1_TEST_DEFAULT_ENDPOINT[0]
+ "` endpoint\
\n - custom endpoint can be specified with `-p` option followed by <domain>:<port>\
\n \
\nCommands: \
......@@ -77,7 +98,8 @@ def usage():
\n you can specify others values specifying following parameters: -n <N> -r <r> -p <p>\
\n - Seed: --auth-seed\
\n - File: --auth-file [--file=<path file>], './authfile' will be taken if there is no path specified\
\n - WIF: --auth-wif")
\n - WIF: --auth-wif"
)
def manage_cmd():
......@@ -85,8 +107,29 @@ def manage_cmd():
if cli_args.is_version_request():
message_exit(SILKAJ_VERSION)
subcmd = ["license", "about", "info", "diffi", "net", "network", "issuers", "argos", "amount", "tx", "transaction", "cert", "generate_auth_file", "id", "identities", "wot"]
if cli_args.is_help_request() or cli_args.is_usage_request() or cli_args.subcmd not in subcmd:
subcmd = [
"license",
"about",
"info",
"diffi",
"net",
"network",
"issuers",
"argos",
"amount",
"tx",
"transaction",
"cert",
"generate_auth_file",
"id",
"identities",
"wot",
]
if (
cli_args.is_help_request()
or cli_args.is_usage_request()
or cli_args.subcmd not in subcmd
):
usage()
if cli_args.subcmd == "about":
......@@ -104,8 +147,12 @@ def manage_cmd():
set_network_sort_keys(cli_args.get_definition("s"))
network_info(cli_args.contains_switches("discover"))
elif cli_args.subcmd == "issuers" and cli_args.subsubcmd and int(cli_args.subsubcmd) >= 0:
list_issuers(int(cli_args.subsubcmd), cli_args.contains_switches('last'))
elif (
cli_args.subcmd == "issuers"
and cli_args.subsubcmd
and int(cli_args.subsubcmd) >= 0
):
list_issuers(int(cli_args.subsubcmd), cli_args.contains_switches("last"))
elif cli_args.subcmd == "argos":
argos_info()
......@@ -133,10 +180,13 @@ def manage_cmd():
def about():
print("\
print(
"\
\n @@@@@@@@@@@@@\
\n @@@ @ @@@\
\n @@@ @@ @@@@@@ @@. ", SILKAJ_VERSION, "\
\n @@@ @@ @@@@@@ @@. ",
SILKAJ_VERSION,
"\
\n @@ @@@ @@@@@@@@@@@ @@,\
\n @@ @@@ &@@@@@@@@@@@@@ @@@ Powerfull and lightweight command line client\
\n @@ @@@ @@@@@@@@@# @@@@ @@(\
......@@ -150,6 +200,5 @@ def about():
\n @@@ @@@@@@@@@@@@ @ @@*\
\n @@@ @@@@@@@@ @ @@@ License: GNU AGPLv3\
\n @@@@ @@ @@@,\
\n @@@@@@@@@@@@@@@\n")
\n @@@@@@@@@@@@@@@\n",
)
......@@ -6,37 +6,71 @@ from tabulate import tabulate
from operator import itemgetter
from silkaj.wot import get_uid_from_pubkey
from silkaj.network_tools import discover_peers, get_request, best_node, EndPoint, HeadBlock
from silkaj.network_tools import (
discover_peers,
get_request,
best_node,
EndPoint,
HeadBlock,
)
from silkaj.tools import convert_time, message_exit, CurrencySymbol
from silkaj.constants import NO_MATCHING_ID
def currency_info():
info_data = dict()
for info_type in ["newcomers", "certs", "actives", "leavers", "excluded", "ud", "tx"]:
info_data[info_type] = get_request("blockchain/with/" + info_type)["result"]["blocks"]
for info_type in [
"newcomers",
"certs",
"actives",
"leavers",
"excluded",
"ud",
"tx",
]:
info_data[info_type] = get_request("blockchain/with/" + info_type)["result"][
"blocks"
]
head_block = HeadBlock().head_block
ep = EndPoint().ep
system("clear")
print("Connected to node:", ep[best_node(ep, False)], ep["port"],
"\nCurrent block number:", head_block["number"],
"\nCurrency name:", CurrencySymbol().symbol,
"\nNumber of members:", head_block["membersCount"],
"\nMinimal Proof-of-Work:", head_block["powMin"],
"\nCurrent time:", convert_time(head_block["time"], "all"),
"\nMedian time:", convert_time(head_block["medianTime"], "all"),
"\nDifference time:", convert_time(head_block["time"] - head_block["medianTime"], "hour"),
"\nNumber of blocks containing: \
\n- new comers:", len(info_data["newcomers"]),
"\n- Certifications:", len(info_data["certs"]),
"\n- Actives (members updating their membership):", len(info_data["actives"]),
"\n- Leavers:", len(info_data["leavers"]),
"\n- Excluded:", len(info_data["excluded"]),
"\n- UD created:", len(info_data["ud"]),
"\n- transactions:", len(info_data["tx"]))
def match_pattern(pow, match='', p=1):
print(
"Connected to node:",
ep[best_node(ep, False)],
ep["port"],
"\nCurrent block number:",
head_block["number"],
"\nCurrency name:",
CurrencySymbol().symbol,
"\nNumber of members:",
head_block["membersCount"],
"\nMinimal Proof-of-Work:",
head_block["powMin"],
"\nCurrent time:",
convert_time(head_block["time"], "all"),
"\nMedian time:",
convert_time(head_block["medianTime"], "all"),
"\nDifference time:",
convert_time(head_block["time"] - head_block["medianTime"], "hour"),
"\nNumber of blocks containing: \
\n- new comers:",
len(info_data["newcomers"]),
"\n- Certifications:",
len(info_data["certs"]),
"\n- Actives (members updating their membership):",
len(info_data["actives"]),
"\n- Leavers:",
len(info_data["leavers"]),
"\n- Excluded:",
len(info_data["excluded"]),
"\n- UD created:",
len(info_data["ud"]),
"\n- transactions:",
len(info_data["tx"]),
)
def match_pattern(pow, match="", p=1):
while pow > 0:
if pow >= 16:
match += "0"
......@@ -46,7 +80,7 @@ def match_pattern(pow, match='', p=1):
match += "[0-" + hex(15 - pow)[2:].upper() + "]"
p *= pow
pow = 0
return match + '*', p
return match + "*", p
def power(nbr, pow=0):
......@@ -59,10 +93,13 @@ def power(nbr, pow=0):
def difficulties():
while True:
diffi = get_request("blockchain/difficulties")
levels = [OrderedDict((i, d[i]) for i in ("uid", "level")) for d in diffi["levels"]]
levels = [
OrderedDict((i, d[i]) for i in ("uid", "level")) for d in diffi["levels"]
]
diffi["levels"] = levels
current = get_request("blockchain/current")
issuers, sorted_diffi = 0, sorted(diffi["levels"], key=itemgetter("level"), reverse=True)
issuers = 0
sorted_diffi = sorted(diffi["levels"], key=itemgetter("level"), reverse=True)
for d in diffi["levels"]:
if d["level"] / 2 < current["powMin"]:
issuers += 1
......@@ -70,9 +107,18 @@ def difficulties():
d["Π diffi"] = power(match_pattern(d["level"])[1])
d["Σ diffi"] = d.pop("level")
system("clear")
print("Minimal Proof-of-Work: {0} to match `{1}`\nDifficulty to generate next block n°{2} for {3}/{4} nodes:\n{5}"
.format(current["powMin"], match_pattern(int(current["powMin"]))[0], diffi["block"], issuers, len(diffi["levels"]),
tabulate(sorted_diffi, headers="keys", tablefmt="orgtbl", stralign="center")))
print(
"Minimal Proof-of-Work: {0} to match `{1}`\nDifficulty to generate next block n°{2} for {3}/{4} nodes:\n{5}".format(
current["powMin"],
match_pattern(int(current["powMin"]))[0],
diffi["block"],
issuers,
len(diffi["levels"]),
tabulate(
sorted_diffi, headers="keys", tablefmt="orgtbl", stralign="center"
),
)
)
sleep(5)
......@@ -82,14 +128,16 @@ network_sort_keys = ["block", "member", "diffi", "uid"]
def set_network_sort_keys(some_keys):
global network_sort_keys
if some_keys.endswith(","):
message_exit("Argument 'sort' ends with a comma, you have probably inserted a space after the comma, which is incorrect.")
message_exit(
"Argument 'sort' ends with a comma, you have probably inserted a space after the comma, which is incorrect."
)
network_sort_keys = some_keys.split(",")
def get_network_sort_key(endpoint):
t = list()
for akey in network_sort_keys:
if akey == 'diffi' or akey == 'block' or akey == 'port':
if akey == "diffi" or akey == "block" or akey == "port":
t.append(int(endpoint[akey]) if akey in endpoint else 0)
else:
t.append(str(endpoint[akey]) if akey in endpoint else "")
......@@ -97,20 +145,24 @@ def get_network_sort_key(endpoint):
def network_info(discover):
rows, columns = popen('stty size', 'r').read().split()
# print(rows, columns) # debug
rows, columns = popen("stty size", "r").read().split()
wide = int(columns)
if wide < 146:
message_exit("Wide screen need to be larger than 146. Current wide: " + wide)
# discover peers
# and make sure fields are always ordered the same
endpoints = [OrderedDict((i, p.get(i, None)) for i in ("domain", "port", "ip4", "ip6", "pubkey")) for p in discover_peers(discover)]
endpoints = [
OrderedDict(
(i, p.get(i, None)) for i in ("domain", "port", "ip4", "ip6", "pubkey")
)
for p in discover_peers(discover)
]
# Todo : renommer endpoints en info
diffi = get_request("blockchain/difficulties")
i, members = 0, 0
print("Getting informations about nodes:")
while (i < len(endpoints)):
print("{0:.0f}%".format(i/len(endpoints) * 100, 1), end=" ")
while i < len(endpoints):
print("{0:.0f}%".format(i / len(endpoints) * 100, 1), end=" ")
best_ep = best_node(endpoints[i], False)
print(best_ep if best_ep is None else endpoints[i][best_ep], end=" ")
print(endpoints[i]["port"])
......@@ -126,8 +178,8 @@ def network_info(discover):
if endpoints[i].get("member") is None:
endpoints[i]["member"] = "no"
endpoints[i]["pubkey"] = endpoints[i]["pubkey"][:5] + "…"
# Todo: request difficulty from node point of view: two nodes with same pubkey id could be on diffrent branches and have different difficulties
# diffi = get_request(endpoints[i], "blockchain/difficulties") # super long, doit être requetté uniquement pour les nœuds d’une autre branche
# Todo: request difficulty from node point of view: two nodes with same pubkey id could be on diffrent branches and have different difficulties
# diffi = get_request(endpoints[i], "blockchain/difficulties") # super long, doit être requetté uniquement pour les nœuds d’une autre branche
for d in diffi["levels"]:
if endpoints[i].get("uid") is not None:
if endpoints[i]["uid"] == d["uid"]:
......@@ -138,12 +190,18 @@ def network_info(discover):
if current_blk is not None:
endpoints[i]["gen_time"] = convert_time(current_blk["time"], "hour")
if wide > 171:
endpoints[i]["mediantime"] = convert_time(current_blk["medianTime"], "hour")
endpoints[i]["mediantime"] = convert_time(
current_blk["medianTime"], "hour"
)