Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Clear event caches when we purge history #15609

Merged
merged 12 commits into from
Jun 8, 2023
1 change: 1 addition & 0 deletions changelog.d/15609.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Correctly clear caches when we delete a room.
31 changes: 31 additions & 0 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,14 @@ def _invalidate_state_caches(
room_id: Room where state changed
members_changed: The user_ids of members that have changed
"""

# XXX: If you add something to this function make sure you add it to
# `_invalidate_state_caches_all` as well.

# If there were any membership changes, purge the appropriate caches.
for host in {get_domain_from_id(u) for u in members_changed}:
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
self._attempt_to_invalidate_cache("is_host_invited", (room_id, host))
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
if members_changed:
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
Expand Down Expand Up @@ -117,6 +122,32 @@ def _invalidate_state_caches(
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))

def _invalidate_state_caches_all(self, room_id: str) -> None:
    """Invalidate every cache derived from a room's current state.

    Counterpart of `_invalidate_state_caches` for callers that cannot say
    which memberships have changed. Nothing is streamed down replication
    from here.

    Args:
        room_id: Room where state changed
    """
    room_key = (room_id,)

    # Ordered (cache name, key) pairs. A `None` key is used where no
    # per-room key is given -- presumably a whole-cache invalidation, going
    # by the "invalidate all" handling of `keys is None` in
    # `process_replication_rows`; confirm against
    # `_attempt_to_invalidate_cache`.
    targets = (
        ("get_partial_current_state_ids", room_key),
        ("get_users_in_room", room_key),
        ("is_host_invited", None),
        ("is_host_joined", None),
        ("get_current_hosts_in_room", room_key),
        ("get_users_in_room_with_profiles", room_key),
        ("get_number_joined_users_in_room", room_key),
        ("get_local_users_in_room", room_key),
        ("does_pair_of_users_share_a_room", None),
        ("get_user_in_room_with_profile", None),
        ("get_rooms_for_user_with_stream_ordering", None),
        ("get_rooms_for_user", None),
        ("get_room_summary", room_key),
    )
    for cache_name, key in targets:
        self._attempt_to_invalidate_cache(cache_name, key)

def _attempt_to_invalidate_cache(
self, cache_name: str, key: Optional[Collection[Any]]
) -> bool:
Expand Down
134 changes: 134 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
# based on the current state when notifying workers over replication.
CURRENT_STATE_CACHE_NAME = "cs_cache_fake"

# As above, but for invalidating event caches on history deletion
PURGE_HISTORY_CACHE_NAME = "ph_cache_fake"

# As above, but for invalidating room caches on room deletion
DELETE_ROOM_CACHE_NAME = "dr_cache_fake"


class CacheInvalidationWorkerStore(SQLBaseStore):
def __init__(
Expand Down Expand Up @@ -175,6 +181,23 @@ def process_replication_rows(
room_id = row.keys[0]
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
elif row.cache_func == PURGE_HISTORY_CACHE_NAME:
if row.keys is None:
raise Exception(
"Can't send an 'invalidate all' for 'purge history' cache"
)

room_id = row.keys[0]
self._invalidate_caches_for_events(room_id)
elif row.cache_func == DELETE_ROOM_CACHE_NAME:
if row.keys is None:
raise Exception(
"Can't send an 'invalidate all' for 'delete room' cache"
)

room_id = row.keys[0]
self._invalidate_caches_for_events(room_id)
self._invalidate_caches_for_room(room_id)
else:
self._attempt_to_invalidate_cache(row.cache_func, row.keys)

Expand Down Expand Up @@ -226,6 +249,9 @@ def _invalidate_caches_for_event(
relates_to: Optional[str],
backfilled: bool,
) -> None:
# XXX: If you add something to this function make sure you add it to
# `_invalidate_caches_for_events` as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this accurate? You now explicitly call _invalidate_caches_for_events and _invalidate_caches_for_room in the delete room case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, that should say _invalidate_caches_for_events

# This invalidates any local in-memory cached event objects, the original
# process triggering the invalidation is responsible for clearing any external
# cached objects.
Expand Down Expand Up @@ -271,6 +297,106 @@ def _invalidate_caches_for_event(
self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
self._attempt_to_invalidate_cache("get_threads", (room_id,))

def _invalidate_caches_for_events_and_stream(
    self, txn: LoggingTransaction, room_id: str
) -> None:
    """Invalidate caches associated with events in a room, and stream the
    invalidation to replication.

    Used when we delete events in rooms, but don't know which events we've
    deleted.

    Args:
        txn: The transaction the replication notification is sent with, and
            on which the local invalidation is registered via `call_after`.
        room_id: The room whose events were deleted.
    """
    # Notify replication under the magic "purge history" cache name, then
    # register the local invalidation on the transaction.
    self._send_invalidation_to_replication(txn, PURGE_HISTORY_CACHE_NAME, [room_id])
    txn.call_after(self._invalidate_caches_for_events, room_id)

def _invalidate_caches_for_events(self, room_id: str) -> None:
    """Invalidate the local caches associated with events in a room.

    This only touches caches on the current process; it does not stream
    anything to replication (see `_invalidate_caches_for_events_and_stream`
    for the variant that does).

    Used when we delete events in rooms, but don't know which events we've
    deleted.

    Args:
        room_id: The room whose events were deleted.
    """
    # We don't know which events went away, so drop every in-memory cached
    # event rather than trying to pick out individual ones.
    self._invalidate_local_get_event_cache_all()  # type: ignore[attr-defined]

    # Caches invalidated for this room's key only.
    self._attempt_to_invalidate_cache("have_seen_event", (room_id,))
    self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
    self._attempt_to_invalidate_cache(
        "get_unread_event_push_actions_by_room_for_user", (room_id,)
    )

    # These are invalidated with `None` rather than a room key -- presumably
    # because they are keyed by something other than the room, so the
    # affected entries cannot be enumerated here.
    self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
    self._attempt_to_invalidate_cache("get_relations_for_event", None)
    self._attempt_to_invalidate_cache("get_applicable_edit", None)
    self._attempt_to_invalidate_cache("get_thread_id", None)
    self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
    self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
    self._attempt_to_invalidate_cache(
        "get_rooms_for_user_with_stream_ordering", None
    )
    self._attempt_to_invalidate_cache("get_rooms_for_user", None)
    self._attempt_to_invalidate_cache("get_references_for_event", None)
    self._attempt_to_invalidate_cache("get_thread_summary", None)
    self._attempt_to_invalidate_cache("get_thread_participated", None)
    self._attempt_to_invalidate_cache("get_threads", (room_id,))

    self._attempt_to_invalidate_cache("_get_state_group_for_event", None)

    self._attempt_to_invalidate_cache("get_event_ordering", None)
    self._attempt_to_invalidate_cache("is_partial_state_event", None)
    self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)

def _invalidate_caches_for_room_and_stream(
    self, txn: LoggingTransaction, room_id: str
) -> None:
    """Invalidate the caches associated with a room and tell replication.

    Used when we delete rooms.

    Args:
        txn: The transaction the replication notification is sent with, and
            on which the local invalidation is registered via `call_after`.
        room_id: The room that was deleted.
    """
    # The magic "delete room" cache name is sent with the room ID as its
    # only key.
    replication_keys = [room_id]
    self._send_invalidation_to_replication(
        txn, DELETE_ROOM_CACHE_NAME, replication_keys
    )

    # The local invalidation is registered on the transaction rather than
    # run inline.
    txn.call_after(self._invalidate_caches_for_room, room_id)

def _invalidate_caches_for_room(self, room_id: str) -> None:
    """Drop the locally cached data associated with a room.

    Used when we delete rooms.

    Args:
        room_id: The room that was deleted.
    """
    # Deleting a room deletes its events too, so clear the event caches first.
    self._invalidate_caches_for_events(room_id)

    room_key = (room_id,)

    # Ordered (cache name, key) pairs. Cache names are runtime strings and
    # must be kept exactly as spelled here.
    room_cache_targets = (
        ("get_account_data_for_room", None),
        ("get_account_data_for_room_and_type", None),
        ("get_aliases_for_room", room_key),
        ("get_latest_event_ids_in_room", room_key),
        ("_get_forward_extremeties_for_room", None),
        ("get_unread_event_push_actions_by_room_for_user", room_key),
        ("_get_linearized_receipts_for_room", room_key),
        ("is_room_blocked", room_key),
        ("get_retention_policy_for_room", room_key),
        ("_get_partial_state_servers_at_join", room_key),
        ("is_partial_state_room", room_key),
        ("get_invited_rooms_for_local_user", None),
        ("get_current_hosts_in_room_ordered", room_key),
        ("did_forget", None),
        ("get_forgotten_rooms_for_user", None),
        ("_get_membership_from_event_id", None),
        ("get_room_version_id", room_key),
    )
    for cache_name, key in room_cache_targets:
        self._attempt_to_invalidate_cache(cache_name, key)

    # Finally, clear everything derived from the room's current state.
    self._invalidate_state_caches_all(room_id)
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
) -> None:
Expand Down Expand Up @@ -377,6 +503,14 @@ def _send_invalidation_to_replication(
"Can't stream invalidate all with magic current state cache"
)

if cache_name == PURGE_HISTORY_CACHE_NAME and keys is None:
raise Exception(
"Can't stream invalidate all with magic purge history cache"
)

if cache_name == DELETE_ROOM_CACHE_NAME and keys is None:
raise Exception("Can't stream invalidate all with magic delete room cache")

if isinstance(self.database_engine, PostgresEngine):
assert self._cache_id_gen is not None

Expand Down
9 changes: 9 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,15 @@ def _invalidate_local_get_event_cache(self, event_id: str) -> None:
self._event_ref.pop(event_id, None)
self._current_event_fetches.pop(event_id, None)

def _invalidate_local_get_event_cache_all(self) -> None:
    """Empty all of the in-memory get-event caches on this process.

    Used when we purge room history.
    """
    # Clear, in order: the event cache, the event reference map and the map
    # of in-flight event fetches.
    local_event_stores = (
        self._get_event_cache,
        self._event_ref,
        self._current_event_fetches,
    )
    for store in local_event_stores:
        store.clear()
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

async def _get_events_from_cache(
self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, EventCacheEntry]:
Expand Down
8 changes: 3 additions & 5 deletions synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ def _purge_history_txn(

logger.info("[purge] done")

self._invalidate_caches_for_events_and_stream(txn, room_id)

return referenced_state_groups

async def purge_room(self, room_id: str) -> List[int]:
Expand Down Expand Up @@ -485,10 +487,6 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (#5888)

# TODO: we could probably usefully do a bunch more cache invalidation here

# XXX: as with purge_history, this is racy, but no worse than other races
# that already exist.
self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
self._invalidate_caches_for_room_and_stream(txn, room_id)

return state_groups
2 changes: 1 addition & 1 deletion synapse/util/caches/lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,5 +864,5 @@ def invalidate_local(self, key: KT) -> None:
async def contains(self, key: KT) -> bool:
    """Return the result of `contains(key)` on the wrapped `_lru_cache`."""
    return self._lru_cache.contains(key)

def clear(self) -> None:
    """Clear the wrapped `_lru_cache`.

    This is a plain (non-async) method: callers invoke it directly without
    awaiting, e.g. `self._get_event_cache.clear()`.
    """
    self._lru_cache.clear()
2 changes: 1 addition & 1 deletion tests/handlers/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def test_unknown_room_version(self) -> None:
# Blow away caches (supported room versions can only change due to a restart).
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
self.store.get_rooms_for_user.invalidate_all()
self.get_success(self.store._get_event_cache.clear())
self.store._get_event_cache.clear()
self.store._event_ref.clear()

# The rooms should be excluded from the sync response.
Expand Down
3 changes: 0 additions & 3 deletions tests/rest/client/test_read_marker.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,6 @@ def send_message() -> str:
event = self.get_success(self.store.get_event(event_id_1, allow_none=True))
assert event is None

# TODO See https://github.com/matrix-org/synapse/issues/13476
self.store.get_event_ordering.invalidate_all()

# Test moving the read marker to a newer event
event_id_2 = send_message()
channel = self.make_request(
Expand Down
8 changes: 4 additions & 4 deletions tests/storage/databases/main/test_events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_id = res["event_id"]

# Reset the event cache so the tests start with it empty
self.get_success(self.store._get_event_cache.clear())
self.store._get_event_cache.clear()

def test_simple(self) -> None:
"""Test that we cache events that we pull from the DB."""
Expand All @@ -205,7 +205,7 @@ def test_event_ref(self) -> None:
"""

# Reset the event cache
self.get_success(self.store._get_event_cache.clear())
self.store._get_event_cache.clear()

with LoggingContext("test") as ctx:
# We keep hold of the event event though we never use it.
Expand All @@ -215,7 +215,7 @@ def test_event_ref(self) -> None:
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)

# Reset the event cache
self.get_success(self.store._get_event_cache.clear())
self.store._get_event_cache.clear()

with LoggingContext("test") as ctx:
self.get_success(self.store.get_event(self.event_id))
Expand Down Expand Up @@ -390,7 +390,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_id = res["event_id"]

# Reset the event cache so the tests start with it empty
self.get_success(self.store._get_event_cache.clear())
self.store._get_event_cache.clear()

@contextmanager
def blocking_get_event_calls(
Expand Down