Skip to content

Commit

Permalink
Merge #740
Browse files Browse the repository at this point in the history
740: Fix right boundary when interpolating 'locf' TWA r=WireBaron a=WireBaron

This change will make it so a time weighted average with the locf method will include the range between the last data point and the interpolation boundary, even when there is no next aggregate to interpolate.

Note that the behavior here might not be entirely intuitive, so we need to make sure we update our docs to describe our exact behavior.

Fixes #732 

Co-authored-by: Brian Rowe <brian@timescale.com>
  • Loading branch information
bors[bot] and Brian Rowe authored Mar 28, 2023
2 parents c158764 + cc17f8d commit 0a8cb9c
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 13 deletions.
1 change: 1 addition & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This changelog should be updated as part of a PR if the work is worth noting (mo

#### Bug fixes
- [#733](https://github.com/timescale/timescaledb-toolkit/pull/733): Fix a bug when rolling up overlapping heartbeat_aggs
- [#740](https://github.com/timescale/timescaledb-toolkit/pull/740): When interpolating an 'locf' time weighted average, extend last point to interpolation boundary

#### Other notable changes

Expand Down
39 changes: 30 additions & 9 deletions extension/src/time_weighted_average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,23 @@ impl<'input> TimeWeightSummary<'input> {
}
_ => self.first,
};
let new_end = match next {
Some(next) => {
let new_end = match (self.method, next) {
(_, Some(next)) => {
let new_end = self
.method
.interpolate(self.last, Some(next.first), end)
.expect("unable to interpolate end of interval");
new_sum += self.method.weighted_sum(self.last, new_end);
new_end
}
(TimeWeightMethod::LOCF, None) => {
let new_end = self
.method
.interpolate(self.last, None, end)
.expect("unable to interpolate end of interval");
new_sum += self.method.weighted_sum(self.last, new_end);
new_end
}
_ => self.last,
};

Expand Down Expand Up @@ -492,8 +500,8 @@ pub fn time_weighted_average_interpolated_average<'a>(
tws: Option<TimeWeightSummary<'a>>,
start: crate::raw::TimestampTz,
duration: crate::raw::Interval,
prev: Option<TimeWeightSummary<'a>>,
next: Option<TimeWeightSummary<'a>>,
prev: default!(Option<TimeWeightSummary<'a>>, "NULL"),
next: default!(Option<TimeWeightSummary<'a>>, "NULL"),
) -> Option<f64> {
let target = interpolate(tws, start, duration, prev, next);
time_weighted_average_average(target)
Expand Down Expand Up @@ -530,8 +538,8 @@ pub fn time_weighted_average_interpolated_integral<'a>(
tws: Option<TimeWeightSummary<'a>>,
start: crate::raw::TimestampTz,
interval: crate::raw::Interval,
prev: Option<TimeWeightSummary<'a>>,
next: Option<TimeWeightSummary<'a>>,
prev: default!(Option<TimeWeightSummary<'a>>, "NULL"),
next: default!(Option<TimeWeightSummary<'a>>, "NULL"),
unit: default!(String, "'second'"),
) -> Option<f64> {
let target = interpolate(tws, start, interval, prev, next);
Expand Down Expand Up @@ -984,17 +992,30 @@ mod tests {
integrals.next().unwrap()[1].value().unwrap(),
Some(2. * 20. + 10. * 15. + 8. * 50. + 4. * 25.)
);
// Day 3, 10 hours @ 25, 2 @ 30, 4 @ 0
// Day 3, 10 hours @ 25, 2 @ 30, 4 @ 0, 8 @ 35
let result = averages.next().unwrap()[1].value().unwrap();
assert_eq!(result, Some((10. * 25. + 2. * 30.) / 16.));
assert_eq!(result, Some((10. * 25. + 2. * 30. + 8. * 35.) / 24.));
assert_eq!(result, arrow_averages.next().unwrap()[1].value().unwrap());
assert_eq!(
integrals.next().unwrap()[1].value().unwrap(),
Some(10. * 25. + 2. * 30.)
Some(10. * 25. + 2. * 30. + 8. * 35.)
);
assert!(averages.next().is_none());
assert!(arrow_averages.next().is_none());
assert!(integrals.next().is_none());
});
}

#[pg_test]
fn test_locf_interpolation_to_null() {
Spi::connect(|mut client| {
let stmt =
"SELECT interpolated_average(time_weight('locf', '2020-01-01 20:00:00+00', 100),
'2020-01-01 00:00:00+00', '1d')";
assert_eq!(select_one!(client, stmt, f64), 100.0);
let stmt = "SELECT time_weight('locf', '2020-01-01 20:00:00+00', 100)
-> interpolated_integral('2020-01-01 00:00:00+00', '1d')";
assert_eq!(select_one!(client, stmt, f64), 1440000.0);
});
}
}
8 changes: 4 additions & 4 deletions extension/src/time_weighted_average/accessors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ ron_inout_funcs!(TimeWeightInterpolatedAverageAccessor);
fn time_weight_interpolated_average_accessor<'a>(
start: crate::raw::TimestampTz,
duration: crate::raw::Interval,
prev: Option<TimeWeightSummary<'a>>,
next: Option<TimeWeightSummary<'a>>,
prev: default!(Option<TimeWeightSummary<'a>>, "NULL"),
next: default!(Option<TimeWeightSummary<'a>>, "NULL"),
) -> TimeWeightInterpolatedAverageAccessor<'static> {
fn empty_summary<'b>() -> Option<TimeWeightSummary<'b>> {
Some(unsafe {
Expand Down Expand Up @@ -75,8 +75,8 @@ ron_inout_funcs!(TimeWeightInterpolatedIntegralAccessor);
fn time_weight_interpolated_integral_accessor<'a>(
start: crate::raw::TimestampTz,
interval: crate::raw::Interval,
prev: Option<TimeWeightSummary<'a>>,
next: Option<TimeWeightSummary<'a>>,
prev: default!(Option<TimeWeightSummary<'a>>, "NULL"),
next: default!(Option<TimeWeightSummary<'a>>, "NULL"),
unit: default!(String, "'second'"),
) -> TimeWeightInterpolatedIntegralAccessor<'static> {
fn empty_summary<'b>() -> Option<TimeWeightSummary<'b>> {
Expand Down

0 comments on commit 0a8cb9c

Please sign in to comment.