Skip to content

Commit

Permalink
Merge commit '67cf1d606b901eaf591f44389702e285557d39e1' into update_a…
Browse files Browse the repository at this point in the history
…ugust_wk_3
  • Loading branch information
itsjunetime committed Aug 27, 2024
2 parents b37c5d1 + 67cf1d6 commit ca93ea3
Show file tree
Hide file tree
Showing 58 changed files with 1,699 additions and 1,098 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ readme = "README.md"

[dependencies]
arrow = { version = "52.2.0" }
async-trait = "0.1.41"
async-trait = "0.1.73"
aws-config = "0.55"
aws-credential-types = "0.55"
clap = { version = "3", features = ["derive", "cargo"] }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ version.workspace = true

[dependencies]
arrow-schema = { workspace = true }
async-trait = "0.1.41"
async-trait = { workspace = true }
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

mod column;
mod dfschema;
mod error;
mod functional_dependencies;
mod join_type;
mod param_value;
Expand All @@ -33,6 +32,7 @@ pub mod alias;
pub mod cast;
pub mod config;
pub mod display;
pub mod error;
pub mod file_options;
pub mod format;
pub mod hash_utils;
Expand Down
Binary file removed datafusion/core/example.parquet
Binary file not shown.
26 changes: 12 additions & 14 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3046,13 +3046,12 @@ mod tests {
assert_eq!(
"\
Projection: t1.c1, t2.c1, Boolean(true) AS new_column\
\n Limit: skip=0, fetch=1\
\n Sort: t1.c1 ASC NULLS FIRST, fetch=1\
\n Inner Join: t1.c1 = t2.c1\
\n SubqueryAlias: t1\
\n TableScan: aggregate_test_100 projection=[c1]\
\n SubqueryAlias: t2\
\n TableScan: aggregate_test_100 projection=[c1]",
\n Sort: t1.c1 ASC NULLS FIRST, fetch=1\
\n Inner Join: t1.c1 = t2.c1\
\n SubqueryAlias: t1\
\n TableScan: aggregate_test_100 projection=[c1]\
\n SubqueryAlias: t2\
\n TableScan: aggregate_test_100 projection=[c1]",
format!("{}", df_with_column.clone().into_optimized_plan()?)
);

Expand Down Expand Up @@ -3240,13 +3239,12 @@ mod tests {

assert_eq!("\
Projection: t1.c1 AS AAA, t1.c2, t1.c3, t2.c1, t2.c2, t2.c3\
\n Limit: skip=0, fetch=1\
\n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
\n Inner Join: t1.c1 = t2.c1\
\n SubqueryAlias: t1\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]\
\n SubqueryAlias: t2\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
\n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
\n Inner Join: t1.c1 = t2.c1\
\n SubqueryAlias: t1\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]\
\n SubqueryAlias: t2\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
format!("{}", df_renamed.clone().into_optimized_plan()?)
);

Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ impl ExecutionPlan for ArrowExec {
Ok(self.projected_statistics.clone())
}

