Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Pass currently syncing devices between workers.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Aug 15, 2023
1 parent 1e703b5 commit 04a185f
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 54 deletions.
158 changes: 113 additions & 45 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""
import abc
import contextlib
import itertools
import logging
from bisect import bisect
from contextlib import contextmanager
Expand Down Expand Up @@ -191,7 +192,9 @@ async def user_syncing(
"""

@abc.abstractmethod
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
def get_currently_syncing_users_for_replication(
self,
) -> Iterable[Tuple[str, Optional[str]]]:
"""Get an iterable of syncing users on this worker, to send to the presence handler
This is called when a replication connection is established. It should return
Expand Down Expand Up @@ -286,7 +289,12 @@ async def bump_presence_active_time(
"""

async def update_external_syncs_row( # noqa: B027 (no-op by design)
self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
self,
process_id: str,
user_id: str,
device_id: Optional[str],
is_syncing: bool,
sync_time_msec: int,
) -> None:
"""Update the syncing users for an external process as a delta.
Expand All @@ -297,6 +305,7 @@ async def update_external_syncs_row( # noqa: B027 (no-op by design)
syncing against. This allows synapse to process updates
as user start and stop syncing against a given process.
user_id: The user who has started or stopped syncing
device_id: The device ID of the device that has started or stopped syncing.
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
Expand Down Expand Up @@ -427,16 +436,18 @@ def __init__(self, hs: "HomeServer"):
hs.config.worker.writers.presence,
)

# The number of ongoing syncs on this process, by user id.
# The number of ongoing syncs on this process, by (user ID, device ID).
# Empty if _presence_enabled is false.
self._user_to_num_current_syncs: Dict[str, int] = {}
self._user_device_to_num_current_syncs: Dict[
Tuple[str, Optional[str]], int
] = {}

self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()

# user_id -> last_sync_ms. Lists the users that have stopped syncing but
# we haven't notified the presence writer of that yet
self.users_going_offline: Dict[str, int] = {}
# (user_id, device_id) -> last_sync_ms. Lists the devices that have stopped
# syncing but we haven't notified the presence writer of that yet
self.user_devices_going_offline: Dict[Tuple[str, Optional[str]], int] = {}

self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
self._set_state_client = ReplicationPresenceSetState.make_client(hs)
Expand All @@ -459,39 +470,47 @@ async def _on_shutdown(self) -> None:
ClearUserSyncsCommand(self.instance_id)
)

def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
def send_user_sync(
self,
user_id: str,
device_id: Optional[str],
is_syncing: bool,
last_sync_ms: int,
) -> None:
if self._presence_enabled:
self.hs.get_replication_command_handler().send_user_sync(
self.instance_id, user_id, is_syncing, last_sync_ms
self.instance_id, user_id, device_id, is_syncing, last_sync_ms
)

def mark_as_coming_online(self, user_id: str) -> None:
def mark_as_coming_online(self, user_id: str, device_id: Optional[str]) -> None:
"""A user has started syncing. Send a UserSync to the presence writer,
unless they had recently stopped syncing.
"""
going_offline = self.users_going_offline.pop(user_id, None)
going_offline = self.user_devices_going_offline.pop((user_id, device_id), None)
if not going_offline:
# Safe to skip because we haven't yet told the presence writer they
# were offline
self.send_user_sync(user_id, True, self.clock.time_msec())
self.send_user_sync(user_id, device_id, True, self.clock.time_msec())

def mark_as_going_offline(self, user_id: str) -> None:
def mark_as_going_offline(self, user_id: str, device_id: Optional[str]) -> None:
"""A user has stopped syncing. We wait before notifying the presence
writer as its likely they'll come back soon. This allows us to avoid
sending a stopped syncing immediately followed by a started syncing
notification to the presence writer
"""
self.users_going_offline[user_id] = self.clock.time_msec()
self.user_devices_going_offline[(user_id, device_id)] = self.clock.time_msec()

def send_stop_syncing(self) -> None:
"""Check if there are any users who have stopped syncing a while ago and
haven't come back yet. If there are poke the presence writer about them.
"""
now = self.clock.time_msec()
for user_id, last_sync_ms in list(self.users_going_offline.items()):
for (user_id, device_id), last_sync_ms in list(
self.user_devices_going_offline.items()
):
if now - last_sync_ms > UPDATE_SYNCING_USERS_MS:
self.users_going_offline.pop(user_id, None)
self.send_user_sync(user_id, False, last_sync_ms)
self.user_devices_going_offline.pop((user_id, device_id), None)
self.send_user_sync(user_id, device_id, False, last_sync_ms)

async def user_syncing(
self,
Expand Down Expand Up @@ -523,23 +542,23 @@ async def user_syncing(
is_sync=True,
)

curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
self._user_to_num_current_syncs[user_id] = curr_sync + 1
curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0)
self._user_device_to_num_current_syncs[(user_id, device_id)] = curr_sync + 1

# If we went from no in flight sync to some, notify replication
if self._user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)
if self._user_device_to_num_current_syncs[(user_id, device_id)] == 1:
self.mark_as_coming_online(user_id, device_id)

