Skip to content
Snippets Groups Projects
Commit b0933c6b authored by Moul's avatar Moul
Browse files

[ref] #409: Improve endpoint determination code layer

Implement determine_endpoint() to replace Endpoint()
It handles custom, default g1 and gtest endpoints
Read custom endpoints from a regex
Handle host/ipv4 mixup fix with ipaddress
ipv6 support with brakets

Rename peer to endpoint:
option --peer/-p −> --endpoint/-ep

import sys and silkaj.constants directly

write tests

Kept the ability of not having click installed
- #354 The tests are difficult to write
Actually, I am not sure if this feature is easy use.
Using Silkaj as lib is kind of difficult since click is
imported in almost every modules
parent 8e735c07
No related branches found
No related tags found
No related merge requests found
...@@ -23,7 +23,7 @@ from duniterpy.documents import Block ...@@ -23,7 +23,7 @@ from duniterpy.documents import Block
from duniterpy.key.verifying_key import VerifyingKey from duniterpy.key.verifying_key import VerifyingKey
from silkaj.constants import BMA_MAX_BLOCKS_CHUNK_SIZE from silkaj.constants import BMA_MAX_BLOCKS_CHUNK_SIZE
from silkaj.network_tools import EndPoint from silkaj.network_tools import determine_endpoint
from silkaj.tools import message_exit from silkaj.tools import message_exit
...@@ -36,7 +36,7 @@ If nothing specified, the whole blockchain gets verified.", ...@@ -36,7 +36,7 @@ If nothing specified, the whole blockchain gets verified.",
@argument("from_block", default=0, type=INT) @argument("from_block", default=0, type=INT)
@argument("to_block", default=0, type=INT) @argument("to_block", default=0, type=INT)
def verify_blocks_signatures(from_block, to_block): def verify_blocks_signatures(from_block, to_block):
client = Client(EndPoint().BMA_ENDPOINT) client = Client(determine_endpoint())
to_block = check_passed_blocks_range(client, from_block, to_block) to_block = check_passed_blocks_range(client, from_block, to_block)
invalid_blocks_signatures = list() invalid_blocks_signatures = list()
chunks_from = range(from_block, to_block + 1, BMA_MAX_BLOCKS_CHUNK_SIZE) chunks_from = range(from_block, to_block + 1, BMA_MAX_BLOCKS_CHUNK_SIZE)
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
import sys import sys
from click import group, help_option, option, pass_context, version_option from click import group, help_option, option, pass_context, version_option
from duniterpy.api.endpoint import endpoint
from silkaj.auth import generate_auth_file from silkaj.auth import generate_auth_file
from silkaj.blocks import verify_blocks_signatures from silkaj.blocks import verify_blocks_signatures
...@@ -39,22 +40,17 @@ from silkaj.wot import id_pubkey_correspondence, received_sent_certifications ...@@ -39,22 +40,17 @@ from silkaj.wot import id_pubkey_correspondence, received_sent_certifications
@help_option("-h", "--help") @help_option("-h", "--help")
@version_option(SILKAJ_VERSION, "-v", "--version") @version_option(SILKAJ_VERSION, "-v", "--version")
@option( @option(
"--peer", "--endpoint",
"-p", "-ep",
help="Default endpoint to reach Ğ1 currency by its official node {}\ help=f"Default endpoint to reach Ğ1 currency by its official node {endpoint(G1_DEFAULT_ENDPOINT).host}\
This option allows to specify a custom endpoint as follow: <host>:<port>.\ This option allows to specify a custom endpoint as follow: <host>:<port>.\
In case no port is specified, it defaults to 443.".format( In case no port is specified, it defaults to 443.",
":".join(G1_DEFAULT_ENDPOINT)
),
) )
@option( @option(
"--gtest", "--gtest",
"-gt", "-gt",
is_flag=True, is_flag=True,
help="Default endpoint to reach ĞTest currency by its official node: {}\ help=f"Default endpoint to reach ĞTest currency by its official node: {endpoint(G1_TEST_DEFAULT_ENDPOINT).host}",
".format(
":".join(G1_TEST_DEFAULT_ENDPOINT)
),
) )
@option( @option(
"--auth-scrypt", "--auth-scrypt",
...@@ -93,7 +89,7 @@ Do not send the document, but display it instead", ...@@ -93,7 +89,7 @@ Do not send the document, but display it instead",
@pass_context @pass_context
def cli( def cli(
ctx, ctx,
peer, endpoint,
gtest, gtest,
auth_scrypt, auth_scrypt,
nrp, nrp,
...@@ -109,7 +105,7 @@ def cli( ...@@ -109,7 +105,7 @@ def cli(
ctx.obj = dict() ctx.obj = dict()
ctx.ensure_object(dict) ctx.ensure_object(dict)
ctx.obj["PEER"] = peer ctx.obj["ENDPOINT"] = endpoint
ctx.obj["GTEST"] = gtest ctx.obj["GTEST"] = gtest
ctx.obj["AUTH_SCRYPT"] = auth_scrypt ctx.obj["AUTH_SCRYPT"] = auth_scrypt
ctx.obj["AUTH_SCRYPT_PARAMS"] = nrp ctx.obj["AUTH_SCRYPT_PARAMS"] = nrp
......
...@@ -26,7 +26,7 @@ from tabulate import tabulate ...@@ -26,7 +26,7 @@ from tabulate import tabulate
from silkaj.blockchain_tools import HeadBlock from silkaj.blockchain_tools import HeadBlock
from silkaj.constants import ALL, HOUR from silkaj.constants import ALL, HOUR
from silkaj.network_tools import ClientInstance, EndPoint from silkaj.network_tools import ClientInstance, determine_endpoint
from silkaj.tools import CurrencySymbol from silkaj.tools import CurrencySymbol
from silkaj.wot_tools import identity_of from silkaj.wot_tools import identity_of
...@@ -34,13 +34,13 @@ from silkaj.wot_tools import identity_of ...@@ -34,13 +34,13 @@ from silkaj.wot_tools import identity_of
@command("info", help="Display information about currency") @command("info", help="Display information about currency")
def currency_info(): def currency_info():
head_block = HeadBlock().head_block head_block = HeadBlock().head_block
ep = EndPoint().ep ep = determine_endpoint()
current_time = from_timestamp(head_block["time"], tz="local") current_time = from_timestamp(head_block["time"], tz="local")
mediantime = from_timestamp(head_block["medianTime"], tz="local") mediantime = from_timestamp(head_block["medianTime"], tz="local")
print( print(
"Connected to node:", "Connected to node:",
ep["domain"], ep.host,
ep["port"], ep.port,
"\nCurrent block number:", "\nCurrent block number:",
head_block["number"], head_block["number"],
"\nCurrency name:", "\nCurrency name:",
...@@ -217,17 +217,17 @@ def argos_info(): ...@@ -217,17 +217,17 @@ def argos_info():
currency_symbol = CurrencySymbol().symbol currency_symbol = CurrencySymbol().symbol
print(currency_symbol, "|") print(currency_symbol, "|")
print("---") print("---")
ep = EndPoint().ep ep = determine_endpoint()
if ep["port"] == "443": if ep.port == 443:
href = "href=https://%s/" % ep["domain"] href = f"href=https://{ep.host}/"
else: else:
href = "href=http://{}:{}/".format(ep["domain"], ep["port"]) href = f"href=http://{ep.host}:{ep.port}/"
current_time = from_timestamp(head_block["time"], tz="local") current_time = from_timestamp(head_block["time"], tz="local")
mediantime = from_timestamp(head_block["medianTime"], tz="local") mediantime = from_timestamp(head_block["medianTime"], tz="local")
print( print(
"Connected to node:", "Connected to node:",
ep["domain"], ep.host,
ep["port"], ep.port,
"|", "|",
href, href,
"\nCurrent block number:", "\nCurrent block number:",
......
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
SILKAJ_VERSION = "0.10.0dev" SILKAJ_VERSION = "0.10.0dev"
G1_SYMBOL = "Ğ1" G1_SYMBOL = "Ğ1"
GTEST_SYMBOL = "ĞTest" GTEST_SYMBOL = "ĞTest"
G1_DEFAULT_ENDPOINT = "g1.duniter.org", "443" G1_DEFAULT_ENDPOINT = "BMAS g1.duniter.org 443"
G1_TEST_DEFAULT_ENDPOINT = "g1-test.duniter.org", "443" G1_TEST_DEFAULT_ENDPOINT = "BMAS g1-test.duniter.org 443"
SUCCESS_EXIT_STATUS = 0 SUCCESS_EXIT_STATUS = 0
FAILURE_EXIT_STATUS = 1 FAILURE_EXIT_STATUS = 1
BMA_MAX_BLOCKS_CHUNK_SIZE = 5000 BMA_MAX_BLOCKS_CHUNK_SIZE = 5000
......
...@@ -13,11 +13,15 @@ ...@@ -13,11 +13,15 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with Silkaj. If not, see <https://www.gnu.org/licenses/>. # along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
import re
import sys
import urllib import urllib
from duniterpy import constants as du_const
from duniterpy.api import endpoint as ep
from duniterpy.api.client import Client from duniterpy.api.client import Client
from silkaj.constants import G1_DEFAULT_ENDPOINT, G1_TEST_DEFAULT_ENDPOINT from silkaj import constants
def singleton(class_): def singleton(class_):
...@@ -31,40 +35,52 @@ def singleton(class_): ...@@ -31,40 +35,52 @@ def singleton(class_):
return getinstance return getinstance
@singleton def determine_endpoint():
class EndPoint: """
def __init__(self): Pass custom endpoint, parse through a regex
ep = dict() {host|ipv4|[ipv6]}:{port}{/path}
^(?:(HOST)|(IPV4|[(IPV6)]))(?::(PORT))?(?:/(PATH))?$
If gtest flag passed, return default gtest endpoint
Else, return g1 default endpoint
"""
regex = f"^(?:(?P<host>{du_const.HOST_REGEX})|(?P<ipv4>{du_const.IPV4_REGEX})|\
(?:\\[(?P<ipv6>{du_const.IPV6_REGEX})\\]))(?::(?P<port>{du_const.PORT_REGEX}))?(?:/(?P<path>{du_const.PATH_REGEX}))?$"
try: try:
from click.globals import get_current_context from click.globals import get_current_context
ctx = get_current_context() ctx = get_current_context()
peer = ctx.obj["PEER"] endpoint = ctx.obj["ENDPOINT"]
gtest = ctx.obj["GTEST"] gtest = ctx.obj["GTEST"]
# To be activated when dropping Python 3.5 except (ModuleNotFoundError, RuntimeError):
# except (ModuleNotFoundError, RuntimeError): endpoint, gtest = None, None
except:
peer, gtest = None, None if endpoint:
if peer: m = re.search(re.compile(regex), endpoint)
if ":" in peer: if not m:
ep["domain"], ep["port"] = peer.rsplit(":", 1) sys.exit(
"Error: Passed endpoint is of wrong format.\n\
Expected format: {host|ipv4|[ipv6]}:{port}{/path}"
)
port = int(m["port"]) if m["port"] else 443
host, ipv4 = ep.fix_host_ipv4_mix_up(m["host"], m["ipv4"])
if port == 443:
return ep.SecuredBMAEndpoint(host, ipv4, m["ipv6"], port, m["path"])
else: else:
ep["domain"], ep["port"] = peer, "443" return ep.BMAEndpoint(host, ipv4, m["ipv6"], port)
elif gtest:
return ep.endpoint(constants.G1_TEST_DEFAULT_ENDPOINT)
else: else:
ep["domain"], ep["port"] = ( return ep.endpoint(constants.G1_DEFAULT_ENDPOINT)
G1_TEST_DEFAULT_ENDPOINT if gtest else G1_DEFAULT_ENDPOINT
)
if ep["domain"].startswith("[") and ep["domain"].endswith("]"):
ep["domain"] = ep["domain"][1:-1]
self.ep = ep
api = "BMAS" if ep["port"] == "443" else "BASIC_MERKLED_API"
self.BMA_ENDPOINT = " ".join([api, ep["domain"], ep["port"]])
@singleton @singleton
class ClientInstance: class ClientInstance:
def __init__(self): def __init__(self):
self.client = Client(EndPoint().BMA_ENDPOINT) self.client = Client(determine_endpoint())
def send_document(bma_path, document): def send_document(bma_path, document):
......
...@@ -16,10 +16,55 @@ ...@@ -16,10 +16,55 @@
import urllib import urllib
from unittest.mock import patch from unittest.mock import patch
import pytest
from duniterpy.api import bma from duniterpy.api import bma
from duniterpy.api import endpoint as du_ep
from silkaj import network_tools from silkaj import constants, network_tools
from silkaj.membership import generate_membership_document from silkaj.membership import generate_membership_document
from tests import helpers
ipv6 = "2001:0db8:85a3:0000:0000:8a2e:0370:7334"
@pytest.mark.parametrize(
"endpoint, host, ipv4, ipv6, port, path",
[
("127.0.0.1", "", "127.0.0.1", None, 443, None),
("127.0.0.1:80", "", "127.0.0.1", None, 80, None),
("127.0.0.1:443", "", "127.0.0.1", None, 443, None),
("127.0.0.1/path", "", "127.0.0.1", None, 443, "path"),
("127.0.0.1:80/path", "", "127.0.0.1", None, 80, "path"),
("domain.tld:80/path", "domain.tld", None, None, 80, "path"),
("localhost:80/path", "localhost", None, None, 80, "path"),
(f"[{ipv6}]", None, None, ipv6, 443, None),
(f"[{ipv6}]/path", None, None, ipv6, 443, "path"),
(f"[{ipv6}]:80/path", None, None, ipv6, 80, "path"),
],
)
def test_determine_endpoint_custom(endpoint, host, ipv4, ipv6, port, path):
helpers.define_click_context(endpoint)
ep = network_tools.determine_endpoint()
assert ep.host == host
assert ep.ipv4 == ipv4
assert ep.ipv6 == ipv6
assert ep.port == port
if isinstance(ep, du_ep.SecuredBMAEndpoint):
assert ep.path == path
@pytest.mark.parametrize(
"gtest, endpoint",
[
(True, constants.G1_TEST_DEFAULT_ENDPOINT),
(False, constants.G1_DEFAULT_ENDPOINT),
],
)
def test_determine_endpoint(gtest, endpoint):
helpers.define_click_context(gtest=gtest)
ep = network_tools.determine_endpoint()
assert ep == du_ep.endpoint(endpoint)
# def test_send_document_success(capsys): # def test_send_document_success(capsys):
# display = capsys.readouterr().out # display = capsys.readouterr().out
......
...@@ -32,7 +32,7 @@ from silkaj.constants import ( ...@@ -32,7 +32,7 @@ from silkaj.constants import (
FAILURE_EXIT_STATUS, FAILURE_EXIT_STATUS,
SUCCESS_EXIT_STATUS, SUCCESS_EXIT_STATUS,
) )
from silkaj.network_tools import EndPoint from silkaj.network_tools import determine_endpoint
G1_INVALID_BLOCK_SIG = 15144 G1_INVALID_BLOCK_SIG = 15144
HEAD_BLOCK = 48000 HEAD_BLOCK = 48000
...@@ -47,7 +47,7 @@ def current(self): ...@@ -47,7 +47,7 @@ def current(self):
) )
def test_check_passed_blocks_range(from_block, to_block, capsys, monkeypatch): def test_check_passed_blocks_range(from_block, to_block, capsys, monkeypatch):
monkeypatch.setattr(bma.blockchain, "current", current) monkeypatch.setattr(bma.blockchain, "current", current)
client = Client(EndPoint().BMA_ENDPOINT) client = Client(determine_endpoint)
# https://medium.com/python-pandemonium/testing-sys-exit-with-pytest-10c6e5f7726f # https://medium.com/python-pandemonium/testing-sys-exit-with-pytest-10c6e5f7726f
with pytest.raises(SystemExit) as pytest_wrapped_e: with pytest.raises(SystemExit) as pytest_wrapped_e:
check_passed_blocks_range(client, from_block, to_block) check_passed_blocks_range(client, from_block, to_block)
...@@ -104,7 +104,7 @@ def test_get_chunk_size(from_block, to_block, chunks_from, chunk_from): ...@@ -104,7 +104,7 @@ def test_get_chunk_size(from_block, to_block, chunks_from, chunk_from):
@pytest.mark.parametrize("chunk_size, chunk_from", [(2, 1), (5, 10)]) @pytest.mark.parametrize("chunk_size, chunk_from", [(2, 1), (5, 10)])
def test_get_chunks(chunk_size, chunk_from): def test_get_chunks(chunk_size, chunk_from):
client = Client(EndPoint().BMA_ENDPOINT) client = Client(determine_endpoint())
chunk = get_chunk(client, chunk_size, chunk_from) chunk = get_chunk(client, chunk_size, chunk_from)
assert chunk[0]["number"] + chunk_size - 1 == chunk[-1]["number"] assert chunk[0]["number"] + chunk_size - 1 == chunk[-1]["number"]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment