Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add window expression part 1 - logical and physical planning, structure, to/from proto, and explain, for empty over clause only #334

Merged
merged 21 commits into from
May 21, 2021

Conversation

jimexist
Copy link
Member

@jimexist jimexist commented May 13, 2021

Which issue does this PR close?

We'd like to support window function in three or more steps:

  1. Support window functions with basic logical planning and physical planning #359 basic structure
  2. Support window functions with empty OVER clause #298 empty over clause
  3. Support window functions with PARTITION BY clause #299 with partition clause
  4. Support window functions with order by #360 with order by
  5. Support window functions with window frame #361 with window frame

Closes #359.

This is partly re #298.

Work to be done next: finish #298 and #299

Rationale for this change

this is a very stretch version of adding window function constructs to the planner, proto, etc.

What changes are included in this PR?

  • logical expression for window expression
  • physical expression for window expression
  • basic exec plan structure
  • basic from/to proto parsing
  • support explain parsing now
> explain select c1, sum(c3) over () from test;
+--------------+------------------------------------------------------------------+
| plan_type    | plan                                                             |
+--------------+------------------------------------------------------------------+
| logical_plan | Projection: #c1, #SUM(c3)                                        |
|              |   WindowAggr: windowExpr=[[SUM(#c3)]] partitionBy=[], orderBy=[] |
|              |     TableScan: test projection=None                              |
+--------------+------------------------------------------------------------------+
1 row in set. Query took 0 seconds.
> explain select c1, c3, sum(c3 + 2) over () from test;
+--------------+--------------------------------------------------------------------------------+
| plan_type    | plan                                                                           |
+--------------+--------------------------------------------------------------------------------+
| logical_plan | Projection: #c1, #c3, #SUM(c3 Plus Int64(2))                                   |
|              |   WindowAggr: windowExpr=[[SUM(#c3 Plus Int64(2))]] partitionBy=[], orderBy=[] |
|              |     TableScan: test projection=None                                            |
+--------------+--------------------------------------------------------------------------------+
1 row in set. Query took 0 seconds.
> explain select c1, c3, sum(c3) over (), max(c3) over (), avg(c3) over () from test;
+--------------+--------------------------------------------------------------------------------------+
| plan_type    | plan                                                                                 |
+--------------+--------------------------------------------------------------------------------------+
| logical_plan | Projection: #c1, #c3, #SUM(c3), #MAX(c3), #AVG(c3)                                   |
|              |   WindowAggr: windowExpr=[[SUM(#c3), MAX(#c3), AVG(#c3)]] partitionBy=[], orderBy=[] |
|              |     TableScan: test projection=None                                                  |
+--------------+--------------------------------------------------------------------------------------+
1 row in set. Query took 0 seconds.

Are there any user-facing changes?

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a cool start @jimexist 👍

datafusion/src/logical_plan/builder.rs Outdated Show resolved Hide resolved
@jimexist jimexist force-pushed the add-window-expr branch 2 times, most recently from 7ae445a to 11e9541 Compare May 15, 2021 15:46
@jimexist
Copy link
Member Author

I wonder which approach makes more sense:

  • To implement proto and serde as an API first and leave a lot of unimplemented stubs
  • To try to cover a minimal working end to end feature and leave advanced use cases like window frames for later

@Dandandan
Copy link
Contributor

@jimexist I think it would be perfectly fine if more "advanced" features like window frames etc. are not supported, just as not all window functions have to be supported.
If we could just support e.g. ROW_NUMBER() OVER () or MAX(x) OVER () with default spec that would be great already.

@jimexist jimexist changed the title Add window expr Add window expr (part I - to only support empty over () clauses) May 17, 2021
@jimexist jimexist force-pushed the add-window-expr branch 7 times, most recently from ad02da7 to a300aae Compare May 19, 2021 14:41
@jimexist jimexist changed the title Add window expr (part I - to only support empty over () clauses) Add window expression part 1 - logical and physical planning, structure, to/from proto, and explain, for empty over clause only May 19, 2021
@jimexist jimexist marked this pull request as ready for review May 19, 2021 14:58
@jimexist
Copy link
Member Author

@Dandandan @alamb PTAL

@alamb
Copy link
Contributor

alamb commented May 19, 2021

I will try to review this later today but I may not get to it until tomorrow


message WindowExprNode {
oneof window_function {
AggregateFunction aggr_function = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked whether this makes sense to reuse aggregate functions for window expressions - I think it does! E.g. PostgreSQL also says:
https://www.postgresql.org/docs/9.3/functions-window.html

In addition to these functions, any built-in or user-defined aggregate function can be used as a window function (see Section 9.20 for a list of the built-in aggregates). Aggregate functions act as window functions only when an OVER clause follows the call; otherwise they act as regular aggregates.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes in general three types of things can be used:

  1. aggregation
  2. UDAF
  3. built in window function

for both 1. and 2. they are not order sensitive, but for 3 we'll have to take sort into account

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[postgres] # explain select c1, count(c3) over (partition by c1 order by c3) from test;
                            QUERY PLAN
------------------------------------------------------------------
 WindowAgg  (cost=6.32..8.32 rows=100 width=12)
   ->  Sort  (cost=6.32..6.57 rows=100 width=4)
         Sort Key: c1, c3
         ->  Seq Scan on test  (cost=0.00..3.00 rows=100 width=4)
(4 rows)
[postgres] # explain select c1, first_value(c3) over (partition by c1 order by c3) from test;
                            QUERY PLAN
------------------------------------------------------------------
 WindowAgg  (cost=6.32..8.32 rows=100 width=6)
   ->  Sort  (cost=6.32..6.57 rows=100 width=4)
         Sort Key: c1, c3
         ->  Seq Scan on test  (cost=0.00..3.00 rows=100 width=4)
(4 rows)

IMO only the second time we'll really need to sort by c3

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also fun thing to notice:

[postgres] # explain analyze select c1, sum(c3) over (partition by c1 order by c3), avg(c3) over (partition by c1 order by c3 desc) from test;
                                                        QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
 WindowAgg  (cost=11.64..13.64 rows=100 width=44) (actual time=1.287..1.373 rows=100 loops=1)
   ->  Sort  (cost=11.64..11.89 rows=100 width=36) (actual time=1.281..1.292 rows=100 loops=1)
         Sort Key: c1, c3
         Sort Method: quicksort  Memory: 31kB
         ->  WindowAgg  (cost=6.32..8.32 rows=100 width=36) (actual time=1.051..1.174 rows=100 loops=1)
               ->  Sort  (cost=6.32..6.57 rows=100 width=4) (actual time=0.221..0.231 rows=100 loops=1)
                     Sort Key: c1, c3 DESC
                     Sort Method: quicksort  Memory: 29kB
                     ->  Seq Scan on test  (cost=0.00..3.00 rows=100 width=4) (actual time=0.010..0.028 rows=100 loops=1)
 Planning Time: 0.087 ms
 Execution Time: 1.437 ms
(11 rows)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked whether this makes sense to reuse aggregate functions for window expressions - I think it does! E.g. PostgreSQL also says:
https://www.postgresql.org/docs/9.3/functions-window.html

In addition to these functions, any built-in or user-defined aggregate function can be used as a window function (see Section 9.20 for a list of the built-in aggregates). Aggregate functions act as window functions only when an OVER clause follows the call; otherwise they act as regular aggregates.

it is very useful for analytics, e.g. if you want to know (in an employee table with name, department, and salary) the list of employees in each department with salary above average.

Copy link
Contributor

@Dandandan Dandandan May 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe count(..) over order by .. also needs to be sorted, it will do a count over the window, which means a running count (over sorted rows) by default.
But yeah very useful for analytics indeed 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My comment above was more about re-using the same functions over here - as I thought we might not want to support every aggregation function here too. But for me it sounds like a good idea to reuse them. Maybe @alamb has some ideas about it as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think SQL is confusing in this area -- as @jimexist says, all "normal" aggregate functions (e.g. sum, count, etc) are also valid window functions, but the reverse is not true. You can't use window functions (e.g. LAG, LEAD, etc) outside of a window clause.

Thus I think representing window functions as a new type of function, as this PR does, makes the most sense. They are different enough (e.g. require information on the incoming windows) that trying to wrangle them into the same structures as normal aggregates seems like it will get messy. Long term I would expect we have a UDWF (user defined window function) api as well.

Ideally the physical implementation for sum / count / etc can be mostly reused but in the plans I think they are different enough to warrant different plan structures.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe count(..) over order by .. also needs to be sorted, it will do a count over the window, which means a running count (over sorted rows) by default.

Good point! Indeed.

@alamb
Copy link
Contributor

alamb commented May 20, 2021

Checking it out...

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went over this PR quite carefully. Thank you very much @jimexist for the contribution! ❤️
❤️ -- this PR looks to be in great shape.

All I think this PR needs is a few more tests and it could be merged in.

I am not familiar with the ballista code, but it looked ok to me. @andygrove do you have any suggestions of who might be interested in those changes?


message WindowExprNode {
oneof window_function {
AggregateFunction aggr_function = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think SQL is confusing in this area -- as @jimexist says, all "normal" aggregate functions (e.g. sum, count, etc) are also valid window functions, but the reverse is not true. You can't use window functions (e.g. LAG, LEAD, etc) outside of a window clause.

Thus I think representing window functions as a new type of function, as this PR does, makes the most sense. They are different enough (e.g. require information on the incoming windows) that trying to wrangle them into the same structures as normal aggregates seems like it will get messy. Long term I would expect we have a UDWF (user defined window function) api as well.

Ideally the physical implementation for sum / count / etc can be mostly reused but in the plans I think they are different enough to warrant different plan structures.

datafusion/src/logical_plan/builder.rs Show resolved Hide resolved
datafusion/src/logical_plan/builder.rs Outdated Show resolved Hide resolved
datafusion/src/logical_plan/builder.rs Show resolved Hide resolved
datafusion/src/physical_plan/window_functions.rs Outdated Show resolved Hide resolved
)));
}

// window needs to operate on a single partition currently
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Eventually it would be cool to push the partitioning expressions into a RepartitionExec so that we can execute the window functions in parallel on different windows but that is definitely an optimization for the future (not this initial PR) :)

@@ -2641,13 +2701,23 @@ mod tests {
}

#[test]
fn over_not_supported() {
fn empty_over() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Great

datafusion/src/sql/planner.rs Show resolved Hide resolved
// .iter()
// .map(|expr| expr.try_into())
// .collect::<Result<Vec<_>, _>>()?;
// // FIXME parse the window_frame data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks this is still WIP? Is the plan to complete this as part of this PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @jimexist plans to implement the feature in several PRs as outlined in the PR description

@jorgecarleitao
Copy link
Member

I haven't had the time to go through yet;

Since this is a large decomposable change, a suggestion here is to create a branch and merge this on that branch, so that we do not have incomplete features in master and allow PRs to that branch without an issue. Then merge the branch into master once its first iteration is ready (e.g. logical and 1 physical plan).

(This is something that I wished to have when implementing large features; it gives the time to review in parts without the risk of having incomplete code in master)

With that said, I am also fine risk it and merg to master; @jimexist do you have any preference how would you prefer to work on this here? :)

@alamb
Copy link
Contributor

alamb commented May 20, 2021

I am supportive of putting this directly on master -- we have various other "not yet implemented" functionality in DataFusion and I think we can handle any potential confusion with additional documentation

@jimexist
Copy link
Member Author

jimexist commented May 20, 2021

I haven't had the time to go through yet;

Since this is a large decomposable change, a suggestion here is to create a branch and merge this on that branch, so that we do not have incomplete features in master and allow PRs to that branch without an issue. Then merge the branch into master once its first iteration is ready (e.g. logical and 1 physical plan).

(This is something that I wished to have when implementing large features; it gives the time to review in parts without the risk of having incomplete code in master)

With that said, I am also fine risk it and merg to master; @jimexist do you have any preference how would you prefer to work on this here? :)

I wish to:

  1. add unit tests
  2. fix some small comments, per @alamb, like
    1. adding filter clause to leave room for future room without API disruption,
    2. adding nth_value and tile and leaving them unimplemented and leave room for good first PR
  3. merge it into master as it

3 because although I agree with having a feature branch, the maintenance of continuous rebasing would be troublesome, and I can estimate the whole series of window function implementations take > 1 month. if I can keep the merged code isolated and structurally complete then there's little worry about breaking changes in future: I believe this PR:

  1. is isolated because the explain works (thus mostly the API is determined), while actually executing the window SQL returns zero rows, due to unimplemented exec plan, which I intend to do next
  2. is structurally complete if reviewers can focus on the phase and timing of the window clause in the query planner. future changes of e.g. adding one or more sort phases requires only modifications within the .window fn and thus can be non-intrusive

Copy link
Member

@jorgecarleitao jorgecarleitao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing design and implementation. I left two small comments, but it is regardless ready to merge.

Thank you very much, @jimexist ; great work.

/// - https://github.com/apache/arrow-datafusion/issues/361 with window frame
pub fn window(
&self,
window_expr: impl IntoIterator<Item = Expr>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure we should use IntoIterator<Item = Expr> for every field with 6 fields. This will produce a version of the compiled function for every combination, which IMO adds an unnecessary compile time and binary size.

IntoIterator is more relevant when we want to avoid an extra allocation; these are very small vectors.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tracked in #372

@@ -183,7 +182,7 @@ static TIMESTAMPS: &[DataType] = &[
];

/// the signatures supported by the function `fun`.
fn signature(fun: &AggregateFunction) -> Signature {
pub fn signature(fun: &AggregateFunction) -> Signature {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pub(super) or pub(crate) instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tracked in #373

@codecov-commenter
Copy link

Codecov Report

Merging #334 (6941151) into master (913bf86) will decrease coverage by 0.95%.
The diff coverage is 36.84%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #334      +/-   ##
==========================================
- Coverage   75.89%   74.94%   -0.96%     
==========================================
  Files         144      146       +2     
  Lines       23771    24314     +543     
==========================================
+ Hits        18040    18221     +181     
- Misses       5731     6093     +362     
Impacted Files Coverage Δ
...lista/rust/core/src/serde/logical_plan/to_proto.rs 62.37% <0.00%> (-5.46%) ⬇️
datafusion/src/optimizer/constant_folding.rs 91.63% <0.00%> (-0.30%) ⬇️
datafusion/src/optimizer/hash_build_probe_order.rs 60.50% <0.00%> (-1.57%) ⬇️
datafusion/src/optimizer/projection_push_down.rs 91.11% <0.00%> (-7.63%) ⬇️
datafusion/src/optimizer/utils.rs 47.76% <0.00%> (-2.05%) ⬇️
datafusion/src/physical_plan/mod.rs 82.75% <0.00%> (-1.95%) ⬇️
datafusion/src/physical_plan/planner.rs 76.51% <0.00%> (-4.11%) ⬇️
datafusion/src/physical_plan/sort.rs 92.07% <ø> (ø)
datafusion/src/physical_plan/windows.rs 0.00% <0.00%> (ø)
datafusion/tests/sql.rs 99.89% <ø> (ø)
... and 12 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 913bf86...6941151. Read the comment docs.

@alamb alamb merged commit db4f098 into apache:master May 21, 2021
@jimexist jimexist deleted the add-window-expr branch May 21, 2021 15:52
@houqp houqp added ballista datafusion Changes in the datafusion crate enhancement New feature or request labels Jul 29, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support window functions with basic logical planning and physical planning
7 participants