Skip to content
Snippets Groups Projects
Commit 7283e43d authored by Vincent Texier's avatar Vincent Texier
Browse files

[enh] #91 fix pylint alerts in api.endpoint module

Remove some alerts
parent 04a581ff
No related branches found
No related tags found
1 merge request!65Pylint
...@@ -17,4 +17,4 @@ mypy: ...@@ -17,4 +17,4 @@ mypy:
# check code errors # check code errors
pylint: pylint:
pylint --disable=C --enable=C0121,C0202,C0321 --jobs=0 duniterpy/ pylint --disable=C,R0913,R0903 --enable=C0121,C0202,C0321 --jobs=0 duniterpy/
...@@ -3,7 +3,7 @@ from typing import Any, Optional, TypeVar, Type, Dict ...@@ -3,7 +3,7 @@ from typing import Any, Optional, TypeVar, Type, Dict
from aiohttp import ClientSession from aiohttp import ClientSession
from ..constants import * import duniterpy.constants as constants
from ..documents import MalformedDocumentError from ..documents import MalformedDocumentError
...@@ -137,9 +137,9 @@ class BMAEndpoint(Endpoint): ...@@ -137,9 +137,9 @@ class BMAEndpoint(Endpoint):
API = "BASIC_MERKLED_API" API = "BASIC_MERKLED_API"
re_inline = re.compile( re_inline = re.compile(
'^BASIC_MERKLED_API(?: ({host_regex}))?(?: ({ipv4_regex}))?(?: ({ipv6_regex}))?(?: ([0-9]+))$'.format( '^BASIC_MERKLED_API(?: ({host_regex}))?(?: ({ipv4_regex}))?(?: ({ipv6_regex}))?(?: ([0-9]+))$'.format(
host_regex=HOST_REGEX, host_regex=constants.HOST_REGEX,
ipv4_regex=IPV4_REGEX, ipv4_regex=constants.IPV4_REGEX,
ipv6_regex=IPV6_REGEX)) ipv6_regex=constants.IPV6_REGEX))
def __init__(self, server: str, ipv4: str, ipv6: str, port: int) -> None: def __init__(self, server: str, ipv4: str, ipv6: str, port: int) -> None:
""" """
...@@ -193,11 +193,13 @@ class BMAEndpoint(Endpoint): ...@@ -193,11 +193,13 @@ class BMAEndpoint(Endpoint):
:return: :return:
""" """
if self.server: if self.server:
return ConnectionHandler("http", "ws", self.server, self.port, "", session, proxy) conn_handler = ConnectionHandler("http", "ws", self.server, self.port, "", session, proxy)
elif self.ipv6: elif self.ipv6:
return ConnectionHandler("http", "ws", "[{0}]".format(self.ipv6), self.port, "", session, proxy) conn_handler = ConnectionHandler("http", "ws", "[{0}]".format(self.ipv6), self.port, "", session, proxy)
else:
conn_handler = ConnectionHandler("http", "ws", self.ipv4, self.port, "", session, proxy)
return ConnectionHandler("http", "ws", self.ipv4, self.port, "", session, proxy) return conn_handler
def __str__(self) -> str: def __str__(self) -> str:
return self.inline() return self.inline()
...@@ -220,10 +222,10 @@ class SecuredBMAEndpoint(BMAEndpoint): ...@@ -220,10 +222,10 @@ class SecuredBMAEndpoint(BMAEndpoint):
API = "BMAS" API = "BMAS"
re_inline = re.compile( re_inline = re.compile(
'^BMAS(?: ({host_regex}))?(?: ({ipv4_regex}))?(?: ({ipv6_regex}))? ([0-9]+)(?: ({path_regex}))?$'.format( '^BMAS(?: ({host_regex}))?(?: ({ipv4_regex}))?(?: ({ipv6_regex}))? ([0-9]+)(?: ({path_regex}))?$'.format(
host_regex=HOST_REGEX, host_regex=constants.HOST_REGEX,
ipv4_regex=IPV4_REGEX, ipv4_regex=constants.IPV4_REGEX,
ipv6_regex=IPV6_REGEX, ipv6_regex=constants.IPV6_REGEX,
path_regex=PATH_REGEX)) path_regex=constants.PATH_REGEX))
def __init__(self, server: str, ipv4: str, ipv6: str, port: int, path: str) -> None: def __init__(self, server: str, ipv4: str, ipv6: str, port: int, path: str) -> None:
""" """
...@@ -276,11 +278,14 @@ class SecuredBMAEndpoint(BMAEndpoint): ...@@ -276,11 +278,14 @@ class SecuredBMAEndpoint(BMAEndpoint):
:return: :return:
""" """
if self.server: if self.server:
return ConnectionHandler("https", "wss", self.server, self.port, self.path, session, proxy) conn_handler = ConnectionHandler("https", "wss", self.server, self.port, self.path, session, proxy)
elif self.ipv6: elif self.ipv6:
return ConnectionHandler("https", "wss", "[{0}]".format(self.ipv6), self.port, self.path, session, proxy) conn_handler = ConnectionHandler("https", "wss", "[{0}]".format(self.ipv6), self.port, self.path, session,
proxy)
else:
conn_handler = ConnectionHandler("https", "wss", self.ipv4, self.port, self.path, session, proxy)
return ConnectionHandler("https", "wss", self.ipv4, self.port, self.path, session, proxy) return conn_handler
# required to type hint cls in classmethod # required to type hint cls in classmethod
...@@ -290,12 +295,15 @@ WS2PEndpointType = TypeVar('WS2PEndpointType', bound='WS2PEndpoint') ...@@ -290,12 +295,15 @@ WS2PEndpointType = TypeVar('WS2PEndpointType', bound='WS2PEndpoint')
class WS2PEndpoint(Endpoint): class WS2PEndpoint(Endpoint):
API = "WS2P" API = "WS2P"
re_inline = re.compile( re_inline = re.compile(
'^WS2P ({ws2pid_regex}) ((?:{host_regex})|(?:{ipv4_regex})) ([0-9]+)?(?: ({path_regex}))?$'.format( '^WS2P ({ws2pid_regex}) ((?:{host_regex})|(?:{ipv4_regex})|(?:{ipv6_regex})) ([0-9]+)?(?: ({path_regex}))?$'
ws2pid_regex=WS2PID_REGEX, .format(
host_regex=HOST_REGEX, ws2pid_regex=constants.WS2PID_REGEX,
ipv4_regex=IPV4_REGEX, host_regex=constants.HOST_REGEX,
ipv6_regex=IPV6_REGEX, ipv4_regex=constants.IPV4_REGEX,
path_regex=PATH_REGEX)) ipv6_regex=constants.IPV6_REGEX,
path_regex=constants.PATH_REGEX
)
)
def __init__(self, ws2pid: str, server: str, port: int, path: str) -> None: def __init__(self, ws2pid: str, server: str, port: int, path: str) -> None:
self.ws2pid = ws2pid self.ws2pid = ws2pid
...@@ -361,8 +369,8 @@ ESCoreEndpointType = TypeVar('ESCoreEndpointType', bound='ESCoreEndpoint') ...@@ -361,8 +369,8 @@ ESCoreEndpointType = TypeVar('ESCoreEndpointType', bound='ESCoreEndpoint')
class ESCoreEndpoint(Endpoint): class ESCoreEndpoint(Endpoint):
API = "ES_CORE_API" API = "ES_CORE_API"
re_inline = re.compile( re_inline = re.compile(
'^ES_CORE_API ((?:{host_regex})|(?:{ipv4_regex})) ([0-9]+)$'.format(host_regex=HOST_REGEX, '^ES_CORE_API ((?:{host_regex})|(?:{ipv4_regex})) ([0-9]+)$'.format(host_regex=constants.HOST_REGEX,
ipv4_regex=IPV4_REGEX)) ipv4_regex=constants.IPV4_REGEX))
def __init__(self, server: str, port: int) -> None: def __init__(self, server: str, port: int) -> None:
self.server = server self.server = server
...@@ -421,8 +429,8 @@ ESUserEndpointType = TypeVar('ESUserEndpointType', bound='ESUserEndpoint') ...@@ -421,8 +429,8 @@ ESUserEndpointType = TypeVar('ESUserEndpointType', bound='ESUserEndpoint')
class ESUserEndpoint(Endpoint): class ESUserEndpoint(Endpoint):
API = "ES_USER_API" API = "ES_USER_API"
re_inline = re.compile( re_inline = re.compile(
'^ES_USER_API ((?:{host_regex})|(?:{ipv4_regex})) ([0-9]+)$'.format(host_regex=HOST_REGEX, '^ES_USER_API ((?:{host_regex})|(?:{ipv4_regex})) ([0-9]+)$'.format(host_regex=constants.HOST_REGEX,
ipv4_regex=IPV4_REGEX)) ipv4_regex=constants.IPV4_REGEX))
def __init__(self, server: str, port: int) -> None: def __init__(self, server: str, port: int) -> None:
self.server = server self.server = server
...@@ -481,8 +489,8 @@ ESSubscribtionEndpointType = TypeVar('ESSubscribtionEndpointType', bound='ESSubs ...@@ -481,8 +489,8 @@ ESSubscribtionEndpointType = TypeVar('ESSubscribtionEndpointType', bound='ESSubs
class ESSubscribtionEndpoint(Endpoint): class ESSubscribtionEndpoint(Endpoint):
API = "ES_SUBSCRIPTION_API" API = "ES_SUBSCRIPTION_API"
re_inline = re.compile( re_inline = re.compile(
'^ES_SUBSCRIPTION_API ((?:{host_regex})|(?:{ipv4_regex})) ([0-9]+)$'.format(host_regex=HOST_REGEX, '^ES_SUBSCRIPTION_API ((?:{host_regex})|(?:{ipv4_regex})) ([0-9]+)$'.format(host_regex=constants.HOST_REGEX,
ipv4_regex=IPV4_REGEX)) ipv4_regex=constants.IPV4_REGEX))
def __init__(self, server: str, port: int) -> None: def __init__(self, server: str, port: int) -> None:
self.server = server self.server = server
...@@ -551,12 +559,17 @@ def endpoint(value: Any) -> Any: ...@@ -551,12 +559,17 @@ def endpoint(value: Any) -> Any:
:param value: Endpoint string or subclass :param value: Endpoint string or subclass
:return: :return:
""" """
result = UnknownEndpoint.from_inline(value)
# if Endpoint instance...
if issubclass(type(value), Endpoint): if issubclass(type(value), Endpoint):
return value result = value
# if str...
elif isinstance(value, str): elif isinstance(value, str):
# find Endpoint instance
for api, cls in MANAGED_API.items(): for api, cls in MANAGED_API.items():
if value.startswith(api + " "): if value.startswith(api + " "):
return cls.from_inline(value) result = cls.from_inline(value)
return UnknownEndpoint.from_inline(value)
else: else:
raise TypeError("Cannot convert {0} to endpoint".format(value)) raise TypeError("Cannot convert {0} to endpoint".format(value))
return result
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment