Skip to content

Commit

Permalink
Pull in hopping window table fix (#412)
Browse files Browse the repository at this point in the history
* robinhood issues 514

* add hopping example

* fix: 'Nonetype' object is not iterable problem

* lint

* remove unused var

* add mock ranges to table tests

* Pull in changes by @thomas-chauvet

* save the popped value as a backup for now

* cleanup and add more tests

* test for ranges when full and empty

* add linting

* remove MagicMock import

---------

Co-authored-by: Don Wong <doncat99@gmail.com>
  • Loading branch information
wbarnha and doncat99 authored Mar 30, 2024
1 parent ed85356 commit adcbc81
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 3 deletions.
33 changes: 30 additions & 3 deletions faust/tables/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,40 @@ async def _del_old_keys(self) -> None:
for partition, timestamps in self._partition_timestamps.items():
while timestamps and window.stale(timestamps[0], time.time()):
timestamp = heappop(timestamps)
triggered_windows = [
self._partition_timestamp_keys.get(
(partition, window_range)
) # noqa
for window_range in self._window_ranges(timestamp)
]
keys_to_remove = self._partition_timestamp_keys.pop(
(partition, timestamp), None
)
window_data = {}
if keys_to_remove:
for key in keys_to_remove:
value = self.data.pop(key, None)
await self.on_window_close(key, value)
for windows in triggered_windows:
if windows:
for processed_window in windows:
# we use set to avoid duplicate element in window's data
# window[0] is the window's key
# it is not related to window's timestamp
# windows are in format:
# (key, (window_start, window_end))
window_data.setdefault(processed_window[0], []).extend(
self.data.get(processed_window, [])
)

for key_to_remove in keys_to_remove:
value = self.data.pop(key_to_remove, None)
if key_to_remove[1][0] > self.last_closed_window:
await self.on_window_close(
key_to_remove,
(
window_data[key_to_remove[0]]
if key_to_remove[0] in window_data
else value
),
)
self.last_closed_window = max(
self.last_closed_window,
max(key[1][0] for key in keys_to_remove),
Expand Down
157 changes: 157 additions & 0 deletions tests/unit/tables/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,45 @@ async def test_last_closed_window(self, *, table):
assert table.last_closed_window == 0.0

table.window = Mock(name="window")
self.mock_no_ranges(table)
table._data = {
("boo", (1.1, 1.4)): "BOO",
("moo", (1.4, 1.6)): "MOO",
("faa", (1.9, 2.0)): "FAA",
("bar", (4.1, 4.2)): "BAR",
}
table._partition_timestamps = {
TP1: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0],
}
table._partition_timestamp_keys = {
(TP1, 2.0): [
("boo", (1.1, 1.4)),
("moo", (1.4, 1.6)),
("faa", (1.9, 2.0)),
],
(TP1, 5.0): [
("bar", (4.1, 4.2)),
],
}

def get_stale(limit):
def is_stale(timestamp, latest_timestamp):
return timestamp < limit

return is_stale

table.window.stale.side_effect = get_stale(4.0)

await table._del_old_keys()

assert table.last_closed_window == 1.9

@pytest.mark.asyncio
async def test_last_closed_window__mock_ranges(self, *, table):
assert table.last_closed_window == 0.0

table.window = Mock(name="window")
self.mock_ranges(table)
table._data = {
("boo", (1.1, 1.4)): "BOO",
("moo", (1.4, 1.6)): "MOO",
Expand Down Expand Up @@ -233,6 +272,64 @@ async def test_del_old_keys(self, *, table):
on_window_close = table._on_window_close = AsyncMock(name="on_window_close")

table.window = Mock(name="window")
self.mock_no_ranges(table)
table._data = {
("boo", (1.1, 1.4)): "BOO",
("moo", (1.4, 1.6)): "MOO",
("faa", (1.9, 2.0)): "FAA",
("bar", (4.1, 4.2)): "BAR",
}
table._partition_timestamps = {
TP1: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0],
}
table._partition_timestamp_keys = {
(TP1, 2.0): [
("boo", (1.1, 1.4)),
("moo", (1.4, 1.6)),
("faa", (1.9, 2.0)),
],
(TP1, 5.0): [
("bar", (4.1, 4.2)),
],
}

def get_stale(limit):
def is_stale(timestamp, latest_timestamp):
return timestamp < limit

return is_stale

table.window.stale.side_effect = get_stale(4.0)

await table._del_old_keys()

assert table._partition_timestamps[TP1] == [4.0, 5.0, 6.0, 7.0]
assert table.data == {("bar", (4.1, 4.2)): "BAR"}

on_window_close.assert_has_calls(
[
call.__bool__(),
call(("boo", (1.1, 1.4)), "BOO"),
call.__bool__(),
call(("moo", (1.4, 1.6)), "MOO"),
call.__bool__(),
call(("faa", (1.9, 2.0)), "FAA"),
]
)

table.last_closed_window = 8.0
table.window.stale.side_effect = get_stale(6.0)

await table._del_old_keys()

assert not table.data

@pytest.mark.asyncio
async def test_del_old_keys__mock_ranges(self, *, table):
on_window_close = table._on_window_close = AsyncMock(name="on_window_close")

table.window = Mock(name="window")
self.mock_ranges(table)
table._data = {
("boo", (1.1, 1.4)): "BOO",
("moo", (1.4, 1.6)): "MOO",
Expand Down Expand Up @@ -289,6 +386,61 @@ async def test_del_old_keys_non_async_cb(self, *, table):
on_window_close = table._on_window_close = Mock(name="on_window_close")

table.window = Mock(name="window")
self.mock_no_ranges(table)
table._data = {
("boo", (1.1, 1.4)): "BOO",
("moo", (1.4, 1.6)): "MOO",
("faa", (1.9, 2.0)): "FAA",
("bar", (4.1, 4.2)): "BAR",
}
table._partition_timestamps = {
TP1: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0],
}
table._partition_timestamp_keys = {
(TP1, 2.0): [
("boo", (1.1, 1.4)),
("moo", (1.4, 1.6)),
("faa", (1.9, 2.0)),
],
(TP1, 5.0): [
("bar", (4.1, 4.2)),
],
}

def get_stale(limit):
def is_stale(timestamp, latest_timestamp):
return timestamp < limit

return is_stale

table.window.stale.side_effect = get_stale(4.0)

await table._del_old_keys()

assert table._partition_timestamps[TP1] == [4.0, 5.0, 6.0, 7.0]
assert table.data == {("bar", (4.1, 4.2)): "BAR"}

on_window_close.assert_has_calls(
[
call(("boo", (1.1, 1.4)), "BOO"),
call(("moo", (1.4, 1.6)), "MOO"),
call(("faa", (1.9, 2.0)), "FAA"),
]
)

table.last_closed_window = 8.0
table.window.stale.side_effect = get_stale(6.0)

await table._del_old_keys()

assert not table.data

@pytest.mark.asyncio
async def test_del_old_keys_non_async_cb__mock_ranges(self, *, table):
on_window_close = table._on_window_close = Mock(name="on_window_close")

table.window = Mock(name="window")
self.mock_ranges(table)
table._data = {
("boo", (1.1, 1.4)): "BOO",
("moo", (1.4, 1.6)): "MOO",
Expand Down Expand Up @@ -527,6 +679,11 @@ def mock_ranges(self, table, ranges=[1.1, 1.2, 1.3]): # noqa
table._window_ranges.return_value = ranges
return ranges

def mock_no_ranges(self, table, ranges=[]): # noqa
table._window_ranges = Mock(name="_window_ranges")
table._window_ranges.return_value = ranges
return ranges

def test_relative_now(self, *, table):
event = Mock(name="event", autospec=Event)
table._partition_latest_timestamp[event.message.partition] = 30.3
Expand Down

0 comments on commit adcbc81

Please sign in to comment.