From eff722d808893f76cdab20d422226a0f12fe8014 Mon Sep 17 00:00:00 2001 From: Brad Murray Date: Mon, 25 Apr 2022 15:04:53 -0400 Subject: [PATCH 1/7] hacky upper bounding work --- synapse/handlers/appservice.py | 2 +- synapse/handlers/receipts.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 1b5784050621..60fbbc01b0c2 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -447,7 +447,7 @@ async def _handle_receipts( receipts_source = self.event_sources.sources.receipt receipts, _ = await receipts_source.get_new_events_as( - service=service, from_key=from_key + service=service, from_key=from_key, to_key=new_token ) return receipts diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 6250bb3bdf2b..77ee5cc3351b 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -239,7 +239,7 @@ async def get_new_events( return events, to_key async def get_new_events_as( - self, from_key: int, service: ApplicationService + self, from_key: int, to_key: int, service: ApplicationService ) -> Tuple[List[JsonDict], int]: """Returns a set of new read receipt events that an appservice may be interested in. @@ -255,7 +255,6 @@ async def get_new_events_as( * The current read receipt stream token. 
""" from_key = int(from_key) - to_key = self.get_current_key() if from_key == to_key: return [], to_key From 74b4fb9f831b6e3cd6220cf3cccf6f9c435c9c5e Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 25 Apr 2022 20:40:54 +0100 Subject: [PATCH 2/7] Add docstring for `to_key` argument --- synapse/handlers/receipts.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 77ee5cc3351b..cfe860decc95 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -246,6 +246,7 @@ async def get_new_events_as( Args: from_key: the stream position at which events should be fetched from + to_key: the stream position up to which events should be fetched to service: The appservice which may be interested Returns: From 1798073e7250d6e59420554c1fe6eff22e6a3c37 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 25 Apr 2022 20:48:22 +0100 Subject: [PATCH 3/7] Fix type --- synapse/handlers/appservice.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 60fbbc01b0c2..b3894666ccf5 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -416,7 +416,7 @@ async def _handle_typing( return typing async def _handle_receipts( - self, service: ApplicationService, new_token: Optional[int] + self, service: ApplicationService, new_token: int ) -> List[JsonDict]: """ Return the latest read receipts that the given application service should receive. 
From 6b8971f7a0a6e92f71a56cd233cb989cc4e50279 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 25 Apr 2022 20:50:40 +0100 Subject: [PATCH 4/7] Add changelog file --- changelog.d/12544.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12544.bugfix diff --git a/changelog.d/12544.bugfix b/changelog.d/12544.bugfix new file mode 100644 index 000000000000..dae548186d73 --- /dev/null +++ b/changelog.d/12544.bugfix @@ -0,0 +1 @@ +Bound read receipts being sent to app services to avoid runaway memory in AS worker. Contributed by Brad & Nick @ Beeper. From 933c7a93b222529da03e192914358f15e6f91d6d Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 26 Apr 2022 19:28:31 +0100 Subject: [PATCH 5/7] Add a test to verify that large batches of read receipts are properly sent to clients --- tests/handlers/test_appservice.py | 74 +++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 8c72cf6b308b..ddd8fda79df3 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -411,6 +411,80 @@ def prepare(self, reactor, clock, hs): "exclusive_as_user", "password", self.exclusive_as_user_device_id ) + def test_sending_read_receipt_batches_to_application_services(self): + """Tests that a large batch of read receipts are sent correctly to + interested application services. + """ + # Register an application service that's interested in a certain user + # and room prefix + interested_appservice = self._register_application_service( + namespaces={ + ApplicationService.NS_USERS: [ + { + "regex": "@exclusive_as_user:.+", + "exclusive": True, + } + ], + ApplicationService.NS_ROOMS: [ + { + "regex": "!fakeroom_.*", + "exclusive": True, + } + ], + }, + ) + + # "Complete" a transaction. 
+ # All this really does for us is make an entry in the application_services_state + # database table, which tracks the current stream_token per stream ID per AS. + self.get_success( + self.hs.get_datastores().main.complete_appservice_txn( + 0, + interested_appservice, + ) + ) + + # Now, pretend that we receive a large burst of read receipts (300 total) that + # all come in at once. + for i in range(300): + self.get_success( + # Insert a fake read receipt into the database + self.hs.get_datastores().main.insert_receipt( + # We have to use unique room ID + user ID combinations here, as the db query + # is an upsert. + room_id=f"!fakeroom_{i}:test", + receipt_type="m.read", + user_id=self.local_user, + event_ids=[f"$eventid_{i}"], + data={}, + ) + ) + + # Now notify the appservice handler that 300 read receipts have all arrived + # at once. What will it do! + # note: stream tokens start at 2 + for stream_token in range(2, 303): + self.get_success( + self.hs.get_application_service_handler()._notify_interested_services_ephemeral( + services=[interested_appservice], + stream_key="receipt_key", + new_token=stream_token, + users=[self.exclusive_as_user], + ) + ) + + # Using our txn send mock, we can see what the AS received. After iterating over every + # transaction, we'd like to see all 300 read receipts accounted for. + # No more, no less. 
+ total_ephemeral_events = 0 + for call in self.send_mock.call_args_list: + ephemeral_events = call[0][2] + print(ephemeral_events) + total_ephemeral_events += len(ephemeral_events) + + # Ensure that no duplicate events were sent + self.assertEqual(total_ephemeral_events, 300) + @unittest.override_config( {"experimental_features": {"msc2409_to_device_messages_enabled": True}} ) From 2797c73da7e5cf1b3dd99d088a93770b744a42a7 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 26 Apr 2022 19:32:48 +0100 Subject: [PATCH 6/7] unit test read receipt ephemeral event shape while we're here --- tests/handlers/test_appservice.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index ddd8fda79df3..5b0cd1ab8608 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -476,14 +476,22 @@ def test_sending_read_receipt_batches_to_application_services(self): # Using our txn send mock, we can see what the AS received. After iterating over every # transaction, we'd like to see all 300 read receipts accounted for. # No more, no less. 
- total_ephemeral_events = 0 + all_ephemeral_events = [] for call in self.send_mock.call_args_list: ephemeral_events = call[0][2] - print(ephemeral_events) - total_ephemeral_events += len(ephemeral_events) + all_ephemeral_events += ephemeral_events # Ensure that no duplicate events were sent - self.assertEqual(total_ephemeral_events, 300) + self.assertEqual(len(all_ephemeral_events), 300) + + # Check that the ephemeral event is a read receipt with the expected structure + latest_read_receipt = all_ephemeral_events[-1] + self.assertEqual(latest_read_receipt["type"], "m.receipt") + + event_id = list(latest_read_receipt["content"].keys())[0] + self.assertEqual( + latest_read_receipt["content"][event_id]["m.read"], {self.local_user: {}} + ) @unittest.override_config( {"experimental_features": {"msc2409_to_device_messages_enabled": True}} ) From 8d36ecc4dda40f4be41c14a05d2fa8274ed3cea6 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 26 Apr 2022 19:38:21 +0100 Subject: [PATCH 7/7] Update changelog.d/12544.bugfix --- changelog.d/12544.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/12544.bugfix b/changelog.d/12544.bugfix index dae548186d73..b5169cd8311a 100644 --- a/changelog.d/12544.bugfix +++ b/changelog.d/12544.bugfix @@ -1 +1 @@ -Bound read receipts being sent to app services to avoid runaway memory in AS worker. Contributed by Brad & Nick @ Beeper. +Fix a bug where attempting to send a large number of read receipts to an application service all at once would result in duplicate content and abnormally high memory usage. Contributed by Brad & Nick @ Beeper.