Support rolling up state/timeline aggs
syvb committed Nov 29, 2022
1 parent b652476 commit 41ff3dd
Showing 3 changed files with 243 additions and 15 deletions.
41 changes: 41 additions & 0 deletions docs/state_agg.md
@@ -280,3 +280,44 @@ start_time | end_time
2019-12-31 00:00:00+00 | 2020-01-01 00:00:00+00
2020-01-01 00:02:00+00 | 2020-01-05 00:00:00+00
```

## rollup

Multiple state or timeline aggregates covering non-overlapping time ranges can be combined into a single aggregate with `rollup`:

```SQL
WITH buckets AS (SELECT
date_trunc('minute', ts) as dt,
toolkit_experimental.state_agg(ts, state) AS sa
FROM states_test
GROUP BY date_trunc('minute', ts))
SELECT toolkit_experimental.duration_in(
'START',
toolkit_experimental.rollup(buckets.sa)
)
FROM buckets;
```
```output
interval
----------
00:00:11
```

```SQL
WITH buckets AS (SELECT
date_trunc('minute', ts) as dt,
toolkit_experimental.timeline_agg(ts, state) AS sa
FROM states_test
GROUP BY date_trunc('minute', ts))
SELECT toolkit_experimental.state_timeline(
toolkit_experimental.rollup(buckets.sa)
)
FROM buckets;
```
```output
state_timeline
-----------------------------------------------------------
(START,"2020-01-01 00:00:00+00","2020-01-01 00:00:11+00")
(OK,"2020-01-01 00:00:11+00","2020-01-01 00:00:11+00")
(ERROR,"2020-01-01 00:01:00+00","2020-01-01 00:01:03+00")
(OK,"2020-01-01 00:01:03+00","2020-01-01 00:01:03+00")
(STOP,"2020-01-01 00:02:00+00","2020-01-01 00:02:00+00")
```
135 changes: 120 additions & 15 deletions extension/src/state_aggregate.rs
@@ -8,6 +8,7 @@

use pgx::{iter::TableIterator, *};
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;

use aggregate_builder::aggregate;
use flat_serialize::*;
@@ -23,6 +24,8 @@ use crate::{

use toolkit_experimental::{StateAgg, TimelineAgg};

mod rollup;

#[pg_schema]
pub mod toolkit_experimental {
use super::*;
@@ -52,6 +55,24 @@ pub mod toolkit_experimental {
}

impl StateAgg<'_> {
pub(super) fn empty(from_timeline_agg: bool) -> Self {
unsafe {
flatten!(StateAgg {
states_len: 0,
states: Slice::Slice(&[]),
durations_len: 0,
durations: Slice::Slice(&[]),
combined_durations: Slice::Slice(&[]),
combined_durations_len: 0,
first_time: 0,
last_time: 0,
first_state: 0,
last_state: 0,
from_timeline_agg,
})
}
}

pub(super) fn new(
states: String,
durations: Vec<DurationInState>,
@@ -68,21 +89,7 @@ pub mod toolkit_experimental {
&& combined_durations.map(|v| v.is_empty()).unwrap_or(true)
);

return unsafe {
flatten!(StateAgg {
states_len: 0,
states: Slice::Slice(&[]),
durations_len: 0,
durations: Slice::Slice(&[]),
combined_durations: Slice::Slice(&[]),
combined_durations_len: 0,
first_time: 0,
last_time: 0,
first_state: 0,
last_state: 0,
from_timeline_agg,
})
};
return Self::empty(from_timeline_agg);
}

assert!(first.is_some() && last.is_some());
@@ -277,6 +284,104 @@ pub mod toolkit_experimental {
combined_durations,
)
}

        /// Merges two non-overlapping aggregates. Panics if the aggregates overlap
        /// or if a state_agg is merged with a timeline_agg.
pub fn merge(&self, other: &Self) -> Self {
assert_eq!(
self.from_timeline_agg, other.from_timeline_agg,
"can't merge state_agg and timeline_agg"
);
let (earlier, later) = match self.first_time.cmp(&other.first_time) {
Ordering::Less => (self, other),
Ordering::Greater => (other, self),
Ordering::Equal => panic!("can't merge overlapping aggregates (same start time)"),
};
assert!(
earlier.last_time < later.first_time,
"can't merge overlapping aggregates"
);

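            // Each aggregate stores its state names as one concatenated string, with
            // DurationInState/TimeInState holding byte ranges into it; merging therefore
            // re-maps the later aggregate's ranges into the merged string.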
let later_states = String::from_utf8(later.states.iter().collect::<Vec<u8>>())
.expect("invalid later UTF-8 states");
let mut merged_states = String::from_utf8(earlier.states.iter().collect::<Vec<u8>>())
.expect("invalid earlier UTF-8 states");
let mut merged_durations = earlier.durations.iter().collect::<Vec<_>>();

for dis in later.durations.iter() {
let dis_state =
later_states[dis.state_beg as usize..dis.state_end as usize].to_string();

let merged_duration_to_update = merged_durations.iter_mut().find(|merged_dis| {
merged_states[merged_dis.state_beg as usize..merged_dis.state_end as usize]
== dis_state
});
if let Some(merged_duration_to_update) = merged_duration_to_update {
merged_duration_to_update.duration += dis.duration;
} else {
let (state_beg, state_end) = if let Some(bounds) = merged_states
.find(&dis_state)
.map(|idx| (idx as u32, (idx + dis_state.len()) as u32))
{
bounds
} else {
let bounds = (
merged_states.len() as u32,
(merged_states.len() + dis_state.len()) as u32,
);
merged_states.push_str(&dis_state);
bounds
};
merged_durations.push(DurationInState {
state_beg,
state_end,
duration: dis.duration,
});
};
}

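            // Concatenate the time-ordered periods (used for timeline aggregates),
            // re-mapping the later aggregate's state ranges into the merged states string.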
let mut combined_durations = earlier
.combined_durations
.iter()
.chain(later.combined_durations.iter().map(|tis| {
let state = &later_states[tis.state_beg as usize..tis.state_end as usize];
let idx = merged_states.find(state).unwrap() as u32;
TimeInState {
state_beg: idx,
state_end: idx + state.len() as u32,
..tis
}
}))
.collect::<Vec<_>>();
if let (Some(TimeInState { end_time, .. }), Some(TimeInState { start_time, .. })) = (
earlier.combined_durations.iter().last(),
later.combined_durations.iter().next(),
) {
// possibly merge adjacent durations
if end_time == start_time {
combined_durations[earlier.combined_durations.len() - 1].end_time =
combined_durations
.remove(earlier.combined_durations.len())
.end_time;
}
};

let merged_states = merged_states.into_bytes();
unsafe {
flatten!(StateAgg {
states_len: merged_states.len() as u64,
states: (&*merged_states).into(),
durations_len: merged_durations.len() as u64,
durations: (&*merged_durations).into(),
combined_durations_len: combined_durations.len() as u64,
combined_durations: (&*combined_durations).into(),
first_time: earlier.first_time,
last_time: later.last_time,
first_state: earlier.first_state,
last_state: later.last_state,
from_timeline_agg: earlier.from_timeline_agg,
})
}
}
}

impl<'input> TimelineAgg<'input> {
82 changes: 82 additions & 0 deletions extension/src/state_aggregate/rollup.rs
@@ -0,0 +1,82 @@
use super::*;
use crate::aggregate_utils::in_aggregate_context;

extension_sql!(
"CREATE AGGREGATE toolkit_experimental.rollup(
value toolkit_experimental.StateAgg
) (
sfunc = toolkit_experimental.state_agg_rollup_trans,
stype = toolkit_experimental.StateAgg,
finalfunc = toolkit_experimental.state_agg_rollup_final
);",
name = "state_agg_rollup",
requires = [
// TODO: depend on state_agg somehow?
state_agg_rollup_trans,
state_agg_rollup_final,
],
);

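// The transition function merges each per-bucket StateAgg into the running rollup;
// the final function just returns a copy of it allocated in the current memory context.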
#[pg_extern(schema = "toolkit_experimental", immutable, parallel_safe)]
pub fn state_agg_rollup_trans<'a>(
state: Option<toolkit_experimental::StateAgg<'a>>,
val: toolkit_experimental::StateAgg<'a>,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<toolkit_experimental::StateAgg<'a>> {
Some(unsafe {
in_aggregate_context(fcinfo, || match state {
None => val.into(),
Some(state) => state.merge(&val),
})
})
}

#[pg_extern(schema = "toolkit_experimental", immutable, parallel_safe)]
pub fn state_agg_rollup_final<'a>(
state: toolkit_experimental::StateAgg<'a>,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<StateAgg<'a>> {
unsafe { in_aggregate_context(fcinfo, || Some(state.in_current_context())) }
}

extension_sql!(
"CREATE AGGREGATE toolkit_experimental.rollup(
value toolkit_experimental.TimelineAgg
) (
sfunc = toolkit_experimental.timeline_agg_rollup_trans,
stype = toolkit_experimental.StateAgg,
finalfunc = toolkit_experimental.timeline_agg_rollup_final
);",
name = "timeline_agg_rollup",
requires = [
// TODO: depend on state_agg somehow?
timeline_agg_rollup_trans,
timeline_agg_rollup_final,
],
);

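// The transition state is a StateAgg built from TimelineAggs (from_timeline_agg is set),
// so per-bucket TimelineAggs are unwrapped for merging and re-wrapped as a TimelineAgg
// only in the final function.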
#[pg_extern(schema = "toolkit_experimental", immutable, parallel_safe)]
pub fn timeline_agg_rollup_trans<'a>(
state: Option<toolkit_experimental::StateAgg<'a>>,
val: toolkit_experimental::TimelineAgg<'a>,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<toolkit_experimental::StateAgg<'a>> {
Some(unsafe {
in_aggregate_context(fcinfo, || match state {
None => val.as_state_agg().into(),
Some(state) => state.merge(&val.as_state_agg()),
})
})
}

#[pg_extern(schema = "toolkit_experimental", immutable, parallel_safe)]
pub fn timeline_agg_rollup_final<'a>(
state: toolkit_experimental::StateAgg<'a>,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<TimelineAgg<'a>> {
unsafe {
in_aggregate_context(fcinfo, || {
Some(TimelineAgg::new(state).in_current_context())
})
}
}
