Skip to content

Commit

Permalink
feat: improve monitor performance
Browse files Browse the repository at this point in the history
cache via a weak key the str names of tasks and streams.
This greatly reduces iterations as this code path takes 50%
of the iteration loop on streams in the _py_aiter function.
  • Loading branch information
cbrand committed Mar 24, 2022
1 parent c230076 commit 5446529
Showing 1 changed file with 33 additions and 2 deletions.
35 changes: 33 additions & 2 deletions faust/sensors/monitor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Monitor - sensor tracking metrics."""
import asyncio
import re
import weakref
from collections import deque
from http import HTTPStatus
from statistics import median
Expand Down Expand Up @@ -208,6 +209,16 @@ class Monitor(Sensor, KeywordReduce):

stream_inbound_time: Dict[TP, float] = cast(Dict[TP, float], None)

# Lookup for names to streams to reduce __repr__ overhead
stream_lookup: MutableMapping[StreamT, str] = cast(
MutableMapping[StreamT, str], None
)

# Lookup for names to tasks to reduce __repr__ overhead
task_lookup: MutableMapping[Optional[asyncio.Task], str] = cast(
MutableMapping[Optional[asyncio.Task], str], None
)

def __init__(
self,
*,
Expand Down Expand Up @@ -301,6 +312,10 @@ def __init__(
self.tp_end_offsets = {}

self.stream_inbound_time = {}

self.stream_lookup = weakref.WeakKeyDictionary()
self.task_lookup = weakref.WeakKeyDictionary()

Service.__init__(self, **kwargs)

def secs_since(self, start_time: float) -> float:
Expand Down Expand Up @@ -441,8 +456,24 @@ def on_stream_event_in(
"""Call when stream starts processing an event."""
self.events_total += 1
self.stream_inbound_time[tp] = monotonic()
self.events_by_stream[str(stream)] += 1
self.events_by_task[str(stream.task_owner)] += 1

# Performance wise it is better to only stringify stream once.
if stream in self.stream_lookup:
stream_lookup_key = self.stream_lookup[stream]
else:
stream_lookup_key = self.stream_lookup[stream] = str(stream)

self.events_by_stream[stream_lookup_key] += 1

# Same thing for tasks
if stream.task_owner in self.task_lookup:
task_lookup_key = self.task_lookup[stream.task_owner]
else:
task_lookup_key = self.task_lookup[stream.task_owner] = str(
stream.task_owner
)

self.events_by_task[task_lookup_key] += 1
self.events_active += 1
return {
"time_in": self.time(),
Expand Down

0 comments on commit 5446529

Please sign in to comment.