Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
silkaj
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Terraform modules
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
clients
python
silkaj
Commits
b46ff8e9
Commit
b46ff8e9
authored
2 years ago
by
Moul
Browse files
Options
Downloads
Patches
Plain Diff
[mypy]
#163
: Add type annotation on wot
parent
b7a9b7cd
No related branches found
Branches containing commit
No related tags found
Tags containing commit
1 merge request
!214
#163: Introduce mypy
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
silkaj/wot.py
+28
-19
28 additions, 19 deletions
silkaj/wot.py
with
28 additions
and
19 deletions
silkaj/wot.py
+
28
−
19
View file @
b46ff8e9
...
@@ -15,6 +15,7 @@
...
@@ -15,6 +15,7 @@
import
urllib
import
urllib
from
collections
import
OrderedDict
from
collections
import
OrderedDict
from
typing
import
Dict
,
List
,
Tuple
import
click
import
click
from
duniterpy.api.bma
import
blockchain
,
wot
from
duniterpy.api.bma
import
blockchain
,
wot
...
@@ -29,7 +30,9 @@ from silkaj.network_tools import client_instance, exit_on_http_error
...
@@ -29,7 +30,9 @@ from silkaj.network_tools import client_instance, exit_on_http_error
from
silkaj.tui
import
gen_pubkey_checksum
from
silkaj.tui
import
gen_pubkey_checksum
def
get_sent_certifications
(
signed
,
time_first_block
,
params
):
def
get_sent_certifications
(
signed
:
List
,
time_first_block
:
int
,
params
:
Dict
)
->
Tuple
[
List
[
str
],
List
[
str
]]:
sent
=
list
()
sent
=
list
()
expire
=
list
()
expire
=
list
()
if
signed
:
if
signed
:
...
@@ -49,7 +52,7 @@ def get_sent_certifications(signed, time_first_block, params):
...
@@ -49,7 +52,7 @@ def get_sent_certifications(signed, time_first_block, params):
consult the membership status of any given identity
"
,
consult the membership status of any given identity
"
,
)
)
@click.argument
(
"
uid_pubkey
"
)
@click.argument
(
"
uid_pubkey
"
)
def
received_sent_certifications
(
uid_pubkey
)
:
def
received_sent_certifications
(
uid_pubkey
:
str
)
->
None
:
"""
"""
get searched id
get searched id
get id of received and sent certifications
get id of received and sent certifications
...
@@ -61,10 +64,10 @@ def received_sent_certifications(uid_pubkey):
...
@@ -61,10 +64,10 @@ def received_sent_certifications(uid_pubkey):
checked_pubkey
=
is_pubkey_and_check
(
uid_pubkey
)
checked_pubkey
=
is_pubkey_and_check
(
uid_pubkey
)
if
checked_pubkey
:
if
checked_pubkey
:
uid_pubkey
=
checked_pubkey
uid_pubkey
=
str
(
checked_pubkey
)
identity
,
pubkey
,
signed
=
choose_identity
(
uid_pubkey
)
identity
,
pubkey
,
signed
=
choose_identity
(
uid_pubkey
)
certifications
=
OrderedDict
()
certifications
=
OrderedDict
()
# type: OrderedDict
params
=
get_blockchain_parameters
()
params
=
get_blockchain_parameters
()
requirements
=
client
(
wot
.
requirements
,
pubkey
)
requirements
=
client
(
wot
.
requirements
,
pubkey
)
for
req
in
requirements
[
"
identities
"
]:
for
req
in
requirements
[
"
identities
"
]:
...
@@ -99,37 +102,38 @@ sent {nbr_sent_certs}/{params["sigStock"]} certifications:\n\
...
@@ -99,37 +102,38 @@ sent {nbr_sent_certs}/{params["sigStock"]} certifications:\n\
membership_status
(
certifications
,
pubkey
,
req
)
membership_status
(
certifications
,
pubkey
,
req
)
def
cert_written_in_the_blockchain
(
written_certs
,
certifieur
):
def
cert_written_in_the_blockchain
(
written_certs
:
Dict
,
certifieur
:
Dict
):
for
cert
in
written_certs
:
for
cert
in
written_certs
:
if
cert
[
"
from
"
]
==
certifieur
[
"
pubkey
"
]:
if
cert
[
"
from
"
]
==
certifieur
[
"
pubkey
"
]:
return
certifieur
[
"
uids
"
][
0
]
+
"
✔
"
return
certifieur
[
"
uids
"
][
0
]
+
"
✔
"
return
certifieur
[
"
uids
"
][
0
]
return
certifieur
[
"
uids
"
][
0
]
def
membership_status
(
certifications
,
pubkey
,
req
)
:
def
membership_status
(
certifications
:
OrderedDict
,
pubkey
:
str
,
req
:
Dict
)
->
None
:
params
=
get_blockchain_parameters
()
params
=
get_blockchain_parameters
()
if
len
(
certifications
[
"
received
"
])
>=
params
[
"
sigQty
"
]:
if
len
(
certifications
[
"
received
"
])
>=
params
[
"
sigQty
"
]:
date
=
certifications
[
"
received_expire
"
][
date
=
certifications
[
"
received_expire
"
][
len
(
certifications
[
"
received
"
])
-
params
[
"
sigQty
"
]
len
(
certifications
[
"
received
"
])
-
params
[
"
sigQty
"
]
]
]
print
(
f
"
Membership expiration due to certification expirations:
{
date
}
"
)
print
(
f
"
Membership expiration due to certification expirations:
{
date
}
"
)
member
=
wt
.
is_member
(
pubkey
)
member_lookup
=
wt
.
is_member
(
pubkey
)
if
member
:
is_member
=
bool
(
member_lookup
)
member
=
True
print
(
"
member:
"
,
is_member
)
print
(
"
member:
"
,
member
)
if
req
[
"
revoked
"
]:
if
req
[
"
revoked
"
]:
revoke_date
=
from_timestamp
(
req
[
"
revoked_on
"
],
tz
=
"
local
"
).
format
(
DATE
)
revoke_date
=
from_timestamp
(
req
[
"
revoked_on
"
],
tz
=
"
local
"
).
format
(
DATE
)
print
(
f
"
revoked:
{
req
[
'
revoked
'
]
}
\n
revoked on:
{
revoke_date
}
"
)
print
(
f
"
revoked:
{
req
[
'
revoked
'
]
}
\n
revoked on:
{
revoke_date
}
"
)
if
not
member
and
req
[
"
wasMember
"
]:
if
not
is_
member
and
req
[
"
wasMember
"
]:
print
(
"
expired:
"
,
req
[
"
expired
"
],
"
\n
wasMember:
"
,
req
[
"
wasMember
"
])
print
(
"
expired:
"
,
req
[
"
expired
"
],
"
\n
wasMember:
"
,
req
[
"
wasMember
"
])
elif
member
:
elif
is_
member
:
expiration_date
=
now
().
add
(
seconds
=
req
[
"
membershipExpiresIn
"
]).
format
(
DATE
)
expiration_date
=
now
().
add
(
seconds
=
req
[
"
membershipExpiresIn
"
]).
format
(
DATE
)
print
(
f
"
Membership document expiration:
{
expiration_date
}
"
)
print
(
f
"
Membership document expiration:
{
expiration_date
}
"
)
print
(
"
Sentry:
"
,
req
[
"
isSentry
"
])
print
(
"
Sentry:
"
,
req
[
"
isSentry
"
])
print
(
"
outdistanced:
"
,
req
[
"
outdistanced
"
])
print
(
"
outdistanced:
"
,
req
[
"
outdistanced
"
])
def
expiration_date_from_block_id
(
block_id
,
time_first_block
,
params
):
def
expiration_date_from_block_id
(
block_id
:
str
,
time_first_block
:
int
,
params
:
Dict
)
->
str
:
expir_timestamp
=
(
expir_timestamp
=
(
date_approximation
(
block_id
,
time_first_block
,
params
[
"
avgGenTime
"
])
date_approximation
(
block_id
,
time_first_block
,
params
[
"
avgGenTime
"
])
+
params
[
"
sigValidity
"
]
+
params
[
"
sigValidity
"
]
...
@@ -143,10 +147,10 @@ def date_approximation(block_id, time_first_block, avgentime):
...
@@ -143,10 +147,10 @@ def date_approximation(block_id, time_first_block, avgentime):
@click.command
(
"
lookup
"
,
help
=
"
User identifier and public key lookup
"
)
@click.command
(
"
lookup
"
,
help
=
"
User identifier and public key lookup
"
)
@click.argument
(
"
uid_pubkey
"
)
@click.argument
(
"
uid_pubkey
"
)
def
id_pubkey_correspondence
(
uid_pubkey
)
:
def
id_pubkey_correspondence
(
uid_pubkey
:
str
)
->
None
:
checked_pubkey
=
is_pubkey_and_check
(
uid_pubkey
)
checked_pubkey
=
is_pubkey_and_check
(
uid_pubkey
)
if
checked_pubkey
:
if
checked_pubkey
:
uid_pubkey
=
checked_pubkey
uid_pubkey
=
str
(
checked_pubkey
)
try
:
try
:
lookups
=
wt
.
wot_lookup
(
uid_pubkey
)
lookups
=
wt
.
wot_lookup
(
uid_pubkey
)
...
@@ -161,7 +165,7 @@ def id_pubkey_correspondence(uid_pubkey):
...
@@ -161,7 +165,7 @@ def id_pubkey_correspondence(uid_pubkey):
click
.
echo
(
content
)
click
.
echo
(
content
)
def
choose_identity
(
pubkey_uid
)
:
def
choose_identity
(
pubkey_uid
:
str
)
->
Tuple
[
Dict
,
str
,
List
]
:
"""
"""
Get lookup from a pubkey or an uid
Get lookup from a pubkey or an uid
Loop over the double lists: pubkeys, then uids
Loop over the double lists: pubkeys, then uids
...
@@ -175,7 +179,12 @@ def choose_identity(pubkey_uid):
...
@@ -175,7 +179,12 @@ def choose_identity(pubkey_uid):
exit_on_http_error
(
e
,
404
,
f
"
No identity found for
{
pubkey_uid
}
"
)
exit_on_http_error
(
e
,
404
,
f
"
No identity found for
{
pubkey_uid
}
"
)
# Generate table containing the choices
# Generate table containing the choices
identities_choices
=
{
"
id
"
:
[],
"
uid
"
:
[],
"
pubkey
"
:
[],
"
timestamp
"
:
[]}
identities_choices
=
{
"
id
"
:
[],
"
uid
"
:
[],
"
pubkey
"
:
[],
"
timestamp
"
:
[],
}
# type: Dict
for
pubkey_index
,
lookup
in
enumerate
(
lookups
):
for
pubkey_index
,
lookup
in
enumerate
(
lookups
):
for
uid_index
,
identity
in
enumerate
(
lookup
[
"
uids
"
]):
for
uid_index
,
identity
in
enumerate
(
lookup
[
"
uids
"
]):
identities_choices
[
"
id
"
].
append
(
str
(
pubkey_index
)
+
str
(
uid_index
))
identities_choices
[
"
id
"
].
append
(
str
(
pubkey_index
)
+
str
(
uid_index
))
...
@@ -199,8 +208,8 @@ def choose_identity(pubkey_uid):
...
@@ -199,8 +208,8 @@ def choose_identity(pubkey_uid):
while
selected_id
not
in
identities_choices
[
"
id
"
]:
while
selected_id
not
in
identities_choices
[
"
id
"
]:
selected_id
=
click
.
prompt
(
message
)
selected_id
=
click
.
prompt
(
message
)
pubkey_index
=
int
(
selected_id
[:
-
1
])
pubkey_index
=
int
(
str
(
selected_id
)
[:
-
1
])
uid_index
=
int
(
selected_id
[
-
1
:])
uid_index
=
int
(
str
(
selected_id
)
[
-
1
:])
return
(
return
(
lookups
[
pubkey_index
][
"
uids
"
][
uid_index
],
lookups
[
pubkey_index
][
"
uids
"
][
uid_index
],
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment