Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Improve rle_id iteration performance and set sorted flags #16893

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions crates/polars-ops/src/series/ops/rle.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,53 @@
use polars_core::prelude::*;
use polars_core::series::IsSorted;

/// Get the lengths of runs of identical values.
pub fn rle(s: &Series) -> PolarsResult<Series> {
let (s1, s2) = (s.slice(0, s.len() - 1), s.slice(1, s.len()));
let s_neq = s1.not_equal_missing(&s2)?;
let n_runs = s_neq.sum().unwrap() + 1;
let n_runs = s_neq.sum().ok_or_else(|| polars_err!(InvalidOperation: "could not evaluate 'rle_id' on series of dtype: {}", s.dtype()))? + 1;

let mut lengths = Vec::<IdxSize>::with_capacity(n_runs as usize);
lengths.push(1);
let mut vals = Series::new_empty("value", s.dtype());
let vals = vals.extend(&s.head(Some(1)))?.extend(&s2.filter(&s_neq)?)?;
let mut idx = 0;
for v in s_neq.into_iter() {
if v.unwrap() {
idx += 1;
lengths.push(1);
} else {
lengths[idx] += 1;

assert_eq!(s_neq.null_count(), 0);
for arr in s_neq.downcast_iter() {
for v in arr.values_iter() {
if v {
idx += 1;
lengths.push(1)
} else {
lengths[idx] += 1;
}
}
}

let outvals = vec![Series::from_vec("len", lengths), vals.to_owned()];
Ok(StructChunked::new("rle", &outvals)?.into_series())
Ok(StructChunked::new(s.name(), &outvals)?.into_series())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should always keep names.

}

/// Similar to `rle`, but maps values to run IDs.
pub fn rle_id(s: &Series) -> PolarsResult<Series> {
if s.len() == 0 {
return Ok(Series::new_empty("id", &IDX_DTYPE));
return Ok(Series::new_empty(s.name(), &IDX_DTYPE));
}
let (s1, s2) = (s.slice(0, s.len() - 1), s.slice(1, s.len()));
let s_neq = s1.not_equal_missing(&s2)?;

let mut out = Vec::<IdxSize>::with_capacity(s.len());
let mut last = 0;
out.push(last); // Run numbers start at zero
assert_eq!(s_neq.null_count(), 0);
for a in s_neq.downcast_iter() {
for aa in a.values_iter() {
last += aa as IdxSize;
out.push(last);
}
}
Ok(Series::from_vec("id", out))
Ok(IdxCa::from_vec(s.name(), out)
.with_sorted_flag(IsSorted::Ascending)
.into_series())
}
13 changes: 0 additions & 13 deletions py-polars/tests/unit/operations/test_group_by_dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,19 +329,6 @@ def test_rolling_kernels_group_by_dynamic_7548() -> None:
}


def test_sorted_flag_group_by_dynamic() -> None:
df = pl.DataFrame({"ts": [date(2020, 1, 1), date(2020, 1, 2)], "val": [1, 2]})
assert (
(
df.group_by_dynamic(pl.col("ts").set_sorted(), every="1d").agg(
pl.col("val").sum()
)
)
.to_series()
.flags["SORTED_ASC"]
)


def test_rolling_dynamic_sortedness_check() -> None:
# when the by argument is passed, the sortedness flag
# will be unset as the take shuffles data, so we must explicitly
Expand Down
Loading