
Obtain multiple ranges of data (for aggregation) from a huge dataset - similar to R's foverlaps() in the data.table package #9481

Closed
lux5 opened this issue Jun 21, 2023 · 10 comments
Labels: enhancement (New feature or an improvement of an existing feature)


lux5 commented Jun 21, 2023

Problem description

I would like to use Polars to perform some analysis on multiple small ranges of data from a huge dataset. I will provide example data for a quick demo of my goal.

import polars as pl

ts = pl.read_parquet('flo_ts.parquet')
print(ts)

# shape: (35_849, 2)
# ┌──────────────────────────┬────────┐
# │ Time                     ┆ Flow   │
# │ ---                      ┆ ---    │
# │ datetime[μs, Etc/GMT-12] ┆ f64    │
# ╞══════════════════════════╪════════╡
# │ 2013-07-01 00:00:00 +12  ┆ 1.293  │
# │ 2013-07-01 00:15:00 +12  ┆ 1.293  │
# │ 2013-07-01 00:30:00 +12  ┆ 1.293  │
# │ 2013-07-01 00:45:00 +12  ┆ 1.27   │
# │ …                        ┆ …      │
# │ 2014-06-30 23:00:00 +12  ┆ 23.693 │
# │ 2014-06-30 23:15:00 +12  ┆ 21.968 │
# │ 2014-06-30 23:30:00 +12  ┆ 21.456 │
# │ 2014-06-30 23:45:00 +12  ┆ 20.169 │
# └──────────────────────────┴────────┘

lookup = pl.read_parquet('events_of_interests.parquet')
print(lookup)

# shape: (4, 3)
# ┌─────┬──────────────────────────┬──────────────────────────┐
# │ ID  ┆ Start                    ┆ End                      │
# │ --- ┆ ---                      ┆ ---                      │
# │ i32 ┆ datetime[μs, Etc/GMT-12] ┆ datetime[μs, Etc/GMT-12] │
# ╞═════╪══════════════════════════╪══════════════════════════╡
# │ 1   ┆ 2013-07-02 10:24:00 +12  ┆ 2013-10-13 15:05:00 +12  │
# │ 2   ┆ 2013-10-13 22:44:00 +12  ┆ 2013-11-18 18:16:00 +12  │
# │ 3   ┆ 2014-05-23 09:28:00 +12  ┆ 2014-06-03 07:01:00 +12  │
# │ 4   ┆ 2014-06-25 00:43:00 +12  ┆ 2014-06-30 23:47:00 +12  │
# └─────┴──────────────────────────┴──────────────────────────┘

Now, I would like to get the results shown below: the maximum Flow (Max) and the time at which that maximum occurs (Time_max) for each event ID in the lookup DataFrame:

"""
shape: (4, 5)
┌─────┬──────────────────────────┬──────────────────────────┬────────┬──────────────────────────┐
│ ID  ┆ Start                    ┆ End                      ┆ Max    ┆ Time_max                 │
│ --- ┆ ---                      ┆ ---                      ┆ ---    ┆ ---                      │
│ i32 ┆ datetime[μs, Etc/GMT-12] ┆ datetime[μs, Etc/GMT-12] ┆ f64    ┆ datetime[μs, Etc/GMT-12] │
╞═════╪══════════════════════════╪══════════════════════════╪════════╪══════════════════════════╡
│ 1   ┆ 2013-07-02 10:24:00 +12  ┆ 2013-10-13 15:05:00 +12  ┆ 93.504 ┆ 2013-07-06 04:20:00 +12  │
│ 2   ┆ 2013-10-13 22:44:00 +12  ┆ 2013-11-18 18:16:00 +12  ┆ 23.561 ┆ 2013-10-24 08:00:00 +12  │
│ 3   ┆ 2014-05-23 09:28:00 +12  ┆ 2014-06-03 07:01:00 +12  ┆ 24.522 ┆ 2014-05-29 05:00:00 +12  │
│ 4   ┆ 2014-06-25 00:43:00 +12  ┆ 2014-06-30 23:47:00 +12  ┆ 117.61 ┆ 2014-06-25 12:15:00 +12  │
└─────┴──────────────────────────┴──────────────────────────┴────────┴──────────────────────────┘
"""

Before I can get the above results, I need to assign the ID to the input flow time series based on the Start and End of each event, so that I can aggregate the Flow values by each corresponding ID. For the moment, based on my understanding of the Polars module, it would be very easy for me to write a loop like the following:

ts_lookup = pl.DataFrame()
for row in lookup.iter_rows(named=True):
    # Keep the rows that fall inside this event's [Start, End] window and tag them with its ID.
    ts_event = (
        ts.filter(
            (pl.col('Time') >= row['Start'])
            & (pl.col('Time') <= row['End'])
        )
        .with_columns(pl.lit(row['ID']).alias('ID'))
    )
    ts_lookup = pl.concat([ts_lookup, ts_event])

print(ts_lookup)

# shape: (16_703, 3)
# ┌──────────────────────────┬────────┬─────┐
# │ Time                     ┆ Flow   ┆ ID  │
# │ ---                      ┆ ---    ┆ --- │
# │ datetime[μs, Etc/GMT-12] ┆ f64    ┆ i32 │
# ╞══════════════════════════╪════════╪═════╡
# │ 2013-07-02 10:25:00 +12  ┆ 3.042  ┆ 1   │
# │ 2013-07-02 10:30:00 +12  ┆ 3.07   ┆ 1   │
# │ 2013-07-02 10:35:00 +12  ┆ 3.113  ┆ 1   │
# │ 2013-07-02 10:40:00 +12  ┆ 3.142  ┆ 1   │
# │ …                        ┆ …      ┆ …   │
# │ 2014-06-30 23:00:00 +12  ┆ 23.693 ┆ 4   │
# │ 2014-06-30 23:15:00 +12  ┆ 21.968 ┆ 4   │
# │ 2014-06-30 23:30:00 +12  ┆ 21.456 ┆ 4   │
# │ 2014-06-30 23:45:00 +12  ┆ 20.169 ┆ 4   │
# └──────────────────────────┴────────┴─────┘

Now I can aggregate the Flow values to get what I need:

ts_lookup.groupby('ID').agg(
    pl.col('Flow').max().alias('Max'),
    pl.col('Time').sort_by('Flow', descending=True).first().alias('Time_max'),
)
# shape: (4, 3)
# ┌─────┬────────┬──────────────────────────┐
# │ ID  ┆ Max    ┆ Time_max                 │
# │ --- ┆ ---    ┆ ---                      │
# │ i32 ┆ f64    ┆ datetime[μs, Etc/GMT-12] │
# ╞═════╪════════╪══════════════════════════╡
# │ 1   ┆ 93.504 ┆ 2013-07-06 04:20:00 +12  │
# │ 2   ┆ 23.561 ┆ 2013-10-24 08:00:00 +12  │
# │ 3   ┆ 24.522 ┆ 2014-05-29 05:00:00 +12  │
# │ 4   ┆ 117.61 ┆ 2014-06-25 12:15:00 +12  │
# └─────┴────────┴──────────────────────────┘

I only used a small portion of my dataset here, and the lookup table's height is only 4, so the loop is fine. But I sometimes need around 10K events out of close to 2 million values of the input time series, and that is a problem! I have tried doing around 5K of these aggregations at the same time (see here for an example), but it ended up with the terminal window freezing...

I would like to request a Polars function/method to achieve this, similar to R's foverlaps() in the data.table package. That tool can clip out all the needed ranges of the time series (in my case) in a single operation, and it is fast and convenient (you can find example data and R code here).

Many thanks to the Polars developers for making such a useful tool.

lux5 added the enhancement label on Jun 21, 2023

erinov1 commented Jun 21, 2023

This is essentially the same as #9467 and #5891.

In your case it is probably much faster to find which row indices in ts each interval in lookup corresponds to (by sorting the data and using either search_sorted or a rolling join). Then you can replace the filter operation in the construction of ts_lookup with a much faster zero-copy slice operation (e.g., #5891 (comment)).
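
A minimal sketch of that idea (untested; it assumes ts is sorted by Time, that every interval contains at least one sample, and it reuses the file names from the original post):

import polars as pl

ts = pl.read_parquet('flo_ts.parquet')        # must be sorted by Time
lookup = pl.read_parquet('events_of_interests.parquet')

time = ts.get_column('Time')
results = []
for row in lookup.iter_rows(named=True):
    # Binary-search the positions of this interval's bounds in the sorted Time column.
    start = time.search_sorted(row['Start'], side='left')
    end = time.search_sorted(row['End'], side='right')
    # Zero-copy slice of the rows covered by the interval, then aggregate it.
    window = ts.slice(start, end - start)
    flow = window.get_column('Flow')
    results.append({
        'ID': row['ID'],
        'Max': flow.max(),
        'Time_max': window.get_column('Time')[flow.arg_max()],
    })

print(pl.DataFrame(results))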

As I commented in #9467, in my own applications (~500M rows of data, ~50M intervals each spanning ~10K rows), it was much faster to first find the slice indices using polars, but then iterate + aggregate over the data slices with numba.
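
A rough sketch of that pattern, in case it's useful (untested; it again assumes a sorted Time column and non-empty intervals, and the numba kernel only computes the per-slice maximum and its row index):

import numpy as np
import polars as pl
from numba import njit

@njit
def agg_slices(flow, starts, ends):
    # For each half-open slice [start, end), find the max flow and the row index where it occurs.
    n = len(starts)
    max_flow = np.empty(n, dtype=np.float64)
    max_idx = np.empty(n, dtype=np.int64)
    for i in range(n):
        window = flow[starts[i]:ends[i]]
        j = np.argmax(window)           # assumes the slice is non-empty
        max_flow[i] = window[j]
        max_idx[i] = starts[i] + j
    return max_flow, max_idx

ts = pl.read_parquet('flo_ts.parquet')        # must be sorted by Time
lookup = pl.read_parquet('events_of_interests.parquet')

# Slice boundaries via binary search on the sorted Time column.
time = ts.get_column('Time')
starts = time.search_sorted(lookup.get_column('Start'), side='left').to_numpy().astype(np.int64)
ends = time.search_sorted(lookup.get_column('End'), side='right').to_numpy().astype(np.int64)

max_flow, max_idx = agg_slices(ts.get_column('Flow').to_numpy(), starts, ends)

print(lookup.with_columns(
    pl.Series('Max', max_flow),
    time.take(max_idx).alias('Time_max'),
))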

In principle, one might also be able to do this on the Rust side by constructing a custom GroupsProxy consisting of all the slices, but I wasn't able to see an entry point to the aggregation context from such a construction. I previously opened #7211; perhaps @ritchie46 can comment.

@avimallu (Contributor) commented

Oh wow, @erinov1, I did not catch the link between foverlaps-like functionality and the groupby_slice requests! Once non-equi joins are in place in Polars, I think implementing these will become easier.


lux5 commented Jun 22, 2023

> This is essentially the same as #9467 and #5891. In your case it is probably much faster to find which row indices in ts each interval in lookup corresponds to […]

Thank you for giving me the idea of using search_sorted! I will give it a try myself. As for the rolling-join example here, I couldn't get the example code working.

@cmdlineluser (Contributor) commented

This is the approach mentioned in #9467, in case you want to try it.

You explode the range of slice indices and use .take(idx) to populate the corresponding values:

ts = pl.scan_parquet('flo_ts.parquet')
lookup = pl.scan_parquet('events_of_interests.parquet')

slices = ts.with_context(lookup).select( 
   'ID',
   'Start',
   'End',
   idx = pl.arange(
      pl.col('Time').search_sorted('Start'),
      pl.col('Time').search_sorted('End')
   )
)

(ts.with_context(slices.explode('idx'))
   .select('ID', 'Start', 'End', pl.col('Time', 'Flow').take('idx'))
   .groupby('ID').agg(
      pl.col('Flow').max().alias('Max'),
      pl.col('Time').sort_by('Flow', descending=True).first().alias('Time_max'),
   )
).collect()
shape: (4, 3)
┌─────┬────────┬──────────────────────────┐
│ ID  ┆ Max    ┆ Time_max                 │
│ --- ┆ ---    ┆ ---                      │
│ i32 ┆ f64    ┆ datetime[μs, Etc/GMT-12] │
╞═════╪════════╪══════════════════════════╡
│ 1   ┆ 93.504 ┆ 2013-07-06 04:20:00 +12  │
│ 2   ┆ 23.561 ┆ 2013-10-24 08:00:00 +12  │
│ 3   ┆ 24.522 ┆ 2014-05-29 05:00:00 +12  │
│ 4   ┆ 117.61 ┆ 2014-06-25 12:15:00 +12  │
└─────┴────────┴──────────────────────────┘


lux5 commented Jul 1, 2023

> This is the approach mentioned in #9467, in case you want to try it. You explode the range of slice indices and use .take(idx) […]

Wow, this is awesome! I like this one. The hard part for me is

...
pl.arange(
      pl.col('Time').search_sorted('Start'),
      pl.col('Time').search_sorted('End')
   )
...

I eventually understood what it does after breaking it apart and checking the docs.
Thank you for this solution; it largely solves my issue.
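
For anyone else who stumbles over that part, here is a tiny, self-contained illustration with made-up numbers (not the flow data) of what the search_sorted/arange combination computes:

import polars as pl

time = pl.Series('Time', [1, 2, 3, 4, 5, 6])   # stand-in for the sorted Time column
start, end = 2, 5                               # stand-in for one Start/End pair

lo = time.search_sorted(start)                  # 1 -> position where `start` slots into `time`
hi = time.search_sorted(end)                    # 4 -> position where `end` slots into `time`
idx = pl.arange(lo, hi, eager=True)             # row indices 1, 2, 3 -> Time values 2, 3, 4

print(idx.to_list())                            # [1, 2, 3]

So each lookup row ends up with the range of row indices into ts that fall inside its interval, which .explode('idx') flattens before .take('idx') pulls out the matching Time/Flow values.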

@cmdlineluser (Contributor) commented

Yeah, it explodes out the range - it seems to perform well enough in some of these cases.

If I try it with a larger version of @erinov1's #5891 example, though, it gets really slow and tries to use huge amounts of memory.

I guess it's not an efficient/scalable approach when there are large numbers of overlapping ranges.


lux5 commented Jul 2, 2023

Thank you for the explanation of its use case, cmdlineluser. Hopefully Polars will have a function similar to foverlaps() in the near future.


areeh commented Jul 3, 2023

I hit the same situation a while back and wrote some code inside Polars to get it to work (see #9691; bear in mind it probably has errors and was copied from the groupby_dynamic/rolling functions as they were months ago). I'd love to see similar functionality added to Polars properly.

@cmdlineluser (Contributor) commented

I think the new .join_where() may also close this issue?

(lookup
  .join_where(ts,
     pl.col.Time >= pl.col.Start,
     pl.col.Time <= pl.col.End
  )
  .group_by(pl.col.ID)
  .agg(pl.all().get(pl.col.Flow.arg_max()))
)

# shape: (4, 5)
# ┌─────┬──────────────────────────┬──────────────────────────┬──────────────────────────┬────────┐
# │ ID  ┆ Start                    ┆ End                      ┆ Time                     ┆ Flow   │
# │ --- ┆ ---                      ┆ ---                      ┆ ---                      ┆ ---    │
# │ i32 ┆ datetime[μs, Etc/GMT-12] ┆ datetime[μs, Etc/GMT-12] ┆ datetime[μs, Etc/GMT-12] ┆ f64    │
# ╞═════╪══════════════════════════╪══════════════════════════╪══════════════════════════╪════════╡
# │ 3   ┆ 2014-05-23 09:28:00 +12  ┆ 2014-06-03 07:01:00 +12  ┆ 2014-05-29 05:00:00 +12  ┆ 24.522 │
# │ 4   ┆ 2014-06-25 00:43:00 +12  ┆ 2014-06-30 23:47:00 +12  ┆ 2014-06-25 12:15:00 +12  ┆ 117.61 │
# │ 1   ┆ 2013-07-02 10:24:00 +12  ┆ 2013-10-13 15:05:00 +12  ┆ 2013-07-06 04:20:00 +12  ┆ 93.504 │
# │ 2   ┆ 2013-10-13 22:44:00 +12  ┆ 2013-11-18 18:16:00 +12  ┆ 2013-10-24 08:00:00 +12  ┆ 23.561 │
# └─────┴──────────────────────────┴──────────────────────────┴──────────────────────────┴────────┘
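
If it helps, a small untested follow-up to get the Max/Time_max layout from the top of the issue out of that same result (just renaming the aggregated columns and sorting by ID):

(lookup
  .join_where(ts,
     pl.col.Time >= pl.col.Start,
     pl.col.Time <= pl.col.End
  )
  .group_by(pl.col.ID)
  .agg(pl.all().get(pl.col.Flow.arg_max()))
  .rename({'Flow': 'Max', 'Time': 'Time_max'})
  .sort('ID')
)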


lux5 commented Sep 15, 2024

> I think the new .join_where() may also close this issue? […]

Thank you for the nice solution. I didn't know that the .join_where() method had been added. This answers my question. Yes, the post can be closed.

lux5 closed this as completed on Sep 15, 2024