From c4198fa5e586e1fa9d646acbbe9e3fdd26a59952 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 7 Jun 2023 18:27:45 +0200 Subject: [PATCH 1/8] Regularly try to wake up dests instead of waiting for next PDU/EDU --- synapse/federation/sender/__init__.py | 31 +++++++++----------- tests/federation/test_federation_catch_up.py | 6 ---- 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index f3bdc5a4d220..1ce39004b4d1 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -184,14 +184,18 @@ "Total number of PDUs queued for sending across all destinations", ) -# Time (in s) after Synapse's startup that we will begin to wake up destinations -# that have catch-up outstanding. -CATCH_UP_STARTUP_DELAY_SEC = 15 +# Time (in s) to wait before trying to wake up destinations that have +# catch-up outstanding. This will also be the delay applied at startup +# before trying the same. +# Please note that rate limiting still applies, so while the loop is +# executed every X seconds the destinations may not be wake up because +# they are being rate limited following previous attempt failures. +WAKEUP_RETRY_PERIOD_SEC = 60 # Time (in s) to wait in between waking up each destination, i.e. one destination -# will be woken up every seconds after Synapse's startup until we have woken -# every destination has outstanding catch-up. -CATCH_UP_STARTUP_INTERVAL_SEC = 5 +# will be woken up every seconds until we have woken every destination +# has outstanding catch-up. +WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 1 class AbstractFederationSender(metaclass=abc.ABCMeta): @@ -415,12 +419,10 @@ def __init__(self, hs: "HomeServer"): / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second ) - # wake up destinations that have outstanding PDUs to be caught up - self._catchup_after_startup_timer: Optional[ - IDelayedCall - ] = self.clock.call_later( - CATCH_UP_STARTUP_DELAY_SEC, + # Regularly wake up destinations that have outstanding PDUs to be caught up + self.clock.looping_call( run_as_background_process, + WAKEUP_RETRY_PERIOD_SEC * 1000.0, "wake_destinations_needing_catchup", self._wake_destinations_needing_catchup, ) @@ -964,11 +966,6 @@ async def _wake_destinations_needing_catchup(self) -> None: await self.store.get_catch_up_outstanding_destinations(last_processed) ) - if not destinations_to_wake: - # finished waking all destinations! - self._catchup_after_startup_timer = None - break - last_processed = destinations_to_wake[-1] destinations_to_wake = [ @@ -983,4 +980,4 @@ async def _wake_destinations_needing_catchup(self) -> None: last_processed, ) self.wake_destination(destination) - await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC) + await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC) diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 391ae5170729..f214d105cc2d 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -438,12 +438,6 @@ def wake_destination_track(destination: str) -> None: self.federation_sender.wake_destination = wake_destination_track # type: ignore[assignment] - # cancel the pre-existing timer for _wake_destinations_needing_catchup - # this is because we are calling it manually rather than waiting for it - # to be called automatically - assert self.federation_sender._catchup_after_startup_timer is not None - self.federation_sender._catchup_after_startup_timer.cancel() - self.get_success( self.federation_sender._wake_destinations_needing_catchup(), by=5.0 ) From 34dae7e7ef599f1dee8dd59ad1dc5114e462dfdc Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 7 Jun 2023 18:32:39 +0200 Subject: [PATCH 2/8] Add changelog --- changelog.d/15743.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15743.misc diff --git a/changelog.d/15743.misc b/changelog.d/15743.misc new file mode 100644 index 000000000000..b95eed929e4b --- /dev/null +++ b/changelog.d/15743.misc @@ -0,0 +1 @@ +Regularly try to send transactions to other servers after they failed instead of waiting for a new event to be available before trying. From 081a7a67e091c8d1a56c222b10b2dfbeab0e3e4f Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 7 Jun 2023 18:34:26 +0200 Subject: [PATCH 3/8] Remove import --- synapse/federation/sender/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 1ce39004b4d1..1b7662ff4851 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -145,7 +145,6 @@ from typing_extensions import Literal from twisted.internet import defer -from twisted.internet.interfaces import IDelayedCall import synapse.metrics from synapse.api.presence import UserPresenceState From 0c70f6551baa237a28b7ac2031f5ef8eaa6d2325 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Thu, 8 Jun 2023 11:27:35 +0200 Subject: [PATCH 4/8] Missing break + fix test --- synapse/federation/sender/__init__.py | 4 ++++ tests/federation/test_federation_catch_up.py | 14 +++++++------- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 1b7662ff4851..a0dc55c39583 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -965,6 +965,10 @@ async def _wake_destinations_needing_catchup(self) -> None: await self.store.get_catch_up_outstanding_destinations(last_processed) ) + if not destinations_to_wake: + # finished waking all destinations! + break + last_processed = destinations_to_wake[-1] destinations_to_wake = [ diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index f214d105cc2d..5316ac4ffa82 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -431,22 +431,22 @@ def test_catch_up_on_synapse_startup(self) -> None: # ACT: call _wake_destinations_needing_catchup # patch wake_destination to just count the destinations instead - woken = [] + woken = set() def wake_destination_track(destination: str) -> None: - woken.append(destination) + woken.add(destination) self.federation_sender.wake_destination = wake_destination_track # type: ignore[assignment] - self.get_success( - self.federation_sender._wake_destinations_needing_catchup(), by=5.0 - ) + self.pump(by=5.0) # ASSERT (_wake_destinations_needing_catchup): # - all remotes are woken up, save for zzzerver self.assertNotIn("zzzerver", woken) - # - all destinations are woken exactly once; they appear once in woken. - self.assertCountEqual(woken, server_names[:-1]) + # - all destinations are woken, potentially more than once, since the + # wake up is called regularly and we don't ack in this test that at transaction + # has been successfully sent. + self.assertCountEqual(woken, set(server_names[:-1])) def test_not_latest_event(self) -> None: """Test that we send the latest event in the room even if its not ours.""" From 67d7e5c62c0621fc3f170b2e625a841bb9793d4e Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Thu, 8 Jun 2023 11:29:35 +0200 Subject: [PATCH 5/8] typo --- tests/federation/test_federation_catch_up.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 5316ac4ffa82..77b02f3c7e34 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -444,7 +444,7 @@ def wake_destination_track(destination: str) -> None: # - all remotes are woken up, save for zzzerver self.assertNotIn("zzzerver", woken) # - all destinations are woken, potentially more than once, since the - # wake up is called regularly and we don't ack in this test that at transaction + # wake up is called regularly and we don't ack in this test that a transaction # has been successfully sent. self.assertCountEqual(woken, set(server_names[:-1])) From b73b4f40e9cb8d4d5db1d449c4752275178d2ce7 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Thu, 8 Jun 2023 14:15:57 +0200 Subject: [PATCH 6/8] interval back to 5s --- synapse/federation/sender/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index a0dc55c39583..999973cdafc4 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -194,7 +194,7 @@ # Time (in s) to wait in between waking up each destination, i.e. one destination # will be woken up every seconds until we have woken every destination # has outstanding catch-up. -WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 1 +WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5 class AbstractFederationSender(metaclass=abc.ABCMeta): From 64eff7d6a932e562a8b2f69a7f7a93f6f4eea7bd Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Fri, 9 Jun 2023 15:42:26 +0200 Subject: [PATCH 7/8] Update comment --- synapse/federation/sender/__init__.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 999973cdafc4..97abbdee18e5 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -109,10 +109,8 @@ If a remote server is unreachable over federation, we back off from that server, with an exponentially-increasing retry interval. -Whilst we don't automatically retry after the interval, we prevent making new attempts -until such time as the back-off has cleared. -Once the back-off is cleared and a new PDU or EDU arrives for transmission, the transmission -loop resumes and empties the queue by making federation requests. +We automatically retry after the retry interval expires (roughly, the logic to do so +being triggered every minute). If the backoff grows too large (> 1 hour), the in-memory queue is emptied (to prevent unbounded growth) and Catch-Up Mode is entered. From f1731bf31ea514e7b46991046d30060ca6fb902b Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Thu, 15 Jun 2023 13:18:13 +0200 Subject: [PATCH 8/8] Add comment --- tests/federation/test_federation_catch_up.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 77b02f3c7e34..b290b020a274 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -438,6 +438,8 @@ def wake_destination_track(destination: str) -> None: self.federation_sender.wake_destination = wake_destination_track # type: ignore[assignment] + # We wait quite long so that all dests can be woken up, since there is a delay + # between them. self.pump(by=5.0) # ASSERT (_wake_destinations_needing_catchup):