def _end() -> None:
# We check that the user_id is in user_to_num_current_syncs because
# user_to_num_current_syncs may have been cleared if we are
# shutting down.
if user_id in self._user_to_num_current_syncs:
self._user_to_num_current_syncs[user_id] -= 1
if (user_id, device_id) in self._user_device_to_num_current_syncs:
self._user_device_to_num_current_syncs[(user_id, device_id)] -= 1

# If we went from one in flight sync to non, notify replication
if self._user_to_num_current_syncs[user_id] == 0:
self.mark_as_going_offline(user_id)
if self._user_device_to_num_current_syncs[(user_id, device_id)] == 0:
self.mark_as_going_offline(user_id, device_id)

@contextlib.contextmanager
def _user_syncing() -> Generator[None, None, None]:
Expand Down Expand Up @@ -606,10 +625,15 @@ async def process_replication_rows(
# If this is a federation sender, notify about presence updates.
await self.maybe_send_presence_to_interested_destinations(state_to_notify)

def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
def get_currently_syncing_users_for_replication(
self,
) -> Iterable[Tuple[str, Optional[str]]]:
return [
user_id
for user_id, count in self._user_to_num_current_syncs.items()
(user_id, device_id)
for (
user_id,
device_id,
), count in self._user_device_to_num_current_syncs.items()
if count > 0
]

Expand Down Expand Up @@ -736,14 +760,18 @@ def __init__(self, hs: "HomeServer"):
] = {}

# Keeps track of the number of *ongoing* syncs on other processes.
# While any sync is ongoing on another process the user will never
#
# While any sync is ongoing on another process the user's device will never
# go offline.
#
# Each process has a unique identifier and an update frequency. If
# no update is received from that process within the update period then
# we assume that all the sync requests on that process have stopped.
# Stored as a dict from process_id to set of user_id, and a dict of
# process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs: Dict[str, Set[str]] = {}
# Stored as a dict from process_id to set of (user_id, device_id), and
# a dict of process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs: Dict[
str, Set[Tuple[str, Optional[str]]]
] = {}
self.external_process_last_updated_ms: Dict[str, int] = {}

self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
Expand Down Expand Up @@ -953,7 +981,10 @@ async def _handle_timeouts(self) -> None:
# that were syncing on that process to see if they need to be timed
# out.
users_to_check.update(
self.external_process_to_current_syncs.pop(process_id, ())
user_id
for user_id, device_id in self.external_process_to_current_syncs.pop(
process_id, ()
)
)
self.external_process_last_updated_ms.pop(process_id)

Expand All @@ -969,9 +1000,10 @@ async def _handle_timeouts(self) -> None:
user_id: {device_id for device_id, count in devices.items() if count}
for user_id, devices in self.user_to_device_to_num_current_syncs.items()
}
# XXX This will not work yet.
# for user_ids in self.external_process_to_current_syncs.values():
# syncing_user_devices.update(user_ids)
for user_id, device_id in itertools.chain(
*self.external_process_to_current_syncs.values()
):
syncing_user_devices.setdefault(user_id, set()).add(device_id)

