Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Bound ephemeral events by key #12544

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12544.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Bound read receipts being sent to app services to avoid runaway memory in AS worker. Contributed by Brad & Nick @ Beeper.
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 2 additions & 2 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ async def _handle_typing(
return typing

async def _handle_receipts(
self, service: ApplicationService, new_token: Optional[int]
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.
Expand Down Expand Up @@ -447,7 +447,7 @@ async def _handle_receipts(

receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
service=service, from_key=from_key, to_key=new_token
)
return receipts

Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,14 @@ async def get_new_events(
return events, to_key

async def get_new_events_as(
self, from_key: int, service: ApplicationService
self, from_key: int, to_key: int, service: ApplicationService
) -> Tuple[List[JsonDict], int]:
"""Returns a set of new read receipt events that an appservice
may be interested in.

Args:
from_key: the stream position at which events should be fetched from
to_key: the stream position up to which events should be fetched to
service: The appservice which may be interested

Returns:
Expand All @@ -255,7 +256,6 @@ async def get_new_events_as(
* The current read receipt stream token.
"""
from_key = int(from_key)
to_key = self.get_current_key()

if from_key == to_key:
return [], to_key
Expand Down
82 changes: 82 additions & 0 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,88 @@ def prepare(self, reactor, clock, hs):
"exclusive_as_user", "password", self.exclusive_as_user_device_id
)

def test_sending_read_receipt_batches_to_application_services(self):
"""Tests that a large batch of read receipts are sent correctly to
interested application services.
"""
# Register an application service that's interested in a certain user
# and room prefix
interested_appservice = self._register_application_service(
namespaces={
ApplicationService.NS_USERS: [
{
"regex": "@exclusive_as_user:.+",
"exclusive": True,
}
],
ApplicationService.NS_ROOMS: [
{
"regex": "!fakeroom_.*",
"exclusive": True,
}
],
},
)

# "Complete" a transaction.
# All this really does for us is make an entry in the application_services_state
# database table, which tracks the current stream_token per stream ID per AS.
self.get_success(
self.hs.get_datastores().main.complete_appservice_txn(
0,
interested_appservice,
)
)

# Now, pretend that we receive a large burst of read receipts (300 total) that
# all come in at once.
for i in range(300):
self.get_success(
# Insert a fake read receipt into the database
self.hs.get_datastores().main.insert_receipt(
# We have to use unique room ID + user ID combinations here, as the db query
# is an upsert.
room_id=f"!fakeroom_{i}:test",
receipt_type="m.read",
user_id=self.local_user,
event_ids=[f"$eventid_{i}"],
data={},
)
)

# Now notify the appservice handler that 300 read receipts have all arrived
# at once. What will it do!
# note: stream tokens start at 2
for stream_token in range(2, 303):
self.get_success(
self.hs.get_application_service_handler()._notify_interested_services_ephemeral(
services=[interested_appservice],
stream_key="receipt_key",
new_token=stream_token,
users=[self.exclusive_as_user],
)
)

# Using our txn send mock, we can see what the AS received. After iterating over every
# transaction, we'd like to see all 300 read receipts accounted for.
# No more, no less.
all_ephemeral_events = []
for call in self.send_mock.call_args_list:
ephemeral_events = call[0][2]
all_ephemeral_events += ephemeral_events

# Ensure that no duplicate events were sent
self.assertEqual(len(all_ephemeral_events), 300)

# Check that the ephemeral event is a read receipt with the expected structure
latest_read_receipt = all_ephemeral_events[-1]
self.assertEqual(latest_read_receipt["type"], "m.receipt")

event_id = list(latest_read_receipt["content"].keys())[0]
self.assertEqual(
latest_read_receipt["content"][event_id]["m.read"], {self.local_user: {}}
)

@unittest.override_config(
{"experimental_features": {"msc2409_to_device_messages_enabled": True}}
)
Expand Down