Skip to content

Commit

Permalink
Adding arrow operators for stable interpolating functions
Browse files Browse the repository at this point in the history
Also removing experimental copies of stable interpolation functions.
  • Loading branch information
Brian Rowe committed Mar 1, 2023
1 parent f697f0b commit be415b7
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 67 deletions.
2 changes: 2 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ This changelog should be updated as part of a PR if the work is worth noting (mo
- [#715](https://github.com/timescale/timescaledb-toolkit/pull/715): Fix out-of-bounds indexing error in `state_agg` rollup

#### Other notable changes
- [#716](https://github.com/timescale/timescaledb-toolkit/issues/716): Add arrow operator support for counter aggregate and time-weighted aggregate interpolated accessors.
- [#716](https://github.com/timescale/timescaledb-toolkit/issues/716): Remove experimental versions of interpolated accessors for counter aggregate and time-weighted aggregates. The stable versions introduced in 1.14.0 should be used instead.

#### Shout-outs

Expand Down
103 changes: 64 additions & 39 deletions extension/src/counter_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ use crate::raw::tstzrange;

use crate::raw::bytea;

mod accessors;

use accessors::{CounterInterpolatedDeltaAccessor, CounterInterpolatedRateAccessor};

// pg_type! can't handle generics so use a type alias to specify the type for `stats`
type PgTypeHackStatsSummary2D = StatsSummary2D<f64>;
// TODO wrap FlatSummary a la GaugeSummary - requires serialization version bump
Expand Down Expand Up @@ -619,20 +623,6 @@ fn counter_agg_extrapolated_delta<'a>(summary: CounterSummary<'a>, method: &str)
}
}

// Public facing interpolated_delta
extension_sql!(
"\n\
CREATE FUNCTION toolkit_experimental.interpolated_delta(summary countersummary,\n\
start timestamptz,\n\
duration interval,\n\
prev countersummary,\n\
next countersummary) RETURNS DOUBLE PRECISION\n\
AS $$\n\
SELECT interpolated_delta(summary,start,duration,prev,next) $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;\n\
",
name = "experimental_interpolated_delta", requires=[counter_agg_interpolated_delta]
);

#[pg_extern(name = "interpolated_delta", immutable, parallel_safe)]
fn counter_agg_interpolated_delta<'a>(
summary: CounterSummary<'a>,
Expand All @@ -648,6 +638,32 @@ fn counter_agg_interpolated_delta<'a>(
.delta()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_counter_interpolated_delta<'a>(
sketch: CounterSummary<'a>,
accessor: CounterInterpolatedDeltaAccessor<'a>,
) -> f64 {
let prev = if accessor.flags & 1 == 1 {
Some(accessor.prev.clone().into())
} else {
None
};
let next = if accessor.flags & 2 == 2 {
Some(accessor.next.clone().into())
} else {
None
};

counter_agg_interpolated_delta(
sketch,
accessor.timestamp.into(),
accessor.interval.into(),
prev,
next,
)
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_counter_agg_extrapolated_rate<'a>(
Expand All @@ -668,21 +684,6 @@ fn counter_agg_extrapolated_rate<'a>(summary: CounterSummary<'a>, method: &str)
}
}

// Public facing interpolated_rate
extension_sql!(
"\n\
CREATE FUNCTION toolkit_experimental.interpolated_rate(summary countersummary,\n\
start timestamptz,\n\
duration interval,\n\
prev countersummary,\n\
next countersummary) RETURNS DOUBLE PRECISION\n\
AS $$\n\
SELECT interpolated_rate(summary,start,duration,prev,next) $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;\n\
",
name = "experimental_interpolated_rate",
requires = [counter_agg_interpolated_rate]
);

