Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Timeout individual device presence states.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Aug 23, 2023
1 parent d9456e3 commit a55a198
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 47 deletions.
119 changes: 82 additions & 37 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
from types import TracebackType
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Collection,
Expand Down Expand Up @@ -762,6 +763,19 @@ def __init__(self, hs: "HomeServer"):
now = self.clock.time_msec()
if self._presence_enabled:
for state in self.user_to_current_state.values():
# Create a psuedo-device to properly handle time outs. This will
# be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT.
pseudo_device_id = None
self._user_to_device_to_current_state[state.user_id] = {
pseudo_device_id: UserDevicePresenceState(
user_id=state.user_id,
device_id=pseudo_device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
}

self.wheel_timer.insert(
now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
)
Expand Down Expand Up @@ -803,7 +817,7 @@ def __init__(self, hs: "HomeServer"):

# Keeps track of the number of *ongoing* syncs on other processes.
#
# While any sync is ongoing on another process the user will never
# While any sync is ongoing on another process the user's device will never
# go offline.
#
# Each process has a unique identifier and an update frequency. If
Expand Down Expand Up @@ -1032,20 +1046,21 @@ async def _handle_timeouts(self) -> None:

timers_fired_counter.inc(len(states))

syncing_user_ids = {
user_id
for (user_id, _), count in self._user_device_to_num_current_syncs.items()
# Set of user ID & device IDs which are currently syncing.
syncing_user_devices = {
user_id_device_id
for user_id_device_id, count in self._user_device_to_num_current_syncs.items()
if count
}
for user_id, device_id in itertools.chain(
*self.external_process_to_current_syncs.values()
):
syncing_user_ids.add(user_id)
syncing_user_devices.update(
itertools.chain(*self.external_process_to_current_syncs.values())
)

changes = handle_timeouts(
states,
is_mine_fn=self.is_mine_id,
syncing_user_ids=syncing_user_ids,
syncing_user_devices=syncing_user_devices,
user_to_devices=self._user_to_device_to_current_state,
now=now,
)

Expand Down Expand Up @@ -1973,7 +1988,8 @@ def get_current_key(self) -> int:
def handle_timeouts(
user_states: List[UserPresenceState],
is_mine_fn: Callable[[str], bool],
syncing_user_ids: Set[str],
syncing_user_devices: AbstractSet[Tuple[str, Optional[str]]],
user_to_devices: Dict[str, Dict[Optional[str], UserDevicePresenceState]],
now: int,
) -> List[UserPresenceState]:
"""Checks the presence of users that have timed out and updates as
Expand All @@ -1982,7 +1998,8 @@ def handle_timeouts(
Args:
user_states: List of UserPresenceState's to check.
is_mine_fn: Function that returns if a user_id is ours
syncing_user_ids: Set of user_ids with active syncs.
syncing_user_devices: A set of (user ID, device ID) tuples with active syncs..
user_to_devices: A map of user ID to device ID to UserDevicePresenceState.
now: Current time in ms.
Returns:
Expand All @@ -1991,24 +2008,36 @@ def handle_timeouts(
changes = {} # Actual changes we need to notify people about

for state in user_states:
is_mine = is_mine_fn(state.user_id)

new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
user_id = state.user_id
is_mine = is_mine_fn(user_id)

new_state = handle_timeout(
state,
is_mine,
syncing_user_devices,
user_to_devices.get(user_id, {}),
now,
)
if new_state:
changes[state.user_id] = new_state

return list(changes.values())


def handle_timeout(
state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int
state: UserPresenceState,
is_mine: bool,
syncing_device_ids: AbstractSet[Tuple[str, Optional[str]]],
user_devices: Dict[Optional[str], UserDevicePresenceState],
now: int,
) -> Optional[UserPresenceState]:
"""Checks the presence of the user to see if any of the timers have elapsed
Args:
state
state: UserPresenceState to check.
is_mine: Whether the user is ours
syncing_user_ids: Set of user_ids with active syncs.
syncing_user_devices: A set of (user ID, device ID) tuples with active syncs..
user_devices: A map of device ID to UserDevicePresenceState.
now: Current time in ms.
Returns:
Expand All @@ -2019,34 +2048,50 @@ def handle_timeout(
return None

changed = False
user_id = state.user_id

if is_mine:
if state.state == PresenceState.ONLINE:
if now - state.last_active_ts > IDLE_TIMER:
# Currently online, but last activity ages ago so auto
# idle
state = state.copy_and_replace(state=PresenceState.UNAVAILABLE)
changed = True
elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# So that we send down a notification that we've
# stopped updating.
# Check per-device whether the device should be considered idle or offline
# due to timeouts.
device_changed = False
for device_id, device_state in user_devices.items():
if device_state.state == PresenceState.ONLINE:
if now - device_state.last_active_ts > IDLE_TIMER:
# Currently online, but last activity ages ago so auto
# idle
device_state.state = PresenceState.UNAVAILABLE
device_changed = True

# If there are have been no sync for a while (and none ongoing),
# set presence to offline.
if (state.user_id, device_id) not in syncing_device_ids:
# If the user has done something recently but hasn't synced,
# don't set them as offline.
sync_or_active = max(
device_state.last_sync_ts, device_state.last_active_ts
)

if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
# Mark the device as going offline.
device_state.state = PresenceState.OFFLINE
device_changed = True

# If the presence state of the devices changed, then (maybe) update
# the user's overall presence state.
if device_changed:
new_presence = _combine_device_states(user_devices.values())
if new_presence != state.state:
state = state.copy_and_replace(state=new_presence)
changed = True

if now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# So that we send down a notification that we've
# stopped updating.
changed = True

if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
# Need to send ping to other servers to ensure they don't
# timeout and set us to offline
changed = True

# If there are have been no sync for a while (and none ongoing),
# set presence to offline
if user_id not in syncing_user_ids:
# If the user has done something recently but hasn't synced,
# don't set them as offline.
sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace(state=PresenceState.OFFLINE)
changed = True
else:
# We expect to be poked occasionally by the other side.
# This is to protect against forgetful/buggy servers, so that
Expand Down
Loading

0 comments on commit a55a198

Please sign in to comment.