Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding sync: Correctly track which read receipts we have or have not sent down. #17575

Merged
merged 34 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
da5339d
Migrate to per-connection state class
erikjohnston Aug 12, 2024
baac6c5
Record with new class
erikjohnston Aug 14, 2024
0561c86
Revamp
erikjohnston Aug 15, 2024
c15b8b3
WIP receipts reading
erikjohnston Aug 13, 2024
a1b75f7
WIP comments
erikjohnston Aug 14, 2024
6b9d244
Record state
erikjohnston Aug 15, 2024
2e7672d
Newsfile
erikjohnston Aug 15, 2024
55feaae
Add tests
erikjohnston Aug 15, 2024
614c0d7
Newsfile
erikjohnston Aug 15, 2024
100927d
Comments
erikjohnston Aug 15, 2024
70d32fb
Add proper DB function for getting receipts between things
erikjohnston Aug 15, 2024
64310ec
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 16, 2024
79e80eb
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 16, 2024
d982efe
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 16, 2024
b0a5c0e
Apply suggestions from code review
erikjohnston Aug 16, 2024
577370a
Simplify get room status
erikjohnston Aug 16, 2024
27b7a4a
Add docstring
erikjohnston Aug 16, 2024
dec5314
Fixups
erikjohnston Aug 16, 2024
5b6755a
Rename
erikjohnston Aug 16, 2024
9241da0
Merge remote-tracking branch 'origin/erikj/ss_per_state_class' into e…
erikjohnston Aug 16, 2024
8a8df06
Wrap comment
erikjohnston Aug 16, 2024
db4c733
Docstrings
erikjohnston Aug 16, 2024
39bece9
Update names for per-conn state variables
erikjohnston Aug 16, 2024
161275d
Add back in receipts comments
erikjohnston Aug 16, 2024
f511576
Actually only check for missing receipts in rooms with LIVE
erikjohnston Aug 16, 2024
90ecd9e
Apply suggestions from code review
erikjohnston Aug 16, 2024
49c2f73
Rewrap comments
erikjohnston Aug 16, 2024
53a35e8
Add comment about checking room.initial flag
erikjohnston Aug 16, 2024
7f5bccc
Add some comments that chain map won't infinitely grow
erikjohnston Aug 19, 2024
f628b8d
Apply suggestions from code review
erikjohnston Aug 19, 2024
41d37b3
Rename new_connection_state
erikjohnston Aug 19, 2024
96aaed8
Add receipts to docstring
erikjohnston Aug 19, 2024
1937ac9
Merge branch 'erikj/ss_per_state_class' into erikj/ss_receipts2
erikjohnston Aug 19, 2024
4f08d04
Merge remote-tracking branch 'origin/develop' into erikj/ss_receipts2
erikjohnston Aug 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17574.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor per-connection state in experimental sliding sync handler.
1 change: 1 addition & 0 deletions changelog.d/17575.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Correctly track read receipts that should be sent down in experimental sliding sync.
488 changes: 353 additions & 135 deletions synapse/handlers/sliding_sync.py

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@
JsonMapping,
MultiWriterStreamToken,
PersistedPosition,
StrCollection,
)
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -550,6 +552,46 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]:

return results

async def get_rooms_with_receipts_between(
    self,
    room_ids: StrCollection,
    from_key: MultiWriterStreamToken,
    to_key: MultiWriterStreamToken,
) -> StrCollection:
    """Narrow `room_ids` down to the rooms that (may) have receipts between
    the two tokens (> `from_key` and <= `to_key`).

    Args:
        room_ids: The rooms to check.
        from_key: Lower bound (exclusive). Its `stream` value is compared
            against `stream_id`.
        to_key: Upper bound (inclusive). Its `get_max_stream_pos()` value
            is compared against `stream_id`.

    Returns:
        The room IDs that have at least one row in `receipts_linearized`
        with a `stream_id` inside that range.
    """

    # Let the receipts stream cache discard rooms first; only the rooms it
    # hands back are looked up in the database.
    candidate_room_ids = self._receipts_stream_cache.get_entities_changed(
        room_ids, from_key.stream
    )
    if not candidate_room_ids:
        return []

    def fetch_batch_txn(
        txn: LoggingTransaction, batch_room_ids: StrCollection
    ) -> StrCollection:
        in_clause, in_args = make_in_list_sql_clause(
            self.database_engine, "room_id", batch_room_ids
        )

        sql = f"""
            SELECT DISTINCT room_id FROM receipts_linearized
            WHERE {in_clause} AND ? < stream_id AND stream_id <= ?
        """
        # Placeholders are filled in order: the IN-list values, then the
        # exclusive lower bound, then the inclusive upper bound.
        query_args = [*in_args, from_key.stream, to_key.get_max_stream_pos()]

        txn.execute(sql, query_args)

        # Each row is a 1-tuple holding the room ID.
        return [row[0] for row in txn]

    # Query in batches of 1000 rooms so the IN-list stays bounded.
    matching_room_ids: List[str] = []
    for room_batch in batch_iter(candidate_room_ids, 1000):
        matching_room_ids.extend(
            await self.db_pool.runInteraction(
                "get_rooms_with_receipts_between", fetch_batch_txn, room_batch
            )
        )

    return matching_room_ids

async def get_users_sent_receipts_between(
self, last_id: int, current_id: int
) -> List[str]:
Expand Down
105 changes: 105 additions & 0 deletions tests/rest/client/sliding_sync/test_extension_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,3 +677,108 @@ def test_wait_for_new_data_timeout(self) -> None:
set(),
exact=True,
)

