Skip to content

Commit

Permalink
#3305 process ssl options parsing earlier
Browse files Browse the repository at this point in the history
so we can still have access to the command line and decide which options should be overriden by the options object and which options should be loaded from the per host+port configuration file
  • Loading branch information
totaam committed Jul 8, 2022
1 parent 2417060 commit 306edee
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 72 deletions.
63 changes: 36 additions & 27 deletions xpra/net/socket_util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# This file is part of Xpra.
# Copyright (C) 2017-2021 Antoine Martin <antoine@xpra.org>
# Copyright (C) 2017-2022 Antoine Martin <antoine@xpra.org>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.

Expand All @@ -26,7 +26,6 @@
DEFAULT_PORT,
)
from xpra.make_thread import start_thread
from xpra.scripts.parsing import do_legacy_bool_parse

#pylint: disable=import-outside-toplevel

Expand Down Expand Up @@ -809,22 +808,21 @@ def mdns_publish(display_name, listen_on, text_dict=None):


SSL_ATTRIBUTES = (
"cert", "key", "ca_certs", "ca_data",
"cert", "key", "ca-certs", "ca-data",
"protocol",
"client_verify_mode", "server_verify_mode", "verify_flags",
"check_hostname", "server_hostname",
"client-verify-mode", "server-verify-mode", "verify-flags",
"check-hostname", "server-hostname",
"options", "ciphers",
)

def get_ssl_attributes(opts, server_side=True, overrides=None):
args = {
"server_side" : server_side,
"server-side" : server_side,
}
for attr in SSL_ATTRIBUTES:
ssl_attr = "ssl_%s" % attr #ie: "ssl_ca_certs"
option = ssl_attr.replace("_", "-") #ie: "ssl-ca-certs"
v = (overrides or {}).get(option)
v = (overrides or {}).get(attr)
if v is None:
ssl_attr = "ssl_%s" % attr.replace("-", "_") #ie: "ssl_ca_certs"
v = getattr(opts, ssl_attr)
args[attr] = v
return args
Expand Down Expand Up @@ -857,6 +855,9 @@ def find_ssl_cert(filename="ssl-cert.pem"):

def ssl_wrap_socket(sock, **kwargs):
context, wrap_kwargs = get_ssl_wrap_socket_context(**kwargs)
from xpra.log import Logger
ssllog = Logger("ssl")
ssllog("ssl_wrap_socket(%s, %s) context=%s, wrap_kwargs=%s", sock, kwargs, context, wrap_kwargs)
return do_wrap_socket(sock, context, **wrap_kwargs)

