Skip to content
Snippets Groups Projects
Select Git revision
  • f01e0313
  • master default protected
  • network/gtest-1000 protected
  • upgradable-multisig
  • runtime/gtest-1000
  • network/gdev-800 protected
  • cgeek/issue-297-cpu
  • gdev-800-tests
  • update-docker-compose-rpc-squid-names
  • fix-252
  • 1000i100-test
  • hugo/tmp-0.9.1
  • network/gdev-803 protected
  • hugo/endpoint-gossip
  • network/gdev-802 protected
  • hugo/distance-precompute
  • network/gdev-900 protected
  • tuxmain/anonymous-tx
  • debug/podman
  • hugo/195-doc
  • hugo/195-graphql-schema
  • gtest-1000-0.11.1 protected
  • gtest-1000-0.11.0 protected
  • gtest-1000 protected
  • gdev-900-0.10.1 protected
  • gdev-900-0.10.0 protected
  • gdev-900-0.9.2 protected
  • gdev-800-0.8.0 protected
  • gdev-900-0.9.1 protected
  • gdev-900-0.9.0 protected
  • gdev-803 protected
  • gdev-802 protected
  • runtime-801 protected
  • gdev-800 protected
  • runtime-800-bis protected
  • runtime-800 protected
  • runtime-800-backup protected
  • runtime-701 protected
  • runtime-700 protected
  • runtime-600 protected
  • runtime-500 protected
41 results

chain_spec.rs

Blame
  • webserver.py 1.84 KiB
    import asyncio
    import socket
    import ssl
    from typing import Tuple, Callable, Optional
    
    from aiohttp import web
    
    
    # Thanks to aiohttp tests courtesy
    # Here is a nice mocking server
    def find_unused_port() -> int:
        """
        Return first free network port
    
        :return:
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(("127.0.0.1", 0))
        port = s.getsockname()[1]
        s.close()
        return port
    
    
    class WebFunctionalSetupMixin:
        def setUp(self) -> None:
            self.handler = None
            self.app = web.Application()
            self.runner = web.AppRunner(self.app)
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
    
        def tearDown(self) -> None:
            if self.handler:
                self.loop.run_until_complete(self.handler.shutdown())
            if self.runner:
                self.loop.run_until_complete(self.runner.cleanup())
            try:
                self.loop.stop()
                self.loop.close()
            finally:
                asyncio.set_event_loop(None)
    
        async def create_server(
            self,
            method: str,
            path: str,
            handler: Optional[Callable] = None,
            ssl_ctx: Optional[ssl.SSLContext] = None,
        ) -> Tuple[web.Application, int, str]:
            """
            Create a web server for tests
    
            :param method: HTTP method type
            :param path: Url path
            :param handler: Callback function
            :param ssl_ctx: SSL context (https is used if not None)
            :return:
            """
            if handler:
                self.app.router.add_route(method, path, handler)
    
            await self.runner.setup()
    
            port = find_unused_port()
            site = web.TCPSite(self.runner, "127.0.0.1", port)
            await site.start()
    
            protocol = "https" if ssl_ctx else "http"
            url = "{}://127.0.0.1:{}".format(protocol, port) + path
            return self.app, port, url