Skip to content
Snippets Groups Projects
Select Git revision
  • patch-26
  • master default protected
  • Vivakvo-master-patch-80805
  • patch-19
  • patch-25
  • patch-24
  • patch-23
  • patch-22
  • patch-21
  • patch-18
  • patch-17
  • Vivakvo-master-patch-99327
  • patch-16
  • patch-15
  • patch-14
  • patch-13
  • patch-12
  • patch-11
  • patch-10
  • patch-9
  • v1.4.1
  • v1.4.0
  • v1.3.11
  • v1.3.10
  • v1.3.9
  • v1.3.8
  • v1.3.7
  • v1.3.6
  • v1.3.5
  • v1.3.4
  • v1.3.3
  • v1.3.2
  • v1.3.1
  • v1.3.0
  • v1.2.10
  • v1.2.9
  • v1.2.8
  • v1.2.7
  • v1.2.6
  • v1.2.5
40 results

development_tutorial-01.md

Blame
  • Forked from clients / Cesium-grp / Cesium
    Source project has a limited visibility.
    client.py 8.01 KiB
    # Authors:
    # Caner Candan <caner@candan.fr>, http://caner.candan.fr
    # Inso <insomniak.fr at gmail.com>
    from typing import Callable
    import json
    import logging
    import aiohttp
    import jsonschema
    from .errors import DuniterError
    import duniterpy.api.endpoint as endpoint
    
    logger = logging.getLogger("duniter")
    
    # Response type constants
    RESPONSE_JSON = 'json'
    RESPONSE_TEXT = 'text'
    RESPONSE_AIOHTTP = 'aiohttp'
    
    # jsonschema validator
    ERROR_SCHEMA = {
        "type": "object",
        "properties": {
            "ucode": {
                "type": "number"
            },
            "message": {
                "type": "string"
            }
        },
        "required": ["ucode", "message"]
    }
    
    
    def parse_text(text, schema):
        """
        Validate and parse the BMA answer from websocket
    
        :param str text: the bma answer
        :param dict schema: dict for jsonschema
        :return: the json data
        """
        try:
            data = json.loads(text)
            jsonschema.validate(data, schema)
            return data
        except (TypeError, json.decoder.JSONDecodeError):
            raise jsonschema.ValidationError("Could not parse json")
    
    
    def parse_error(text):
        """
        Validate and parse the BMA answer from websocket
    
        :param str text: the bma error
        :return: the json data
        """
        try:
            data = json.loads(text)
            jsonschema.validate(data, ERROR_SCHEMA)
            return data
        except (TypeError, json.decoder.JSONDecodeError) as e:
            raise jsonschema.ValidationError("Could not parse json : {0}".format(str(e)))
    
    
    async def parse_response(response, schema):
        """
        Validate and parse the BMA answer
    
        :param aiohttp.ClientResponse response: Response of aiohttp request
        :param dict schema: The expected response structure
        :return: the json data
        """
        try:
            data = await response.json()
            response.close()
            if schema is not None:
                jsonschema.validate(data, schema)
            return data
        except (TypeError, json.decoder.JSONDecodeError) as e:
            raise jsonschema.ValidationError("Could not parse json : {0}".format(str(e)))
    
    
    class API(object):
        """APIRequest is a class used as an interface. The intermediate derivated classes are the modules and the leaf
        classes are the API requests. """
    
        schema = {}
    
        def __init__(self, connection_handler, module):
            """
            Asks a module in order to create the url used then by derivated classes.
    
            :param ConnectionHandler connection_handler: Connection handler
            :param str module: Module path
            """
            self.module = module
            self.connection_handler = connection_handler
            self.headers = {}
    
        def reverse_url(self, scheme, path):
            """
            Reverses the url using scheme and path given in parameter.
    
            :param str scheme: Scheme of the url
            :param str path: Path of the url
            :return: str
            """
    
            server, port = self.connection_handler.server, self.connection_handler.port
            if self.connection_handler.path:
                url = '{scheme}://{server}:{port}/{path}/{module}'.format(scheme=scheme,
                                                                          server=server,
                                                                          port=port,
                                                                          path=path,
                                                                          module=self.module)
            else:
                url = '{scheme}://{server}:{port}/{module}'.format(scheme=scheme,
                                                                   server=server,
                                                                   port=port,
                                                                   module=self.module)
    
            return url + path
    
        async def requests_get(self, path, **kwargs):
            """
            Requests GET wrapper in order to use API parameters.
    
            :param str path: the request path
            :rtype: aiohttp.ClientResponse
            """
            logging.debug("Request : {0}".format(self.reverse_url(self.connection_handler.http_scheme, path)))
            url = self.reverse_url(self.connection_handler.http_scheme, path)
            response = await self.connection_handler.session.get(url, params=kwargs, headers=self.headers,
                                                                 proxy=self.connection_handler.proxy,
                                                                 timeout=15)
            if response.status != 200:
                try:
                    error_data = parse_error(await response.text())
                    raise DuniterError(error_data)
                except (TypeError, jsonschema.ValidationError):
                    raise ValueError('status code != 200 => %d (%s)' % (response.status, (await response.text())))
    
            return response
    
        async def requests_post(self, path, **kwargs):
            """
            Requests POST wrapper in order to use API parameters.
    
            :param str path: the request path
            :rtype: aiohttp.ClientResponse
            """
            if 'self_' in kwargs:
                kwargs['self'] = kwargs.pop('self_')
    
            logging.debug("POST : {0}".format(kwargs))
            response = await self.connection_handler.session.post(
                self.reverse_url(self.connection_handler.http_scheme, path),
                data=kwargs,
                headers=self.headers,
                proxy=self.connection_handler.proxy,
                timeout=15
            )
            return response
    
        def connect_ws(self, path):
            """
            Connect to a websocket in order to use API parameters
    
            :param str path: the url path
            :rtype: aiohttp.ClientWebSocketResponse
            """
            url = self.reverse_url(self.connection_handler.ws_scheme, path)
            return self.connection_handler.session.ws_connect(url, proxy=self.connection_handler.proxy)
    
    
    class Client:
        """
        Main class to create an API client
        """
        def __init__(self, _endpoint: str, session: aiohttp.ClientSession = None, proxy: str = None):
            """
            Init Client instance
    
            :param _endpoint: Endpoint string in duniter format
            :param session: Aiohttp client session (optional, default None)
            :param proxy: Proxy server as hostname:port
            """
            # Endpoint Protocol detection
            self.endpoint = endpoint.endpoint(_endpoint)
    
            # if no user session...
            if session is None:
                # open a session
                self.session = aiohttp.ClientSession()
            else:
                self.session = session
            self.proxy = proxy
    
        async def get(self, url_path: str, params: dict = None, rtype: str = RESPONSE_JSON, schema: dict = None)-> any:
            """
            Get request on self.endpoint + url_path
    
            :param url_path: Url encoded path following the endpoint
            :param params: Url query string parameters dictionary
            :param rtype: Response type
            :param schema: Json Schema to validate response (optional, default None)
            :return:
            """
            if params is None:
                params = dict()
    
            client = API(self.endpoint.conn_handler(self.session, self.proxy), '')
    
            # get aiohttp response
            response = await client.requests_get(url_path, **params)
    
            # if schema supplied...
            if schema is not None:
                # validate response
                await parse_response(response, schema)
    
            # return the chosen type
            if rtype == RESPONSE_AIOHTTP:
                return response
            elif rtype == RESPONSE_TEXT:
                return await response.text()
            elif rtype == RESPONSE_JSON:
                return await response.json()
    
        async def close(self):
            """
            Close aiohttp session
    
            :return:
            """
            await self.session.close()
    
        async def __call__(self, _function: Callable, *args: any, **kwargs: any) -> any:
            """
            Call the _function given with the args given
            So we can have use many packages wrapping a REST API
    
            :param _function: The function to call
            :param args: The parameters
            :param kwargs: The key/value parameters
            :return:
            """
            return await _function(self, *args, **kwargs)