Skip to content
Snippets Groups Projects
Commit 6abc9dcd authored by Moul's avatar Moul
Browse files

[enh] #140: wot: Add choose_identity()

to choose identity among identities from wot/lookup
Display the uid, pubkey and the blockstamp to choose
Add tests on choose_identity()

Delete get_information_for_identity() based on top of the lookup

Allow to pass pubkey to the following commands:
The uid was the only identifier before
Adapt 'wot' and 'cert' commands to choose_identity()

Import silkaj.wot and click directly
Import directly bma otherwise there is a namespace conflict on 'wot'
on silkaj.wot and bma.wot
parent ded4611a
No related branches found
No related tags found
2 merge requests!146Merge dev into master branch to complete v0.8.0 development cycle,!140Membership, choose_identity, pendulum, asynctest dependencies
......@@ -19,7 +19,7 @@ import sys
from click import command, argument, echo, confirm
from time import time
from tabulate import tabulate
from duniterpy.api.bma import wot, blockchain
from duniterpy.api import bma
from duniterpy.documents import BlockUID, block_uid, Identity, Certification
from silkaj.auth import auth_method
......@@ -28,7 +28,7 @@ from silkaj.tui import convert_time
from silkaj.network_tools import ClientInstance
from silkaj.blockchain_tools import BlockchainParams, HeadBlock
from silkaj.license import license_approval
from silkaj.wot import is_member, get_informations_for_identity
from silkaj import wot
from silkaj.constants import SUCCESS_EXIT_STATUS
......@@ -37,23 +37,24 @@ from silkaj.constants import SUCCESS_EXIT_STATUS
@coroutine
async def send_certification(id_to_certify):
client = ClientInstance().client
id_to_certify = await get_informations_for_identity(id_to_certify)
main_id_to_certify = id_to_certify["uids"][0]
idty_to_certify, pubkey_to_certify, send_certs = await wot.choose_identity(
id_to_certify
)
# Authentication
key = auth_method()
# Check whether current user is member
issuer_pubkey = key.pubkey
issuer = await is_member(issuer_pubkey)
issuer = await wot.is_member(issuer_pubkey)
if not issuer:
message_exit("Current identity is not member.")
if issuer_pubkey == id_to_certify["pubkey"]:
if issuer_pubkey == pubkey_to_certify:
message_exit("You can’t certify yourself!")
# Check if the certification can be renewed
req = await client(wot.requirements, id_to_certify["pubkey"])
req = await client(bma.wot.requirements, pubkey_to_certify)
req = req["identities"][0]
for cert in req["certifications"]:
if cert["from"] == issuer_pubkey:
......@@ -77,16 +78,16 @@ async def send_certification(id_to_certify):
# Certification confirmation
await certification_confirmation(
issuer, issuer_pubkey, id_to_certify, main_id_to_certify
issuer, issuer_pubkey, pubkey_to_certify, idty_to_certify
)
identity = Identity(
version=10,
currency=currency,
pubkey=id_to_certify["pubkey"],
uid=main_id_to_certify["uid"],
ts=block_uid(main_id_to_certify["meta"]["timestamp"]),
signature=main_id_to_certify["self"],
pubkey=pubkey_to_certify,
uid=idty_to_certify["uid"],
ts=block_uid(idty_to_certify["meta"]["timestamp"]),
signature=idty_to_certify["self"],
)
certification = Certification(
......@@ -102,7 +103,7 @@ async def send_certification(id_to_certify):
certification.sign([key])
# Send certification document
response = await client(wot.certify, certification.signed_raw())
response = await client(bma.wot.certify, certification.signed_raw())
if response.status == 200:
print("Certification successfully sent.")
......@@ -113,19 +114,19 @@ async def send_certification(id_to_certify):
async def certification_confirmation(
issuer, issuer_pubkey, id_to_certify, main_id_to_certify
issuer, issuer_pubkey, pubkey_to_certify, idty_to_certify
):
cert = list()
cert.append(["Cert", "Issuer", "–>", "Recipient: Published: #block-hash date"])
client = ClientInstance().client
idty_timestamp = main_id_to_certify["meta"]["timestamp"]
idty_timestamp = idty_to_certify["meta"]["timestamp"]
block_uid_idty = block_uid(idty_timestamp)
block = await client(blockchain.block, block_uid_idty.number)
block = await client(bma.blockchain.block, block_uid_idty.number)
block_uid_date = (
": #" + idty_timestamp[:15] + "" + convert_time(block["time"], "all")
)
cert.append(["ID", issuer["uid"], "–>", main_id_to_certify["uid"] + block_uid_date])
cert.append(["Pubkey", issuer_pubkey, "–>", id_to_certify["pubkey"]])
cert.append(["ID", issuer["uid"], "–>", idty_to_certify["uid"] + block_uid_date])
cert.append(["Pubkey", issuer_pubkey, "–>", pubkey_to_certify])
params = await BlockchainParams().params
cert_begins = convert_time(time(), "date")
cert_ends = convert_time(time() + params["sigValidity"], "date")
......
......@@ -15,7 +15,7 @@ You should have received a copy of the GNU Affero General Public License
along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
"""
from click import command, argument
import click
from time import time
from tabulate import tabulate
from collections import OrderedDict
......@@ -31,11 +31,11 @@ from silkaj.blockchain_tools import BlockchainParams
from silkaj.constants import ASYNC_SLEEP
def get_sent_certifications(certs, time_first_block, params):
def get_sent_certifications(signed, time_first_block, params):
sent = list()
expire = list()
if certs["signed"]:
for cert in certs["signed"]:
if signed:
for cert in signed:
sent.append(cert["uid"])
expire.append(
expiration_date_from_block_id(
......@@ -45,11 +45,11 @@ def get_sent_certifications(certs, time_first_block, params):
return sent, expire
@command(
@click.command(
"wot",
help="Check received and sent certifications and consult the membership status of any given identity",
)
@argument("id")
@click.argument("id")
@coroutine
async def received_sent_certifications(id):
"""
......@@ -60,50 +60,45 @@ async def received_sent_certifications(id):
client = ClientInstance().client
first_block = await client(blockchain.block, 1)
time_first_block = first_block["time"]
id_certs = await get_informations_for_identity(id)
identity, pubkey, signed = await choose_identity(id)
certifications = OrderedDict()
params = await BlockchainParams().params
for certs in id_certs["uids"]:
if certs["uid"].lower() == id.lower():
pubkey = id_certs["pubkey"]
req = await client(wot.requirements, pubkey)
req = req["identities"][0]
certifications["received_expire"] = list()
certifications["received"] = list()
for cert in certs["others"]:
certifications["received_expire"].append(
expiration_date_from_block_id(
cert["meta"]["block_number"], time_first_block, params
)
)
certifications["received"].append(
cert_written_in_the_blockchain(req["certifications"], cert)
)
(
certifications["sent"],
certifications["sent_expire"],
) = get_sent_certifications(id_certs, time_first_block, params)
nbr_sent_certs = (
len(certifications["sent"]) if "sent" in certifications else 0
)
print(
"{0} ({1}) from block #{2}\nreceived {3} and sent {4}/{5} certifications:\n{6}\n{7}\n".format(
id,
pubkey[:5] + "",
certs["meta"]["timestamp"][:15] + "",
len(certifications["received"]),
nbr_sent_certs,
params["sigStock"],
tabulate(
certifications,
headers="keys",
tablefmt="orgtbl",
stralign="center",
),
"✔: Certifications written into the blockchain",
)
req = await client(wot.requirements, pubkey)
req = req["identities"][0]
certifications["received_expire"] = list()
certifications["received"] = list()
for cert in identity["others"]:
certifications["received_expire"].append(
expiration_date_from_block_id(
cert["meta"]["block_number"], time_first_block, params
)
await membership_status(certifications, certs, pubkey, req)
)
certifications["received"].append(
cert_written_in_the_blockchain(req["certifications"], cert)
)
(
certifications["sent"],
certifications["sent_expire"],
) = get_sent_certifications(signed, time_first_block, params)
nbr_sent_certs = len(certifications["sent"]) if "sent" in certifications else 0
print(
"{0} ({1}) from block #{2}\nreceived {3} and sent {4}/{5} certifications:\n{6}\n{7}\n".format(
id,
pubkey[:5] + "",
identity["meta"]["timestamp"][:15] + "",
len(certifications["received"]),
nbr_sent_certs,
params["sigStock"],
tabulate(
certifications,
headers="keys",
tablefmt="orgtbl",
stralign="center",
),
"✔: Certifications written into the blockchain",
)
)
await membership_status(certifications, pubkey, req)
await client.close()
......@@ -114,7 +109,7 @@ def cert_written_in_the_blockchain(written_certs, certifieur):
return certifieur["uids"][0]
async def membership_status(certifications, certs, pubkey, req):
async def membership_status(certifications, pubkey, req):
params = await BlockchainParams().params
if len(certifications["received"]) >= params["sigQty"]:
print(
......@@ -157,8 +152,10 @@ def date_approximation(block_id, time_first_block, avgentime):
return time_first_block + block_id * avgentime
@command("id", help="Find corresponding identity or pubkey from pubkey or identity")
@argument("id_pubkey")
@click.command(
"id", help="Find corresponding identity or pubkey from pubkey or identity"
)
@click.argument("id_pubkey")
@coroutine
async def id_pubkey_correspondence(id_pubkey):
client = ClientInstance().client
......@@ -185,17 +182,48 @@ async def id_pubkey_correspondence(id_pubkey):
await client.close()
async def get_informations_for_identity(id):
async def choose_identity(pubkey_uid):
"""
Check that the id is present on the network
many identities could match
return the one searched
Get lookup from a pubkey or an uid
Loop over the double lists: pubkeys, then uids
If there is one uid, returns it
If there is multiple uids, prompt a selector
"""
certs_req = await wot_lookup(id)
for certs_id in certs_req:
if certs_id["uids"][0]["uid"].lower() == id.lower():
return certs_id
message_exit("No matching identity")
lookups = await wot_lookup(pubkey_uid)
# Generate table containing the choices
identities_choices = {"id": [], "uid": [], "pubkey": [], "timestamp": []}
for pubkey_index, lookup in enumerate(lookups):
for uid_index, identity in enumerate(lookup["uids"]):
identities_choices["id"].append(str(pubkey_index) + str(uid_index))
identities_choices["pubkey"].append(lookup["pubkey"])
identities_choices["uid"].append(identity["uid"])
identities_choices["timestamp"].append(
identity["meta"]["timestamp"][:20] + ""
)
identities = len(identities_choices["uid"])
if identities == 1:
pubkey_index = 0
uid_index = 0
elif identities > 1:
table = tabulate(identities_choices, headers="keys", tablefmt="orgtbl")
click.echo(table)
# Loop till the passed value is in identities_choices
message = "Which identity would you like to select (id)?"
selected_id = None
while selected_id not in identities_choices["id"]:
selected_id = click.prompt(message)
pubkey_index = int(selected_id[:-1])
uid_index = int(selected_id[-1:])
return (
lookups[pubkey_index]["uids"][uid_index],
lookups[pubkey_index]["pubkey"],
lookups[pubkey_index]["signed"],
)
async def identity_of(pubkey_uid):
......
"""
Copyright 2016-2020 Maël Azimi <m.a@moul.re>
Silkaj is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Silkaj is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
"""
import pytest
import click
from silkaj import wot
pubkey_titi_tata = "B4RoF48cTxzmsQDB3UjodKdZ2cVymKSKzgiPVRoMeA88"
pubkey_toto_tutu = "totoF48cTxzmsQDB3UjodKdZ2cVymKSKzgiPVRoMeA88"
def identity_card(uid, timestamp):
return {
"uid": uid,
"meta": {"timestamp": timestamp},
}
titi = identity_card(
"titi",
"590358-000156C5620946D1D63DAF82BF3AA735CE0B3518D59274171C88A7DBA4C906BC",
)
tata = identity_card(
"tata",
"210842-000000E7AAC79A07F487B33A48B3217F8A1F0A31CDB42C5DFC5220A20665B6B1",
)
toto = identity_card(
"toto",
"189601-0000011405B5C96EA69C1273370E956ED7887FA56A75E3EFDF81E866A2C49FD9",
)
tutu = identity_card(
"tutu",
"389601-0000023405B5C96EA69C1273370E956ED7887FA56A75E3EFDF81E866A2C49FD9",
)
async def patched_lookup_one(pubkey_uid):
return [
{
"pubkey": pubkey_titi_tata,
"uids": [titi],
"signed": [],
}
]
async def patched_lookup_two(pubkey_uid):
return [
{
"pubkey": pubkey_titi_tata,
"uids": [titi, tata],
"signed": [],
}
]
async def patched_lookup_three(pubkey_uid):
return [
{
"pubkey": pubkey_titi_tata,
"uids": [titi, tata],
"signed": [],
},
{
"pubkey": pubkey_toto_tutu,
"uids": [toto],
"signed": [],
},
]
async def patched_lookup_four(pubkey_uid):
return [
{
"pubkey": pubkey_titi_tata,
"uids": [titi, tata],
"signed": [],
},
{
"pubkey": pubkey_toto_tutu,
"uids": [toto, tutu],
"signed": [],
},
]
async def patched_lookup_five(pubkey_uid):
return [
{
"pubkey": pubkey_titi_tata,
"uids": [titi],
"signed": [],
},
{
"pubkey": pubkey_toto_tutu,
"uids": [titi],
"signed": [],
},
]
def patched_prompt_titi(message):
return "00"
def patched_prompt_tata(message):
return "01"
def patched_prompt_toto(message):
return "10"
def patched_prompt_tutu(message):
return "11"
@pytest.mark.parametrize(
"selected_uid, pubkey, patched_prompt, patched_lookup",
[
("titi", pubkey_titi_tata, patched_prompt_titi, patched_lookup_one),
("tata", pubkey_titi_tata, patched_prompt_tata, patched_lookup_two),
("toto", pubkey_toto_tutu, patched_prompt_toto, patched_lookup_three),
("tutu", pubkey_toto_tutu, patched_prompt_tutu, patched_lookup_four),
("titi", pubkey_toto_tutu, patched_prompt_toto, patched_lookup_five),
],
)
@pytest.mark.asyncio
async def test_choose_identity(
selected_uid, pubkey, patched_prompt, patched_lookup, capsys, monkeypatch
):
monkeypatch.setattr(wot, "wot_lookup", patched_lookup)
monkeypatch.setattr(click, "prompt", patched_prompt)
identity_card, get_pubkey, signed = await wot.choose_identity(pubkey)
assert pubkey == get_pubkey
assert selected_uid == identity_card["uid"]
# Check whether the table is not displayed in case of one identity
# Check it is displayed for more than one identity
# Check the uids and ids are in
captured = capsys.readouterr()
lookups = await patched_lookup("")
# only one pubkey and one uid on this pubkey
if len(lookups) == 1 and len(lookups[0]["uids"]) == 1:
assert not captured.out
# many pubkeys or many uid on one pubkey
else:
# if more than one pubkey, there should be a "10" numbering
if len(lookups) > 1:
assert " 10 " in captured.out
for lookup in lookups:
if len(lookup["uids"]) > 1:
assert " 01 " in captured.out
for uid in lookup["uids"]:
assert uid["uid"] in captured.out
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment