From f1f69a47002baed3694a5dd3a2946b560dbb9a08 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 17 May 2023 21:38:02 +0200 Subject: [PATCH] feat: add more item size stats to multiprocessing (and metrics refactor) (#235) --- arroyo/processing/processor.py | 99 +++++----------- .../run_task_with_multiprocessing.py | 36 +++++- arroyo/utils/metrics_buffer.py | 112 ++++++++++++++++++ tests/processing/test_processor.py | 12 +- tests/utils/test_metrics_buffer.py | 13 ++ 5 files changed, 189 insertions(+), 83 deletions(-) create mode 100644 arroyo/utils/metrics_buffer.py create mode 100644 tests/utils/test_metrics_buffer.py diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 2323491b..91c93049 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -3,20 +3,8 @@ import functools import logging import time -from collections import defaultdict from enum import Enum -from typing import ( - Any, - Callable, - Generic, - Mapping, - MutableMapping, - Optional, - Sequence, - TypeVar, - Union, - cast, -) +from typing import Any, Callable, Generic, Mapping, Optional, Sequence, TypeVar, cast from arroyo.backends.abstract import Consumer from arroyo.commit import CommitPolicy @@ -30,11 +18,10 @@ from arroyo.types import BrokerValue, Message, Partition, Topic, TStrategyPayload from arroyo.utils.logging import handle_internal_error from arroyo.utils.metrics import get_metrics +from arroyo.utils.metrics_buffer import MetricsBuffer logger = logging.getLogger(__name__) -METRICS_FREQUENCY_SEC = 1.0 # In seconds - F = TypeVar("F", bound=Callable[[Any], Any]) @@ -50,8 +37,9 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: logger.exception(f"{f.__name__} crashed") raise finally: - metrics.incr_timing( - ConsumerTiming.CONSUMER_CALLBACK_TIME, time.time() - start_time + metrics.timing( + ConsumerTiming.CONSUMER_CALLBACK_TIME.value, + time.time() - start_time, ) return cast(F, wrapper) @@ -79,41 +67,6 @@ class ConsumerCounter(Enum): CONSUMER_RUN_COUNT = "arroyo.consumer.run.count" -class MetricsBuffer: - def __init__(self) -> None: - self.__metrics = get_metrics() - self.__timers: MutableMapping[ConsumerTiming, float] = defaultdict(float) - self.__counters: MutableMapping[ConsumerCounter, int] = defaultdict(int) - self.__reset() - - def incr_timing(self, metric: ConsumerTiming, duration: float) -> None: - self.__timers[metric] += duration - self.__throttled_record() - - def incr_counter(self, metric: ConsumerCounter, delta: int) -> None: - self.__counters[metric] += delta - self.__throttled_record() - - def flush(self) -> None: - metric: Union[ConsumerTiming, ConsumerCounter] - value: Union[float, int] - - for metric, value in self.__timers.items(): - self.__metrics.timing(metric.value, value) - for metric, value in self.__counters.items(): - self.__metrics.increment(metric.value, value) - self.__reset() - - def __reset(self) -> None: - self.__timers.clear() - self.__counters.clear() - self.__last_record_time = time.time() - - def __throttled_record(self) -> None: - if time.time() - self.__last_record_time > METRICS_FREQUENCY_SEC: - self.flush() - - class StreamProcessor(Generic[TStrategyPayload]): """ A stream processor manages the relationship between a ``Consumer`` @@ -133,7 +86,7 @@ def __init__( ) -> None: self.__consumer = consumer self.__processor_factory = processor_factory - self.__metrics_buffer = MetricsBuffer() + self.__metrics_buffer = MetricsBuffer(get_metrics(), record_timers_sum=True) self.__processing_strategy: Optional[ 
             ProcessingStrategy[TStrategyPayload]
         ] = None
@@ -177,13 +130,15 @@ def _close_strategy() -> None:
             start_join = time.time()
             while True:
                 try:
                     self.__processing_strategy.join(self.__join_timeout)
-                    self.__metrics_buffer.incr_timing(
-                        ConsumerTiming.CONSUMER_JOIN_TIME, time.time() - start_join
+                    self.__metrics_buffer.timing(
+                        ConsumerTiming.CONSUMER_JOIN_TIME.value,
+                        time.time() - start_join,
                     )
                     break
                 except InvalidMessage as e:
-                    self.__metrics_buffer.incr_timing(
-                        ConsumerTiming.CONSUMER_JOIN_TIME, time.time() - start_join
+                    self.__metrics_buffer.timing(
+                        ConsumerTiming.CONSUMER_JOIN_TIME.value,
+                        time.time() - start_join,
                     )
                     self._handle_invalid_message(e)
@@ -194,8 +149,8 @@ def _close_strategy() -> None:
             self.__processing_strategy = None
             self.__message = None  # avoid leaking buffered messages across assignments
 
-            self.__metrics_buffer.incr_timing(
-                ConsumerTiming.CONSUMER_SHUTDOWN_TIME, time.time() - start_close
+            self.__metrics_buffer.timing(
+                ConsumerTiming.CONSUMER_SHUTDOWN_TIME.value, time.time() - start_close
             )
 
         def _create_strategy(partitions: Mapping[Partition, int]) -> None:
@@ -325,12 +280,12 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None:
            start_dlq = time.time()
            # XXX: This blocks if there are more than MAX_PENDING_FUTURES in the queue.
            self.__dlq_policy.produce(invalid_message)
-           self.__metrics_buffer.incr_timing(
-               ConsumerTiming.CONSUMER_DLQ_TIME, time.time() - start_dlq
+           self.__metrics_buffer.timing(
+               ConsumerTiming.CONSUMER_DLQ_TIME.value, time.time() - start_dlq
            )
 
     def _run_once(self) -> None:
-        self.__metrics_buffer.incr_counter(ConsumerCounter.CONSUMER_RUN_COUNT, 1)
+        self.__metrics_buffer.increment(ConsumerCounter.CONSUMER_RUN_COUNT.value, 1)
 
         message_carried_over = self.__message is not None
 
@@ -355,8 +310,8 @@ def _run_once(self) -> None:
             try:
                 start_poll = time.time()
                 self.__message = self.__consumer.poll(timeout=1.0)
-                self.__metrics_buffer.incr_timing(
-                    ConsumerTiming.CONSUMER_POLL_TIME, time.time() - start_poll
+                self.__metrics_buffer.timing(
+                    ConsumerTiming.CONSUMER_POLL_TIME.value, time.time() - start_poll
                 )
             except RecoverableError:
                 return
@@ -369,8 +324,8 @@ def _run_once(self) -> None:
                     self._handle_invalid_message(e)
                     return
 
-            self.__metrics_buffer.incr_timing(
-                ConsumerTiming.CONSUMER_PROCESSING_TIME, time.time() - start_poll
+            self.__metrics_buffer.timing(
+                ConsumerTiming.CONSUMER_PROCESSING_TIME.value, time.time() - start_poll
             )
         if self.__message is not None:
             try:
@@ -382,8 +337,8 @@ def _run_once(self) -> None:
                     self.__buffered_messages.append(self.__message)
 
                 self.__processing_strategy.submit(message)
-                self.__metrics_buffer.incr_timing(
-                    ConsumerTiming.CONSUMER_PROCESSING_TIME,
+                self.__metrics_buffer.timing(
+                    ConsumerTiming.CONSUMER_PROCESSING_TIME.value,
                     time.time() - start_submit,
                 )
             except MessageRejected as e:
@@ -402,8 +357,8 @@ def _run_once(self) -> None:
                 else:
                     current_time = time.time()
                     if self.__paused_timestamp:
-                        self.__metrics_buffer.incr_timing(
-                            ConsumerTiming.CONSUMER_PAUSED_TIME,
+                        self.__metrics_buffer.timing(
+                            ConsumerTiming.CONSUMER_PAUSED_TIME.value,
                             current_time - self.__paused_timestamp,
                         )
                         self.__paused_timestamp = current_time
@@ -417,8 +372,8 @@ def _run_once(self) -> None:
 
             if message_carried_over and self.__paused_timestamp is not None:
                 self.__consumer.resume([*self.__consumer.tell().keys()])
-                self.__metrics_buffer.incr_timing(
-                    ConsumerTiming.CONSUMER_PAUSED_TIME,
+                self.__metrics_buffer.timing(
+                    ConsumerTiming.CONSUMER_PAUSED_TIME.value,
                     time.time() - self.__paused_timestamp,
                 )
 
diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py
index 9bf487f3..d21a07a3 100644
--- a/arroyo/processing/strategies/run_task_with_multiprocessing.py
+++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py
@@ -29,6 +29,7 @@
 from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy
 from arroyo.types import FilteredPayload, Message, TStrategyPayload
 from arroyo.utils.metrics import Gauge, get_metrics
+from arroyo.utils.metrics_buffer import MetricsBuffer
 
 logger = logging.getLogger(__name__)
 
@@ -56,6 +57,10 @@ class ValueTooLarge(ValueError):
     """
 
 
+# Item size in bytes (inline pickle + space taken in shm buffer)
+ItemStats = Tuple[int, int]
+
+
 class MessageBatch(Generic[TBatchValue]):
     """
     Contains a sequence of ``Message`` instances that are intended to be
@@ -125,7 +130,7 @@ def __iter__(self) -> Iterator[Tuple[int, TBatchValue]]:
     def reset_iterator(self, iter_offset: int) -> None:
         self.__iter_offset = iter_offset
 
-    def append(self, message: TBatchValue) -> None:
+    def append(self, message: TBatchValue) -> ItemStats:
         """
         Add a message to this batch.
 
@@ -139,7 +144,11 @@ def append(self, message: TBatchValue) -> None:
         """
         buffers: MutableSequence[Tuple[int, int]] = []
+        buffer_data_len = 0
+
         def buffer_callback(buffer: PickleBuffer) -> None:
+            nonlocal buffer_data_len
+
             value = buffer.raw()
-            offset = self.__offset
+            offset = self.__offset + buffer_data_len
             length = len(value)
@@ -149,12 +158,14 @@ def buffer_callback(buffer: PickleBuffer) -> None:
                     f"bytes needed but {self.block.size - offset} bytes free"
                 )
             self.block.buf[offset : offset + length] = value
-            self.__offset += length
+            buffer_data_len += length
             buffers.append((offset, length))
 
         data = pickle.dumps(message, protocol=5, buffer_callback=buffer_callback)
+        self.__offset += buffer_data_len
 
         self.__items.append((data, buffers))
+        return len(data), buffer_data_len
 
 
 class BatchBuilder(Generic[TBatchValue]):
@@ -171,8 +182,8 @@ def __init__(
     def __len__(self) -> int:
         return len(self.__batch)
 
-    def append(self, message: TBatchValue) -> None:
-        self.__batch.append(message)
+    def append(self, message: TBatchValue) -> ItemStats:
+        return self.__batch.append(message)
 
     def ready(self) -> bool:
         if len(self.__batch) >= self.__max_batch_size:
@@ -367,6 +378,12 @@ def __init__(
         self.__invalid_messages = InvalidMessageState()
 
         self.__metrics = get_metrics()
+        self.__metrics_buffer = MetricsBuffer(
+            self.__metrics,
+            record_timers_avg=True,
+            record_timers_max=True,
+            record_timers_min=True,
+        )
         self.__batches_in_progress = Gauge(self.__metrics, "batches_in_progress")
         self.__pool_waiting_time: Optional[float] = None
         self.__metrics.gauge("transform.processes", num_processes)
@@ -533,7 +550,7 @@ def submit(
         assert self.__batch_builder is not None
 
         try:
-            self.__batch_builder.append(message)
+            pickle_bytes, buffer_bytes = self.__batch_builder.append(message)
         except ValueTooLarge as e:
             logger.debug("Caught %r, closing batch and retrying...", e)
             self.__metrics.increment(
@@ -550,6 +567,15 @@ def submit(
             # size is too small (smaller than the Kafka payload limit without
             # compression.)
             self.__batch_builder.append(message)
+        else:
+            self.__metrics_buffer.timing(
+                "arroyo.strategies.run_task_with_multiprocessing.item.pickle_bytes",
+                pickle_bytes,
+            )
+            self.__metrics_buffer.timing(
+                "arroyo.strategies.run_task_with_multiprocessing.item.buffer_bytes",
+                buffer_bytes,
+            )
 
     def close(self) -> None:
         self.__closed = True
diff --git a/arroyo/utils/metrics_buffer.py b/arroyo/utils/metrics_buffer.py
new file mode 100644
index 00000000..21f62fb7
--- /dev/null
+++ b/arroyo/utils/metrics_buffer.py
@@ -0,0 +1,112 @@
+import time
+from collections import defaultdict
+from typing import MutableMapping, Optional, Union
+
+from arroyo.utils.metrics import Metrics, Tags
+
+METRICS_FREQUENCY_SEC = 1.0  # In seconds
+
+
+class MetricsBuffer:
+    """
+    A buffer for metrics, preaggregating them before they are sent to the
+    underlying metrics backend.
+
+    Should only be used in performance-sensitive code where emitting every
+    data point individually is too expensive. Buffering is lossy.
+
+    The buffer is flushed at least once per second while metrics are recorded.
+    """
+
+    def __init__(
+        self,
+        metrics: Metrics,
+        tags: Optional[Tags] = None,
+        record_timers_min: bool = False,
+        record_timers_max: bool = False,
+        record_timers_avg: bool = False,
+        record_timers_count: bool = False,
+        record_timers_sum: bool = False,
+    ) -> None:
+        self.__metrics = metrics
+        self.__tags = tags
+
+        self.__record_timers_min = record_timers_min
+        self.__record_timers_max = record_timers_max
+        self.__record_timers_avg = record_timers_avg
+        self.__record_timers_count = record_timers_count
+        self.__record_timers_sum = record_timers_sum
+
+        self.__timers_max: MutableMapping[str, float] = defaultdict(float)
+        self.__timers_min: MutableMapping[str, float] = defaultdict(float)
+        self.__timers_sum: MutableMapping[str, float] = defaultdict(float)
+        self.__timers_count: MutableMapping[str, float] = defaultdict(float)
+        self.__counters: MutableMapping[str, int] = defaultdict(int)
+        self.__gauges: MutableMapping[str, Union[float, int]] = defaultdict(int)
+        self.__reset()
+
+    def timing(self, metric: str, duration: float, tags: Optional[Tags] = None) -> None:
+        self.__timers_max[metric] = max(self.__timers_max[metric], duration)
+        self.__timers_min[metric] = min(self.__timers_min.get(metric, duration), duration)
+        self.__timers_sum[metric] += duration
+        self.__timers_count[metric] += 1
+        self.__throttled_record()
+
+    def increment(self, metric: str, delta: int, tags: Optional[Tags] = None) -> None:
+        self.__counters[metric] += delta
+        self.__throttled_record()
+
+    def gauge(
+        self, metric: str, value: Union[int, float], tags: Optional[Tags] = None
+    ) -> None:
+        self.__gauges[metric] = value
+
+    def flush(self) -> None:
+        value: Union[float, int]
+
+        if self.__record_timers_sum:
+            for metric, value in self.__timers_sum.items():
+                self.__metrics.timing(f"{metric}.buffered_sum", value, tags=self.__tags)
+
+        if self.__record_timers_avg:
+            for metric, value in self.__timers_sum.items():
+                self.__metrics.timing(
+                    f"{metric}.buffered_avg",
+                    value / self.__timers_count[metric],
+                    tags=self.__tags,
+                )
+
+        if self.__record_timers_count:
+            for metric, value in self.__timers_count.items():
+                self.__metrics.timing(
+                    f"{metric}.buffered_count", value, tags=self.__tags
+                )
+
+        if self.__record_timers_max:
+            for metric, value in self.__timers_max.items():
+                self.__metrics.timing(f"{metric}.buffered_max", value, tags=self.__tags)
+
+        if self.__record_timers_min:
+            for metric, value in self.__timers_min.items():
+                self.__metrics.timing(f"{metric}.buffered_min", value, tags=self.__tags)
+
+        for metric, value in self.__counters.items():
+            self.__metrics.increment(metric, value, tags=self.__tags)
+
+        for metric, value in self.__gauges.items():
+            self.__metrics.gauge(metric, value, tags=self.__tags)
+
+        self.__reset()
+
+    def __reset(self) -> None:
+        self.__timers_min.clear()
+        self.__timers_max.clear()
+        self.__timers_sum.clear()
+        self.__timers_count.clear()
+        self.__counters.clear()
+        self.__gauges.clear()
+        self.__last_record_time = time.time()
+
+    def __throttled_record(self) -> None:
+        if time.time() - self.__last_record_time > METRICS_FREQUENCY_SEC:
+            self.flush()
diff --git a/tests/processing/test_processor.py b/tests/processing/test_processor.py
index 69f2ba43..4605f8f8 100644
--- a/tests/processing/test_processor.py
+++ b/tests/processing/test_processor.py
@@ -124,12 +124,12 @@ def test_stream_processor_lifecycle() -> None:
     processor._shutdown()
 
     assert list((type(call), call.name) for call in metrics.calls) == [
-        (Timing, "arroyo.consumer.poll.time"),
-        (Timing, "arroyo.consumer.callback.time"),
-        (Timing, "arroyo.consumer.processing.time"),
-        (Timing, "arroyo.consumer.paused.time"),
-        (Timing, "arroyo.consumer.join.time"),
-        (Timing, "arroyo.consumer.shutdown.time"),
+        (Timing, "arroyo.consumer.poll.time.buffered_sum"),
+        (Timing, "arroyo.consumer.callback.time.buffered_sum"),
+        (Timing, "arroyo.consumer.processing.time.buffered_sum"),
+        (Timing, "arroyo.consumer.paused.time.buffered_sum"),
+        (Timing, "arroyo.consumer.join.time.buffered_sum"),
+        (Timing, "arroyo.consumer.shutdown.time.buffered_sum"),
         (Increment, "arroyo.consumer.run.count"),
     ]
diff --git a/tests/utils/test_metrics_buffer.py b/tests/utils/test_metrics_buffer.py
new file mode 100644
index 00000000..b2923719
--- /dev/null
+++ b/tests/utils/test_metrics_buffer.py
@@ -0,0 +1,13 @@
+from arroyo.utils.metrics_buffer import MetricsBuffer
+from tests.metrics import Increment, TestingMetricsBackend
+
+
+def test_basic() -> None:
+    buffer = MetricsBuffer(TestingMetricsBackend)
+    buffer.increment("test", 1)
+    buffer.increment("test", 1)
+    buffer.increment("test", 1)
+
+    buffer.flush()
+
+    assert TestingMetricsBackend.calls == [Increment("test", 3, None)]
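
For reference, a minimal usage sketch of the new MetricsBuffer (the metric
names and the choice of flags below are illustrative, not part of this patch):

    from arroyo.utils.metrics import get_metrics
    from arroyo.utils.metrics_buffer import MetricsBuffer

    # Wrap the configured backend; timings are pre-aggregated in memory and
    # emitted as "<name>.buffered_sum" / "<name>.buffered_max" on flush.
    buffer = MetricsBuffer(
        get_metrics(), record_timers_sum=True, record_timers_max=True
    )

    for duration in (0.01, 0.02, 0.03):
        buffer.timing("myapp.step.time", duration)
    buffer.increment("myapp.step.count", 3)

    # flush() also runs automatically about once per second as long as
    # timing() and increment() keep being called; counters are summed and
    # gauges keep their last value until then.
    buffer.flush()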