Skip to content

Commit

Permalink
feat: add more item size stats to multiprocessing (and metrics refact…
Browse files Browse the repository at this point in the history
…or) (#235)
  • Loading branch information
untitaker authored May 17, 2023
1 parent 18da2c4 commit f1f69a4
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 83 deletions.
99 changes: 27 additions & 72 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,8 @@
import functools
import logging
import time
from collections import defaultdict
from enum import Enum
from typing import (
Any,
Callable,
Generic,
Mapping,
MutableMapping,
Optional,
Sequence,
TypeVar,
Union,
cast,
)
from typing import Any, Callable, Generic, Mapping, Optional, Sequence, TypeVar, cast

from arroyo.backends.abstract import Consumer
from arroyo.commit import CommitPolicy
Expand All @@ -30,11 +18,10 @@
from arroyo.types import BrokerValue, Message, Partition, Topic, TStrategyPayload
from arroyo.utils.logging import handle_internal_error
from arroyo.utils.metrics import get_metrics
from arroyo.utils.metrics_buffer import MetricsBuffer

logger = logging.getLogger(__name__)

METRICS_FREQUENCY_SEC = 1.0 # In seconds

F = TypeVar("F", bound=Callable[[Any], Any])


Expand All @@ -50,8 +37,9 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
logger.exception(f"{f.__name__} crashed")
raise
finally:
metrics.incr_timing(
ConsumerTiming.CONSUMER_CALLBACK_TIME, time.time() - start_time
metrics.timing(
ConsumerTiming.CONSUMER_CALLBACK_TIME.value,
time.time() - start_time,
)

return cast(F, wrapper)
Expand Down Expand Up @@ -79,41 +67,6 @@ class ConsumerCounter(Enum):
CONSUMER_RUN_COUNT = "arroyo.consumer.run.count"


class MetricsBuffer:
    """
    Accumulates consumer timings and counters in memory and forwards the
    accumulated totals to the metrics backend returned by ``get_metrics()``.

    Every increment checks how long ago the buffer was last reset; once
    more than ``METRICS_FREQUENCY_SEC`` seconds have elapsed, the buffer is
    flushed. Each timer is reported as the sum of the durations added since
    the previous reset, and each counter as the sum of its deltas.
    """

    def __init__(self) -> None:
        self.__metrics = get_metrics()
        # defaultdicts let the first increment of a metric start from zero.
        self.__timers: MutableMapping[ConsumerTiming, float] = defaultdict(float)
        self.__counters: MutableMapping[ConsumerCounter, int] = defaultdict(int)
        # Also initialises the timestamp used for throttling.
        self.__reset()

    def incr_timing(self, metric: ConsumerTiming, duration: float) -> None:
        """Add ``duration`` to the running total of ``metric``."""
        timers = self.__timers
        timers[metric] += duration
        self.__throttled_record()

    def incr_counter(self, metric: ConsumerCounter, delta: int) -> None:
        """Add ``delta`` to the running total of ``metric``."""
        counters = self.__counters
        counters[metric] += delta
        self.__throttled_record()

    def flush(self) -> None:
        """
        Send every accumulated timer and counter to the metrics backend,
        then empty the buffer and restart the throttling interval.
        """
        backend = self.__metrics

        # Timers are emitted before counters.
        for timing_key, total_duration in self.__timers.items():
            backend.timing(timing_key.value, total_duration)

        for counter_key, total_count in self.__counters.items():
            backend.increment(counter_key.value, total_count)

        self.__reset()

    def __reset(self) -> None:
        """Drop all buffered values and remember the current time."""
        self.__timers.clear()
        self.__counters.clear()
        self.__last_record_time = time.time()

    def __throttled_record(self) -> None:
        """Flush only if the throttling interval has been exceeded."""
        elapsed = time.time() - self.__last_record_time
        if elapsed <= METRICS_FREQUENCY_SEC:
            return
        self.flush()


class StreamProcessor(Generic[TStrategyPayload]):
"""
A stream processor manages the relationship between a ``Consumer``
Expand All @@ -133,7 +86,7 @@ def __init__(
) -> None:
self.__consumer = consumer
self.__processor_factory = processor_factory
self.__metrics_buffer = MetricsBuffer()
self.__metrics_buffer = MetricsBuffer(get_metrics(), record_timers_sum=True)

self.__processing_strategy: Optional[
ProcessingStrategy[TStrategyPayload]
Expand Down Expand Up @@ -177,13 +130,15 @@ def _close_strategy() -> None:

try:
self.__processing_strategy.join(self.__join_timeout)
self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_JOIN_TIME, time.time() - start_join
self.__metrics_buffer.timing(
ConsumerTiming.CONSUMER_JOIN_TIME.value,
time.time() - start_join,
)
break
except InvalidMessage as e:
self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_JOIN_TIME, time.time() - start_join
self.__metrics_buffer.timing(
ConsumerTiming.CONSUMER_JOIN_TIME.value,
time.time() - start_join,
)
self._handle_invalid_message(e)

Expand All @@ -194,8 +149,8 @@ def _close_strategy() -> None:
self.__processing_strategy = None
self.__message = None # avoid leaking buffered messages across assignments

self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_SHUTDOWN_TIME, time.time() - start_close
self.__metrics_buffer.timing(
ConsumerTiming.CONSUMER_SHUTDOWN_TIME.value, time.time() - start_close
)

def _create_strategy(partitions: Mapping[Partition, int]) -> None:
Expand Down Expand Up @@ -325,12 +280,12 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None:
# XXX: This blocks if there are more than MAX_PENDING_FUTURES in the queue.
self.__dlq_policy.produce(invalid_message)

self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_DLQ_TIME, time.time() - start_dlq
self.__metrics_buffer.timing(
ConsumerTiming.CONSUMER_DLQ_TIME.value, time.time() - start_dlq
)

def _run_once(self) -> None:
self.__metrics_buffer.incr_counter(ConsumerCounter.CONSUMER_RUN_COUNT, 1)
self.__metrics_buffer.increment(ConsumerCounter.CONSUMER_RUN_COUNT.value, 1)

message_carried_over = self.__message is not None

Expand All @@ -355,8 +310,8 @@ def _run_once(self) -> None:
try:
start_poll = time.time()
self.__message = self.__consumer.poll(timeout=1.0)
self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_POLL_TIME, time.time() - start_poll
self.__metrics_buffer.timing(
ConsumerTiming.CONSUMER_POLL_TIME.value, time.time() - start_poll
)
except RecoverableError:
return
Expand All @@ -369,8 +324,8 @@ def _run_once(self) -> None:
self._handle_invalid_message(e)
return

self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_PROCESSING_TIME, time.time() - start_poll
self.__metrics_buffer.timing(
ConsumerTiming.CONSUMER_PROCESSING_TIME.value, time.time() - start_poll
)
if self.__message is not None:
try:
Expand All @@ -382,8 +337,8 @@ def _run_once(self) -> None:
self.__buffered_messages.append(self.__message)
self.__processing_strategy.submit(message)

self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_PROCESSING_TIME,
self.__metrics_buffer.timing(
ConsumerTiming.CONSUMER_PROCESSING_TIME.value,
time.time() - start_submit,
)
except MessageRejected as e:
Expand All @@ -402,8 +357,8 @@ def _run_once(self) -> None:
else:
current_time = time.time()
if self.__paused_timestamp:
self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_PAUSED_TIME,
self.__metrics_buffer.timing(
ConsumerTiming.CONSUMER_PAUSED_TIME.value,
current_time - self.__paused_timestamp,
)
self.__paused_timestamp = current_time
Expand All @@ -417,8 +372,8 @@ def _run_once(self) -> None:
if message_carried_over and self.__paused_timestamp is not None:
self.__consumer.resume([*self.__consumer.tell().keys()])

self.__metrics_buffer.incr_timing(
ConsumerTiming.CONSUMER_PAUSED_TIME,
self.__metrics_buffer.timing(
ConsumerTiming.CONSUMER_PAUSED_TIME.value,
time.time() - self.__paused_timestamp,
)

Expand Down
36 changes: 31 additions & 5 deletions arroyo/processing/strategies/run_task_with_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy
from arroyo.types import FilteredPayload, Message, TStrategyPayload
from arroyo.utils.metrics import Gauge, get_metrics
from arroyo.utils.metrics_buffer import MetricsBuffer

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,6 +57,10 @@ class ValueTooLarge(ValueError):
"""


# Item size in bytes (inline pickle + space taken in shm buffer)
ItemStats = Tuple[int, int]


class MessageBatch(Generic[TBatchValue]):
"""
Contains a sequence of ``Message`` instances that are intended to be
Expand Down Expand Up @@ -125,7 +130,7 @@ def __iter__(self) -> Iterator[Tuple[int, TBatchValue]]:
def reset_iterator(self, iter_offset: int) -> None:
self.__iter_offset = iter_offset

def append(self, message: TBatchValue) -> None:
def append(self, message: TBatchValue) -> ItemStats:
"""
Add a message to this batch.
Expand All @@ -139,7 +144,11 @@ def append(self, message: TBatchValue) -> None:
"""
buffers: MutableSequence[Tuple[int, int]] = []

buffer_data_len = 0

def buffer_callback(buffer: PickleBuffer) -> None:
nonlocal buffer_data_len

value = buffer.raw()
offset = self.__offset
length = len(value)
Expand All @@ -149,12 +158,14 @@ def buffer_callback(buffer: PickleBuffer) -> None:
f"bytes needed but {self.block.size - offset} bytes free"
)
self.block.buf[offset : offset + length] = value
self.__offset += length
buffer_data_len += length
buffers.append((offset, length))

data = pickle.dumps(message, protocol=5, buffer_callback=buffer_callback)

self.__offset += buffer_data_len
self.__items.append((data, buffers))
return len(data), buffer_data_len


class BatchBuilder(Generic[TBatchValue]):
Expand All @@ -171,8 +182,8 @@ def __init__(
def __len__(self) -> int:
return len(self.__batch)

def append(self, message: TBatchValue) -> None:
self.__batch.append(message)
def append(self, message: TBatchValue) -> ItemStats:
return self.__batch.append(message)

def ready(self) -> bool:
if len(self.__batch) >= self.__max_batch_size:
Expand Down Expand Up @@ -367,6 +378,12 @@ def __init__(
self.__invalid_messages = InvalidMessageState()

self.__metrics = get_metrics()
self.__metrics_buffer = MetricsBuffer(
self.__metrics,
record_timers_avg=True,
record_timers_max=True,
record_timers_min=True,
)
self.__batches_in_progress = Gauge(self.__metrics, "batches_in_progress")
self.__pool_waiting_time: Optional[float] = None
self.__metrics.gauge("transform.processes", num_processes)
Expand Down Expand Up @@ -533,7 +550,7 @@ def submit(
assert self.__batch_builder is not None

try:
self.__batch_builder.append(message)
pickle_bytes, buffer_bytes = self.__batch_builder.append(message)
except ValueTooLarge as e:
logger.debug("Caught %r, closing batch and retrying...", e)
self.__metrics.increment(
Expand All @@ -550,6 +567,15 @@ def submit(
# size is too small (smaller than the Kafka payload limit without
# compression.)
self.__batch_builder.append(message)
else:
self.__metrics_buffer.timing(
"arroyo.strategies.run_task_with_multiprocessing.item.pickle_bytes",
pickle_bytes,
)
self.__metrics_buffer.timing(
"arroyo.strategies.run_task_with_multiprocessing.item.buffer_bytes",
buffer_bytes,
)

def close(self) -> None:
self.__closed = True
Expand Down
Loading

0 comments on commit f1f69a4

Please sign in to comment.