This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve event caching code #10119

Merged (16 commits) on Aug 4, 2021
1 change: 1 addition & 0 deletions changelog.d/10119.misc
@@ -0,0 +1 @@
Try and ensure we only have one copy of an event in memory at a time.
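
In miniature, the idea behind this changelog entry: a weak-valued map from event ID to the live object lets every caller share one copy, without the map itself keeping events alive. A minimal sketch, assuming a toy Event class and a _fetch_from_db placeholder rather than Synapse's real types:

```python
import weakref


class Event:
    """Toy stand-in for Synapse's EventBase."""

    def __init__(self, event_id: str):
        self.event_id = event_id


def _fetch_from_db(event_id: str) -> Event:
    return Event(event_id)  # placeholder for a real database read


# Weak values: an entry disappears once no caller holds the event any more.
_event_ref: "weakref.WeakValueDictionary[str, Event]" = weakref.WeakValueDictionary()


def load_event(event_id: str) -> Event:
    existing = _event_ref.get(event_id)
    if existing is not None:
        # Something still holds this event in memory: reuse the same object.
        return existing
    event = _fetch_from_db(event_id)
    _event_ref[event_id] = event
    return event
```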
6 changes: 5 additions & 1 deletion synapse/events/__init__.py
@@ -97,7 +97,7 @@ def __get__(self, instance, owner=None):


class _EventInternalMetadata:
__slots__ = ["_dict", "stream_ordering", "outlier"]
__slots__ = ["_dict", "stream_ordering", "outlier", "redacted_by"]

def __init__(self, internal_metadata_dict: JsonDict):
# we have to copy the dict, because it turns out that the same dict is
@@ -111,6 +111,10 @@ def __init__(self, internal_metadata_dict: JsonDict):
# in the DAG)
self.outlier = False

# Whether this event has a valid redaction event pointing at it (i.e.
# whether it should be redacted before giving to clients).
self.redacted_by: Optional[str] = None
Member:

I'm super dubious about stuffing more things in here that don't form part of the internal_metadata column in the database.

Should we start a new attrs class for things like this, stream_ordering and outlier? (it also looks like before, after, order might fall into the same category).
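
A rough sketch of the container this comment suggests; the name _EventRuntimeState and the exact field set are guesses for illustration, not anything in this PR:

```python
from typing import Optional

import attr


@attr.s(slots=True, auto_attribs=True)
class _EventRuntimeState:
    """Hypothetical container for per-instance state that is never
    persisted to the internal_metadata column."""

    # Set once the event is assigned a position in the event stream.
    stream_ordering: Optional[int] = None
    # Whether the event is an outlier (not connected to the room DAG).
    outlier: bool = False
    # Event ID of a valid redaction pointing at this event, if any.
    redacted_by: Optional[str] = None
```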

Member:

also: it seems confusing to have both a redacted_by and a redacted property.

Member Author:

This code will be split out into its own PR.


out_of_band_membership: bool = DictProperty("out_of_band_membership")
send_on_behalf_of: str = DictProperty("send_on_behalf_of")
recheck_redaction: bool = DictProperty("recheck_redaction")
172 changes: 132 additions & 40 deletions synapse/storage/databases/main/events_worker.py
@@ -14,7 +14,7 @@

import logging
import threading
from collections import namedtuple
import weakref
from typing import (
Collection,
Container,
@@ -27,6 +27,7 @@
overload,
)

import attr
from constantly import NamedConstant, Names
from typing_extensions import Literal

@@ -42,7 +43,11 @@
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.events.utils import prune_event
from synapse.logging.context import PreserveLoggingContext, current_context
from synapse.logging.context import (
PreserveLoggingContext,
current_context,
make_deferred_yieldable,
)
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -56,6 +61,8 @@
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
@@ -74,7 +81,10 @@
EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events


_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
@attr.s(slots=True, auto_attribs=True)
class _EventCacheEntry:
event: EventBase
redacted_event: Optional[EventBase]


class EventRedactBehaviour(Names):
@@ -161,6 +171,14 @@ def __init__(self, database: DatabasePool, db_conn, hs):
max_size=hs.config.caches.event_cache_size,
)

# Map from event ID to a deferred that will result in a
# Dict[str, _EventCacheEntry].
self._current_event_fetches: Dict[str, ObservableDeferred] = {}

# We keep track of the events we have currently loaded in memory so that
# we can reuse them even if they've been evicted from the cache.
self._event_ref: Dict[str, EventBase] = weakref.WeakValueDictionary()

self._event_fetch_lock = threading.Condition()
self._event_fetch_list = []
self._event_fetch_ongoing = 0
@@ -476,7 +494,9 @@ async def get_events_as_list(

return events

async def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False):
async def _get_events_from_cache_or_db(
self, event_ids: Iterable[str], allow_rejected: bool = False
) -> Dict[str, _EventCacheEntry]:
"""Fetch a bunch of events from the cache or the database.

If events are pulled from the database, they will be cached for future lookups.
@@ -485,20 +505,48 @@ async def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False):

Args:

event_ids (Iterable[str]): The event_ids of the events to fetch
event_ids: The event_ids of the events to fetch

allow_rejected (bool): Whether to include rejected events. If False,
allow_rejected: Whether to include rejected events. If False,
rejected events are omitted from the response.

Returns:
Dict[str, _EventCacheEntry]:
map from event id to result
map from event id to result
"""
event_entry_map = self._get_events_from_cache(
event_ids, allow_rejected=allow_rejected
event_ids,
)

missing_events_ids = [e for e in event_ids if e not in event_entry_map]
missing_events_ids = {e for e in event_ids if e not in event_entry_map}

# We now look up if we're already fetching some of the events in the DB,
# if so we wait for those lookups to finish instead of pulling the same
# events out of the DB multiple times.
already_fetching: Dict[str, defer.Deferred] = {}

# We also add entries to `self._current_event_fetches` for each event
# we're going to pull from the DB. We use a single deferred that
# resolves to all the events we pulled from the DB (this will result in
# this function returning more events than requested, but that can
# happen already due to `_get_events_from_db`).
fetching_deferred = ObservableDeferred(defer.Deferred())

for event_id in missing_events_ids:
deferred = self._current_event_fetches.get(event_id)
if deferred is not None:
# We're already pulling the event out of the DB, add the deferred
# to the collection of deferreds to wait on.
already_fetching[event_id] = deferred.observe()
else:
# We're not already pulling the event from the DB, so add our
# deferred to the map of events that are being fetched.
self._current_event_fetches[event_id] = fetching_deferred
fetching_deferred.observe().addBoth(
lambda _, event_id: self._current_event_fetches.pop(event_id, None),
event_id,
)

missing_events_ids.difference_update(already_fetching)

if missing_events_ids:
log_ctx = current_context()
@@ -509,43 +557,85 @@ async def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False):
# the events have been redacted, and if so pulling the redaction event out
# of the database to check it.
#
missing_events = await self._get_events_from_db(
missing_events_ids, allow_rejected=allow_rejected
)
try:
missing_events = await self._get_events_from_db(
missing_events_ids,
)

event_entry_map.update(missing_events)
except Exception as e:
fetching_deferred.errback(e)
raise e

fetching_deferred.callback(missing_events)

if already_fetching:
# Wait for the other event requests to finish and add their results
# to ours.
results = await make_deferred_yieldable(
defer.gatherResults(
already_fetching.values(),
consumeErrors=True,
)
).addErrback(unwrapFirstError)

event_entry_map.update(missing_events)
for result in results:
event_entry_map.update(result)

if not allow_rejected:
event_entry_map = {
event_id: entry
for event_id, entry in event_entry_map.items()
if not entry.event.rejected_reason
}

return event_entry_map

def _invalidate_get_event_cache(self, event_id):
self._get_event_cache.invalidate((event_id,))
self._event_ref.pop(event_id, None)
self._current_event_fetches.pop(event_id, None)

def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
def _get_events_from_cache(
self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, _EventCacheEntry]:
"""Fetch events from the caches

Args:
events (Iterable[str]): list of event_ids to fetch
allow_rejected (bool): Whether to return events that were rejected
update_metrics (bool): Whether to update the cache hit ratio metrics

Returns:
dict of event_id -> _EventCacheEntry for each event_id in cache. If
allow_rejected is `False` then there will still be an entry but it
will be `None`
events: list of event_ids to fetch
update_metrics: Whether to update the cache hit ratio metrics
"""
event_map = {}

for event_id in events:
# First check if it's in the event cache
ret = self._get_event_cache.get(
(event_id,), None, update_metrics=update_metrics
)
if not ret:
continue

if allow_rejected or not ret.event.rejected_reason:
if ret:
event_map[event_id] = ret
else:
event_map[event_id] = None

# Otherwise check if we still have the event in memory.
event = self._event_ref.get(event_id)
if event:
redacted_event = None
if event.internal_metadata.redacted_by is not None:
# The event has been redacted, so we generate a redacted
# version.
redacted_event = prune_event(event)
redacted_event.unsigned[
"redacted_by"
] = event.internal_metadata.redacted_by
Member:

do we not need redacted_because here?

Member Author:

This code will be split out into its own PR.
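
For context on the question above, a hedged sketch of what also populating redacted_because might look like at this point; this is an assumption about the follow-up work, not code from this PR:

```python
# Hypothetical extension, NOT part of this PR: clients also expect the full
# redaction event under unsigned["redacted_because"], which the stored event
# ID alone cannot reconstruct, so it would have to be fetched separately.
if event.internal_metadata.redacted_by is not None:
    redacted_event = prune_event(event)
    redacted_event.unsigned[
        "redacted_by"
    ] = event.internal_metadata.redacted_by
    # redaction = await self.get_event(event.internal_metadata.redacted_by)
    # redacted_event.unsigned["redacted_because"] = redaction
```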


cache_entry = _EventCacheEntry(
event=event,
redacted_event=redacted_event,
)
event_map[event_id] = cache_entry

# We add the entry back into the cache as we want to keep
# recently queried events in the cache.
self._get_event_cache.set((event_id,), cache_entry)

return event_map

@@ -672,23 +762,21 @@ def fire(evs, exc):
with PreserveLoggingContext():
self.hs.get_reactor().callFromThread(fire, event_list, e)

async def _get_events_from_db(self, event_ids, allow_rejected=False):
async def _get_events_from_db(
self, event_ids: Iterable[str]
) -> Dict[str, _EventCacheEntry]:
"""Fetch a bunch of events from the database.

Returned events will be added to the cache for future lookups.

Unknown events are omitted from the response.

Args:
event_ids (Iterable[str]): The event_ids of the events to fetch

allow_rejected (bool): Whether to include rejected events. If False,
rejected events are omitted from the response.
event_ids: The event_ids of the events to fetch

Returns:
Dict[str, _EventCacheEntry]:
map from event id to result. May return extra events which
weren't asked for.
map from event id to result. May return extra events which
weren't asked for.
"""
fetched_events = {}
events_to_fetch = event_ids
@@ -717,9 +805,6 @@ async def _get_events_from_db(self, event_ids, allow_rejected=False):

rejected_reason = row["rejected_reason"]

if not allow_rejected and rejected_reason:
continue

# If the event or metadata cannot be parsed, log the error and act
# as if the event is unknown.
try:
@@ -820,13 +905,20 @@ async def _get_events_from_db(self, event_ids, allow_rejected=False):
original_ev, redactions, event_map
)

if redacted_event:
original_ev.internal_metadata.redacted_by = redacted_event.unsigned[
"redacted_by"
]

cache_entry = _EventCacheEntry(
event=original_ev, redacted_event=redacted_event
)

self._get_event_cache.set((event_id,), cache_entry)
result_map[event_id] = cache_entry

self._event_ref[event_id] = original_ev

return result_map

async def _enqueue_events(self, events):
6 changes: 2 additions & 4 deletions synapse/storage/databases/main/roommember.py
@@ -629,14 +629,12 @@ async def _get_joined_users_from_context(
# We don't update the event cache hit ratio as it completely throws off
# the hit ratio counts. After all, we don't populate the cache if we
# miss it here
event_map = self._get_events_from_cache(
member_event_ids, allow_rejected=False, update_metrics=False
)
event_map = self._get_events_from_cache(member_event_ids, update_metrics=False)

missing_member_event_ids = []
for event_id in member_event_ids:
ev_entry = event_map.get(event_id)
if ev_entry:
if ev_entry and not ev_entry.event.rejected_reason:
if ev_entry.event.membership == Membership.JOIN:
users_in_room[ev_entry.event.state_key] = ProfileInfo(
display_name=ev_entry.event.content.get("displayname", None),