Skip to content

Commit

Permalink
ref: Global metrics registry (#245)
Browse files Browse the repository at this point in the history
  • Loading branch information
untitaker authored May 26, 2023
1 parent de09257 commit f7afd25
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 67 deletions.
50 changes: 24 additions & 26 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import logging
import time
from collections import defaultdict
from enum import Enum
from typing import (
Any,
Callable,
Generic,
Literal,
Mapping,
MutableMapping,
Optional,
Expand Down Expand Up @@ -51,7 +51,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
raise
finally:
metrics.incr_timing(
ConsumerTiming.CONSUMER_CALLBACK_TIME, time.time() - start_time
"arroyo.consumer.callback.time", time.time() - start_time
)

return cast(F, wrapper)
Expand All @@ -63,20 +63,18 @@ class InvalidStateError(RuntimeError):
pass


class ConsumerTiming(Enum):
CONSUMER_POLL_TIME = "arroyo.consumer.poll.time"
CONSUMER_PROCESSING_TIME = "arroyo.consumer.processing.time"
CONSUMER_PAUSED_TIME = "arroyo.consumer.paused.time"
CONSUMER_DLQ_TIME = "arroyo.consumer.dlq.time"
CONSUMER_JOIN_TIME = "arroyo.consumer.join.time"

ConsumerTiming = Literal[
"arroyo.consumer.poll.time",
"arroyo.consumer.processing.time",
"arroyo.consumer.paused.time",
"arroyo.consumer.dlq.time",
"arroyo.consumer.join.time",
# This metric's timings overlap with DLQ/join time.
CONSUMER_CALLBACK_TIME = "arroyo.consumer.callback.time"
CONSUMER_SHUTDOWN_TIME = "arroyo.consumer.shutdown.time"

"arroyo.consumer.callback.time",
"arroyo.consumer.shutdown.time",
]

class ConsumerCounter(Enum):
CONSUMER_RUN_COUNT = "arroyo.consumer.run.count"
ConsumerCounter = Literal["arroyo.consumer.run.count"]


class MetricsBuffer:
Expand All @@ -99,9 +97,9 @@ def flush(self) -> None:
value: Union[float, int]

for metric, value in self.__timers.items():
self.__metrics.timing(metric.value, value)
self.__metrics.timing(metric, value)
for metric, value in self.__counters.items():
self.__metrics.increment(metric.value, value)
self.__metrics.increment(metric, value)
self.__reset()

def __reset(self) -> None:
Expand Down Expand Up @@ -178,12 +176,12 @@ def _close_strategy() -> None:
try:
self.__processing_strategy.join(self.__join_timeout)
self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_JOIN_TIME, time.time() - start_join
"arroyo.consumer.join.time", time.time() - start_join
)
break
except InvalidMessage as e:
self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_JOIN_TIME, time.time() - start_join
"arroyo.consumer.join.time", time.time() - start_join
)
self._handle_invalid_message(e)

Expand All @@ -195,7 +193,7 @@ def _close_strategy() -> None:
self.__message = None # avoid leaking buffered messages across assignments

self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_SHUTDOWN_TIME, time.time() - start_close
"arroyo.consumer.shutdown.time", time.time() - start_close
)

def _create_strategy(partitions: Mapping[Partition, int]) -> None:
Expand Down Expand Up @@ -326,11 +324,11 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None:
self.__dlq_policy.produce(invalid_message)

self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_DLQ_TIME, time.time() - start_dlq
"arroyo.consumer.dlq.time", time.time() - start_dlq
)

def _run_once(self) -> None:
self.__metrics_buffer.incr_counter(ConsumerCounter.CONSUMER_RUN_COUNT, 1)
self.__metrics_buffer.incr_counter("arroyo.consumer.run.count", 1)

message_carried_over = self.__message is not None

Expand All @@ -356,7 +354,7 @@ def _run_once(self) -> None:
start_poll = time.time()
self.__message = self.__consumer.poll(timeout=1.0)
self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_POLL_TIME, time.time() - start_poll
"arroyo.consumer.poll.time", time.time() - start_poll
)
except RecoverableError:
return
Expand All @@ -370,7 +368,7 @@ def _run_once(self) -> None:
return

