From 3427c2577bb674196a59e58cbf5aa91ad92cba7d Mon Sep 17 00:00:00 2001 From: Ray Luo Date: Sat, 12 Aug 2023 12:44:10 -0700 Subject: [PATCH] Escape unsafe query string when rendering to html --- oauth2cli/authcode.py | 29 ++++++++++++++++++++++------- tests/test_authcode.py | 17 +++++++++++++++++ 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/oauth2cli/authcode.py b/oauth2cli/authcode.py index bcef60b8..d2b14613 100644 --- a/oauth2cli/authcode.py +++ b/oauth2cli/authcode.py @@ -15,10 +15,12 @@ try: # Python 3 from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs, urlencode + from html import escape except ImportError: # Fall back to Python 2 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler from urlparse import urlparse, parse_qs from urllib import urlencode + from cgi import escape logger = logging.getLogger(__name__) @@ -77,25 +79,37 @@ def _qs2kv(qs): for k, v in qs.items()} +def _is_html(text): + return text.startswith("<") # Good enough for our purpose + + +def _escape(key_value_pairs): + return {k: escape(v) for k, v in key_value_pairs.items()} + + class _AuthCodeHandler(BaseHTTPRequestHandler): def do_GET(self): # For flexibility, we choose to not check self.path matching redirect_uri #assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP') qs = parse_qs(urlparse(self.path).query) if qs.get('code') or qs.get("error"): # So, it is an auth response - self.server.auth_response = _qs2kv(qs) - logger.debug("Got auth response: %s", self.server.auth_response) + auth_response = _qs2kv(qs) + logger.debug("Got auth response: %s", auth_response) template = (self.server.success_template if "code" in qs else self.server.error_template) - self._send_full_response( - template.safe_substitute(**self.server.auth_response)) + if _is_html(template.template): + safe_data = _escape(auth_response) + else: + safe_data = auth_response + self._send_full_response(template.safe_substitute(**safe_data)) + self.server.auth_response = auth_response # Set it now, after the response is likely sent # NOTE: Don't do self.server.shutdown() here. It'll halt the server. else: self._send_full_response(self.server.welcome_page) def _send_full_response(self, body, is_ok=True): self.send_response(200 if is_ok else 400) - content_type = 'text/html' if body.startswith('<') else 'text/plain' + content_type = 'text/html' if _is_html(body) else 'text/plain' self.send_header('Content-type', content_type) self.end_headers() self.wfile.write(body.encode("utf-8")) @@ -318,6 +332,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): default="https://login.microsoftonline.com/common/oauth2/v2.0/authorize") p.add_argument('client_id', help="The client_id of your application") p.add_argument('--port', type=int, default=0, help="The port in redirect_uri") + p.add_argument('--timeout', type=int, default=60, help="Timeout value, in second") p.add_argument('--host', default="127.0.0.1", help="The host of redirect_uri") p.add_argument('--scope', default=None, help="The scope list") args = parser.parse_args() @@ -331,8 +346,8 @@ def __exit__(self, exc_type, exc_val, exc_tb): auth_uri=flow["auth_uri"], welcome_template= "Sign In, or Abort<tag>foo</tag>", + requests.get("http://localhost:{}?error=foo".format( + receiver.get_port())).text, + "Unsafe data in HTML should be escaped", + ))] + receiver.get_auth_response( # Starts server and hang until timeout + timeout=3, + error_template="$error", + ) +