From ae9a446c41dfa515eb454561c160ad9aa26a7117 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 26 Oct 2023 10:33:09 -0400 Subject: [PATCH] Minor: reduce use of cfg(parquet) in tests (#7930) --- .../enforce_distribution.rs | 56 +------------------ 1 file changed, 3 insertions(+), 53 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 072c3cb6d7a6..7b91dce32aa9 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1634,6 +1634,8 @@ impl TreeNode for PlanWithKeyRequirements { } } +/// Since almost all of these tests explicitly use `ParquetExec` they only run with the parquet feature flag on +#[cfg(feature = "parquet")] #[cfg(test)] mod tests { use std::ops::Deref; @@ -1643,7 +1645,6 @@ mod tests { use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::physical_plan::FileScanConfig; - #[cfg(feature = "parquet")] use crate::datasource::physical_plan::ParquetExec; use crate::physical_optimizer::enforce_sorting::EnforceSorting; use crate::physical_optimizer::output_requirements::OutputRequirements; @@ -1783,12 +1784,10 @@ mod tests { ])) } - #[cfg(feature = "parquet")] fn parquet_exec() -> Arc { parquet_exec_with_sort(vec![]) } - #[cfg(feature = "parquet")] fn parquet_exec_with_sort( output_ordering: Vec>, ) -> Arc { @@ -1809,13 +1808,11 @@ mod tests { )) } - #[cfg(feature = "parquet")] fn parquet_exec_multiple() -> Arc { parquet_exec_multiple_sorted(vec![]) } // Created a sorted parquet exec with multiple files - #[cfg(feature = "parquet")] fn parquet_exec_multiple_sorted( output_ordering: Vec>, ) -> Arc { @@ -2170,7 +2167,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn multi_hash_joins() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -2333,7 +2329,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn multi_joins_after_alias() -> Result<()> { let left = parquet_exec(); let right = parquet_exec(); @@ -2413,7 +2408,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn multi_joins_after_multi_alias() -> Result<()> { let left = parquet_exec(); let right = parquet_exec(); @@ -2469,7 +2463,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn join_after_agg_alias() -> Result<()> { // group by (a as a1) let left = aggregate_exec_with_alias( @@ -2509,7 +2502,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn hash_join_key_ordering() -> Result<()> { // group by (a as a1, b as b1) let left = aggregate_exec_with_alias( @@ -2562,7 +2554,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn multi_hash_join_key_ordering() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -2679,7 +2670,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn reorder_join_keys_to_left_input() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -2810,7 +2800,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn reorder_join_keys_to_right_input() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -2936,7 +2925,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn multi_smj_joins() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -3210,7 +3198,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn smj_join_key_ordering() -> Result<()> { // group by (a as a1, b as b1) let left = aggregate_exec_with_alias( @@ -3306,7 +3293,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn merge_does_not_need_sort() -> Result<()> { // see https://github.com/apache/arrow-datafusion/issues/4331 let schema = schema(); @@ -3347,7 +3333,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn union_to_interleave() -> Result<()> { // group by (a as a1) let left = aggregate_exec_with_alias( @@ -3389,7 +3374,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn added_repartition_to_single_partition() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan = aggregate_exec_with_alias(parquet_exec(), alias); @@ -3408,7 +3392,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_deepest_node() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan = aggregate_exec_with_alias(filter_exec(parquet_exec()), alias); @@ -3428,7 +3411,7 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] + fn repartition_unsorted_limit() -> Result<()> { let plan = limit_exec(filter_exec(parquet_exec())); @@ -3448,7 +3431,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_sorted_limit() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3471,7 +3453,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_sorted_limit_with_filter() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3497,7 +3478,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_ignores_limit() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan = aggregate_exec_with_alias( @@ -3528,7 +3508,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_ignores_union() -> Result<()> { let plan = union_exec(vec![parquet_exec(); 5]); @@ -3548,7 +3527,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_through_sort_preserving_merge() -> Result<()> { // sort preserving merge with non-sorted input let schema = schema(); @@ -3571,7 +3549,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_ignores_sort_preserving_merge() -> Result<()> { // sort preserving merge already sorted input, let schema = schema(); @@ -3603,7 +3580,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { // 2 sorted parquet files unioned (partitions are concatenated, sort is preserved) let schema = schema(); @@ -3636,7 +3612,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_does_not_destroy_sort() -> Result<()> { // SortRequired // Parquet(sorted) @@ -3662,7 +3637,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_does_not_destroy_sort_more_complex() -> Result<()> { // model a more complicated scenario where one child of a union can be repartitioned for performance // but the other can not be @@ -3701,7 +3675,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_transitively_with_projection() -> Result<()> { let schema = schema(); let proj_exprs = vec![( @@ -3744,7 +3717,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_ignores_transitively_with_projection() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3775,7 +3747,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_transitively_past_sort_with_projection() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3805,7 +3776,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn repartition_transitively_past_sort_with_filter() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3880,7 +3850,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_single_partition() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan_parquet = aggregate_exec_with_alias(parquet_exec(), alias.clone()); @@ -3969,7 +3938,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_two_partitions() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan_parquet = @@ -3997,7 +3965,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_two_partitions_into_four() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan_parquet = @@ -4025,7 +3992,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_sorted_limit() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4058,7 +4024,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_limit_with_filter() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4104,7 +4069,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_ignores_limit() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan_parquet = aggregate_exec_with_alias( @@ -4155,7 +4119,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_union_inputs() -> Result<()> { let plan_parquet = union_exec(vec![parquet_exec(); 5]); let plan_csv = union_exec(vec![csv_exec(); 5]); @@ -4185,7 +4148,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_prior_to_sort_preserving_merge() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4216,7 +4178,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_sort_preserving_merge_with_union() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4251,7 +4212,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_does_not_benefit() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4280,7 +4240,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn parallelization_ignores_transitively_with_projection_parquet() -> Result<()> { // sorted input let schema = schema(); @@ -4361,7 +4320,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn remove_redundant_roundrobins() -> Result<()> { let input = parquet_exec(); let repartition = repartition_exec(repartition_exec(input)); @@ -4412,7 +4370,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn do_not_preserve_ordering_through_repartition() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4445,7 +4402,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn do_not_preserve_ordering_through_repartition2() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4484,7 +4440,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn do_not_preserve_ordering_through_repartition3() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4507,7 +4462,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn do_not_put_sort_when_input_is_invalid() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4546,7 +4500,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn put_sort_when_input_is_valid() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4589,7 +4542,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn do_not_add_unnecessary_hash() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4645,7 +4597,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn optimize_away_unnecessary_repartition() -> Result<()> { let physical_plan = coalesce_partitions_exec(repartition_exec(parquet_exec())); let expected = &[ @@ -4665,7 +4616,6 @@ mod tests { } #[test] - #[cfg(feature = "parquet")] fn optimize_away_unnecessary_repartition2() -> Result<()> { let physical_plan = filter_exec(repartition_exec(coalesce_partitions_exec( filter_exec(repartition_exec(parquet_exec())),