Skip to content
Snippets Groups Projects
Commit cc3fb54e authored by Vincent Texier's avatar Vincent Texier
Browse files

[enh] #140 replace aiohttp requests with urllib and websocket clients

parent f64174b1
No related branches found
No related tags found
1 merge request!125Mnemonic dewif
......@@ -14,15 +14,16 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import json
import logging
from typing import Callable, Union, Any, Optional, Dict
from urllib import request, parse
import aiohttp
import jsonschema
from aiohttp import ClientResponse, ClientSession, ClientWebSocketResponse
from aiohttp.client import _WSRequestContextManager
from websocket import WebSocket
from aiohttp import ClientSession
from urllib3 import HTTPResponse
import duniterpy.api.endpoint as endpoint
from .errors import DuniterError
......@@ -32,6 +33,7 @@ logger = logging.getLogger("duniter")
RESPONSE_JSON = "json"
RESPONSE_TEXT = "text"
RESPONSE_AIOHTTP = "aiohttp"
RESPONSE_HTTP = "http"
# Connection type constants
CONNECTION_TYPE_AIOHTTP = 1
......@@ -79,7 +81,7 @@ def parse_error(text: str) -> dict:
return data
async def parse_response(response: ClientResponse, schema: dict) -> Any:
async def parse_response(response: str, schema: dict) -> Any:
"""
Validate and parse the BMA answer
......@@ -88,52 +90,28 @@ async def parse_response(response: ClientResponse, schema: dict) -> Any:
:return: the json data
"""
try:
data = await response.json()
response.close()
data = json.loads(response)
if schema is not None:
jsonschema.validate(data, schema)
return data
except (TypeError, json.decoder.JSONDecodeError) as e:
except (TypeError, json.decoder.JSONDecodeError) as exception:
raise jsonschema.ValidationError(
"Could not parse json : {0}".format(str(e))
) from e
"Could not parse json : {0}".format(str(exception))
) from exception
class WSConnection:
"""
From the documentation of the aiohttp_library, the web socket connection
await ws_connection = session.ws_connect()
should return a ClientWebSocketResponse object...
https://docs.aiohttp.org/en/stable/client_quickstart.html#websockets
In fact, aiohttp.session.ws_connect() returns a aiohttp.client._WSRequestContextManager instance.
It must be used in a with statement to get the ClientWebSocketResponse instance from it (__aenter__).
At the end of the with statement, aiohttp.client._WSRequestContextManager.__aexit__ is called
and close the ClientWebSocketResponse in it.
await with ws_connection as ws:
await ws.receive_str()
Abstraction layer on websocket library
"""
def __init__(self, connection: _WSRequestContextManager) -> None:
def __init__(self, connection: WebSocket) -> None:
"""
Init WSConnection instance
:param connection: Connection instance of the connection library
:param connection: Connection instance of the websocket library
"""
if not isinstance(connection, _WSRequestContextManager):
raise Exception(
BaseException(
"Only aiohttp.client._WSRequestContextManager class supported"
)
)
self.connection_type = CONNECTION_TYPE_AIOHTTP
self._connection = connection # type: _WSRequestContextManager
self.connection = None # type: Optional[ClientWebSocketResponse]
self.connection = connection
async def send_str(self, data: str) -> None:
"""
......@@ -145,7 +123,7 @@ class WSConnection:
if self.connection is None:
raise Exception("Connection property is empty")
await self.connection.send_str(data)
self.connection.send(data)
return None
async def receive_str(self, timeout: Optional[float] = None) -> str:
......@@ -157,8 +135,9 @@ class WSConnection:
"""
if self.connection is None:
raise Exception("Connection property is empty")
return await self.connection.receive_str(timeout=timeout)
if timeout is not None:
self.connection.settimeout(timeout)
return self.connection.recv()
async def receive_json(self, timeout: Optional[float] = None) -> Any:
"""
......@@ -169,15 +148,9 @@ class WSConnection:
"""
if self.connection is None:
raise Exception("Connection property is empty")
return await self.connection.receive_json(timeout=timeout)
async def init_connection(self):
"""
Mandatory for aiohttp library in order to avoid the usage of the 'with' statement
:return:
"""
self.connection = await self._connection.__aenter__()
if timeout is not None:
self.connection.settimeout(timeout)
return json.loads(self.connection.recv())
async def close(self) -> None:
"""
......@@ -185,8 +158,6 @@ class WSConnection:
:return:
"""
await self._connection.__aexit__(None, None, None)
if self.connection is None:
raise Exception("Connection property is empty")
......@@ -241,104 +212,90 @@ class API:
return url
async def requests_get(self, path: str, **kwargs: Any) -> ClientResponse:
async def request_url(
self,
path: str,
method: str = "GET",
rtype: str = RESPONSE_JSON,
schema: Optional[dict] = None,
json_data: Optional[dict] = None,
bma_errors: bool = False,
**kwargs: Any,
) -> Any:
"""
Requests GET wrapper in order to use API parameters.
Requests wrapper in order to use API parameters.
:param path: the request path
:param method: Http method 'GET' or 'POST'
:param rtype: Response type (optional, default RESPONSE_JSON, can be RESPONSE_TEXT, RESPONSE_HTTP)
:param schema: Json Schema to validate response (optional, default None)
:param json_data: Json data as dict (optional, default None)
:param bma_errors: Set it to True to handle Duniter Error Response (optional, default False)
:return:
"""
logging.debug(
"Request : %s", self.reverse_url(self.connection_handler.http_scheme, path)
)
url = self.reverse_url(self.connection_handler.http_scheme, path)
response = await self.connection_handler.session.get(
url,
params=kwargs,
headers=self.headers,
proxy=self.connection_handler.proxy,
timeout=15,
)
if response.status != 200:
try:
error_data = parse_error(await response.text())
raise DuniterError(error_data)
except (TypeError, jsonschema.ValidationError) as e:
raise ValueError(
"status code != 200 => %d (%s)"
% (response.status, (await response.text()))
) from e
duniter_request = request.Request(url, method=method)
return response
if kwargs:
# urlencoded http form content as bytes
duniter_request.data = parse.urlencode(kwargs).encode("utf-8")
logging.debug("%s : %s, data=%s", method, url, duniter_request.data)
async def requests_post(self, path: str, **kwargs: Any) -> ClientResponse:
"""
Requests POST wrapper in order to use API parameters.
if json_data:
# json content as bytes
duniter_request.data = json.dumps(json_data).encode("utf-8")
logging.debug("%s : %s, data=%s", method, url, duniter_request.data)
:param path: the request path
:return:
"""
if "self_" in kwargs:
kwargs["self"] = kwargs.pop("self_")
# http header to send json body
self.headers["Content-Type"] = "application/json"
logging.debug("POST : %s", kwargs)
response = await self.connection_handler.session.post(
self.reverse_url(self.connection_handler.http_scheme, path),
data=kwargs,
headers=self.headers,
proxy=self.connection_handler.proxy,
timeout=15,
)
if self.headers:
duniter_request.headers = self.headers
if self.connection_handler.proxy:
# proxy host
duniter_request.set_proxy(
self.connection_handler.proxy, self.connection_handler.http_scheme
)
response = request.urlopen(duniter_request, timeout=15) # type: HTTPResponse
if response.status != 200:
try:
error_data = parse_error(await response.text())
raise DuniterError(error_data)
except (TypeError, jsonschema.ValidationError) as e:
raise ValueError(
"status code != 200 => %d (%s)"
% (response.status, (await response.text()))
) from e
content = response.read()
if bma_errors:
try:
error_data = parse_error(content)
raise DuniterError(error_data)
except (TypeError, jsonschema.ValidationError) as exception:
raise ValueError(
"status code != 200 => %d (%s)" % (response.status, content)
) from exception
raise ValueError(
"status code != 200 => %d (%s)" % (response.status, content)
)
return response
# get response content
content = response.read()
response.close()
async def requests(
self,
method: str = "GET",
path: str = "",
data: Optional[dict] = None,
_json: Optional[dict] = None,
) -> ClientResponse:
"""
Generic requests wrapper on aiohttp
# if schema supplied...
if schema is not None:
# validate response
await parse_response(content, schema)
:param method: the request http method
:param path: the path added to endpoint
:param data: data for form POST request
:param _json: json for json POST request
:rtype: aiohttp.ClientResponse
"""
url = self.reverse_url(self.connection_handler.http_scheme, path)
# return the chosen type
result = response # type: Any
if rtype == RESPONSE_TEXT:
result = content
elif rtype == RESPONSE_JSON:
result = json.loads(content)
if data is not None:
logging.debug("%s : %s, data=%s", method, url, data)
elif _json is not None:
logging.debug("%s : %s, json=%s", method, url, _json)
# http header to send json body
self.headers["Content-Type"] = "application/json"
else:
logging.debug("%s : %s", method, url)
response = await self.connection_handler.session.request(
method,
url,
data=data,
json=_json,
headers=self.headers,
proxy=self.connection_handler.proxy,
timeout=15,
)
return response
return result
async def connect_ws(self, path: str) -> WSConnection:
"""
......@@ -354,16 +311,20 @@ class API:
"""
url = self.reverse_url(self.connection_handler.ws_scheme, path)
connection = WSConnection(
self.connection_handler.session.ws_connect(
url, proxy=self.connection_handler.proxy, autoclose=False
)
)
# init aiohttp connection
await connection.init_connection()
ws = WebSocket()
if self.connection_handler.proxy:
proxy_split = ":".split(self.connection_handler.proxy)
if len(proxy_split) == 2:
host = proxy_split[0]
port = proxy_split[1]
else:
host = self.connection_handler.proxy
port = 80
ws.connect(url, http_proxy_host=host, http_proxy_port=port)
else:
ws.connect(url)
return connection
return WSConnection(ws)
class Client:
......@@ -415,7 +376,7 @@ class Client:
:param url_path: Url encoded path following the endpoint
:param params: Url query string parameters dictionary (optional, default None)
:param rtype: Response type (optional, default RESPONSE_JSON)
:param rtype: Response type (optional, default RESPONSE_JSON, can be RESPONSE_TEXT, RESPONSE_HTTP)
:param schema: Json Schema to validate response (optional, default None)
:return:
"""
......@@ -424,22 +385,10 @@ class Client:
client = API(self.endpoint.conn_handler(self.session, self.proxy))
# get aiohttp response
response = await client.requests_get(url_path, **params)
# if schema supplied...
if schema is not None:
# validate response
await parse_response(response, schema)
# return the chosen type
result = response # type: Any
if rtype == RESPONSE_TEXT:
result = await response.text()
elif rtype == RESPONSE_JSON:
result = await response.json()
return result
# get response
return await client.request_url(
url_path, "GET", rtype, schema, bma_errors=True, **params
)
async def post(
self,
......@@ -453,7 +402,7 @@ class Client:
:param url_path: Url encoded path following the endpoint
:param params: Url query string parameters dictionary (optional, default None)
:param rtype: Response type (optional, default RESPONSE_JSON)
:param rtype: Response type (optional, default RESPONSE_JSON, can be RESPONSE_TEXT, RESPONSE_HTTP)
:param schema: Json Schema to validate response (optional, default None)
:return:
"""
......@@ -462,28 +411,15 @@ class Client:
client = API(self.endpoint.conn_handler(self.session, self.proxy))
# get aiohttp response
response = await client.requests_post(url_path, **params)
# if schema supplied...
if schema is not None:
# validate response
await parse_response(response, schema)
# return the chosen type
result = response # type: Any
if rtype == RESPONSE_TEXT:
result = await response.text()
elif rtype == RESPONSE_JSON:
result = await response.json()
return result
# get response
return await client.request_url(
url_path, "POST", rtype, schema, bma_errors=True, **params
)
async def query(
self,
query: str,
variables: Optional[dict] = None,
rtype: str = RESPONSE_JSON,
schema: Optional[dict] = None,
) -> Any:
"""
......@@ -503,24 +439,16 @@ class Client:
client = API(self.endpoint.conn_handler(self.session, self.proxy))
# get aiohttp response
response = await client.requests("POST", _json=payload)
response = await client.request_url(
"", "POST", rtype=RESPONSE_JSON, schema=schema, json_data=payload
)
# if schema supplied...
if schema is not None:
# validate response
await parse_response(response, schema)
# return the chosen type
result = response # type: Any
if rtype == RESPONSE_TEXT or response.status > 399:
result = await response.text()
elif rtype == RESPONSE_JSON:
try:
result = await response.json()
except aiohttp.client_exceptions.ContentTypeError as exception:
logging.error("Response is not a json format: %s", exception)
# return response to debug...
return result
return response
async def connect_ws(self, path: str = "") -> WSConnection:
"""
......
......@@ -16,7 +16,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import asyncio
from duniterpy.api.client import Client, RESPONSE_AIOHTTP
from duniterpy.api.client import Client, RESPONSE_HTTP
from duniterpy.api import bma
# CONFIG #######################################
......@@ -77,7 +77,7 @@ async def main():
# Get the node summary infos (direct REST GET request)
print("\nCall direct get on node/summary")
response = await client.get(
"node/summary", rtype=RESPONSE_AIOHTTP, schema=summary_schema
"node/summary", rtype=RESPONSE_HTTP, schema=summary_schema
)
print(response)
......
import asyncio
import sys
import json
......
......@@ -33,6 +33,7 @@ base58 = "^2.0.0"
libnacl = "^1.7.2"
pyaes = "^1.6.1"
graphql-core = "^3.1.2"
websocket-client = "^0.57"
[tool.poetry.dev-dependencies]
black = "^20.8b1"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment