Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Keep track when we try and fail to process a pulled event #13589

Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e0d7fab
Keep track when we tried to backfill an event
MadLittleMods Aug 23, 2022
b8d55d3
Record in some fail spots
MadLittleMods Aug 25, 2022
f63d823
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Aug 25, 2022
bec26e2
Record and clear attempts
MadLittleMods Aug 25, 2022
fee37c3
Add changelog
MadLittleMods Aug 25, 2022
d1290be
Remove from when spam checker fails
MadLittleMods Aug 25, 2022
f9119d0
Custom upsert to increment
MadLittleMods Aug 25, 2022
f5c6fe7
Fix lints
MadLittleMods Aug 25, 2022
16ebec0
Remove extra whitespace
MadLittleMods Aug 25, 2022
ce07aa1
Move to correct folder
MadLittleMods Aug 25, 2022
5811ba1
Set the version back
MadLittleMods Aug 25, 2022
cf2b093
Fix `TypeError: not all arguments converted during string formatting`
MadLittleMods Aug 25, 2022
cbb4194
Add test to make sure failed backfill attempts are recorded
MadLittleMods Aug 26, 2022
621c5d3
Clean up test
MadLittleMods Aug 26, 2022
75c07bb
Fix comments
MadLittleMods Aug 26, 2022
783cce5
Add test for clearing backfill attempts
MadLittleMods Aug 26, 2022
54ef84b
Maybe a better comment
MadLittleMods Aug 26, 2022
e4192d7
Update table name with "failed" and include room_id in the primary key
MadLittleMods Aug 31, 2022
7a44932
Rename to record_event_failed_backfill_attempt
MadLittleMods Aug 31, 2022
86d98ca
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Aug 31, 2022
1464101
Add _unsafe_to_upsert_tables check
MadLittleMods Sep 1, 2022
71c7738
Add foreign key references
MadLittleMods Sep 1, 2022
df8c76d
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Sep 1, 2022
d45b078
Remove reference to event that won't be in the events table
MadLittleMods Sep 1, 2022
33ad64e
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Sep 9, 2022
63bec99
Remove emulated upsert code (all of our dbs now support it)
MadLittleMods Sep 9, 2022
31d7502
Table rename `event_failed_pull_attempts`
MadLittleMods Sep 9, 2022
0b5f1db
Add `last_cause` column
MadLittleMods Sep 9, 2022
4ce7709
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Sep 12, 2022
d3a1f84
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Sep 13, 2022
1347686
Update schema version summary
MadLittleMods Sep 13, 2022
57182dc
Remove backfilled check since we plan to go general anyway
MadLittleMods Sep 14, 2022
e58bc65
Merge branch 'develop' into madlittlemods/keep-track-when-we-tried-to…
MadLittleMods Sep 14, 2022
70019d2
Move change to latest delta 73
MadLittleMods Sep 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13589.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Keep track when we attempt to backfill an event but fail so we can intelligently back-off in the future.
5 changes: 5 additions & 0 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,8 @@ async def _process_pulled_event(
self._sanity_check_event(event)
except SynapseError as err:
logger.warning("Event %s failed sanity check: %s", event_id, err)
if backfilled:
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
await self._store.record_event_backfill_attempt(event_id)
return

try:
Expand Down Expand Up @@ -897,6 +899,9 @@ async def _process_pulled_event(
backfilled=backfilled,
)
except FederationError as e:
if backfilled:
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
await self._store.record_event_backfill_attempt(event_id)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

if e.code == 403:
logger.warning("Pulled event %s failed history check.", event_id)
else:
Expand Down
73 changes: 73 additions & 0 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,79 @@ def _get_backfill_events(

return event_id_results

@trace
async def record_event_backfill_attempt(self, event_id: str) -> None:
    """
    Record that an attempt to backfill the given event was made.

    Picks the transaction function based on whether the database engine
    reports native upsert support, then runs it through the database pool.

    Args:
        event_id: The event whose backfill attempt should be recorded.
    """
    interaction_desc = "record_event_backfill_attempt"

    # Engines without native upsert go through the emulated transaction,
    # which is run as a regular (non-autocommit) interaction.
    if not self.database_engine.can_native_upsert:
        await self.db_pool.runInteraction(
            interaction_desc,
            self._record_event_backfill_attempt_upsert_emulated_txn,
            event_id,
        )
        return

    await self.db_pool.runInteraction(
        interaction_desc,
        self._record_event_backfill_attempt_upsert_native_txn,
        event_id,
        # Safe as it's a single upsert
        db_autocommit=True,
    )
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

def _record_event_backfill_attempt_upsert_native_txn(
self,
txn: LoggingTransaction,
event_id: str,
) -> None:
assert self.database_engine.can_native_upsert

sql = """
INSERT INTO event_backfill_attempts (
event_id, num_attempts, last_attempt_ts
)
VALUES (?, ?, ?)
ON CONFLICT (event_id) DO UPDATE SET
event_id=EXCLUDED.event_id,
num_attempts=event_backfill_attempts.num_attempts + 1,
last_attempt_ts=EXCLUDED.last_attempt_ts;
"""

txn.execute(sql, (event_id, 1, self._clock.time_msec()))

def _record_event_backfill_attempt_upsert_emulated_txn(
self,
txn: LoggingTransaction,
event_id: str,
) -> None:
self.database_engine.lock_table(txn, "event_backfill_attempts")

prev_row = self.db_pool.simple_select_one_txn(
txn,
table="event_backfill_attempts",
keyvalues={"event_id": event_id},
retcols=("num_attempts"),
allow_none=True,
)

if not prev_row:
self.db_pool.simple_insert_txn(
txn,
table="event_backfill_attempts",
values={
"event_id": event_id,
"num_attempts": 1,
"last_attempt_ts": self._clock.time_msec(),
},
)
else:
self.db_pool.simple_update_one_txn(
txn,
table="event_backfill_attempts",
keyvalues={"event_id": event_id},
updatevalues={
"event_id": event_id,
"num_attempts": prev_row["num_attempts"] + 1,
"last_attempt_ts": self._clock.time_msec(),
},
)

async def get_missing_events(
self,
room_id: str,
Expand Down
35 changes: 26 additions & 9 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2435,17 +2435,34 @@ def _update_backward_extremeties(
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
)
backward_extremity_tuples_to_remove = [
(ev.event_id, ev.room_id)
for ev in events
if not ev.internal_metadata.is_outlier()
# If we encountered an event with no prev_events, then we might
# as well remove it now because it won't ever have anything else
# to backfill from.
or len(ev.prev_event_ids()) == 0
]
txn.execute_batch(
query,
[
(ev.event_id, ev.room_id)
for ev in events
if not ev.internal_metadata.is_outlier()
# If we encountered an event with no prev_events, then we might
# as well remove it now because it won't ever have anything else
# to backfill from.
or len(ev.prev_event_ids()) == 0
],
backward_extremity_tuples_to_remove,
)

# Since we no longer need these backward extremities, it also means that
# they won't be backfilled from again so we no longer need to store the
# backfill attempts around it.
query = """
DELETE FROM event_backfill_attempts
WHERE event_id = ?
"""
backward_extremity_event_ids_to_remove = [
(extremity_tuple[0],)
for extremity_tuple in backward_extremity_tuples_to_remove
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
]
txn.execute_batch(
query,
backward_extremity_event_ids_to_remove,
)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


-- Add a table that keeps track of when we last tried to backfill an event. This
-- allows us to be more intelligent when we decide to retry (we don't need to
-- fail over and over) and we can process that event in the background so we
-- don't block on it each time.
CREATE TABLE IF NOT EXISTS event_backfill_attempts(
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
event_id TEXT NOT NULL,
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
num_attempts INT NOT NULL,
last_attempt_ts BIGINT NOT NULL
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
);
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

CREATE UNIQUE INDEX IF NOT EXISTS event_backfill_attempts_event_id ON event_backfill_attempts(event_id);
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
220 changes: 220 additions & 0 deletions tests/handlers/test_federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,223 @@ async def get_event(destination: str, event_id: str, timeout=None):

if prev_exists_as_outlier:
self.mock_federation_transport_client.get_event.assert_not_called()

def test_process_pulled_event_records_failed_backfill_attempts(
self,
) -> None:
"""
Test to make sure that failed backfill attempts for an event are
recorded in the `event_backfill_attempts` table.

In this test, we pretend we are processing a "pulled" event via
backfill. The pulled event has a fake `prev_event` which our server has
obviously never seen before so it attempts to request the state at that
`prev_event` which expectedly fails because it's a fake event. Because
the server can't fetch the state at the missing `prev_event`, the
"pulled" event fails the history check and fails to process.

We check that we correctly record the number of failed backfill attempts
to the pulled event and as a sanity check, that the "pulled" event isn't
persisted.
"""
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
main_store = self.hs.get_datastores().main

# Create the room
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(main_store.get_room_version(room_id))

# We expect an outbound request to /state_ids, so stub that out
self.mock_federation_transport_client.get_room_state_ids.return_value = make_awaitable(
{
# Mimic the other server not knowing about the state at all.
# We want to cause Synapse to throw an error (`Unable to get
# missing prev_event $fake_prev_event`) and fail to backfill
# the pulled event.
"pdu_ids": [],
"auth_chain_ids": [],
}
)
# We also expect an outbound request to /state
self.mock_federation_transport_client.get_room_state.return_value = make_awaitable(
StateRequestResponse(
# Mimic the other server not knowing about the state at all.
# We want to cause Synapse to throw an error (`Unable to get
# missing prev_event $fake_prev_event`) and fail to backfill
# the pulled event.
auth_events=[],
state=[],
)
)

pulled_event = make_event_from_dict(
self.add_hashes_and_signatures_from_other_server(
{
"type": "test_regular_type",
"room_id": room_id,
"sender": OTHER_USER,
"prev_events": [
# The fake prev event will make the pulled event fail
# the history check (`Unable to get missing prev_event
# $fake_prev_event`)
"$fake_prev_event"
],
"auth_events": [],
"origin_server_ts": 1,
"depth": 12,
"content": {"body": "pulled"},
}
),
room_version,
)

# The function under test: try to process the pulled event
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_event(
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
)
)

# Make sure our backfill attempt was recorded
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_backfill_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
)
)
self.assertEqual(backfill_num_attempts, 1)

