Window frame GROUPS mode support #4155

Merged
merged 8 commits into from
Nov 11, 2022

zembunia
Contributor

@zembunia zembunia commented Nov 9, 2022

Which issue does this PR close?

This PR adds support for the GROUPS mode in window frames, which was a missing item in the #3570 enhancement.
The GROUPS mode is implemented according to the PostgreSQL specification for window function calls.

Rationale for this change

This change is part of an enhancement #361 that is on the roadmap.

What changes are included in this PR?

The single common method that calculated the window range (calculate_range) is removed from window_expr. New structs that can hold state information for each window frame mode are introduced.
The ROWS mode does not require state, as it is a simple row index calculation, so its state struct is empty apart from the calculate_range method specific to ROWS mode.
For the RANGE mode, a stateful calculation can be utilized in the future. For now, the state struct is empty and the RANGE-specific calculate_range implementation is moved to it.
For the GROUPS mode, the PR provides a stateful implementation that keeps track of the moving window range of groups for each consecutive row.
Frame exclusion is still not supported.

Observations

The implementation for the RANGE mode can also utilize a stateful implementation, instead of calculating the window range for each row from scratch.

Future work

  • Stateful RANGE mode implementation
  • A method to find the next group index, utilizing an exponentially growing step size, is implemented in this PR (find_next_group_and_start_index). This method can be improved to choose an approach depending on statistics about previous group sizes. It can either search the next group by advancing one-by-one (for small group sizes) or utilizing the exponentially growing step size, or even setting a base step size when exponentially growing. We can also create a benchmark implementation to get insights about the crossover point.
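The exponentially growing step size mentioned above can be illustrated with a small standalone sketch. This is not the PR's find_next_group_and_start_index: the function name `find_next_group_start`, its signature, and the use of a plain slice instead of Arrow arrays are assumptions made for illustration. It gallops forward with a doubling step until it leaves the current group, then binary-searches the last interval for the exact boundary.

```rust
/// Returns the index of the first row after `start` whose value differs from
/// `values[start]`, or `values.len()` if the group runs to the end.
/// Assumes `values` is sorted, so equal values are contiguous.
/// (Hypothetical helper; not the function from the PR.)
pub fn find_next_group_start<T: PartialEq>(values: &[T], start: usize) -> usize {
    let n = values.len();
    if start >= n {
        return n;
    }
    let current = &values[start];

    // Phase 1: probe ahead with a doubling step until we leave the group
    // or run off the end of the data.
    let mut low = start; // last index known to belong to the group
    let mut step = 1;
    let mut high = loop {
        let probe = low + step;
        if probe >= n {
            break n;
        }
        if values[probe] != *current {
            break probe;
        }
        low = probe;
        step *= 2;
    };

    // Phase 2: binary search between `low` (inside the group) and `high`
    // (outside the group, or the end of the data) for the exact boundary.
    while high - low > 1 {
        let mid = low + (high - low) / 2;
        if values[mid] == *current {
            low = mid;
        } else {
            high = mid;
        }
    }
    high
}
```

For a group of size k this needs on the order of log k comparisons, while a one-by-one scan needs k, which is why the crossover point for small groups is worth benchmarking.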

Are these changes tested?

New unit tests relevant to the added functionality are added in window_frame_state.rs. The tests in windows.rs are extended to cover the GROUPS mode, and a test file is added to the integration test SQLs.

Are there any user-facing changes?

No

@github-actions github-actions bot added core Core DataFusion crate physical-expr Physical Expressions labels Nov 9, 2022
@zembunia zembunia changed the title Feature/window frame groups Window frame GROUPS mode support Nov 9, 2022
@ozankabak
Contributor

@alamb, this already went through our internal review process, so I can say LGTM. Looking forward to getting community feedback.

@alamb
Contributor

alamb commented Nov 9, 2022

Thank you @ozankabak -- I will put this on my review queue for tomorrow

@alamb alamb left a comment

@zembunia this is a very nice PR and a pleasure to read. It is well tested, well commented, and well structured. 🏆 Thank you.

I left a few style comments, but nothing that needs to be completed prior to merging from my perspective.

Here is the relevant description of GROUPS mode for anyone else reviewing this PR:

In GROUPS mode, the offset again must yield a non-null, non-negative integer, and the option means that the frame starts or ends the specified number of peer groups before or after the current row's peer group, where a peer group is a set of rows that are equivalent in the ORDER BY ordering. (There must be an ORDER BY clause in the window definition to use GROUPS mode.)
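To make the peer-group wording concrete, here is a minimal sketch of how the frame for `GROUPS BETWEEN p PRECEDING AND f FOLLOWING` could be derived from already-sorted ORDER BY values. The names `peer_groups` and `groups_frames` are hypothetical, the input is a plain slice with a single ORDER BY column, and the whole thing is recomputed from scratch, unlike the stateful implementation in this PR.

```rust
use std::ops::Range;

/// Splits sorted ORDER BY values into peer groups, returned as half-open
/// row-index ranges. (Hypothetical helper for illustration.)
pub fn peer_groups<T: PartialEq>(order_by: &[T]) -> Vec<Range<usize>> {
    let mut groups = Vec::new();
    let mut start = 0;
    for i in 1..=order_by.len() {
        // Close the current group at the end of the data or when the value changes.
        if i == order_by.len() || order_by[i] != order_by[start] {
            groups.push(start..i);
            start = i;
        }
    }
    groups
}

/// For every row, computes the half-open row range of the frame
/// `GROUPS BETWEEN preceding PRECEDING AND following FOLLOWING`.
pub fn groups_frames<T: PartialEq>(
    order_by: &[T],
    preceding: usize,
    following: usize,
) -> Vec<Range<usize>> {
    let groups = peer_groups(order_by);
    let mut frames = Vec::with_capacity(order_by.len());
    for (g, group) in groups.iter().enumerate() {
        // Offsets count peer groups, not rows, and are clamped to the partition.
        let first = g.saturating_sub(preceding);
        let last = g.saturating_add(following).min(groups.len() - 1);
        let frame = groups[first].start..groups[last].end;
        // Every row in the same peer group shares the same frame.
        frames.extend(std::iter::repeat(frame).take(group.len()));
    }
    frames
}
```

With ORDER BY values `[1, 1, 2, 3, 3, 3]` and `GROUPS BETWEEN 1 PRECEDING AND CURRENT ROW`, the two rows with value 1 get the frame `0..2`, the row with value 2 gets `0..3`, and the three rows with value 3 get `2..6`.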

I'll plan to merge this PR tomorrow unless there are any additional comments

datafusion/common/src/bisect.rs (resolved)
@@ -1511,12 +1511,6 @@ pub fn create_window_expr_with_name(
})
.collect::<Result<Vec<_>>>()?;
if let Some(ref window_frame) = window_frame {
if window_frame.units == WindowFrameUnits::Groups {

❤️

let err = df.collect().await.unwrap_err();
assert_contains!(
err.to_string(),
"Execution error: GROUPS mode requires an ORDER BY clause".to_owned()

👍

@@ -113,10 +114,10 @@ impl WindowExpr for BuiltInWindowExpr {
.iter()
.map(|v| v.slice(partition_range.start, length))
.collect::<Vec<_>>();
let mut window_frame_ctx = WindowFrameContext::new(&window_frame);

This is a very nice encapsulation of the window frame calculation. Thank you

#[derive(Debug)]
pub enum WindowFrameContext<'a> {
// ROWS-frames are inherently stateless:
Rows(&'a Arc<WindowFrame>),

As a matter of style, I find &Arc<WindowFrame> strange. I would expect either an owned Arc (aka increment the ref count):

Suggested change
Rows(&'a Arc<WindowFrame>),
Rows(Arc<WindowFrame>),

Or else just a reference:

Suggested change
Rows(&'a Arc<WindowFrame>),
Rows(&'a WindowFrame),

I don't understand the need to have a reference to the Arc
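A minimal sketch of the second suggestion, using a hypothetical stand-in `WindowFrame` with only ROWS offsets rather than DataFusion's real type: the context borrows the frame itself, and callers that hold an `Arc<WindowFrame>` can still pass `&arc` because deref coercion converts `&Arc<WindowFrame>` to `&WindowFrame`.

```rust
use std::ops::Range;
use std::sync::Arc;

/// Hypothetical stand-in for DataFusion's `WindowFrame`; only ROWS offsets are modeled.
#[derive(Debug)]
pub struct WindowFrame {
    pub preceding: usize,
    pub following: usize,
}

/// The context borrows the frame directly instead of borrowing the `Arc`.
#[derive(Debug)]
pub enum FrameContext<'a> {
    // ROWS frames need no state beyond the frame definition itself.
    Rows(&'a WindowFrame),
}

impl<'a> FrameContext<'a> {
    pub fn new(frame: &'a WindowFrame) -> Self {
        FrameContext::Rows(frame)
    }

    /// Half-open row range of the frame for row `idx` in a partition of
    /// `length` rows. Assumes `idx < length`.
    pub fn calculate_range(&self, idx: usize, length: usize) -> Range<usize> {
        match self {
            FrameContext::Rows(frame) => {
                let start = idx.saturating_sub(frame.preceding);
                let end = idx
                    .saturating_add(frame.following)
                    .saturating_add(1)
                    .min(length);
                start..end
            }
        }
    }
}

/// A caller holding an `Arc<WindowFrame>` passes `&arc`; deref coercion
/// turns `&Arc<WindowFrame>` into the `&WindowFrame` that `new` expects.
pub fn range_for_shared_frame(
    frame: &Arc<WindowFrame>,
    idx: usize,
    length: usize,
) -> Range<usize> {
    FrameContext::new(frame).calculate_range(idx, length)
}
```

Borrowing the inner value keeps the context independent of how the frame is owned. Taking an owned `Arc<WindowFrame>` instead would drop the lifetime parameter at the cost of one reference-count increment per context.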

&mut self,
window_frame: &Arc<WindowFrame>,
range_columns: &[ArrayRef],
_sort_options: &[SortOptions],

If the sort options are not used, I wonder why they are passed to calculate_range.

// This structure encapsulates all the state information we require as we
// scan groups of data while processing window frames.
#[derive(Debug, Default)]
pub struct WindowFrameStateGroups {

This structure is fine -- I just wanted to point out a pattern for writing state machines in Rust is to encapsulate each state in an enum variant, and then you can use the type system to ensure the state transitions are valid

So like

pub enum WindowFrameStateGroupsState {
  Start {},
  InGroup { ... },
  End {},
}

Or something similar
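One way this pattern could look in full is sketched below. The type `GroupScanState`, its fields, and the `advance` method are all hypothetical and are not taken from the PR. Each call consumes the current state and returns the next one, so a caller cannot read group fields while in `Start` or `End`.

```rust
/// Hypothetical state machine that walks sorted values one peer group at a time.
#[derive(Debug, PartialEq)]
pub enum GroupScanState {
    /// No group has been visited yet.
    Start,
    /// Currently positioned on the group covering rows `start..end`.
    InGroup { group_index: usize, start: usize, end: usize },
    /// All groups have been visited.
    End,
}

impl GroupScanState {
    /// Consumes the current state and moves to the next peer group in `values`.
    /// Assumes `values` is sorted, so equal values are contiguous.
    pub fn advance<T: PartialEq>(self, values: &[T]) -> GroupScanState {
        // Only `Start` and `InGroup` can lead to another group.
        let (next_index, next_start) = match self {
            GroupScanState::Start => (0, 0),
            GroupScanState::InGroup { group_index, end, .. } => (group_index + 1, end),
            GroupScanState::End => return GroupScanState::End,
        };
        if next_start >= values.len() {
            return GroupScanState::End;
        }
        // A linear scan keeps the sketch short; any boundary search would do.
        let len = values[next_start..]
            .iter()
            .take_while(|v| **v == values[next_start])
            .count();
        GroupScanState::InGroup {
            group_index: next_index,
            start: next_start,
            end: next_start + len,
        }
    }
}
```

Because `advance` takes `self` by value, the old state is gone after each transition and the `match` must handle every variant explicitly.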

/// This function finds the next group and its start index for a given group and start index.
/// It utilizes an exponentially growing step size to find the group boundary.
// TODO: For small group sizes, proceeding one-by-one to find the group change can be more efficient.
// Statistics about previous group sizes can be used to choose one-by-one vs. exponentially growing,

I agree there are likely many ways to improve the performance of this algorithm -- however, I think the first thing to do is getting a working implementation that is well tested and structured (as this PR is) and as you say we can drive additional optimizations from there.

Comment on lines 711 to 716
let arrays: Vec<ArrayRef> = vec![
Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 9., 10.])),
Arc::new(Float64Array::from_slice([2.0, 3.0, 3.0, 4.0, 5.0])),
Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 10., 11.0])),
Arc::new(Float64Array::from_slice([15.0, 13.0, 8.0, 5., 0.0])),
];

Stylistically, you can probably avoid a non-trivial amount of repetition in these tests by refactoring the array construction into a single helper function.

}

#[test]
fn test_window_frame_groups_preceding_huge_delta() {

Why is a delta of 10 "huge"? Is it because the delta is larger than the size of the arrays?

@jimexist
Member

thanks for putting the work into this.

turns out the psql comparison integration test is of great use.

@ozankabak
Contributor

Thanks @alamb and @jimexist! We will incorporate review feedback and mention you when it is ready to merge.

@alamb
Contributor

alamb commented Nov 11, 2022

> turns out the psql comparison integration test is of great use.

Yes I agree -- it is wonderful!

@zembunia
Contributor Author

@alamb The low hanging fruits are addressed. Two of your non-blocker feedbacks (state machine implementation, ref counting) will be examined and worked on next week. This can be merged, if you approve.

@alamb
Contributor

alamb commented Nov 11, 2022

Thanks @zembunia

> The low hanging fruits are addressed. Two of your non-blocker feedbacks (state machine implementation, ref counting) will be examined and worked on next week. This can be merged, if you approve.

will do.

Also to be clear, those are suggestions / nice to haves. Definitely not required unless you think it would improve the code and you have the time

@alamb alamb merged commit 129654c into apache:master Nov 11, 2022
@ursabot

ursabot commented Nov 11, 2022

Benchmark runs are scheduled for baseline = 225d62c and contender = 129654c. 129654c is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
