-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Refactor create_new_client_event
to use a new parameter, state_event_ids
, which accurately describes the usage with MSC2716 instead of abusing auth_event_ids
#12083
Changes from 6 commits
93bf1f1
6a7879f
f4fde12
0942f53
ffa58ed
09be642
f3e234e
878bdda
225675b
aa0a109
9f902da
289faf1
cd93056
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Refactor `create_new_client_event` to use a new parameter, `state_event_ids`, which accurately describes the usage with [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) instead of abusing `auth_event_ids`. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -493,6 +493,7 @@ async def create_event( | |
allow_no_prev_events: bool = False, | ||
prev_event_ids: Optional[List[str]] = None, | ||
auth_event_ids: Optional[List[str]] = None, | ||
state_event_ids: Optional[List[str]] = None, | ||
require_consent: bool = True, | ||
outlier: bool = False, | ||
historical: bool = False, | ||
|
@@ -527,6 +528,16 @@ async def create_event( | |
|
||
If non-None, prev_event_ids must also be provided. | ||
|
||
state_event_ids: | ||
The full state at a given event. This is used particularly by the MSC2716 | ||
/batch_send endpoint which shares the same state across the whole batch. | ||
The state events will be stripped down to only what's necessary to auth | ||
a given event and set as the auth_event_ids. For insertion events, we will | ||
add all of these state events as the explicit state so the rest of the | ||
historical batch can inherit the same state and state_group. This should | ||
normally be left as None, which will cause the auth_event_ids to be | ||
calculated based on the room state at the prev_events. | ||
|
||
require_consent: Whether to check if the requester has | ||
consented to the privacy policy. | ||
|
||
|
@@ -612,6 +623,7 @@ async def create_event( | |
allow_no_prev_events=allow_no_prev_events, | ||
prev_event_ids=prev_event_ids, | ||
auth_event_ids=auth_event_ids, | ||
state_event_ids=state_event_ids, | ||
depth=depth, | ||
) | ||
|
||
|
@@ -772,6 +784,7 @@ async def create_and_send_nonmember_event( | |
allow_no_prev_events: bool = False, | ||
prev_event_ids: Optional[List[str]] = None, | ||
auth_event_ids: Optional[List[str]] = None, | ||
state_event_ids: Optional[List[str]] = None, | ||
ratelimit: bool = True, | ||
txn_id: Optional[str] = None, | ||
ignore_shadow_ban: bool = False, | ||
|
@@ -801,6 +814,15 @@ async def create_and_send_nonmember_event( | |
based on the room state at the prev_events. | ||
|
||
If non-None, prev_event_ids must also be provided. | ||
state_event_ids: | ||
The full state at a given event. This is used particularly by the MSC2716 | ||
/batch_send endpoint which shares the same state across the whole batch. | ||
The state events will be stripped down to only what's necessary to auth | ||
a given event and set as the auth_event_ids. For insertion events, we will | ||
add all of these state events as the explicit state so the rest of the | ||
historical batch can inherit the same state and state_group. This should | ||
normally be left as None, which will cause the auth_event_ids to be | ||
calculated based on the room state at the prev_events. | ||
ratelimit: Whether to rate limit this send. | ||
txn_id: The transaction ID. | ||
ignore_shadow_ban: True if shadow-banned users should be allowed to | ||
|
@@ -856,8 +878,10 @@ async def create_and_send_nonmember_event( | |
requester, | ||
event_dict, | ||
txn_id=txn_id, | ||
allow_no_prev_events=allow_no_prev_events, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. was the previous omission of this a bug? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (could we get rid of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes and seems to be why this PR fixes @tulir's issue, #12083 (comment)
I'm leaning to no although technically yes probably. The assertion nature of Also in our case, we only want the first event to have no
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For context: we see a fair few "can't create event with no prev events" errors already, so having an |
||
prev_event_ids=prev_event_ids, | ||
auth_event_ids=auth_event_ids, | ||
state_event_ids=state_event_ids, | ||
outlier=outlier, | ||
historical=historical, | ||
depth=depth, | ||
|
@@ -893,6 +917,7 @@ async def create_new_client_event( | |
allow_no_prev_events: bool = False, | ||
prev_event_ids: Optional[List[str]] = None, | ||
auth_event_ids: Optional[List[str]] = None, | ||
state_event_ids: Optional[List[str]] = None, | ||
depth: Optional[int] = None, | ||
) -> Tuple[EventBase, EventContext]: | ||
"""Create a new event for a local client | ||
|
@@ -915,38 +940,43 @@ async def create_new_client_event( | |
Should normally be left as None, which will cause them to be calculated | ||
based on the room state at the prev_events. | ||
|
||
state_event_ids: | ||
The full state at a given event. This is used particularly by the MSC2716 | ||
/batch_send endpoint which shares the same state across the whole batch. | ||
The state events will be stripped down to only what's necessary to auth | ||
a given event and set as the auth_event_ids. For insertion events, we will | ||
add all of these state events as the explicit state so the rest of the | ||
historical batch can inherit the same state and state_group. This should | ||
normally be left as None, which will cause the auth_event_ids to be | ||
calculated based on the room state at the prev_events. | ||
|
||
depth: Override the depth used to order the event in the DAG. | ||
Should normally be set to None, which will cause the depth to be calculated | ||
based on the prev_events. | ||
|
||
Returns: | ||
Tuple of created event, context | ||
""" | ||
# Strip down the auth_event_ids to only what we need to auth the event. | ||
# Strip down the state_event_ids to only what we need to auth the event. | ||
# For example, we don't need extra m.room.member that don't match event.sender | ||
full_state_ids_at_event = None | ||
if auth_event_ids is not None: | ||
# If auth events are provided, prev events must be also. | ||
if state_event_ids is not None: | ||
# Do a quick check to make sure that prev_event_ids is present to | ||
# make the type-checking around `builder.build` happy. | ||
# prev_event_ids could be an empty array though. | ||
assert prev_event_ids is not None | ||
|
||
# Copy the full auth state before it stripped down | ||
full_state_ids_at_event = auth_event_ids.copy() | ||
|
||
temp_event = await builder.build( | ||
prev_event_ids=prev_event_ids, | ||
auth_event_ids=auth_event_ids, | ||
auth_event_ids=state_event_ids, | ||
depth=depth, | ||
) | ||
auth_events = await self.store.get_events_as_list(auth_event_ids) | ||
state_events = await self.store.get_events_as_list(state_event_ids) | ||
# Create a StateMap[str] | ||
auth_event_state_map = { | ||
(e.type, e.state_key): e.event_id for e in auth_events | ||
} | ||
# Actually strip down and use the necessary auth events | ||
state_map = {(e.type, e.state_key): e.event_id for e in state_events} | ||
# Actually strip down and only use the necessary auth events | ||
auth_event_ids = self._event_auth_handler.compute_auth_events( | ||
event=temp_event, | ||
current_state_ids=auth_event_state_map, | ||
current_state_ids=state_map, | ||
for_verification=False, | ||
) | ||
|
||
|
@@ -989,10 +1019,12 @@ async def create_new_client_event( | |
context = EventContext.for_outlier() | ||
elif ( | ||
event.type == EventTypes.MSC2716_INSERTION | ||
and full_state_ids_at_event | ||
and state_event_ids | ||
and builder.internal_metadata.is_historical() | ||
): | ||
old_state = await self.store.get_events_as_list(full_state_ids_at_event) | ||
# Add explicit state to the insertion event so the rest of the batch | ||
# can inherit the same state and state_group | ||
old_state = await self.store.get_events_as_list(state_event_ids) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added some nice context for why we're doing this that was only a PR review comment previously, #10975 (comment) |
||
context = await self.state.compute_event_context(event, old_state=old_state) | ||
else: | ||
context = await self.state.compute_event_context(event) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -121,12 +121,11 @@ async def create_requester_for_user_id_from_app_service( | |
|
||
return create_requester(user_id, app_service=app_service) | ||
|
||
async def get_most_recent_auth_event_ids_from_event_id_list( | ||
async def get_most_recent_full_state_ids_from_event_id_list( | ||
self, event_ids: List[str] | ||
) -> List[str]: | ||
"""Find the most recent auth event ids (derived from state events) that | ||
allowed that message to be sent. We will use this as a base | ||
to auth our historical messages against. | ||
"""Find the most recent event_id and grab the full state at that event. | ||
We will use this as a base to auth our historical messages against. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Making this comment more accurate |
||
|
||
Args: | ||
event_ids: List of event ID's to look at | ||
|
@@ -136,24 +135,23 @@ async def get_most_recent_auth_event_ids_from_event_id_list( | |
""" | ||
|
||
( | ||
most_recent_prev_event_id, | ||
most_recent_event_id, | ||
_, | ||
) = await self.store.get_max_depth_of(event_ids) | ||
# mapping from (type, state_key) -> state_event_id | ||
prev_state_map = await self.state_store.get_state_ids_for_event( | ||
most_recent_prev_event_id | ||
most_recent_event_id | ||
) | ||
# List of state event ID's | ||
prev_state_ids = list(prev_state_map.values()) | ||
auth_event_ids = prev_state_ids | ||
full_state_ids = list(prev_state_map.values()) | ||
|
||
return auth_event_ids | ||
return full_state_ids | ||
|
||
async def persist_state_events_at_start( | ||
self, | ||
state_events_at_start: List[JsonDict], | ||
room_id: str, | ||
initial_auth_event_ids: List[str], | ||
initial_state_ids_at_event: List[str], | ||
app_service_requester: Requester, | ||
) -> List[str]: | ||
"""Takes all `state_events_at_start` event dictionaries and creates/persists | ||
|
@@ -164,10 +162,13 @@ async def persist_state_events_at_start( | |
Args: | ||
state_events_at_start: | ||
room_id: Room where you want the events persisted in. | ||
initial_auth_event_ids: These will be the auth_events for the first | ||
state event created. Each event created afterwards will be | ||
added to the list of auth events for the next state event | ||
created. | ||
initial_state_ids_at_event: | ||
The base set of state of the historical batch. When persisting each of | ||
the events in state_events_at_start, the state will be stripped down to | ||
only what's necessary to auth the given event and set as the | ||
auth_event_ids. After each state event from state_events_at_start is | ||
persisted, it will be added to the ongoing list of state_event_ids for | ||
the next state event to be persisted with. | ||
app_service_requester: The requester of an application service. | ||
|
||
Returns: | ||
|
@@ -176,7 +177,7 @@ async def persist_state_events_at_start( | |
assert app_service_requester.app_service | ||
|
||
state_event_ids_at_start = [] | ||
auth_event_ids = initial_auth_event_ids.copy() | ||
state_event_ids = initial_state_ids_at_event.copy() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Side note: all these copies will be expensive for rooms with a lot of state, which will affect how quickly we can batch insert history.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure how exactly to side-step this.
And we There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For reference on why we have to define
|
||
|
||
# Make the state events float off on their own by specifying no | ||
# prev_events for the first one in the chain so we don't have a bunch of | ||
|
@@ -189,9 +190,9 @@ async def persist_state_events_at_start( | |
) | ||
|
||
logger.debug( | ||
"RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s", | ||
"RoomBatchSendEventRestServlet inserting state_event=%s, state_event_ids=%s", | ||
state_event, | ||
auth_event_ids, | ||
state_event_ids, | ||
) | ||
|
||
event_dict = { | ||
|
@@ -226,7 +227,7 @@ async def persist_state_events_at_start( | |
# Make sure to use a copy of this list because we modify it | ||
# later in the loop here. Otherwise it will be the same | ||
# reference and also update in the event when we append later. | ||
auth_event_ids=auth_event_ids.copy(), | ||
state_event_ids=state_event_ids.copy(), | ||
) | ||
else: | ||
# TODO: Add some complement tests that adds state that is not member joins | ||
|
@@ -249,12 +250,12 @@ async def persist_state_events_at_start( | |
# Make sure to use a copy of this list because we modify it | ||
# later in the loop here. Otherwise it will be the same | ||
# reference and also update in the event when we append later. | ||
auth_event_ids=auth_event_ids.copy(), | ||
state_event_ids=state_event_ids.copy(), | ||
) | ||
event_id = event.event_id | ||
|
||
state_event_ids_at_start.append(event_id) | ||
auth_event_ids.append(event_id) | ||
state_event_ids.append(event_id) | ||
# Connect all the state in a floating chain | ||
prev_event_ids_for_state_chain = [event_id] | ||
|
||
|
@@ -265,7 +266,7 @@ async def persist_historical_events( | |
events_to_create: List[JsonDict], | ||
room_id: str, | ||
inherited_depth: int, | ||
auth_event_ids: List[str], | ||
state_event_ids: List[str], | ||
app_service_requester: Requester, | ||
) -> List[str]: | ||
"""Create and persists all events provided sequentially. Handles the | ||
|
@@ -281,8 +282,13 @@ async def persist_historical_events( | |
room_id: Room where you want the events persisted in. | ||
inherited_depth: The depth to create the events at (you will | ||
probably by calling inherit_depth_from_prev_ids(...)). | ||
auth_event_ids: Define which events allow you to create the given | ||
event in the room. | ||
state_event_ids: | ||
The full state at a given event. We share the same state across the whole | ||
historical batch. The state events will be stripped down to only what's | ||
necessary to auth a given event and set as the auth_event_ids. For | ||
insertion events, we will add all of these state events as the explicit | ||
state so the rest of the historical batch can inherit the same state and | ||
state_group. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah. We probably don't actually want this behaviour, instead we want to only set the state for the insertion event and allow subsequent events to inherit the state from their prev event. When just sending messages this doesn't matter, but if we send state events in the batch then the current behaviour means that the state event won't become part of the state. For example, if a user joins half way through the batch and they won't be able to send the message (assuming they weren't added as part of I think its trivial to change, that we only set Or am I forgetting how this works yet again? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is what currently is happening. And as far as I can tell, the comment explains it that way too. For non-insertion events, we only use For insertion events, we set the state as
It's not expected to have any state events in the batch of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I really don't know why I get confused about this every time. Ah well. From the doc-comment:
Ok, so I think is what is getting me. I think what you mean is that the state for the historical batch is all derived from the state at the insertion event? My reading of it was "all events in the batch have the same state", which is not true?
Why do we need this for non-insertion events? Can't we just let the code derive it as normal from the state? That way the only special case is handling the insertion event.
This means that the timeline will never see when a user joins/leaves, which may be fine for the Gitter use case, but we probably want to support it more generally than other use cases. I think we get this for free though, if we stop specifying the auth events directly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Ahh, this does work and is a good simplification! Updated comments to be more nuanced here too ⏩ I also tried doing something similar for the Overall, more understanding of why things are the way they are from this prompt to change 👍
The problem is that the events are persisted in reverse-chronological order so they have the correct As an example of why this fails, if we There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Oh yes, nyargh. I think we want to fix that, but not in this PR I guess. My hunch is that we want to try and persist the entire batch of events all in one go, rather than doing them one-by-one in reverse order. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For my own reference, here is a Complement test around adding state in the middle of the Complement test for "State in the middle of the events batch resolves for subsequent events"t.Run("State in the middle of the events batch resolves for subsequent events", func(t *testing.T) {
t.Parallel()
roomID := as.CreateRoom(t, createPublicRoomOpts)
alice.JoinRoom(t, roomID, nil)
// Create the "live" event we are going to import our historical events next to
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()
events := createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 1)
// Add some events from Ricky who joined in the middle of the events batch
virtualUserID2 := "@ricky:hs1"
ensureVirtualUserRegistered(t, as, "ricky")
events = append(
events,
createJoinStateEventsForBatchSendRequest([]string{virtualUserID2}, timeAfterEventBefore)...,
)
events = append(
events,
createMessageEventsForBatchSendRequest([]string{virtualUserID2}, timeAfterEventBefore, 1)...,
)
// Import a historical event
batchSendHistoricalMessages(
t,
as,
roomID,
eventIdBefore,
"",
createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore),
events,
// Status
200,
)
}) |
||
app_service_requester: The requester of an application service. | ||
|
||
Returns: | ||
|
@@ -325,7 +331,7 @@ async def persist_historical_events( | |
# The rest should hang off each other in a chain. | ||
allow_no_prev_events=index == 0, | ||
prev_event_ids=event_dict.get("prev_events"), | ||
auth_event_ids=auth_event_ids, | ||
state_event_ids=state_event_ids, | ||
historical=True, | ||
depth=inherited_depth, | ||
) | ||
|
@@ -343,10 +349,10 @@ async def persist_historical_events( | |
) | ||
|
||
logger.debug( | ||
"RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s", | ||
"RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, state_event_ids=%s", | ||
event, | ||
prev_event_ids, | ||
auth_event_ids, | ||
state_event_ids, | ||
) | ||
|
||
events_to_persist.append((event, context)) | ||
|
@@ -376,7 +382,7 @@ async def handle_batch_of_events( | |
room_id: str, | ||
batch_id_to_connect_to: str, | ||
inherited_depth: int, | ||
auth_event_ids: List[str], | ||
state_event_ids: List[str], | ||
app_service_requester: Requester, | ||
) -> Tuple[List[str], str]: | ||
""" | ||
|
@@ -391,8 +397,13 @@ async def handle_batch_of_events( | |
want this batch to connect to. | ||
inherited_depth: The depth to create the events at (you will | ||
probably by calling inherit_depth_from_prev_ids(...)). | ||
auth_event_ids: Define which events allow you to create the given | ||
event in the room. | ||
state_event_ids: | ||
The full state at a given event. We share the same state across the whole | ||
historical batch. The state events will be stripped down to only what's | ||
necessary to auth a given event and set as the auth_event_ids. For | ||
insertion events, we will add all of these state events as the explicit | ||
state so the rest of the historical batch can inherit the same state and | ||
state_group. | ||
app_service_requester: The requester of an application service. | ||
|
||
Returns: | ||
|
@@ -438,7 +449,7 @@ async def handle_batch_of_events( | |
events_to_create=events_to_create, | ||
room_id=room_id, | ||
inherited_depth=inherited_depth, | ||
auth_event_ids=auth_event_ids, | ||
state_event_ids=state_event_ids, | ||
app_service_requester=app_service_requester, | ||
) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's broken about it?
This PR doesn't fix anything (just refactors a parameter differently)
Here are the Complement tests passing on
v1.53.0
Synapse tag. The only thing broken is Complement itself which is being fixed in matrix-org/complement#324There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to always throw
Happened on 1.53, so I tested this branch and it worked, then I tried plain develop and it failed again
Maybe specific to event creation workers again? Did Complement ever get worker support?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stack trace (I think this was from develop):
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And of course a simple batch send with curl doesn't reproduce it, while the bridge does consistently on every attempt 🙃
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I was able to reproduce it with curl by sending a non-member event in
state_events_at_start
. Happens on develop and 1.53.0, doesn't happen after merging this PR on top of develop. Doesn't happen on 1.52.0Is non-member state events included in complement tests? I've stopped sending them from the bridge now (it only sent a dummy event as a workaround for #11188), but it should probably be tested (or rejected with a proper error, if it's not supposed to be allowed in
state_events_at_start
)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😫 Ayee, that's one area I haven't really used yet.
Relevant code:
synapse/handlers/room_batch.py#L232-L253
Feel free to create a separate issue to track this and will create a fix shortly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well the specific issue seems to go away with this PR. No idea why, but that's why I thought this was the fix. Anyway, I made #12110 for either adding tests or removing the branch.