Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a wait_for_samples method to the MovingWindow #1159

Open
wants to merge 4 commits into
base: v1.x.x
Choose a base branch
from

Conversation

shsms
Copy link
Contributor

@shsms shsms commented Feb 4, 2025

Closes #967

@shsms shsms requested a review from a team as a code owner February 4, 2025 14:20
@shsms shsms requested review from daniel-zullo-frequenz and removed request for a team February 4, 2025 14:20
@github-actions github-actions bot added part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests part:data-pipeline Affects the data pipeline labels Feb 4, 2025
Args:
since: The timestamp from which to start counting. If `None`, the oldest
timestamp in the buffer is used.
until: The timestamp until which to count. If `None`, the newest timestamp
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better clarify that until is inclusive since usually the end timestamp (or index) is exclusive in python.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

raise ValueError(
"The number of samples to wait for must be greater than 0."
)
if n > self.capacity:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theoretically I don't see why this is required but also don't have a good example where it hurts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are counting the number of items in the buffer, for reporting. If we have to support n > capacity, we'll need a different implementation. But I couldn't think of a usecase for allowing n > capacity.

start_timestamp = self.newest_timestamp
if n < self.capacity:
n += self.count_valid(since=start_timestamp)
while True:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar with the Condition concept and cannot follow what is happening here, maybe you could add some explanatory comments here and in the tests below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a few lines, hope that's sufficient.

@@ -318,6 +320,34 @@ def window(
start, end, force_copy=force_copy, fill_value=fill_value
)

async def wait_for_samples(self, n: int) -> None:
"""Wait until the next `n` samples are available in the MovingWindow.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is valid samples? If so, I am actually not sure if we want this or something time-based i.e. allow that not all samples are valid. However, for our current use-case this also works since we would set n=1 anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's interesting. valid means that any data was received. If a component is missing data, resampler will send None and that is not a valid value.

If a component is sending only None, should this function return after n Nones are received? I'm guessing it should?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, in many scenarios I wouldn't distinguish between missing or None values. I think it shouldn't return after n Nones but after n new time steps which have at least 1 real value. However, we could leave this also for later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated it to return after n samples are received. Whether they were valid or not needs to be checked with a call to count_valid. I've also updated the docs to state this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see the confusion, I understood that it triggers when n new output samples have been "received", i.e. there are timestamps in the resulting moving window. But this is about input samples. So even if we receive 100 samples, if these are all older than newest timestamp we wouldn't get any new timestamp in the window but updated data points of older timestamps.

This makes sense to me, would stress that in the doc though, e.g the valid samples part is confusing IMO since this is indeed about the new samples.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe:

       """Wait until the next `n` samples have been received in the MovingWindow.

        This function returns after `n` input samples have been received, without considering
        whether the received samples are valid or which timestamp they have. The validity of 
        the samples in the updated moving window can be verified by calling the
        [`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it would wait until there are n output samples, but some output samples could be nan.

It does consider the timestamps when the samples are received. It expects n "new" samples to be available in the buffer before it returns.

The current tests only cover cases where there is no resampling in the moving window. I'll rectify that.

It retains the original behaviour of counting all the valid samples in
the buffer when no time range is specified.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
It retains the original behaviour of counting all the valid samples in
the buffer when no time range is specified.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
@shsms shsms force-pushed the moving-window-samples-trigger branch from a001bf7 to f94c4fa Compare February 10, 2025 16:33
@shsms shsms requested a review from cwasicki February 10, 2025 16:36
@@ -318,6 +320,34 @@ def window(
start, end, force_copy=force_copy, fill_value=fill_value
)

async def wait_for_samples(self, n: int) -> None:
"""Wait until the next `n` samples are available in the MovingWindow.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see the confusion, I understood that it triggers when n new output samples have been "received", i.e. there are timestamps in the resulting moving window. But this is about input samples. So even if we receive 100 samples, if these are all older than newest timestamp we wouldn't get any new timestamp in the window but updated data points of older timestamps.

This makes sense to me, would stress that in the doc though, e.g the valid samples part is confusing IMO since this is indeed about the new samples.

@@ -318,6 +320,34 @@ def window(
start, end, force_copy=force_copy, fill_value=fill_value
)

async def wait_for_samples(self, n: int) -> None:
"""Wait until the next `n` samples are available in the MovingWindow.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe:

       """Wait until the next `n` samples have been received in the MovingWindow.

        This function returns after `n` input samples have been received, without considering
        whether the received samples are valid or which timestamp they have. The validity of 
        the samples in the updated moving window can be verified by calling the
        [`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method.

await push_logical_meter_data(sender, range(0, 5))
await asyncio.sleep(0)
# After pushing 5 values, the `wait_for_samples` task should be done.
assert task.done()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now looking at the indended usage example here, would it be possible to wrap this into a receiver that sends the moving window content each time the method triggers? We could implement this in downstream apps of course, but I guess in the end this would be the pattern we would use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, can do that.

@shsms shsms requested a review from cwasicki February 11, 2025 08:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
part:data-pipeline Affects the data pipeline part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests
Projects
Status: To do
Development

Successfully merging this pull request may close these issues.

[MovingWindow] Add a trigger that fires after having received a fixed number of samples
2 participants