From 05ebffc995cac89df8a36ec2ace4834f0676aac3 Mon Sep 17 00:00:00 2001 From: fkuner <784819644@qq.com> Date: Thu, 12 May 2022 09:18:21 +0800 Subject: [PATCH] feat(planner): support limit for new planner --- query/src/sql/exec/mod.rs | 30 ++++++- query/src/sql/planner/binder/limit.rs | 61 +++++++++++++++ query/src/sql/planner/binder/mod.rs | 1 + query/src/sql/planner/binder/select.rs | 15 +++- query/src/sql/planner/plans/limit.rs | 67 ++++++++++++++++ query/src/sql/planner/plans/mod.rs | 4 + query/src/sql/planner/semantic/type_check.rs | 2 +- .../20+_others/20_0001_planner_v2.result | 78 +++++++++++++++++++ .../20+_others/20_0001_planner_v2.sql | 18 +++++ 9 files changed, 273 insertions(+), 3 deletions(-) create mode 100644 query/src/sql/planner/binder/limit.rs create mode 100644 query/src/sql/planner/plans/limit.rs diff --git a/query/src/sql/exec/mod.rs b/query/src/sql/exec/mod.rs index eb2d2b8f97ee6..d7793f013ac58 100644 --- a/query/src/sql/exec/mod.rs +++ b/query/src/sql/exec/mod.rs @@ -44,6 +44,7 @@ use crate::pipelines::new::processors::SortMergeCompactor; use crate::pipelines::new::processors::TransformAggregator; use crate::pipelines::new::processors::TransformFilter; use crate::pipelines::new::processors::TransformHashJoinProbe; +use crate::pipelines::new::processors::TransformLimit; use crate::pipelines::new::processors::TransformSortMerge; use crate::pipelines::new::processors::TransformSortPartial; use crate::pipelines::new::NewPipeline; @@ -57,6 +58,7 @@ use crate::sql::optimizer::SExpr; use crate::sql::plans::AggregatePlan; use crate::sql::plans::AndExpr; use crate::sql::plans::FilterPlan; +use crate::sql::plans::LimitPlan; use crate::sql::plans::PhysicalHashJoin; use crate::sql::plans::PhysicalScan; use crate::sql::plans::PlanType; @@ -199,6 +201,12 @@ impl PipelineBuilder { self.build_pipeline(context, &expression.children()[0], pipeline)?; self.build_order_by(&sort_plan, input_schema, pipeline) } + PlanType::Limit => { + let limit_plan: LimitPlan = plan.try_into()?; + let input_schema = + self.build_pipeline(context, &expression.children()[0], pipeline)?; + self.build_limit(&limit_plan, input_schema, pipeline) + } _ => Err(ErrorCode::LogicalError("Invalid physical plan")), } } @@ -574,6 +582,26 @@ impl PipelineBuilder { ) })?; - Ok(output_schema.clone()) + Ok(output_schema) + } + + fn build_limit( + &mut self, + limit_plan: &LimitPlan, + input_schema: DataSchemaRef, + pipeline: &mut NewPipeline, + ) -> Result { + pipeline.resize(1)?; + + pipeline.add_transform(|transform_input_port, transform_output_port| { + TransformLimit::try_create( + limit_plan.limit, + limit_plan.offset, + transform_input_port, + transform_output_port, + ) + })?; + + Ok(input_schema) } } diff --git a/query/src/sql/planner/binder/limit.rs b/query/src/sql/planner/binder/limit.rs new file mode 100644 index 0000000000000..5eb5c1b5bf99d --- /dev/null +++ b/query/src/sql/planner/binder/limit.rs @@ -0,0 +1,61 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_ast::ast::Expr; +use common_exception::ErrorCode; +use common_exception::Result; + +use crate::sql::binder::Binder; +use crate::sql::optimizer::SExpr; +use crate::sql::planner::semantic::TypeChecker; +use crate::sql::plans::LimitPlan; +use crate::sql::BindContext; + +impl<'a> Binder { + pub(super) async fn bind_limit( + &mut self, + limit: Option<&Expr<'a>>, + offset: &Option>, + bind_context: &mut BindContext, + ) -> Result<()> { + let type_checker = TypeChecker::new(bind_context, self.ctx.clone()); + + let limit_cnt = match limit { + Some(Expr::Literal { span: _, lit: x }) => { + let data = type_checker.parse_literal(x, None)?.as_u64()?; + Some(data as usize) + } + Some(_) => { + return Err(ErrorCode::IllegalDataType("Unsupported limit type")); + } + None => None, + }; + + let offset_cnt = if let Some(Expr::Literal { span: _, lit: x }) = offset { + let data = type_checker.parse_literal(x, None)?.as_u64()?; + data as usize + } else { + 0 + }; + + let limit_plan = LimitPlan { + limit: limit_cnt, + offset: offset_cnt, + }; + let new_expr = + SExpr::create_unary(limit_plan.into(), bind_context.expression.clone().unwrap()); + bind_context.expression = Some(new_expr); + Ok(()) + } +} diff --git a/query/src/sql/planner/binder/mod.rs b/query/src/sql/planner/binder/mod.rs index 3b37a8f7028f9..363db41f838cc 100644 --- a/query/src/sql/planner/binder/mod.rs +++ b/query/src/sql/planner/binder/mod.rs @@ -28,6 +28,7 @@ use crate::storages::Table; mod aggregate; mod bind_context; mod join; +mod limit; mod project; mod scalar; mod scalar_common; diff --git a/query/src/sql/planner/binder/select.rs b/query/src/sql/planner/binder/select.rs index b45517fcfe285..56be193ea1384 100644 --- a/query/src/sql/planner/binder/select.rs +++ b/query/src/sql/planner/binder/select.rs @@ -76,7 +76,20 @@ impl<'a> Binder { } if !query.limit.is_empty() { - return Err(ErrorCode::UnImplement("Unsupported LIMIT")); + if query.limit.len() == 1 { + self.bind_limit(Some(&query.limit[0]), &query.offset, &mut bind_context) + .await?; + } else { + self.bind_limit( + Some(&query.limit[0]), + &Some(query.limit[1].clone()), + &mut bind_context, + ) + .await?; + } + } else if query.offset.is_some() { + self.bind_limit(None, &query.offset, &mut bind_context) + .await?; } Ok(bind_context) diff --git a/query/src/sql/planner/plans/limit.rs b/query/src/sql/planner/plans/limit.rs new file mode 100644 index 0000000000000..13ea4481f8a14 --- /dev/null +++ b/query/src/sql/planner/plans/limit.rs @@ -0,0 +1,67 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use crate::sql::optimizer::PhysicalProperty; +use crate::sql::optimizer::RelationalProperty; +use crate::sql::optimizer::SExpr; +use crate::sql::plans::BasePlan; +use crate::sql::plans::LogicalPlan; +use crate::sql::plans::PhysicalPlan; +use crate::sql::plans::PlanType; + +#[derive(Clone, Debug)] +pub struct LimitPlan { + pub limit: Option, + pub offset: usize, +} + +impl BasePlan for LimitPlan { + fn plan_type(&self) -> PlanType { + PlanType::Limit + } + + fn is_physical(&self) -> bool { + true + } + + fn is_logical(&self) -> bool { + true + } + + fn as_physical(&self) -> Option<&dyn PhysicalPlan> { + todo!() + } + + fn as_logical(&self) -> Option<&dyn LogicalPlan> { + todo!() + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl PhysicalPlan for LimitPlan { + fn compute_physical_prop(&self, _expression: &SExpr) -> PhysicalProperty { + todo!() + } +} + +impl LogicalPlan for LimitPlan { + fn compute_relational_prop(&self, _expression: &SExpr) -> RelationalProperty { + todo!() + } +} diff --git a/query/src/sql/planner/plans/mod.rs b/query/src/sql/planner/plans/mod.rs index 22b2a1d85575e..c2ca6b75eb59a 100644 --- a/query/src/sql/planner/plans/mod.rs +++ b/query/src/sql/planner/plans/mod.rs @@ -15,6 +15,7 @@ mod aggregate; mod filter; mod hash_join; +mod limit; mod logical_get; mod logical_join; mod pattern; @@ -29,6 +30,7 @@ pub use aggregate::AggregatePlan; use enum_dispatch::enum_dispatch; pub use filter::FilterPlan; pub use hash_join::PhysicalHashJoin; +pub use limit::LimitPlan; pub use logical_get::LogicalGet; pub use logical_join::LogicalInnerJoin; pub use pattern::PatternPlan; @@ -86,6 +88,7 @@ pub enum PlanType { Filter, Aggregate, Sort, + Limit, // Pattern Pattern, @@ -104,6 +107,7 @@ pub enum BasePlanImpl { Filter(FilterPlan), Aggregate(AggregatePlan), Sort(SortPlan), + Limit(LimitPlan), Pattern(PatternPlan), } diff --git a/query/src/sql/planner/semantic/type_check.rs b/query/src/sql/planner/semantic/type_check.rs index 9114fc7544c23..d5896247dd9e4 100644 --- a/query/src/sql/planner/semantic/type_check.rs +++ b/query/src/sql/planner/semantic/type_check.rs @@ -455,7 +455,7 @@ impl<'a> TypeChecker<'a> { } /// Resolve literal values. - fn parse_literal( + pub fn parse_literal( &self, literal: &Literal, _required_type: Option, diff --git a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result index 4e83c8fd60b49..1279aba63c8d5 100644 --- a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result +++ b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result @@ -112,3 +112,81 @@ 2 8 new_planner +=== Test limit === +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +================== +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +=== Test limit n, m === +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +================== +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +=== Test limit with offset === +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +============================== +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +=== Test offset === +5 +6 +7 +8 +9 +=================== +5 +6 +7 +8 +9 diff --git a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql index e1254a69b9628..e5d143392a2a3 100644 --- a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql +++ b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql @@ -91,4 +91,22 @@ select to_int(8); select "new_planner"; select *; -- {ErrorCode 1065} +-- limit +select '=== Test limit ==='; +select number from numbers(100) order by number asc limit 10; +select '=================='; +select number from numbers(100) order by number*2 asc limit 10; +select '=== Test limit n, m ==='; +select number from numbers(100) order by number asc limit 10, 10; +select '=================='; +select number from numbers(100) order by number-2 asc limit 10, 10; +select '=== Test limit with offset ==='; +select number from numbers(100) order by number asc limit 10 offset 10; +select '=============================='; +select number from numbers(100) order by number/2 asc limit 10 offset 10; +select '=== Test offset ==='; +select number from numbers(10) order by number asc offset 5; +select '==================='; +select number from numbers(10) order by number+number asc offset 5; + set enable_planner_v2 = 0;