From 77e4eb4a476906a18657720309f67e6a4946cfd6 Mon Sep 17 00:00:00 2001 From: David Vo Date: Tue, 18 Aug 2020 01:36:15 +1000 Subject: [PATCH] Allow listening on UNIX sockets for HTTP listeners Signed-off-by: David Vo --- changelog.d/8103.feature | 1 + synapse/app/_base.py | 30 +++++++-- synapse/app/generic_worker.py | 75 ++++++++++++--------- synapse/app/homeserver.py | 111 ++++++++++++++++++-------------- synapse/config/server.py | 118 +++++++++++++++++++++++----------- synapse/config/workers.py | 7 +- 6 files changed, 214 insertions(+), 128 deletions(-) create mode 100644 changelog.d/8103.feature diff --git a/changelog.d/8103.feature b/changelog.d/8103.feature new file mode 100644 index 000000000000..e03ba70e648f --- /dev/null +++ b/changelog.d/8103.feature @@ -0,0 +1 @@ +Add support for listening on a named UNIX domain socket for HTTP interfaces. Contributed by David Vo. diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 2b2cd795e072..6c489432ef6f 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -28,7 +28,7 @@ import synapse from synapse.app import check_bind_error -from synapse.config.server import ListenerConfig +from synapse.config.server import ListenerConfig, TcpListenerConfig from synapse.crypto import context_factory from synapse.logging.context import PreserveLoggingContext from synapse.util.async_helpers import Linearizer @@ -142,24 +142,38 @@ def quit_with_error(error_string: str) -> NoReturn: sys.exit(1) -def listen_metrics(bind_addresses, port): +def listen_metrics(socket_options): """ Start Prometheus metrics server. """ + if not isinstance(socket_options, TcpListenerConfig): + logger.warning( + "Metrics listener only supports TCP, use an HTTP listener instead" + ) + return + from synapse.metrics import RegistryProxy, start_http_server - for host in bind_addresses: + if socket_options.tls: + logger.warning("Ignoring 'tls' option for metrics listener") + + port = socket_options.port + for host in socket_options.bind_addresses: logger.info("Starting metrics listener on %s:%d", host, port) start_http_server(port, addr=host, registry=RegistryProxy) -def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50): +def listen_tcp(socket_options, factory, reactor=reactor, backlog=50): """ Create a TCP socket for a port and several addresses Returns: list[twisted.internet.tcp.Port]: listening for TCP connections """ + assert not socket_options.tls + bind_addresses = socket_options.bind_addresses + port = socket_options.port + r = [] for address in bind_addresses: try: @@ -170,15 +184,17 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50): return r -def listen_ssl( - bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50 -): +def listen_ssl(socket_options, factory, context_factory, reactor=reactor, backlog=50): """ Create an TLS-over-TCP socket for a port and several addresses Returns: list of twisted.internet.tcp.Port listening for TLS connections """ + assert socket_options.tls + bind_addresses = socket_options.bind_addresses + port = socket_options.port + r = [] for address in bind_addresses: try: diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 739b013d4c3a..b5a2e16cd08c 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -16,6 +16,7 @@ # limitations under the License. import contextlib import logging +import os import sys from typing import Dict, Iterable, Optional, Set @@ -37,7 +38,7 @@ from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.config.server import ListenerConfig +from synapse.config.server import ListenerConfig, TcpListenerConfig, UnixListenerConfig from synapse.federation import send_queue from synapse.federation.transport.server import TransportLayerServer from synapse.handlers.presence import ( @@ -486,15 +487,8 @@ class GenericWorkerServer(HomeServer): DATASTORE_CLASS = GenericWorkerSlavedStore def _listen_http(self, listener_config: ListenerConfig): - port = listener_config.port - bind_addresses = listener_config.bind_addresses - assert listener_config.http_options is not None - site_tag = listener_config.http_options.tag - if site_tag is None: - site_tag = port - # We always include a health resource. resources = {"/health": HealthResource()} @@ -590,43 +584,60 @@ def _listen_http(self, listener_config: ListenerConfig): root_resource = create_resource_tree(resources, OptionsResource()) - _base.listen_tcp( - bind_addresses, - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), - reactor=self.get_reactor(), + socket_options = listener_config.socket_options + site_tag = listener_config.http_options.tag + + if isinstance(socket_options, TcpListenerConfig): + port = socket_options.port + if site_tag is None: + site_tag = port + site_type = "http" + else: + assert isinstance(socket_options, UnixListenerConfig) + port = None + socket_path = socket_options.path + if site_tag is None: + site_tag = os.path.basename(socket_path) + site_type = "unix" + + site = SynapseSite( + "synapse.access.%s.%s" % (site_type, site_tag), + site_tag, + listener_config, + root_resource, + self.version_string, ) - logger.info("Synapse worker now listening on port %d", port) + if port is not None: + _base.listen_tcp(socket_options, site, reactor=self.get_reactor()) + logger.info("Synapse worker now listening on port %d", port) + else: + self.get_reactor().listenUNIX(socket_path, site) + logger.info("Synapse worker now listening on socket %s", socket_path) def start_listening(self, listeners: Iterable[ListenerConfig]): for listener in listeners: if listener.type == "http": self._listen_http(listener) elif listener.type == "manhole": - _base.listen_tcp( - listener.bind_addresses, - listener.port, - manhole( - username="matrix", password="rabbithole", globals={"hs": self} - ), - ) + if not isinstance(listener.socket_options, TcpListenerConfig): + logger.warning("Manhole listener currently only supports TCP") + else: + _base.listen_tcp( + listener.socket_options, + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + ) elif listener.type == "metrics": if not self.get_config().enable_metrics: logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) + "Metrics listener configured, but enable_metrics is not True!" ) else: - _base.listen_metrics(listener.bind_addresses, listener.port) + _base.listen_metrics(listener.socket_options) else: logger.warning("Unsupported listener type: %s", listener.type) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 98d0d14a124b..c6c9f8ae69e1 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -49,7 +49,7 @@ from synapse.app._base import listen_ssl, listen_tcp, quit_with_error from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig -from synapse.config.server import ListenerConfig +from synapse.config.server import ListenerConfig, TcpListenerConfig, UnixListenerConfig from synapse.federation.transport.server import TransportLayerServer from synapse.http.additional_resource import AdditionalResource from synapse.http.server import ( @@ -92,12 +92,7 @@ class SynapseHomeServer(HomeServer): DATASTORE_CLASS = DataStore def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig): - port = listener_config.port - bind_addresses = listener_config.bind_addresses - tls = listener_config.tls - site_tag = listener_config.http_options.tag - if site_tag is None: - site_tag = port + assert listener_config.http_options is not None # We always include a health resource. resources = {"/health": HealthResource()} @@ -137,36 +132,46 @@ def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConf root_resource = create_resource_tree(resources, root_resource) - if tls: - ports = listen_ssl( - bind_addresses, - port, - SynapseSite( - "synapse.access.https.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), - self.tls_server_context_factory, - reactor=self.get_reactor(), - ) - logger.info("Synapse now listening on TCP port %d (TLS)", port) + socket_options = listener_config.socket_options + site_tag = listener_config.http_options.tag + if isinstance(socket_options, TcpListenerConfig): + port = socket_options.port + if site_tag is None: + site_tag = port + site_type = "https" if socket_options.tls else "http" else: - ports = listen_tcp( - bind_addresses, - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), - reactor=self.get_reactor(), - ) - logger.info("Synapse now listening on TCP port %d", port) + assert isinstance(socket_options, UnixListenerConfig) + port = None + if site_tag is None: + site_tag = os.path.basename(socket_options.path) + site_type = "unix" + + site = SynapseSite( + "synapse.access.%s.%s" % (site_type, site_tag), + site_tag, + listener_config, + root_resource, + self.version_string, + ) + + if port is not None: + if socket_options.tls: + ports = listen_ssl( + socket_options, + site, + self.tls_server_context_factory, + reactor=self.get_reactor(), + ) + logger.info("Synapse now listening on TCP port %d (TLS)", port) + + else: + ports = listen_tcp(socket_options, site, reactor=self.get_reactor()) + logger.info("Synapse now listening on TCP port %d", port) + + else: + ports = [self.get_reactor().listenUNIX(socket_options.path, site)] + logger.info("Synapse now listening on UNIX socket %s", socket_options.path) return ports @@ -295,31 +300,37 @@ def start_listening(self, listeners: Iterable[ListenerConfig]): if listener.type == "http": self._listening_services.extend(self._listener_http(config, listener)) elif listener.type == "manhole": - listen_tcp( - listener.bind_addresses, - listener.port, - manhole( - username="matrix", password="rabbithole", globals={"hs": self} - ), - ) + if not isinstance(listener.socket_options, TcpListenerConfig): + logger.warning("Manhole listener currently only supports TCP") + else: + listen_tcp( + listener.socket_options, + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + ) elif listener.type == "replication": + if not isinstance(listener.socket_options, TcpListenerConfig): + logger.error( + "Replication configured to listen on a UNIX socket," + " but only TCP is supported" + ) + # XXX: should we straight up bail here? + continue services = listen_tcp( - listener.bind_addresses, - listener.port, - ReplicationStreamProtocolFactory(self), + listener.socket_options, ReplicationStreamProtocolFactory(self), ) for s in services: reactor.addSystemEventTrigger("before", "shutdown", s.stopListening) elif listener.type == "metrics": if not self.get_config().enable_metrics: logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) + "Metrics listener configured, but enable_metrics is not True!" ) else: - _base.listen_metrics(listener.bind_addresses, listener.port) + _base.listen_metrics(listener.socket_options) else: # this shouldn't happen, as the listener type should have been checked # during parsing diff --git a/synapse/config/server.py b/synapse/config/server.py index ed66f3eba19f..a5efe12cfd0b 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -19,7 +19,7 @@ import os.path import re from textwrap import indent -from typing import Any, Dict, Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional, Union import attr import yaml @@ -101,14 +101,28 @@ class HttpListenerConfig: @attr.s(frozen=True) -class ListenerConfig: - """Object describing the configuration of a single listener.""" +class TcpListenerConfig: + """Object describing the TCP-specific parts of the config of a listener.""" port = attr.ib(type=int, validator=attr.validators.instance_of(int)) bind_addresses = attr.ib(type=List[str]) - type = attr.ib(type=str, validator=attr.validators.in_(KNOWN_LISTENER_TYPES)) tls = attr.ib(type=bool, default=False) + +@attr.s(frozen=True) +class UnixListenerConfig: + """Object describing parts of a listener config for a UNIX named socket.""" + + path = attr.ib(type=str) + + +@attr.s(frozen=True) +class ListenerConfig: + """Object describing the configuration of a single listener.""" + + type = attr.ib(type=str, validator=attr.validators.in_(KNOWN_LISTENER_TYPES)) + socket_options = attr.ib(type=Union[TcpListenerConfig, UnixListenerConfig]) + # http_options is only populated if type=http http_options = attr.ib(type=Optional[HttpListenerConfig], default=None) @@ -409,10 +423,11 @@ def read_config(self, config, **kwargs): if config.get("no_tls", False): l2 = [] for listener in self.listeners: - if listener.tls: + socket_options = listener.socket_options + if isinstance(socket_options, TcpListenerConfig) and socket_options.tls: logger.info( "Ignoring TLS-enabled listener on port %i due to no_tls", - listener.port, + socket_options.port, ) else: l2.append(listener) @@ -464,9 +479,9 @@ class LimitRemoteRoomsConfig(object): self.listeners.append( ListenerConfig( - port=bind_port, - bind_addresses=[bind_host], - tls=True, + socket_options=TcpListenerConfig( + port=bind_port, bind_addresses=[bind_host], tls=True, + ), type="http", http_options=http_options, ) @@ -476,9 +491,9 @@ class LimitRemoteRoomsConfig(object): if unsecure_port: self.listeners.append( ListenerConfig( - port=unsecure_port, - bind_addresses=[bind_host], - tls=False, + socket_options=TcpListenerConfig( + port=unsecure_port, bind_addresses=[bind_host], tls=False, + ), type="http", http_options=http_options, ) @@ -488,7 +503,10 @@ class LimitRemoteRoomsConfig(object): if manhole: self.listeners.append( ListenerConfig( - port=manhole, bind_addresses=["127.0.0.1"], type="manhole", + socket_options=TcpListenerConfig( + port=manhole, bind_addresses=["127.0.0.1"] + ), + type="manhole", ) ) @@ -498,8 +516,10 @@ class LimitRemoteRoomsConfig(object): self.listeners.append( ListenerConfig( - port=metrics_port, - bind_addresses=[config.get("metrics_bind_host", "127.0.0.1")], + socket_options=TcpListenerConfig( + port=metrics_port, + bind_addresses=[config.get("metrics_bind_host", "127.0.0.1")], + ), type="http", http_options=HttpListenerConfig( resources=[HttpResourceConfig(names=["metrics"])] @@ -543,7 +563,10 @@ class LimitRemoteRoomsConfig(object): ) # type: set def has_tls_listener(self) -> bool: - return any(listener.tls for listener in self.listeners) + return any( + getattr(listener.socket_options, "tls", False) + for listener in self.listeners + ) def generate_config_section( self, server_name, data_dir_path, open_private_ports, listeners, **kwargs @@ -1080,26 +1103,6 @@ def parse_listener_def(listener: Any) -> ListenerConfig: """parse a listener config from the config file""" listener_type = listener["type"] - port = listener.get("port") - if not isinstance(port, int): - raise ConfigError("Listener configuration is lacking a valid 'port' option") - - tls = listener.get("tls", False) - - bind_addresses = listener.get("bind_addresses", []) - bind_address = listener.get("bind_address") - # if bind_address was specified, add it to the list of addresses - if bind_address: - bind_addresses.append(bind_address) - - # if we still have an empty list of addresses, use the default list - if not bind_addresses: - if listener_type == "metrics": - # the metrics listener doesn't support IPv6 - bind_addresses.append("0.0.0.0") - else: - bind_addresses.extend(DEFAULT_BIND_ADDRESSES) - http_config = None if listener_type == "http": http_config = HttpListenerConfig( @@ -1111,7 +1114,48 @@ def parse_listener_def(listener: Any) -> ListenerConfig: tag=listener.get("tag"), ) - return ListenerConfig(port, bind_addresses, listener_type, tls, http_config) + port = listener.get("port") + path = listener.get("path") + if port is not None: + # TCP listener + if not isinstance(port, int): + raise ConfigError("Non-integer specified for 'port' in listener config") + if path is not None: + raise ConfigError("Both 'path' and 'port' in listener configuration") + + tls = listener.get("tls", False) + + bind_addresses = listener.get("bind_addresses", []) + bind_address = listener.get("bind_address") + # if bind_address was specified, add it to the list of addresses + if bind_address: + bind_addresses.append(bind_address) + + # if we still have an empty list of addresses, use the default list + if not bind_addresses: + if listener_type == "metrics": + # the metrics listener doesn't support IPv6 + bind_addresses.append("0.0.0.0") + else: + bind_addresses.extend(DEFAULT_BIND_ADDRESSES) + + socket_options = TcpListenerConfig( + port, bind_addresses, tls + ) # type: Union[TcpListenerConfig, UnixListenerConfig] + + elif path: + # UNIX socket listener + for key in ("tls", "bind_addresses", "bind_address"): + if key in listener: + raise ConfigError( + "UNIX listener configuration contains '%s' option" % key + ) + socket_options = UnixListenerConfig(path) + + else: + raise ConfigError("Listener configuration is lacking a 'port' or 'path' option") + + return ListenerConfig(listener_type, socket_options, http_config) NO_MORE_WEB_CLIENT_WARNING = """ diff --git a/synapse/config/workers.py b/synapse/config/workers.py index c784a7150897..4b3eec7e38c1 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -16,7 +16,7 @@ import attr from ._base import Config, ConfigError, ShardedWorkerHandlingConfig -from .server import ListenerConfig, parse_listener_def +from .server import ListenerConfig, TcpListenerConfig, parse_listener_def @attr.s @@ -81,7 +81,10 @@ def read_config(self, config, **kwargs): if manhole: self.worker_listeners.append( ListenerConfig( - port=manhole, bind_addresses=["127.0.0.1"], type="manhole", + socket_options=TcpListenerConfig( + port=manhole, bind_addresses=["127.0.0.1"] + ), + type="manhole", ) )