Add support for recursive CTEs #7581

Closed
wants to merge 10 commits into from

matthewgapp
Contributor

@matthewgapp matthewgapp commented Sep 17, 2023

This PR implements support for recursive CTEs based on @isidentical's design here #462 (comment)

Caveats:

It does not attempt to place safeguards against infinite recursion; that could be implemented in a follow-up PR. It would be great to have syntax like OPTION (MAXRECURSION 100) to prevent infinite recursion. Since the SQL parser crate doesn't support OPTION (MAXRECURSION), I might add a max-recursion parameter to the logical and physical plans that can be set manually once the logical plan is produced from SQL, as a near-term workaround. I'll do that in a separate follow-on PR.

It's slower than it needs to be because the execution plan inserts coalesce and repartition steps within the recursive term. That is addressed in this follow-on PR. There is also room to skip gathering statistics on each iteration, because setting them up every time is expensive.
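
To make the first caveat concrete, here is a minimal sketch of an iteration cap, written against plain Rust vectors rather than DataFusion's plans. Everything in it (`run_recursive`, `RecursionError`, the `max_recursion` parameter) is hypothetical and only illustrates bounding how many times the recursive term runs; it is not what this PR implements.

```rust
/// Hypothetical error type for this sketch; not part of DataFusion.
#[derive(Debug, PartialEq)]
pub enum RecursionError {
    LimitExceeded { max_recursion: usize },
}

/// Evaluates a recursive CTE over in-memory rows.
///
/// `static_term` seeds the working table. `recursive_term` is then applied to
/// the rows produced by the previous iteration until it yields no rows, or
/// until it would have to run more than `max_recursion` times.
pub fn run_recursive<T: Clone>(
    static_term: Vec<T>,
    recursive_term: impl Fn(&[T]) -> Vec<T>,
    max_recursion: usize,
) -> Result<Vec<T>, RecursionError> {
    let mut result = static_term.clone();
    let mut working = static_term;
    let mut iterations = 0;

    while !working.is_empty() {
        // Refuse to start another iteration once the cap is reached.
        if iterations == max_recursion {
            return Err(RecursionError::LimitExceeded { max_recursion });
        }
        working = recursive_term(&working);
        result.extend(working.iter().cloned());
        iterations += 1;
    }
    Ok(result)
}
```

With a static term of `[1]` and a recursive term that maps `n` to `n + 1` while `n < 5`, this returns the rows 1 through 5. A recursive term that never stops producing rows hits the cap and returns an error instead of looping forever.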

Which issue does this PR close?

Closes #462

Rationale for this change

DataFusion should support recursive CTEs so that it can express calculations that depend on the results of previous iterations.

What changes are included in this PR?

Are these changes tested?

Yes, they're tested via the SQL tests. The SQL tests use CSVs introduced by apache/arrow-testing#93.

Are there any user-facing changes?

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions optimizer Optimizer rules core Core DataFusion crate labels Sep 17, 2023
@github-actions github-actions bot added the physical-expr Physical Expressions label Sep 17, 2023
@matthewgapp matthewgapp changed the title Recursive CTE (refreshed) Add support for recursive CTEs (refreshed) Sep 18, 2023
@matthewgapp matthewgapp changed the title Add support for recursive CTEs (refreshed) Add support for recursive CTEs Sep 18, 2023
@github-actions github-actions bot removed the physical-expr Physical Expressions label Sep 19, 2023
@matthewgapp matthewgapp force-pushed the matt/recursive-cte branch 3 times, most recently from 815c72f to f9614d0 Compare September 19, 2023 03:14
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Sep 25, 2023
@matthewgapp
Contributor Author

Started adding tests, will continue to add more.

@matthewgapp
Contributor Author

I've been out for the last couple weeks because life got busy. I'll be back on this soon to push it over the finish line.

@matthewgapp
Contributor Author

matthewgapp commented Nov 18, 2023

Rebased, plan to finish this out today/tomorrow.

@matthewgapp matthewgapp marked this pull request as ready for review November 19, 2023 01:04
@matthewgapp
Contributor Author

@alamb Finally got around to cleaning this up. I think this is ready for initial review. Happy to hear what other tests/safeguards you'd like to see in the initial version. It would be great to get this in so that I can keep layering on improvements without the PR growing into a much larger change.

wip: fixes after rebase but tpcds_physical_q54 keeps overflowing its stack
@@ -1387,22 +1387,6 @@ fn select_interval_out_of_range() {
);
}

#[test]
fn recursive_ctes() {
Contributor Author

🥳

@@ -112,6 +112,8 @@ pub enum LogicalPlan {
/// produces 0 or 1 row. This is used to implement SQL `SELECT`
/// that has no values in the `FROM` clause.
EmptyRelation(EmptyRelation),
/// A named temporary relation with a schema.
NamedRelation(NamedRelation),
Member

I am considering whether the NamedRelation and RecursiveQuery could be implemented as two TableSources, one being CTESelfRefTable and the other being CTERecursiveTable, and then use TableScan to read them.

Use CTESelfRefTable within the recursive term and CTERecursiveTable in the outer query.

But this idea is in its early stages and may be wrong.

Contributor Author

@jonahgao, could you provide the rationale for your suggested strategy? I'm interested in understanding why it might be more effective than the current implementation. Performance is critical to our use case. And the implementation for recursion is very sensitive to performance considerations, as the setup for execution and stream management isn't amortized over all input record batches. Instead, it's incurred with each iteration. For instance, we've observed a substantial performance boost—up to 30 times faster—by eliminating certain intermediate nodes, like coalesce, from our plan (as evidenced in this PR). I've drafted another PR that appears to again double the speed of execution merely by omitting metric collection in recursive sub-graphs.

Contributor

One rationale might be to make the implementation simpler -- if we could implement the recursive relation as a table provider, it would likely allow the changes to be more localized / smaller (e.g. maybe we could reuse MemTable::load to update the batches on each iteration)

Contributor

Basically I understand the need to have LogicalPlan::RecursiveQuery but I don't (yet) understand the need to have the NamedRelation

Contributor Author

NamedRelation is primarily a way to mirror batches back to the RecursiveQuery via its physical counterpart, ContinuanceExec

Member

@matthewgapp Another rationale might be to support pushing down filters to the working table, which may be useful if we support spilling the working table to disk in the future. I think performance should not be affected, since the execution of physical plans is almost the same as it is now.

I implemented a demo on this branch and in this commit. GitHub does not allow forking a repository twice, so I directly pushed it to another repository for convenience.

In this demo, I attempted to replace the NamedRelation with a TableProvider, namely CteWorkTable. The benefit of this is that it can avoid maintaining a new logical plan.

Another change is that I used a structure called WorkTable to connect the RecursiveQueryExec and the WorkTableExec (it was previously ContinuanceExec). The advantage of this is that it avoids maintaining some external context information, such as relation_handlers in TaskContext, and the ctx in create_initial_plan.

The WorkTable is a shared table: it is scanned by the WorkTableExec during execution of the recursive term, and once that execution completes, the results are written back to it.
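
As a rough illustration of that hand-off, here is a self-contained sketch using only the standard library. It is not the code from the demo branch: `Batch` stands in for an Arrow record batch, and `WorkTable`, `WorkTableScan`, and `run_with_work_table` are hypothetical names chosen to mirror the description above.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for a record batch (assumption: a plain vector of integers).
type Batch = Vec<i64>;

/// Shared table holding the rows produced by the previous iteration.
#[derive(Default)]
pub struct WorkTable {
    batches: Mutex<Option<Vec<Batch>>>,
}

impl WorkTable {
    /// Takes the current batches, leaving the table empty.
    pub fn take(&self) -> Vec<Batch> {
        self.batches.lock().unwrap().take().unwrap_or_default()
    }

    /// Writes the results of an iteration back for the next one to scan.
    pub fn write(&self, batches: Vec<Batch>) {
        *self.batches.lock().unwrap() = Some(batches);
    }
}

/// Leaf of the recursive term: reads whatever the work table currently holds.
pub struct WorkTableScan {
    table: Arc<WorkTable>,
}

impl WorkTableScan {
    pub fn scan(&self) -> Vec<Batch> {
        self.table.take()
    }
}

/// Drives the recursion: seed the table with the static term, then keep
/// running the recursive term over the scan until it produces no rows.
pub fn run_with_work_table(
    static_term: Vec<Batch>,
    recursive_term: impl Fn(Vec<Batch>) -> Vec<Batch>,
) -> Vec<Batch> {
    let table = Arc::new(WorkTable::default());
    let scan = WorkTableScan { table: Arc::clone(&table) };

    let mut output = static_term.clone();
    table.write(static_term);

    loop {
        let produced = recursive_term(scan.scan());
        if produced.iter().all(|batch| batch.is_empty()) {
            return output;
        }
        output.extend(produced.iter().cloned());
        table.write(produced);
    }
}
```

The point of the shared handle is that the driver and the scan only need an `Arc` to the same table, so no extra context has to be threaded through planning or the task context. This sketch has no iteration or memory bound, so a recursive term that never returns an empty result would loop forever.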

Contributor Author

Wow, tyty! I was in the process of implementing the shared table and my implementation turned out very similar to yours although I ended up working around the crate dependency graph constraints a bit differently by introducing a couple new traits. But I did end up exposing a method on the context to generate a table. I like your approach better.

I tested out your POC, and performance remains about the same between my previous implementation and your new WorkTable approach (which makes sense).

I'm going to go ahead and work based on your POC toward the list of PRs that Andrew wants to get this landed.

Member

Thank you for your work and for the upcoming contributions! @matthewgapp

Contributor

@alamb alamb left a comment

Thank you for your patience @matthewgapp -- I was finally able to find the few hours I needed to load all the context into my head to review this PR properly

I left some high level comments on the design (specifically how to simplify it / potentially keep the implementation more isolated). However overall I found this PR easy to read and follow. Nice work

BTW if anyone else needs a reminder how recursive CTEs work, here is the postgres documentation: https://www.postgresql.org/docs/11/queries-with.html#id-1.5.6.12.5.4

datafusion/expr/src/logical_plan/builder.rs (resolved)
datafusion/physical-plan/src/continuance.rs (resolved)
datafusion/sqllogictest/test_files/cte.slt (resolved)
// Downstream plans should not expect any partitioning.
let partition = 0;

self.recursive_stream = Some(
Contributor

Since this calls execute again recursively, if we used a TableProvider the underlying TableProvider::execute would be called again too

@alamb
Contributor

alamb commented Jan 9, 2024

cc @Dandandan for your thoughts

@alamb
Contributor

alamb commented Jan 10, 2024

I thought more about this PR last night and in the shower. First of all, I think the direction it is headed is great and would make a good addition to DataFusion. Thank you @matthewgapp both for the code as well as for your patience

In terms of implementation, I think there are a few items that we need prior to making this feature available for everyone

  1. More test coverage (I highlighted some areas)
  2. Memory limits (so a recursive CTE where the intermediate relation grows without limit will not cause the process to be OOM killed). This is to prevent a denial of service (DoS) attack vector against systems that use DataFusion by exploding memory

Some nice to have (but not strictly required) items:

  1. Simpler API (as @jonahgao and I have suggested)
  2. A config flag to enable/disable support (as a way to protect against DoS)

In order to avoid a massive PR, my suggestion is to implement this feature in stages:

  1. Add a config flag to enable/disable CTE support, defaulting to disabled
  2. Add parser / LogicalPlan support in one PR
  3. Add execution support in a second PR
  4. Add memory limiting in another PR
  5. Iterate / update tests
  6. When we are happy that everything is working well, change the switch's default to enabled
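
The two safeguards in that list (a switch that defaults to off, and a memory limit on the growing intermediate relation) could look roughly like the sketch below. It is an illustration under stated assumptions only: `CteOptions`, `CteError`, and `run_bounded` are hypothetical names, rows are plain `i64` values, and memory is approximated by a byte counter rather than DataFusion's own memory accounting.

```rust
use std::mem::size_of;

/// Hypothetical options for this sketch; not DataFusion's config API.
pub struct CteOptions {
    pub enable_recursive_ctes: bool,
    pub memory_limit_bytes: usize,
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum CteError {
    Disabled,
    MemoryLimitExceeded { limit_bytes: usize, needed_bytes: usize },
}

/// Evaluates a recursive CTE over in-memory rows, failing instead of growing
/// past `memory_limit_bytes`.
pub fn run_bounded(
    opts: &CteOptions,
    static_term: Vec<i64>,
    recursive_term: impl Fn(&[i64]) -> Vec<i64>,
) -> Result<Vec<i64>, CteError> {
    // The feature is off unless explicitly enabled.
    if !opts.enable_recursive_ctes {
        return Err(CteError::Disabled);
    }

    let mut result = Vec::new();
    let mut working = static_term;

    while !working.is_empty() {
        // Account for everything buffered so far plus the current working set.
        let needed_bytes = (result.len() + working.len()) * size_of::<i64>();
        if needed_bytes > opts.memory_limit_bytes {
            return Err(CteError::MemoryLimitExceeded {
                limit_bytes: opts.memory_limit_bytes,
                needed_bytes,
            });
        }
        let next = recursive_term(&working);
        result.append(&mut working);
        working = next;
    }
    Ok(result)
}
```

A query whose recursive term never terminates then fails with `MemoryLimitExceeded` once the buffered rows pass the limit, rather than getting the process OOM killed.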

@matthewgapp
Contributor Author

Thanks @alamb! Think this impl plan makes sense. I'll start knocking out the PRs. Also going to have a go at reworking the existing PR to use MemTable in the two plans (as @jonahgao mentioned here #7581 (comment)). I'll link the resulting PRs to this one.

@alamb
Contributor

alamb commented Jan 31, 2024

I think this PR has been superseded by other PRs on #462 (comment)

I think all that remains for the CTE work is to make sure it doesn't explode with memory and to turn it on by default 🎉 (that is tracked by #462)

Labels
core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules sql SQL Planner sqllogictest SQL Logic Tests (.slt)