Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Remove some run_in_background calls in replication code
Browse files Browse the repository at this point in the history
By running this stuff with `run_in_background`, it won't be correctly reported
against the relevant CPU usage stats.

Fixes #7202
  • Loading branch information
richvdh committed Apr 1, 2020
1 parent 60adcbe commit 5f4d114
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
1 change: 1 addition & 0 deletions changelog.d/7203.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix some worker-mode replication handling not being correctly recorded in CPU usage stats.
16 changes: 8 additions & 8 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from synapse.http.server import JsonResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext, run_in_background
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
Expand Down Expand Up @@ -635,7 +635,7 @@ async def on_rdata(self, stream_name, token, rows):
await super(GenericWorkerReplicationHandler, self).on_rdata(
stream_name, token, rows
)
run_in_background(self.process_and_notify, stream_name, token, rows)
await self.process_and_notify(stream_name, token, rows)

def get_streams_to_replicate(self):
args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
Expand All @@ -650,7 +650,9 @@ def get_currently_syncing_users(self):
async def process_and_notify(self, stream_name, token, rows):
try:
if self.send_handler:
self.send_handler.process_replication_rows(stream_name, token, rows)
await self.send_handler.process_replication_rows(
stream_name, token, rows
)

if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
Expand Down Expand Up @@ -782,22 +784,20 @@ def wake_destination(self, server: str):
def stream_positions(self):
return {"federation": self.federation_position}

def process_replication_rows(self, stream_name, token, rows):
async def process_replication_rows(self, stream_name, token, rows):
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows)
run_in_background(self.update_token, token)
await self.update_token(token)

# We also need to poke the federation sender when new events happen
elif stream_name == "events":
self.federation_sender.notify_new_events(token)

# ... and when new receipts happen
elif stream_name == ReceiptsStream.NAME:
run_as_background_process(
"process_receipts_for_federation", self._on_new_receipts, rows
)
await self._on_new_receipts(rows)

# ... as well as device updates and messages
elif stream_name == DeviceListsStream.NAME:
Expand Down

0 comments on commit 5f4d114

Please sign in to comment.