Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Handle the case of remote users leaving a partial join room for device lists #13885

Merged
merged 6 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13885.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Correctly handle a race with device lists when a remote user leaves during a partial join.
2 changes: 1 addition & 1 deletion synapse/app/admin_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@

class AdminCmdSlavedStore(
SlavedFilteringStore,
SlavedDeviceStore,
SlavedPushRuleStore,
SlavedEventStore,
SlavedDeviceStore,
TagsWorkerStore,
DeviceInboxWorkerStore,
AccountDataWorkerStore,
Expand Down
71 changes: 0 additions & 71 deletions synapse/storage/controllers/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,11 +598,6 @@ async def _persist_event_batch(
# room
state_delta_for_room: Dict[str, DeltaState] = {}

# Set of remote users which were in rooms the server has left or who may
# have left rooms the server is in. We should check if we still share any
# rooms and if not we mark their device lists as stale.
potentially_left_users: Set[str] = set()

if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
# Work out the new "current state" for each room.
Expand Down Expand Up @@ -716,29 +711,13 @@ async def _persist_event_batch(
room_id,
ev_ctx_rm,
delta,
current_state,
potentially_left_users,
)
if not is_still_joined:
logger.info("Server no longer in room %s", room_id)
latest_event_ids = set()
current_state = {}
delta.no_longer_in_room = True

# Add all remote users that might have left rooms.
potentially_left_users.update(
user_id
for event_type, user_id in delta.to_delete
if event_type == EventTypes.Member
and not self.is_mine_id(user_id)
)
potentially_left_users.update(
user_id
for event_type, user_id in delta.to_insert.keys()
if event_type == EventTypes.Member
and not self.is_mine_id(user_id)
)

state_delta_for_room[room_id] = delta

await self.persist_events_store._persist_events_and_state_updates(
Expand All @@ -749,8 +728,6 @@ async def _persist_event_batch(
inhibit_local_membership_updates=backfilled,
)

await self._handle_potentially_left_users(potentially_left_users)

return replaced_events

async def _calculate_new_extremities(
Expand Down Expand Up @@ -1126,8 +1103,6 @@ async def _is_server_still_joined(
room_id: str,
ev_ctx_rm: List[Tuple[EventBase, EventContext]],
delta: DeltaState,
current_state: Optional[StateMap[str]],
potentially_left_users: Set[str],
) -> bool:
"""Check if the server will still be joined after the given events have
been persisted.
Expand All @@ -1137,11 +1112,6 @@ async def _is_server_still_joined(
ev_ctx_rm
delta: The delta of current state between what is in the database
and what the new current state will be.
current_state: The new current state if it has already been calculated,
otherwise None.
potentially_left_users: If the server has left the room, then joined
remote users will be added to this set to indicate that the
server may no longer be sharing a room with them.
"""

if not any(
Expand Down Expand Up @@ -1195,45 +1165,4 @@ async def _is_server_still_joined(
):
return True

# The server will leave the room, so we go and find out which remote
# users will still be joined when we leave.
if current_state is None:
current_state = await self.main_store.get_partial_current_state_ids(room_id)
current_state = dict(current_state)
for key in delta.to_delete:
current_state.pop(key, None)

current_state.update(delta.to_insert)

remote_event_ids = [
event_id
for (
typ,
state_key,
), event_id in current_state.items()
if typ == EventTypes.Member and not self.is_mine_id(state_key)
]
members = await self.main_store.get_membership_from_event_ids(remote_event_ids)
potentially_left_users.update(
member.user_id
for member in members.values()
if member and member.membership == Membership.JOIN
)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All this logic has moved into events.py.


return False

async def _handle_potentially_left_users(self, user_ids: Set[str]) -> None:
    """Unsubscribe from device lists of users we no longer share a room with.

    Asks the main store which of the given users the server still shares a
    room with, then marks the device list of every other user in the set as
    unsubscribed.

    Args:
        user_ids: The users to check. Nothing is done when the set is empty.
    """
    if user_ids:
        still_shared = (
            await self.main_store.get_users_server_still_shares_room_with(user_ids)
        )
        # Whoever is not in the "still shared" result has dropped out of
        # every room we are in.
        departed = user_ids - still_shared

        for departed_user in departed:
            await self.main_store.mark_remote_user_device_list_as_unsubscribed(
                departed_user
            )
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@

class DataStore(
EventsBackgroundUpdatesStore,
DeviceStore,
RoomMemberStore,
RoomStore,
RoomBatchStore,
Expand Down Expand Up @@ -114,7 +115,6 @@ class DataStore(
StreamWorkerStore,
OpenIdStore,
ClientIpWorkerStore,
DeviceStore,
DeviceInboxStore,
UserDirectoryStore,
UserErasureStore,
Expand Down
64 changes: 50 additions & 14 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
make_tuple_comparison_clause,
)
from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.types import Cursor
from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
from synapse.util import json_decoder, json_encoder
Expand All @@ -70,7 +71,7 @@
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"


class DeviceWorkerStore(EndToEndKeyWorkerStore):
class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
def __init__(
self,
database: DatabasePool,
Expand Down Expand Up @@ -985,24 +986,59 @@ async def mark_remote_user_device_cache_as_valid(self, user_id: str) -> None:
desc="mark_remote_user_device_cache_as_valid",
)

async def handle_potentially_left_users(self, user_ids: Set[str]) -> None:
    """Run `handle_potentially_left_users_txn` for the given users.

    The work is handed to `self.db_pool.runInteraction`, which is skipped
    entirely when there are no users to check.

    Args:
        user_ids: The users to check. Nothing is done when the set is empty.
    """
    if user_ids:
        run_interaction = self.db_pool.runInteraction
        await run_interaction(
            "_handle_potentially_left_users",
            self.handle_potentially_left_users_txn,
            user_ids,
        )

def handle_potentially_left_users_txn(
    self,
    txn: LoggingTransaction,
    user_ids: Set[str],
) -> None:
    """Unsubscribe, within `txn`, from users we no longer share a room with.

    Looks up which of the given users the server still shares a room with
    and marks the device list of every other user in the set as
    unsubscribed, all using the supplied transaction.

    Args:
        txn: The transaction passed through to both helper calls.
        user_ids: The users to check. Nothing is done when the set is empty.
    """
    if user_ids:
        still_shared = self.get_users_server_still_shares_room_with_txn(
            txn, user_ids
        )
        # Anyone missing from the lookup result no longer shares a room
        # with this server.
        for departed_user in user_ids - still_shared:
            self.mark_remote_user_device_list_as_unsubscribed_txn(txn, departed_user)

async def mark_remote_user_device_list_as_unsubscribed(self, user_id: str) -> None:
"""Mark that we no longer track device lists for remote user."""

def _mark_remote_user_device_list_as_unsubscribed_txn(
txn: LoggingTransaction,
) -> None:
self.db_pool.simple_delete_txn(
txn,
table="device_lists_remote_extremeties",
keyvalues={"user_id": user_id},
)
self._invalidate_cache_and_stream(
txn, self.get_device_list_last_stream_id_for_remote, (user_id,)
)

await self.db_pool.runInteraction(
"mark_remote_user_device_list_as_unsubscribed",
_mark_remote_user_device_list_as_unsubscribed_txn,
self.mark_remote_user_device_list_as_unsubscribed_txn,
user_id,
)

def mark_remote_user_device_list_as_unsubscribed_txn(
    self,
    txn: LoggingTransaction,
    user_id: str,
) -> None:
    """Delete the stored remote device-list entry for `user_id` within `txn`.

    Removes the rows for this user from `device_lists_remote_extremeties`
    and then invalidates the `get_device_list_last_stream_id_for_remote`
    cache entry keyed on the user.

    Args:
        txn: The transaction to run the delete and the invalidation in.
        user_id: The user whose entry is removed.
    """
    # NOTE(review): the table name spelling ("extremeties") is kept exactly
    # as in the original; presumably it matches the schema, so confirm
    # before "fixing" it.
    row_filter = {"user_id": user_id}
    self.db_pool.simple_delete_txn(
        txn, table="device_lists_remote_extremeties", keyvalues=row_filter
    )

    cache_key = (user_id,)
    self._invalidate_cache_and_stream(
        txn, self.get_device_list_last_stream_id_for_remote, cache_key
    )

async def get_dehydrated_device(
Expand Down
6 changes: 6 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,12 @@ def _update_current_state_txn(
txn, room_id, members_changed
)

# Check if any of the remote membership changes requires us to
# unsubscribe from their device lists.
self.store.handle_potentially_left_users_txn(
txn, {m for m in members_changed if not self.hs.is_mine_id(m)}
)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, we no longer check if the old memberships were joins, which makes this slightly more expensive than at present.


def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None:
"""Update the room version in the database based off current state
events.
Expand Down
46 changes: 26 additions & 20 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,31 +662,37 @@ async def get_users_server_still_shares_room_with(
if not user_ids:
return set()

def _get_users_server_still_shares_room_with_txn(
txn: LoggingTransaction,
) -> Set[str]:
sql = """
SELECT state_key FROM current_state_events
WHERE
type = 'm.room.member'
AND membership = 'join'
AND %s
GROUP BY state_key
"""

clause, args = make_in_list_sql_clause(
self.database_engine, "state_key", user_ids
)
return await self.db_pool.runInteraction(
"get_users_server_still_shares_room_with",
self.get_users_server_still_shares_room_with_txn,
user_ids,
)

txn.execute(sql % (clause,), args)
def get_users_server_still_shares_room_with_txn(
self,
txn: LoggingTransaction,
user_ids: Collection[str],
) -> Set[str]:
if not user_ids:
return set()

return {row[0] for row in txn}
sql = """
SELECT state_key FROM current_state_events
WHERE
type = 'm.room.member'
AND membership = 'join'
AND %s
GROUP BY state_key
"""

return await self.db_pool.runInteraction(
"get_users_server_still_shares_room_with",
_get_users_server_still_shares_room_with_txn,
clause, args = make_in_list_sql_clause(
self.database_engine, "state_key", user_ids
)

txn.execute(sql % (clause,), args)

return {row[0] for row in txn}

@cancellable
async def get_rooms_for_user(
self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None
Expand Down