Skip to content

Commit

Permalink
implement rewrite for EliminateLimit (apache#10253)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinmingtarja authored Apr 26, 2024
1 parent 44f1147 commit a5ce568
Showing 1 changed file with 38 additions and 26 deletions.
64 changes: 38 additions & 26 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
//! [`EliminateLimit`] eliminates `LIMIT` when possible
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};
use datafusion_common::tree_node::Transformed;
use datafusion_common::{internal_err, Result};
use datafusion_expr::logical_plan::{tree_node::unwrap_arc, EmptyRelation, LogicalPlan};

/// Optimizer rule to replace `LIMIT 0` or `LIMIT` whose ancestor LIMIT's skip is
/// greater than or equal to current's fetch
Expand All @@ -41,32 +42,10 @@ impl EliminateLimit {
impl OptimizerRule for EliminateLimit {
fn try_optimize(
&self,
plan: &LogicalPlan,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
if let LogicalPlan::Limit(limit) = plan {
match limit.fetch {
Some(fetch) => {
if fetch == 0 {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
})));
}
}
None => {
if limit.skip == 0 {
let input = limit.input.as_ref();
// input also can be Limit, so we should apply again.
return Ok(Some(
self.try_optimize(input, _config)?
.unwrap_or_else(|| input.clone()),
));
}
}
}
}
Ok(None)
internal_err!("Should have called EliminateLimit::rewrite")
}

fn name(&self) -> &str {
Expand All @@ -76,6 +55,39 @@ impl OptimizerRule for EliminateLimit {
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}

fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<
datafusion_common::tree_node::Transformed<LogicalPlan>,
datafusion_common::DataFusionError,
> {
match plan {
LogicalPlan::Limit(limit) => {
if let Some(fetch) = limit.fetch {
if fetch == 0 {
return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
},
)));
}
} else if limit.skip == 0 {
// input also can be Limit, so we should apply again.
return Ok(self.rewrite(unwrap_arc(limit.input), _config).unwrap());
}
Ok(Transformed::no(LogicalPlan::Limit(limit)))
}
_ => Ok(Transformed::no(plan)),
}
}
}

#[cfg(test)]
Expand Down

0 comments on commit a5ce568

Please sign in to comment.