From 7a4aacd0103cbf0dc7b8bc3bb604c5820ee7fea8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Oct 2023 14:55:51 +0100 Subject: [PATCH 1/3] Reduce replication traffic due to reflected cache stream POSITION --- synapse/replication/tcp/resource.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 1d9a29d22ee6..558728a73021 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -27,7 +27,7 @@ from synapse.replication.tcp.commands import PositionCommand from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol from synapse.replication.tcp.streams import EventsStream -from synapse.replication.tcp.streams._base import StreamRow, Token +from synapse.replication.tcp.streams._base import CachesStream, StreamRow, Token from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -205,6 +205,13 @@ async def _run_notifier_loop(self) -> None: # send, so we send a `POSITION` to inform other # workers of the updated position. + # We skip this for the `caches` stream as a) it + # generates a lot of traffic as every worker would + # echo each write, and b) nothing cares if a given + # worker's caches stream position lags. + if stream.NAME == CachesStream.NAME: + continue + # Note: `last_token` may not *actually* be the # last token we sent out in a RDATA or POSITION. # This can happen if we sent out an RDATA for From 60693489d3924b1b0f9f98401a255617708ff3f6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Oct 2023 14:56:38 +0100 Subject: [PATCH 2/3] Newsfile --- changelog.d/16557.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16557.bugfix diff --git a/changelog.d/16557.bugfix b/changelog.d/16557.bugfix new file mode 100644 index 000000000000..4f4a0380cd77 --- /dev/null +++ b/changelog.d/16557.bugfix @@ -0,0 +1 @@ +Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`. From 22b2cca388dc7a59259a686a1dff012206f2dcd4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Oct 2023 15:32:47 +0100 Subject: [PATCH 3/3] Correctly handle the different cases --- synapse/replication/tcp/resource.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 558728a73021..38abb5df54a5 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -204,13 +204,23 @@ async def _run_notifier_loop(self) -> None: # The token has advanced but there is no data to # send, so we send a `POSITION` to inform other # workers of the updated position. - - # We skip this for the `caches` stream as a) it - # generates a lot of traffic as every worker would - # echo each write, and b) nothing cares if a given - # worker's caches stream position lags. + # + # There are two reasons for this: 1) this instance + # requested a stream ID but didn't use it, or 2) + # this instance advanced its own stream position due + # to receiving notifications about other instances + # advancing their stream position. + + # We skip sending `POSITION` for the `caches` stream + # for the second case as a) it generates a lot of + # traffic as every worker would echo each write, and + # b) nothing cares if a given worker's caches stream + # position lags. if stream.NAME == CachesStream.NAME: - continue + # If there haven't been any writes since the + # `last_token` then we're in the second case. + if stream.minimal_local_current_token() <= last_token: + continue # Note: `last_token` may not *actually* be the # last token we sent out in a RDATA or POSITION.