Skip to content
Snippets Groups Projects
Select Git revision
  • main
  • pages protected
  • release/0.12 protected
  • 429_rm_features
  • release/0.11 protected
  • 175_gva_migration
  • i18n
  • v0.12.1 protected
  • v0.12.0 protected
  • v0.11.2 protected
  • v0.11.1 protected
  • v0.11.0 protected
  • v0.11.0rc0 protected
  • v0.10.0 protected
  • v0.10.0rc1 protected
  • v0.10.0rc0 protected
  • v0.3.0 protected
  • v0.8.1 protected
  • v0.9.0 protected
  • v0.9.0rc protected
  • v0.8.0 protected
  • v0.7.6 protected
  • v0.7.5 protected
  • v0.7.4 protected
  • v0.7.3 protected
  • v0.7.2 protected
  • v0.7.1 protected
27 results

AUTHORS.md

Blame
  • webserver.py 1.79 KiB
    import asyncio
    import socket
    import ssl
    from typing import Tuple, Callable, Optional
    
    from aiohttp import web
    
    
    # Thanks to aiohttp tests courtesy
    # Here is a nice mocking server
    class WebFunctionalSetupMixin:
        """
        Test mixin that runs a local aiohttp web server on its own event loop

        ``setUp`` creates the application, its runner and a new event loop,
        ``create_server`` registers a route and starts listening on 127.0.0.1,
        ``tearDown`` cleans up the runner and closes the loop.
        """

        def setUp(self) -> None:
            """
            Create the aiohttp application, its runner and a new event loop
            """
            # Not assigned anywhere else in this class: tearDown only shuts it
            # down if other code has set it in the meantime
            self.handler = None
            self.app = web.Application()
            self.runner = web.AppRunner(self.app)
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)

        def tearDown(self) -> None:
            """
            Shut down the handler (if any) and the runner, then close the loop
            """
            try:
                if self.handler:
                    self.loop.run_until_complete(self.handler.shutdown())
                if self.runner:
                    self.loop.run_until_complete(self.runner.cleanup())
            finally:
                # Release the loop even when the shutdown above raised, so a
                # failing test does not leave a loop behind for the next one
                try:
                    self.loop.stop()
                    self.loop.close()
                finally:
                    asyncio.set_event_loop(None)

        def find_unused_port(self) -> int:
            """
            Ask the OS for a TCP port that is currently free on 127.0.0.1

            The probing socket is closed before returning, so the port is only
            known to have been free at the time of the call.

            :return: Port number
            """
            # Binding to port 0 lets the OS pick a free port; the context manager
            # closes the socket even if bind() or getsockname() raises
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                probe.bind(('127.0.0.1', 0))
                return probe.getsockname()[1]

        async def create_server(self, method: str, path: str, handler: Optional[Callable] = None,
                                ssl_ctx: Optional[ssl.SSLContext] = None) -> Tuple[web.Application, int, str]:
            """
            Create a web server for tests

            :param method: HTTP method type
            :param path: Url path
            :param handler: Callback function (no route is added if None)
            :param ssl_ctx: SSL context (https is used if not None)
            :return: Tuple of (application, listening port, url of the path on the server)
            """
            if handler is not None:
                self.app.router.add_route(method, path, handler)

            await self.runner.setup()

            port = self.find_unused_port()
            # Hand the SSL context to the site: otherwise the server would speak
            # plain HTTP while the returned url advertises https
            site = web.TCPSite(self.runner, '127.0.0.1', port, ssl_context=ssl_ctx)
            await site.start()

            protocol = "https" if ssl_ctx is not None else "http"
            url = "{}://127.0.0.1:{}".format(protocol, port) + path
            return self.app, port, url