Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • 175_gva_migration
  • 429_rm_features
  • i18n
  • main
  • pages
  • release/0.11
  • release/0.12
  • v0.1.0
  • v0.10.0
  • v0.10.0rc0
  • v0.10.0rc1
  • v0.11.0
  • v0.11.0rc0
  • v0.11.1
  • v0.11.2
  • v0.12.0
  • v0.12.1
  • v0.2.0
  • v0.3.0
  • v0.4.0
  • v0.5.0
  • v0.6.0
  • v0.6.1
  • v0.6.2
  • v0.6.3
  • v0.6.4
  • v0.6.5
  • v0.7.0
  • v0.7.1
  • v0.7.2
  • v0.7.3
  • v0.7.4
  • v0.7.5
  • v0.7.6
  • v0.8.0
  • v0.8.1
  • v0.9.0
  • v0.9.0rc
38 results

Target

Select target project
  • elmau/silkaj
  • Mr-Djez/silkaj
  • jbar/silkaj
  • clients/python/silkaj
  • Bernard/silkaj
  • cebash/silkaj
  • jytou/silkaj
  • c-geek/silkaj
  • vincentux/silkaj
  • jeanlucdonnadieu/silkaj
  • matograine/silkaj
  • zicmama/silkaj
  • manutopik/silkaj
  • atrax/silkaj
14 results
Select Git revision
  • 72_rework_tx
  • dev
  • master
  • patch-1
  • 0.1.0
  • 0.2.0
  • v0.3.0
  • v0.4.0
  • v0.5.0
9 results
Show changes
Showing
with 3088 additions and 0 deletions
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import contextlib
import rich_click as click
from duniterpy.api.endpoint import endpoint as du_endpoint
from silkaj import tools
from silkaj.about import about
from silkaj.auth import generate_auth_file
from silkaj.blockchain.blocks import list_blocks
from silkaj.blockchain.difficulty import difficulties
from silkaj.blockchain.information import currency_info
from silkaj.checksum import checksum_command
from silkaj.constants import (
G1_DEFAULT_ENDPOINT,
G1_TEST_DEFAULT_ENDPOINT,
SILKAJ_VERSION,
)
from silkaj.g1_monetary_license import license_command
from silkaj.money.balance import balance_cmd
from silkaj.money.history import transaction_history
from silkaj.money.transfer import transfer_money
from silkaj.wot import certify, revocation
from silkaj.wot.lookup import lookup_cmd
from silkaj.wot.membership import send_membership
from silkaj.wot.status import status
click.rich_click.SHOW_ARGUMENTS = True
click.rich_click.OPTION_GROUPS = {
"silkaj": [
{
"name": "Basic options",
"options": ["--help", "--version"],
},
{
"name": "Endpoint and currency specification",
"options": ["--endpoint", "--gtest"],
},
{
"name": "Account and authentication specification",
"options": ["--account", "--password"],
},
],
}
@click.group()
@click.help_option("-h", "--help")
@click.version_option(SILKAJ_VERSION, "-v", "--version")
@click.option(
"--endpoint",
"-ep",
help=f"Without specifying this option, the default endpoint reaches Ğ1 currency on its official endpoint: https://{du_endpoint(G1_DEFAULT_ENDPOINT).host}. \
--endpoint allows to specify a custom endpoint following `<host>:<port>/<path>` format. \
`port` and `path` are optional. In case no port is specified, it defaults to 443.",
cls=tools.MutuallyExclusiveOption,
mutually_exclusive=["gtest"],
)
@click.option(
"--gtest",
"-gt",
is_flag=True,
help=f"Uses official ĞTest currency endpoint: https://{du_endpoint(G1_TEST_DEFAULT_ENDPOINT).host}",
cls=tools.MutuallyExclusiveOption,
mutually_exclusive=["endpoint"],
)
@click.option(
"account_name",
"--account",
"-a",
help="Account name used in storage `$HOME/.local/share/silkaj/$currency/$account_name` for authentication and revocation.",
)
@click.option(
"--password",
"-p",
help="EWIF authentication password. If you use this option, prefix the command \
with a space so the password is not saved in your shell history. \
In case of an encrypted file, password input will be prompted.",
)
@click.option(
"--display",
"-d",
is_flag=True,
help="Display the generated document before sending it",
)
@click.option(
"--dry-run",
"-n",
is_flag=True,
help="By-pass the licence and confirmation. Do not send the document, but display it instead",
)
@click.pass_context
def cli(
ctx: click.Context,
endpoint: str,
gtest: bool,
account_name: str,
password: str,
display: bool,
dry_run: bool,
) -> None:
if display and dry_run:
ctx.fail("Display and dry-run options can not be used together")
ctx.obj = {}
ctx.ensure_object(dict)
ctx.obj["ENDPOINT"] = endpoint
ctx.obj["GTEST"] = gtest
ctx.obj["ACCOUNT_NAME"] = account_name
ctx.obj["PASSWORD"] = password
ctx.obj["DISPLAY_DOCUMENT"] = display
ctx.obj["DRY_RUN"] = dry_run
ctx.help_option_names = ["-h", "--help"]
cli.add_command(about)
cli.add_command(generate_auth_file)
cli.add_command(checksum_command)
cli.add_command(license_command)
@cli.group("blockchain", help="Blockchain related commands")
def blockchain_group() -> None:
pass
blockchain_group.add_command(list_blocks)
blockchain_group.add_command(difficulties)
blockchain_group.add_command(currency_info)
@cli.group("money", help="Money management related commands")
def money_group() -> None:
pass
money_group.add_command(balance_cmd)
money_group.add_command(transaction_history)
money_group.add_command(transfer_money)
@cli.group("wot", help="Web-of-Trust related commands")
def wot_group() -> None:
pass
wot_group.add_command(certify.certify)
with contextlib.suppress(ModuleNotFoundError):
from silkaj.wot.exclusions import exclusions_command
wot_group.add_command(exclusions_command)
wot_group.add_command(lookup_cmd)
wot_group.add_command(send_membership)
wot_group.add_command(status)
@wot_group.group("revocation", help="Manage revocation document commands.")
def revocation_group() -> None:
pass
revocation_group.add_command(revocation.create)
revocation_group.add_command(revocation.verify)
revocation_group.add_command(revocation.publish)
revocation_group.add_command(revocation.revoke_now)
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
SILKAJ_VERSION = "0.20.0dev"
G1_SYMBOL = "Ğ1"
GTEST_SYMBOL = "ĞTest"
G1_DEFAULT_ENDPOINT = "BMAS g1.duniter.org 443"
G1_TEST_DEFAULT_ENDPOINT = "BMAS g1-test.duniter.org 443"
G1_CSP_USER_ENDPOINT = "ES_USER_API g1.data.e-is.pro 443"
GTEST_CSP_USER_ENDPOINT = "ES_USER_API g1-test.data.e-is.pro 443"
ONE_HOUR = 3600
SUCCESS_EXIT_STATUS = 0
FAILURE_EXIT_STATUS = 1
BMA_MAX_BLOCKS_CHUNK_SIZE = 5000
BMA_SLEEP = 0.1
PUBKEY_MIN_LENGTH = 43
PUBKEY_MAX_LENGTH = 44
PUBKEY_PATTERN = f"[1-9A-HJ-NP-Za-km-z]{{{PUBKEY_MIN_LENGTH},{PUBKEY_MAX_LENGTH}}}"
MINIMAL_ABSOLUTE_TX_AMOUNT = 0.01
MINIMAL_RELATIVE_TX_AMOUNT = 1e-6
CENT_MULT_TO_UNIT = 100
SHORT_PUBKEY_SIZE = 8
# pendulum constants
# see https://pendulum.eustace.io/docs/#localized-formats
DATE = "LL"
HOUR = "LTS"
ALL = "LLL"
# Not ISO 8601 compliant but common
ALL_DIGITAL = "YYYY-MM-DD HH:mm:ss"
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
from pathlib import Path
import rich_click as click
import g1_monetary_license as gml
def license_approval(currency: str) -> None:
if currency != "g1":
return
if click.confirm(
"You will be asked to approve Ğ1 license. Would you like to display it?",
):
g1ml = G1MonetaryLicense()
g1ml.display_license()
click.confirm("Do you approve Ğ1 license?", abort=True)
@click.command("license", help="Display Ğ1 monetary license")
def license_command() -> None:
g1ml = G1MonetaryLicense()
g1ml.display_license()
class G1MonetaryLicense:
def __init__(self):
self.licenses_dir_path = gml.__path__.__dict__["_path"][0]
self._available_languages()
def display_license(self) -> None:
"""
Determine available languages
Ask to select a language code
Display license in the terminal
"""
selected_language_code = self.language_prompt()
license_path = self.get_license_path(selected_language_code)
click.echo_via_pager(license_path.read_text(encoding="utf-8"))
def language_prompt(self) -> str:
return click.prompt(
"In which language would you like to display Ğ1 monetary license?",
type=click.Choice(self.languages_codes),
show_choices=True,
show_default=True,
default="en",
)
def _available_languages(self) -> None:
"""
Handle long language codes ie: 'fr-FR'
"""
self.languages_codes = []
licenses_path = sorted(Path(self.licenses_dir_path).glob(file_name("*")))
for license_path in licenses_path:
language_code = license_path.stem[-2:]
if language_code.isupper():
language_code = license_path.stem[-5:]
self.languages_codes.append(language_code)
def get_license_path(self, language_code: str) -> Path:
return Path(self.licenses_dir_path, file_name(language_code))
def file_name(language_code: str) -> str:
return f"g1_monetary_license_{language_code}.rst"
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
from typing import Optional
import rich_click as click
from silkaj import tools, tui
from silkaj.auth import auth_method
from silkaj.blockchain.tools import get_head_block
from silkaj.money import tools as m_tools
from silkaj.public_key import gen_pubkey_checksum, is_pubkey_and_check
from silkaj.wot import tools as wt
@click.command(
"balance",
help="Wallet·s balance·s. Multiple public keys can be passed, then a sum is computed. Also works with the authentication.",
)
@click.argument("pubkeys", nargs=-1)
@click.pass_context
def balance_cmd(ctx: click.Context, pubkeys: str) -> None:
if not tools.has_account_defined(exit_error=False):
# check input pubkeys
if not pubkeys:
ctx.fail("You should specify one or many pubkeys")
pubkeys_list = []
wrong_pubkeys = False
for input_pubkey in pubkeys:
checked_pubkey = is_pubkey_and_check(input_pubkey)
if checked_pubkey:
pubkey = str(checked_pubkey)
else:
pubkey = input_pubkey
wrong_pubkeys = True
click.echo(f"ERROR: pubkey {pubkey} has a wrong format")
if pubkey in pubkeys_list:
ctx.fail(
f"Pubkey {gen_pubkey_checksum(pubkey)} was specified many times",
)
pubkeys_list.append(pubkey)
if wrong_pubkeys:
ctx.fail("Please check the pubkeys format reported above.")
total = [0, 0]
for pubkey in pubkeys_list:
inputs_balance = m_tools.get_amount_from_pubkey(pubkey)
show_amount_from_pubkey(pubkey, inputs_balance)
total[0] += inputs_balance[0]
total[1] += inputs_balance[1]
if len(pubkeys_list) > 1:
show_amount_from_pubkey("Total", total)
else:
key = auth_method()
pubkey = key.pubkey
show_amount_from_pubkey(pubkey, m_tools.get_amount_from_pubkey(pubkey))
def show_amount_from_pubkey(label: str, inputs_balance: list[int]) -> None:
"""
Shows the balance of a pubkey.
`label` can be either a pubkey or "Total".
"""
totalAmountInput = inputs_balance[0]
balance = inputs_balance[1]
currency_symbol = tools.get_currency_symbol()
ud_value = m_tools.get_ud_value()
average = get_average()
member = None
# if `pubkey` is a pubkey, get pubkey:checksum and uid
if label != "Total":
member = wt.is_member(label)
label = gen_pubkey_checksum(label)
# display balance table
display = []
display.append(["Balance of pubkey", label])
if member:
display.append(["User identifier", member["uid"]])
if totalAmountInput - balance != 0:
m_tools.display_amount(
display,
"Blockchain",
balance,
ud_value,
currency_symbol,
)
m_tools.display_amount(
display,
"Pending transaction",
(totalAmountInput - balance),
ud_value,
currency_symbol,
)
m_tools.display_amount(
display,
"Total balance",
totalAmountInput,
ud_value,
currency_symbol,
)
if average:
display.append(
[
"Total relative to M/N",
f"{round(totalAmountInput / average, 2)} x M/N",
],
)
table = tui.Table()
table.fill_rows(display)
click.echo(table.draw())
def get_average() -> Optional[int]:
head = get_head_block()
try:
return head["monetaryMass"] / head["membersCount"]
except ZeroDivisionError:
print("The currency reached zero members")
return None
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import csv
from operator import eq, itemgetter, ne, neg
from pathlib import Path
from typing import Any, Optional
from urllib.error import HTTPError
import pendulum
import rich_click as click
from duniterpy.api.bma.tx import history
from duniterpy.api.client import Client
from duniterpy.documents.transaction import OutputSource, Transaction
from duniterpy.grammars.output import Condition
from silkaj.constants import ALL, ALL_DIGITAL, CENT_MULT_TO_UNIT
from silkaj.money import tools as mt
from silkaj.network import client_instance
from silkaj.public_key import (
check_pubkey_format,
gen_pubkey_checksum,
validate_checksum,
)
from silkaj.tools import get_currency_symbol
from silkaj.tui import Table
from silkaj.wot import tools as wt
@click.command("history", help="History of wallet money movements")
@click.argument("pubkey")
@click.option("--uids", "-u", is_flag=True, help="Display identities username")
@click.option(
"--full-pubkey", "-f", is_flag=True, help="Display full-length public keys"
)
@click.option(
"--csv-file",
"--csv",
type=click.Path(exists=False, writable=True, dir_okay=False, path_type=Path),
help="Write in specified file name in CSV (Comma-separated values) format the history of money movements",
)
def transaction_history(
pubkey: str,
uids: bool,
full_pubkey: bool,
csv_file: Optional[Path],
) -> None:
if csv_file:
full_pubkey = True
if check_pubkey_format(pubkey):
pubkey = validate_checksum(pubkey)
client = client_instance()
ud_value = mt.get_ud_value()
currency_symbol = get_currency_symbol()
received_txs, sent_txs = [], [] # type: list[Transaction], list[Transaction]
get_transactions_history(client, pubkey, received_txs, sent_txs)
remove_duplicate_txs(received_txs, sent_txs)
txs_list = generate_txs_list(
received_txs,
sent_txs,
pubkey,
ud_value,
currency_symbol,
uids,
full_pubkey,
)
table_headers = [
"Date",
"Issuers/Recipients",
f"Amounts {currency_symbol}",
f"Amounts UD{currency_symbol}",
"Reference",
]
if csv_file:
if csv_file.is_file():
click.confirm(f"{csv_file} exists, would you like to erase it?", abort=True)
txs_list.insert(0, table_headers)
with csv_file.open("w", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerows(txs_list)
click.echo(f"{csv_file} file successfully saved!")
else:
table = Table()
table.fill_rows(txs_list, table_headers)
header = generate_header(pubkey, currency_symbol, ud_value)
click.echo_via_pager(header + table.draw())
def generate_header(pubkey: str, currency_symbol: str, ud_value: int) -> str:
try:
idty = wt.identity_of(pubkey)
except HTTPError:
idty = {"uid": ""}
balance = mt.get_amount_from_pubkey(pubkey)
balance_ud = round(balance[1] / ud_value, 2)
date = pendulum.now().format(ALL)
return f"Transactions history from: {idty['uid']} {gen_pubkey_checksum(pubkey)}\n\
Current balance: {balance[1] / CENT_MULT_TO_UNIT} {currency_symbol}, {balance_ud} UD {currency_symbol} on {date}\n"
def get_transactions_history(
client: Client,
pubkey: str,
received_txs: list,
sent_txs: list,
) -> None:
"""
Get transaction history
Store txs in Transaction object
"""
tx_history = client(history, pubkey)
currency = tx_history["currency"]
for received in tx_history["history"]["received"]:
received_txs.append(Transaction.from_bma_history(received, currency))
for sent in tx_history["history"]["sent"]:
sent_txs.append(Transaction.from_bma_history(sent, currency))
def remove_duplicate_txs(received_txs: list, sent_txs: list) -> None:
"""
Remove duplicate transactions from history
Remove received tx which contains output back return
that we don't want to displayed
A copy of received_txs is necessary to remove elements
"""
for received_tx in list(received_txs):
if received_tx in sent_txs:
received_txs.remove(received_tx)
def generate_txs_list(
received_txs: list[Transaction],
sent_txs: list[Transaction],
pubkey: str,
ud_value: int,
currency_symbol: str,
uids: bool,
full_pubkey: bool,
) -> list:
"""
Generate information in a list of lists for texttable
Merge received and sent txs
Sort txs temporarily
"""
received_txs_list, sent_txs_list = (
[],
[],
) # type: list[Transaction], list[Transaction]
parse_received_tx(
received_txs_list,
received_txs,
pubkey,
ud_value,
uids,
full_pubkey,
)
parse_sent_tx(sent_txs_list, sent_txs, pubkey, ud_value, uids, full_pubkey)
txs_list = received_txs_list + sent_txs_list
txs_list.sort(key=itemgetter(0), reverse=True)
return txs_list
def parse_received_tx(
received_txs_table: list[Transaction],
received_txs: list[Transaction],
pubkey: str,
ud_value: int,
uids: bool,
full_pubkey: bool,
) -> None:
"""
Extract issuers` pubkeys
Get identities from pubkeys
Convert time into human format
Assign identities
Get amounts and assign amounts and amounts_ud
Append reference/comment
"""
issuers = []
for received_tx in received_txs:
for issuer in received_tx.issuers:
issuers.append(issuer)
identities = wt.identities_from_pubkeys(issuers, uids)
for received_tx in received_txs:
tx_list = []
tx_list.append(
pendulum.from_timestamp(received_tx.time, tz="local").format(ALL_DIGITAL)
)
tx_list.append("")
for i, issuer in enumerate(received_tx.issuers):
tx_list[1] += prefix(None, None, i) + assign_idty_from_pubkey(
issuer,
identities,
full_pubkey,
)
amounts = tx_amount(received_tx, pubkey, received_func)[0]
tx_list.append(amounts / CENT_MULT_TO_UNIT)
tx_list.append(round(amounts / ud_value, 2))
tx_list.append(received_tx.comment)
received_txs_table.append(tx_list)
def parse_sent_tx(
sent_txs_table: list[Transaction],
sent_txs: list[Transaction],
pubkey: str,
ud_value: int,
uids: bool,
full_pubkey: bool,
) -> None:
"""
Extract recipients` pubkeys from outputs
Get identities from pubkeys
Convert time into human format
Store "Total" and total amounts according to the number of outputs
If not output back return:
Assign amounts, amounts_ud, identities, and comment
"""
pubkeys = []
for sent_tx in sent_txs:
outputs = tx_amount(sent_tx, pubkey, sent_func)[1]
for output in outputs:
if output_available(output.condition, ne, pubkey):
pubkeys.append(output.condition.left.pubkey)
identities = wt.identities_from_pubkeys(pubkeys, uids)
for sent_tx in sent_txs:
tx_list = []
tx_list.append(
pendulum.from_timestamp(sent_tx.time, tz="local").format(ALL_DIGITAL)
)
total_amount, outputs = tx_amount(sent_tx, pubkey, sent_func)
if len(outputs) > 1:
tx_list.append("Total")
amounts = str(total_amount / CENT_MULT_TO_UNIT)
amounts_ud = str(round(total_amount / ud_value, 2))
else:
tx_list.append("")
amounts = ""
amounts_ud = ""
for i, output in enumerate(outputs):
if output_available(output.condition, ne, pubkey):
amounts += prefix(None, outputs, i) + str(
neg(mt.amount_in_current_base(output)) / CENT_MULT_TO_UNIT,
)
amounts_ud += prefix(None, outputs, i) + str(
round(neg(mt.amount_in_current_base(output)) / ud_value, 2),
)
tx_list[1] += prefix(tx_list[1], outputs, 0) + assign_idty_from_pubkey(
output.condition.left.pubkey,
identities,
full_pubkey,
)
tx_list.append(amounts)
tx_list.append(amounts_ud)
tx_list.append(sent_tx.comment)
sent_txs_table.append(tx_list)
def tx_amount(
tx: list[Transaction],
pubkey: str,
function: Any,
) -> tuple[int, list[OutputSource]]:
"""
Determine transaction amount from output sources
"""
amount = 0
outputs = []
for output in tx.outputs: # type: ignore[attr-defined]
if output_available(output.condition, ne, pubkey):
outputs.append(output)
amount += function(output, pubkey)
return amount, outputs
def received_func(output: OutputSource, pubkey: str) -> int:
if output_available(output.condition, eq, pubkey):
return mt.amount_in_current_base(output)
return 0
def sent_func(output: OutputSource, pubkey: str) -> int:
if output_available(output.condition, ne, pubkey):
return neg(mt.amount_in_current_base(output))
return 0
def output_available(condition: Condition, comparison: Any, value: str) -> bool:
"""
Check if output source is available
Currently only handle simple SIG condition
XHX, CLTV, CSV should be handled when present in the blockchain
"""
if hasattr(condition.left, "pubkey"):
return comparison(condition.left.pubkey, value)
return False
def assign_idty_from_pubkey(pubkey: str, identities: list, full_pubkey: bool) -> str:
idty = gen_pubkey_checksum(pubkey, short=not full_pubkey)
for identity in identities:
if pubkey == identity["pubkey"]:
pubkey_mod = gen_pubkey_checksum(pubkey, short=not full_pubkey)
idty = f"{identity['uid']} - {pubkey_mod}"
return idty
def prefix(
tx_addresses: Optional[str],
outputs: Optional[list[OutputSource]],
occurence: int,
) -> str:
"""
Pretty print with texttable
Break line when several values in a cell
Received tx case, 'outputs' is not defined, then add a breakline
between the pubkeys except for the first occurence for multi-sig support
Sent tx case, handle "Total" line in case of multi-output txs
In case of multiple outputs, there is a "Total" on the top,
where there must be a breakline
"""
if not outputs:
return "\n" if occurence > 0 else ""
if tx_addresses == "Total":
return "\n"
return "\n" if len(outputs) > 1 else ""
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import functools
from typing import Union
from duniterpy.api import bma
from duniterpy.documents.transaction import InputSource, OutputSource
from silkaj.constants import CENT_MULT_TO_UNIT
from silkaj.network import client_instance
from silkaj.public_key import gen_pubkey_checksum
from silkaj.wot import tools as wt
def display_amount(
tx: list,
message: str,
amount: float,
ud_value: float,
currency_symbol: str,
) -> None:
"""
Displays an amount in unit and relative reference.
"""
UD_amount = str(round((amount / ud_value), 2))
unit_amount = str(amount / CENT_MULT_TO_UNIT)
tx.append(
[
f"{message} (unit|relative)",
f"{unit_amount} {currency_symbol} | {UD_amount} UD {currency_symbol}",
],
)
def display_pubkey(tx: list, message: str, pubkey: str) -> None:
"""
Displays a pubkey and the eventually associated identity
"""
tx.append([f"{message} (pubkey:checksum)", gen_pubkey_checksum(pubkey)])
idty = wt.is_member(pubkey)
if idty:
tx.append([f"{message} (id)", idty["uid"]])
def get_amount_from_pubkey(pubkey: str) -> list[int]:
listinput, amount = get_sources(pubkey)
totalAmountInput = 0
for _input in listinput:
totalAmountInput += amount_in_current_base(_input)
return [totalAmountInput, amount]
def get_sources(pubkey: str) -> tuple[list[InputSource], int]:
client = client_instance()
# Sources written into the blockchain
sources = client(bma.tx.sources, pubkey)
listinput = []
amount = 0
for source in sources["sources"]:
if source["conditions"] == f"SIG({pubkey})":
listinput.append(
InputSource(
amount=source["amount"],
base=source["base"],
source=source["type"],
origin_id=source["identifier"],
index=source["noffset"],
),
)
amount += amount_in_current_base(listinput[-1])
# pending source
history = (client(bma.tx.pending, pubkey))["history"]
pendings = history["sending"] + history["pending"]
# add pending output
pending_sources = []
for pending in pendings:
for i, output in enumerate(pending["outputs"]):
# duniterpy#80
outputsplited = output.split(":")
if outputsplited[2] == f"SIG({pubkey})":
inputgenerated = InputSource(
amount=int(outputsplited[0]),
base=int(outputsplited[1]),
source="T",
origin_id=pending["hash"],
index=i,
)
if inputgenerated not in listinput:
# add pendings before blockchain sources for change txs
listinput.insert(0, inputgenerated)
for _input in pending["inputs"]:
pending_sources.append(InputSource.from_inline(_input))
# remove input already used
for _input in pending_sources:
if _input in listinput:
listinput.remove(_input)
return listinput, amount
@functools.lru_cache(maxsize=1)
def get_ud_value() -> int:
client = client_instance()
blockswithud = client(bma.blockchain.ud)
NBlastUDblock = blockswithud["result"]["blocks"][-1]
lastUDblock = client(bma.blockchain.block, NBlastUDblock)
return lastUDblock["dividend"] * 10 ** lastUDblock["unitbase"]
def amount_in_current_base(source: Union[InputSource, OutputSource]) -> int:
"""
Get amount in current base from input or output source
"""
return source.amount * 10**source.base
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import math
import re
import shlex
import time
from pathlib import Path
from typing import Optional
import rich_click as click
from duniterpy.api.bma.tx import process
from duniterpy.documents import (
BlockID,
InputSource,
OutputSource,
SIGParameter,
Transaction,
Unlock,
)
from duniterpy.key import SigningKey
from silkaj import auth, network, public_key, tools, tui
from silkaj.blockchain import tools as bc_tools
from silkaj.constants import (
BMA_SLEEP,
CENT_MULT_TO_UNIT,
MINIMAL_ABSOLUTE_TX_AMOUNT,
MINIMAL_RELATIVE_TX_AMOUNT,
)
from silkaj.money import tools as m_tools
from silkaj.public_key import gen_pubkey_checksum
MAX_REFERENCE_LENGTH = 255
# max size for tx doc is 100 lines.
# Formula for accepted field numbers is:
# (2 * IU + 2 * IS + OUT) <= ( MAX_LINES_IN_TX_DOC - FIX_LINES)
# with IU = inputs/unlocks ; IS = Issuers/Signatures ; OUT = Outpouts.
MAX_LINES_IN_TX_DOC = 100
# 2 lines are necessary, and we block 1 more for the reference/comment
FIX_LINES = 3
# assuming there is only 1 issuer and 2 outputs, max inputs is 46
MAX_INPUTS_PER_TX = 46
# assuming there is 1 issuer and 1 input, max outputs is 93.
MAX_OUTPUTS = 93
# for now, silkaj handles txs for one issuer only
NBR_ISSUERS = 1
@click.command("transfer", help="Transfer money")
@click.option(
"amounts",
"--amount",
"-a",
multiple=True,
type=click.FloatRange(MINIMAL_ABSOLUTE_TX_AMOUNT),
help="Quantitative amount(s).",
cls=tools.MutuallyExclusiveOption,
mutually_exclusive=["amountsud", "allsources", "file_path"],
)
@click.option(
"amountsud",
"--amountUD",
"-d",
multiple=True,
type=click.FloatRange(MINIMAL_RELATIVE_TX_AMOUNT),
help="Relative amount(s).",
cls=tools.MutuallyExclusiveOption,
mutually_exclusive=["amounts", "allsources", "file_path"],
)
@click.option(
"--allSources",
is_flag=True,
help="Send all sources to one recipient.",
cls=tools.MutuallyExclusiveOption,
mutually_exclusive=["amounts", "amountsud", "file_path"],
)
@click.option(
"recipients",
"--recipient",
"-r",
multiple=True,
help="Public key(s)' recipients + optional checksum: `<pubkey>[:checksum]`. \
Sending to multiple recipients is possible. \
With one amount specified, all recipients will receive the same amount. \
With one amount specified per recipient, recipient 1 will recieve amount 1, and so on.",
cls=tools.MutuallyExclusiveOption,
mutually_exclusive=["file_path"],
)
@click.option(
"file_path",
"--file",
"-f",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
help="File's path containing a list of amounts in absolute or \
relative reference and recipients' pubkeys",
cls=tools.MutuallyExclusiveOption,
mutually_exclusive=["recipients", "amounts", "amountsUD", "allsources"],
)
@click.option("--reference", "-ref", default="", help="Transfer reference")
@click.option(
"--outputBackChange",
help="Pubkey recipient to send the rest of the transaction: `<pubkey[:checksum]>`",
)
@click.option(
"--yes",
"-y",
is_flag=True,
help="Assume yes. Do not prompt confirmation",
)
def transfer_money(
amounts: list[float],
amountsud: list[float],
allsources: bool,
recipients: list[str],
file_path: Path,
reference: str,
outputbackchange: str,
yes: bool,
) -> None:
if file_path:
tx_amounts, recipients = parse_file_containing_amounts_recipients(file_path)
else:
if not (amounts or amountsud or allsources):
tools.message_exit("Error: amount, amountUD or allSources is not set.")
if not recipients:
tools.message_exit("Error: A recipient should be passed")
if allsources and len(recipients) > 1:
tools.message_exit(
"Error: the --allSources option can only be used with one recipient.",
)
# compute amounts and amountsud
if not allsources:
tx_amounts = transaction_amount(amounts, amountsud, recipients)
key = auth.auth_method()
issuer_pubkey = key.pubkey
pubkey_amount = m_tools.get_amount_from_pubkey(issuer_pubkey)
if allsources:
if pubkey_amount[0] <= 0:
tools.message_exit(
f"Error: Issuer pubkey {gen_pubkey_checksum(issuer_pubkey)} is empty. \
No transaction sent.",
)
tx_amounts = [pubkey_amount[0]]
recipients = list(recipients)
outputbackchange = check_transaction_values(
reference,
recipients,
outputbackchange,
pubkey_amount[0] < sum(tx_amounts),
issuer_pubkey,
)
if not yes:
table = tui.Table()
table.fill_rows(
gen_confirmation_table(
issuer_pubkey,
pubkey_amount[0],
tx_amounts,
recipients,
outputbackchange,
reference,
),
)
confirmation_table = table.draw()
if yes or click.confirm(
f"{confirmation_table}\nDo you confirm sending this transaction?",
):
handle_intermediaries_transactions(
key,
issuer_pubkey,
tx_amounts,
recipients,
reference,
outputbackchange,
)
def parse_file_containing_amounts_recipients(
file_path: Path,
) -> tuple[list[int], list[str]]:
"""
Parse file in a specific format
Comments are ignored
Format should be:
```txt
[ABSOLUTE/RELATIVE]
# comment1
amount1 recipient1's pubkey
# comment2
amount2 recipient2's pubkey
```
"""
reference = ""
amounts, recipients = [], []
with file_path.open(encoding="utf-8") as file:
for n, raw_line in enumerate(file):
line = shlex.split(raw_line, True)
if line:
if n == 0:
reference = line[0]
else:
try:
amounts.append(float(line[0]))
recipients.append(line[1])
except (ValueError, IndexError):
tools.message_exit(f"Syntax error at line {n + 1}")
if not reference or reference not in ("ABSOLUTE", "RELATIVE"):
tools.message_exit(
f"{file_path} must contain at first line 'ABSOLUTE' or 'RELATIVE' header",
)
if not amounts or not recipients:
tools.message_exit("No amounts or recipients specified")
# Compute amount depending on the reference
reference_mult = (
CENT_MULT_TO_UNIT if reference == "ABSOLUTE" else m_tools.get_ud_value()
)
tx_amounts = compute_amounts(amounts, reference_mult)
return tx_amounts, recipients
def transaction_amount(
amounts: list[float],
UDs_amounts: list[float],
outputAddresses: list[str],
) -> list[int]:
"""
Check that the number of passed amounts(UD) and recipients are the same
Returns a list of amounts.
"""
# Create amounts list
if amounts:
amounts_list = compute_amounts(amounts, CENT_MULT_TO_UNIT)
elif UDs_amounts:
UD_value = m_tools.get_ud_value()
amounts_list = compute_amounts(UDs_amounts, UD_value)
if len(amounts_list) != len(outputAddresses) and len(amounts_list) != 1:
tools.message_exit(
"Error: The number of passed recipients is not the same as the passed amounts.",
)
# In case one amount is passed with multiple recipients
# generate list containing multiple time the same amount
if len(amounts_list) == 1 and len(outputAddresses) > 1:
return [amounts_list[0]] * len(outputAddresses)
return amounts_list
def compute_amounts(amounts: list[float], multiplicator: float) -> list[int]:
"""
Computes the amounts(UD) and returns a list.
Multiplicator should be either CENT_MULT_TO_UNIT or UD_Value.
If relative amount, check that amount is superior to minimal amount.
"""
# Create amounts list
amounts_list = []
for amount in amounts:
computed_amount = amount * multiplicator
# check if relative amounts are high enough
if (multiplicator != CENT_MULT_TO_UNIT) and (
computed_amount < (MINIMAL_ABSOLUTE_TX_AMOUNT * CENT_MULT_TO_UNIT)
):
tools.message_exit(f"Error: amount {amount} is too low.")
amounts_list.append(round(computed_amount))
return amounts_list
def check_transaction_values(
reference: str,
outputAddresses: list[str],
outputBackChange: str,
enough_source: bool,
issuer_pubkey: str,
) -> str:
"""
Check reference format
Check the pubkeys and the checksums of the recipients and the outputbackchange
In case of a valid checksum, assign and return the pubkey without the checksum
Check the balance is big enough for the transaction
"""
check_reference(reference)
# we check output numbers and leave one line for the backchange.
if len(outputAddresses) > (MAX_OUTPUTS - 1):
tools.message_exit(
f"Error : there should be less than {MAX_OUTPUTS - 1} outputs.",
)
for i, outputAddress in enumerate(outputAddresses):
if public_key.check_pubkey_format(outputAddress):
outputAddresses[i] = public_key.validate_checksum(outputAddress)
if outputBackChange and public_key.check_pubkey_format(outputBackChange):
outputBackChange = public_key.validate_checksum(outputBackChange)
if enough_source:
pubkey = gen_pubkey_checksum(issuer_pubkey)
tools.message_exit(
f"{pubkey} pubkey doesn't have enough money for this transaction.",
)
return outputBackChange
def gen_confirmation_table(
issuer_pubkey: str,
pubkey_amount: int,
tx_amounts: list[int],
outputAddresses: list[str],
outputBackChange: str,
reference: str,
) -> list[list]:
"""
Generate transaction confirmation
"""
currency_symbol = tools.get_currency_symbol()
ud_value = m_tools.get_ud_value()
total_tx_amount = sum(tx_amounts)
tx = [] # type: list[list[str]]
# display account situation
m_tools.display_amount(
tx,
"Initial balance",
pubkey_amount,
ud_value,
currency_symbol,
)
m_tools.display_amount(
tx,
"Total transaction amount",
total_tx_amount,
ud_value,
currency_symbol,
)
m_tools.display_amount(
tx,
"Balance after transaction",
(pubkey_amount - total_tx_amount),
ud_value,
currency_symbol,
)
m_tools.display_pubkey(tx, "From", issuer_pubkey)
# display outputs and amounts
for outputAddress, tx_amount in zip(outputAddresses, tx_amounts):
m_tools.display_pubkey(tx, "To", outputAddress)
time.sleep(BMA_SLEEP)
m_tools.display_amount(tx, "Amount", tx_amount, ud_value, currency_symbol)
# display last informations
if outputBackChange:
m_tools.display_pubkey(tx, "Backchange", outputBackChange)
tx.append(["Reference", reference])
return tx
def get_list_input_for_transaction(
pubkey: str,
TXamount: int,
outputs_number: int,
) -> tuple[list[InputSource], int, bool]:
listinput = m_tools.get_sources(pubkey)[0]
maxInputsNumber = max_inputs_number(outputs_number, NBR_ISSUERS)
# generate final list source
listinputfinal = []
totalAmountInput = 0
intermediatetransaction = False
for nbr_inputs, _input in enumerate(listinput, start=1):
listinputfinal.append(_input)
totalAmountInput += m_tools.amount_in_current_base(_input)
TXamount -= m_tools.amount_in_current_base(_input)
# if too much sources, it's an intermediate transaction.
amount_not_reached_and_max_doc_size_reached = (
TXamount > 0 and nbr_inputs >= MAX_INPUTS_PER_TX
)
amount_reached_too_much_inputs = TXamount <= 0 and maxInputsNumber < nbr_inputs
if (
amount_not_reached_and_max_doc_size_reached
or amount_reached_too_much_inputs
):
intermediatetransaction = True
# if we reach the MAX_INPUTX_PER_TX limit, we send the interm.tx
# if we gather the good amount, we send the tx :
# - either this is no int.tx, and the tx is sent to the receiver,
# - or the int.tx it is sent to the issuer before sent to the receiver.
if nbr_inputs >= MAX_INPUTS_PER_TX or TXamount <= 0:
break
if TXamount > 0 and not intermediatetransaction:
tools.message_exit("Error: you don't have enough money")
return listinputfinal, totalAmountInput, intermediatetransaction
def handle_intermediaries_transactions(
key: SigningKey,
issuers: str,
tx_amounts: list[int],
outputAddresses: list[str],
reference: str = "",
OutputbackChange: Optional[str] = None,
) -> None:
while True:
# consider there is always one backchange output, hence +1
listinput_and_amount = get_list_input_for_transaction(
issuers,
sum(tx_amounts),
len(outputAddresses) + 1,
)
intermediatetransaction = listinput_and_amount[2]
if intermediatetransaction:
totalAmountInput = listinput_and_amount[1]
generate_and_send_transaction(
key,
issuers,
[totalAmountInput],
listinput_and_amount,
[issuers],
"Change operation",
)
else:
generate_and_send_transaction(
key,
issuers,
tx_amounts,
listinput_and_amount,
outputAddresses,
reference,
OutputbackChange,
)
break
def max_inputs_number(outputs_number: int, issuers_number: int) -> int:
"""
returns the maximum number of inputs.
This function does not take care of backchange line.
formula is IU <= (MAX_LINES_IN_TX_DOC - FIX_LINES - O - 2*IS)/2
"""
return int(
(MAX_LINES_IN_TX_DOC - FIX_LINES - (2 * issuers_number) - outputs_number) / 2,
)
def generate_and_send_transaction(
key: SigningKey,
issuers: str,
tx_amounts: list[int],
listinput_and_amount: tuple[list[InputSource], int, bool],
outputAddresses: list[str],
reference: str,
OutputbackChange: Optional[str] = None,
) -> None:
"""
Display sent transaction
Generate, sign, and send transaction document
"""
intermediate_tx = listinput_and_amount[2]
if intermediate_tx:
print("Generate Change Transaction")
else:
print("Generate Transaction:")
print(" - From: " + gen_pubkey_checksum(issuers))
for tx_amount, outputAddress in zip(tx_amounts, outputAddresses):
display_sent_tx(outputAddress, tx_amount)
print(" - Total: " + str(sum(tx_amounts) / CENT_MULT_TO_UNIT))
transaction = generate_transaction_document(
issuers,
tx_amounts,
listinput_and_amount,
outputAddresses,
reference,
OutputbackChange,
)
transaction.sign(key)
network.send_document(process, transaction)
def display_sent_tx(outputAddress: str, amount: int) -> None:
print(
" - To: ",
gen_pubkey_checksum(outputAddress),
"\n - Amount: ",
amount / CENT_MULT_TO_UNIT,
)
def generate_transaction_document(
issuers: str,
tx_amounts: list[int],
listinput_and_amount: tuple[list[InputSource], int, bool],
outputAddresses: list[str],
reference: str = "",
OutputbackChange: Optional[str] = None,
) -> Transaction:
listinput = listinput_and_amount[0]
totalAmountInput = listinput_and_amount[1]
total_tx_amount = sum(tx_amounts)
head_block = bc_tools.get_head_block()
if not OutputbackChange:
OutputbackChange = issuers
# If it's not a foreign exchange transaction,
# we remove units after two digits after the decimal point
if issuers not in outputAddresses:
total_tx_amount = (
total_tx_amount // 10 ** head_block["unitbase"]
) * 10 ** head_block["unitbase"]
# Generate output
################
listoutput = [] # type: list[OutputSource]
for tx_amount, outputAddress in zip(tx_amounts, outputAddresses):
generate_output(listoutput, head_block["unitbase"], tx_amount, outputAddress)
# Outputs to himself
rest = totalAmountInput - total_tx_amount
generate_output(listoutput, head_block["unitbase"], rest, OutputbackChange)
# Unlocks
unlocks = generate_unlocks(listinput)
# Generate transaction document
##############################
return Transaction(
block_id=BlockID(head_block["number"], head_block["hash"]),
locktime=0,
issuers=[issuers],
inputs=listinput,
unlocks=unlocks,
outputs=listoutput,
comment=reference,
currency=head_block["currency"],
)
def generate_unlocks(listinput: list[InputSource]) -> list[Unlock]:
unlocks = []
for i in range(len(listinput)):
unlocks.append(Unlock(index=i, parameters=[SIGParameter(0)]))
return unlocks
def generate_output(
listoutput: list[OutputSource],
unitbase: int,
rest: int,
recipient_address: str,
) -> None:
while rest > 0:
outputAmount = truncBase(rest, unitbase)
rest -= outputAmount
if outputAmount > 0:
outputAmount = int(outputAmount / math.pow(10, unitbase))
listoutput.append(
OutputSource(
amount=outputAmount,
base=unitbase,
condition=f"SIG({recipient_address})",
),
)
unitbase = unitbase - 1
def check_reference(reference: str) -> None:
if len(reference) > MAX_REFERENCE_LENGTH:
tools.message_exit("Error: Transfer reference is too long")
regex = re.compile(
"^[0-9a-zA-Z\\ \\-\\_\\:\\/\\;\\*\\[\\]\\(\\)\\?\
\\!\\^\\+\\=\\@\\&\\~\\#\\{\\}\\|\\\\<\\>\\%\\.]*$",
)
if not re.search(regex, reference):
tools.message_exit("Error: the reference format is invalid")
def truncBase(amount: int, base: int) -> int:
_pow = int(math.pow(10, base))
if amount < _pow:
return 0
return math.trunc(amount / _pow) * _pow
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import functools
import re
import sys
from typing import Any
from urllib.error import HTTPError
from duniterpy import constants as du_const
from duniterpy.api import endpoint as ep
from duniterpy.api.client import Client
from duniterpy.documents import Document
from silkaj import constants, tools
def determine_endpoint() -> ep.Endpoint:
"""
Pass custom endpoint, parse through a regex
{host|ipv4|[ipv6]}:{port}{/path}
^(?:(HOST)|(IPV4|[(IPV6)]))(?::(PORT))?(?:/(PATH))?$
If gtest flag passed, return default gtest endpoint
Else, return g1 default endpoint
"""
regex = f"^(?:(?P<host>{du_const.HOST_REGEX})|(?P<ipv4>{du_const.IPV4_REGEX})|\
(?:\\[(?P<ipv6>{du_const.IPV6_REGEX})\\]))(?::(?P<port>{du_const.PORT_REGEX}))?\
(?:/(?P<path>{du_const.PATH_REGEX}))?$"
try:
from click.globals import get_current_context
ctx = get_current_context()
endpoint = ctx.obj.get("ENDPOINT", None)
gtest = ctx.obj.get("GTEST", None)
except (ModuleNotFoundError, RuntimeError):
endpoint, gtest = None, None
if endpoint:
m = re.search(re.compile(regex), endpoint)
if not m:
tools.click_fail(
"Passed endpoint is of wrong format. Expected format: {host|ipv4|[ipv6]}:{port}{/path}",
)
return None
port = int(m["port"]) if m["port"] else 443
host, ipv4 = ep.fix_host_ipv4_mix_up(m["host"], m["ipv4"])
if port == 443:
return ep.SecuredBMAEndpoint(host, ipv4, m["ipv6"], port, m["path"])
return ep.BMAEndpoint(host, ipv4, m["ipv6"], port)
if gtest:
return ep.endpoint(constants.G1_TEST_DEFAULT_ENDPOINT)
return ep.endpoint(constants.G1_DEFAULT_ENDPOINT)
@functools.lru_cache(maxsize=1)
def client_instance():
return Client(determine_endpoint())
def send_document(bma_path: Any, document: Document) -> None:
client = client_instance()
doc_name = document.__class__.__name__
try:
client(bma_path, document.signed_raw())
print(f"{doc_name} successfully sent")
except HTTPError as error:
print(error)
tools.click_fail(f"Error while publishing {doc_name.lower()}")
def exit_on_http_error(error: HTTPError, err_code: int, message: str) -> None:
"""
Nicely displays a message on an expected error code.
Else, displays the HTTP error message.
"""
if error.code == err_code:
tools.click_fail(message)
print(error)
sys.exit(constants.FAILURE_EXIT_STATUS)
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import hashlib
import re
from typing import Any, Optional, Union
import base58
from silkaj.constants import PUBKEY_PATTERN, SHORT_PUBKEY_SIZE
from silkaj.tools import message_exit
PUBKEY_DELIMITED_PATTERN = f"^{PUBKEY_PATTERN}$"
CHECKSUM_SIZE = 3
CHECKSUM_PATTERN = f"[1-9A-HJ-NP-Za-km-z]{{{CHECKSUM_SIZE}}}"
PUBKEY_CHECKSUM_PATTERN = f"^{PUBKEY_PATTERN}:{CHECKSUM_PATTERN}$"
def is_pubkey_and_check(pubkey: str) -> Union[str, bool]:
"""
Checks if the given argument contains a pubkey.
If so, verifies the checksum if needed and returns the pubkey.
Exits if the checksum is wrong.
Else, return False
"""
if re.search(re.compile(PUBKEY_PATTERN), pubkey):
if check_pubkey_format(pubkey, True):
return validate_checksum(pubkey)
return pubkey
return False
def check_pubkey_format(pubkey: str, display_error: bool = True) -> Optional[bool]:
"""
Checks if a pubkey has a checksum.
Exits if the pubkey is invalid.
"""
if re.search(re.compile(PUBKEY_DELIMITED_PATTERN), pubkey):
return False
if re.search(re.compile(PUBKEY_CHECKSUM_PATTERN), pubkey):
return True
if display_error:
message_exit(f"Error: bad format for following public key: {pubkey}")
return None
def validate_checksum(pubkey_checksum: str) -> Any:
"""
Check pubkey checksum after the pubkey, delimited by ":".
If check pass: return pubkey
Else: exit.
"""
pubkey, checksum = pubkey_checksum.split(":")
if checksum == gen_checksum(pubkey):
return pubkey
message_exit(
f"Error: public key '{pubkey}' does not match checksum '{checksum}'.\n\
Please verify the public key.",
)
return None
def gen_checksum(pubkey: str) -> str:
"""
Returns the checksum of the input pubkey (encoded in b58)
"""
pubkey_byte = base58.b58decode(pubkey)
_hash = hashlib.sha256(hashlib.sha256(pubkey_byte).digest()).digest()
return str(base58.b58encode(_hash)[:3].decode("utf-8"))
def gen_pubkey_checksum(
pubkey: str,
short: Optional[bool] = False,
length: Optional[int] = SHORT_PUBKEY_SIZE,
) -> str:
"""
Returns "<pubkey>:<checksum>" in full form.
returns `length` first chars of pubkey and checksum in short form.
`length` defaults to SHORT_PUBKEY_SIZE.
"""
short_pubkey = f"{pubkey[:length]}" if short else pubkey
return f"{short_pubkey}:{gen_checksum(pubkey)}"
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import functools
import sys
from typing import Any, Union
import rich_click as click
from silkaj.blockchain.tools import get_blockchain_parameters
from silkaj.constants import FAILURE_EXIT_STATUS, G1_SYMBOL, GTEST_SYMBOL
@functools.lru_cache(maxsize=1)
def get_currency_symbol() -> str:
params = get_blockchain_parameters()
if params["currency"] == "g1":
return G1_SYMBOL
return GTEST_SYMBOL
@click.pass_context
def has_account_defined(
ctx: click.Context,
exit_error: bool = True,
) -> Union[bool, str]:
if not (account_name := ctx.obj["ACCOUNT_NAME"]):
if exit_error:
click_fail("--account general option should be specified")
return False
return account_name
def message_exit(message: str) -> None:
print(message)
sys.exit(FAILURE_EXIT_STATUS)
@click.pass_context
def click_fail(context: click.Context, message: str) -> None:
context.fail(message)
class MutuallyExclusiveOption(click.Option):
def __init__(self, *args: Any, **kwargs: Any) -> None:
self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", []))
_help = kwargs.get("help", "")
if self.mutually_exclusive:
ex_str = ", ".join(self.mutually_exclusive)
kwargs["help"] = (
f"{_help} NOTE: This argument is mutually exclusive with arguments: [{ex_str}]."
)
super().__init__(*args, **kwargs)
def handle_parse_result(self, ctx: click.Context, opts: Any, args: Any) -> Any:
if self.mutually_exclusive.intersection(opts) and self.name in opts:
arguments = ", ".join(self.mutually_exclusive)
raise click.UsageError(
message=f"Usage: `{self.name}` is mutually exclusive with arguments `{arguments}`.",
)
return super().handle_parse_result(ctx, opts, args)
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import shutil
import sys
from typing import Optional
import rich_click as click
from texttable import Texttable
from silkaj import constants
VERT_TABLE_CHARS = ["", "", "", ""]
def send_doc_confirmation(document_name: str) -> None:
if not click.confirm(f"Do you confirm sending this {document_name}?"):
sys.exit(constants.SUCCESS_EXIT_STATUS)
class Table(Texttable):
def __init__(
self,
style="default",
):
super().__init__(max_width=shutil.get_terminal_size().columns)
if style == "columns":
self.set_deco(self.HEADER | self.VLINES | self.BORDER)
self.set_chars(VERT_TABLE_CHARS)
def fill_rows(self, rows: list[list], header: Optional[list] = None) -> None:
"""
Fills a table from header and rows list.
`rows` is a list of lists representing each row content.
each element of `rows` and header must be of same length.
"""
if header:
if len(rows) == 0:
rows.append([""] * len(header))
assert len(header) == len(rows[0])
self.header(header)
for line in rows:
assert len(line) == len(rows[0])
self.add_row(line)
def fill_from_dict(self, _dict: dict) -> None:
"""
Given a dict where each value represents a column,
fill a table where labels are dict keys and columns are dict values
This function stops on the first line with only empty cells
"""
keys = list(_dict.keys())
rows = []
n = 0
while True:
row = []
empty_cells_number = 0
for key in keys:
try:
row.append(_dict[key][n])
except IndexError:
row.append("")
empty_cells_number += 1
# break on first empty row
if empty_cells_number == len(keys):
break
rows.append(row)
n += 1
return self.fill_rows(rows, keys)
def fill_from_dict_list(self, dict_list: list[dict]) -> None:
"""
Given a list of dict with same keys,
fills the table with keys as header
"""
header = list(dict_list[0].keys())
content = []
for _dict in dict_list:
assert list(_dict.keys()) == header
line = []
for head in header:
line.append(_dict[head])
content.append(line)
return self.fill_rows(content, header)
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import sys
import pendulum
import rich_click as click
from duniterpy.api import bma
from duniterpy.api.client import Client
from duniterpy.documents import Block, BlockID, Certification, Identity, get_block_id
from duniterpy.key import SigningKey
from silkaj import tui
from silkaj.auth import auth_method
from silkaj.blockchain import tools as bc_tools
from silkaj.constants import ALL, DATE
from silkaj.g1_monetary_license import license_approval
from silkaj.network import client_instance, send_document
from silkaj.public_key import gen_pubkey_checksum, is_pubkey_and_check
from silkaj.wot import tools as wot_tools
@click.command("certify", help="Certify identity")
@click.argument("uid_pubkey_to_certify")
@click.pass_context
def certify(ctx: click.Context, uid_pubkey_to_certify: str) -> None:
client = client_instance()
checked_pubkey = is_pubkey_and_check(uid_pubkey_to_certify)
if checked_pubkey:
uid_pubkey_to_certify = str(checked_pubkey)
idty_to_certify, pubkey_to_certify, send_certs = wot_tools.choose_identity(
uid_pubkey_to_certify,
)
# Authentication
key = auth_method()
issuer_pubkey = key.pubkey
issuer = pre_checks(client, issuer_pubkey, pubkey_to_certify)
# Display license and ask for confirmation
head = bc_tools.get_head_block()
currency = head["currency"]
license_approval(currency)
# Certification confirmation
certification_confirmation(
ctx,
issuer,
issuer_pubkey,
pubkey_to_certify,
idty_to_certify,
)
# Create and sign certification document
certification = docs_generation(
currency,
pubkey_to_certify,
idty_to_certify,
issuer_pubkey,
head,
key,
)
if ctx.obj["DISPLAY_DOCUMENT"]:
click.echo(certification.signed_raw(), nl=False)
tui.send_doc_confirmation("certification")
# Send certification document
send_document(bma.wot.certify, certification)
def pre_checks(client: Client, issuer_pubkey: str, pubkey_to_certify: str) -> dict:
# Check whether current user is member
issuer = wot_tools.is_member(issuer_pubkey)
if not issuer:
sys.exit("Current identity is not member.")
if issuer_pubkey == pubkey_to_certify:
sys.exit("You can't certify yourself!")
# Check if the certification can be renewed
params = bc_tools.get_blockchain_parameters()
requirements = client(bma.wot.requirements, pubkey_to_certify, pubkey=True)
req = requirements["identities"][0] # type: dict
for cert in req["certifications"]:
if cert["from"] == issuer_pubkey:
# Ğ1: 0<->2y - 2y + 2m
# ĞT: 0<->4.8m - 4.8m + 12.5d
renewable = cert["expiresIn"] - params["sigValidity"] + params["sigReplay"]
if renewable > 0:
renewable_date = pendulum.now().add(seconds=renewable).format(DATE)
sys.exit(f"Certification renewable from {renewable_date}")
# Check if the certification is already in the pending certifications
for pending_cert in req["pendingCerts"]:
if pending_cert["from"] == issuer_pubkey:
sys.exit("Certification is currently being processed")
return issuer
def certification_confirmation(
ctx: click.Context,
issuer: dict,
issuer_pubkey: str,
pubkey_to_certify: str,
idty_to_certify: dict,
) -> None:
cert = []
client = client_instance()
idty_timestamp = idty_to_certify["meta"]["timestamp"]
block_id_idty = get_block_id(idty_timestamp)
block = client(bma.blockchain.block, block_id_idty.number)
timestamp_date = pendulum.from_timestamp(block["time"], tz="local").format(ALL)
block_id_date = f": #{idty_timestamp[:15]}{timestamp_date}"
cert.append(["ID", issuer["uid"], "->", idty_to_certify["uid"] + block_id_date])
cert.append(
[
"Pubkey",
gen_pubkey_checksum(issuer_pubkey),
"->",
gen_pubkey_checksum(pubkey_to_certify),
],
)
params = bc_tools.get_blockchain_parameters()
cert_ends = pendulum.now().add(seconds=params["sigValidity"]).format(DATE)
cert.append(["Valid", pendulum.now().format(DATE), "—>", cert_ends])
table = tui.Table()
table.fill_rows(
cert,
["Cert", "Issuer", "->", "Recipient: Published: #block-hash date"],
)
click.echo(table.draw())
if not ctx.obj["DISPLAY_DOCUMENT"]:
tui.send_doc_confirmation("certification")
def docs_generation(
currency: str,
pubkey_to_certify: str,
idty_to_certify: dict,
issuer_pubkey: str,
head: Block,
key: SigningKey,
) -> Certification:
identity = Identity(
pubkey=pubkey_to_certify,
uid=idty_to_certify["uid"],
block_id=get_block_id(idty_to_certify["meta"]["timestamp"]),
currency=currency,
)
identity.signature = idty_to_certify["self"]
return Certification(
pubkey_from=issuer_pubkey,
identity=identity,
block_id=BlockID(head["number"], head["hash"]),
signing_key=key,
currency=currency,
)
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import logging
import socket
import sys
import time
import urllib
import pendulum
import rich_click as click
from duniterpy import constants as dp_const
from duniterpy.api.bma import blockchain
from duniterpy.api.client import Client
from duniterpy.documents.block import Block
from pydiscourse import DiscourseClient
from pydiscourse.exceptions import DiscourseClientError
from silkaj import constants
from silkaj.blockchain.tools import get_blockchain_parameters
from silkaj.network import client_instance
from silkaj.tools import get_currency_symbol
from silkaj.wot.tools import wot_lookup
G1_CESIUM_URL = "https://demo.cesium.app/"
GTEST_CESIUM_URL = "https://g1-test.cesium.app/"
CESIUM_BLOCK_PATH = "#/app/block/"
DUNITER_FORUM_URL = "https://forum.duniter.org/"
MONNAIE_LIBRE_FORUM_URL = "https://forum.monnaie-libre.fr/"
DUNITER_FORUM_G1_TOPIC_ID = 4393
DUNITER_FORUM_GTEST_TOPIC_ID = 6554
MONNAIE_LIBRE_FORUM_G1_TOPIC_ID = 30219 # 26117, 17627, 8233
@click.command(
"exclusions",
help="DeathReaper: Generate membership exclusions messages, \
markdown formatted and publish them on Discourse Forums",
)
@click.option(
"-a",
"--api-id",
help="Username used on Discourse forum API",
)
@click.option(
"-du",
"--duniter-forum-api-key",
help="API key used on Duniter Forum",
)
@click.option(
"-ml",
"--ml-forum-api-key",
help="API key used on Monnaie Libre Forum",
)
@click.argument("days", default=1, type=click.FloatRange(0, 50))
@click.option(
"--publish",
is_flag=True,
help="Publish the messages on the forums, otherwise it will be printed here.",
)
def exclusions_command(api_id, duniter_forum_api_key, ml_forum_api_key, days, publish):
params = get_blockchain_parameters()
currency = params["currency"]
check_options(api_id, duniter_forum_api_key, ml_forum_api_key, publish, currency)
bma_client = client_instance()
blocks_to_process = get_blocks_to_process(bma_client, days, params)
if not blocks_to_process:
no_exclusion(days, currency)
message = gen_message_over_blocks(bma_client, blocks_to_process, params)
if not message:
no_exclusion(days, currency)
header = gen_header(blocks_to_process)
# Add ability to publish just one of the two forum, via a flags?
publish_display(
api_id,
duniter_forum_api_key,
header + message,
publish,
currency,
"duniter",
)
if currency == dp_const.G1_CURRENCY_CODENAME:
publish_display(
api_id,
ml_forum_api_key,
header + message,
publish,
currency,
"monnaielibre",
)
def check_options(api_id, duniter_forum_api_key, ml_forum_api_key, publish, currency):
if publish and (
not api_id
or not duniter_forum_api_key
or (not ml_forum_api_key and currency != dp_const.G1_TEST_CURRENCY_CODENAME)
):
sys.exit(
f"Error: To be able to publish, api_id, duniter_forum_api, and \
ml_forum_api_key (not required for {constants.GTEST_SYMBOL}) options should be specified",
)
def no_exclusion(days, currency):
# Use Humanize
print(f"No exclusion to report within the last {days} day(s) on {currency}")
# Success exit status for not failing GitLab job in case there is no exclusions
sys.exit()
def get_blocks_to_process(bma_client, days, params):
head_number = bma_client(blockchain.current)["number"]
block_number_days_ago = (
head_number - days * 24 * constants.ONE_HOUR / params["avgGenTime"]
)
# print(block_number_days_ago) # DEBUG
i = 0
blocks_with_excluded = bma_client(blockchain.excluded)["result"]["blocks"]
for i, block_number in reversed(list(enumerate(blocks_with_excluded))):
if block_number < block_number_days_ago:
index = i
break
return blocks_with_excluded[index + 1 :]
def gen_message_over_blocks(bma_client, blocks_to_process, params):
"""
Loop over the list of blocks to retrieve and parse the blocks
Ignore revocation kind of exclusion
"""
if params["currency"] == dp_const.G1_CURRENCY_CODENAME:
es_client = Client(constants.G1_CSP_USER_ENDPOINT)
else:
es_client = Client(constants.GTEST_CSP_USER_ENDPOINT)
message = ""
for block_number in blocks_to_process:
logging.info("Processing block number %s", block_number)
print(f"Processing block number {block_number}")
# DEBUG / to be removed once the #115 logging system is set
try:
block = bma_client(blockchain.block, block_number)
except urllib.error.HTTPError:
time.sleep(2)
block = bma_client(blockchain.block, block_number)
block_hash = block["hash"]
block = Block.from_signed_raw(block["raw"] + block["signature"] + "\n")
if block.revoked and block.excluded[0] == block.revoked[0].pubkey:
continue
message += generate_message(es_client, block, block_hash, params)
return message
def gen_header(blocks_to_process):
nbr_exclusions = len(blocks_to_process)
# Handle when there is one block with multiple exclusion within
# And when there is a revocation
s = "s" if nbr_exclusions > 1 else ""
des_du = "des" if nbr_exclusions > 1 else "du"
currency_symbol = get_currency_symbol()
header = f"## Exclusion{s} de la toile de confiance {currency_symbol}, perte{s} {des_du} statut{s} de membre"
message_g1 = "\n> Message automatique. Merci de notifier vos proches de leur exclusion de la toile de confiance."
return header + message_g1
def generate_message(es_client, block, block_hash, params):
"""
Loop over exclusions within a block
Generate identity header + info
"""
message = ""
for excluded in block.excluded:
lookup = wot_lookup(excluded)[0]
uid = lookup["uids"][0]["uid"]
pubkey = lookup["pubkey"]
try:
response = es_client.get(f"user/profile/{pubkey}/_source")
es_uid = response["title"]
except (urllib.error.HTTPError, socket.timeout):
es_uid = uid
logging.info("Cesium+ API: Not found pubkey or connection error")
if params["currency"] == dp_const.G1_CURRENCY_CODENAME:
cesium_url = G1_CESIUM_URL
else:
cesium_url = GTEST_CESIUM_URL
cesium_url += CESIUM_BLOCK_PATH
message += f"\n\n### @{uid} [{es_uid}]({cesium_url}{block.number}/{block_hash}?ssl=true)\n"
message += generate_identity_info(lookup, block, params)
return message
def generate_identity_info(lookup, block, params):
info = "- **Certifié·e par**"
nbr_different_certifiers = 0
for i, certifier in enumerate(lookup["uids"][0]["others"]):
if certifier["uids"][0] not in info:
nbr_different_certifiers += 1
info += elements_inbetween_list(i, lookup["uids"][0]["others"])
info += "@" + certifier["uids"][0]
if lookup["signed"]:
info += ".\n- **A certifié**"
for i, certified in enumerate(lookup["signed"]):
info += elements_inbetween_list(i, lookup["signed"])
info += "@" + certified["uid"]
dt = pendulum.from_timestamp(block.mediantime + constants.ONE_HOUR, tz="local")
info += ".\n- **Exclu·e le** " + dt.format("LLLL zz", locale="fr")
info += "\n- **Raison de l'exclusion** : "
if nbr_different_certifiers < params["sigQty"]:
info += "manque de certifications"
else:
info += "expiration du document d'adhésion"
# a renouveller tous les ans (variable) humanize(params[""])
return info
def elements_inbetween_list(i, cert_list):
return " " if i == 0 else (" et " if i + 1 == len(cert_list) else ", ")
def publish_display(api_id, forum_api_key, message, publish, currency, forum):
if publish:
topic_id = get_topic_id(currency, forum)
publish_message_on_the_forum(api_id, forum_api_key, message, topic_id, forum)
elif forum == "duniter":
click.echo(message)
def get_topic_id(currency, forum):
if currency == dp_const.G1_CURRENCY_CODENAME:
if forum == "duniter":
return DUNITER_FORUM_G1_TOPIC_ID
return MONNAIE_LIBRE_FORUM_G1_TOPIC_ID
return DUNITER_FORUM_GTEST_TOPIC_ID
def publish_message_on_the_forum(api_id, forum_api_key, message, topic_id, forum):
if forum == "duniter":
discourse_client = DiscourseClient(
DUNITER_FORUM_URL,
api_username=api_id,
api_key=forum_api_key,
)
else:
discourse_client = DiscourseClient(
MONNAIE_LIBRE_FORUM_URL,
api_username=api_id,
api_key=forum_api_key,
)
try:
response = discourse_client.create_post(message, topic_id=topic_id)
publication_link(forum, response, topic_id)
except DiscourseClientError:
logging.exception("Issue publishing on %s", forum)
# Handle DiscourseClient exceptions, pass them to the logger
# discourse_client.close()
# How to close this client? It looks like it is not implemented
# May be by closing requests' client
def publication_link(forum, response, topic_id):
forum_url = DUNITER_FORUM_URL if forum == "duniter" else MONNAIE_LIBRE_FORUM_URL
print(f"Published on {forum_url}t/{response['topic_slug']}/{topic_id!s}/last")
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import shutil
import sys
import urllib
from typing import Union
import pendulum
import rich_click as click
from duniterpy.api import bma
from duniterpy.documents import BlockID, Identity, Revocation
from texttable import Texttable
from silkaj.constants import ALL
from silkaj.network import client_instance
from silkaj.public_key import gen_pubkey_checksum
from silkaj.wot.tools import wot_lookup
def display_identity(idty: Identity) -> Texttable:
"""
Creates a table containing the identity infos
"""
client = client_instance()
id_table = []
id_table.append(["Public key", gen_pubkey_checksum(idty.pubkey)])
id_table.append(["User ID", idty.uid])
id_table.append(["Blockstamp", str(idty.block_id)])
creation_block = client(bma.blockchain.block, idty.block_id.number)
creation_date = pendulum.from_timestamp(creation_block["time"], tz="local").format(
ALL,
)
id_table.append(["Created on", creation_date])
# display infos
table = Texttable(max_width=shutil.get_terminal_size().columns)
table.add_rows(id_table, header=False)
return table
def check_many_identities(document: Union[Identity, Revocation]) -> bool:
"""
Checks if many identities match the one looked after.
Returns True if the same identity is found, False if not.
"""
doc_type = document.__class__.__name__
error_no_identical_id = f"{doc_type} document does not match any valid identity."
idty = document if doc_type == "Identity" else document.identity
try:
results_pubkey = wot_lookup(idty.pubkey)
results_uid = wot_lookup(idty.uid)
except urllib.error.HTTPError:
sys.exit(
f"{error_no_identical_id}\nuid: {idty.uid}\npubkey: \
{gen_pubkey_checksum(idty.pubkey)}",
)
# get all matching identities
lookup_ids = merge_ids_lists(results_pubkey, results_uid, idty.currency)
match = False
for n, lookup in enumerate(lookup_ids):
if idty == lookup:
lookup_ids.pop(n)
match = True
break
alternate_ids = display_alternate_ids(lookup_ids).draw()
if match:
if len(lookup_ids) >= 1:
click.echo(f"One matching identity!\nSimilar identities:\n{alternate_ids}")
return True
click.echo(f"{error_no_identical_id}\nSimilar identities:\n{alternate_ids}")
return False
def display_alternate_ids(ids_list: list) -> Texttable:
labels = ["uid", "public key", "timestamp"]
table = Texttable(max_width=shutil.get_terminal_size().columns)
table.header(labels)
for _id in ids_list:
table.add_row(
[_id.uid, gen_pubkey_checksum(_id.pubkey), str(_id.block_id)[:12]],
)
return table
def merge_ids_lists(lookups_pubkey: list, lookups_uid: list, currency: str) -> list:
"""
merge two lists of identities and remove duplicate identities.
"""
ids = ids_list_from_lookups(lookups_pubkey, currency)
ids_uid = ids_list_from_lookups(lookups_uid, currency)
for _id in ids_uid:
# __equal__ does not work. This is condition "id in ids".
for listed_id in ids:
if _id.signed_raw() == listed_id.signed_raw():
id_in_ids = True
break
id_in_ids = False
if not id_in_ids:
ids.append(_id)
return ids
def ids_list_from_lookups(lookups: list, currency: str) -> list:
ids = []
for lookup in lookups:
pubkey = lookup["pubkey"]
lookup_ids = lookup["uids"]
for _id in lookup_ids:
appended_id = Identity(
currency=currency,
pubkey=pubkey,
uid=_id["uid"],
block_id=BlockID.from_str(_id["meta"]["timestamp"]),
)
appended_id.signature = _id["self"]
ids.append(appended_id)
return ids
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import urllib
import rich_click as click
from silkaj.network import exit_on_http_error
from silkaj.public_key import gen_pubkey_checksum, is_pubkey_and_check
from silkaj.wot import tools as wt
@click.command("lookup", help="Username identifier and public key lookup")
@click.argument("uid_pubkey")
def lookup_cmd(uid_pubkey: str) -> None:
checked_pubkey = is_pubkey_and_check(uid_pubkey)
if checked_pubkey:
uid_pubkey = str(checked_pubkey)
try:
lookups = wt.wot_lookup(uid_pubkey)
except urllib.error.HTTPError as e:
exit_on_http_error(e, 404, f"No identity found for {uid_pubkey}")
content = f"Public keys or user id found matching '{uid_pubkey}':\n"
for lookup in lookups:
for identity in lookup["uids"]:
pubkey_checksum = gen_pubkey_checksum(lookup["pubkey"])
content += f"\n{pubkey_checksum}{identity['uid']}"
click.echo(content)
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import logging
import pendulum
import rich_click as click
from duniterpy.api import bma
from duniterpy.documents import BlockID, Membership, get_block_id
from duniterpy.key import SigningKey
from silkaj import auth, tui
from silkaj.blockchain import tools as bc_tools
from silkaj.constants import DATE
from silkaj.g1_monetary_license import license_approval
from silkaj.network import client_instance, send_document
from silkaj.public_key import gen_pubkey_checksum
from silkaj.wot import tools as w_tools
@click.command("membership", help="Send or renew membership.")
@click.pass_context
def send_membership(ctx: click.Context) -> None:
dry_run = ctx.obj["DRY_RUN"]
# Authentication
key = auth.auth_method()
# Get the identity information
head_block = bc_tools.get_head_block()
membership_block_id = BlockID(head_block["number"], head_block["hash"])
identity = (w_tools.choose_identity(key.pubkey))[0]
identity_uid = identity["uid"]
identity_block_id = get_block_id(identity["meta"]["timestamp"])
# Display license and ask for confirmation
currency = head_block["currency"]
if not dry_run:
license_approval(currency)
# Confirmation
display_confirmation_table(identity_uid, key.pubkey, identity_block_id)
if not dry_run and not ctx.obj["DISPLAY_DOCUMENT"]:
tui.send_doc_confirmation("membership document for this identity")
# Create and sign membership document
membership = generate_membership_document(
key.pubkey,
membership_block_id,
identity_uid,
identity_block_id,
currency,
key,
)
logging.debug(membership.signed_raw())
if dry_run:
click.echo(membership.signed_raw())
ctx.exit()
if ctx.obj["DISPLAY_DOCUMENT"]:
click.echo(membership.signed_raw())
tui.send_doc_confirmation("membership document for this identity")
# Send the membership signed raw document to the node
send_document(bma.blockchain.membership, membership)
def display_confirmation_table(
identity_uid: str,
pubkey: str,
identity_block_id: BlockID,
) -> None:
"""
Check whether there is pending memberships already in the mempool
Display their expiration date
Actually, sending a membership document works even if the time
between two renewals is not awaited as for the certification
"""
client = client_instance()
identities_requirements = client(bma.wot.requirements, pubkey, pubkey=True)
for identity_requirements in identities_requirements["identities"]:
if identity_requirements["uid"] == identity_uid:
membership_expires = identity_requirements["membershipExpiresIn"]
pending_expires = identity_requirements["membershipPendingExpiresIn"]
pending_memberships = identity_requirements["pendingMemberships"]
break
table = []
if membership_expires:
expires = pendulum.now().add(seconds=membership_expires).diff_for_humans()
table.append(["Expiration date of current membership", expires])
if pending_memberships:
table.append(
[
"Number of pending membership(s) in the mempool",
len(pending_memberships),
],
)
table.append(
[
"Pending membership documents will expire",
pendulum.now().add(seconds=pending_expires).diff_for_humans(),
],
)
table.append(["User Identifier (UID)", identity_uid])
table.append(["Public Key", gen_pubkey_checksum(pubkey)])
table.append(["Block Identity", str(identity_block_id)[:45] + ""])
block = client(bma.blockchain.block, identity_block_id.number)
table.append(
[
"Identity published",
pendulum.from_timestamp(block["time"], tz="local").format(DATE),
],
)
params = bc_tools.get_blockchain_parameters()
table.append(
[
"Expiration date of new membership",
pendulum.now().add(seconds=params["msValidity"]).diff_for_humans(),
],
)
table.append(
[
"Expiration date of new membership from the mempool",
pendulum.now().add(seconds=params["msPeriod"]).diff_for_humans(),
],
)
display_table = tui.Table()
display_table.fill_rows(table)
click.echo(display_table.draw())
def generate_membership_document(
pubkey: str,
membership_block_id: BlockID,
identity_uid: str,
identity_block_id: BlockID,
currency: str,
key: SigningKey = None,
) -> Membership:
return Membership(
issuer=pubkey,
membership_block_id=membership_block_id,
uid=identity_uid,
identity_block_id=identity_block_id,
currency=currency,
signing_key=key,
)
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import os
import sys
from pathlib import Path
import rich_click as click
from duniterpy.api import bma
from duniterpy.documents.block_id import BlockID
from duniterpy.documents.document import MalformedDocumentError
from duniterpy.documents.identity import Identity
from duniterpy.documents.revocation import Revocation
from duniterpy.key.verifying_key import VerifyingKey
from silkaj import auth, network, tui
from silkaj.account_storage import AccountStorage
from silkaj.blockchain import tools as bc_tools
from silkaj.constants import FAILURE_EXIT_STATUS, SUCCESS_EXIT_STATUS
from silkaj.public_key import gen_pubkey_checksum
from silkaj.wot import idty_tools
from silkaj.wot import tools as w_tools
@click.command("create", help="Create and save revocation document")
def create() -> None:
currency = bc_tools.get_currency()
key = auth.auth_method()
gen_pubkey_checksum(key.pubkey)
_id = (w_tools.choose_identity(key.pubkey))[0]
rev_doc = create_revocation_doc(_id, key.pubkey, currency)
rev_doc.sign(key)
idty_table = idty_tools.display_identity(rev_doc.identity)
click.echo(idty_table.draw())
revocation_file_path = AccountStorage().revocation_path(check_exist=False)
confirm_message = "Do you want to save the revocation document for this identity?"
if click.confirm(confirm_message):
save_doc(revocation_file_path, rev_doc.signed_raw(), key.pubkey)
else:
click.echo("Ok, goodbye!")
@click.command(
"revoke",
help="Create and publish revocation document. Will immediately revoke the identity.",
)
@click.pass_context
def revoke_now(ctx: click.Context) -> None:
currency = bc_tools.get_currency()
warn_before_dry_run_or_display(ctx)
key = auth.auth_method()
gen_pubkey_checksum(key.pubkey)
_id = (w_tools.choose_identity(key.pubkey))[0]
rev_doc = create_revocation_doc(_id, key.pubkey, currency)
rev_doc.sign(key)
if ctx.obj["DRY_RUN"]:
click.echo(rev_doc.signed_raw())
return
idty_table = idty_tools.display_identity(rev_doc.identity)
click.echo(idty_table.draw())
if ctx.obj["DISPLAY_DOCUMENT"]:
click.echo(rev_doc.signed_raw())
warn_before_sending_document()
network.send_document(bma.wot.revoke, rev_doc)
@click.command(
"verify",
help="Verifies that the revocation document is correctly formatted and matches an existing identity",
)
def verify() -> None:
revocation_file_path = AccountStorage().revocation_path()
rev_doc = verify_document(revocation_file_path)
idty_table = idty_tools.display_identity(rev_doc.identity)
click.echo(idty_table.draw())
click.echo("Revocation document is valid.")
@click.command(
"publish",
help="Publish previously created revocation document. Identity will be immediately revoked.",
)
@click.pass_context
def publish(ctx: click.Context) -> None:
revocation_file_path = AccountStorage().revocation_path()
warn_before_dry_run_or_display(ctx)
rev_doc = verify_document(revocation_file_path)
if ctx.obj["DRY_RUN"]:
click.echo(rev_doc.signed_raw())
return
idty_table = idty_tools.display_identity(rev_doc.identity)
click.echo(idty_table.draw())
if ctx.obj["DISPLAY_DOCUMENT"]:
click.echo(rev_doc.signed_raw())
warn_before_sending_document()
network.send_document(bma.wot.revoke, rev_doc)
def warn_before_dry_run_or_display(ctx: click.Context) -> None:
if ctx.obj["DRY_RUN"]:
click.echo("WARNING: the document will only be displayed and will not be sent.")
def warn_before_sending_document() -> None:
click.secho("/!\\WARNING/!\\", blink=True, fg="red")
click.echo(
"This identity will be revoked.\n\
It will cease to be member and to create the Universal Dividend.\n\
All currently sent certifications will remain valid until they expire.",
)
tui.send_doc_confirmation("revocation document immediately")
def create_revocation_doc(_id: dict, pubkey: str, currency: str) -> Revocation:
"""
Creates an unsigned revocation document.
_id is the dict object containing id infos from request wot.requirements
"""
idty = Identity(
currency=currency,
pubkey=pubkey,
uid=_id["uid"],
block_id=BlockID.from_str(_id["meta"]["timestamp"]),
)
idty.signature = _id["self"]
return Revocation(
currency=currency,
identity=idty,
)
def opener_user_rw(path, flags):
return os.open(path, flags, 0o600)
def save_doc(rev_path: Path, content: str, pubkey: str) -> None:
pubkey_cksum = gen_pubkey_checksum(pubkey)
# Ask confirmation if the file exists
if rev_path.is_file():
if click.confirm(
f"Would you like to erase existing file `{rev_path}` with the \
generated revocation document corresponding to {pubkey_cksum} public key?",
):
rev_path.unlink()
else:
click.echo("Ok, goodbye!")
sys.exit(SUCCESS_EXIT_STATUS)
with open(rev_path, "w", encoding="utf-8", opener=opener_user_rw) as fh:
fh.write(content)
click.echo(
f"Revocation document file stored into `{rev_path}` for following public key: {pubkey_cksum}",
)
def verify_document(doc: Path) -> Revocation:
"""
This checks that:
- that the revocation signature is valid.
- if the identity is unique (warns the user)
It returns the revocation document or exits.
"""
error_invalid_sign = "Error: the signature of the revocation document is invalid."
error_invalid_doc = (
f"Error: {doc} is not a revocation document, or is not correctly formatted."
)
original_doc = doc.read_text(encoding="utf-8")
try:
rev_doc = Revocation.from_signed_raw(original_doc)
except (MalformedDocumentError, IndexError):
sys.exit(error_invalid_doc)
verif_key = VerifyingKey(rev_doc.pubkey)
if not verif_key.check_signature(rev_doc.raw(), rev_doc.signature):
sys.exit(error_invalid_sign)
many_idtys = idty_tools.check_many_identities(rev_doc)
if many_idtys:
return rev_doc
sys.exit(FAILURE_EXIT_STATUS)
# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
#
# Silkaj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Silkaj is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import pendulum
import rich_click as click
from duniterpy.api.bma import blockchain, wot
from silkaj.blockchain.tools import get_blockchain_parameters
from silkaj.constants import DATE
from silkaj.network import client_instance
from silkaj.public_key import gen_pubkey_checksum, is_pubkey_and_check
from silkaj.tui import Table
from silkaj.wot import tools as wt
@click.command(
"status",
help="Check received and sent certifications and \
consult the membership status of any given identity",
)
@click.argument("uid_pubkey")
def status(uid_pubkey: str) -> None:
"""
get searched id
get id of received and sent certifications
display in a table the result with the numbers
"""
client = client_instance()
first_block = client(blockchain.block, 1)
time_first_block = first_block["time"]
checked_pubkey = is_pubkey_and_check(uid_pubkey)
if checked_pubkey:
uid_pubkey = str(checked_pubkey)
identity, pubkey, signed = wt.choose_identity(uid_pubkey)
certifications = {} # type: dict
params = get_blockchain_parameters()
req = (client(wot.requirements, search=pubkey, pubkey=True))["identities"][0]
certifications["received_expire"] = []
certifications["received"] = []
certifications["sent"] = []
certifications["sent_expire"] = []
for lookup_cert in identity["others"]:
for req_cert in req["certifications"]:
if req_cert["from"] == lookup_cert["pubkey"]:
certifications["received_expire"].append(
pendulum.now().add(seconds=req_cert["expiresIn"]).format(DATE),
)
certifications["received"].append(f"{lookup_cert['uids'][0]}")
break
for pending_cert in req["pendingCerts"]:
certifications["received"].append(
f"{(wt.identity_of(pending_cert['from']))['uid']}",
)
certifications["received_expire"].append(
pendulum.from_timestamp(pending_cert["expires_on"], tz="local").format(
DATE
),
)
certifications["sent"], certifications["sent_expire"] = get_sent_certifications(
signed,
time_first_block,
params,
)
nbr_sent_certs = len(certifications["sent"]) if "sent" in certifications else 0
table = Table(style="columns").set_cols_align(["r", "r", "r", "r"])
table.fill_from_dict(certifications)
print(
f"{identity['uid']} ({gen_pubkey_checksum(pubkey, True)}) \
from block #{identity['meta']['timestamp'][:15]}\n\
received {len(certifications['received'])} and sent \
{nbr_sent_certs}/{params['sigStock']} certifications:\n\
{table.draw()}\n\
✔: Certification written in the blockchain\n\
✘: Pending certification, deadline treatment\n",
)
membership_status(certifications, pubkey, req)
def membership_status(certifications: dict, pubkey: str, req: dict) -> None:
params = get_blockchain_parameters()
if len(certifications["received"]) >= params["sigQty"]:
date = certifications["received_expire"][
len(certifications["received"]) - params["sigQty"]
]
print(f"Membership expiration due to certification expirations: {date}")
member_lookup = wt.is_member(pubkey)
is_member = bool(member_lookup)
print("member:", is_member)
if req["revoked"]:
revoke_date = pendulum.from_timestamp(req["revoked_on"], tz="local").format(
DATE
)
print(f"revoked: {req['revoked']}\nrevoked on: {revoke_date}")
if not is_member and req["wasMember"]:
print("expired:", req["expired"], "\nwasMember:", req["wasMember"])
elif is_member:
expiration_date = (
pendulum.now().add(seconds=req["membershipExpiresIn"]).format(DATE)
)
print(f"Membership document expiration: {expiration_date}")
print("Sentry:", req["isSentry"])
print("outdistanced:", req["outdistanced"])
def get_sent_certifications(
signed: list,
time_first_block: int,
params: dict,
) -> tuple[list[str], list[str]]:
sent = []
expire = []
if signed:
for cert in signed:
sent.append(cert["uid"])
expire.append(
expiration_date_from_block_id(
cert["cert_time"]["block"],
time_first_block,
params,
),
)
return sent, expire
def expiration_date_from_block_id(
block_id: str,
time_first_block: int,
params: dict,
) -> str:
expir_timestamp = (
date_approximation(block_id, time_first_block, params["avgGenTime"])
+ params["sigValidity"]
)
return pendulum.from_timestamp(expir_timestamp, tz="local").format(DATE)
def date_approximation(block_id, time_first_block, avgentime):
return time_first_block + block_id * avgentime