From 244d9fd1165ca1e59b0c2cf0f53a0e956e385661 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Wilczy=C5=84ski?= Date: Fri, 26 Feb 2021 10:43:53 +0100 Subject: [PATCH] replace deque_pushpopmax() function with deque(maxlen=) --- faust/livecheck/case.py | 13 ++++---- faust/sensors/monitor.py | 51 +++++++++++++---------------- faust/utils/functional.py | 21 +----------- tests/unit/livecheck/test_case.py | 6 ++-- tests/unit/sensors/test_monitor.py | 8 ++--- tests/unit/utils/test_functional.py | 19 +---------- 6 files changed, 37 insertions(+), 81 deletions(-) diff --git a/faust/livecheck/case.py b/faust/livecheck/case.py index e284a07f2..12858bc68 100644 --- a/faust/livecheck/case.py +++ b/faust/livecheck/case.py @@ -18,7 +18,6 @@ from yarl import URL from faust.utils import uuid -from faust.utils.functional import deque_pushpopmax from .exceptions import ServiceDown, SuiteFailed, SuiteStalled from .locals import current_execution_stack, current_test_stack @@ -166,9 +165,9 @@ def __init__( if url_error_delay_max is not None: self.url_error_delay_max = url_error_delay_max - self.frequency_history = deque() - self.latency_history = deque() - self.runtime_history = deque() + self.frequency_history = deque(maxlen=self.max_history) + self.latency_history = deque(maxlen=self.max_history) + self.runtime_history = deque(maxlen=self.max_history) self.total_by_state = Counter() @@ -252,8 +251,8 @@ async def on_test_start(self, runner: TestRunner) -> None: wanted_frequency = self.frequency if wanted_frequency: latency = time_since - wanted_frequency - deque_pushpopmax(self.latency_history, latency, self.max_history) - deque_pushpopmax(self.frequency_history, time_since, self.max_history) + self.latency_history.append(latency) + self.frequency_history.append(time_since) async def on_test_skipped(self, runner: TestRunner) -> None: """Call when a test is skipped.""" @@ -299,7 +298,7 @@ async def on_test_pass(self, runner: TestRunner) -> None: """Call when a test execution passes.""" test = runner.test runtime: float = runner.runtime or 0.0 - deque_pushpopmax(self.runtime_history, runtime, self.max_history) + self.runtime_history.append(runtime) ts = test.timestamp.timestamp() last_fail = self.last_fail if last_fail is None or ts > last_fail: diff --git a/faust/sensors/monitor.py b/faust/sensors/monitor.py index b8385bed7..638765342 100644 --- a/faust/sensors/monitor.py +++ b/faust/sensors/monitor.py @@ -26,7 +26,6 @@ from faust.types.assignor import PartitionAssignorT from faust.types.transports import ConsumerT, ProducerT from faust.types.tuples import TP, Message, PendingMessage, RecordMetadata -from faust.utils.functional import deque_pushpopmax from .base import Sensor @@ -256,16 +255,18 @@ def __init__( self.rebalances = rebalances self.tables = {} if tables is None else tables - self.commit_latency = deque() if commit_latency is None else commit_latency - self.send_latency = deque() if send_latency is None else send_latency - self.assignment_latency = ( - deque() if assignment_latency is None else assignment_latency + self.commit_latency = deque( + commit_latency or [], maxlen=self.max_commit_latency_history ) - self.rebalance_return_latency = ( - deque() if rebalance_return_latency is None else rebalance_return_latency + self.send_latency = deque(send_latency or [], self.max_send_latency_history) + self.assignment_latency = deque( + assignment_latency or [], maxlen=self.max_assignment_latency_history ) - self.rebalance_end_latency = ( - deque() if rebalance_end_latency is None else rebalance_end_latency + self.rebalance_return_latency = deque( + rebalance_return_latency or [], maxlen=self.max_avg_history + ) + self.rebalance_end_latency = deque( + rebalance_end_latency or [], maxlen=self.max_avg_history ) self.rebalance_return_avg = rebalance_return_avg self.rebalance_end_avg = rebalance_end_avg @@ -283,12 +284,14 @@ def __init__( self.events_by_stream = Counter() self.events_s = events_s self.events_runtime_avg = events_runtime_avg - self.events_runtime = deque() if events_runtime is None else events_runtime + self.events_runtime = deque(events_runtime or [], maxlen=self.max_avg_history) self.topic_buffer_full = Counter() self.time: Callable[[], float] = time self.http_response_codes = Counter() - self.http_response_latency = deque() + self.http_response_latency = deque( + http_response_latency or [], maxlen=self.max_avg_history + ) self.http_response_latency_avg = http_response_latency_avg self.metric_counts = Counter() @@ -460,7 +463,7 @@ def on_stream_event_out( time_out=time_out, time_total=time_total, ) - deque_pushpopmax(self.events_runtime, time_total, self.max_avg_history) + self.events_runtime.append(time_total) def on_topic_buffer_full(self, tp: TP) -> None: """Call when conductor topic buffer is full and has to wait.""" @@ -500,11 +503,7 @@ def on_commit_initiated(self, consumer: ConsumerT) -> Any: def on_commit_completed(self, consumer: ConsumerT, state: Any) -> None: """Call when consumer commit offset operation completed.""" latency = self.time() - cast(float, state) - deque_pushpopmax( - self.commit_latency, - latency, - self.max_commit_latency_history, - ) + self.commit_latency.append(latency) def on_send_initiated( self, @@ -524,7 +523,7 @@ def on_send_completed( ) -> None: """Call when producer finished sending message.""" latency = self.time() - cast(float, state) - deque_pushpopmax(self.send_latency, latency, self.max_send_latency_history) + self.send_latency.append(latency) def on_send_error( self, producer: ProducerT, exc: BaseException, state: Any @@ -553,9 +552,7 @@ def on_assignment_error( ) -> None: """Partition assignor did not complete assignor due to error.""" time_total = self.time() - state["time_start"] - deque_pushpopmax( - self.assignment_latency, time_total, self.max_assignment_latency_history - ) + self.assignment_latency.append(time_total) self.assignments_failed += 1 def on_assignment_completed( @@ -563,9 +560,7 @@ def on_assignment_completed( ) -> None: """Partition assignor completed assignment.""" time_total = self.time() - state["time_start"] - deque_pushpopmax( - self.assignment_latency, time_total, self.max_assignment_latency_history - ) + self.assignment_latency.append(time_total) self.assignments_completed += 1 def on_rebalance_start(self, app: AppT) -> Dict: @@ -583,9 +578,7 @@ def on_rebalance_return(self, app: AppT, state: Dict) -> None: time_return=time_return, latency_return=latency_return, ) - deque_pushpopmax( - self.rebalance_return_latency, latency_return, self.max_avg_history - ) + self.rebalance_return_latency.append(latency_return) def on_rebalance_end(self, app: AppT, state: Dict) -> None: """Cluster rebalance fully completed (including recovery).""" @@ -596,7 +589,7 @@ def on_rebalance_end(self, app: AppT, state: Dict) -> None: time_end=time_end, latency_end=latency_end, ) - deque_pushpopmax(self.rebalance_end_latency, latency_end, self.max_avg_history) + self.rebalance_end_latency.append(latency_end) self._clear_topic_related_sensors() def on_web_request_start( @@ -624,7 +617,7 @@ def on_web_request_end( latency_end=latency_end, status_code=status_code, ) - deque_pushpopmax(self.http_response_latency, latency_end, self.max_avg_history) + self.http_response_latency.append(latency_end) self.http_response_codes[status_code] += 1 def on_threaded_producer_buffer_processed(self, app: AppT, size: int) -> None: diff --git a/faust/utils/functional.py b/faust/utils/functional.py index 4bceea3b2..729630ea3 100644 --- a/faust/utils/functional.py +++ b/faust/utils/functional.py @@ -1,14 +1,10 @@ """Functional utilities.""" from functools import reduce from itertools import groupby -from typing import Iterable, Iterator, Mapping, Optional, Sequence, Tuple, TypeVar - -from mode.utils.typing import Deque +from typing import Iterable, Iterator, Mapping, Sequence, Tuple, TypeVar __all__ = [ "consecutive_numbers", - "deque_prune", - "deque_pushpopmax", "translate", ] @@ -25,21 +21,6 @@ def consecutive_numbers(it: Iterable[int]) -> Iterator[Sequence[int]]: yield [a[1] for a in g] -def deque_prune(items: Deque[T], max: int = None) -> Optional[T]: - """Prune oldest element in deque if size exceeds ``max``.""" - if max is not None: - size = len(items) - if size > max: - return items.popleft() - return None - - -def deque_pushpopmax(items: Deque[T], item: T, max: int = None) -> Optional[T]: - """Append to deque and remove oldest element if size exceeds ``max``.""" - items.append(item) - return deque_prune(items, max) - - def translate(table: Mapping, s: str) -> str: """Replace characters and patterns in string ``s``. diff --git a/tests/unit/livecheck/test_case.py b/tests/unit/livecheck/test_case.py index 3608a5d4a..1c7590d67 100644 --- a/tests/unit/livecheck/test_case.py +++ b/tests/unit/livecheck/test_case.py @@ -139,8 +139,8 @@ async def test_execute( async def test_on_test_start( self, started, last_received, frequency, *, case, runner ): - case.latency_history = deque([0.03] * case.max_history) - case.frequency_history = deque([0.04] * case.max_history) + case.latency_history.extend([0.03] * case.max_history) + case.frequency_history.extend([0.04] * case.max_history) runner.started = started case.frequency = frequency case.last_test_received = last_received @@ -229,7 +229,7 @@ async def test_on_suite_pass( assert runner.test is execution runner.test.timestamp = Mock() runner.test.timestamp.timestamp.return_value = ts - case.runtime_history = deque([3.03] * case.max_history) + case.runtime_history.extend([3.03] * case.max_history) runner.runtime = 300.0 with self.seconds_since_last_fail(case, now=now, failed=failed): case.status = initial_state diff --git a/tests/unit/sensors/test_monitor.py b/tests/unit/sensors/test_monitor.py index f42c97105..6b0b07872 100644 --- a/tests/unit/sensors/test_monitor.py +++ b/tests/unit/sensors/test_monitor.py @@ -446,10 +446,10 @@ def test__sample(self, *, mon): prev_message_total = 0 mon.events_runtime = [] mon._sample(prev_event_total, prev_message_total) - mon.events_runtime = deque(range(100)) - mon.rebalance_return_latency = deque(range(100)) - mon.rebalance_end_latency = deque(range(100)) - mon.http_response_latency = deque(range(100)) + mon.events_runtime.extend(range(100)) + mon.rebalance_return_latency.extend(range(100)) + mon.rebalance_end_latency.extend(range(100)) + mon.http_response_latency.extend(range(100)) prev_event_total = 0 prev_message_total = 0 mon._sample(prev_event_total, prev_message_total) diff --git a/tests/unit/utils/test_functional.py b/tests/unit/utils/test_functional.py index a7ae86e34..e24f91a36 100644 --- a/tests/unit/utils/test_functional.py +++ b/tests/unit/utils/test_functional.py @@ -1,8 +1,6 @@ -from collections import deque - import pytest -from faust.utils.functional import consecutive_numbers, deque_pushpopmax +from faust.utils.functional import consecutive_numbers @pytest.mark.parametrize( @@ -16,18 +14,3 @@ ) def test_consecutive_numbers(numbers, expected): assert next(consecutive_numbers(numbers), None) == expected - - -def test_deque_pushpop_max(): - d = deque([]) - deque_pushpopmax(d, 1, max=None) - assert d == deque([1]) - deque_pushpopmax(d, 2, max=3) - assert d == deque([1, 2]) - deque_pushpopmax(d, 3, max=3) - assert d == deque([1, 2, 3]) - deque_pushpopmax(d, 4, max=3) - assert d == deque([2, 3, 4]) - for i in range(5, 100): - deque_pushpopmax(d, i, max=3) - assert d == deque([i - 2, i - 1, i])