Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • 175_gva_migration
  • 429_rm_features
  • i18n
  • main
  • pages
  • release/0.11
  • release/0.12
  • v0.1.0
  • v0.10.0
  • v0.10.0rc0
  • v0.10.0rc1
  • v0.11.0
  • v0.11.0rc0
  • v0.11.1
  • v0.11.2
  • v0.12.0
  • v0.12.1
  • v0.2.0
  • v0.3.0
  • v0.4.0
  • v0.5.0
  • v0.6.0
  • v0.6.1
  • v0.6.2
  • v0.6.3
  • v0.6.4
  • v0.6.5
  • v0.7.0
  • v0.7.1
  • v0.7.2
  • v0.7.3
  • v0.7.4
  • v0.7.5
  • v0.7.6
  • v0.8.0
  • v0.8.1
  • v0.9.0
  • v0.9.0rc
38 results

Target

Select target project
  • elmau/silkaj
  • Mr-Djez/silkaj
  • jbar/silkaj
  • clients/python/silkaj
  • Bernard/silkaj
  • cebash/silkaj
  • jytou/silkaj
  • c-geek/silkaj
  • vincentux/silkaj
  • jeanlucdonnadieu/silkaj
  • matograine/silkaj
  • zicmama/silkaj
  • manutopik/silkaj
  • atrax/silkaj
14 results
Select Git revision
  • 72_rework_tx
  • dev
  • master
  • patch-1
  • 0.1.0
  • 0.2.0
  • v0.3.0
  • v0.4.0
  • v0.5.0
