Enable clone_on_ref_ptr clippy lint on physical-plan crate (apache#11241)

* Enable clone_on_ref_ptr clippy lint on physical-plan crate

* fmt

* Update for clippy

* Update new clippy

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
lewiszlw and alamb authored Jul 8, 2024
1 parent e4b54f6 commit 37428bb
Showing 47 changed files with 633 additions and 471 deletions.
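The lint named in the title, `clippy::clone_on_ref_ptr`, is a clippy restriction lint that flags calling `.clone()` through method syntax on reference-counted pointers (`Rc`, `Arc`, and their `Weak` variants); the fix applied throughout this diff is to spell the cheap pointer copy out as `Arc::clone(&x)`. A minimal sketch of the pattern follows; the crate-root change itself is not among the hunks loaded here, so the attribute placement below is only how such a lint is typically enabled:

```rust
// Typically enabled at the crate root (lib.rs); the exact attribute used by
// this commit is not visible in the hunks loaded on this page.
#![deny(clippy::clone_on_ref_ptr)]

use std::sync::Arc;

struct Schema {
    fields: Vec<String>,
}

fn main() {
    let schema = Arc::new(Schema {
        fields: vec!["a".into(), "b".into()],
    });

    // Flagged by the lint: method syntax reads like a deep copy of Schema.
    // let copy = schema.clone();

    // Preferred spelling: unmistakably a reference-count bump on the Arc.
    let copy = Arc::clone(&schema);

    assert_eq!(copy.fields.len(), schema.fields.len());
}
```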
94 changes: 48 additions & 46 deletions datafusion/physical-plan/src/aggregates/mod.rs
@@ -188,7 +188,7 @@ impl PhysicalGroupBy {
pub fn input_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
self.expr
.iter()
.map(|(expr, _alias)| expr.clone())
.map(|(expr, _alias)| Arc::clone(expr))
.collect()
}

@@ -283,9 +283,9 @@ impl AggregateExec {
group_by: self.group_by.clone(),
filter_expr: self.filter_expr.clone(),
limit: self.limit,
input: self.input.clone(),
schema: self.schema.clone(),
input_schema: self.input_schema.clone(),
input: Arc::clone(&self.input),
schema: Arc::clone(&self.schema),
input_schema: Arc::clone(&self.input_schema),
}
}

@@ -355,7 +355,7 @@ impl AggregateExec {
let mut new_requirement = indices
.iter()
.map(|&idx| PhysicalSortRequirement {
expr: groupby_exprs[idx].clone(),
expr: Arc::clone(&groupby_exprs[idx]),
options: None,
})
.collect::<Vec<_>>();
@@ -399,7 +399,7 @@ impl AggregateExec {

let cache = Self::compute_properties(
&input,
schema.clone(),
Arc::clone(&schema),
&projection_mapping,
&mode,
&input_order_mode,
@@ -458,7 +458,7 @@ impl AggregateExec {

/// Get the input schema before any aggregates are applied
pub fn input_schema(&self) -> SchemaRef {
self.input_schema.clone()
Arc::clone(&self.input_schema)
}

/// number of rows soft limit of the AggregateExec
@@ -706,9 +706,9 @@ impl ExecutionPlan for AggregateExec {
self.group_by.clone(),
self.aggr_expr.clone(),
self.filter_expr.clone(),
children[0].clone(),
self.input_schema.clone(),
self.schema.clone(),
Arc::clone(&children[0]),
Arc::clone(&self.input_schema),
Arc::clone(&self.schema),
)?;
me.limit = self.limit;

@@ -1005,7 +1005,7 @@ fn aggregate_expressions(
// way order sensitive aggregators can satisfy requirement
// themselves.
if let Some(ordering_req) = agg.order_bys() {
result.extend(ordering_req.iter().map(|item| item.expr.clone()));
result.extend(ordering_req.iter().map(|item| Arc::clone(&item.expr)));
}
result
})
@@ -1165,9 +1165,9 @@ pub(crate) fn evaluate_group_by(
.enumerate()
.map(|(idx, is_null)| {
if *is_null {
null_exprs[idx].clone()
Arc::clone(&null_exprs[idx])
} else {
exprs[idx].clone()
Arc::clone(&exprs[idx])
}
})
.collect()
@@ -1235,10 +1235,10 @@ mod tests {

// define data.
(
schema.clone(),
Arc::clone(&schema),
vec![
RecordBatch::try_new(
schema.clone(),
Arc::clone(&schema),
vec![
Arc::new(UInt32Array::from(vec![2, 3, 4, 4])),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
@@ -1270,26 +1270,26 @@ mod tests {
// the expected result by accident, but merging actually works properly;
// i.e. it doesn't depend on the data insertion order.
(
schema.clone(),
Arc::clone(&schema),
vec![
RecordBatch::try_new(
schema.clone(),
Arc::clone(&schema),
vec![
Arc::new(UInt32Array::from(vec![2, 3, 4, 4])),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
],
)
.unwrap(),
RecordBatch::try_new(
schema.clone(),
Arc::clone(&schema),
vec![
Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
Arc::new(Float64Array::from(vec![0.0, 1.0, 2.0, 3.0])),
],
)
.unwrap(),
RecordBatch::try_new(
schema.clone(),
Arc::clone(&schema),
vec![
Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
Arc::new(Float64Array::from(vec![3.0, 4.0, 5.0, 6.0])),
@@ -1369,11 +1369,11 @@ mod tests {
aggregates.clone(),
vec![None],
input,
input_schema.clone(),
Arc::clone(&input_schema),
)?);

let result =
common::collect(partial_aggregate.execute(0, task_ctx.clone())?).await?;
common::collect(partial_aggregate.execute(0, Arc::clone(&task_ctx))?).await?;

let expected = if spill {
vec![
@@ -1451,7 +1451,7 @@ mod tests {
)?);

let result =
common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?;
common::collect(merged_aggregate.execute(0, Arc::clone(&task_ctx))?).await?;
let batch = concat_batches(&result[0].schema(), &result)?;
assert_eq!(batch.num_columns(), 3);
assert_eq!(batch.num_rows(), 12);
@@ -1519,11 +1519,11 @@ mod tests {
aggregates.clone(),
vec![None],
input,
input_schema.clone(),
Arc::clone(&input_schema),
)?);

let result =
common::collect(partial_aggregate.execute(0, task_ctx.clone())?).await?;
common::collect(partial_aggregate.execute(0, Arc::clone(&task_ctx))?).await?;

let expected = if spill {
vec![
@@ -1573,7 +1573,7 @@ mod tests {
// enlarge memory limit to let the final aggregation finish
new_spill_ctx(2, 2600)
} else {
task_ctx.clone()
Arc::clone(&task_ctx)
};
let result = common::collect(merged_aggregate.execute(0, task_ctx)?).await?;
let batch = concat_batches(&result[0].schema(), &result)?;
@@ -1856,11 +1856,11 @@ mod tests {
groups,
aggregates,
vec![None; n_aggr],
input.clone(),
input_schema.clone(),
Arc::clone(&input),
Arc::clone(&input_schema),
)?);

let stream = partial_aggregate.execute_typed(0, task_ctx.clone())?;
let stream = partial_aggregate.execute_typed(0, Arc::clone(&task_ctx))?;

// ensure that we really got the version we wanted
match version {
@@ -2112,7 +2112,7 @@ mod tests {
vec![partition3],
vec![partition4],
],
schema.clone(),
Arc::clone(&schema),
None,
)?);
let aggregate_exec = Arc::new(AggregateExec::try_new(
@@ -2121,7 +2121,7 @@
aggregates.clone(),
vec![None],
memory_exec,
schema.clone(),
Arc::clone(&schema),
)?);
let coalesce = if use_coalesce_batches {
let coalesce = Arc::new(CoalescePartitionsExec::new(aggregate_exec));
@@ -2186,49 +2186,49 @@ mod tests {
let order_by_exprs = vec![
None,
Some(vec![PhysicalSortExpr {
expr: col_a.clone(),
expr: Arc::clone(col_a),
options: options1,
}]),
Some(vec![
PhysicalSortExpr {
expr: col_a.clone(),
expr: Arc::clone(col_a),
options: options1,
},
PhysicalSortExpr {
expr: col_b.clone(),
expr: Arc::clone(col_b),
options: options1,
},
PhysicalSortExpr {
expr: col_c.clone(),
expr: Arc::clone(col_c),
options: options1,
},
]),
Some(vec![
PhysicalSortExpr {
expr: col_a.clone(),
expr: Arc::clone(col_a),
options: options1,
},
PhysicalSortExpr {
expr: col_b.clone(),
expr: Arc::clone(col_b),
options: options1,
},
]),
];
let common_requirement = vec![
PhysicalSortExpr {
expr: col_a.clone(),
expr: Arc::clone(col_a),
options: options1,
},
PhysicalSortExpr {
expr: col_c.clone(),
expr: Arc::clone(col_c),
options: options1,
},
];
let mut aggr_exprs = order_by_exprs
.into_iter()
.map(|order_by_expr| {
Arc::new(OrderSensitiveArrayAgg::new(
col_a.clone(),
Arc::clone(col_a),
"array_agg",
DataType::Int32,
false,
@@ -2273,12 +2273,11 @@ mod tests {
groups,
aggregates.clone(),
vec![None, None],
blocking_exec.clone(),
Arc::clone(&blocking_exec) as Arc<dyn ExecutionPlan>,
schema,
)?);
let new_agg = aggregate_exec
.clone()
.with_new_children(vec![blocking_exec])?;
let new_agg =
Arc::clone(&aggregate_exec).with_new_children(vec![blocking_exec])?;
assert_eq!(new_agg.schema(), aggregate_exec.schema());
Ok(())
}
@@ -2340,12 +2340,15 @@ mod tests {
let b = Arc::new(Float32Array::from(vec![0.; 8192]));
let c = Arc::new(Int32Array::from(vec![1; 8192]));

RecordBatch::try_new(schema.clone(), vec![a, b, c]).unwrap()
RecordBatch::try_new(Arc::clone(&schema), vec![a, b, c]).unwrap()
})
.collect();

let input =
Arc::new(MemoryExec::try_new(&[input_batches], schema.clone(), None)?);
let input = Arc::new(MemoryExec::try_new(
&[input_batches],
Arc::clone(&schema),
None,
)?);

let aggregate_exec = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
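The changes in aggregates/mod.rs fall into two shapes: `Arc::clone(&self.field)` where a struct field must be borrowed, and `Arc::clone(col_a)` where the binding is already a `&Arc<_>`. Both work because `Arc::clone` is an associated function taking `&Arc<T>`. A minimal sketch of the two call shapes; the `Exec` and `duplicate` names are illustrative, not from the crate:

```rust
use std::sync::Arc;

// Illustrative stand-in; the real fields are Arc<dyn ExecutionPlan>, SchemaRef, etc.
struct Exec {
    input: Arc<String>,
}

impl Exec {
    // A field access yields an owned Arc, so it has to be borrowed explicitly.
    fn input(&self) -> Arc<String> {
        Arc::clone(&self.input)
    }
}

// A binding that is already a &Arc<T> is passed straight through,
// mirroring the Arc::clone(col_a) calls in the test changes above.
fn duplicate(col: &Arc<String>) -> Arc<String> {
    Arc::clone(col)
}

fn main() {
    let exec = Exec {
        input: Arc::new("plan input".to_string()),
    };
    let a = exec.input();
    let b = duplicate(&a);
    // exec.input, a, and b all point at the same allocation.
    assert_eq!(Arc::strong_count(&exec.input), 3);
    assert_eq!(*a, *b);
}
```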
9 changes: 6 additions & 3 deletions datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -140,8 +140,11 @@ impl AggregateStream {
let result =
finalize_aggregation(&mut this.accumulators, &this.mode)
.and_then(|columns| {
RecordBatch::try_new(this.schema.clone(), columns)
.map_err(Into::into)
RecordBatch::try_new(
Arc::clone(&this.schema),
columns,
)
.map_err(Into::into)
})
.record_output(&this.baseline_metrics);

@@ -181,7 +184,7 @@ impl Stream for AggregateStream {

impl RecordBatchStream for AggregateStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
Arc::clone(&self.schema)
}
}

3 changes: 2 additions & 1 deletion datafusion/physical-plan/src/aggregates/order/partial.rs
@@ -22,6 +22,7 @@ use datafusion_common::Result;
use datafusion_execution::memory_pool::proxy::VecAllocExt;
use datafusion_expr::EmitTo;
use datafusion_physical_expr::PhysicalSortExpr;
use std::sync::Arc;

/// Tracks grouping state when the data is ordered by some subset of
/// the group keys.
@@ -138,7 +139,7 @@ impl GroupOrderingPartial {
let sort_values: Vec<_> = self
.order_indices
.iter()
.map(|&idx| group_values[idx].clone())
.map(|&idx| Arc::clone(&group_values[idx]))
.collect();

Ok(self.row_converter.convert_columns(&sort_values)?)
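Besides rewriting the call, the partial.rs hunk also adds `use std::sync::Arc;`: the method form `.clone()` resolves without naming the type, while the fully qualified `Arc::clone(...)` needs `Arc` in scope. A small sketch of the rewritten closure, under the assumption that `ArrayRef` is an `Arc`-based alias (in arrow it is `Arc<dyn Array>`; a plain `Arc<Vec<i32>>` stands in here):

```rust
use std::sync::Arc;

// Assumed stand-in for arrow's ArrayRef (really Arc<dyn Array>); a plain
// Arc<Vec<i32>> keeps the sketch self-contained.
type ArrayRef = Arc<Vec<i32>>;

// Mirrors the shape of the rewritten closure in GroupOrderingPartial:
// pick out the ordered columns by index, cloning only the Arc handles.
fn select_columns(group_values: &[ArrayRef], order_indices: &[usize]) -> Vec<ArrayRef> {
    order_indices
        .iter()
        .map(|&idx| Arc::clone(&group_values[idx]))
        .collect()
}

fn main() {
    let cols: Vec<ArrayRef> = vec![
        Arc::new(vec![1, 2]),
        Arc::new(vec![3, 4]),
        Arc::new(vec![5, 6]),
    ];
    let picked = select_columns(&cols, &[2, 0]);
    assert_eq!(*picked[0], vec![5, 6]);
    assert_eq!(*picked[1], vec![1, 2]);
}
```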
12 changes: 6 additions & 6 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -358,7 +358,7 @@ impl GroupedHashAggregateStream {
let spill_state = SpillState {
spills: vec![],
spill_expr,
spill_schema: agg_schema.clone(),
spill_schema: Arc::clone(&agg_schema),
is_stream_merging: false,
merging_aggregate_arguments,
merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()),
@@ -401,7 +401,7 @@ pub(crate) fn create_group_accumulator(
"Creating GroupsAccumulatorAdapter for {}: {agg_expr:?}",
agg_expr.name()
);
let agg_expr_captured = agg_expr.clone();
let agg_expr_captured = Arc::clone(agg_expr);
let factory = move || agg_expr_captured.create_accumulator();
Ok(Box::new(GroupsAccumulatorAdapter::new(factory)))
}
@@ -515,7 +515,7 @@ impl Stream for GroupedHashAggregateStream {

impl RecordBatchStream for GroupedHashAggregateStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
Arc::clone(&self.schema)
}
}

@@ -625,7 +625,7 @@ impl GroupedHashAggregateStream {
/// accumulator states/values specified in emit_to
fn emit(&mut self, emit_to: EmitTo, spilling: bool) -> Result<RecordBatch> {
let schema = if spilling {
self.spill_state.spill_schema.clone()
Arc::clone(&self.spill_state.spill_schema)
} else {
self.schema()
};
@@ -746,13 +746,13 @@ impl GroupedHashAggregateStream {
let expr = self.spill_state.spill_expr.clone();
let schema = batch.schema();
streams.push(Box::pin(RecordBatchStreamAdapter::new(
schema.clone(),
Arc::clone(&schema),
futures::stream::once(futures::future::lazy(move |_| {
sort_batch(&batch, &expr, None)
})),
)));
for spill in self.spill_state.spills.drain(..) {
let stream = read_spill_as_stream(spill, schema.clone(), 2)?;
let stream = read_spill_as_stream(spill, Arc::clone(&schema), 2)?;
streams.push(stream);
}
self.spill_state.is_stream_merging = true;
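The `create_group_accumulator` hunk shows the other common reason to clone an `Arc`: handing an owned handle to a `move` closure while the caller keeps the original usable. A sketch of that capture pattern, using a hypothetical `AggExpr` stand-in rather than the crate's `AggregateExpr` trait:

```rust
use std::sync::Arc;

// Hypothetical stand-in for an AggregateExpr; only the capture pattern matters.
struct AggExpr {
    name: String,
}

impl AggExpr {
    fn create_accumulator(&self) -> String {
        format!("accumulator for {}", self.name)
    }
}

// Mirrors create_group_accumulator: clone the Arc up front so the move closure
// owns its own handle while the caller keeps the original.
fn make_factory(agg_expr: &Arc<AggExpr>) -> impl Fn() -> String {
    let captured = Arc::clone(agg_expr);
    move || captured.create_accumulator()
}

fn main() {
    let expr = Arc::new(AggExpr {
        name: "count".to_string(),
    });
    let factory = make_factory(&expr);
    assert_eq!(factory(), "accumulator for count");
    // The original handle is untouched; the factory holds the second reference.
    assert_eq!(Arc::strong_count(&expr), 2);
}
```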
