Skip to content

Commit

Permalink
Merge #715
Browse files Browse the repository at this point in the history
715: Fix errors in state_agg rollup r=Smittyvb a=Smittyvb

This PR should fix out-of-bounds indexing in `rollup`:
- before, `last_state` in the merged aggregate was computed incorrectly; it is now determined correctly
- merging aggregates now sorts them first

It also improves the various error messages that could arise when using rollup.

Co-authored-by: Smitty <smitty@timescale.com>
Co-authored-by: syvb <smitty@timescale.com>
Co-authored-by: Smittyvb <smitty@timescale.com>
  • Loading branch information
bors[bot] and syvb authored Feb 24, 2023
2 parents 96678c5 + 5e081e0 commit f697f0b
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 33 deletions.
1 change: 1 addition & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This changelog should be updated as part of a PR if the work is worth noting (mo
#### New experimental features

#### Bug fixes
- [#715](https://github.com/timescale/timescaledb-toolkit/pull/715): Fix out-of-bounds indexing error in `state_agg` rollup

#### Other notable changes

Expand Down
11 changes: 9 additions & 2 deletions extension/src/state_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,20 @@ impl StateEntry {
if self.a == i64::MAX {
MaterializedState::Integer(self.b)
} else {
MaterializedState::String(states[self.a as usize..self.b as usize].to_string())
MaterializedState::String(
states
.get(self.a as usize..self.b as usize)
.expect("tried to materialize out-of-bounds state")
.to_string(),
)
}
}

fn as_str(self, states: &str) -> &str {
assert!(self.a != i64::MAX, "Tried to get non-string state");
&states[self.a as usize..self.b as usize]
states
.get(self.a as usize..self.b as usize)
.expect("tried to stringify out-of-bounds state")
}

fn as_integer(self) -> i64 {
Expand Down
215 changes: 184 additions & 31 deletions extension/src/state_aggregate/rollup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct RollupTransState {
compact: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct OwnedCompactStateAgg {
durations: Vec<DurationInState>,
combined_durations: Vec<TimeInState>,
Expand All @@ -80,14 +80,22 @@ impl OwnedCompactStateAgg {
"can't merge aggs with different state types"
);

let (earlier, later) = match self.first_time.cmp(&other.first_time) {
let (earlier, later) = match self.cmp(&other) {
Ordering::Less => (self, other),
Ordering::Greater => (other, self),
Ordering::Equal => panic!("can't merge overlapping aggregates (same start time)"),
Ordering::Equal => panic!(
"can't merge overlapping aggregates (same start time: {})",
self.first_time
),
};

assert!(
earlier.last_time <= later.first_time,
"can't merge overlapping aggregates"
"can't merge overlapping aggregates (earlier={}-{}, later={}-{})",
earlier.first_time,
earlier.last_time,
later.first_time,
later.last_time,
);
assert_ne!(
later.durations.len(),
Expand All @@ -97,7 +105,7 @@ impl OwnedCompactStateAgg {
assert_ne!(
earlier.durations.len(),
0,
"later aggregate must be non-empty"
"earlier aggregate must be non-empty"
);

let later_states =
Expand All @@ -108,25 +116,37 @@ impl OwnedCompactStateAgg {

let earlier_len = earlier.combined_durations.len();

let mut added_entries = 0;
for dis in later.durations.iter() {
let merged_duration_to_update = merged_durations.iter_mut().find(|merged_dis| {
merged_dis.state.materialize(&merged_states) == dis.state.materialize(&later_states)
});
if let Some(merged_duration_to_update) = merged_duration_to_update {
merged_duration_to_update.duration += dis.duration;
} else {
let state = dis
.state
.materialize(&later_states)
.entry(&mut merged_states);
merged_durations.push(DurationInState {
state,
duration: dis.duration,
});
added_entries += 1;
let mut merged_last_state = None;
for (later_idx, dis) in later.durations.iter().enumerate() {
let materialized_dis = dis.state.materialize(&later_states);
let merged_duration_info =
merged_durations
.iter_mut()
.enumerate()
.find(|(_, merged_dis)| {
merged_dis.state.materialize(&merged_states) == materialized_dis
});

let merged_idx =
if let Some((merged_idx, merged_duration_to_update)) = merged_duration_info {
merged_duration_to_update.duration += dis.duration;
merged_idx
} else {
let state = materialized_dis.entry(&mut merged_states);
merged_durations.push(DurationInState {
state,
duration: dis.duration,
});
merged_durations.len() - 1
};

if later_idx == later.last_state as usize {
// this is the last state
merged_last_state = Some(merged_idx);
};
}
let merged_last_state =
merged_last_state.expect("later last_state not in later.durations") as u32;

let mut combined_durations = earlier
.combined_durations
Expand All @@ -142,22 +162,36 @@ impl OwnedCompactStateAgg {

let gap = later.first_time - earlier.last_time;
assert!(gap >= 0);
merged_durations[earlier.last_state as usize].duration += gap;
merged_durations
.get_mut(earlier.last_state as usize)
.expect("earlier.last_state doesn't point to a state")
.duration += gap;

// ensure combined_durations covers the whole range of time
if !earlier.compact {
if combined_durations[earlier_len - 1]
if combined_durations
.get_mut(earlier_len - 1)
.expect("invalid combined_durations: nothing at end of earlier")
.state
.materialize(&merged_states)
== combined_durations[earlier_len]
== combined_durations
.get(earlier_len)
.expect("invalid combined_durations: nothing at start of earlier")
.state
.materialize(&merged_states)
{
combined_durations[earlier_len - 1].end_time =
combined_durations.remove(earlier_len).end_time;
combined_durations
.get_mut(earlier_len - 1)
.expect("invalid combined_durations (nothing at earlier_len - 1, equal)")
.end_time = combined_durations.remove(earlier_len).end_time;
} else {
combined_durations[earlier_len - 1].end_time =
combined_durations[earlier_len].start_time;
combined_durations
.get_mut(earlier_len - 1)
.expect("invalid combined_durations (nothing at earlier_len - 1, not equal)")
.end_time = combined_durations
.get(earlier_len)
.expect("invalid combined_durations (nothing at earlier_len, not equal)")
.start_time;
}
}

Expand All @@ -169,8 +203,8 @@ impl OwnedCompactStateAgg {

first_time: earlier.first_time,
last_time: later.last_time,
first_state: earlier.first_state,
last_state: added_entries + later.last_state,
first_state: earlier.first_state, // indexes into earlier durations are same for merged_durations
last_state: merged_last_state,

// these values are always the same for both
compact: earlier.compact,
Expand Down Expand Up @@ -216,8 +250,23 @@ impl<'a> From<CompactStateAgg<'a>> for OwnedCompactStateAgg {
}
}

impl PartialOrd for OwnedCompactStateAgg {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for OwnedCompactStateAgg {
fn cmp(&self, other: &Self) -> Ordering {
// compare using first time (OwnedCompactStateAgg::merge will handle any overlap)
self.first_time.cmp(&other.first_time)
}
}

impl RollupTransState {
fn merge(&mut self) {
// OwnedCompactStateAgg::merge can't merge overlapping aggregates
self.values.sort();
self.values = self
.values
.drain(..)
Expand Down Expand Up @@ -417,4 +466,108 @@ mod tests {

r2.merge(r1);
}

#[test]
fn merges_compact_aggs_correctly() {
let s1 = OwnedCompactStateAgg {
durations: vec![
DurationInState {
duration: 500,
state: StateEntry::from_integer(555_2),
},
DurationInState {
duration: 400,
state: StateEntry::from_integer(555_1),
},
],
combined_durations: vec![],
first_time: 100,
last_time: 1000,
first_state: 1,
last_state: 0,
states: vec![],
compact: true,
integer_states: true,
};
let s2 = OwnedCompactStateAgg {
durations: vec![
DurationInState {
duration: 500,
state: StateEntry::from_integer(555_2),
},
DurationInState {
duration: 400,
state: StateEntry::from_integer(555_1),
},
],
combined_durations: vec![],
first_time: 1000 + 12345,
last_time: 1900 + 12345,
first_state: 1,
last_state: 0,
states: vec![],
compact: true,
integer_states: true,
};
let s3 = OwnedCompactStateAgg {
durations: vec![
DurationInState {
duration: 500,
state: StateEntry::from_integer(555_2),
},
DurationInState {
duration: 400,
state: StateEntry::from_integer(555_1),
},
],
combined_durations: vec![],
first_time: 1900 + 12345,
last_time: 1900 + 12345 + 900,
first_state: 1,
last_state: 0,
states: vec![],
compact: true,
integer_states: true,
};
let expected = OwnedCompactStateAgg {
durations: vec![
DurationInState {
duration: 500 * 3 + 12345,
state: StateEntry::from_integer(555_2),
},
DurationInState {
duration: 400 * 3,
state: StateEntry::from_integer(555_1),
},
],
combined_durations: vec![],
first_time: 100,
last_time: 1900 + 12345 + 900,
first_state: 1,
last_state: 0,
states: vec![],
compact: true,
integer_states: true,
};
let merged = s1.clone().merge(s2.clone().merge(s3.clone()));
assert_eq!(merged, expected);
let merged = s3.clone().merge(s2.clone().merge(s1.clone()));
assert_eq!(merged, expected);

let mut trans_state = RollupTransState {
values: vec![s1.clone(), s2.clone(), s3.clone()],
compact: true,
};
trans_state.merge();
assert_eq!(trans_state.values.len(), 1);
assert_eq!(trans_state.values[0], expected.clone());

let mut trans_state = RollupTransState {
values: vec![s3.clone(), s1.clone(), s2.clone()],
compact: true,
};
trans_state.merge();
assert_eq!(trans_state.values.len(), 1);
assert_eq!(trans_state.values[0], expected.clone());
}
}

0 comments on commit f697f0b

Please sign in to comment.