diff --git a/Changelog.md b/Changelog.md index e264156e..e7fb93e1 100644 --- a/Changelog.md +++ b/Changelog.md @@ -12,6 +12,8 @@ This changelog should be updated as part of a PR if the work is worth noting (mo - [#715](https://github.com/timescale/timescaledb-toolkit/pull/715): Fix out-of-bounds indexing error in `state_agg` rollup #### Other notable changes +- [#716](https://github.com/timescale/timescaledb-toolkit/issues/716): Add arrow operator support for counter aggregate and time-weighted aggregate interpolated accessors. +- [#716](https://github.com/timescale/timescaledb-toolkit/issues/716): Remove experimental versions of interpolated accessors for counter aggregate and time-weighted aggregates. The stable versions introduced in 1.14.0 should be used instead. #### Shout-outs diff --git a/extension/src/counter_agg.rs b/extension/src/counter_agg.rs index f298f9a3..9fdf7ae8 100644 --- a/extension/src/counter_agg.rs +++ b/extension/src/counter_agg.rs @@ -29,6 +29,10 @@ use crate::raw::tstzrange; use crate::raw::bytea; +mod accessors; + +use accessors::{CounterInterpolatedDeltaAccessor, CounterInterpolatedRateAccessor}; + // pg_type! can't handle generics so use a type alias to specify the type for `stats` type PgTypeHackStatsSummary2D = StatsSummary2D; // TODO wrap FlatSummary a la GaugeSummary - requires serialization version bump @@ -619,20 +623,6 @@ fn counter_agg_extrapolated_delta<'a>(summary: CounterSummary<'a>, method: &str) } } -// Public facing interpolated_delta -extension_sql!( - "\n\ - CREATE FUNCTION toolkit_experimental.interpolated_delta(summary countersummary,\n\ - start timestamptz,\n\ - duration interval,\n\ - prev countersummary,\n\ - next countersummary) RETURNS DOUBLE PRECISION\n\ - AS $$\n\ - SELECT interpolated_delta(summary,start,duration,prev,next) $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;\n\ -", - name = "experimental_interpolated_delta", requires=[counter_agg_interpolated_delta] -); - #[pg_extern(name = "interpolated_delta", immutable, parallel_safe)] fn counter_agg_interpolated_delta<'a>( summary: CounterSummary<'a>, @@ -648,6 +638,32 @@ fn counter_agg_interpolated_delta<'a>( .delta() } +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_counter_interpolated_delta<'a>( + sketch: CounterSummary<'a>, + accessor: CounterInterpolatedDeltaAccessor<'a>, +) -> f64 { + let prev = if accessor.flags & 1 == 1 { + Some(accessor.prev.clone().into()) + } else { + None + }; + let next = if accessor.flags & 2 == 2 { + Some(accessor.next.clone().into()) + } else { + None + }; + + counter_agg_interpolated_delta( + sketch, + accessor.timestamp.into(), + accessor.interval.into(), + prev, + next, + ) +} + #[pg_operator(immutable, parallel_safe)] #[opname(->)] pub fn arrow_counter_agg_extrapolated_rate<'a>( @@ -668,21 +684,6 @@ fn counter_agg_extrapolated_rate<'a>(summary: CounterSummary<'a>, method: &str) } } -// Public facing interpolated_rate -extension_sql!( - "\n\ - CREATE FUNCTION toolkit_experimental.interpolated_rate(summary countersummary,\n\ - start timestamptz,\n\ - duration interval,\n\ - prev countersummary,\n\ - next countersummary) RETURNS DOUBLE PRECISION\n\ - AS $$\n\ - SELECT interpolated_rate(summary,start,duration,prev,next) $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;\n\ -", - name = "experimental_interpolated_rate", - requires = [counter_agg_interpolated_rate] -); - #[pg_extern(name = "interpolated_rate", immutable, parallel_safe)] fn counter_agg_interpolated_rate<'a>( summary: CounterSummary<'a>, @@ -698,6 +699,32 @@ fn counter_agg_interpolated_rate<'a>( .rate() } +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_counter_interpolated_rate<'a>( + sketch: CounterSummary<'a>, + accessor: CounterInterpolatedRateAccessor<'a>, +) -> Option { + let prev = if accessor.flags & 1 == 1 { + Some(accessor.prev.clone().into()) + } else { + None + }; + let next = if accessor.flags & 2 == 2 { + Some(accessor.next.clone().into()) + } else { + None + }; + + counter_agg_interpolated_rate( + sketch, + accessor.timestamp.into(), + accessor.interval.into(), + prev, + next, + ) +} + #[pg_operator(immutable, parallel_safe)] #[opname(->)] pub fn arrow_counter_agg_num_elements<'a>( @@ -1335,7 +1362,7 @@ mod tests { let mut deltas = client .update( r#"SELECT - toolkit_experimental.interpolated_delta( + interpolated_delta( agg, bucket, '1 day'::interval, @@ -1369,12 +1396,11 @@ mod tests { ); assert!(deltas.next().is_none()); - // test that the non experimental version also returns the same result + // test that the arrow version also returns the same result let mut deltas = client .update( r#"SELECT - interpolated_delta( - agg, + agg -> interpolated_delta( bucket, '1 day'::interval, LAG(agg) OVER (ORDER BY bucket), @@ -1410,7 +1436,7 @@ mod tests { let mut rates = client .update( r#"SELECT - toolkit_experimental.interpolated_rate( + interpolated_rate( agg, bucket, '1 day'::interval, @@ -1442,14 +1468,13 @@ mod tests { rates.next().unwrap()[1].value().unwrap(), Some((35. + 30. - 27.5) / (16. * 60. * 60.)) ); - - // test that the non experimental version also returns the same result assert!(rates.next().is_none()); + + // test that the arrow operator version also returns the same result let mut rates = client .update( r#"SELECT - interpolated_rate( - agg, + agg -> interpolated_rate( bucket, '1 day'::interval, LAG(agg) OVER (ORDER BY bucket), @@ -1509,7 +1534,7 @@ mod tests { let mut deltas = client .update( r#"SELECT - toolkit_experimental.interpolated_delta( + interpolated_delta( agg, bucket, '1 day'::interval, diff --git a/extension/src/counter_agg/accessors.rs b/extension/src/counter_agg/accessors.rs new file mode 100644 index 00000000..54ec7041 --- /dev/null +++ b/extension/src/counter_agg/accessors.rs @@ -0,0 +1,91 @@ +use pgx::*; + +use crate::{ + counter_agg::{CounterSummary, CounterSummaryData, MetricSummary}, + datum_utils::interval_to_ms, + pg_type, ron_inout_funcs, +}; + +use tspoint::TSPoint; + +pg_type! { + struct CounterInterpolatedRateAccessor { + timestamp : i64, + interval : i64, + prev : CounterSummaryData, + next : CounterSummaryData, + flags : u64, + } +} + +ron_inout_funcs!(CounterInterpolatedRateAccessor); + +#[pg_extern(immutable, parallel_safe, name = "interpolated_rate")] +fn counter_interpolated_rate_accessor<'a>( + start: crate::raw::TimestampTz, + duration: crate::raw::Interval, + prev: Option>, + next: Option>, +) -> CounterInterpolatedRateAccessor<'static> { + fn empty_summary<'b>() -> Option> { + let tmp = TSPoint { ts: 0, val: 0.0 }; + let tmp = MetricSummary::new(&tmp, None); + let tmp = CounterSummary::from_internal_counter_summary(tmp); + Some(tmp) + } + + let flags = u64::from(prev.is_some()) + if next.is_some() { 2 } else { 0 }; + let prev = prev.or_else(empty_summary).unwrap().0; + let next = next.or_else(empty_summary).unwrap().0; + let interval = interval_to_ms(&start, &duration); + crate::build! { + CounterInterpolatedRateAccessor { + timestamp : start.into(), + interval, + prev, + next, + flags, + } + } +} + +pg_type! { + struct CounterInterpolatedDeltaAccessor { + timestamp : i64, + interval : i64, + prev : CounterSummaryData, + next : CounterSummaryData, + flags : u64, + } +} + +ron_inout_funcs!(CounterInterpolatedDeltaAccessor); + +#[pg_extern(immutable, parallel_safe, name = "interpolated_delta")] +fn counter_interpolated_delta_accessor<'a>( + start: crate::raw::TimestampTz, + duration: crate::raw::Interval, + prev: Option>, + next: Option>, +) -> CounterInterpolatedDeltaAccessor<'static> { + fn empty_summary<'b>() -> Option> { + let tmp = TSPoint { ts: 0, val: 0.0 }; + let tmp = MetricSummary::new(&tmp, None); + let tmp = CounterSummary::from_internal_counter_summary(tmp); + Some(tmp) + } + + let flags = u64::from(prev.is_some()) + if next.is_some() { 2 } else { 0 }; + let prev = prev.or_else(empty_summary).unwrap().0; + let next = next.or_else(empty_summary).unwrap().0; + let interval = interval_to_ms(&start, &duration); + crate::build! { + CounterInterpolatedDeltaAccessor { + timestamp : start.into(), + interval, + prev, + next, + flags, + } + } +} diff --git a/extension/src/stabilization_info.rs b/extension/src/stabilization_info.rs index 5259d894..1fc8dd3d 100644 --- a/extension/src/stabilization_info.rs +++ b/extension/src/stabilization_info.rs @@ -11,6 +11,20 @@ crate::functions_stabilized_at! { STABLE_FUNCTIONS + "1.15.0" => { + arrow_counter_interpolated_delta(countersummary,counterinterpolateddeltaaccessor), + arrow_counter_interpolated_rate(countersummary,counterinterpolatedrateaccessor), + arrow_time_weighted_average_interpolated_average(timeweightsummary,timeweightinterpolatedaverageaccessor), + counterinterpolateddeltaaccessor_in(cstring), + counterinterpolateddeltaaccessor_out(counterinterpolateddeltaaccessor), + counterinterpolatedrateaccessor_in(cstring), + counterinterpolatedrateaccessor_out(counterinterpolatedrateaccessor), + interpolated_average(timestamp with time zone,interval,timeweightsummary,timeweightsummary), + interpolated_delta(timestamp with time zone,interval,countersummary,countersummary), + interpolated_rate(timestamp with time zone,interval,countersummary,countersummary), + timeweightinterpolatedaverageaccessor_in(cstring), + timeweightinterpolatedaverageaccessor_out(timeweightinterpolatedaverageaccessor), + } "1.14.0" => { interpolated_average(timeweightsummary,timestamp with time zone,interval,timeweightsummary,timeweightsummary), interpolated_delta(countersummary,timestamp with time zone,interval,countersummary,countersummary), @@ -492,6 +506,11 @@ crate::functions_stabilized_at! { crate::types_stabilized_at! { STABLE_TYPES + "1.15.0" => { + counterinterpolateddeltaaccessor, + counterinterpolatedrateaccessor, + timeweightinterpolatedaverageaccessor, + } "1.14.0" => { candlestick, accessorclose, @@ -581,6 +600,11 @@ crate::types_stabilized_at! { crate::operators_stabilized_at! { STABLE_OPERATORS + "1.15.0" => { + "->"(countersummary,counterinterpolateddeltaaccessor), + "->"(countersummary,counterinterpolatedrateaccessor), + "->"(timeweightsummary,timeweightinterpolatedaverageaccessor), + } "1.14.0" => { "->"(candlestick,accessorclose), "->"(candlestick,accessorclosetime), diff --git a/extension/src/time_weighted_average.rs b/extension/src/time_weighted_average.rs index f52d0a11..8477cc64 100644 --- a/extension/src/time_weighted_average.rs +++ b/extension/src/time_weighted_average.rs @@ -23,6 +23,10 @@ use time_weighted_average::{ use crate::raw::bytea; +mod accessors; + +use accessors::TimeWeightInterpolatedAverageAccessor; + pg_type! { #[derive(Debug)] struct TimeWeightSummary { @@ -488,20 +492,6 @@ fn interpolate<'a>( } } -// Public facing interpolated_average -extension_sql!( - "\n\ - CREATE FUNCTION toolkit_experimental.interpolated_average(tws timeweightsummary,\n\ - start timestamptz,\n\ - duration interval,\n\ - prev timeweightsummary,\n\ - next timeweightsummary) RETURNS DOUBLE PRECISION\n\ - AS $$\n\ - SELECT interpolated_average(tws,start,duration,prev,next) $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;\n\ -", - name = "experimental_interpolated_average", requires = [time_weighted_average_interpolated_average] -); - #[pg_extern(immutable, parallel_safe, name = "interpolated_average")] pub fn time_weighted_average_interpolated_average<'a>( tws: Option>, @@ -514,6 +504,32 @@ pub fn time_weighted_average_interpolated_average<'a>( time_weighted_average_average(target) } +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_time_weighted_average_interpolated_average<'a>( + tws: Option>, + accessor: TimeWeightInterpolatedAverageAccessor<'a>, +) -> Option { + let prev = if accessor.flags & 1 == 1 { + Some(accessor.prev.clone().into()) + } else { + None + }; + let next = if accessor.flags & 2 == 2 { + Some(accessor.next.clone().into()) + } else { + None + }; + + time_weighted_average_interpolated_average( + tws, + accessor.timestamp.into(), + accessor.interval.into(), + prev, + next, + ) +} + #[pg_extern( immutable, parallel_safe, @@ -840,11 +856,11 @@ mod tests { None, ) .unwrap(); - // test experimental version - let mut experimental_averages = client + + let mut averages = client .update( r#"SELECT - toolkit_experimental.interpolated_average( + interpolated_average( agg, bucket, '1 day'::interval, @@ -860,12 +876,11 @@ mod tests { None, ) .unwrap(); - // test non_experimental version - let mut averages = client + // test arrow version + let mut arrow_averages = client .update( r#"SELECT - interpolated_average( - agg, + agg -> interpolated_average( bucket, '1 day'::interval, LAG(agg) OVER (ORDER BY bucket), @@ -922,35 +937,35 @@ mod tests { .unwrap(); // Day 1, 4 hours @ 10, 4 @ 40, 8 @ 20 - let result = experimental_averages.next().unwrap()[1].value().unwrap(); + let result = averages.next().unwrap()[1].value().unwrap(); assert_eq!(result, Some((4. * 10. + 4. * 40. + 8. * 20.) / 16.)); - assert_eq!(result, averages.next().unwrap()[1].value().unwrap()); + assert_eq!(result, arrow_averages.next().unwrap()[1].value().unwrap()); assert_eq!( integrals.next().unwrap()[1].value().unwrap(), Some(4. * 10. + 4. * 40. + 8. * 20.) ); // Day 2, 2 hours @ 20, 10 @ 15, 8 @ 50, 4 @ 25 - let result = experimental_averages.next().unwrap()[1].value().unwrap(); + let result = averages.next().unwrap()[1].value().unwrap(); assert_eq!( result, Some((2. * 20. + 10. * 15. + 8. * 50. + 4. * 25.) / 24.) ); - assert_eq!(result, averages.next().unwrap()[1].value().unwrap()); + assert_eq!(result, arrow_averages.next().unwrap()[1].value().unwrap()); assert_eq!( integrals.next().unwrap()[1].value().unwrap(), Some(2. * 20. + 10. * 15. + 8. * 50. + 4. * 25.) ); // Day 3, 10 hours @ 25, 2 @ 30, 4 @ 0 - let result = experimental_averages.next().unwrap()[1].value().unwrap(); + let result = averages.next().unwrap()[1].value().unwrap(); assert_eq!(result, Some((10. * 25. + 2. * 30.) / 16.)); - assert_eq!(result, averages.next().unwrap()[1].value().unwrap()); + assert_eq!(result, arrow_averages.next().unwrap()[1].value().unwrap()); assert_eq!( integrals.next().unwrap()[1].value().unwrap(), Some(10. * 25. + 2. * 30.) ); - assert!(experimental_averages.next().is_none()); assert!(averages.next().is_none()); + assert!(arrow_averages.next().is_none()); assert!(integrals.next().is_none()); }); } diff --git a/extension/src/time_weighted_average/accessors.rs b/extension/src/time_weighted_average/accessors.rs new file mode 100644 index 00000000..60667dcc --- /dev/null +++ b/extension/src/time_weighted_average/accessors.rs @@ -0,0 +1,56 @@ +use pgx::*; + +use crate::{ + datum_utils::interval_to_ms, + flatten, pg_type, ron_inout_funcs, + time_weighted_average::{TimeWeightMethod, TimeWeightSummary, TimeWeightSummaryData}, +}; + +use tspoint::TSPoint; + +pg_type! { + struct TimeWeightInterpolatedAverageAccessor { + timestamp : i64, + interval : i64, + prev : TimeWeightSummaryData, + pad : [u8;3], + flags : u32, + next : TimeWeightSummaryData, + } +} + +ron_inout_funcs!(TimeWeightInterpolatedAverageAccessor); + +#[pg_extern(immutable, parallel_safe, name = "interpolated_average")] +fn time_weight_interpolated_average_accessor<'a>( + start: crate::raw::TimestampTz, + duration: crate::raw::Interval, + prev: Option>, + next: Option>, +) -> TimeWeightInterpolatedAverageAccessor<'static> { + fn empty_summary<'b>() -> Option> { + Some(unsafe { + flatten!(TimeWeightSummary { + first: TSPoint { ts: 0, val: 0.0 }, + last: TSPoint { ts: 0, val: 0.0 }, + weighted_sum: 0.0, + method: TimeWeightMethod::LOCF, + }) + }) + } + + let flags = u32::from(prev.is_some()) + if next.is_some() { 2 } else { 0 }; + let prev = prev.or_else(empty_summary).unwrap().0; + let next = next.or_else(empty_summary).unwrap().0; + let interval = interval_to_ms(&start, &duration); + crate::build! { + TimeWeightInterpolatedAverageAccessor { + timestamp : start.into(), + interval, + prev, + pad : [0,0,0], + flags, + next, + } + } +}