Rust: access to groupby agg from custom GroupsProxy? #7211
Comments
Hello again @erinov1. I've been experimenting with this after your hint about GroupsProxy in #9467. Using your example from #5891:

with Timer():
    pl.concat(df.slice(a, b) for a, b in zip(slice_lookup['offset'], slice_lookup['length']))
# shape: (600_000, 3)
# ┌────────────┬────────┬───────┐
# │ date ┆ values ┆ id │
# │ --- ┆ --- ┆ --- │
# │ date ┆ str ┆ i32 │
# ╞════════════╪════════╪═══════╡
# │ 2000-01-01 ┆ A ┆ 0 │
# │ 2000-06-01 ┆ B ┆ 0 │
# │ 2000-01-01 ┆ A ┆ 0 │
# │ 2000-06-01 ┆ B ┆ 0 │
# │ … ┆ … ┆ … │
# │ 2000-01-01 ┆ A ┆ 99999 │
# │ 2000-06-01 ┆ B ┆ 99999 │
# │ 2001-01-01 ┆ C ┆ 99999 │
# │ 2003-01-01 ┆ D ┆ 99999 │
# └────────────┴────────┴───────┘
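For scale, the per-slice Python loop above boils down to the following pattern; here is a minimal stdlib-only sketch, with plain lists standing in for DataFrames (names hypothetical):

```python
# Toy stand-in for pl.concat(df.slice(a, b) for a, b in zip(...)):
# each (offset, length) pair selects a contiguous run of rows.
rows = list(range(10))
slice_lookup = {"offset": [0, 4, 7], "length": [2, 3, 3]}

pieces = [
    rows[a:a + b]
    for a, b in zip(slice_lookup["offset"], slice_lookup["length"])
]
concatenated = [row for piece in pieces for row in piece]
print(concatenated)  # [0, 1, 4, 5, 6, 7, 8, 9]
```

With millions of slices, the Python-level loop itself dominates the runtime, which is what motivates pushing this work down into Rust.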
# Elapsed time: 1.8726 seconds

I modified the pyo3-polars example: https://github.com/pola-rs/pyo3-polars/tree/main/example/extend_polars/src

pub(super) fn groupby_proxy_slices(df: DataFrame, start: DataFrame, end: DataFrame) -> PolarsResult<DataFrame> {
    let start = start.column("start")?;
    let end = end.column("end")?;
    let sa = start.u32()?;
    let sb = end.u32()?;
    let groups = sa
        .into_iter()
        .zip(sb.into_iter())
        .map(|(a, b)| match (a, b) {
            (Some(a), Some(b)) => {
                let start = a as IdxSize;
                let end = b as IdxSize;
                let len = end - start;
                [start, len]
            }
            _ => [0, 0], // Is this wrong? What to do here?
        })
        .collect();
    let groups = GroupsProxy::Slice { groups, rolling: false };
    let out = GroupBy::new(&df, vec![], groups, None);
    // Ok(out.first()?)
    Ok(out.agg_list()?)
}
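The offset-to-[first, len] conversion inside the closure above can be mirrored in plain Python to check the expected group layout (a sketch; `to_slice_groups` is a hypothetical name, and the `[0, 0]` fallback for missing offsets is the same open question as in the Rust code):

```python
def to_slice_groups(starts, ends):
    """Turn parallel start/end row offsets into [first, len] pairs,
    the layout used by GroupsProxy::Slice."""
    groups = []
    for a, b in zip(starts, ends):
        if a is None or b is None:
            groups.append([0, 0])  # unresolved: what should a null window map to?
        else:
            groups.append([a, b - a])
    return groups

print(to_slice_groups([0, 0, 3], [2, 3, 4]))  # [[0, 2], [0, 3], [3, 1]]
```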
with Timer():
    extend_polars.groupby_proxy_slices(df, slices.select('start'), slices.select('end'))
# shape: (300_000, 3)
# ┌───────────────────────────────────┬─────────────────┬───────────────────────┐
# │ date_agg_list ┆ values_agg_list ┆ id_agg_list │
# │ --- ┆ --- ┆ --- │
# │ list[date] ┆ list[str] ┆ list[i32] │
# ╞═══════════════════════════════════╪═════════════════╪═══════════════════════╡
# │ [2000-01-01, 2000-06-01] ┆ ["A", "B"] ┆ [0, 0] │
# │ [2000-01-01, 2000-06-01, 2001-01… ┆ ["A", "B", "C"] ┆ [0, 0, 0] │
# │ [2003-01-01] ┆ ["D"] ┆ [0] │
# │ [2000-01-01, 2000-06-01] ┆ ["A", "B"] ┆ [1, 1] │
# │ … ┆ … ┆ … │
# │ [2003-01-01] ┆ ["D"] ┆ [99998] │
# │ [2000-01-01, 2000-06-01] ┆ ["A", "B"] ┆ [99999, 99999] │
# │ [2000-01-01, 2000-06-01, 2001-01… ┆ ["A", "B", "C"] ┆ [99999, 99999, 99999] │
# │ [2003-01-01] ┆ ["D"] ┆ [99999] │
# └───────────────────────────────────┴─────────────────┴───────────────────────┘
# Elapsed time: 0.1756 seconds

I tried to get a … I also couldn't figure out how to run arbitrary expressions using pyo3-polars (perhaps it's not possible?)

From some poking around I saw how pivot worked and copied its behaviour:
https://github.com/pola-rs/polars/blob/main/polars/polars-lazy/src/frame/pivot.rs
https://github.com/pola-rs/polars/blob/main/polars/polars-ops/src/frame/pivot/mod.rs

It seems like …

/// polars-lazy/src/frame/agg_slices.rs
use rayon::prelude::*;
use polars_core::frame::groupby::expr::PhysicalAggExpr;
use polars_core::prelude::*;
use polars_ops::agg_slices::SliceAgg;
use polars_core::frame::groupby::GroupBy;
use polars_core::POOL;
use crate::physical_plan::planner::create_physical_expr;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
// taken from polars/polars-lazy/src/physical_plan/exotic.rs
pub(crate) fn prepare_eval_expr(mut expr: Expr) -> Expr {
    dbg!(expr.clone());
    expr.mutate().apply(|e| match e {
        Expr::Column(name) => {
            *name = Arc::from("");
            true
        }
        Expr::Nth(_) => {
            *e = Expr::Column(Arc::from(""));
            true
        }
        _ => true,
    });
    dbg!(expr.clone());
    expr
}
// taken from polars/polars-lazy/src/physical_plan/exotic.rs
pub(crate) fn prepare_expression_for_context(
    name: &str,
    expr: &Expr,
    dtype: &DataType,
    ctxt: Context,
) -> PolarsResult<Arc<dyn PhysicalExpr>> {
    let mut lp_arena = Arena::with_capacity(8);
    let mut expr_arena = Arena::with_capacity(10);
    dbg!(expr);
    // create a dummy lazyframe and run a very simple optimization run so that
    // type coercion and simplify expression optimizations run.
    let column = Series::full_null(name, 0, dtype);
    dbg!(column.clone());
    let lf = DataFrame::new_no_checks(vec![column])
        .lazy()
        .without_optimizations()
        .with_simplify_expr(true)
        .select([expr.clone()]);
    let optimized = lf.optimize(&mut lp_arena, &mut expr_arena).unwrap();
    let lp = lp_arena.get(optimized);
    let aexpr = lp.get_exprs().pop().unwrap();
    dbg!(aexpr);
    create_physical_expr(aexpr, ctxt, &expr_arena, None, &mut Default::default())
}
struct AggSliceExpr(Expr);

impl PhysicalAggExpr for AggSliceExpr {
    fn evaluate(&self, df: &DataFrame, groups: &GroupsProxy) -> PolarsResult<Series> {
        let expr = self.0.clone();
        dbg!(&expr);
        let mut arena = Arena::with_capacity(10);
        let aexpr = to_aexpr(expr, &mut arena);
        let phys_expr = create_physical_expr(
            aexpr,
            Context::Aggregation,
            &arena,
            None,
            &mut Default::default(),
        )?;
        dbg!(aexpr);
        let state = ExecutionState::new();
        phys_expr
            .evaluate_on_groups(df, groups, &state)
            .map(|mut ac| ac.aggregated())
    }

    fn root_name(&self) -> PolarsResult<&str> {
        Ok("")
    }
}
pub fn agg_slices(
    df: &DataFrame,
    starts: &DataFrame,
    ends: &DataFrame,
    agg_expr: Vec<Option<Expr>>,
) -> PolarsResult<DataFrame> {
    let mut df = df.clone();
    let agg_expr = agg_expr
        .iter()
        .map(|expr| {
            let expr = expr.clone().unwrap();
            SliceAgg::Expr(Arc::new(AggSliceExpr(expr)))
        })
        .collect::<Vec<_>>();
    agg_slices_helper(&mut df, starts, ends, agg_expr)
}
pub fn agg_slices_helper(
    df: &mut DataFrame,
    starts: &DataFrame,
    ends: &DataFrame,
    agg_expr: Vec<SliceAgg>,
) -> PolarsResult<DataFrame> {
    df.as_single_chunk_par();
    let start = starts.column("start")?;
    let end = ends.column("end")?;
    let sa = start.u32()?;
    let sb = end.u32()?;
    let groups = sa
        .into_iter()
        .zip(sb.into_iter())
        .map(|(a, b)| match (a, b) {
            (Some(a), Some(b)) => {
                let start = a as IdxSize;
                let end = b as IdxSize;
                let len = end - start;
                [start, len]
            }
            _ => [0, 0], // This is wrong. What to do here?
        })
        .collect();
    let groups = GroupsProxy::Slice { groups, rolling: false };
    let mut state = ExecutionState::new();
    let agg_columns = POOL.install(|| {
        use SliceAgg::*;
        agg_expr
            .par_iter()
            .map(|expr| match &expr {
                Expr(ref expr) => {
                    let agg = expr.evaluate(&df, &groups)?;
                    polars_ensure!(agg.len() == groups.len(), agg_len = agg.len(), groups.len());
                    Ok(agg)
                }
            })
            .collect::<PolarsResult<Vec<_>>>()
    });
    let mut final_cols = vec![];
    let agg_columns = agg_columns?;
    state.expr_cache = None;
    final_cols.extend_from_slice(&agg_columns);
    Ok(DataFrame::new_no_checks(final_cols))
}

I'm not sure if it's the correct way to do things, but it does seem to allow you to run expressions through the slices:

with Timer():
    pl.DataFrame._from_pydf(
        df._df.agg_slices(
            slices.select('start')._df,
            slices.select('end')._df,
            [
                pl.first('values').alias('first')._pyexpr,
                pl.last('values').alias('last')._pyexpr,
                pl.col('values').str.to_lowercase().alias('lower')._pyexpr,
                pl.col('date')._pyexpr
            ]
        )
    )
# shape: (300_000, 4)
# ┌───────┬──────┬─────────────────┬───────────────────────────────────┐
# │ first ┆ last ┆ lower ┆ date │
# │ --- ┆ --- ┆ --- ┆ --- │
# │ str ┆ str ┆ list[str] ┆ list[date] │
# ╞═══════╪══════╪═════════════════╪═══════════════════════════════════╡
# │ A ┆ B ┆ ["a", "b"] ┆ [2000-01-01, 2000-06-01] │
# │ A ┆ C ┆ ["a", "b", "c"] ┆ [2000-01-01, 2000-06-01, 2001-01… │
# │ D ┆ D ┆ ["d"] ┆ [2003-01-01] │
# │ A ┆ B ┆ ["a", "b"] ┆ [2000-01-01, 2000-06-01] │
# │ … ┆ … ┆ … ┆ … │
# │ D ┆ D ┆ ["d"] ┆ [2003-01-01] │
# │ A ┆ B ┆ ["a", "b"] ┆ [2000-01-01, 2000-06-01] │
# │ A ┆ C ┆ ["a", "b", "c"] ┆ [2000-01-01, 2000-06-01, 2001-01… │
# │ D ┆ D ┆ ["d"] ┆ [2003-01-01] │
# └───────┴──────┴─────────────────┴───────────────────────────────────┘
# Elapsed time: 0.0843 seconds

It seems like expressions are supposed to go through some other processing somewhere, because if I pass in …
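Conceptually, what agg_slices does per expression can be modelled on plain lists: each [first, len] group selects a run of rows, and each aggregation maps that run to one output value. A toy sketch (not the polars internals), using the example data above:

```python
# Rows covered by each [first, len] slice group, matching the example data.
values = ["A", "B", "C", "D"]
groups = [(0, 2), (0, 3), (3, 1)]  # overlapping windows are fine

first = [values[f] for f, n in groups]
last = [values[f + n - 1] for f, n in groups]
lower = [[v.lower() for v in values[f:f + n]] for f, n in groups]
print(first)  # ['A', 'A', 'D']
print(last)   # ['B', 'C', 'D']
print(lower)  # [['a', 'b'], ['a', 'b', 'c'], ['d']]
```

Because groups are just (offset, length) pairs rather than partitions, the same rows can feed several output groups, which is exactly what makes this approach work for overlapping windows.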
|
@cmdlineluser very interesting, definitely further than I got trying to sort through the code! It would be nice to get some feedback from the devs whether this is the right way to go about it. |
So it seems like the only way to do this currently is to copy how … I've done that here to make …

Code example:

import polars as pl
from datetime import date
df = pl.DataFrame(
    {
        "date": [
            date(2000, 1, 1),
            date(2000, 6, 1),
            date(2001, 1, 1),
            date(2003, 1, 1),
            date(2022, 1, 1),
        ],
        "id": [1, 1, 1, 2, 2],
        "values": ["A", "B", "C", "D", "E"],
    }
).lazy()
date_lookup = pl.DataFrame(
    {
        "id": [1, 1, 2],
        "start_date": [date(2000, 1, 1), date(2000, 1, 1), date(2002, 6, 1)],
        "end_date": [date(2000, 7, 1), date(2002, 1, 1), date(2003, 6, 1)],
    }
).lazy()
df = df.with_row_count("index")
slices = (
    date_lookup.join_asof(
        df,
        left_on="start_date",
        right_on="date",
        by="id",
        strategy="forward",
    )
    .join_asof(
        df, left_on="end_date", right_on="date", by="id", strategy="backward"
    )
    .select(
        [
            pl.col("id").alias("id_right"),
            "start_date",
            "end_date",
            pl.col("index").alias("start"),
            (pl.col("index_right") + 1).alias("end"),
        ]
    )
)
df = df.with_row_count().join(slices.with_row_count("index"), on="index", how='left')
print(
    pl.LazyFrame._from_pyldf(
        df._ldf
        .groupby_slicing(pl.col('start')._pyexpr, pl.col('end')._pyexpr)
        .agg([
            pl.col('start', 'end', 'date', 'id', 'values')._pyexpr,
            pl.col('date').min().alias('min')._pyexpr,
            pl.col('values').n_unique().alias('n_unique')._pyexpr
        ])
    )
    .collect()
)
The only part that's currently missing is that you need to respecify the start, end columns in the … And I'm not sure if the building of the GroupsProxy is supposed to be multithreaded on the POOL or not: https://github.com/cmdlineluser/polars/blob/groupby-slicing/polars/polars-lazy/src/physical_plan/executors/groupby_slicing.rs#L30

Update: Thanks to reading through #9649, it seems the step I was missing for the ColumnNotFound error was adding the start/end columns to the … |
#9691 has just been posted which seems like a better approach. |
Research
I have searched the above polars tags on Stack Overflow for similar questions.
I have asked my usage related question on Stack Overflow.
Link to question on Stack Overflow
No response
Question about Polars

Suppose I have some way of defining a custom Vec<Series> of keys and a GroupsProxy. Is there a reasonable way to aggregate a list of expressions, either with the usual groupby syntax, or to imitate the same functionality given a list of expressions?

Background: I am trying to implement a type of dynamic groupby with overlapping windows, but where windows have irregular start and end times. Currently I am using the python API for this: using search_sorted it is fast to come up with the indexes of the offsets/lengths for each window. I am then creating a list of LazyFrames containing all the slices I need and finally calling collect_all. Unfortunately I have many millions of slices, and looping through the python iterator is far too slow for my application, so I am looking for a way to speed this up.