# The function under test: try to process the pulled event again
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_event(
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
)
)

# Make sure our second backfill attempt was recorded (`num_attempts` was incremented)
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_backfill_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
)
)
self.assertEqual(backfill_num_attempts, 2)

# And as a sanity check, make sure the event was not persisted through all of this.
persisted = self.get_success(
main_store.get_event(pulled_event.event_id, allow_none=True)
)
self.assertIsNone(
persisted,
"pulled event that fails the history check should not be persisted at all",
)

def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_persisted(
self,
) -> None:
"""
Test to make sure that failed backfill attempts
(`event_backfill_attempts` table) for an event are cleared after the
event is successfully persisted.

In this test, we pretend we are processing a "pulled" event via
backfill. The pulled event successfully processes and the backward
extremities are updated along with clearing out any backfill attempts
for those old extremities.

We check that we correctly cleared failed backfill attempts of the
pulled event.
"""
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
main_store = self.hs.get_datastores().main

# Create the room
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(main_store.get_room_version(room_id))

# Allow the remote user to send state events
self.helper.send_state(
room_id,
"m.room.power_levels",
{"events_default": 0, "state_default": 0},
tok=tok,
)

# Add the remote user to the room
member_event = self.get_success(
event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
)

initial_state_map = self.get_success(
main_store.get_partial_current_state_ids(room_id)
)

