diff --git a/changelog.d/15280.misc b/changelog.d/15280.misc new file mode 100644 index 000000000000..41d56b0cf0b4 --- /dev/null +++ b/changelog.d/15280.misc @@ -0,0 +1 @@ +Make the pushers rely on the `device_id` instead of the `access_token_id` for various operations. diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 94b86c1d6ff5..1dcb397ba456 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -68,7 +68,10 @@ MediaRepositoryBackgroundUpdateStore, ) from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore -from synapse.storage.databases.main.pusher import PusherWorkerStore +from synapse.storage.databases.main.pusher import ( + PusherBackgroundUpdatesStore, + PusherWorkerStore, +) from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore from synapse.storage.databases.main.registration import ( RegistrationBackgroundUpdateStore, @@ -226,6 +229,7 @@ class Store( AccountDataWorkerStore, PushRuleStore, PusherWorkerStore, + PusherBackgroundUpdatesStore, PresenceBackgroundUpdateStore, ReceiptsBackgroundUpdateStore, RelationsWorkerStore, diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 308e38edea2d..1e89447044f5 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1504,8 +1504,10 @@ async def delete_access_token(self, access_token: str) -> None: ) # delete pushers associated with this access token + # XXX(quenting): This is only needed until the 'set_device_id_for_pushers' + # background update completes. if token.token_id is not None: - await self.hs.get_pusherpool().remove_pushers_by_access_token( + await self.hs.get_pusherpool().remove_pushers_by_access_tokens( token.user_id, (token.token_id,) ) @@ -1535,7 +1537,9 @@ async def delete_access_tokens_for_user( ) # delete pushers associated with the access tokens - await self.hs.get_pusherpool().remove_pushers_by_access_token( + # XXX(quenting): This is only needed until the 'set_device_id_for_pushers' + # background update completes. + await self.hs.get_pusherpool().remove_pushers_by_access_tokens( user_id, (token_id for _, token_id, _ in tokens_and_devices) ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 6f7963df43ae..9ded6389acdb 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -503,6 +503,8 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: else: raise + await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids) + # Delete data specific to each device. Not optimised as it is not # considered as part of a critical path. for device_id in device_ids: diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 6b110dcb6e4d..c8bf2439afb5 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -1013,11 +1013,11 @@ async def _register_email_threepid( user_tuple = await self.store.get_user_by_access_token(token) # The token better still exist. assert user_tuple - token_id = user_tuple.token_id + device_id = user_tuple.device_id await self.pusher_pool.add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="email", app_id="m.email", app_display_name="Email Notifications", diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index a0c760239db8..9e3a98741a4f 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -103,7 +103,7 @@ class PusherConfig: id: Optional[str] user_name: str - access_token: Optional[int] + profile_tag: str kind: str app_id: str @@ -119,6 +119,11 @@ class PusherConfig: enabled: bool device_id: Optional[str] + # XXX(quenting): The access_token is not persisted anymore for new pushers, but we + # keep it when reading from the database, so that we don't get stale pushers + # while the "set_device_id_for_pushers" background update is running. + access_token: Optional[int] + def as_dict(self) -> Dict[str, Any]: """Information that can be retrieved about a pusher after creation.""" return { diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index e2648cbc93c9..6517e3566fae 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -25,7 +25,7 @@ from synapse.push import Pusher, PusherConfig, PusherConfigException from synapse.push.pusher import PusherFactory from synapse.replication.http.push import ReplicationRemovePusherRestServlet -from synapse.types import JsonDict, RoomStreamToken +from synapse.types import JsonDict, RoomStreamToken, StrCollection from synapse.util.async_helpers import concurrently_execute from synapse.util.threepids import canonicalise_email @@ -97,7 +97,6 @@ def start(self) -> None: async def add_or_update_pusher( self, user_id: str, - access_token: Optional[int], kind: str, app_id: str, app_display_name: str, @@ -128,6 +127,22 @@ async def add_or_update_pusher( # stream ordering, so it will process pushes from this point onwards. last_stream_ordering = self.store.get_room_max_stream_ordering() + # Before we actually persist the pusher, we check if the user already has one + # for this app ID and pushkey. If so, we want to keep the access token and + # device ID in place, since this could be one device modifying + # (e.g. enabling/disabling) another device's pusher. + # XXX(quenting): Even though we're not persisting the access_token_id for new + # pushers anymore, we still need to copy existing access_token_ids over when + # updating a pusher, in case the "set_device_id_for_pushers" background update + # hasn't run yet. + access_token_id = None + existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey( + user_id, app_id, pushkey + ) + if existing_config: + device_id = existing_config.device_id + access_token_id = existing_config.access_token + # we try to create the pusher just to validate the config: it # will then get pulled out of the database, # recreated, added and started: this means we have only one @@ -136,7 +151,6 @@ async def add_or_update_pusher( PusherConfig( id=None, user_name=user_id, - access_token=access_token, profile_tag=profile_tag, kind=kind, app_id=app_id, @@ -151,23 +165,12 @@ async def add_or_update_pusher( failing_since=None, enabled=enabled, device_id=device_id, + access_token=access_token_id, ) ) - # Before we actually persist the pusher, we check if the user already has one - # this app ID and pushkey. If so, we want to keep the access token and device ID - # in place, since this could be one device modifying (e.g. enabling/disabling) - # another device's pusher. - existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey( - user_id, app_id, pushkey - ) - if existing_config: - access_token = existing_config.access_token - device_id = existing_config.device_id - await self.store.add_pusher( user_id=user_id, - access_token=access_token, kind=kind, app_id=app_id, app_display_name=app_display_name, @@ -180,6 +183,7 @@ async def add_or_update_pusher( profile_tag=profile_tag, enabled=enabled, device_id=device_id, + access_token_id=access_token_id, ) pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id) @@ -199,7 +203,7 @@ async def remove_pushers_by_app_id_and_pushkey_not_user( ) await self.remove_pusher(p.app_id, p.pushkey, p.user_name) - async def remove_pushers_by_access_token( + async def remove_pushers_by_access_tokens( self, user_id: str, access_tokens: Iterable[int] ) -> None: """Remove the pushers for a given user corresponding to a set of @@ -209,6 +213,8 @@ async def remove_pushers_by_access_token( user_id: user to remove pushers for access_tokens: access token *ids* to remove pushers for """ + # XXX(quenting): This is only needed until the "set_device_id_for_pushers" + # background update finishes tokens = set(access_tokens) for p in await self.store.get_pushers_by_user_id(user_id): if p.access_token in tokens: @@ -220,6 +226,26 @@ async def remove_pushers_by_access_token( ) await self.remove_pusher(p.app_id, p.pushkey, p.user_name) + async def remove_pushers_by_devices( + self, user_id: str, devices: StrCollection + ) -> None: + """Remove the pushers for a given user corresponding to a set of devices + + Args: + user_id: user to remove pushers for + devices: device IDs to remove pushers for + """ + device_ids = set(devices) + for p in await self.store.get_pushers_by_user_id(user_id): + if p.device_id in device_ids: + logger.info( + "Removing pusher for app id %s, pushkey %s, user %s", + p.app_id, + p.pushkey, + p.user_name, + ) + await self.remove_pusher(p.app_id, p.pushkey, p.user_name) + def on_new_notifications(self, max_token: RoomStreamToken) -> None: if not self.pushers: # nothing to do here. diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 281e8fd0adc2..331f22511618 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -425,7 +425,6 @@ async def on_PUT( ): await self.pusher_pool.add_or_update_pusher( user_id=user_id, - access_token=None, kind="email", app_id="m.email", app_display_name="Email Notifications", diff --git a/synapse/rest/client/pusher.py b/synapse/rest/client/pusher.py index 975eef2144b6..1a8f5292ac5c 100644 --- a/synapse/rest/client/pusher.py +++ b/synapse/rest/client/pusher.py @@ -126,7 +126,6 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: try: await self.pusher_pool.add_or_update_pusher( user_id=user.to_string(), - access_token=requester.access_token_id, kind=content["kind"], app_id=content["app_id"], app_display_name=content["app_display_name"], diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 9a24f7a65526..ab76b754e0e8 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -509,19 +509,24 @@ def __init__( async def _set_device_id_for_pushers( self, progress: JsonDict, batch_size: int ) -> int: - """Background update to populate the device_id column of the pushers table.""" + """ + Background update to populate the device_id column and clear the access_token + column for the pushers table. + """ last_pusher_id = progress.get("pusher_id", 0) def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int: txn.execute( """ - SELECT p.id, at.device_id + SELECT + p.id AS pusher_id, + p.device_id AS pusher_device_id, + at.device_id AS token_device_id FROM pushers AS p - INNER JOIN access_tokens AS at + LEFT JOIN access_tokens AS at ON p.access_token = at.id WHERE p.access_token IS NOT NULL - AND at.device_id IS NOT NULL AND p.id > ? ORDER BY p.id LIMIT ? @@ -533,13 +538,27 @@ def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int: if len(rows) == 0: return 0 + # The reason we're clearing the access_token column here is a bit subtle. + # When a user logs out, we: + # (1) delete the access token + # (2) delete the device + # + # Ideally, we would delete the pushers only via its link to the device + # during (2), but since this background update might not have fully run yet, + # we're still deleting the pushers via the access token during (1). self.db_pool.simple_update_many_txn( txn=txn, table="pushers", key_names=("id",), - key_values=[(row["id"],) for row in rows], - value_names=("device_id",), - value_values=[(row["device_id"],) for row in rows], + key_values=[(row["pusher_id"],) for row in rows], + value_names=("device_id", "access_token"), + # If there was already a device_id on the pusher, we only want to clear + # the access_token column, so we keep the existing device_id. Otherwise, + # we set the device_id we got from joining the access_tokens table. + value_values=[ + (row["pusher_device_id"] or row["token_device_id"], None) + for row in rows + ], ) self.db_pool.updates._background_update_progress_txn( @@ -568,7 +587,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore): async def add_pusher( self, user_id: str, - access_token: Optional[int], kind: str, app_id: str, app_display_name: str, @@ -581,13 +599,13 @@ async def add_pusher( profile_tag: str = "", enabled: bool = True, device_id: Optional[str] = None, + access_token_id: Optional[int] = None, ) -> None: async with self._pushers_id_gen.get_next() as stream_id: await self.db_pool.simple_upsert( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, values={ - "access_token": access_token, "kind": kind, "app_display_name": app_display_name, "device_display_name": device_display_name, @@ -599,6 +617,10 @@ async def add_pusher( "id": stream_id, "enabled": enabled, "device_id": device_id, + # XXX(quenting): We're only really persisting the access token ID + # when updating an existing pusher. This is in case the + # 'set_device_id_for_pushers' background update hasn't finished yet. + "access_token": access_token_id, }, desc="add_pusher", ) diff --git a/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql b/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql new file mode 100644 index 000000000000..1367fb626733 --- /dev/null +++ b/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql @@ -0,0 +1,19 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Triggers the background update to set the device_id for pushers +-- that don't have one, and clear the access_token column. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7402, 'set_device_id_for_pushers', '{}'); diff --git a/tests/push/test_email.py b/tests/push/test_email.py index 4ea5472eb475..4b5c96aeaea8 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -105,7 +105,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.hs.get_datastores().main.get_user_by_access_token(self.access_token) ) assert user_tuple is not None - self.token_id = user_tuple.token_id + self.device_id = user_tuple.device_id # We need to add email to account before we can create a pusher. self.get_success( @@ -117,7 +117,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: pusher = self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=self.user_id, - access_token=self.token_id, + device_id=self.device_id, kind="email", app_id="m.email", app_display_name="Email Notifications", @@ -141,7 +141,7 @@ def test_need_validated_email(self) -> None: self.get_success_or_raise( self.hs.get_pusherpool().add_or_update_pusher( user_id=self.user_id, - access_token=self.token_id, + device_id=self.device_id, kind="email", app_id="m.email", app_display_name="Email Notifications", diff --git a/tests/push/test_http.py b/tests/push/test_http.py index c280ddcdf6a9..99cec0836b1d 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -67,13 +67,13 @@ def test_invalid_configuration(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id def test_data(data: Any) -> None: self.get_failure( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -114,12 +114,12 @@ def test_sends_http(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -235,12 +235,12 @@ def test_sends_high_priority_for_encrypted(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -356,12 +356,12 @@ def test_sends_high_priority_for_one_to_one_only(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -443,12 +443,12 @@ def test_sends_high_priority_for_mention(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -521,12 +521,12 @@ def test_sends_high_priority_for_atroom(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -628,12 +628,12 @@ def _test_push_unread_count(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -764,12 +764,12 @@ def _set_pusher(self, user_id: str, access_token: str, enabled: bool) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -778,7 +778,6 @@ def _set_pusher(self, user_id: str, access_token: str, enabled: bool) -> None: lang=None, data={"url": "http://example.com/_matrix/push/v1/notify"}, enabled=enabled, - device_id=user_tuple.device_id, ) ) @@ -895,19 +894,17 @@ def test_null_enabled(self) -> None: def test_update_different_device_access_token_device_id(self) -> None: """Tests that if we create a pusher from one device, the update it from another - device, the access token and device ID associated with the pusher stays the - same. + device, the device ID associated with the pusher stays the same. """ # Create a user with a pusher. user_id, access_token = self._make_user_with_pusher("user") - # Get the token ID for the current access token, since that's what we store in - # the pushers table. Also get the device ID from it. + # Get the device ID for the current access token, since that's what we store in + # the pushers table. user_tuple = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id device_id = user_tuple.device_id # Generate a new access token, and update the pusher with it. @@ -920,10 +917,9 @@ def test_update_different_device_access_token_device_id(self) -> None: ) pushers: List[PusherConfig] = list(ret) - # Check that we still have one pusher, and that the access token and device ID - # associated with it didn't change. + # Check that we still have one pusher, and that the device ID associated with + # it didn't change. self.assertEqual(len(pushers), 1) - self.assertEqual(pushers[0].access_token, token_id) self.assertEqual(pushers[0].device_id, device_id) @override_config({"experimental_features": {"msc3881_enabled": True}}) diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py index 0798b021c3d0..dcb3e6669bb9 100644 --- a/tests/replication/test_pusher_shard.py +++ b/tests/replication/test_pusher_shard.py @@ -51,12 +51,12 @@ def _create_pusher_and_send_msg(self, localpart: str) -> str: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_dict is not None - token_id = user_dict.token_id + device_id = user_dict.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 4b8f889a71ba..b4241ceaf023 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -3047,12 +3047,12 @@ def test_get_pushers(self) -> None: self.store.get_user_by_access_token(other_user_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=self.other_user, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications",