Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sliding Sync /sync/e2ee endpoint for To-Device messages #17167

Merged
merged 39 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
f9e6e53
Configurable /sync/e2ee endpoint
MadLittleMods May 6, 2024
1e05a05
Add Sliding Sync `/sync/e2ee` endpoint for To-Device messages
MadLittleMods May 7, 2024
5e925f6
Share tests with test_sendtodevice
MadLittleMods May 8, 2024
69f9143
Comment on tests
MadLittleMods May 8, 2024
d4ff933
Prefer Sync v2 vs Sliding Sync distinction
MadLittleMods May 8, 2024
371ec57
Fix wait_for_sync_for_user in tests
MadLittleMods May 8, 2024
06d12e5
Ugly overloads
MadLittleMods May 8, 2024
b8b70ba
Fix lint
MadLittleMods May 8, 2024
c60a4f8
Add changelog
MadLittleMods May 8, 2024
10ffae6
Shared logic for `get_sync_result_builder()`
MadLittleMods May 8, 2024
6bf4896
Try calculate more
MadLittleMods May 9, 2024
8871dac
Share tests using inheritance
MadLittleMods May 9, 2024
0892283
Add comments docs
MadLittleMods May 9, 2024
adb7e20
Consolidate device_lists /sync tests
MadLittleMods May 9, 2024
f098355
Add `device_one_time_keys_count` tests
MadLittleMods May 9, 2024
6b7cfd7
Add tests for `device_unused_fallback_key_types` in `/sync`
MadLittleMods May 9, 2024
b9e5379
Describe test
MadLittleMods May 9, 2024
9bdfa16
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 16, 2024
7331401
Lint
MadLittleMods May 16, 2024
b23abca
Fix test inheritance
MadLittleMods May 16, 2024
821a1b3
Add missing field to docstring
MadLittleMods May 16, 2024
35ca937
Format docstring
MadLittleMods May 16, 2024
4ad7a8b
No need to change this formatting from develop
MadLittleMods May 16, 2024
3092ab5
Calculate room derived membership info for device_lists
MadLittleMods May 20, 2024
3539abe
Membership in timeline for better derived info
MadLittleMods May 20, 2024
5f194f9
Exclude application services
MadLittleMods May 20, 2024
02cecfa
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 20, 2024
f6122ff
Use `client_patterns()` for endpoint URL
MadLittleMods May 21, 2024
2f112e7
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 21, 2024
c2221bb
Lint
MadLittleMods May 21, 2024
717b160
Adjust wording, add todo
MadLittleMods May 21, 2024
514aba5
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 22, 2024
9749795
Update filter to be more precise and avoid more work
MadLittleMods May 22, 2024
06ac1da
Restore copyright header
MadLittleMods May 22, 2024
3da6bc1
Use `@parameterized_class`
MadLittleMods May 22, 2024
d4b41aa
Fix lints
MadLittleMods May 22, 2024
6606ac1
Add docstring for parametized attributes
MadLittleMods May 23, 2024
ab0b844
Add actual typing for params (not just docstrings)
MadLittleMods May 23, 2024
a482545
Fix test after removing type ignore
MadLittleMods May 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17167.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync/e2ee` endpoint for To-Device messages and device encryption info.
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# MSC3391: Removing account data.
self.msc3391_enabled = experimental.get("msc3391_enabled", False)

# MSC3575 (Sliding Sync API endpoints)
self.msc3575_enabled: bool = experimental.get("msc3575_enabled", False)

# MSC3773: Thread notifications
self.msc3773_enabled: bool = experimental.get("msc3773_enabled", False)

Expand Down
247 changes: 237 additions & 10 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
Dict,
FrozenSet,
List,
Literal,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
overload,
)

import attr
Expand Down Expand Up @@ -128,6 +131,8 @@ class SyncVersion(Enum):

# Traditional `/sync` endpoint
SYNC_V2 = "sync_v2"
# Part of MSC3575 Sliding Sync
E2EE_SYNC = "e2ee_sync"


@attr.s(slots=True, frozen=True, auto_attribs=True)
Expand Down Expand Up @@ -280,6 +285,26 @@ def __bool__(self) -> bool:
)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class E2eeSyncResult:
"""
Attributes:
next_batch: Token for the next sync
to_device: List of direct messages for the device.
device_lists: List of user_ids whose devices have changed
device_one_time_keys_count: Dict of algorithm to count for one time keys
for this device
device_unused_fallback_key_types: List of key types that have an unused fallback
key
"""

next_batch: StreamToken
to_device: List[JsonDict]
device_lists: DeviceListUpdates
device_one_time_keys_count: JsonMapping
device_unused_fallback_key_types: List[str]


class SyncHandler:
def __init__(self, hs: "HomeServer"):
self.hs_config = hs.config
Expand Down Expand Up @@ -322,6 +347,31 @@ def __init__(self, hs: "HomeServer"):

self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync

@overload
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.SYNC_V2],
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> SyncResult: ...

@overload
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.E2EE_SYNC],
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> E2eeSyncResult: ...

@overload
async def wait_for_sync_for_user(
self,
requester: Requester,
Expand All @@ -331,7 +381,18 @@ async def wait_for_sync_for_user(
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> SyncResult:
) -> Union[SyncResult, E2eeSyncResult]: ...

async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: SyncVersion,
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
Expand All @@ -344,8 +405,10 @@ async def wait_for_sync_for_user(
since_token: The point in the stream to sync from.
timeout: How long to wait for new data to arrive before giving up.
full_state: Whether to return the full state for each room.

Returns:
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
When `SyncVersion.E2EE_SYNC`, returns a `E2eeSyncResult`.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
Expand All @@ -366,6 +429,29 @@ async def wait_for_sync_for_user(
logger.debug("Returning sync response for %s", user_id)
return res

@overload
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.SYNC_V2],
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> SyncResult: ...

@overload
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.E2EE_SYNC],
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> E2eeSyncResult: ...

@overload
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
Expand All @@ -374,7 +460,17 @@ async def _wait_for_sync_for_user(
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> SyncResult:
) -> Union[SyncResult, E2eeSyncResult]: ...

async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> Union[SyncResult, E2eeSyncResult]:
"""The start of the machinery that produces a /sync response.

See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details.
Expand Down Expand Up @@ -417,14 +513,16 @@ async def _wait_for_sync_for_user(
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result: SyncResult = await self.current_sync_for_user(
sync_config, sync_version, since_token, full_state=full_state
result: Union[SyncResult, E2eeSyncResult] = (
await self.current_sync_for_user(
sync_config, sync_version, since_token, full_state=full_state
)
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
) -> SyncResult:
) -> Union[SyncResult, E2eeSyncResult]:
return await self.current_sync_for_user(
sync_config, sync_version, since_token
)
Expand Down Expand Up @@ -456,14 +554,43 @@ async def current_sync_callback(

return result

@overload
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.SYNC_V2],
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult: ...

@overload
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.E2EE_SYNC],
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> E2eeSyncResult: ...

@overload
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
"""Generates the response body of a sync result, represented as a SyncResult.
) -> Union[SyncResult, E2eeSyncResult]: ...

async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]:
"""
Generates the response body of a sync result, represented as a
`SyncResult`/`E2eeSyncResult`.

This is a wrapper around `generate_sync_result` which starts an open tracing
span to track the sync. See `generate_sync_result` for the next part of your
Expand All @@ -474,15 +601,25 @@ async def current_sync_for_user(
sync_version: Determines what kind of sync response to generate.
since_token: The point in the stream to sync from.p.
full_state: Whether to return the full state for each room.

Returns:
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
When `SyncVersion.E2EE_SYNC`, returns a `E2eeSyncResult`.
"""
with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})

# Go through the `/sync` v2 path
if sync_version == SyncVersion.SYNC_V2:
sync_result: SyncResult = await self.generate_sync_result(
sync_config, since_token, full_state
sync_result: Union[SyncResult, E2eeSyncResult] = (
await self.generate_sync_result(
sync_config, since_token, full_state
)
)
# Go through the MSC3575 Sliding Sync `/sync/e2ee` path
elif sync_version == SyncVersion.E2EE_SYNC:
sync_result = await self.generate_e2ee_sync_result(
sync_config, since_token
)
else:
raise Exception(
Expand Down Expand Up @@ -1691,6 +1828,96 @@ async def generate_sync_result(
next_batch=sync_result_builder.now_token,
)

async def generate_e2ee_sync_result(
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
) -> E2eeSyncResult:
"""
Generates the response body of a MSC3575 Sliding Sync `/sync/e2ee` result.

This is represented by a `E2eeSyncResult` struct, which is built from small
pieces using a `SyncResultBuilder`. The `sync_result_builder` is passed as a
mutable ("inout") parameter to various helper functions. These retrieve and
process the data which forms the sync body, often writing to the
`sync_result_builder` to store their output.

At the end, we transfer data from the `sync_result_builder` to a new `E2eeSyncResult`
instance to signify that the sync calculation is complete.
"""
user_id = sync_config.user.to_string()
app_service = self.store.get_app_service_by_user_id(user_id)
if app_service:
# We no longer support AS users using /sync directly.
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()

sync_result_builder = await self.get_sync_result_builder(
sync_config,
since_token,
full_state=False,
)

# 1. Calculate `to_device` events
await self._generate_sync_entry_for_to_device(sync_result_builder)

# 2. Calculate `device_lists`
# Device list updates are sent if a since token is provided.
device_lists = DeviceListUpdates()
include_device_list_updates = bool(since_token and since_token.device_list_key)
if include_device_list_updates:
# Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which
# is used in calculate_user_changes below.
#
# TODO: Running `_generate_sync_entry_for_rooms()` is a lot of work just to
# figure out the membership changes/derived info needed for
# `_generate_sync_entry_for_device_list()`. In the future, we should try to
# refactor this away.
(
newly_joined_rooms,
newly_left_rooms,
) = await self._generate_sync_entry_for_rooms(sync_result_builder)

# This uses the sync_result_builder.joined which is set in
# `_generate_sync_entry_for_rooms`, if that didn't find any joined
# rooms for some reason it is a no-op.
(
newly_joined_or_invited_or_knocked_users,
newly_left_users,
) = sync_result_builder.calculate_user_changes()

device_lists = await self._generate_sync_entry_for_device_list(
sync_result_builder,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)

# 3. Calculate `device_one_time_keys_count` and `device_unused_fallback_key_types`
device_id = sync_config.device_id
one_time_keys_count: JsonMapping = {}
unused_fallback_key_types: List[str] = []
if device_id:
# TODO: We should have a way to let clients differentiate between the states of:
# * no change in OTK count since the provided since token
# * the server has zero OTKs left for this device
# Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
one_time_keys_count = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
unused_fallback_key_types = list(
await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
)

return E2eeSyncResult(
to_device=sync_result_builder.to_device,
device_lists=device_lists,
device_one_time_keys_count=one_time_keys_count,
device_unused_fallback_key_types=unused_fallback_key_types,
next_batch=sync_result_builder.now_token,
)

async def get_sync_result_builder(
self,
sync_config: SyncConfig,
Expand Down Expand Up @@ -1914,7 +2141,7 @@ async def _generate_sync_entry_for_device_list(
users_that_have_changed = (
await self._device_handler.get_device_changes_in_shared_rooms(
user_id,
sync_result_builder.joined_room_ids,
joined_room_ids,
from_token=since_token,
)
)
Expand Down
Loading
Loading