Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 8dd3e0e

Browse files
authored
Immediately retry any requests that have backed off when a server comes back online. (#12500)
Otherwise it can take up to a minute for any in-flight `/send` requests to be retried.
1 parent ade3008 commit 8dd3e0e

File tree

6 files changed

+179
-6
lines changed

6 files changed

+179
-6
lines changed

changelog.d/12500.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Immediately retry any requests that have backed off when a server comes back online.

synapse/http/matrixfederationclient.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
from synapse.logging.opentracing import set_tag, start_active_span, tags
7474
from synapse.types import JsonDict
7575
from synapse.util import json_decoder
76-
from synapse.util.async_helpers import timeout_deferred
76+
from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
7777
from synapse.util.metrics import Measure
7878

7979
if TYPE_CHECKING:
@@ -353,6 +353,13 @@ def schedule(x):
353353

354354
self._cooperator = Cooperator(scheduler=schedule)
355355

356+
self._sleeper = AwakenableSleeper(self.reactor)
357+
358+
def wake_destination(self, destination: str) -> None:
359+
"""Called when the remote server may have come back online."""
360+
361+
self._sleeper.wake(destination)
362+
356363
async def _send_request_with_optional_trailing_slash(
357364
self,
358365
request: MatrixFederationRequest,
@@ -474,6 +481,8 @@ async def _send_request(
474481
self._store,
475482
backoff_on_404=backoff_on_404,
476483
ignore_backoff=ignore_backoff,
484+
notifier=self.hs.get_notifier(),
485+
replication_client=self.hs.get_replication_command_handler(),
477486
)
478487

479488
method_bytes = request.method.encode("ascii")
@@ -664,7 +673,9 @@ async def _send_request(
664673
delay,
665674
)
666675

667-
await self.clock.sleep(delay)
676+
# Sleep for the calculated delay, or wake up immediately
677+
# if we get notified that the server is back up.
678+
await self._sleeper.sleep(request.destination, delay * 1000)
668679
retries_left -= 1
669680
else:
670681
raise

synapse/notifier.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,7 @@ def __init__(self, hs: "HomeServer"):
228228
# Called when there are new things to stream over replication
229229
self.replication_callbacks: List[Callable[[], None]] = []
230230

231-
# Called when remote servers have come back online after having been
232-
# down.
233-
self.remote_server_up_callbacks: List[Callable[[str], None]] = []
231+
self._federation_client = hs.get_federation_http_client()
234232

235233
self._third_party_rules = hs.get_third_party_event_rules()
236234

@@ -731,3 +729,7 @@ def notify_remote_server_up(self, server: str) -> None:
731729
# circular dependencies.
732730
if self.federation_sender:
733731
self.federation_sender.wake_destination(server)
732+
733+
# Tell the federation client about the fact the server is back up, so
734+
# that any in flight requests can be immediately retried.
735+
self._federation_client.wake_destination(server)

synapse/util/async_helpers.py

+57
Original file line numberDiff line numberDiff line change
@@ -778,3 +778,60 @@ def handle_cancel(new_deferred: "defer.Deferred[T]") -> None:
778778
new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel)
779779
deferred.chainDeferred(new_deferred)
780780
return new_deferred
781+
782+
783+
class AwakenableSleeper:
784+
"""Allows explicitly waking up deferreds related to an entity that are
785+
currently sleeping.
786+
"""
787+
788+
def __init__(self, reactor: IReactorTime) -> None:
789+
self._streams: Dict[str, Set[defer.Deferred[None]]] = {}
790+
self._reactor = reactor
791+
792+
def wake(self, name: str) -> None:
793+
"""Wake everything related to `name` that is currently sleeping."""
794+
stream_set = self._streams.pop(name, set())
795+
for deferred in stream_set:
796+
try:
797+
with PreserveLoggingContext():
798+
deferred.callback(None)
799+
except Exception:
800+
pass
801+
802+
async def sleep(self, name: str, delay_ms: int) -> None:
803+
"""Sleep for the given number of milliseconds, or return if the given
804+
`name` is explicitly woken up.
805+
"""
806+
807+
# Create a deferred that gets called in N seconds
808+
sleep_deferred: "defer.Deferred[None]" = defer.Deferred()
809+
call = self._reactor.callLater(delay_ms / 1000, sleep_deferred.callback, None)
810+
811+
# Create a deferred that will get called if `wake` is called with
812+
# the same `name`.
813+
stream_set = self._streams.setdefault(name, set())
814+
notify_deferred: "defer.Deferred[None]" = defer.Deferred()
815+
stream_set.add(notify_deferred)
816+
817+
try:
818+
# Wait for either the delay or for `wake` to be called.
819+
await make_deferred_yieldable(
820+
defer.DeferredList(
821+
[sleep_deferred, notify_deferred],
822+
fireOnOneCallback=True,
823+
fireOnOneErrback=True,
824+
consumeErrors=True,
825+
)
826+
)
827+
finally:
828+
# Clean up the state
829+
curr_stream_set = self._streams.get(name)
830+
if curr_stream_set is not None:
831+
curr_stream_set.discard(notify_deferred)
832+
if len(curr_stream_set) == 0:
833+
self._streams.pop(name)
834+
835+
# Cancel the sleep if we were woken up
836+
if call.active():
837+
call.cancel()

synapse/util/retryutils.py

+23-1
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,17 @@
1414
import logging
1515
import random
1616
from types import TracebackType
17-
from typing import Any, Optional, Type
17+
from typing import TYPE_CHECKING, Any, Optional, Type
1818

1919
import synapse.logging.context
2020
from synapse.api.errors import CodeMessageException
2121
from synapse.storage import DataStore
2222
from synapse.util import Clock
2323

24+
if TYPE_CHECKING:
25+
from synapse.notifier import Notifier
26+
from synapse.replication.tcp.handler import ReplicationCommandHandler
27+
2428
logger = logging.getLogger(__name__)
2529

2630
# the initial backoff, after the first transaction fails
@@ -131,6 +135,8 @@ def __init__(
131135
retry_interval: int,
132136
backoff_on_404: bool = False,
133137
backoff_on_failure: bool = True,
138+
notifier: Optional["Notifier"] = None,
139+
replication_client: Optional["ReplicationCommandHandler"] = None,
134140
):
135141
"""Marks the destination as "down" if an exception is thrown in the
136142
context, except for CodeMessageException with code < 500.
@@ -160,6 +166,9 @@ def __init__(
160166
self.backoff_on_404 = backoff_on_404
161167
self.backoff_on_failure = backoff_on_failure
162168

169+
self.notifier = notifier
170+
self.replication_client = replication_client
171+
163172
def __enter__(self) -> None:
164173
pass
165174

@@ -239,6 +248,19 @@ async def store_retry_timings() -> None:
239248
retry_last_ts,
240249
self.retry_interval,
241250
)
251+
252+
if self.notifier:
253+
# Inform the relevant places that the remote server is back up.
254+
self.notifier.notify_remote_server_up(self.destination)
255+
256+
if self.replication_client:
257+
# If we're on a worker we try and inform master about this. The
258+
# replication client doesn't hook into the notifier to avoid
259+
# infinite loops where we send a `REMOTE_SERVER_UP` command to
260+
# master, which then echoes it back to us which in turn pokes
261+
# the notifier.
262+
self.replication_client.send_remote_server_up(self.destination)
263+
242264
except Exception:
243265
logger.exception("Failed to store destination_retry_timings")
244266

tests/util/test_async_helpers.py

+80
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@
2828
make_deferred_yieldable,
2929
)
3030
from synapse.util.async_helpers import (
31+
AwakenableSleeper,
3132
ObservableDeferred,
3233
concurrently_execute,
3334
delay_cancellation,
3435
stop_cancellation,
3536
timeout_deferred,
3637
)
3738

39+
from tests.server import get_clock
3840
from tests.unittest import TestCase
3941

4042

@@ -496,3 +498,81 @@ async def outer():
496498
# logging context.
497499
blocking_d.callback(None)
498500
self.successResultOf(d)
501+
502+
503+
class AwakenableSleeperTests(TestCase):
504+
"Tests AwakenableSleeper"
505+
506+
def test_sleep(self):
507+
reactor, _ = get_clock()
508+
sleeper = AwakenableSleeper(reactor)
509+
510+
d = defer.ensureDeferred(sleeper.sleep("name", 1000))
511+
512+
reactor.pump([0.0])
513+
self.assertFalse(d.called)
514+
515+
reactor.advance(0.5)
516+
self.assertFalse(d.called)
517+
518+
reactor.advance(0.6)
519+
self.assertTrue(d.called)
520+
521+
def test_explicit_wake(self):
522+
reactor, _ = get_clock()
523+
sleeper = AwakenableSleeper(reactor)
524+
525+
d = defer.ensureDeferred(sleeper.sleep("name", 1000))
526+
527+
reactor.pump([0.0])
528+
self.assertFalse(d.called)
529+
530+
reactor.advance(0.5)
531+
self.assertFalse(d.called)
532+
533+
sleeper.wake("name")
534+
self.assertTrue(d.called)
535+
536+
reactor.advance(0.6)
537+
538+
def test_multiple_sleepers_timeout(self):
539+
reactor, _ = get_clock()
540+
sleeper = AwakenableSleeper(reactor)
541+
542+
d1 = defer.ensureDeferred(sleeper.sleep("name", 1000))
543+
544+
reactor.advance(0.6)
545+
self.assertFalse(d1.called)
546+
547+
# Add another sleeper
548+
d2 = defer.ensureDeferred(sleeper.sleep("name", 1000))
549+
550+
# Only the first sleep should time out now.
551+
reactor.advance(0.6)
552+
self.assertTrue(d1.called)
553+
self.assertFalse(d2.called)
554+
555+
reactor.advance(0.6)
556+
self.assertTrue(d2.called)
557+
558+
def test_multiple_sleepers_wake(self):
559+
reactor, _ = get_clock()
560+
sleeper = AwakenableSleeper(reactor)
561+
562+
d1 = defer.ensureDeferred(sleeper.sleep("name", 1000))
563+
564+
reactor.advance(0.5)
565+
self.assertFalse(d1.called)
566+
567+
# Add another sleeper
568+
d2 = defer.ensureDeferred(sleeper.sleep("name", 1000))
569+
570+
# Neither should fire yet
571+
reactor.advance(0.3)
572+
self.assertFalse(d1.called)
573+
self.assertFalse(d2.called)
574+
575+
# Explicitly waking both up works
576+
sleeper.wake("name")
577+
self.assertTrue(d1.called)
578+
self.assertTrue(d2.called)

0 commit comments

Comments
 (0)