Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Also check if first event matches the last in prev batch #17066

Merged
merged 4 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17066.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
20 changes: 13 additions & 7 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,17 +1266,23 @@ async def _compute_state_delta_for_incremental_sync(
#
# c.f. #16941 for an example of why we can't do this for all non-gappy
# syncs.
is_linear_timeline = False
is_linear_timeline = True
if batch.events:
prev_event_id = batch.events[0].event_id
for e in batch.events[1:]:
# We need to make sure the first event in our batch points to the
# last event in the previous batch.
last_event_id_prev_batch = (
await self.store.get_last_event_in_room_before_stream_ordering(
room_id,
end_token=since_token.room_key,
)
)

prev_event_id = last_event_id_prev_batch
for e in batch.events:
if e.prev_event_ids() != [prev_event_id]:
is_linear_timeline = False
break
prev_event_id = e.event_id
else:
is_linear_timeline = True
else:
is_linear_timeline = True

if is_linear_timeline and not batch.limited:
state_ids: StateMap[str] = {}
Expand Down
95 changes: 95 additions & 0 deletions tests/handlers/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,101 @@ def test_state_includes_changes_on_forks_when_events_excluded(self) -> None:
[s2_event],
)

def test_state_includes_changes_on_long_lived_forks(self) -> None:
"""State changes that happen on a fork of the DAG must be included in `state`

Given the following DAG:

E1
↗ ↖
| S2
| ↑
--|------|----
E3 |
--|------|----
| E4
| |

... and a filter that means we only return 1 event, represented by the dashed
horizontal lines: `S2` must be included in the `state` section on the second sync.
"""
alice = self.register_user("alice", "password")
alice_tok = self.login(alice, "password")
alice_requester = create_requester(alice)
room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)

# Do an initial sync as Alice to get a known starting point.
initial_sync_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester, generate_sync_config(alice)
)
)
last_room_creation_event_id = (
initial_sync_result.joined[0].timeline.events[-1].event_id
)

# Send a state event, and a regular event, both using the same prev ID
with self._patch_get_latest_events([last_room_creation_event_id]):
s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
"event_id"
]
e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]

# Do an incremental sync, this will return E3 but *not* S2 at this
# point.
incremental_sync = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(
alice,
filter_collection=FilterCollection(
self.hs, {"room": {"timeline": {"limit": 1}}}
),
),
since_token=initial_sync_result.next_batch,
)
)
room_sync = incremental_sync.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertTrue(room_sync.timeline.limited)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e3_event],
)
self.assertEqual(
[e.event_id for e in room_sync.state.values()],
[],
)

# Now send another event that points to S2, but not E3.
with self._patch_get_latest_events([s2_event]):
e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"]

# Doing an incremental sync should return S2 in state.
incremental_sync = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(
alice,
filter_collection=FilterCollection(
self.hs, {"room": {"timeline": {"limit": 1}}}
),
),
since_token=incremental_sync.next_batch,
)
)
room_sync = incremental_sync.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertFalse(room_sync.timeline.limited)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e4_event],
)
self.assertEqual(
[e.event_id for e in room_sync.state.values()],
[s2_event],
)

def test_state_includes_changes_on_ungappy_syncs(self) -> None:
"""Test `state` where the sync is not gappy.

Expand Down
Loading