#[pg_extern(name = "interpolated_rate", immutable, parallel_safe)]
fn counter_agg_interpolated_rate<'a>(
summary: CounterSummary<'a>,
Expand All @@ -698,6 +699,32 @@ fn counter_agg_interpolated_rate<'a>(
.rate()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_counter_interpolated_rate<'a>(
sketch: CounterSummary<'a>,
accessor: CounterInterpolatedRateAccessor<'a>,
) -> Option<f64> {
let prev = if accessor.flags & 1 == 1 {
Some(accessor.prev.clone().into())
} else {
None
};
let next = if accessor.flags & 2 == 2 {
Some(accessor.next.clone().into())
} else {
None
};

counter_agg_interpolated_rate(
sketch,
accessor.timestamp.into(),
accessor.interval.into(),
prev,
next,
)
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_counter_agg_num_elements<'a>(
Expand Down Expand Up @@ -1335,7 +1362,7 @@ mod tests {
let mut deltas = client
.update(
r#"SELECT
toolkit_experimental.interpolated_delta(
interpolated_delta(
agg,
bucket,
'1 day'::interval,
Expand Down Expand Up @@ -1369,12 +1396,11 @@ mod tests {
);
assert!(deltas.next().is_none());

// test that the non experimental version also returns the same result
// test that the arrow version also returns the same result
let mut deltas = client
.update(
r#"SELECT
interpolated_delta(
agg,
agg -> interpolated_delta(
bucket,
'1 day'::interval,
LAG(agg) OVER (ORDER BY bucket),
Expand Down Expand Up @@ -1410,7 +1436,7 @@ mod tests {
let mut rates = client
.update(
r#"SELECT
toolkit_experimental.interpolated_rate(
interpolated_rate(
agg,
bucket,
'1 day'::interval,
Expand Down Expand Up @@ -1442,14 +1468,13 @@ mod tests {
rates.next().unwrap()[1].value().unwrap(),
Some((35. + 30. - 27.5) / (16. * 60. * 60.))
);

// test that the non experimental version also returns the same result
assert!(rates.next().is_none());

// test that the arrow operator version also returns the same result
let mut rates = client
.update(
r#"SELECT
interpolated_rate(
agg,
agg -> interpolated_rate(
bucket,
'1 day'::interval,
LAG(agg) OVER (ORDER BY bucket),
Expand Down Expand Up @@ -1509,7 +1534,7 @@ mod tests {
let mut deltas = client
.update(
r#"SELECT
toolkit_experimental.interpolated_delta(
interpolated_delta(
agg,
bucket,
'1 day'::interval,
Expand Down
91 changes: 91 additions & 0 deletions extension/src/counter_agg/accessors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use pgx::*;

use crate::{
counter_agg::{CounterSummary, CounterSummaryData, MetricSummary},
datum_utils::interval_to_ms,
pg_type, ron_inout_funcs,
};

use tspoint::TSPoint;

pg_type! {
struct CounterInterpolatedRateAccessor {
timestamp : i64,
interval : i64,
prev : CounterSummaryData,
next : CounterSummaryData,
flags : u64,
}
}

ron_inout_funcs!(CounterInterpolatedRateAccessor);

#[pg_extern(immutable, parallel_safe, name = "interpolated_rate")]
fn counter_interpolated_rate_accessor<'a>(
start: crate::raw::TimestampTz,
duration: crate::raw::Interval,
prev: Option<CounterSummary<'a>>,
next: Option<CounterSummary<'a>>,
) -> CounterInterpolatedRateAccessor<'static> {
fn empty_summary<'b>() -> Option<CounterSummary<'b>> {
let tmp = TSPoint { ts: 0, val: 0.0 };
let tmp = MetricSummary::new(&tmp, None);
let tmp = CounterSummary::from_internal_counter_summary(tmp);
Some(tmp)
}

let flags = u64::from(prev.is_some()) + if next.is_some() { 2 } else { 0 };
let prev = prev.or_else(empty_summary).unwrap().0;
let next = next.or_else(empty_summary).unwrap().0;
let interval = interval_to_ms(&start, &duration);
crate::build! {
CounterInterpolatedRateAccessor {
timestamp : start.into(),
interval,
prev,
next,
flags,
}
}
}

pg_type! {
struct CounterInterpolatedDeltaAccessor {
timestamp : i64,
interval : i64,
prev : CounterSummaryData,
next : CounterSummaryData,
flags : u64,
}
}

ron_inout_funcs!(CounterInterpolatedDeltaAccessor);

#[pg_extern(immutable, parallel_safe, name = "interpolated_delta")]
fn counter_interpolated_delta_accessor<'a>(
start: crate::raw::TimestampTz,
duration: crate::raw::Interval,
prev: Option<CounterSummary<'a>>,
next: Option<CounterSummary<'a>>,
) -> CounterInterpolatedDeltaAccessor<'static> {
fn empty_summary<'b>() -> Option<CounterSummary<'b>> {
let tmp = TSPoint { ts: 0, val: 0.0 };
let tmp = MetricSummary::new(&tmp, None);
let tmp = CounterSummary::from_internal_counter_summary(tmp);
Some(tmp)
}

let flags = u64::from(prev.is_some()) + if next.is_some() { 2 } else { 0 };
let prev = prev.or_else(empty_summary).unwrap().0;
let next = next.or_else(empty_summary).unwrap().0;
let interval = interval_to_ms(&start, &duration);
crate::build! {
CounterInterpolatedDeltaAccessor {
timestamp : start.into(),
interval,
prev,
next,
flags,
}
}
}
24 changes: 24 additions & 0 deletions extension/src/stabilization_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@

crate::functions_stabilized_at! {
STABLE_FUNCTIONS
"1.15.0" => {
arrow_counter_interpolated_delta(countersummary,counterinterpolateddeltaaccessor),
arrow_counter_interpolated_rate(countersummary,counterinterpolatedrateaccessor),
arrow_time_weighted_average_interpolated_average(timeweightsummary,timeweightinterpolatedaverageaccessor),
counterinterpolateddeltaaccessor_in(cstring),
counterinterpolateddeltaaccessor_out(counterinterpolateddeltaaccessor),
counterinterpolatedrateaccessor_in(cstring),
counterinterpolatedrateaccessor_out(counterinterpolatedrateaccessor),
interpolated_average(timestamp with time zone,interval,timeweightsummary,timeweightsummary),
interpolated_delta(timestamp with time zone,interval,countersummary,countersummary),
interpolated_rate(timestamp with time zone,interval,countersummary,countersummary),
timeweightinterpolatedaverageaccessor_in(cstring),
timeweightinterpolatedaverageaccessor_out(timeweightinterpolatedaverageaccessor),
}
"1.14.0" => {
interpolated_average(timeweightsummary,timestamp with time zone,interval,timeweightsummary,timeweightsummary),
interpolated_delta(countersummary,timestamp with time zone,interval,countersummary,countersummary),
Expand Down Expand Up @@ -492,6 +506,11 @@ crate::functions_stabilized_at! {

crate::types_stabilized_at! {
STABLE_TYPES
"1.15.0" => {
counterinterpolateddeltaaccessor,
counterinterpolatedrateaccessor,
timeweightinterpolatedaverageaccessor,
}
"1.14.0" => {
candlestick,
accessorclose,
Expand Down Expand Up @@ -581,6 +600,11 @@ crate::types_stabilized_at! {

crate::operators_stabilized_at! {
STABLE_OPERATORS
"1.15.0" => {
"->"(countersummary,counterinterpolateddeltaaccessor),
"->"(countersummary,counterinterpolatedrateaccessor),
"->"(timeweightsummary,timeweightinterpolatedaverageaccessor),
}
"1.14.0" => {
"->"(candlestick,accessorclose),
"->"(candlestick,accessorclosetime),
Expand Down
Loading

0 comments on commit be415b7

Please sign in to comment.