Skip to content

Commit

Permalink
replace deque_pushpopmax() function with deque(maxlen=)
Browse files Browse the repository at this point in the history
  • Loading branch information
Paweł Wilczyński committed Feb 26, 2021
1 parent 94ba41a commit 244d9fd
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 81 deletions.
13 changes: 6 additions & 7 deletions faust/livecheck/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from yarl import URL

from faust.utils import uuid
from faust.utils.functional import deque_pushpopmax

from .exceptions import ServiceDown, SuiteFailed, SuiteStalled
from .locals import current_execution_stack, current_test_stack
Expand Down Expand Up @@ -166,9 +165,9 @@ def __init__(
if url_error_delay_max is not None:
self.url_error_delay_max = url_error_delay_max

self.frequency_history = deque()
self.latency_history = deque()
self.runtime_history = deque()
self.frequency_history = deque(maxlen=self.max_history)
self.latency_history = deque(maxlen=self.max_history)
self.runtime_history = deque(maxlen=self.max_history)

self.total_by_state = Counter()

Expand Down Expand Up @@ -252,8 +251,8 @@ async def on_test_start(self, runner: TestRunner) -> None:
wanted_frequency = self.frequency
if wanted_frequency:
latency = time_since - wanted_frequency
deque_pushpopmax(self.latency_history, latency, self.max_history)
deque_pushpopmax(self.frequency_history, time_since, self.max_history)
self.latency_history.append(latency)
self.frequency_history.append(time_since)

async def on_test_skipped(self, runner: TestRunner) -> None:
"""Call when a test is skipped."""
Expand Down Expand Up @@ -299,7 +298,7 @@ async def on_test_pass(self, runner: TestRunner) -> None:
"""Call when a test execution passes."""
test = runner.test
runtime: float = runner.runtime or 0.0
deque_pushpopmax(self.runtime_history, runtime, self.max_history)
self.runtime_history.append(runtime)
ts = test.timestamp.timestamp()
last_fail = self.last_fail
if last_fail is None or ts > last_fail:
Expand Down
51 changes: 22 additions & 29 deletions faust/sensors/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from faust.types.assignor import PartitionAssignorT
from faust.types.transports import ConsumerT, ProducerT
from faust.types.tuples import TP, Message, PendingMessage, RecordMetadata
from faust.utils.functional import deque_pushpopmax

from .base import Sensor

Expand Down Expand Up @@ -256,16 +255,18 @@ def __init__(
self.rebalances = rebalances

self.tables = {} if tables is None else tables
self.commit_latency = deque() if commit_latency is None else commit_latency
self.send_latency = deque() if send_latency is None else send_latency
self.assignment_latency = (
deque() if assignment_latency is None else assignment_latency
self.commit_latency = deque(
commit_latency or [], maxlen=self.max_commit_latency_history
)
self.rebalance_return_latency = (
deque() if rebalance_return_latency is None else rebalance_return_latency
self.send_latency = deque(send_latency or [], self.max_send_latency_history)
self.assignment_latency = deque(
assignment_latency or [], maxlen=self.max_assignment_latency_history
)
self.rebalance_end_latency = (
deque() if rebalance_end_latency is None else rebalance_end_latency
self.rebalance_return_latency = deque(
rebalance_return_latency or [], maxlen=self.max_avg_history
)
self.rebalance_end_latency = deque(
rebalance_end_latency or [], maxlen=self.max_avg_history
)
self.rebalance_return_avg = rebalance_return_avg
self.rebalance_end_avg = rebalance_end_avg
Expand All @@ -283,12 +284,14 @@ def __init__(
self.events_by_stream = Counter()
self.events_s = events_s
self.events_runtime_avg = events_runtime_avg
self.events_runtime = deque() if events_runtime is None else events_runtime
self.events_runtime = deque(events_runtime or [], maxlen=self.max_avg_history)
self.topic_buffer_full = Counter()
self.time: Callable[[], float] = time

self.http_response_codes = Counter()
self.http_response_latency = deque()
self.http_response_latency = deque(
http_response_latency or [], maxlen=self.max_avg_history
)
self.http_response_latency_avg = http_response_latency_avg

self.metric_counts = Counter()
Expand Down Expand Up @@ -460,7 +463,7 @@ def on_stream_event_out(
time_out=time_out,
time_total=time_total,
)
deque_pushpopmax(self.events_runtime, time_total, self.max_avg_history)
self.events_runtime.append(time_total)

def on_topic_buffer_full(self, tp: TP) -> None:
"""Call when conductor topic buffer is full and has to wait."""
Expand Down Expand Up @@ -500,11 +503,7 @@ def on_commit_initiated(self, consumer: ConsumerT) -> Any:
def on_commit_completed(self, consumer: ConsumerT, state: Any) -> None:
"""Call when consumer commit offset operation completed."""
latency = self.time() - cast(float, state)
deque_pushpopmax(
self.commit_latency,
latency,
self.max_commit_latency_history,
)
self.commit_latency.append(latency)

def on_send_initiated(
self,
Expand All @@ -524,7 +523,7 @@ def on_send_completed(
) -> None:
"""Call when producer finished sending message."""
latency = self.time() - cast(float, state)
deque_pushpopmax(self.send_latency, latency, self.max_send_latency_history)
self.send_latency.append(latency)

def on_send_error(
self, producer: ProducerT, exc: BaseException, state: Any
Expand Down Expand Up @@ -553,19 +552,15 @@ def on_assignment_error(
) -> None:
"""Partition assignor did not complete assignor due to error."""
time_total = self.time() - state["time_start"]
deque_pushpopmax(
self.assignment_latency, time_total, self.max_assignment_latency_history
)
self.assignment_latency.append(time_total)
self.assignments_failed += 1

def on_assignment_completed(
self, assignor: PartitionAssignorT, state: Dict
) -> None:
"""Partition assignor completed assignment."""
time_total = self.time() - state["time_start"]
deque_pushpopmax(
self.assignment_latency, time_total, self.max_assignment_latency_history
)
self.assignment_latency.append(time_total)
self.assignments_completed += 1

def on_rebalance_start(self, app: AppT) -> Dict:
Expand All @@ -583,9 +578,7 @@ def on_rebalance_return(self, app: AppT, state: Dict) -> None:
time_return=time_return,
latency_return=latency_return,
)
deque_pushpopmax(
self.rebalance_return_latency, latency_return, self.max_avg_history
)
self.rebalance_return_latency.append(latency_return)

def on_rebalance_end(self, app: AppT, state: Dict) -> None:
"""Cluster rebalance fully completed (including recovery)."""
Expand All @@ -596,7 +589,7 @@ def on_rebalance_end(self, app: AppT, state: Dict) -> None:
time_end=time_end,
latency_end=latency_end,
)
deque_pushpopmax(self.rebalance_end_latency, latency_end, self.max_avg_history)
self.rebalance_end_latency.append(latency_end)
self._clear_topic_related_sensors()

def on_web_request_start(
Expand Down Expand Up @@ -624,7 +617,7 @@ def on_web_request_end(
latency_end=latency_end,
status_code=status_code,
)
deque_pushpopmax(self.http_response_latency, latency_end, self.max_avg_history)
self.http_response_latency.append(latency_end)
self.http_response_codes[status_code] += 1

def on_threaded_producer_buffer_processed(self, app: AppT, size: int) -> None:
Expand Down
21 changes: 1 addition & 20 deletions faust/utils/functional.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
"""Functional utilities."""
from functools import reduce
from itertools import groupby
from typing import Iterable, Iterator, Mapping, Optional, Sequence, Tuple, TypeVar

from mode.utils.typing import Deque
from typing import Iterable, Iterator, Mapping, Sequence, Tuple, TypeVar

__all__ = [
"consecutive_numbers",
"deque_prune",
"deque_pushpopmax",
"translate",
]

Expand All @@ -25,21 +21,6 @@ def consecutive_numbers(it: Iterable[int]) -> Iterator[Sequence[int]]:
yield [a[1] for a in g]


def deque_prune(items: Deque[T], max: int = None) -> Optional[T]:
"""Prune oldest element in deque if size exceeds ``max``."""
if max is not None:
size = len(items)
if size > max:
return items.popleft()
return None


def deque_pushpopmax(items: Deque[T], item: T, max: int = None) -> Optional[T]:
"""Append to deque and remove oldest element if size exceeds ``max``."""
items.append(item)
return deque_prune(items, max)


def translate(table: Mapping, s: str) -> str:
"""Replace characters and patterns in string ``s``.
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/livecheck/test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ async def test_execute(
async def test_on_test_start(
self, started, last_received, frequency, *, case, runner
):
case.latency_history = deque([0.03] * case.max_history)
case.frequency_history = deque([0.04] * case.max_history)
case.latency_history.extend([0.03] * case.max_history)
case.frequency_history.extend([0.04] * case.max_history)
runner.started = started
case.frequency = frequency
case.last_test_received = last_received
Expand Down Expand Up @@ -229,7 +229,7 @@ async def test_on_suite_pass(
assert runner.test is execution
runner.test.timestamp = Mock()
runner.test.timestamp.timestamp.return_value = ts
case.runtime_history = deque([3.03] * case.max_history)
case.runtime_history.extend([3.03] * case.max_history)
runner.runtime = 300.0
with self.seconds_since_last_fail(case, now=now, failed=failed):
case.status = initial_state
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/sensors/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,10 +446,10 @@ def test__sample(self, *, mon):
prev_message_total = 0
mon.events_runtime = []
mon._sample(prev_event_total, prev_message_total)
mon.events_runtime = deque(range(100))
mon.rebalance_return_latency = deque(range(100))
mon.rebalance_end_latency = deque(range(100))
mon.http_response_latency = deque(range(100))
mon.events_runtime.extend(range(100))
mon.rebalance_return_latency.extend(range(100))
mon.rebalance_end_latency.extend(range(100))
mon.http_response_latency.extend(range(100))
prev_event_total = 0
prev_message_total = 0
mon._sample(prev_event_total, prev_message_total)
Expand Down
19 changes: 1 addition & 18 deletions tests/unit/utils/test_functional.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from collections import deque

import pytest

from faust.utils.functional import consecutive_numbers, deque_pushpopmax
from faust.utils.functional import consecutive_numbers


@pytest.mark.parametrize(
Expand All @@ -16,18 +14,3 @@
)
def test_consecutive_numbers(numbers, expected):
assert next(consecutive_numbers(numbers), None) == expected


def test_deque_pushpop_max():
d = deque([])
deque_pushpopmax(d, 1, max=None)
assert d == deque([1])
deque_pushpopmax(d, 2, max=3)
assert d == deque([1, 2])
deque_pushpopmax(d, 3, max=3)
assert d == deque([1, 2, 3])
deque_pushpopmax(d, 4, max=3)
assert d == deque([2, 3, 4])
for i in range(5, 100):
deque_pushpopmax(d, i, max=3)
assert d == deque([i - 2, i - 1, i])

0 comments on commit 244d9fd

Please sign in to comment.