Skip to content

Commit

Permalink
feat(planner): support limit for new planner
Browse files Browse the repository at this point in the history
  • Loading branch information
fkuner committed May 11, 2022
1 parent f070535 commit 5ba330f
Show file tree
Hide file tree
Showing 9 changed files with 281 additions and 3 deletions.
30 changes: 29 additions & 1 deletion query/src/sql/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::pipelines::new::processors::SortMergeCompactor;
use crate::pipelines::new::processors::TransformAggregator;
use crate::pipelines::new::processors::TransformFilter;
use crate::pipelines::new::processors::TransformHashJoinProbe;
use crate::pipelines::new::processors::TransformLimit;
use crate::pipelines::new::processors::TransformSortMerge;
use crate::pipelines::new::processors::TransformSortPartial;
use crate::pipelines::new::NewPipeline;
Expand All @@ -57,6 +58,7 @@ use crate::sql::optimizer::SExpr;
use crate::sql::plans::AggregatePlan;
use crate::sql::plans::AndExpr;
use crate::sql::plans::FilterPlan;
use crate::sql::plans::LimitPlan;
use crate::sql::plans::PhysicalHashJoin;
use crate::sql::plans::PhysicalScan;
use crate::sql::plans::PlanType;
Expand Down Expand Up @@ -199,6 +201,12 @@ impl PipelineBuilder {
self.build_pipeline(context, &expression.children()[0], pipeline)?;
self.build_order_by(&sort_plan, input_schema, pipeline)
}
PlanType::Limit => {
let limit_plan: LimitPlan = plan.try_into()?;
let input_schema =
self.build_pipeline(context, &expression.children()[0], pipeline)?;
self.build_limit(&limit_plan, input_schema, pipeline)
}
_ => Err(ErrorCode::LogicalError("Invalid physical plan")),
}
}
Expand Down Expand Up @@ -574,6 +582,26 @@ impl PipelineBuilder {
)
})?;

Ok(output_schema.clone())
Ok(output_schema)
}

fn build_limit(
&mut self,
limit_plan: &LimitPlan,
input_schema: DataSchemaRef,
pipeline: &mut NewPipeline,
) -> Result<DataSchemaRef> {
pipeline.resize(1)?;

pipeline.add_transform(|transform_input_port, transform_output_port| {
TransformLimit::try_create(
limit_plan.limit,
limit_plan.offset,
transform_input_port,
transform_output_port,
)
})?;

Ok(input_schema)
}
}
67 changes: 67 additions & 0 deletions query/src/sql/planner/binder/limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_ast::ast::Expr;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::sql::binder::Binder;
use crate::sql::optimizer::SExpr;
use crate::sql::planner::semantic::TypeChecker;
use crate::sql::plans::LimitPlan;
use crate::sql::BindContext;

impl<'a> Binder {
pub(super) async fn bind_limit(
&mut self,
limit: Option<&Expr<'a>>,
offset: &Option<Expr<'a>>,
bind_context: &mut BindContext,
) -> Result<()> {
let type_checker = TypeChecker::new(bind_context, self.ctx.clone());

let limit_cnt = match limit {
Some(Expr::Literal(x)) => Some(
type_checker
.parse_literal(x, None)
.unwrap()
.as_u64()
.unwrap() as usize,
),
Some(_) => {
return Err(ErrorCode::IllegalDataType("Unsupported limit type"));
}
None => None,
};

let offset_cnt = if let Some(Expr::Literal(x)) = offset {
type_checker
.parse_literal(x, None)
.unwrap()
.as_u64()
.unwrap() as usize
} else {
0
};

let limit_plan = LimitPlan {
limit: limit_cnt,
offset: offset_cnt,
};
let new_expr =
SExpr::create_unary(limit_plan.into(), bind_context.expression.clone().unwrap());
bind_context.expression = Some(new_expr);
Ok(())
}
}
1 change: 1 addition & 0 deletions query/src/sql/planner/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::storages::Table;
mod aggregate;
mod bind_context;
mod join;
mod limit;
mod project;
mod scalar;
mod scalar_common;
Expand Down
15 changes: 14 additions & 1 deletion query/src/sql/planner/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,20 @@ impl<'a> Binder {
}

if !query.limit.is_empty() {
return Err(ErrorCode::UnImplement("Unsupported LIMIT"));
if query.limit.len() == 1 {
self.bind_limit(Some(&query.limit[0]), &query.offset, &mut bind_context)
.await?;
} else {
self.bind_limit(
Some(&query.limit[0]),
&Some(query.limit[1].clone()),
&mut bind_context,
)
.await?;
}
} else if query.offset.is_some() {
self.bind_limit(None, &query.offset, &mut bind_context)
.await?;
}