def log_ssl_info(ssl_sock):
Expand Down Expand Up @@ -952,13 +953,13 @@ def get_ssl_wrap_socket_context(cert=None, key=None, ca_certs=None, ca_data=None
ca_certs = None
elif ca_certs=="auto":
ca_certs = find_ssl_cert("ca-cert.pem")
ssllog(" ca_certs=%s", ca_certs)
ssllog(" ca-certs=%s", ca_certs)
#parse verify-mode:
ssl_cert_reqs = getattr(ssl, "CERT_%s" % verify_mode.upper(), None)
if ssl_cert_reqs is None:
values = [k[len("CERT_"):].lower() for k in dir(ssl) if k.startswith("CERT_")]
raise InitException("invalid ssl-server-verify-mode '%s', must be one of: %s" % (verify_mode, csv(values)))
ssllog(" cert_reqs=%#x", ssl_cert_reqs)
ssllog(" cert-reqs=%#x", ssl_cert_reqs)
#parse protocol:
proto = getattr(ssl, "PROTOCOL_%s" % (protocol.upper().replace("V", "v")), None)
if proto is None:
Expand All @@ -984,7 +985,7 @@ def get_ssl_wrap_socket_context(cert=None, key=None, ca_certs=None, ca_data=None
if v is None:
raise InitException("invalid ssl verify-flag: %s" % x)
ssl_verify_flags |= v
ssllog(" verify_flags=%#x", ssl_verify_flags)
ssllog(" verify-flags=%#x", ssl_verify_flags)
#parse ssl-options as CSV:
ssl_options = 0
for x in options.split(","):
Expand Down Expand Up @@ -1016,17 +1017,18 @@ def get_ssl_wrap_socket_context(cert=None, key=None, ca_certs=None, ca_data=None
except ssl.SSLError as e:
ssllog("load_cert_chain", exc_info=True)
raise InitException("SSL error, failed to load certificate chain: %s" % e) from e
#if not server_side and (check_hostname or (ca_certs and ca_certs.lower()!="default")):
if not server_side:
kwargs["server_hostname"] = server_hostname
if ssl_cert_reqs!=ssl.CERT_NONE:
if server_side:
purpose = ssl.Purpose.CLIENT_AUTH #@UndefinedVariable
else:
purpose = ssl.Purpose.SERVER_AUTH #@UndefinedVariable
context.check_hostname = check_hostname
ssllog(" check_hostname=%s, server_hostname=%s", check_hostname, server_hostname)
if context.check_hostname:
if not server_hostname:
raise InitException("ssl error: check-hostname is set but server-hostname is not")
kwargs["server_hostname"] = server_hostname
if context.check_hostname and not server_hostname:
raise InitException("ssl error: check-hostname is set but server-hostname is not")
ssllog(" load_default_certs(%s)", purpose)
context.load_default_certs(purpose)

Expand Down Expand Up @@ -1107,7 +1109,7 @@ def ssl_retry(e, ssl_ca_certs):
ssllog("ssl_retry: %s not handled here", SSL_VERIFY_CODES.get(verify_code, verify_code))
return None
if not server_hostname:
ssllog("ssl_retry: not server hostname")
ssllog("ssl_retry: no server hostname")
return None
ssllog("ssl_retry: server_hostname=%s, ssl verify_code=%s (%i)",
server_hostname, SSL_VERIFY_CODES.get(verify_code, verify_code), verify_code)
Expand All @@ -1127,7 +1129,7 @@ def confirm(*args):
cert_file = find_ssl_config_file(server_hostname, port, CERT_FILENAME)
if cert_file:
ssllog("retrying with %r", cert_file)
options["ssl_ca_certs"] = cert_file
options["ca-certs"] = cert_file
return options
#download the certificate data
import ssl
Expand All @@ -1144,11 +1146,13 @@ def confirm(*args):
prompt = "Do you want to accept this certificate?"
if not confirm((msg, ), title, prompt):
return None
filename = save_ssl_config_file(server_hostname, port, CERT_FILENAME, cert_data.encode("latin1"))
filename = save_ssl_config_file(server_hostname, port,
CERT_FILENAME, "certificate", cert_data.encode("latin1"))
if not filename:
ssllog.warn("Warning: failed to save certificate data")
return None
options["ssl_ca_certs"] = filename
options["ca-certs"] = filename
save_ssl_options(server_hostname, port, options)
return options
if verify_code in (SSL_VERIFY_WRONG_HOST, SSL_VERIFY_IP_MISMATCH, SSL_VERIFY_HOSTNAME_MISMATCH):
#ask the user if he wants to skip verifying the host
Expand All @@ -1158,7 +1162,7 @@ def confirm(*args):
ssllog("run_pinentry_confirm(..) returned %r", r)
if r:
ssllog.info("retrying without checking the hostname")
options["ssl_check_hostname"] = False
options["check-hostname"] = False
save_ssl_options(server_hostname, port, options)
return options
return None
Expand All @@ -1179,8 +1183,11 @@ def load_ssl_options(server_hostname, port):
if len(parts)!=2:
continue
k, v = parts
if k not in SSL_ATTRIBUTES:
ssllog("Warning: unknown SSL attribute %r in %r", k, f)
continue
#some options use boolean values, convert them back:
if k in ("ssl_check_hostname", ):
if k in ("check-hostname", ):
v = v.lower() in TRUE_OPTIONS
options[k] = v
except OSError as e:
Expand All @@ -1191,8 +1198,10 @@ def load_ssl_options(server_hostname, port):
def save_ssl_options(server_hostname, port=443, options=b""):
from xpra.log import Logger
ssllog = Logger("ssl")
boptions = b"\n".join(("%s=%s" % (k, v)).encode("latin1") for k, v in options.items())
f = save_ssl_config_file(server_hostname, port=port, filename="options", filedata=boptions)
boptions = b"\n".join(("%s=%s" % (k.replace("_", "-"), v)).encode("latin1") for k, v in options.items())
boptions += b"\n"
f = save_ssl_config_file(server_hostname, port,
"options", "configuration options", boptions)
ssllog("save_ssl_options%s saved to %r", (server_hostname, port, options), f)
return f

Expand All @@ -1211,7 +1220,7 @@ def find_ssl_config_file(server_hostname, port=443, filename="cert.pem"):
return f
return None

def save_ssl_config_file(server_hostname, port=443, filename="cert.pem", filedata=b""):
def save_ssl_config_file(server_hostname, port=443, filename="cert.pem", fileinfo="certificate", filedata=b""):
from xpra.log import Logger
ssllog = Logger("ssl")
from xpra.platform.paths import get_ssl_hosts_config_dirs
Expand All @@ -1224,10 +1233,10 @@ def save_ssl_config_file(server_hostname, port=443, filename="cert.pem", filedat
f = os.path.join(d, filename)
with open(f, "wb") as fd:
fd.write(filedata)
ssllog.info("saved SSL certificate to %r", f)
ssllog.info("saved SSL %s to %r", fileinfo, f)
return f
except OSError:
ssllog("failed to save cert data to %r", f, exc_info=True)
ssllog("failed to save SSL %s to %r", fileinfo, f, exc_info=True)
#try to create a host config dir:
for d in host_dirs:
folders = os.path.normpath(d).split(os.sep)
Expand Down
Loading

0 comments on commit 306edee

Please sign in to comment.