Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Generate historic pagination token for /messages when no ?from token provided #12370

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12370.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix `/messages` returning backfilled and [MSC2716](https://github.com/matrix-org/synapse/pull/12319) historic messages our of order.
Copy link
Contributor Author

@MadLittleMods MadLittleMods Apr 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably use the same changelog as #12319 as this PR doesn't actually fix anything without additionally merging that PR.

In any case, this PR does update /messages to do the right thing imo so it would be a good change regardless. But with more of changelog like:

Update `/messages` to use historic pagination tokens by default.

9 changes: 8 additions & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,14 @@ async def get_messages(
if pagin_config.from_token:
from_token = pagin_config.from_token
else:
from_token = self.hs.get_event_sources().get_current_token_for_pagination()
from_token = (
await self.hs.get_event_sources().get_current_token_for_pagination(
room_id
)
)
# We expect `/messages` to use historic pagination tokens by default but
# `/messages` should still works with live tokens when manually provided.
assert from_token.room_key.topological

if pagin_config.limit is None:
# This shouldn't happen as we've set a default limit before this
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1444,8 +1444,8 @@ async def get_new_events(
def get_current_key(self) -> RoomStreamToken:
return self.store.get_room_max_token()

def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id)
def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]:
return self.store.get_current_room_stream_token_for_room_id(room_id)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, get_current_key_for_room was unused along with get_room_events_max_id.

It seemed almost exactly what I was looking for so I repurposed both functions with proper RoomStreamToken types instead of returning the string version that was interpolated together.



class ShutdownRoomResponse(TypedDict):
Expand Down
20 changes: 11 additions & 9 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,21 +748,23 @@ def _f(txn):
"get_room_event_before_stream_ordering", _f
)

async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
"""Returns the current token for rooms stream.
By default, it returns the current global stream token. Specifying a
`room_id` causes it to return the current room specific topological
token.
async def get_current_room_stream_token_for_room_id(
self, room_id: Optional[str] = None
) -> RoomStreamToken:
"""Returns the current position of the rooms stream.
By default, it returns a live token with the current global stream
token. Specifying a `room_id` causes it to return a historic token with
the room specific topological token.
"""
token = self.get_room_max_stream_ordering()
stream_ordering = self.get_room_max_stream_ordering()
if room_id is None:
return "s%d" % (token,)
return RoomStreamToken(None, stream_ordering)
else:
topo = await self.db_pool.runInteraction(
"_get_max_topological_txn", self._get_max_topological_txn, room_id
)
return "t%d-%d" % (topo, token)
return RoomStreamToken(topo, stream_ordering)

def get_stream_id_for_event_txn(
self,
Expand Down
4 changes: 2 additions & 2 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def get_current_token(self) -> StreamToken:
)
return token

def get_current_token_for_pagination(self) -> StreamToken:
async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
"""Get the current token for a given room to be used to paginate
events.
Expand All @@ -80,7 +80,7 @@ def get_current_token_for_pagination(self) -> StreamToken:
The current token for pagination.
"""
token = StreamToken(
room_key=self.sources.room.get_current_key(),
room_key=await self.sources.room.get_current_key_for_room(room_id),
presence_key=0,
typing_key=0,
receipt_key=0,
Expand Down
4 changes: 3 additions & 1 deletion tests/storage/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ def prepare(self, reactor, clock, homeserver):
def _filter_messages(self, filter: JsonDict) -> List[EventBase]:
"""Make a request to /messages with a filter, returns the chunk of events."""

from_token = self.hs.get_event_sources().get_current_token_for_pagination()
from_token = self.get_success(
self.hs.get_event_sources().get_current_token_for_pagination(self.room_id)
)

events, next_key = self.get_success(
self.hs.get_datastores().main.paginate_room_events(
Expand Down