Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Process device list updates asynchronously
Browse files Browse the repository at this point in the history
Carries on from #12321.
  • Loading branch information
erikjohnston committed Apr 5, 2022
1 parent 66053b6 commit f2b3d45
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 96 deletions.
8 changes: 0 additions & 8 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,14 +680,6 @@ def read_config(self, config, **kwargs):
config.get("use_account_validity_in_account_status") or False
)

# This is a temporary option that enables fully using the new
# `device_lists_changes_in_room` without the backwards compat code. This
# is primarily for testing. If enabled the server should *not* be
# downgraded, as it may lead to missing device list updates.
self.use_new_device_lists_changes_in_room = (
config.get("use_new_device_lists_changes_in_room") or False
)

self.rooms_to_exclude_from_sync: List[str] = (
config.get("exclude_rooms_from_sync") or []
)
Expand Down
28 changes: 0 additions & 28 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,6 @@ def __init__(self, hs: "HomeServer"):
# On start up check if there are any updates pending.
hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)

# Used to decide if we calculate outbound pokes up front or not. By
# default we do to allow safely downgrading Synapse.
self.use_new_device_lists_changes_in_room = (
hs.config.server.use_new_device_lists_changes_in_room
)

def _check_device_name_length(self, name: Optional[str]) -> None:
"""
Checks whether a device name is longer than the maximum allowed length.
Expand Down Expand Up @@ -490,23 +484,9 @@ async def notify_device_update(

room_ids = await self.store.get_rooms_for_user(user_id)

hosts: Optional[Set[str]] = None
if not self.use_new_device_lists_changes_in_room:
hosts = set()

if self.hs.is_mine_id(user_id):
for room_id in room_ids:
joined_users = await self.store.get_users_in_room(room_id)
hosts.update(get_domain_from_id(u) for u in joined_users)

set_tag("target_hosts", hosts)

hosts.discard(self.server_name)

position = await self.store.add_device_change_to_streams(
user_id,
device_ids,
hosts=hosts,
room_ids=room_ids,
)

Expand All @@ -528,14 +508,6 @@ async def notify_device_update(
# We may need to do some processing asynchronously.
self._handle_new_device_update_async()

if hosts:
logger.info(
"Sending device list update notif for %r to: %r", user_id, hosts
)
for host in hosts:
self.federation_sender.send_device_messages(host, immediate=False)
log_kv({"message": "sent device update to host", "host": host})

async def notify_user_signature_update(
self, from_user_id: str, user_ids: List[str]
) -> None:
Expand Down
61 changes: 9 additions & 52 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1582,7 +1582,6 @@ async def add_device_change_to_streams(
self,
user_id: str,
device_ids: Collection[str],
hosts: Optional[Collection[str]],
room_ids: Collection[str],
) -> Optional[int]:
"""Persist that a user's devices have been updated, and which hosts
Expand All @@ -1592,9 +1591,6 @@ async def add_device_change_to_streams(
user_id: The ID of the user whose device changed.
device_ids: The IDs of any changed devices. If empty, this function will
return None.
hosts: The remote destinations that should be notified of the change. If
None then the set of hosts have *not* been calculated, and will be
calculated later by a background task.
room_ids: The rooms that the user is in
Returns:
Expand All @@ -1606,58 +1602,30 @@ async def add_device_change_to_streams(

context = get_active_span_text_map()

def add_device_changes_txn(
txn, stream_ids_for_device_change, stream_ids_for_outbound_pokes
):
def add_device_changes_txn(txn, stream_ids):
self._add_device_change_to_stream_txn(
txn,
user_id,
device_ids,
stream_ids_for_device_change,
stream_ids,
)

self._add_device_outbound_room_poke_txn(
txn,
user_id,
device_ids,
room_ids,
stream_ids_for_device_change,
context,
hosts_have_been_calculated=hosts is not None,
)

# If the set of hosts to send to has not been calculated yet (and so
# `hosts` is None) or there are no `hosts` to send to, then skip
# trying to persist them to the DB.
if not hosts:
return

self._add_device_outbound_poke_to_stream_txn(
txn,
user_id,
device_ids,
hosts,
stream_ids_for_outbound_pokes,
stream_ids,
context,
)

# `device_lists_stream` wants a stream ID per device update.
num_stream_ids = len(device_ids)

if hosts:
# `device_lists_outbound_pokes` wants a different stream ID for
# each row, which is a row per host per device update.
num_stream_ids += len(hosts) * len(device_ids)

async with self._device_list_id_gen.get_next_mult(num_stream_ids) as stream_ids:
stream_ids_for_device_change = stream_ids[: len(device_ids)]
stream_ids_for_outbound_pokes = stream_ids[len(device_ids) :]

async with self._device_list_id_gen.get_next_mult(
len(device_ids)
) as stream_ids:
await self.db_pool.runInteraction(
"add_device_change_to_stream",
add_device_changes_txn,
stream_ids_for_device_change,
stream_ids_for_outbound_pokes,
stream_ids,
)

return stream_ids[-1]
Expand Down Expand Up @@ -1752,19 +1720,8 @@ def _add_device_outbound_room_poke_txn(
room_ids: Collection[str],
stream_ids: List[str],
context: Dict[str, str],
hosts_have_been_calculated: bool,
) -> None:
"""Record the user in the room has updated their device.
Args:
hosts_have_been_calculated: True if `device_lists_outbound_pokes`
has been updated already with the updates.
"""

# We only need to convert to outbound pokes if they are our user.
converted_to_destinations = (
hosts_have_been_calculated or not self.hs.is_mine_id(user_id)
)
"""Record the user in the room has updated their device."""

encoded_context = json_encoder.encode(context)

Expand All @@ -1789,7 +1746,7 @@ def _add_device_outbound_room_poke_txn(
device_id,
room_id,
stream_id,
converted_to_destinations,
False,
encoded_context,
)
for room_id in room_ids
Expand Down
8 changes: 0 additions & 8 deletions tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from typing import Optional
from unittest.mock import Mock

from parameterized import parameterized_class
from signedjson import key, sign
from signedjson.types import BaseKey, SigningKey

Expand Down Expand Up @@ -155,12 +154,6 @@ def test_send_receipts_with_backoff(self):
)


@parameterized_class(
[
{"enable_room_poke_code_path": False},
{"enable_room_poke_code_path": True},
]
)
class FederationSenderDevicesTestCases(HomeserverTestCase):
servlets = [
admin.register_servlets,
Expand All @@ -175,7 +168,6 @@ def make_homeserver(self, reactor, clock):
def default_config(self):
c = super().default_config()
c["send_federation"] = True
c["use_new_device_lists_changes_in_room"] = self.enable_room_poke_code_path
return c

def prepare(self, reactor, clock, hs):
Expand Down

0 comments on commit f2b3d45

Please sign in to comment.