changes = handle_timeouts(
states,
Expand Down Expand Up @@ -1072,6 +1104,9 @@ async def user_syncing(

# To keep the single process behaviour consistent with worker mode, run the
# same logic as `update_external_syncs_row`, even though it looks weird.
#
# TODO This does not really match update_external_syncs_row which sets
# the device state and then generates a new state from that.
if prev_state.state == PresenceState.OFFLINE:
await self._update_states(
[
Expand Down Expand Up @@ -1115,12 +1150,19 @@ def _user_syncing() -> Generator[None, None, None]:

return _user_syncing()

def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
def get_currently_syncing_users_for_replication(
self,
) -> Iterable[Tuple[str, Optional[str]]]:
# since we are the process handling presence, there is nothing to do here.
return []

async def update_external_syncs_row(
self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
self,
process_id: str,
user_id: str,
device_id: Optional[str],
is_syncing: bool,
sync_time_msec: int,
) -> None:
"""Update the syncing users for an external process as a delta.
Expand All @@ -1129,6 +1171,7 @@ async def update_external_syncs_row(
syncing against. This allows synapse to process updates
as user start and stop syncing against a given process.
user_id: The user who has started or stopped syncing
device_id: The device ID of the device that has started or stopped syncing.
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
Expand All @@ -1139,12 +1182,23 @@ async def update_external_syncs_row(
process_id, set()
)

devices = self.user_to_device_to_current_state.setdefault(user_id, {})
device_state = devices.setdefault(
device_id, UserDevicePresenceState.default(user_id, device_id)
)
device_state.last_sync_ts = sync_time_msec

updates = []
if is_syncing and user_id not in process_presence:
if prev_state.state == PresenceState.OFFLINE:
if is_syncing and (user_id, device_id) not in process_presence:
if device_state.state == PresenceState.OFFLINE:
# Mark the device as online since it is not syncing, this
# might mark the user as online, so send an update.
device_state.state = PresenceState.ONLINE
presence = _combine_device_states(devices.values())

updates.append(
prev_state.copy_and_replace(
state=PresenceState.ONLINE,
state=presence,
last_active_ts=sync_time_msec,
last_user_sync_ts=sync_time_msec,
)
Expand All @@ -1153,14 +1207,14 @@ async def update_external_syncs_row(
updates.append(
prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
)
process_presence.add(user_id)
elif user_id in process_presence:
process_presence.add((user_id, device_id))
elif (user_id, device_id) in process_presence:
updates.append(
prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
)

if not is_syncing:
process_presence.discard(user_id)
process_presence.discard((user_id, device_id))

if updates:
await self._update_states(updates)
Expand All @@ -1177,9 +1231,23 @@ async def update_external_syncs_clear(self, process_id: str) -> None:
process_presence = self.external_process_to_current_syncs.pop(
process_id, set()
)
prev_states = await self.current_state_for_users(process_presence)

# Mark each device as having a last sync time.
time_now_ms = self.clock.time_msec()
updated_users = set()
for user_id, device_id in process_presence:
device_state = self.user_to_device_to_current_state.setdefault(
user_id, {}
).setdefault(
device_id, UserDevicePresenceState.default(user_id, device_id)
)

device_state.last_sync_ts = time_now_ms
updated_users.add(user_id)

# Update eah user (and insert into the appropriate timers to check if
# they've gone offline).
prev_states = await self.current_state_for_users(updated_users)
await self._update_states(
[
prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
Expand Down
17 changes: 14 additions & 3 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,27 +267,38 @@ class UserSyncCommand(Command):
NAME = "USER_SYNC"

def __init__(
self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
self,
instance_id: str,
user_id: str,
device_id: Optional[str],
is_syncing: bool,
last_sync_ms: int,
):
self.instance_id = instance_id
self.user_id = user_id
self.device_id = device_id
self.is_syncing = is_syncing
self.last_sync_ms = last_sync_ms

@classmethod
def from_line(cls: Type["UserSyncCommand"], line: str) -> "UserSyncCommand":
instance_id, user_id, state, last_sync_ms = line.split(" ", 3)
device_id: Optional[str]
instance_id, user_id, device_id, state, last_sync_ms = line.split(" ", 4)

if device_id == "None":
device_id = None

if state not in ("start", "end"):
raise Exception("Invalid USER_SYNC state %r" % (state,))

return cls(instance_id, user_id, state == "start", int(last_sync_ms))
return cls(instance_id, user_id, device_id, state == "start", int(last_sync_ms))

def to_line(self) -> str:
return " ".join(
(
self.instance_id,
self.user_id,
str(self.device_id),
"start" if self.is_syncing else "end",
str(self.last_sync_ms),
)
Expand Down
Loading

0 comments on commit 04a185f

Please sign in to comment.