diff --git a/changelog.d/10695.bugfix b/changelog.d/10695.bugfix new file mode 100644 index 000000000000..b494935a5d3c --- /dev/null +++ b/changelog.d/10695.bugfix @@ -0,0 +1 @@ +Fix a bug where per-room nicknames and avatars of local users were leaked into the user directory when they updated their per-room profile. diff --git a/docs/user_directory.md b/docs/user_directory.md index d4f38d2cf11a..5ff14e334c4b 100644 --- a/docs/user_directory.md +++ b/docs/user_directory.md @@ -10,3 +10,32 @@ DB corruption) get stale or out of sync. If this happens, for now the solution to fix it is to execute the SQL [here](https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/main/delta/53/user_dir_populate.sql) and then restart synapse. This should then start a background task to flush the current tables and regenerate the directory. + +Data model ---------- + +There are five relevant tables: + +* `user_directory`. This contains the user_id, display name and avatar we'll + return when you search the directory. + - Because there's only one directory entry per user, it's important that we only + ever put publicly visible names here. Otherwise we might leak a private + nickname or avatar used in a private room. + - Indexed on rooms. Indexed on users. + +* `user_directory_search`. To be joined to `user_directory`. It contains an extra + column that enables full text search based on user ids and display names. + There are different schemas for SQLite and Postgres, with different code paths to match. + - Indexed on the full text search data. Indexed on users. + +* `user_directory_stream_pos`. When the initial background update to populate + the directory is complete, we record a stream position here. This indicates + that synapse should now listen for room changes and incrementally update + the directory where necessary. + +* `users_in_public_rooms`. Contains associations between users and the public rooms they're in. + Used to determine which users are in public rooms and should be publicly visible in the directory. + +* `users_who_share_private_rooms`. Rows are triples `(L, M, room id)` where `L` + is a local user and `M` is a local or remote user. `L` and `M` should be + different, but this isn't enforced by a constraint. diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 45d2404ddebf..99e12e2d9a07 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -131,7 +131,7 @@ async def deactivate_account( await self.store.add_user_pending_deactivation(user_id) # delete from user directory - await self.user_directory_handler.handle_user_deactivated(user_id) + await self.user_directory_handler.handle_local_user_deactivated(user_id) # Mark the user as erased, if they asked for that if erase_data: @@ -248,7 +248,7 @@ async def activate_account(self, user_id: str) -> None: This marks the user as active and not erased in the database, but does not attempt to rejoin rooms, re-add threepids, etc. - If enabled, the user will be re-added to the user directory. + The user will be re-added to the user directory. The user will also need a password hash set to actually login. @@ -257,11 +257,8 @@ async def activate_account(self, user_id: str) -> None: """ # Add the user to the directory, if necessary. 
user = UserID.from_string(user_id) - if self.hs.config.user_directory_search_all_users: - profile = await self.store.get_profileinfo(user.localpart) - await self.user_directory_handler.handle_local_profile_change( - user_id, profile - ) + profile = await self.store.get_profileinfo(user.localpart) + await self.user_directory_handler.handle_local_profile_change(user_id, profile) # Ensure the user is not marked as erased. await self.store.mark_user_not_erased(user_id) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 20a033d0ba5f..c4fd5f243339 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -214,11 +214,10 @@ async def set_displayname( target_user.localpart, displayname_to_set ) - if self.hs.config.user_directory_search_all_users: - profile = await self.store.get_profileinfo(target_user.localpart) - await self.user_directory_handler.handle_local_profile_change( - target_user.to_string(), profile - ) + profile = await self.store.get_profileinfo(target_user.localpart) + await self.user_directory_handler.handle_local_profile_change( + target_user.to_string(), profile + ) await self._update_join_states(requester, target_user) @@ -300,11 +299,10 @@ async def set_avatar_url( target_user.localpart, avatar_url_to_set ) - if self.hs.config.user_directory_search_all_users: - profile = await self.store.get_profileinfo(target_user.localpart) - await self.user_directory_handler.handle_local_profile_change( - target_user.to_string(), profile - ) + profile = await self.store.get_profileinfo(target_user.localpart) + await self.user_directory_handler.handle_local_profile_change( + target_user.to_string(), profile + ) await self._update_join_states(requester, target_user) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 0ed59d757bf7..192edb205539 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -289,11 +289,10 @@ async def register_user( shadow_banned=shadow_banned, ) - if self.hs.config.user_directory_search_all_users: - profile = await self.store.get_profileinfo(localpart) - await self.user_directory_handler.handle_local_profile_change( - user_id, profile - ) + profile = await self.store.get_profileinfo(localpart) + await self.user_directory_handler.handle_local_profile_change( + user_id, profile + ) else: # autogen a sequential user ID diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py index 077c7c064965..d30ba2b7248c 100644 --- a/synapse/handlers/state_deltas.py +++ b/synapse/handlers/state_deltas.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +from enum import Enum, auto from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: @@ -21,6 +22,12 @@ logger = logging.getLogger(__name__) +class MatchChange(Enum): + no_change = auto() + now_true = auto() + now_false = auto() + + class StateDeltasHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() @@ -31,18 +38,12 @@ async def _get_key_change( event_id: Optional[str], key_name: str, public_value: str, - ) -> Optional[bool]: + ) -> MatchChange: """Given two events check if the `key_name` field in content changed from not matching `public_value` to doing so. For example, check if `history_visibility` (`key_name`) changed from `shared` to `world_readable` (`public_value`). - - Returns: - None if the field in the events either both match `public_value` - or if neither do, i.e. there has been no change. 
- True if it didn't match `public_value` but now does - False if it did match `public_value` but now doesn't """ prev_event = None event = None @@ -54,7 +55,7 @@ async def _get_key_change( if not event and not prev_event: logger.debug("Neither event exists: %r %r", prev_event_id, event_id) - return None + return MatchChange.no_change prev_value = None value = None @@ -68,8 +69,8 @@ async def _get_key_change( logger.debug("prev_value: %r -> value: %r", prev_value, value) if value == public_value and prev_value != public_value: - return True + return MatchChange.now_true elif value != public_value and prev_value == public_value: - return False + return MatchChange.now_false else: - return None + return MatchChange.no_change diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 6edb1da50a98..0817a9ad5ad2 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -13,11 +13,11 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, Optional import synapse.metrics from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership -from synapse.handlers.state_deltas import StateDeltasHandler +from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.roommember import ProfileInfo from synapse.types import JsonDict @@ -30,14 +30,26 @@ class UserDirectoryHandler(StateDeltasHandler): - """Handles querying of and keeping updated the user_directory. + """Handles queries and updates for the user_directory. N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY - The user directory is filled with users who this server can see are joined to a - world_readable or publicly joinable room. We keep a database table up to date - by streaming changes of the current state and recalculating whether users should - be in the directory or not when necessary. + When a local user searches the user_directory, we report two kinds of users: + + - users this server can see are joined to a world_readable or publicly + joinable room, and + - users belonging to a private room shared by that local user. + + The two cases are tracked separately in the `users_in_public_rooms` and + `users_who_share_private_rooms` tables. Both kinds of users have their + username and avatar tracked in a `user_directory` table. + + This handler has three responsibilities: + 1. Forwarding requests to `/user_directory/search` to the UserDirectoryStore. + 2. Providing hooks for the application to call when local users are added, + removed, or have their profile changed. + 3. Listening for room state changes that indicate remote users have + joined or left a room, or that their profile has changed. 
""" def __init__(self, hs: "HomeServer"): @@ -94,23 +106,6 @@ async def search_users( return results - def notify_new_event(self) -> None: - """Called when there may be more deltas to process""" - if not self.update_user_directory: - return - - if self._is_processing: - return - - async def process(): - try: - await self._unsafe_process() - finally: - self._is_processing = False - - self._is_processing = True - run_as_background_process("user_directory.notify_new_event", process) - async def handle_local_profile_change( self, user_id: str, profile: ProfileInfo ) -> None: @@ -130,12 +125,29 @@ async def handle_local_profile_change( user_id, profile.display_name, profile.avatar_url ) - async def handle_user_deactivated(self, user_id: str) -> None: + async def handle_local_user_deactivated(self, user_id: str) -> None: """Called when a user ID is deactivated""" # FIXME(#3714): We should probably do this in the same worker as all # the other changes. await self.store.remove_from_user_dir(user_id) + def notify_new_event(self) -> None: + """Called when there may be more deltas to process""" + if not self.update_user_directory: + return + + if self._is_processing: + return + + async def process(): + try: + await self._unsafe_process() + finally: + self._is_processing = False + + self._is_processing = True + run_as_background_process("user_directory.notify_new_event", process) + async def _unsafe_process(self) -> None: # If self.pos is None then means we haven't fetched it from DB if self.pos is None: @@ -160,7 +172,8 @@ async def _unsafe_process(self) -> None: ) logger.debug("Handling %d state deltas", len(deltas)) - await self._handle_deltas(deltas) + for delta in deltas: + await self._handle_delta(delta) self.pos = max_pos @@ -171,78 +184,89 @@ async def _unsafe_process(self) -> None: await self.store.update_user_directory_stream_pos(max_pos) - async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None: - """Called with the state deltas to process""" - for delta in deltas: - typ = delta["type"] - state_key = delta["state_key"] - room_id = delta["room_id"] - event_id = delta["event_id"] - prev_event_id = delta["prev_event_id"] - - logger.debug("Handling: %r %r, %s", typ, state_key, event_id) - - # For join rule and visibility changes we need to check if the room - # may have become public or not and add/remove the users in said room - if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules): - await self._handle_room_publicity_change( - room_id, prev_event_id, event_id, typ - ) - elif typ == EventTypes.Member: - change = await self._get_key_change( - prev_event_id, - event_id, - key_name="membership", - public_value=Membership.JOIN, - ) + async def _handle_delta(self, delta: Dict[str, Any]) -> None: + """Called with an individual state delta to process""" + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + prev_event_id = delta["prev_event_id"] + + logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + + # For join rule and visibility changes we need to check if the room + # may have become public or not and add/remove the users in said room + if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules): + await self._handle_room_publicity_change( + room_id, prev_event_id, event_id, typ + ) + elif typ == EventTypes.Member: + if await self._user_omitted_from_directory(state_key): + return - if change is False: - # Need to check if the server left the room entirely, if so - # we might need to remove 
all the users in that room - is_in_room = await self.store.is_host_joined( room_id, self.server_name + joined = await self._get_key_change( + prev_event_id, + event_id, + key_name="membership", + public_value=Membership.JOIN, + ) + + if joined is MatchChange.now_false: + # Need to check if the server left the room entirely, if so + # we might need to remove all the users in that room + is_in_room = await self.store.is_host_joined(room_id, self.server_name) + if not is_in_room: + logger.debug("Server left room: %r", room_id) + # Fetch all the users that we marked as being in the user + # directory due to being in the room, and then check if we + # need to remove those users or not + user_ids = await self.store.get_users_in_dir_due_to_room(room_id) + + for user_id in user_ids: + await self._handle_remove_user(room_id, user_id) + else: + logger.debug("Server is still in room: %r", room_id) + await self._handle_remove_user(room_id, state_key) + + elif joined is MatchChange.no_change: + # Handle any profile changes for remote users + if not self.is_mine_id(state_key): + await self._handle_remote_possible_profile_change( + state_key, prev_event_id, event_id ) - if not is_in_room: - logger.debug("Server left room: %r", room_id) - # Fetch all the users that we marked as being in user - # directory due to being in the room and then check if - # need to remove those users or not - user_ids = await self.store.get_users_in_dir_due_to_room( - room_id - ) - - for user_id in user_ids: - await self._handle_remove_user(room_id, user_id) - return - else: - logger.debug("Server is still in room: %r", room_id) - - is_support = await self.store.is_support_user(state_key) - if not is_support: - if change is None: - # Handle any profile changes - await self._handle_profile_change( - state_key, room_id, prev_event_id, event_id - ) - continue - if change: # The user joined - event = await self.store.get_event(event_id, allow_none=True) - # It isn't expected for this event to not exist, but we - # don't want the entire background process to break. - if event is None: - continue + elif joined is MatchChange.now_true: # The user joined + await self._track_user_joined_room(room_id, state_key) - profile = ProfileInfo( - avatar_url=event.content.get("avatar_url"), - display_name=event.content.get("displayname"), - ) + event = await self.store.get_event(event_id, allow_none=True) + # We don't expect this event to be missing, but we + # don't want the entire background process to break if it is. + if event is None: + return + + # TODO this leaks per-room profiles to the directory. 
+ profile = ProfileInfo( + avatar_url=event.content.get("avatar_url"), + display_name=event.content.get("displayname"), + ) - await self._handle_new_user(room_id, state_key, profile) - else: # The user left - await self._handle_remove_user(room_id, state_key) - else: - logger.debug("Ignoring irrelevant type: %r", typ) + if not self.is_mine_id(state_key): + logger.debug("Adding new user to dir, %r", state_key) + await self.store.update_profile_in_user_dir( + state_key, profile.display_name, profile.avatar_url + ) + else: + logger.debug("Ignoring irrelevant type: %r", typ) + + async def _user_omitted_from_directory(self, user_id: str) -> bool: + """We want to ignore events from "hidden" users who shouldn't be exposed + to real users.""" + if await self.store.is_support_user(user_id): + return True + if self.store.get_if_app_services_interested_in_user(user_id): + return True + + return False async def _handle_room_publicity_change( self, @@ -263,14 +287,14 @@ async def _handle_room_publicity_change( logger.debug("Handling change for %s: %s", typ, room_id) if typ == EventTypes.RoomHistoryVisibility: - change = await self._get_key_change( + publicity = await self._get_key_change( prev_event_id, event_id, key_name="history_visibility", public_value=HistoryVisibility.WORLD_READABLE, ) elif typ == EventTypes.JoinRules: - change = await self._get_key_change( + publicity = await self._get_key_change( prev_event_id, event_id, key_name="join_rule", @@ -278,9 +302,7 @@ async def _handle_room_publicity_change( ) else: raise Exception("Invalid event type") - # If change is None, no change. True => become world_readable/public, - # False => was world_readable/public - if change is None: + if publicity is MatchChange.no_change: logger.debug("No change") return @@ -290,111 +312,95 @@ async def _handle_room_publicity_change( room_id ) - logger.debug("Change: %r, is_public: %r", change, is_public) + logger.debug("Change: %r, is_public: %r", publicity, is_public) - if change and not is_public: + if publicity is MatchChange.now_true and not is_public: # If we became world readable but room isn't currently public then # we ignore the change return - elif not change and is_public: + elif publicity is MatchChange.now_false and is_public: # If we stopped being world readable but are still public, # ignore the change return - other_users_in_room_with_profiles = ( - await self.store.get_users_in_room_with_profiles(room_id) - ) + other_users_in_room = await self.store.get_users_in_room(room_id) # Remove every user from the sharing tables for that room. - for user_id in other_users_in_room_with_profiles.keys(): + for user_id in other_users_in_room: await self.store.remove_user_who_share_room(user_id, room_id) # Then, re-add them to the tables. - # NOTE: this is not the most efficient method, as handle_new_user sets + # NOTE: this is not the most efficient, as _track_user_joined_room sets # up local_user -> other_user and other_user_whos_local -> local_user, # which, when run over an entire room, will result in the same values # being added multiple times. The batching upserts shouldn't make this # too bad, though. 
- for user_id, profile in other_users_in_room_with_profiles.items(): - await self._handle_new_user(room_id, user_id, profile) - - async def _handle_new_user( - self, room_id: str, user_id: str, profile: ProfileInfo - ) -> None: - """Called when we might need to add user to directory - - Args: - room_id: The room ID that user joined or started being public - user_id - """ - logger.debug("Adding new user to dir, %r", user_id) - - await self.store.update_profile_in_user_dir( - user_id, profile.display_name, profile.avatar_url - ) + for user_id in other_users_in_room: + await self._track_user_joined_room(room_id, user_id) + async def _track_user_joined_room(self, room_id: str, user_id: str) -> None: + """Someone's just joined a room. Add to `users_in_public_rooms` and + `users_who_share_private_rooms` if necessary.""" is_public = await self.store.is_room_world_readable_or_publicly_joinable( room_id ) - # Now we update users who share rooms with users. - other_users_in_room = await self.store.get_users_in_room(room_id) - if is_public: await self.store.add_users_in_public_rooms(room_id, (user_id,)) else: + other_users_in_room = await self.store.get_users_in_room(room_id) + other_users_in_room = [ + other_user + for other_user in other_users_in_room + if not await self._user_omitted_from_directory(other_user) + ] + to_insert = set() # First, if they're our user then we need to update for every user if self.is_mine_id(user_id): + for other_user_id in other_users_in_room: + if user_id == other_user_id: + continue - is_appservice = self.store.get_if_app_services_interested_in_user( - user_id - ) - - # We don't care about appservice users. - if not is_appservice: - for other_user_id in other_users_in_room: - if user_id == other_user_id: - continue - - to_insert.add((user_id, other_user_id)) + to_insert.add((user_id, other_user_id)) # Next we need to update for every local user in the room for other_user_id in other_users_in_room: if user_id == other_user_id: continue - - is_appservice = self.store.get_if_app_services_interested_in_user( - other_user_id - ) - if self.is_mine_id(other_user_id) and not is_appservice: + if self.is_mine_id(other_user_id): to_insert.add((other_user_id, user_id)) if to_insert: await self.store.add_users_who_share_private_room(room_id, to_insert) async def _handle_remove_user(self, room_id: str, user_id: str) -> None: - """Called when we might need to remove user from directory + """Called in two cases: + 1. The given user has left the given room. + 2. The last local user (someone else) just left the given room, so + everyone else in the room is remote. We may no longer need to + include the given user (`user_id`) in the directory. Args: - room_id: The room ID that user left or stopped being public that - user_id + user_id: The user we may need to remove from the directory + room_id: The room which someone just left """ logger.debug("Removing user %r", user_id) # Remove user from sharing tables await self.store.remove_user_who_share_room(user_id, room_id) - # Are they still in any rooms? If not, remove them entirely. - rooms_user_is_in = await self.store.get_user_dir_rooms_user_is_in(user_id) + # If this is a remote user, we can remove them from the user_directory + # if they're no longer in any rooms. 
+ if not self.is_mine_id(user_id): + rooms_user_is_in = await self.store.get_user_dir_rooms_user_is_in(user_id) - if len(rooms_user_is_in) == 0: - await self.store.remove_from_user_dir(user_id) + if len(rooms_user_is_in) == 0: + await self.store.remove_from_user_dir(user_id) - async def _handle_profile_change( + async def _handle_remote_possible_profile_change( self, user_id: str, - room_id: str, prev_event_id: Optional[str], event_id: Optional[str], ) -> None: @@ -419,6 +425,7 @@ async def _handle_profile_change( if not isinstance(new_name, str): new_name = prev_name + # TODO this leaks per-room profiles to the directory. prev_avatar = prev_event.content.get("avatar_url") new_avatar = event.content.get("avatar_url") # If the new avatar is an unexpected form, do not update the directory. diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index c58a4b869072..64c18c6f86d4 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -196,6 +196,11 @@ async def get_users_in_room_with_profiles( ) -> Dict[str, ProfileInfo]: """Get a mapping from user ID to profile information for all users in a given room. + The profile information comes directly from this room's `m.room.member` + events, and so may be specific to this room rather than part of a user's + global profile. To avoid privacy leaks, the profile data should only be + revealed to users who are already in this room. + Args: room_id: The ID of the room to retrieve the users of. diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 65dde67ae98d..21b516423120 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -14,10 +14,12 @@ import logging import re -from typing import Any, Dict, Iterable, Optional, Set, Tuple +from typing import Any, Dict, Iterable, Optional, Sequence, Set, Tuple + +from typing_extensions import TypedDict from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules -from synapse.storage.database import DatabasePool +from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.state import StateFilter from synapse.storage.databases.main.state_deltas import StateDeltasStore from synapse.storage.engines import PostgresEngine, Sqlite3Engine @@ -30,6 +32,10 @@ TEMP_TABLE = "_temp_populate_user_directory" +class ProgressDict(TypedDict): + remaining: int + + class UserDirectoryBackgroundUpdateStore(StateDeltasStore): # How many records do we calculate before sending it to @@ -57,7 +63,9 @@ def __init__(self, database: DatabasePool, db_conn, hs): "populate_user_directory_cleanup", self._populate_user_directory_cleanup ) - async def _populate_user_directory_createtables(self, progress, batch_size): + async def _populate_user_directory_createtables( + self, progress: Dict, batch_size: int + ) -> int: # Get all the rooms that we want to process. def _make_staging_area(txn): @@ -85,19 +93,17 @@ def _make_staging_area(txn): self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms) del rooms - # If search all users is on, get all the users we want to add. 
- if self.hs.config.user_directory_search_all_users: - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_users(user_id TEXT NOT NULL)" - ) - txn.execute(sql) + sql = ( + "CREATE TABLE IF NOT EXISTS " + + TEMP_TABLE + + "_users(user_id TEXT NOT NULL)" + ) + txn.execute(sql) - txn.execute("SELECT name FROM users") - users = [{"user_id": x[0]} for x in txn.fetchall()] + txn.execute("SELECT name FROM users") + users = [{"user_id": x[0]} for x in txn.fetchall()] - self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users) + self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users) new_pos = await self.get_max_stream_id_in_current_state_deltas() await self.db_pool.runInteraction( @@ -112,12 +118,14 @@ def _make_staging_area(txn): ) return 1 - async def _populate_user_directory_cleanup(self, progress, batch_size): + async def _populate_user_directory_cleanup( + self, progress: Dict, batch_size: int + ) -> int: """ Update the user directory stream position, then clean up the old tables. """ position = await self.db_pool.simple_select_one_onecol( - TEMP_TABLE + "_position", None, "position" + TEMP_TABLE + "_position", {}, "position" ) await self.update_user_directory_stream_pos(position) @@ -135,18 +143,32 @@ def _delete_staging_area(txn): ) return 1 - async def _populate_user_directory_process_rooms(self, progress, batch_size): + async def _populate_user_directory_process_rooms( + self, progress: ProgressDict, batch_size: int + ) -> int: """ + Rescan the state of all rooms so we can track + + - who's in a public room; + - which local users share a private room with other users (local + and remote); and + - who should be in the user_directory. + Args: progress (dict) batch_size (int): Maximum number of state events to process per cycle. + + Returns: + number of events processed. """ # If no progress has been recorded yet, delete everything. if not progress: await self.delete_all_from_user_dir() - def _get_next_batch(txn): + def _get_next_batch( + txn: LoggingTransaction, + ) -> Optional[Sequence[Tuple[str, int]]]: # Only fetch 250 rooms, so we don't fetch too many at once, even # if those 250 rooms have fewer than batch_size state events. sql = """ @@ -188,60 +210,7 @@ def _get_next_batch(txn): processed_event_count = 0 for room_id, event_count in rooms_to_work_on: - is_in_room = await self.is_host_joined(room_id, self.server_name) - - if is_in_room: - is_public = await self.is_room_world_readable_or_publicly_joinable( - room_id - ) - - users_with_profile = await self.get_users_in_room_with_profiles(room_id) - user_ids = set(users_with_profile) - - # Update each user in the user directory. - for user_id, profile in users_with_profile.items(): - await self.update_profile_in_user_dir( - user_id, profile.display_name, profile.avatar_url - ) - - to_insert = set() - - if is_public: - for user_id in user_ids: - if self.get_if_app_services_interested_in_user(user_id): - continue - - to_insert.add(user_id) - - if to_insert: - await self.add_users_in_public_rooms(room_id, to_insert) - to_insert.clear() - else: - for user_id in user_ids: - if not self.hs.is_mine_id(user_id): - continue - - if self.get_if_app_services_interested_in_user(user_id): - continue - - for other_user_id in user_ids: - if user_id == other_user_id: - continue - - user_set = (user_id, other_user_id) - to_insert.add(user_set) - - # If it gets too big, stop and write to the database - # to prevent storing too much in RAM. 
- if len(to_insert) >= self.SHARE_PRIVATE_WORKING_SET: - await self.add_users_who_share_private_room( - room_id, to_insert - ) - to_insert.clear() - - if to_insert: - await self.add_users_who_share_private_room(room_id, to_insert) - to_insert.clear() + await self._populate_user_directory_process_single_room(room_id) # We've finished a room. Delete it from the table. await self.db_pool.simple_delete_one( @@ -264,15 +233,77 @@ def _get_next_batch(txn): return processed_event_count - async def _populate_user_directory_process_users(self, progress, batch_size): - """ - If search_all_users is enabled, add all of the users to the user directory. + async def _populate_user_directory_process_single_room(self, room_id: str) -> None: + """Update the user_directory and room-sharing tables to account for this room. + + There's a lot of overlap with _track_user_joined_room here, but that method + has stronger guarantees: it's only ever called when a user X of interest + has joined the room, so it only needs to add rows containing user X + (linear in the size of the room), and it is only concerned with the + room-sharing tables. + + Here we need to add rows for all pairs of users (quadratic in the size of + the room) and to update the user_directory entries too. """ - if not self.hs.config.user_directory_search_all_users: - await self.db_pool.updates._end_background_update( - "populate_user_directory_process_users" + is_in_room = await self.is_host_joined(room_id, self.server_name) + if not is_in_room: + return + + is_public = await self.is_room_world_readable_or_publicly_joinable(room_id) + # TODO: this will leak per-room profiles to the user directory. + users_with_profile = await self.get_users_in_room_with_profiles(room_id) + + # Update each remote user in the user directory. + # (Entries for local users are managed by the UserDirectoryHandler + # and do not require us to peek at room state/events.) + for user_id, profile in users_with_profile.items(): + if self.hs.is_mine_id(user_id): + continue + await self.update_profile_in_user_dir( + user_id, profile.display_name, profile.avatar_url ) - return 1 + + to_insert = set() + + if is_public: + for user_id in users_with_profile: + if self.get_if_app_services_interested_in_user(user_id): + continue + + to_insert.add(user_id) + + if to_insert: + await self.add_users_in_public_rooms(room_id, to_insert) + to_insert.clear() + else: + for user_id in users_with_profile: + if not self.hs.is_mine_id(user_id): + continue + + if self.get_if_app_services_interested_in_user(user_id): + continue + + for other_user_id in users_with_profile: + if user_id == other_user_id: + continue + + user_set = (user_id, other_user_id) + to_insert.add(user_set) + + # If it gets too big, stop and write to the database + # to prevent storing too much in RAM. 
+ if len(to_insert) >= self.SHARE_PRIVATE_WORKING_SET: + await self.add_users_who_share_private_room(room_id, to_insert) + to_insert.clear() + + if to_insert: + await self.add_users_who_share_private_room(room_id, to_insert) + to_insert.clear() + + async def _populate_user_directory_process_users( + self, progress: ProgressDict, batch_size: int + ) -> int: + """Upsert a user_directory entry for each local user.""" def _get_next_batch(txn): sql = "SELECT user_id FROM %s LIMIT %s" % ( @@ -443,7 +474,7 @@ async def add_users_who_share_private_room( for user_id, other_user_id in user_id_tuples ], value_names=(), - value_values=None, + value_values=(), desc="add_users_who_share_room", ) @@ -462,7 +493,7 @@ async def add_users_in_public_rooms( key_names=["user_id", "room_id"], key_values=[(user_id, room_id) for user_id in user_ids], value_names=(), - value_values=None, + value_values=(), desc="add_users_in_public_rooms", ) diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 2928c4f48cca..57cc3e264617 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -16,6 +16,7 @@ import synapse.types from synapse.api.errors import AuthError, SynapseError +from synapse.rest import admin from synapse.types import UserID from tests import unittest @@ -25,6 +26,8 @@ class ProfileTestCase(unittest.HomeserverTestCase): """Tests profile management.""" + servlets = [admin.register_servlets] + def make_homeserver(self, reactor, clock): self.mock_federation = Mock() self.mock_registry = Mock() @@ -46,11 +49,11 @@ def register_query_handler(query_type, handler): def prepare(self, reactor, clock, hs): self.store = hs.get_datastore() - self.frank = UserID.from_string("@1234ABCD:test") + self.frank = UserID.from_string("@1234abcd:test") self.bob = UserID.from_string("@4567:test") self.alice = UserID.from_string("@alice:remote") - self.get_success(self.store.create_profile(self.frank.localpart)) + self.register_user(self.frank.localpart, "frankpassword") self.handler = hs.get_profile_handler() diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index e44bf2b3b187..a251f77f9c2e 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+from typing import List, Tuple from unittest.mock import Mock from twisted.internet import defer @@ -94,7 +95,7 @@ def test_handle_local_profile_change_with_deactivated_user(self): # deactivate user self.get_success(self.store.set_user_deactivated_status(r_user_id, True)) - self.get_success(self.handler.handle_user_deactivated(r_user_id)) + self.get_success(self.handler.handle_local_user_deactivated(r_user_id)) # profile is not in directory profile = self.get_success(self.store.get_user_in_directory(r_user_id)) @@ -118,7 +119,7 @@ def test_handle_user_deactivated_support_user(self): ) self.store.remove_from_user_dir = Mock(return_value=defer.succeed(None)) - self.get_success(self.handler.handle_user_deactivated(s_user_id)) + self.get_success(self.handler.handle_local_user_deactivated(s_user_id)) self.store.remove_from_user_dir.not_called() def test_handle_user_deactivated_regular_user(self): @@ -127,7 +128,7 @@ def test_handle_user_deactivated_regular_user(self): self.store.register_user(user_id=r_user_id, password_hash=None) ) self.store.remove_from_user_dir = Mock(return_value=defer.succeed(None)) - self.get_success(self.handler.handle_user_deactivated(r_user_id)) + self.get_success(self.handler.handle_local_user_deactivated(r_user_id)) self.store.remove_from_user_dir.called_once_with(r_user_id) def test_private_room(self): @@ -379,7 +380,7 @@ def _compress_shared(self, shared): r.add((i["user_id"], i["other_user_id"], i["room_id"])) return r - def get_users_in_public_rooms(self): + def get_users_in_public_rooms(self) -> List[Tuple[str, str]]: r = self.get_success( self.store.db_pool.simple_select_list( "users_in_public_rooms", None, ("user_id", "room_id") @@ -390,7 +391,7 @@ def get_users_in_public_rooms(self): retval.append((i["user_id"], i["room_id"])) return retval - def get_users_who_share_private_rooms(self): + def get_users_who_share_private_rooms(self) -> List[Tuple[str, str, str]]: return self.get_success( self.store.db_pool.simple_select_list( "users_who_share_private_rooms", @@ -639,6 +640,66 @@ def _add_user_to_room( self.hs.get_storage().persistence.persist_event(event, context) ) + def test_making_room_public_doesnt_alter_directory_entry(self): + """Per-room names shouldn't go to the directory when the room becomes public. + + I made this a Synapse test case rather than a Complement one because + I think this is (strictly speaking) an implementation choice. Synapse + has chosen to only ever use the public profile when responding to a user + directory search. There's no privacy leak here, because making the room + public discloses the per-room name. + + The spec doesn't mandate anything about _how_ a user + should appear in a /user_directory/search result. Hypothetical example: + suppose Bob searches for Alice. When representing Alice in a search + result, it's reasonable to use any of Alice's nicknames that Bob is + aware of. Heck, maybe we even want to use lots of them in a combined + displayname like `Alice (aka "ali", "ally", "41iC3")`. + """ + # TODO the same should apply when Alice is a remote user. + alice = self.register_user("alice", "pass") + alice_token = self.login(alice, "pass") + bob = self.register_user("bob", "pass") + bob_token = self.login(bob, "pass") + + # Alice and Bob are in a private room. + room = self.helper.create_room_as(alice, is_public=False, tok=alice_token) + self.helper.invite(room, src=alice, targ=bob, tok=alice_token) + self.helper.join(room, user=bob, tok=bob_token) + + # Alice has a nickname unique to that room. 
+ self.helper.send_state( + room, + "m.room.member", + { + "displayname": "Freddy Mercury", + "membership": "join", + }, + alice_token, + state_key=alice, + ) + + # Check Alice isn't recorded as being in a public room. + self.assertNotIn((alice, room), self.get_users_in_public_rooms()) + + # One of them makes the room public. + self.helper.send_state( + room, + "m.room.join_rules", + {"join_rule": "public"}, + alice_token, + ) + # Check that Alice is now recorded as being in a public room + self.assertIn((alice, room), self.get_users_in_public_rooms()) + + # Alice's display name remains the same in the user directory. + search_result = self.get_success(self.handler.search_users(bob, alice, 10)) + self.assertEqual( + search_result["results"], + [{"display_name": "alice", "avatar_url": None, "user_id": alice}], + ) + class TestUserDirSearchDisabled(unittest.HomeserverTestCase): user_id = "@test:test" diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 0c9cbb9aff52..847985d2adb3 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -702,6 +702,12 @@ class RoomJoinRatelimitTestCase(RoomBase): room.register_servlets, ] + def prepare(self, reactor, clock, homeserver): + super().prepare(reactor, clock, homeserver) + # profile changes expect that the user is actually registered + user = UserID.from_string(self.user_id) + self.register_user(user.localpart, "supersecretpassword") + @unittest.override_config( {"rc_joins": {"local": {"per_second": 0.5, "burst_count": 3}}} ) @@ -731,12 +737,6 @@ def test_join_local_ratelimit_profile_change(self): # join in a second. room_ids.append(self.helper.create_room_as(self.user_id)) - # Create a profile for the user, since it hasn't been done on registration. - store = self.hs.get_datastore() - self.get_success( - store.create_profile(UserID.from_string(self.user_id).localpart) - ) - # Update the display name for the user. path = "/_matrix/client/r0/profile/%s/displayname" % self.user_id channel = self.make_request("PUT", path, {"displayname": "John Doe"})
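The `MatchChange` enum added in `synapse/handlers/state_deltas.py` replaces the old `Optional[bool]` return value of `_get_key_change`. Below is a minimal, self-contained sketch of that tri-state comparison for readers who want to play with the pattern outside Synapse; `key_change` and the bare content dicts are simplified stand-ins for the handler's event lookups, not Synapse APIs.

```python
from enum import Enum, auto
from typing import Optional


class MatchChange(Enum):
    """Mirrors the enum added in state_deltas.py."""

    no_change = auto()
    now_true = auto()
    now_false = auto()


def key_change(
    prev_content: Optional[dict],
    content: Optional[dict],
    key_name: str,
    public_value: str,
) -> MatchChange:
    # Compare one content field before and after a state change and report
    # whether it started or stopped matching `public_value`.
    prev_value = prev_content.get(key_name) if prev_content else None
    value = content.get(key_name) if content else None

    if value == public_value and prev_value != public_value:
        return MatchChange.now_true
    elif value != public_value and prev_value == public_value:
        return MatchChange.now_false
    else:
        return MatchChange.no_change


# A room's join rule flips from invite-only to public:
assert (
    key_change({"join_rule": "invite"}, {"join_rule": "public"}, "join_rule", "public")
    is MatchChange.now_true
)
# Nothing relevant changed:
assert (
    key_change({"join_rule": "public"}, {"join_rule": "public"}, "join_rule", "public")
    is MatchChange.no_change
)
```

Using an enum rather than `Optional[bool]` makes call sites such as `if joined is MatchChange.now_false:` self-describing, and removes the easy-to-miss distinction between `change is False` and a falsy `None`.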
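The docstring of `_populate_user_directory_process_single_room` contrasts the linear, incremental bookkeeping in `_track_user_joined_room` with the quadratic full-room rescan. The following sketch isolates just the pair-generation logic under stated assumptions: `is_local`, the function names, and the user IDs are hypothetical stand-ins, and only the shape of the loops mirrors the diff (the real code additionally filters out support and appservice users and batches its writes).

```python
from typing import Callable, Iterable, List, Set, Tuple


def pairs_for_single_join(
    joiner: str, users_in_room: Iterable[str], is_local: Callable[[str], bool]
) -> Set[Tuple[str, str]]:
    # Incremental bookkeeping, as in _track_user_joined_room: only rows
    # that mention the joiner, so linear in the size of the room.
    to_insert: Set[Tuple[str, str]] = set()
    for other in users_in_room:
        if other == joiner:
            continue
        if is_local(joiner):
            to_insert.add((joiner, other))
        if is_local(other):
            to_insert.add((other, joiner))
    return to_insert


def pairs_for_full_rescan(
    users_in_room: List[str], is_local: Callable[[str], bool]
) -> Set[Tuple[str, str]]:
    # Full rescan, as in _populate_user_directory_process_single_room for a
    # private room: every (local user, other user) pair, so quadratic.
    return {
        (user, other)
        for user in users_in_room
        if is_local(user)
        for other in users_in_room
        if other != user
    }


def is_local(user_id: str) -> bool:
    # Hypothetical locality test for the example.
    return user_id.endswith(":test")


room = ["@alice:test", "@bob:test", "@carol:remote"]
# Rescanning the whole room produces exactly the rows that tracking each
# join incrementally would have accumulated.
incremental: Set[Tuple[str, str]] = set()
for user in room:
    incremental |= pairs_for_single_join(user, room, is_local)
assert pairs_for_full_rescan(room, is_local) == incremental
```

The closing assertion illustrates why the rescan is safe as a repair mechanism: it reproduces the same `users_who_share_private_rooms` rows that incremental tracking of each join would have written, just all at once.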