-
Notifications
You must be signed in to change notification settings - Fork 225
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve perf of sync device lists #17216
Merged
Merged
Changes from 4 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
bec0313
Improve perf of sync device lists (#17191)
erikjohnston 5b2b312
Cap the top stream ID when fetching changed devices
erikjohnston cf474a0
Add stream change cache for device lists in room
erikjohnston e6d3d80
Newsfile
erikjohnston 6a0d2dc
Only check for all rooms if not outbound poke
erikjohnston File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Improve performance of calculating device lists changes in `/sync`. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,10 +70,7 @@ | |
from synapse.util import json_decoder, json_encoder | ||
from synapse.util.caches.descriptors import cached, cachedList | ||
from synapse.util.caches.lrucache import LruCache | ||
from synapse.util.caches.stream_change_cache import ( | ||
AllEntitiesChangedResult, | ||
StreamChangeCache, | ||
) | ||
from synapse.util.caches.stream_change_cache import StreamChangeCache | ||
from synapse.util.cancellation import cancellable | ||
from synapse.util.iterutils import batch_iter | ||
from synapse.util.stringutils import shortstr | ||
|
@@ -132,6 +129,20 @@ def __init__( | |
prefilled_cache=device_list_prefill, | ||
) | ||
|
||
device_list_room_prefill, min_device_list_room_id = self.db_pool.get_cache_dict( | ||
db_conn, | ||
"device_lists_changes_in_room", | ||
entity_column="room_id", | ||
stream_column="stream_id", | ||
max_value=device_list_max, | ||
limit=10000, | ||
) | ||
self._device_list_room_stream_cache = StreamChangeCache( | ||
"DeviceListRoomStreamChangeCache", | ||
min_device_list_room_id, | ||
prefilled_cache=device_list_room_prefill, | ||
) | ||
|
||
( | ||
user_signature_stream_prefill, | ||
user_signature_stream_list_id, | ||
|
@@ -209,6 +220,13 @@ def _invalidate_caches_for_devices( | |
row.entity, token | ||
) | ||
|
||
def device_lists_in_rooms_have_changed( | ||
self, room_ids: StrCollection, token: int | ||
) -> None: | ||
"Record that device lists have changed in rooms" | ||
for room_id in room_ids: | ||
self._device_list_room_stream_cache.entity_has_changed(room_id, token) | ||
|
||
def get_device_stream_token(self) -> int: | ||
return self._device_list_id_gen.get_current_token() | ||
|
||
|
@@ -832,16 +850,6 @@ async def get_cached_devices_for_user( | |
) | ||
return {device[0]: db_to_json(device[1]) for device in devices} | ||
|
||
def get_cached_device_list_changes( | ||
self, | ||
from_key: int, | ||
) -> AllEntitiesChangedResult: | ||
"""Get set of users whose devices have changed since `from_key`, or None | ||
if that information is not in our cache. | ||
""" | ||
|
||
return self._device_list_stream_cache.get_all_entities_changed(from_key) | ||
|
||
@cancellable | ||
async def get_all_devices_changed( | ||
self, | ||
|
@@ -1457,7 +1465,7 @@ async def _get_min_device_lists_changes_in_room(self) -> int: | |
|
||
@cancellable | ||
async def get_device_list_changes_in_rooms( | ||
self, room_ids: Collection[str], from_id: int | ||
self, room_ids: Collection[str], from_id: int, to_id: int | ||
) -> Optional[Set[str]]: | ||
"""Return the set of users whose devices have changed in the given rooms | ||
since the given stream ID. | ||
|
@@ -1473,9 +1481,15 @@ async def get_device_list_changes_in_rooms( | |
if min_stream_id > from_id: | ||
return None | ||
|
||
changed_room_ids = self._device_list_room_stream_cache.get_entities_changed( | ||
room_ids, from_id | ||
) | ||
if not changed_room_ids: | ||
return set() | ||
|
||
sql = """ | ||
SELECT DISTINCT user_id FROM device_lists_changes_in_room | ||
WHERE {clause} AND stream_id >= ? | ||
WHERE {clause} AND stream_id > ? AND stream_id <= ? | ||
""" | ||
|
||
def _get_device_list_changes_in_rooms_txn( | ||
|
@@ -1487,11 +1501,12 @@ def _get_device_list_changes_in_rooms_txn( | |
return {user_id for user_id, in txn} | ||
|
||
changes = set() | ||
for chunk in batch_iter(room_ids, 1000): | ||
for chunk in batch_iter(changed_room_ids, 1000): | ||
clause, args = make_in_list_sql_clause( | ||
self.database_engine, "room_id", chunk | ||
) | ||
args.append(from_id) | ||
args.append(to_id) | ||
|
||
changes |= await self.db_pool.runInteraction( | ||
"get_device_list_changes_in_rooms", | ||
|
@@ -1502,6 +1517,34 @@ def _get_device_list_changes_in_rooms_txn( | |
|
||
return changes | ||
|
||
async def get_all_device_list_changes(self, from_id: int, to_id: int) -> Set[str]: | ||
"""Return the set of rooms where devices have changed since the given | ||
stream ID. | ||
|
||
Will raise an exception if the given stream ID is too old. | ||
""" | ||
|
||
min_stream_id = await self._get_min_device_lists_changes_in_room() | ||
|
||
if min_stream_id > from_id: | ||
raise Exception("stream ID is too old") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not really materially important, but feels like this should be a more specific exception type |
||
|
||
sql = """ | ||
SELECT DISTINCT room_id FROM device_lists_changes_in_room | ||
WHERE stream_id > ? AND stream_id <= ? | ||
""" | ||
|
||
def _get_all_device_list_changes_txn( | ||
txn: LoggingTransaction, | ||
) -> Set[str]: | ||
txn.execute(sql, (from_id, to_id)) | ||
return {room_id for room_id, in txn} | ||
|
||
return await self.db_pool.runInteraction( | ||
"get_all_device_list_changes", | ||
_get_all_device_list_changes_txn, | ||
) | ||
|
||
async def get_device_list_changes_in_room( | ||
self, room_id: str, min_stream_id: int | ||
) -> Collection[Tuple[str, str]]: | ||
|
@@ -1962,8 +2005,8 @@ def _update_remote_device_list_cache_txn( | |
async def add_device_change_to_streams( | ||
self, | ||
user_id: str, | ||
device_ids: Collection[str], | ||
room_ids: Collection[str], | ||
device_ids: StrCollection, | ||
room_ids: StrCollection, | ||
) -> Optional[int]: | ||
"""Persist that a user's devices have been updated, and which hosts | ||
(if any) should be poked. | ||
|
@@ -2122,8 +2165,8 @@ def _add_device_outbound_room_poke_txn( | |
self, | ||
txn: LoggingTransaction, | ||
user_id: str, | ||
device_ids: Iterable[str], | ||
room_ids: Collection[str], | ||
device_ids: StrCollection, | ||
room_ids: StrCollection, | ||
stream_ids: List[int], | ||
context: Dict[str, str], | ||
) -> None: | ||
|
@@ -2161,6 +2204,10 @@ def _add_device_outbound_room_poke_txn( | |
], | ||
) | ||
|
||
txn.call_after( | ||
self.device_lists_in_rooms_have_changed, room_ids, max(stream_ids) | ||
) | ||
|
||
async def get_uncoverted_outbound_room_pokes( | ||
self, start_stream_id: int, start_room_id: str, limit: int = 10 | ||
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]: | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not that familiar as always, but it sounds unintuitive to get the latest stream token and call it
prev_token
— what's going on here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, this really needs a comment. Basically at this point we haven't updated the id generators, and so they don't know about the new stuff that has just come in (that happens in process_replication_position). What we're basically doing is getting an ID of "this is where we're currently at", so that we can get the deltas from the DB for what has changed since then upto the new token.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup OK I was hoping it was something like that, but a comment would be good to avoid wtfery