fn fetch(&self) -> Option<usize> {
self.base_config.limit
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
let new_config = self.base_config.clone().with_limit(limit);

Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ impl ExecutionPlan for AvroExec {
Some(self.metrics.clone_inner())
}

fn fetch(&self) -> Option<usize> {
self.base_config.limit
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
let new_config = self.base_config.clone().with_limit(limit);

Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,10 @@ impl ExecutionPlan for CsvExec {
Some(self.metrics.clone_inner())
}

fn fetch(&self) -> Option<usize> {
self.base_config.limit
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
let new_config = self.base_config.clone().with_limit(limit);

Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ impl ExecutionPlan for NdJsonExec {
Some(self.metrics.clone_inner())
}

fn fetch(&self) -> Option<usize> {
self.base_config.limit
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
let new_config = self.base_config.clone().with_limit(limit);

Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,10 @@ impl ExecutionPlan for ParquetExec {
Ok(self.projected_statistics.clone())
}

fn fetch(&self) -> Option<usize> {
self.base_config.limit
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
let new_config = self.base_config.clone().with_limit(limit);

Expand Down
15 changes: 12 additions & 3 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan, InputOrderMode};
use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::{Partitioning, PhysicalSortExpr, PhysicalSortRequirement};
use datafusion_physical_plan::limit::LocalLimitExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::partial_sort::PartialSortExec;
use datafusion_physical_plan::ExecutionPlanProperties;
Expand Down Expand Up @@ -405,7 +405,16 @@ fn analyze_immediate_sort_removal(
node.children = node.children.swap_remove(0).children;
if let Some(fetch) = sort_exec.fetch() {
// If the sort has a fetch, we need to add a limit:
Arc::new(LocalLimitExec::new(sort_input.clone(), fetch))
if sort_exec
.properties()
.output_partitioning()
.partition_count()
== 1
{
Arc::new(GlobalLimitExec::new(sort_input.clone(), 0, Some(fetch)))
} else {
Arc::new(LocalLimitExec::new(sort_input.clone(), fetch))
}
} else {
sort_input.clone()
}
Expand Down Expand Up @@ -1124,7 +1133,7 @@ mod tests {
" MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = [
"LocalLimitExec: fetch=2",
"GlobalLimitExec: skip=0, fetch=2",
" SortExec: expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
Expand Down
3 changes: 0 additions & 3 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2177,9 +2177,6 @@ mod tests {
assert!(format!("{plan:?}").contains("GlobalLimitExec"));
assert!(format!("{plan:?}").contains("skip: 3, fetch: Some(5)"));

// LocalLimitExec adjusts the `fetch`
assert!(format!("{plan:?}").contains("LocalLimitExec"));
assert!(format!("{plan:?}").contains("fetch: 8"));
Ok(())
}

Expand Down
21 changes: 9 additions & 12 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,18 +238,15 @@ async fn sort_preserving_merge() {
// SortPreservingMergeExec (not a Sort which would compete
// with the SortPreservingMergeExec for memory)
&[
"+---------------+---------------------------------------------------------------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+---------------------------------------------------------------------------------------------------------------+",
"| logical_plan | Limit: skip=0, fetch=10 |",
"| | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |",
"| | TableScan: t projection=[a, b] |",
"| physical_plan | GlobalLimitExec: skip=0, fetch=10 |",
"| | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
"| | LocalLimitExec: fetch=10 |",
"| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
"| | |",
"+---------------+---------------------------------------------------------------------------------------------------------------+",
"+---------------+-----------------------------------------------------------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+-----------------------------------------------------------------------------------------------------------+",
"| logical_plan | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |",
"| | TableScan: t projection=[a, b] |",
"| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
"| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
"| | |",
"+---------------+-----------------------------------------------------------------------------------------------------------+",
]
)
.run()
Expand Down
80 changes: 72 additions & 8 deletions datafusion/core/tests/physical_optimizer/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::physical_optimizer::limit_pushdown::LimitPushdown;
use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
use datafusion_common::config::ConfigOptions;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::expressions::{col, lit};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_physical_optimizer::limit_pushdown::LimitPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
Expand All @@ -31,8 +32,10 @@ use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{get_plan_string, ExecutionPlan};
use datafusion_physical_plan::{get_plan_string, ExecutionPlan, ExecutionPlanProperties};
use std::sync::Arc;

struct DummyStreamPartition {
Expand Down Expand Up @@ -201,6 +204,52 @@ fn pushes_global_limit_exec_through_projection_exec_and_transforms_coalesce_batc
Ok(())
}

#[test]
fn pushes_global_limit_into_multiple_fetch_plans() -> datafusion_common::Result<()> {
let schema = create_schema();
let streaming_table = streaming_table_exec(schema.clone()).unwrap();
let coalesce_batches = coalesce_batches_exec(streaming_table);
let projection = projection_exec(schema.clone(), coalesce_batches)?;
let repartition = repartition_exec(projection)?;
let sort = sort_exec(
vec![PhysicalSortExpr {
expr: col("c1", &schema)?,
options: SortOptions::default(),
}],
repartition,
);
let spm = sort_preserving_merge_exec(sort.output_ordering().unwrap().to_vec(), sort);
let global_limit = global_limit_exec(spm, 0, Some(5));

let initial = get_plan_string(&global_limit);
let expected_initial = [
"GlobalLimitExec: skip=0, fetch=5",
" SortPreservingMergeExec: [c1@0 ASC]",
" SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
" CoalesceBatchesExec: target_batch_size=8192",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];

assert_eq!(initial, expected_initial);

let after_optimize =
LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;

let expected = [
"SortPreservingMergeExec: [c1@0 ASC], fetch=5",
" SortExec: TopK(fetch=5), expr=[c1@0 ASC], preserve_partitioning=[false]",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
" CoalesceBatchesExec: target_batch_size=8192",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
assert_eq!(get_plan_string(&after_optimize), expected);

Ok(())
}

#[test]
fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions(
) -> datafusion_common::Result<()> {
Expand All @@ -227,10 +276,9 @@ fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions(
let expected = [
"GlobalLimitExec: skip=0, fetch=5",
" CoalescePartitionsExec",
" LocalLimitExec: fetch=5",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
assert_eq!(get_plan_string(&after_optimize), expected);

Expand All @@ -256,7 +304,7 @@ fn merges_local_limit_with_local_limit() -> datafusion_common::Result<()> {
let after_optimize =
LimitPushdown::new().optimize(parent_local_limit, &ConfigOptions::new())?;

let expected = ["LocalLimitExec: fetch=10", " EmptyExec"];
let expected = ["GlobalLimitExec: skip=0, fetch=10", " EmptyExec"];
assert_eq!(get_plan_string(&after_optimize), expected);

Ok(())
Expand Down Expand Up @@ -375,6 +423,22 @@ fn local_limit_exec(
Arc::new(LocalLimitExec::new(input, fetch))
}

fn sort_exec(
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(SortExec::new(sort_exprs, input))
}

fn sort_preserving_merge_exec(
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(SortPreservingMergeExec::new(sort_exprs, input))
}

fn projection_exec(
schema: SchemaRef,
input: Arc<dyn ExecutionPlan>,
Expand Down
28 changes: 11 additions & 17 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,6 @@ async fn explain_analyze_baseline_metrics() {
assert_metrics!(
&formatted,
"GlobalLimitExec: skip=0, fetch=3, ",
"metrics=[output_rows=1, elapsed_compute="
);
assert_metrics!(
&formatted,
"LocalLimitExec: fetch=3",
"metrics=[output_rows=3, elapsed_compute="
);
assert_metrics!(
Expand Down Expand Up @@ -612,18 +607,17 @@ async fn test_physical_plan_display_indent() {
let dataframe = ctx.sql(sql).await.unwrap();
let physical_plan = dataframe.create_physical_plan().await.unwrap();
let expected = vec![
"GlobalLimitExec: skip=0, fetch=10",
" SortPreservingMergeExec: [the_min@2 DESC], fetch=10",
" SortExec: TopK(fetch=10), expr=[the_min@2 DESC], preserve_partitioning=[true]",
" ProjectionExec: expr=[c1@0 as c1, max(aggregate_test_100.c12)@1 as max(aggregate_test_100.c12), min(aggregate_test_100.c12)@2 as the_min]",
" AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000",
" AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c12@1 < 10",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], has_header=true",
"SortPreservingMergeExec: [the_min@2 DESC], fetch=10",
" SortExec: TopK(fetch=10), expr=[the_min@2 DESC], preserve_partitioning=[true]",
" ProjectionExec: expr=[c1@0 as c1, max(aggregate_test_100.c12)@1 as max(aggregate_test_100.c12), min(aggregate_test_100.c12)@2 as the_min]",
" AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000",
" AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c12@1 < 10",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], has_header=true",
];

let normalizer = ExplainNormalizer::new();
Expand Down
Loading

0 comments on commit ca93ea3

Please sign in to comment.