Skip to content
Snippets Groups Projects
Commit cc5ae9e0 authored by inso's avatar inso
Browse files

Handle WS2P API

parent 4ef05e09
No related branches found
No related tags found
No related merge requests found
Pipeline #
......@@ -12,6 +12,12 @@ WS2P_HEADS_SCHEMA = {
"items": {
"type": "object",
"properties": {
"message": {
"type": "string"
},
"sig": {
"type": "string",
},
"messageV2": {
"type": "string"
},
......@@ -29,6 +35,7 @@ WS2P_HEADS_SCHEMA = {
"required": ["heads"]
}
async def heads(connection):
"""
GET Certification data over a member
......
......@@ -15,3 +15,6 @@ ipv6_regex = '(?:(?:[0-9A-Fa-f]{1,4}:){6}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?
ws2pid_regex = "[0-9a-f]{8}"
host_regex = "[a-z0-9-_.]*(?:.[a-zA-Z])?"
path_regex = "[/\w \.-]*/?"
ws2p_private_prefix_regex = "O[CT][SAM]"
ws2p_public_prefix_regex = "I[CT]"
ws2p_head_regex = "HEAD:?(?:[0-9]+)?"
import attr
import re
from ..block import BlockUID
from ..constants import ws2p_public_prefix_regex, ws2p_private_prefix_regex,\
pubkey_regex, signature_regex, ws2pid_regex, block_uid_regex, ws2p_head_regex
@attr.s()
class API:
re_inline = re.compile("WS2P({ws2p_private})?({ws2p_public})?"
.format(
ws2p_private=ws2p_private_prefix_regex,
ws2p_public=ws2p_public_prefix_regex))
private = attr.ib(type=str)
public = attr.ib(type=str)
@classmethod
def from_inline(cls, inline):
data = API.re_inline.match(inline)
if data.group(1):
private = data.group(1)
else:
private = ""
if data.group(2):
public = data.group(2)
else:
public = ""
return cls(private, public)
def __str__(self):
return "WS2P" + self.private + self.public
@attr.s()
class Head:
re_inline = re.compile(ws2p_head_regex)
version = attr.ib(type=int)
@classmethod
def from_inline(cls, inline):
data = Head.re_inline.match(inline)
head = data.group(0).split(':')
if len(head) == 2:
version = int(head[1])
else:
version = 0
return cls(version)
def __str__(self):
return "HEAD" if self.version == 0 else "HEAD:{}".format(str(self.version))
@attr.s()
class HeadV0:
"""
A document describing a self certification.
"""
re_inline = re.compile("^(WS2P(?:{ws2p_private})?(?:{ws2p_public})?):({head}):({pubkey}):({blockstamp})(?::)?(.*)"
.format(ws2p_private=ws2p_private_prefix_regex,
ws2p_public=ws2p_public_prefix_regex,
head=ws2p_head_regex,
version="[0-9]+",
pubkey=pubkey_regex,
blockstamp=block_uid_regex))
re_signature = re.compile(signature_regex)
signature = attr.ib(type=str)
api = attr.ib(type=API)
head = attr.ib(type=Head)
pubkey = attr.ib(type=str)
blockstamp = attr.ib(type=BlockUID)
@classmethod
def from_inline(cls, inline, signature):
data = HeadV0.re_inline.match(inline)
api = API.from_inline(data.group(1))
head = Head.from_inline(data.group(2))
pubkey = data.group(3)
blockstamp = data.group(4)
offload = data.group(5)
return cls(signature, api, head, pubkey, blockstamp), offload
def inline(self):
values = (str(v) for v in attr.astuple(self, recurse=False,
filter=attr.filters.exclude(attr.fields(HeadV0).signature)))
return ":".join(values)
@attr.s()
class HeadV1:
re_inline = re.compile("({ws2pid}):" \
"({software}):({software_version}):({pow_prefix})(?::)?(.*)"
.format(
ws2pid=ws2pid_regex,
software="[A-Za-z-_]+",
software_version="[0-9]+[.][0-9]+[.][0-9]+",
pow_prefix="[0-9]+"))
v0 = attr.ib(type=HeadV0)
ws2pid = attr.ib(type=str)
software = attr.ib(type=str)
software_version = attr.ib(type=str)
pow_prefix= attr.ib(type=int)
@classmethod
def from_inline(cls, inline, signature):
v0, offload = HeadV0.from_inline(inline, signature)
data = HeadV1.re_inline.match(offload)
ws2pid = data.group(1)
software = data.group(2)
software_version = data.group(3)
pow_prefix = int(data.group(4))
offload = data.group(5)
return cls(v0, ws2pid, software, software_version, pow_prefix), offload
def inline(self):
values = [str(v) for v in attr.astuple(self, True, filter=attr.filters.exclude(attr.fields(HeadV1).v0))]
return self.v0.inline() + ":" + ":".join(values)
@property
def pubkey(self):
return self.v0.pubkey
@property
def signature(self):
return self.v0.signature
@attr.s
class HeadV2:
re_inline = re.compile("({free_member_room}):({free_mirror_room})(?::)?(.*)"
.format(
free_member_room="[0-9]+",
free_mirror_room="[0-9]+"))
v1 = attr.ib(type=HeadV1)
free_member_room = attr.ib(type=int)
free_mirror_room = attr.ib(type=int)
@classmethod
def from_inline(cls, inline, signature):
v1, offload = HeadV1.from_inline(inline, signature)
data = HeadV2.re_inline.match(offload)
free_member_room = int(data.group(1))
free_mirror_room = int(data.group(2))
return cls(v1, free_member_room, free_mirror_room)
def inline(self):
values = (str(v) for v in attr.astuple(self, True, filter=attr.filters.exclude(attr.fields(HeadV2).v1)))
return self.v1.inline() + ":" + ":".join(values)
@property
def pubkey(self):
return self.v1.pubkey
@property
def signature(self):
return self.v1.signature
\ No newline at end of file
......@@ -37,3 +37,19 @@ class VerifyingKey(libnacl.sign.Verifier):
return True
except ValueError:
return False
def verify_ws2p_head(self, head):
"""
Check specified document
:param duniterpy.documents.Document document:
:return:
"""
signature = base64.b64decode(head.signature)
inline = head.inline()
prepended = signature + bytes(inline, 'ascii')
try:
self.verify(prepended)
return True
except ValueError:
return False
import unittest
from duniterpy.documents.ws2p.heads import *
headv1_clear = ""
headv1_tor = ""
headv2 = ""
class TestWS2PHeads(unittest.TestCase):
def test_headv0(self):
headv0, _ = HeadV0.from_inline("WS2P:HEAD:3dnbnYY9i2bHMQUGyFp5GVvJ2wBkVpus31cDJA5cfRpj:"
"54813-00000A24802B33B71A91B6E990038C145A4815A45C71E57B2F2EF393183C7E2C",
"a1vAAM666kPsMCFTbkgkcCsqHf8nmXR+Lh3D3u+BaXzmArj7kwlItbdGUs4fc9QUG5Lp4TwPS7nhOM5t1Kt6CA==")
self.assertEqual(headv0.api.public, "")
self.assertEqual(headv0.api.private, "")
self.assertEqual(headv0.head.version, 0)
self.assertEqual(headv0.pubkey, "3dnbnYY9i2bHMQUGyFp5GVvJ2wBkVpus31cDJA5cfRpj")
self.assertEqual(headv0.blockstamp, "54813-00000A24802B33B71A91B6E990038C145A4815A45C71E57B2F2EF393183C7E2C")
def test_ws2p_headv1(self):
headv1, _ = HeadV1.from_inline("WS2POCAIC:HEAD:1:HbTqJ1Ts3RhJ8Rx4XkNyh1oSKmoZL1kY5U7t9mKTSjAB:"
"102131-0000066028B991BDFE3FF6DBA84EF519F76B62EA3787BC29D9A05557675B1F16:1152e46e:"
"duniter:1.6.21:1",
"ZGpT8HG4uX5Hc96gqhzIkkELVjGQKDp2/L+7BTFG5ODxGYWd2VX/H+hdZRqf0iUWRNuhxlequ68kkwMaE6ymBw==")
self.assertEquals(headv1.v0.api.public, "IC")
self.assertEqual(headv1.v0.api.private, "OCA")
self.assertEqual(headv1.v0.head.version, 1)
self.assertEqual(headv1.software, "duniter")
self.assertEqual(headv1.software_version, "1.6.21")
self.assertEqual(headv1.pow_prefix, 1)
def test_ws2p_headv2(self):
headv2 = HeadV2.from_inline("WS2POCA:HEAD:2:D3krfq6J9AmfpKnS3gQVYoy7NzGCc61vokteTS8LJ4YH:"
"99393-0000017256006BFA979565F1280488D5831DD66054069E46A3EDEB1AECDBBF13:cb36b021:"
"duniter:1.6.21:1:20:19",
"CgD1vaImPWZUCDFt5HDHUdjCTFcIwW5ndiCx6kXioFLZoz1a4WhCFYXvjI2N8+jEwQdWtf5+yNoHonqBSqirAQ==")
self.assertEqual(headv2.free_member_room, 20)
self.assertEqual(headv2.free_mirror_room, 19)
self.assertEqual(headv2.v1.v0.head.version, 2)
from duniterpy.key import VerifyingKey, SigningKey, ScryptParams
from duniterpy.documents import Peer
from duniterpy.documents.ws2p.heads import *
import unittest
......@@ -23,3 +24,29 @@ BASIC_MERKLED_API testnet.duniter.inso.ovh 80
pubkey = "8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU"
verifying_key = VerifyingKey(pubkey)
self.assertTrue(verifying_key.verify_document(peer))
def test_ws2p_headv0(self):
headv0, _ = HeadV0.from_inline("WS2P:HEAD:3dnbnYY9i2bHMQUGyFp5GVvJ2wBkVpus31cDJA5cfRpj:"
"54813-00000A24802B33B71A91B6E990038C145A4815A45C71E57B2F2EF393183C7E2C",
"a1vAAM666kPsMCFTbkgkcCsqHf8nmXR+Lh3D3u+BaXzmArj7kwlItbdGUs4fc9QUG5Lp4TwPS7nhOM5t1Kt6CA==")
verifying_key = VerifyingKey(headv0.pubkey)
self.assertTrue(verifying_key.verify_ws2p_head(headv0))
def test_ws2p_headv1(self):
headv1, _ = HeadV1.from_inline("WS2POCAIC:HEAD:1:HbTqJ1Ts3RhJ8Rx4XkNyh1oSKmoZL1kY5U7t9mKTSjAB:"
"102131-0000066028B991BDFE3FF6DBA84EF519F76B62EA3787BC29D9A05557675B1F16:1152e46e:"
"duniter:1.6.21:1",
"ZGpT8HG4uX5Hc96gqhzIkkELVjGQKDp2/L+7BTFG5ODxGYWd2VX/H+hdZRqf0iUWRNuhxlequ68kkwMaE6ymBw==")
verifying_key = VerifyingKey(headv1.pubkey)
self.assertTrue(verifying_key.verify_ws2p_head(headv1))
def test_ws2p_headv2(self):
headv2 = HeadV2.from_inline("WS2POCA:HEAD:2:D3krfq6J9AmfpKnS3gQVYoy7NzGCc61vokteTS8LJ4YH:"
"99393-0000017256006BFA979565F1280488D5831DD66054069E46A3EDEB1AECDBBF13:cb36b021:"
"duniter:1.6.21:1:20:19",
"CgD1vaImPWZUCDFt5HDHUdjCTFcIwW5ndiCx6kXioFLZoz1a4WhCFYXvjI2N8+jEwQdWtf5+yNoHonqBSqirAQ==")
verifying_key = VerifyingKey(headv2.pubkey)
self.assertTrue(verifying_key.verify_ws2p_head(headv2))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment