diff --git a/silkaj/cli.py b/silkaj/cli.py index 22b587a1a4e13c8f8964b8e720cbf49a9440ccd1..67f3d6098079eaa4ead2aa4ecf84a7dae79231c0 100644 --- a/silkaj/cli.py +++ b/silkaj/cli.py @@ -18,6 +18,7 @@ import sys from click import group, help_option, option, pass_context, version_option from duniterpy.api.endpoint import endpoint +from silkaj import revocation from silkaj.auth import generate_auth_file from silkaj.blocks import verify_blocks_signatures from silkaj.cert import send_certification @@ -143,6 +144,22 @@ cli.add_command(verify_blocks_signatures) cli.add_command(received_sent_certifications) +@cli.group( + "revocation", + help="Create, save, verify or publish revocation document.\n\ +Subcommands optionnaly take the path to the revocation document.", +) +@help_option("-h", "--help") +def revocation_group(): + pass + + +revocation_group.add_command(revocation.save) +revocation_group.add_command(revocation.verify) +revocation_group.add_command(revocation.publish) +revocation_group.add_command(revocation.revoke_now) + + @cli.command("about", help="Display program information") def about(): print( diff --git a/silkaj/idty_tools.py b/silkaj/idty_tools.py new file mode 100644 index 0000000000000000000000000000000000000000..46df7b220627832e4ab9507d3c66a9c5f2c7e9c5 --- /dev/null +++ b/silkaj/idty_tools.py @@ -0,0 +1,127 @@ +# Copyright 2016-2021 Maël Azimi <m.a@moul.re> +# +# Silkaj is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Silkaj is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Silkaj. If not, see <https://www.gnu.org/licenses/>. + +import shutil +import sys + +import click +import pendulum +from duniterpy.api import bma +from duniterpy.documents.block_id import BlockID +from duniterpy.documents.identity import Identity +from texttable import Texttable + +from silkaj import wot_tools as wt +from silkaj.constants import ALL +from silkaj.network_tools import ClientInstance +from silkaj.tui import display_pubkey_and_checksum + + +def display_identity(idty: Identity): + """ + Creates a table containing the identity infos + """ + client = ClientInstance().client + id_table = list() + id_table.append(["Public key", display_pubkey_and_checksum(idty.pubkey)]) + id_table.append(["User ID", idty.uid]) + id_table.append(["Blockstamp", str(idty.block_id)]) + creation_block = client(bma.blockchain.block, idty.block_id.number) + creation_date = pendulum.from_timestamp(creation_block["time"], tz="local").format( + ALL + ) + id_table.append(["Created on", creation_date]) + # display infos + table = Texttable(max_width=shutil.get_terminal_size().columns) + table.add_rows(id_table, header=False) + return table + + +def check_many_identities(idty: Identity, doc_type: str = "Identity"): + """ + Checks if many identities match the one looked after. + Returns True if the same identity is found, False if not. + """ + error_no_identical_id = f"{doc_type} document does not match any valid identity." + # if no id, exit is a bit rough... + try: + results_pubkey = wt.wot_lookup(idty.pubkey) + results_uid = wt.wot_lookup(idty.uid) + except SystemExit as e: + sys.exit(f"{error_no_identical_id} Error: {e}") + + # get all matching identities + lookup_ids = merge_ids_lists(results_pubkey, results_uid, idty.currency) + match = False + for n, lookup in enumerate(lookup_ids): + # __equal"__ method does not work. + if idty.signed_raw() == lookup.signed_raw(): + lookup_ids.pop(n) + match = True + break + alternate_ids = display_alternate_ids(lookup_ids).draw() + if match == True: + if len(lookup_ids) >= 1: + click.echo(f"One matching identity!\nSimilar identities:\n{alternate_ids}") + return True + else: + click.echo(f"{error_no_identical_id}\nSimilar identities:\n{alternate_ids}") + return False + + +def display_alternate_ids(ids_list: list): + labels = ["uid", "public key", "timestamp"] + table = Texttable(max_width=shutil.get_terminal_size().columns) + table.header(labels) + for id in ids_list: + table.add_row( + [id.uid, display_pubkey_and_checksum(id.pubkey), str(id.block_id)[:12]] + ) + return table + + +def merge_ids_lists(lookups_pubkey: list, lookups_uid: list, currency: str): + """ + merge two lists of identities and remove duplicate identities. + """ + ids = ids_list_from_lookups(lookups_pubkey, currency) + ids_uid = ids_list_from_lookups(lookups_uid, currency) + for id in ids_uid: + # __equal__ does not work. This is condition "id in ids". + for listed_id in ids: + if id.signed_raw() == listed_id.signed_raw(): + id_in_ids = True + break + id_in_ids = False + if not id_in_ids: + ids.append(id) + return ids + + +def ids_list_from_lookups(lookups: list, currency: str): + ids = list() + for lookup in lookups: + pubkey = lookup["pubkey"] + lookup_ids = lookup["uids"] + for id in lookup_ids: + appended_id = Identity( + currency=currency, + pubkey=pubkey, + uid=id["uid"], + block_id=BlockID.from_str(id["meta"]["timestamp"]), + ) + appended_id.signature = id["self"] + ids.append(appended_id) + return ids diff --git a/silkaj/revocation.py b/silkaj/revocation.py new file mode 100644 index 0000000000000000000000000000000000000000..39451f7f7e58d80869d5fb08462d2b7449d785a1 --- /dev/null +++ b/silkaj/revocation.py @@ -0,0 +1,241 @@ +# Copyright 2016-2021 Maël Azimi <m.a@moul.re> +# +# Silkaj is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Silkaj is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Silkaj. If not, see <https://www.gnu.org/licenses/>. + +import logging +import sys +from pathlib import Path + +import click +from duniterpy.api import bma +from duniterpy.documents.block_id import BlockID +from duniterpy.documents.document import MalformedDocumentError +from duniterpy.documents.identity import Identity +from duniterpy.documents.revocation import Revocation +from duniterpy.key.verifying_key import VerifyingKey + +from silkaj import auth, idty_tools, tui, wot +from silkaj.blockchain_tools import get_currency +from silkaj.constants import FAILURE_EXIT_STATUS, SUCCESS_EXIT_STATUS +from silkaj.network_tools import send_document + +REVOCATION_LOCAL_PATH = "revocation.txt" + + +@click.command( + "save", + help="Create and save a revocation document. Optionnaly takes the document filename.", +) +@click.argument( + "file", + default=REVOCATION_LOCAL_PATH, +) +@click.pass_context +def save(ctx: click.core.Context, file: str): + currency = get_currency() + + key = auth.auth_method() + pubkey_ck = tui.display_pubkey_and_checksum(key.pubkey) + id = (wot.choose_identity(key.pubkey))[0] + rev_doc = create_revocation_doc(id, key.pubkey, currency) + rev_doc.sign(key) + + if ctx.obj["DRY_RUN"]: + click.echo(rev_doc.signed_raw()) + return SUCCESS_EXIT_STATUS + + idty_table = idty_tools.display_identity(rev_doc.identity) + click.echo(idty_table.draw()) + if ctx.obj["DISPLAY_DOCUMENT"]: + click.echo(rev_doc.signed_raw()) + + confirm_message = "Do you want to save the revocation document for this identity?" + if click.confirm(confirm_message): + save_doc(file, rev_doc.signed_raw(), key.pubkey) + else: + click.echo("Ok, goodbye!") + return SUCCESS_EXIT_STATUS + + +@click.command( + "revoke", + help="Create and publish revocation document. Will revoke the identity immediately.", +) +@click.pass_context +def revoke_now(ctx: click.core.Context): + currency = get_currency() + + warn_before_dry_run_or_display(ctx) + + key = auth.auth_method() + pubkey_ck = tui.display_pubkey_and_checksum(key.pubkey) + id = (wot.choose_identity(key.pubkey))[0] + rev_doc = create_revocation_doc(id, key.pubkey, currency) + rev_doc.sign(key) + + if ctx.obj["DRY_RUN"]: + click.echo(rev_doc.signed_raw()) + return SUCCESS_EXIT_STATUS + + idty_table = idty_tools.display_identity(rev_doc.identity) + click.echo(idty_table.draw()) + if ctx.obj["DISPLAY_DOCUMENT"]: + click.echo(rev_doc.signed_raw()) + + warn_before_sending_document() + send_document(bma.wot.revoke, rev_doc) + + +@click.command( + "verify", + help="Verifies that a revocation document is correctly formatted and matches an existing identity.\n\ +Optionnaly takes the document filename.", +) +@click.argument( + "file", + default=REVOCATION_LOCAL_PATH, +) +@click.pass_context +def verify(ctx: click.core.Context, file: str): + currency = get_currency() + + rev_doc = verify_document(file) + + if ctx.obj["DRY_RUN"]: + click.echo(rev_doc.signed_raw()) + return SUCCESS_EXIT_STATUS + + idty_table = idty_tools.display_identity(rev_doc.identity) + click.echo(idty_table.draw()) + if ctx.obj["DISPLAY_DOCUMENT"]: + click.echo(rev_doc.signed_raw()) + + click.echo("Revocation document is valid.") + return SUCCESS_EXIT_STATUS + + +@click.command( + "publish", + help="Publish revocation document. Identity will be revoked immediately.\n\ +Optionnaly takes the document filename.", +) +@click.argument( + "file", + default=REVOCATION_LOCAL_PATH, +) +@click.pass_context +def publish(ctx: click.core.Context, file: str): + currency = get_currency() + + warn_before_dry_run_or_display(ctx) + + rev_doc = verify_document(file) + if ctx.obj["DRY_RUN"]: + click.echo(rev_doc.signed_raw()) + return SUCCESS_EXIT_STATUS + + idty_table = idty_tools.display_identity(rev_doc.identity) + click.echo(idty_table.draw()) + if ctx.obj["DISPLAY_DOCUMENT"]: + click.echo(rev_doc.signed_raw()) + + warn_before_sending_document() + send_document(bma.wot.revoke, rev_doc) + + +def warn_before_dry_run_or_display(ctx): + if ctx.obj["DRY_RUN"]: + click.echo("WARNING: the document will only be displayed and will not be sent.") + + +def warn_before_sending_document(): + click.secho("/!\\WARNING/!\\", blink=True, fg="red") + click.echo( + "This identity will be revoked.\n\ +It will cease to be member and to create the Universal Dividend.\n\ +All currently sent certifications will remain valid until they expire." + ) + tui.send_doc_confirmation("revocation document immediately") + + +def create_revocation_doc(id, pubkey: str, currency: str): + """ + Creates an unsigned revocation document. + id is the dict object containing id infos from request wot.requirements + """ + idty = Identity( + currency=currency, + pubkey=pubkey, + uid=id["uid"], + block_id=BlockID.from_str(id["meta"]["timestamp"]), + ) + idty.signature = id["self"] + return Revocation( + currency=currency, + identity=idty, + ) + + +def save_doc(path: str, content: str, pubkey: str): + pubkey_cksum = tui.display_pubkey_and_checksum(pubkey) + rev_path = Path(path) + # Ask confirmation if the file exists + if rev_path.is_file(): + if click.confirm( + f"Would you like to erase existing file `{path}` with the generated revocation document corresponding to {pubkey_cksum} public key?" + ): + rev_path.unlink() + else: + click.echo("Ok, goodbye!") + sys.exit(SUCCESS_EXIT_STATUS) + # write doc + with open(rev_path, "w") as file: + file.write(content) + click.echo( + f"Revocation document file stored into `{path}` for following public key: {pubkey_cksum}" + ) + return SUCCESS_EXIT_STATUS + + +def verify_document(doc: str): + """ + This checks that: + - that the revocation signature is valid. + - if the identity is unique (warns the user) + It returns the revocation document or exits. + """ + error_invalid_sign = "Error: the signature of the revocation document is invalid." + error_invalid_doc = ( + f"Error: {doc} is not a revocation document, or is not correctly formatted." + ) + + if not Path(doc).is_file(): + sys.exit(f"Error: file {doc} does not exist") + with open(doc) as document: + original_doc = document.read() + + try: + rev_doc = Revocation.from_signed_raw(original_doc) + except (MalformedDocumentError, IndexError): + sys.exit(error_invalid_doc) + idty = rev_doc.identity + + verif_key = VerifyingKey(idty.pubkey) + if not verif_key.check_signature(rev_doc.raw(), rev_doc.signature): + sys.exit(error_invalid_sign) + + many_idtys = idty_tools.check_many_identities(idty, "Revocation") + if many_idtys: + return rev_doc + sys.exit(FAILURE_EXIT_STATUS)