Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #4799 from matrix-org/rav/clean_up_replication_code
Browse files Browse the repository at this point in the history
Clean ups in replication notifier
  • Loading branch information
erikjohnston authored Mar 5, 2019
2 parents c3c542b + eaa9f43 commit 9e8bca5
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 60 deletions.
1 change: 1 addition & 0 deletions changelog.d/4799.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Clean up some replication code.
65 changes: 5 additions & 60 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,6 @@ def __init__(self, hs):
self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
)

self.replication_deferred = ObservableDeferred(defer.Deferred())

# This is not a very cheap test to perform, but it's only executed
# when rendering the metrics page, which is likely once per minute at
# most when scraping it.
Expand All @@ -205,7 +203,9 @@ def count_listeners():

def add_replication_callback(self, cb):
"""Add a callback that will be called when some new data is available.
Callback is not given any arguments.
Callback is not given any arguments. It should *not* return a Deferred - if
it needs to do any asynchronous work, a background thread should be started and
wrapped with run_as_background_process.
"""
self.replication_callbacks.append(cb)

Expand Down Expand Up @@ -517,60 +517,5 @@ def _user_joined_room(self, user_id, room_id):

def notify_replication(self):
"""Notify the any replication listeners that there's a new event"""
with PreserveLoggingContext():
deferred = self.replication_deferred
self.replication_deferred = ObservableDeferred(defer.Deferred())
deferred.callback(None)

# the callbacks may well outlast the current request, so we run
# them in the sentinel logcontext.
#
# (ideally it would be up to the callbacks to know if they were
# starting off background processes and drop the logcontext
# accordingly, but that requires more changes)
for cb in self.replication_callbacks:
cb()

@defer.inlineCallbacks
def wait_for_replication(self, callback, timeout):
"""Wait for an event to happen.
Args:
callback: Gets called whenever an event happens. If this returns a
truthy value then ``wait_for_replication`` returns, otherwise
it waits for another event.
timeout: How many milliseconds to wait for callback return a truthy
value.
Returns:
A deferred that resolves with the value returned by the callback.
"""
listener = _NotificationListener(None)

end_time = self.clock.time_msec() + timeout

while True:
listener.deferred = self.replication_deferred.observe()
result = yield callback()
if result:
break

now = self.clock.time_msec()
if end_time <= now:
break

listener.deferred = timeout_deferred(
listener.deferred,
timeout=(end_time - now) / 1000.,
reactor=self.hs.get_reactor(),
)

try:
with PreserveLoggingContext():
yield listener.deferred
except defer.TimeoutError:
break
except defer.CancelledError:
break

defer.returnValue(result)
for cb in self.replication_callbacks:
cb()

0 comments on commit 9e8bca5

Please sign in to comment.