def test_receipts_incremental_sync_out_of_range(self) -> None:
    """Check that read receipts for a room outside the requested range are
    held back, and that they are sent down once the room is back in range.
    """

    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")
    user2_id = self.register_user("user2", "pass")
    user2_tok = self.login(user2_id, "pass")

    # Two rooms created by user2, both joined by user1.
    room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
    self.helper.join(room_id1, user1_id, tok=user1_tok)
    room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
    self.helper.join(room_id2, user1_id, tok=user1_tok)

    # room2 gets a message, and user1 marks it as read.
    room2_event_id = self.helper.send(room_id2, body="new event", tok=user2_tok)[
        "event_id"
    ]
    self.helper.send_read_receipt(room_id2, room2_event_id, tok=user1_tok)

    # A later message in room1 puts room1 at the top of the list.
    self.helper.send(room_id1, body="new event", tok=user2_tok)

    # Sliding sync request whose single list only covers the top room.
    top_room_only_list = {
        "ranges": [[0, 0]],
        "required_state": [],
        "timeline_limit": 5,
    }
    sync_body = {
        "lists": {"main": top_room_only_list},
        "extensions": {"receipts": {"enabled": True}},
    }
    response_body, from_token = self.do_sync(sync_body, tok=user1_tok)

    # Only room1 is returned and the receipt lives in room2, so no
    # receipts are expected.
    receipts_ext = response_body["extensions"]["receipts"]
    self.assertIncludes(receipts_ext.get("rooms").keys(), set(), exact=True)

    # Bring room2 into range with a new message.
    self.helper.send(room_id2, body="new event", tok=user2_tok)

    response_body, from_token = self.do_sync(
        sync_body, since=from_token, tok=user1_tok
    )

    # room2 now has the most recent update, so its read receipt should
    # come down.
    receipts_ext = response_body["extensions"]["receipts"]
    self.assertIncludes(receipts_ext.get("rooms").keys(), {room_id2}, exact=True)
    room2_receipt = receipts_ext["rooms"][room_id2]
    readers = room2_receipt["content"][room2_event_id][ReceiptTypes.READ]
    self.assertIncludes(readers.keys(), {user1_id}, exact=True)

    # Bump room1 back to the top, and have user2 send a receipt in room2
    # while room2 is out of range.
    self.helper.send(room_id1, body="new event", tok=user2_tok)
    self.helper.send_read_receipt(room_id2, room2_event_id, tok=user2_tok)

    # The new receipt must not show up yet.
    response_body, from_token = self.do_sync(
        sync_body, since=from_token, tok=user1_tok
    )
    receipts_ext = response_body["extensions"]["receipts"]
    self.assertIncludes(receipts_ext.get("rooms").keys(), set(), exact=True)

    # A new message in room2 brings it back into range, and the missed
    # receipt should now be delivered.
    self.helper.send(room_id2, body="new event", tok=user2_tok)

    response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
    receipts_ext = response_body["extensions"]["receipts"]
    self.assertIncludes(receipts_ext.get("rooms").keys(), {room_id2}, exact=True)

    # Only the new receipt (user2's) should be present; user1's was
    # already sent earlier.
    room2_receipt = receipts_ext["rooms"][room_id2]
    readers = room2_receipt["content"][room2_event_id][ReceiptTypes.READ]
    self.assertIncludes(readers.keys(), {user2_id}, exact=True)
13 changes: 10 additions & 3 deletions tests/rest/client/sliding_sync/test_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,26 @@ def test_extensions_lists_rooms_relevant_rooms(
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 0,
# We set this to `1` because we're testing `receipts` which
# interact with the `timeline`. With receipts, when a room
# hasn't been sent down the connection before or it appears
# as `initial: true`, we only include receipts for events in
# the timeline to avoid bloating and blowing up the sync
# response as the number of users in the room increases.
# (this behavior is part of the spec)
"timeline_limit": 1,
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
},
# We expect this list range to include room5, room4, room3
"bar-list": {
"ranges": [[0, 2]],
"required_state": [],
"timeline_limit": 0,
"timeline_limit": 1,
},
},
"room_subscriptions": {
room_id1: {
"required_state": [],
"timeline_limit": 0,
"timeline_limit": 1,
}
},
}
Expand Down
14 changes: 13 additions & 1 deletion tests/rest/client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from twisted.test.proto_helpers import MemoryReactorClock
from twisted.web.server import Site

from synapse.api.constants import Membership
from synapse.api.constants import Membership, ReceiptTypes
from synapse.api.errors import Codes
from synapse.server import HomeServer
from synapse.types import JsonDict
Expand Down Expand Up @@ -944,3 +944,15 @@ def initiate_sso_ui_auth(
assert len(p.links) == 1, "not exactly one link in confirmation page"
oauth_uri = p.links[0]
return oauth_uri

def send_read_receipt(self, room_id: str, event_id: str, *, tok: str) -> None:
    """POST a `ReceiptTypes.READ` receipt for `event_id` in `room_id`.

    Args:
        room_id: Room the receipt is sent into.
        event_id: Event the receipt points at.
        tok: Access token used for the request.

    Fails with an assertion (showing the response body) unless the response
    code is 200 OK.
    """
    receipt_path = f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_id}"

    channel = make_request(
        self.reactor,
        self.site,
        method="POST",
        path=receipt_path,
        content={},
        access_token=tok,
    )

    assert channel.code == HTTPStatus.OK, channel.text_body
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use this helper for the rest of the receipts tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, we should, just didn't want to inflate the PR

Loading