Add Sliding Sync /sync/e2ee endpoint for To-Device messages #17167

Merged
merged 39 commits into from
May 23, 2024
Changes from 9 commits
Commits
f9e6e53
Configurable /sync/e2ee endpoint
MadLittleMods May 6, 2024
1e05a05
Add Sliding Sync `/sync/e2ee` endpoint for To-Device messages
MadLittleMods May 7, 2024
5e925f6
Share tests with test_sendtodevice
MadLittleMods May 8, 2024
69f9143
Comment on tests
MadLittleMods May 8, 2024
d4ff933
Prefer Sync v2 vs Sliding Sync distinction
MadLittleMods May 8, 2024
371ec57
Fix wait_for_sync_for_user in tests
MadLittleMods May 8, 2024
06d12e5
Ugly overloads
MadLittleMods May 8, 2024
b8b70ba
Fix lint
MadLittleMods May 8, 2024
c60a4f8
Add changelog
MadLittleMods May 8, 2024
10ffae6
Shared logic for `get_sync_result_builder()`
MadLittleMods May 8, 2024
6bf4896
Try calculate more
MadLittleMods May 9, 2024
8871dac
Share tests using inheritance
MadLittleMods May 9, 2024
0892283
Add comments docs
MadLittleMods May 9, 2024
adb7e20
Consolidate device_lists /sync tests
MadLittleMods May 9, 2024
f098355
Add `device_one_time_keys_count` tests
MadLittleMods May 9, 2024
6b7cfd7
Add tests for `device_unused_fallback_key_types` in `/sync`
MadLittleMods May 9, 2024
b9e5379
Describe test
MadLittleMods May 9, 2024
9bdfa16
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 16, 2024
7331401
Lint
MadLittleMods May 16, 2024
b23abca
Fix test inheritance
MadLittleMods May 16, 2024
821a1b3
Add missing field to docstring
MadLittleMods May 16, 2024
35ca937
Format docstring
MadLittleMods May 16, 2024
4ad7a8b
No need to change this formatting from develop
MadLittleMods May 16, 2024
3092ab5
Calculate room derived membership info for device_lists
MadLittleMods May 20, 2024
3539abe
Membership in timeline for better derived info
MadLittleMods May 20, 2024
5f194f9
Exclude application services
MadLittleMods May 20, 2024
02cecfa
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 20, 2024
f6122ff
Use `client_patterns()` for endpoint URL
MadLittleMods May 21, 2024
2f112e7
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 21, 2024
c2221bb
Lint
MadLittleMods May 21, 2024
717b160
Adjust wording, add todo
MadLittleMods May 21, 2024
514aba5
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-e2ee
MadLittleMods May 22, 2024
9749795
Update filter to be more precise and avoid more work
MadLittleMods May 22, 2024
06ac1da
Restore copyright header
MadLittleMods May 22, 2024
3da6bc1
Use `@parameterized_class`
MadLittleMods May 22, 2024
d4b41aa
Fix lints
MadLittleMods May 22, 2024
6606ac1
Add docstring for parametized attributes
MadLittleMods May 23, 2024
ab0b844
Add actual typing for params (not just docstrings)
MadLittleMods May 23, 2024
a482545
Fix test after removing type ignore
MadLittleMods May 23, 2024
1 change: 1 addition & 0 deletions changelog.d/17167.feature
@@ -0,0 +1 @@
Add experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync/e2ee` endpoint for To-Device messages.
MadLittleMods marked this conversation as resolved.
Contributor Author

@MadLittleMods MadLittleMods May 13, 2024

Thinking more on that "relieving pressure on the Sliding Sync proxy now" point. Since the proxy is just another Matrix client which uses the /sync v2 loop, I don't think this new /sync/e2ee endpoint will help anything because:

> There is a critical problem in the current to-device implementation, which is that events are implicitly acknowledged when the user advances the /sync token. This causes problems when clients need to have 2 or more sync streams open at a time, e.g a push notification process and a main process. This can cause the two processes to race to fetch the to-device events, resulting in the need for complex synchronisation rules to ensure the token is correctly and atomically exchanged between processes. Sliding Sync's implementation of to-device events removes this requirement by associating an explicit token just for to-device events. This token uses the same terminology of the current /sync implementation: since and next_batch.
>
> -- MSC3885: Sliding Sync Extension: To-Device messages

(/sync v2 will acknowledge all of the To-Device events we're trying to serve with this new endpoint)

Am I missing something?

Contributor Author

🤔 I think I wasn't taking into account that the proxy will be its own device, which can be separate from the device we use for this endpoint. So acknowledging the To-Device messages for one device doesn't affect the other.

Contributor Author

@MadLittleMods MadLittleMods May 16, 2024

Discussed this point in a meeting with @erikjohnston today and I think this might be a problem since it will be one device consuming everything. Would probably require updates to the Sliding Sync proxy to utilize both properly.

Contributor

I would have thought you'd either have a device using the proxy entirely or you have a device using Synapse's SS impl entirely, not a weird mixture of both. If so, this is not a problem afaict?

Member

The idea of this endpoint isn't to remove pressure from the proxy directly.

  • By having a new endpoint, the Matrix Rust SDK can consume it directly instead of using sliding sync proxy with a particular conn_id for Sliding Sync.

  • That's the same idea with the improved /context, it removes the need for another use of sliding sync with another conn_id.

  • Finally, the last usage of sliding sync is the room list API. Since there will be only one usage of sliding sync, the conn_id won't be necessary anymore. In the simplified sliding sync implementation inside synapse, it means you would not have to manage multiple connection contexts at the same time.

Contributor

> By having a new endpoint, the Matrix Rust SDK can consume it directly instead of using sliding sync proxy with a particular conn_id for Sliding Sync.

This isn't true though, because the proxy will consume to-device messages that would have been delivered via /sync/e2ee, hence why it has to be an all-or-nothing for E2EE data. For /context then absolutely, that can be done via synapse only.

Member

Hmm indeed. So the migration should ideally be all-on-synapse, instead of half-proxy, half-synapse. Gotcha.

Contributor

@MatMaul MatMaul May 22, 2024

> This isn't true though, because the proxy will consume to-device messages that would have been delivered via /sync/e2ee

You could probably use a whole other device for all e2ee operations, but that's quite an ugly workaround.

Contributor Author

@MadLittleMods MadLittleMods May 22, 2024

A lot of this is just sharing and summarizing the clarifications from @erikjohnston 🙇

Discussed this point a bit further and got a little more clarification. The main aim of the /sync/e2ee and /context [2] changes is to make handling notifications easier for the clients. Being able to use /sync/e2ee alongside the Sliding Sync proxy to relieve pressure was just an extra thought, but given that the proxy and client share the same device [1], it will not work (see the To-Device problem explained above). If /sync v2 and /sync/e2ee usage is split between separate devices (as @MatMaul mentioned) it would work, but I don't think people have the bandwidth/desire/benefit to do it.

This means that this endpoint probably won't be useful until we have a Sliding Sync implementation in Synapse itself. I started working on /sync/e2ee because I thought it would have some "sliding" aspect to ease my way into the concept, but it turns out to be just some /sync v2 refactoring. It's still good in terms of getting familiar with /sync v2 though.

It may very well turn out that /sync/e2ee is not actually as helpful as we thought (or that device_lists is sufficiently heavyweight that it's just as slow), but from @erikjohnston's understanding right now, it'd be quite helpful to the SDK team (I don't know the exact details here).


[1] Sliding sync proxy shares the same device as the ElementX client (as far as it has been explained to me). The way it works is that EX calls the SS proxy with that token, and then the SS proxy caches it to make the requests.

[2] For onlookers, the /context changes just entail making the endpoint work for all events like invites. I haven't looked into what's necessary but it may be as simple as adjusting the check for whether someone is joined to the room.
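
For onlookers, the To-Device problem discussed in this thread can be sketched with a toy model. This is only an illustration under stated assumptions: `ToyToDeviceInbox` and `demo_shared_device` are hypothetical names, and the class is a stand-in for a per-device inbox, not Synapse's actual storage code. It shows why two sync loops sharing one device can starve each other once a `since` token is presented.

```python
from typing import Dict, List, Optional, Tuple


class ToyToDeviceInbox:
    """Hypothetical per-device inbox; a stand-in, not Synapse's storage layer."""

    def __init__(self) -> None:
        self._messages: Dict[int, str] = {}
        self._last_stream_id = 0

    def send(self, body: str) -> None:
        self._last_stream_id += 1
        self._messages[self._last_stream_id] = body

    def sync(self, since: Optional[int] = None) -> Tuple[List[str], int]:
        """Return (pending messages, next_batch) for this device."""
        if since is not None:
            # Presenting `since` implicitly acknowledges everything up to it,
            # so those messages are deleted for every reader of this device.
            for stream_id in [s for s in self._messages if s <= since]:
                del self._messages[stream_id]
        pending = [body for _, body in sorted(self._messages.items())]
        return pending, self._last_stream_id


def demo_shared_device() -> List[str]:
    """Two sync loops on ONE device: the second loop finds nothing left."""
    inbox = ToyToDeviceInbox()
    inbox.send("room-key-1")
    _, token = inbox.sync()  # first loop receives room-key-1
    inbox.send("room-key-2")
    _, token = inbox.sync(token)  # acknowledges room-key-1, receives room-key-2
    _, token = inbox.sync(token)  # acknowledges room-key-2
    late_messages, _ = inbox.sync()  # second loop on the same device
    return late_messages
```

If each loop had its own device (its own `ToyToDeviceInbox`), acknowledging on one would not affect the other, which matches the separate-device workaround mentioned above.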

MadLittleMods marked this conversation as resolved.
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
@@ -332,6 +332,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# MSC3391: Removing account data.
self.msc3391_enabled = experimental.get("msc3391_enabled", False)

# MSC3575 (Sliding Sync API endpoints)
self.msc3575_enabled: bool = experimental.get("msc3575_enabled", False)

# MSC3773: Thread notifications
self.msc3773_enabled: bool = experimental.get("msc3773_enabled", False)
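
Reader's note: a minimal sketch of how the new flag resolves, assuming the `experimental` dict in this hunk comes from an `experimental_features` section of the homeserver config (that key name is not visible in the hunk and is an assumption here). `read_msc3575_enabled` is a hypothetical helper written for illustration, not part of Synapse.

```python
from typing import Any, Dict


def read_msc3575_enabled(config: Dict[str, Any]) -> bool:
    """Hypothetical helper mirroring the lookup shown in the hunk above."""
    # Assumption: the flags live under `experimental_features`; a missing or
    # empty section is treated as "no experimental features enabled".
    experimental = config.get("experimental_features") or {}
    # Same default as the diff: the endpoint stays off unless opted in.
    return bool(experimental.get("msc3575_enabled", False))


enabled = read_msc3575_enabled({"experimental_features": {"msc3575_enabled": True}})
disabled_by_default = read_msc3575_enabled({})
```

Under that assumption, the feature is opt-in and an unconfigured server keeps the endpoint disabled.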

227 changes: 213 additions & 14 deletions synapse/handlers/sync.py
@@ -20,18 +20,22 @@
#
import itertools
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
FrozenSet,
List,
Literal,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
overload,
)

import attr
@@ -112,12 +116,30 @@
SyncRequestKey = Tuple[Any, ...]


class SyncVersion(Enum):
"""
Enum for specifying the version of the sync request. This is used to select which
type of sync response we generate.

This is different from the `sync_type` you might see used in other code below, which
specifies the sub-type of the sync request (e.g. initial_sync, full_state_sync,
incremental_sync) and is really only relevant for the `/sync` v2 endpoint.
"""

# These string values are semantically significant because they are used in the
# metrics

# Traditional `/sync` endpoint
SYNC_V2 = "sync_v2"
# Part of MSC3575 Sliding Sync
E2EE_SYNC = "e2ee_sync"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class SyncConfig:
user: UserID
filter_collection: FilterCollection
is_guest: bool
request_key: SyncRequestKey
device_id: Optional[str]


@@ -263,6 +285,15 @@ def __bool__(self) -> bool:
)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class E2eeSyncResult:
next_batch: StreamToken
to_device: List[JsonDict]
# device_lists: DeviceListUpdates
# device_one_time_keys_count: JsonMapping
# device_unused_fallback_key_types: List[str]


class SyncHandler:
def __init__(self, hs: "HomeServer"):
self.hs_config = hs.config
@@ -305,17 +336,58 @@ def __init__(self, hs: "HomeServer"):

self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync

@overload
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.SYNC_V2],
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
-) -> SyncResult:
+) -> SyncResult: ...

@overload
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.E2EE_SYNC],
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> E2eeSyncResult: ...

@overload
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: SyncVersion,
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]: ...

async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: SyncVersion,
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.

Args:
request_key: The key to use for caching the response.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
@@ -324,9 +396,10 @@ async def wait_for_sync_for_user(
await self.auth_blocking.check_auth_blocking(requester=requester)

res = await self.response_cache.wrap(
-sync_config.request_key,
+request_key,
self._wait_for_sync_for_user,
sync_config,
sync_version,
since_token,
timeout,
full_state,
@@ -335,14 +408,48 @@ async def wait_for_sync_for_user(
logger.debug("Returning sync response for %s", user_id)
return res
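
Reader's note: the overload pattern used for `wait_for_sync_for_user` can be sketched in isolation. This is a simplified illustration, not the handler itself: `SyncResult` and `E2eeSyncResult` below are empty stand-in classes and `current_sync` is a hypothetical function with a single parameter. The idea is that a type checker can narrow the return type from the `Literal` enum member passed in, while the runtime implementation still dispatches on the value.

```python
import asyncio
from enum import Enum
from typing import Literal, Union, overload


class SyncVersion(Enum):
    SYNC_V2 = "sync_v2"
    E2EE_SYNC = "e2ee_sync"


class SyncResult:
    """Empty stand-in for the real result class."""


class E2eeSyncResult:
    """Empty stand-in for the real result class."""


@overload
async def current_sync(sync_version: Literal[SyncVersion.SYNC_V2]) -> SyncResult: ...


@overload
async def current_sync(
    sync_version: Literal[SyncVersion.E2EE_SYNC],
) -> E2eeSyncResult: ...


@overload
async def current_sync(
    sync_version: SyncVersion,
) -> Union[SyncResult, E2eeSyncResult]: ...


async def current_sync(
    sync_version: SyncVersion,
) -> Union[SyncResult, E2eeSyncResult]:
    # Only this implementation exists at runtime; the overloads above are
    # purely for static type checkers.
    if sync_version == SyncVersion.SYNC_V2:
        return SyncResult()
    elif sync_version == SyncVersion.E2EE_SYNC:
        return E2eeSyncResult()
    raise Exception(f"Unknown sync_version: {sync_version}")


e2ee_result = asyncio.run(current_sync(SyncVersion.E2EE_SYNC))
```

The third, non-literal overload covers callers that only hold a plain `SyncVersion` value, which is presumably why the diff repeats the signature three times.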

@overload
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.SYNC_V2],
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
-) -> SyncResult:
+) -> SyncResult: ...

@overload
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.E2EE_SYNC],
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> E2eeSyncResult: ...

@overload
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> Union[SyncResult, E2eeSyncResult]: ...

async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> Union[SyncResult, E2eeSyncResult]:
"""The start of the machinery that produces a /sync response.

See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details.
@@ -363,9 +470,11 @@ async def _wait_for_sync_for_user(
else:
sync_type = "incremental_sync"

sync_label = f"{sync_version}:{sync_type}"

context = current_context()
if context:
-context.tag = sync_type
+context.tag = sync_label

# if we have a since token, delete any to-device messages before that token
# (since we now know that the device has received them)
@@ -383,15 +492,19 @@ async def _wait_for_sync_for_user(
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
-result: SyncResult = await self.current_sync_for_user(
-sync_config, since_token, full_state=full_state
+result: Union[SyncResult, E2eeSyncResult] = (
+await self.current_sync_for_user(
+sync_config, sync_version, since_token, full_state=full_state
+)
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
-) -> SyncResult:
-return await self.current_sync_for_user(sync_config, since_token)
+) -> Union[SyncResult, E2eeSyncResult]:
+return await self.current_sync_for_user(
+sync_config, sync_version, since_token
+)

result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
@@ -416,16 +529,44 @@ async def current_sync_callback(
lazy_loaded = "true"
else:
lazy_loaded = "false"
-non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
+non_empty_sync_counter.labels(sync_label, lazy_loaded).inc()

return result
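
Reader's note on `sync_label`: assuming `SyncVersion` is a plain `Enum` as defined earlier in this diff, interpolating a member into an f-string produces the qualified member name rather than its string value. The snippet below only demonstrates standard `enum` behaviour; whether the resulting metric label is the intended one is not stated in the diff.

```python
from enum import Enum


class SyncVersion(Enum):
    SYNC_V2 = "sync_v2"
    E2EE_SYNC = "e2ee_sync"


sync_type = "initial_sync"

# Formatting the member itself uses its `str()`, i.e. "ClassName.MEMBER".
label_from_member = f"{SyncVersion.E2EE_SYNC}:{sync_type}"

# Formatting `.value` yields the string assigned in the class body.
label_from_value = f"{SyncVersion.E2EE_SYNC.value}:{sync_type}"
```

Since the enum's comment calls the string values significant for metrics, `.value` may be what the label is meant to carry, but that is an inference rather than something the PR confirms.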

@overload
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.SYNC_V2],
since_token: Optional[StreamToken] = None,
full_state: bool = False,
-) -> SyncResult:
+) -> SyncResult: ...

@overload
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.E2EE_SYNC],
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> E2eeSyncResult: ...

@overload
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]: ...

async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]:
"""Generates the response body of a sync result, represented as a `SyncResult`/`E2eeSyncResult`.

This is a wrapper around `generate_sync_result` which starts an open tracing
@@ -434,9 +575,23 @@ async def current_sync_for_user(
"""
with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
-sync_result = await self.generate_sync_result(
-sync_config, since_token, full_state
-)
+
+# Go through the `/sync` v2 path
+if sync_version == SyncVersion.SYNC_V2:
+sync_result: Union[SyncResult, E2eeSyncResult] = (
+await self.generate_sync_result(
+sync_config, since_token, full_state
+)
+)
+# Go through the MSC3575 Sliding Sync `/sync/e2ee` path
+elif sync_version == SyncVersion.E2EE_SYNC:
+sync_result = await self.generate_e2ee_sync_result(
+sync_config, since_token
+)
+else:
+raise Exception(
+f"Unknown sync_version (this is a Synapse problem): {sync_version}"
+)

set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
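
Reader's note on `bool(sync_result)`: in the hunks shown here, `E2eeSyncResult` has no `__bool__` (only the hunk header for a preceding class mentions one). If that holds for the full file, an instance would always be truthy, so any caller that uses truthiness to decide whether there is something to return would treat an empty E2EE result as non-empty. The sketch below uses stdlib dataclasses as a stand-in for the `attrs` classes; both class names are hypothetical and the fields are trimmed to what the diff shows.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass(frozen=True)
class E2eeResultWithoutBool:
    """Stand-in with the fields from the diff and no `__bool__`."""

    next_batch: str
    to_device: List[Dict[str, Any]] = field(default_factory=list)


@dataclass(frozen=True)
class E2eeResultWithBool:
    """Same fields, but an empty result is falsy."""

    next_batch: str
    to_device: List[Dict[str, Any]] = field(default_factory=list)

    def __bool__(self) -> bool:
        # Report "something to return" only when there are messages.
        return bool(self.to_device)


always_truthy = bool(E2eeResultWithoutBool(next_batch="s1"))
empty_is_falsy = bool(E2eeResultWithBool(next_batch="s1"))
```

This is only a sketch of default Python truthiness; the diff does not show how the surrounding code reacts to it.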
@@ -1751,6 +1906,50 @@ async def generate_sync_result(
next_batch=sync_result_builder.now_token,
)

async def generate_e2ee_sync_result(
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
) -> E2eeSyncResult:
"""Generates the response body of an MSC3575 Sliding Sync `/sync/e2ee` result."""

user_id = sync_config.user.to_string()
# TODO: Should we exclude app services here? There could be an argument to allow
# them since the appservice doesn't have to make a massive initial sync.
# (related to https://github.com/matrix-org/matrix-doc/issues/1144)

# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token})

joined_room_ids = await self.store.get_rooms_for_user(user_id)

sync_result_builder = SyncResultBuilder(
sync_config,
full_state=False,
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
# Dummy values to fill out `SyncResultBuilder`
excluded_room_ids=frozenset({}),
forced_newly_joined_room_ids=frozenset({}),
membership_change_events=[],
)

await self._generate_sync_entry_for_to_device(sync_result_builder)

return E2eeSyncResult(
to_device=sync_result_builder.to_device,
# to_device: List[JsonDict]
# device_lists: DeviceListUpdates
# device_one_time_keys_count: JsonMapping
# device_unused_fallback_key_types: List[str]
next_batch=sync_result_builder.now_token,
)

@measure_func("_generate_sync_entry_for_device_list")
async def _generate_sync_entry_for_device_list(
self,