self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_PROCESSING_TIME, time.time() - start_poll
"arroyo.consumer.processing.time", time.time() - start_poll
)
if self.__message is not None:
try:
Expand All @@ -383,7 +381,7 @@ def _run_once(self) -> None:
self.__processing_strategy.submit(message)

self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_PROCESSING_TIME,
"arroyo.consumer.processing.time",
time.time() - start_submit,
)
except MessageRejected as e:
Expand All @@ -403,7 +401,7 @@ def _run_once(self) -> None:
current_time = time.time()
if self.__paused_timestamp:
self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_PAUSED_TIME,
"arroyo.consumer.paused.time",
current_time - self.__paused_timestamp,
)
self.__paused_timestamp = current_time
Expand All @@ -418,7 +416,7 @@ def _run_once(self) -> None:
self.__consumer.resume([*self.__consumer.tell().keys()])

self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_PAUSED_TIME,
"arroyo.consumer.paused.time",
time.time() - self.__paused_timestamp,
)

Expand Down
51 changes: 51 additions & 0 deletions arroyo/utils/metric_defs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from typing import Literal

# Closed set of metric names that Arroyo passes to a metrics backend. The
# `Metrics` protocol in arroyo/utils/metrics.py annotates its `name` parameter
# with this alias, so a type checker can reject names that are not listed here.
#
# This file is pulled verbatim into the documentation (`literalinclude` in
# docs/source/metrics.rst), so the comment above each name is user-facing:
# keep it accurate when adding or changing a metric.
#
# NOTE(review): the "arroyo.consumer.*.time" values are computed in
# processor.py as differences of `time.time()` readings, i.e. seconds --
# confirm whether describing them as "unitless" is intentional.
MetricName = Literal[
# Number of messages in a multiprocessing batch
"batch.size.msg",
# Number of bytes in a multiprocessing batch
"batch.size.bytes",
# Counter. Number of times the consumer is spinning
"arroyo.consumer.run.count",
# How long it took the Reduce step to fill up a batch
"arroyo.strategies.reduce.batch_time",
# Counter, incremented when a strategy after multiprocessing applies
# backpressure to multiprocessing. May be a reason why CPU cannot be
# saturated.
"arroyo.strategies.run_task_with_multiprocessing.batch.backpressure",
# Counter, incremented when multiprocessing cannot fill the input batch
# because not enough memory was allocated. This results in batches smaller
# than configured. Increase `input_block_size` to fix.
"arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow",
# Counter, incremented when multiprocessing cannot pull results in batches
# equal to the input batch size, because not enough memory was allocated.
# This can be devastating for throughput. Increase `output_block_size` to
# fix.
"arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow",
# How many batches are being processed in parallel by multiprocessing.
"batches_in_progress",
# Counter. A multiprocessing subprocess died unexpectedly.
"sigchld.detected",
# Gauge. Shows how many processes the multiprocessing strategy is
# configured with.
"transform.processes",
# Time (unitless) spent polling librdkafka for new messages.
"arroyo.consumer.poll.time",
# Time (unitless) spent in strategies (blocking in strategy.submit or
# strategy.poll)
"arroyo.consumer.processing.time",
# Time (unitless) spent pausing the consumer due to backpressure
# (MessageRejected)
"arroyo.consumer.paused.time",
# Time (unitless) spent in handling `InvalidMessage` exceptions and sending
# messages to the DLQ.
"arroyo.consumer.dlq.time",
# Time (unitless) spent in waiting for the strategy to exit, such as during
# shutdown or rebalancing.
"arroyo.consumer.join.time",
# Time (unitless) spent in librdkafka callbacks. This metric's timings
# overlap other timings, and might spike at the same time.
"arroyo.consumer.callback.time",
# Time (unitless) spent in shutting down the consumer. This metric's
# timings overlap other timings, and might spike at the same time.
"arroyo.consumer.shutdown.time",
]
27 changes: 20 additions & 7 deletions arroyo/utils/metrics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from __future__ import annotations

from abc import abstractmethod
from typing import Any, Mapping, Optional, Protocol, Union, runtime_checkable

from arroyo.utils.metric_defs import MetricName

Tags = Mapping[str, str]


Expand All @@ -12,7 +16,10 @@ class Metrics(Protocol):

