Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataFusion should scan Parquet statistics once per query #871

Closed
andygrove opened this issue Aug 13, 2021 · 0 comments · Fixed by #3649
Closed

DataFusion should scan Parquet statistics once per query #871

andygrove opened this issue Aug 13, 2021 · 0 comments · Fixed by #3649
Labels
enhancement New feature or request performance Make DataFusion faster

Comments

@andygrove
Copy link
Member

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

When running the benchmarks with DataFusion I noticed that we scan statistics for all tables early on (even tables not referenced in the query). This happens in ExecutionContext::register_table. We then scan statistics again later on for the tables that are actually used in the query.

../target/release/tpch benchmark datafusion   --path /mnt/bigdata/tpch-sf1000-parquet/   --format parquet   --iterations 1   --debug   --concurrency 24   --query 3
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 3, debug: true, iterations: 1, concurrency: 24, batch_size: 8192, path: "/mnt/bigdata/tpch-sf1000-parquet/", file_format: "parquet", mem_table: false, partitions: 8 }
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//part)
Scanned 48 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//supplier)
Scanned 48 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//partsupp)
Scanned 48 Parquet files for statistics in 1 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//customer)
Scanned 48 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//orders)
Scanned 48 Parquet files for statistics in 4 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//lineitem)
Scanned 48 Parquet files for statistics in 30 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//nation)
Scanned 1 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//region)
Scanned 1 Parquet files for statistics in 0 seconds
=== Logical plan ===
Sort: #revenue DESC NULLS FIRST, #orders.o_orderdate ASC NULLS FIRST
  Projection: #lineitem.l_orderkey, #SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount) AS revenue, #orders.o_orderdate, #orders.o_shippriority
    Aggregate: groupBy=[[#lineitem.l_orderkey, #orders.o_orderdate, #orders.o_shippriority]], aggr=[[SUM(#lineitem.l_extendedprice Multiply Int64(1) Minus #lineitem.l_discount)]]
      Filter: #customer.c_mktsegment Eq Utf8("BUILDING") And #orders.o_orderdate Lt CAST(Utf8("1995-03-15") AS Date32) And #lineitem.l_shipdate Gt CAST(Utf8("1995-03-15") AS Date32)
        Join: #orders.o_orderkey = #lineitem.l_orderkey
          Join: #customer.c_custkey = #orders.o_custkey
            TableScan: customer projection=None
            TableScan: orders projection=None
          TableScan: lineitem projection=None

=== Optimized logical plan ===
Sort: #revenue DESC NULLS FIRST, #orders.o_orderdate ASC NULLS FIRST
  Projection: #lineitem.l_orderkey, #SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount) AS revenue, #orders.o_orderdate, #orders.o_shippriority
    Aggregate: groupBy=[[#lineitem.l_orderkey, #orders.o_orderdate, #orders.o_shippriority]], aggr=[[SUM(#lineitem.l_extendedprice Multiply Int64(1) Minus #lineitem.l_discount)]]
      Join: #orders.o_orderkey = #lineitem.l_orderkey
        Join: #customer.c_custkey = #orders.o_custkey
          Filter: #customer.c_mktsegment Eq Utf8("BUILDING")
            TableScan: customer projection=Some([0, 6]), filters=[#customer.c_mktsegment Eq Utf8("BUILDING")]
          Filter: #orders.o_orderdate Lt Date32("9204")
            TableScan: orders projection=Some([0, 1, 4, 7]), filters=[#orders.o_orderdate Lt Date32("9204")]
        Filter: #lineitem.l_shipdate Gt Date32("9204")
          TableScan: lineitem projection=Some([0, 5, 6, 10]), filters=[#lineitem.l_shipdate Gt Date32("9204")]

ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//customer)
Scanned 48 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//orders)
Scanned 48 Parquet files for statistics in 4 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//lineitem)
Scanned 48 Parquet files for statistics in 30 seconds
=== Physical plan ===
SortExec: [revenue@1 DESC,o_orderdate@2 ASC]
  CoalescePartitionsExec
    ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
      HashAggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(l_extendedprice Multiply Int64(1) Minus l_discount)]
        CoalesceBatchesExec: target_batch_size=4096
          RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 24)
            HashAggregateExec: mode=Partial, gby=[l_orderkey@6 as l_orderkey, o_orderdate@4 as o_orderdate, o_shippriority@5 as o_shippriority], aggr=[SUM(l_extendedprice Multiply Int64(1) Minus l_discount)]
              CoalesceBatchesExec: target_batch_size=4096
                HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 2 }, Column { name: "l_orderkey", index: 0 })]
                  CoalesceBatchesExec: target_batch_size=4096
                    RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 2 }], 24)
                      CoalesceBatchesExec: target_batch_size=4096
                        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]
                          CoalesceBatchesExec: target_batch_size=4096
                            RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 24)
                              CoalesceBatchesExec: target_batch_size=4096
                                FilterExec: c_mktsegment@1 = BUILDING
                                  ParquetExec: batch_size=8192, limit=None, partitions=[...]
                          CoalesceBatchesExec: target_batch_size=4096
                            RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 24)
                              CoalesceBatchesExec: target_batch_size=4096
                                FilterExec: o_orderdate@2 < 9204
                                  ParquetExec: batch_size=8192, limit=None, partitions=[...]
                  CoalesceBatchesExec: target_batch_size=4096
                    RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 24)
                      CoalesceBatchesExec: target_batch_size=4096
                        FilterExec: l_shipdate@3 > 9204
                          ParquetExec: batch_size=8192, limit=None, partitions=[...]

Describe the solution you'd like

  • We should only scan statistics for tables that are used in the query
  • We should only scan statistics once

Describe alternatives you've considered
N/A

Additional context
N/A

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request performance Make DataFusion faster
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant