Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Track device list updates per room. #12321

Merged
merged 30 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b5a9c6b
Operate on room IDs
erikjohnston Mar 25, 2022
6fa639e
Don't copy the hosts list
erikjohnston Mar 28, 2022
ecf98b9
Write to a new `device_lists_changes_in_room` table.
erikjohnston Mar 28, 2022
c5dd83f
Convert to doing everything in a transaction
erikjohnston Mar 28, 2022
bb44214
Track if we've calculated remote hosts
erikjohnston Mar 28, 2022
98fceb3
We actually don't want stream_id to be unique
erikjohnston Mar 28, 2022
eda0e64
Handle room pokes that haven't been converted to outbound pokes
erikjohnston Mar 29, 2022
c7790ab
Deduplicate outbound pokes
erikjohnston Mar 29, 2022
6e9b31a
Add a config option that allows using new code path
erikjohnston Mar 29, 2022
3b2ab93
Add tests for new code path
erikjohnston Mar 29, 2022
f8af30f
Newsfile
erikjohnston Mar 29, 2022
7266580
Fix tests
erikjohnston Mar 29, 2022
f24b70b
Merge remote-tracking branch 'origin/develop' into erikj/device_list_…
erikjohnston Mar 31, 2022
8bd8ee2
Update synapse/storage/schema/main/delta/69/01device_list_oubound_by_…
erikjohnston Apr 4, 2022
56f0913
Update synapse/storage/schema/main/delta/69/01device_list_oubound_by_…
erikjohnston Apr 4, 2022
90d41a0
Encode opentracing context just once.
erikjohnston Apr 4, 2022
c470a12
Rename var
erikjohnston Apr 4, 2022
d030062
Remove `if not room_ids` check.
erikjohnston Apr 4, 2022
ad5d46b
Add unique index
erikjohnston Apr 4, 2022
d5031b0
Note lack of foreign key constraint
erikjohnston Apr 4, 2022
28dacc8
Add comment about stream_id duplicates
erikjohnston Apr 4, 2022
f48527f
Update synapse_port_db
erikjohnston Apr 4, 2022
bd45f19
Inequality the wrong way round
erikjohnston Apr 4, 2022
dee8f55
Add note about 'num_stream_ids'
erikjohnston Apr 4, 2022
3574541
Merge remote-tracking branch 'origin/develop' into erikj/device_list_…
erikjohnston Apr 4, 2022
cf04f1a
Use different stream IDs for device_list_outbound_pokes
erikjohnston Apr 4, 2022
89e10d7
Correctly order device list stream updates
erikjohnston Apr 4, 2022
e54d2d4
Wake up replication after adding otubound pokes
erikjohnston Apr 4, 2022
7d79dee
Apply suggestions from code review
erikjohnston Apr 4, 2022
b61c5c7
Remove get_users_who_share_room_with_user stub in test
erikjohnston Apr 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,14 @@ def read_config(self, config, **kwargs):
config.get("use_account_validity_in_account_status") or False
)

# This is a temporary option that enables fully using the new
# `device_lists_changes_in_room` without the backwards compat code. This
# is primarily for testing. If enabled the server should *not* be
# downgraded, as it may lead to missing device list updates.
self.use_new_device_lists_changes_in_room = (
config.get("use_new_device_lists_changes_in_room") or False
)

def has_tls_listener(self) -> bool:
return any(listener.tls for listener in self.listeners)

Expand Down
23 changes: 16 additions & 7 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,12 @@ def __init__(self, hs: "HomeServer"):
# On start up check if there are any updates pending.
hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)

# Used to decide if we calculate outbound pokes up front or not. By
# default we do to allow safely downgrading Synapse.
self.use_new_device_lists_changes_in_room = (
hs.config.server.use_new_device_lists_changes_in_room
)

def _check_device_name_length(self, name: Optional[str]) -> None:
"""
Checks whether a device name is longer than the maximum allowed length.
Expand Down Expand Up @@ -484,15 +490,18 @@ async def notify_device_update(

room_ids = await self.store.get_rooms_for_user(user_id)

hosts: Set[str] = set()
if self.hs.is_mine_id(user_id):
for room_id in room_ids:
joined_users = await self.store.get_users_in_room(room_id)
hosts.update(get_domain_from_id(u) for u in joined_users)
hosts: Optional[Set[str]] = None
if self.use_new_device_lists_changes_in_room:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the wrong way round? we should skip calculating hosts if use_new_device_lists_changes_in_room?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bah, yes. And this is the only place that uses it directly ─ everything else infers it from hosts being None ─ so nothing broke.

I'm not sure if there is an easy way to test that this is the right way round?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you might be able to block the _handle_new_device_update_async job in the tests (eg by setting _handle_new_device_update_is_processing), and check what actually gets written to the db?

Not sure it's worth it though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this code is going to be ripped out in the next release anyway

hosts = set()

if self.hs.is_mine_id(user_id):
for room_id in room_ids:
joined_users = await self.store.get_users_in_room(room_id)
hosts.update(get_domain_from_id(u) for u in joined_users)

hosts.discard(self.server_name)
set_tag("target_hosts", hosts)

set_tag("target_hosts", hosts)
hosts.discard(self.server_name)

position = await self.store.add_device_change_to_streams(
user_id,
Expand Down
29 changes: 22 additions & 7 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1509,7 +1509,7 @@ async def add_device_change_to_streams(
self,
user_id: str,
device_ids: Collection[str],
hosts: Collection[str],
hosts: Optional[Collection[str]],
room_ids: Collection[str],
) -> Optional[int]:
"""Persist that a user's devices have been updated, and which hosts
Expand All @@ -1519,7 +1519,9 @@ async def add_device_change_to_streams(
user_id: The ID of the user whose device changed.
device_ids: The IDs of any changed devices. If empty, this function will
return None.
hosts: The remote destinations that should be notified of the change.
hosts: The remote destinations that should be notified of the change. If
None then the set of hosts have *not* been calculated, and will be
calculated later by a background task.
room_ids: The rooms that the user is in

Returns:
Expand All @@ -1529,10 +1531,9 @@ async def add_device_change_to_streams(
if not device_ids:
return None

num_stream_ids = max(
len(device_ids),
len(hosts) * len(device_ids),
)
num_stream_ids = len(device_ids)
if hosts:
num_stream_ids = len(hosts) * len(device_ids)

context = get_active_span_text_map()

Expand All @@ -1554,6 +1555,7 @@ def add_device_changes_txn(txn, stream_ids):
room_ids,
stream_ids,
context,
hosts_have_been_calculated=hosts is not None,
)

if not hosts:
Expand Down Expand Up @@ -1668,7 +1670,20 @@ def _add_device_outbound_room_poke_txn(
room_ids: Collection[str],
stream_ids: List[str],
context: Dict[str, str],
hosts_have_been_calculated: bool,
) -> None:
"""Record the user in the room has updated their device.

Args:
hosts_have_been_calculated: True if `device_lists_outbound_pokes`
has been updated already with the updates.
"""

# We only need to convert to outbound pokes if they are our user.
converted_to_destinations = (
hosts_have_been_calculated or not self.hs.is_mine_id(user_id)
)

self.db_pool.simple_insert_many_txn(
txn,
table="device_lists_changes_in_room",
Expand All @@ -1686,7 +1701,7 @@ def _add_device_outbound_room_poke_txn(
device_id,
room_id,
stream_id,
True, # As we're updating `device_lists_outbound_pokes` at the same time.
converted_to_destinations,
json_encoder.encode(context),
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
)
for room_id in room_ids
Expand Down