
Rust: access to groupby agg from custom GroupsProxy? #7211

Closed
1 of 2 tasks
erinov1 opened this issue Feb 26, 2023 · 5 comments

Comments


erinov1 commented Feb 26, 2023

Research

  • I have searched the above polars tags on Stack Overflow for similar questions.

  • I have asked my usage related question on Stack Overflow.

Link to question on Stack Overflow

No response

Question about Polars

Suppose I have some way of defining a custom Vec<Series> of keys and a GroupsProxy. Is there a reasonable way to aggregate a list of expressions, either via the usual groupby syntax or by imitating the same functionality?

Background: I am trying to implement a type of dynamic groupby with overlapping windows, where the windows have irregular start and end times. Currently I am using the Python API for this: with search_sorted it is fast to compute the offset/length indexes for each window. I then create a list of LazyFrames containing all the slices I need and finally call collect_all. Unfortunately I have many millions of slices and looping through the Python iterator is far too slow for my application, so I am looking for a way to speed this up.
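For reference, the offset/length computation described above can be sketched with the stdlib bisect module as a stand-in for search_sorted; window_slices and its argument names are hypothetical, not part of the original code:

```python
import bisect

def window_slices(sorted_times, starts, ends):
    """For each [start, end) window over a sorted column, return the
    (offset, length) pair describing the matching run of rows --
    a stdlib stand-in for the search_sorted approach."""
    out = []
    for s, e in zip(starts, ends):
        offset = bisect.bisect_left(sorted_times, s)
        stop = bisect.bisect_left(sorted_times, e)
        out.append((offset, stop - offset))
    return out

# windows may overlap, as in the use case above
print(window_slices([1, 3, 5, 7, 9], [2, 1], [8, 4]))  # [(1, 3), (0, 2)]
```

Each (offset, length) pair can then be fed to df.slice, which is where the per-slice Python loop becomes the bottleneck.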

@cmdlineluser
Contributor

Hello again @erinov1

I've been experimenting with this after your hint about GroupsProxy in #9467

Using your example from #5891

with Timer(): 
   pl.concat(df.slice(a, b) for a, b in zip(slice_lookup['offset'], slice_lookup['length']))

# shape: (600_000, 3)
# ┌────────────┬────────┬───────┐
# │ date       ┆ values ┆ id    │
# │ ---        ┆ ---    ┆ ---   │
# │ date       ┆ str    ┆ i32   │
# ╞════════════╪════════╪═══════╡
# │ 2000-01-01 ┆ A      ┆ 0     │
# │ 2000-06-01 ┆ B      ┆ 0     │
# │ 2000-01-01 ┆ A      ┆ 0     │
# │ 2000-06-01 ┆ B      ┆ 0     │
# │ …          ┆ …      ┆ …     │
# │ 2000-01-01 ┆ A      ┆ 99999 │
# │ 2000-06-01 ┆ B      ┆ 99999 │
# │ 2001-01-01 ┆ C      ┆ 99999 │
# │ 2003-01-01 ┆ D      ┆ 99999 │
# └────────────┴────────┴───────┘
# Elapsed time: 1.8726 seconds

I modified the pyo3-polars example: https://github.com/pola-rs/pyo3-polars/tree/main/example/extend_polars/src

pub(super) fn groupby_proxy_slices(df: DataFrame, start: DataFrame, end: DataFrame) -> PolarsResult<DataFrame> {
    let start = start.column("start")?;
    let end = end.column("end")?;

    let sa = start.u32()?;
    let sb = end.u32()?;

    let groups = sa.into_iter().zip(sb.into_iter()).map(|(a, b)| {
        match (a, b) {
            (Some(a), Some(b)) => {
                let start = a as IdxSize;
                let end = b as IdxSize;
                let len = end - start;
                [start, len]
            },
            _ => [0, 0] // Is this wrong? What to do here?
        }
    }).collect();

    let groups = GroupsProxy::Slice { groups, rolling: false };

    let out = GroupBy::new(&df, vec![], groups, None);
    //Ok(out.first()?)
    Ok(out.agg_list()?)
}

slices is just slice_lookup but with start, end instead of offset, length
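The two encodings are interchangeable; a trivial sketch of the conversion (hypothetical helper names):

```python
def to_start_end(offset, length):
    # (offset, length) as used by df.slice -> (start, end) as used by the Rust side
    return offset, offset + length

def to_offset_length(start, end):
    # the inverse: (start, end) -> (offset, length)
    return start, end - start

print(to_start_end(10, 4))       # (10, 14)
print(to_offset_length(10, 14))  # (10, 4)
```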

with Timer(): 
   extend_polars.groupby_proxy_slices(df, slices.select('start'), slices.select('end'))

# shape: (300_000, 3)
# ┌───────────────────────────────────┬─────────────────┬───────────────────────┐
# │ date_agg_list                     ┆ values_agg_list ┆ id_agg_list           │
# │ ---                               ┆ ---             ┆ ---                   │
# │ list[date]                        ┆ list[str]       ┆ list[i32]             │
# ╞═══════════════════════════════════╪═════════════════╪═══════════════════════╡
# │ [2000-01-01, 2000-06-01]          ┆ ["A", "B"]      ┆ [0, 0]                │
# │ [2000-01-01, 2000-06-01, 2001-01… ┆ ["A", "B", "C"] ┆ [0, 0, 0]             │
# │ [2003-01-01]                      ┆ ["D"]           ┆ [0]                   │
# │ [2000-01-01, 2000-06-01]          ┆ ["A", "B"]      ┆ [1, 1]                │
# │ …                                 ┆ …               ┆ …                     │
# │ [2003-01-01]                      ┆ ["D"]           ┆ [99998]               │
# │ [2000-01-01, 2000-06-01]          ┆ ["A", "B"]      ┆ [99999, 99999]        │
# │ [2000-01-01, 2000-06-01, 2001-01… ┆ ["A", "B", "C"] ┆ [99999, 99999, 99999] │
# │ [2003-01-01]                      ┆ ["D"]           ┆ [99999]               │
# └───────────────────────────────────┴─────────────────┴───────────────────────┘
# Elapsed time: 0.1756 seconds

I tried to get a GroupBy or LazyGroupBy back from pyo3-polars in order to access .agg() but I couldn't figure it out.

I also couldn't figure out how to run arbitrary expressions using pyo3-polars (perhaps it's not possible?)

From some poking around I saw how pivot worked and copied its behaviour:

https://github.com/pola-rs/polars/blob/main/polars/polars-lazy/src/frame/pivot.rs

https://github.com/pola-rs/polars/blob/main/polars/polars-ops/src/frame/pivot/mod.rs

It seems like .evaluate_on_groups() from PhysicalAggExpr is how an .agg() is executed?

/// polars-lazy/src/frame/agg_slices.rs
use rayon::prelude::*;

use polars_core::frame::groupby::expr::PhysicalAggExpr;
use polars_core::prelude::*;
use polars_ops::agg_slices::SliceAgg;

use polars_core::frame::groupby::GroupBy;
use polars_core::POOL;

use crate::physical_plan::planner::create_physical_expr;

use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

// taken from polars/polars-lazy/src/physical_plan/exotic.rs
pub(crate) fn prepare_eval_expr(mut expr: Expr) -> Expr {
    dbg!(expr.clone());
    expr.mutate().apply(|e| match e {
        Expr::Column(name) => {
            *name = Arc::from("");
            true
        }
        Expr::Nth(_) => {
            *e = Expr::Column(Arc::from(""));
            true
        }
        _ => true,
    });
    dbg!(expr.clone());
    expr
}

// taken from polars/polars-lazy/src/physical_plan/exotic.rs
pub(crate) fn prepare_expression_for_context(
    name: &str,
    expr: &Expr,
    dtype: &DataType,
    ctxt: Context,
) -> PolarsResult<Arc<dyn PhysicalExpr>> {
    let mut lp_arena = Arena::with_capacity(8);
    let mut expr_arena = Arena::with_capacity(10);

    dbg!(expr);
    // create a dummy lazyframe and run a very simple optimization run so that
    // type coercion and simplify expression optimizations run.
    let column = Series::full_null(name, 0, dtype);
    dbg!(column.clone());
    let lf = DataFrame::new_no_checks(vec![column])
        .lazy()
        .without_optimizations()
        .with_simplify_expr(true)
        .select([expr.clone()]);
    let optimized = lf.optimize(&mut lp_arena, &mut expr_arena).unwrap();
    let lp = lp_arena.get(optimized);
    let aexpr = lp.get_exprs().pop().unwrap();
    dbg!(aexpr);

    create_physical_expr(aexpr, ctxt, &expr_arena, None, &mut Default::default())
}

struct AggSliceExpr(Expr);

impl PhysicalAggExpr for AggSliceExpr {
    fn evaluate(&self, df: &DataFrame, groups: &GroupsProxy) -> PolarsResult<Series> {
        let expr = self.0.clone();
        dbg!(&expr);
        let mut arena = Arena::with_capacity(10);
        let aexpr = to_aexpr(expr, &mut arena);
        let phys_expr = create_physical_expr(
            aexpr,
            Context::Aggregation,
            &arena,
            None,
            &mut Default::default(),
        )?;

        dbg!(aexpr);
        let state = ExecutionState::new();

        phys_expr
            .evaluate_on_groups(df, groups, &state)
            .map(|mut ac| ac.aggregated())
    }

    fn root_name(&self) -> PolarsResult<&str> {
        Ok("")
    }
}

pub fn agg_slices(
    df: &DataFrame,
    starts: &DataFrame,
    ends: &DataFrame,
    agg_expr: Vec<Option<Expr>>,
) -> PolarsResult<DataFrame>
{
    let mut df = df.clone();
    let agg_expr = agg_expr.iter().map(|expr| {
        let expr = expr.clone().unwrap();
        SliceAgg::Expr(Arc::new(AggSliceExpr(expr)))
    }).collect::<Vec<_>>();
    agg_slices_helper(
        &mut df, starts, ends, agg_expr
    )
}

pub fn agg_slices_helper(
    df: &mut DataFrame,
    starts: &DataFrame,
    ends: &DataFrame,
    agg_expr: Vec<SliceAgg>,
) -> PolarsResult<DataFrame> {
    df.as_single_chunk_par();

    let start = starts.column("start")?;
    let end = ends.column("end")?;

    let sa = start.u32()?;
    let sb = end.u32()?;

    let groups = sa.into_iter().zip(sb.into_iter()).map(|(a, b)| {
        match (a, b) {
            (Some(a), Some(b)) => {
                let start = a as IdxSize;
                let end = b as IdxSize;
                let len = end - start;
                [start, len]
            },
            _ => [0, 0] // This is wrong. What to do here?
        }
    }).collect();

    let groups = GroupsProxy::Slice { groups, rolling: false };

    let mut state = ExecutionState::new();

    // reborrow immutably so the parallel closures only need shared access
    let df: &DataFrame = df;

    let agg_columns = POOL.install(|| {
        use SliceAgg::*;
        agg_expr
            .par_iter()
            .map(|expr| match expr {
                Expr(expr) => {
                    let agg = expr.evaluate(df, &groups)?;
                    polars_ensure!(
                        agg.len() == groups.len(),
                        ComputeError: "aggregation length {} != number of groups {}",
                        agg.len(),
                        groups.len()
                    );
                    Ok(agg)
                }
            })
            .collect::<PolarsResult<Vec<_>>>()
    });

    let mut final_cols = vec![];

    let agg_columns = agg_columns?;

    state.expr_cache = None;

    final_cols.extend_from_slice(&agg_columns);

    Ok(DataFrame::new_no_checks(final_cols))
}

I'm not sure if it's the correct way to do things, but it does seem to allow you to run expressions through the slices:

with Timer():
   pl.DataFrame._from_pydf(
      df._df.agg_slices(
         slices.select('start')._df, 
         slices.select('end')._df, 
         [
            pl.first('values').alias('first')._pyexpr,
            pl.last('values').alias('last')._pyexpr,
            pl.col('values').str.to_lowercase().alias('lower')._pyexpr,
            pl.col('date')._pyexpr
         ]
      )
   )

# shape: (300_000, 4)
# ┌───────┬──────┬─────────────────┬───────────────────────────────────┐
# │ first ┆ last ┆ lower           ┆ date                              │
# │ ---   ┆ ---  ┆ ---             ┆ ---                               │
# │ str   ┆ str  ┆ list[str]       ┆ list[date]                        │
# ╞═══════╪══════╪═════════════════╪═══════════════════════════════════╡
# │ A     ┆ B    ┆ ["a", "b"]      ┆ [2000-01-01, 2000-06-01]          │
# │ A     ┆ C    ┆ ["a", "b", "c"] ┆ [2000-01-01, 2000-06-01, 2001-01… │
# │ D     ┆ D    ┆ ["d"]           ┆ [2003-01-01]                      │
# │ A     ┆ B    ┆ ["a", "b"]      ┆ [2000-01-01, 2000-06-01]          │
# │ …     ┆ …    ┆ …               ┆ …                                 │
# │ D     ┆ D    ┆ ["d"]           ┆ [2003-01-01]                      │
# │ A     ┆ B    ┆ ["a", "b"]      ┆ [2000-01-01, 2000-06-01]          │
# │ A     ┆ C    ┆ ["a", "b", "c"] ┆ [2000-01-01, 2000-06-01, 2001-01… │
# │ D     ┆ D    ┆ ["d"]           ┆ [2003-01-01]                      │
# └───────┴──────┴─────────────────┴───────────────────────────────────┘
# Elapsed time: 0.0843 seconds

It seems like expressions are supposed to go through some additional processing (wildcard expansion) somewhere, because if I pass in pl.all() it raises:

PanicException: should be no wildcard at this point
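Presumably pl.all() needs to be expanded against the schema before the expressions reach this code path. As a rough illustration of what that expansion does, operating on plain column names (expand_wildcard is a hypothetical helper, not a polars API):

```python
def expand_wildcard(exprs, columns):
    """Replace a '*' placeholder (standing in for pl.all()) with one
    entry per column, mirroring the expansion polars normally performs
    while building the logical plan."""
    expanded = []
    for e in exprs:
        if e == "*":
            expanded.extend(columns)
        else:
            expanded.append(e)
    return expanded

print(expand_wildcard(["*"], ["date", "id", "values"]))
# ['date', 'id', 'values']
```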


erinov1 commented Jun 30, 2023

@cmdlineluser very interesting, definitely further than I got trying to sort through the code! It would be nice to get some feedback from the devs whether this is the right way to go about it.

@cmdlineluser
Contributor

I think I've figured out all the steps involved with a groupby.agg() operation.

.groupby* returns a LazyGroupBy [1]

LazyGroupBy.agg() calls LogicalPlanBuilder.groupby() [2]

This returns a LogicalPlan::Aggregate [3]

Which gets turned into a LazyFrame [4]

Logical plans are turned into physical plans [5]

It seems options is used to decide whether it is a dynamic/rolling groupby; otherwise it is a normal groupby.

options live here: [6]

options are set when creating LogicalPlan::Aggregate [7]

The physical plan then dispatches to the corresponding groupby executor e.g. polars/polars-lazy/src/physical_plan/executors/groupby.rs

For rolling/dynamic the executors end up dispatching to the functions in: polars/polars-time/src/groupby/dynamic.rs which generate the GroupsProxy objects.


[1] https://github.com/pola-rs/polars/blob/main/polars/polars-lazy/src/frame/mod.rs#L700
[2] https://github.com/pola-rs/polars/blob/main/polars/polars-lazy/src/frame/mod.rs#L1334
[3] https://github.com/pola-rs/polars/blob/main/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs#L480
[4] https://github.com/pola-rs/polars/blob/main/polars/polars-lazy/src/frame/mod.rs#L1351
[5] https://github.com/pola-rs/polars/blob/main/polars/polars-lazy/src/physical_plan/planner/lp.rs#L389
[6] https://github.com/pola-rs/polars/blob/main/polars/polars-lazy/polars-plan/src/logical_plan/options.rs#L137
[7] https://github.com/pola-rs/polars/blob/main/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs#L550


cmdlineluser commented Jul 3, 2023

So it seems like the only way to do this currently is to copy how groupby_rolling / .groupby_dynamic work.

I've done that here to make .groupby_slicing(): https://github.com/cmdlineluser/polars/tree/groupby-slicing

Code example
import polars as pl
from   datetime import date

df = pl.DataFrame(
    {
        "date": [
            date(2000, 1, 1),
            date(2000, 6, 1),
            date(2001, 1, 1),
            date(2003, 1, 1),
            date(2022, 1, 1),
        ],
        "id": [1, 1, 1, 2, 2],
        "values": ["A", "B", "C", "D", "E"],
    }
).lazy()

date_lookup = pl.DataFrame(
    {
        "id": [1, 1, 2],
        "start_date": [date(2000, 1, 1), date(2000, 1, 1), date(2002, 6, 1)],
        "end_date": [date(2000, 7, 1), date(2002, 1, 1), date(2003, 6, 1)],
    }
).lazy()

df = df.with_row_count("index")

slices = (
    date_lookup.join_asof(
        df,
        left_on="start_date",
        right_on="date",
        by="id",
        strategy="forward",
    )
    .join_asof(
        df, left_on="end_date", right_on="date", by="id", strategy="backward"
    )
    .select(
        [
            pl.col("id").alias("id_right"),
            "start_date",
            "end_date",
            pl.col("index").alias("start"),
            (pl.col("index_right") + 1).alias("end"),
        ]
    )
)

df = df.with_row_count().join(slices.with_row_count("index"), on="index", how='left')

print(
    pl.LazyFrame._from_pyldf(
       df._ldf
       .groupby_slicing(pl.col('start')._pyexpr, pl.col('end')._pyexpr)
       .agg([
          pl.col('start', 'end', 'date', 'id', 'values')._pyexpr,
          pl.col('date').min().alias('min')._pyexpr,
          pl.col('values').n_unique().alias('n_unique')._pyexpr
       ]) 
    )
    .collect()
)
shape: (5, 7)
┌───────────┬───────────┬───────────────────────────────────┬───────────┬─────────────────┬────────────┬──────────┐
│ start     ┆ end       ┆ date                              ┆ id        ┆ values          ┆ min        ┆ n_unique │
│ ---       ┆ ---       ┆ ---                               ┆ ---       ┆ ---             ┆ ---        ┆ ---      │
│ list[u32] ┆ list[u32] ┆ list[date]                        ┆ list[i64] ┆ list[str]       ┆ date       ┆ u32      │
╞═══════════╪═══════════╪═══════════════════════════════════╪═══════════╪═════════════════╪════════════╪══════════╡
│ [0, 0]    ┆ [2, 3]    ┆ [2000-01-01, 2000-06-01]          ┆ [1, 1]    ┆ ["A", "B"]      ┆ 2000-01-01 ┆ 2        │
│ [0, 0, 3] ┆ [2, 3, 4] ┆ [2000-01-01, 2000-06-01, 2001-01… ┆ [1, 1, 1] ┆ ["A", "B", "C"] ┆ 2000-01-01 ┆ 3        │
│ [null]    ┆ [null]    ┆ [2003-01-01]                      ┆ [2]       ┆ ["D"]           ┆ 2003-01-01 ┆ 1        │
│ []        ┆ []        ┆ []                                ┆ []        ┆ []              ┆ null       ┆ null     │
│ []        ┆ []        ┆ []                                ┆ []        ┆ []              ┆ null       ┆ null     │
└───────────┴───────────┴───────────────────────────────────┴───────────┴─────────────────┴────────────┴──────────┘

The only part that's currently missing: you need to respecify the start and end columns in the .agg() call, otherwise there is a ColumnNotFound error.

And I'm not sure if the building of the GroupsProxy is supposed to be multithreaded on the POOL or not: https://github.com/cmdlineluser/polars/blob/groupby-slicing/polars/polars-lazy/src/physical_plan/executors/groupby_slicing.rs#L30


Update: Thanks to reading through #9649, it seems the step I was missing for the ColumnNotFound error was adding the start/end columns to the projection_pushdown.

@cmdlineluser
Contributor

#9691 has just been posted which seems like a better approach.
