Mise à jour effectuée, merci de nous signaler tout dysfonctionnement ! | Upgrade done, please let us know about any dysfunction!

Commit d1f53c1f authored by Moul's avatar Moul
Browse files

[enh] improve imports with flake8’s help

- May improve loading speed performance.
parent 34d2a611
from tools import *
import nacl.encoding
import nacl.signing
from tools import get_publickey_from_seed, b58_decode, xor_bytes, message_exit
from nacl import encoding
import nacl.hash
import scrypt
from scrypt import hash
import pyaes
import getpass
import os
from getpass import getpass
from os import path
from re import compile, search
def auth_method(cli_args):
......@@ -37,20 +37,20 @@ def auth_by_auth_file(cli_args):
file = cli_args.get_definition('file')
else:
file = "authfile"
if not os.path.isfile(file):
if not path.isfile(file):
message_exit("Error: the file \"" + file + "\" does not exist")
with open(file) as f:
filetxt = f.read()
regex_seed = re.compile('^[0-9a-fA-F]{64}$')
regex_gannonce = re.compile('^pub: [1-9A-HJ-NP-Za-km-z]{43,44}\nsec: [1-9A-HJ-NP-Za-km-z]{88,90}.*$')
regex_seed = compile('^[0-9a-fA-F]{64}$')
regex_gannonce = compile('^pub: [1-9A-HJ-NP-Za-km-z]{43,44}\nsec: [1-9A-HJ-NP-Za-km-z]{88,90}.*$')
# Seed Format
if re.search(regex_seed, filetxt):
if search(regex_seed, filetxt):
seed = filetxt[0:64]
# gannonce.duniter.org Format
elif re.search(regex_gannonce, filetxt):
elif search(regex_gannonce, filetxt):
private_key = filetxt.split("sec: ")[1].split("\n")[0]
seed = nacl.encoding.HexEncoder.encode(b58_decode(private_key))[0:64].decode("utf-8")
seed = encoding.HexEncoder.encode(b58_decode(private_key))[0:64].decode("utf-8")
else:
message_exit("Error: the format of the file is invalid")
return seed
......@@ -58,15 +58,15 @@ def auth_by_auth_file(cli_args):
def auth_by_seed():
seed = input("Please enter your seed on hex format: ")
regex = re.compile('^[0-9a-fA-F]{64}$')
if not re.search(regex, seed):
regex = compile('^[0-9a-fA-F]{64}$')
if not search(regex, seed):
message_exit("Error: the format of the seed is invalid")
return seed
def auth_by_scrypt(cli_args):
salt = getpass.getpass("Please enter your Scrypt Salt (Secret identifier): ")
password = getpass.getpass("Please enter your Scrypt password (masked): ")
salt = getpass("Please enter your Scrypt Salt (Secret identifier): ")
password = getpass("Please enter your Scrypt password (masked): ")
if cli_args.contains_definitions('n') and cli_args.contains_definitions('r') and cli_args.contains_definitions('p'):
n, r, p = cli_args.get_definition('n'), cli_args.get_definition('r'), cli_args.get_definition('p')
......@@ -87,8 +87,8 @@ def auth_by_scrypt(cli_args):
def auth_by_wif():
wif = input("Please enter your WIF or Encrypted WIF address: ")
regex = re.compile('^[1-9A-HJ-NP-Za-km-z]*$')
if not re.search(regex, wif):
regex = compile('^[1-9A-HJ-NP-Za-km-z]*$')
if not search(regex, wif):
message_exit("Error: the format of WIF is invalid")
wif_bytes = b58_decode(wif)
......@@ -97,7 +97,7 @@ def auth_by_wif():
if fi == b'\x01':
return get_seed_from_wifv1(wif)
elif fi == b'\x02':
password = getpass.getpass("Please enter the " +
password = getpass("Please enter the " +
"password of WIF (masked): ")
return get_seed_from_ewifv1(wif, password)
......@@ -105,14 +105,14 @@ def auth_by_wif():
def get_seed_from_scrypt(salt, password, N=4096, r=16, p=1):
seed = scrypt.hash(password, salt, N, r, p, 32)
seedhex = nacl.encoding.HexEncoder.encode(seed).decode("utf-8")
seed = hash(password, salt, N, r, p, 32)
seedhex = encoding.HexEncoder.encode(seed).decode("utf-8")
return seedhex
def get_seed_from_wifv1(wif):
regex = re.compile('^[1-9A-HJ-NP-Za-km-z]*$')
if not re.search(regex, wif):
regex = compile('^[1-9A-HJ-NP-Za-km-z]*$')
if not search(regex, wif):
message_exit("Error: the format of WIF is invalid")
wif_bytes = b58_decode(wif)
......@@ -129,18 +129,18 @@ def get_seed_from_wifv1(wif):
# checksum control
checksum = nacl.hash.sha256(
nacl.hash.sha256(seed_fi, nacl.encoding.RawEncoder),
nacl.encoding.RawEncoder)[0:2]
nacl.hash.sha256(seed_fi, encoding.RawEncoder),
encoding.RawEncoder)[0:2]
if checksum_from_wif != checksum:
message_exit("Error: bad checksum of the WIF")
seedhex = nacl.encoding.HexEncoder.encode(seed).decode("utf-8")
seedhex = encoding.HexEncoder.encode(seed).decode("utf-8")
return seedhex
def get_seed_from_ewifv1(ewif, password):
regex = re.compile('^[1-9A-HJ-NP-Za-km-z]*$')
if not re.search(regex, ewif):
regex = compile('^[1-9A-HJ-NP-Za-km-z]*$')
if not search(regex, ewif):
message_exit("Error: the format of EWIF is invalid")
wif_bytes = b58_decode(ewif)
......@@ -159,14 +159,14 @@ def get_seed_from_ewifv1(ewif, password):
# Checksum Control
checksum = nacl.hash.sha256(
nacl.hash.sha256(wif_no_checksum, nacl.encoding.RawEncoder),
nacl.encoding.RawEncoder)[0:2]
nacl.hash.sha256(wif_no_checksum, encoding.RawEncoder),
encoding.RawEncoder)[0:2]
if checksum_from_ewif != checksum:
message_exit("Error: bad checksum of EWIF address")
# SCRYPT
password = password.encode("utf-8")
scrypt_seed = scrypt.hash(password, salt, 16384, 8, 8, 64)
scrypt_seed = hash(password, salt, 16384, 8, 8, 64)
derivedhalf1 = scrypt_seed[0:32]
derivedhalf2 = scrypt_seed[32:64]
......@@ -179,14 +179,14 @@ def get_seed_from_ewifv1(ewif, password):
seed1 = xor_bytes(decryptedhalf1, derivedhalf1[0:16])
seed2 = xor_bytes(decryptedhalf2, derivedhalf1[16:32])
seed = seed1+seed2
seedhex = nacl.encoding.HexEncoder.encode(seed).decode("utf-8")
seedhex = encoding.HexEncoder.encode(seed).decode("utf-8")
# Password Control
salt_from_seed = nacl.hash.sha256(
nacl.hash.sha256(
b58_decode(get_publickey_from_seed(seedhex)),
nacl.encoding.RawEncoder),
nacl.encoding.RawEncoder)[0:4]
encoding.RawEncoder),
encoding.RawEncoder)[0:4]
if salt_from_seed != salt:
message_exit("Error: bad Password of EWIF address")
......
......@@ -6,9 +6,8 @@ from pydoc import pager
from tabulate import tabulate
from auth import auth_method
from tools import get_publickey_from_seed, message_exit, get_current_block,\
sign_document_from_seed
from network_tools import get_request, get_current_block, post_request
from tools import get_publickey_from_seed, message_exit, sign_document_from_seed
from network_tools import get_current_block, post_request
from constants import NO_MATCHING_ID
from wot import is_member, get_pubkey_from_id, get_pubkeys_from_id,\
get_uid_from_pubkey
......
import datetime
import time
import os
from datetime import datetime
from time import sleep
from os import system, popen
from collections import OrderedDict
from tabulate import tabulate
from operator import itemgetter
from wot import *
from network_tools import *
from auth import *
from tools import *
from constants import *
from wot import get_uid_from_pubkey
from network_tools import discover_peers, get_request, best_node, get_current_block
from tools import convert_time, get_currency_symbol, message_exit
from constants import NO_MATCHING_ID
def currency_info(ep):
......@@ -19,7 +18,7 @@ def currency_info(ep):
info_data[info_type[i]] = get_request(ep, "blockchain/with/" + info_type[i])["result"]["blocks"]
i += 1
current = get_current_block(ep)
os.system("clear")
system("clear")
print("Connected to node:", ep[best_node(ep, 1)], ep["port"],
"\nCurrent block number:", current["number"],
"\nCurrency name:", get_currency_symbol(current["currency"]),
......@@ -71,11 +70,11 @@ def difficulties(ep):
d["match"] = match_pattern(d["level"])[0][:20]
d["Π diffi"] = power(match_pattern(d["level"])[1])
d["Σ diffi"] = d.pop("level")
os.system("clear")
system("clear")
print("Minimal Proof-of-Work: {0} to match `{1}`\nDifficulty to generate next block n°{2} for {3}/{4} nodes:\n{5}"
.format(current["powMin"], match_pattern(int(current["powMin"]))[0], diffi["block"], issuers, len(diffi["levels"]),
tabulate(sorted_diffi, headers="keys", tablefmt="orgtbl", stralign="center")))
time.sleep(5)
sleep(5)
network_sort_keys = ["block", "member", "diffi", "uid"]
......@@ -99,7 +98,7 @@ def get_network_sort_key(endpoint):
def network_info(ep, discover):
rows, columns = os.popen('stty size', 'r').read().split()
rows, columns = popen('stty size', 'r').read().split()
# print(rows, columns) # debug
wide = int(columns)
if wide < 146:
......@@ -154,8 +153,8 @@ def network_info(ep, discover):
else:
endpoints[i]["ip6"] = endpoints[i]["ip6"][:8] + "…"
i += 1
os.system("clear")
print(len(endpoints), "peers ups, with", members, "members and", len(endpoints) - members, "non-members at", datetime.datetime.now().strftime("%H:%M:%S"))
system("clear")
print(len(endpoints), "peers ups, with", members, "members and", len(endpoints) - members, "non-members at", datetime.now().strftime("%H:%M:%S"))
endpoints = sorted(endpoints, key=get_network_sort_key)
print(tabulate(endpoints, headers="keys", tablefmt="orgtbl", stralign="center"))
......@@ -187,7 +186,7 @@ def list_issuers(ep, nbr, last):
issuer2["pubkey"] == issuer["pubkey"]:
issuer2["uid"] = uid
issuer2.pop("pubkey")
os.system("clear")
system("clear")
print("Issuers for last {0} blocks from block n°{1} to block n°{2}".format(nbr, current_nbr - nbr + 1, current_nbr), end=" ")
if last or nbr <= 30:
sorted_list = sorted(list_issuers, key=itemgetter("block"), reverse=True)
......
......@@ -3,6 +3,7 @@ from tools import get_currency_symbol, get_publickey_from_seed
from auth import auth_method
from wot import check_public_key
def cmd_amount(ep, cli_args):
if not cli_args.subsubcmd.startswith("--"):
pubkeys = cli_args.subsubcmd.split(":")
......@@ -71,7 +72,7 @@ def get_amount_from_pubkey(ep, pubkey):
current_blk = get_current_block(ep)
last_block_number = int(current_blk["number"])
# add pending output
# add pending output
for pending in pendings:
blockstamp = pending["blockstamp"]
block_number = int(blockstamp.split("-")[0])
......
from __future__ import unicode_literals
import ipaddress
import json
from ipaddress import ip_address
from json import loads
import socket
import urllib.request
import sys
from sys import exit, stderr
def discover_peers(ep, discover):
......@@ -94,7 +94,7 @@ def endpoint_type(sep, ep):
def check_ip(address):
try:
return ipaddress.ip_address(address).version
return ip_address(address).version
except:
return 0
......@@ -109,7 +109,7 @@ def get_request(ep, path):
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
encoding = response.info().get_content_charset('utf8')
return json.loads(response.read().decode(encoding))
return loads(response.read().decode(encoding))
def post_request(ep, path, postdata):
......@@ -123,10 +123,10 @@ def post_request(ep, path, postdata):
try:
response = urllib.request.urlopen(request)
except urllib.error.URLError as e:
print(e, file=sys.stderr)
sys.exit(1)
print(e, file=stderr)
exit(1)
encoding = response.info().get_content_charset('utf8')
return json.loads(response.read().decode(encoding))
return loads(response.read().decode(encoding))
def best_node(ep, main):
......@@ -141,8 +141,8 @@ def best_node(ep, main):
except:
pass
if main:
print("Wrong node gived as argument", file=sys.stderr)
sys.exit(1)
print("Wrong node gived as argument", file=stderr)
exit(1)
return None
......@@ -150,12 +150,13 @@ def check_port(port):
try:
port = int(port)
except:
print("Port must be an integer", file=sys.stderr)
sys.exit(1)
print("Port must be an integer", file=stderr)
exit(1)
if (port < 0 or port > 65536):
print("Wrong port number", file=sys.stderr)
sys.exit(1)
print("Wrong port number", file=stderr)
exit(1)
return 1
def get_current_block(ep):
return get_request(ep, "blockchain/current")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
from sys import stderr
from commandlines import Command
from tx import send_transaction
from money import cmd_amount
from cert import send_certification
from commands import *
from tools import *
from wot import *
from constants import SILKAJ_VERSION
from commands import currency_info, difficulties, set_network_sort_keys,\
network_info, argos_info, list_issuers
from tools import message_exit
from network_tools import check_port, best_node
from wot import received_sent_certifications, id_pubkey_correspondence
from auth import generate_auth_file
from constants import SILKAJ_VERSION, G1_DEFAULT_ENDPOINT
def usage():
......@@ -81,7 +83,7 @@ def cli():
try:
ep["domain"], ep["port"] = cli_args.get_definition('p').rsplit(':', 1)
except:
print("Requested default node: <{}:{}>".format(ep["domain"], ep["port"]), file=sys.stderr)
print("Requested default node: <{}:{}>".format(ep["domain"], ep["port"]), file=stderr)
if ep["domain"].startswith('[') and ep["domain"].endswith(']'):
ep["domain"] = ep["domain"][1:-1]
return ep, cli_args
......
import datetime
import nacl.encoding
import nacl.signing
import nacl.hash
import re
import sys
from datetime import datetime
from nacl import encoding, signing, hash, bindings
from re import compile, search
from sys import exit
from network_tools import *
from constants import *
from constants import G1_SYMBOL, GTEST_SYMBOL
def convert_time(timestamp, kind):
......@@ -18,7 +15,7 @@ def convert_time(timestamp, kind):
pattern = "%H:%M:%S"
else:
pattern = "%M:%S"
return datetime.datetime.fromtimestamp(ts).strftime(pattern)
return datetime.fromtimestamp(ts).strftime(pattern)
def get_currency_symbol(currency):
......@@ -30,16 +27,16 @@ def get_currency_symbol(currency):
def sign_document_from_seed(document, seed):
seed = bytes(seed, 'utf-8')
signing_key = nacl.signing.SigningKey(seed, nacl.encoding.HexEncoder)
signing_key = signing.SigningKey(seed, encoding.HexEncoder)
signed = signing_key.sign(bytes(document, 'utf-8'))
signed_b64 = nacl.encoding.Base64Encoder.encode(signed.signature)
signed_b64 = encoding.Base64Encoder.encode(signed.signature)
return signed_b64.decode("utf-8")
def get_publickey_from_seed(seed):
seed = bytes(seed, 'utf-8')
seed = nacl.encoding.HexEncoder.decode(seed)
public_key, secret_key = nacl.bindings.crypto_sign_seed_keypair(seed)
seed = encoding.HexEncoder.decode(seed)
public_key, secret_key = bindings.crypto_sign_seed_keypair(seed)
return b58_encode(public_key)
......@@ -49,17 +46,17 @@ def check_public_key(pubkey, display_error):
Check pubkey checksum which could be append after the pubkey
If check pass: return pubkey
"""
regex = re.compile('^[1-9A-HJ-NP-Za-km-z]{43,44}$')
regex_checksum = re.compile('^[1-9A-HJ-NP-Za-km-z]{43,44}' +
regex = compile('^[1-9A-HJ-NP-Za-km-z]{43,44}$')
regex_checksum = compile('^[1-9A-HJ-NP-Za-km-z]{43,44}' +
':[1-9A-HJ-NP-Za-km-z]{3}$')
if re.search(regex, pubkey):
if search(regex, pubkey):
return pubkey
elif re.search(regex_checksum, pubkey):
elif search(regex_checksum, pubkey):
pubkey, checksum = pubkey.split(":")
pubkey_byte = b58_decode(pubkey)
checksum_calculed = b58_encode(nacl.hash.sha256(
nacl.hash.sha256(pubkey_byte, nacl.encoding.RawEncoder),
nacl.encoding.RawEncoder))[:3]
checksum_calculed = b58_encode(hash.sha256(
hash.sha256(pubkey_byte, encoding.RawEncoder),
encoding.RawEncoder))[:3]
if checksum_calculed == checksum:
return pubkey
else:
......@@ -77,7 +74,7 @@ b58_digits = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
def b58_encode(b):
# Convert big-endian bytes to integer
n = int('0x0' + nacl.encoding.HexEncoder.encode(b).decode('utf8'), 16)
n = int('0x0' + encoding.HexEncoder.encode(b).decode('utf8'), 16)
# Divide that integer into base58
res = []
......@@ -115,7 +112,7 @@ def b58_decode(s):
h = '%x' % n
if len(h) % 2:
h = '0' + h
res = nacl.encoding.HexEncoder.decode(h.encode('utf8'))
res = encoding.HexEncoder.decode(h.encode('utf8'))
# Add padding back.
pad = 0
......@@ -133,6 +130,7 @@ def xor_bytes(b1, b2):
result.append(b1 ^ b2)
return result
def message_exit(message):
print(message)
sys.exit(1)
exit(1)
import re
from re import compile, search
import math
import time
import sys
from time import sleep
from sys import exit
import urllib
from tabulate import tabulate
from network_tools import *
from tools import *
from network_tools import get_request, post_request, get_current_block
from tools import get_currency_symbol, get_publickey_from_seed, sign_document_from_seed,\
check_public_key, message_exit
from auth import auth_method
from wot import *
from wot import get_uid_from_pubkey
from money import get_last_ud_value, get_amount_from_pubkey
from constants import NO_MATCHING_ID
def send_transaction(ep, cli_args):
......@@ -60,7 +63,7 @@ def check_transaction_values(comment, output, outputBackChange):
if outputBackChange:
outputBackChange = check_public_key(outputBackChange, True)
if output is False or outputBackChange is False:
sys.exit(1)
exit(1)
def transaction_confirmation(ep, c, issuer_pubkey, amount, ud, output, comment):
......@@ -100,9 +103,9 @@ def generate_and_send_transaction(ep, seed, issuers, AmountTransfered, outputAdd
print(" - Amount: " + str(totalAmountInput / 100))
transaction = generate_transaction_document(ep, issuers, totalAmountInput, listinput_and_amount, issuers, "Change operation")
transaction += sign_document_from_seed(transaction, seed) + "\n"
retour = post_request(ep, "tx/process", "transaction=" + urllib.parse.quote_plus(transaction))
post_request(ep, "tx/process", "transaction=" + urllib.parse.quote_plus(transaction))
print("Change Transaction successfully sent.")
time.sleep(1) # wait 1 second before sending a new transaction
sleep(1) # wait 1 second before sending a new transaction
else:
print("Generate Transaction:")
......@@ -115,13 +118,13 @@ def generate_and_send_transaction(ep, seed, issuers, AmountTransfered, outputAdd
transaction = generate_transaction_document(ep, issuers, AmountTransfered, listinput_and_amount, outputAddr, Comment, OutputbackChange)
transaction += sign_document_from_seed(transaction, seed) + "\n"
retour = post_request(ep, "tx/process", "transaction=" + urllib.parse.quote_plus(transaction))
post_request(ep, "tx/process", "transaction=" + urllib.parse.quote_plus(transaction))
print("Transaction successfully sent.")
break
def generate_transaction_document(ep, issuers, AmountTransfered, listinput_and_amount, outputaddr, Comment="", OutputbackChange=None):
outputAddr = check_public_key(outputaddr, True)
check_public_key(outputaddr, True)
if OutputbackChange:
OutputbackChange = check_public_key(OutputbackChange, True)
......@@ -130,7 +133,6 @@ def generate_transaction_document(ep, issuers, AmountTransfered, listinput_and_a
current_blk = get_current_block(ep)
currency_name = current_blk["currency"]
currency_symbol = get_currency_symbol(currency_name)
blockstamp_current = str(current_blk["number"]) + "-" + str(current_blk["hash"])
curentUnitBase = current_blk["unitbase"]
......@@ -257,8 +259,8 @@ def get_list_input_for_transaction(ep, pubkey, TXamount, allinput=False):
def checkComment(Comment):
if len(Comment) > 255:
message_exit("Error: Comment is too long")
regex = re.compile('^[0-9a-zA-Z\ \-\_\:\/\;\*\[\]\(\)\?\!\^\+\=\@\&\~\#\{\}\|\\\<\>\%\.]*$')
if not re.search(regex, Comment):
regex = compile('^[0-9a-zA-Z\ \-\_\:\/\;\*\[\]\(\)\?\!\^\+\=\@\&\~\#\{\}\|\\\<\>\%\.]*$')
if not search(regex, Comment):
message_exit("Error: the format of the comment is invalid")
......
import os
from os import system
from tabulate import tabulate
from collections import OrderedDict
from network_tools import get_request
from tools import message_exit, check_public_key
from constants import *
from constants import NO_MATCHING_ID
def get_sent_certifications(certs):
......@@ -31,7 +31,7 @@ def received_sent_certifications(ep, id):
id_certs = certs_id
break
certifications = OrderedDict()
os.system("clear")
system("clear")
for certs in id_certs["uids"]:
if certs["uid"].lower() == id.lower():
certifications["received"] = list()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment