Skip to content

Commit

Permalink
Add ability to run replication protocol over redis. (matrix-org#7040)
Browse files Browse the repository at this point in the history
This is configured via the `redis` config options.
  • Loading branch information
erikjohnston authored and phil-flex committed Jun 16, 2020
1 parent 7abf1e9 commit cbdc853
Show file tree
Hide file tree
Showing 12 changed files with 342 additions and 36 deletions.
1 change: 1 addition & 0 deletions changelog.d/7040.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for running replication over Redis when using workers.
40 changes: 40 additions & 0 deletions stubs/txredisapi.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Contains *incomplete* type hints for txredisapi.
"""

from typing import List, Optional, Union

class RedisProtocol:
def publish(self, channel: str, message: bytes): ...

class SubscriberProtocol:
def subscribe(self, channels: Union[str, List[str]]): ...

def lazyConnection(
host: str = ...,
port: int = ...,
dbid: Optional[int] = ...,
reconnect: bool = ...,
charset: str = ...,
password: Optional[str] = ...,
connectTimeout: Optional[int] = ...,
replyTimeout: Optional[int] = ...,
convertNumbers: bool = ...,
) -> RedisProtocol: ...

class SubscriberFactory:
def buildProtocol(self, addr): ...
6 changes: 6 additions & 0 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ def _configure_named_resource(self, name, compress=False):
def start_listening(self, listeners):
config = self.get_config()

if config.redis_enabled:
# If redis is enabled we connect via the replication command handler
# in the same way as the workers (since we're effectively a client
# rather than a server).
self.get_tcp_replication().start_replication(self)

for listener in listeners:
if listener["type"] == "http":
self._listening_services.extend(self._listener_http(config, listener))
Expand Down
2 changes: 2 additions & 0 deletions synapse/config/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from .password_auth_providers import PasswordAuthProviderConfig
from .push import PushConfig
from .ratelimiting import RatelimitConfig
from .redis import RedisConfig
from .registration import RegistrationConfig
from .repository import ContentRepositoryConfig
from .room_directory import RoomDirectoryConfig
Expand Down Expand Up @@ -82,4 +83,5 @@ class HomeServerConfig(RootConfig):
RoomDirectoryConfig,
ThirdPartyRulesConfig,
TracerConfig,
RedisConfig,
]
35 changes: 35 additions & 0 deletions synapse/config/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.config._base import Config
from synapse.python_dependencies import check_requirements


class RedisConfig(Config):
section = "redis"

def read_config(self, config, **kwargs):
redis_config = config.get("redis", {})
self.redis_enabled = redis_config.get("enabled", False)

if not self.redis_enabled:
return

check_requirements("redis")

self.redis_host = redis_config.get("host", "localhost")
self.redis_port = redis_config.get("port", 6379)
self.redis_dbid = redis_config.get("dbid")
self.redis_password = redis_config.get("password")
1 change: 1 addition & 0 deletions synapse/python_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
"sentry": ["sentry-sdk>=0.7.2"],
"opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
"jwt": ["pyjwt>=1.6.4"],
"redis": ["txredisapi>=1.4.7"],
}

ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str]
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
logger = logging.getLogger(__name__)


class ReplicationClientFactory(ReconnectingClientFactory):
class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
"""Factory for building connections to the master. Will reconnect if the
connection is lost.
Expand Down
18 changes: 18 additions & 0 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,3 +454,21 @@ class RemoteServerUpCommand(_SimpleCommand):
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
)


def parse_command_from_line(line: str) -> Command:
"""Parses a command from a received line.
Line should already be stripped of whitespace and be checked if blank.
"""

idx = line.index(" ")
if idx >= 0:
cmd_name = line[:idx]
rest_of_line = line[idx + 1 :]
else:
cmd_name = line
rest_of_line = ""

cmd_cls = COMMAND_MAP[cmd_name]
return cmd_cls.from_line(rest_of_line)
50 changes: 43 additions & 7 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@

from prometheus_client import Counter

from twisted.internet.protocol import ReconnectingClientFactory

from synapse.metrics import LaterGauge
from synapse.replication.tcp.client import ReplicationClientFactory
from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
Expand Down Expand Up @@ -92,7 +94,7 @@ def __init__(self, hs):
self._pending_batches = {} # type: Dict[str, List[Any]]

# The factory used to create connections.
self._factory = None # type: Optional[ReplicationClientFactory]
self._factory = None # type: Optional[ReconnectingClientFactory]

# The currently connected connections.
self._connections = [] # type: List[AbstractConnection]
Expand All @@ -119,11 +121,45 @@ def start_replication(self, hs):
"""Helper method to start a replication connection to the remote server
using TCP.
"""
client_name = hs.config.worker_name
self._factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, self._factory)
if hs.config.redis.redis_enabled:
from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
)
import txredisapi

logger.info(
"Connecting to redis (host=%r port=%r DBID=%r)",
hs.config.redis_host,
hs.config.redis_port,
hs.config.redis_dbid,
)

# We need two connections to redis, one for the subscription stream and
# one to send commands to (as you can't send further redis commands to a
# connection after SUBSCRIBE is called).

# First create the connection for sending commands.
outbound_redis_connection = txredisapi.lazyConnection(
host=hs.config.redis_host,
port=hs.config.redis_port,
dbid=hs.config.redis_dbid,
password=hs.config.redis.redis_password,
reconnect=True,
)

# Now create the factory/connection for the subscription stream.
self._factory = RedisDirectTcpReplicationClientFactory(
hs, outbound_redis_connection
)
hs.get_reactor().connectTCP(
hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory,
)
else:
client_name = hs.config.worker_name
self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, self._factory)

async def on_REPLICATE(self, cmd: ReplicateCommand):
# We only want to announce positions by the writer of the streams.
Expand Down
38 changes: 12 additions & 26 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import (
COMMAND_MAP,
VALID_CLIENT_COMMANDS,
VALID_SERVER_COMMANDS,
Command,
Expand All @@ -72,6 +71,7 @@
PingCommand,
ReplicateCommand,
ServerCommand,
parse_command_from_line,
)
from synapse.types import Collection
from synapse.util import Clock
Expand Down Expand Up @@ -210,38 +210,24 @@ def lineReceived(self, line: bytes):

linestr = line.decode("utf-8")

# split at the first " ", handling one-word commands
idx = linestr.index(" ")
if idx >= 0:
cmd_name = linestr[:idx]
rest_of_line = linestr[idx + 1 :]
else:
cmd_name = linestr
rest_of_line = ""
try:
cmd = parse_command_from_line(linestr)
except Exception as e:
logger.exception("[%s] failed to parse line: %r", self.id(), linestr)
self.send_error("failed to parse line: %r (%r):" % (e, linestr))
return

if cmd_name not in self.VALID_INBOUND_COMMANDS:
logger.error("[%s] invalid command %s", self.id(), cmd_name)
self.send_error("invalid command: %s", cmd_name)
if cmd.NAME not in self.VALID_INBOUND_COMMANDS:
logger.error("[%s] invalid command %s", self.id(), cmd.NAME)
self.send_error("invalid command: %s", cmd.NAME)
return

self.last_received_command = self.clock.time_msec()

self.inbound_commands_counter[cmd_name] = (
self.inbound_commands_counter[cmd_name] + 1
self.inbound_commands_counter[cmd.NAME] = (
self.inbound_commands_counter[cmd.NAME] + 1
)

cmd_cls = COMMAND_MAP[cmd_name]
try:
cmd = cmd_cls.from_line(rest_of_line)
except Exception as e:
logger.exception(
"[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line
)
self.send_error(
"failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line)
)
return

# Now lets try and call on_<CMD_NAME> function
run_as_background_process(
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
Expand Down
Loading

0 comments on commit cbdc853

Please sign in to comment.