Revert "feat: add more item size stats to multiprocessing (and metric…
Browse files Browse the repository at this point in the history
…s refactor) (#235)" (#240)

This reverts commit f1f69a4.
untitaker authored May 22, 2023
1 parent c6bcbe9 commit 4ebbefe
Showing 5 changed files with 83 additions and 189 deletions.
99 changes: 72 additions & 27 deletions arroyo/processing/processor.py
@@ -3,8 +3,20 @@
 import functools
 import logging
 import time
+from collections import defaultdict
 from enum import Enum
-from typing import Any, Callable, Generic, Mapping, Optional, Sequence, TypeVar, cast
+from typing import (
+    Any,
+    Callable,
+    Generic,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Sequence,
+    TypeVar,
+    Union,
+    cast,
+)
 
 from arroyo.backends.abstract import Consumer
 from arroyo.commit import CommitPolicy
@@ -18,10 +30,11 @@
 from arroyo.types import BrokerValue, Message, Partition, Topic, TStrategyPayload
 from arroyo.utils.logging import handle_internal_error
 from arroyo.utils.metrics import get_metrics
-from arroyo.utils.metrics_buffer import MetricsBuffer
 
 logger = logging.getLogger(__name__)
 
+METRICS_FREQUENCY_SEC = 1.0  # In seconds
+
 F = TypeVar("F", bound=Callable[[Any], Any])


@@ -37,9 +50,8 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
             logger.exception(f"{f.__name__} crashed")
             raise
         finally:
-            metrics.timing(
-                ConsumerTiming.CONSUMER_CALLBACK_TIME.value,
-                time.time() - start_time,
+            metrics.incr_timing(
+                ConsumerTiming.CONSUMER_CALLBACK_TIME, time.time() - start_time
             )
 
     return cast(F, wrapper)
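
Aside (not part of the diff): the wrapper above is the standard accumulate-in-finally timing pattern, so the callback's duration is recorded even when it raises. A self-contained sketch; `timed` and `timings` are illustrative stand-ins, not arroyo APIs:

    import functools
    import time
    from typing import Any, Callable, Dict, TypeVar, cast

    F = TypeVar("F", bound=Callable[..., Any])
    timings: Dict[str, float] = {}  # stand-in for the buffer's timers

    def timed(name: str) -> Callable[[F], F]:
        def decorator(f: F) -> F:
            @functools.wraps(f)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                start_time = time.time()
                try:
                    return f(*args, **kwargs)
                finally:
                    # Runs on return and on raise alike, like incr_timing above.
                    timings[name] = timings.get(name, 0.0) + time.time() - start_time
            return cast(F, wrapper)
        return decorator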
@@ -67,6 +79,41 @@ class ConsumerCounter(Enum):
     CONSUMER_RUN_COUNT = "arroyo.consumer.run.count"
 
 
+class MetricsBuffer:
+    def __init__(self) -> None:
+        self.__metrics = get_metrics()
+        self.__timers: MutableMapping[ConsumerTiming, float] = defaultdict(float)
+        self.__counters: MutableMapping[ConsumerCounter, int] = defaultdict(int)
+        self.__reset()
+
+    def incr_timing(self, metric: ConsumerTiming, duration: float) -> None:
+        self.__timers[metric] += duration
+        self.__throttled_record()
+
+    def incr_counter(self, metric: ConsumerCounter, delta: int) -> None:
+        self.__counters[metric] += delta
+        self.__throttled_record()
+
+    def flush(self) -> None:
+        metric: Union[ConsumerTiming, ConsumerCounter]
+        value: Union[float, int]
+
+        for metric, value in self.__timers.items():
+            self.__metrics.timing(metric.value, value)
+        for metric, value in self.__counters.items():
+            self.__metrics.increment(metric.value, value)
+        self.__reset()
+
+    def __reset(self) -> None:
+        self.__timers.clear()
+        self.__counters.clear()
+        self.__last_record_time = time.time()
+
+    def __throttled_record(self) -> None:
+        if time.time() - self.__last_record_time > METRICS_FREQUENCY_SEC:
+            self.flush()
+
+
 class StreamProcessor(Generic[TStrategyPayload]):
     """
     A stream processor manages the relationship between a ``Consumer``
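
Aside (not part of the diff): a usage sketch for the MetricsBuffer restored above, assuming the class and the ConsumerTiming/ConsumerCounter enums from this file are in scope; `do_poll` is a hypothetical stand-in for consumer work. Increments accumulate in-process and reach the backend at most once per METRICS_FREQUENCY_SEC, which keeps per-iteration metrics overhead low on hot loops:

    buffer = MetricsBuffer()

    for _ in range(100_000):
        start = time.time()
        do_poll()  # hypothetical stand-in for self.__consumer.poll(...)
        # Accumulation is a cheap dict update; an actual backend call happens
        # at most once per METRICS_FREQUENCY_SEC via __throttled_record().
        buffer.incr_timing(ConsumerTiming.CONSUMER_POLL_TIME, time.time() - start)
        buffer.incr_counter(ConsumerCounter.CONSUMER_RUN_COUNT, 1)

    buffer.flush()  # emit anything still buffered, e.g. on shutdown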
@@ -86,7 +133,7 @@ def __init__(
     ) -> None:
         self.__consumer = consumer
         self.__processor_factory = processor_factory
-        self.__metrics_buffer = MetricsBuffer(get_metrics(), record_timers_sum=True)
+        self.__metrics_buffer = MetricsBuffer()
 
         self.__processing_strategy: Optional[
             ProcessingStrategy[TStrategyPayload]
@@ -130,15 +177,13 @@ def _close_strategy() -> None:
 
                 try:
                     self.__processing_strategy.join(self.__join_timeout)
-                    self.__metrics_buffer.timing(
-                        ConsumerTiming.CONSUMER_JOIN_TIME.value,
-                        time.time() - start_join,
+                    self.__metrics_buffer.incr_timing(
+                        ConsumerTiming.CONSUMER_JOIN_TIME, time.time() - start_join
                     )
                     break
                 except InvalidMessage as e:
-                    self.__metrics_buffer.timing(
-                        ConsumerTiming.CONSUMER_JOIN_TIME.value,
-                        time.time() - start_join,
+                    self.__metrics_buffer.incr_timing(
+                        ConsumerTiming.CONSUMER_JOIN_TIME, time.time() - start_join
                     )
                     self._handle_invalid_message(e)
 
@@ -149,8 +194,8 @@ def _close_strategy() -> None:
             self.__processing_strategy = None
             self.__message = None  # avoid leaking buffered messages across assignments
 
-            self.__metrics_buffer.timing(
-                ConsumerTiming.CONSUMER_SHUTDOWN_TIME.value, time.time() - start_close
+            self.__metrics_buffer.incr_timing(
+                ConsumerTiming.CONSUMER_SHUTDOWN_TIME, time.time() - start_close
             )
 
         def _create_strategy(partitions: Mapping[Partition, int]) -> None:
@@ -280,12 +325,12 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None:
             # XXX: This blocks if there are more than MAX_PENDING_FUTURES in the queue.
             self.__dlq_policy.produce(invalid_message)
 
-            self.__metrics_buffer.timing(
-                ConsumerTiming.CONSUMER_DLQ_TIME.value, time.time() - start_dlq
+            self.__metrics_buffer.incr_timing(
+                ConsumerTiming.CONSUMER_DLQ_TIME, time.time() - start_dlq
             )
 
     def _run_once(self) -> None:
-        self.__metrics_buffer.increment(ConsumerCounter.CONSUMER_RUN_COUNT.value, 1)
+        self.__metrics_buffer.incr_counter(ConsumerCounter.CONSUMER_RUN_COUNT, 1)
 
         message_carried_over = self.__message is not None
@@ -310,8 +355,8 @@ def _run_once(self) -> None:
             try:
                 start_poll = time.time()
                 self.__message = self.__consumer.poll(timeout=1.0)
-                self.__metrics_buffer.timing(
-                    ConsumerTiming.CONSUMER_POLL_TIME.value, time.time() - start_poll
+                self.__metrics_buffer.incr_timing(
+                    ConsumerTiming.CONSUMER_POLL_TIME, time.time() - start_poll
                 )
             except RecoverableError:
                 return
@@ -324,8 +369,8 @@ def _run_once(self) -> None:
                 self._handle_invalid_message(e)
                 return
 
-        self.__metrics_buffer.timing(
-            ConsumerTiming.CONSUMER_PROCESSING_TIME.value, time.time() - start_poll
+        self.__metrics_buffer.incr_timing(
+            ConsumerTiming.CONSUMER_PROCESSING_TIME, time.time() - start_poll
         )
         if self.__message is not None:
             try:
@@ -337,8 +382,8 @@ def _run_once(self) -> None:
                     self.__buffered_messages.append(self.__message)
                 self.__processing_strategy.submit(message)
 
-                self.__metrics_buffer.timing(
-                    ConsumerTiming.CONSUMER_PROCESSING_TIME.value,
+                self.__metrics_buffer.incr_timing(
+                    ConsumerTiming.CONSUMER_PROCESSING_TIME,
                     time.time() - start_submit,
                 )
             except MessageRejected as e:
@@ -357,8 +402,8 @@ def _run_once(self) -> None:
             else:
                 current_time = time.time()
                 if self.__paused_timestamp:
-                    self.__metrics_buffer.timing(
-                        ConsumerTiming.CONSUMER_PAUSED_TIME.value,
+                    self.__metrics_buffer.incr_timing(
+                        ConsumerTiming.CONSUMER_PAUSED_TIME,
                         current_time - self.__paused_timestamp,
                     )
                     self.__paused_timestamp = current_time
@@ -372,8 +417,8 @@ def _run_once(self) -> None:
             if message_carried_over and self.__paused_timestamp is not None:
                 self.__consumer.resume([*self.__consumer.tell().keys()])
 
-                self.__metrics_buffer.timing(
-                    ConsumerTiming.CONSUMER_PAUSED_TIME.value,
+                self.__metrics_buffer.incr_timing(
+                    ConsumerTiming.CONSUMER_PAUSED_TIME,
                     time.time() - self.__paused_timestamp,
                 )

36 changes: 5 additions & 31 deletions arroyo/processing/strategies/run_task_with_multiprocessing.py
@@ -29,7 +29,6 @@
 from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy
 from arroyo.types import FilteredPayload, Message, TStrategyPayload
 from arroyo.utils.metrics import Gauge, get_metrics
-from arroyo.utils.metrics_buffer import MetricsBuffer
 
 logger = logging.getLogger(__name__)
 
@@ -61,10 +60,6 @@ class ValueTooLarge(ValueError):
     """
 
 
-# Item size in bytes (inline pickle + space taken in shm buffer)
-ItemStats = Tuple[int, int]
-
-
 class MessageBatch(Generic[TBatchValue]):
     """
     Contains a sequence of ``Message`` instances that are intended to be
@@ -134,7 +129,7 @@ def __iter__(self) -> Iterator[Tuple[int, TBatchValue]]:
     def reset_iterator(self, iter_offset: int) -> None:
         self.__iter_offset = iter_offset
 
-    def append(self, message: TBatchValue) -> ItemStats:
+    def append(self, message: TBatchValue) -> None:
         """
         Add a message to this batch.
@@ -148,11 +143,7 @@ def append(self, message: TBatchValue) -> ItemStats:
         """
         buffers: MutableSequence[Tuple[int, int]] = []
 
-        buffer_data_len = 0
-
         def buffer_callback(buffer: PickleBuffer) -> None:
-            nonlocal buffer_data_len
-
             value = buffer.raw()
             offset = self.__offset
             length = len(value)
@@ -162,14 +153,12 @@ def buffer_callback(buffer: PickleBuffer) -> None:
                     f"bytes needed but {self.block.size - offset} bytes free"
                 )
             self.block.buf[offset : offset + length] = value
-            buffer_data_len += length
+            self.__offset += length
             buffers.append((offset, length))
 
         data = pickle.dumps(message, protocol=5, buffer_callback=buffer_callback)
-
-        self.__offset += buffer_data_len
         self.__items.append((data, buffers))
-        return len(data), buffer_data_len
 
 
 class BatchBuilder(Generic[TBatchValue]):
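
Aside (not part of the diff): MessageBatch.append above relies on pickle protocol 5 out-of-band buffers. pickle.dumps hands each PickleBuffer to buffer_callback instead of embedding its bytes inline, which is how large payloads land directly in the shared-memory block. A minimal standalone sketch of the mechanism, standard library only:

    import pickle
    from pickle import PickleBuffer

    blob = b"x" * 1024
    out_of_band = []

    def buffer_callback(buf: PickleBuffer) -> None:
        # pickle.dumps calls this for each out-of-band buffer; MessageBatch
        # copies buf.raw() into its shared-memory block at this point.
        out_of_band.append(buf.raw())

    data = pickle.dumps(PickleBuffer(blob), protocol=5, buffer_callback=buffer_callback)
    # The inline pickle stays small; the payload lives in out_of_band.
    restored = pickle.loads(data, buffers=out_of_band)
    assert bytes(restored) == blob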
@@ -186,8 +175,8 @@ def __init__(
     def __len__(self) -> int:
         return len(self.__batch)
 
-    def append(self, message: TBatchValue) -> ItemStats:
-        return self.__batch.append(message)
+    def append(self, message: TBatchValue) -> None:
+        self.__batch.append(message)
 
     def ready(self) -> bool:
         if len(self.__batch) >= self.__max_batch_size:
@@ -382,12 +371,6 @@ def __init__(
         self.__invalid_messages = InvalidMessageState()
 
         self.__metrics = get_metrics()
-        self.__metrics_buffer = MetricsBuffer(
-            self.__metrics,
-            record_timers_avg=True,
-            record_timers_max=True,
-            record_timers_min=True,
-        )
         self.__batches_in_progress = Gauge(self.__metrics, "batches_in_progress")
         self.__pool_waiting_time: Optional[float] = None
         self.__metrics.gauge("transform.processes", num_processes)
@@ -564,7 +547,7 @@ def submit(
         assert self.__batch_builder is not None
 
         try:
-            pickle_bytes, buffer_bytes = self.__batch_builder.append(message)
+            self.__batch_builder.append(message)
         except ValueTooLarge as e:
             logger.debug("Caught %r, closing batch and retrying...", e)
             self.__metrics.increment(
@@ -581,15 +564,6 @@
             # size is too small (smaller than the Kafka payload limit without
             # compression.)
             self.__batch_builder.append(message)
-        else:
-            self.__metrics_buffer.timing(
-                "arroyo.strategies.run_task_with_multiprocessing.item.pickle_bytes",
-                pickle_bytes,
-            )
-            self.__metrics_buffer.timing(
-                "arroyo.strategies.run_task_with_multiprocessing.item.buffer_bytes",
-                buffer_bytes,
-            )
 
     def close(self) -> None:
         self.__closed = True
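
Aside (not part of the diff): the submit path above keeps the pre-revert close-and-retry-once behavior on ValueTooLarge. A toy sketch of the pattern; TinyBatch and flush are hypothetical stand-ins for BatchBuilder and the batch hand-off machinery:

    from typing import Callable, List

    class TinyBatch:
        """Toy stand-in for BatchBuilder with a fixed capacity in bytes."""

        def __init__(self, capacity: int) -> None:
            self.capacity = capacity
            self.items: List[bytes] = []

        def append(self, item: bytes) -> None:
            if sum(map(len, self.items)) + len(item) > self.capacity:
                raise ValueError("value too large")  # plays the role of ValueTooLarge
            self.items.append(item)

    def submit(batch: TinyBatch, item: bytes, flush: Callable[[TinyBatch], None]) -> TinyBatch:
        try:
            batch.append(item)
        except ValueError:
            flush(batch)  # hand the full batch off for processing
            batch = TinyBatch(batch.capacity)  # start a fresh block
            # If this second append raises too, a single item exceeds the whole
            # block, i.e. the configured block size is too small.
            batch.append(item)
        return batch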