Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix logic for dropping old events in fed queue #11806

Merged
merged 6 commits into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11806.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.40.0 that caused Synapse to fail to process incoming federation traffic after handling a large amount of events in a v1 room.
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1432,7 +1432,10 @@ async def prune_staged_events_in_room(

if room_version.event_format == EventFormatVersions.V1:
for prev_event_tuple in prev_events:
if not isinstance(prev_event_tuple, list) or len(prev_events) != 2:
if (
not isinstance(prev_event_tuple, list)
or len(prev_event_tuple) != 2
):
logger.info("Invalid prev_events for %s", event_id)
break

Expand Down
30 changes: 24 additions & 6 deletions tests/storage/test_event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Tuple, Union

import attr
from parameterized import parameterized

from synapse.api.room_versions import RoomVersions
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
RoomVersion,
)
from synapse.events import _EventInternalMetadata
from synapse.util import json_encoder

Expand Down Expand Up @@ -506,11 +512,21 @@ def insert_event(txn):
)
self.assertSetEqual(difference, set())

def test_prune_inbound_federation_queue(self):
"Test that pruning of inbound federation queues work"
@parameterized.expand(
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
[(room_version,) for room_version in KNOWN_ROOM_VERSIONS.values()]
)
def test_prune_inbound_federation_queue(self, room_version: RoomVersion):
babolivier marked this conversation as resolved.
Show resolved Hide resolved
"""Test that pruning of inbound federation queues work"""

room_id = "some_room_id"

def prev_event_format(prev_event_id: str) -> Union[Tuple[str, dict], str]:
"""Account for differences in prev_events format across room versions"""
if room_version.event_format == EventFormatVersions.V1:
return prev_event_id, {}

return prev_event_id

# Insert a bunch of events that all reference the previous one.
self.get_success(
self.store.db_pool.simple_insert_many(
Expand All @@ -529,7 +545,9 @@ def test_prune_inbound_federation_queue(self):
room_id,
0,
f"$fake_event_id_{i + 1}",
json_encoder.encode({"prev_events": [f"$fake_event_id_{i}"]}),
json_encoder.encode(
{"prev_events": [prev_event_format(f"$fake_event_id_{i}")]}
),
"{}",
)
for i in range(500)
Expand All @@ -541,12 +559,12 @@ def test_prune_inbound_federation_queue(self):
# Calling prune once should return True, i.e. a prune happen. The second
# time it shouldn't.
pruned = self.get_success(
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
self.store.prune_staged_events_in_room(room_id, room_version)
)
self.assertTrue(pruned)

pruned = self.get_success(
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
self.store.prune_staged_events_in_room(room_id, room_version)
)
self.assertFalse(pruned)

Expand Down