diff --git a/3scale/nginx.conf b/3scale/nginx.conf index 24b1524..b79e231 100644 --- a/3scale/nginx.conf +++ b/3scale/nginx.conf @@ -55,7 +55,7 @@ http { # proxy_set_header Connection $http_connection; # .. but it doesn't: it always adds this header even for plain HTTP requests # https://issues.redhat.com/browse/RHCLOUD-21326 - proxy_set_header Connection "upgrade"; + proxy_set_header Connection "Upgrade"; # Pass ETag header from Cockpit to clients. # See: https://github.com/cockpit-project/cockpit/issues/5239 diff --git a/appservice/Containerfile b/appservice/Containerfile index 14b5ced..5e47b8f 100644 --- a/appservice/Containerfile +++ b/appservice/Containerfile @@ -1,12 +1,8 @@ -FROM debian:bookworm +FROM docker.io/redhat/ubi9-minimal -RUN apt-get update && \ - apt-get install -y python3 python3-redis nginx curl inetutils-ping procps && \ - apt-get clean && \ - rm /var/lib/apt/lists/*dists* - -# allow unprivileged container user to run nginx and change configuration -RUN chmod -R a+rw /etc/nginx/ /var/lib/nginx/ /var/log/nginx/ /run +# iputils and procps-ng are just for debugging; drop for production +RUN microdnf install -y python3-pip iputils procps-ng && microdnf clean all +RUN pip3 install redis starlette httpx websockets uvicorn COPY *.py /usr/local/bin/ diff --git a/appservice/multiplexer.py b/appservice/multiplexer.py index f091d5c..944a15c 100644 --- a/appservice/multiplexer.py +++ b/appservice/multiplexer.py @@ -1,255 +1,289 @@ -import os -import socket -import signal -import subprocess -import uuid -import http -import http.client +import async_timeout +import asyncio import json -import time import logging +import os +import uuid -from multiprocessing import Process -from http.server import BaseHTTPRequestHandler, HTTPServer +import httpx +import redis.exceptions +import redis.asyncio +import uvicorn +import websockets -import redis +from starlette.applications import Starlette +from starlette.background import BackgroundTask +from starlette.concurrency import run_until_first_complete +from starlette.responses import PlainTextResponse, JSONResponse, StreamingResponse +from starlette.websockets import WebSocket import config -logger = logging.getLogger("multiplexer") - API_URL = os.environ['API_URL'] SESSION_INSTANCE_DOMAIN = os.getenv('SESSION_INSTANCE_DOMAIN', '') +PODMAN_SOCKET = '/run/podman/podman.sock' -NGINX_TEMPLATE = """ -daemon off; -worker_processes auto; - -events {{ - worker_connections 1024; -}} - -http {{ - include /etc/nginx/mime.types; - default_type application/octet-stream; - - log_format main '$remote_addr - $remote_user [$time_local] "$request" ' - '$status $body_bytes_sent "$http_referer" ' - '"$http_user_agent" "$http_x_forwarded_for"'; - - access_log /dev/stderr main; - error_log stderr; +# states: wait_target or running +SESSIONS = {} + +REDIS = redis.asyncio.Redis(host=os.environ['REDIS_SERVICE_HOST'], + port=int(os.environ.get('REDIS_SERVICE_PORT', '6379'))) +logger = logging.getLogger('multiplexer') +app = Starlette() + + +@app.route(f'{config.ROUTE_API}/ping') +async def handle_ping(request): + return PlainTextResponse('pong') + + +async def new_session_podman(sessionid): + name = f'session-{sessionid}' + body = { + 'image': 'quay.io/rhn_engineering_mpitt/ws', + 'name': name, + # for local debugging + # 'command': ['sleep', 'infinity'], + # XXX: http://localhost:8080 origin is for directly connecting to appservice, without 3scale + 'command': ['sh', '-exc', + f"mkdir -p /tmp/conf/cockpit; " + f"printf '[Webservice]\nUrlRoot={config.ROUTE_WSS}/sessions/{sessionid}/web\\n" + f"Origins = {API_URL} http://localhost:8080\\n'" + "> /tmp/conf/cockpit/cockpit.conf;" + "export XDG_CONFIG_DIRS=/tmp/conf;" + "exec /usr/libexec/cockpit-ws --for-tls-proxy --local-session=socat-session.sh"], + 'netns': {'nsmode': 'bridge'}, + # deprecated; use this with podman ≥ 4: 'Networks': {'consoledot': {}}, + 'cni_networks': ['consoledot'], + 'user': 'cockpit-wsinstance', + } + + async with httpx.AsyncClient(transport=httpx.AsyncHTTPTransport(uds=PODMAN_SOCKET)) as podman: + response = await podman.post('http://none/v1.12/libpod/containers/create', data=json.dumps(body).encode()) + status = response.status_code + content = response.text + + if status >= 200 and status < 300: + logger.debug('/new: creating container succeeded with %i: %s; starting container', status, content) + response = await podman.post(f'http://none/v1.12/libpod/containers/{name}/start') + status = response.status_code + content = response.text + + return status, content + + +@app.route(f'{config.ROUTE_API}/sessions/new') +async def handle_session_new(request): + sessionid = str(uuid.uuid4()) + assert sessionid not in SESSIONS + + if os.path.exists(PODMAN_SOCKET): + pod_status, content = await new_session_podman(sessionid) + else: + # TODO: support k8s API + raise NotImplementedError('cannot create sessions other than podman') - sendfile on; + if pod_status >= 200 and pod_status < 300: + response = JSONResponse({'id': sessionid}) + await update_session(sessionid, 'wait_target') + else: + response = PlainTextResponse(f'creating session container failed: {content}', status_code=pod_status) - keepalive_timeout 65; + return response - server {{ - listen 8080 default_server; - listen [::]:8080 default_server; - server_name localhost; +@app.route(f'{config.ROUTE_API}/sessions/{{sessionid}}/status') +async def handle_session_status(request): + sessionid = request.path_params['sessionid'] + try: + return PlainTextResponse(SESSIONS[sessionid]) + except KeyError: + return PlainTextResponse('unknown session ID', status_code=404) - {routes} - location {route_control}/ping {{ - proxy_pass http://127.0.0.1:8081; - }} - location {route_control}/sessions/new {{ - proxy_pass http://127.0.0.1:8081; - }} +async def ws_up2down(recv_ws: WebSocket, send_ws: websockets.WebSocketClientProtocol): + while True: + msg = await recv_ws.receive() + if msg['type'] == 'websocket.receive': + data = msg.get('text') or msg.get('bytes') + await send_ws.send(data) + elif msg['type'] == 'websocket.disconnect': + break - location / {{ - return 404 'no route found in multiplexer\r\n'; - }} - }} -}} -""" -PODMAN_SOCKET = '/run/podman/podman.sock' -NGINX_PROC = None -REDIS = redis.Redis(host=os.environ["REDIS_SERVICE_HOST"], port=int(os.environ.get("REDIS_SERVICE_PORT", "6379"))) +async def ws_down2up(recv_ws: websockets.WebSocketClientProtocol, send_ws: WebSocket): + while True: + data = await recv_ws.recv() + if isinstance(data, str): + await send_ws.send_text(data) + else: + await send_ws.send_bytes(data) + + +async def websocket_forward(upstream_ws: WebSocket, target_url: str): + await upstream_ws.accept() + headers = [] + origin = None + for k, v in upstream_ws.scope['headers']: + if k == b'origin': + origin = v.decode() + # XXX: do we need to forward any other headers? + + logging.debug('websocket_forward %s → %s; origin %s', upstream_ws.url.path, target_url, origin) + + downstream_ws = await websockets.connect( + target_url, + subprotocols=upstream_ws.scope['subprotocols'], + origin=origin, + extra_headers=headers, + ) + await run_until_first_complete( + (ws_up2down, {'recv_ws': upstream_ws, 'send_ws': downstream_ws}), + (ws_down2up, {'recv_ws': downstream_ws, 'send_ws': upstream_ws}), + ) + await downstream_ws.close() + + +@app.websocket_route(f'{config.ROUTE_WSS}/sessions/{{sessionid}}/ws') +async def handle_session_id_bridge(ws: WebSocket): + '''reverse-proxy bridge websocket to session pod''' + + sessionid = ws.path_params['sessionid'] + if sessionid not in SESSIONS: + await ws.close(reason='unknown session ID', code=404) + return + + if SESSIONS[sessionid] == 'wait_target': + asyncio.create_task(update_session(sessionid, 'running')) + await websocket_forward(ws, f'ws://session-{sessionid}{SESSION_INSTANCE_DOMAIN}:8080{ws.url.path}') + + +@app.websocket_route(f'{config.ROUTE_WSS}/sessions/{{sessionid}}/web/{{path:path}}') +async def handle_session_id_ws(ws: WebSocket): + '''reverse-proxy cockpit websocket to session pod''' + + sessionid = ws.path_params['sessionid'] + if sessionid not in SESSIONS: + await ws.close(reason='unknown session ID', code=404) + return + await websocket_forward(ws, f'ws://session-{sessionid}{SESSION_INSTANCE_DOMAIN}:9090{ws.url.path}') + + +@app.route(f'{config.ROUTE_WSS}/sessions/{{sessionid}}/web/{{path:path}}', methods=['GET', 'HEAD']) +async def handle_session_id_http(upstream_req): + '''reverse-proxy cockpit HTTP to session pod''' + + sessionid = upstream_req.path_params['sessionid'] + if sessionid not in SESSIONS: + return PlainTextResponse('unknown session ID', status_code=404) + + target_url = f'http://session-{sessionid}{SESSION_INSTANCE_DOMAIN}:9090{upstream_req.url.path}' + + client = httpx.AsyncClient() + downstream_req = client.build_request( + method=upstream_req.method, + url=target_url, + headers=upstream_req.headers.items(), + params=upstream_req.query_params, + cookies=upstream_req.cookies, + ) + downstream_response = await client.send(downstream_req, stream=True) + return StreamingResponse( + downstream_response.aiter_raw(), + headers=dict(downstream_response.headers), + background=BackgroundTask(downstream_response.aclose) + ) + + +async def watch_redis(channel): + global SESSIONS + while True: + try: + async with async_timeout.timeout(1): + message = await channel.get_message(ignore_subscribe_messages=True) + if message is not None and message['channel'] == b'sessions': + logger.debug('got redis sessions update: %s', message['data']) + try: + SESSIONS = json.loads(message['data'].decode()) + except json.decoder.JSONDecodeError as e: + logger.warning('invalid JSON, starting without sessions: %s', e) + SESSIONS = {} + + await asyncio.sleep(0.01) + except asyncio.TimeoutError: + pass + + +@app.on_event('startup') +async def init_sessions(): + global SESSIONS + + pubsub = REDIS.pubsub() + # wait for Redis service to be up + for retry in range(10): + try: + await pubsub.subscribe('sessions') + break + except redis.exceptions.ConnectionError as e: + logger.warning('Failed to connect to Redis, retry %i: %s', retry, e) + await asyncio.sleep(retry * retry + 1) + else: + raise RuntimeError('timed out trying to connect to Redis') + asyncio.create_task(watch_redis(pubsub)) -def get_sessions(): - # Add new entry to our sessions - sessions = REDIS.get('sessions') + sessions = await REDIS.get('sessions') if sessions is None: - sessions = {} + SESSIONS = {} else: try: - sessions = json.loads(sessions) + SESSIONS = json.loads(sessions) except json.decoder.JSONDecodeError: - sessions = {} - - return sessions - - -def write_routes(sessions): - routes = "" - for sessionid in sessions: - routes += f""" -location {config.ROUTE_WSS}/sessions/{sessionid}/web {{ - proxy_pass http://session-{sessionid}{SESSION_INSTANCE_DOMAIN}:9090; - - # Required to proxy the connection to Cockpit - proxy_set_header Host $host; - proxy_set_header X-Forwarded-Proto $scheme; - - # Required for web sockets to function - proxy_http_version 1.1; - proxy_buffering off; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; - - # Pass ETag header from Cockpit to clients. - # See: https://github.com/cockpit-project/cockpit/issues/5239 - gzip off; -}} -location {config.ROUTE_WSS}/sessions/{sessionid}/ws {{ - proxy_pass http://session-{sessionid}{SESSION_INSTANCE_DOMAIN}:8080; - - # Required to proxy the connection to Cockpit - proxy_set_header Host $host; - proxy_set_header X-Forwarded-Proto $scheme; - - # Required for web sockets to function - proxy_http_version 1.1; - proxy_buffering off; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; - - # Pass ETag header from Cockpit to clients. - # See: https://github.com/cockpit-project/cockpit/issues/5239 - gzip off; -}}""" - - with open('/etc/nginx/nginx.conf', 'w') as f: - f.write(NGINX_TEMPLATE.format(routes=routes, route_control=config.ROUTE_API)) - - -class ProxyHTTPRequestHandler(BaseHTTPRequestHandler): - def new_session_podman(self, sessionid): - name = f'session-{sessionid}' - connection = http.client.HTTPConnection('localhost') - connection.sock = socket.socket(socket.AF_UNIX) - connection.sock.connect(PODMAN_SOCKET) - body = { - 'image': 'quay.io/rhn_engineering_mpitt/ws', - 'name': name, - # for local debugging - # 'command': ['sleep', 'infinity'], - # XXX: http://localhost:8080 origin is for directly connecting to appservice, without 3scale - 'command': ['sh', '-exc', - f"mkdir -p /tmp/conf/cockpit; " - f"printf '[Webservice]\nUrlRoot={config.ROUTE_WSS}/sessions/{sessionid}/web\\n" - f"Origins = {API_URL} http://localhost:8080\\n'" - "> /tmp/conf/cockpit/cockpit.conf;" - "export XDG_CONFIG_DIRS=/tmp/conf;" - "exec /usr/libexec/cockpit-ws --for-tls-proxy --local-session=socat-session.sh"], - 'netns': {'nsmode': 'bridge'}, - # deprecated; use this with podman ≥ 4: 'Networks': {'consoledot': {}}, - 'cni_networks': ['consoledot'], - 'user': 'cockpit-wsinstance', - } - connection.request('POST', '/v1.12/libpod/containers/create', body=json.dumps(body)) - response = connection.getresponse() - content = response.read() - - if response.status >= 200 and response.status < 300: - logger.debug("/new: creating container result: %i %s", response.status, content.decode()) - connection.request('POST', f'/v1.12/libpod/containers/{name}/start') - response = connection.getresponse() - content = response.read() - - return response, content - - def new_session(self): - sessionid = str(uuid.uuid4()) - - if os.path.exists(PODMAN_SOCKET): - response, content = self.new_session_podman(sessionid) - else: - # TODO: support k8s API - raise NotImplementedError("cannot create sessions other than podman") + SESSIONS = {} - if response.status >= 200 and response.status < 300: - self.send_response(200) - self.end_headers() - self.wfile.write(json.dumps({"id": sessionid}).encode()) - else: - self.send_response(response.status) - self.end_headers() - self.wfile.write("creating session container failed: ".encode()) - self.wfile.write(content) - return - - sessions = get_sessions() - sessions[sessionid] = True - - dumped_sessions = json.dumps(sessions) - REDIS.set('sessions', dumped_sessions) - REDIS.publish('sessions', dumped_sessions) - - def ping(self): - self.send_response(200) - self.end_headers() - self.wfile.write(b'pong') - - def do_GET(self): - logger.debug("GET %s", self.path) - if self.path == f"{config.ROUTE_API}/sessions/new": - self.new_session() - elif self.path == f"{config.ROUTE_API}/ping": - self.ping() - else: - self.send_response(404, 'Not found') + logger.debug('initial sessions: %s', SESSIONS) -def watch_redis(): - redis = REDIS.pubsub() - redis.subscribe("sessions") - logger = logging.getLogger("multiplexer/redis") +async def update_session(session_id, status): + global SESSIONS + SESSIONS[session_id] = status + dumped_sessions = json.dumps(SESSIONS) + await REDIS.set('sessions', dumped_sessions) + await REDIS.publish('sessions', dumped_sessions) - while True: - message = redis.get_message() - if message: - logger.debug("got message: %s", message) - sessions = get_sessions() - write_routes(sessions) - os.kill(NGINX_PROC.pid, signal.SIGHUP) - time.sleep(0.01) +# Terrifying hack around broken 3scale Connection: header; see https://issues.redhat.com/browse/RHCLOUD-21326 +# uvicorn's H11Protocol.handle_events() can't be tapped into, so we need to monkey-patch +# h11.Connection.next_event() to deliver non-broken headers; otherwise uvicorn refuses these paths with +# "Unsupported upgrade request". +import h11 # noqa: E402 +h11.Connection.next_event_real = h11.Connection.next_event -def start_nginx(): - proc = subprocess.Popen(['nginx']) - # wait for nginx to start up - connection = http.client.HTTPConnection('localhost:8080') - for _ in range(10): - try: - connection.connect() - break - except OSError: - time.sleep(0.2) - else: - raise TimeoutError('timed out waiting for nginx to start up') - return proc +def hack_h11_con_next_event(self): + res = h11.Connection.next_event_real(self) + if type(res) == h11.Request: + connection_idx = None + has_upgrade = False + for i, (name, value) in enumerate(res.headers): + if name == b'connection' and b'Upgrade' in value: + connection_idx = i + if name == b'upgrade': + has_upgrade = True + if connection_idx is not None and not has_upgrade: + res.headers._full_items[connection_idx] = ( + res.headers._full_items[connection_idx][0], # raw name + res.headers._full_items[connection_idx][1], # normalized name + res.headers._full_items[connection_idx][2].replace(b'Upgrade', b'')) # value + logger.debug('hack_h11_con_next_event on %s: fixing broken Connection: header', res.target) -if __name__ == '__main__': - logging.basicConfig(level=logging.DEBUG) - write_routes(get_sessions()) - NGINX_PROC = start_nginx() + return res - # start redis watcher - redis = Process(target=watch_redis) - redis.start() - server_address = ('0.0.0.0', 8081) - httpd = HTTPServer(server_address, ProxyHTTPRequestHandler) - try: - httpd.serve_forever() - except KeyboardInterrupt: - NGINX_PROC.kill() - NGINX_PROC.wait() +h11.Connection.next_event = hack_h11_con_next_event +# End hack + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + uvicorn.run(app, host='0.0.0.0', port=8080) diff --git a/test/test_basic.py b/test/test_basic.py index 4a63425..d147db1 100755 --- a/test/test_basic.py +++ b/test/test_basic.py @@ -99,9 +99,15 @@ def request(self, url, retries=0): def newSession(self): response = self.request(f'{self.api_url}{config.ROUTE_API}/sessions/new') self.assertEqual(response.status, 200) + self.assertEqual(response.getheader('Content-Type'), 'application/json') sessionid = json.load(response)['id'] self.assertIsInstance(sessionid, str) + # inital status + response = self.request(f'{self.api_url}{config.ROUTE_API}/sessions/{sessionid}/status') + self.assertEqual(response.status, 200) + self.assertEqual(response.read(), b'wait_target') + # API URL is on the container host's localhost; translate for the container DNS websocket_url = self.api_url.replace('localhost', 'host.containers.internal').replace('https:', 'wss:') podman = ['podman', 'run', '-d', '--pod', 'webconsoleapp', @@ -112,6 +118,17 @@ def newSession(self): subprocess.check_call(podman + cmd) + # successful bridge connection updates status + for retry in range(10): + response = self.request(f'{self.api_url}{config.ROUTE_API}/sessions/{sessionid}/status') + self.assertEqual(response.status, 200) + status = response.read() + if status == b'running': + break + time.sleep(0.5) + else: + self.fail(f'session status was not updated to running, still at {status}') + return sessionid def checkSession(self, sessionid): @@ -141,6 +158,26 @@ def testSessions(self): # first session still works self.checkSession(s1) + # unknown session ID + with self.assertRaises(urllib.error.HTTPError) as cm: + self.request(f'{self.api_url}{config.ROUTE_API}/sessions/123unknown/status') + self.assertEqual(cm.exception.code, 404) + + # crash container for s2; use --time 0 once we have podman 4.0 everywhere + subprocess.check_call(['podman', 'rm', '--force', f'session-{s2}']) + # first session still works + self.checkSession(s1) + # second session is broken + request = self.get_auth_request(f'{self.api_url}{config.ROUTE_WSS}/sessions/{s2}/web/') + with self.assertRaises(OSError): + urllib.request.urlopen(request, context=self.ssl_3scale, timeout=1) + + # can create a new session + s3 = self.newSession() + self.checkSession(s3) + # first session still works + self.checkSession(s1) + def test3scaleErrors(self): # unauthenticated with self.assertRaises(urllib.error.HTTPError) as cm: