Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use stream cache in get_linearized_receipts_for_room #3505

Merged
merged 4 commits into from
Jul 12, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3505.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce database consumption when processing large numbers of receipts
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def stream_positions(self):

def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
self.get_receipts_for_user.invalidate((user_id, receipt_type))
self.get_linearized_receipts_for_room.invalidate_many((room_id,))
self._get_linearized_receipts_for_room.invalidate_many((room_id,))
self.get_last_receipt_event_id_for_user.invalidate(
(user_id, room_id, receipt_type)
)
Expand Down
23 changes: 18 additions & 5 deletions synapse/storage/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
"""
room_ids = set(room_ids)

if from_key:
if from_key is not None:
# Only ask the database about rooms where there have been new
# receipts added since `from_key`
room_ids = yield self._receipts_stream_cache.get_entities_changed(
room_ids, from_key
)
Expand All @@ -151,7 +153,6 @@ def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):

defer.returnValue([ev for res in results.values() for ev in res])

@cachedInlineCallbacks(num_args=3, tree=True)
def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""Get receipts for a single room for sending to clients.

Expand All @@ -164,6 +165,18 @@ def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
Returns:
list: A list of receipts.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you clarify that this is a Deferred[list] ?

"""
if from_key is not None:
# Check the cache first to see if any new receipts have been added
# since`from_key`. If not we can no-op.
if not self._receipts_stream_cache.has_entity_changed(room_id, from_key):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a comment here please? why is this a sensible thing to do?

defer.succeed([])

return self._get_linearized_receipts_for_room(room_id, to_key, from_key)

@cachedInlineCallbacks(num_args=3, tree=True)
def _get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""See get_linearized_receipts_for_room
"""
def f(txn):
if from_key:
sql = (
Expand Down Expand Up @@ -211,7 +224,7 @@ def f(txn):
"content": content,
}])

@cachedList(cached_method_name="get_linearized_receipts_for_room",
@cachedList(cached_method_name="_get_linearized_receipts_for_room",
list_name="room_ids", num_args=3, inlineCallbacks=True)
def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
if not room_ids:
Expand Down Expand Up @@ -373,7 +386,7 @@ def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))

txn.call_after(
self._receipts_stream_cache.entity_has_changed,
Expand Down Expand Up @@ -493,7 +506,7 @@ def insert_graph_receipt_txn(self, txn, room_id, receipt_type,
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))

self._simple_delete_txn(
txn,
Expand Down