auth_event_ids = [
initial_state_map[("m.room.create", "")],
initial_state_map[("m.room.power_levels", "")],
member_event.event_id,
]

pulled_event = make_event_from_dict(
self.add_hashes_and_signatures_from_other_server(
{
"type": "test_regular_type",
"room_id": room_id,
"sender": OTHER_USER,
"prev_events": [member_event.event_id],
"auth_events": auth_event_ids,
"origin_server_ts": 1,
"depth": 12,
"content": {"body": "pulled"},
}
),
room_version,
)

# Fake the "pulled" event failing to backfill once so we can test
# if it's cleared out later on.
self.get_success(
main_store.record_event_backfill_attempt(pulled_event.event_id)
)
# Make sure we have a backfill attempt recorded for the pulled event
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_backfill_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
)
)
self.assertEqual(backfill_num_attempts, 1)

# The function under test: try to process the pulled event
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_event(
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
)
)

# Make sure the backfill attempts for the pulled event are cleared
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_backfill_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
allow_none=True,
)
)
self.assertIsNone(backfill_num_attempts)

# And as a sanity check, make sure the "pulled" event was persisted.
persisted = self.get_success(
main_store.get_event(pulled_event.event_id, allow_none=True)
)
self.assertIsNotNone(persisted, "pulled event was not persisted at all")