diff --git a/changelog.d/10569.feature b/changelog.d/10569.feature new file mode 100644 index 000000000000..ffc4e4289cfa --- /dev/null +++ b/changelog.d/10569.feature @@ -0,0 +1 @@ +Add pagination to the spaces summary based on updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946). diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 2eefac04fd09..0af953a5d6e2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1290,6 +1290,88 @@ async def send_request(destination: str) -> FederationSpaceSummaryResult: failover_on_unknown_endpoint=True, ) + async def get_room_hierarchy( + self, + destinations: Iterable[str], + room_id: str, + suggested_only: bool, + ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]: + """ + Call other servers to get a hierarchy of the given room. + + Performs simple data validates and parsing of the response. + + Args: + destinations: The remote servers. We will try them in turn, omitting any + that have been blacklisted. + room_id: ID of the space to be queried + suggested_only: If true, ask the remote server to only return children + with the "suggested" flag set + + Returns: + A tuple of: + The room as a JSON dictionary. + A list of children rooms, as JSON dictionaries. + A list of inaccessible children room IDs. + + Raises: + SynapseError if we were unable to get a valid summary from any of the + remote servers + """ + + async def send_request( + destination: str, + ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]: + res = await self.transport_layer.get_room_hierarchy( + destination=destination, + room_id=room_id, + suggested_only=suggested_only, + ) + + room = res.get("room") + if not isinstance(room, dict): + raise InvalidResponseError("'room' must be a dict") + + # Validate children_state of the room. + children_state = room.get("children_state", []) + if not isinstance(children_state, Sequence): + raise InvalidResponseError("'room.children_state' must be a list") + if any(not isinstance(e, dict) for e in children_state): + raise InvalidResponseError("Invalid event in 'children_state' list") + try: + [ + FederationSpaceSummaryEventResult.from_json_dict(e) + for e in children_state + ] + except ValueError as e: + raise InvalidResponseError(str(e)) + + # Validate the children rooms. + children = res.get("children", []) + if not isinstance(children, Sequence): + raise InvalidResponseError("'children' must be a list") + if any(not isinstance(r, dict) for r in children): + raise InvalidResponseError("Invalid room in 'children' list") + + # Validate the inaccessible children. + inaccessible_children = res.get("inaccessible_children", []) + if not isinstance(inaccessible_children, Sequence): + raise InvalidResponseError("'inaccessible_children' must be a list") + if any(not isinstance(r, str) for r in inaccessible_children): + raise InvalidResponseError( + "Invalid room ID in 'inaccessible_children' list" + ) + + return room, children, inaccessible_children + + # TODO Fallback to the old federation API and translate the results. + return await self._try_destination_list( + "fetch room hierarchy", + destinations, + send_request, + failover_on_unknown_endpoint=True, + ) + @attr.s(frozen=True, slots=True, auto_attribs=True) class FederationSpaceSummaryEventResult: diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 90a7c16b62a1..8b247fe2066d 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -1177,6 +1177,28 @@ async def get_space_summary( destination=destination, path=path, data=params ) + async def get_room_hierarchy( + self, + destination: str, + room_id: str, + suggested_only: bool, + ) -> JsonDict: + """ + Args: + destination: The remote server + room_id: The room ID to ask about. + suggested_only: if True, only suggested rooms will be returned + """ + path = _create_path( + FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/hierarchy/%s", room_id + ) + + return await self.client.get_json( + destination=destination, + path=path, + args={"suggested_only": "true" if suggested_only else "false"}, + ) + def _create_path(federation_prefix: str, path: str, *args: str) -> str: """ diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 640f46fff6bf..79a2e1afa0a0 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1936,6 +1936,33 @@ async def on_POST( ) +class FederationRoomHierarchyServlet(BaseFederationServlet): + PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946" + PATH = "/hierarchy/(?P[^/]*)" + + def __init__( + self, + hs: HomeServer, + authenticator: Authenticator, + ratelimiter: FederationRateLimiter, + server_name: str, + ): + super().__init__(hs, authenticator, ratelimiter, server_name) + self.handler = hs.get_space_summary_handler() + + async def on_GET( + self, + origin: str, + content: Literal[None], + query: Mapping[bytes, Sequence[bytes]], + room_id: str, + ) -> Tuple[int, JsonDict]: + suggested_only = parse_boolean_from_args(query, "suggested_only", default=False) + return 200, await self.handler.get_federation_hierarchy( + origin, room_id, suggested_only + ) + + class RoomComplexityServlet(BaseFederationServlet): """ Indicates to other servers how complex (and therefore likely @@ -1999,6 +2026,7 @@ async def on_GET( FederationVersionServlet, RoomComplexityServlet, FederationSpaceSummaryServlet, + FederationRoomHierarchyServlet, FederationV1SendKnockServlet, FederationMakeKnockServlet, ) diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py index d0060f90462e..c74e90abbc08 100644 --- a/synapse/handlers/space_summary.py +++ b/synapse/handlers/space_summary.py @@ -16,17 +16,7 @@ import logging import re from collections import deque -from typing import ( - TYPE_CHECKING, - Deque, - Dict, - Iterable, - List, - Optional, - Sequence, - Set, - Tuple, -) +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Set, Tuple import attr @@ -80,7 +70,7 @@ class _PaginationSession: # The time the pagination session was created, in milliseconds. creation_time_ms: int # The queue of rooms which are still to process. - room_queue: Deque["_RoomQueueEntry"] + room_queue: List["_RoomQueueEntry"] # A set of rooms which have been processed. processed_rooms: Set[str] @@ -197,7 +187,7 @@ async def get_space_summary( events: Sequence[JsonDict] = [] if room_entry: rooms_result.append(room_entry.room) - events = room_entry.children + events = room_entry.children_state_events logger.debug( "Query of local room %s returned events %s", @@ -232,7 +222,7 @@ async def get_space_summary( room.pop("allowed_spaces", None) rooms_result.append(room) - events.extend(room_entry.children) + events.extend(room_entry.children_state_events) # All rooms returned don't need visiting again (even if the user # didn't have access to them). @@ -350,8 +340,8 @@ async def _get_room_hierarchy( room_queue = pagination_session.room_queue processed_rooms = pagination_session.processed_rooms else: - # the queue of rooms to process - room_queue = deque((_RoomQueueEntry(requested_room_id, ()),)) + # The queue of rooms to process, the next room is last on the stack. + room_queue = [_RoomQueueEntry(requested_room_id, ())] # Rooms we have already processed. processed_rooms = set() @@ -367,7 +357,7 @@ async def _get_room_hierarchy( # Iterate through the queue until we reach the limit or run out of # rooms to include. while room_queue and len(rooms_result) < limit: - queue_entry = room_queue.popleft() + queue_entry = room_queue.pop() room_id = queue_entry.room_id current_depth = queue_entry.depth if room_id in processed_rooms: @@ -376,6 +366,18 @@ async def _get_room_hierarchy( logger.debug("Processing room %s", room_id) + # A map of summaries for children rooms that might be returned over + # federation. The rationale for caching these and *maybe* using them + # is to prefer any information local to the homeserver before trusting + # data received over federation. + children_room_entries: Dict[str, JsonDict] = {} + # A set of room IDs which are children that did not have information + # returned over federation and are known to be inaccessible to the + # current server. We should not reach out over federation to try to + # summarise these rooms. + inaccessible_children: Set[str] = set() + + # If the room is known locally, summarise it! is_in_room = await self._store.is_host_joined(room_id, self._server_name) if is_in_room: room_entry = await self._summarize_local_room( @@ -387,26 +389,68 @@ async def _get_room_hierarchy( max_children=None, ) - if room_entry: - rooms_result.append(room_entry.as_json()) - - # Add the child to the queue. We have already validated - # that the vias are a list of server names. - # - # If the current depth is the maximum depth, do not queue - # more entries. - if max_depth is None or current_depth < max_depth: - room_queue.extendleft( - _RoomQueueEntry( - ev["state_key"], ev["content"]["via"], current_depth + 1 - ) - for ev in reversed(room_entry.children) - ) - - processed_rooms.add(room_id) + # Otherwise, attempt to use information for federation. else: - # TODO Federation. - pass + # A previous call might have included information for this room. + # It can be used if either: + # + # 1. The room is not a space. + # 2. The maximum depth has been achieved (since no children + # information is needed). + if queue_entry.remote_room and ( + queue_entry.remote_room.get("room_type") != RoomTypes.SPACE + or (max_depth is not None and current_depth >= max_depth) + ): + room_entry = _RoomEntry( + queue_entry.room_id, queue_entry.remote_room + ) + + # If the above isn't true, attempt to fetch the room + # information over federation. + else: + ( + room_entry, + children_room_entries, + inaccessible_children, + ) = await self._summarize_remote_room_hiearchy( + queue_entry, + suggested_only, + ) + + # Ensure this room is accessible to the requester (and not just + # the homeserver). + if room_entry and not await self._is_remote_room_accessible( + requester, queue_entry.room_id, room_entry.room + ): + room_entry = None + + # This room has been processed and should be ignored if it appears + # elsewhere in the hierarchy. + processed_rooms.add(room_id) + + # There may or may not be a room entry based on whether it is + # inaccessible to the requesting user. + if room_entry: + # Add the room (including the stripped m.space.child events). + rooms_result.append(room_entry.as_json()) + + # If this room is not at the max-depth, check if there are any + # children to process. + if max_depth is None or current_depth < max_depth: + # The children get added in reverse order so that the next + # room to process, according to the ordering, is the last + # item in the list. + room_queue.extend( + _RoomQueueEntry( + ev["state_key"], + ev["content"]["via"], + current_depth + 1, + children_room_entries.get(ev["state_key"]), + ) + for ev in reversed(room_entry.children_state_events) + if ev["type"] == EventTypes.SpaceChild + and ev["state_key"] not in inaccessible_children + ) result: JsonDict = {"rooms": rooms_result} @@ -477,15 +521,78 @@ async def federation_space_summary( if room_entry: rooms_result.append(room_entry.room) - events_result.extend(room_entry.children) + events_result.extend(room_entry.children_state_events) # add any children to the queue room_queue.extend( - edge_event["state_key"] for edge_event in room_entry.children + edge_event["state_key"] + for edge_event in room_entry.children_state_events ) return {"rooms": rooms_result, "events": events_result} + async def get_federation_hierarchy( + self, + origin: str, + requested_room_id: str, + suggested_only: bool, + ): + """ + Implementation of the room hierarchy Federation API. + + This is similar to get_room_hierarchy, but does not recurse into the space. + It also considers whether anyone on the server may be able to access the + room, as opposed to whether a specific user can. + + Args: + origin: The server requesting the spaces summary. + requested_room_id: The room ID to start the hierarchy at (the "root" room). + suggested_only: whether we should only return children with the "suggested" + flag set. + + Returns: + The JSON hierarchy dictionary. + """ + root_room_entry = await self._summarize_local_room( + None, origin, requested_room_id, suggested_only, max_children=None + ) + if root_room_entry is None: + # Room is inaccessible to the requesting server. + raise SynapseError(404, "Unknown room: %s" % (requested_room_id,)) + + children_rooms_result: List[JsonDict] = [] + inaccessible_children: List[str] = [] + + # Iterate through each child and potentially add it, but not its children, + # to the response. + for child_room in root_room_entry.children_state_events: + room_id = child_room.get("state_key") + assert isinstance(room_id, str) + # If the room is unknown, skip it. + if not await self._store.is_host_joined(room_id, self._server_name): + continue + + room_entry = await self._summarize_local_room( + None, origin, room_id, suggested_only, max_children=0 + ) + # If the room is accessible, include it in the results. + # + # Note that only the room summary (without information on children) + # is included in the summary. + if room_entry: + children_rooms_result.append(room_entry.room) + # Otherwise, note that the requesting server shouldn't bother + # trying to summarize this room - they do not have access to it. + else: + inaccessible_children.append(room_id) + + return { + # Include the requested room (including the stripped children events). + "room": root_room_entry.as_json(), + "children": children_rooms_result, + "inaccessible_children": inaccessible_children, + } + async def _summarize_local_room( self, requester: Optional[str], @@ -519,8 +626,9 @@ async def _summarize_local_room( room_entry = await self._build_room_entry(room_id, for_federation=bool(origin)) - # If the room is not a space, return just the room information. - if room_entry.get("room_type") != RoomTypes.SPACE: + # If the room is not a space or the children don't matter, return just + # the room information. + if room_entry.get("room_type") != RoomTypes.SPACE or max_children == 0: return _RoomEntry(room_id, room_entry) # Otherwise, look for child rooms/spaces. @@ -616,6 +724,59 @@ async def _summarize_remote_room( return results + async def _summarize_remote_room_hiearchy( + self, room: "_RoomQueueEntry", suggested_only: bool + ) -> Tuple[Optional["_RoomEntry"], Dict[str, JsonDict], Set[str]]: + """ + Request room entries and a list of event entries for a given room by querying a remote server. + + Args: + room: The room to summarize. + suggested_only: True if only suggested children should be returned. + Otherwise, all children are returned. + + Returns: + A tuple of: + The room entry. + Partial room data return over federation. + A set of inaccessible children room IDs. + """ + room_id = room.room_id + logger.info("Requesting summary for %s via %s", room_id, room.via) + + via = itertools.islice(room.via, MAX_SERVERS_PER_SPACE) + try: + ( + room_response, + children, + inaccessible_children, + ) = await self._federation_client.get_room_hierarchy( + via, + room_id, + suggested_only=suggested_only, + ) + except Exception as e: + logger.warning( + "Unable to get hierarchy of %s via federation: %s", + room_id, + e, + exc_info=logger.isEnabledFor(logging.DEBUG), + ) + return None, {}, set() + + # Map the children to their room ID. + children_by_room_id = { + c["room_id"]: c + for c in children + if "room_id" in c and isinstance(c["room_id"], str) + } + + return ( + _RoomEntry(room_id, room_response, room_response.pop("children_state", ())), + children_by_room_id, + set(inaccessible_children), + ) + async def _is_local_room_accessible( self, room_id: str, requester: Optional[str], origin: Optional[str] = None ) -> bool: @@ -866,9 +1027,16 @@ async def _get_child_events(self, room_id: str) -> Iterable[EventBase]: @attr.s(frozen=True, slots=True, auto_attribs=True) class _RoomQueueEntry: + # The room ID of this entry. room_id: str + # The server to query if the room is not known locally. via: Sequence[str] + # The minimum number of hops necessary to get to this room (compared to the + # originally requested room). depth: int = 0 + # The room summary for this room returned via federation. This will only be + # used if the room is not known locally (and is not a space). + remote_room: Optional[JsonDict] = None @attr.s(frozen=True, slots=True, auto_attribs=True) @@ -879,11 +1047,17 @@ class _RoomEntry: # An iterable of the sorted, stripped children events for children of this room. # # This may not include all children. - children: Sequence[JsonDict] = () + children_state_events: Sequence[JsonDict] = () def as_json(self) -> JsonDict: + """ + Returns a JSON dictionary suitable for the room hierarchy endpoint. + + It returns the room summary including the stripped m.space.child events + as a sub-key. + """ result = dict(self.room) - result["children_state"] = self.children + result["children_state"] = self.children_state_events return result diff --git a/tests/handlers/test_space_summary.py b/tests/handlers/test_space_summary.py index 83c2bdd8f99c..bc8e131f4a0b 100644 --- a/tests/handlers/test_space_summary.py +++ b/tests/handlers/test_space_summary.py @@ -481,7 +481,7 @@ def test_pagination(self): self.assertNotIn("next_batch", result) def test_invalid_pagination_token(self): - """""" + """An invalid pagination token, or changing other parameters, shoudl be rejected.""" room_ids = [] for i in range(1, 10): room = self.helper.create_room_as(self.user, tok=self.token) @@ -581,33 +581,40 @@ def test_fed_complex(self): subspace = "#subspace:" + fed_hostname subroom = "#subroom:" + fed_hostname + # Generate some good data, and some bad data: + # + # * Event *back* to the root room. + # * Unrelated events / rooms + # * Multiple levels of events (in a not-useful order, e.g. grandchild + # events before child events). + + # Note that these entries are brief, but should contain enough info. + requested_room_entry = _RoomEntry( + subspace, + { + "room_id": subspace, + "world_readable": True, + "room_type": RoomTypes.SPACE, + }, + [ + { + "type": EventTypes.SpaceChild, + "room_id": subspace, + "state_key": subroom, + "content": {"via": [fed_hostname]}, + } + ], + ) + child_room = { + "room_id": subroom, + "world_readable": True, + } + async def summarize_remote_room( _self, room, suggested_only, max_children, exclude_rooms ): - # Return some good data, and some bad data: - # - # * Event *back* to the root room. - # * Unrelated events / rooms - # * Multiple levels of events (in a not-useful order, e.g. grandchild - # events before child events). - - # Note that these entries are brief, but should contain enough info. return [ - _RoomEntry( - subspace, - { - "room_id": subspace, - "world_readable": True, - "room_type": RoomTypes.SPACE, - }, - [ - { - "room_id": subspace, - "state_key": subroom, - "content": {"via": [fed_hostname]}, - } - ], - ), + requested_room_entry, _RoomEntry( subroom, { @@ -617,6 +624,9 @@ async def summarize_remote_room( ), ] + async def summarize_remote_room_hiearchy(_self, room, suggested_only): + return requested_room_entry, {subroom: child_room}, set() + # Add a room to the space which is on another server. self._add_child(self.space, subspace, self.token) @@ -636,6 +646,15 @@ async def summarize_remote_room( ] self._assert_rooms(result, expected) + with mock.patch( + "synapse.handlers.space_summary.SpaceSummaryHandler._summarize_remote_room_hiearchy", + new=summarize_remote_room_hiearchy, + ): + result = self.get_success( + self.handler.get_room_hierarchy(self.user, self.space) + ) + self._assert_hierarchy(result, expected) + def test_fed_filtering(self): """ Rooms returned over federation should be properly filtered to only include @@ -657,100 +676,106 @@ def test_fed_filtering(self): # Poke an invite over federation into the database. self._poke_fed_invite(invited_room, "@remote:" + fed_hostname) + # Note that these entries are brief, but should contain enough info. + children_rooms = ( + ( + public_room, + { + "room_id": public_room, + "world_readable": False, + "join_rules": JoinRules.PUBLIC, + }, + ), + ( + knock_room, + { + "room_id": knock_room, + "world_readable": False, + "join_rules": JoinRules.KNOCK, + }, + ), + ( + not_invited_room, + { + "room_id": not_invited_room, + "world_readable": False, + "join_rules": JoinRules.INVITE, + }, + ), + ( + invited_room, + { + "room_id": invited_room, + "world_readable": False, + "join_rules": JoinRules.INVITE, + }, + ), + ( + restricted_room, + { + "room_id": restricted_room, + "world_readable": False, + "join_rules": JoinRules.RESTRICTED, + "allowed_spaces": [], + }, + ), + ( + restricted_accessible_room, + { + "room_id": restricted_accessible_room, + "world_readable": False, + "join_rules": JoinRules.RESTRICTED, + "allowed_spaces": [self.room], + }, + ), + ( + world_readable_room, + { + "room_id": world_readable_room, + "world_readable": True, + "join_rules": JoinRules.INVITE, + }, + ), + ( + joined_room, + { + "room_id": joined_room, + "world_readable": False, + "join_rules": JoinRules.INVITE, + }, + ), + ) + + subspace_room_entry = _RoomEntry( + subspace, + { + "room_id": subspace, + "world_readable": True, + }, + # Place each room in the sub-space. + [ + { + "type": EventTypes.SpaceChild, + "room_id": subspace, + "state_key": room_id, + "content": {"via": [fed_hostname]}, + } + for room_id, _ in children_rooms + ], + ) + async def summarize_remote_room( _self, room, suggested_only, max_children, exclude_rooms ): - # Note that these entries are brief, but should contain enough info. - rooms = [ - _RoomEntry( - public_room, - { - "room_id": public_room, - "world_readable": False, - "join_rules": JoinRules.PUBLIC, - }, - ), - _RoomEntry( - knock_room, - { - "room_id": knock_room, - "world_readable": False, - "join_rules": JoinRules.KNOCK, - }, - ), - _RoomEntry( - not_invited_room, - { - "room_id": not_invited_room, - "world_readable": False, - "join_rules": JoinRules.INVITE, - }, - ), - _RoomEntry( - invited_room, - { - "room_id": invited_room, - "world_readable": False, - "join_rules": JoinRules.INVITE, - }, - ), - _RoomEntry( - restricted_room, - { - "room_id": restricted_room, - "world_readable": False, - "join_rules": JoinRules.RESTRICTED, - "allowed_spaces": [], - }, - ), - _RoomEntry( - restricted_accessible_room, - { - "room_id": restricted_accessible_room, - "world_readable": False, - "join_rules": JoinRules.RESTRICTED, - "allowed_spaces": [self.room], - }, - ), - _RoomEntry( - world_readable_room, - { - "room_id": world_readable_room, - "world_readable": True, - "join_rules": JoinRules.INVITE, - }, - ), - _RoomEntry( - joined_room, - { - "room_id": joined_room, - "world_readable": False, - "join_rules": JoinRules.INVITE, - }, - ), + return [subspace_room_entry] + [ + # A copy is made of the room data since the allowed_spaces key + # is removed. + _RoomEntry(child_room[0], dict(child_room[1])) + for child_room in children_rooms ] - # Also include the subspace. - rooms.insert( - 0, - _RoomEntry( - subspace, - { - "room_id": subspace, - "world_readable": True, - }, - # Place each room in the sub-space. - [ - { - "room_id": subspace, - "state_key": room.room_id, - "content": {"via": [fed_hostname]}, - } - for room in rooms - ], - ), - ) - return rooms + async def summarize_remote_room_hiearchy(_self, room, suggested_only): + return subspace_room_entry, dict(children_rooms), set() # Add a room to the space which is on another server. self._add_child(self.space, subspace, self.token) @@ -788,6 +813,15 @@ async def summarize_remote_room( ] self._assert_rooms(result, expected) + with mock.patch( + "synapse.handlers.space_summary.SpaceSummaryHandler._summarize_remote_room_hiearchy", + new=summarize_remote_room_hiearchy, + ): + result = self.get_success( + self.handler.get_room_hierarchy(self.user, self.space) + ) + self._assert_hierarchy(result, expected) + def test_fed_invited(self): """ A room which the user was invited to should be included in the response. @@ -802,19 +836,22 @@ def test_fed_invited(self): # Poke an invite over federation into the database. self._poke_fed_invite(fed_room, "@remote:" + fed_hostname) + fed_room_entry = _RoomEntry( + fed_room, + { + "room_id": fed_room, + "world_readable": False, + "join_rules": JoinRules.INVITE, + }, + ) + async def summarize_remote_room( _self, room, suggested_only, max_children, exclude_rooms ): - return [ - _RoomEntry( - fed_room, - { - "room_id": fed_room, - "world_readable": False, - "join_rules": JoinRules.INVITE, - }, - ), - ] + return [fed_room_entry] + + async def summarize_remote_room_hiearchy(_self, room, suggested_only): + return fed_room_entry, {}, set() # Add a room to the space which is on another server. self._add_child(self.space, fed_room, self.token) @@ -833,3 +870,12 @@ async def summarize_remote_room( (fed_room, ()), ] self._assert_rooms(result, expected) + + with mock.patch( + "synapse.handlers.space_summary.SpaceSummaryHandler._summarize_remote_room_hiearchy", + new=summarize_remote_room_hiearchy, + ): + result = self.get_success( + self.handler.get_room_hierarchy(self.user, self.space) + ) + self._assert_hierarchy(result, expected)