Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Stabilize support for MSC3970: updated transaction semantics (scope to device_id) #15629

Merged
merged 15 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15629.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Scope transaction IDs to devices (implement [MSC3970](https://github.com/matrix-org/matrix-spec-proposals/pull/3970)).
9 changes: 0 additions & 9 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,6 @@ def check_config_conflicts(self, root: RootConfig) -> None:
("session_lifetime",),
)

if not root.experimental.msc3970_enabled:
raise ConfigError(
"experimental_features.msc3970_enabled must be 'true' when OAuth delegation is enabled",
("experimental_features", "msc3970_enabled"),
)


@attr.s(auto_attribs=True, frozen=True, slots=True)
class MSC3866Config:
Expand Down Expand Up @@ -384,9 +378,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
"Invalid MSC3861 configuration", ("experimental", "msc3861")
) from exc

# MSC3970: Scope transaction IDs to devices
self.msc3970_enabled = experimental.get("msc3970_enabled", self.msc3861.enabled)

# Check that none of the other config options conflict with MSC3861 when enabled
self.msc3861.check_config_conflicts(self.root)

Expand Down
39 changes: 20 additions & 19 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,16 +405,13 @@ def serialize_event(
time_now_ms: int,
*,
config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG,
msc3970_enabled: bool = False,
) -> JsonDict:
"""Serialize event for clients

Args:
e
time_now_ms
config: Event serialization config
msc3970_enabled: Whether MSC3970 is enabled. It changes whether we should
include the `transaction_id` in the event's `unsigned` section.

Returns:
The serialized event dictionary.
Expand All @@ -440,28 +437,36 @@ def serialize_event(
e.unsigned["redacted_because"],
time_now_ms,
config=config,
msc3970_enabled=msc3970_enabled,
)

# If we have a txn_id saved in the internal_metadata, we should include it in the
# unsigned section of the event if it was sent by the same session as the one
# requesting the event.
txn_id: Optional[str] = getattr(e.internal_metadata, "txn_id", None)
if txn_id is not None and config.requester is not None:
# For the MSC3970 rules to be applied, we *need* to have the device ID in the
# event internal metadata. Since we were not recording them before, if it hasn't
# been recorded, we fallback to the old behaviour.
# Some events do not have the device ID stored in the internal metadata.
# This includes old events as well as those created by appservices, guests,
# or with tokens minted with the admin API. For those events, fall back
# to using the access token instead.
event_device_id: Optional[str] = getattr(e.internal_metadata, "device_id", None)
if msc3970_enabled and event_device_id is not None:
if event_device_id is not None:
if event_device_id == config.requester.device_id:
d["unsigned"]["transaction_id"] = txn_id

else:
# The pre-MSC3970 behaviour is to only include the transaction ID if the
# event was sent from the same access token. For regular users, we can use
# the access token ID to determine this. For guests, we can't, but since
# each guest only has one access token, we can just check that the event was
# sent by the same user as the one requesting the event.
# Fallback behaviour: only include the transaction ID if the event
# was sent from the same access token.
#
# For regular users, the access token ID can be used to determine this.
# This includes access tokens minted with the admin API.
#
# For guests, we can't check the access token ID, but since each guest
# only has one access token, just check the event was sent by the same
# user as the one requesting the event.
#
# For appservice users, we can't check the access token ID, just
# check the event was sent by the same user as the one requesting
# the event.
event_token_id: Optional[int] = getattr(
e.internal_metadata, "token_id", None
)
Expand All @@ -472,6 +477,7 @@ def serialize_event(
and event_token_id == config.requester.access_token_id
)
or config.requester.is_guest
or config.requester.app_service
):
d["unsigned"]["transaction_id"] = txn_id

Expand Down Expand Up @@ -504,9 +510,6 @@ class EventClientSerializer:
clients.
"""

def __init__(self, *, msc3970_enabled: bool = False):
self._msc3970_enabled = msc3970_enabled

def serialize_event(
self,
event: Union[JsonDict, EventBase],
Expand All @@ -531,9 +534,7 @@ def serialize_event(
if not isinstance(event, EventBase):
return event

serialized_event = serialize_event(
event, time_now, config=config, msc3970_enabled=self._msc3970_enabled
)
serialized_event = serialize_event(event, time_now, config=config)

# Check if there are any bundled aggregations to include with the event.
if bundle_aggregations:
Expand Down
51 changes: 36 additions & 15 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,6 @@ def __init__(self, hs: "HomeServer"):
expiry_ms=30 * 60 * 1000,
)

self._msc3970_enabled = hs.config.experimental.msc3970_enabled

async def create_event(
self,
requester: Requester,
Expand Down Expand Up @@ -887,14 +885,13 @@ async def deduplicate_state_event(
return prev_event
return None

async def get_event_from_transaction(
async def get_event_id_from_transaction(
self,
requester: Requester,
txn_id: str,
room_id: str,
) -> Optional[EventBase]:
"""For the given transaction ID and room ID, check if there is a matching event.
If so, fetch it and return it.
) -> Optional[str]:
"""For the given transaction ID and room ID, check if there is a matching event ID.

Args:
requester: The requester making the request in the context of which we want
Expand All @@ -903,12 +900,12 @@ async def get_event_from_transaction(
room_id: The room ID.

Returns:
An event if one could be found, None otherwise.
An event ID if one could be found, None otherwise.
"""
existing_event_id = None

if self._msc3970_enabled and requester.device_id:
# When MSC3970 is enabled, we lookup for events sent by the same device first,
# and fallback to the old behaviour if none were found.
# According to the spec, transactions are scoped to a user's device ID.
if requester.device_id:
existing_event_id = (
await self.store.get_event_id_from_transaction_id_and_device_id(
room_id,
Expand All @@ -918,10 +915,11 @@ async def get_event_from_transaction(
)
)
if existing_event_id:
return await self.store.get_event(existing_event_id)
return existing_event_id

# Pre-MSC3970, we looked up for events that were sent by the same session by
# using the access token ID.
# Some requesters don't have device IDs (appservices, guests, and access
# tokens minted with the admin API); fall back to checking the access token
# ID, which should be close enough.
if requester.access_token_id:
existing_event_id = (
await self.store.get_event_id_from_transaction_id_and_token_id(
Expand All @@ -931,9 +929,32 @@ async def get_event_from_transaction(
txn_id,
)
)
if existing_event_id:
return await self.store.get_event(existing_event_id)

return existing_event_id

async def get_event_from_transaction(
self,
requester: Requester,
txn_id: str,
room_id: str,
) -> Optional[EventBase]:
"""For the given transaction ID and room ID, check if there is a matching event.
If so, fetch it and return it.

Args:
requester: The requester making the request in the context of which we want
to fetch the event.
txn_id: The transaction ID.
room_id: The room ID.

Returns:
An event if one could be found, None otherwise.
"""
existing_event_id = await self.get_event_id_from_transaction(
requester, txn_id, room_id
)
if existing_event_id:
return await self.store.get_event(existing_event_id)
return None

async def create_and_send_nonmember_event(
Expand Down
28 changes: 4 additions & 24 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ def __init__(self, hs: "HomeServer"):
self.request_ratelimiter = hs.get_request_ratelimiter()
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)

self._msc3970_enabled = hs.config.experimental.msc3970_enabled

def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
"""Notify the rate limiter that a room join has occurred.

Expand Down Expand Up @@ -423,29 +421,11 @@ async def _local_membership_update(
# do this check just before we persist an event as well, but may as well
# do it up front for efficiency.)
if txn_id:
existing_event_id = None
if self._msc3970_enabled and requester.device_id:
# When MSC3970 is enabled, we lookup for events sent by the same device
# first, and fallback to the old behaviour if none were found.
existing_event_id = (
await self.store.get_event_id_from_transaction_id_and_device_id(
room_id,
requester.user.to_string(),
requester.device_id,
txn_id,
)
existing_event_id = (
await self.event_creation_handler.get_event_id_from_transaction(
requester, txn_id, room_id
)

if requester.access_token_id and not existing_event_id:
existing_event_id = (
await self.store.get_event_id_from_transaction_id_and_token_id(
room_id,
requester.user.to_string(),
requester.access_token_id,
txn_id,
)
)

)
if existing_event_id:
event_pos = await self.store.get_position_for_event(existing_event_id)
return existing_event_id, event_pos.stream
Expand Down
12 changes: 6 additions & 6 deletions synapse/rest/client/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ def __init__(self, hs: "HomeServer"):
# for at *LEAST* 30 mins, and at *MOST* 60 mins.
self.cleaner = self.clock.looping_call(self._cleanup, CLEANUP_PERIOD_MS)

self._msc3970_enabled = hs.config.experimental.msc3970_enabled

def _get_transaction_key(self, request: IRequest, requester: Requester) -> Hashable:
"""A helper function which returns a transaction key that can be used
with TransactionCache for idempotent requests.
Expand All @@ -78,18 +76,20 @@ def _get_transaction_key(self, request: IRequest, requester: Requester) -> Hasha
elif requester.app_service is not None:
return (path, "appservice", requester.app_service.id)

# With MSC3970, we use the user ID and device ID as the transaction key
elif self._msc3970_enabled:
# Use the user ID and device ID as the transaction key.
elif requester.device_id:
assert requester.user, "Requester must have a user"
assert requester.device_id, "Requester must have a device_id"
return (path, "user", requester.user, requester.device_id)

# Otherwise, the pre-MSC3970 behaviour is to use the access token ID
# Some requesters don't have device IDs. Most of these are handled above
# (appservice and guest users), but that does not cover access tokens minted
# by the admin API. Use the access token ID instead.
else:
assert (
requester.access_token_id is not None
), "Requester must have an access_token_id"
return (path, "user", requester.access_token_id)
return (path, "user_admin", requester.access_token_id)

def fetch_or_execute_request(
self,
Expand Down
4 changes: 1 addition & 3 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,9 +789,7 @@ def get_oidc_handler(self) -> "OidcHandler":

@cache_in_self
def get_event_client_serializer(self) -> EventClientSerializer:
return EventClientSerializer(
msc3970_enabled=self.config.experimental.msc3970_enabled
)
return EventClientSerializer()

@cache_in_self
def get_password_policy_handler(self) -> PasswordPolicyHandler:
Expand Down
13 changes: 4 additions & 9 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@ def __init__(
self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen
self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen

self._msc3970_enabled = hs.config.experimental.msc3970_enabled

@trace
async def _persist_events_and_state_updates(
self,
Expand Down Expand Up @@ -1012,9 +1010,9 @@ def _persist_transaction_ids_txn(
)
)

# Pre-MSC3970, we rely on the access_token_id to scope the txn_id for events.
# Since this is an experimental flag, we still store the mapping even if the
# flag is disabled.
# Synapse usually relies on the device_id to scope transactions for events,
# except for users without device IDs (appservice, guests, and access
# tokens minted with the admin API) which use the access token ID instead.
if to_insert_token_id:
self.db_pool.simple_insert_many_txn(
txn,
Expand All @@ -1030,10 +1028,7 @@ def _persist_transaction_ids_txn(
values=to_insert_token_id,
)

# With MSC3970, we rely on the device_id instead to scope the txn_id for events.
# We're only inserting if MSC3970 is *enabled*, because else the pre-MSC3970
# behaviour would allow for a UNIQUE constraint violation on this table
if to_insert_device_id and self._msc3970_enabled:
if to_insert_device_id:
self.db_pool.simple_insert_many_txn(
txn,
table="event_txn_id_device_id",
Expand Down
3 changes: 2 additions & 1 deletion synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ class Requester:
request, or None if it came via the appservice API or similar
is_guest: True if the user making this request is a guest user
shadow_banned: True if the user making this request has been shadow-banned.
device_id: device_id which was set at authentication time. This will be
None for appservices, guests, and tokens generated by the admin API
app_service: the AS requesting on behalf of the user
authenticated_entity: The entity that authenticated when making the request.
This is different to the user_id when an admin user or the server is
Expand Down