Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Keep track when we try and fail to process a pulled event #13589

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e0d7fab
Keep track when we tried to backfill an event
MadLittleMods Aug 23, 2022
b8d55d3
Record in some fail spots
MadLittleMods Aug 25, 2022
f63d823
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Aug 25, 2022
bec26e2
Record and clear attempts
MadLittleMods Aug 25, 2022
fee37c3
Add changelog
MadLittleMods Aug 25, 2022
d1290be
Remove from when spam checker fails
MadLittleMods Aug 25, 2022
f9119d0
Custom upsert to increment
MadLittleMods Aug 25, 2022
f5c6fe7
Fix lints
MadLittleMods Aug 25, 2022
16ebec0
Remove extra whitespace
MadLittleMods Aug 25, 2022
ce07aa1
Move to correct folder
MadLittleMods Aug 25, 2022
5811ba1
Set the version back
MadLittleMods Aug 25, 2022
cf2b093
Fix `TypeError: not all arguments converted during string formatting`
MadLittleMods Aug 25, 2022
cbb4194
Add test to make sure failed backfill attempts are recorded
MadLittleMods Aug 26, 2022
621c5d3
Clean up test
MadLittleMods Aug 26, 2022
75c07bb
Fix comments
MadLittleMods Aug 26, 2022
783cce5
Add test for clearing backfill attempts
MadLittleMods Aug 26, 2022
54ef84b
Maybe a better comment
MadLittleMods Aug 26, 2022
e4192d7
Update table name with "failed" and include room_id in the primary key
MadLittleMods Aug 31, 2022
7a44932
Rename to record_event_failed_backfill_attempt
MadLittleMods Aug 31, 2022
86d98ca
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Aug 31, 2022
1464101
Add _unsafe_to_upsert_tables check
MadLittleMods Sep 1, 2022
71c7738
Add foreign key references
MadLittleMods Sep 1, 2022
df8c76d
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Sep 1, 2022
d45b078
Remove reference to event that won't be in the events table
MadLittleMods Sep 1, 2022
33ad64e
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Sep 9, 2022
63bec99
Remove emulated upsert code (all of our dbs no support it)
MadLittleMods Sep 9, 2022
31d7502
Table rename `event_failed_pull_attempts`
MadLittleMods Sep 9, 2022
0b5f1db
Add `last_cause` column
MadLittleMods Sep 9, 2022
4ce7709
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Sep 12, 2022
d3a1f84
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Sep 13, 2022
1347686
Update schema version summary
MadLittleMods Sep 13, 2022
57182dc
Remove backfilled check since we plan to go general anyway
MadLittleMods Sep 14, 2022
e58bc65
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Sep 14, 2022
70019d2
Move change to latest delta 73
MadLittleMods Sep 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13589.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Keep track when we attempt to backfill an event but fail so we can intelligently back-off in the future.
7 changes: 7 additions & 0 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,9 @@ async def _process_pulled_event(
self._sanity_check_event(event)
except SynapseError as err:
logger.warning("Event %s failed sanity check: %s", event_id, err)
await self._store.record_event_failed_pull_attempt(
event.room_id, event_id, str(err)
)
return

try:
Expand Down Expand Up @@ -897,6 +900,10 @@ async def _process_pulled_event(
backfilled=backfilled,
)
except FederationError as e:
await self._store.record_event_failed_pull_attempt(
event.room_id, event_id, str(e)
)

if e.code == 403:
logger.warning("Pulled event %s failed history check.", event_id)
else:
Expand Down
45 changes: 45 additions & 0 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,51 @@ def _get_backfill_events(

return event_id_results

@trace
async def record_event_failed_pull_attempt(
self, room_id: str, event_id: str, cause: str
) -> None:
"""
Record when we fail to pull an event over federation.

This information allows us to be more intelligent when we decide to
retry (we don't need to fail over and over) and we can process that
event in the background so we don't block on it each time.

Args:
room_id: The room where the event failed to pull from
event_id: The event that failed to be fetched or processed
cause: The error message or reason that we failed to pull the event
"""
await self.db_pool.runInteraction(
"record_event_failed_pull_attempt",
self._record_event_failed_pull_attempt_upsert_txn,
room_id,
event_id,
cause,
db_autocommit=True, # Safe as it's a single upsert
)

def _record_event_failed_pull_attempt_upsert_txn(
self,
txn: LoggingTransaction,
room_id: str,
event_id: str,
cause: str,
) -> None:
sql = """
INSERT INTO event_failed_pull_attempts (
room_id, event_id, num_attempts, last_attempt_ts, last_cause
)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (room_id, event_id) DO UPDATE SET
num_attempts=event_failed_pull_attempts.num_attempts + 1,
last_attempt_ts=EXCLUDED.last_attempt_ts,
last_cause=EXCLUDED.last_cause;
"""

txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))

