Skip to content

Commit

Permalink
chore: enable inlist to join in cluster (#15108)
Browse files Browse the repository at this point in the history
* chore: enable inlist to join in cluster

* update

* fix
  • Loading branch information
xudong963 authored Apr 1, 2024
1 parent e402065 commit d8f0758
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 32 deletions.
7 changes: 7 additions & 0 deletions src/query/service/src/schedulers/fragments/fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_sql::executor::physical_plans::CompactSource;
use databend_common_sql::executor::physical_plans::ConstantTableScan;
use databend_common_sql::executor::physical_plans::CopyIntoTable;
use databend_common_sql::executor::physical_plans::CopyIntoTableSource;
use databend_common_sql::executor::physical_plans::DeleteSource;
Expand Down Expand Up @@ -153,6 +154,12 @@ impl PhysicalPlanReplacer for Fragmenter {
Ok(PhysicalPlan::TableScan(plan.clone()))
}

/// Visit a `ConstantTableScan` while fragmenting the physical plan.
/// Like a regular table scan, a constant table is a leaf data source,
/// so mark the current fragment as a select leaf and keep the node as-is.
fn replace_constant_table_scan(&mut self, plan: &ConstantTableScan) -> Result<PhysicalPlan> {
self.state = State::SelectLeaf;

Ok(PhysicalPlan::ConstantTableScan(plan.clone()))
}

fn replace_merge_into(&mut self, plan: &MergeInto) -> Result<PhysicalPlan> {
let input = self.replace(&plan.input)?;
if !plan.change_join_order {
Expand Down
136 changes: 122 additions & 14 deletions src/query/service/src/schedulers/fragments/plan_fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::Partitions;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::BlockEntry;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::Value;
use databend_common_settings::ReplaceIntoShuffleStrategy;
use databend_common_sql::executor::physical_plans::CompactSource;
use databend_common_sql::executor::physical_plans::ConstantTableScan;
use databend_common_sql::executor::physical_plans::CopyIntoTable;
use databend_common_sql::executor::physical_plans::CopyIntoTableSource;
use databend_common_sql::executor::physical_plans::DeleteSource;
Expand Down Expand Up @@ -163,19 +168,57 @@ impl PlanFragment {

let executors = Fragmenter::get_executors(ctx);

let mut executor_partitions: HashMap<String, HashMap<u32, DataSourcePlan>> = HashMap::new();
let mut executor_partitions: HashMap<String, HashMap<u32, DataSource>> = HashMap::new();

for (plan_id, data_source) in data_sources.iter() {
// Redistribute partitions of ReadDataSourcePlan.
let partitions = &data_source.parts;
let partition_reshuffle = partitions.reshuffle(executors.clone())?;
for (executor, parts) in partition_reshuffle {
let mut source = data_source.clone();
source.parts = parts;
executor_partitions
.entry(executor)
.or_default()
.insert(*plan_id, source);
match data_source {
DataSource::Table(data_source_plan) => {
// Redistribute partitions of ReadDataSourcePlan.
let partitions = &data_source_plan.parts;
let partition_reshuffle = partitions.reshuffle(executors.clone())?;
for (executor, parts) in partition_reshuffle {
let mut source = data_source_plan.clone();
source.parts = parts;
executor_partitions
.entry(executor)
.or_default()
.insert(*plan_id, DataSource::Table(source));
}
}
DataSource::ConstTable(values) => {
let num_executors = executors.len();
let entries = values
.columns
.iter()
.map(|col| BlockEntry::new(col.data_type(), Value::Column(col.clone())))
.collect::<Vec<BlockEntry>>();
let block = DataBlock::new(entries, values.num_rows);
// Scatter the block
let mut indices = Vec::with_capacity(values.num_rows);
for i in 0..values.num_rows {
indices.push((i % num_executors) as u32);
}
let blocks = block.scatter(&indices, num_executors)?;
for (executor, block) in executors.iter().zip(blocks) {
let columns = block
.columns()
.iter()
.map(|entry| {
entry
.value
.convert_to_full_column(&entry.data_type, block.num_rows())
})
.collect::<Vec<Column>>();
let source = DataSource::ConstTable(ConstTableColumn {
columns,
num_rows: block.num_rows(),
});
executor_partitions
.entry(executor.clone())
.or_default()
.insert(*plan_id, source);
}
}
}
}

Expand Down Expand Up @@ -417,7 +460,7 @@ impl PlanFragment {
Ok(executor_part)
}

fn collect_data_sources(&self) -> Result<HashMap<u32, DataSourcePlan>> {
fn collect_data_sources(&self) -> Result<HashMap<u32, DataSource>> {
if self.fragment_type != FragmentType::Source {
return Err(ErrorCode::Internal(
"Cannot get read source from a non-source fragment".to_string(),
Expand All @@ -428,7 +471,15 @@ impl PlanFragment {

let mut collect_data_source = |plan: &PhysicalPlan| {
if let PhysicalPlan::TableScan(scan) = plan {
data_sources.insert(scan.plan_id, *scan.source.clone());
data_sources.insert(scan.plan_id, DataSource::Table(*scan.source.clone()));
} else if let PhysicalPlan::ConstantTableScan(scan) = plan {
data_sources.insert(
scan.plan_id,
DataSource::ConstTable(ConstTableColumn {
columns: scan.values.clone(),
num_rows: scan.num_rows,
}),
);
}
};

Expand All @@ -443,8 +494,45 @@ impl PlanFragment {
}
}

/// Inline column data of a constant table, as carried to an executor
/// after the rows have been scattered across the cluster.
struct ConstTableColumn {
columns: Vec<Column>,
// Stored separately because `columns` may be empty (a zero-column scan
// still has a row count).
num_rows: usize,
}

/// A source fragment's leaf data, redistributed per executor.
enum DataSource {
/// Partitioned scan of a real table.
Table(DataSourcePlan),
// It's possible there is zero column, so we also save row number.
/// Inline rows of a `ConstantTableScan`.
ConstTable(ConstTableColumn),
}

impl TryFrom<DataSource> for DataSourcePlan {
    type Error = ErrorCode;

    /// Unwrap the `Table` variant. A constant table carries inline rows
    /// rather than a partitioned source plan, so that variant is rejected
    /// with an internal error.
    fn try_from(value: DataSource) -> Result<Self> {
        if let DataSource::Table(plan) = value {
            Ok(plan)
        } else {
            Err(ErrorCode::Internal(
                "Cannot convert ConstTable to DataSourcePlan".to_string(),
            ))
        }
    }
}

impl TryFrom<DataSource> for ConstTableColumn {
    type Error = ErrorCode;

    /// Unwrap the `ConstTable` variant. A regular table source carries
    /// partitions rather than inline columns, so it is rejected with an
    /// internal error.
    fn try_from(value: DataSource) -> Result<Self> {
        match value {
            DataSource::Table(_) => Err(ErrorCode::Internal(
                // Name the actual target type in the message; the previous
                // "Vec<Column>" wording predates `ConstTableColumn` and was
                // inconsistent with the sibling `DataSourcePlan` impl.
                "Cannot convert Table to ConstTableColumn".to_string(),
            )),
            DataSource::ConstTable(const_table) => Ok(const_table),
        }
    }
}

/// Physical-plan rewriter that injects one executor's share of the
/// redistributed data sources back into that fragment's scan nodes,
/// keyed by the scan's plan id.
struct ReplaceReadSource {
sources: HashMap<u32, DataSource>,
}

impl PhysicalPlanReplacer for ReplaceReadSource {
Expand All @@ -456,6 +544,8 @@ impl PhysicalPlanReplacer for ReplaceReadSource {
))
})?;

let source = DataSourcePlan::try_from(source)?;

Ok(PhysicalPlan::TableScan(TableScan {
plan_id: plan.plan_id,
source: Box::new(source),
Expand All @@ -466,6 +556,24 @@ impl PhysicalPlanReplacer for ReplaceReadSource {
}))
}

fn replace_constant_table_scan(&mut self, plan: &ConstantTableScan) -> Result<PhysicalPlan> {
let source = self.sources.remove(&plan.plan_id).ok_or_else(|| {
ErrorCode::Internal(format!(
"Cannot find data source for constant table scan plan {}",
plan.plan_id
))
})?;

let const_table_columns = ConstTableColumn::try_from(source)?;

Ok(PhysicalPlan::ConstantTableScan(ConstantTableScan {
plan_id: plan.plan_id,
values: const_table_columns.columns,
num_rows: const_table_columns.num_rows,
output_schema: plan.output_schema.clone(),
}))
}

fn replace_copy_into_table(&mut self, plan: &CopyIntoTable) -> Result<PhysicalPlan> {
match &plan.source {
CopyIntoTableSource::Query(query_physical_plan) => {
Expand Down
11 changes: 8 additions & 3 deletions src/query/sql/src/planner/optimizer/cost/cost_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use super::Cost;
use super::CostModel;
use crate::optimizer::MExpr;
use crate::optimizer::Memo;
use crate::plans::ConstantTableScan;
use crate::plans::Exchange;
use crate::plans::Join;
use crate::plans::JoinType;
Expand Down Expand Up @@ -78,9 +79,8 @@ impl DefaultCostModel {
fn compute_cost_impl(&self, memo: &Memo, m_expr: &MExpr) -> Result<Cost> {
match m_expr.plan.as_ref() {
RelOperator::Scan(plan) => self.compute_cost_scan(memo, m_expr, plan),
RelOperator::DummyTableScan(_)
| RelOperator::CteScan(_)
| RelOperator::ConstantTableScan(_) => Ok(Cost(0.0)),
RelOperator::ConstantTableScan(plan) => self.compute_cost_constant_scan(plan),
RelOperator::DummyTableScan(_) | RelOperator::CteScan(_) => Ok(Cost(0.0)),
RelOperator::Join(plan) => self.compute_cost_join(memo, m_expr, plan),
RelOperator::UnionAll(_) => self.compute_cost_union_all(memo, m_expr),
RelOperator::Aggregate(_) => self.compute_aggregate(memo, m_expr),
Expand Down Expand Up @@ -108,6 +108,11 @@ impl DefaultCostModel {
Ok(Cost(cost))
}

/// Cost of scanning an in-memory constant table: proportional to its row
/// count, weighted by the model's per-row compute factor. Unlike a real
/// table scan there is no I/O component.
fn compute_cost_constant_scan(&self, plan: &ConstantTableScan) -> Result<Cost> {
    let row_count = plan.num_rows as f64;
    Ok(Cost(row_count * self.compute_per_row))
}

fn compute_cost_join(&self, memo: &Memo, m_expr: &MExpr, plan: &Join) -> Result<Cost> {
let build_group = m_expr.child_group(memo, 1)?;
let probe_group = m_expr.child_group(memo, 0)?;
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/plans/constant_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl Operator for ConstantTableScan {

fn derive_physical_prop(&self, _rel_expr: &RelExpr) -> Result<PhysicalProperty> {
Ok(PhysicalProperty {
distribution: Distribution::Serial,
distribution: Distribution::Random,
})
}

Expand Down
4 changes: 1 addition & 3 deletions src/query/sql/src/planner/semantic/type_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,7 @@ impl<'a> TypeChecker<'a> {
not,
..
} => {
if self.ctx.get_cluster().is_empty()
&& list.len() >= self.ctx.get_settings().get_inlist_to_join_threshold()?
{
if list.len() >= self.ctx.get_settings().get_inlist_to_join_threshold()? {
if *not {
return self
.resolve_unary_op(*span, &UnaryOperator::Not, &Expr::InList {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
control sortmode rowsort

statement ok
drop table if exists tbl_1;

Expand Down Expand Up @@ -216,4 +218,4 @@ select count(*) FROM test_types where f IS NOT DISTINCT FROM NULL
query I
select count(*) FROM test_types where g IS NOT DISTINCT FROM NULL
----
1
1
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
control sortmode rowsort

statement ok
set max_block_size = 1;

Expand Down Expand Up @@ -222,4 +224,4 @@ select count(*) FROM test_types where g IS NOT DISTINCT FROM NULL
1

statement ok
set max_block_size = 65536;
set max_block_size = 65536;
58 changes: 58 additions & 0 deletions tests/sqllogictests/suites/mode/cluster/explain_v2.test
Original file line number Diff line number Diff line change
Expand Up @@ -378,3 +378,61 @@ drop table t1;

statement ok
drop table t2;

statement ok
create table t1(a int, b int);

statement ok
insert into t1 values(1, 2), (2, 3), (3, 4);

statement ok
set inlist_to_join_threshold = 1;

query T
explain select * from t1 where a in (1, 2);
----
Exchange
├── output columns: [t1.a (#0), t1.b (#1)]
├── exchange type: Merge
└── HashJoin
├── output columns: [t1.a (#0), t1.b (#1)]
├── join type: INNER
├── build keys: [CAST(subquery_2 (#2) AS Int32 NULL)]
├── probe keys: [t1.a (#0)]
├── filters: []
├── estimated rows: 2.00
├── Exchange(Build)
│ ├── output columns: [col0 (#2)]
│ ├── exchange type: Broadcast
│ └── AggregateFinal
│ ├── output columns: [col0 (#2)]
│ ├── group by: [col0]
│ ├── aggregate functions: []
│ ├── estimated rows: 2.00
│ └── Exchange
│ ├── output columns: [#_group_by_key]
│ ├── exchange type: Hash(_group_by_key)
│ └── AggregatePartial
│ ├── group by: [col0]
│ ├── aggregate functions: []
│ ├── estimated rows: 2.00
│ └── ConstantTableScan
│ ├── output columns: [col0 (#2)]
│ └── column 0: [1, 2]
└── TableScan(Probe)
├── table: default.default.t1
├── output columns: [a (#0), b (#1)]
├── read rows: 3
├── read bytes: 88
├── partitions total: 1
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1, bloom pruning: 0 to 0>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 3.00


statement ok
drop table t1;

statement ok
unset inlist_to_join_threshold;
8 changes: 5 additions & 3 deletions tests/sqllogictests/suites/query/cte.test
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
control sortmode rowsort

statement ok
use default

Expand Down Expand Up @@ -417,14 +419,14 @@ with cte as (select a, a + 1 as X, b + 1 as Y from t1 order by a limit 3) select
----
105 101 104

query IT
query IT rowsort
with t(a) as (values(1,'a'),(2,'b'),(null,'c')) select a, col1 from t
----
1 a
2 b
NULL c

query II
query II rowsort
with t(a,b) as (values(1,1),(2,null),(null,5)) select * from t
----
1 1
Expand All @@ -437,7 +439,7 @@ create table test (a int, b string);
statement ok
insert into test values (1, 'a'), (2, 'b');

query TT
query TT rowsort
WITH tTqmc(cC0, cC1) AS (SELECT a AS cC0, b AS cC1 FROM test) SELECT tTqmc.cC1 FROM tTqmc;
----
a
Expand Down
Loading

0 comments on commit d8f0758

Please sign in to comment.