Skip to content

Commit

Permalink
chore(query): introduce EmptyResultScan plan (#16411)
Browse files Browse the repository at this point in the history
* chore(query): elimate false filter to be ConstantTableScan

* fix

* fix

* fix

* update

* update

* update

* update

* update
  • Loading branch information
sundy-li authored Sep 7, 2024
1 parent a954b63 commit fb6e250
Show file tree
Hide file tree
Showing 26 changed files with 219 additions and 577 deletions.
6 changes: 5 additions & 1 deletion src/query/sql/src/executor/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,10 @@ fn constant_table_scan_to_format_tree(
plan: &ConstantTableScan,
metadata: &Metadata,
) -> Result<FormatTreeNode<String>> {
if plan.num_rows == 0 {
return Ok(FormatTreeNode::new(plan.name().to_string()));
}

let mut children = Vec::with_capacity(plan.values.len() + 1);
children.push(FormatTreeNode::new(format!(
"output columns: [{}]",
Expand All @@ -850,7 +854,7 @@ fn constant_table_scan_to_format_tree(
children.push(FormatTreeNode::new(format!("column {}: [{}]", i, column)));
}
Ok(FormatTreeNode::with_children(
"ConstantTableScan".to_string(),
plan.name().to_string(),
children,
))
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/executor/physical_plan_display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl Display for ConstantTableScan {
})
.collect::<Vec<String>>();

write!(f, "ConstantTableScan: {}", columns.join(", "))
write!(f, "{}: {}", self.name(), columns.join(", "))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ impl ConstantTableScan {
pub fn output_schema(&self) -> Result<DataSchemaRef> {
Ok(self.output_schema.clone())
}

pub fn name(&self) -> &str {
if self.num_rows == 0 {
"EmptyResultScan"
} else {
"ConstantTableScan"
}
}
}

impl PhysicalPlanBuilder {
Expand Down
10 changes: 5 additions & 5 deletions src/query/sql/src/planner/expression_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use crate::MetadataRef;
use crate::ScalarExpr;
use crate::Visibility;

pub fn bind_one_table(table_meta: Arc<dyn Table>) -> Result<(BindContext, MetadataRef)> {
pub fn bind_table(table_meta: Arc<dyn Table>) -> Result<(BindContext, MetadataRef)> {
let mut bind_context = BindContext::new();
let metadata = Arc::new(RwLock::new(Metadata::default()));
let table_index = metadata.write().add_table(
Expand Down Expand Up @@ -117,7 +117,7 @@ pub fn parse_exprs(
table_meta: Arc<dyn Table>,
sql: &str,
) -> Result<Vec<Expr>> {
let (mut bind_context, metadata) = bind_one_table(table_meta)?;
let (mut bind_context, metadata) = bind_table(table_meta)?;
let settings = Settings::create(Tenant::new_literal("dummy"));
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
let sql_dialect = ctx.get_settings().get_sql_dialect().unwrap_or_default();
Expand Down Expand Up @@ -378,7 +378,7 @@ pub fn parse_cluster_keys(
table_meta: Arc<dyn Table>,
cluster_key_str: &str,
) -> Result<Vec<Expr>> {
let (mut bind_context, metadata) = bind_one_table(table_meta)?;
let (mut bind_context, metadata) = bind_table(table_meta)?;
let settings = Settings::create(Tenant::new_literal("dummy"));
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
let sql_dialect = ctx.get_settings().get_sql_dialect().unwrap_or_default();
Expand Down Expand Up @@ -455,7 +455,7 @@ pub fn parse_hilbert_cluster_key(
table_meta: Arc<dyn Table>,
cluster_key_str: &str,
) -> Result<Vec<Expr>> {
let (mut bind_context, metadata) = bind_one_table(table_meta)?;
let (mut bind_context, metadata) = bind_table(table_meta)?;
let settings = Settings::create(Tenant::new_literal("dummy"));
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
let sql_dialect = ctx.get_settings().get_sql_dialect().unwrap_or_default();
Expand Down Expand Up @@ -609,7 +609,7 @@ pub fn analyze_cluster_keys(
}
}

let (mut bind_context, metadata) = bind_one_table(table_meta)?;
let (mut bind_context, metadata) = bind_table(table_meta)?;
let settings = Settings::create(Tenant::new_literal("dummy"));
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
let mut type_checker = TypeChecker::try_create(
Expand Down
22 changes: 22 additions & 0 deletions src/query/sql/src/planner/format/display_rel_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::planner::format::display::TreeHumanizer;
use crate::plans::Aggregate;
use crate::plans::AggregateMode;
use crate::plans::AsyncFunction;
use crate::plans::ConstantTableScan;
use crate::plans::EvalScalar;
use crate::plans::Exchange;
use crate::plans::Filter;
Expand Down Expand Up @@ -151,6 +152,7 @@ pub(super) fn to_format_tree<I: IdHumanizer<ColumnId = IndexType, TableId = Inde
RelOperator::Sort(op) => sort_to_format_tree(id_humanizer, op),
RelOperator::Limit(op) => limit_to_format_tree(id_humanizer, op),
RelOperator::Exchange(op) => exchange_to_format_tree(id_humanizer, op),
RelOperator::ConstantTableScan(op) => constant_scan_to_format_tree(id_humanizer, op),
_ => FormatTreeNode::with_children(format!("{:?}", op), vec![]),
}
}
Expand Down Expand Up @@ -389,6 +391,26 @@ fn sort_to_format_tree<I: IdHumanizer<ColumnId = IndexType, TableId = IndexType>
])
}

fn constant_scan_to_format_tree<I: IdHumanizer<ColumnId = IndexType, TableId = IndexType>>(
id_humanizer: &I,
plan: &ConstantTableScan,
) -> FormatTreeNode {
if plan.num_rows == 0 {
return FormatTreeNode::new(plan.name().to_string());
}

FormatTreeNode::with_children(plan.name().to_string(), vec![
FormatTreeNode::new(format!(
"columns: [{}]",
plan.columns
.iter()
.map(|col| id_humanizer.humanize_column_id(*col))
.join(", ")
)),
FormatTreeNode::new(format!("num_rows: [{}]", plan.num_rows)),
])
}

fn limit_to_format_tree<I: IdHumanizer<ColumnId = IndexType, TableId = IndexType>>(
_id_humanizer: &I,
op: &Limit,
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/optimizer/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub fn display_rel_op(rel_op: &RelOperator) -> String {
RelOperator::Window(_) => "WindowFunc".to_string(),
RelOperator::CteScan(_) => "CteScan".to_string(),
RelOperator::MaterializedCte(_) => "MaterializedCte".to_string(),
RelOperator::ConstantTableScan(_) => "ConstantTableScan".to_string(),
RelOperator::ConstantTableScan(s) => s.name().to_string(),
RelOperator::ExpressionScan(_) => "ExpressionScan".to_string(),
RelOperator::CacheScan(_) => "CacheScan".to_string(),
RelOperator::Udf(_) => "Udf".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/optimizer/rule/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl RuleFactory {
RuleID::PushDownLimitAggregate => Ok(Box::new(RulePushDownLimitAggregate::new())),
RuleID::PushDownFilterAggregate => Ok(Box::new(RulePushDownFilterAggregate::new())),
RuleID::PushDownFilterWindow => Ok(Box::new(RulePushDownFilterWindow::new())),
RuleID::EliminateFilter => Ok(Box::new(RuleEliminateFilter::new())),
RuleID::EliminateFilter => Ok(Box::new(RuleEliminateFilter::new(metadata))),
RuleID::MergeEvalScalar => Ok(Box::new(RuleMergeEvalScalar::new())),
RuleID::MergeFilter => Ok(Box::new(RuleMergeFilter::new())),
RuleID::NormalizeScalarFilter => Ok(Box::new(RuleNormalizeScalarFilter::new())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,34 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::DataField;
use databend_common_expression::DataSchemaRefExt;
use databend_common_expression::Scalar;
use itertools::Itertools;

use crate::optimizer::extract::Matcher;
use crate::optimizer::rule::Rule;
use crate::optimizer::rule::RuleID;
use crate::optimizer::rule::TransformResult;
use crate::optimizer::RelExpr;
use crate::optimizer::SExpr;
use crate::plans::ConstantExpr;
use crate::plans::ConstantTableScan;
use crate::plans::Filter;
use crate::plans::Operator;
use crate::plans::RelOp;
use crate::plans::RelOperator;
use crate::plans::ScalarExpr;
use crate::MetadataRef;

pub struct RuleEliminateFilter {
id: RuleID,
matchers: Vec<Matcher>,
metadata: MetadataRef,
}

impl RuleEliminateFilter {
pub fn new() -> Self {
pub fn new(metadata: MetadataRef) -> Self {
Self {
id: RuleID::EliminateFilter,
// Filter
Expand All @@ -42,6 +52,7 @@ impl RuleEliminateFilter {
op_type: RelOp::Filter,
children: vec![Matcher::Leaf],
}],
metadata,
}
}
}
Expand All @@ -55,11 +66,45 @@ impl Rule for RuleEliminateFilter {
let eval_scalar: Filter = s_expr.plan().clone().try_into()?;
// First, de-duplication predicates.
let origin_predicates = eval_scalar.predicates.clone();
let predicates = eval_scalar
.predicates
let predicates = origin_predicates
.clone()
.into_iter()
.unique()
.collect::<Vec<ScalarExpr>>();

// Rewrite false filter to be empty scan
if predicates.iter().any(|predicate| {
matches!(
predicate,
ScalarExpr::ConstantExpr(ConstantExpr {
value: Scalar::Boolean(false),
..
}) | ScalarExpr::ConstantExpr(ConstantExpr {
value: Scalar::Null,
..
})
)
}) {
let output_columns = eval_scalar
.derive_relational_prop(&RelExpr::with_s_expr(s_expr))?
.output_columns
.clone();
let metadata = self.metadata.read();
let mut fields = Vec::with_capacity(output_columns.len());

for col in output_columns.iter().sorted() {
fields.push(DataField::new(
&col.to_string(),
metadata.column(*col).data_type(),
));
}
let empty_scan =
ConstantTableScan::new_empty_scan(DataSchemaRefExt::create(fields), output_columns);
let result = SExpr::create_leaf(Arc::new(RelOperator::ConstantTableScan(empty_scan)));
state.add_result(result);
return Ok(());
}

// Delete identically equal predicate
// After constant fold is ready, we can delete the following code
let predicates = predicates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::Column;
use databend_common_expression::DataField;
use databend_common_expression::DataSchemaRefExt;
use itertools::Itertools;

use crate::optimizer::extract::Matcher;
use crate::optimizer::rule::Rule;
Expand Down Expand Up @@ -67,18 +67,14 @@ impl Rule for RulePushDownLimit {
.clone();
let metadata = self.metadata.read();
let mut fields = Vec::with_capacity(output_columns.len());
for col in output_columns.iter() {
for col in output_columns.iter().sorted() {
fields.push(DataField::new(
&col.to_string(),
metadata.column(*col).data_type(),
));
}
let empty_scan = ConstantTableScan {
values: vec![Column::Null { len: 0 }; output_columns.len()],
num_rows: 0,
schema: DataSchemaRefExt::create(fields),
columns: output_columns,
};
let empty_scan =
ConstantTableScan::new_empty_scan(DataSchemaRefExt::create(fields), output_columns);
let result = SExpr::create_leaf(Arc::new(RelOperator::ConstantTableScan(empty_scan)));
state.add_result(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use databend_common_expression::types::NumberScalar;
use databend_common_expression::types::F64;
use databend_common_expression::ColumnId;
use databend_common_expression::Scalar;
use log::info;

use crate::optimizer::RelExpr;
use crate::optimizer::SExpr;
Expand Down Expand Up @@ -91,9 +90,6 @@ impl CollectStatisticsOptimizer {
if let Some(col_id) = *leaf_index {
let col_stat = column_statistics_provider
.column_statistics(col_id as ColumnId);
if col_stat.is_none() {
info!("column {} doesn't have global statistics", col_id);
}
column_stats.insert(*column_index, col_stat.cloned());
let histogram =
column_statistics_provider.histogram(col_id as ColumnId);
Expand Down
27 changes: 27 additions & 0 deletions src/query/sql/src/planner/plans/constant_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use databend_common_exception::Result;
use databend_common_expression::types::NumberType;
use databend_common_expression::types::ValueType;
use databend_common_expression::Column;
use databend_common_expression::ColumnBuilder;
use databend_common_expression::DataSchemaRef;
use databend_common_functions::aggregates::eval_aggr;
use databend_common_storage::Datum;
Expand Down Expand Up @@ -50,6 +51,24 @@ pub struct ConstantTableScan {
}

impl ConstantTableScan {
pub fn new_empty_scan(schema: DataSchemaRef, columns: ColumnSet) -> Self {
let values = schema
.fields
.iter()
.map(|f| {
let builder = ColumnBuilder::with_capacity(f.data_type(), 0);
builder.build()
})
.collect::<Vec<_>>();

Self {
values,
num_rows: 0,
schema,
columns,
}
}

pub fn prune_columns(&self, columns: ColumnSet) -> Self {
let mut projection = columns
.iter()
Expand All @@ -74,6 +93,14 @@ impl ConstantTableScan {
pub fn used_columns(&self) -> Result<ColumnSet> {
Ok(self.columns.clone())
}

pub fn name(&self) -> &str {
if self.num_rows == 0 {
"EmptyResultScan"
} else {
"ConstantTableScan"
}
}
}

impl PartialEq for ConstantTableScan {
Expand Down
3 changes: 1 addition & 2 deletions src/query/sql/src/planner/plans/dummy_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use crate::optimizer::StatInfo;
use crate::optimizer::Statistics;
use crate::plans::Operator;
use crate::plans::RelOp;
use crate::DUMMY_COLUMN_INDEX;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct DummyTableScan;
Expand All @@ -50,7 +49,7 @@ impl Operator for DummyTableScan {

fn derive_relational_prop(&self, _rel_expr: &RelExpr) -> Result<Arc<RelationalProperty>> {
Ok(Arc::new(RelationalProperty {
output_columns: ColumnSet::from([DUMMY_COLUMN_INDEX]),
output_columns: ColumnSet::new(),
outer_columns: ColumnSet::new(),
used_columns: ColumnSet::new(),
orderings: vec![],
Expand Down
Loading

0 comments on commit fb6e250

Please sign in to comment.