async def get_missing_events(
self,
room_id: str,
Expand Down
32 changes: 23 additions & 9 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2435,17 +2435,31 @@ def _update_backward_extremeties(
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
)
backward_extremity_tuples_to_remove = [
(ev.event_id, ev.room_id)
for ev in events
if not ev.internal_metadata.is_outlier()
# If we encountered an event with no prev_events, then we might
# as well remove it now because it won't ever have anything else
# to backfill from.
or len(ev.prev_event_ids()) == 0
]
txn.execute_batch(
query,
[
(ev.event_id, ev.room_id)
for ev in events
if not ev.internal_metadata.is_outlier()
# If we encountered an event with no prev_events, then we might
# as well remove it now because it won't ever have anything else
# to backfill from.
or len(ev.prev_event_ids()) == 0
],
backward_extremity_tuples_to_remove,
)

# Clear out the failed backfill attempts after we successfully pulled
# the event. Since we no longer need these events as backward
# extremities, it also means that they won't be backfilled from again so
# we no longer need to store the backfill attempts around it.
query = """
DELETE FROM event_failed_pull_attempts
WHERE event_id = ? and room_id = ?
"""
txn.execute_batch(
query,
backward_extremity_tuples_to_remove,
)


Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
Changes in SCHEMA_VERSION = 73;
- thread_id column is added to event_push_actions, event_push_actions_staging
event_push_summary, receipts_linearized, and receipts_graph.
- Add table `event_failed_pull_attempts` to keep track when we fail to pull
events over federation.
"""


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


-- Add a table that keeps track of when we failed to pull an event over
-- federation (via /backfill, `/event`, `/get_missing_events`, etc). This allows
-- us to be more intelligent when we decide to retry (we don't need to fail over
-- and over) and we can process that event in the background so we don't block
-- on it each time.
CREATE TABLE IF NOT EXISTS event_failed_pull_attempts(
room_id TEXT NOT NULL REFERENCES rooms (room_id),
event_id TEXT NOT NULL,
num_attempts INT NOT NULL,
last_attempt_ts BIGINT NOT NULL,
last_cause TEXT NOT NULL,
PRIMARY KEY (room_id, event_id)
);
222 changes: 222 additions & 0 deletions tests/handlers/test_federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,225 @@ async def get_event(destination: str, event_id: str, timeout=None):

if prev_exists_as_outlier:
self.mock_federation_transport_client.get_event.assert_not_called()

def test_process_pulled_event_records_failed_backfill_attempts(
self,
) -> None:
"""
Test to make sure that failed backfill attempts for an event are
recorded in the `event_failed_pull_attempts` table.

In this test, we pretend we are processing a "pulled" event via
backfill. The pulled event has a fake `prev_event` which our server has
obviously never seen before so it attempts to request the state at that
`prev_event` which expectedly fails because it's a fake event. Because
the server can't fetch the state at the missing `prev_event`, the
"pulled" event fails the history check and is fails to process.

We check that we correctly record the number of failed pull attempts
of the pulled event and as a sanity check, that the "pulled" event isn't
persisted.
"""
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
main_store = self.hs.get_datastores().main

# Create the room
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(main_store.get_room_version(room_id))

# We expect an outbound request to /state_ids, so stub that out
self.mock_federation_transport_client.get_room_state_ids.return_value = make_awaitable(
{
# Mimic the other server not knowing about the state at all.
# We want to cause Synapse to throw an error (`Unable to get
# missing prev_event $fake_prev_event`) and fail to backfill
# the pulled event.
"pdu_ids": [],
"auth_chain_ids": [],
}
)
# We also expect an outbound request to /state
self.mock_federation_transport_client.get_room_state.return_value = make_awaitable(
StateRequestResponse(
# Mimic the other server not knowing about the state at all.
# We want to cause Synapse to throw an error (`Unable to get
# missing prev_event $fake_prev_event`) and fail to backfill
# the pulled event.
auth_events=[],
state=[],
)
)

pulled_event = make_event_from_dict(
self.add_hashes_and_signatures_from_other_server(
{
"type": "test_regular_type",
"room_id": room_id,
"sender": OTHER_USER,
"prev_events": [
# The fake prev event will make the pulled event fail
# the history check (`Unable to get missing prev_event
# $fake_prev_event`)
"$fake_prev_event"
],
"auth_events": [],
"origin_server_ts": 1,
"depth": 12,
"content": {"body": "pulled"},
}
),
room_version,
)

# The function under test: try to process the pulled event
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_event(
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
)
)

# Make sure our failed pull attempt was recorded
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_failed_pull_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
)
)
self.assertEqual(backfill_num_attempts, 1)

# The function under test: try to process the pulled event again
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_event(
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
)
)

# Make sure our second failed pull attempt was recorded (`num_attempts` was incremented)
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_failed_pull_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
)
)
self.assertEqual(backfill_num_attempts, 2)

# And as a sanity check, make sure the event was not persisted through all of this.
persisted = self.get_success(
main_store.get_event(pulled_event.event_id, allow_none=True)
)
self.assertIsNone(
persisted,
"pulled event that fails the history check should not be persisted at all",
)

def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_persisted(
self,
) -> None:
"""
Test to make sure that failed pull attempts
(`event_failed_pull_attempts` table) for an event are cleared after the
event is successfully persisted.

In this test, we pretend we are processing a "pulled" event via
backfill. The pulled event succesfully processes and the backward
extremeties are updated along with clearing out any failed pull attempts
for those old extremities.

We check that we correctly cleared failed pull attempts of the
pulled event.
"""
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
main_store = self.hs.get_datastores().main

# Create the room
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(main_store.get_room_version(room_id))

# allow the remote user to send state events
self.helper.send_state(
room_id,
"m.room.power_levels",
{"events_default": 0, "state_default": 0},
tok=tok,
)

# add the remote user to the room
member_event = self.get_success(
event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
)

initial_state_map = self.get_success(
main_store.get_partial_current_state_ids(room_id)
)

auth_event_ids = [
initial_state_map[("m.room.create", "")],
initial_state_map[("m.room.power_levels", "")],
member_event.event_id,
]

pulled_event = make_event_from_dict(
self.add_hashes_and_signatures_from_other_server(
{
"type": "test_regular_type",
"room_id": room_id,
"sender": OTHER_USER,
"prev_events": [member_event.event_id],
"auth_events": auth_event_ids,
"origin_server_ts": 1,
"depth": 12,
"content": {"body": "pulled"},
}
),
room_version,
)

# Fake the "pulled" event failing to backfill once so we can test
# if it's cleared out later on.
self.get_success(
main_store.record_event_failed_pull_attempt(
pulled_event.room_id, pulled_event.event_id, "fake cause"
)
)
# Make sure we have a failed pull attempt recorded for the pulled event
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_failed_pull_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
)
)
self.assertEqual(backfill_num_attempts, 1)

# The function under test: try to process the pulled event
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_event(
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
)
)

# Make sure the failed pull attempts for the pulled event are cleared
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_failed_pull_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
allow_none=True,
)
)
self.assertIsNone(backfill_num_attempts)

# And as a sanity check, make sure the "pulled" event was persisted.
persisted = self.get_success(
main_store.get_event(pulled_event.event_id, allow_none=True)
)
self.assertIsNotNone(persisted, "pulled event was not persisted at all")