From 286469f2d678eb8c75f6a9bf5fa67b3d1d33df99 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Tue, 13 Dec 2022 13:18:10 -0800 Subject: [PATCH 01/25] add class UnpersistedEventContext --- synapse/events/snapshot.py | 79 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 1c0e96bec720..8f6956357236 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -16,11 +16,13 @@ import attr from frozendict import frozendict +from synapse.api.constants import EventTypes from synapse.appservice import ApplicationService from synapse.events import EventBase from synapse.types import JsonDict, StateMap if TYPE_CHECKING: + from synapse.state import StateHandler from synapse.storage.controllers import StorageControllers from synapse.storage.databases.main import DataStore from synapse.storage.state import StateFilter @@ -254,6 +256,83 @@ async def get_prev_state_ids( ) +@attr.s(slots=True, auto_attribs=True) +class UnpersistedEventContext: + """ + This is a version of an EventContext before the new state group (if any) has been + computed and stored. It contains information about the state before the event (which + also may be the information after the event, if the event is not a state event). The + UnpersistedEventContext must be converted into an EventContext by calling the method + 'persist' on it before it is suitable to be sent to the DB for processing. + + state_handler: + an instance of the class StateHandler, this is required for all + but outlier instances + state_group_before_event: + the state group at/before the event. This is required if `for_batch` is True. + state_map: + the current state of the room + for_batch: + whether the context is part of a group of events being created for batch persisting + to the DB. + app_service: If this event is being sent by a (local) application service, that + app service. + + """ + _storage: "StorageControllers" + state_handler: Optional["StateHandler"] = None + state_group_before_event: Optional[int] = None + state_map: Optional[StateMap[str]] = None + for_batch: bool = False + app_service: Optional[ApplicationService] = None + + @staticmethod + def for_outlier( + storage: "StorageControllers", + ) -> "UnpersistedEventContext": + """Returns an UnpersistedEventContext instance suitable for an outlier event""" + return UnpersistedEventContext(storage=storage) + + async def persist( + self, + event: EventBase, + ) -> "EventContext": + """ + Converts an UnpersistedEventContext into an EventContext, which is now suitable for + sending to the DB along with the event. + """ + if event.internal_metadata.is_outlier(): + return EventContext.for_outlier(self._storage) + + assert self.state_handler is not None + if self.for_batch: + assert self.state_group_before_event is not None + assert self.state_map is not None + return await self.state_handler.compute_event_context_for_batched( + event, self.state_map, self.state_group_before_event + ) + elif event.type == EventTypes.MSC2716_INSERTION: + return await self.state_handler.compute_event_context( + event, state_ids_before_event=self.state_map, partial_state=False + ) + return await self.state_handler.compute_event_context(event) + + async def get_prev_state_ids( + self, state_filter: Optional["StateFilter"] = None + ) -> StateMap[str]: + """ + Returns the state at/before the event (which also may be the same as after the event + if the event is not a state event.) + """ + if self.state_map is not None: + return self.state_map + else: + assert self.state_group_before_event is not None + return await self._storage.state.get_state_ids_for_group( + self.state_group_before_event, state_filter + ) + + def _encode_state_dict( state_dict: Optional[StateMap[str]], ) -> Optional[List[Tuple[str, str, str]]]: From 6322dac8b6ebd6361a2f6631ad794351f4d10aa5 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Tue, 13 Dec 2022 13:36:37 -0800 Subject: [PATCH 02/25] modify create new client event to create unpersistedeventcontexts --- synapse/handlers/message.py | 52 ++++++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5cbe89f4fddf..90915f7bb9b1 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -49,7 +49,7 @@ from synapse.event_auth import validate_event_for_room_version from synapse.events import EventBase, relation_from_event from synapse.events.builder import EventBuilder -from synapse.events.snapshot import EventContext +from synapse.events.snapshot import EventContext, UnpersistedEventContext from synapse.events.validator import EventValidator from synapse.handlers.directory import DirectoryHandler from synapse.logging import opentracing @@ -708,7 +708,8 @@ async def create_event( builder.internal_metadata.historical = historical - event, context = await self.create_new_client_event( + + event, unpersisted_context = await self.create_new_client_event( builder=builder, requester=requester, allow_no_prev_events=allow_no_prev_events, @@ -721,6 +722,8 @@ async def create_event( current_state_group=current_state_group, ) + context = await unpersisted_context.persist(event) + # In an ideal world we wouldn't need the second part of this condition. However, # this behaviour isn't spec'd yet, meaning we should be able to deactivate this # behaviour. Another reason is that this code is also evaluated each time a new @@ -1179,8 +1182,13 @@ async def create_new_client_event( event = await builder.build( prev_event_ids=prev_event_ids, auth_event_ids=auth_ids, depth=depth ) - context = await self.state.compute_event_context_for_batched( - event, state_map, current_state_group + + context = UnpersistedEventContext( + self._storage_controllers, + state_map=state_map, + state_handler=self.state, + state_group_before_event=current_state_group, + for_batch=True, ) else: event = await builder.build( @@ -1231,16 +1239,36 @@ async def create_new_client_event( state_map_for_event[(data.event_type, data.state_key)] = state_id - context = await self.state.compute_event_context( - event, - state_ids_before_event=state_map_for_event, - # TODO(faster_joins): check how MSC2716 works and whether we can have - # partial state here - # https://github.com/matrix-org/synapse/issues/13003 - partial_state=False, + context = UnpersistedEventContext( + self._storage_controllers, + state_map=state_map_for_event, + state_handler=self.state, ) else: - context = await self.state.compute_event_context(event) + entry = ( + await self.hs.get_state_handler().resolve_state_groups_for_events( + event.room_id, + event.prev_event_ids(), + await_full_state=False, + ) + ) + + if not entry.state_group: + state_before_event = await entry.get_state( + self.state._state_storage_controller, StateFilter.all() + ) + + context = UnpersistedEventContext( + self._storage_controllers, + state_map=state_before_event, + state_handler=self.state, + ) + else: + context = UnpersistedEventContext( + self._storage_controllers, + state_group_before_event=entry.state_group, + state_handler=self.state, + ) if requester: context.app_service = requester.app_service From 54fa7f05b6b80dd67b6b10917d5ddeefd0e658da Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Tue, 13 Dec 2022 13:36:14 -0800 Subject: [PATCH 03/25] persist event contexts after creation --- synapse/handlers/federation.py | 54 +++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3398fcaf7d47..fb83f3c509eb 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -56,7 +56,7 @@ from synapse.crypto.event_signing import compute_event_signature from synapse.event_auth import validate_event_for_room_version from synapse.events import EventBase -from synapse.events.snapshot import EventContext +from synapse.events.snapshot import EventContext, UnpersistedEventContext from synapse.events.validator import EventValidator from synapse.federation.federation_client import InvalidResponseError from synapse.http.servlet import assert_params_in_dict @@ -1123,7 +1123,7 @@ async def on_make_leave_request( }, ) - event, context = await self.event_creation_handler.create_new_client_event( + event, _ = await self.event_creation_handler.create_new_client_event( builder=builder ) @@ -1173,12 +1173,14 @@ async def on_make_knock_request( }, ) - event, context = await self.event_creation_handler.create_new_client_event( - builder=builder - ) + + ( + event, + unpersisted_context, + ) = await self.event_creation_handler.create_new_client_event(builder=builder) event_allowed, _ = await self.third_party_event_rules.check_event_allowed( - event, context + event, unpersisted_context ) if not event_allowed: logger.warning("Creation of knock %s forbidden by third-party rules", event) @@ -1343,14 +1345,19 @@ async def exchange_third_party_invite( ) EventValidator().validate_builder(builder) - event, context = await self.event_creation_handler.create_new_client_event( + event, unpersisted_context = await self.event_creation_handler.create_new_client_event( builder=builder ) - event, context = await self.add_display_name_to_third_party_invite( - room_version_obj, event_dict, event, context + ( + event, + unpersisted_context, + ) = await self.add_display_name_to_third_party_invite( + room_version_obj, event_dict, event, unpersisted_context ) + context = await unpersisted_context.persist(event) + EventValidator().validate_new(event, self.config) # We need to tell the transaction queue to send this out, even @@ -1400,13 +1407,17 @@ async def on_exchange_third_party_invite_request( room_version_obj, event_dict ) - event, context = await self.event_creation_handler.create_new_client_event( - builder=builder - ) - event, context = await self.add_display_name_to_third_party_invite( - room_version_obj, event_dict, event, context + ( + event, + unpersisted_context, + ) = await self.event_creation_handler.create_new_client_event(builder=builder) + + event, unpersisted_context = await self.add_display_name_to_third_party_invite( + room_version_obj, event_dict, event, unpersisted_context ) + context = await unpersisted_context.persist(event) + try: validate_event_for_room_version(event) await self._event_auth_handler.check_auth_rules_from_context(event) @@ -1428,8 +1439,8 @@ async def add_display_name_to_third_party_invite( room_version_obj: RoomVersion, event_dict: JsonDict, event: EventBase, - context: EventContext, - ) -> Tuple[EventBase, EventContext]: + context: UnpersistedEventContext, + ) -> Tuple[EventBase, UnpersistedEventContext]: key = ( EventTypes.ThirdPartyInvite, event.content["third_party_invite"]["signed"]["token"], @@ -1463,11 +1474,14 @@ async def add_display_name_to_third_party_invite( room_version_obj, event_dict ) EventValidator().validate_builder(builder) - event, context = await self.event_creation_handler.create_new_client_event( - builder=builder - ) + + ( + event, + unpersisted_context, + ) = await self.event_creation_handler.create_new_client_event(builder=builder) + EventValidator().validate_new(event, self.config) - return event, context + return event, unpersisted_context async def _check_signature(self, event: EventBase, context: EventContext) -> None: """ From d23d1bed56c8ac20f26255f161a74b24affd9eaf Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Tue, 13 Dec 2022 13:37:14 -0800 Subject: [PATCH 04/25] fix tests to persist unpersisted event contexts --- tests/handlers/test_user_directory.py | 4 +++- tests/rest/admin/test_user.py | 4 +++- tests/storage/test_redaction.py | 23 +++++++++++++++++------ tests/storage/test_state.py | 4 +++- tests/test_utils/event_injection.py | 7 ++++++- tests/test_visibility.py | 9 ++++++--- tests/utils.py | 5 ++++- 7 files changed, 42 insertions(+), 14 deletions(-) diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index 75fc5a17a47b..e9be5fb504f8 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -949,10 +949,12 @@ def _add_user_to_room( }, ) - event, context = self.get_success( + event, unpersisted_context = self.get_success( self.event_creation_handler.create_new_client_event(builder) ) + context = self.get_success(unpersisted_context.persist(event)) + self.get_success( self.hs.get_storage_controllers().persistence.persist_event(event, context) ) diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 5c1ced355ff1..b50406e12929 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -2934,10 +2934,12 @@ def test_get_rooms_with_nonlocal_user(self) -> None: }, ) - event, context = self.get_success( + event, unpersisted_context = self.get_success( event_creation_handler.create_new_client_event(builder) ) + context = self.get_success(unpersisted_context.persist(event)) + self.get_success(storage_controllers.persistence.persist_event(event, context)) # Now get rooms diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index 6c4e63b77cac..d5caf2e5aa2f 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -67,10 +67,12 @@ def inject_room_member( }, ) - event, context = self.get_success( + event, unpersisted_context = self.get_success( self.event_creation_handler.create_new_client_event(builder) ) + context = self.get_success(unpersisted_context.persist(event)) + self.get_success(self._storage.persistence.persist_event(event, context)) return event @@ -89,10 +91,12 @@ def inject_message(self, room, user, body): }, ) - event, context = self.get_success( + event, unpersisted_context = self.get_success( self.event_creation_handler.create_new_client_event(builder) ) + context = self.get_success(unpersisted_context.persist(event)) + self.get_success(self._storage.persistence.persist_event(event, context)) return event @@ -110,10 +114,12 @@ def inject_redaction(self, room, event_id, user, reason): }, ) - event, context = self.get_success( + event, unpersisted_context = self.get_success( self.event_creation_handler.create_new_client_event(builder) ) + context = self.get_success(unpersisted_context.persist(event)) + self.get_success(self._storage.persistence.persist_event(event, context)) return event @@ -250,7 +256,7 @@ def type(self): def internal_metadata(self): return self._base_builder.internal_metadata - event_1, context_1 = self.get_success( + event_1, unpersisted_context_1 = self.get_success( self.event_creation_handler.create_new_client_event( EventIdManglingBuilder( self.event_builder_factory.for_room_version( @@ -268,9 +274,11 @@ def internal_metadata(self): ) ) + context_1 = self.get_success(unpersisted_context_1.persist(event_1)) + self.get_success(self._storage.persistence.persist_event(event_1, context_1)) - event_2, context_2 = self.get_success( + event_2, unpersisted_context_2 = self.get_success( self.event_creation_handler.create_new_client_event( EventIdManglingBuilder( self.event_builder_factory.for_room_version( @@ -287,6 +295,7 @@ def internal_metadata(self): ) ) ) + context_2 = self.get_success(unpersisted_context_2.persist(event_2)) self.get_success(self._storage.persistence.persist_event(event_2, context_2)) # fetch one of the redactions @@ -406,10 +415,12 @@ def test_store_redacted_redaction(self): }, ) - redaction_event, context = self.get_success( + redaction_event, unpersisted_context = self.get_success( self.event_creation_handler.create_new_client_event(builder) ) + context = self.get_success(unpersisted_context.persist(redaction_event)) + self.get_success( self._storage.persistence.persist_event(redaction_event, context) ) diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py index 5564161750e6..e5ad43bf9e53 100644 --- a/tests/storage/test_state.py +++ b/tests/storage/test_state.py @@ -60,10 +60,12 @@ def inject_state_event(self, room, sender, typ, state_key, content): }, ) - event, context = self.get_success( + event, unpersisted_context = self.get_success( self.event_creation_handler.create_new_client_event(builder) ) + context = self.get_success(unpersisted_context.persist(event)) + self.get_success(self.storage.persistence.persist_event(event, context)) return event diff --git a/tests/test_utils/event_injection.py b/tests/test_utils/event_injection.py index 8027c7a856e2..a4b752583773 100644 --- a/tests/test_utils/event_injection.py +++ b/tests/test_utils/event_injection.py @@ -92,8 +92,13 @@ async def create_event( builder = hs.get_event_builder_factory().for_room_version( KNOWN_ROOM_VERSIONS[room_version], kwargs ) - event, context = await hs.get_event_creation_handler().create_new_client_event( + ( + event, + unpersisted_context, + ) = await hs.get_event_creation_handler().create_new_client_event( builder, prev_event_ids=prev_event_ids ) + context = await unpersisted_context.persist(event) + return event, context diff --git a/tests/test_visibility.py b/tests/test_visibility.py index d0b9ad54540d..478e917c3aa5 100644 --- a/tests/test_visibility.py +++ b/tests/test_visibility.py @@ -175,9 +175,10 @@ def _inject_visibility(self, user_id: str, visibility: str) -> EventBase: }, ) - event, context = self.get_success( + event, unpersisted_context = self.get_success( self.event_creation_handler.create_new_client_event(builder) ) + context = self.get_success(unpersisted_context.persist(event)) self.get_success( self._storage_controllers.persistence.persist_event(event, context) ) @@ -202,9 +203,10 @@ def _inject_room_member( }, ) - event, context = self.get_success( + event, unpersisted_context = self.get_success( self.event_creation_handler.create_new_client_event(builder) ) + context = self.get_success(unpersisted_context.persist(event)) self.get_success( self._storage_controllers.persistence.persist_event(event, context) @@ -226,9 +228,10 @@ def _inject_message( }, ) - event, context = self.get_success( + event, unpersisted_context = self.get_success( self.event_creation_handler.create_new_client_event(builder) ) + context = self.get_success(unpersisted_context.persist(event)) self.get_success( self._storage_controllers.persistence.persist_event(event, context) diff --git a/tests/utils.py b/tests/utils.py index d76bf9716ad2..15fabbc2d01e 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -335,6 +335,9 @@ async def create_room(hs: HomeServer, room_id: str, creator_id: str) -> None: }, ) - event, context = await event_creation_handler.create_new_client_event(builder) + event, unpersisted_context = await event_creation_handler.create_new_client_event( + builder + ) + context = await unpersisted_context.persist(event) await persistence_store.persist_event(event, context) From b483243dfbb8aba4ac00c3f0b15d0ccaccadb0a2 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Tue, 13 Dec 2022 13:37:21 -0800 Subject: [PATCH 05/25] cleanup --- synapse/events/third_party_rules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py index 72ab69689887..518660f0b472 100644 --- a/synapse/events/third_party_rules.py +++ b/synapse/events/third_party_rules.py @@ -18,7 +18,7 @@ from synapse.api.errors import ModuleFailedException, SynapseError from synapse.events import EventBase -from synapse.events.snapshot import EventContext +from synapse.events.snapshot import EventContext, UnpersistedEventContext from synapse.storage.roommember import ProfileInfo from synapse.types import Requester, StateMap from synapse.util.async_helpers import delay_cancellation, maybe_awaitable From c008b6f90641131c7e9cb877ae4644b89f61625e Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Tue, 13 Dec 2022 15:19:19 -0800 Subject: [PATCH 06/25] misc lints + cleanup --- synapse/events/snapshot.py | 1 + synapse/events/third_party_rules.py | 4 ++-- synapse/handlers/federation.py | 15 +++++++++++---- synapse/handlers/federation_event.py | 5 +++-- synapse/handlers/message.py | 16 ++++++---------- 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 8f6956357236..a57fb47317c2 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -279,6 +279,7 @@ class UnpersistedEventContext: app service. """ + _storage: "StorageControllers" state_handler: Optional["StateHandler"] = None state_group_before_event: Optional[int] = None diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py index 518660f0b472..2fac1593e3a7 100644 --- a/synapse/events/third_party_rules.py +++ b/synapse/events/third_party_rules.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Optional, Tuple, Union from twisted.internet.defer import CancelledError @@ -231,7 +231,7 @@ def register_third_party_rules_callbacks( self._on_threepid_bind_callbacks.append(on_threepid_bind) async def check_event_allowed( - self, event: EventBase, context: EventContext + self, event: EventBase, context: Union[EventContext, UnpersistedEventContext] ) -> Tuple[bool, Optional[dict]]: """Check if a provided event should be allowed in the given context. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fb83f3c509eb..3c0716a03e0b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -935,7 +935,10 @@ async def on_make_join_request( ) try: - event, context = await self.event_creation_handler.create_new_client_event( + ( + event, + unpersisted_context, + ) = await self.event_creation_handler.create_new_client_event( builder=builder ) except SynapseError as e: @@ -943,7 +946,9 @@ async def on_make_join_request( raise # Ensure the user can even join the room. - await self._federation_event_handler.check_join_restrictions(context, event) + await self._federation_event_handler.check_join_restrictions( + unpersisted_context, event + ) # The remote hasn't signed it yet, obviously. We'll do the full checks # when we get the event back in `on_send_join_request` @@ -1173,7 +1178,6 @@ async def on_make_knock_request( }, ) - ( event, unpersisted_context, @@ -1345,7 +1349,10 @@ async def exchange_third_party_invite( ) EventValidator().validate_builder(builder) - event, unpersisted_context = await self.event_creation_handler.create_new_client_event( + ( + event, + unpersisted_context, + ) = await self.event_creation_handler.create_new_client_event( builder=builder ) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index f7223b03c364..a49eb4a88dc2 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -27,6 +27,7 @@ Sequence, Set, Tuple, + Union, ) from prometheus_client import Counter, Histogram @@ -57,7 +58,7 @@ validate_event_for_room_version, ) from synapse.events import EventBase -from synapse.events.snapshot import EventContext +from synapse.events.snapshot import EventContext, UnpersistedEventContext from synapse.federation.federation_client import InvalidResponseError, PulledPduInfo from synapse.logging.context import nested_logging_context from synapse.logging.opentracing import ( @@ -424,7 +425,7 @@ async def on_send_membership_event( return event, context async def check_join_restrictions( - self, context: EventContext, event: EventBase + self, context: Union[EventContext, UnpersistedEventContext], event: EventBase ) -> None: """Check that restrictions in restricted join rules are matched diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 90915f7bb9b1..b1c458c485c8 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -708,7 +708,6 @@ async def create_event( builder.internal_metadata.historical = historical - event, unpersisted_context = await self.create_new_client_event( builder=builder, requester=requester, @@ -1073,7 +1072,7 @@ async def create_new_client_event( state_map: Optional[StateMap[str]] = None, for_batch: bool = False, current_state_group: Optional[int] = None, - ) -> Tuple[EventBase, EventContext]: + ) -> Tuple[EventBase, UnpersistedEventContext]: """Create a new event for a local client. If bool for_batch is true, will create an event using the prev_event_ids, and will create an event context for the event using the parameters state_map and current_state_group, thus these parameters @@ -1201,7 +1200,7 @@ async def create_new_client_event( # after it is created if builder.internal_metadata.outlier: event.internal_metadata.outlier = True - context = EventContext.for_outlier(self._storage_controllers) + context = UnpersistedEventContext.for_outlier(self._storage_controllers) elif ( event.type == EventTypes.MSC2716_INSERTION and state_event_ids @@ -1287,7 +1286,7 @@ async def create_new_client_event( elif new_content is not None: # the third-party rules want to replace the event. We'll need to build a new # event. - event, context = await self._rebuild_event_after_third_party_rules( + event = await self._rebuild_event_after_third_party_rules( new_content, event ) @@ -2076,9 +2075,9 @@ def _expire_rooms_to_exclude_from_dummy_event_insertion(self) -> None: async def _rebuild_event_after_third_party_rules( self, third_party_result: dict, original_event: EventBase - ) -> Tuple[EventBase, EventContext]: + ) -> EventBase: # the third_party_event_rules want to replace the event. - # we do some basic checks, and then return the replacement event and context. + # we do some basic checks, and then return the replacement event. # Construct a new EventBuilder and validate it, which helps with the # rest of these checks. @@ -2130,7 +2129,4 @@ async def _rebuild_event_after_third_party_rules( auth_event_ids=None, ) - # we rebuild the event context, to be on the safe side. If nothing else, - # delta_ids might need an update. - context = await self.state.compute_event_context(event) - return event, context + return event From 0f07c0e089766410b4f7e2bd141333551b870354 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Tue, 13 Dec 2022 15:51:06 -0800 Subject: [PATCH 07/25] changelog + fix comments --- changelog.d/14675.misc | 1 + synapse/events/snapshot.py | 3 +++ synapse/handlers/message.py | 5 +++-- 3 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 changelog.d/14675.misc diff --git a/changelog.d/14675.misc b/changelog.d/14675.misc new file mode 100644 index 000000000000..bc1ac1c82aeb --- /dev/null +++ b/changelog.d/14675.misc @@ -0,0 +1 @@ +Add a class UnpersistedEventContext to allow for the batching up of storing state groups. diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index a57fb47317c2..1e5605ab0512 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -313,6 +313,9 @@ async def persist( event, self.state_map, self.state_group_before_event ) elif event.type == EventTypes.MSC2716_INSERTION: + # TODO(faster_joins): check how MSC2716 works and whether we can have + # partial state here + # https://github.com/matrix-org/synapse/issues/13003 return await self.state_handler.compute_event_context( event, state_ids_before_event=self.state_map, partial_state=False ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b1c458c485c8..e1973690ad2e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1078,7 +1078,8 @@ async def create_new_client_event( the event using the parameters state_map and current_state_group, thus these parameters must be provided in this case if for_batch is True. The subsequently created event and context are suitable for being batched up and bulk persisted to the database - with other similarly created events. + with other similarly created events. Note that this returns an UnpersistedEventContext, + which must be converted to an EventContext before it can be sent to the DB. Args: builder: @@ -1120,7 +1121,7 @@ async def create_new_client_event( batch persisting Returns: - Tuple of created event, context + Tuple of created event, UnpersistedEventContext """ # Strip down the state_event_ids to only what we need to auth the event. # For example, we don't need extra m.room.member that don't match event.sender From 78a71ed11c866273520849c4a0e3b14f6e477094 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Tue, 13 Dec 2022 15:56:06 -0800 Subject: [PATCH 08/25] lints --- tests/storage/test_redaction.py | 14 ++++++-------- tests/storage/test_state.py | 1 - 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index 5df29cdcd7b5..0100f7da14c6 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -80,7 +80,7 @@ def inject_room_member( # type: ignore[override] context = self.get_success(unpersisted_context.persist(event)) - self.get_success(self._storage._persistence.persist_event(event, context)) + self.get_success(self._persistence.persist_event(event, context)) return event @@ -104,7 +104,7 @@ def inject_message(self, room: RoomID, user: UserID, body: str) -> EventBase: context = self.get_success(unpersisted_context.persist(event)) - self.get_success(self._storage._persistence.persist_event(event, context)) + self.get_success(self._persistence.persist_event(event, context)) return event @@ -129,7 +129,7 @@ def inject_redaction( context = self.get_success(unpersisted_context.persist(event)) - self.get_success(self._storage._persistence.persist_event(event, context)) + self.get_success(self._persistence.persist_event(event, context)) return event @@ -288,7 +288,7 @@ def internal_metadata(self) -> _EventInternalMetadata: context_1 = self.get_success(unpersisted_context_1.persist(event_1)) - self.get_success(self._storage._persistence.persist_event(event_1, context_1)) + self.get_success(self._persistence.persist_event(event_1, context_1)) event_2, unpersisted_context_2 = self.get_success( self.event_creation_handler.create_new_client_event( @@ -312,7 +312,7 @@ def internal_metadata(self) -> _EventInternalMetadata: ) context_2 = self.get_success(unpersisted_context_2.persist(event_2)) - self.get_success(self._storage._persistence.persist_event(event_2, context_2)) + self.get_success(self._persistence.persist_event(event_2, context_2)) # fetch one of the redactions fetched = self.get_success(self.store.get_event(redaction_event_id1)) @@ -437,9 +437,7 @@ def test_store_redacted_redaction(self) -> None: context = self.get_success(unpersisted_context.persist(redaction_event)) - self.get_success( - self._storage._persistence.persist_event(redaction_event, context) - ) + self.get_success(self._persistence.persist_event(redaction_event, context)) # Now lets jump to the future where we have censored the redaction event # in the DB. diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py index d890aefd8c85..f730b888f7d2 100644 --- a/tests/storage/test_state.py +++ b/tests/storage/test_state.py @@ -71,7 +71,6 @@ def inject_state_event( self.event_creation_handler.create_new_client_event(builder) ) - context = self.get_success(unpersisted_context.persist(event)) assert self.storage.persistence is not None From 6321576e7e36faa6469d7db63b24776edc731d4c Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Tue, 13 Dec 2022 21:13:04 -0800 Subject: [PATCH 09/25] fix batch insertion? --- synapse/events/snapshot.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 2c5879722559..b0394de81bf1 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -312,7 +312,11 @@ async def persist( return await self.state_handler.compute_event_context_for_batched( event, self.state_map, self.state_group_before_event ) - elif event.type == EventTypes.MSC2716_INSERTION: + elif ( + event.type == EventTypes.MSC2716_INSERTION + and self.state_map + and event.internal_metadata.is_historical() + ): # TODO(faster_joins): check how MSC2716 works and whether we can have # partial state here # https://github.com/matrix-org/synapse/issues/13003 From d6454ced08f5f8add398f4ee8877e9abd1f6aaf0 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Wed, 14 Dec 2022 13:45:27 -0800 Subject: [PATCH 10/25] reduce redundant calculation --- synapse/events/snapshot.py | 10 ++++++++-- synapse/handlers/message.py | 2 ++ synapse/state/__init__.py | 16 +++++++++++----- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index b0394de81bf1..14a3db8b9c1c 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -22,7 +22,7 @@ from synapse.types import JsonDict, StateMap if TYPE_CHECKING: - from synapse.state import StateHandler + from synapse.state import StateHandler, _StateCacheEntry from synapse.storage.controllers import StorageControllers from synapse.storage.databases.main import DataStore from synapse.types.state import StateFilter @@ -275,6 +275,9 @@ class UnpersistedEventContext: for_batch: whether the context is part of a group of events being created for batch persisting to the DB. + state_entry: + if a state entry is created while calculating this context, it can be stored and + passed in when persisting the context to avoid duplicated work app_service: If this event is being sent by a (local) application service, that app service. @@ -285,6 +288,7 @@ class UnpersistedEventContext: state_group_before_event: Optional[int] = None state_map: Optional[StateMap[str]] = None for_batch: bool = False + state_entry: Optional["_StateCacheEntry"] = None app_service: Optional[ApplicationService] = None @staticmethod @@ -323,7 +327,9 @@ async def persist( return await self.state_handler.compute_event_context( event, state_ids_before_event=self.state_map, partial_state=False ) - return await self.state_handler.compute_event_context(event) + return await self.state_handler.compute_event_context( + event, entry=self.state_entry + ) async def get_prev_state_ids( self, state_filter: Optional["StateFilter"] = None diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ed0af1d929be..c870b3039fc7 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1263,12 +1263,14 @@ async def create_new_client_event( self._storage_controllers, state_map=state_before_event, state_handler=self.state, + state_entry=entry, ) else: context = UnpersistedEventContext( self._storage_controllers, state_group_before_event=entry.state_group, state_handler=self.state, + state_entry=entry, ) if requester: diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index ee5469d5a8e9..ff9e557bfffa 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -261,6 +261,7 @@ async def compute_event_context( event: EventBase, state_ids_before_event: Optional[StateMap[str]] = None, partial_state: Optional[bool] = None, + entry: Optional[_StateCacheEntry] = None, ) -> EventContext: """Build an EventContext structure for a non-outlier event. @@ -281,6 +282,9 @@ async def compute_event_context( `False` if `state_ids_before_event` is the full state. `None` when `state_ids_before_event` is not provided. In this case, the flag will be calculated based on `event`'s prev events. + entry: + A state cache entry for the resolved state across the prev events. We may + have already calculated this, so if it's available pass it in Returns: The event context. @@ -339,11 +343,12 @@ async def compute_event_context( logger.debug("calling resolve_state_groups from compute_event_context") # we've already taken into account partial state, so no need to wait for # complete state here. - entry = await self.resolve_state_groups_for_events( - event.room_id, - event.prev_event_ids(), - await_full_state=False, - ) + if not entry: + entry = await self.resolve_state_groups_for_events( + event.room_id, + event.prev_event_ids(), + await_full_state=False, + ) state_group_before_event_prev_group = entry.prev_group deltas_to_state_group_before_event = entry.delta_ids @@ -396,6 +401,7 @@ async def compute_event_context( if state_ids_before_event is not None: replaces = state_ids_before_event.get(key) else: + assert entry is not None replaces_state_map = await entry.get_state( self._state_storage_controller, StateFilter.from_types([key]) ) From 8b994362cbb6c15ea0cb5e7f5195a8051f2adcc1 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Mon, 9 Jan 2023 10:35:19 -0800 Subject: [PATCH 11/25] add unpersisted event classes --- synapse/events/snapshot.py | 282 ++++++++++++++++++++++++++++--------- 1 file changed, 216 insertions(+), 66 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 14a3db8b9c1c..a98959939a3f 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -11,25 +11,44 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from abc import ABC, abstractmethod from typing import TYPE_CHECKING, List, Optional, Tuple import attr from frozendict import frozendict -from synapse.api.constants import EventTypes +# from synapse.api.constants import EventTypes from synapse.appservice import ApplicationService from synapse.events import EventBase from synapse.types import JsonDict, StateMap if TYPE_CHECKING: - from synapse.state import StateHandler, _StateCacheEntry from synapse.storage.controllers import StorageControllers from synapse.storage.databases.main import DataStore from synapse.types.state import StateFilter +class UnpersistedEventContextBase(ABC): + def __init__(self, storage_controller: "StorageControllers"): + self._storage: "StorageControllers" = storage_controller + self.app_service: Optional[ApplicationService] = None + + @abstractmethod + async def persist( + self, + event: EventBase, + ) -> "EventContext": + pass + + @abstractmethod + async def get_prev_state_ids( + self, state_filter: Optional["StateFilter"] = None + ) -> StateMap[str]: + pass + + @attr.s(slots=True, auto_attribs=True) -class EventContext: +class EventContext(UnpersistedEventContextBase): """ Holds information relevant to persisting an event @@ -124,6 +143,9 @@ def for_outlier( """Return an EventContext instance suitable for persisting an outlier event""" return EventContext(storage=storage) + async def persist(self, event: EventBase) -> "EventContext": + return self + async def serialize(self, event: EventBase, store: "DataStore") -> JsonDict: """Converts self to a type that can be serialized as JSON, and then deserialized by `deserialize` @@ -256,8 +278,7 @@ async def get_prev_state_ids( ) -@attr.s(slots=True, auto_attribs=True) -class UnpersistedEventContext: +class UnpersistedEventContextForBatched(UnpersistedEventContextBase): """ This is a version of an EventContext before the new state group (if any) has been computed and stored. It contains information about the state before the event (which @@ -265,85 +286,214 @@ class UnpersistedEventContext: UnpersistedEventContext must be converted into an EventContext by calling the method 'persist' on it before it is suitable to be sent to the DB for processing. - state_handler: - an instance of the class StateHandler, this is required for all - but outlier instances state_group_before_event: the state group at/before the event. This is required if `for_batch` is True. - state_map: - the current state of the room - for_batch: - whether the context is part of a group of events being created for batch persisting - to the DB. - state_entry: - if a state entry is created while calculating this context, it can be stored and - passed in when persisting the context to avoid duplicated work - app_service: If this event is being sent by a (local) application service, that - app service. - """ + state_group: + this will always be None until it is persisted - _storage: "StorageControllers" - state_handler: Optional["StateHandler"] = None - state_group_before_event: Optional[int] = None - state_map: Optional[StateMap[str]] = None - for_batch: bool = False - state_entry: Optional["_StateCacheEntry"] = None - app_service: Optional[ApplicationService] = None + state_group_before_event: The ID of the state group representing the state + of the room before this event. - @staticmethod - def for_outlier( - storage: "StorageControllers", - ) -> "UnpersistedEventContext": - """Returns an UnpersistedEventContext instance suitable for an outlier event""" - return UnpersistedEventContext(storage=storage) + state_delta_due_to_event: If the event is a state event, then this is the + delta of the state between `state_group` and `state_group_before_event` - async def persist( + prev_group: If it is known, ``state_group``'s prev_group. + + If the event is a state event, this is normally the same as + ``state_group_before_event``. + + delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group`` + and ``state_group``. + + state_map: A map of the current state of the room + """ + + def __init__( self, - event: EventBase, - ) -> "EventContext": + storage_controller: "StorageControllers", + state_group: Optional[int], + state_delta_due_to_event: dict, + prev_group: Optional[int], + delta_ids: Optional[StateMap[str]], + partial_state: bool, + state_map: StateMap[str], + state_group_before_event: int, + ) -> None: + super().__init__(storage_controller) + self.state_group = state_group + self.state_delta_due_to_event = state_delta_due_to_event + self.prev_group = prev_group + self.delta_ids = delta_ids + self.partial_state = partial_state + self.state_map = state_map + self.state_group_before_event = state_group_before_event + + async def get_prev_state_ids( + self, state_filter: Optional["StateFilter"] = None + ) -> StateMap[str]: """ - Converts an UnpersistedEventContext into an EventContext, which is now suitable for - sending to the DB along with the event. + Gets the room state map, excluding this event. + + Args: + state_filter: specifies the type of state event to fetch from DB, example: EventTypes.JoinRules + + Returns: + Maps a (type, state_key) to the event ID of the state event matching + this tuple. + """ + return self.state_map + + async def persist(self, event: EventBase) -> EventContext: + """ + If the event is a state event, calculates the current state group for the context, + stores it, and returns a EventContext. If the event is not state, returns + an EventContext. + + Args: + event: event that the EventContext is associated with. + + Returns: An EventContext suitable for sending to the database with the event + for persisting """ - if event.internal_metadata.is_outlier(): - return EventContext.for_outlier(self._storage) - - assert self.state_handler is not None - if self.for_batch: - assert self.state_group_before_event is not None - assert self.state_map is not None - return await self.state_handler.compute_event_context_for_batched( - event, self.state_map, self.state_group_before_event + assert self.partial_state is not None + if not event.is_state(): + return EventContext.with_state( + storage=self._storage, + state_group=self.state_group_before_event, + state_group_before_event=self.state_group_before_event, + state_delta_due_to_event=self.state_delta_due_to_event, + partial_state=self.partial_state, + prev_group=self.prev_group, + delta_ids=self.delta_ids, ) - elif ( - event.type == EventTypes.MSC2716_INSERTION - and self.state_map - and event.internal_metadata.is_historical() - ): - # TODO(faster_joins): check how MSC2716 works and whether we can have - # partial state here - # https://github.com/matrix-org/synapse/issues/13003 - return await self.state_handler.compute_event_context( - event, state_ids_before_event=self.state_map, partial_state=False + else: + state_group_after_event = await self._storage.state.store_state_group( + event.event_id, + event.room_id, + prev_group=self.prev_group, + delta_ids=self.delta_ids, + current_state_ids=None, ) - return await self.state_handler.compute_event_context( - event, entry=self.state_entry - ) + + return EventContext.with_state( + storage=self._storage, + state_group=state_group_after_event, + state_group_before_event=self.state_group_before_event, + state_delta_due_to_event=self.state_delta_due_to_event, + partial_state=self.partial_state, + prev_group=self.prev_group, + delta_ids=self.delta_ids, + ) + + +class UnpersistedEventContext(UnpersistedEventContextBase): + """ + This is a version of an EventContext before the new state group (if any) has been + computed and stored. It contains information about the state before the event (which + also may be the information after the event, if the event is not a state event). The + UnpersistedEventContext must be converted into an EventContext by calling the method + 'persist' on it before it is suitable to be sent to the DB for processing. + + state_group_before_event: + the state group at/before the event. This is required if `for_batch` is True. + state_group: + this will always be None until it is persisted + + state_group_before_event: The ID of the state group representing the state + of the room before this event. + + state_delta_due_to_event: If the event is a state event, then this is the + delta of the state between `state_group` and `state_group_before_event` + + prev_group: If it is known, ``state_group``'s prev_group. + + If the event is a state event, this is normally the same as + ``state_group_before_event``. + + delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group`` + and ``state_group``. + + partial_state: Whether the event has partial state. + """ + + def __init__( + self, + storage_controller: "StorageControllers", + state_group_before_event: Optional[int], + state_group: Optional[int], + state_delta_due_to_event: Optional[dict], + prev_group: Optional[int], + delta_ids: Optional[StateMap[str]], + partial_state: Optional[bool], + ): + super().__init__(storage_controller) + self.state_group_before_event = state_group_before_event + self.state_group = state_group + self.state_delta_due_to_event = state_delta_due_to_event + self.prev_group = prev_group + self.delta_ids = delta_ids + self.partial_state = partial_state async def get_prev_state_ids( self, state_filter: Optional["StateFilter"] = None ) -> StateMap[str]: """ - Returns the state at/before the event (which also may be the same as after the event - if the event is not a state event.) + Gets the room state map, excluding this event. + + Args: + state_filter: specifies the type of state event to fetch from DB, example: EventTypes.JoinRules + + Returns: + Maps a (type, state_key) to the event ID of the state event matching + this tuple. """ - if self.state_map is not None: - return self.state_map + + assert self.state_group_before_event is not None + return await self._storage.state.get_state_ids_for_group( + self.state_group_before_event, state_filter + ) + + async def persist(self, event: EventBase) -> EventContext: + """ + If the event is a state event, calculates the current state group for the context, + stores it, and returns a EventContext. If the event is not state, returns + an EventContext. + + Args: + event: event that the EventContext is associated with. + + Returns: An EventContext suitable for sending to the database with the event + for persisting + """ + assert self.partial_state is not None + if not event.is_state(): + return EventContext.with_state( + storage=self._storage, + state_group=self.state_group_before_event, + state_group_before_event=self.state_group_before_event, + state_delta_due_to_event=self.state_delta_due_to_event, + partial_state=self.partial_state, + prev_group=self.prev_group, + delta_ids=self.delta_ids, + ) else: - assert self.state_group_before_event is not None - return await self._storage.state.get_state_ids_for_group( - self.state_group_before_event, state_filter + state_group_after_event = await self._storage.state.store_state_group( + event.event_id, + event.room_id, + prev_group=self.prev_group, + delta_ids=self.delta_ids, + current_state_ids=None, + ) + + return EventContext.with_state( + storage=self._storage, + state_group=state_group_after_event, + state_group_before_event=self.state_group_before_event, + state_delta_due_to_event=self.state_delta_due_to_event, + partial_state=self.partial_state, + prev_group=self.prev_group, + delta_ids=self.delta_ids, ) From 1668fd22936d8b7802d7d9548f44442c27901d11 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Mon, 9 Jan 2023 10:36:32 -0800 Subject: [PATCH 12/25] rework compute_event_context, split into function that returns unpersisted event context and then persists it --- synapse/state/__init__.py | 240 +++++++++++++++++++------------------- 1 file changed, 123 insertions(+), 117 deletions(-) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index ff9e557bfffa..12ef5b8642b5 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -39,7 +39,12 @@ from synapse.api.constants import EventTypes from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions from synapse.events import EventBase -from synapse.events.snapshot import EventContext +from synapse.events.snapshot import ( + EventContext, + UnpersistedEventContext, + UnpersistedEventContextBase, + UnpersistedEventContextForBatched, +) from synapse.logging.context import ContextResourceUsage from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet from synapse.state import v1, v2 @@ -256,43 +261,39 @@ async def get_hosts_in_room_at_events( state = await entry.get_state(self._state_storage_controller, StateFilter.all()) return await self.store.get_joined_hosts(room_id, state, entry) - async def compute_event_context( + async def calculate_context_info( self, event: EventBase, state_ids_before_event: Optional[StateMap[str]] = None, partial_state: Optional[bool] = None, entry: Optional[_StateCacheEntry] = None, - ) -> EventContext: - """Build an EventContext structure for a non-outlier event. - - (for an outlier, call EventContext.for_outlier directly) - - This works out what the current state should be for the event, and - generates a new state group if necessary. - - Args: - event: - state_ids_before_event: The event ids of the state before the event if - it can't be calculated from existing events. This is normally - only specified when receiving an event from federation where we - don't have the prev events, e.g. when backfilling. - partial_state: - `True` if `state_ids_before_event` is partial and omits non-critical - membership events. - `False` if `state_ids_before_event` is the full state. - `None` when `state_ids_before_event` is not provided. In this case, the - flag will be calculated based on `event`'s prev events. - entry: - A state cache entry for the resolved state across the prev events. We may - have already calculated this, so if it's available pass it in - Returns: - The event context. - - Raises: - RuntimeError if `state_ids_before_event` is not provided and one or more - prev events are missing or outliers. + current_state_group: Optional[int] = None, + ) -> UnpersistedEventContextBase: + """ + Calulates the contents of an unpersisted event context, other than the current + state group (which is either provided or calculated when the event context is persisted) + + state_ids_before_event: The event ids of the full state before the event if + it can't be calculated from existing events. This is normally + only specified when receiving an event from federation where we + don't have the prev events, e.g. when backfilling or when the event + is being created for batch persisting. + partial_state: + `True` if `state_ids_before_event` is partial and omits non-critical + membership events. + `False` if `state_ids_before_event` is the full state. + `None` when `state_ids_before_event` is not provided. In this case, the + flag will be calculated based on `event`'s prev events. + entry: + A state cache entry for the resolved state across the prev events. We may + have already calculated this, so if it's available pass it in + Returns: + The event context. + + Raises: + RuntimeError if `state_ids_before_event` is not provided and one or more + prev events are missing or outliers. """ - assert not event.internal_metadata.is_outlier() # @@ -304,16 +305,19 @@ async def compute_event_context( state_group_before_event_prev_group = None deltas_to_state_group_before_event = None - # .. though we need to get a state group for it. - state_group_before_event = ( - await self._state_storage_controller.store_state_group( - event.event_id, - event.room_id, - prev_group=None, - delta_ids=None, - current_state_ids=state_ids_before_event, + # .. though we need to get a state group for it if we don't have it + if not current_state_group: + state_group_before_event = ( + await self._state_storage_controller.store_state_group( + event.event_id, + event.room_id, + prev_group=None, + delta_ids=None, + current_state_ids=state_ids_before_event, + ) ) - ) + else: + state_group_before_event = current_state_group # the partial_state flag must be provided assert partial_state is not None @@ -343,6 +347,7 @@ async def compute_event_context( logger.debug("calling resolve_state_groups from compute_event_context") # we've already taken into account partial state, so no need to wait for # complete state here. + if not entry: entry = await self.resolve_state_groups_for_events( event.room_id, @@ -382,15 +387,28 @@ async def compute_event_context( # if not event.is_state(): - return EventContext.with_state( - storage=self._storage_controllers, - state_group_before_event=state_group_before_event, - state_group=state_group_before_event, - state_delta_due_to_event={}, - prev_group=state_group_before_event_prev_group, - delta_ids=deltas_to_state_group_before_event, - partial_state=partial_state, - ) + if state_ids_before_event: + return UnpersistedEventContextForBatched( + storage_controller=self._storage_controllers, + state_group_before_event=state_group_before_event, + state_group=state_group_before_event, + state_delta_due_to_event={}, + prev_group=state_group_before_event_prev_group, + delta_ids=deltas_to_state_group_before_event, + partial_state=partial_state, + state_map=state_ids_before_event, + ) + + else: + return UnpersistedEventContext( + storage_controller=self._storage_controllers, + state_group_before_event=state_group_before_event, + state_group=state_group_before_event, + state_delta_due_to_event={}, + prev_group=state_group_before_event_prev_group, + delta_ids=deltas_to_state_group_before_event, + partial_state=partial_state, + ) # # otherwise, we'll need to create a new state group for after the event @@ -412,88 +430,76 @@ async def compute_event_context( delta_ids = {key: event.event_id} - state_group_after_event = ( - await self._state_storage_controller.store_state_group( - event.event_id, - event.room_id, + if state_ids_before_event: + return UnpersistedEventContextForBatched( + storage_controller=self._storage_controllers, + state_group_before_event=state_group_before_event, + state_group=None, + state_delta_due_to_event=delta_ids, prev_group=state_group_before_event, delta_ids=delta_ids, - current_state_ids=None, + partial_state=partial_state, + state_map=state_ids_before_event, ) - ) - return EventContext.with_state( - storage=self._storage_controllers, - state_group=state_group_after_event, - state_group_before_event=state_group_before_event, - state_delta_due_to_event=delta_ids, - prev_group=state_group_before_event, - delta_ids=delta_ids, - partial_state=partial_state, - ) + else: + return UnpersistedEventContext( + storage_controller=self._storage_controllers, + state_group_before_event=state_group_before_event, + state_group=None, + state_delta_due_to_event=delta_ids, + prev_group=state_group_before_event, + delta_ids=delta_ids, + partial_state=partial_state, + ) - async def compute_event_context_for_batched( + async def compute_event_context( self, event: EventBase, - state_ids_before_event: StateMap[str], - current_state_group: int, + state_ids_before_event: Optional[StateMap[str]] = None, + partial_state: Optional[bool] = None, + entry: Optional[_StateCacheEntry] = None, + current_state_group: Optional[int] = None, ) -> EventContext: - """ - Generate an event context for an event that has not yet been persisted to the - database. Intended for use with events that are created to be persisted in a batch. - Args: - event: the event the context is being computed for - state_ids_before_event: a state map consisting of the state ids of the events - created prior to this event. - current_state_group: the current state group before the event. - """ - state_group_before_event_prev_group = None - deltas_to_state_group_before_event = None - - state_group_before_event = current_state_group - - # if the event is not state, we are set - if not event.is_state(): - return EventContext.with_state( - storage=self._storage_controllers, - state_group_before_event=state_group_before_event, - state_group=state_group_before_event, - state_delta_due_to_event={}, - prev_group=state_group_before_event_prev_group, - delta_ids=deltas_to_state_group_before_event, - partial_state=False, - ) + """Build an EventContext structure for a non-outlier event. - # otherwise, we'll need to create a new state group for after the event - key = (event.type, event.state_key) + (for an outlier, call EventContext.for_outlier directly) - if state_ids_before_event is not None: - replaces = state_ids_before_event.get(key) + This works out what the current state should be for the event, and + generates a new state group if necessary. - if replaces and replaces != event.event_id: - event.unsigned["replaces_state"] = replaces + Args: + event: + state_ids_before_event: The event ids of the state before the event if + it can't be calculated from existing events. This is normally + only specified when receiving an event from federation where we + don't have the prev events, e.g. when backfilling. + partial_state: + `True` if `state_ids_before_event` is partial and omits non-critical + membership events. + `False` if `state_ids_before_event` is the full state. + `None` when `state_ids_before_event` is not provided. In this case, the + flag will be calculated based on `event`'s prev events. + entry: + A state cache entry for the resolved state across the prev events. We may + have already calculated this, so if it's available pass it in + Returns: + The event context. - delta_ids = {key: event.event_id} + Raises: + RuntimeError if `state_ids_before_event` is not provided and one or more + prev events are missing or outliers. + """ - state_group_after_event = ( - await self._state_storage_controller.store_state_group( - event.event_id, - event.room_id, - prev_group=state_group_before_event, - delta_ids=delta_ids, - current_state_ids=None, - ) + unpersisted_context = await self.calculate_context_info( + event=event, + state_ids_before_event=state_ids_before_event, + partial_state=partial_state, + entry=entry, + current_state_group=current_state_group, ) - return EventContext.with_state( - storage=self._storage_controllers, - state_group=state_group_after_event, - state_group_before_event=state_group_before_event, - state_delta_due_to_event=delta_ids, - prev_group=state_group_before_event, - delta_ids=delta_ids, - partial_state=False, - ) + return await unpersisted_context.persist(event) @measure_func() async def resolve_state_groups_for_events( From 7e494fc630fdb94a679431dd5b27b52976502536 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Mon, 9 Jan 2023 10:37:12 -0800 Subject: [PATCH 13/25] use calculate_context_info to create unpersisted event contexts --- synapse/handlers/message.py | 64 +++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 28 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c870b3039fc7..7035a5eaa9dd 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -49,7 +49,7 @@ from synapse.event_auth import validate_event_for_room_version from synapse.events import EventBase, relation_from_event from synapse.events.builder import EventBuilder -from synapse.events.snapshot import EventContext, UnpersistedEventContext +from synapse.events.snapshot import EventContext, UnpersistedEventContextBase from synapse.events.utils import maybe_upsert_event_field from synapse.events.validator import EventValidator from synapse.handlers.directory import DirectoryHandler @@ -1073,7 +1073,7 @@ async def create_new_client_event( state_map: Optional[StateMap[str]] = None, for_batch: bool = False, current_state_group: Optional[int] = None, - ) -> Tuple[EventBase, UnpersistedEventContext]: + ) -> Tuple[EventBase, UnpersistedEventContextBase]: """Create a new event for a local client. If bool for_batch is true, will create an event using the prev_event_ids, and will create an event context for the event using the parameters state_map and current_state_group, thus these parameters @@ -1184,13 +1184,15 @@ async def create_new_client_event( prev_event_ids=prev_event_ids, auth_event_ids=auth_ids, depth=depth ) - context = UnpersistedEventContext( - self._storage_controllers, - state_map=state_map, - state_handler=self.state, - state_group_before_event=current_state_group, - for_batch=True, + context: UnpersistedEventContextBase = ( + await self.state.calculate_context_info( + event, + state_ids_before_event=state_map, + partial_state=False, + current_state_group=current_state_group, + ) ) + else: event = await builder.build( prev_event_ids=prev_event_ids, @@ -1202,7 +1204,7 @@ async def create_new_client_event( # after it is created if builder.internal_metadata.outlier: event.internal_metadata.outlier = True - context = UnpersistedEventContext.for_outlier(self._storage_controllers) + context = EventContext.for_outlier(self._storage_controllers) elif ( event.type == EventTypes.MSC2716_INSERTION and state_event_ids @@ -1240,11 +1242,12 @@ async def create_new_client_event( state_map_for_event[(data.event_type, data.state_key)] = state_id - context = UnpersistedEventContext( - self._storage_controllers, - state_map=state_map_for_event, - state_handler=self.state, + context = await self.state.calculate_context_info( + event, + state_ids_before_event=state_map_for_event, + partial_state=False, ) + else: entry = ( await self.hs.get_state_handler().resolve_state_groups_for_events( @@ -1254,23 +1257,26 @@ async def create_new_client_event( ) ) - if not entry.state_group: + if entry.state_group is None: state_before_event = await entry.get_state( self.state._state_storage_controller, StateFilter.all() ) - context = UnpersistedEventContext( - self._storage_controllers, - state_map=state_before_event, - state_handler=self.state, - state_entry=entry, - ) + if state_before_event: + context = await self.state.calculate_context_info( + event, + state_ids_before_event=state_before_event, + entry=entry, + partial_state=False, + ) + else: + context = await self.state.calculate_context_info( + event, entry=entry + ) + else: - context = UnpersistedEventContext( - self._storage_controllers, - state_group_before_event=entry.state_group, - state_handler=self.state, - state_entry=entry, + context = await self.state.calculate_context_info( + event, entry=entry, current_state_group=entry.state_group ) if requester: @@ -1290,7 +1296,7 @@ async def create_new_client_event( elif new_content is not None: # the third-party rules want to replace the event. We'll need to build a new # event. - event = await self._rebuild_event_after_third_party_rules( + event, context = await self._rebuild_event_after_third_party_rules( new_content, event ) @@ -2085,7 +2091,7 @@ def _expire_rooms_to_exclude_from_dummy_event_insertion(self) -> None: async def _rebuild_event_after_third_party_rules( self, third_party_result: dict, original_event: EventBase - ) -> EventBase: + ) -> Tuple[EventBase, UnpersistedEventContextBase]: # the third_party_event_rules want to replace the event. # we do some basic checks, and then return the replacement event. @@ -2139,4 +2145,6 @@ async def _rebuild_event_after_third_party_rules( auth_event_ids=None, ) - return event + context = await self.state.calculate_context_info(event) + + return event, context From 61c8de0756ab547b6807b9bdecf2e4e648fad394 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Mon, 9 Jan 2023 10:37:35 -0800 Subject: [PATCH 14/25] update typing --- synapse/events/third_party_rules.py | 6 ++++-- synapse/handlers/federation.py | 6 +++--- synapse/handlers/federation_event.py | 6 ++++-- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py index 2fac1593e3a7..252985c98bf0 100644 --- a/synapse/events/third_party_rules.py +++ b/synapse/events/third_party_rules.py @@ -18,7 +18,7 @@ from synapse.api.errors import ModuleFailedException, SynapseError from synapse.events import EventBase -from synapse.events.snapshot import EventContext, UnpersistedEventContext +from synapse.events.snapshot import EventContext, UnpersistedEventContextBase from synapse.storage.roommember import ProfileInfo from synapse.types import Requester, StateMap from synapse.util.async_helpers import delay_cancellation, maybe_awaitable @@ -231,7 +231,9 @@ def register_third_party_rules_callbacks( self._on_threepid_bind_callbacks.append(on_threepid_bind) async def check_event_allowed( - self, event: EventBase, context: Union[EventContext, UnpersistedEventContext] + self, + event: EventBase, + context: Union[EventContext, UnpersistedEventContextBase], ) -> Tuple[bool, Optional[dict]]: """Check if a provided event should be allowed in the given context. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ce73ef550b5f..0aa1453aad83 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -56,7 +56,7 @@ from synapse.crypto.event_signing import compute_event_signature from synapse.event_auth import validate_event_for_room_version from synapse.events import EventBase -from synapse.events.snapshot import EventContext, UnpersistedEventContext +from synapse.events.snapshot import EventContext, UnpersistedEventContextBase from synapse.events.validator import EventValidator from synapse.federation.federation_client import InvalidResponseError from synapse.http.servlet import assert_params_in_dict @@ -1446,8 +1446,8 @@ async def add_display_name_to_third_party_invite( room_version_obj: RoomVersion, event_dict: JsonDict, event: EventBase, - context: UnpersistedEventContext, - ) -> Tuple[EventBase, UnpersistedEventContext]: + context: UnpersistedEventContextBase, + ) -> Tuple[EventBase, UnpersistedEventContextBase]: key = ( EventTypes.ThirdPartyInvite, event.content["third_party_invite"]["signed"]["token"], diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index d2d4971a5a1c..d7dee02ab309 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -59,7 +59,7 @@ validate_event_for_room_version, ) from synapse.events import EventBase -from synapse.events.snapshot import EventContext, UnpersistedEventContext +from synapse.events.snapshot import EventContext, UnpersistedEventContextBase from synapse.federation.federation_client import InvalidResponseError, PulledPduInfo from synapse.logging.context import nested_logging_context from synapse.logging.opentracing import ( @@ -426,7 +426,9 @@ async def on_send_membership_event( return event, context async def check_join_restrictions( - self, context: Union[EventContext, UnpersistedEventContext], event: EventBase + self, + context: Union[EventContext, UnpersistedEventContextBase], + event: EventBase, ) -> None: """Check that restrictions in restricted join rules are matched From f7eafcff5b1f3a0495f8ab0dd9faae37f75c77c5 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Tue, 10 Jan 2023 13:51:54 -0800 Subject: [PATCH 15/25] $%#^&* --- synapse/handlers/federation.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 581ec0ff3999..8f8cd0efda19 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1362,7 +1362,10 @@ async def exchange_third_party_invite( builder=builder ) - event, unpersisted_context = await self.add_display_name_to_third_party_invite( + ( + event, + unpersisted_context, + ) = await self.add_display_name_to_third_party_invite( room_version_obj, event_dict, event, unpersisted_context ) @@ -1429,7 +1432,6 @@ async def on_exchange_third_party_invite_request( room_version_obj, event_dict ) - # Try several times, it could fail with PartialStateConflictError # in send_membership_event, cf comment in except block. max_retries = 5 @@ -1441,7 +1443,7 @@ async def on_exchange_third_party_invite_request( ) = await self.event_creation_handler.create_new_client_event( builder=builder ) - event, context = await self.add_display_name_to_third_party_invite( + event, unpersisted_context = await self.add_display_name_to_third_party_invite( room_version_obj, event_dict, event, unpersisted_context ) From 0402917d9538bbf2742b45199279547844da7042 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Tue, 10 Jan 2023 14:04:40 -0800 Subject: [PATCH 16/25] black --- synapse/handlers/federation.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8f8cd0efda19..b076a0ca8db5 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1443,7 +1443,10 @@ async def on_exchange_third_party_invite_request( ) = await self.event_creation_handler.create_new_client_event( builder=builder ) - event, unpersisted_context = await self.add_display_name_to_third_party_invite( + ( + event, + unpersisted_context, + ) = await self.add_display_name_to_third_party_invite( room_version_obj, event_dict, event, unpersisted_context ) From 373e3317b53dd11c1a4de83fcf79aca324a47d9a Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Wed, 11 Jan 2023 20:29:45 -0800 Subject: [PATCH 17/25] fix comments and consolidate classes, use attr.s for class --- synapse/events/snapshot.py | 212 +++++++++++++------------------------ 1 file changed, 71 insertions(+), 141 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index a98959939a3f..a4d0bdd3de7b 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -29,6 +29,18 @@ class UnpersistedEventContextBase(ABC): + """ + This is a base class for EventContext and UnpersistedEventContext, objects which + hold information relevant to storing an associated event. Note that an + UnpersistedEventContexts must be converted into an EventContext before it is + suitable to send to the db with its associated event. + + Attributes: + _storage: storage controllers for interfacing with the database + app_service: If the associated event is being sent by a (local) application service, that + app service. + """ + def __init__(self, storage_controller: "StorageControllers"): self._storage: "StorageControllers" = storage_controller self.app_service: Optional[ApplicationService] = None @@ -38,12 +50,24 @@ async def persist( self, event: EventBase, ) -> "EventContext": + """ + A method to convert an UnpersistedEventContext to an EventContext, suitable for + sending to the database with the associated event. + """ pass @abstractmethod async def get_prev_state_ids( self, state_filter: Optional["StateFilter"] = None ) -> StateMap[str]: + """ + Gets the room state at the event (ie not including the event if the event is a + state event). + + Args: + state_filter: specifies the type of state event to fetch from DB, example: + EventTypes.JoinRules + """ pass @@ -98,9 +122,6 @@ class EventContext(UnpersistedEventContextBase): delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group`` and ``state_group``. - app_service: If this event is being sent by a (local) application service, that - app service. - partial_state: if True, we may be storing this event with a temporary, incomplete state. """ @@ -277,116 +298,7 @@ async def get_prev_state_ids( self.state_group_before_event, state_filter ) - -class UnpersistedEventContextForBatched(UnpersistedEventContextBase): - """ - This is a version of an EventContext before the new state group (if any) has been - computed and stored. It contains information about the state before the event (which - also may be the information after the event, if the event is not a state event). The - UnpersistedEventContext must be converted into an EventContext by calling the method - 'persist' on it before it is suitable to be sent to the DB for processing. - - state_group_before_event: - the state group at/before the event. This is required if `for_batch` is True. - - state_group: - this will always be None until it is persisted - - state_group_before_event: The ID of the state group representing the state - of the room before this event. - - state_delta_due_to_event: If the event is a state event, then this is the - delta of the state between `state_group` and `state_group_before_event` - - prev_group: If it is known, ``state_group``'s prev_group. - - If the event is a state event, this is normally the same as - ``state_group_before_event``. - - delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group`` - and ``state_group``. - - state_map: A map of the current state of the room - """ - - def __init__( - self, - storage_controller: "StorageControllers", - state_group: Optional[int], - state_delta_due_to_event: dict, - prev_group: Optional[int], - delta_ids: Optional[StateMap[str]], - partial_state: bool, - state_map: StateMap[str], - state_group_before_event: int, - ) -> None: - super().__init__(storage_controller) - self.state_group = state_group - self.state_delta_due_to_event = state_delta_due_to_event - self.prev_group = prev_group - self.delta_ids = delta_ids - self.partial_state = partial_state - self.state_map = state_map - self.state_group_before_event = state_group_before_event - - async def get_prev_state_ids( - self, state_filter: Optional["StateFilter"] = None - ) -> StateMap[str]: - """ - Gets the room state map, excluding this event. - - Args: - state_filter: specifies the type of state event to fetch from DB, example: EventTypes.JoinRules - - Returns: - Maps a (type, state_key) to the event ID of the state event matching - this tuple. - """ - return self.state_map - - async def persist(self, event: EventBase) -> EventContext: - """ - If the event is a state event, calculates the current state group for the context, - stores it, and returns a EventContext. If the event is not state, returns - an EventContext. - - Args: - event: event that the EventContext is associated with. - - Returns: An EventContext suitable for sending to the database with the event - for persisting - """ - assert self.partial_state is not None - if not event.is_state(): - return EventContext.with_state( - storage=self._storage, - state_group=self.state_group_before_event, - state_group_before_event=self.state_group_before_event, - state_delta_due_to_event=self.state_delta_due_to_event, - partial_state=self.partial_state, - prev_group=self.prev_group, - delta_ids=self.delta_ids, - ) - else: - state_group_after_event = await self._storage.state.store_state_group( - event.event_id, - event.room_id, - prev_group=self.prev_group, - delta_ids=self.delta_ids, - current_state_ids=None, - ) - - return EventContext.with_state( - storage=self._storage, - state_group=state_group_after_event, - state_group_before_event=self.state_group_before_event, - state_delta_due_to_event=self.state_delta_due_to_event, - partial_state=self.partial_state, - prev_group=self.prev_group, - delta_ids=self.delta_ids, - ) - - +@attr.s(slots=True, auto_attribs=True) class UnpersistedEventContext(UnpersistedEventContextBase): """ This is a version of an EventContext before the new state group (if any) has been @@ -415,25 +327,18 @@ class UnpersistedEventContext(UnpersistedEventContextBase): and ``state_group``. partial_state: Whether the event has partial state. + + state_map: A map of the state before the event, i.e. the state at `state_group_before_event` """ - def __init__( - self, - storage_controller: "StorageControllers", - state_group_before_event: Optional[int], - state_group: Optional[int], - state_delta_due_to_event: Optional[dict], - prev_group: Optional[int], - delta_ids: Optional[StateMap[str]], - partial_state: Optional[bool], - ): - super().__init__(storage_controller) - self.state_group_before_event = state_group_before_event - self.state_group = state_group - self.state_delta_due_to_event = state_delta_due_to_event - self.prev_group = prev_group - self.delta_ids = delta_ids - self.partial_state = partial_state + _storage: "StorageControllers" + state_group_before_event: Optional[int] + state_group: Optional[int] + state_delta_due_to_event: Optional[dict] + prev_group: Optional[int] + delta_ids: Optional[StateMap[str]] + partial_state: Optional[bool] + state_map: Optional[StateMap[str]] = None async def get_prev_state_ids( self, state_filter: Optional["StateFilter"] = None @@ -442,12 +347,14 @@ async def get_prev_state_ids( Gets the room state map, excluding this event. Args: - state_filter: specifies the type of state event to fetch from DB, example: EventTypes.JoinRules + state_filter: specifies the type of state event to fetch from DB Returns: Maps a (type, state_key) to the event ID of the state event matching this tuple. """ + if self.state_map: + return self.state_map assert self.state_group_before_event is not None return await self._storage.state.get_state_ids_for_group( @@ -456,9 +363,8 @@ async def get_prev_state_ids( async def persist(self, event: EventBase) -> EventContext: """ - If the event is a state event, calculates the current state group for the context, - stores it, and returns a EventContext. If the event is not state, returns - an EventContext. + Creates a full `EventContext` for the event, persisting any referenced state that + has not yet been persisted. Args: event: event that the EventContext is associated with. @@ -467,6 +373,19 @@ async def persist(self, event: EventBase) -> EventContext: for persisting """ assert self.partial_state is not None + # If we have a full set of state for at/before the event but don't have a state + # group for that state, we need to get one + if self.state_map: + if self.state_group_before_event is None: + state_group_before_event = await self._storage.state.store_state_group( + event.event_id, + event.room_id, + prev_group=None, + delta_ids=None, + current_state_ids=self.state_map, + ) + self.state_group_before_event = state_group_before_event + self.prev_group = state_group_before_event if not event.is_state(): return EventContext.with_state( storage=self._storage, @@ -478,13 +397,24 @@ async def persist(self, event: EventBase) -> EventContext: delta_ids=self.delta_ids, ) else: - state_group_after_event = await self._storage.state.store_state_group( - event.event_id, - event.room_id, - prev_group=self.prev_group, - delta_ids=self.delta_ids, - current_state_ids=None, - ) + if not self.prev_group and not self.delta_ids: + # if we don't have a prev_group and the delta from the prev_group we need + # to store the state group with the full state + state_group_after_event = await self._storage.state.store_state_group( + event.event_id, + event.room_id, + prev_group=self.prev_group, + delta_ids=self.delta_ids, + current_state_ids=self.state_map, + ) + else: + state_group_after_event = await self._storage.state.store_state_group( + event.event_id, + event.room_id, + prev_group=self.prev_group, + delta_ids=self.delta_ids, + current_state_ids=None, + ) return EventContext.with_state( storage=self._storage, From 408fa21afc1b1215db409d30ef2ba045e107e25f Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Wed, 11 Jan 2023 20:30:08 -0800 Subject: [PATCH 18/25] requested changes --- synapse/handlers/federation_event.py | 2 +- synapse/handlers/message.py | 35 ++----- synapse/state/__init__.py | 132 ++++++++++----------------- 3 files changed, 53 insertions(+), 116 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 91c51be8d7c1..3e9afedaac4a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -427,7 +427,7 @@ async def on_send_membership_event( async def check_join_restrictions( self, - context: Union[EventContext, UnpersistedEventContextBase], + context: UnpersistedEventContextBase, event: EventBase, ) -> None: """Check that restrictions in restricted join rules are matched diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3459c48bdfb0..0f510e29039c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1254,6 +1254,9 @@ async def create_new_client_event( state_map_for_event[(data.event_type, data.state_key)] = state_id + # TODO(faster_joins): check how MSC2716 works and whether we can have + # partial state here + # https://github.com/matrix-org/synapse/issues/13003 context = await self.state.calculate_context_info( event, state_ids_before_event=state_map_for_event, @@ -1261,35 +1264,7 @@ async def create_new_client_event( ) else: - entry = ( - await self.hs.get_state_handler().resolve_state_groups_for_events( - event.room_id, - event.prev_event_ids(), - await_full_state=False, - ) - ) - - if entry.state_group is None: - state_before_event = await entry.get_state( - self.state._state_storage_controller, StateFilter.all() - ) - - if state_before_event: - context = await self.state.calculate_context_info( - event, - state_ids_before_event=state_before_event, - entry=entry, - partial_state=False, - ) - else: - context = await self.state.calculate_context_info( - event, entry=entry - ) - - else: - context = await self.state.calculate_context_info( - event, entry=entry, current_state_group=entry.state_group - ) + context = await self.state.calculate_context_info(event) if requester: context.app_service = requester.app_service @@ -2159,6 +2134,8 @@ async def _rebuild_event_after_third_party_rules( auth_event_ids=None, ) + # we rebuild the event context, to be on the safe side. If nothing else, + # delta_ids might need an update. context = await self.state.calculate_context_info(event) return event, context diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 411a981d8c2a..10cc5fbb19f6 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -43,7 +43,6 @@ EventContext, UnpersistedEventContext, UnpersistedEventContextBase, - UnpersistedEventContextForBatched, ) from synapse.logging.context import ContextResourceUsage from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet @@ -272,33 +271,32 @@ async def calculate_context_info( event: EventBase, state_ids_before_event: Optional[StateMap[str]] = None, partial_state: Optional[bool] = None, - entry: Optional[_StateCacheEntry] = None, current_state_group: Optional[int] = None, ) -> UnpersistedEventContextBase: """ Calulates the contents of an unpersisted event context, other than the current state group (which is either provided or calculated when the event context is persisted) - state_ids_before_event: The event ids of the full state before the event if - it can't be calculated from existing events. This is normally - only specified when receiving an event from federation where we - don't have the prev events, e.g. when backfilling or when the event - is being created for batch persisting. - partial_state: - `True` if `state_ids_before_event` is partial and omits non-critical - membership events. - `False` if `state_ids_before_event` is the full state. - `None` when `state_ids_before_event` is not provided. In this case, the - flag will be calculated based on `event`'s prev events. - entry: - A state cache entry for the resolved state across the prev events. We may - have already calculated this, so if it's available pass it in - Returns: - The event context. - - Raises: - RuntimeError if `state_ids_before_event` is not provided and one or more - prev events are missing or outliers. + state_ids_before_event: + The event ids of the full state before the event if + it can't be calculated from existing events. This is normally + only specified when receiving an event from federation where we + don't have the prev events, e.g. when backfilling or when the event + is being created for batch persisting. + partial_state: + `True` if `state_ids_before_event` is partial and omits non-critical + membership events. + `False` if `state_ids_before_event` is the full state. + `None` when `state_ids_before_event` is not provided. In this case, the + flag will be calculated based on `event`'s prev events. + current_state_group: + the current state group at the time of event, if known + Returns: + The event context. + + Raises: + RuntimeError if `state_ids_before_event` is not provided and one or more + prev events are missing or outliers. """ assert not event.internal_metadata.is_outlier() @@ -313,15 +311,7 @@ async def calculate_context_info( # .. though we need to get a state group for it if we don't have it if not current_state_group: - state_group_before_event = ( - await self._state_storage_controller.store_state_group( - event.event_id, - event.room_id, - prev_group=None, - delta_ids=None, - current_state_ids=state_ids_before_event, - ) - ) + state_group_before_event = None else: state_group_before_event = current_state_group @@ -354,12 +344,11 @@ async def calculate_context_info( # we've already taken into account partial state, so no need to wait for # complete state here. - if not entry: - entry = await self.resolve_state_groups_for_events( - event.room_id, - event.prev_event_ids(), - await_full_state=False, - ) + entry = await self.resolve_state_groups_for_events( + event.room_id, + event.prev_event_ids(), + await_full_state=False, + ) state_group_before_event_prev_group = entry.prev_group deltas_to_state_group_before_event = entry.delta_ids @@ -393,28 +382,16 @@ async def calculate_context_info( # if not event.is_state(): - if state_ids_before_event: - return UnpersistedEventContextForBatched( - storage_controller=self._storage_controllers, - state_group_before_event=state_group_before_event, - state_group=state_group_before_event, - state_delta_due_to_event={}, - prev_group=state_group_before_event_prev_group, - delta_ids=deltas_to_state_group_before_event, - partial_state=partial_state, - state_map=state_ids_before_event, - ) - - else: - return UnpersistedEventContext( - storage_controller=self._storage_controllers, - state_group_before_event=state_group_before_event, - state_group=state_group_before_event, - state_delta_due_to_event={}, - prev_group=state_group_before_event_prev_group, - delta_ids=deltas_to_state_group_before_event, - partial_state=partial_state, - ) + return UnpersistedEventContext( + storage=self._storage_controllers, + state_group_before_event=state_group_before_event, + state_group=state_group_before_event, + state_delta_due_to_event={}, + prev_group=state_group_before_event_prev_group, + delta_ids=deltas_to_state_group_before_event, + partial_state=partial_state, + state_map=state_ids_before_event, + ) # # otherwise, we'll need to create a new state group for after the event @@ -425,7 +402,6 @@ async def calculate_context_info( if state_ids_before_event is not None: replaces = state_ids_before_event.get(key) else: - assert entry is not None replaces_state_map = await entry.get_state( self._state_storage_controller, StateFilter.from_types([key]) ) @@ -436,36 +412,22 @@ async def calculate_context_info( delta_ids = {key: event.event_id} - if state_ids_before_event: - return UnpersistedEventContextForBatched( - storage_controller=self._storage_controllers, - state_group_before_event=state_group_before_event, - state_group=None, - state_delta_due_to_event=delta_ids, - prev_group=state_group_before_event, - delta_ids=delta_ids, - partial_state=partial_state, - state_map=state_ids_before_event, - ) - - else: - return UnpersistedEventContext( - storage_controller=self._storage_controllers, - state_group_before_event=state_group_before_event, - state_group=None, - state_delta_due_to_event=delta_ids, - prev_group=state_group_before_event, - delta_ids=delta_ids, - partial_state=partial_state, - ) + return UnpersistedEventContext( + storage=self._storage_controllers, + state_group_before_event=state_group_before_event, + state_group=None, + state_delta_due_to_event=delta_ids, + prev_group=state_group_before_event, + delta_ids=delta_ids, + partial_state=partial_state, + state_map=state_ids_before_event, + ) async def compute_event_context( self, event: EventBase, state_ids_before_event: Optional[StateMap[str]] = None, partial_state: Optional[bool] = None, - entry: Optional[_StateCacheEntry] = None, - current_state_group: Optional[int] = None, ) -> EventContext: """Build an EventContext structure for a non-outlier event. @@ -501,8 +463,6 @@ async def compute_event_context( event=event, state_ids_before_event=state_ids_before_event, partial_state=partial_state, - entry=entry, - current_state_group=current_state_group, ) return await unpersisted_context.persist(event) From 345d7462640a9daf7c71d12535b08cb9499a36ae Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Wed, 11 Jan 2023 20:47:36 -0800 Subject: [PATCH 19/25] lint --- synapse/events/snapshot.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index a4d0bdd3de7b..b108c42ba188 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -298,6 +298,7 @@ async def get_prev_state_ids( self.state_group_before_event, state_filter ) + @attr.s(slots=True, auto_attribs=True) class UnpersistedEventContext(UnpersistedEventContextBase): """ From cdce9a22de3da1a0162fb37292b61ba759395cd1 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Thu, 19 Jan 2023 19:34:36 -0800 Subject: [PATCH 20/25] requested changes --- synapse/events/snapshot.py | 36 +++++++++++++--------------- synapse/events/third_party_rules.py | 6 ++--- synapse/handlers/federation_event.py | 1 - synapse/state/__init__.py | 11 +++------ 4 files changed, 23 insertions(+), 31 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index b108c42ba188..ba6848a2aa23 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -17,7 +17,6 @@ import attr from frozendict import frozendict -# from synapse.api.constants import EventTypes from synapse.appservice import ApplicationService from synapse.events import EventBase from synapse.types import JsonDict, StateMap @@ -329,7 +328,7 @@ class UnpersistedEventContext(UnpersistedEventContextBase): partial_state: Whether the event has partial state. - state_map: A map of the state before the event, i.e. the state at `state_group_before_event` + state_map_before_event: A map of the state before the event, i.e. the state at `state_group_before_event` """ _storage: "StorageControllers" @@ -339,7 +338,7 @@ class UnpersistedEventContext(UnpersistedEventContextBase): prev_group: Optional[int] delta_ids: Optional[StateMap[str]] partial_state: Optional[bool] - state_map: Optional[StateMap[str]] = None + state_map_before_event: Optional[StateMap[str]] = None async def get_prev_state_ids( self, state_filter: Optional["StateFilter"] = None @@ -354,8 +353,8 @@ async def get_prev_state_ids( Maps a (type, state_key) to the event ID of the state event matching this tuple. """ - if self.state_map: - return self.state_map + if self.state_map_before_event: + return self.state_map_before_event assert self.state_group_before_event is not None return await self._storage.state.get_state_ids_for_group( @@ -376,17 +375,16 @@ async def persist(self, event: EventBase) -> EventContext: assert self.partial_state is not None # If we have a full set of state for at/before the event but don't have a state # group for that state, we need to get one - if self.state_map: - if self.state_group_before_event is None: - state_group_before_event = await self._storage.state.store_state_group( - event.event_id, - event.room_id, - prev_group=None, - delta_ids=None, - current_state_ids=self.state_map, - ) - self.state_group_before_event = state_group_before_event - self.prev_group = state_group_before_event + if self.state_map_before_event and self.state_group_before_event is None: + state_group_before_event = await self._storage.state.store_state_group( + event.event_id, + event.room_id, + prev_group=self.prev_group, + delta_ids=self.delta_ids, + current_state_ids=self.state_map_before_event, + ) + self.state_group_before_event = state_group_before_event + self.prev_group = state_group_before_event if not event.is_state(): return EventContext.with_state( storage=self._storage, @@ -404,9 +402,9 @@ async def persist(self, event: EventBase) -> EventContext: state_group_after_event = await self._storage.state.store_state_group( event.event_id, event.room_id, - prev_group=self.prev_group, - delta_ids=self.delta_ids, - current_state_ids=self.state_map, + prev_group=self.state_group_before_event, + delta_ids=self.state_delta_due_to_event, + current_state_ids=self.state_map_before_event, ) else: state_group_after_event = await self._storage.state.store_state_group( diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py index 252985c98bf0..97c61cc2586b 100644 --- a/synapse/events/third_party_rules.py +++ b/synapse/events/third_party_rules.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Optional, Tuple from twisted.internet.defer import CancelledError from synapse.api.errors import ModuleFailedException, SynapseError from synapse.events import EventBase -from synapse.events.snapshot import EventContext, UnpersistedEventContextBase +from synapse.events.snapshot import UnpersistedEventContextBase from synapse.storage.roommember import ProfileInfo from synapse.types import Requester, StateMap from synapse.util.async_helpers import delay_cancellation, maybe_awaitable @@ -233,7 +233,7 @@ def register_third_party_rules_callbacks( async def check_event_allowed( self, event: EventBase, - context: Union[EventContext, UnpersistedEventContextBase], + context: UnpersistedEventContextBase, ) -> Tuple[bool, Optional[dict]]: """Check if a provided event should be allowed in the given context. diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 1de23745e64f..9fa5167696fc 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -27,7 +27,6 @@ Sequence, Set, Tuple, - Union, ) from prometheus_client import Counter, Histogram diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 10cc5fbb19f6..f15d54aa8a33 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -308,12 +308,7 @@ async def calculate_context_info( # if we're given the state before the event, then we use that state_group_before_event_prev_group = None deltas_to_state_group_before_event = None - - # .. though we need to get a state group for it if we don't have it - if not current_state_group: - state_group_before_event = None - else: - state_group_before_event = current_state_group + state_group_before_event = current_state_group # the partial_state flag must be provided assert partial_state is not None @@ -390,7 +385,7 @@ async def calculate_context_info( prev_group=state_group_before_event_prev_group, delta_ids=deltas_to_state_group_before_event, partial_state=partial_state, - state_map=state_ids_before_event, + state_map_before_event=state_ids_before_event, ) # @@ -420,7 +415,7 @@ async def calculate_context_info( prev_group=state_group_before_event, delta_ids=delta_ids, partial_state=partial_state, - state_map=state_ids_before_event, + state_map_before_event=state_ids_before_event, ) async def compute_event_context( From b69a87ca658d1c49ad0f2ee7ff805c27f2c952b4 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Mon, 23 Jan 2023 12:46:48 -0800 Subject: [PATCH 21/25] requested changes --- synapse/events/snapshot.py | 73 ++++++++++++++++++------------------- synapse/handlers/message.py | 2 +- synapse/state/__init__.py | 5 +-- 3 files changed, 39 insertions(+), 41 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index ba6848a2aa23..60d30cc71684 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -307,10 +307,10 @@ class UnpersistedEventContext(UnpersistedEventContextBase): UnpersistedEventContext must be converted into an EventContext by calling the method 'persist' on it before it is suitable to be sent to the DB for processing. - state_group_before_event: - the state group at/before the event. This is required if `for_batch` is True. - state_group: - this will always be None until it is persisted + state_group_after_event: + this will always be None until it is persisted. The state group after the + event. If the event is not a state event, this will be the same as + state_group_before_event. state_group_before_event: The ID of the state group representing the state of the room before this event. @@ -318,17 +318,18 @@ class UnpersistedEventContext(UnpersistedEventContextBase): state_delta_due_to_event: If the event is a state event, then this is the delta of the state between `state_group` and `state_group_before_event` - prev_group: If it is known, ``state_group``'s prev_group. + prev_group_to_state_group_after_event: If it is known, ``state_group_after_event``'s prev_group. If the event is a state event, this is normally the same as ``state_group_before_event``. - delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group`` - and ``state_group``. + delta_ids_to_state_group_before_event: If ``prev_group`` is not None, the state delta between ``prev_group`` + and ``state_group_after_event``. partial_state: Whether the event has partial state. - state_map_before_event: A map of the state before the event, i.e. the state at `state_group_before_event` + state_map_before_event: A map of the state before the event, i.e. the state at + `state_group_before_event` """ _storage: "StorageControllers" @@ -373,9 +374,11 @@ async def persist(self, event: EventBase) -> EventContext: for persisting """ assert self.partial_state is not None + # If we have a full set of state for at/before the event but don't have a state # group for that state, we need to get one - if self.state_map_before_event and self.state_group_before_event is None: + if self.state_group_before_event is None: + assert self.state_map_before_event state_group_before_event = await self._storage.state.store_state_group( event.event_id, event.room_id, @@ -384,46 +387,42 @@ async def persist(self, event: EventBase) -> EventContext: current_state_ids=self.state_map_before_event, ) self.state_group_before_event = state_group_before_event - self.prev_group = state_group_before_event - if not event.is_state(): - return EventContext.with_state( - storage=self._storage, - state_group=self.state_group_before_event, - state_group_before_event=self.state_group_before_event, - state_delta_due_to_event=self.state_delta_due_to_event, - partial_state=self.partial_state, - prev_group=self.prev_group, - delta_ids=self.delta_ids, - ) + + # if the event isn't a state event the state group doesn't change + if not self.state_delta_due_to_event: + state_group_after_event = self.state_group_before_event + # if it is a state event we need to get a state group for it, either we have the + # full current state or we have a prev group and delta ids else: - if not self.prev_group and not self.delta_ids: - # if we don't have a prev_group and the delta from the prev_group we need - # to store the state group with the full state + if self.prev_group is not None: state_group_after_event = await self._storage.state.store_state_group( event.event_id, event.room_id, - prev_group=self.state_group_before_event, - delta_ids=self.state_delta_due_to_event, - current_state_ids=self.state_map_before_event, + prev_group=self.prev_group, + delta_ids=self.delta_ids, + current_state_ids=None, ) else: + assert self.state_map_before_event is not None + self.state_map_before_event.update(self.state_delta_due_to_event) state_group_after_event = await self._storage.state.store_state_group( event.event_id, event.room_id, prev_group=self.prev_group, delta_ids=self.delta_ids, - current_state_ids=None, + current_state_ids=self.state_map_before_event, ) - - return EventContext.with_state( - storage=self._storage, - state_group=state_group_after_event, - state_group_before_event=self.state_group_before_event, - state_delta_due_to_event=self.state_delta_due_to_event, - partial_state=self.partial_state, - prev_group=self.prev_group, - delta_ids=self.delta_ids, - ) + self.prev_group = self.state_group_before_event + + return EventContext.with_state( + storage=self._storage, + state_group=state_group_after_event, + state_group_before_event=self.state_group_before_event, + state_delta_due_to_event=self.state_delta_due_to_event, + partial_state=self.partial_state, + prev_group=self.prev_group, + delta_ids=self.delta_ids, + ) def _encode_state_dict( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e2c493483190..88f8fc56f20c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1201,7 +1201,7 @@ async def create_new_client_event( event, state_ids_before_event=state_map, partial_state=False, - current_state_group=current_state_group, + state_group_before_event=current_state_group, ) ) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index f15d54aa8a33..bb960f0bdc39 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -271,7 +271,7 @@ async def calculate_context_info( event: EventBase, state_ids_before_event: Optional[StateMap[str]] = None, partial_state: Optional[bool] = None, - current_state_group: Optional[int] = None, + state_group_before_event: Optional[int] = None, ) -> UnpersistedEventContextBase: """ Calulates the contents of an unpersisted event context, other than the current @@ -289,7 +289,7 @@ async def calculate_context_info( `False` if `state_ids_before_event` is the full state. `None` when `state_ids_before_event` is not provided. In this case, the flag will be calculated based on `event`'s prev events. - current_state_group: + state_group_before_event: the current state group at the time of event, if known Returns: The event context. @@ -308,7 +308,6 @@ async def calculate_context_info( # if we're given the state before the event, then we use that state_group_before_event_prev_group = None deltas_to_state_group_before_event = None - state_group_before_event = current_state_group # the partial_state flag must be provided assert partial_state is not None From 8830c94a02c9580a007b2d7d671fbdd5b238d0fb Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Wed, 25 Jan 2023 16:51:04 -0800 Subject: [PATCH 22/25] refactor to be stupidly explicit --- synapse/events/snapshot.py | 68 +++++++++++++++++++++++--------------- synapse/state/__init__.py | 12 +++---- 2 files changed, 47 insertions(+), 33 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 60d30cc71684..f339883d62db 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -301,6 +301,12 @@ async def get_prev_state_ids( @attr.s(slots=True, auto_attribs=True) class UnpersistedEventContext(UnpersistedEventContextBase): """ + The event context holds information about the state groups for an event. It is important + to remember that an event technically has two state groups: the state group before the + event, and the state group after the event. If the event is not a state event, the state + group will not change (ie the state group before the event will be the same as the state + group after the event), but if it is a state event the state group before the event + will differ from the state group after the event. This is a version of an EventContext before the new state group (if any) has been computed and stored. It contains information about the state before the event (which also may be the information after the event, if the event is not a state event). The @@ -308,36 +314,40 @@ class UnpersistedEventContext(UnpersistedEventContextBase): 'persist' on it before it is suitable to be sent to the DB for processing. state_group_after_event: - this will always be None until it is persisted. The state group after the - event. If the event is not a state event, this will be the same as + The state group after the event. This will always be None until it is persisted. + If the event is not a state event, this will be the same as state_group_before_event. - state_group_before_event: The ID of the state group representing the state - of the room before this event. + state_group_before_event: + The ID of the state group representing the state of the room before this event. - state_delta_due_to_event: If the event is a state event, then this is the - delta of the state between `state_group` and `state_group_before_event` + state_delta_due_to_event: + If the event is a state event, then this is the delta of the state between + `state_group` and `state_group_before_event` - prev_group_to_state_group_after_event: If it is known, ``state_group_after_event``'s prev_group. + prev_group_for_state_group_after_event: + If it is known, ``state_group_after_event``'s previous state group. If the event is a state event, this is normally the same as ``state_group_before_event``. - delta_ids_to_state_group_before_event: If ``prev_group`` is not None, the state delta between ``prev_group`` - and ``state_group_after_event``. + delta_ids_to_state_group_after_event: + If ``prev_group_for_state_group_after_event`` is not None, the state delta + between ``prev_group_for_state_group_after_event`` and ``state_group_after_event``. - partial_state: Whether the event has partial state. + partial_state: + Whether the event has partial state. - state_map_before_event: A map of the state before the event, i.e. the state at - `state_group_before_event` + state_map_before_event: + A map of the state before the event, i.e. the state at `state_group_before_event` """ _storage: "StorageControllers" state_group_before_event: Optional[int] - state_group: Optional[int] + state_group_after_event: Optional[int] state_delta_due_to_event: Optional[dict] - prev_group: Optional[int] - delta_ids: Optional[StateMap[str]] + prev_group_for_state_group_after_event: Optional[int] + delta_ids_to_state_group_after_event: Optional[StateMap[str]] partial_state: Optional[bool] state_map_before_event: Optional[StateMap[str]] = None @@ -382,8 +392,8 @@ async def persist(self, event: EventBase) -> EventContext: state_group_before_event = await self._storage.state.store_state_group( event.event_id, event.room_id, - prev_group=self.prev_group, - delta_ids=self.delta_ids, + prev_group=self.prev_group_for_state_group_after_event, + delta_ids=self.delta_ids_to_state_group_after_event, current_state_ids=self.state_map_before_event, ) self.state_group_before_event = state_group_before_event @@ -394,25 +404,29 @@ async def persist(self, event: EventBase) -> EventContext: # if it is a state event we need to get a state group for it, either we have the # full current state or we have a prev group and delta ids else: - if self.prev_group is not None: + if self.prev_group_for_state_group_after_event is not None: state_group_after_event = await self._storage.state.store_state_group( event.event_id, event.room_id, - prev_group=self.prev_group, - delta_ids=self.delta_ids, + prev_group=self.prev_group_for_state_group_after_event, + delta_ids=self.delta_ids_to_state_group_after_event, current_state_ids=None, ) else: assert self.state_map_before_event is not None - self.state_map_before_event.update(self.state_delta_due_to_event) + state = dict(self.state_map_before_event) + state.update(self.state_delta_due_to_event) + state_group_after_event = await self._storage.state.store_state_group( event.event_id, event.room_id, - prev_group=self.prev_group, - delta_ids=self.delta_ids, - current_state_ids=self.state_map_before_event, + prev_group=self.prev_group_for_state_group_after_event, + delta_ids=self.delta_ids_to_state_group_after_event, + current_state_ids=state, + ) + self.prev_group_for_state_group_after_event = ( + self.state_group_before_event ) - self.prev_group = self.state_group_before_event return EventContext.with_state( storage=self._storage, @@ -420,8 +434,8 @@ async def persist(self, event: EventBase) -> EventContext: state_group_before_event=self.state_group_before_event, state_delta_due_to_event=self.state_delta_due_to_event, partial_state=self.partial_state, - prev_group=self.prev_group, - delta_ids=self.delta_ids, + prev_group=self.prev_group_for_state_group_after_event, + delta_ids=self.delta_ids_to_state_group_after_event, ) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index bb960f0bdc39..fc4fbe1b481b 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -379,10 +379,10 @@ async def calculate_context_info( return UnpersistedEventContext( storage=self._storage_controllers, state_group_before_event=state_group_before_event, - state_group=state_group_before_event, + state_group_after_event=state_group_before_event, state_delta_due_to_event={}, - prev_group=state_group_before_event_prev_group, - delta_ids=deltas_to_state_group_before_event, + prev_group_for_state_group_after_event=state_group_before_event_prev_group, + delta_ids_to_state_group_after_event=deltas_to_state_group_before_event, partial_state=partial_state, state_map_before_event=state_ids_before_event, ) @@ -409,10 +409,10 @@ async def calculate_context_info( return UnpersistedEventContext( storage=self._storage_controllers, state_group_before_event=state_group_before_event, - state_group=None, + state_group_after_event=None, state_delta_due_to_event=delta_ids, - prev_group=state_group_before_event, - delta_ids=delta_ids, + prev_group_for_state_group_after_event=state_group_before_event, + delta_ids_to_state_group_after_event=delta_ids, partial_state=partial_state, state_map_before_event=state_ids_before_event, ) From 3614ed05a829b12470a257477400bd052d3828ec Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Wed, 8 Feb 2023 10:21:17 -0800 Subject: [PATCH 23/25] clearer renaming and flow --- synapse/events/snapshot.py | 48 +++++++++++++------------------------- synapse/state/__init__.py | 10 ++++---- 2 files changed, 21 insertions(+), 37 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f339883d62db..0343e226d684 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -346,8 +346,8 @@ class UnpersistedEventContext(UnpersistedEventContextBase): state_group_before_event: Optional[int] state_group_after_event: Optional[int] state_delta_due_to_event: Optional[dict] - prev_group_for_state_group_after_event: Optional[int] - delta_ids_to_state_group_after_event: Optional[StateMap[str]] + prev_group_for_state_group_before_event: Optional[int] + delta_ids_to_state_group_before_event: Optional[StateMap[str]] partial_state: Optional[bool] state_map_before_event: Optional[StateMap[str]] = None @@ -385,15 +385,15 @@ async def persist(self, event: EventBase) -> EventContext: """ assert self.partial_state is not None - # If we have a full set of state for at/before the event but don't have a state + # If we have a full set of state for before the event but don't have a state # group for that state, we need to get one if self.state_group_before_event is None: assert self.state_map_before_event state_group_before_event = await self._storage.state.store_state_group( event.event_id, event.room_id, - prev_group=self.prev_group_for_state_group_after_event, - delta_ids=self.delta_ids_to_state_group_after_event, + prev_group=self.prev_group_for_state_group_before_event, + delta_ids=self.delta_ids_to_state_group_before_event, current_state_ids=self.state_map_before_event, ) self.state_group_before_event = state_group_before_event @@ -401,32 +401,16 @@ async def persist(self, event: EventBase) -> EventContext: # if the event isn't a state event the state group doesn't change if not self.state_delta_due_to_event: state_group_after_event = self.state_group_before_event - # if it is a state event we need to get a state group for it, either we have the - # full current state or we have a prev group and delta ids + + # otherwise if it is a state event we need to get a state group for it else: - if self.prev_group_for_state_group_after_event is not None: - state_group_after_event = await self._storage.state.store_state_group( - event.event_id, - event.room_id, - prev_group=self.prev_group_for_state_group_after_event, - delta_ids=self.delta_ids_to_state_group_after_event, - current_state_ids=None, - ) - else: - assert self.state_map_before_event is not None - state = dict(self.state_map_before_event) - state.update(self.state_delta_due_to_event) - - state_group_after_event = await self._storage.state.store_state_group( - event.event_id, - event.room_id, - prev_group=self.prev_group_for_state_group_after_event, - delta_ids=self.delta_ids_to_state_group_after_event, - current_state_ids=state, - ) - self.prev_group_for_state_group_after_event = ( - self.state_group_before_event - ) + state_group_after_event = await self._storage.state.store_state_group( + event.event_id, + event.room_id, + prev_group=self.state_group_before_event, + delta_ids=self.state_delta_due_to_event, + current_state_ids=None, + ) return EventContext.with_state( storage=self._storage, @@ -434,8 +418,8 @@ async def persist(self, event: EventBase) -> EventContext: state_group_before_event=self.state_group_before_event, state_delta_due_to_event=self.state_delta_due_to_event, partial_state=self.partial_state, - prev_group=self.prev_group_for_state_group_after_event, - delta_ids=self.delta_ids_to_state_group_after_event, + prev_group=self.state_group_before_event, + delta_ids=self.state_delta_due_to_event, ) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index fc4fbe1b481b..e877e6f1a12b 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -381,14 +381,14 @@ async def calculate_context_info( state_group_before_event=state_group_before_event, state_group_after_event=state_group_before_event, state_delta_due_to_event={}, - prev_group_for_state_group_after_event=state_group_before_event_prev_group, - delta_ids_to_state_group_after_event=deltas_to_state_group_before_event, + prev_group_for_state_group_before_event=state_group_before_event_prev_group, + delta_ids_to_state_group_before_event=deltas_to_state_group_before_event, partial_state=partial_state, state_map_before_event=state_ids_before_event, ) # - # otherwise, we'll need to create a new state group for after the event + # otherwise, we'll need to set up creating a new state group for after the event # key = (event.type, event.state_key) @@ -411,8 +411,8 @@ async def calculate_context_info( state_group_before_event=state_group_before_event, state_group_after_event=None, state_delta_due_to_event=delta_ids, - prev_group_for_state_group_after_event=state_group_before_event, - delta_ids_to_state_group_after_event=delta_ids, + prev_group_for_state_group_before_event=state_group_before_event_prev_group, + delta_ids_to_state_group_before_event=deltas_to_state_group_before_event, partial_state=partial_state, state_map_before_event=state_ids_before_event, ) From 427d0078f56dedcf773e668525fdfe4ad8dc4ad3 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Wed, 8 Feb 2023 13:24:17 -0800 Subject: [PATCH 24/25] make partial state non-optional --- synapse/events/snapshot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 0343e226d684..683d647cc7ef 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -348,7 +348,7 @@ class UnpersistedEventContext(UnpersistedEventContextBase): state_delta_due_to_event: Optional[dict] prev_group_for_state_group_before_event: Optional[int] delta_ids_to_state_group_before_event: Optional[StateMap[str]] - partial_state: Optional[bool] + partial_state: bool state_map_before_event: Optional[StateMap[str]] = None async def get_prev_state_ids( From c3bdb6ee0b7b5982973b423c1977433dca651f4f Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Thu, 9 Feb 2023 11:11:03 -0800 Subject: [PATCH 25/25] update docstrings --- synapse/events/snapshot.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 683d647cc7ef..e0d82ad81cf9 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -325,15 +325,12 @@ class UnpersistedEventContext(UnpersistedEventContextBase): If the event is a state event, then this is the delta of the state between `state_group` and `state_group_before_event` - prev_group_for_state_group_after_event: - If it is known, ``state_group_after_event``'s previous state group. + prev_group_for_state_group_before_event: + If it is known, ``state_group_before_event``'s previous state group. - If the event is a state event, this is normally the same as - ``state_group_before_event``. - - delta_ids_to_state_group_after_event: - If ``prev_group_for_state_group_after_event`` is not None, the state delta - between ``prev_group_for_state_group_after_event`` and ``state_group_after_event``. + delta_ids_to_state_group_before_event: + If ``prev_group_for_state_group_before_event`` is not None, the state delta + between ``prev_group_for_state_group_before_event`` and ``state_group_before_event``. partial_state: Whether the event has partial state.