Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Track in memory events using weakrefs
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Jul 29, 2021
1 parent d472a08 commit 5afe3d2
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
6 changes: 5 additions & 1 deletion synapse/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def __get__(self, instance, owner=None):


class _EventInternalMetadata:
__slots__ = ["_dict", "stream_ordering", "outlier"]
__slots__ = ["_dict", "stream_ordering", "outlier", "redacted_by"]

def __init__(self, internal_metadata_dict: JsonDict):
# we have to copy the dict, because it turns out that the same dict is
Expand All @@ -111,6 +111,10 @@ def __init__(self, internal_metadata_dict: JsonDict):
# in the DAG)
self.outlier = False

# If this event has a valid redaction event pointing at it (i.e. it
# should be redacted before giving to clients), the `redacted_by` value
# recorded for that redaction; otherwise None.
self.redacted_by: Optional[str] = None

out_of_band_membership: bool = DictProperty("out_of_band_membership")
send_on_behalf_of: str = DictProperty("send_on_behalf_of")
recheck_redaction: bool = DictProperty("recheck_redaction")
Expand Down
41 changes: 38 additions & 3 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import logging
import threading
import weakref
from typing import (
Collection,
Container,
Expand Down Expand Up @@ -174,6 +175,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
# Dict[str, _EventCacheEntry].
self._current_event_fetches: Dict[str, ObservableDeferred] = {}

# We keep track of the events we have currently loaded in memory so that
# we can reuse them even if they've been evicted from the cache.
self._event_ref: Dict[str, EventBase] = weakref.WeakValueDictionary()

self._event_fetch_lock = threading.Condition()
self._event_fetch_list = []
self._event_fetch_ongoing = 0
Expand Down Expand Up @@ -588,6 +593,8 @@ async def _get_events_from_cache_or_db(

def _invalidate_get_event_cache(self, event_id: str) -> None:
    """Forget what is held in memory for the given event.

    Invalidates the `_get_event_cache` entry keyed on `(event_id,)` and
    removes `event_id` from both `_event_ref` and `_current_event_fetches`.
    The two `pop` calls pass a default, so a missing key is not an error.

    Args:
        event_id: The ID of the event to invalidate.
    """
    self._get_event_cache.invalidate((event_id,))
    # Drop the weak reference as well: `_get_events_from_cache` falls back
    # to `_event_ref` on a cache miss and would otherwise put the entry
    # straight back into the cache.
    self._event_ref.pop(event_id, None)
    self._current_event_fetches.pop(event_id, None)

def _get_events_from_cache(
self, events: Iterable[str], update_metrics: bool = True
Expand All @@ -601,13 +608,34 @@ def _get_events_from_cache(
event_map = {}

for event_id in events:
# First check if it's in the event cache
ret = self._get_event_cache.get(
(event_id,), None, update_metrics=update_metrics
)
if not ret:
continue
if ret:
event_map[event_id] = ret

# Otherwise check if we still have the event in memory.
event = self._event_ref.get(event_id)
if event:
redacted_event = None
if event.internal_metadata.redacted_by is not None:
# The event has been redacted, so we generate a redacted
# version.
redacted_event = prune_event(event)
redacted_event.unsigned[
"redacted_by"
] = event.internal_metadata.redacted_by

cache_entry = _EventCacheEntry(
event=event,
redacted_event=redacted_event,
)
event_map[event_id] = cache_entry

event_map[event_id] = ret
# We add the entry back into the cache as we want to keep
# recently queried events in the cache.
self._get_event_cache.set((event_id,), cache_entry)

return event_map

Expand Down Expand Up @@ -877,13 +905,20 @@ async def _get_events_from_db(
original_ev, redactions, event_map
)

if redacted_event:
original_ev.internal_metadata.redacted_by = redacted_event.unsigned[
"redacted_by"
]

cache_entry = _EventCacheEntry(
event=original_ev, redacted_event=redacted_event
)

self._get_event_cache.set((event_id,), cache_entry)
result_map[event_id] = cache_entry

self._event_ref[event_id] = original_ev

return result_map

async def _enqueue_events(self, events):
Expand Down

0 comments on commit 5afe3d2

Please sign in to comment.