Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a wait_for_samples method to the MovingWindow #1159

Merged
merged 5 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- The `MovingWindow` now has an async `wait_for_samples` method that waits for a given number of samples to become available in the moving window and then returns.

## Bug Fixes

Expand Down
82 changes: 74 additions & 8 deletions src/frequenz/sdk/timeseries/_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ def __init__( # pylint: disable=too-many-arguments
align_to=align_to,
)

self._condition_new_sample = asyncio.Condition()

def start(self) -> None:
"""Start the MovingWindow.

Expand Down Expand Up @@ -318,6 +320,44 @@ def window(
start, end, force_copy=force_copy, fill_value=fill_value
)

async def wait_for_samples(self, n: int) -> None:
llucax marked this conversation as resolved.
Show resolved Hide resolved
"""Wait until the next `n` samples are available in the MovingWindow.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is valid samples? If so, I am actually not sure if we want this or something time-based i.e. allow that not all samples are valid. However, for our current use-case this also works since we would set n=1 anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's interesting. valid means that any data was received. If a component is missing data, resampler will send None and that is not a valid value.

If a component is sending only None, should this function return after n Nones are received? I'm guessing it should?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, in many scenarios I wouldn't distinguish between missing or None values. I think it shouldn't return after n Nones but after n new time steps which have at least 1 real value. However, we could leave this also for later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated it to return after n samples are received. Whether they were valid or not needs to be checked with a call to count_valid. I've also updated the docs to state this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see the confusion, I understood that it triggers when n new output samples have been "received", i.e. there are timestamps in the resulting moving window. But this is about input samples. So even if we receive 100 samples, if these are all older than newest timestamp we wouldn't get any new timestamp in the window but updated data points of older timestamps.

This makes sense to me, would stress that in the doc though, e.g the valid samples part is confusing IMO since this is indeed about the new samples.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe:

       """Wait until the next `n` samples have been received in the MovingWindow.

        This function returns after `n` input samples have been received, without considering
        whether the received samples are valid or which timestamp they have. The validity of 
        the samples in the updated moving window can be verified by calling the
        [`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it would wait until there are n output samples, but some output samples could be nan.

It does consider the timestamps when the samples are received. It expects n "new" samples to be available in the buffer before it returns.

The current tests only cover cases where there is no resampling in the moving window. I'll rectify that.

Copy link
Contributor Author

@shsms shsms Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to clarify that it returns after n new samples in the moving window, and not just input samples, as discussed.


This function returns after `n` new samples are available in the MovingWindow,
without considering whether the new samples are valid. The validity of the
samples can be verified by calling the
[`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method.

Args:
n: The number of samples to wait for.

Raises:
ValueError: If `n` is less than or equal to 0 or greater than the capacity
of the MovingWindow.
"""
if n == 0:
return
if n < 0:
raise ValueError("The number of samples to wait for must be 0 or greater.")
if n > self.capacity:
shsms marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError(
"The number of samples to wait for must be less than or equal to the "
+ f"capacity of the MovingWindow ({self.capacity})."
)
Comment on lines +343 to +346
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also just silently wait for self.capacity instead, like slicing when you do [:10] for an array that has less than 10 items. It can be more confusing if you wanted to wait for 10 and got 5 instead but it is something at least familiar in Python. Again maybe @cwasicki can hint which approach would be more intuitive for data scientists.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of it that if n > capacity the n would still be respected, but calculated in terms of time steps since this was triggered. But I think the different understanding here shows already that this could be confusing and we can limit it until we have a case where we need it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, true, best to start with a safe approach, for my behaviour you can still easily get is via mw.wait_for_samples(min(n, mw.capacity)) I guess, so it doesn't look like it is adding too much value.

start_timestamp = (
# Start from the next expected timestamp.
self.newest_timestamp + self.sampling_period
if self.newest_timestamp is not None
else None
)
while True:
shsms marked this conversation as resolved.
Show resolved Hide resolved
async with self._condition_new_sample:
# Every time a new sample is received, this condition gets notified and
# will wake up.
_ = await self._condition_new_sample.wait()
Comment on lines +354 to +357
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud, and I don't even think we need to think about it for this PR, but maybe this could be done more efficiently by reversing the logic, and only set the condition when the counter is set (it could be even be a simple Event in this case). This would mean we would need to save the "waiters" in the instance, and then notify the appropriate waiter, so maybe that's a bit costly too, but I guess the most common case will be having only one or very few waiters. 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most common case is also with n=1 and resampling_interval=15 minutes.

if self.count_covered(since=start_timestamp) >= n:
return

