[MINOR]: Do not introduce unnecessary repartition when row count is 1. (#7832)

* Initial commit

* Fix failing tests

* More idiomatic expressions

* Update tests, use batch size during partition benefit check

* Fix failing tests

* is_exact when row count is 1

---------

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
mustafasrepo and ozankabak committed Oct 17, 2023
1 parent cb2d03c commit 9aacdee
Showing 14 changed files with 270 additions and 214 deletions.
13 changes: 10 additions & 3 deletions datafusion/core/src/datasource/listing/table.rs
@@ -1615,10 +1615,12 @@ mod tests {

#[tokio::test]
async fn test_insert_into_append_new_json_files() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
config_map.insert("datafusion.execution.batch_size".into(), "1".into());
helper_test_append_new_files_to_table(
FileType::JSON,
FileCompressionType::UNCOMPRESSED,
None,
Some(config_map),
)
.await?;
Ok(())
@@ -1637,21 +1639,25 @@

#[tokio::test]
async fn test_insert_into_append_new_csv_files() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
config_map.insert("datafusion.execution.batch_size".into(), "1".into());
helper_test_append_new_files_to_table(
FileType::CSV,
FileCompressionType::UNCOMPRESSED,
None,
Some(config_map),
)
.await?;
Ok(())
}

#[tokio::test]
async fn test_insert_into_append_new_parquet_files_defaults() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
config_map.insert("datafusion.execution.batch_size".into(), "1".into());
helper_test_append_new_files_to_table(
FileType::PARQUET,
FileCompressionType::UNCOMPRESSED,
None,
Some(config_map),
)
.await?;
Ok(())
@@ -1838,6 +1844,7 @@ mod tests {
"datafusion.execution.parquet.write_batch_size".into(),
"5".into(),
);
config_map.insert("datafusion.execution.batch_size".into(), "1".into());
helper_test_append_new_files_to_table(
FileType::PARQUET,
FileCompressionType::UNCOMPRESSED,
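These tests now pin datafusion.execution.batch_size to 1, since the benefit check added in enforce_distribution.rs (below) compares an exact row count against the batch size; with the default of 8192 the tiny test inputs would likely no longer be repartitioned and the expected plans would change. A rough sketch of how such an override map could be folded into a session configuration follows (the helper name is hypothetical; SessionConfig::set_str is assumed to be the string-typed option setter):

    use std::collections::HashMap;
    use datafusion::prelude::SessionConfig;

    // Hypothetical helper mirroring what the test harness might do with Some(config_map).
    fn apply_overrides(overrides: Option<HashMap<String, String>>) -> SessionConfig {
        let mut config = SessionConfig::new();
        if let Some(map) = overrides {
            for (key, value) in map {
                // Each entry is a "datafusion.*" option key paired with a string value.
                config = config.set_str(&key, &value);
            }
        }
        config
    }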
34 changes: 22 additions & 12 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1021,6 +1021,7 @@ fn add_hash_on_top(
// until Repartition(Hash).
dist_onward: &mut Option<ExecTree>,
input_idx: usize,
repartition_beneficial_stats: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
if n_target == input.output_partitioning().partition_count() && n_target == 1 {
// In this case adding a hash repartition is unnecessary as the hash
@@ -1044,9 +1045,13 @@
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.bounded_order_preserving_variants`).
let should_preserve_ordering = input.output_ordering().is_some();
// Since hashing benefits from partitioning, add a round-robin repartition
// before it:
let mut new_plan = add_roundrobin_on_top(input, n_target, dist_onward, 0)?;
let mut new_plan = if repartition_beneficial_stats {
// Since hashing benefits from partitioning, add a round-robin repartition
// before it:
add_roundrobin_on_top(input, n_target, dist_onward, 0)?
} else {
input
};
new_plan = Arc::new(
RepartitionExec::try_new(new_plan, Partitioning::Hash(hash_exprs, n_target))?
.with_preserve_order(should_preserve_ordering),
@@ -1223,6 +1228,7 @@ fn ensure_distribution(
let enable_round_robin = config.optimizer.enable_round_robin_repartition;
let repartition_file_scans = config.optimizer.repartition_file_scans;
let repartition_file_min_size = config.optimizer.repartition_file_min_size;
let batch_size = config.execution.batch_size;
let is_unbounded = unbounded_output(&dist_context.plan);
// Use order preserving variants either of the conditions true
// - it is desired according to config
@@ -1233,13 +1239,7 @@
if dist_context.plan.children().is_empty() {
return Ok(Transformed::No(dist_context));
}
// Don't need to apply when the returned row count is not greater than 1:
let stats = dist_context.plan.statistics();
let mut repartition_beneficial_stat = true;
if stats.is_exact {
repartition_beneficial_stat =
stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true);
}

// Remove unnecessary repartition from the physical plan if any
let DistributionContext {
mut plan,
@@ -1263,7 +1263,6 @@
plan = updated_window;
}
};

let n_children = plan.children().len();
// This loop iterates over all the children to:
// - Increase parallelism for every child if it is beneficial.
@@ -1289,9 +1288,19 @@
maintains,
child_idx,
)| {
// Don't need to apply when the returned row count is not greater than 1:
let stats = child.statistics();
let repartition_beneficial_stats = if stats.is_exact {
stats
.num_rows
.map(|num_rows| num_rows > batch_size)
.unwrap_or(true)
} else {
true
};
if enable_round_robin
// Operator benefits from partitioning (e.g. filter):
&& (would_benefit && repartition_beneficial_stat)
&& (would_benefit && repartition_beneficial_stats)
// Unless partitioning doesn't increase the partition count, it is not beneficial:
&& child.output_partitioning().partition_count() < target_partitions
{
@@ -1340,6 +1349,7 @@ fn ensure_distribution(
target_partitions,
dist_onward,
child_idx,
repartition_beneficial_stats,
)?;
}
Distribution::UnspecifiedDistribution => {}
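To summarize the decision introduced above, here is a minimal, self-contained sketch of the per-child benefit check that ensure_distribution now performs (illustrative names only, not the actual DataFusion signatures): repartitioning is considered beneficial unless the child's statistics are exact and report no more rows than a single batch.

    fn repartition_beneficial(num_rows: Option<usize>, is_exact: bool, batch_size: usize) -> bool {
        if is_exact {
            // With exact statistics, skip repartitioning when the whole input
            // already fits in one batch; splitting it buys no parallelism.
            num_rows.map(|n| n > batch_size).unwrap_or(true)
        } else {
            // Without exact statistics, err on the side of repartitioning.
            true
        }
    }

    fn main() {
        // A known single-row input is never worth repartitioning...
        assert!(!repartition_beneficial(Some(1), true, 8192));
        // ...while an input with unknown statistics still is.
        assert!(repartition_beneficial(None, false, 8192));
    }

The same flag is threaded into add_hash_on_top, so the round-robin repartition that normally precedes a hash repartition is only added when it can actually help.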
8 changes: 4 additions & 4 deletions datafusion/core/tests/sql/order.rs
@@ -209,16 +209,16 @@ ORDER BY 1, 2;
" AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2), input_partitions=2",
" AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
" ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
" AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2), input_partitions=2",
" AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
];
21 changes: 15 additions & 6 deletions datafusion/physical-plan/src/aggregates/mod.rs
@@ -1021,12 +1021,21 @@ impl ExecutionPlan for AggregateExec {
..Default::default()
}
}
_ => Statistics {
// the output row count is surely not larger than its input row count
num_rows: self.input.statistics().num_rows,
is_exact: false,
..Default::default()
},
_ => {
let input_stats = self.input.statistics();
// Input statistics is exact and number of rows not greater than 1:
let is_exact = input_stats.is_exact
&& (input_stats
.num_rows
.map(|num_rows| num_rows == 1)
.unwrap_or(false));
Statistics {
// the output row count is surely not larger than its input row count
num_rows: self.input.statistics().num_rows,
is_exact,
..Default::default()
}
}
}
}
}
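The statistics change above can be read as the following simplified rule (a sketch with illustrative types, not the actual Statistics struct): an aggregate's output row count is only marked exact when the input statistics are exact and report exactly one row, which in turn lets the benefit check above skip repartitioning for that subtree.

    struct SimpleStats {
        num_rows: Option<usize>,
        is_exact: bool,
    }

    fn aggregate_output_stats(input: &SimpleStats) -> SimpleStats {
        // Only an exact, single-row input lets us call the output exact too;
        // otherwise the input row count is merely an upper bound.
        let is_exact = input.is_exact && input.num_rows.map(|n| n == 1).unwrap_or(false);
        SimpleStats {
            num_rows: input.num_rows,
            is_exact,
        }
    }

    fn main() {
        let single_row = SimpleStats { num_rows: Some(1), is_exact: true };
        assert!(aggregate_output_stats(&single_row).is_exact);
        let unknown = SimpleStats { num_rows: None, is_exact: false };
        assert!(!aggregate_output_stats(&unknown).is_exact);
    }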
20 changes: 10 additions & 10 deletions datafusion/sqllogictest/test_files/aggregate.slt
@@ -2342,8 +2342,8 @@ GlobalLimitExec: skip=0, fetch=4
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]


@@ -2397,8 +2397,8 @@ GlobalLimitExec: skip=0, fetch=4
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
----------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
@@ -2416,8 +2416,8 @@ GlobalLimitExec: skip=0, fetch=4
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
@@ -2435,8 +2435,8 @@ GlobalLimitExec: skip=0, fetch=4
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
@@ -2454,8 +2454,8 @@ GlobalLimitExec: skip=0, fetch=4
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]

query TI
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/copy.slt
@@ -33,7 +33,7 @@ CopyTo: format=parquet output_url=test_files/scratch/copy/table single_file_outp
--TableScan: source_table projection=[col1, col2]
physical_plan
InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
--MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
--MemoryExec: partitions=1, partition_sizes=[1]

# Error case
query error DataFusion error: Invalid or Unsupported Configuration: Format not explicitly set and unable to get file extension!