From 78e3df81c8bc37b183d6aa309f00eb8fde8a1523 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 29 Mar 2022 07:59:46 +0100 Subject: [PATCH 01/12] Add `order_by` argument for paginating room events This can be used to flip betweek topological ordering (the default) and stream ordering as needed by the caller. --- synapse/storage/databases/main/stream.py | 42 ++++++++++++++++++++---- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 39e1efe37348..cb8bc508b06f 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -652,7 +652,11 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]: return ret async def get_recent_events_for_room( - self, room_id: str, limit: int, end_token: RoomStreamToken + self, + room_id: str, + limit: int, + end_token: RoomStreamToken, + order_by: str = "topological", ) -> Tuple[List[EventBase], RoomStreamToken]: """Get the most recent events in the room in topological ordering. @@ -660,6 +664,8 @@ async def get_recent_events_for_room( room_id limit end_token: The stream token representing now. + order_by: Either 'topological' or 'stream' to indicate the order in + which results should be returned. Returns: A list of events and a token pointing to the start of the returned @@ -667,7 +673,10 @@ async def get_recent_events_for_room( """ rows, token = await self.get_recent_event_ids_for_room( - room_id, limit, end_token + room_id, + limit, + end_token, + order_by, ) events = await self.get_events_as_list( @@ -679,7 +688,11 @@ async def get_recent_events_for_room( return events, token async def get_recent_event_ids_for_room( - self, room_id: str, limit: int, end_token: RoomStreamToken + self, + room_id: str, + limit: int, + end_token: RoomStreamToken, + order_by: str = "topological", ) -> Tuple[List[_EventDictReturn], RoomStreamToken]: """Get the most recent events in the room in topological ordering. @@ -687,6 +700,8 @@ async def get_recent_event_ids_for_room( room_id limit end_token: The stream token representing now. + order_by: Either 'topological' or 'stream' to indicate the order in + which results should be returned. Returns: A list of _EventDictReturn and a token pointing to the start of the @@ -701,6 +716,7 @@ async def get_recent_event_ids_for_room( self._paginate_room_events_txn, room_id, from_token=end_token, + order_by=order_by, limit=limit, ) @@ -1099,6 +1115,7 @@ def _paginate_room_events_txn( from_token: RoomStreamToken, to_token: Optional[RoomStreamToken] = None, direction: str = "b", + order_by: str = "topological", limit: int = -1, event_filter: Optional[Filter] = None, ) -> Tuple[List[_EventDictReturn], RoomStreamToken]: @@ -1111,6 +1128,8 @@ def _paginate_room_events_txn( to_token: A token which if given limits the results to only those before direction: Either 'b' or 'f' to indicate whether we are paginating forwards or backwards from `from_key`. + order_by: Either 'topological' or 'stream' to indicate the order in + which results should be returned. limit: The maximum number of events to return. event_filter: If provided filters the events to those that match the filter. @@ -1123,6 +1142,7 @@ def _paginate_room_events_txn( """ assert int(limit) >= 0 + assert order_by in ("topological", "stream") # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence @@ -1133,6 +1153,12 @@ def _paginate_room_events_txn( else: order = "ASC" + order_clause = """ORDER BY event.topological_ordering %(order)s, event.stream_ordering %(order)s""" + if order_by == "stream": + order_clause = """ORDER BY event.stream_ordering %(order)s, event.topological_ordering %(order)s""" + + order_clause = order_clause % {"order": order} + # The bounds for the stream tokens are complicated by the fact # that we need to handle the instance_map part of the tokens. We do this # by fetching all events between the min stream token and the maximum @@ -1228,13 +1254,13 @@ def _paginate_room_events_txn( FROM events AS event %(join_clause)s WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s - ORDER BY event.topological_ordering %(order)s, - event.stream_ordering %(order)s LIMIT ? + %(order_clause)s + LIMIT ? """ % { "select_keywords": select_keywords, "join_clause": join_clause, "bounds": bounds, - "order": order, + "order_clause": order_clause, } txn.execute(sql, args) @@ -1275,6 +1301,7 @@ async def paginate_room_events( from_key: RoomStreamToken, to_key: Optional[RoomStreamToken] = None, direction: str = "b", + order_by: str = "topological", limit: int = -1, event_filter: Optional[Filter] = None, ) -> Tuple[List[EventBase], RoomStreamToken]: @@ -1286,6 +1313,8 @@ async def paginate_room_events( to_key: A token which if given limits the results to only those before direction: Either 'b' or 'f' to indicate whether we are paginating forwards or backwards from `from_key`. + order_by: Either 'topological' or 'stream' to indicate the order in + which results should be returned. limit: The maximum number of events to return. event_filter: If provided filters the events to those that match the filter. @@ -1303,6 +1332,7 @@ async def paginate_room_events( from_key, to_key, direction, + order_by, limit, event_filter, ) From 00f81f0e1b9dc09e388a15e8273255514d253695 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 29 Mar 2022 08:09:01 +0100 Subject: [PATCH 02/12] Use new `order_by` argument to get events in stream order --- synapse/handlers/message.py | 9 ++++----- synapse/handlers/sync.py | 9 ++++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1c4fb4360af9..04d4d9a877d4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -175,12 +175,11 @@ async def get_state_events( state_filter = state_filter or StateFilter.all() if at_token: - # FIXME this claims to get the state at a stream position, but - # get_recent_events_for_room operates by topo ordering. This therefore - # does not reliably give you the state at the given stream position. - # (https://github.com/matrix-org/synapse/issues/3305) last_events, _ = await self.store.get_recent_events_for_room( - room_id, end_token=at_token.room_key, limit=1 + room_id, + end_token=at_token.room_key, + limit=1, + order_by="stream", ) if not last_events: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6c569cfb1c88..e51e59f02515 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -683,12 +683,11 @@ async def get_state_at( stream_position: point at which to get state state_filter: The state filter used to fetch state from the database. """ - # FIXME this claims to get the state at a stream position, but - # get_recent_events_for_room operates by topo ordering. This therefore - # does not reliably give you the state at the given stream position. - # (https://github.com/matrix-org/synapse/issues/3305) last_events, _ = await self.store.get_recent_events_for_room( - room_id, end_token=stream_position.room_key, limit=1 + room_id, + end_token=stream_position.room_key, + limit=1, + order_by="stream", ) if last_events: From e974d6c06dcd41746cc53c9f6af4b6e97eb73f37 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 29 Mar 2022 08:14:11 +0100 Subject: [PATCH 03/12] Add changelog file --- changelog.d/12319.fix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12319.fix diff --git a/changelog.d/12319.fix b/changelog.d/12319.fix new file mode 100644 index 000000000000..a50191feaaaf --- /dev/null +++ b/changelog.d/12319.fix @@ -0,0 +1 @@ +Fix bug with incremental sync missing events when rejoining/backfilling. Contributed by Nick @ Beeper. From 446d64765d6c2923fa1aabaeb2f6f7081a619fd2 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 29 Mar 2022 15:25:37 +0100 Subject: [PATCH 04/12] Correct newfile name --- changelog.d/{12319.fix => 12319.bugfix} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{12319.fix => 12319.bugfix} (100%) diff --git a/changelog.d/12319.fix b/changelog.d/12319.bugfix similarity index 100% rename from changelog.d/12319.fix rename to changelog.d/12319.bugfix From e440db94d638b778a5a6b7ac2dc909cb37093445 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Wed, 30 Mar 2022 14:27:52 +0100 Subject: [PATCH 05/12] Add sync while batch importing test case --- tests/rest/client/test_room_batch.py | 114 ++++++++++++++++++++++++++- 1 file changed, 112 insertions(+), 2 deletions(-) diff --git a/tests/rest/client/test_room_batch.py b/tests/rest/client/test_room_batch.py index 44f333a0ee77..f03a517b9693 100644 --- a/tests/rest/client/test_room_batch.py +++ b/tests/rest/client/test_room_batch.py @@ -7,9 +7,9 @@ from synapse.api.constants import EventContentFields, EventTypes from synapse.appservice import ApplicationService from synapse.rest import admin -from synapse.rest.client import login, register, room, room_batch +from synapse.rest.client import login, register, room, room_batch, sync from synapse.server import HomeServer -from synapse.types import JsonDict +from synapse.types import JsonDict, RoomStreamToken from synapse.util import Clock from tests import unittest @@ -63,6 +63,7 @@ class RoomBatchTestCase(unittest.HomeserverTestCase): room.register_servlets, register.register_servlets, login.register_servlets, + sync.register_servlets, ] def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: @@ -178,3 +179,112 @@ def test_same_state_groups_for_whole_historical_batch(self) -> None: "Expected a single state_group to be returned by saw state_groups=%s" % (state_group_map.keys(),), ) + + @unittest.override_config({"experimental_features": {"msc2716_enabled": True}}) + def test_sync_while_batch_importing(self) -> None: + """ + Make sure that /sync correctly returns full room state when a user joins + during ongoing batch backfilling. + See: https://github.com/matrix-org/synapse/issues/12281 + """ + # Create user who will be invited & join room + user_id = self.register_user("beep", "test") + user_tok = self.login("beep", "test") + + time_before_room = int(self.clock.time_msec()) + + # Create a room with some events + room_id, _, _, _ = self._create_test_room() + # Invite the user + self.helper.invite( + room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id + ) + + # Create another room, send a bunch of events to advance the stream token + other_room_id = self.helper.create_room_as( + self.appservice.sender, tok=self.appservice.token + ) + for _ in range(5): + self.helper.send_event( + room_id=other_room_id, + type=EventTypes.Message, + content={"msgtype": "m.text", "body": "C"}, + tok=self.appservice.token, + ) + + # Join the room as the normal user + self.helper.join(room_id, user_id, tok=user_tok) + + # Create event to hang backfill batch from, and send batch + response = self.helper.send_event( + room_id=room_id, + type=EventTypes.Message, + content={ + "msgtype": "m.text", + "body": "C", + }, + tok=self.appservice.token, + ) + event_to_hang_id = response["event_id"] + + channel = self.make_request( + "POST", + "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s" + % (room_id, event_to_hang_id), + content={ + "events": _create_message_events_for_batch_send_request( + self.virtual_user_id, time_before_room, 3 + ), + "state_events_at_start": _create_join_state_events_for_batch_send_request( + [self.virtual_user_id], time_before_room + ), + }, + access_token=self.appservice.token, + ) + self.assertEqual(channel.code, 200, channel.result) + + # Now we need to find the invite + join events stream tokens so we can sync between + main_store = self.hs.get_datastores().main + events, next_key = self.get_success( + main_store.get_recent_events_for_room( + room_id, + 50, + end_token=main_store.get_room_max_token(), + ), + ) + invite_event_position = None + for event in events: + if ( + event.type == "m.room.member" + and event.content["membership"] == "invite" + ): + invite_event_position = self.get_success( + main_store.get_topological_token_for_event(event.event_id) + ) + break + + assert invite_event_position is not None, "No invite event found" + + # Remove the topological order from the token by re-creating w/stream only + invite_event_position = RoomStreamToken(None, invite_event_position.stream) + + # Sync everything after this token + since_token = self.get_success(invite_event_position.to_string(main_store)) + sync_response = self.make_request( + "GET", + f"/sync?since={since_token}", + access_token=user_tok, + ) + + # Assert that, for this room, the user was considered to have joined and thus + # receives the full state history + state_event_types = [ + event["type"] + for event in sync_response.json_body["rooms"]["join"][room_id]["state"][ + "events" + ] + ] + + assert ( + "m.room.create" in state_event_types + ), "Missing room full state in sync response" From ef07fcda333cd22825661eee5f8615b258ac3505 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Thu, 31 Mar 2022 08:11:42 +0100 Subject: [PATCH 06/12] Remove `order_by` argument and automatically select ordering This aligns with automatic selection of bounds depending on the token passed into the pagination function. --- synapse/handlers/message.py | 1 - synapse/handlers/sync.py | 1 - synapse/storage/databases/main/stream.py | 34 +++++------------------- 3 files changed, 6 insertions(+), 30 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5e6419aa9465..8bb3cda41355 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -179,7 +179,6 @@ async def get_state_events( room_id, end_token=at_token.room_key, limit=1, - order_by="stream", ) if not last_events: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e51e59f02515..7de55cb1dd4e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -687,7 +687,6 @@ async def get_state_at( room_id, end_token=stream_position.room_key, limit=1, - order_by="stream", ) if last_events: diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index cb8bc508b06f..89fd0b247921 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -652,11 +652,7 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]: return ret async def get_recent_events_for_room( - self, - room_id: str, - limit: int, - end_token: RoomStreamToken, - order_by: str = "topological", + self, room_id: str, limit: int, end_token: RoomStreamToken ) -> Tuple[List[EventBase], RoomStreamToken]: """Get the most recent events in the room in topological ordering. @@ -664,8 +660,6 @@ async def get_recent_events_for_room( room_id limit end_token: The stream token representing now. - order_by: Either 'topological' or 'stream' to indicate the order in - which results should be returned. Returns: A list of events and a token pointing to the start of the returned @@ -673,10 +667,7 @@ async def get_recent_events_for_room( """ rows, token = await self.get_recent_event_ids_for_room( - room_id, - limit, - end_token, - order_by, + room_id, limit, end_token ) events = await self.get_events_as_list( @@ -688,11 +679,7 @@ async def get_recent_events_for_room( return events, token async def get_recent_event_ids_for_room( - self, - room_id: str, - limit: int, - end_token: RoomStreamToken, - order_by: str = "topological", + self, room_id: str, limit: int, end_token: RoomStreamToken ) -> Tuple[List[_EventDictReturn], RoomStreamToken]: """Get the most recent events in the room in topological ordering. @@ -700,8 +687,6 @@ async def get_recent_event_ids_for_room( room_id limit end_token: The stream token representing now. - order_by: Either 'topological' or 'stream' to indicate the order in - which results should be returned. Returns: A list of _EventDictReturn and a token pointing to the start of the @@ -716,7 +701,6 @@ async def get_recent_event_ids_for_room( self._paginate_room_events_txn, room_id, from_token=end_token, - order_by=order_by, limit=limit, ) @@ -1115,7 +1099,6 @@ def _paginate_room_events_txn( from_token: RoomStreamToken, to_token: Optional[RoomStreamToken] = None, direction: str = "b", - order_by: str = "topological", limit: int = -1, event_filter: Optional[Filter] = None, ) -> Tuple[List[_EventDictReturn], RoomStreamToken]: @@ -1128,8 +1111,6 @@ def _paginate_room_events_txn( to_token: A token which if given limits the results to only those before direction: Either 'b' or 'f' to indicate whether we are paginating forwards or backwards from `from_key`. - order_by: Either 'topological' or 'stream' to indicate the order in - which results should be returned. limit: The maximum number of events to return. event_filter: If provided filters the events to those that match the filter. @@ -1142,7 +1123,6 @@ def _paginate_room_events_txn( """ assert int(limit) >= 0 - assert order_by in ("topological", "stream") # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence @@ -1153,8 +1133,10 @@ def _paginate_room_events_txn( else: order = "ASC" + # If we have only a stream token, order by stream_ordering, otherwise + # order by topologicial_ordering. order_clause = """ORDER BY event.topological_ordering %(order)s, event.stream_ordering %(order)s""" - if order_by == "stream": + if from_token.topological is None: order_clause = """ORDER BY event.stream_ordering %(order)s, event.topological_ordering %(order)s""" order_clause = order_clause % {"order": order} @@ -1301,7 +1283,6 @@ async def paginate_room_events( from_key: RoomStreamToken, to_key: Optional[RoomStreamToken] = None, direction: str = "b", - order_by: str = "topological", limit: int = -1, event_filter: Optional[Filter] = None, ) -> Tuple[List[EventBase], RoomStreamToken]: @@ -1313,8 +1294,6 @@ async def paginate_room_events( to_key: A token which if given limits the results to only those before direction: Either 'b' or 'f' to indicate whether we are paginating forwards or backwards from `from_key`. - order_by: Either 'topological' or 'stream' to indicate the order in - which results should be returned. limit: The maximum number of events to return. event_filter: If provided filters the events to those that match the filter. @@ -1332,7 +1311,6 @@ async def paginate_room_events( from_key, to_key, direction, - order_by, limit, event_filter, ) From 0df9e93e015385a022fc0cc7fb45f0f84385d3f3 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Thu, 31 Mar 2022 08:13:23 +0100 Subject: [PATCH 07/12] Put formatting back --- synapse/handlers/message.py | 4 +--- synapse/handlers/sync.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8bb3cda41355..f59afa7456c6 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -176,9 +176,7 @@ async def get_state_events( if at_token: last_events, _ = await self.store.get_recent_events_for_room( - room_id, - end_token=at_token.room_key, - limit=1, + room_id, end_token=at_token.room_key, limit=1 ) if not last_events: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7de55cb1dd4e..8f1839f11af4 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -684,9 +684,7 @@ async def get_state_at( state_filter: The state filter used to fetch state from the database. """ last_events, _ = await self.store.get_recent_events_for_room( - room_id, - end_token=stream_position.room_key, - limit=1, + room_id, end_token=stream_position.room_key, limit=1 ) if last_events: From b2c6c20f4ab8e68b56b39b869af29c56e81d0ff7 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Thu, 7 Apr 2022 07:08:48 +0100 Subject: [PATCH 08/12] Update comment on sync while batching test --- tests/rest/client/test_room_batch.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/rest/client/test_room_batch.py b/tests/rest/client/test_room_batch.py index f03a517b9693..b985680914de 100644 --- a/tests/rest/client/test_room_batch.py +++ b/tests/rest/client/test_room_batch.py @@ -215,7 +215,9 @@ def test_sync_while_batch_importing(self) -> None: # Join the room as the normal user self.helper.join(room_id, user_id, tok=user_tok) - # Create event to hang backfill batch from, and send batch + # Create event to hang backfill batch from, and send batch - historical batches + # must be hung from the lmost recent event after the member join event to see + # this failure case. response = self.helper.send_event( room_id=room_id, type=EventTypes.Message, From 9a78d14cb04353b93687faab194d404b5931ba41 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 12 Apr 2022 15:21:45 +0100 Subject: [PATCH 09/12] Add and use `get_last_event_in_room_before_stream_ordering` method This uses an existing method, `get_last_event_in_room_before_stream_ordering`, to retrieve the most recent (stream ordered) event in a room and replaces uses of the complex pagination txn. --- synapse/handlers/message.py | 10 ++++----- synapse/handlers/sync.py | 8 ++++---- synapse/storage/databases/main/stream.py | 26 ++++++++++++++++++++++++ 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 71cc93cbed2c..5c0e571d77a8 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -175,13 +175,13 @@ async def get_state_events( state_filter = state_filter or StateFilter.all() if at_token: - last_events, _ = await self.store.get_recent_events_for_room( - room_id, end_token=at_token.room_key, limit=1 + last_event = await self.store.get_last_event_in_room_before_stream_ordering( + room_id, + end_token=at_token.room_key, ) - if not last_events: + if not last_event: raise NotFoundError("Can't find event for token %s" % (at_token,)) - last_event = last_events[0] # check whether the user is in the room at that time to determine # whether they should be treated as peeking. @@ -200,7 +200,7 @@ async def get_state_events( visible_events = await filter_events_for_client( self.storage, user_id, - last_events, + [last_event], filter_send_to_client=False, is_peeking=is_peeking, ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 69f0e2bd2e2d..f4048149d278 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -661,12 +661,12 @@ async def get_state_at( stream_position: point at which to get state state_filter: The state filter used to fetch state from the database. """ - last_events, _ = await self.store.get_recent_events_for_room( - room_id, end_token=stream_position.room_key, limit=1 + last_event = await self.store.get_last_event_in_room_before_stream_ordering( + room_id, + end_token=stream_position.room_key, ) - if last_events: - last_event = last_events[-1] + if last_event: state = await self.get_state_after_event( last_event, state_filter=state_filter or StateFilter.all() ) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 51316af3a502..2a3ea49c8c40 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -748,6 +748,32 @@ def _f(txn): "get_room_event_before_stream_ordering", _f ) + async def get_last_event_in_room_before_stream_ordering( + self, + room_id: str, + end_token: RoomStreamToken, + ) -> Optional[EventBase]: + """Returns the last event in a room at or before a stream ordering + + Args: + room_id + end_token: The token used to stream from + + Returns: + The most recent event. + """ + + last_row = await self.get_room_event_before_stream_ordering( + room_id=room_id, + stream_ordering=end_token.stream, + ) + if last_row: + _, _, event_id = last_row + event = await self.get_event(event_id, get_prev_content=True) + return event + + return None + async def get_current_room_stream_token_for_room_id( self, room_id: Optional[str] = None ) -> RoomStreamToken: From 8e319ccccefff6ceb41bca1c1983da470ea1674e Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 12 Apr 2022 15:21:59 +0100 Subject: [PATCH 10/12] Undo ordering changes based on stream token in pagination txn --- synapse/storage/databases/main/stream.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 2a3ea49c8c40..2cce24093628 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1171,14 +1171,6 @@ def _paginate_room_events_txn( else: order = "ASC" - # If we have only a stream token, order by stream_ordering, otherwise - # order by topologicial_ordering. - order_clause = """ORDER BY event.topological_ordering %(order)s, event.stream_ordering %(order)s""" - if from_token.topological is None: - order_clause = """ORDER BY event.stream_ordering %(order)s, event.topological_ordering %(order)s""" - - order_clause = order_clause % {"order": order} - # The bounds for the stream tokens are complicated by the fact # that we need to handle the instance_map part of the tokens. We do this # by fetching all events between the min stream token and the maximum @@ -1274,13 +1266,13 @@ def _paginate_room_events_txn( FROM events AS event %(join_clause)s WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s - %(order_clause)s - LIMIT ? + ORDER BY event.topological_ordering %(order)s, + event.stream_ordering %(order)s LIMIT ? """ % { "select_keywords": select_keywords, "join_clause": join_clause, "bounds": bounds, - "order_clause": order_clause, + "order": order, } txn.execute(sql, args) From 77ac51cb35150adbbc0cbd67e1030e6d5abdb0b7 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 12 Apr 2022 16:53:20 +0100 Subject: [PATCH 11/12] Re-add fixme comment about last event room state Co-authored-by: Erik Johnston --- synapse/handlers/sync.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f4048149d278..d4264b14ca05 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -661,6 +661,9 @@ async def get_state_at( stream_position: point at which to get state state_filter: The state filter used to fetch state from the database. """ + # FIXME: This gets the state at the latest event before the stream ordering, + # which might not be the same as the "current state" of the room at the time + # of the stream token if there were multiple forward extremities at the time. last_event = await self.store.get_last_event_in_room_before_stream_ordering( room_id, end_token=stream_position.room_key, From 361dd387b38c1f9b2e57bad0fc2fe9549036ab65 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 12 Apr 2022 21:09:00 +0100 Subject: [PATCH 12/12] Expand test room batch comment detail Co-authored-by: Eric Eastwood --- tests/rest/client/test_room_batch.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/rest/client/test_room_batch.py b/tests/rest/client/test_room_batch.py index b985680914de..41a1bf6d890e 100644 --- a/tests/rest/client/test_room_batch.py +++ b/tests/rest/client/test_room_batch.py @@ -215,9 +215,18 @@ def test_sync_while_batch_importing(self) -> None: # Join the room as the normal user self.helper.join(room_id, user_id, tok=user_tok) - # Create event to hang backfill batch from, and send batch - historical batches - # must be hung from the lmost recent event after the member join event to see - # this failure case. + # Create an event to hang the historical batch from - In order to see + # the failure case originally reported in #12281, the historical batch + # must be hung from the most recent event in the room so the base + # insertion event ends up with the highest `topogological_ordering` + # (`depth`) in the room but will have a negative `stream_ordering` + # because it's a `historical` event. Previously, when assembling the + # `state` for the `/sync` response, the bugged logic would sort by + # `topological_ordering` descending and pick up the base insertion + # event because it has a negative `stream_ordering` below the given + # pagination token. Now we properly sort by `stream_ordering` + # descending which puts `historical` events with a negative + # `stream_ordering` way at the bottom and aren't selected as expected. response = self.helper.send_event( room_id=room_id, type=EventTypes.Message,