9 results
Show changes
Showing
with 496 additions and 1471 deletions
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import contextlib
import time
import urllib
from typing import Optional
from urllib.error import HTTPError
import rich_click as click
from duniterpy.api.bma import wot
from silkaj.constants import BMA_SLEEP
from silkaj.network import client_instance, exit_on_http_error
from silkaj.public_key import gen_pubkey_checksum
from silkaj.tui import Table
def identity_of(pubkey_uid: str) -> dict:
"""
Only works for members
Not able to get corresponding uid from a non-member identity
Able to know if an identity is member or not
"""
client = client_instance()
return client(wot.identity_of, pubkey_uid)
def is_member(pubkey_uid: str) -> Optional[dict]:
"""
Check identity is member
If member, return corresponding identity, else: False
"""
try:
return identity_of(pubkey_uid)
except HTTPError:
return None
def wot_lookup(identifier: str) -> list:
"""
:identifier: identity or pubkey in part or whole
Return received and sent certifications lists of matching identities
if one identity found
"""
client = client_instance()
return (client(wot.lookup, identifier))["results"]
def identities_from_pubkeys(pubkeys: list[str], uids: bool) -> list:
"""
Make list of pubkeys unique, and remove empty strings
Request identities
"""
if not uids:
return []
uniq_pubkeys = list(filter(None, set(pubkeys)))
identities = []
for pubkey in uniq_pubkeys:
time.sleep(BMA_SLEEP)
with contextlib.suppress(HTTPError):
identities.append(identity_of(pubkey))
return identities
def choose_identity(pubkey_uid: str) -> tuple[dict, str, list]:
"""
Get lookup from a pubkey or an uid
Loop over the double lists: pubkeys, then uids
If there is one uid, returns it
If there is multiple uids, prompt a selector
"""
try:
lookups = wot_lookup(pubkey_uid)
except urllib.error.HTTPError as e:
exit_on_http_error(e, 404, f"No identity found for {pubkey_uid}")
# Generate table containing the choices
identities_choices = {
"id": [],
"uid": [],
"pubkey": [],
"timestamp": [],
} # type: dict
for pubkey_index, lookup in enumerate(lookups):
for uid_index, identity in enumerate(lookup["uids"]):
identities_choices["id"].append(str(pubkey_index) + str(uid_index))
identities_choices["pubkey"].append(gen_pubkey_checksum(lookup["pubkey"]))
identities_choices["uid"].append(identity["uid"])
identities_choices["timestamp"].append(
identity["meta"]["timestamp"][:20] + "",
)
identities = len(identities_choices["uid"])
if identities == 1:
pubkey_index = 0
uid_index = 0
elif identities > 1:
table = Table().set_cols_dtype(["t", "t", "t", "t"])
table.fill_from_dict(identities_choices)
click.echo(table.draw())
# Loop till the passed value is in identities_choices
message = "Which identity would you like to select (id)?"
selected_id = None
while selected_id not in identities_choices["id"]:
selected_id = click.prompt(message)
pubkey_index = int(str(selected_id)[:-1])
uid_index = int(str(selected_id)[-1:])
return (
lookups[pubkey_index]["uids"][uid_index],
lookups[pubkey_index]["pubkey"],
lookups[pubkey_index]["signed"],
)
from tools import get_publickey_from_seed, b58_decode, xor_bytes, message_exit
from nacl import encoding
import nacl.hash
from scrypt import hash
import pyaes
from getpass import getpass
from os import path
from re import compile, search
def auth_method(cli_args):
if cli_args.contains_switches('auth-scrypt'):
return auth_by_scrypt(cli_args)
if cli_args.contains_switches('auth-seed'):
return auth_by_seed()
if cli_args.contains_switches('auth-file'):
return auth_by_auth_file(cli_args)
if cli_args.contains_switches('auth-wif'):
return auth_by_wif()
message_exit("Error: no authentication method")
def generate_auth_file(cli_args):
if cli_args.contains_definitions('file'):
file = cli_args.get_definition('file')
else:
file = "authfile"
seed = auth_method(cli_args)
with open(file, "w") as f:
f.write(seed)
print("Authfile generated for the public key: ",
get_publickey_from_seed(seed))
def auth_by_auth_file(cli_args):
if cli_args.contains_definitions('file'):
file = cli_args.get_definition('file')
else:
file = "authfile"
if not path.isfile(file):
message_exit("Error: the file \"" + file + "\" does not exist")
with open(file) as f:
filetxt = f.read()
regex_seed = compile('^[0-9a-fA-F]{64}$')
regex_gannonce = compile('^pub: [1-9A-HJ-NP-Za-km-z]{43,44}\nsec: [1-9A-HJ-NP-Za-km-z]{88,90}.*$')
# Seed Format
if search(regex_seed, filetxt):
seed = filetxt[0:64]
# gannonce.duniter.org Format
elif search(regex_gannonce, filetxt):
private_key = filetxt.split("sec: ")[1].split("\n")[0]
seed = encoding.HexEncoder.encode(b58_decode(private_key))[0:64].decode("utf-8")
else:
message_exit("Error: the format of the file is invalid")
return seed
def auth_by_seed():
seed = input("Please enter your seed on hex format: ")
regex = compile('^[0-9a-fA-F]{64}$')
if not search(regex, seed):
message_exit("Error: the format of the seed is invalid")
return seed
def auth_by_scrypt(cli_args):
salt = getpass("Please enter your Scrypt Salt (Secret identifier): ")
password = getpass("Please enter your Scrypt password (masked): ")
if cli_args.contains_definitions('n') and cli_args.contains_definitions('r') and cli_args.contains_definitions('p'):
n, r, p = cli_args.get_definition('n'), cli_args.get_definition('r'), cli_args.get_definition('p')
if n.isnumeric() and r.isnumeric() and p.isnumeric():
n, r, p = int(n), int(r), int(p)
if n <= 0 or n > 65536 or r <= 0 or r > 512 or p <= 0 or p > 32:
message_exit("Error: the values of Scrypt parameters are not good")
else:
message_exit("one of n, r or p is not a number")
else:
print("Using default values. Scrypt parameters not specified or wrong format")
n, r, p = 4096, 16, 1
print("Scrypt parameters used: N: {0}, r: {1}, p: {2}".format(n, r, p))
return get_seed_from_scrypt(salt, password, n, r, p)
def auth_by_wif():
wif = input("Please enter your WIF or Encrypted WIF address: ")
regex = compile('^[1-9A-HJ-NP-Za-km-z]*$')
if not search(regex, wif):
message_exit("Error: the format of WIF is invalid")
wif_bytes = b58_decode(wif)
fi = wif_bytes[0:1]
if fi == b'\x01':
return get_seed_from_wifv1(wif)
elif fi == b'\x02':
password = getpass("Please enter the " +
"password of WIF (masked): ")
return get_seed_from_ewifv1(wif, password)
message_exit("Error: the format of WIF is invalid or unknown")
def get_seed_from_scrypt(salt, password, N=4096, r=16, p=1):
seed = hash(password, salt, N, r, p, 32)
seedhex = encoding.HexEncoder.encode(seed).decode("utf-8")
return seedhex
def get_seed_from_wifv1(wif):
regex = compile('^[1-9A-HJ-NP-Za-km-z]*$')
if not search(regex, wif):
message_exit("Error: the format of WIF is invalid")
wif_bytes = b58_decode(wif)
if len(wif_bytes) != 35:
message_exit("Error: the size of WIF is invalid")
checksum_from_wif = wif_bytes[-2:]
fi = wif_bytes[0:1]
seed = wif_bytes[1:-2]
seed_fi = wif_bytes[0:-2]
if fi != b'\x01':
message_exit("Error: It's not a WIF format")
# checksum control
checksum = nacl.hash.sha256(
nacl.hash.sha256(seed_fi, encoding.RawEncoder),
encoding.RawEncoder)[0:2]
if checksum_from_wif != checksum:
message_exit("Error: bad checksum of the WIF")
seedhex = encoding.HexEncoder.encode(seed).decode("utf-8")
return seedhex
def get_seed_from_ewifv1(ewif, password):
regex = compile('^[1-9A-HJ-NP-Za-km-z]*$')
if not search(regex, ewif):
message_exit("Error: the format of EWIF is invalid")
wif_bytes = b58_decode(ewif)
if len(wif_bytes) != 39:
message_exit("Error: the size of EWIF is invalid")
wif_no_checksum = wif_bytes[0:-2]
checksum_from_ewif = wif_bytes[-2:]
fi = wif_bytes[0:1]
salt = wif_bytes[1:5]
encryptedhalf1 = wif_bytes[5:21]
encryptedhalf2 = wif_bytes[21:37]
if fi != b'\x02':
message_exit("Error: It's not a EWIF format")
# Checksum Control
checksum = nacl.hash.sha256(
nacl.hash.sha256(wif_no_checksum, encoding.RawEncoder),
encoding.RawEncoder)[0:2]
if checksum_from_ewif != checksum:
message_exit("Error: bad checksum of EWIF address")
# SCRYPT
password = password.encode("utf-8")
scrypt_seed = hash(password, salt, 16384, 8, 8, 64)
derivedhalf1 = scrypt_seed[0:32]
derivedhalf2 = scrypt_seed[32:64]
# AES
aes = pyaes.AESModeOfOperationECB(derivedhalf2)
decryptedhalf1 = aes.decrypt(encryptedhalf1)
decryptedhalf2 = aes.decrypt(encryptedhalf2)
# XOR
seed1 = xor_bytes(decryptedhalf1, derivedhalf1[0:16])
seed2 = xor_bytes(decryptedhalf2, derivedhalf1[16:32])
seed = seed1+seed2
seedhex = encoding.HexEncoder.encode(seed).decode("utf-8")
# Password Control
salt_from_seed = nacl.hash.sha256(
nacl.hash.sha256(
b58_decode(get_publickey_from_seed(seedhex)),
encoding.RawEncoder),
encoding.RawEncoder)[0:4]
if salt_from_seed != salt:
message_exit("Error: bad Password of EWIF address")
return seedhex
import webbrowser
import urllib
from sys import exit
from pydoc import pager
from tabulate import tabulate
from auth import auth_method
from tools import get_publickey_from_seed, message_exit, sign_document_from_seed
from network_tools import get_current_block, post_request
from constants import NO_MATCHING_ID
from wot import is_member, get_pubkey_from_id, get_pubkeys_from_id,\
get_uid_from_pubkey
def send_certification(ep, cli_args):
current_blk = get_current_block(ep)
certified_uid = cli_args.subsubcmd
certified_pubkey = get_pubkey_from_id(ep, certified_uid)
# Check that the id is present on the network
if (certified_pubkey is NO_MATCHING_ID):
message_exit(NO_MATCHING_ID)
# Display license and ask for confirmation
license_approval(current_blk["currency"])
# Authentication
seed = auth_method(cli_args)
# Check whether current user is member
issuer_pubkey = get_publickey_from_seed(seed)
issuer_id = get_uid_from_pubkey(ep, issuer_pubkey)
if not is_member(ep, issuer_pubkey, issuer_id):
message_exit("Current identity is not member.")
# Check if this certification is already present on the network
id_lookup = get_pubkeys_from_id(ep, certified_uid)[0]
for certifiers in id_lookup["uids"][0]["others"]:
if certifiers["pubkey"] == issuer_pubkey:
message_exit("Identity already certified by " + issuer_id)
# Certification confirmation
if not certification_confirmation(issuer_id, issuer_pubkey, certified_uid, certified_pubkey):
return
cert_doc = generate_certification_document(id_lookup, current_blk, issuer_pubkey, certified_uid)
cert_doc += sign_document_from_seed(cert_doc, seed) + "\n"
# Send certification document
post_request(ep, "wot/certify", "cert=" + urllib.parse.quote_plus(cert_doc))
print("Certification successfully sent.")
def license_approval(currency):
if currency != "g1":
return
language = input("In which language would you like to display Ğ1 license [en/fr]? ")
if (language == "en"):
if not webbrowser.open("https://duniter.org/en/get-g1/"):
pager(open("licence-G1/license/license_g1-en.rst").read())
else:
if not webbrowser.open("https://duniter.org/fr/wiki/licence-g1/"):
pager(open("licence-G1/license/license_g1-fr-FR.rst").read())
if (input("Do you approve Ğ1 license [yes/no]? ") != "yes"):
exit(1)
def certification_confirmation(issuer_id, issuer_pubkey, certified_uid, certified_pubkey):
cert = list()
cert.append(["Cert", "From", "–>", "To"])
cert.append(["ID", issuer_id, "–>", certified_uid])
cert.append(["Pubkey", issuer_pubkey, "–>", certified_pubkey])
if input(tabulate(cert, tablefmt="fancy_grid") +
"\nDo you confirm sending this certification? [yes/no]: ") == "yes":
return True
def generate_certification_document(id_lookup, current_blk, issuer_pubkey, certified_uid):
return "Version: 10\n\
Type: Certification\n\
Currency: " + current_blk["currency"] + "\n\
Issuer: " + issuer_pubkey + "\n\
IdtyIssuer: " + id_lookup["pubkey"] + "\n\
IdtyUniqueID: " + certified_uid + "\n\
IdtyTimestamp: " + id_lookup["uids"][0]["meta"]["timestamp"] + "\n\
IdtySignature: " + id_lookup["uids"][0]["self"] + "\n\
CertTimestamp: " + str(current_blk["number"]) + "-" + current_blk["hash"] + "\n"
from datetime import datetime
from time import sleep
from os import system, popen
from collections import OrderedDict
from tabulate import tabulate
from operator import itemgetter
from wot import get_uid_from_pubkey
from network_tools import discover_peers, get_request, best_node, get_current_block
from tools import convert_time, get_currency_symbol, message_exit
from constants import NO_MATCHING_ID
def currency_info(ep):
info_type = ["newcomers", "certs", "actives", "leavers", "excluded", "ud", "tx"]
i, info_data = 0, dict()
while (i < len(info_type)):
info_data[info_type[i]] = get_request(ep, "blockchain/with/" + info_type[i])["result"]["blocks"]
i += 1
current = get_current_block(ep)
system("clear")
print("Connected to node:", ep[best_node(ep, 1)], ep["port"],
"\nCurrent block number:", current["number"],
"\nCurrency name:", get_currency_symbol(current["currency"]),
"\nNumber of members:", current["membersCount"],
"\nMinimal Proof-of-Work:", current["powMin"],
"\nCurrent time:", convert_time(current["time"], "all"),
"\nMedian time:", convert_time(current["medianTime"], "all"),
"\nDifference time:", convert_time(current["time"] - current["medianTime"], "hour"),
"\nNumber of blocks containing: \
\n- new comers:", len(info_data["newcomers"]),
"\n- Certifications:", len(info_data["certs"]),
"\n- Actives (members updating their membership):", len(info_data["actives"]),
"\n- Leavers:", len(info_data["leavers"]),
"\n- Excluded:", len(info_data["excluded"]),
"\n- UD created:", len(info_data["ud"]),
"\n- transactions:", len(info_data["tx"]))
def match_pattern(pow, match='', p=1):
while pow > 0:
if pow >= 16:
match += "0"
pow -= 16
p *= 16
else:
match += "[0-" + hex(15 - pow)[2:].upper() + "]"
p *= pow
pow = 0
return match + '*', p
def power(nbr, pow=0):
while nbr >= 10:
nbr /= 10
pow += 1
return "{0:.1f} × 10^{1}".format(nbr, pow)
def difficulties(ep):
while True:
diffi = get_request(ep, "blockchain/difficulties")
levels = [OrderedDict((i, d[i]) for i in ("uid", "level")) for d in diffi["levels"]]
diffi["levels"] = levels
current = get_current_block(ep)
issuers, sorted_diffi = 0, sorted(diffi["levels"], key=itemgetter("level"), reverse=True)
for d in diffi["levels"]:
if d["level"] / 2 < current["powMin"]:
issuers += 1
d["match"] = match_pattern(d["level"])[0][:20]
d["Π diffi"] = power(match_pattern(d["level"])[1])
d["Σ diffi"] = d.pop("level")
system("clear")
print("Minimal Proof-of-Work: {0} to match `{1}`\nDifficulty to generate next block n°{2} for {3}/{4} nodes:\n{5}"
.format(current["powMin"], match_pattern(int(current["powMin"]))[0], diffi["block"], issuers, len(diffi["levels"]),
tabulate(sorted_diffi, headers="keys", tablefmt="orgtbl", stralign="center")))
sleep(5)
network_sort_keys = ["block", "member", "diffi", "uid"]
def set_network_sort_keys(some_keys):
global network_sort_keys
if some_keys.endswith(","):
message_exit("Argument 'sort' ends with a comma, you have probably inserted a space after the comma, which is incorrect.")
network_sort_keys = some_keys.split(",")
def get_network_sort_key(endpoint):
t = list()
for akey in network_sort_keys:
if akey == 'diffi' or akey == 'block' or akey == 'port':
t.append(int(endpoint[akey]) if akey in endpoint else 0)
else:
t.append(str(endpoint[akey]) if akey in endpoint else "")
return tuple(t)
def network_info(ep, discover):
rows, columns = popen('stty size', 'r').read().split()
# print(rows, columns) # debug
wide = int(columns)
if wide < 146:
message_exit("Wide screen need to be larger than 146. Current wide: " + wide)
# discover peers
# and make sure fields are always ordered the same
endpoints = [OrderedDict((i, p.get(i, None)) for i in ("domain", "port", "ip4", "ip6", "pubkey")) for p in discover_peers(ep, discover)]
# Todo : renommer endpoints en info
diffi = get_request(ep, "blockchain/difficulties")
i, members = 0, 0
print("Getting informations about nodes:")
while (i < len(endpoints)):
print("{0:.0f}%".format(i/len(endpoints) * 100, 1), end=" ")
best_ep = best_node(endpoints[i], 0)
print(best_ep if best_ep is None else endpoints[i][best_ep], end=" ")
print(endpoints[i]["port"])
try:
endpoints[i]["uid"] = get_uid_from_pubkey(ep, endpoints[i]["pubkey"])
if endpoints[i]["uid"] is NO_MATCHING_ID:
endpoints[i]["uid"] = None
else:
endpoints[i]["member"] = "yes"
members += 1
except:
pass
if endpoints[i].get("member") is None:
endpoints[i]["member"] = "no"
endpoints[i]["pubkey"] = endpoints[i]["pubkey"][:5] + ""
# Todo: request difficulty from node point of view: two nodes with same pubkey id could be on diffrent branches and have different difficulties
# diffi = get_request(endpoints[i], "blockchain/difficulties") # super long, doit être requetté uniquement pour les nœuds d’une autre branche
for d in diffi["levels"]:
if endpoints[i].get("uid") is not None:
if endpoints[i]["uid"] == d["uid"]:
endpoints[i]["diffi"] = d["level"]
if len(endpoints[i]["uid"]) > 10:
endpoints[i]["uid"] = endpoints[i]["uid"][:9] + ""
current_blk = get_current_block(endpoints[i])
if current_blk is not None:
endpoints[i]["gen_time"] = convert_time(current_blk["time"], "hour")
if wide > 171:
endpoints[i]["mediantime"] = convert_time(current_blk["medianTime"], "hour")
if wide > 185:
endpoints[i]["difftime"] = convert_time(current_blk["time"] - current_blk["medianTime"], "hour")
endpoints[i]["block"] = current_blk["number"]
endpoints[i]["hash"] = current_blk["hash"][:10] + ""
endpoints[i]["version"] = get_request(endpoints[i], "node/summary")["duniter"]["version"]
if endpoints[i].get("domain") is not None and len(endpoints[i]["domain"]) > 20:
endpoints[i]["domain"] = "" + endpoints[i]["domain"][-20:]
if endpoints[i].get("ip6") is not None:
if wide < 156:
endpoints[i].pop("ip6")
else:
endpoints[i]["ip6"] = endpoints[i]["ip6"][:8] + ""
i += 1
system("clear")
print(len(endpoints), "peers ups, with", members, "members and", len(endpoints) - members, "non-members at", datetime.now().strftime("%H:%M:%S"))
endpoints = sorted(endpoints, key=get_network_sort_key)
print(tabulate(endpoints, headers="keys", tablefmt="orgtbl", stralign="center"))
def list_issuers(ep, nbr, last):
current_blk = get_current_block(ep)
current_nbr = current_blk["number"]
if nbr == 0:
nbr = current_blk["issuersFrame"]
url = "blockchain/blocks/" + str(nbr) + "/" + str(current_nbr - nbr + 1)
blocks, list_issuers, j = get_request(ep, url), list(), 0
issuers_dict = dict()
while j < len(blocks):
issuer = OrderedDict()
issuer["pubkey"] = blocks[j]["issuer"]
if last or nbr <= 30:
issuer["block"] = blocks[j]["number"]
issuer["gentime"] = convert_time(blocks[j]["time"], "hour")
issuer["mediantime"] = convert_time(blocks[j]["medianTime"], "hour")
issuer["hash"] = blocks[j]["hash"][:10]
issuers_dict[issuer["pubkey"]] = issuer
list_issuers.append(issuer)
j += 1
for pubkey in issuers_dict.keys():
issuer = issuers_dict[pubkey]
uid = get_uid_from_pubkey(ep, issuer["pubkey"])
for issuer2 in list_issuers:
if issuer2.get("pubkey") is not None and issuer.get("pubkey") is not None and \
issuer2["pubkey"] == issuer["pubkey"]:
issuer2["uid"] = uid
issuer2.pop("pubkey")
system("clear")
print("Issuers for last {0} blocks from block n°{1} to block n°{2}".format(nbr, current_nbr - nbr + 1, current_nbr), end=" ")
if last or nbr <= 30:
sorted_list = sorted(list_issuers, key=itemgetter("block"), reverse=True)
print("\n{0}".format(tabulate(sorted_list, headers="keys", tablefmt="orgtbl", stralign="center")))
else:
i, list_issued = 0, list()
while i < len(list_issuers):
j, found = 0, 0
while j < len(list_issued):
if list_issued[j].get("uid") is not None and \
list_issued[j]["uid"] == list_issuers[i]["uid"]:
list_issued[j]["blocks"] += 1
found = 1
break
j += 1
if found == 0:
issued = OrderedDict()
issued["uid"] = list_issuers[i]["uid"]
issued["blocks"] = 1
list_issued.append(issued)
i += 1
i = 0
while i < len(list_issued):
list_issued[i]["percent"] = list_issued[i]["blocks"] / nbr * 100
i += 1
sorted_list = sorted(list_issued, key=itemgetter("blocks"), reverse=True)
print("from {0} issuers\n{1}".format(len(list_issued),
tabulate(sorted_list, headers="keys", tablefmt="orgtbl", floatfmt=".1f", stralign="center")))
def argos_info(ep):
info_type = ["newcomers", "certs", "actives", "leavers", "excluded", "ud", "tx"]
pretty_names = {'g1': 'Ğ1', 'gtest': 'Ğtest'}
i, info_data = 0, dict()
while (i < len(info_type)):
info_data[info_type[i]] = get_request(ep, "blockchain/with/" + info_type[i])["result"]["blocks"]
i += 1
current = get_current_block(ep)
pretty = current["currency"]
if current["currency"] in pretty_names:
pretty = pretty_names[current["currency"]]
print(pretty, "|")
print("---")
href = 'href=http://%s:%s/' % (ep[best_node(ep, 1)], ep["port"])
print("Connected to node:", ep[best_node(ep, 1)], ep["port"], "|", href,
"\nCurrent block number:", current["number"],
"\nCurrency name:", get_currency_symbol(current["currency"]),
"\nNumber of members:", current["membersCount"],
"\nMinimal Proof-of-Work:", current["powMin"],
"\nCurrent time:", convert_time(current["time"], "all"),
"\nMedian time:", convert_time(current["medianTime"], "all"),
"\nDifference time:", convert_time(current["time"] - current["medianTime"], "hour"),
"\nNumber of blocks containing… \
\n-- new comers:", len(info_data["newcomers"]),
"\n-- Certifications:", len(info_data["certs"]),
"\n-- Actives (members updating their membership):", len(info_data["actives"]),
"\n-- Leavers:", len(info_data["leavers"]),
"\n-- Excluded:", len(info_data["excluded"]),
"\n-- UD created:", len(info_data["ud"]),
"\n-- transactions:", len(info_data["tx"]))
SILKAJ_VERSION = "silkaj 0.5.0"
NO_MATCHING_ID = "No matching identity"
G1_SYMBOL = "Ğ1"
GTEST_SYMBOL = "ĞTest"
G1_DEFAULT_ENDPOINT = "g1.duniter.org", "443"
from network_tools import get_request, get_current_block
from tools import get_currency_symbol, get_publickey_from_seed
from auth import auth_method
from wot import check_public_key
def cmd_amount(ep, cli_args):
if not cli_args.subsubcmd.startswith("--"):
pubkeys = cli_args.subsubcmd.split(":")
for pubkey in pubkeys:
pubkey = check_public_key(pubkey, True)
if not pubkey:
return
total = [0, 0]
for pubkey in pubkeys:
value = get_amount_from_pubkey(ep, pubkey)
show_amount_from_pubkey(ep, pubkey, value)
total[0] += value[0]
total[1] += value[1]
if (len(pubkeys) > 1):
show_amount_from_pubkey(ep, "Total", total)
else:
seed = auth_method(cli_args)
pubkey = get_publickey_from_seed(seed)
show_amount_from_pubkey(ep, pubkey, get_amount_from_pubkey(ep, pubkey))
def show_amount_from_pubkey(ep, pubkey, value):
totalAmountInput = value[0]
amount = value[1]
# output
UDvalue = get_last_ud_value(ep)
current_blk = get_current_block(ep)
currency_symbol = get_currency_symbol(current_blk["currency"])
if totalAmountInput - amount != 0:
print("Blockchain:")
print("-----------")
print("Relative =", round(amount / UDvalue, 2), "UD", currency_symbol)
print("Quantitative =", round(amount / 100, 2), currency_symbol + "\n")
print("Pending Transaction:")
print("--------------------")
print("Relative =", round((totalAmountInput - amount) / UDvalue, 2), "UD", currency_symbol)
print("Quantitative =", round((totalAmountInput - amount) / 100, 2), currency_symbol + "\n")
print("Total amount of: " + pubkey)
print("----------------------------------------------------------------")
print("Total Relative =", round(totalAmountInput / UDvalue, 2), "UD", currency_symbol)
print("Total Quantitative =", round(totalAmountInput / 100, 2), currency_symbol + "\n")
def get_amount_from_pubkey(ep, pubkey):
sources = get_request(ep, "tx/sources/" + pubkey)["sources"]
listinput = []
amount = 0
for source in sources:
if source["conditions"] == "SIG(" + pubkey + ")":
amount += source["amount"] * 10 ** source["base"]
listinput.append(str(source["amount"]) + ":" +
str(source["base"]) + ":" +
str(source["type"]) + ":" +
str(source["identifier"]) + ":" +
str(source["noffset"]))
# pending source
history = get_request(ep, "tx/history/" + pubkey + "/pending")["history"]
pendings = history["sending"] + history["receiving"] + history["pending"]
# print(pendings)
current_blk = get_current_block(ep)
last_block_number = int(current_blk["number"])
# add pending output
for pending in pendings:
blockstamp = pending["blockstamp"]
block_number = int(blockstamp.split("-")[0])
# if it's not an old transaction (bug in mirror node)
if block_number >= last_block_number - 3:
identifier = pending["hash"]
i = 0
for output in pending["outputs"]:
outputsplited = output.split(":")
if outputsplited[2] == "SIG(" + pubkey + ")":
inputgenerated = (
str(outputsplited[0]) + ":" +
str(outputsplited[1]) + ":T:" +
identifier + ":" + str(i)
)
if inputgenerated not in listinput:
listinput.append(inputgenerated)
i += 1
# remove input already used
for pending in pendings:
blockstamp = pending["blockstamp"]
block_number = int(blockstamp.split("-")[0])
# if it's not an old transaction (bug in mirror node)
if block_number >= last_block_number - 3:
for input in pending["inputs"]:
if input in listinput:
listinput.remove(input)
totalAmountInput = 0
for input in listinput:
inputsplit = input.split(":")
totalAmountInput += int(inputsplit[0]) * 10 ** int(inputsplit[1])
return int(totalAmountInput), int(amount)
def get_last_ud_value(ep):
blockswithud = get_request(ep, "blockchain/with/ud")["result"]
NBlastUDblock = blockswithud["blocks"][-1]
lastUDblock = get_request(ep, "blockchain/block/" + str(NBlastUDblock))
return lastUDblock["dividend"] * 10 ** lastUDblock["unitbase"]
from __future__ import unicode_literals
from ipaddress import ip_address
from json import loads
import socket
import urllib.request
from sys import exit, stderr
def discover_peers(ep, discover):
"""
From first node, discover his known nodes.
Remove from know nodes if nodes are down.
If discover option: scan all network to know all nodes.
display percentage discovering.
"""
endpoints = parse_endpoints(get_request(ep, "network/peers")["peers"])
if discover:
print("Discovering network")
for i, ep in enumerate(endpoints):
if discover:
print("{0:.0f}%".format(i / len(endpoints) * 100))
if best_node(ep, 0) is None:
endpoints.remove(ep)
elif discover:
endpoints = recursive_discovering(endpoints, ep)
return endpoints
def recursive_discovering(endpoints, ep):
"""
Discover recursively new nodes.
If new node found add it and try to found new node from his known nodes.
"""
news = parse_endpoints(get_request(ep, "network/peers")["peers"])
for new in news:
if best_node(new, 0) is not None and new not in endpoints:
endpoints.append(new)
recursive_discovering(endpoints, new)
return endpoints
def parse_endpoints(rep):
"""
endpoints[]{"domain", "ip4", "ip6", "pubkey"}
list of dictionaries
reps: raw endpoints
"""
i, j, endpoints = 0, 0, []
while (i < len(rep)):
if (rep[i]["status"] == "UP"):
while j < len(rep[i]["endpoints"]):
ep = parse_endpoint(rep[i]["endpoints"][j])
j += 1
if ep is None:
break
ep["pubkey"] = rep[i]["pubkey"]
endpoints.append(ep)
i += 1
j = 0
return endpoints
def parse_endpoint(rep):
"""
rep: raw endpoint, sep: split endpoint
domain, ip4 or ip6 could miss on raw endpoint
"""
ep, sep = {}, rep.split(" ")
if sep[0] == "BASIC_MERKLED_API":
if check_port(sep[-1]):
ep["port"] = sep[-1]
if len(sep) == 5 and check_ip(sep[1]) == 0 and check_ip(sep[2]) == 4 and check_ip(sep[3]) == 6:
ep["domain"], ep["ip4"], ep["ip6"] = sep[1], sep[2], sep[3]
if len(sep) == 4:
ep = endpoint_type(sep[1], ep)
ep = endpoint_type(sep[2], ep)
if len(sep) == 3:
ep = endpoint_type(sep[1], ep)
return ep
else:
return None
def endpoint_type(sep, ep):
typ = check_ip(sep)
if typ == 0:
ep["domain"] = sep
elif typ == 4:
ep["ip4"] = sep
elif typ == 6:
ep["ip6"] = sep
return ep
def check_ip(address):
try:
return ip_address(address).version
except:
return 0
def get_request(ep, path):
address = best_node(ep, 0)
if address is None:
return address
url = "http://" + ep[address] + ":" + ep["port"] + "/" + path
if ep["port"] == "443":
url = "https://" + ep[address] + "/" + path
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
encoding = response.info().get_content_charset('utf8')
return loads(response.read().decode(encoding))
def post_request(ep, path, postdata):
address = best_node(ep, 0)
if address is None:
return address
url = "http://" + ep[address] + ":" + ep["port"] + "/" + path
if ep["port"] == "443":
url = "https://" + ep[address] + "/" + path
request = urllib.request.Request(url, bytes(postdata, 'utf-8'))
try:
response = urllib.request.urlopen(request)
except urllib.error.URLError as e:
print(e, file=stderr)
exit(1)
encoding = response.info().get_content_charset('utf8')
return loads(response.read().decode(encoding))
def best_node(ep, main):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
addresses, port = {"ip6", "ip4", "domain"}, int(ep["port"])
for address in addresses:
if address in ep:
try:
s.connect((ep[address], port))
s.close()
return address
except:
pass
if main:
print("Wrong node given as argument", file=stderr)
exit(1)
return None
def check_port(port):
try:
port = int(port)
except:
print("Port must be an integer", file=stderr)
exit(1)
if (port < 0 or port > 65536):
print("Wrong port number", file=stderr)
exit(1)
return 1
def get_current_block(ep):
return get_request(ep, "blockchain/current")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from sys import stderr
from commandlines import Command
from tx import send_transaction
from money import cmd_amount
from cert import send_certification
from commands import currency_info, difficulties, set_network_sort_keys,\
network_info, argos_info, list_issuers
from tools import message_exit
from network_tools import check_port, best_node
from wot import received_sent_certifications, id_pubkey_correspondence
from auth import generate_auth_file
from constants import SILKAJ_VERSION, G1_DEFAULT_ENDPOINT
def usage():
message_exit("Silkaj: command line Duniter client \
\n\nhelp: -h, --help, --usage \
\nversion: -v, --version \
\n \
\nCustom endpoint with option `-p` and <domain>:<port>\
\n \
\nCommands: \
\n - info: Display information about currency \
\n \
\n - amount: Get amount of accounts \
\n pubkeys and/or ids separated with colon: <pubkey:id:pubkey>\
\n --auth-scrypt [script parameters -n <N> -r <r> -p <p>] (default: 4096,16,1)\
\n --auth-seed | --auth-file [--file=<path file>] | --auth-wif\
\n \
\n - tx/transaction: Send transaction\
\n - authentication:\
\n --auth-scrypt [script parameters -n <N> -r <r> -p <p>] (default: 4096,16,1)\
\n --auth-seed | --auth-file [--file=<path file>] | --auth-wif\
\n - amount:\
\n --amountUD=<relative value> | --amount=<quantitative value>\
\n [--allSources] \
\n --output=<public key>[:checksum] \
\n [--comment=<comment>] \
\n [--outputBackChange=<public key[:checksum]>] \
\n [-y | --yes], don't ask for prompt confirmation \
\n \
\n - cert: Send certification\
\n - e.g.: silkaj cert <id> <auth>\
\n - authentication:\
\n --auth-scrypt [script parameters -n <N> -r <r> -p <p>] (default: 4096,16,1)\
\n --auth-seed | --auth-file [--file=<path file>] | --auth-wif\
\n \
\n - net/network: Display current network with many information \
\n [--discover] Discover all network (could take a while), optional \
\n [-s | --sort] Sort column names comma-separated (for example \"-s block,diffi\"), optional \
\n Default sort is block,member,diffi,uid \
\n \
\n - diffi: list proof-of-work difficulty to generate next block \
\n \
\n - issuers n: display last n issuers (`0` for current window size) \
\n last issuers are displayed under n <= 30.\
\n To force display last ones, use `--last` option\
\n \
\n - argos: display currency information formated for Argos or BitBar\
\n \
\n - generate_auth_file: Generate file to store the seed of the account\
\n --auth-scrypt [script parameters -n <N> -r <r> -p <p>] (default: 4096,16,1)\
\n --auth-seed | --auth-file [--file=<path file>] | --auth-wif\
\n \
\n - id/identities <pubkey> or <identity>: get corresponding identity or pubkey from pubkey or identity.\
\n it could autocomplete the pubkey corresponding to an identity with three or four following characters.\
\n \
\n - wot <pubkey> or <identity>: display received and sent certifications for an account.")
def cli():
# ep: endpoint, node's network interface
ep, cli_args = dict(), Command()
subcmd = ["info", "diffi", "net", "network", "issuers", "argos", "amount", "tx", "transaction", "cert", "generate_auth_file", "id", "identities", "wot"]
if cli_args.is_version_request():
message_exit(SILKAJ_VERSION)
if cli_args.is_help_request() or cli_args.is_usage_request() or cli_args.subcmd not in subcmd:
usage()
ep["domain"], ep["port"] = G1_DEFAULT_ENDPOINT
try:
ep["domain"], ep["port"] = cli_args.get_definition('p').rsplit(':', 1)
except:
print("Requested default node: <{}:{}>".format(ep["domain"], ep["port"]), file=stderr)
if ep["domain"].startswith('[') and ep["domain"].endswith(']'):
ep["domain"] = ep["domain"][1:-1]
return ep, cli_args
def manage_cmd(ep, c):
if cli_args.subcmd == "info":
currency_info(ep)
elif cli_args.subcmd == "diffi":
difficulties(ep)
elif cli_args.subcmd == "net" or cli_args.subcmd == "network":
if cli_args.contains_switches("sort"):
set_network_sort_keys(cli_args.get_definition("sort"))
if cli_args.contains_switches("s"):
set_network_sort_keys(cli_args.get_definition("s"))
network_info(ep, cli_args.contains_switches("discover"))
elif cli_args.subcmd == "issuers" and cli_args.subsubcmd and int(cli_args.subsubcmd) >= 0:
list_issuers(ep, int(cli_args.subsubcmd), cli_args.contains_switches('last'))
elif cli_args.subcmd == "argos":
argos_info(ep)
elif cli_args.subcmd == "amount" and cli_args.subsubcmd:
cmd_amount(ep, cli_args)
elif cli_args.subcmd == "tx" or cli_args.subcmd == "transaction":
send_transaction(ep, cli_args)
elif cli_args.subcmd == "cert":
send_certification(ep, c)
elif cli_args.subcmd == "generate_auth_file":
generate_auth_file(cli_args)
elif cli_args.subcmd == "id" or cli_args.subcmd == "identities":
id_pubkey_correspondence(ep, cli_args.subsubcmd)
elif cli_args.subcmd == "wot":
received_sent_certifications(ep, cli_args.subsubcmd)
if __name__ == '__main__':
ep, cli_args = cli()
check_port(ep["port"])
best_node(ep, 1)
manage_cmd(ep, cli_args)
from datetime import datetime
from nacl import encoding, signing, hash, bindings
from re import compile, search
from sys import exit
from constants import G1_SYMBOL, GTEST_SYMBOL
def convert_time(timestamp, kind):
ts = int(timestamp)
date = "%Y-%m-%d"
hour = "%H:%M"
second = ":%S"
if kind == "all":
pattern = date + " " + hour + second
elif kind == "date":
pattern = date
elif kind == "hour":
pattern = hour
if ts >= 3600:
pattern += second
return datetime.fromtimestamp(ts).strftime(pattern)
def get_currency_symbol(currency):
if currency == "g1":
return G1_SYMBOL
elif currency == "g1-test":
return GTEST_SYMBOL
def sign_document_from_seed(document, seed):
seed = bytes(seed, 'utf-8')
signing_key = signing.SigningKey(seed, encoding.HexEncoder)
signed = signing_key.sign(bytes(document, 'utf-8'))
signed_b64 = encoding.Base64Encoder.encode(signed.signature)
return signed_b64.decode("utf-8")
def get_publickey_from_seed(seed):
seed = bytes(seed, 'utf-8')
seed = encoding.HexEncoder.decode(seed)
public_key, secret_key = bindings.crypto_sign_seed_keypair(seed)
return b58_encode(public_key)
def check_public_key(pubkey, display_error):
"""
Check public key format
Check pubkey checksum which could be append after the pubkey
If check pass: return pubkey
"""
regex = compile('^[1-9A-HJ-NP-Za-km-z]{43,44}$')
regex_checksum = compile('^[1-9A-HJ-NP-Za-km-z]{43,44}' +
':[1-9A-HJ-NP-Za-km-z]{3}$')
if search(regex, pubkey):
return pubkey
elif search(regex_checksum, pubkey):
pubkey, checksum = pubkey.split(":")
pubkey_byte = b58_decode(pubkey)
checksum_calculed = b58_encode(hash.sha256(
hash.sha256(pubkey_byte, encoding.RawEncoder),
encoding.RawEncoder))[:3]
if checksum_calculed == checksum:
return pubkey
else:
print("error: bad checksum of the public key")
return False
elif display_error:
print("Error: the format of the public key is invalid")
return False
b58_digits = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
def b58_encode(b):
# Convert big-endian bytes to integer
n = int('0x0' + encoding.HexEncoder.encode(b).decode('utf8'), 16)
# Divide that integer into base58
res = []
while n > 0:
n, r = divmod(n, 58)
res.append(b58_digits[r])
res = ''.join(res[::-1])
# Encode leading zeros as base58 zeros
czero = 0
pad = 0
for c in b:
if c == czero:
pad += 1
else:
break
return b58_digits[0] * pad + res
def b58_decode(s):
if not s:
return b''
# Convert the string to an integer
n = 0
for c in s:
n *= 58
if c not in b58_digits:
raise InvalidBase58Error('Character %r is not a ' +
'valid base58 character' % c)
digit = b58_digits.index(c)
n += digit
# Convert the integer to bytes
h = '%x' % n
if len(h) % 2:
h = '0' + h
res = encoding.HexEncoder.decode(h.encode('utf8'))
# Add padding back.
pad = 0
for c in s[:-1]:
if c == b58_digits[0]:
pad += 1
else:
break
return b'\x00' * pad + res
def xor_bytes(b1, b2):
result = bytearray()
for b1, b2 in zip(b1, b2):
result.append(b1 ^ b2)
return result
def message_exit(message):
print(message)
exit(1)
from re import compile, search
import math
from time import sleep
from sys import exit
import urllib
from tabulate import tabulate
from network_tools import get_request, post_request, get_current_block
from tools import get_currency_symbol, get_publickey_from_seed, sign_document_from_seed,\
check_public_key, message_exit
from auth import auth_method
from wot import get_uid_from_pubkey
from money import get_last_ud_value, get_amount_from_pubkey
from constants import NO_MATCHING_ID
def send_transaction(ep, cli_args):
"""
Main function
"""
ud = get_last_ud_value(ep)
amount, output, comment, allSources, outputBackChange = cmd_transaction(cli_args, ud)
check_transaction_values(comment, output, outputBackChange)
seed = auth_method(cli_args)
issuer_pubkey = get_publickey_from_seed(seed)
tx_confirmation = transaction_confirmation(ep, cli_args, issuer_pubkey, amount, ud, output, comment)
if cli_args.contains_switches('yes') or cli_args.contains_switches('y') or \
input(tabulate(tx_confirmation, tablefmt="fancy_grid") +
"\nDo you confirm sending this transaction? [yes/no]: ") == "yes":
generate_and_send_transaction(ep, seed, issuer_pubkey, amount, output, comment, allSources, outputBackChange)
def cmd_transaction(cli_args, ud):
"""
Retrieve values from command line interface
"""
if not (cli_args.contains_definitions('amount') or cli_args.contains_definitions('amountUD')):
message_exit("--amount or --amountUD is not set")
if not cli_args.contains_definitions('output'):
message_exit("--output is not set")
if cli_args.contains_definitions('amount'):
amount = int(float(cli_args.get_definition('amount')) * 100)
if cli_args.contains_definitions('amountUD'):
amount = int(float(cli_args.get_definition('amountUD')) * ud)
output = cli_args.get_definition('output')
comment = cli_args.get_definition('comment') if cli_args.contains_definitions('comment') else ""
allSources = cli_args.contains_switches('allSources')
if cli_args.contains_definitions('outputBackChange'):
outputBackChange = cli_args.get_definition('outputBackChange')
else:
outputBackChange = None
return amount, output, comment, allSources, outputBackChange
def check_transaction_values(comment, output, outputBackChange):
checkComment(comment)
output = check_public_key(output, True)
if outputBackChange:
outputBackChange = check_public_key(outputBackChange, True)
if output is False or outputBackChange is False:
exit(1)
def transaction_confirmation(ep, c, issuer_pubkey, amount, ud, output, comment):
"""
Generate transaction confirmation
"""
tx = list()
currency_symbol = get_currency_symbol(get_current_block(ep)["currency"])
tx.append(["amount (" + currency_symbol + ")", amount / 100])
tx.append(["amount (UD " + currency_symbol + ")", amount / ud])
tx.append(["from", issuer_pubkey])
id_from = get_uid_from_pubkey(ep, issuer_pubkey)
if id_from is not NO_MATCHING_ID:
tx.append(["from (id)", id_from])
tx.append(["to", output])
id_to = get_uid_from_pubkey(ep, output)
if id_to is not NO_MATCHING_ID:
tx.append(["to (id)", id_to])
tx.append(["comment", comment])
return tx
def generate_and_send_transaction(ep, seed, issuers, AmountTransfered, outputAddr, Comment="", all_input=False, OutputbackChange=None):
totalamount = get_amount_from_pubkey(ep, issuers)[0]
if totalamount < AmountTransfered:
message_exit("the account: " + issuers + " don't have enough money for this transaction")
while True:
listinput_and_amount = get_list_input_for_transaction(ep, issuers, AmountTransfered, all_input)
intermediatetransaction = listinput_and_amount[2]
if intermediatetransaction:
totalAmountInput = listinput_and_amount[1]
print("Generate Change Transaction")
print(" - From: " + issuers)
print(" - To: " + issuers)
print(" - Amount: " + str(totalAmountInput / 100))
transaction = generate_transaction_document(ep, issuers, totalAmountInput, listinput_and_amount, issuers, "Change operation")
transaction += sign_document_from_seed(transaction, seed) + "\n"
post_request(ep, "tx/process", "transaction=" + urllib.parse.quote_plus(transaction))
print("Change Transaction successfully sent.")
sleep(1) # wait 1 second before sending a new transaction
else:
print("Generate Transaction:")
print(" - From: " + issuers)
print(" - To: " + outputAddr)
if all_input:
print(" - Amount: " + str(listinput_and_amount[1] / 100))
else:
print(" - Amount: " + str(AmountTransfered / 100))
transaction = generate_transaction_document(ep, issuers, AmountTransfered, listinput_and_amount, outputAddr, Comment, OutputbackChange)
transaction += sign_document_from_seed(transaction, seed) + "\n"
post_request(ep, "tx/process", "transaction=" + urllib.parse.quote_plus(transaction))
print("Transaction successfully sent.")
break
def generate_transaction_document(ep, issuers, AmountTransfered, listinput_and_amount, outputaddr, Comment="", OutputbackChange=None):
check_public_key(outputaddr, True)
if OutputbackChange:
OutputbackChange = check_public_key(OutputbackChange, True)
listinput = listinput_and_amount[0]
totalAmountInput = listinput_and_amount[1]
current_blk = get_current_block(ep)
currency_name = current_blk["currency"]
blockstamp_current = str(current_blk["number"]) + "-" + str(current_blk["hash"])
curentUnitBase = current_blk["unitbase"]
if not OutputbackChange:
OutputbackChange = issuers
# if it's not a foreign exchange transaction, we remove units after 2 digits after the decimal point.
if issuers != outputaddr:
AmountTransfered = (AmountTransfered // 10 ** curentUnitBase) * 10 ** curentUnitBase
# Generate output
################
listoutput = []
# Outputs to receiver (if not himself)
rest = AmountTransfered
unitbase = curentUnitBase
while rest > 0:
outputAmount = truncBase(rest, unitbase)
rest -= outputAmount
if outputAmount > 0:
outputAmount = int(outputAmount / math.pow(10, unitbase))
listoutput.append(str(outputAmount) + ":" + str(unitbase) + ":SIG(" + outputaddr + ")")
unitbase = unitbase - 1
# Outputs to himself
unitbase = curentUnitBase
rest = totalAmountInput - AmountTransfered
while rest > 0:
outputAmount = truncBase(rest, unitbase)
rest -= outputAmount
if outputAmount > 0:
outputAmount = int(outputAmount / math.pow(10, unitbase))
listoutput.append(str(outputAmount) + ":" + str(unitbase) + ":SIG(" + OutputbackChange + ")")
unitbase = unitbase - 1
# Generate transaction document
##############################
transaction_document = "Version: 10\n"
transaction_document += "Type: Transaction\n"
transaction_document += "Currency: " + currency_name + "\n"
transaction_document += "Blockstamp: " + blockstamp_current + "\n"
transaction_document += "Locktime: 0\n"
transaction_document += "Issuers:\n"
transaction_document += issuers + "\n"
transaction_document += "Inputs:\n"
for input in listinput:
transaction_document += input + "\n"
transaction_document += "Unlocks:\n"
for i in range(0, len(listinput)):
transaction_document += str(i) + ":SIG(0)\n"
transaction_document += "Outputs:\n"
for output in listoutput:
transaction_document += output + "\n"
transaction_document += "Comment: " + Comment + "\n"
return transaction_document
def get_list_input_for_transaction(ep, pubkey, TXamount, allinput=False):
# real source in blockchain
sources = get_request(ep, "tx/sources/" + pubkey)["sources"]
if sources is None:
return None
listinput = []
for source in sources:
if source["conditions"] == "SIG(" + pubkey + ")":
listinput.append(str(source["amount"]) + ":" + str(source["base"]) + ":" + str(source["type"]) + ":" + str(source["identifier"]) + ":" + str(source["noffset"]))
# pending source
history = get_request(ep, "tx/history/" + pubkey + "/pending")["history"]
pendings = history["sending"] + history["receiving"] + history["pending"]
current_blk = get_current_block(ep)
last_block_number = int(current_blk["number"])
# add pending output
for pending in pendings:
blockstamp = pending["blockstamp"]
block_number = int(blockstamp.split("-")[0])
# if it's not an old transaction (bug in mirror node)
if block_number >= last_block_number - 3:
identifier = pending["hash"]
i = 0
for output in pending["outputs"]:
outputsplited = output.split(":")
if outputsplited[2] == "SIG("+pubkey+")":
inputgenerated = str(outputsplited[0]) + ":" + str(outputsplited[1]) + ":T:" + identifier + ":" + str(i)
if inputgenerated not in listinput:
listinput.append(inputgenerated)
i += 1
# remove input already used
for pending in pendings:
blockstamp = pending["blockstamp"]
block_number = int(blockstamp.split("-")[0])
# if it's not an old transaction (bug in mirror node)
if block_number >= last_block_number - 3:
for input in pending["inputs"]:
if input in listinput:
listinput.remove(input)
# generate final list source
listinputfinal = []
totalAmountInput = 0
intermediatetransaction = False
for input in listinput:
listinputfinal.append(input)
inputsplit = input.split(":")
totalAmountInput += int(inputsplit[0]) * 10 ** int(inputsplit[1])
TXamount -= int(inputsplit[0]) * 10 ** int(inputsplit[1])
# if more 40 sources, it's an intermediate transaction
if len(listinputfinal) >= 40:
intermediatetransaction = True
break
if TXamount <= 0 and not allinput:
break
if TXamount > 0 and not intermediatetransaction:
message_exit("Error: you don't have enough money")
return listinputfinal, totalAmountInput, intermediatetransaction
def checkComment(Comment):
if len(Comment) > 255:
message_exit("Error: Comment is too long")
regex = compile('^[0-9a-zA-Z\ \-\_\:\/\;\*\[\]\(\)\?\!\^\+\=\@\&\~\#\{\}\|\\\<\>\%\.]*$')
if not search(regex, Comment):
message_exit("Error: the format of the comment is invalid")
def truncBase(amount, base):
pow = math.pow(10, base)
if amount < pow:
return 0
return math.trunc(amount / pow) * pow
from os import system
from tabulate import tabulate
from collections import OrderedDict
from network_tools import get_request
from tools import message_exit, check_public_key, convert_time
from constants import NO_MATCHING_ID
def get_sent_certifications(certs, time_first_block, params):
sent = list()
expire = list()
if certs["signed"]:
for cert in certs["signed"]:
sent.append(cert["uid"])
expire.append(expiration_date_from_block_id(cert["cert_time"]["block"], time_first_block, params))
return sent, expire
def received_sent_certifications(ep, id):
"""
check id exist
many identities could exist
retrieve the one searched
get id of received and sent certifications
display on a chart the result with the numbers
"""
params = get_request(ep, "blockchain/parameters")
time_first_block = get_request(ep, "blockchain/block/1")["time"]
if get_pubkeys_from_id(ep, id) == NO_MATCHING_ID:
message_exit(NO_MATCHING_ID)
certs_req = get_request(ep, "wot/lookup/" + id)["results"]
for certs_id in certs_req:
if certs_id['uids'][0]['uid'].lower() == id.lower():
id_certs = certs_id
break
certifications = OrderedDict()
system("clear")
for certs in id_certs["uids"]:
if certs["uid"].lower() == id.lower():
certifications["received_expire"] = list()
certifications["received"] = list()
for cert in certs["others"]:
certifications["received_expire"].append(expiration_date_from_block_id(cert["meta"]["block_number"], time_first_block, params))
certifications["received"].append(cert["uids"][0])
certifications["sent"], certifications["sent_expire"] = get_sent_certifications(id_certs, time_first_block, params)
print("{0} ({1}) from block #{2}\nreceived {3} and sent {4} certifications:\n{5}\n"
.format(id, id_certs["pubkey"][:5] + "", certs["meta"]["timestamp"][:15] + "",
len(certifications["received"]), len(certifications["sent"]),
tabulate(certifications, headers="keys", tablefmt="orgtbl", stralign="center")))
def expiration_date_from_block_id(block_id, time_first_block, params):
return convert_time(date_approximation(block_id, time_first_block, params["avgGenTime"]) + params["sigValidity"], "date")
def date_approximation(block_id, time_first_block, avgentime):
return time_first_block + block_id * avgentime
def id_pubkey_correspondence(ep, id_pubkey):
if check_public_key(id_pubkey, False):
print("{} public key corresponds to identity: {}".format(id_pubkey, get_uid_from_pubkey(ep, id_pubkey)))
else:
pubkeys = get_pubkeys_from_id(ep, id_pubkey)
if pubkeys == NO_MATCHING_ID:
print(NO_MATCHING_ID)
else:
print("Public keys found matching '{}':\n".format(id_pubkey))
for pubkey in pubkeys:
print("", pubkey["pubkey"], end=" ")
try:
print("" + get_request(ep, "wot/identity-of/" + pubkey["pubkey"])["uid"])
except:
print("")
def get_uid_from_pubkey(ep, pubkey):
try:
results = get_request(ep, "wot/lookup/" + pubkey)
except:
return NO_MATCHING_ID
i, results = 0, results["results"]
while i < len(results):
if results[i]["uids"][0]["uid"] != pubkey:
return results[i]["uids"][0]["uid"]
i += 1
def get_pubkeys_from_id(ep, uid):
try:
results = get_request(ep, "wot/lookup/" + uid)
except:
return NO_MATCHING_ID
return results["results"]
def is_member(ep, pubkey, uid):
members = get_request(ep, "wot/members")["results"]
for member in members:
if (pubkey in member["pubkey"] and uid in member["uid"]):
return(True)
return(False)
def get_pubkey_from_id(ep, uid):
members = get_request(ep, "wot/members")["results"]
for member in members:
if (uid in member["uid"]):
return(member["pubkey"])
return(NO_MATCHING_ID)
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import click
from silkaj import cli
def define_click_context(**kwargs):
ctx = click.Context(cli.cli)
ctx.obj = {}
for kwarg in kwargs.items():
ctx.obj[kwarg[0].upper()] = kwarg[1]
click.globals.push_context(ctx)
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
from subprocess import check_output
silkaj = ["poetry", "run", "silkaj"]
def test_info():
"""tests 'silkaj blockchain info' returns a number of members"""
output = check_output([*silkaj, "blockchain", "info"])
assert "Number of members" in output.decode()
def test_wot_status():
"""tests 'silkaj wot status' returns a number of members"""
output = check_output([*silkaj, "wot", "status", "Matograine"]).decode()
assert "Matograine (CmFKubyq…:CQ5) from block #106433-00000340…" in output
assert "received_expire" in output
assert "received" in output
assert "sent" in output
assert "sent_expire" in output
def test_wot_lookup():
"""tests 'silkaj wot lookup' certification on gtest"""
output = check_output([*silkaj, "--gtest", "wot", "lookup", "elois"]).decode()
assert "D7CYHJXjaH4j7zRdWngUbsURPnSnjsCYtvo6f8dvW3C" in output
def test_money_balance():
"""tests 'silkaj money balance' command on gtest"""
output = check_output(
[
*silkaj,
"--gtest",
"money",
"balance",
"3dnbnYY9i2bHMQUGyFp5GVvJ2wBkVpus31cDJA5cfRpj",
],
).decode()
assert (
"│ Balance of pubkey │ 3dnbnYY9i2bHMQUGyFp5GVvJ2wBkVpus31cDJA5cfRpj"
in output
)
assert ":EyF" in output
assert "│ Total balance (unit|relative) │" in output
assert "UD ĞTest" in output
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
from unittest.mock import Mock
import pytest
from click.testing import CliRunner
from duniterpy.documents import get_block_id
from duniterpy.key import SigningKey
from silkaj import auth
from silkaj.blockchain import tools as bc_tools
from silkaj.cli import cli
from silkaj.wot import membership
from silkaj.wot import tools as w_tools
from tests.patched.blockchain_tools import fake_block_id, patched_get_head_block
# Values and patches
PUBKEY = "EA7Dsw39ShZg4SpURsrgMaMqrweJPUFPYHwZA8e92e3D"
identity_block_id = get_block_id(
"0-E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855",
)
identity_uid = "toto"
membership_block_id = fake_block_id
def patched_auth_method():
return SigningKey.from_credentials(identity_uid, identity_uid)
def patched_choose_identity(pubkey):
return (
{"uid": identity_uid, "meta": {"timestamp": identity_block_id}},
PUBKEY,
None,
)
@pytest.mark.parametrize(
("dry_run", "display", "confirmation"),
[
(True, False, False),
(False, True, False),
(False, True, True),
(False, False, True),
],
)
def test_membership_cmd(dry_run, display, confirmation, monkeypatch):
# Monkeypatch and Mock
monkeypatch.setattr(auth, "auth_method", patched_auth_method)
monkeypatch.setattr(bc_tools, "get_head_block", patched_get_head_block)
monkeypatch.setattr(w_tools, "choose_identity", patched_choose_identity)
patched_display_confirmation_table = Mock()
monkeypatch.setattr(
membership,
"display_confirmation_table",
patched_display_confirmation_table,
)
if not dry_run and not display:
patched_generate_membership_document = Mock()
monkeypatch.setattr(
membership,
"generate_membership_document",
patched_generate_membership_document,
)
# Run membership command
command = []
if dry_run:
command += ["--dry-run"]
if display:
command += ["--display"]
command += ["wot", "membership"]
pass_license = "No\nYes\n"
confirmations = pass_license + ("Yes" if confirmation else "No")
result = CliRunner().invoke(cli, args=command, input=confirmations)
# Assert functions are called
patched_display_confirmation_table.assert_called_once_with(
identity_uid,
PUBKEY,
identity_block_id,
)
if dry_run or display:
assert "Type: Membership" in result.output
else:
# signing_key = patched_auth_method()
patched_generate_membership_document.assert_called_once()
# membership_block_id is different
# patched_generate_membership_document.assert_called_once_with(
# PUBKEY,
# membership_block_id,
# identity_uid,
# identity_block_id,
# currency,
# signing_key,
# )
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
# This file contains patches for auth functions.
from duniterpy.key import SigningKey
def patched_auth_method(uid):
"""
insecure way to test keys
"""
return SigningKey.from_credentials(uid, uid)
def patched_auth_by_seed():
return "call_auth_by_seed"
def patched_auth_by_wif():
return "call_auth_by_wif"
def patched_auth_by_auth_file(authfile):
return "call_auth_by_auth_file"
def patched_auth_by_scrypt(nrp):
return "call_auth_by_scrypt"
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
# This file contains fake values for testing purposes
from duniterpy.documents.block_id import BlockID
currency = "g1"
mocked_block = {
"number": 48000,
"time": 1592243760,
"unitbase": 0,
"currency": currency,
"hash": "0000010D30B1284D34123E036B7BE0A449AE9F2B928A77D7D20E3BDEAC7EE14C",
}
mocked_block_gtest = {
"number": 48000,
"time": 1592243760,
"unitbase": 0,
"currency": "g1-test",
"hash": "0000010D30B1284D34123E036B7BE0A449AE9F2B928A77D7D20E3BDEAC7EE14C",
}
fake_block_id = BlockID(
48000,
"0000010D30B1284D34123E036B7BE0A449AE9F2B928A77D7D20E3BDEAC7EE14C",
)
def patched_params(self):
return {
"msValidity": 31557600,
"msPeriod": 5259600,
}
def patched_block(self, number):
return mocked_block
# mock get_head_block()
def patched_get_head_block():
return mocked_block
def patched_get_head_block_gtest():
return mocked_block_gtest
def patched_get_currency():
return currency
def patched_get_currency_gtest():
return "g1-test"