
[EPIC] Substrait: Add producer and consumer for physical plans #5173

Open
4 of 12 tasks
andygrove opened this issue Feb 4, 2023 · 8 comments
Labels
enhancement New feature or request substrait

Comments

@andygrove
Member

andygrove commented Feb 4, 2023

@andygrove andygrove added enhancement New feature or request substrait labels Feb 4, 2023
@andygrove
Member Author

@waynexia @nseekhao fyi

@waynexia
Member

waynexia commented Mar 2, 2023

What is the expected behavior for converting "LogicalPlan -> Substrait -> PhysicalPlan" or "PhysicalPlan -> Substrait -> LogicalPlan"? Or is such a conversion allowed at all?

@alamb alamb changed the title Substrait: Add producer and consumer for physical plans [EPIC] Substrait: Add producer and consumer for physical plans Feb 26, 2024
@alamb
Contributor

alamb commented Feb 26, 2024

I renamed this ticket to be an epic and started collecting the tasks needed for better support.

@niebayes
Contributor

@andygrove @alamb Could you recommend the best path for implementing these tasks? We're building a distributed query engine on top of DataFusion that needs to split a physical plan into pipelines, so we'd like to contribute to improving DataFusion's Substrait support.

@alamb
Contributor

alamb commented Feb 19, 2025

Hi @niebayes, I recommend coordinating with @vbarua, @Blizzara, @wackywendell, and others who I think use Substrait with physical plans.

I think we may already have physical consumers/producers; see: https://docs.rs/datafusion-substrait/45.0.0/datafusion_substrait/physical_plan/index.html

The first task might be to go through the existing tickets and see which ones are still relevant.

@niebayes
Contributor

@alamb Thanks for your advice. I'll start by picking a few small tickets to get more familiar with the codebase.

@Blizzara
Contributor

Blizzara commented Feb 20, 2025

There is indeed a producer and consumer for physical plans, but from a quick look they seem very limited. My interest is currently only in logical plans, and I think the same applies to Victor, at least from what I've seen (but I may be wrong there).

I don't know much about physical plans overall, so I can't say whether they could reuse parts of the logical plan work, but at least the logical plan consumer/producer can serve as inspiration :)

@niebayes
Contributor

@Blizzara Thanks for your reply. I initially chose the physical plan because more of the computation can be distributed to executors in a distributed query engine.

Take this SQL query:

select avg(value) from sx1 group by sid having sid > 1;

The corresponding logical plan might be:

Projection: avg(sx1.value)
  Aggregate: groupBy=[[sx1.sid]], aggr=[[avg(CAST(sx1.value AS Float64))]]
    Filter: sx1.sid > Int8(1)
      TableScan: sx1 projection=[sid, value], partial_filters=[sx1.sid > Int8(1)]

And the physical plan might look like:

ProjectionExec: expr=[avg(sx1.value)@1 as avg(sx1.value)]
  AggregateExec: mode=FinalPartitioned, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([sid@0], 8), input_partitions=8
        AggregateExec: mode=Partial, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
          CoalesceBatchesExec: target_batch_size=8192
            FilterExec: sid@0 > 1
              ParquetExec: file_groups = [..]

From the datafusion-ballista project, I learned that we can split an execution plan at pipeline breakers (including RepartitionExec, SortPreservingMergeExec, CoalescePartitionsExec, etc.). The execution plan above would therefore be split into two parts (a.k.a. pipelines):

ProjectionExec: expr=[avg(sx1.value)@1 as avg(sx1.value)]
  AggregateExec: mode=FinalPartitioned, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
    CoalesceBatchesExec: target_batch_size=8192
      MergePipelinesExec: pipeline_ids = [...]
        AggregateExec: mode=Partial, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
          CoalesceBatchesExec: target_batch_size=8192
            FilterExec: sid@0 > 1
              ParquetExec: file_groups = [..]
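The split described above can be sketched roughly as follows. This is a minimal Rust sketch, assuming a toy tree type instead of DataFusion's real `ExecutionPlan` trait: `PlanNode`, `split_into_stages`, `example_plan`, and the list of breaker names are hypothetical, and `MergePipelinesExec` is only the placeholder name used in the plan above. Each pipeline breaker's input subtree becomes its own pipeline, and the breaker is replaced by a placeholder that refers to that pipeline by id.

```rust
/// Hypothetical, simplified plan node (NOT DataFusion's `ExecutionPlan`):
/// just an operator label and its inputs, enough to model the tree shape.
#[derive(Debug, Clone, PartialEq)]
struct PlanNode {
    name: String,
    children: Vec<PlanNode>,
}

impl PlanNode {
    fn new(name: &str, children: Vec<PlanNode>) -> Self {
        PlanNode { name: name.to_string(), children }
    }
}

/// Operators treated as pipeline breakers in this sketch (an assumed list).
fn is_pipeline_breaker(name: &str) -> bool {
    matches!(name, "RepartitionExec" | "CoalescePartitionsExec" | "SortPreservingMergeExec")
}

/// Replaces every breaker with a placeholder node and pushes the breaker's
/// input subtree onto `stages`. Inputs are pushed before the stages that
/// consume them, so `stages` ends up in a valid execution order.
fn split_into_stages(plan: PlanNode, stages: &mut Vec<PlanNode>) -> PlanNode {
    if is_pipeline_breaker(&plan.name) {
        let mut ids = Vec::new();
        for child in plan.children {
            // Split recursively first, so nested breakers become earlier stages.
            let stage_root = split_into_stages(child, stages);
            stages.push(stage_root);
            ids.push(stages.len() - 1);
        }
        let placeholder = format!("MergePipelinesExec: pipeline_ids={:?}", ids);
        return PlanNode::new(&placeholder, vec![]);
    }
    let children = plan
        .children
        .into_iter()
        .map(|c| split_into_stages(c, stages))
        .collect();
    PlanNode { name: plan.name, children }
}

/// Writes the tree with two spaces of indentation per level.
fn render(node: &PlanNode, depth: usize, out: &mut String) {
    out.push_str(&"  ".repeat(depth));
    out.push_str(&node.name);
    out.push('\n');
    for child in &node.children {
        render(child, depth + 1, out);
    }
}

/// The physical plan from the comment above, with operator details trimmed.
fn example_plan() -> PlanNode {
    let scan = PlanNode::new("ParquetExec", vec![]);
    let filter = PlanNode::new("FilterExec: sid@0 > 1", vec![scan]);
    let batches = PlanNode::new("CoalesceBatchesExec", vec![filter]);
    let partial = PlanNode::new("AggregateExec: mode=Partial", vec![batches]);
    let repartition = PlanNode::new("RepartitionExec", vec![partial]);
    let batches2 = PlanNode::new("CoalesceBatchesExec", vec![repartition]);
    let final_agg = PlanNode::new("AggregateExec: mode=FinalPartitioned", vec![batches2]);
    PlanNode::new("ProjectionExec", vec![final_agg])
}

fn main() {
    let mut stages = Vec::new();
    let root = split_into_stages(example_plan(), &mut stages);
    stages.push(root); // the root stage runs last, after its inputs
    for (id, stage) in stages.iter().enumerate() {
        let mut out = String::new();
        render(stage, 0, &mut out);
        println!("-- pipeline {id} --\n{out}");
    }
}
```

In a real engine, the placeholder would also need to carry the breaker's partitioning scheme (here `Hash([sid@0], 8)`) so that the producing pipeline knows how to route its output; this sketch drops that detail.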

As you can see, the first (partial) stage of the parallel aggregation can be distributed across multiple executors, which improves resource utilization.
If we split the logical plan instead, it seems we can't distribute the aggregation, not even part of it. I'm not sure whether my understanding is correct.
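The reason the `Partial` stage can run on separate executors is that `avg` can be decomposed into a mergeable intermediate state. Below is a minimal Rust sketch of that two-phase idea, assuming the partial state is a `(sum, count)` pair per group. `AvgState`, `partial_aggregate`, `final_aggregate`, and the sample rows are hypothetical, and DataFusion's actual accumulator state may be laid out differently. In the real plan, `RepartitionExec` hash-partitions by `sid` so that every partial state for a given group reaches the same final partition; here a single merge step stands in for that.

```rust
use std::collections::HashMap;

/// Assumed partial state for AVG: running sum and row count for one group.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
struct AvgState {
    sum: f64,
    count: u64,
}

/// Phase 1 (runs on each executor): apply `sid > 1`, then group by sid and
/// accumulate (sum, count) instead of a finished average.
fn partial_aggregate(rows: &[(i64, f64)]) -> HashMap<i64, AvgState> {
    let mut groups: HashMap<i64, AvgState> = HashMap::new();
    for &(sid, value) in rows {
        if sid > 1 {
            let state = groups.entry(sid).or_default();
            state.sum += value;
            state.count += 1;
        }
    }
    groups
}

/// Phase 2: merge the partial states of each group, then compute sum / count.
fn final_aggregate(partials: Vec<HashMap<i64, AvgState>>) -> HashMap<i64, f64> {
    let mut merged: HashMap<i64, AvgState> = HashMap::new();
    for partial in partials {
        for (sid, state) in partial {
            let m = merged.entry(sid).or_default();
            m.sum += state.sum;
            m.count += state.count;
        }
    }
    // Every group present has count >= 1, so the division is safe.
    merged
        .into_iter()
        .map(|(sid, s)| (sid, s.sum / s.count as f64))
        .collect()
}

fn main() {
    // (sid, value) rows held by two hypothetical executors.
    let executor_a: Vec<(i64, f64)> = vec![(1, 10.0), (2, 1.0), (2, 3.0)];
    let executor_b: Vec<(i64, f64)> = vec![(2, 8.0), (3, 5.0)];
    let partials = vec![partial_aggregate(&executor_a), partial_aggregate(&executor_b)];
    let result = final_aggregate(partials);
    let mut sids: Vec<i64> = result.keys().copied().collect();
    sids.sort();
    for sid in sids {
        println!("sid={sid} avg={}", result[&sid]);
    }
}
```

Averaging the per-executor averages for `sid = 2` would give (2.0 + 8.0) / 2 = 5.0 instead of the correct 12.0 / 3 = 4.0, which is why the partial stage ships `(sum, count)` rather than a finished average.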

By the way, datafusion-ballista is well suited to OLAP workloads and assumes executors are stateless. In my scenario, however, executors are stateful: each executor maintains an in-memory buffer containing the most recently written data (historical data is stored in shared object storage). So when the scheduler is about to construct a physical plan, it has to query each executor for the latest statistics, which query optimization requires. I wonder whether this is the standard approach to distributed queries on DataFusion, since the implementation seems complicated.

I really hope the DataFusion community can offer some recommendations on building a distributed query engine on top of DataFusion.