@abstractmethod
def increment(
self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None
self,
name: MetricName,
value: Union[int, float] = 1,
tags: Optional[Tags] = None,
) -> None:
"""
Increments a counter metric by a given value.
Expand All @@ -21,7 +28,7 @@ def increment(

@abstractmethod
def gauge(
self, name: str, value: Union[int, float], tags: Optional[Tags] = None
self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None
) -> None:
"""
Sets a gauge metric to the given value.
Expand All @@ -30,7 +37,7 @@ def gauge(

@abstractmethod
def timing(
self, name: str, value: Union[int, float], tags: Optional[Tags] = None
self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None
) -> None:
"""
Records a timing metric.
Expand All @@ -44,17 +51,20 @@ class DummyMetricsBackend(Metrics):
"""

def increment(
self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None
self,
name: MetricName,
value: Union[int, float] = 1,
tags: Optional[Tags] = None,
) -> None:
pass

def gauge(
self, name: str, value: Union[int, float], tags: Optional[Tags] = None
self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None
) -> None:
pass

def timing(
self, name: str, value: Union[int, float], tags: Optional[Tags] = None
self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None
) -> None:
pass

Expand All @@ -63,7 +73,7 @@ class Gauge:
def __init__(
self,
metrics: Metrics,
name: str,
name: MetricName,
tags: Optional[Tags] = None,
) -> None:
self.__metrics = metrics
Expand Down Expand Up @@ -119,3 +129,6 @@ def get_metrics() -> Metrics:
if _metrics_backend is None:
return _dummy_metrics_backend
return _metrics_backend


__all__ = ["configure_metrics", "Metrics", "MetricName"]
1 change: 0 additions & 1 deletion docs-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
sphinx>=5.3
sphinxcontrib-mermaid==0.8.1
shibuya
sphinx-autodoc-typehints[type-comment]>=1.19.3
typing-extensions
5 changes: 4 additions & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"sphinx.ext.intersphinx",
"sphinxcontrib.mermaid",
"sphinx.ext.autodoc",
"sphinx_autodoc_typehints",
]

always_document_param_types = True
Expand Down Expand Up @@ -48,3 +47,7 @@
# html_logo = "_static/arroyo.png"

autodoc_inherit_docstrings = False

autodoc_type_aliases = {
"MetricName": "MetricName",
}
37 changes: 14 additions & 23 deletions docs/source/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,39 @@ This can be done like so:

.. code:: python
class Metrics:
from arroyo.utils.metrics import Metrics, MetricName, configure_metrics
class MyMetrics(Metrics):
def increment(
self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None
self, name: MetricName, value: Union[int, float] = 1, tags: Optional[Tags] = None
) -> None:
# Increment a counter by the given value.
record_incr(name, value, tags)
def gauge(
self, name: str, value: Union[int, float], tags: Optional[Tags] = None
self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None
) -> None:
# Sets a gauge metric to the given value.
record_gauge(name, value, tags)
def timing(
self, name: str, value: Union[int, float], tags: Optional[Tags] = None
self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None
) -> None:
# Emit a timing metric with the given value.
record_timing(name, value, tags)
metrics_backend = Metrics()
metrics_backend = MyMetrics()
configure_metrics(metrics_backend)
Some of the metrics emitted by Arroyo include:

.. list-table:: Metrics
:widths: 25 25 50
:header-rows: 1

* - Metric name
- Type
- Description
* - arroyo.consumer.poll.time
- Timing
- Time spent polling for messages. A higher number means the consumer has headroom as it is waiting for messages to arrive.
* - arroyo.consumer.processing.time
- Timing
- Time spent processing messages. A higher number means the consumer spends more time processing messages.
* - arroyo.consumer.paused.time
- Timing
- Time spent in backpressure. Usually means there could be some processing bottleneck.
Available Metrics
====================

.. literalinclude:: ../../arroyo/utils/metric_defs.py

API
=======

.. automodule:: arroyo.utils.metrics
:members:
Expand Down
5 changes: 5 additions & 0 deletions docs/source/strategies/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ Nevertheless, all arroyo strategies are written against the following interface:
:undoc-members:
:show-inheritance:

Messages
------------

.. automodule:: arroyo.types
:members:
:undoc-members:

.. toctree::
:hidden:
Expand Down
Loading

0 comments on commit f7afd25

Please sign in to comment.