diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 064624366938..6ef214c97fd5 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1615,10 +1615,12 @@ mod tests { #[tokio::test] async fn test_insert_into_append_new_json_files() -> Result<()> { + let mut config_map: HashMap<String, String> = HashMap::new(); + config_map.insert("datafusion.execution.batch_size".into(), "1".into()); helper_test_append_new_files_to_table( FileType::JSON, FileCompressionType::UNCOMPRESSED, - None, + Some(config_map), ) .await?; Ok(()) @@ -1637,10 +1639,12 @@ mod tests { #[tokio::test] async fn test_insert_into_append_new_csv_files() -> Result<()> { + let mut config_map: HashMap<String, String> = HashMap::new(); + config_map.insert("datafusion.execution.batch_size".into(), "1".into()); helper_test_append_new_files_to_table( FileType::CSV, FileCompressionType::UNCOMPRESSED, - None, + Some(config_map), ) .await?; Ok(()) @@ -1648,10 +1652,12 @@ mod tests { #[tokio::test] async fn test_insert_into_append_new_parquet_files_defaults() -> Result<()> { + let mut config_map: HashMap<String, String> = HashMap::new(); + config_map.insert("datafusion.execution.batch_size".into(), "1".into()); helper_test_append_new_files_to_table( FileType::PARQUET, FileCompressionType::UNCOMPRESSED, - None, + Some(config_map), ) .await?; Ok(()) @@ -1838,6 +1844,7 @@ mod tests { "datafusion.execution.parquet.write_batch_size".into(), "5".into(), ); + config_map.insert("datafusion.execution.batch_size".into(), "1".into()); helper_test_append_new_files_to_table( FileType::PARQUET, FileCompressionType::UNCOMPRESSED, diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 9be566f10a72..52525d1fc44d 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1021,6 +1021,7 @@ fn add_hash_on_top( // until Repartition(Hash). dist_onward: &mut Option<ExecTree>, input_idx: usize, + repartition_beneficial_stats: bool, ) -> Result<Arc<dyn ExecutionPlan>> { if n_target == input.output_partitioning().partition_count() && n_target == 1 { // In this case adding a hash repartition is unnecessary as the hash @@ -1044,9 +1045,13 @@ fn add_hash_on_top( // - Usage of order preserving variants is not desirable (per the flag // `config.optimizer.bounded_order_preserving_variants`). let should_preserve_ordering = input.output_ordering().is_some(); - // Since hashing benefits from partitioning, add a round-robin repartition - // before it: - let mut new_plan = add_roundrobin_on_top(input, n_target, dist_onward, 0)?; + let mut new_plan = if repartition_beneficial_stats { + // Since hashing benefits from partitioning, add a round-robin repartition + // before it: + add_roundrobin_on_top(input, n_target, dist_onward, 0)? + } else { + input + }; new_plan = Arc::new( RepartitionExec::try_new(new_plan, Partitioning::Hash(hash_exprs, n_target))?
.with_preserve_order(should_preserve_ordering), @@ -1223,6 +1228,7 @@ fn ensure_distribution( let enable_round_robin = config.optimizer.enable_round_robin_repartition; let repartition_file_scans = config.optimizer.repartition_file_scans; let repartition_file_min_size = config.optimizer.repartition_file_min_size; + let batch_size = config.execution.batch_size; let is_unbounded = unbounded_output(&dist_context.plan); // Use order preserving variants either of the conditions true // - it is desired according to config @@ -1233,13 +1239,7 @@ fn ensure_distribution( if dist_context.plan.children().is_empty() { return Ok(Transformed::No(dist_context)); } - // Don't need to apply when the returned row count is not greater than 1: - let stats = dist_context.plan.statistics(); - let mut repartition_beneficial_stat = true; - if stats.is_exact { - repartition_beneficial_stat = - stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true); - } + // Remove unnecessary repartition from the physical plan if any let DistributionContext { mut plan, @@ -1263,7 +1263,6 @@ fn ensure_distribution( plan = updated_window; } }; - let n_children = plan.children().len(); // This loop iterates over all the children to: // - Increase parallelism for every child if it is beneficial. @@ -1289,9 +1288,19 @@ fn ensure_distribution( maintains, child_idx, )| { + // No need to repartition when the exact row count is not greater than the batch size: + let stats = child.statistics(); + let repartition_beneficial_stats = if stats.is_exact { + stats + .num_rows + .map(|num_rows| num_rows > batch_size) + .unwrap_or(true) + } else { + true + }; if enable_round_robin // Operator benefits from partitioning (e.g. filter): - && (would_benefit && repartition_beneficial_stat) + && (would_benefit && repartition_beneficial_stats) // Unless partitioning doesn't increase the partition count, it is not beneficial: && child.output_partitioning().partition_count() < target_partitions { @@ -1340,6 +1349,7 @@ fn ensure_distribution( target_partitions, dist_onward, child_idx, + repartition_beneficial_stats, )?; } Distribution::UnspecifiedDistribution => {} diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs index 0142675bbd49..6e3f6319e119 100644 --- a/datafusion/core/tests/sql/order.rs +++ b/datafusion/core/tests/sql/order.rs @@ -209,16 +209,16 @@ ORDER BY 1, 2; " AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2), input_partitions=2", - " AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", + " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", + " AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]", " ProjectionExec: expr=[column1@0 as t]", " ValuesExec", " ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]", " AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2), input_partitions=2", - " AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", + " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", + " AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]", " ProjectionExec: expr=[column1@0 as t]",
" ValuesExec", ]; diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index c33f49049de1..3c9dabf60590 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1021,12 +1021,21 @@ impl ExecutionPlan for AggregateExec { ..Default::default() } } - _ => Statistics { - // the output row count is surely not larger than its input row count - num_rows: self.input.statistics().num_rows, - is_exact: false, - ..Default::default() - }, + _ => { + let input_stats = self.input.statistics(); + // Input statistics is exact and number of rows not greater than 1: + let is_exact = input_stats.is_exact + && (input_stats + .num_rows + .map(|num_rows| num_rows == 1) + .unwrap_or(false)); + Statistics { + // the output row count is surely not larger than its input row count + num_rows: self.input.statistics().num_rows, + is_exact, + ..Default::default() + } + } } } } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index c6f9f42fa2d8..777b634e93b1 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -2342,8 +2342,8 @@ GlobalLimitExec: skip=0, fetch=4 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 -------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] ---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] ----------------MemoryExec: partitions=1, partition_sizes=[1] @@ -2397,8 +2397,8 @@ GlobalLimitExec: skip=0, fetch=4 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 -------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4] ---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4] ----------------MemoryExec: partitions=1, partition_sizes=[1] query TT @@ -2416,8 +2416,8 @@ GlobalLimitExec: skip=0, fetch=4 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 -------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)] ---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)] ----------------MemoryExec: partitions=1, partition_sizes=[1] query TT @@ -2435,8 +2435,8 @@ GlobalLimitExec: skip=0, fetch=4 ------AggregateExec: mode=FinalPartitioned, 
gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 -------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] ---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] ----------------MemoryExec: partitions=1, partition_sizes=[1] query TT @@ -2454,8 +2454,8 @@ GlobalLimitExec: skip=0, fetch=4 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 -------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] ---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] ----------------MemoryExec: partitions=1, partition_sizes=[1] query TI diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index a41d1fca66a4..f2fe216ee864 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -33,7 +33,7 @@ CopyTo: format=parquet output_url=test_files/scratch/copy/table single_file_outp --TableScan: source_table projection=[col1, col2] physical_plan InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) ---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +--MemoryExec: partitions=1, partition_sizes=[1] # Error case query error DataFusion error: Invalid or Unsupported Configuration: Format not explicitly set and unable to get file extension! 
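Note: every sqllogictest plan change in this diff follows from the new gating logic in `ensure_distribution` above: a child is round-robin repartitioned only when its statistics are inexact or report more rows than a single batch holds. A minimal, self-contained sketch of that predicate (the `Stats` struct here is a simplified stand-in for DataFusion's `Statistics`, not the actual API):

struct Stats {
    is_exact: bool,
    num_rows: Option<usize>,
}

// Mirrors the `repartition_beneficial_stats` computation added in
// enforce_distribution.rs above.
fn repartition_beneficial(stats: &Stats, batch_size: usize) -> bool {
    if stats.is_exact {
        // An exactly-known row count that fits in one batch gains nothing
        // from extra partitions; an unknown count still defaults to `true`.
        stats.num_rows.map(|num_rows| num_rows > batch_size).unwrap_or(true)
    } else {
        // Inexact statistics: assume repartitioning may help.
        true
    }
}

fn main() {
    // A 3-row MemoryExec with the default batch size of 8192 is no longer
    // repartitioned, which is why the tests below lower the batch size.
    assert!(!repartition_beneficial(&Stats { is_exact: true, num_rows: Some(3) }, 8192));
    assert!(repartition_beneficial(&Stats { is_exact: true, num_rows: Some(3) }, 2));
}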
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index b7070a8d7efb..bf93c6633bfc 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -2024,13 +2024,11 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST] ----------------CoalesceBatchesExec: target_batch_size=8192 ------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)] --------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=4 -------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------------------------MemoryExec: partitions=1, partition_sizes=[3] +----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 +------------------------MemoryExec: partitions=1, partition_sizes=[3] --------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=4 -------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------------------------MemoryExec: partitions=1, partition_sizes=[3] +----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 +------------------------MemoryExec: partitions=1, partition_sizes=[3] # Columns in the table are a,b,c,d. Source is CsvExec which is ordered by # a,b,c column. Column a has cardinality 2, column b has cardinality 4. @@ -2762,9 +2760,9 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST] ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 -------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] ---------------SortExec: expr=[ts@1 ASC NULLS LAST] -----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +----------------SortExec: expr=[ts@1 ASC NULLS LAST] ------------------MemoryExec: partitions=1, partition_sizes=[1] query TRR @@ -2799,9 +2797,9 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST] ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 -------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] ---------------SortExec: expr=[ts@1 ASC NULLS LAST] -----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] +----------------SortExec: expr=[ts@1 ASC NULLS LAST] ------------------MemoryExec: partitions=1, 
partition_sizes=[1] query TRR @@ -2815,6 +2813,11 @@ FRA 50 50 GRC 30 30 TUR 75 75 +# Make sure that the batch size is small so that the query below runs in multiple partitions. +# The row count of sales_global is 5, hence we choose 4 as the batch size to keep it smaller. +statement ok +set datafusion.execution.batch_size = 4; + # order-sensitive FIRST_VALUE and LAST_VALUE aggregators should work in # multi-partitions without group by also. query TT @@ -2958,7 +2961,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST] --SortExec: expr=[country@0 ASC NULLS LAST] ----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as array_agg1] ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)] ---------CoalesceBatchesExec: target_batch_size=8192 +--------CoalesceBatchesExec: target_batch_size=4 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 ------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)] --------------SortExec: expr=[amount@1 ASC NULLS LAST] @@ -2994,7 +2997,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST] --SortExec: expr=[country@0 ASC NULLS LAST] ----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] ---------CoalesceBatchesExec: target_batch_size=8192 +--------CoalesceBatchesExec: target_batch_size=4 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 ------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] --------------SortExec: expr=[amount@1 DESC] @@ -3195,7 +3198,7 @@ SortPreservingMergeExec: [sn@0 ASC NULLS LAST] --SortExec: expr=[sn@0 ASC NULLS LAST] ----ProjectionExec: expr=[sn@0 as sn, amount@1 as amount, 2 * CAST(sn@0 AS Int64) as Int64(2) * s.sn] ------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, amount@1 as amount], aggr=[] ---------CoalesceBatchesExec: target_batch_size=8192 +--------CoalesceBatchesExec: target_batch_size=4 ----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), input_partitions=8 ------------AggregateExec: mode=Partial, gby=[sn@0 as sn, amount@1 as amount], aggr=[] --------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] @@ -3248,7 +3251,7 @@ SortPreservingMergeExec: [sn@0 ASC NULLS LAST] --SortExec: expr=[sn@0 ASC NULLS LAST] ----ProjectionExec: expr=[sn@0 as sn, SUM(l.amount)@2 as SUM(l.amount), amount@1 as amount] ------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, amount@1 as amount], aggr=[SUM(l.amount)] ---------CoalesceBatchesExec: target_batch_size=8192 +--------CoalesceBatchesExec: target_batch_size=4 ----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), input_partitions=8 ------------AggregateExec: mode=Partial, gby=[sn@1 as sn, amount@2 as amount], aggr=[SUM(l.amount)] --------------ProjectionExec: expr=[amount@1 as amount, sn@2 as sn, amount@3 as amount] @@ -3396,7 +3399,7 @@ SortPreservingMergeExec:
[sn@2 ASC NULLS LAST] --SortExec: expr=[sn@2 ASC NULLS LAST] ----ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum_amount@6 as sum_amount] ------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, zip_code@1 as zip_code, country@2 as country, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum_amount@6 as sum_amount], aggr=[] ---------CoalesceBatchesExec: target_batch_size=8192 +--------CoalesceBatchesExec: target_batch_size=4 ----------RepartitionExec: partitioning=Hash([sn@0, zip_code@1, country@2, ts@3, currency@4, amount@5, sum_amount@6], 8), input_partitions=8 ------------AggregateExec: mode=Partial, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum_amount@6 as sum_amount], aggr=[] --------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 @@ -3566,6 +3569,12 @@ ORDER BY y; 2 1 3 1 +# Make sure to choose a batch size smaller than the row count of the table. +# In this case we choose 2 (the row count of the table is 3); +# otherwise we won't see parallelism in tests. +statement ok +set datafusion.execution.batch_size = 2; + # plan of the query above should contain partial # and final aggregation stages query TT @@ -3579,7 +3588,8 @@ physical_plan AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(foo.x)] --CoalescePartitionsExec ----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(foo.x)] -------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] +------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +--------MemoryExec: partitions=1, partition_sizes=[1] query I SELECT FIRST_VALUE(x) @@ -3600,7 +3610,8 @@ physical_plan AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(foo.x)] --CoalescePartitionsExec ----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(foo.x)] -------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] +------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +--------MemoryExec: partitions=1, partition_sizes=[1] query TT @@ -3644,7 +3655,7 @@ Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_wi physical_plan AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallyOrdered --SortExec: expr=[c@0 ASC NULLS LAST] -----CoalesceBatchesExec: target_batch_size=8192 +----CoalesceBatchesExec: target_batch_size=2 ------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8 --------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallyOrdered ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 @@ -3685,7 +3696,7 @@ Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_wi physical_plan AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallyOrdered --SortExec: expr=[c@0 ASC NULLS LAST] -----CoalesceBatchesExec: target_batch_size=8192 +----CoalesceBatchesExec: target_batch_size=2 ------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8 --------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallyOrdered ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 diff --git
a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt index 283ff57a984c..874d849e9a29 100644 --- a/datafusion/sqllogictest/test_files/join.slt +++ b/datafusion/sqllogictest/test_files/join.slt @@ -558,6 +558,10 @@ explain select * from t1 join t2 on false; logical_plan EmptyRelation physical_plan EmptyExec: produce_one_row=false +# Make the batch size smaller than the table row count to introduce parallelism into the plan. +statement ok +set datafusion.execution.batch_size = 1; + # test convert inner join to cross join when condition is true query TT explain select * from t1 inner join t2 on true; logical_plan CrossJoin: --TableScan: t1 projection=[t1_id, t1_name, t1_int] --TableScan: t2 projection=[t2_id, t2_name, t2_int] physical_plan CrossJoinExec ---CoalescePartitionsExec -----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] ---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +--MemoryExec: partitions=1, partition_sizes=[1] +--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----MemoryExec: partitions=1, partition_sizes=[1] statement ok drop table IF EXISTS t1; diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 01c0131fdb62..cc90e6431389 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -24,7 +24,7 @@ statement ok set datafusion.execution.target_partitions = 2; statement ok -set datafusion.execution.batch_size = 4096; +set datafusion.execution.batch_size = 2; statement ok set datafusion.explain.logical_plan_only = true; @@ -185,6 +185,10 @@ FROM statement ok set datafusion.execution.target_partitions = 2; +# Make sure to set a batch size smaller than the row count of the table. +statement ok +set datafusion.execution.batch_size = 2; + ########## ## Joins Tests ########## @@ -1311,13 +1315,13 @@ Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[]] physical_plan AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], aggr=[] --ProjectionExec: expr=[t1_id@0 as t1_id] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] @@ -1339,13 +1343,13 @@ physical_plan ProjectionExec: expr=[COUNT(*)@1 as COUNT(*)] --AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], aggr=[COUNT(*)] ----ProjectionExec: expr=[t1_id@0 as t1_id] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)] -----------CoalesceBatchesExec: target_batch_size=4096 +----------CoalesceBatchesExec: target_batch_size=2 ------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 --------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ----------------MemoryExec:
partitions=1, partition_sizes=[1] -----------CoalesceBatchesExec: target_batch_size=4096 +----------CoalesceBatchesExec: target_batch_size=2 ------------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 --------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ----------------MemoryExec: partitions=1, partition_sizes=[1] @@ -1372,13 +1376,13 @@ ProjectionExec: expr=[COUNT(alias1)@0 as COUNT(DISTINCT join_t1.t1_id)] --------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[] ----------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[] ------------ProjectionExec: expr=[t1_id@0 as t1_id] ---------------CoalesceBatchesExec: target_batch_size=4096 +--------------CoalesceBatchesExec: target_batch_size=2 ----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)] -------------------CoalesceBatchesExec: target_batch_size=4096 +------------------CoalesceBatchesExec: target_batch_size=2 --------------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------------------MemoryExec: partitions=1, partition_sizes=[1] -------------------CoalesceBatchesExec: target_batch_size=4096 +------------------CoalesceBatchesExec: target_batch_size=2 --------------------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------------------MemoryExec: partitions=1, partition_sizes=[1] @@ -1441,7 +1445,7 @@ Projection: join_t1.t1_id, join_t1.t1_name, join_t1.t1_int, join_t2.t2_id, join_ physical_plan ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)] --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@4 as t2_id, t2_name@5 as t2_name, t2_int@6 as t2_int] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)] --------CoalescePartitionsExec ----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)] @@ -1468,14 +1472,14 @@ Projection: join_t1.t1_id, join_t1.t1_name, join_t1.t1_int, join_t2.t2_id, join_ physical_plan ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)] --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@4 as t2_id, t2_name@5 as t2_name, t2_int@6 as t2_int] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([join_t1.t1_id + Int64(11)@3], 2), input_partitions=2 ------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)] --------------RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1 ----------------MemoryExec: partitions=1, partition_sizes=[1] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([CAST(join_t2.t2_id AS Int64)@3], 2), input_partitions=2 ------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(join_t2.t2_id AS Int64)] --------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -1501,7 +1505,7 @@ physical_plan ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name] --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id] ----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id + UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)] ----------CoalescePartitionsExec ------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)] @@ -1529,14 +1533,14 @@ physical_plan ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name] --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id] ----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id + UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)] -----------CoalesceBatchesExec: target_batch_size=4096 +----------CoalesceBatchesExec: target_batch_size=2 ------------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1], 2), input_partitions=2 --------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)] ----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------------MemoryExec: partitions=1, partition_sizes=[1] -----------CoalesceBatchesExec: target_batch_size=4096 +----------CoalesceBatchesExec: target_batch_size=2 ------------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2], 2), input_partitions=2 --------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)] ----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -1562,7 +1566,7 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name physical_plan ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name] --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)] --------CoalescePartitionsExec ----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)] @@ -1589,14 +1593,14 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name physical_plan ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as 
t1_name] --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(11)@2], 2), input_partitions=2 ------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)] --------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ----------------MemoryExec: partitions=1, partition_sizes=[1] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] @@ -1622,7 +1626,7 @@ physical_plan ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name] --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id] ----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id - UInt32(11)@1, t1_id@0)] ----------CoalescePartitionsExec ------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)] @@ -1650,14 +1654,14 @@ physical_plan ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name] --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id] ----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id - UInt32(11)@1, t1_id@0)] -----------CoalesceBatchesExec: target_batch_size=4096 +----------CoalesceBatchesExec: target_batch_size=2 ------------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1], 2), input_partitions=2 --------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)] ----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------------MemoryExec: partitions=1, partition_sizes=[1] -----------CoalesceBatchesExec: target_batch_size=4096 +----------CoalesceBatchesExec: target_batch_size=2 ------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 --------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ----------------MemoryExec: partitions=1, partition_sizes=[1] @@ -1680,7 +1684,7 @@ Inner Join: join_t1.t1_id = join_t2.t2_id - UInt32(11) --TableScan: join_t2 projection=[t2_id, t2_name, t2_int] physical_plan ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int] ---CoalesceBatchesExec: target_batch_size=4096 +--CoalesceBatchesExec: target_batch_size=2 ----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, join_t2.t2_id - 
UInt32(11)@3)] ------MemoryExec: partitions=1, partition_sizes=[1] ------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)] @@ -1703,13 +1707,13 @@ Inner Join: join_t1.t1_id = join_t2.t2_id - UInt32(11) --TableScan: join_t2 projection=[t2_id, t2_name, t2_int] physical_plan ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int] ---CoalesceBatchesExec: target_batch_size=4096 +--CoalesceBatchesExec: target_batch_size=2 ----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, join_t2.t2_id - UInt32(11)@3)] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------MemoryExec: partitions=1, partition_sizes=[1] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@3], 2), input_partitions=2 ----------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)] ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -2032,13 +2036,13 @@ Inner Join: Filter: join_t1.t1_id > join_t2.t2_id ------TableScan: join_t2 projection=[t2_id, t2_int] physical_plan NestedLoopJoinExec: join_type=Inner, filter=t1_id@0 > t2_id@1 ---CoalesceBatchesExec: target_batch_size=4096 +--CoalesceBatchesExec: target_batch_size=2 ----FilterExec: t1_id@0 > 10 ------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 --------MemoryExec: partitions=1, partition_sizes=[1] --CoalescePartitionsExec ----ProjectionExec: expr=[t2_id@0 as t2_id] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------FilterExec: t2_int@1 > 1 ----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ------------MemoryExec: partitions=1, partition_sizes=[1] @@ -2073,11 +2077,11 @@ Right Join: Filter: join_t1.t1_id < join_t2.t2_id physical_plan NestedLoopJoinExec: join_type=Right, filter=t1_id@0 < t2_id@1 --CoalescePartitionsExec -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------FilterExec: t1_id@0 > 22 --------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ----------MemoryExec: partitions=1, partition_sizes=[1] ---CoalesceBatchesExec: target_batch_size=4096 +--CoalesceBatchesExec: target_batch_size=2 ----FilterExec: t2_id@0 > 11 ------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 --------MemoryExec: partitions=1, partition_sizes=[1] @@ -2643,7 +2647,7 @@ statement ok set datafusion.execution.target_partitions = 2; statement ok -set datafusion.execution.batch_size = 4096; +set datafusion.execution.batch_size = 2; # explain sort_merge_join_on_date32 inner sort merge join on data type (Date32) query TT @@ -2658,12 +2662,12 @@ Inner Join: t1.c1 = t2.c1 physical_plan SortMergeJoin: join_type=Inner, on=[(c1@0, c1@0)] --SortExec: expr=[c1@0 ASC] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2 --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 
----------MemoryExec: partitions=1, partition_sizes=[1] --SortExec: expr=[c1@0 ASC] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2 --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ----------MemoryExec: partitions=1, partition_sizes=[1] @@ -2689,13 +2693,13 @@ physical_plan ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c1@5 as c1, c2@6 as c2, c3@7 as c3, c4@8 as c4] --SortMergeJoin: join_type=Right, on=[(CAST(t1.c3 AS Decimal128(10, 2))@4, c3@2)] ----SortExec: expr=[CAST(t1.c3 AS Decimal128(10, 2))@4 ASC] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------RepartitionExec: partitioning=Hash([CAST(t1.c3 AS Decimal128(10, 2))@4], 2), input_partitions=2 ----------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 2))] ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] ----SortExec: expr=[c3@2 ASC] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------RepartitionExec: partitioning=Hash([c3@2], 2), input_partitions=2 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------MemoryExec: partitions=1, partition_sizes=[1] @@ -2722,7 +2726,7 @@ statement ok set datafusion.execution.target_partitions = 2; statement ok -set datafusion.execution.batch_size = 4096; +set datafusion.execution.batch_size = 2; @@ -2743,7 +2747,7 @@ statement ok set datafusion.execution.target_partitions = 2; statement ok -set datafusion.execution.batch_size = 4096; +set datafusion.execution.batch_size = 2; query TT explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id @@ -2751,13 +2755,13 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id I physical_plan SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] --SortExec: expr=[t1_id@0 ASC NULLS LAST] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] @@ -2792,13 +2796,13 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOI physical_plan SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] --SortExec: expr=[t1_id@0 ASC NULLS LAST] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: 
target_batch_size=2 ----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] @@ -2827,7 +2831,7 @@ statement ok set datafusion.execution.target_partitions = 2; statement ok -set datafusion.execution.batch_size = 4096; +set datafusion.execution.batch_size = 2; #Test the left_semi_join scenarios where the current repartition_joins parameter is set to false . #### @@ -2846,7 +2850,7 @@ statement ok set datafusion.execution.target_partitions = 2; statement ok -set datafusion.execution.batch_size = 4096; +set datafusion.execution.batch_size = 2; query TT explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id @@ -2854,7 +2858,7 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id I physical_plan SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] --SortExec: expr=[t1_id@0 ASC NULLS LAST] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)] --------MemoryExec: partitions=1, partition_sizes=[1] --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -2890,7 +2894,7 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOI physical_plan SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] --SortExec: expr=[t1_id@0 ASC NULLS LAST] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)] --------MemoryExec: partitions=1, partition_sizes=[1] --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -2920,7 +2924,7 @@ statement ok set datafusion.execution.target_partitions = 2; statement ok -set datafusion.execution.batch_size = 4096; +set datafusion.execution.batch_size = 2; #Test the right_semi_join scenarios where the current repartition_joins parameter is set to true . 
@@ -2940,7 +2944,7 @@ statement ok set datafusion.execution.target_partitions = 2; statement ok -set datafusion.execution.batch_size = 4096; +set datafusion.execution.batch_size = 2; query TT explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id @@ -2948,13 +2952,13 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHER physical_plan SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] --SortExec: expr=[t1_id@0 ASC NULLS LAST] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0 ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] @@ -2970,13 +2974,13 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGH physical_plan SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] --SortExec: expr=[t1_id@0 ASC NULLS LAST] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1 ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] @@ -3002,7 +3006,7 @@ statement ok set datafusion.execution.target_partitions = 2; statement ok -set datafusion.execution.batch_size = 4096; +set datafusion.execution.batch_size = 2; #Test the right_semi_join scenarios where the current repartition_joins parameter is set to false . 
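Note: each of these join test groups lowers `datafusion.execution.batch_size` so that the tiny test tables span more than one batch and the plans keep exercising repartitioning. The same options can also be set programmatically; a sketch assuming the current `SessionConfig`/`SessionContext` API (constructor names vary slightly across DataFusion versions):

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Equivalent to the repeated sqllogictest statements
    // `set datafusion.execution.target_partitions = 2;` and
    // `set datafusion.execution.batch_size = 2;`.
    let config = SessionConfig::new()
        .with_target_partitions(2)
        .with_batch_size(2);
    let ctx = SessionContext::new_with_config(config);

    // SET statements also work through the SQL interface:
    ctx.sql("set datafusion.execution.batch_size = 2").await?;
    Ok(())
}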
@@ -3022,7 +3026,7 @@ statement ok set datafusion.execution.target_partitions = 2; statement ok -set datafusion.execution.batch_size = 4096; +set datafusion.execution.batch_size = 2; query TT explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id @@ -3030,7 +3034,7 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHER physical_plan SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] --SortExec: expr=[t1_id@0 ASC NULLS LAST] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0 --------MemoryExec: partitions=1, partition_sizes=[1] --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -3047,7 +3051,7 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGH physical_plan SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] --SortExec: expr=[t1_id@0 ASC NULLS LAST] -----CoalesceBatchesExec: target_batch_size=4096 +----CoalesceBatchesExec: target_batch_size=2 ------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1 --------MemoryExec: partitions=1, partition_sizes=[1] --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -3074,7 +3078,7 @@ statement ok set datafusion.execution.target_partitions = 2; statement ok -set datafusion.execution.batch_size = 4096; +set datafusion.execution.batch_size = 2; #### @@ -3126,14 +3130,14 @@ physical_plan SortPreservingMergeExec: [rn1@5 ASC NULLS LAST] --SortMergeJoin: join_type=Inner, on=[(a@1, a@1)] ----SortExec: expr=[rn1@5 ASC NULLS LAST] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] --------------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted] ----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true ----SortExec: expr=[a@1 ASC] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true @@ -3162,12 +3166,12 @@ physical_plan SortPreservingMergeExec: [rn1@10 ASC NULLS LAST] --SortMergeJoin: join_type=Right, on=[(a@1, a@1)] 
----SortExec: expr=[a@1 ASC] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true ----SortExec: expr=[rn1@5 ASC NULLS LAST] -------CoalesceBatchesExec: target_batch_size=4096 +------CoalesceBatchesExec: target_batch_size=2 --------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] @@ -3203,14 +3207,14 @@ SortPreservingMergeExec: [a@1 ASC,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,rn1@11 A --SortExec: expr=[a@1 ASC,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,rn1@11 ASC NULLS LAST] ----SortMergeJoin: join_type=Inner, on=[(a@1, a@1)] ------SortExec: expr=[a@1 ASC] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] ----------------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted] ------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true ------SortExec: expr=[a@1 ASC] ---------CoalesceBatchesExec: target_batch_size=4096 +--------CoalesceBatchesExec: target_batch_size=2 ----------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] @@ -3245,7 +3249,7 @@ Sort: r_table.rn1 ASC NULLS LAST --------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] ----------TableScan: annotated_data projection=[a0, a, b, c, d] physical_plan -CoalesceBatchesExec: target_batch_size=4096 +CoalesceBatchesExec: target_batch_size=2 --HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@1, a@1)] ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true ----ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] @@ -3272,7 +3276,7 @@ Sort: r_table.rn1 
ASC NULLS LAST --------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] ----------TableScan: annotated_data projection=[a0, a, b, c, d] physical_plan -CoalesceBatchesExec: target_batch_size=4096 +CoalesceBatchesExec: target_batch_size=2 --HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(a@0, a@1)] ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a], output_ordering=[a@0 ASC], has_header=true ----ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] diff --git a/datafusion/sqllogictest/test_files/options.slt b/datafusion/sqllogictest/test_files/options.slt index 5fbb2102f4bf..83fe85745ef8 100644 --- a/datafusion/sqllogictest/test_files/options.slt +++ b/datafusion/sqllogictest/test_files/options.slt @@ -33,7 +33,7 @@ Filter: a.c0 < Int32(1) physical_plan CoalesceBatchesExec: target_batch_size=8192 --FilterExec: c0@0 < 1 -----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +----MemoryExec: partitions=1, partition_sizes=[1] ## # test_disable_coalesce @@ -51,7 +51,7 @@ Filter: a.c0 < Int32(1) --TableScan: a projection=[c0] physical_plan FilterExec: c0@0 < 1 ---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +--MemoryExec: partitions=1, partition_sizes=[1] statement ok set datafusion.execution.coalesce_batches = true @@ -74,7 +74,7 @@ Filter: a.c0 < Int32(1) physical_plan CoalesceBatchesExec: target_batch_size=1234 --FilterExec: c0@0 < 1 -----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +----MemoryExec: partitions=1, partition_sizes=[1] statement ok diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index b09910735809..1d427479763a 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -485,8 +485,7 @@ Projection: select_between_data.c1 >= Int64(2) AND select_between_data.c1 <= Int --TableScan: select_between_data projection=[c1] physical_plan ProjectionExec: expr=[c1@0 >= 2 AND c1@0 <= 3 as select_between_data.c1 BETWEEN Int64(2) AND Int64(3)] ---RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -----MemoryExec: partitions=1, partition_sizes=[1] +--MemoryExec: partitions=1, partition_sizes=[1] # TODO: query_get_indexed_field diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 2eccb60aad3e..7cbb848f3333 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -15,6 +15,10 @@ # specific language governing permissions and limitations # under the License. +# Make sure to set a batch size smaller than the row count of the table.
+statement ok
+set datafusion.execution.batch_size = 2;
+
#############
## Subquery Tests
#############
@@ -178,15 +182,15 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int), t2_id@1 as t2_id]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
--------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id]
----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
-------------CoalesceBatchesExec: target_batch_size=8192
+------------CoalesceBatchesExec: target_batch_size=2
--------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]

@@ -213,15 +217,15 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int * Float64(1)) + Int64(1) AS t2
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int * Float64(1)) + Int64(1)@1 as t2_sum]
--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int * Float64(1)) + Int64(1)@0 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@1 as t2_id]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
--------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
-------------CoalesceBatchesExec: target_batch_size=8192
+------------CoalesceBatchesExec: target_batch_size=2
--------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]

@@ -247,16 +251,16 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum
----------TableScan: t2 projection=[t2_id, t2_int]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
---CoalesceBatchesExec: target_batch_size=8192
+--CoalesceBatchesExec: target_batch_size=2
----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
-------CoalesceBatchesExec: target_batch_size=8192
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------CoalesceBatchesExec: target_batch_size=8192
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([t2_id@1], 4), input_partitions=4
----------ProjectionExec: expr=[SUM(t2.t2_int)@2 as SUM(t2.t2_int), t2_id@0 as t2_id]
------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id, Utf8("a")@1 as Utf8("a")], aggr=[SUM(t2.t2_int)]
---------------CoalesceBatchesExec: target_batch_size=8192
+--------------CoalesceBatchesExec: target_batch_size=2
----------------RepartitionExec: partitioning=Hash([t2_id@0, Utf8("a")@1], 4), input_partitions=4
------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id, a as Utf8("a")], aggr=[SUM(t2.t2_int)]
--------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]

@@ -285,17 +289,17 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int), t2_id@1 as t2_id]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
--------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id]
-----------CoalesceBatchesExec: target_batch_size=8192
+----------CoalesceBatchesExec: target_batch_size=2
------------FilterExec: SUM(t2.t2_int)@1 < 3
--------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
-----------------CoalesceBatchesExec: target_batch_size=8192
+----------------CoalesceBatchesExec: target_batch_size=2
------------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
--------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
----------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
diff --git a/datafusion/sqllogictest/test_files/tpch/q15.slt.part b/datafusion/sqllogictest/test_files/tpch/q15.slt.part
index 4515b8ae1fb4..a872e96acf04 100644
--- a/datafusion/sqllogictest/test_files/tpch/q15.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q15.slt.part
@@ -95,20 +95,19 @@ SortPreservingMergeExec: [s_suppkey@0 ASC NULLS LAST]
----------------------------------FilterExec: l_shipdate@3 >= 9496 AND l_shipdate@3 < 9587
------------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate], has_header=false
----------CoalesceBatchesExec: target_batch_size=8192
-------------RepartitionExec: partitioning=Hash([MAX(revenue0.total_revenue)@0], 4), input_partitions=4
---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-----------------AggregateExec: mode=Final, gby=[], aggr=[MAX(revenue0.total_revenue)]
-------------------CoalescePartitionsExec
---------------------AggregateExec: mode=Partial, gby=[], aggr=[MAX(revenue0.total_revenue)]
-----------------------ProjectionExec: expr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as total_revenue]
-------------------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
---------------------------CoalesceBatchesExec: target_batch_size=8192
-----------------------------RepartitionExec: partitioning=Hash([l_suppkey@0], 4), input_partitions=4
-------------------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
---------------------------------ProjectionExec: expr=[l_suppkey@0 as l_suppkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]
-----------------------------------CoalesceBatchesExec: target_batch_size=8192
-------------------------------------FilterExec: l_shipdate@3 >= 9496 AND l_shipdate@3 < 9587
---------------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate], has_header=false
+------------RepartitionExec: partitioning=Hash([MAX(revenue0.total_revenue)@0], 4), input_partitions=1
+--------------AggregateExec: mode=Final, gby=[], aggr=[MAX(revenue0.total_revenue)]
+----------------CoalescePartitionsExec
+------------------AggregateExec: mode=Partial, gby=[], aggr=[MAX(revenue0.total_revenue)]
+--------------------ProjectionExec: expr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as total_revenue]
+----------------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
+------------------------CoalesceBatchesExec: target_batch_size=8192
+--------------------------RepartitionExec: partitioning=Hash([l_suppkey@0], 4), input_partitions=4
+----------------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
+------------------------------ProjectionExec: expr=[l_suppkey@0 as l_suppkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]
+--------------------------------CoalesceBatchesExec: target_batch_size=8192
+----------------------------------FilterExec: l_shipdate@3 >= 9496 AND l_shipdate@3 < 9587
+------------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate], has_header=false

query ITTTR
with revenue0 (supplier_no, total_revenue) as (
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index cbb1896efb13..688774c906fe 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -200,13 +200,14 @@ Aggregate: groupBy=[[t1.name]], aggr=[[]]
physical_plan
AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
--CoalesceBatchesExec: target_batch_size=8192
-----RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=12
-------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------UnionExec
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-----------ProjectionExec: expr=[name@0 || _new as name]
-------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
+--------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+----------UnionExec
+------------MemoryExec: partitions=1, partition_sizes=[1]
+------------MemoryExec: partitions=1, partition_sizes=[1]
+------------ProjectionExec: expr=[name@0 || _new as name]
+--------------MemoryExec: partitions=1, partition_sizes=[1]

# nested_union_all
query T rowsort
@@ -234,11 +235,14 @@ Union
----TableScan: t2 projection=[name]
physical_plan
UnionExec
---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--MemoryExec: partitions=1, partition_sizes=[1]
+--MemoryExec: partitions=1, partition_sizes=[1]
--ProjectionExec: expr=[name@0 || _new as name]
-----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----MemoryExec: partitions=1, partition_sizes=[1]

+# Make sure to choose a small batch size to introduce parallelism into the plan.
+statement ok
+set datafusion.execution.batch_size = 2;

# union_with_type_coercion
query TT
@@ -269,32 +273,36 @@ Union
physical_plan
UnionExec
--ProjectionExec: expr=[id@0 as id, name@1 as name]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(id@0, CAST(t2.id AS Int32)@2), (name@1, name@1)]
--------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
-----------CoalesceBatchesExec: target_batch_size=8192
+----------CoalesceBatchesExec: target_batch_size=2
------------RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
--------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
-----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---------CoalesceBatchesExec: target_batch_size=8192
+----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([CAST(t2.id AS Int32)@2, name@1], 4), input_partitions=4
------------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
--ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
----ProjectionExec: expr=[id@0 as id, name@1 as name]
-------CoalesceBatchesExec: target_batch_size=8192
+------CoalesceBatchesExec: target_batch_size=2
--------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(CAST(t2.id AS Int32)@2, id@0), (name@1, name@1)]
-----------CoalesceBatchesExec: target_batch_size=8192
+----------CoalesceBatchesExec: target_batch_size=2
------------RepartitionExec: partitioning=Hash([CAST(t2.id AS Int32)@2, name@1], 4), input_partitions=4
--------------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
----------------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
-------------------CoalesceBatchesExec: target_batch_size=8192
+------------------CoalesceBatchesExec: target_batch_size=2
--------------------RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
----------------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
-------------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-----------CoalesceBatchesExec: target_batch_size=8192
+------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------------------------MemoryExec: partitions=1, partition_sizes=[1]
+----------CoalesceBatchesExec: target_batch_size=2
------------RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]

query IT rowsort
(
@@ -339,26 +347,30 @@ Union
----TableScan: t1 projection=[name]
physical_plan
InterleaveExec
---CoalesceBatchesExec: target_batch_size=8192
+--CoalesceBatchesExec: target_batch_size=2
----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(name@0, name@0)]
------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+--CoalesceBatchesExec: target_batch_size=2
----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(name@0, name@0)]
------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]

# union_upcast_types
query TT
@@ -416,15 +428,17 @@ ProjectionExec: expr=[COUNT(*)@1 as COUNT(*)]
--AggregateExec: mode=SinglePartitioned, gby=[name@0 as name], aggr=[COUNT(*)]
----InterleaveExec
------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]

########

@@ -530,11 +544,10 @@ physical_plan
UnionExec
--ProjectionExec: expr=[Int64(1)@0 as a]
----AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1)], aggr=[]
-------CoalesceBatchesExec: target_batch_size=8192
---------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=4
+------CoalesceBatchesExec: target_batch_size=2
+--------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=1
----------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[]
-------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
---------------EmptyExec: produce_one_row=true
+------------EmptyExec: produce_one_row=true
--ProjectionExec: expr=[2 as a]
----EmptyExec: produce_one_row=true
--ProjectionExec: expr=[3 as a]
@@ -562,16 +575,12 @@ physical_plan
UnionExec
--ProjectionExec: expr=[COUNT(*)@1 as count, n@0 as n]
----AggregateExec: mode=FinalPartitioned, gby=[n@0 as n], aggr=[COUNT(*)]
-------CoalesceBatchesExec: target_batch_size=8192
---------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=4
+------CoalesceBatchesExec: target_batch_size=2
+--------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=1
----------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[COUNT(*)]
-------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
---------------ProjectionExec: expr=[5 as n]
-----------------EmptyExec: produce_one_row=true
+------------ProjectionExec: expr=[5 as n]
+--------------EmptyExec: produce_one_row=true
--ProjectionExec: expr=[x@0 as count, y@1 as n]
----ProjectionExec: expr=[1 as x, MAX(Int64(10))@0 as y]
-------AggregateExec: mode=Final, gby=[], aggr=[MAX(Int64(10))]
---------CoalescePartitionsExec
-----------AggregateExec: mode=Partial, gby=[], aggr=[MAX(Int64(10))]
-------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
---------------EmptyExec: produce_one_row=true
+------AggregateExec: mode=Single, gby=[], aggr=[MAX(Int64(10))]
+--------EmptyExec: produce_one_row=true
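To sanity-check the plan changes above locally, here is a minimal sketch, not part of this PR. It assumes a DataFusion version exposing `SessionConfig::with_batch_size`, `SessionContext::with_config`, and `register_batch`; the table `a` and its four rows are purely illustrative:

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // A batch size (2) below the table's row count (4) keeps round-robin
    // repartitioning worthwhile for the optimizer, matching the intent of
    // the `set datafusion.execution.batch_size = 2;` statements in the
    // .slt files above.
    let config = SessionConfig::new().with_batch_size(2);
    let ctx = SessionContext::with_config(config);

    // Hypothetical single-partition, four-row in-memory table; mirrors the
    // MemoryExec(partitions=1, partition_sizes=[1]) scans in the expected
    // plans above.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "c0",
        DataType::Int32,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4]))],
    )?;
    let _ = ctx.register_batch("a", batch)?;

    // EXPLAIN should show a RepartitionExec(RoundRobinBatch) below the
    // partial aggregate here; rerunning with the default batch size (8192)
    // should print a plan without it, as in select.slt above.
    ctx.sql("EXPLAIN SELECT c0, COUNT(*) FROM a GROUP BY c0")
        .await?
        .show()
        .await?;
    Ok(())
}
```

Comparing the two runs (batch size 2 vs. the default) reproduces the difference the updated expectations encode: small-table scans lose their round-robin repartition under the default batch size, while a batch size smaller than the row count keeps the extra parallelism.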