Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

HTTP Replication Client #15470

Merged
merged 17 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15470.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Create new `Client` for use with HTTP Replication between workers. Contributed by Jason Little.
120 changes: 120 additions & 0 deletions synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
from synapse.http.proxyagent import ProxyAgent
from synapse.http.replicationagent import ReplicationAgent
from synapse.http.types import QueryParams
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import set_tag, start_active_span, tags
Expand Down Expand Up @@ -819,6 +820,125 @@ def __init__(
)


class SimpleReplicationClient(BaseHttpClient):
realtyem marked this conversation as resolved.
Show resolved Hide resolved
"""No frills client for connecting to Replication endpoints.

Uses existing BaseHttpClient methods but replaces the 'agent' used to make the
request with one that supports HTTP and HTTPS.
realtyem marked this conversation as resolved.
Show resolved Hide resolved
realtyem marked this conversation as resolved.
Show resolved Hide resolved
Attributes:
realtyem marked this conversation as resolved.
Show resolved Hide resolved
agent: The custom Twisted Agent used for constructing the connection.
"""

def __init__(
self,
hs: "HomeServer",
):
"""
Args:
hs: The HomeServer instance to pass in
"""
super().__init__(hs)

# Use a pool, but a very small one.
pool = HTTPConnectionPool(self.reactor)
pool.maxPersistentPerHost = 5
pool.cachedConnectionTimeout = 2 * 60

self.agent: IAgent = ReplicationAgent(
hs.get_reactor(),
contextFactory=hs.get_http_client_context_factory(),
pool=pool,
)
clokep marked this conversation as resolved.
Show resolved Hide resolved

async def request(
clokep marked this conversation as resolved.
Show resolved Hide resolved
self,
method: str,
uri: str,
data: Optional[bytes] = None,
headers: Optional[Headers] = None,
) -> IResponse:
"""
Args:
clokep marked this conversation as resolved.
Show resolved Hide resolved
method: HTTP method to use.
uri: URI to query.
data: Data to send in the request body, if applicable.
headers: Request headers.

Returns:
Response object, once the headers have been read.

Raises:
RequestTimedOutError if the request times out before the headers are read

"""
outgoing_requests_counter.labels(method).inc()
clokep marked this conversation as resolved.
Show resolved Hide resolved

logger.debug("Sending request %s %s", method, uri)

with start_active_span(
"outgoing-replication-request",
tags={
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
tags.HTTP_METHOD: method,
tags.HTTP_URL: uri,
},
finish_on_close=True,
):
try:
body_producer = None
if data is not None:
body_producer = QuieterFileBodyProducer(
BytesIO(data),
cooperator=self._cooperator,
)

# Skip the fancy treq stuff, we don't need cookie handling, redirects,
# or buffered response bodies.
method_bytes = method.encode("ascii")
uri_bytes = uri.encode("ascii")

request_deferred = self.agent.request(
method_bytes,
uri_bytes,
headers,
bodyProducer=body_producer,
)

# we use our own timeout mechanism rather than treq's as a workaround
# for https://twistedmatrix.com/trac/ticket/9534.
request_deferred = timeout_deferred(
request_deferred,
60,
self.hs.get_reactor(),
)
realtyem marked this conversation as resolved.
Show resolved Hide resolved

# turn timeouts into RequestTimedOutErrors
request_deferred.addErrback(_timeout_to_request_timed_out_error)

response = await make_deferred_yieldable(request_deferred)
realtyem marked this conversation as resolved.
Show resolved Hide resolved

incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
method,
uri,
response.code,
)
return response
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
method,
uri,
type(e).__name__,
e.args[0],
)
set_tag(tags.ERROR, True)
set_tag("error_reason", e.args[0])
raise


def _timeout_to_request_timed_out_error(f: Failure) -> Failure:
if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError):
# The TCP connection has its own timeout (set by the 'connectTimeout' param
Expand Down
170 changes: 170 additions & 0 deletions synapse/http/replicationagent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
realtyem marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Optional

from zope.interface import implementer

from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet.interfaces import IStreamClientEndpoint
from twisted.python.failure import Failure
from twisted.web.client import (
URI,
HTTPConnectionPool,
_AgentBase,
_DeprecatedToCurrentPolicyForHTTPS,
)
from twisted.web.error import SchemeNotSupported
from twisted.web.http_headers import Headers
from twisted.web.iweb import (
IAgent,
IAgentEndpointFactory,
IBodyProducer,
IPolicyForHTTPS,
IResponse,
)

from synapse.types import ISynapseReactor

logger = logging.getLogger(__name__)


@implementer(IAgentEndpointFactory)
class ReplicationEndpointFactory:
"""Connect to a given TCP socket"""

def __init__(
self,
reactor: ISynapseReactor,
context_factory: IPolicyForHTTPS,
) -> None:
self.reactor = reactor
self.context_factory = context_factory

def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:
"""
This part of the factory decides what kind of endpoint is being connected to.

Args:
uri: The pre-parsed URI object containing all the uri data

Returns: The correct client endpoint object
"""
if b"http" in uri.scheme:
endpoint = HostnameEndpoint(self.reactor, uri.host, uri.port)
if uri.scheme == b"https":
endpoint = wrapClientTLS(
self.context_factory.creatorForNetloc(uri.host, uri.port), endpoint
)
return endpoint
realtyem marked this conversation as resolved.
Show resolved Hide resolved
else:
raise SchemeNotSupported()
realtyem marked this conversation as resolved.
Show resolved Hide resolved


@implementer(IAgent)
class ReplicationAgent(_AgentBase):
"""
This Agent is solely for the purposes of connecting to Synapse replication
endpoints, and can handle https and http connections. Appropriate comments are
copied from Twisted's Agent Class.
realtyem marked this conversation as resolved.
Show resolved Hide resolved

Attributes:
_endpointFactory: The IAgentEndpointFactory which will
be used to create endpoints for outgoing TCP connections.
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""

def __init__(
self,
reactor: ISynapseReactor,
contextFactory: Optional[IPolicyForHTTPS] = None,
connectTimeout: Optional[float] = None,
bindAddress: Optional[bytes] = None,
pool: Optional[HTTPConnectionPool] = None,
):
"""
Create a ReplicationAgent.

Args:
reactor: A reactor for this Agent to place outgoing connections.
contextFactory: A factory for TLS contexts, to control the
verification parameters of OpenSSL. The default is to use a
BrowserLikePolicyForHTTPS, so unless you have special
requirements you can leave this as-is.
connectTimeout: The amount of time that this Agent will wait
for the peer to accept a connection.
bindAddress: The local address for client sockets to bind to.
pool: An HTTPConnectionPool instance, or None, in which
case a non-persistent HTTPConnectionPool instance will be
created.
"""
if not IPolicyForHTTPS.providedBy(contextFactory):
logger.warning(
f"{contextFactory} was passed as the HTTPS policy for an "
"Agent, but it does not provide IPolicyForHTTPS. Since Twisted 14.0, "
"you must pass a provider of IPolicyForHTTPS.",
)
contextFactory = _DeprecatedToCurrentPolicyForHTTPS(contextFactory)
clokep marked this conversation as resolved.
Show resolved Hide resolved

_AgentBase.__init__(self, reactor, pool)
endpoint_factory = ReplicationEndpointFactory(reactor, contextFactory)
self._endpointFactory = endpoint_factory

def _getEndpoint(self, uri: URI) -> IStreamClientEndpoint:
"""
Get an endpoint for the given URI, using self._endpointFactory.
uri: The URI of the request.
Returns: An endpoint which can be used to connect to given address.
"""
return self._endpointFactory.endpointForURI(uri)

def request(
self,
method: bytes,
uri: bytes,
headers: Optional[Headers] = None,
bodyProducer: Optional[IBodyProducer] = None,
) -> "defer.Deferred[IResponse]":
"""
Issue a request to the server indicated by the given uri.
An existing connection from the connection pool may be used or a new
one may be created.
Currently, HTTP and HTTPS schemes are supported in uri.

See: twisted.web.iweb.IAgent.request
"""
# This function is overridden in preparation of future work:
# * So as to properly set a key for the pool and
# * to remove an _ensureValidURI() that will be in the way.
realtyem marked this conversation as resolved.
Show resolved Hide resolved
parsedURI = URI.fromBytes(uri)
try:
endpoint = self._getEndpoint(parsedURI)
except SchemeNotSupported:
return defer.fail(Failure())

# This sets the Pool key to be:
# (http(s), <host:ip>)
key = (parsedURI.scheme, parsedURI.netloc)

# _requestWithEndpoint comes from _AgentBase class
return self._requestWithEndpoint(
key,
endpoint,
method,
parsedURI,
headers,
bodyProducer,
parsedURI.originForm,
)
2 changes: 1 addition & 1 deletion synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def make_client(cls, hs: "HomeServer") -> Callable:
the `instance_map` config).
"""
clock = hs.get_clock()
client = hs.get_simple_http_client()
client = hs.get_replication_client()
local_instance_name = hs.get_instance_name()

# The value of these option should match the replication listener settings
Expand Down
13 changes: 12 additions & 1 deletion synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
from synapse.handlers.user_directory import UserDirectoryHandler
from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient
from synapse.http.client import (
InsecureInterceptableContextFactory,
SimpleHttpClient,
SimpleReplicationClient,
)
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.media.media_repository import MediaRepository
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
Expand Down Expand Up @@ -454,6 +458,13 @@ def get_proxied_blacklisted_http_client(self) -> SimpleHttpClient:
use_proxy=True,
)

@cache_in_self
def get_replication_client(self) -> SimpleReplicationClient:
"""
An HTTP client for HTTP replication.
"""
return SimpleReplicationClient(self)

@cache_in_self
def get_federation_http_client(self) -> MatrixFederationHttpClient:
"""
Expand Down
1 change: 1 addition & 0 deletions tests/test_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def setUp(self) -> None:
"get_macaroon_generator",
"get_instance_name",
"get_simple_http_client",
"get_replication_client",
"hostname",
]
)
Expand Down