Skip to content
Snippets Groups Projects
Commit 87b0f4ca authored by Vincent Texier's avatar Vincent Texier
Browse files

[enh] #94 add mypy verification on tests and examples

Fix mypy alerts
Improve duniterpy static type
Improve examples
parent 73b4e415
No related branches found
No related tags found
1 merge request!66Merge of check_tests_and_examples to dev
......@@ -14,6 +14,8 @@ check: mypy pylint check-format
# check static typing
mypy:
python3 -m mypy duniterpy --ignore-missing-imports
python3 -m mypy tests --ignore-missing-imports
python3 -m mypy examples --ignore-missing-imports
# check code errors
pylint:
......
......@@ -4,7 +4,7 @@ Created on 2 déc. 2014
@author: inso
"""
import re
from typing import TypeVar, Type
from typing import TypeVar, Type, Optional
from .block_uid import BlockUID
from .document import Document, MalformedDocumentError
......@@ -72,7 +72,7 @@ class Membership(Document):
membership_type: str,
uid: str,
identity_ts: BlockUID,
signature: str,
signature: Optional[str] = None,
) -> None:
"""
Create a membership document
......@@ -86,7 +86,12 @@ class Membership(Document):
:param identity_ts: BlockUID of the identity
:param signature: Signature of the document
"""
super().__init__(version, currency, [signature])
if signature:
signatures = [signature]
else:
signatures = []
super().__init__(version, currency, signatures)
self.issuer = issuer
self.membership_ts = membership_ts
self.membership_type = membership_type
......
"""
duniter public and private keys
@author: inso
@author: inso, vtexier, Moul
"""
import re
from typing import Optional, Union, TypeVar, Type
......
......@@ -44,7 +44,7 @@ signer.save_private_key(PRIVATE_KEYS_FILE_PATH)
print("Private keys for public key %s saved in %s" % (pubkey, PRIVATE_KEYS_FILE_PATH))
# load private keys from file
loaded_signer = SigningKey.from_private_key(PRIVATE_KEYS_FILE_PATH)
loaded_signer = SigningKey.from_private_key(PRIVATE_KEYS_FILE_PATH) # type: SigningKey
# check public key from file
print("Public key %s loaded from file %s" % (pubkey, PRIVATE_KEYS_FILE_PATH))
......
......@@ -48,7 +48,7 @@ print("Private key for public key %s saved in %s" % (signer.pubkey, PRIVATE_KEY_
try:
# load private keys from file
loaded_signer = SigningKey.from_ewif_file(PRIVATE_KEY_FILE_PATH, ewif_password)
loaded_signer = SigningKey.from_ewif_file(PRIVATE_KEY_FILE_PATH, ewif_password) # type: SigningKey
# check public key from file
print("Public key %s loaded from file %s" % (loaded_signer.pubkey, PRIVATE_KEY_FILE_PATH))
......
......@@ -45,7 +45,7 @@ print("Private key for public key %s saved in %s" % (signer.pubkey, PRIVATE_KEY_
try:
# load private keys from file
loaded_signer = SigningKey.from_wif_file(PRIVATE_KEY_FILE_PATH)
loaded_signer = SigningKey.from_wif_file(PRIVATE_KEY_FILE_PATH) # type: SigningKey
# check public key from file
print("Public key %s loaded from file %s" % (loaded_signer.pubkey, PRIVATE_KEY_FILE_PATH))
......
import asyncio
import getpass
import os
from typing import Optional
import duniterpy.api.bma as bma
from duniterpy.api.client import Client
......@@ -35,43 +36,42 @@ PROTOCOL_VERSION = 10
################################################
async def get_identity_document(client: Client, currency: str, pubkey: str) -> Identity:
async def get_identity_document(client: Client, current_block: dict, pubkey: str) -> Optional[Identity]:
"""
Get the Identity document of the pubkey
Get the identity document of the pubkey
:param client: Client to connect to the api
:param currency: Currency name
:param pubkey: Public key
:param current_block: Current block data
:param pubkey: UID/Public key
:rtype: Identity
"""
# Here we request for the path wot/lookup/pubkey
lookup_data = await client(bma.wot.lookup, pubkey)
# init vars
uid = None
timestamp = BlockUID.empty()
signature = None
identity = None
# parse results
for result in lookup_data['results']:
if result["pubkey"] == pubkey:
uids = result['uids']
for uid_data in uids:
uid_data = uids[0]
# capture data
timestamp = BlockUID.from_str(uid_data["meta"]["timestamp"])
uid = uid_data["uid"]
signature = uid_data["self"]
uid = uid_data["uid"] # type: str
signature = uid_data["self"] # type: str
# return self-certification document
return Identity(
version=PROTOCOL_VERSION,
currency=currency,
identity = Identity(
version=10,
currency=current_block['currency'],
pubkey=pubkey,
uid=uid,
ts=timestamp,
signature=signature
)
break
return identity
def get_signed_raw_revocation_document(identity: Identity, salt: str, password: str) -> str:
......@@ -122,8 +122,13 @@ async def main():
# capture current block to get currency name
current_block = await client(bma.blockchain.current)
# create our Identity document to sign the revoke document
identity_document = await get_identity_document(client, current_block['currency'], pubkey)
# create our Identity document to sign the Certification document
identity = await get_identity_document(client, current_block, pubkey)
if identity is None:
print("Identity not found for pubkey {0}".format(pubkey))
# Close client aiohttp session
await client.close()
exit(1)
# get the revoke document
revocation_signed_raw_document = get_signed_raw_revocation_document(identity_document, salt, password)
......
import asyncio
import getpass
from typing import Optional
import duniterpy.api.bma as bma
from duniterpy.api.client import Client
......@@ -17,7 +18,7 @@ BMAS_ENDPOINT = "BMAS g1-test.duniter.org 443"
################################################
async def get_identity_document(client: Client, current_block: dict, pubkey: str) -> Identity:
async def get_identity_document(client: Client, current_block: dict, pubkey: str) -> Optional[Identity]:
"""
Get the identity document of the pubkey
......@@ -29,24 +30,20 @@ async def get_identity_document(client: Client, current_block: dict, pubkey: str
"""
# Here we request for the path wot/lookup/pubkey
lookup_data = await client(bma.wot.lookup, pubkey)
# init vars
uid = None
timestamp = BlockUID.empty()
signature = None
identity = None
# parse results
for result in lookup_data['results']:
if result["pubkey"] == pubkey:
uids = result['uids']
for uid_data in uids:
uid_data = uids[0]
# capture data
timestamp = BlockUID.from_str(uid_data["meta"]["timestamp"])
uid = uid_data["uid"]
signature = uid_data["self"]
uid = uid_data["uid"] # type: str
signature = uid_data["self"] # type: str
# return self-certification document
return Identity(
identity = Identity(
version=10,
currency=current_block['currency'],
pubkey=pubkey,
......@@ -54,6 +51,9 @@ async def get_identity_document(client: Client, current_block: dict, pubkey: str
ts=timestamp,
signature=signature
)
break
return identity
def get_certification_document(current_block: dict, self_cert_document: Identity, from_pubkey: str) -> Certification:
......@@ -101,6 +101,11 @@ async def main():
# create our Identity document to sign the Certification document
identity = await get_identity_document(client, current_block, pubkey_to)
if identity is None:
print("Identity not found for pubkey {0}".format(pubkey_to))
# Close client aiohttp session
await client.close()
exit(1)
# send the Certification document to the node
certification = get_certification_document(current_block, identity, pubkey_from)
......
......@@ -79,8 +79,7 @@ def get_membership_document(membership_type: str, current_block: dict, identity:
membership_ts=timestamp,
membership_type=membership_type,
uid=identity.uid,
identity_ts=identity.timestamp,
signature=None
identity_ts=identity.timestamp
)
# sign document
......
......@@ -12,7 +12,8 @@ class WebFunctionalSetupMixin:
def setUp(self) -> None:
self.handler = None
self.runner = None
self.app = web.Application()
self.runner = web.AppRunner(self.app)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
......@@ -45,16 +46,15 @@ class WebFunctionalSetupMixin:
:param ssl_ctx: SSL context (https is used if not None)
:return:
"""
app = web.Application()
if handler:
app.router.add_route(method, path, handler)
self.app.router.add_route(method, path, handler)
port = self.find_unused_port()
self.runner = web.AppRunner(app)
await self.runner.setup()
port = self.find_unused_port()
site = web.TCPSite(self.runner, '127.0.0.1', port)
await site.start()
protocol = "https" if ssl_ctx else "http"
url = "{}://127.0.0.1:{}".format(protocol, port) + path
return app, port, url
return self.app, port, url
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment