Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Split out DB writes in federation handler #3621

Merged
merged 5 commits into from
Aug 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3621.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor FederationHandler to move DB writes into separate functions
175 changes: 88 additions & 87 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ def _process_received_pdu(self, origin, pdu, state, auth_chain):
)

try:
event_stream_id, max_stream_id = yield self._persist_auth_tree(
yield self._persist_auth_tree(
origin, auth_chain, state, event
)
except AuthError as e:
Expand Down Expand Up @@ -444,7 +444,7 @@ def _process_received_pdu(self, origin, pdu, state, auth_chain):
yield self._handle_new_events(origin, event_infos)

try:
context, event_stream_id, max_stream_id = yield self._handle_new_event(
context = yield self._handle_new_event(
origin,
event,
state=state,
Expand All @@ -469,17 +469,6 @@ def _process_received_pdu(self, origin, pdu, state, auth_chain):
except StoreError:
logger.exception("Failed to store room.")

extra_users = []
if event.type == EventTypes.Member:
target_user_id = event.state_key
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)

self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)

if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has acutally
Expand All @@ -501,7 +490,7 @@ def _process_received_pdu(self, origin, pdu, state, auth_chain):

if newly_joined:
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
yield self.user_joined_room(user, event.room_id)

@log_function
@defer.inlineCallbacks
Expand Down Expand Up @@ -942,7 +931,7 @@ def do_invite_join(self, target_hosts, room_id, joinee, content):

self.room_queues[room_id] = []

yield self.store.clean_room_for_join(room_id)
yield self._clean_room_for_join(room_id)

handled_events = set()

Expand Down Expand Up @@ -981,15 +970,10 @@ def do_invite_join(self, target_hosts, room_id, joinee, content):
# FIXME
pass

event_stream_id, max_stream_id = yield self._persist_auth_tree(
yield self._persist_auth_tree(
origin, auth_chain, state, event
)

self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=[joinee]
)

logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
Expand Down Expand Up @@ -1084,7 +1068,7 @@ def on_send_join_request(self, origin, pdu):
# would introduce the danger of backwards-compatibility problems.
event.internal_metadata.send_on_behalf_of = origin

context, event_stream_id, max_stream_id = yield self._handle_new_event(
context = yield self._handle_new_event(
origin, event
)

Expand All @@ -1094,20 +1078,10 @@ def on_send_join_request(self, origin, pdu):
event.signatures,
)

extra_users = []
if event.type == EventTypes.Member:
target_user_id = event.state_key
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)

self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
)

if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
yield self.user_joined_room(user, event.room_id)

prev_state_ids = yield context.get_prev_state_ids(self.store)

Expand Down Expand Up @@ -1176,17 +1150,7 @@ def on_invite_request(self, origin, pdu):
)

context = yield self.state_handler.compute_event_context(event)

event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
)

target_user = UserID.from_string(event.state_key)
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=[target_user],
)
yield self._persist_events([(event, context)])

defer.returnValue(event)

Expand Down Expand Up @@ -1217,17 +1181,7 @@ def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
)

context = yield self.state_handler.compute_event_context(event)

event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
)

target_user = UserID.from_string(event.state_key)
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=[target_user],
)
yield self._persist_events([(event, context)])

defer.returnValue(event)

Expand Down Expand Up @@ -1318,7 +1272,7 @@ def on_send_leave_request(self, origin, pdu):

event.internal_metadata.outlier = False

context, event_stream_id, max_stream_id = yield self._handle_new_event(
yield self._handle_new_event(
origin, event
)

Expand All @@ -1328,16 +1282,6 @@ def on_send_leave_request(self, origin, pdu):
event.signatures,
)

extra_users = []
if event.type == EventTypes.Member:
target_user_id = event.state_key
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)

self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
)

defer.returnValue(None)

@defer.inlineCallbacks
Expand Down Expand Up @@ -1472,9 +1416,8 @@ def _handle_new_event(self, origin, event, state=None, auth_events=None,
event, context
)

event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
yield self._persist_events(
[(event, context)],
backfilled=backfilled,
)
except: # noqa: E722, as we reraise the exception this is fine.
Expand All @@ -1487,22 +1430,16 @@ def _handle_new_event(self, origin, event, state=None, auth_events=None,

six.reraise(tp, value, tb)

if not backfilled:
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
logcontext.run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id,
)

defer.returnValue((context, event_stream_id, max_stream_id))
defer.returnValue(context)

@defer.inlineCallbacks
def _handle_new_events(self, origin, event_infos, backfilled=False):
"""Creates the appropriate contexts and persists events. The events
should not depend on one another, e.g. this should be used to persist
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.

Notifies about the events where appropriate.
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
Expand All @@ -1517,7 +1454,7 @@ def _handle_new_events(self, origin, event_infos, backfilled=False):
], consumeErrors=True,
))

yield self.store.persist_events(
yield self._persist_events(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, can you update the docstring to say that we may notify

[
(ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts)
Expand All @@ -1529,7 +1466,8 @@ def _handle_new_events(self, origin, event_infos, backfilled=False):
def _persist_auth_tree(self, origin, auth_events, state, event):
"""Checks the auth chain is valid (and passes auth checks) for the
state and event. Then persists the auth chain and state atomically.
Persists the event seperately.
Persists the event separately. Notifies about the persisted events
where appropriate.

Will attempt to fetch missing auth events.

Expand All @@ -1540,8 +1478,7 @@ def _persist_auth_tree(self, origin, auth_events, state, event):
event (Event)

Returns:
2-tuple of (event_stream_id, max_stream_id) from the persist_event
call for `event`
Deferred
"""
events_to_context = {}
for e in itertools.chain(auth_events, state):
Expand Down Expand Up @@ -1605,7 +1542,7 @@ def _persist_auth_tree(self, origin, auth_events, state, event):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR

yield self.store.persist_events(
yield self._persist_events(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
Expand All @@ -1616,12 +1553,10 @@ def _persist_auth_tree(self, origin, auth_events, state, event):
event, old_state=state
)

event_stream_id, max_stream_id = yield self.store.persist_event(
event, new_event_context,
yield self._persist_events(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm struggling to keep track of which methods do the notification and which do not. Could you update the docstring on persist_auth_tree?

[(event, new_event_context)],
)

defer.returnValue((event_stream_id, max_stream_id))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you update the docstring?


@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
"""
Expand Down Expand Up @@ -2347,3 +2282,69 @@ def _check_key_revocation(self, public_key, url):
)
if "valid" not in response or not response["valid"]:
raise AuthError(403, "Third party certificate was invalid")

@defer.inlineCallbacks
def _persist_events(self, event_and_contexts, backfilled=False):
"""Persists events and tells the notifier/pushers about them, if
necessary.

Args:
event_and_contexts(list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether these events are a result of
backfilling or not

Returns:
Deferred
"""
max_stream_id = yield self.store.persist_events(
event_and_contexts,
backfilled=backfilled,
)

if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)

def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
event or not.

Args:
event (FrozenEvent)
max_stream_id (int): The max_stream_id returned by persist_events
"""

extra_users = []
if event.type == EventTypes.Member:
target_user_id = event.state_key

# We notify for memberships if its an invite for one of our
# users
if event.internal_metadata.is_outlier():
if event.membership != Membership.INVITE:
if not self.is_mine_id(target_user_id):
return

target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
elif event.internal_metadata.is_outlier():
return

event_stream_id = event.internal_metadata.stream_ordering
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)

logcontext.run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id,
)

def _clean_room_for_join(self, room_id):
return self.store.clean_room_for_join(room_id)

def user_joined_room(self, user, room_id):
"""Called when a new user has joined the room
"""
return user_joined_room(self.distributor, user, room_id)
14 changes: 12 additions & 2 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,18 @@ def __init__(self, db_conn, hs):

self._state_resolution_handler = hs.get_state_resolution_handler()

@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
"""
Write events to the database
Args:
events_and_contexts: list of tuples of (event, context)
backfilled: ?
backfilled (bool): Whether the results are retrieved from federation
via backfill or not. Used to determine if they're "new" events
which might update the current state etc.

Returns:
Deferred[int]: the stream ordering of the latest persisted event
"""
partitioned = {}
for event, ctx in events_and_contexts:
Expand All @@ -253,10 +259,14 @@ def persist_events(self, events_and_contexts, backfilled=False):
for room_id in partitioned:
self._maybe_start_persisting(room_id)

return make_deferred_yieldable(
yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)

max_persisted_id = yield self._stream_id_gen.get_current_token()

defer.returnValue(max_persisted_id)

@defer.inlineCallbacks
@log_function
def persist_event(self, event, context, backfilled=False):
Expand Down