-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a wait_for_samples
method to the MovingWindow
#1159
base: v1.x.x
Are you sure you want to change the base?
Conversation
Args: | ||
since: The timestamp from which to start counting. If `None`, the oldest | ||
timestamp in the buffer is used. | ||
until: The timestamp until which to count. If `None`, the newest timestamp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better clarify that until
is inclusive since usually the end timestamp (or index) is exclusive in python.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
raise ValueError( | ||
"The number of samples to wait for must be greater than 0." | ||
) | ||
if n > self.capacity: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Theoretically I don't see why this is required but also don't have a good example where it hurts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are counting the number of items in the buffer, for reporting. If we have to support n > capacity, we'll need a different implementation. But I couldn't think of a usecase for allowing n > capacity.
start_timestamp = self.newest_timestamp | ||
if n < self.capacity: | ||
n += self.count_valid(since=start_timestamp) | ||
while True: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not familiar with the Condition concept and cannot follow what is happening here, maybe you could add some explanatory comments here and in the tests below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a few lines, hope that's sufficient.
@@ -318,6 +320,34 @@ def window( | |||
start, end, force_copy=force_copy, fill_value=fill_value | |||
) | |||
|
|||
async def wait_for_samples(self, n: int) -> None: | |||
"""Wait until the next `n` samples are available in the MovingWindow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this is valid samples? If so, I am actually not sure if we want this or something time-based i.e. allow that not all samples are valid. However, for our current use-case this also works since we would set n=1 anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's interesting. valid means that any data was received. If a component is missing data, resampler will send None
and that is not a valid value.
If a component is sending only None
, should this function return after n
None
s are received? I'm guessing it should?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, in many scenarios I wouldn't distinguish between missing or None values. I think it shouldn't return after n Nones but after n
new time steps which have at least 1 real value. However, we could leave this also for later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated it to return after n
samples are received. Whether they were valid or not needs to be checked with a call to count_valid
. I've also updated the docs to state this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see the confusion, I understood that it triggers when n
new output samples have been "received", i.e. there are timestamps in the resulting moving window. But this is about input samples. So even if we receive 100 samples, if these are all older than newest timestamp we wouldn't get any new timestamp in the window but updated data points of older timestamps.
This makes sense to me, would stress that in the doc though, e.g the valid samples part is confusing IMO since this is indeed about the new samples.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe:
"""Wait until the next `n` samples have been received in the MovingWindow.
This function returns after `n` input samples have been received, without considering
whether the received samples are valid or which timestamp they have. The validity of
the samples in the updated moving window can be verified by calling the
[`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it would wait until there are n
output samples, but some output samples could be nan.
It does consider the timestamps when the samples are received. It expects n
"new" samples to be available in the buffer before it returns.
The current tests only cover cases where there is no resampling in the moving window. I'll rectify that.
It retains the original behaviour of counting all the valid samples in the buffer when no time range is specified. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
It retains the original behaviour of counting all the valid samples in the buffer when no time range is specified. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
a001bf7
to
f94c4fa
Compare
@@ -318,6 +320,34 @@ def window( | |||
start, end, force_copy=force_copy, fill_value=fill_value | |||
) | |||
|
|||
async def wait_for_samples(self, n: int) -> None: | |||
"""Wait until the next `n` samples are available in the MovingWindow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see the confusion, I understood that it triggers when n
new output samples have been "received", i.e. there are timestamps in the resulting moving window. But this is about input samples. So even if we receive 100 samples, if these are all older than newest timestamp we wouldn't get any new timestamp in the window but updated data points of older timestamps.
This makes sense to me, would stress that in the doc though, e.g the valid samples part is confusing IMO since this is indeed about the new samples.
@@ -318,6 +320,34 @@ def window( | |||
start, end, force_copy=force_copy, fill_value=fill_value | |||
) | |||
|
|||
async def wait_for_samples(self, n: int) -> None: | |||
"""Wait until the next `n` samples are available in the MovingWindow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe:
"""Wait until the next `n` samples have been received in the MovingWindow.
This function returns after `n` input samples have been received, without considering
whether the received samples are valid or which timestamp they have. The validity of
the samples in the updated moving window can be verified by calling the
[`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method.
await push_logical_meter_data(sender, range(0, 5)) | ||
await asyncio.sleep(0) | ||
# After pushing 5 values, the `wait_for_samples` task should be done. | ||
assert task.done() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now looking at the indended usage example here, would it be possible to wrap this into a receiver that sends the moving window content each time the method triggers? We could implement this in downstream apps of course, but I guess in the end this would be the pattern we would use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, can do that.
Closes #967