Skip to content
Snippets Groups Projects
Select Git revision
  • 9e691bb5d93dcbc5973c65204cd69e41bf639844
  • main default protected
  • release/1.1
  • encrypt_comments
  • mnemonic_dewif
  • authors_rules
  • 0.14
  • rtd
  • 1.2.1 protected
  • 1.2.0 protected
  • 1.1.1 protected
  • 1.1.0 protected
  • 1.0.0 protected
  • 1.0.0rc1 protected
  • 1.0.0rc0 protected
  • 1.0.0-rc protected
  • 0.62.0 protected
  • 0.61.0 protected
  • 0.60.1 protected
  • 0.58.1 protected
  • 0.60.0 protected
  • 0.58.0 protected
  • 0.57.0 protected
  • 0.56.0 protected
  • 0.55.1 protected
  • 0.55.0 protected
  • 0.54.3 protected
  • 0.54.2 protected
28 results

load_encrypted_ascii_armor_message.py

Blame
  • webserver.py 1.36 KiB
    import asyncio
    import socket
    from aiohttp import web
    from aiohttp import log
    
    
    # Thanks to aiohttp tests courtesy
    # Here is a nice mocking server
    class WebFunctionalSetupMixin:
    
        def setUp(self):
            self.handler = None
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
    
        def tearDown(self):
            if self.handler:
                self.loop.run_until_complete(self.handler.shutdown())
            try:
                self.loop.stop()
                self.loop.close()
            finally:
                asyncio.set_event_loop(None)
    
        def find_unused_port(self):
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.bind(('127.0.0.1', 0))
            port = s.getsockname()[1]
            s.close()
            return port
    
        async def create_server(self, method, path, handler=None, ssl_ctx=None):
            app = web.Application()
            if handler:
                app.router.add_route(method, path, handler)
    
            port = self.find_unused_port()
            self.handler = app.make_handler(
                keep_alive_on=False,
                access_log=log.access_logger)
            srv = await self.loop.create_server(
                self.handler, '127.0.0.1', port, ssl=ssl_ctx)
            protocol = "https" if ssl_ctx else "http"
            url = "{}://127.0.0.1:{}".format(protocol, port) + path
            self.addCleanup(srv.close)
            return app, srv, port, url