From 4ebbefe70b1048b128935fc8df5549f873d0831c Mon Sep 17 00:00:00 2001
From: Markus Unterwaditzer
Date: Mon, 22 May 2023 22:31:15 +0200
Subject: [PATCH] Revert "feat: add more item size stats to multiprocessing
 (and metrics refactor) (#235)" (#240)

This reverts commit f1f69a47002baed3694a5dd3a2946b560dbb9a08.
---
 arroyo/processing/processor.py       |  99 +++++++++++-----
 .../run_task_with_multiprocessing.py |  36 +-----
 arroyo/utils/metrics_buffer.py       | 112 ------------------
 tests/processing/test_processor.py   |  12 +-
 tests/utils/test_metrics_buffer.py   |  13 --
 5 files changed, 83 insertions(+), 189 deletions(-)
 delete mode 100644 arroyo/utils/metrics_buffer.py
 delete mode 100644 tests/utils/test_metrics_buffer.py

diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py
index 91c93049..2323491b 100644
--- a/arroyo/processing/processor.py
+++ b/arroyo/processing/processor.py
@@ -3,8 +3,20 @@
 import functools
 import logging
 import time
+from collections import defaultdict
 from enum import Enum
-from typing import Any, Callable, Generic, Mapping, Optional, Sequence, TypeVar, cast
+from typing import (
+    Any,
+    Callable,
+    Generic,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Sequence,
+    TypeVar,
+    Union,
+    cast,
+)
 
 from arroyo.backends.abstract import Consumer
 from arroyo.commit import CommitPolicy
@@ -18,10 +30,11 @@
 from arroyo.types import BrokerValue, Message, Partition, Topic, TStrategyPayload
 from arroyo.utils.logging import handle_internal_error
 from arroyo.utils.metrics import get_metrics
-from arroyo.utils.metrics_buffer import MetricsBuffer
 
 logger = logging.getLogger(__name__)
 
+METRICS_FREQUENCY_SEC = 1.0  # In seconds
+
 F = TypeVar("F", bound=Callable[[Any], Any])
 
 
@@ -37,9 +50,8 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
             logger.exception(f"{f.__name__} crashed")
             raise
         finally:
-            metrics.timing(
-                ConsumerTiming.CONSUMER_CALLBACK_TIME.value,
-                time.time() - start_time,
+            metrics.incr_timing(
+                ConsumerTiming.CONSUMER_CALLBACK_TIME, time.time() - start_time
             )
 
     return cast(F, wrapper)
@@ -67,6 +79,41 @@ class ConsumerCounter(Enum):
     CONSUMER_RUN_COUNT = "arroyo.consumer.run.count"
 
 
+class MetricsBuffer:
+    def __init__(self) -> None:
+        self.__metrics = get_metrics()
+        self.__timers: MutableMapping[ConsumerTiming, float] = defaultdict(float)
+        self.__counters: MutableMapping[ConsumerCounter, int] = defaultdict(int)
+        self.__reset()
+
+    def incr_timing(self, metric: ConsumerTiming, duration: float) -> None:
+        self.__timers[metric] += duration
+        self.__throttled_record()
+
+    def incr_counter(self, metric: ConsumerCounter, delta: int) -> None:
+        self.__counters[metric] += delta
+        self.__throttled_record()
+
+    def flush(self) -> None:
+        metric: Union[ConsumerTiming, ConsumerCounter]
+        value: Union[float, int]
+
+        for metric, value in self.__timers.items():
+            self.__metrics.timing(metric.value, value)
+        for metric, value in self.__counters.items():
+            self.__metrics.increment(metric.value, value)
+        self.__reset()
+
+    def __reset(self) -> None:
+        self.__timers.clear()
+        self.__counters.clear()
+        self.__last_record_time = time.time()
+
+    def __throttled_record(self) -> None:
+        if time.time() - self.__last_record_time > METRICS_FREQUENCY_SEC:
+            self.flush()
+
+
 class StreamProcessor(Generic[TStrategyPayload]):
     """
     A stream processor manages the relationship between a ``Consumer``
@@ -86,7 +133,7 @@ def __init__(
     ) -> None:
        self.__consumer = consumer
        self.__processor_factory = processor_factory
-        self.__metrics_buffer = MetricsBuffer(get_metrics(), record_timers_sum=True)
+        self.__metrics_buffer = MetricsBuffer()
 
         self.__processing_strategy: Optional[
             ProcessingStrategy[TStrategyPayload]
@@ -130,15 +177,13 @@ def _close_strategy() -> None:
 
             try:
                 self.__processing_strategy.join(self.__join_timeout)
-                self.__metrics_buffer.timing(
-                    ConsumerTiming.CONSUMER_JOIN_TIME.value,
-                    time.time() - start_join,
+                self.__metrics_buffer.incr_timing(
+                    ConsumerTiming.CONSUMER_JOIN_TIME, time.time() - start_join
                 )
                 break
             except InvalidMessage as e:
-                self.__metrics_buffer.timing(
-                    ConsumerTiming.CONSUMER_JOIN_TIME.value,
-                    time.time() - start_join,
+                self.__metrics_buffer.incr_timing(
+                    ConsumerTiming.CONSUMER_JOIN_TIME, time.time() - start_join
                 )
                 self._handle_invalid_message(e)
 
@@ -149,8 +194,8 @@ def _close_strategy() -> None:
             self.__processing_strategy = None
             self.__message = None  # avoid leaking buffered messages across assignments
 
-            self.__metrics_buffer.timing(
-                ConsumerTiming.CONSUMER_SHUTDOWN_TIME.value, time.time() - start_close
+            self.__metrics_buffer.incr_timing(
+                ConsumerTiming.CONSUMER_SHUTDOWN_TIME, time.time() - start_close
             )
 
         def _create_strategy(partitions: Mapping[Partition, int]) -> None:
@@ -280,12 +325,12 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None:
             # XXX: This blocks if there are more than MAX_PENDING_FUTURES in the queue.
             self.__dlq_policy.produce(invalid_message)
 
-        self.__metrics_buffer.timing(
-            ConsumerTiming.CONSUMER_DLQ_TIME.value, time.time() - start_dlq
+        self.__metrics_buffer.incr_timing(
+            ConsumerTiming.CONSUMER_DLQ_TIME, time.time() - start_dlq
         )
 
     def _run_once(self) -> None:
-        self.__metrics_buffer.increment(ConsumerCounter.CONSUMER_RUN_COUNT.value, 1)
+        self.__metrics_buffer.incr_counter(ConsumerCounter.CONSUMER_RUN_COUNT, 1)
 
        message_carried_over = self.__message is not None
 
@@ -310,8 +355,8 @@ def _run_once(self) -> None:
             try:
                 start_poll = time.time()
                 self.__message = self.__consumer.poll(timeout=1.0)
-                self.__metrics_buffer.timing(
-                    ConsumerTiming.CONSUMER_POLL_TIME.value, time.time() - start_poll
+                self.__metrics_buffer.incr_timing(
+                    ConsumerTiming.CONSUMER_POLL_TIME, time.time() - start_poll
                 )
             except RecoverableError:
                 return
@@ -324,8 +369,8 @@ def _run_once(self) -> None:
                 self._handle_invalid_message(e)
                 return
 
-        self.__metrics_buffer.timing(
-            ConsumerTiming.CONSUMER_PROCESSING_TIME.value, time.time() - start_poll
+        self.__metrics_buffer.incr_timing(
+            ConsumerTiming.CONSUMER_PROCESSING_TIME, time.time() - start_poll
         )
         if self.__message is not None:
             try:
@@ -337,8 +382,8 @@ def _run_once(self) -> None:
 
                     self.__buffered_messages.append(self.__message)
                     self.__processing_strategy.submit(message)
-                    self.__metrics_buffer.timing(
-                        ConsumerTiming.CONSUMER_PROCESSING_TIME.value,
+                    self.__metrics_buffer.incr_timing(
+                        ConsumerTiming.CONSUMER_PROCESSING_TIME,
                         time.time() - start_submit,
                     )
             except MessageRejected as e:
@@ -357,8 +402,8 @@ def _run_once(self) -> None:
                 else:
                     current_time = time.time()
                     if self.__paused_timestamp:
-                        self.__metrics_buffer.timing(
-                            ConsumerTiming.CONSUMER_PAUSED_TIME.value,
+                        self.__metrics_buffer.incr_timing(
+                            ConsumerTiming.CONSUMER_PAUSED_TIME,
                             current_time - self.__paused_timestamp,
                         )
                         self.__paused_timestamp = current_time
@@ -372,8 +417,8 @@ def _run_once(self) -> None:
 
         if message_carried_over and self.__paused_timestamp is not None:
             self.__consumer.resume([*self.__consumer.tell().keys()])
-            self.__metrics_buffer.timing(
-                ConsumerTiming.CONSUMER_PAUSED_TIME.value,
+            self.__metrics_buffer.incr_timing(
+                ConsumerTiming.CONSUMER_PAUSED_TIME,
                 time.time() - self.__paused_timestamp,
             )
 
diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py
index eff2199c..658c2372 100644
--- a/arroyo/processing/strategies/run_task_with_multiprocessing.py
+++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py
@@ -29,7 +29,6 @@
 from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy
 from arroyo.types import FilteredPayload, Message, TStrategyPayload
 from arroyo.utils.metrics import Gauge, get_metrics
-from arroyo.utils.metrics_buffer import MetricsBuffer
 
 logger = logging.getLogger(__name__)
 
@@ -61,10 +60,6 @@ class ValueTooLarge(ValueError):
     """
 
 
-# Item size in bytes (inline pickle + space taken in shm buffer)
-ItemStats = Tuple[int, int]
-
-
 class MessageBatch(Generic[TBatchValue]):
     """
     Contains a sequence of ``Message`` instances that are intended to be
@@ -134,7 +129,7 @@ def __iter__(self) -> Iterator[Tuple[int, TBatchValue]]:
 
     def reset_iterator(self, iter_offset: int) -> None:
         self.__iter_offset = iter_offset
 
-    def append(self, message: TBatchValue) -> ItemStats:
+    def append(self, message: TBatchValue) -> None:
         """
         Add a message to this batch.
@@ -148,11 +143,7 @@ def append(self, message: TBatchValue) -> ItemStats:
         """
 
         buffers: MutableSequence[Tuple[int, int]] = []
-        buffer_data_len = 0
-
         def buffer_callback(buffer: PickleBuffer) -> None:
-            nonlocal buffer_data_len
-
             value = buffer.raw()
             offset = self.__offset
             length = len(value)
@@ -162,14 +153,12 @@ def buffer_callback(buffer: PickleBuffer) -> None:
                     f"bytes needed but {self.block.size - offset} bytes free"
                 )
             self.block.buf[offset : offset + length] = value
-            buffer_data_len += length
+            self.__offset += length
             buffers.append((offset, length))
 
         data = pickle.dumps(message, protocol=5, buffer_callback=buffer_callback)
-        self.__offset += buffer_data_len
         self.__items.append((data, buffers))
-        return len(data), buffer_data_len
 
 
 class BatchBuilder(Generic[TBatchValue]):
@@ -186,8 +175,8 @@ def __init__(
     def __len__(self) -> int:
         return len(self.__batch)
 
-    def append(self, message: TBatchValue) -> ItemStats:
-        return self.__batch.append(message)
+    def append(self, message: TBatchValue) -> None:
+        self.__batch.append(message)
 
     def ready(self) -> bool:
         if len(self.__batch) >= self.__max_batch_size:
@@ -382,12 +371,6 @@ def __init__(
 
         self.__invalid_messages = InvalidMessageState()
         self.__metrics = get_metrics()
-        self.__metrics_buffer = MetricsBuffer(
-            self.__metrics,
-            record_timers_avg=True,
-            record_timers_max=True,
-            record_timers_min=True,
-        )
         self.__batches_in_progress = Gauge(self.__metrics, "batches_in_progress")
         self.__pool_waiting_time: Optional[float] = None
         self.__metrics.gauge("transform.processes", num_processes)
@@ -564,7 +547,7 @@ def submit(
         assert self.__batch_builder is not None
 
         try:
-            pickle_bytes, buffer_bytes = self.__batch_builder.append(message)
+            self.__batch_builder.append(message)
         except ValueTooLarge as e:
             logger.debug("Caught %r, closing batch and retrying...", e)
             self.__metrics.increment(
@@ -581,15 +564,6 @@ def submit(
             # size is too small (smaller than the Kafka payload limit without
             # compression.)
             self.__batch_builder.append(message)
-        else:
-            self.__metrics_buffer.timing(
-                "arroyo.strategies.run_task_with_multiprocessing.item.pickle_bytes",
-                pickle_bytes,
-            )
-            self.__metrics_buffer.timing(
-                "arroyo.strategies.run_task_with_multiprocessing.item.buffer_bytes",
-                buffer_bytes,
-            )
 
     def close(self) -> None:
         self.__closed = True

diff --git a/arroyo/utils/metrics_buffer.py b/arroyo/utils/metrics_buffer.py
deleted file mode 100644
index 21f62fb7..00000000
--- a/arroyo/utils/metrics_buffer.py
+++ /dev/null
@@ -1,112 +0,0 @@
-import time
-from collections import defaultdict
-from typing import MutableMapping, Optional, Union
-
-from arroyo.utils.metrics import Metrics, Tags
-
-METRICS_FREQUENCY_SEC = 1.0  # In seconds
-
-
-class MetricsBuffer:
-    """
-    A buffer for metrics, preaggregating them before they are sent to the
-    underlying metrics backend.
-
-    Should only be used in performance sensitive code if there's no other
-    option. Buffering is lossy.
-
-    Buffer is flushed every second at least.
-    """
-
-    def __init__(
-        self,
-        metrics: Metrics,
-        tags: Optional[Tags] = None,
-        record_timers_min: bool = False,
-        record_timers_max: bool = False,
-        record_timers_avg: bool = False,
-        record_timers_count: bool = False,
-        record_timers_sum: bool = False,
-    ) -> None:
-        self.__metrics = metrics
-        self.__tags = tags
-
-        self.__record_timers_min = record_timers_min
-        self.__record_timers_max = record_timers_max
-        self.__record_timers_avg = record_timers_avg
-        self.__record_timers_count = record_timers_count
-        self.__record_timers_sum = record_timers_sum
-
-        self.__timers_max: MutableMapping[str, float] = defaultdict(float)
-        self.__timers_min: MutableMapping[str, float] = defaultdict(float)
-        self.__timers_sum: MutableMapping[str, float] = defaultdict(float)
-        self.__timers_count: MutableMapping[str, float] = defaultdict(float)
-        self.__counters: MutableMapping[str, int] = defaultdict(int)
-        self.__gauges: MutableMapping[str, Union[float, int]] = defaultdict(int)
-        self.__reset()
-
-    def timing(self, metric: str, duration: float, tags: Optional[Tags] = None) -> None:
-        self.__timers_max[metric] = max(self.__timers_max[metric], duration)
-        self.__timers_min[metric] = min(self.__timers_max[metric], duration)
-        self.__timers_sum[metric] += duration
-        self.__timers_count[metric] += 1
-        self.__throttled_record()
-
-    def increment(self, metric: str, delta: int, tags: Optional[Tags] = None) -> None:
-        self.__counters[metric] += delta
-        self.__throttled_record()
-
-    def gauge(
-        self, metric: str, value: Union[int, float], tags: Optional[Tags] = None
-    ) -> None:
-        self.__gauges[metric] = value
-
-    def flush(self) -> None:
-        value: Union[float, int]
-
-        if self.__record_timers_sum:
-            for metric, value in self.__timers_sum.items():
-                self.__metrics.timing(f"{metric}.buffered_sum", value, tags=self.__tags)
-
-        if self.__record_timers_avg:
-            for metric, value in self.__timers_sum.items():
-                self.__metrics.timing(
-                    f"{metric}.buffered_avg",
-                    value / self.__timers_count[metric],
-                    tags=self.__tags,
-                )
-
-        if self.__record_timers_count:
-            for metric, value in self.__timers_count.items():
-                self.__metrics.timing(
-                    f"{metric}.buffered_count", value, tags=self.__tags
-                )
-
-        if self.__record_timers_max:
-            for metric, value in self.__timers_max.items():
-                self.__metrics.timing(f"{metric}.buffered_max", value, tags=self.__tags)
-
-        if self.__record_timers_min:
-            for metric, value in self.__timers_min.items():
-                self.__metrics.timing(f"{metric}.buffered_min", value, tags=self.__tags)
-
-        for metric, value in self.__counters.items():
-            self.__metrics.increment(metric, value, tags=self.__tags)
-
-        for metric, value in self.__gauges.items():
-            self.__metrics.gauge(metric, value, tags=self.__tags)
-
-        self.__reset()
-
-    def __reset(self) -> None:
-        self.__timers_min.clear()
-        self.__timers_max.clear()
-        self.__timers_sum.clear()
-        self.__timers_count.clear()
-        self.__counters.clear()
-        self.__gauges.clear()
-        self.__last_record_time = time.time()
-
-    def __throttled_record(self) -> None:
-        if time.time() - self.__last_record_time > METRICS_FREQUENCY_SEC:
-            self.flush()
diff --git a/tests/processing/test_processor.py b/tests/processing/test_processor.py
index 4605f8f8..69f2ba43 100644
--- a/tests/processing/test_processor.py
+++ b/tests/processing/test_processor.py
@@ -124,12 +124,12 @@ def test_stream_processor_lifecycle() -> None:
     processor._shutdown()
 
     assert list((type(call), call.name) for call in metrics.calls) == [
-        (Timing, "arroyo.consumer.poll.time.buffered_sum"),
-        (Timing, "arroyo.consumer.callback.time.buffered_sum"),
-        (Timing, "arroyo.consumer.processing.time.buffered_sum"),
-        (Timing, "arroyo.consumer.paused.time.buffered_sum"),
-        (Timing, "arroyo.consumer.join.time.buffered_sum"),
-        (Timing, "arroyo.consumer.shutdown.time.buffered_sum"),
+        (Timing, "arroyo.consumer.poll.time"),
+        (Timing, "arroyo.consumer.callback.time"),
+        (Timing, "arroyo.consumer.processing.time"),
+        (Timing, "arroyo.consumer.paused.time"),
+        (Timing, "arroyo.consumer.join.time"),
+        (Timing, "arroyo.consumer.shutdown.time"),
         (Increment, "arroyo.consumer.run.count"),
     ]
 
diff --git a/tests/utils/test_metrics_buffer.py b/tests/utils/test_metrics_buffer.py
deleted file mode 100644
index b2923719..00000000
--- a/tests/utils/test_metrics_buffer.py
+++ /dev/null
@@ -1,13 +0,0 @@
-from arroyo.utils.metrics_buffer import MetricsBuffer
-from tests.metrics import Increment, TestingMetricsBackend
-
-
-def test_basic() -> None:
-    buffer = MetricsBuffer(TestingMetricsBackend)
-    buffer.increment("test", 1)
-    buffer.increment("test", 1)
-    buffer.increment("test", 1)
-
-    buffer.flush()
-
-    assert TestingMetricsBackend.calls == [Increment("test", 3, None)]
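
--

The inline MetricsBuffer restored to processor.py accumulates timings and
counters in local defaultdicts and lets __throttled_record() flush them to
the backend at most once per METRICS_FREQUENCY_SEC, so hot paths such as
_run_once() do not emit one metric call per message. A minimal standalone
sketch of the same throttling pattern; the `record` callable is a
hypothetical stand-in for the real arroyo.utils.metrics backend:

    import time
    from collections import defaultdict
    from typing import Callable, MutableMapping

    METRICS_FREQUENCY_SEC = 1.0  # flush to the backend at most once a second

    class ThrottledTimers:
        def __init__(self, record: Callable[[str, float], None]) -> None:
            self.__record = record
            self.__timers: MutableMapping[str, float] = defaultdict(float)
            self.__last_record_time = time.time()

        def incr_timing(self, metric: str, duration: float) -> None:
            # Accumulate locally; only touch the backend once the window expires.
            self.__timers[metric] += duration
            if time.time() - self.__last_record_time > METRICS_FREQUENCY_SEC:
                self.flush()

        def flush(self) -> None:
            # Emit one aggregated value per metric, then start a new window.
            for metric, value in self.__timers.items():
                self.__record(metric, value)
            self.__timers.clear()
            self.__last_record_time = time.time()

    timers = ThrottledTimers(lambda m, v: print(f"{m}={v:.3f}"))
    timers.incr_timing("arroyo.consumer.poll.time", 0.25)
    timers.flush()  # flush explicitly on shutdown so the tail is not lost

As the deleted module's docstring put it, buffering is lossy: whatever has
accumulated but not yet been flushed when the process dies is dropped, which
is the trade this makes for keeping per-message overhead low.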
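MessageBatch.append goes back to returning None, but the mechanism is
unchanged: messages are pickled with protocol 5, and buffer_callback copies
each out-of-band buffer into the shared memory block, recording
(offset, length) pairs so a reader can reattach the buffers on load. The
removed ItemStats tuple measured exactly the two sizes this produces:
len(data) for the inline pickle and the bytes written to the block. A
self-contained sketch of that out-of-band path, simplified from the class
above; it pickles a pickle.PickleBuffer directly to force out-of-band
treatment, whereas the real batch pickles whole Message objects:

    import pickle
    from multiprocessing import shared_memory

    block = shared_memory.SharedMemory(create=True, size=16 * 1024)
    offset = 0
    buffers = []  # (offset, length) pairs, as stored alongside each pickle

    def buffer_callback(buffer: pickle.PickleBuffer) -> None:
        # Copy the raw bytes into the shared block, not the pickle stream.
        global offset
        value = buffer.raw()
        length = len(value)
        if offset + length > block.size:
            raise ValueError("out of space")  # cf. ValueTooLarge
        block.buf[offset : offset + length] = value
        buffers.append((offset, length))
        offset += length

    payload = bytearray(b"x" * 4096)
    data = pickle.dumps(
        pickle.PickleBuffer(payload), protocol=5, buffer_callback=buffer_callback
    )
    print(len(data), buffers)  # tiny inline pickle; the 4 KiB live in the block

    # A reader reattaches the out-of-band buffers from the same block.
    restored = pickle.loads(data, buffers=[block.buf[o : o + n] for o, n in buffers])
    assert bytes(restored) == bytes(payload)

    restored.release()  # release views into the block before closing it
    block.close()
    block.unlink()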
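The deleted arroyo/utils/metrics_buffer.py generalized the same idea to
string-keyed timers, counters, and gauges, with opt-in per-window timer
summaries emitted as <metric>.buffered_sum/avg/count/max/min. One detail
visible in the removed lines: timing() set self.__timers_min[metric] to
min(self.__timers_max[metric], duration), and since the max had just been
updated to be at least duration, buffered_min always held the latest sample
rather than the window minimum. A sketch of the windowed summaries that API
aimed at, keeping raw samples per window for clarity instead of the running
aggregates the original maintained:

    from collections import defaultdict
    from typing import Dict, List

    class TimerWindow:
        """Collect raw timer samples per window, then emit summary stats."""

        def __init__(self) -> None:
            self.__samples: Dict[str, List[float]] = defaultdict(list)

        def timing(self, metric: str, duration: float) -> None:
            self.__samples[metric].append(duration)

        def flush(self) -> Dict[str, Dict[str, float]]:
            # One summary per metric, mirroring the .buffered_* suffixes.
            out: Dict[str, Dict[str, float]] = {}
            for metric, samples in self.__samples.items():
                out[metric] = {
                    "buffered_sum": sum(samples),
                    "buffered_avg": sum(samples) / len(samples),
                    "buffered_count": float(len(samples)),
                    "buffered_max": max(samples),
                    "buffered_min": min(samples),  # true per-window minimum
                }
            self.__samples.clear()
            return out

    w = TimerWindow()
    for d in (0.2, 0.5, 0.1):
        w.timing("arroyo.consumer.poll.time", d)
    print(w.flush()["arroyo.consumer.poll.time"])  # min 0.1, max 0.5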