diff --git a/changelog.d/11724.misc b/changelog.d/11724.misc
new file mode 100644
index 000000000000..e9d5dae857b2
--- /dev/null
+++ b/changelog.d/11724.misc
@@ -0,0 +1 @@
+Improve accuracy of `python_twisted_reactor_tick_time` prometheus metric.
diff --git a/changelog.d/11724.removal b/changelog.d/11724.removal
new file mode 100644
index 000000000000..088c3ff31ff8
--- /dev/null
+++ b/changelog.d/11724.removal
@@ -0,0 +1 @@
+Remove `python_twisted_reactor_pending_calls` prometheus metric.
diff --git a/contrib/prometheus/consoles/synapse.html b/contrib/prometheus/consoles/synapse.html
index cd9ad15231fc..d17c8a08d9e3 100644
--- a/contrib/prometheus/consoles/synapse.html
+++ b/contrib/prometheus/consoles/synapse.html
@@ -92,22 +92,6 @@
Average reactor tick time
})
-Pending calls per tick
-
-
-
Storage
Queries
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index ba7ca0f2d4ed..9e6c1b2f3b54 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -12,15 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import functools
import itertools
import logging
import os
import platform
import threading
-import time
from typing import (
- Any,
Callable,
Dict,
Generic,
@@ -33,7 +30,6 @@
Type,
TypeVar,
Union,
- cast,
)
import attr
@@ -44,11 +40,9 @@
GaugeMetricFamily,
)
-from twisted.internet import reactor
-from twisted.internet.base import ReactorBase
from twisted.python.threadpool import ThreadPool
-import synapse
+import synapse.metrics._reactor_metrics
from synapse.metrics._exposition import (
MetricsResource,
generate_latest,
@@ -368,21 +362,6 @@ def collect(self) -> Iterable[Metric]:
REGISTRY.register(CPUMetrics())
-#
-# Twisted reactor metrics
-#
-
-tick_time = Histogram(
- "python_twisted_reactor_tick_time",
- "Tick time of the Twisted reactor (sec)",
- buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
-)
-pending_calls_metric = Histogram(
- "python_twisted_reactor_pending_calls",
- "Pending calls",
- buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
-)
-
#
# Federation Metrics
#
@@ -434,8 +413,6 @@ def collect(self) -> Iterable[Metric]:
" ".join([platform.system(), platform.release()]),
).set(1)
-last_ticked = time.time()
-
# 3PID send info
threepid_send_requests = Histogram(
"synapse_threepid_send_requests_with_tries",
@@ -483,75 +460,6 @@ def register_threadpool(name: str, threadpool: ThreadPool) -> None:
)
-class ReactorLastSeenMetric:
- def collect(self) -> Iterable[Metric]:
- cm = GaugeMetricFamily(
- "python_twisted_reactor_last_seen",
- "Seconds since the Twisted reactor was last seen",
- )
- cm.add_metric([], time.time() - last_ticked)
- yield cm
-
-
-REGISTRY.register(ReactorLastSeenMetric())
-
-F = TypeVar("F", bound=Callable[..., Any])
-
-
-def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F:
- @functools.wraps(func)
- def f(*args: Any, **kwargs: Any) -> Any:
- now = reactor.seconds()
- num_pending = 0
-
- # _newTimedCalls is one long list of *all* pending calls. Below loop
- # is based off of impl of reactor.runUntilCurrent
- for delayed_call in reactor._newTimedCalls:
- if delayed_call.time > now:
- break
-
- if delayed_call.delayed_time > 0:
- continue
-
- num_pending += 1
-
- num_pending += len(reactor.threadCallQueue)
- start = time.time()
- ret = func(*args, **kwargs)
- end = time.time()
-
- # record the amount of wallclock time spent running pending calls.
- # This is a proxy for the actual amount of time between reactor polls,
- # since about 25% of time is actually spent running things triggered by
- # I/O events, but that is harder to capture without rewriting half the
- # reactor.
- tick_time.observe(end - start)
- pending_calls_metric.observe(num_pending)
-
- # Update the time we last ticked, for the metric to test whether
- # Synapse's reactor has frozen
- global last_ticked
- last_ticked = end
-
- return ret
-
- return cast(F, f)
-
-
-try:
- # Ensure the reactor has all the attributes we expect
- reactor.seconds # type: ignore
- reactor.runUntilCurrent # type: ignore
- reactor._newTimedCalls # type: ignore
- reactor.threadCallQueue # type: ignore
-
- # runUntilCurrent is called when we have pending calls. It is called once
- # per iteratation after fd polling.
- reactor.runUntilCurrent = runUntilCurrentTimer(reactor, reactor.runUntilCurrent) # type: ignore
-except AttributeError:
- pass
-
-
__all__ = [
"MetricsResource",
"generate_latest",
diff --git a/synapse/metrics/_reactor_metrics.py b/synapse/metrics/_reactor_metrics.py
new file mode 100644
index 000000000000..ce0688621c37
--- /dev/null
+++ b/synapse/metrics/_reactor_metrics.py
@@ -0,0 +1,83 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import select
+import time
+from typing import Any, Iterable, List, Tuple
+
+from prometheus_client import Histogram, Metric
+from prometheus_client.core import REGISTRY, GaugeMetricFamily
+
+from twisted.internet import reactor
+
+#
+# Twisted reactor metrics
+#
+
+tick_time = Histogram(
+ "python_twisted_reactor_tick_time",
+ "Tick time of the Twisted reactor (sec)",
+ buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
+)
+
+
+class EpollWrapper:
+ """a wrapper for an epoll object which records the time between polls"""
+
+ def __init__(self, poller: "select.epoll"):
+ self.last_polled = time.time()
+ self._poller = poller
+
+ def poll(self, *args, **kwargs) -> List[Tuple[int, int]]: # type: ignore[no-untyped-def]
+ # record the time since poll() was last called. This gives a good proxy for
+ # how long it takes to run everything in the reactor - ie, how long anything
+ # waiting for the next tick will have to wait.
+ tick_time.observe(time.time() - self.last_polled)
+
+ ret = self._poller.poll(*args, **kwargs)
+
+ self.last_polled = time.time()
+ return ret
+
+ def __getattr__(self, item: str) -> Any:
+ return getattr(self._poller, item)
+
+
+class ReactorLastSeenMetric:
+ def __init__(self, epoll_wrapper: EpollWrapper):
+ self._epoll_wrapper = epoll_wrapper
+
+ def collect(self) -> Iterable[Metric]:
+ cm = GaugeMetricFamily(
+ "python_twisted_reactor_last_seen",
+ "Seconds since the Twisted reactor was last seen",
+ )
+ cm.add_metric([], time.time() - self._epoll_wrapper.last_polled)
+ yield cm
+
+
+try:
+ # if the reactor has a `_poller` attribute, which is an `epoll` object
+ # (ie, it's an EPollReactor), we wrap the `epoll` with a thing that will
+ # measure the time between ticks
+ from select import epoll
+
+ poller = reactor._poller # type: ignore[attr-defined]
+except (AttributeError, ImportError):
+ pass
+else:
+ if isinstance(poller, epoll):
+ poller = EpollWrapper(poller)
+ reactor._poller = poller # type: ignore[attr-defined]
+ REGISTRY.register(ReactorLastSeenMetric(poller))