Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Wait for lazy join to complete when getting current state #12872

Merged
merged 23 commits into from
Jun 1, 2022
Merged
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
78faa1a
Rename StateGroupStorage
erikjohnston May 25, 2022
334844d
Add fetching current state funcs to StateStorage
erikjohnston May 25, 2022
fe86915
Use StateStorage.get_current_state_ids
erikjohnston May 25, 2022
a2465b8
Use StateStorage.get_filtered_current_state_ids
erikjohnston May 25, 2022
a2945a5
Use StateStorage.get_current_state_deltas
erikjohnston May 25, 2022
e786f68
Rename to get_partial_current_state_ids
erikjohnston May 25, 2022
3f74b37
Rename to get_partial_filtered_current_state_ids
erikjohnston May 25, 2022
c068581
Rename to get_partial_current_state_deltas
erikjohnston May 25, 2022
6f386d1
Block calls to `get_current_state_ids` and co. when have partial state
erikjohnston May 25, 2022
2b674f8
Add test for `PartialCurrentStateTracker`
erikjohnston May 25, 2022
ecae768
Newsfile
erikjohnston May 25, 2022
d1a1d6a
Don't block sync when lazy joining a room
erikjohnston May 25, 2022
531955f
Fix type annotation
erikjohnston May 25, 2022
6cc7269
Apply suggestions from code review
erikjohnston May 26, 2022
3a20548
s/self.storage/self._storage/
erikjohnston May 27, 2022
6e7625e
s/is_room_got_partial_state/has_room_only_got_partial_state/
erikjohnston May 27, 2022
711ea44
Merge get_filtered and get_current_state_ids
erikjohnston May 27, 2022
1f99ce0
Merge remote-tracking branch 'origin/develop' into erikj/await_curren…
erikjohnston May 27, 2022
b59418f
Merge remote-tracking branch 'origin/develop' into erikj/await_curren…
erikjohnston May 31, 2022
c6fe132
Fix up naming
erikjohnston May 31, 2022
4757b00
Apply suggestions from code review
erikjohnston Jun 1, 2022
b6cb65f
Lint
erikjohnston Jun 1, 2022
c513c20
Rename has_room_only_got_partial_state
erikjohnston Jun 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Block calls to get_current_state_ids and co. when have partial state
erikjohnston committed May 25, 2022
commit 6f386d1513ee717c3bd6a8bbfaf01214a176876c
1 change: 1 addition & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
@@ -1488,6 +1488,7 @@ async def _sync_partial_state_room(
success = await self.store.clear_partial_state_room(room_id)
if success:
logger.info("State resync complete for %s", room_id)
self.storage.state.notify_room_un_partial_stated(room_id)

# TODO(faster_joins) update room stats and user directory?
return
13 changes: 13 additions & 0 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
@@ -1112,6 +1112,19 @@ def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None
keyvalues={"room_id": room_id},
)

async def is_room_got_partial_state(self, room_id: str) -> bool:
"Whether the given room only has partial state stored"

entry = await self.db_pool.simple_select_one_onecol(
table="partial_state_rooms",
keyvalues={"room_id": room_id},
retcol="room_id",
allow_none=True,
desc="is_room_got_partial_state",
)

return entry is not None


class _BackgroundUpdates:
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
19 changes: 18 additions & 1 deletion synapse/storage/state.py
Original file line number Diff line number Diff line change
@@ -34,7 +34,10 @@

from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.storage.util.partial_state_events_tracker import PartialStateEventsTracker
from synapse.storage.util.partial_state_events_tracker import (
PartialCurrentStateTracker,
PartialStateEventsTracker,
)
from synapse.types import MutableStateMap, StateKey, StateMap

if TYPE_CHECKING:
@@ -590,10 +593,18 @@ def __init__(self, hs: "HomeServer", stores: "Databases"):
self._is_mine_id = hs.is_mine_id
self.stores = stores
self._partial_state_events_tracker = PartialStateEventsTracker(stores.main)
self._partial_state_room_tracker = PartialCurrentStateTracker(stores.main)

def notify_event_un_partial_stated(self, event_id: str) -> None:
self._partial_state_events_tracker.notify_un_partial_stated(event_id)

def notify_room_un_partial_stated(self, room_id: str) -> None:
"""Notify that the room no longer has any partial state.

Must be called after `clear_partial_state_room`
"""
self._partial_state_room_tracker.notify_un_partial_stated(room_id)

async def get_state_group_delta(
self, state_group: int
) -> Tuple[Optional[int], Optional[StateMap[str]]]:
@@ -911,6 +922,7 @@ async def get_current_state_ids(
Returns:
The current state of the room.
"""
await self._partial_state_room_tracker.await_full_state(room_id)

return await self.stores.main.get_partial_current_state_ids(
room_id, on_invalidate=on_invalidate
@@ -931,6 +943,9 @@ async def get_filtered_current_state_ids(
Returns:
Map from type/state_key to event ID.
"""
if not state_filter or state_filter.must_await_full_state(self._is_mine_id):
await self._partial_state_room_tracker.await_full_state(room_id)

return await self.stores.main.get_partial_filtered_current_state_ids(
room_id, state_filter
)
@@ -985,6 +1000,8 @@ async def get_current_state_deltas(
- list of current_state_delta_stream rows. If it is empty, we are
up to date.
"""
# FIXME(faster room joins): what do we do here?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how we want to deal with state deltas?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errr, nor me. I guess we want to wait for full state though?

Suggested change
# FIXME(faster room joins): what do we do here?
# FIXME(faster_joins): what do we do here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, though the query isn't limited to a room so we'd effectively by blocking all forward progress on handling deltas until it completes?


return await self.stores.main.get_partial_current_state_deltas(
prev_stream_id, max_stream_id
)
60 changes: 60 additions & 0 deletions synapse/storage/util/partial_state_events_tracker.py
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@

from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.util import unwrapFirstError

logger = logging.getLogger(__name__)
@@ -118,3 +119,62 @@ async def await_full_state(self, event_ids: Collection[str]) -> None:
observer_set.discard(observer)
if not observer_set:
del self._observers[event_id]


class PartialCurrentStateTracker:
"""Keeps track of which rooms have partial state, after partial-state joins"""

def __init__(self, store: RoomWorkerStore):
self._store = store

# a map from room id to a set of Deferreds which are waiting for that room to be
# un-partial-stated.
self._observers: Dict[str, Set[Deferred[None]]] = defaultdict(set)

def notify_un_partial_stated(self, room_id: str) -> None:
"""Notify that we now have full current state for a given room

Unblocks any callers to await_full_state() for that room.

Args:
room_id: the room that now has full current state.
"""
observers = self._observers.pop(room_id, None)
if not observers:
return
logger.info(
"Notifying %i things waiting for un-partial-stating of room %s",
len(observers),
room_id,
)
with PreserveLoggingContext():
for o in observers:
o.callback(None)

async def await_full_state(self, room_id: str) -> None:
# We add the deferred immediately so that the DB call to check for
# partial state doesn't race when we unpartial the room.
d = Deferred[None]()
self._observers.setdefault(room_id, set()).add(d)

try:
# Check if the room has partial current state or not.
has_partial_state = await self._store.is_room_got_partial_state(room_id)
if not has_partial_state:
return

logger.info(
"Awaiting un-partial-stating of room %s",
room_id,
)

await make_deferred_yieldable(d)

logger.info("Room has un-partial-stated")
finally:
# Remove the added observer, and remove the room entry if its empty.
ds = self._observers.get(room_id)
if ds is not None:
ds.discard(d)
if not ds:
self._observers.pop(room_id, None)