Fix `have_seen_event` cache not being invalidated
Fix #13856

`_invalidate_caches_for_event` doesn't run in monolith mode, which means we
never even try to clear the `have_seen_event` and other caches. And even in
worker mode, it only runs on the workers, not the master (AFAICT).

Additionally, there is a bug where the key is wrong, so
`_invalidate_caches_for_event` never invalidates the `have_seen_event` cache
even when it does run.

Wrong:
```py
self.have_seen_event.invalidate((room_id, event_id))
```

Correct:
```py
self.have_seen_event.invalidate(((room_id, event_id),))
```
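
To see why the nesting matters: the cache key is the cached method's full argument tuple, and `have_seen_event` takes a single argument that is itself a `(room_id, event_id)` pair. Here is a minimal dict-backed sketch of the mismatch (a toy model of the `@cached` key convention, not Synapse's actual `DeferredCache`):

```py
# Toy cache keyed on the cached method's argument tuple, mimicking (but not
# reproducing) Synapse's @cached decorator semantics.
cache = {}

def have_seen_event(room_and_event):  # single arg: a (room_id, event_id) pair
    key = (room_and_event,)  # args tuple -> ((room_id, event_id),)
    if key not in cache:
        cache[key] = False  # stand-in for the real database lookup
    return cache[key]

def invalidate(args):
    cache.pop(args, None)  # silently a no-op if the key doesn't match

have_seen_event(("!room:hs", "$event"))

invalidate(("!room:hs", "$event"))  # wrong shape: nothing is removed
assert (("!room:hs", "$event"),) in cache

invalidate((("!room:hs", "$event"),))  # correct shape: entry is removed
assert not cache
```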
MadLittleMods committed Sep 21, 2022
1 parent 1a1abdd commit a847a35
Showing 3 changed files with 106 additions and 50 deletions.
synapse/storage/controllers/persist_events.py: 33 additions, 1 deletion
```diff
@@ -43,7 +43,7 @@
 from twisted.internet import defer

 from synapse.api.constants import EventTypes, Membership
-from synapse.events import EventBase
+from synapse.events import EventBase, relation_from_event
 from synapse.events.snapshot import EventContext
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.logging.opentracing import (
@@ -431,6 +431,22 @@ async def enqueue(
             else:
                 events.append(event)

+                # We expect events to be persisted by this point and this makes
+                # mypy happy about `stream_ordering` not being optional below
+                assert event.internal_metadata.stream_ordering
+                # Invalidate related caches after we persist a new event
+                relation = relation_from_event(event)
+                self.main_store._invalidate_caches_for_event(
+                    stream_ordering=event.internal_metadata.stream_ordering,
+                    event_id=event.event_id,
+                    room_id=event.room_id,
+                    etype=event.type,
+                    state_key=event.state_key if hasattr(event, "state_key") else None,
+                    redacts=event.redacts,
+                    relates_to=relation.parent_id if relation else None,
+                    backfilled=backfilled,
+                )
+
         return (
             events,
             self.main_store.get_room_max_token(),
@@ -463,6 +479,22 @@ async def persist_event(
         replaced_event = replaced_events.get(event.event_id)
         if replaced_event:
             event = await self.main_store.get_event(replaced_event)
+        else:
+            # We expect events to be persisted by this point and this makes
+            # mypy happy about `stream_ordering` not being optional below
+            assert event.internal_metadata.stream_ordering
+            # Invalidate related caches after we persist a new event
+            relation = relation_from_event(event)
+            self.main_store._invalidate_caches_for_event(
+                stream_ordering=event.internal_metadata.stream_ordering,
+                event_id=event.event_id,
+                room_id=event.room_id,
+                etype=event.type,
+                state_key=event.state_key if hasattr(event, "state_key") else None,
+                redacts=event.redacts,
+                relates_to=relation.parent_id if relation else None,
+                backfilled=backfilled,
+            )

         event_stream_id = event.internal_metadata.stream_ordering
         # stream ordering should have been assigned by now
```
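
For context on the new `relates_to` argument: `relation_from_event` pulls the relation out of the event's `m.relates_to` content field. A simplified sketch of just the `parent_id` extraction, using a hypothetical helper name (the real function also returns the relation type and aggregation key):

```py
from typing import Any, Dict, Optional

def parent_id_from_content(content: Dict[str, Any]) -> Optional[str]:
    """Simplified stand-in for relation_from_event: find the related event's ID."""
    relates_to = content.get("m.relates_to")
    if not isinstance(relates_to, dict):
        return None
    return relates_to.get("event_id")

assert (
    parent_id_from_content(
        {"m.relates_to": {"rel_type": "m.annotation", "event_id": "$parent"}}
    )
    == "$parent"
)
assert parent_id_from_content({"body": "no relation"}) is None
```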
synapse/storage/databases/main/cache.py: 1 addition, 1 deletion
```diff
@@ -223,7 +223,7 @@ def _invalidate_caches_for_event(
         # process triggering the invalidation is responsible for clearing any external
         # cached objects.
         self._invalidate_local_get_event_cache(event_id)
-        self.have_seen_event.invalidate((room_id, event_id))
+        self.have_seen_event.invalidate(((room_id, event_id),))

         self.get_latest_event_ids_in_room.invalidate((room_id,))

```
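
Note how this key's shape differs from the `get_latest_event_ids_in_room` invalidation two lines below it: each `.invalidate()` takes the cached method's argument tuple, so a method with one string argument is keyed on `(room_id,)`, while `have_seen_event`, whose one argument is itself a tuple, is keyed on `((room_id, event_id),)`. A runnable sketch of that convention, with plain dicts standing in for the caches:

```py
# Plain dicts standing in for the two caches; keys mimic the argument tuples.
latest_ids_cache = {("!room:hs",): ["$latest"]}   # get_latest_event_ids_in_room(room_id)
have_seen_cache = {(("!room:hs", "$ev"),): True}  # have_seen_event((room_id, event_id))

def invalidate(cache, key_args):
    cache.pop(key_args, None)

invalidate(latest_ids_cache, ("!room:hs",))          # key: 1-tuple of a str
invalidate(have_seen_cache, (("!room:hs", "$ev"),))  # key: 1-tuple of a tuple

assert not latest_ids_cache and not have_seen_cache
```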
tests/storage/databases/main/test_events_worker.py: 72 additions, 48 deletions
```diff
@@ -35,66 +35,45 @@
 from synapse.util.async_helpers import yieldable_gather_results

 from tests import unittest
+from tests.test_utils.event_injection import create_event, inject_event


 class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
     def prepare(self, reactor, clock, hs):
+        self.hs = hs
         self.store: EventsWorkerStore = hs.get_datastores().main

-        # insert some test data
-        for rid in ("room1", "room2"):
-            self.get_success(
-                self.store.db_pool.simple_insert(
-                    "rooms",
-                    {"room_id": rid, "room_version": 4},
-                )
-            )
+        self.user = self.register_user("user", "pass")
+        self.token = self.login(self.user, "pass")
+        self.room_id = self.helper.create_room_as(self.user, tok=self.token)

         self.event_ids: List[str] = []
-        for idx, rid in enumerate(
-            (
-                "room1",
-                "room1",
-                "room1",
-                "room2",
-            )
-        ):
-            event_json = {"type": f"test {idx}", "room_id": rid}
-            event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
-            event_id = event.event_id
-
-            self.get_success(
-                self.store.db_pool.simple_insert(
-                    "events",
-                    {
-                        "event_id": event_id,
-                        "room_id": rid,
-                        "topological_ordering": idx,
-                        "stream_ordering": idx,
-                        "type": event.type,
-                        "processed": True,
-                        "outlier": False,
-                    },
-                )
-            )
-            self.get_success(
-                self.store.db_pool.simple_insert(
-                    "event_json",
-                    {
-                        "event_id": event_id,
-                        "room_id": rid,
-                        "json": json.dumps(event_json),
-                        "internal_metadata": "{}",
-                        "format_version": 3,
-                    },
+        for i in range(3):
+            event = self.get_success(
+                inject_event(
+                    hs,
+                    room_version=RoomVersions.V7.identifier,
+                    room_id=self.room_id,
+                    sender=self.user,
+                    type="test_event_type",
+                    content={"body": f"foobarbaz{i}"},
                 )
             )
-            self.event_ids.append(event_id)
+
+            self.event_ids.append(event.event_id)

     def test_simple(self):
         with LoggingContext(name="test") as ctx:
             res = self.get_success(
-                self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
+                self.store.have_seen_events(
+                    self.room_id, [self.event_ids[0], "eventdoesnotexist"]
+                )
             )
             self.assertEqual(res, {self.event_ids[0]})

@@ -104,7 +83,9 @@ def test_simple(self):
         # a second lookup of the same events should cause no queries
         with LoggingContext(name="test") as ctx:
             res = self.get_success(
-                self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
+                self.store.have_seen_events(
+                    self.room_id, [self.event_ids[0], "eventdoesnotexist"]
+                )
             )
             self.assertEqual(res, {self.event_ids[0]})
             self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
@@ -116,11 +97,54 @@ def test_query_via_event_cache(self):
         # looking it up should now cause no db hits
         with LoggingContext(name="test") as ctx:
             res = self.get_success(
-                self.store.have_seen_events("room1", [self.event_ids[0]])
+                self.store.have_seen_events(self.room_id, [self.event_ids[0]])
             )
             self.assertEqual(res, {self.event_ids[0]})
             self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)

+    def test_persisting_event_invalidates_cache(self):
+        event, event_context = self.get_success(
+            create_event(
+                self.hs,
+                room_id=self.room_id,
+                sender=self.user,
+                type="test_event_type",
+                content={"body": "garply"},
+            )
+        )
+
+        with LoggingContext(name="test") as ctx:
+            # First, check `have_seen_event` for an event we have not seen yet
+            # to prime the cache with a `false` value.
+            res = self.get_success(
+                self.store.have_seen_events(event.room_id, [event.event_id])
+            )
+            self.assertEqual(res, set())
+
+            # That should result in a single db query to lookup
+            self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
+
+        # Persist the event which should invalidate or prefill the
+        # `have_seen_event` cache so we don't return stale values.
+        persistence = self.hs.get_storage_controllers().persistence
+        self.get_success(
+            persistence.persist_event(
+                event,
+                event_context,
+            )
+        )
+
+        with LoggingContext(name="test") as ctx:
+            # Check `have_seen_event` again and we should see the updated fact
+            # that we have now seen the event after persisting it.
+            res = self.get_success(
+                self.store.have_seen_events(event.room_id, [event.event_id])
+            )
+            self.assertEqual(res, {event.event_id})
+
+            # That should result in a single db query to lookup
+            self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
+

 class EventCacheTestCase(unittest.HomeserverTestCase):
     """Test that the various layers of event cache works."""
```
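
The test's comment notes that persisting may "invalidate or prefill" the `have_seen_event` cache; either way, readers stop seeing the stale `False`. A toy sketch of the two strategies (dict-backed, not Synapse's implementation):

```py
cache = {}
key = (("!room:hs", "$ev"),)
cache[key] = False  # primed with the stale "not seen" value

def on_persist_invalidate(k):
    cache.pop(k, None)  # next read misses and refetches from the database

def on_persist_prefill(k):
    cache[k] = True  # next read hits the cache with the fresh value

on_persist_prefill(key)
assert cache[key] is True
```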