Ok(bind_context)
Expand Down
67 changes: 67 additions & 0 deletions query/src/sql/planner/plans/limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use crate::sql::optimizer::PhysicalProperty;
use crate::sql::optimizer::RelationalProperty;
use crate::sql::optimizer::SExpr;
use crate::sql::plans::BasePlan;
use crate::sql::plans::LogicalPlan;
use crate::sql::plans::PhysicalPlan;
use crate::sql::plans::PlanType;

#[derive(Clone, Debug)]
pub struct LimitPlan {
pub limit: Option<usize>,
pub offset: usize,
}

impl BasePlan for LimitPlan {
fn plan_type(&self) -> PlanType {
PlanType::Limit
}

fn is_physical(&self) -> bool {
true
}

fn is_logical(&self) -> bool {
true
}

fn as_physical(&self) -> Option<&dyn PhysicalPlan> {
todo!()
}

fn as_logical(&self) -> Option<&dyn LogicalPlan> {
todo!()
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl PhysicalPlan for LimitPlan {
fn compute_physical_prop(&self, _expression: &SExpr) -> PhysicalProperty {
todo!()
}
}

impl LogicalPlan for LimitPlan {
fn compute_relational_prop(&self, _expression: &SExpr) -> RelationalProperty {
todo!()
}
}
4 changes: 4 additions & 0 deletions query/src/sql/planner/plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod aggregate;
mod filter;
mod hash_join;
mod limit;
mod logical_get;
mod logical_join;
mod pattern;
Expand All @@ -29,6 +30,7 @@ pub use aggregate::AggregatePlan;
use enum_dispatch::enum_dispatch;
pub use filter::FilterPlan;
pub use hash_join::PhysicalHashJoin;
pub use limit::LimitPlan;
pub use logical_get::LogicalGet;
pub use logical_join::LogicalInnerJoin;
pub use pattern::PatternPlan;
Expand Down Expand Up @@ -86,6 +88,7 @@ pub enum PlanType {
Filter,
Aggregate,
Sort,
Limit,

// Pattern
Pattern,
Expand All @@ -104,6 +107,7 @@ pub enum BasePlanImpl {
Filter(FilterPlan),
Aggregate(AggregatePlan),
Sort(SortPlan),
Limit(LimitPlan),

Pattern(PatternPlan),
}
2 changes: 1 addition & 1 deletion query/src/sql/planner/semantic/type_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl<'a> TypeChecker<'a> {
}

/// Resolve literal values.
fn parse_literal(
pub fn parse_literal(
&self,
literal: &Literal,
_required_type: Option<DataTypeImpl>,
Expand Down
79 changes: 79 additions & 0 deletions tests/suites/0_stateless/20+_others/20_0001_planner_v2.result
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,82 @@
2
8
new_planner
=== Test limit ===
0
1
2
3
4
5
6
7
8
9
==================
0
1
2
3
4
5
6
7
8
9
=== Test limit n, m ===
10
11
12
13
14
15
16
17
18
19
==================
10
11
12
13
14
15
16
17
18
19
=== Test limit with offset ===
10
11
12
13
14
15
16
17
18
19
==============================
10
11
12
13
14
15
16
17
18
19
=== Test offset ===
5
6
7
8
9
===================
5
6
7
8
9
0
19 changes: 19 additions & 0 deletions tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,23 @@ select to_int(8);
select "new_planner";
select *; -- {ErrorCode 1065}

-- limit
select '=== Test limit ===';
select number from numbers(100) order by number asc limit 10;
select '==================';
select number from numbers(100) order by number*2 asc limit 10;
select '=== Test limit n, m ===';
select number from numbers(100) order by number asc limit 10, 10;
select '==================';
select number from numbers(100) order by number-2 asc limit 10, 10;
select '=== Test limit with offset ===';
select number from numbers(100) order by number asc limit 10 offset 10;
select '==============================';
select number from numbers(100) order by number/2 asc limit 10 offset 10;
select '=== Test offset ===';
select number from numbers(10) order by number asc offset 5;
select '===================';
select number from numbers(10) order by number+number asc offset 5;
select number from numbers(10000000000000) limit 1;

set enable_planner_v2 = 0;

0 comments on commit 5ba330f

Please sign in to comment.