Commit e685da93 authored by Vincent Texier's avatar Vincent Texier

issue #52 add type hinting and change some method signatures (break BC!)

Document sign, raw and signed_raw methods does not have arguments anymore
Certification sign, raw and signed_raw are renamed
Revocation sign, raw and signed_raw are renamed
verifying_key.py verify_document signature has changed
parent e118a6fb
import base64
import hashlib
import re
from typing import Union, TypeVar, Type, Optional, List
from typing import Union, TypeVar, Type, Optional, List, Sequence
from .certification import Identity, Certification, Revocation
from .document import Document, MalformedDocumentError
......@@ -37,6 +37,8 @@ class BlockUID:
:param blockid: The block id
"""
data = BlockUID.re_block_uid.match(blockid)
if data is None:
raise MalformedDocumentError("BlockUID")
try:
number = int(data.group(1))
except AttributeError:
......@@ -52,19 +54,29 @@ class BlockUID:
def __str__(self) -> str:
return "{0}-{1}".format(self.number, self.sha_hash)
def __eq__(self, other: Type[BlockUIDType]) -> bool:
def __eq__(self, other: object) -> bool:
if not isinstance(other, BlockUID):
return False
return self.number == other.number and self.sha_hash == other.sha_hash
def __lt__(self, other: Type[BlockUIDType]) -> bool:
def __lt__(self, other: object) -> bool:
if not isinstance(other, BlockUID):
return False
return self.number < other.number
def __gt__(self, other: Type[BlockUIDType]) -> bool:
def __gt__(self, other: object) -> bool:
if not isinstance(other, BlockUID):
return False
return self.number > other.number
def __le__(self, other: Type[BlockUIDType]) -> bool:
def __le__(self, other: object) -> bool:
if not isinstance(other, BlockUID):
return False
return self.number <= other.number
def __ge__(self, other: Type[BlockUIDType]) -> bool:
def __ge__(self, other: object) -> bool:
if not isinstance(other, BlockUID):
return False
return self.number >= other.number
def __hash__(self) -> int:
......@@ -215,9 +227,9 @@ The class Block handles Block documents.
issuers_frame: int,
issuers_frame_var: int,
different_issuers_count: int,
prev_hash: str,
prev_issuer: str,
parameters: str,
prev_hash: Optional[str],
prev_issuer: Optional[str],
parameters: Optional[Sequence[str]],
members_count: int,
identities: List[Identity],
joiners: List[Membership],
......@@ -297,7 +309,7 @@ The class Block handles Block documents.
self.noonce = noonce
@property
def blockUID(self) -> BlockUIDType:
def blockUID(self) -> BlockUID:
return BlockUID(self.number, self.proof_of_work())
@classmethod
......@@ -326,9 +338,10 @@ The class Block handles Block documents.
mediantime = int(Block.parse_field("MedianTime", lines[n]))
n += 1
ud = Block.re_universaldividend.match(lines[n])
unit_base = None
if ud is not None:
ud_match = Block.re_universaldividend.match(lines[n])
ud = None
unit_base = 0
if ud_match is not None:
ud = int(Block.parse_field("UD", lines[n]))
n += 1
......@@ -354,19 +367,25 @@ The class Block handles Block documents.
prev_hash = None
prev_issuer = None
if number > 0:
prev_hash = Block.parse_field("PreviousHash", lines[n])
prev_hash = str(Block.parse_field("PreviousHash", lines[n]))
n += 1
prev_issuer = Block.parse_field("PreviousIssuer", lines[n])
prev_issuer = str(Block.parse_field("PreviousIssuer", lines[n]))
n += 1
parameters = None
if number == 0:
try:
if version >= 10:
parameters = Block.re_parameters_v10.match(lines[n]).groups()
params_match = Block.re_parameters_v10.match(lines[n])
if params_match is None:
raise MalformedDocumentError("Parameters")
parameters = params_match.groups()
else:
parameters = Block.re_parameters.match(lines[n]).groups()
params_match = Block.re_parameters.match(lines[n])
if params_match is None:
raise MalformedDocumentError("Parameters")
parameters = params_match.groups()
n += 1
except AttributeError:
raise MalformedDocumentError("Parameters")
......@@ -421,8 +440,10 @@ The class Block handles Block documents.
if Block.re_excluded.match(lines[n]):
n += 1
while Block.re_certifications.match(lines[n]) is None:
exclusion = Block.re_exclusion.match(lines[n]).group(1)
excluded.append(exclusion)
exclusion_match = Block.re_exclusion.match(lines[n])
if exclusion_match is not None:
exclusion = exclusion_match.group(1)
excluded.append(exclusion)
n += 1
if Block.re_certifications.match(lines[n]):
......@@ -502,7 +523,7 @@ IssuersFrameVar: {1}
DifferentIssuersCount: {2}
""".format(self.issuers_frame, self.issuers_frame_var, self.different_issuers_count)
if self.number == 0:
if self.number == 0 and self.parameters is not None:
str_params = ":".join([str(p) for p in self.parameters])
doc += "Parameters: {0}\n".format(str_params)
else:
......@@ -571,17 +592,27 @@ Nonce: {nonce}
signing = base64.b64encode(key.signature(bytes(signed, 'ascii')))
self.signatures = [signing.decode("ascii")]
def __eq__(self, other: Type[BlockType]) -> bool:
def __eq__(self, other: object) -> bool:
if not isinstance(other, Block):
return False
return self.blockUID == other.blockUID
def __lt__(self, other: Type[BlockType]) -> bool:
def __lt__(self, other: object) -> bool:
if not isinstance(other, Block):
return False
return self.blockUID < other.blockUID
def __gt__(self, other: Type[BlockType]) -> bool:
def __gt__(self, other: object) -> bool:
if not isinstance(other, Block):
return False
return self.blockUID > other.blockUID
def __le__(self, other: Type[BlockType]) -> bool:
def __le__(self, other: object) -> bool:
if not isinstance(other, Block):
return False
return self.blockUID <= other.blockUID
def __ge__(self, other: Type[BlockType]) -> bool:
def __ge__(self, other: object) -> bool:
if not isinstance(other, Block):
return False
return self.blockUID >= other.blockUID
This diff is collapsed.
......@@ -2,6 +2,7 @@ import base64
import hashlib
import logging
import re
from typing import TypeVar, Type, Any, List
from ..constants import SIGNATURE_REGEX
......@@ -11,10 +12,19 @@ class MalformedDocumentError(Exception):
Malformed document exception
"""
def __init__(self, field_name):
def __init__(self, field_name: str) -> None:
"""
Init exception instance
:param field_name: Name of the wrong field
"""
super().__init__("Could not parse field {0}".format(field_name))
# required to type hint cls in classmethod
DocumentType = TypeVar('DocumentType', bound='Document')
class Document:
re_version = re.compile("Version: ([0-9]+)\n")
re_currency = re.compile("Currency: ([^\n]+)\n")
......@@ -26,21 +36,14 @@ class Document:
"Signature": re_signature
}
@classmethod
def parse_field(cls, field_name, line):
def __init__(self, version: int, currency: str, signatures: List[str]) -> None:
"""
Init Document instance
:param field_name:
:param line:
:return:
:param version: Version of the Document
:param currency: Name of the currency
:param signatures: List of signatures
"""
try:
value = cls.fields_parsers[field_name].match(line).group(1)
except AttributeError:
raise MalformedDocumentError(field_name)
return value
def __init__(self, version, currency, signatures):
if version < 2:
raise MalformedDocumentError("Version 1 documents are not handled by duniterpy>0.2")
self.version = version
......@@ -50,10 +53,31 @@ class Document:
else:
self.signatures = []
def sign(self, keys):
@classmethod
def parse_field(cls: Type[DocumentType], field_name: str, line: str) -> Any:
"""
Parse a document field with regular expression and return the value
:param field_name: Name of the field
:param line: Line string to parse
:return:
"""
try:
match = cls.fields_parsers[field_name].match(line)
if match is None:
raise AttributeError
value = match.group(1)
except AttributeError:
raise MalformedDocumentError(field_name)
return value
def sign(self, keys: list) -> None:
"""
Sign the current document.
Warning : current signatures will be replaced with the new ones.
:param keys: List of libnacl keys instance
"""
self.signatures = []
for key in keys:
......@@ -61,16 +85,17 @@ class Document:
logging.debug("Signature : \n{0}".format(signing.decode("ascii")))
self.signatures.append(signing.decode("ascii"))
def raw(self, **kwargs):
def raw(self) -> str:
"""
Returns the raw document in string format
"""
raise NotImplementedError()
def signed_raw(self, *args):
def signed_raw(self) -> str:
"""
If keys are None, returns the raw + current signatures
If keys are present, returns the raw signed by these keys
:return:
"""
raw = self.raw()
signed = "\n".join(self.signatures)
......@@ -78,5 +103,10 @@ class Document:
return signed_raw
@property
def sha_hash(self):
def sha_hash(self) -> str:
"""
Return uppercase hex sha256 hash from signed raw document
:return:
"""
return hashlib.sha256(self.signed_raw().encode("ascii")).hexdigest().upper()
......@@ -17,6 +17,7 @@ class VerifyingKey(libnacl.sign.Verifier):
"""
Class to verify documents
"""
def __init__(self, pubkey: str) -> None:
"""
Creates a Verify class from base58 pubkey
......@@ -25,14 +26,14 @@ class VerifyingKey(libnacl.sign.Verifier):
key = libnacl.encode.hex_encode(Base58Encoder.decode(pubkey))
super().__init__(key)
def verify_document(self, document: Document, **kwargs) -> bool:
def verify_document(self, document: Document) -> bool:
"""
Check specified document
:param duniterpy.documents.Document document:
:return:
"""
signature = base64.b64decode(document.signatures[0])
prepended = signature + bytes(document.raw(**kwargs), 'ascii')
prepended = signature + bytes(document.raw(), 'ascii')
try:
self.verify(prepended)
......@@ -66,4 +67,3 @@ class VerifyingKey(libnacl.sign.Verifier):
:return str:
"""
return self.verify(message).decode('utf-8')
......@@ -87,8 +87,8 @@ def get_signed_raw_revocation_document(identity: Identity, salt: str, password:
revocation = Revocation(PROTOCOL_VERSION, identity.currency, identity.pubkey, "")
key = SigningKey(salt, password)
revocation.sign(identity, [key])
return revocation.signed_raw(identity)
revocation.sign_for_revoked(identity, [key])
return revocation.signed_raw_for_revoked(identity)
async def main():
......
......@@ -80,7 +80,7 @@ def get_certification_document(current_block: dict, self_cert_document: Identity
)
# sign document
key = SigningKey(salt, password)
certification.sign(self_cert_document, [key])
certification.sign_for_certified(self_cert_document, [key])
return certification
......@@ -118,7 +118,7 @@ async def main():
certification = get_certification_document(current_block, identity, pubkey_from, salt, password)
# Here we request for the path wot/certify
response = await client(bma.wot.certify, certification.signed_raw(identity))
response = await client(bma.wot.certify, certification.signed_raw_for_certified(identity))
if response.status == 200:
print(await response.text())
......
......@@ -107,10 +107,10 @@ IdtySignature: J3G9oM5AKYZNLAB5Wx499w61NuUoS57JVccTShUbGpCMjCqj9yXXqNq7dyZpDWA6B
CertTimestamp: 36-1076F10A7397715D2BEE82579861999EA1F274AC
SoKwoa8PFfCDJWZ6dNCv7XstezHcc2BbKiJgVDXv82R5zYR83nis9dShLgWJ5w48noVUHimdngzYQneNYSMV3rk
"""
self.assertEqual(certification.signed_raw(selfcert), result)
self.assertEqual(certification.signed_raw_for_certified(selfcert), result)
from_raw = Certification.from_signed_raw(certification.signed_raw(selfcert))
self.assertEqual(from_raw.signed_raw(selfcert), result)
from_raw = Certification.from_signed_raw(certification.signed_raw_for_certified(selfcert))
self.assertEqual(from_raw.signed_raw_for_certified(selfcert), result)
def test_revokation_from_inline(self):
version = 2
......@@ -139,7 +139,7 @@ IdtyTimestamp: 32-DB30D958EE5CB75186972286ED3F4686B8A1C2CD
IdtySignature: J3G9oM5AKYZNLAB5Wx499w61NuUoS57JVccTShUbGpCMjCqj9yXXqNq7dyZpDWA6BxipsiaMZhujMeBfCznzyci
SoKwoa8PFfCDJWZ6dNCv7XstezHcc2BbKiJgVDXv82R5zYR83nis9dShLgWJ5w48noVUHimdngzYQneNYSMV3rk
"""
self.assertEqual(revokation.signed_raw(selfcert), result)
self.assertEqual(revokation.signed_raw_for_revoked(selfcert), result)
def test_revokation_from_signed_raw(self):
signed_raw = """Version: 2
......@@ -153,4 +153,4 @@ SoKwoa8PFfCDJWZ6dNCv7XstezHcc2BbKiJgVDXv82R5zYR83nis9dShLgWJ5w48noVUHimdngzYQneN
"""
revocation = Revocation.from_signed_raw(signed_raw)
selfcert = Revocation.extract_self_cert(signed_raw)
self.assertEqual(revocation.signed_raw(selfcert), signed_raw)
self.assertEqual(revocation.signed_raw_for_revoked(selfcert), signed_raw)
......@@ -47,7 +47,7 @@ class Test_Membership(unittest.TestCase):
def test_fromraw_toraw(self):
membership = Membership.from_signed_raw(membership_raw)
rendered_membership = membership.signed_raw()
rendered_membership = membership.signed_raw_for_certified()
from_rendered_membership = Membership.from_signed_raw(rendered_membership)
self.assertEqual(from_rendered_membership.issuer, "HnFcSms8jzwngtVomTTnzudZx7SHUQY8sVE1y8yBmULk")
self.assertEqual(from_rendered_membership.membership_ts.number, 0)
......
......@@ -59,7 +59,7 @@ class TestPeer(unittest.TestCase):
def test_fromraw_toraw(self):
peer = Peer.from_signed_raw(rawpeer)
rendered_peer = peer.signed_raw()
rendered_peer = peer.signed_raw_for_certified()
from_rendered_peer = Peer.from_signed_raw(rendered_peer)
self.assertEqual(from_rendered_peer.currency, "beta_brousouf")
......@@ -87,9 +87,9 @@ class TestPeer(unittest.TestCase):
self.assertEqual(from_rendered_peer.signatures[0],
"dkaXIiCYUJtCg8Feh/BKvPYf4uFH9CJ/zY6J4MlA9BsjmcMe4YAblvNt/gJy31b1aGq3ue3h14mLMCu84rraDg==")
self.assertEqual(rawpeer, from_rendered_peer.signed_raw())
self.assertEqual(rawpeer, from_rendered_peer.signed_raw_for_certified())
def test_incorrect(self):
peer = Peer.from_signed_raw(test_weird_ipv6_peer)
rendered_peer = peer.signed_raw()
rendered_peer = peer.signed_raw_for_certified()
Peer.from_signed_raw(rendered_peer)
......@@ -299,7 +299,7 @@ class Test_Transaction(unittest.TestCase):
def test_fromraw_toraw(self):
tx = Transaction.from_signed_raw(tx_raw)
rendered_tx = tx.signed_raw()
rendered_tx = tx.signed_raw_for_certified()
from_rendered_tx = Transaction.from_signed_raw(rendered_tx)
self.assertEqual(tx.version, 2)
......@@ -366,7 +366,7 @@ class Test_Transaction(unittest.TestCase):
def test_fromraw_toraw_v3(self):
tx = Transaction.from_signed_raw(tx_raw_v3)
rendered_tx = tx.signed_raw()
rendered_tx = tx.signed_raw_for_certified()
from_rendered_tx = Transaction.from_signed_raw(rendered_tx)
self.assertEqual(tx.version, 3)
......@@ -435,7 +435,7 @@ class Test_Transaction(unittest.TestCase):
def test_compact_change(self):
tx = Transaction.from_compact("gtest", compact_change)
rendered_tx = tx.signed_raw()
rendered_tx = tx.signed_raw_for_certified()
from_rendered_tx = Transaction.from_signed_raw(rendered_tx)
def test_reduce_base(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment