From 9f438801cab896ed248299a04ec5cc0dafed6e13 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 6 Dec 2022 11:43:57 +0000 Subject: [PATCH 01/23] Enable Complement tests for Faster Remote Room Joins on worker-mode --- docker/complement/conf/workers-shared-extra.yaml.j2 | 4 ++-- scripts-dev/complement.sh | 11 ++++------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2 index 7e9ec23808c6..62f74d93337a 100644 --- a/docker/complement/conf/workers-shared-extra.yaml.j2 +++ b/docker/complement/conf/workers-shared-extra.yaml.j2 @@ -94,10 +94,10 @@ allow_device_name_lookup_over_federation: true experimental_features: # Enable history backfilling support msc2716_enabled: true - {% if not workers_in_use %} + # server-side support for partial state in /send_join responses + msc3706_enabled: true # client-side support for partial state in /send_join responses faster_joins: true - {% endif %} # Filtering /messages by relation type. msc3874_enabled: true # Enable deleting device-specific notification settings stored in account data diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 7c48d8bccb0f..67cd03d4163a 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -190,7 +190,7 @@ fi extra_test_args=() -test_tags="synapse_blacklist,msc3787,msc3874,msc3890,msc3391" +test_tags="synapse_blacklist,msc3787,msc3874,msc3890,msc3391,faster_joins" # All environment variables starting with PASS_ will be shared. # (The prefix is stripped off before reaching the container.) @@ -223,12 +223,9 @@ else export PASS_SYNAPSE_COMPLEMENT_DATABASE=sqlite fi - # We only test faster room joins on monoliths, because they are purposefully - # being developed without worker support to start with. - # - # The tests for importing historical messages (MSC2716) also only pass with monoliths, - # currently. - test_tags="$test_tags,faster_joins,msc2716" + # The tests for importing historical messages (MSC2716) + # only pass with monoliths, currently. + test_tags="$test_tags,msc2716" fi From 58d4f93451ae1c2ec89a02a40eb92cb1e3b1ab0d Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 29 Dec 2022 16:30:39 +0000 Subject: [PATCH 02/23] (dangerous) Add an override to allow Complement to use FRRJ under workers --- scripts-dev/complement.sh | 4 ++++ synapse/app/generic_worker.py | 28 +++++++++++++++++++++++----- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 67cd03d4163a..becab54134e5 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -209,6 +209,10 @@ if [[ -n "$WORKERS" ]]; then # Workers can only use Postgres as a database. export PASS_SYNAPSE_COMPLEMENT_DATABASE=postgres + # Allow testing Faster Remote Room Joins despite the feature being incomplete. + # Do not use in production, not even temporarily! No support will be provided! + export PASS_SYNAPSE_DANGEROUS_EXPERIMENTAL_I_DO_NOT_CARE_ABOUT_THIS_SERVER_ENABLE_FRRJ_UNDER_WORKERS_ANYWAY=1 + # And provide some more configuration to complement. # It can take quite a while to spin up a worker-mode Synapse for the first diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 8108b1e98f7f..4673c395a0a1 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import os import sys from typing import Dict, List @@ -283,11 +284,28 @@ def start(config_options: List[str]) -> None: ) if config.experimental.faster_joins_enabled: - raise ConfigError( - "You have enabled the experimental `faster_joins` config option, but it is " - "not compatible with worker deployments yet. Please disable `faster_joins` " - "or run Synapse as a single process deployment instead." - ) + # Never enable this flag on a server that is not ephemeral, and especially + # not on a production server! Not even temporarily! + # THIS FLAG IS PROVIDED SOLELY FOR COMPLEMENT TEST COVERAGE TO PREVENT TEST + # REGRESSIONS WHILST FASTER JOINS ARE IN DEVELOPMENT. + # NO SUPPORT WILL BE PROVIDED ONCE THIS OPTION HAS BEEN ENABLED ON A SERVER. + if os.environb.get( + b"SYNAPSE_DANGEROUS_EXPERIMENTAL_I_DO_NOT_CARE_ABOUT_THIS_SERVER_ENABLE_FRRJ_UNDER_WORKERS_ANYWAY" + ): + logger.critical( + "You have enabled the experimental `faster_joins` config option. " + "It is not currently compatible with worker deployments. " + "You have chosen to enable it anyway. " + "Expect severe bugs, including irreversible database corruption " + "and permanent data loss. " + "No support will be offered for this installation of Synapse." + ) + else: + raise ConfigError( + "You have enabled the experimental `faster_joins` config option, but " + "it is not compatible with worker deployments yet. Please disable " + "`faster_joins` or run Synapse as a single process deployment instead." + ) synapse.events.USE_FROZEN_DICTS = config.server.use_frozen_dicts synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage From 29c331d882ba6d4b841f9a47ba65ac4b0cfac31b Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 29 Dec 2022 16:32:31 +0000 Subject: [PATCH 03/23] Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/14752.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14752.misc diff --git a/changelog.d/14752.misc b/changelog.d/14752.misc new file mode 100644 index 000000000000..1cd2d4eabf44 --- /dev/null +++ b/changelog.d/14752.misc @@ -0,0 +1 @@ +Enable Complement tests for Faster Remote Room Joins against worker-mode Synapse. Not suitable for production use. \ No newline at end of file From 91faaf641121e8cc0ccd19aecf6857f58af63ce3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 Jan 2023 14:24:12 +0000 Subject: [PATCH 04/23] Fix race where we didn't send out replication notification --- synapse/storage/databases/main/state.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index f32cbb2decd8..ba325d390b58 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -95,6 +95,7 @@ def process_replication_rows( for row in rows: assert isinstance(row, UnPartialStatedEventStreamRow) self._get_state_group_for_event.invalidate((row.event_id,)) + self.is_partial_state_event.invalidate((row.event_id,)) super().process_replication_rows(stream_name, instance_name, token, rows) @@ -485,6 +486,7 @@ def _update_state_for_partial_state_event_txn( "rejection_status_changed": rejection_status_changed, }, ) + txn.call_after(self.hs.get_notifier().on_new_replication_data) class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): From 50776af9053ef130390b8411e1f9b518aeb65bf9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Jan 2023 15:11:31 +0000 Subject: [PATCH 05/23] MORE HACKS --- synapse/replication/tcp/streams/partial_state.py | 2 +- synapse/storage/databases/main/events_worker.py | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py index b5a2ae74b685..84df5e8590ca 100644 --- a/synapse/replication/tcp/streams/partial_state.py +++ b/synapse/replication/tcp/streams/partial_state.py @@ -71,6 +71,6 @@ def __init__(self, hs: "HomeServer"): super().__init__( hs.get_instance_name(), # TODO(faster_joins, multiple writers): we need to account for instance names - current_token_without_instance(store.get_un_partial_stated_events_token), + store.get_un_partial_stated_events_token, store.get_un_partial_stated_events_from_stream, ) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index d150fa8a943d..2e8ecac6e8ec 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -314,11 +314,12 @@ def get_chain_id_txn(txn: Cursor) -> int: db_conn, "un_partial_stated_event_stream", "stream_id" ) - def get_un_partial_stated_events_token(self) -> int: - # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple - # writers because workers that don't write often will hold all - # readers up. - return self._un_partial_stated_events_stream_id_gen.get_current_token() + def get_un_partial_stated_events_token(self, instance_name: str) -> int: + return ( + self._un_partial_stated_events_stream_id_gen.get_current_token_for_writer( + instance_name + ) + ) async def get_un_partial_stated_events_from_stream( self, instance_name: str, last_id: int, current_id: int, limit: int @@ -408,6 +409,11 @@ def process_replication_position( self._stream_id_gen.advance(instance_name, token) elif stream_name == BackfillStream.NAME: self._backfill_id_gen.advance(instance_name, -token) + elif stream_name == UnPartialStatedEventStream.NAME: + logger.info( + "Advancing %s token to %s", UnPartialStatedEventStream.NAME, token + ) + self._un_partial_stated_events_stream_id_gen.advance(instance_name, token) super().process_replication_position(stream_name, instance_name, token) async def have_censored_event(self, event_id: str) -> bool: From 4d1a4f0771584dde04747df5e829956d868f8644 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Jan 2023 10:59:44 +0000 Subject: [PATCH 06/23] Fix get_un_partial_stated_rooms_token to take instance_name --- synapse/replication/tcp/streams/partial_state.py | 3 +-- synapse/storage/databases/main/room.py | 11 ++++------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py index 84df5e8590ca..b78eb165215c 100644 --- a/synapse/replication/tcp/streams/partial_state.py +++ b/synapse/replication/tcp/streams/partial_state.py @@ -16,7 +16,6 @@ import attr from synapse.replication.tcp.streams import Stream -from synapse.replication.tcp.streams._base import current_token_without_instance if TYPE_CHECKING: from synapse.server import HomeServer @@ -43,7 +42,7 @@ def __init__(self, hs: "HomeServer"): super().__init__( hs.get_instance_name(), # TODO(faster_joins, multiple writers): we need to account for instance names - current_token_without_instance(store.get_un_partial_stated_rooms_token), + store.get_un_partial_stated_rooms_token, store.get_un_partial_stated_rooms_from_stream, ) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 78906a5e1d9e..fb6d3b85815c 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1277,13 +1277,10 @@ async def get_join_event_id_and_device_lists_stream_id_for_partial_state( ) return result["join_event_id"], result["device_lists_stream_id"] - def get_un_partial_stated_rooms_token(self) -> int: - # TODO(faster_joins, multiple writers): This is inappropriate if there - # are multiple writers because workers that don't write often will - # hold all readers up. - # (See `MultiWriterIdGenerator.get_persisted_upto_position` for an - # explanation.) - return self._un_partial_stated_rooms_stream_id_gen.get_current_token() + def get_un_partial_stated_rooms_token(self, instance_name: str) -> int: + return self._un_partial_stated_rooms_stream_id_gen.get_current_token_for_writer( + instance_name + ) async def get_un_partial_stated_rooms_from_stream( self, instance_name: str, last_id: int, current_id: int, limit: int From 5b97e4eb078c63f10fd6396002e37d9b0e9856c6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Jan 2023 13:58:51 +0000 Subject: [PATCH 07/23] Fix bad merge --- docker/complement/conf/workers-shared-extra.yaml.j2 | 4 ---- 1 file changed, 4 deletions(-) diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2 index 94fca8add6ec..4cb059ce3906 100644 --- a/docker/complement/conf/workers-shared-extra.yaml.j2 +++ b/docker/complement/conf/workers-shared-extra.yaml.j2 @@ -94,14 +94,10 @@ allow_device_name_lookup_over_federation: true experimental_features: # Enable history backfilling support msc2716_enabled: true - # server-side support for partial state in /send_join responses - msc3706_enabled: true # client-side support for partial state in /send_join responses faster_joins: true # Filtering /messages by relation type. msc3874_enabled: true - # Enable support for polls - msc3381_polls_enabled: true # Enable deleting device-specific notification settings stored in account data msc3890_enabled: true # Enable removing account data support From d24b947427a60818db5570ac1849cdc9dfddd262 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Jan 2023 13:59:29 +0000 Subject: [PATCH 08/23] Remove warning --- scripts-dev/complement.sh | 4 ---- synapse/app/generic_worker.py | 25 ------------------------- 2 files changed, 29 deletions(-) diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 944655020528..e72d96fd165c 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -209,10 +209,6 @@ if [[ -n "$WORKERS" ]]; then # Workers can only use Postgres as a database. export PASS_SYNAPSE_COMPLEMENT_DATABASE=postgres - # Allow testing Faster Remote Room Joins despite the feature being incomplete. - # Do not use in production, not even temporarily! No support will be provided! - export PASS_SYNAPSE_DANGEROUS_EXPERIMENTAL_I_DO_NOT_CARE_ABOUT_THIS_SERVER_ENABLE_FRRJ_UNDER_WORKERS_ANYWAY=1 - # And provide some more configuration to complement. # It can take quite a while to spin up a worker-mode Synapse for the first diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 4673c395a0a1..946f3a380744 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import os import sys from typing import Dict, List @@ -283,30 +282,6 @@ def start(config_options: List[str]) -> None: "synapse.app.user_dir", ) - if config.experimental.faster_joins_enabled: - # Never enable this flag on a server that is not ephemeral, and especially - # not on a production server! Not even temporarily! - # THIS FLAG IS PROVIDED SOLELY FOR COMPLEMENT TEST COVERAGE TO PREVENT TEST - # REGRESSIONS WHILST FASTER JOINS ARE IN DEVELOPMENT. - # NO SUPPORT WILL BE PROVIDED ONCE THIS OPTION HAS BEEN ENABLED ON A SERVER. - if os.environb.get( - b"SYNAPSE_DANGEROUS_EXPERIMENTAL_I_DO_NOT_CARE_ABOUT_THIS_SERVER_ENABLE_FRRJ_UNDER_WORKERS_ANYWAY" - ): - logger.critical( - "You have enabled the experimental `faster_joins` config option. " - "It is not currently compatible with worker deployments. " - "You have chosen to enable it anyway. " - "Expect severe bugs, including irreversible database corruption " - "and permanent data loss. " - "No support will be offered for this installation of Synapse." - ) - else: - raise ConfigError( - "You have enabled the experimental `faster_joins` config option, but " - "it is not compatible with worker deployments yet. Please disable " - "`faster_joins` or run Synapse as a single process deployment instead." - ) - synapse.events.USE_FROZEN_DICTS = config.server.use_frozen_dicts synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage From aa97b35c9fc5a07b898a9ccd6bcd3e50d1e6e09c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Jan 2023 15:23:38 +0000 Subject: [PATCH 09/23] Correctly advance un_partial_stated_room_stream --- synapse/handlers/federation.py | 8 +++++--- synapse/storage/databases/main/room.py | 9 +++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index eca75f1108d1..c7245ed49d24 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1727,14 +1727,16 @@ async def _sync_partial_state_room( logger.info("Clearing partial-state flag for %s", room_id) success = await self.store.clear_partial_state_room(room_id) + + # Poke the notifier so that other workers see the write to + # the un-partial-stated rooms stream. + self._notifier.notify_replication() + if success: logger.info("State resync complete for %s", room_id) self._storage_controllers.state.notify_room_un_partial_stated( room_id ) - # Poke the notifier so that other workers see the write to - # the un-partial-stated rooms stream. - self._notifier.notify_replication() # TODO(faster_joins) update room stats and user directory? # https://github.com/matrix-org/synapse/issues/12814 diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index fb6d3b85815c..d431234e2528 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -43,6 +43,7 @@ from synapse.api.room_versions import RoomVersion, RoomVersions from synapse.config.homeserver import HomeServerConfig from synapse.events import EventBase +from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -140,6 +141,13 @@ def __init__( db_conn, "un_partial_stated_room_stream", "stream_id" ) + def process_replication_position( + self, stream_name: str, instance_name: str, token: int + ) -> None: + if stream_name == UnPartialStatedRoomStream.NAME: + self._un_partial_stated_rooms_stream_id_gen.advance(instance_name, token) + return super().process_replication_position(stream_name, instance_name, token) + async def store_room( self, room_id: str, @@ -2372,3 +2380,4 @@ def _clear_partial_state_room_txn( WHERE stream_id <= ? """ txn.execute(sql, (device_lists_stream_id,)) + txn.execute(sql, (device_lists_stream_id,)) From a3cddbfbea9af60444b79ddac75299a01de859b8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Jan 2023 16:15:49 +0000 Subject: [PATCH 10/23] Fix merge --- docker/complement/conf/workers-shared-extra.yaml.j2 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2 index 4cb059ce3906..63acf86a4619 100644 --- a/docker/complement/conf/workers-shared-extra.yaml.j2 +++ b/docker/complement/conf/workers-shared-extra.yaml.j2 @@ -96,8 +96,8 @@ experimental_features: msc2716_enabled: true # client-side support for partial state in /send_join responses faster_joins: true - # Filtering /messages by relation type. - msc3874_enabled: true + # Enable support for polls + msc3381_polls_enabled: true # Enable deleting device-specific notification settings stored in account data msc3890_enabled: true # Enable removing account data support From eb65ad35bc89c0afe53008649ac745d39c9b8c66 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Jan 2023 21:11:52 +0000 Subject: [PATCH 11/23] Add another notify_replication --- synapse/handlers/device.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 0640ea79a03d..58180ae2faba 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -974,6 +974,7 @@ def __init__(self, hs: "HomeServer", device_handler: DeviceHandler): self.federation = hs.get_federation_client() self.clock = hs.get_clock() self.device_handler = device_handler + self._notifier = hs.get_notifier() self._remote_edu_linearizer = Linearizer(name="remote_device_list") @@ -1054,6 +1055,7 @@ async def incoming_device_list_update( user_id, device_id, ) + self._notifier.notify_replication() room_ids = await self.store.get_rooms_for_user(user_id) if not room_ids: From 1c7761127643f264c0376a22693c4e6ee45230ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Jan 2023 23:24:30 +0000 Subject: [PATCH 12/23] Fixups --- synapse/storage/databases/main/events_worker.py | 3 --- synapse/storage/databases/main/room.py | 1 - 2 files changed, 4 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 2e8ecac6e8ec..1e048cadbffb 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -410,9 +410,6 @@ def process_replication_position( elif stream_name == BackfillStream.NAME: self._backfill_id_gen.advance(instance_name, -token) elif stream_name == UnPartialStatedEventStream.NAME: - logger.info( - "Advancing %s token to %s", UnPartialStatedEventStream.NAME, token - ) self._un_partial_stated_events_stream_id_gen.advance(instance_name, token) super().process_replication_position(stream_name, instance_name, token) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index d431234e2528..1e830fa178c5 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -2380,4 +2380,3 @@ def _clear_partial_state_room_txn( WHERE stream_id <= ? """ txn.execute(sql, (device_lists_stream_id,)) - txn.execute(sql, (device_lists_stream_id,)) From af2815a3e64115835214271e31619a44238cb771 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jan 2023 11:36:06 +0000 Subject: [PATCH 13/23] Create a separate ReplicationNotifier --- synapse/notifier.py | 31 ++++++++++++++++--- synapse/server.py | 6 +++- .../storage/databases/main/account_data.py | 2 ++ synapse/storage/databases/main/cache.py | 1 + synapse/storage/databases/main/deviceinbox.py | 3 +- synapse/storage/databases/main/devices.py | 1 + .../storage/databases/main/end_to_end_keys.py | 5 ++- .../storage/databases/main/events_worker.py | 10 +++++- synapse/storage/databases/main/presence.py | 3 +- synapse/storage/databases/main/push_rule.py | 1 + synapse/storage/databases/main/pusher.py | 1 + synapse/storage/databases/main/receipts.py | 2 ++ synapse/storage/databases/main/room.py | 6 +++- synapse/storage/util/id_generators.py | 26 ++++++++++++++-- tests/storage/test_id_generators.py | 4 +++ 15 files changed, 89 insertions(+), 13 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 26b97cf766c3..e7b011125bf3 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -226,8 +226,7 @@ def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main self.pending_new_room_events: List[_PendingRoomEventEntry] = [] - # Called when there are new things to stream over replication - self.replication_callbacks: List[Callable[[], None]] = [] + self._replication_notifier = hs.get_replication_notifier() self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = [] self._federation_client = hs.get_federation_http_client() @@ -279,7 +278,7 @@ def add_replication_callback(self, cb: Callable[[], None]) -> None: it needs to do any asynchronous work, a background thread should be started and wrapped with run_as_background_process. """ - self.replication_callbacks.append(cb) + self._replication_notifier.add_replication_callback(cb) def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None: """Add a callback that will be called when a user joins a room. @@ -741,8 +740,7 @@ def _user_joined_room(self, user_id: str, room_id: str) -> None: def notify_replication(self) -> None: """Notify the any replication listeners that there's a new event""" - for cb in self.replication_callbacks: - cb() + self._replication_notifier.notify_replication() def notify_user_joined_room(self, event_id: str, room_id: str) -> None: for cb in self._new_join_in_room_callbacks: @@ -759,3 +757,26 @@ def notify_remote_server_up(self, server: str) -> None: # Tell the federation client about the fact the server is back up, so # that any in flight requests can be immediately retried. self._federation_client.wake_destination(server) + + +@attr.s +class ReplicationNotifier: + """Tracks callbacks for things that need to know about stream changes. + + This is separate from the notifier to avoid circular dependencies. + """ + + _replication_callbacks: List[Callable[[], None]] = attr.Factory(list) + + def add_replication_callback(self, cb: Callable[[], None]) -> None: + """Add a callback that will be called when some new data is available. + Callback is not given any arguments. It should *not* return a Deferred - if + it needs to do any asynchronous work, a background thread should be started and + wrapped with run_as_background_process. + """ + self._replication_callbacks.append(cb) + + def notify_replication(self) -> None: + """Notify the any replication listeners that there's a new event""" + for cb in self._replication_callbacks: + cb() diff --git a/synapse/server.py b/synapse/server.py index f4ab94c4f33c..9d6d268f490c 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -107,7 +107,7 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager from synapse.module_api import ModuleApi -from synapse.notifier import Notifier +from synapse.notifier import Notifier, ReplicationNotifier from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator from synapse.push.pusherpool import PusherPool from synapse.replication.tcp.client import ReplicationDataHandler @@ -389,6 +389,10 @@ def get_federation_server(self) -> FederationServer: def get_notifier(self) -> Notifier: return Notifier(self) + @cache_in_self + def get_replication_notifier(self) -> ReplicationNotifier: + return ReplicationNotifier() + @cache_in_self def get_auth(self) -> Auth: return Auth(self) diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 881d7089dbb2..8a359d7eb89c 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -75,6 +75,7 @@ def __init__( self._account_data_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="account_data", instance_name=self._instance_name, tables=[ @@ -95,6 +96,7 @@ def __init__( # SQLite). self._account_data_id_gen = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "room_account_data", "stream_id", extra_tables=[("room_tags_revisions", "stream_id")], diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 2179a8bf5922..5b6643169139 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -75,6 +75,7 @@ def __init__( self._cache_id_gen = MultiWriterIdGenerator( db_conn, database, + notifier=hs.get_replication_notifier(), stream_name="caches", instance_name=hs.get_instance_name(), tables=[ diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 713be91c5dd8..8e61aba4548d 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -91,6 +91,7 @@ def __init__( MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="to_device", instance_name=self._instance_name, tables=[("device_inbox", "instance_name", "stream_id")], @@ -101,7 +102,7 @@ def __init__( else: self._can_write_to_device = True self._device_inbox_id_gen = StreamIdGenerator( - db_conn, "device_inbox", "stream_id" + db_conn, hs.get_replication_notifier(), "device_inbox", "stream_id" ) max_device_inbox_id = self._device_inbox_id_gen.get_current_token() diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index cd186c84726c..903606fb4664 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -92,6 +92,7 @@ def __init__( # class below that is used on the main process. self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "device_lists_stream", "stream_id", extra_tables=[ diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 4c691642e2b5..c4ac6c33ba54 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -1181,7 +1181,10 @@ def __init__( super().__init__(database, db_conn, hs) self._cross_signing_id_gen = StreamIdGenerator( - db_conn, "e2e_cross_signing_keys", "stream_id" + db_conn, + hs.get_replication_notifier(), + "e2e_cross_signing_keys", + "stream_id", ) async def set_e2e_device_keys( diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 1e048cadbffb..24127d0364ab 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -191,6 +191,7 @@ def __init__( self._stream_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="events", instance_name=hs.get_instance_name(), tables=[("events", "instance_name", "stream_ordering")], @@ -200,6 +201,7 @@ def __init__( self._backfill_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="backfill", instance_name=hs.get_instance_name(), tables=[("events", "instance_name", "stream_ordering")], @@ -217,12 +219,14 @@ def __init__( # SQLite). self._stream_id_gen = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "events", "stream_ordering", is_writer=hs.get_instance_name() in hs.config.worker.writers.events, ) self._backfill_id_gen = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "events", "stream_ordering", step=-1, @@ -300,6 +304,7 @@ def get_chain_id_txn(txn: Cursor) -> int: self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="un_partial_stated_event_stream", instance_name=hs.get_instance_name(), tables=[ @@ -311,7 +316,10 @@ def get_chain_id_txn(txn: Cursor) -> int: ) else: self._un_partial_stated_events_stream_id_gen = StreamIdGenerator( - db_conn, "un_partial_stated_event_stream", "stream_id" + db_conn, + hs.get_replication_notifier(), + "un_partial_stated_event_stream", + "stream_id", ) def get_un_partial_stated_events_token(self, instance_name: str) -> int: diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 7b60815043a6..beb210f8eefd 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -77,6 +77,7 @@ def __init__( self._presence_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="presence_stream", instance_name=self._instance_name, tables=[("presence_stream", "instance_name", "stream_id")], @@ -85,7 +86,7 @@ def __init__( ) else: self._presence_id_gen = StreamIdGenerator( - db_conn, "presence_stream", "stream_id" + db_conn, hs.get_replication_notifier(), "presence_stream", "stream_id" ) self.hs = hs diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 03182887d138..14ca167b34b4 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -118,6 +118,7 @@ def __init__( # class below that is used on the main process. self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "push_rules_stream", "stream_id", is_writer=hs.config.worker.worker_app is None, diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 7f24a3b6ec5e..df53e726e62a 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -62,6 +62,7 @@ def __init__( # class below that is used on the main process. self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 86f5bce5f08d..3468f354e60f 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -73,6 +73,7 @@ def __init__( self._receipts_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="receipts", instance_name=self._instance_name, tables=[("receipts_linearized", "instance_name", "stream_id")], @@ -91,6 +92,7 @@ def __init__( # SQLite). self._receipts_id_gen = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "receipts_linearized", "stream_id", is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts, diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 1e830fa178c5..6a65b2a89bad 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -127,6 +127,7 @@ def __init__( self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="un_partial_stated_room_stream", instance_name=self._instance_name, tables=[ @@ -138,7 +139,10 @@ def __init__( ) else: self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator( - db_conn, "un_partial_stated_room_stream", "stream_id" + db_conn, + hs.get_replication_notifier(), + "un_partial_stated_room_stream", + "stream_id", ) def process_replication_position( diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 8670ffbfa374..9adff3f4f523 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -20,6 +20,7 @@ from contextlib import contextmanager from types import TracebackType from typing import ( + TYPE_CHECKING, AsyncContextManager, ContextManager, Dict, @@ -49,6 +50,9 @@ from synapse.storage.types import Cursor from synapse.storage.util.sequence import PostgresSequenceGenerator +if TYPE_CHECKING: + from synapse.notifier import ReplicationNotifier + logger = logging.getLogger(__name__) @@ -182,6 +186,7 @@ class StreamIdGenerator(AbstractStreamIdGenerator): def __init__( self, db_conn: LoggingDatabaseConnection, + notifier: "ReplicationNotifier", table: str, column: str, extra_tables: Iterable[Tuple[str, str]] = (), @@ -205,6 +210,8 @@ def __init__( # The key and values are the same, but we never look at the values. self._unfinished_ids: OrderedDict[int, int] = OrderedDict() + self._notifier = notifier + def advance(self, instance_name: str, new_id: int) -> None: # Advance should never be called on a writer instance, only over replication if self._is_writer: @@ -227,6 +234,8 @@ def manager() -> Generator[int, None, None]: with self._lock: self._unfinished_ids.pop(next_id) + self._notifier.notify_replication() + return _AsyncCtxManagerWrapper(manager()) def get_next_mult(self, n: int) -> AsyncContextManager[Sequence[int]]: @@ -250,6 +259,8 @@ def manager() -> Generator[Sequence[int], None, None]: for next_id in next_ids: self._unfinished_ids.pop(next_id) + self._notifier.notify_replication() + return _AsyncCtxManagerWrapper(manager()) def get_current_token(self) -> int: @@ -296,6 +307,7 @@ def __init__( self, db_conn: LoggingDatabaseConnection, db: DatabasePool, + notifier: "ReplicationNotifier", stream_name: str, instance_name: str, tables: List[Tuple[str, str, str]], @@ -304,6 +316,7 @@ def __init__( positive: bool = True, ) -> None: self._db = db + self._notifier = notifier self._stream_name = stream_name self._instance_name = instance_name self._positive = positive @@ -535,7 +548,9 @@ def get_next(self) -> AsyncContextManager[int]: # Cast safety: the second argument to _MultiWriterCtxManager, multiple_ids, # controls the return type. If `None` or omitted, the context manager yields # a single integer stream_id; otherwise it yields a list of stream_ids. - return cast(AsyncContextManager[int], _MultiWriterCtxManager(self)) + return cast( + AsyncContextManager[int], _MultiWriterCtxManager(self, self._notifier) + ) def get_next_mult(self, n: int) -> AsyncContextManager[List[int]]: # If we have a list of instances that are allowed to write to this @@ -544,7 +559,10 @@ def get_next_mult(self, n: int) -> AsyncContextManager[List[int]]: raise Exception("Tried to allocate stream ID on non-writer") # Cast safety: see get_next. - return cast(AsyncContextManager[List[int]], _MultiWriterCtxManager(self, n)) + return cast( + AsyncContextManager[List[int]], + _MultiWriterCtxManager(self, self._notifier, n), + ) def get_next_txn(self, txn: LoggingTransaction) -> int: """ @@ -563,6 +581,7 @@ def get_next_txn(self, txn: LoggingTransaction) -> int: txn.call_after(self._mark_id_as_finished, next_id) txn.call_on_exception(self._mark_id_as_finished, next_id) + txn.call_after(self._notifier.notify_replication) # Update the `stream_positions` table with newly updated stream # ID (unless self._writers is not set in which case we don't @@ -787,6 +806,7 @@ class _MultiWriterCtxManager: """Async context manager returned by MultiWriterIdGenerator""" id_gen: MultiWriterIdGenerator + notifier: "ReplicationNotifier" multiple_ids: Optional[int] = None stream_ids: List[int] = attr.Factory(list) @@ -814,6 +834,8 @@ async def __aexit__( for i in self.stream_ids: self.id_gen._mark_id_as_finished(i) + self.notifier.notify_replication() + if exc_type is not None: return False diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index ff9691c518bc..9174fb096470 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -52,6 +52,7 @@ def _create_id_generator(self) -> StreamIdGenerator: def _create(conn: LoggingDatabaseConnection) -> StreamIdGenerator: return StreamIdGenerator( db_conn=conn, + notifier=self.hs.get_replication_notifier(), table="foobar", column="stream_id", ) @@ -196,6 +197,7 @@ def _create(conn: LoggingDatabaseConnection) -> MultiWriterIdGenerator: return MultiWriterIdGenerator( conn, self.db_pool, + notifier=self.hs.get_replication_notifier(), stream_name="test_stream", instance_name=instance_name, tables=[("foobar", "instance_name", "stream_id")], @@ -630,6 +632,7 @@ def _create(conn: LoggingDatabaseConnection) -> MultiWriterIdGenerator: return MultiWriterIdGenerator( conn, self.db_pool, + notifier=self.hs.get_replication_notifier(), stream_name="test_stream", instance_name=instance_name, tables=[("foobar", "instance_name", "stream_id")], @@ -766,6 +769,7 @@ def _create(conn: LoggingDatabaseConnection) -> MultiWriterIdGenerator: return MultiWriterIdGenerator( conn, self.db_pool, + notifier=self.hs.get_replication_notifier(), stream_name="test_stream", instance_name=instance_name, tables=[ From d650011eaab6354017b6ea390b07ba3ab4b0f27f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jan 2023 14:17:18 +0000 Subject: [PATCH 14/23] Fix test --- synapse/notifier.py | 2 +- tests/replication/tcp/test_handler.py | 23 +++++++---------------- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index e7b011125bf3..28f0d4a25afe 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -759,7 +759,7 @@ def notify_remote_server_up(self, server: str) -> None: self._federation_client.wake_destination(server) -@attr.s +@attr.s(auto_attribs=True) class ReplicationNotifier: """Tracks callbacks for things that need to know about stream changes. diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py index 555922409d13..6e4055cc2102 100644 --- a/tests/replication/tcp/test_handler.py +++ b/tests/replication/tcp/test_handler.py @@ -14,7 +14,7 @@ from twisted.internet import defer -from synapse.replication.tcp.commands import PositionCommand, RdataCommand +from synapse.replication.tcp.commands import PositionCommand from tests.replication._base import BaseMultiWorkerStreamTestCase @@ -111,20 +111,14 @@ def test_wait_for_stream_position(self) -> None: next_token = self.get_success(ctx.__aenter__()) self.get_success(ctx.__aexit__(None, None, None)) - cmd_handler.send_command( - RdataCommand("caches", "worker1", next_token, ("func_name", [], 0)) - ) - self.replicate() - self.get_success( data_handler.wait_for_stream_position("worker1", "caches", next_token) ) - # `wait_for_stream_position` should only return once master receives an - # RDATA from the worker - ctx = cache_id_gen.get_next() - next_token = self.get_success(ctx.__aenter__()) - self.get_success(ctx.__aexit__(None, None, None)) + # `wait_for_stream_position` should only return once master receives a + # notification that `next_token` has persisted. + ctx_worker1 = cache_id_gen.get_next() + next_token = self.get_success(ctx_worker1.__aenter__()) d = defer.ensureDeferred( data_handler.wait_for_stream_position("worker1", "caches", next_token) @@ -142,10 +136,7 @@ def test_wait_for_stream_position(self) -> None: ) self.assertFalse(d.called) - # ... but receiving the RDATA should - cmd_handler.send_command( - RdataCommand("caches", "worker1", next_token, ("func_name", [], 0)) - ) - self.replicate() + # ... but worker1 finishing (and so sending an update) should. + self.get_success(ctx_worker1.__aexit__(None, None, None)) self.assertTrue(d.called) From 8ae6c319cfd64ca8b37bf740849b550eb8c0bddc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jan 2023 14:27:54 +0000 Subject: [PATCH 15/23] Fix portdb --- synapse/_scripts/synapse_port_db.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index c463b60b2620..5e137dbbf711 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -51,6 +51,7 @@ make_deferred_yieldable, run_in_background, ) +from synapse.notifier import ReplicationNotifier from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn from synapse.storage.databases.main import PushRuleStore from synapse.storage.databases.main.account_data import AccountDataWorkerStore @@ -260,6 +261,9 @@ def get_instance_name(self) -> str: def should_send_federation(self) -> bool: return False + def get_replication_notifier(self) -> ReplicationNotifier: + return ReplicationNotifier() + class Porter: def __init__( From 71472bac9109fd2b773b119767312e2f6f32caa6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jan 2023 11:36:06 +0000 Subject: [PATCH 16/23] Create a separate ReplicationNotifier --- synapse/notifier.py | 31 ++++++++++++++++--- synapse/server.py | 6 +++- .../storage/databases/main/account_data.py | 2 ++ synapse/storage/databases/main/cache.py | 1 + synapse/storage/databases/main/deviceinbox.py | 3 +- synapse/storage/databases/main/devices.py | 1 + .../storage/databases/main/end_to_end_keys.py | 5 ++- .../storage/databases/main/events_worker.py | 10 +++++- synapse/storage/databases/main/presence.py | 3 +- synapse/storage/databases/main/push_rule.py | 1 + synapse/storage/databases/main/pusher.py | 1 + synapse/storage/databases/main/receipts.py | 2 ++ synapse/storage/databases/main/room.py | 6 +++- synapse/storage/util/id_generators.py | 26 ++++++++++++++-- tests/storage/test_id_generators.py | 4 +++ 15 files changed, 89 insertions(+), 13 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 26b97cf766c3..e7b011125bf3 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -226,8 +226,7 @@ def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main self.pending_new_room_events: List[_PendingRoomEventEntry] = [] - # Called when there are new things to stream over replication - self.replication_callbacks: List[Callable[[], None]] = [] + self._replication_notifier = hs.get_replication_notifier() self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = [] self._federation_client = hs.get_federation_http_client() @@ -279,7 +278,7 @@ def add_replication_callback(self, cb: Callable[[], None]) -> None: it needs to do any asynchronous work, a background thread should be started and wrapped with run_as_background_process. """ - self.replication_callbacks.append(cb) + self._replication_notifier.add_replication_callback(cb) def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None: """Add a callback that will be called when a user joins a room. @@ -741,8 +740,7 @@ def _user_joined_room(self, user_id: str, room_id: str) -> None: def notify_replication(self) -> None: """Notify the any replication listeners that there's a new event""" - for cb in self.replication_callbacks: - cb() + self._replication_notifier.notify_replication() def notify_user_joined_room(self, event_id: str, room_id: str) -> None: for cb in self._new_join_in_room_callbacks: @@ -759,3 +757,26 @@ def notify_remote_server_up(self, server: str) -> None: # Tell the federation client about the fact the server is back up, so # that any in flight requests can be immediately retried. self._federation_client.wake_destination(server) + + +@attr.s +class ReplicationNotifier: + """Tracks callbacks for things that need to know about stream changes. + + This is separate from the notifier to avoid circular dependencies. + """ + + _replication_callbacks: List[Callable[[], None]] = attr.Factory(list) + + def add_replication_callback(self, cb: Callable[[], None]) -> None: + """Add a callback that will be called when some new data is available. + Callback is not given any arguments. It should *not* return a Deferred - if + it needs to do any asynchronous work, a background thread should be started and + wrapped with run_as_background_process. + """ + self._replication_callbacks.append(cb) + + def notify_replication(self) -> None: + """Notify the any replication listeners that there's a new event""" + for cb in self._replication_callbacks: + cb() diff --git a/synapse/server.py b/synapse/server.py index f4ab94c4f33c..9d6d268f490c 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -107,7 +107,7 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager from synapse.module_api import ModuleApi -from synapse.notifier import Notifier +from synapse.notifier import Notifier, ReplicationNotifier from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator from synapse.push.pusherpool import PusherPool from synapse.replication.tcp.client import ReplicationDataHandler @@ -389,6 +389,10 @@ def get_federation_server(self) -> FederationServer: def get_notifier(self) -> Notifier: return Notifier(self) + @cache_in_self + def get_replication_notifier(self) -> ReplicationNotifier: + return ReplicationNotifier() + @cache_in_self def get_auth(self) -> Auth: return Auth(self) diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 881d7089dbb2..8a359d7eb89c 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -75,6 +75,7 @@ def __init__( self._account_data_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="account_data", instance_name=self._instance_name, tables=[ @@ -95,6 +96,7 @@ def __init__( # SQLite). self._account_data_id_gen = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "room_account_data", "stream_id", extra_tables=[("room_tags_revisions", "stream_id")], diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 2179a8bf5922..5b6643169139 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -75,6 +75,7 @@ def __init__( self._cache_id_gen = MultiWriterIdGenerator( db_conn, database, + notifier=hs.get_replication_notifier(), stream_name="caches", instance_name=hs.get_instance_name(), tables=[ diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 713be91c5dd8..8e61aba4548d 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -91,6 +91,7 @@ def __init__( MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="to_device", instance_name=self._instance_name, tables=[("device_inbox", "instance_name", "stream_id")], @@ -101,7 +102,7 @@ def __init__( else: self._can_write_to_device = True self._device_inbox_id_gen = StreamIdGenerator( - db_conn, "device_inbox", "stream_id" + db_conn, hs.get_replication_notifier(), "device_inbox", "stream_id" ) max_device_inbox_id = self._device_inbox_id_gen.get_current_token() diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index cd186c84726c..903606fb4664 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -92,6 +92,7 @@ def __init__( # class below that is used on the main process. self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "device_lists_stream", "stream_id", extra_tables=[ diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 4c691642e2b5..c4ac6c33ba54 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -1181,7 +1181,10 @@ def __init__( super().__init__(database, db_conn, hs) self._cross_signing_id_gen = StreamIdGenerator( - db_conn, "e2e_cross_signing_keys", "stream_id" + db_conn, + hs.get_replication_notifier(), + "e2e_cross_signing_keys", + "stream_id", ) async def set_e2e_device_keys( diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index d150fa8a943d..d8a8bcafb6ce 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -191,6 +191,7 @@ def __init__( self._stream_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="events", instance_name=hs.get_instance_name(), tables=[("events", "instance_name", "stream_ordering")], @@ -200,6 +201,7 @@ def __init__( self._backfill_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="backfill", instance_name=hs.get_instance_name(), tables=[("events", "instance_name", "stream_ordering")], @@ -217,12 +219,14 @@ def __init__( # SQLite). self._stream_id_gen = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "events", "stream_ordering", is_writer=hs.get_instance_name() in hs.config.worker.writers.events, ) self._backfill_id_gen = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "events", "stream_ordering", step=-1, @@ -300,6 +304,7 @@ def get_chain_id_txn(txn: Cursor) -> int: self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="un_partial_stated_event_stream", instance_name=hs.get_instance_name(), tables=[ @@ -311,7 +316,10 @@ def get_chain_id_txn(txn: Cursor) -> int: ) else: self._un_partial_stated_events_stream_id_gen = StreamIdGenerator( - db_conn, "un_partial_stated_event_stream", "stream_id" + db_conn, + hs.get_replication_notifier(), + "un_partial_stated_event_stream", + "stream_id", ) def get_un_partial_stated_events_token(self) -> int: diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 7b60815043a6..beb210f8eefd 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -77,6 +77,7 @@ def __init__( self._presence_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="presence_stream", instance_name=self._instance_name, tables=[("presence_stream", "instance_name", "stream_id")], @@ -85,7 +86,7 @@ def __init__( ) else: self._presence_id_gen = StreamIdGenerator( - db_conn, "presence_stream", "stream_id" + db_conn, hs.get_replication_notifier(), "presence_stream", "stream_id" ) self.hs = hs diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 03182887d138..14ca167b34b4 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -118,6 +118,7 @@ def __init__( # class below that is used on the main process. self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "push_rules_stream", "stream_id", is_writer=hs.config.worker.worker_app is None, diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 7f24a3b6ec5e..df53e726e62a 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -62,6 +62,7 @@ def __init__( # class below that is used on the main process. self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 86f5bce5f08d..3468f354e60f 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -73,6 +73,7 @@ def __init__( self._receipts_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="receipts", instance_name=self._instance_name, tables=[("receipts_linearized", "instance_name", "stream_id")], @@ -91,6 +92,7 @@ def __init__( # SQLite). self._receipts_id_gen = StreamIdGenerator( db_conn, + hs.get_replication_notifier(), "receipts_linearized", "stream_id", is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts, diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 78906a5e1d9e..7264a33cd4ac 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -126,6 +126,7 @@ def __init__( self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, + notifier=hs.get_replication_notifier(), stream_name="un_partial_stated_room_stream", instance_name=self._instance_name, tables=[ @@ -137,7 +138,10 @@ def __init__( ) else: self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator( - db_conn, "un_partial_stated_room_stream", "stream_id" + db_conn, + hs.get_replication_notifier(), + "un_partial_stated_room_stream", + "stream_id", ) async def store_room( diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 8670ffbfa374..9adff3f4f523 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -20,6 +20,7 @@ from contextlib import contextmanager from types import TracebackType from typing import ( + TYPE_CHECKING, AsyncContextManager, ContextManager, Dict, @@ -49,6 +50,9 @@ from synapse.storage.types import Cursor from synapse.storage.util.sequence import PostgresSequenceGenerator +if TYPE_CHECKING: + from synapse.notifier import ReplicationNotifier + logger = logging.getLogger(__name__) @@ -182,6 +186,7 @@ class StreamIdGenerator(AbstractStreamIdGenerator): def __init__( self, db_conn: LoggingDatabaseConnection, + notifier: "ReplicationNotifier", table: str, column: str, extra_tables: Iterable[Tuple[str, str]] = (), @@ -205,6 +210,8 @@ def __init__( # The key and values are the same, but we never look at the values. self._unfinished_ids: OrderedDict[int, int] = OrderedDict() + self._notifier = notifier + def advance(self, instance_name: str, new_id: int) -> None: # Advance should never be called on a writer instance, only over replication if self._is_writer: @@ -227,6 +234,8 @@ def manager() -> Generator[int, None, None]: with self._lock: self._unfinished_ids.pop(next_id) + self._notifier.notify_replication() + return _AsyncCtxManagerWrapper(manager()) def get_next_mult(self, n: int) -> AsyncContextManager[Sequence[int]]: @@ -250,6 +259,8 @@ def manager() -> Generator[Sequence[int], None, None]: for next_id in next_ids: self._unfinished_ids.pop(next_id) + self._notifier.notify_replication() + return _AsyncCtxManagerWrapper(manager()) def get_current_token(self) -> int: @@ -296,6 +307,7 @@ def __init__( self, db_conn: LoggingDatabaseConnection, db: DatabasePool, + notifier: "ReplicationNotifier", stream_name: str, instance_name: str, tables: List[Tuple[str, str, str]], @@ -304,6 +316,7 @@ def __init__( positive: bool = True, ) -> None: self._db = db + self._notifier = notifier self._stream_name = stream_name self._instance_name = instance_name self._positive = positive @@ -535,7 +548,9 @@ def get_next(self) -> AsyncContextManager[int]: # Cast safety: the second argument to _MultiWriterCtxManager, multiple_ids, # controls the return type. If `None` or omitted, the context manager yields # a single integer stream_id; otherwise it yields a list of stream_ids. - return cast(AsyncContextManager[int], _MultiWriterCtxManager(self)) + return cast( + AsyncContextManager[int], _MultiWriterCtxManager(self, self._notifier) + ) def get_next_mult(self, n: int) -> AsyncContextManager[List[int]]: # If we have a list of instances that are allowed to write to this @@ -544,7 +559,10 @@ def get_next_mult(self, n: int) -> AsyncContextManager[List[int]]: raise Exception("Tried to allocate stream ID on non-writer") # Cast safety: see get_next. - return cast(AsyncContextManager[List[int]], _MultiWriterCtxManager(self, n)) + return cast( + AsyncContextManager[List[int]], + _MultiWriterCtxManager(self, self._notifier, n), + ) def get_next_txn(self, txn: LoggingTransaction) -> int: """ @@ -563,6 +581,7 @@ def get_next_txn(self, txn: LoggingTransaction) -> int: txn.call_after(self._mark_id_as_finished, next_id) txn.call_on_exception(self._mark_id_as_finished, next_id) + txn.call_after(self._notifier.notify_replication) # Update the `stream_positions` table with newly updated stream # ID (unless self._writers is not set in which case we don't @@ -787,6 +806,7 @@ class _MultiWriterCtxManager: """Async context manager returned by MultiWriterIdGenerator""" id_gen: MultiWriterIdGenerator + notifier: "ReplicationNotifier" multiple_ids: Optional[int] = None stream_ids: List[int] = attr.Factory(list) @@ -814,6 +834,8 @@ async def __aexit__( for i in self.stream_ids: self.id_gen._mark_id_as_finished(i) + self.notifier.notify_replication() + if exc_type is not None: return False diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index ff9691c518bc..9174fb096470 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -52,6 +52,7 @@ def _create_id_generator(self) -> StreamIdGenerator: def _create(conn: LoggingDatabaseConnection) -> StreamIdGenerator: return StreamIdGenerator( db_conn=conn, + notifier=self.hs.get_replication_notifier(), table="foobar", column="stream_id", ) @@ -196,6 +197,7 @@ def _create(conn: LoggingDatabaseConnection) -> MultiWriterIdGenerator: return MultiWriterIdGenerator( conn, self.db_pool, + notifier=self.hs.get_replication_notifier(), stream_name="test_stream", instance_name=instance_name, tables=[("foobar", "instance_name", "stream_id")], @@ -630,6 +632,7 @@ def _create(conn: LoggingDatabaseConnection) -> MultiWriterIdGenerator: return MultiWriterIdGenerator( conn, self.db_pool, + notifier=self.hs.get_replication_notifier(), stream_name="test_stream", instance_name=instance_name, tables=[("foobar", "instance_name", "stream_id")], @@ -766,6 +769,7 @@ def _create(conn: LoggingDatabaseConnection) -> MultiWriterIdGenerator: return MultiWriterIdGenerator( conn, self.db_pool, + notifier=self.hs.get_replication_notifier(), stream_name="test_stream", instance_name=instance_name, tables=[ From b72b698701f1f6ed89c2aef1dffbebb1648a6d52 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jan 2023 14:17:18 +0000 Subject: [PATCH 17/23] Fix test --- synapse/notifier.py | 2 +- tests/replication/tcp/test_handler.py | 23 +++++++---------------- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index e7b011125bf3..28f0d4a25afe 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -759,7 +759,7 @@ def notify_remote_server_up(self, server: str) -> None: self._federation_client.wake_destination(server) -@attr.s +@attr.s(auto_attribs=True) class ReplicationNotifier: """Tracks callbacks for things that need to know about stream changes. diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py index 555922409d13..6e4055cc2102 100644 --- a/tests/replication/tcp/test_handler.py +++ b/tests/replication/tcp/test_handler.py @@ -14,7 +14,7 @@ from twisted.internet import defer -from synapse.replication.tcp.commands import PositionCommand, RdataCommand +from synapse.replication.tcp.commands import PositionCommand from tests.replication._base import BaseMultiWorkerStreamTestCase @@ -111,20 +111,14 @@ def test_wait_for_stream_position(self) -> None: next_token = self.get_success(ctx.__aenter__()) self.get_success(ctx.__aexit__(None, None, None)) - cmd_handler.send_command( - RdataCommand("caches", "worker1", next_token, ("func_name", [], 0)) - ) - self.replicate() - self.get_success( data_handler.wait_for_stream_position("worker1", "caches", next_token) ) - # `wait_for_stream_position` should only return once master receives an - # RDATA from the worker - ctx = cache_id_gen.get_next() - next_token = self.get_success(ctx.__aenter__()) - self.get_success(ctx.__aexit__(None, None, None)) + # `wait_for_stream_position` should only return once master receives a + # notification that `next_token` has persisted. + ctx_worker1 = cache_id_gen.get_next() + next_token = self.get_success(ctx_worker1.__aenter__()) d = defer.ensureDeferred( data_handler.wait_for_stream_position("worker1", "caches", next_token) @@ -142,10 +136,7 @@ def test_wait_for_stream_position(self) -> None: ) self.assertFalse(d.called) - # ... but receiving the RDATA should - cmd_handler.send_command( - RdataCommand("caches", "worker1", next_token, ("func_name", [], 0)) - ) - self.replicate() + # ... but worker1 finishing (and so sending an update) should. + self.get_success(ctx_worker1.__aexit__(None, None, None)) self.assertTrue(d.called) From 5a4355de58933861c072cfc78583e7fe2a569357 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jan 2023 14:27:54 +0000 Subject: [PATCH 18/23] Fix portdb --- synapse/_scripts/synapse_port_db.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index c463b60b2620..5e137dbbf711 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -51,6 +51,7 @@ make_deferred_yieldable, run_in_background, ) +from synapse.notifier import ReplicationNotifier from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn from synapse.storage.databases.main import PushRuleStore from synapse.storage.databases.main.account_data import AccountDataWorkerStore @@ -260,6 +261,9 @@ def get_instance_name(self) -> str: def should_send_federation(self) -> bool: return False + def get_replication_notifier(self) -> ReplicationNotifier: + return ReplicationNotifier() + class Porter: def __init__( From f2a097c271b8150cee40d55f35a7374fc3041231 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jan 2023 16:03:03 +0000 Subject: [PATCH 19/23] Fix presence test --- tests/module_api/test_api.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 9919938e8071..8f88c0117d78 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -404,6 +404,9 @@ def test_send_local_online_presence_to_federation(self): self.module_api.send_local_online_presence_to([remote_user_id]) ) + # We don't always send out federation immediately, so we advance the clock. + self.reactor.advance(1000) + # Check that a presence update was sent as part of a federation transaction found_update = False calls = ( From 56e6dda7da8e7d3b14a111b89b9dcce8d0b08ed1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jan 2023 16:04:23 +0000 Subject: [PATCH 20/23] Newsfile --- changelog.d/14877.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14877.misc diff --git a/changelog.d/14877.misc b/changelog.d/14877.misc new file mode 100644 index 000000000000..4e9c3fa33fdc --- /dev/null +++ b/changelog.d/14877.misc @@ -0,0 +1 @@ +Always notify replication when a stream advances automatically. From 207c0a3ceb7aa51acdd12c0f0a3adf2727fdcac6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jan 2023 19:42:19 +0000 Subject: [PATCH 21/23] Apply suggestions from code review --- synapse/replication/tcp/streams/partial_state.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py index b78eb165215c..a8ce5ffd7289 100644 --- a/synapse/replication/tcp/streams/partial_state.py +++ b/synapse/replication/tcp/streams/partial_state.py @@ -41,7 +41,6 @@ def __init__(self, hs: "HomeServer"): store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - # TODO(faster_joins, multiple writers): we need to account for instance names store.get_un_partial_stated_rooms_token, store.get_un_partial_stated_rooms_from_stream, ) @@ -69,7 +68,6 @@ def __init__(self, hs: "HomeServer"): store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - # TODO(faster_joins, multiple writers): we need to account for instance names store.get_un_partial_stated_events_token, store.get_un_partial_stated_events_from_stream, ) From b7df49969f6c74164802bde8448e74dad7430438 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Sun, 22 Jan 2023 19:30:46 +0000 Subject: [PATCH 22/23] Update changelog.d/14752.misc Co-authored-by: Erik Johnston --- changelog.d/14752.misc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/14752.misc b/changelog.d/14752.misc index 1cd2d4eabf44..1f9675c53bca 100644 --- a/changelog.d/14752.misc +++ b/changelog.d/14752.misc @@ -1 +1 @@ -Enable Complement tests for Faster Remote Room Joins against worker-mode Synapse. Not suitable for production use. \ No newline at end of file +Enable Complement tests for Faster Remote Room Joins against worker-mode Synapse. \ No newline at end of file From 4d62667e2c5a586811d05eb99ffc67bcd2aaf8bb Mon Sep 17 00:00:00 2001 From: David Robertson Date: Sun, 22 Jan 2023 19:41:56 +0000 Subject: [PATCH 23/23] lint --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8023067e5e3a..7620245e2642 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1869,7 +1869,7 @@ async def _sync_partial_state_room( async with self._is_partial_state_room_linearizer.queue(room_id): logger.info("Clearing partial-state flag for %s", room_id) success = await self.store.clear_partial_state_room(room_id) - + # Poke the notifier so that other workers see the write to # the un-partial-stated rooms stream. self._notifier.notify_replication()