Select Git revision
runtime-errors.md
-
* update docs (was not done in 277) * update metadata after rebase * uncomment commands * separate weights
* update docs (was not done in 277) * update metadata after rebase * uncomment commands * separate weights
webserver.py 1.37 KiB
import asyncio
import socket
from aiohttp import web
from aiohttp import log
# Thanks to aiohttp tests courtesy
# Here is a nice mocking server
class WebFunctionalSetupMixin:
def setUp(self):
self.handler = None
self.runner = None
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def tearDown(self):
if self.handler:
self.loop.run_until_complete(self.handler.shutdown())
if self.runner:
self.loop.run_until_complete(self.runner.cleanup())
try:
self.loop.stop()
self.loop.close()
finally:
asyncio.set_event_loop(None)
def find_unused_port(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('127.0.0.1', 0))
port = s.getsockname()[1]
s.close()
return port
async def create_server(self, method, path, handler=None, ssl_ctx=None):
app = web.Application()
if handler:
app.router.add_route(method, path, handler)
port = self.find_unused_port()
self.runner = web.AppRunner(app)
await self.runner.setup()
site = web.TCPSite(self.runner, '127.0.0.1', port)
await site.start()
protocol = "https" if ssl_ctx else "http"
url = "{}://127.0.0.1:{}".format(protocol, port) + path
return app, port, url