async def _run_impl(self) -> None:
"""Awaits samples from the receiver and updates the underlying ring buffer.

Expand All @@ -331,6 +371,9 @@ async def _run_impl(self) -> None:
await self._resampler_sender.send(sample)
else:
self._buffer.update(sample)
async with self._condition_new_sample:
# Wake up all coroutines waiting for new samples.
self._condition_new_sample.notify_all()

except asyncio.CancelledError:
_logger.info("MovingWindow task has been cancelled.")
Expand All @@ -343,8 +386,10 @@ def _configure_resampler(self) -> None:
assert self._resampler is not None

async def sink_buffer(sample: Sample[Quantity]) -> None:
if sample.value is not None:
self._buffer.update(sample)
self._buffer.update(sample)
async with self._condition_new_sample:
# Wake up all coroutines waiting for new samples.
self._condition_new_sample.notify_all()

resampler_channel = Broadcast[Sample[Quantity]](name="average")
self._resampler_sender = resampler_channel.new_sender()
Expand All @@ -355,23 +400,44 @@ async def sink_buffer(sample: Sample[Quantity]) -> None:
asyncio.create_task(self._resampler.resample(), name="resample")
)

def count_valid(self) -> int:
"""
Count the number of valid samples in this `MovingWindow`.
def count_valid(
self, *, since: datetime | None = None, until: datetime | None = None
) -> int:
"""Count the number of valid samples in this `MovingWindow`.

If `since` and `until` are provided, the count is limited to the samples between
(and including) the given timestamps.

Args:
since: The timestamp from which to start counting. If `None`, the oldest
timestamp of the buffer is used.
until: The timestamp until (and including) which to count. If `None`, the
newest timestamp of the buffer is used.

Returns:
The number of valid samples in this `MovingWindow`.
"""
return self._buffer.count_valid()
return self._buffer.count_valid(since=since, until=until)

def count_covered(self) -> int:
def count_covered(
self, *, since: datetime | None = None, until: datetime | None = None
) -> int:
"""Count the number of samples that are covered by the oldest and newest valid samples.

If `since` and `until` are provided, the count is limited to the samples between
(and including) the given timestamps.

Args:
since: The timestamp from which to start counting. If `None`, the oldest
timestamp of the buffer is used.
until: The timestamp until (and including) which to count. If `None`, the
newest timestamp of the buffer is used.

Returns:
The count of samples between the oldest and newest (inclusive) valid samples
or 0 if there are is no time range covered.
"""
return self._buffer.count_covered()
return self._buffer.count_covered(since=since, until=until)

@overload
def __getitem__(self, key: SupportsIndex) -> float:
Expand Down
70 changes: 59 additions & 11 deletions src/frequenz/sdk/timeseries/_ringbuffer/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,9 +651,20 @@ def __getitem__(self, index_or_slice: SupportsIndex | slice) -> float | FloatArr
"""
return self._buffer.__getitem__(index_or_slice)

def _covered_time_range(self) -> timedelta:
def _covered_time_range(
self, since: datetime | None = None, until: datetime | None = None
) -> timedelta:
"""Return the time range that is covered by the oldest and newest valid samples.

If `since` and `until` are provided, the time range is limited to the items
between (and including) the given timestamps.

Args:
since: The timestamp from which to start counting. If `None`, the oldest
timestamp in the buffer is used.
until: The timestamp until (and including) which to count. If `None`, the
newest timestamp in the buffer is used.

Returns:
The time range between the oldest and newest valid samples or 0 if
there are is no time range covered.
Expand All @@ -664,45 +675,82 @@ def _covered_time_range(self) -> timedelta:
assert (
self.newest_timestamp is not None
), "Newest timestamp cannot be None here."
return self.newest_timestamp - self.oldest_timestamp + self._sampling_period

def count_covered(self) -> int:
if since is None or since < self.oldest_timestamp:
since = self.oldest_timestamp
if until is None or until > self.newest_timestamp:
until = self.newest_timestamp

if until < since:
return timedelta(0)

return until - since + self._sampling_period

def count_covered(
self, *, since: datetime | None = None, until: datetime | None = None
) -> int:
"""Count the number of samples that are covered by the oldest and newest valid samples.

If `since` and `until` are provided, the count is limited to the items between
(and including) the given timestamps.

Args:
since: The timestamp from which to start counting. If `None`, the oldest
timestamp in the buffer is used.
until: The timestamp until (and including) which to count. If `None`, the
newest timestamp in the buffer is used.

Returns:
The count of samples between the oldest and newest (inclusive) valid samples
or 0 if there are is no time range covered.
"""
return int(
self._covered_time_range().total_seconds()
self._covered_time_range(since, until).total_seconds()
// self._sampling_period.total_seconds()
)

def count_valid(self) -> int:
"""Count the number of valid items that this buffer currently holds.
def count_valid(
self, *, since: datetime | None = None, until: datetime | None = None
) -> int:
"""Count the number of valid items in this buffer.

If `since` and `until` are provided, the count is limited to the items between
(and including) the given timestamps.

Args:
since: The timestamp from which to start counting. If `None`, the oldest
timestamp in the buffer is used.
until: The timestamp until (and including) which to count. If `None`, the
newest timestamp in the buffer is used.

Returns:
The number of valid items in this buffer.
"""
if self._timestamp_newest == self._TIMESTAMP_MIN:
if since is None or since < self._timestamp_oldest:
since = self._timestamp_oldest
if until is None or until > self._timestamp_newest:
until = self._timestamp_newest

if until == self._TIMESTAMP_MIN or until < since:
return 0

# Sum of all elements in the gap ranges
sum_missing_entries = max(
0,
sum(
(
gap.end
min(gap.end, until + self._sampling_period)
# Don't look further back than oldest timestamp
- max(gap.start, self._timestamp_oldest)
- max(gap.start, since)
)
// self._sampling_period
for gap in self._gaps
if gap.start <= until and gap.end >= since
),
)

start_pos = self.to_internal_index(self._timestamp_oldest)
end_pos = self.to_internal_index(self._timestamp_newest)
start_pos = self.to_internal_index(since)
end_pos = self.to_internal_index(until)

if end_pos < start_pos:
return len(self._buffer) - start_pos + end_pos + 1 - sum_missing_entries
Expand Down
Loading
Loading