From f7afd2533c4f207a5b21f56769276b5431ecd835 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 26 May 2023 15:38:47 +0200 Subject: [PATCH] ref: Global metrics registry (#245) --- arroyo/processing/processor.py | 50 +++++++++++++++---------------- arroyo/utils/metric_defs.py | 51 ++++++++++++++++++++++++++++++++ arroyo/utils/metrics.py | 27 ++++++++++++----- docs-requirements.txt | 1 - docs/source/conf.py | 5 +++- docs/source/metrics.rst | 37 +++++++++-------------- docs/source/strategies/index.rst | 5 ++++ tests/metrics.py | 17 ++++++----- tests/utils/test_metrics.py | 4 +-- 9 files changed, 130 insertions(+), 67 deletions(-) create mode 100644 arroyo/utils/metric_defs.py diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 2323491b..02d04fe1 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -4,11 +4,11 @@ import logging import time from collections import defaultdict -from enum import Enum from typing import ( Any, Callable, Generic, + Literal, Mapping, MutableMapping, Optional, @@ -51,7 +51,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: raise finally: metrics.incr_timing( - ConsumerTiming.CONSUMER_CALLBACK_TIME, time.time() - start_time + "arroyo.consumer.callback.time", time.time() - start_time ) return cast(F, wrapper) @@ -63,20 +63,18 @@ class InvalidStateError(RuntimeError): pass -class ConsumerTiming(Enum): - CONSUMER_POLL_TIME = "arroyo.consumer.poll.time" - CONSUMER_PROCESSING_TIME = "arroyo.consumer.processing.time" - CONSUMER_PAUSED_TIME = "arroyo.consumer.paused.time" - CONSUMER_DLQ_TIME = "arroyo.consumer.dlq.time" - CONSUMER_JOIN_TIME = "arroyo.consumer.join.time" - +ConsumerTiming = Literal[ + "arroyo.consumer.poll.time", + "arroyo.consumer.processing.time", + "arroyo.consumer.paused.time", + "arroyo.consumer.dlq.time", + "arroyo.consumer.join.time", # This metric's timings overlap with DLQ/join time. - CONSUMER_CALLBACK_TIME = "arroyo.consumer.callback.time" - CONSUMER_SHUTDOWN_TIME = "arroyo.consumer.shutdown.time" - + "arroyo.consumer.callback.time", + "arroyo.consumer.shutdown.time", +] -class ConsumerCounter(Enum): - CONSUMER_RUN_COUNT = "arroyo.consumer.run.count" +ConsumerCounter = Literal["arroyo.consumer.run.count"] class MetricsBuffer: @@ -99,9 +97,9 @@ def flush(self) -> None: value: Union[float, int] for metric, value in self.__timers.items(): - self.__metrics.timing(metric.value, value) + self.__metrics.timing(metric, value) for metric, value in self.__counters.items(): - self.__metrics.increment(metric.value, value) + self.__metrics.increment(metric, value) self.__reset() def __reset(self) -> None: @@ -178,12 +176,12 @@ def _close_strategy() -> None: try: self.__processing_strategy.join(self.__join_timeout) self.__metrics_buffer.incr_timing( - ConsumerTiming.CONSUMER_JOIN_TIME, time.time() - start_join + "arroyo.consumer.join.time", time.time() - start_join ) break except InvalidMessage as e: self.__metrics_buffer.incr_timing( - ConsumerTiming.CONSUMER_JOIN_TIME, time.time() - start_join + "arroyo.consumer.join.time", time.time() - start_join ) self._handle_invalid_message(e) @@ -195,7 +193,7 @@ def _close_strategy() -> None: self.__message = None # avoid leaking buffered messages across assignments self.__metrics_buffer.incr_timing( - ConsumerTiming.CONSUMER_SHUTDOWN_TIME, time.time() - start_close + "arroyo.consumer.shutdown.time", time.time() - start_close ) def _create_strategy(partitions: Mapping[Partition, int]) -> None: @@ -326,11 +324,11 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None: self.__dlq_policy.produce(invalid_message) self.__metrics_buffer.incr_timing( - ConsumerTiming.CONSUMER_DLQ_TIME, time.time() - start_dlq + "arroyo.consumer.dlq.time", time.time() - start_dlq ) def _run_once(self) -> None: - self.__metrics_buffer.incr_counter(ConsumerCounter.CONSUMER_RUN_COUNT, 1) + self.__metrics_buffer.incr_counter("arroyo.consumer.run.count", 1) message_carried_over = self.__message is not None @@ -356,7 +354,7 @@ def _run_once(self) -> None: start_poll = time.time() self.__message = self.__consumer.poll(timeout=1.0) self.__metrics_buffer.incr_timing( - ConsumerTiming.CONSUMER_POLL_TIME, time.time() - start_poll + "arroyo.consumer.poll.time", time.time() - start_poll ) except RecoverableError: return @@ -370,7 +368,7 @@ def _run_once(self) -> None: return self.__metrics_buffer.incr_timing( - ConsumerTiming.CONSUMER_PROCESSING_TIME, time.time() - start_poll + "arroyo.consumer.processing.time", time.time() - start_poll ) if self.__message is not None: try: @@ -383,7 +381,7 @@ def _run_once(self) -> None: self.__processing_strategy.submit(message) self.__metrics_buffer.incr_timing( - ConsumerTiming.CONSUMER_PROCESSING_TIME, + "arroyo.consumer.processing.time", time.time() - start_submit, ) except MessageRejected as e: @@ -403,7 +401,7 @@ def _run_once(self) -> None: current_time = time.time() if self.__paused_timestamp: self.__metrics_buffer.incr_timing( - ConsumerTiming.CONSUMER_PAUSED_TIME, + "arroyo.consumer.paused.time", current_time - self.__paused_timestamp, ) self.__paused_timestamp = current_time @@ -418,7 +416,7 @@ def _run_once(self) -> None: self.__consumer.resume([*self.__consumer.tell().keys()]) self.__metrics_buffer.incr_timing( - ConsumerTiming.CONSUMER_PAUSED_TIME, + "arroyo.consumer.paused.time", time.time() - self.__paused_timestamp, ) diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py new file mode 100644 index 00000000..5e5c0c2d --- /dev/null +++ b/arroyo/utils/metric_defs.py @@ -0,0 +1,51 @@ +from typing import Literal + +MetricName = Literal[ + # Number of messages in a multiprocessing batch + "batch.size.msg", + # Number of bytes in a multiprocessing batch + "batch.size.bytes", + # Number of times the consumer is spinning + "arroyo.consumer.run.count", + # How long it took the Reduce step to fill up a batch + "arroyo.strategies.reduce.batch_time", + # Counter, incremented when a strategy after multiprocessing applies + # backpressure to multiprocessing. May be a reason why CPU cannot be + # saturated. + "arroyo.strategies.run_task_with_multiprocessing.batch.backpressure", + # Counter, incremented when multiprocessing cannot fill the input batch + # because not enough memory was allocated. This results in batches smaller + # than configured. Increase `input_block_size` to fix. + "arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow", + # Counter, incremented when multiprocessing cannot pull results in batches + # equal to the input batch size, because not enough memory was allocated. + # This can be devastating for throughput. Increase `output_block_size` to + # fix. + "arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow", + # How many batches are being processed in parallel by multiprocessing. + "batches_in_progress", + # Counter. A subprocess by multiprocessing unexpectedly died. + "sigchld.detected", + # Gauge. Shows how many processes the multiprocessing strategy is + # configured with. + "transform.processes", + # Time (unitless) spent polling librdkafka for new messages. + "arroyo.consumer.poll.time", + # Time (unitless) spent in strategies (blocking in strategy.submit or + # strategy.poll) + "arroyo.consumer.processing.time", + # Time (unitless) spent pausing the consumer due to backpressure (MessageRejected) + "arroyo.consumer.paused.time", + # Time (unitless) spent in handling `InvalidMessage` exceptions and sending + # messages to the the DLQ. + "arroyo.consumer.dlq.time", + # Time (unitless) spent in waiting for the strategy to exit, such as during + # shutdown or rebalancing. + "arroyo.consumer.join.time", + # Time (unitless) spent in librdkafka callbacks. This metric's timings + # overlap other timings, and might spike at the same time. + "arroyo.consumer.callback.time", + # Time (unitless) spent in shutting down the consumer. This metric's + # timings overlap other timings, and might spike at the same time. + "arroyo.consumer.shutdown.time", +] diff --git a/arroyo/utils/metrics.py b/arroyo/utils/metrics.py index ec783e55..2c4932e9 100644 --- a/arroyo/utils/metrics.py +++ b/arroyo/utils/metrics.py @@ -1,6 +1,10 @@ +from __future__ import annotations + from abc import abstractmethod from typing import Any, Mapping, Optional, Protocol, Union, runtime_checkable +from arroyo.utils.metric_defs import MetricName + Tags = Mapping[str, str] @@ -12,7 +16,10 @@ class Metrics(Protocol): @abstractmethod def increment( - self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None + self, + name: MetricName, + value: Union[int, float] = 1, + tags: Optional[Tags] = None, ) -> None: """ Increments a counter metric by a given value. @@ -21,7 +28,7 @@ def increment( @abstractmethod def gauge( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None ) -> None: """ Sets a gauge metric to the given value. @@ -30,7 +37,7 @@ def gauge( @abstractmethod def timing( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None ) -> None: """ Records a timing metric. @@ -44,17 +51,20 @@ class DummyMetricsBackend(Metrics): """ def increment( - self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None + self, + name: MetricName, + value: Union[int, float] = 1, + tags: Optional[Tags] = None, ) -> None: pass def gauge( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None ) -> None: pass def timing( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None ) -> None: pass @@ -63,7 +73,7 @@ class Gauge: def __init__( self, metrics: Metrics, - name: str, + name: MetricName, tags: Optional[Tags] = None, ) -> None: self.__metrics = metrics @@ -119,3 +129,6 @@ def get_metrics() -> Metrics: if _metrics_backend is None: return _dummy_metrics_backend return _metrics_backend + + +__all__ = ["configure_metrics", "Metrics", "MetricName"] diff --git a/docs-requirements.txt b/docs-requirements.txt index 171e9346..cec72aac 100644 --- a/docs-requirements.txt +++ b/docs-requirements.txt @@ -1,5 +1,4 @@ sphinx>=5.3 sphinxcontrib-mermaid==0.8.1 shibuya -sphinx-autodoc-typehints[type-comment]>=1.19.3 typing-extensions diff --git a/docs/source/conf.py b/docs/source/conf.py index 2982a4dd..fdc9dbe5 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -19,7 +19,6 @@ "sphinx.ext.intersphinx", "sphinxcontrib.mermaid", "sphinx.ext.autodoc", - "sphinx_autodoc_typehints", ] always_document_param_types = True @@ -48,3 +47,7 @@ # html_logo = "_static/arroyo.png" autodoc_inherit_docstrings = False + +autodoc_type_aliases = { + "MetricName": "MetricName", +} diff --git a/docs/source/metrics.rst b/docs/source/metrics.rst index 8007efb3..94011dbe 100644 --- a/docs/source/metrics.rst +++ b/docs/source/metrics.rst @@ -14,48 +14,39 @@ This can be done like so: .. code:: python - class Metrics: + from arroyo.utils.metrics import Metrics, MetricName + + class MyMetrics(Metrics): def increment( - self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None + self, name: MetricName, value: Union[int, float] = 1, tags: Optional[Tags] = None ) -> None: # Increment a counter by the given value. record_incr(name, value, tags) def gauge( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None ) -> None: # Sets a gauge metric to the given value. record_gauge(name, value, tags) def timing( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None ) -> None: # Emit a timing metric with the given value. record_timing(name, value, tags) - metrics_backend = Metrics() + metrics_backend = MyMetrics() configure_metrics(metrics_backend) -Some of the metrics emitted by Arroyo include: - -.. list-table:: Metrics - :widths: 25 25 50 - :header-rows: 1 - - * - Metric name - - Type - - Description - * - arroyo.consumer.poll.time - - Timing - - Time spent polling for messages. A higher number means the consumer has headroom as it is waiting for messages to arrive. - * - arroyo.consumer.processing.time - - Timing - - Time spent processing messages. A higher number means the consumer spends more time processing messages. - * - arroyo.consumer.paused.time - - Timing - - Time spent in backpressure. Usually means there could be some processing bottleneck. +Available Metrics +==================== + +.. literalinclude:: ../../arroyo/utils/metric_defs.py + +API +======= .. automodule:: arroyo.utils.metrics :members: diff --git a/docs/source/strategies/index.rst b/docs/source/strategies/index.rst index 2cffedfb..1350690c 100644 --- a/docs/source/strategies/index.rst +++ b/docs/source/strategies/index.rst @@ -16,7 +16,12 @@ Nevertheless, all arroyo strategies are written against the following interface: :undoc-members: :show-inheritance: +Messages +------------ +.. automodule:: arroyo.types + :members: + :undoc-members: .. toctree:: :hidden: diff --git a/tests/metrics.py b/tests/metrics.py index 72d6a3fe..b8bfde34 100644 --- a/tests/metrics.py +++ b/tests/metrics.py @@ -1,22 +1,22 @@ from typing import MutableSequence, NamedTuple, Optional, Union -from arroyo.utils.metrics import Metrics, Tags +from arroyo.utils.metrics import MetricName, Metrics, Tags class Increment(NamedTuple): - name: str + name: MetricName value: Union[int, float] tags: Optional[Tags] class Gauge(NamedTuple): - name: str + name: MetricName value: Union[int, float] tags: Optional[Tags] class Timing(NamedTuple): - name: str + name: MetricName value: Union[int, float] tags: Optional[Tags] @@ -32,17 +32,20 @@ def __init__(self) -> None: self.calls: MutableSequence[Union[Increment, Gauge, Timing]] = [] def increment( - self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None + self, + name: MetricName, + value: Union[int, float] = 1, + tags: Optional[Tags] = None, ) -> None: self.calls.append(Increment(name, value, tags)) def gauge( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None ) -> None: self.calls.append(Gauge(name, value, tags)) def timing( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None ) -> None: self.calls.append(Timing(name, value, tags)) diff --git a/tests/utils/test_metrics.py b/tests/utils/test_metrics.py index f7b371de..6df33320 100644 --- a/tests/utils/test_metrics.py +++ b/tests/utils/test_metrics.py @@ -1,6 +1,6 @@ import pytest -from arroyo.utils.metrics import Gauge, configure_metrics, get_metrics +from arroyo.utils.metrics import Gauge, MetricName, configure_metrics, get_metrics from tests.metrics import Gauge as GaugeCall from tests.metrics import TestingMetricsBackend @@ -8,7 +8,7 @@ def test_gauge_simple() -> None: backend = TestingMetricsBackend - name = "name" + name: MetricName = "name" # type: ignore tags = {"tag": "value"} gauge = Gauge(backend, name, tags)