diff --git a/Config.cpp b/Config.cpp index 341aa300e..59177cfd7 100644 --- a/Config.cpp +++ b/Config.cpp @@ -83,6 +83,7 @@ bool Config::local_disk_mode; int Config::client_listener_port; bool Config::enable_codegen; +bool Config::enable_prune_column; std::string Config::catalog_file; @@ -154,6 +155,8 @@ void Config::initialize() { memory_utilization = getInt("memory_utilization", 100); + enable_prune_column = getBoolean("enable_prune_column", true); + #ifdef DEBUG_Config print_configure(); #endif @@ -211,6 +214,7 @@ void Config::print_configure() const { std::cout << "client_lisener_port:" << client_listener_port << std::endl; std::cout << "catalog_file:" << catalog_file << std::endl; std::cout << "codegen:" << enable_codegen << std::endl; + std::cout << "enable_prune_column: " << enable_prune_column << std::endl; std::cout << "load_thread_num:" << load_thread_num << std::endl; } diff --git a/Config.h b/Config.h index 34defbff1..a6de9ea51 100644 --- a/Config.h +++ b/Config.h @@ -75,6 +75,7 @@ class Config { static bool pipelined_exchange; static int client_listener_port; static bool enable_codegen; + static bool enable_prune_column; static std::string catalog_file; static int thread_pool_init_thread_num; static int memory_utilization; diff --git a/README.md b/README.md index 398ccb856..b040ed0df 100644 --- a/README.md +++ b/README.md @@ -1,55 +1,37 @@ -**CLAIMS** (CLuster-Aware In-Memory Sql query engin) is a parallel in-memory database prototype, which runs on clusters of commodity servers and aims to provide real-time data analytics on relational dataset. +**CLAIMS** (CLuster-Aware In-Memory Sql query engine) is a parallel in-memory database prototype, which runs on clusters of commodity servers and aims to provide real-time data analytics on relational dataset. #### Highlights ##### 1. Massively parallel execution engine. -CLAIMS relies on highly parallel query processing engine to dramatically accelerate data analysis speed. Query evaluations are distributed across the cluster and executed in parallel. Query evaluations are not only distirbuted across the cluster to leverage the computation power of the cluster, but are also executed in a multi-threaded fashion to unleash the power of modern multi-core hardware. +CLAIMS relies on highly parallel query processing engine to dramatically accelerate data analysis speed. Query evaluations are not only distributed across the cluster to leverage the computation power of the cluster, but are also executed in a multi-threaded fashion to unleash the power of modern multi-core hardware. ##### 2. Smart intra-node parallelism. -Pipelining the query execution among nodes in the cluster effectively reduces the response latency and dramatically saves storage space for intermediate query results. However, its benefits degrade tremendously when the workloads are imbalanced among execution partitions due to the improperly generated query execution plan. To tackle this problem, a novel elastic pipelining query processing model is proposed in CLAIMS, which adapts the intra-node parallelism to the runtime workload. Beneficial from elastic pipelining query processing, the parallelism of different execution fragments in a pipelined is self-adaptive with each other and results in an optimal intra-node parallelism assignment. Please refer to our SIGMOD paper for more details about elastic pipelining. -![asdf](http://dase.ecnu.edu.cn/liwang/images/elastic_pipeline.jpg) +Pipelining the query execution among nodes in the cluster effectively reduces the response time and dramatically saves storage space for intermediate query results. However, its benefits degrade tremendously when the workloads are imbalanced among execution partitions due to the improperly generated query execution plan. To tackle this problem, a novel elastic query processing framework, i.e., *elastic pipelining*, is proposed in CLAIMS, which adjusts the intra-node parallelism according to the runtime workload based on elaborate performance model. Beneficial from elastic pipelining query processing, the parallelism of different execution fragments in a pipelined is self-adaptive, resulting in an optimal intra-node parallelism assignment. Please refer to our SIGMOD paper for more details about the elastic pipelining framework. + +![asdf](https://i1.piimg.com/1949/99a94a4e18e6fc21.jpg) + ##### 3. Efficient in-memory data processing. -CLAIMS employs a large set of optimization techniques to achieve efficient in-memory data processing, including batch-at-a-time processing, cache-sensitive operators, SIMD-based optimization, code generation, lock-free and concurrent processing structures. These optimizations work collaborately and enable CLAIMS to process up to gigabytes data per second within a single thread. +CLAIMS employs a large set of optimization techniques to achieve efficient in-memory data processing, including batch-at-a-time processing, cache-sensitive operators, SIMD-based optimization, code generation, lock-free and concurrent processing structures. These optimizations work collaborately and enable CLAIMS to process up to gigabytes data per second on a single thread. ##### 4. Network communication optimization. -Parallel query processing imposes high burdens on network communication, which becomes the performance bottleneck for in-memory parallel databases due to the relatively slow network bandwidth. When compiling a user query into an execution plan, CLAIMS’s query optimizer leverages a sophisticated selectivity propagation system and cost model to generate physical query plans with minimized network communication cost. Furthermore, CLAIMS deploys a new data exchange implementation, which offers efficient, scalable and skew-resilient network data communication among CLAIMS instances. These optimizations greatly reduce the response time of the queries that require network data communication. +Parallel query processing imposes high burdens on network communication, which usually becomes performance bottleneck of the in-memory parallel databases due to the relatively slow network bandwidth. When compiling a user query into an execution plan, CLAIMS’s query optimizer leverages a sophisticated selectivity propagation system and cost model to generate physical query plans with minimized network communication cost. Furthermore, CLAIMS deploys a new data exchange implementation, which offers efficient, scalable and skew-resilient network data transfer among CLAIMS instances. These optimizations greatly reduce the response time for a large variety of queries. #### Performance Beneficial from the smart and massively parallelism and the in-memory data processing optimizations, CLAIMS is up to 5X faster than Shark and Impala, two state-of-the-art systems in the open source community, in the queries against TPCH dataset and Shanghai Stock Exchange dataset. -![asdf](http://dase.ecnu.edu.cn/liwang/images/compare.jpg) - -#### Team members -[Aoying Zhou](http://case.ecnu.edu.cn), Professor in East China Normal University, is the person in charge of this project. - -[Minqi Zhou](https://github.com/polpo1980), Associate Professor in East China Normal University, is the person in charge of this project. - -[Li Wang](https://github.com/wangli1426), Ph.D. student in East China Normal University, manages the master students in this team and is responsible for designing and implementing the key components of CLAIMS, including query optimizer, catalog, physical operators, distributed communication infrastructure, storage layout, etc. - -[Lei Zhang](https://github.com/egraldlo) is responsible for designing and implementing the key components of CLAIMS, including query optimizer, physical operators, persistent data exchange, storage management, etc. - -[Shaochan Dong](https://github.com/scdong) is responsible for designing and implementing in-memory index and index management, data types, as well as data loading and importing. +![asdf](https://i1.piimg.com/1949/de04caa268f1215f.jpg) -[Xinzhou Zhang]() is mainly responsible for web UI design and implementing data importing model. - -[Zhuhe Fang](https://github.com/fzhedu) is mainly responsible for designing and implementing SQL DML parser and physical operators. - -[Yu Kai](https://github.com/yukai2014) is mainly responsible for designing and implementing SQL DDL parser, catalog persistence. - -[Yongfeng Li](https://github.com/NagamineLee) was a formal member of CLAIMS, who participated in designing and implementing catalog model. - -[Lin Gu]() is responsible for designing the demo cases of CLAIMS. #### Publications -1. Li Wang, Minqi Zhou, Zhenjie Zhang, Yin Yang, Aoying Zhou, Dina Bitton. Elastic Pipelining in In-Memory Database Cluster. To appear in Sigmod 2016. +1. Li Wang, Minqi Zhou, Zhenjie Zhang, Yin Yang, Aoying Zhou, Dina Bitton. Elastic Pipelining in In-Memory Database Cluster. ACM SIGMOD 2016, pp. 1279-1294. 2. Li Wang, Minqi Zhou, Zhenjie Zhang, Ming-chien Shan, Aoying Zhou. NUMA-aware Scalable and Efficient Aggregation on Large Domains. IEEE TKDE 2015:4. pp.1071-1084 . 3. Li Wang, Lei Zhang, Chengcheng Yu, Aoying Zhou. Optimizing Pipelined Execution for Distributed In-memory OLAY System. In: DaMen 2014. Springer. 2014. pp. 35-56. 4. Lan Huang, Ke Xun, Xiaozhou Chen, Minqi Zhou, In-memory Cluster Computing: Interactive Data Analysis, Journal of East China Normal University, 2014 @@ -60,4 +42,8 @@ Beneficial from the smart and massively parallelism and the in-memory data proce 9. Yongfeng Li, Minqi Zhou, Hualiang Hu, Survey of resource uniform management and scheduling in cluster, Journal of East China Normal University, 2014 #### Quick Start -Try our CLAIMS, following [Quick Start](https://github.com/dase/CLAIMS/wiki). +Try our CLAIMS, please follow [Quick Start](https://github.com/dase/CLAIMS/wiki/Installation-steps). + +#### More +Learn more information, please go to [Wiki](https://github.com/dase/CLAIMS/wiki). + diff --git a/catalog/projection.cpp b/catalog/projection.cpp index b75f2ee21..c88d5fd1b 100644 --- a/catalog/projection.cpp +++ b/catalog/projection.cpp @@ -72,6 +72,12 @@ bool ProjectionDescriptor::isExist(const string& name) const { } return false; } +bool ProjectionDescriptor::isExist1(const string& name) const { + for (unsigned i = 0; i < column_list_.size(); i++) { + if (column_list_[i].attrName == name) return true; + } + return false; +} bool ProjectionDescriptor::AllPartitionBound() const { return partitioner->allPartitionBound(); } diff --git a/catalog/projection.h b/catalog/projection.h index e4b256e3e..4a992eb47 100644 --- a/catalog/projection.h +++ b/catalog/projection.h @@ -74,6 +74,7 @@ class ProjectionDescriptor { PartitionFunction* partition_functin); Partitioner* getPartitioner() const; bool isExist(const string& name) const; + bool isExist1(const string& name) const; inline void setProjectionID(const ProjectionID& pid) { projection_id_ = pid; } inline map > getFileLocations() const { return fileLocations; @@ -91,7 +92,6 @@ class ProjectionDescriptor { * as this projection's cost */ unsigned int getProjectionCost() const; - private: // ProjectionOffset projection_offset_; ProjectionID projection_id_; diff --git a/common/expression/expr_binary.cpp b/common/expression/expr_binary.cpp index 0200a41b7..194a8c2b2 100644 --- a/common/expression/expr_binary.cpp +++ b/common/expression/expr_binary.cpp @@ -65,5 +65,11 @@ void ExprBinary::InitExprAtPhysicalPlan() { } ExprNode* ExprBinary::ExprCopy() { return new ExprBinary(this); } + +void ExprBinary::GetUniqueAttr(set& attrs) { + arg0_->GetUniqueAttr(attrs); + arg1_->GetUniqueAttr(attrs); +} + } // namespace common } // namespace claims diff --git a/common/expression/expr_binary.h b/common/expression/expr_binary.h index 46c235e25..e7e4cd037 100644 --- a/common/expression/expr_binary.h +++ b/common/expression/expr_binary.h @@ -42,6 +42,7 @@ class ExprBinary : public ExprNode { void InitExprAtPhysicalPlan(); ExprNode* ExprCopy(); + void GetUniqueAttr(set& attrs); private: friend class boost::serialization::access; diff --git a/common/expression/expr_case_when.cpp b/common/expression/expr_case_when.cpp index 45be44571..c2d8ad125 100644 --- a/common/expression/expr_case_when.cpp +++ b/common/expression/expr_case_when.cpp @@ -76,5 +76,15 @@ void ExprCaseWhen::InitExprAtPhysicalPlan() { } ExprNode* ExprCaseWhen::ExprCopy() { return new ExprCaseWhen(this); } + +void ExprCaseWhen::GetUniqueAttr(set& attrs) { + for (int i = 0; i < case_when_.size(); i++) { + case_when_[i]->GetUniqueAttr(attrs); + } + for (int i = 0; i < case_then_.size(); i++) { + case_then_[i]->GetUniqueAttr(attrs); + } +} + } // namespace common } // namespace claims diff --git a/common/expression/expr_case_when.h b/common/expression/expr_case_when.h index 5f987792b..a6c8b0e91 100644 --- a/common/expression/expr_case_when.h +++ b/common/expression/expr_case_when.h @@ -45,6 +45,7 @@ class ExprCaseWhen : public ExprNode { void InitExprAtPhysicalPlan(); ExprNode* ExprCopy(); + void GetUniqueAttr(set& attrs); private: friend class boost::serialization::access; diff --git a/common/expression/expr_column.cpp b/common/expression/expr_column.cpp index c497f8a22..4d0c4f798 100644 --- a/common/expression/expr_column.cpp +++ b/common/expression/expr_column.cpp @@ -53,7 +53,7 @@ void ExprColumn::InitExprAtLogicalPlan(LogicInitCnxt& licnxt) { if (return_type_ == t_string) { value_size_ = std::max(licnxt.schema0_->getcolumn(attr_id_).get_length(), static_cast(BASE_DATA_SIZE)); - } else if (return_type_ == t_decimal) { + } else if (return_type_ == t_decimal) { value_size_ = licnxt.schema0_->getcolumn(attr_id_).size; } else { value_size_ = licnxt.schema0_->getcolumn(attr_id_).get_length(); @@ -68,7 +68,7 @@ void ExprColumn::InitExprAtLogicalPlan(LogicInitCnxt& licnxt) { value_size_ = std::max(licnxt.schema1_->getcolumn(attr_id_).get_length(), static_cast(BASE_DATA_SIZE)); - } else if (return_type_ == t_decimal) { + } else if (return_type_ == t_decimal) { value_size_ = licnxt.schema1_->getcolumn(attr_id_).size; } else { value_size_ = licnxt.schema1_->getcolumn(attr_id_).get_length(); @@ -89,5 +89,10 @@ void ExprColumn::InitExprAtPhysicalPlan() { } ExprNode* ExprColumn::ExprCopy() { return new ExprColumn(this); } + +void ExprColumn::GetUniqueAttr(set& attrs) { + attrs.insert(table_name_ + "." + column_name_); +} + } // namespace common } // namespace claims diff --git a/common/expression/expr_column.h b/common/expression/expr_column.h index 49c3f442b..2333b8469 100644 --- a/common/expression/expr_column.h +++ b/common/expression/expr_column.h @@ -36,6 +36,7 @@ class ExprColumn : public ExprNode { void InitExprAtPhysicalPlan(); ExprNode* ExprCopy(); + void GetUniqueAttr(set& attrs); private: friend class boost::serialization::access; diff --git a/common/expression/expr_in.cpp b/common/expression/expr_in.cpp index d5a9e2491..42db0bf02 100644 --- a/common/expression/expr_in.cpp +++ b/common/expression/expr_in.cpp @@ -95,5 +95,17 @@ void ExprIn::InitExprAtPhysicalPlan() { } ExprNode* ExprIn::ExprCopy() { return new ExprIn(this); } + +void ExprIn::GetUniqueAttr(set& attrs) { + for (int i = 0, size = cmp_expr_.size(); i < size; ++i) { + cmp_expr_[i]->GetUniqueAttr(attrs); + } + for (int i = 0; i < right_node_.size(); i++) { + for (int j = 0; j < right_node_[i].size(); j++) { + right_node_[i][j]->GetUniqueAttr(attrs); + } + } +} + } // namespace common } // namespace claims diff --git a/common/expression/expr_in.h b/common/expression/expr_in.h index 1b542c781..0636a7085 100644 --- a/common/expression/expr_in.h +++ b/common/expression/expr_in.h @@ -51,6 +51,7 @@ class ExprIn : public ExprNode { void InitExprAtPhysicalPlan(); ExprNode* ExprCopy(); + void GetUniqueAttr(set& attrs); private: friend class boost::serialization::access; diff --git a/common/expression/expr_node.h b/common/expression/expr_node.h index a83d97790..a94750636 100644 --- a/common/expression/expr_node.h +++ b/common/expression/expr_node.h @@ -138,6 +138,8 @@ class ExprNode { virtual void InitExprAtLogicalPlan(LogicInitCnxt& licnxt) {} + virtual void GetUniqueAttr(set& attrs) {} + virtual void InitExprAtPhysicalPlan() {} virtual ExprNode* ExprCopy() { return NULL; } bool IsEqualAttr(const Attribute& attr); diff --git a/common/expression/expr_ternary.cpp b/common/expression/expr_ternary.cpp index 14eb4a140..b9b72b1fe 100644 --- a/common/expression/expr_ternary.cpp +++ b/common/expression/expr_ternary.cpp @@ -71,5 +71,12 @@ void ExprTernary::InitExprAtPhysicalPlan() { } ExprNode* ExprTernary::ExprCopy() { return new ExprTernary(this); } + +void ExprTernary::GetUniqueAttr(set& attrs) { + arg0_->GetUniqueAttr(attrs); + arg1_->GetUniqueAttr(attrs); + arg2_->GetUniqueAttr(attrs); +} + } // namespace common } // namespace claims diff --git a/common/expression/expr_ternary.h b/common/expression/expr_ternary.h index 6294c5e3a..a000692f2 100644 --- a/common/expression/expr_ternary.h +++ b/common/expression/expr_ternary.h @@ -42,6 +42,7 @@ class ExprTernary : public ExprNode { void InitExprAtPhysicalPlan(); ExprNode* ExprCopy(); + void GetUniqueAttr(set& attrs); private: friend class boost::serialization::access; diff --git a/common/expression/expr_unary.cpp b/common/expression/expr_unary.cpp index 053a206ff..e06c562e3 100644 --- a/common/expression/expr_unary.cpp +++ b/common/expression/expr_unary.cpp @@ -83,5 +83,8 @@ void ExprUnary::InitExprAtPhysicalPlan() { } ExprNode* ExprUnary::ExprCopy() { return new ExprUnary(this); } +void ExprUnary::GetUniqueAttr(set& attrs) { + arg0_->GetUniqueAttr(attrs); +} } // namespace common } // namespace claims diff --git a/common/expression/expr_unary.h b/common/expression/expr_unary.h index e69699372..b1cedfae6 100644 --- a/common/expression/expr_unary.h +++ b/common/expression/expr_unary.h @@ -35,6 +35,7 @@ class ExprUnary : public ExprNode { virtual void InitExprAtPhysicalPlan(); virtual ExprNode* ExprCopy(); + void GetUniqueAttr(set& attrs); private: friend class boost::serialization::access; diff --git a/conf/config b/conf/config index b3873707e..b0ccfc36e 100755 --- a/conf/config +++ b/conf/config @@ -1,5 +1,5 @@ #本机IP地址 -ip = "219.228.147.12"; +ip = "127.0.0.1"; #端口范围(调试用) PortManager: @@ -11,7 +11,7 @@ PortManager: #master的IP地址和端口 coordinator: { - ip="219.228.147.12" + ip="127.0.0.1" port="12000" } @@ -34,15 +34,16 @@ client_listener_port = 10000 #data="/home/imdb/data/stock/" #data="/home/zzh/data/1partition/" #data="/claimsdata/" -data="/home/zzh/data/sf-1-p4/" - +#data="/home/zzh/data/sf-1-p4/" +data="/home/zzh/Desktop/test_data/" +#data="/home/zzh/Desktop/test_data_old/" #data="/home/fish/data/test/" #data="/home/imdb/data/POC/" #data="/home/imdb/data/POC/" #hdfs主节点 +#data="/home/claims/zzh/4-partiton/" - -hdfs_master_ip="219.228.147.162" +hdfs_master_ip="58.198.176.92" #hdfs主节点端口 hdfs_master_port=9000 @@ -59,7 +60,7 @@ enable_expander_adaptivity=0 pipelined_exchange=1 -local_disk_mode=0 +local_disk_mode=1 scan_batch=100 diff --git a/logical_operator/logical_aggregation.cpp b/logical_operator/logical_aggregation.cpp index dabef085b..ed83d0e78 100755 --- a/logical_operator/logical_aggregation.cpp +++ b/logical_operator/logical_aggregation.cpp @@ -122,8 +122,8 @@ void LogicalAggregation::ChangeAggAttrsForAVG() { PlanContext LogicalAggregation::GetPlanContext() { lock_->acquire(); if (NULL != plan_context_) { - lock_->release(); - return *plan_context_; + delete plan_context_; + plan_context_ = NULL; } PlanContext ret; const PlanContext child_context = child_->GetPlanContext(); @@ -481,5 +481,16 @@ void LogicalAggregation::Print(int level) const { --level; child_->Print(level); } +void LogicalAggregation::PruneProj(set& above_attrs) { + set above_attrs_copy = above_attrs; + for (int i = 0, size = group_by_attrs_.size(); i < size; ++i) { + group_by_attrs_[i]->GetUniqueAttr(above_attrs_copy); + } + for (int i = 0, size = aggregation_attrs_.size(); i < size; ++i) { + aggregation_attrs_[i]->GetUniqueAttr(above_attrs_copy); + } + child_->PruneProj(above_attrs_copy); + child_ = DecideAndCreateProject(above_attrs_copy, child_); +} } // namespace logical_operator } // namespace claims diff --git a/logical_operator/logical_aggregation.h b/logical_operator/logical_aggregation.h index 7e2289595..1ed702ad3 100755 --- a/logical_operator/logical_aggregation.h +++ b/logical_operator/logical_aggregation.h @@ -94,6 +94,7 @@ class LogicalAggregation : public LogicalOperator { vector aggregation_attrs, LogicalOperator* child); virtual ~LogicalAggregation(); + void PruneProj(set& above_attrs); protected: /** diff --git a/logical_operator/logical_cross_join.cpp b/logical_operator/logical_cross_join.cpp index c83e575bb..7ba31d2fa 100644 --- a/logical_operator/logical_cross_join.cpp +++ b/logical_operator/logical_cross_join.cpp @@ -121,8 +121,8 @@ int LogicalCrossJoin::get_join_policy_() { PlanContext LogicalCrossJoin::GetPlanContext() { lock_->acquire(); if (NULL != plan_context_) { - lock_->release(); - return *plan_context_; + delete plan_context_; + plan_context_ = NULL; } PlanContext left_plan_context = left_child_->GetPlanContext(); PlanContext right_plan_context = right_child_->GetPlanContext(); @@ -293,6 +293,20 @@ PhysicalOperatorBase* LogicalCrossJoin::GetPhysicalPlan( return cross_join_iterator; } +void LogicalCrossJoin::PruneProj(set& above_attrs) { + set above_attrs_copy = above_attrs; + + for (int i = 0, size = join_condi_.size(); i < size; ++i) { + join_condi_[i]->GetUniqueAttr(above_attrs_copy); + } + set above_attrs_right = above_attrs_copy; + left_child_->PruneProj(above_attrs_copy); + left_child_ = DecideAndCreateProject(above_attrs_copy, left_child_); + + right_child_->PruneProj(above_attrs_right); + right_child_ = DecideAndCreateProject(above_attrs_right, right_child_); +} + int LogicalCrossJoin::GenerateChildPhysicalQueryPlan( PhysicalOperatorBase*& left_child_iterator_tree, PhysicalOperatorBase*& right_child_iterator_tree, diff --git a/logical_operator/logical_cross_join.h b/logical_operator/logical_cross_join.h index 5801d2696..e6c5f25cf 100644 --- a/logical_operator/logical_cross_join.h +++ b/logical_operator/logical_cross_join.h @@ -55,6 +55,7 @@ class LogicalCrossJoin : public LogicalOperator { PlanContext GetPlanContext(); PhysicalOperatorBase* GetPhysicalPlan(const unsigned& blocksize); void Print(int level = 0) const; + void PruneProj(set& above_attrs); protected: /** diff --git a/logical_operator/logical_delete_filter.cpp b/logical_operator/logical_delete_filter.cpp index a0fbd6b78..18d06d175 100755 --- a/logical_operator/logical_delete_filter.cpp +++ b/logical_operator/logical_delete_filter.cpp @@ -87,9 +87,8 @@ LogicalDeleteFilter::~LogicalDeleteFilter() { PlanContext LogicalDeleteFilter::GetPlanContext() { lock_->acquire(); if (NULL != dataflow_) { - // the data flow has been computed*/ - lock_->release(); - return *dataflow_; + delete dataflow_; + dataflow_ = NULL; } /** @@ -98,7 +97,26 @@ PlanContext LogicalDeleteFilter::GetPlanContext() { PlanContext left_dataflow = left_child_->GetPlanContext(); PlanContext right_dataflow = right_child_->GetPlanContext(); PlanContext ret; - + for (int i = 0, size = left_filter_key_list_.size(); i < size; ++i) { + for (int j = 0, jsize = left_dataflow.attribute_list_.size(); j < jsize; + ++j) { + if (left_filter_key_list_[i].attrName == + left_dataflow.attribute_list_[j].attrName) { + left_filter_key_list_[i] = left_dataflow.attribute_list_[j]; + filterkey_pair_list_[i].left_filter_attr_ = + left_dataflow.attribute_list_[j]; + } + } + for (int j = 0, jsize = right_dataflow.attribute_list_.size(); j < jsize; + ++j) { + if (right_filter_key_list_[i].attrName == + right_dataflow.attribute_list_[j].attrName) { + right_filter_key_list_[i] = right_dataflow.attribute_list_[j]; + filterkey_pair_list_[i].right_filter_attr_ = + right_dataflow.attribute_list_[j]; + } + } + } const bool left_dataflow_key_partitioned = CanOmitHashRepartition( left_filter_key_list_, left_dataflow.plan_partitioner_); const bool right_dataflow_key_partitioned = CanOmitHashRepartition( @@ -840,6 +858,23 @@ double LogicalDeleteFilter::PredictFilterSelectivity( } return ret; } + +void LogicalDeleteFilter::PruneProj(set& above_attrs) { + set above_attrs_copy = above_attrs; + set above_attrs_right = above_attrs_copy; + + for (int i = 0, size = filterkey_pair_list_.size(); i < size; ++i) { + above_attrs_copy.insert(filterkey_pair_list_[i].left_filter_attr_.attrName); + above_attrs_right.insert( + filterkey_pair_list_[i].right_filter_attr_.attrName); + } + left_child_->PruneProj(above_attrs_copy); + left_child_ = DecideAndCreateProject(above_attrs_copy, left_child_); + + right_child_->PruneProj(above_attrs_right); + right_child_ = DecideAndCreateProject(above_attrs_right, right_child_); +} + double LogicalDeleteFilter::PredictFilterSelectivityOnSingleJoinAttributePair( const Attribute& attr_left, const Attribute& attr_right) const { double ret; diff --git a/logical_operator/logical_delete_filter.h b/logical_operator/logical_delete_filter.h index aa31f61ef..6e94cab66 100755 --- a/logical_operator/logical_delete_filter.h +++ b/logical_operator/logical_delete_filter.h @@ -127,6 +127,7 @@ class LogicalDeleteFilter : public LogicalOperator { bool GetOptimalPhysicalPlan(Requirement requirement, PhysicalPlanDescriptor& physical_plan_descriptor, const unsigned& block_size = 4096 * 1024); + void PruneProj(set& above_attrs); private: std::vector GetLeftFilterKeyIds() const; diff --git a/logical_operator/logical_equal_join.cpp b/logical_operator/logical_equal_join.cpp index c47a17ea1..b220f3277 100755 --- a/logical_operator/logical_equal_join.cpp +++ b/logical_operator/logical_equal_join.cpp @@ -135,9 +135,8 @@ void LogicalEqualJoin::DecideJoinPolicy(const PlanContext& left_dataflow, PlanContext LogicalEqualJoin::GetPlanContext() { lock_->acquire(); if (NULL != plan_context_) { - // the data flow has been computed*/ - lock_->release(); - return *plan_context_; + delete plan_context_; + plan_context_ = NULL; } /** @@ -145,6 +144,28 @@ PlanContext LogicalEqualJoin::GetPlanContext() { */ PlanContext left_dataflow = left_child_->GetPlanContext(); PlanContext right_dataflow = right_child_->GetPlanContext(); + // update the left and right join key list + for (int i = 0, size = left_join_key_list_.size(); i < size; ++i) { + for (int j = 0, jsize = left_dataflow.attribute_list_.size(); j < jsize; + ++j) { + if (left_join_key_list_[i].attrName == + left_dataflow.attribute_list_[j].attrName) { + left_join_key_list_[i] = left_dataflow.attribute_list_[j]; + joinkey_pair_list_[i].left_join_attr_ = + left_dataflow.attribute_list_[j]; + } + } + for (int j = 0, jsize = right_dataflow.attribute_list_.size(); j < jsize; + ++j) { + if (right_join_key_list_[i].attrName == + right_dataflow.attribute_list_[j].attrName) { + right_join_key_list_[i] = right_dataflow.attribute_list_[j]; + joinkey_pair_list_[i].right_join_attr_ = + right_dataflow.attribute_list_[j]; + } + } + } + PlanContext ret; DecideJoinPolicy(left_dataflow, right_dataflow); const Attribute left_partition_key = @@ -339,9 +360,8 @@ LogicalEqualJoin::JoinPolicy LogicalEqualJoin::DecideLeftOrRightRepartition( PhysicalOperatorBase* LogicalEqualJoin::GetPhysicalPlan( const unsigned& block_size) { - if (NULL == plan_context_) { - GetPlanContext(); - } + // if (NULL == plan_context_) + { GetPlanContext(); } PhysicalHashJoin* join_iterator; PhysicalOperatorBase* child_iterator_left = left_child_->GetPhysicalPlan(block_size); @@ -774,6 +794,21 @@ double LogicalEqualJoin::PredictEqualJoinSelectivity( } return ret; } + +void LogicalEqualJoin::PruneProj(set& above_attrs) { + set above_attrs_copy = above_attrs; + + for (int i = 0, size = join_condi_.size(); i < size; ++i) { + join_condi_[i]->GetUniqueAttr(above_attrs_copy); + } + set above_attrs_right = above_attrs_copy; + left_child_->PruneProj(above_attrs_copy); + left_child_ = DecideAndCreateProject(above_attrs_copy, left_child_); + + right_child_->PruneProj(above_attrs_right); + right_child_ = DecideAndCreateProject(above_attrs_right, right_child_); +} + double LogicalEqualJoin::PredictEqualJoinSelectivityOnSingleJoinAttributePair( const Attribute& attr_left, const Attribute& attr_right) const { double ret; diff --git a/logical_operator/logical_equal_join.h b/logical_operator/logical_equal_join.h index 2eb643e73..5444a1141 100755 --- a/logical_operator/logical_equal_join.h +++ b/logical_operator/logical_equal_join.h @@ -106,6 +106,7 @@ class LogicalEqualJoin : public LogicalOperator { bool GetOptimalPhysicalPlan(Requirement requirement, PhysicalPlanDescriptor& physical_plan_descriptor, const unsigned& block_size = 4096 * 1024); + void PruneProj(set& above_attrs); private: std::vector GetLeftJoinKeyIds() const; diff --git a/logical_operator/logical_filter.cpp b/logical_operator/logical_filter.cpp index 2edb5f6fc..0351b3aa4 100644 --- a/logical_operator/logical_filter.cpp +++ b/logical_operator/logical_filter.cpp @@ -71,8 +71,8 @@ PlanContext LogicalFilter::GetPlanContext() { */ lock_->acquire(); if (NULL != plan_context_) { - lock_->release(); - return *plan_context_; + delete plan_context_; + plan_context_ = NULL; } PlanContext plan_context = child_->GetPlanContext(); if (plan_context.IsHashPartitioned()) { @@ -387,6 +387,14 @@ void LogicalFilter::Print(int level) const { child_->Print(level); } +void LogicalFilter::PruneProj(set& above_attrs) { + set above_attrs_copy = above_attrs; + for (int i = 0, size = condition_.size(); i < size; ++i) { + condition_[i]->GetUniqueAttr(above_attrs_copy); + } + child_->PruneProj(above_attrs_copy); + child_ = DecideAndCreateProject(above_attrs_copy, child_); +} } // namespace logical_operator } // namespace claims diff --git a/logical_operator/logical_filter.h b/logical_operator/logical_filter.h index 0ef3c2a65..39613b1f8 100644 --- a/logical_operator/logical_filter.h +++ b/logical_operator/logical_filter.h @@ -87,6 +87,7 @@ class LogicalFilter : public LogicalOperator { * @param level: As an index. */ void Print(int level = 0) const; + void PruneProj(set& above_attrs); private: /** diff --git a/logical_operator/logical_limit.cpp b/logical_operator/logical_limit.cpp index b0908886d..fc952edec 100644 --- a/logical_operator/logical_limit.cpp +++ b/logical_operator/logical_limit.cpp @@ -58,8 +58,8 @@ bool LogicalLimit::CanBeOmitted() const { PlanContext LogicalLimit::GetPlanContext() { lock_->acquire(); if (NULL != plan_context_) { - lock_->release(); - return *plan_context_; + delete plan_context_; + plan_context_ = NULL; } PlanContext plan_context = child_->GetPlanContext(); if (plan_context.IsHashPartitioned()) { diff --git a/logical_operator/logical_limit.h b/logical_operator/logical_limit.h index 115b2ce9d..bac246a78 100644 --- a/logical_operator/logical_limit.h +++ b/logical_operator/logical_limit.h @@ -92,6 +92,9 @@ class LogicalLimit : public LogicalOperator { virtual void Print(int level = 0) const; LogicalOperator* child_; + void PruneProj(set& above_attrs) { + return child_->PruneProj(above_attrs); + } private: const unsigned PredictCardinality(unsigned i, diff --git a/logical_operator/logical_operator.cpp b/logical_operator/logical_operator.cpp index 30d2ae894..f0c94a411 100755 --- a/logical_operator/logical_operator.cpp +++ b/logical_operator/logical_operator.cpp @@ -30,7 +30,15 @@ #include #include #include + +#include "../common/expression/expr_column.h" +#include "../common/expression/expr_node.h" +#include "../logical_operator/logical_project.h" #include "../Resource/NodeTracker.h" + +using claims::common::ExprColumn; +using claims::common::ExprNode; +using claims::common::ExprNodeType; namespace claims { namespace logical_operator { // LogicalOperator::LogicalOperator() { @@ -104,6 +112,52 @@ void LogicalOperator::GetColumnToId(const std::vector& attributes, column_to_id[attributes[i].attrName] = i; } } +LogicalOperator* LogicalOperator::DecideAndCreateProject( + set& attrs, LogicalOperator* child) { + LogicalOperator* ret = child; + auto child_attr_list = child->GetPlanContext().attribute_list_; + // get the position where the attribute from child should be pruned + vector keep_id; + for (int i = 0, size = child_attr_list.size(); i < size; ++i) { + bool need_prune = true; + for (auto it = attrs.begin(); it != attrs.end() && need_prune; ++it) { + if (*it == child_attr_list[i].attrName) { + need_prune = false; + } + } + if (!need_prune) { + keep_id.push_back(i); + } + } + // if there are attributes should be pruned, then create project + if (keep_id.size() < child_attr_list.size()) { + vector expression_list; + for (int i = 0, size = keep_id.size(); i < size; ++i) { + int pos = child_attr_list[keep_id[i]].attrName.find('.'); + int len = child_attr_list[keep_id[i]].attrName.length(); + expression_list.push_back(new ExprColumn( + ExprNodeType::t_qcolcumns, child_attr_list[keep_id[i]].attrType->type, + child_attr_list[keep_id[i]].attrName, + child_attr_list[keep_id[i]].attrName.substr(0, pos), + child_attr_list[keep_id[i]].attrName.substr(pos + 1, len))); + } + // if no need to provide one column, then choose the first column or don't + // add project + if (keep_id.size() == 0 && child_attr_list.size() > 3) { + int pos = child_attr_list[0].attrName.find('.'); + int len = child_attr_list[0].attrName.length(); + expression_list.push_back(new ExprColumn( + ExprNodeType::t_qcolcumns, child_attr_list[0].attrType->type, + child_attr_list[0].attrName, + child_attr_list[0].attrName.substr(0, pos), + child_attr_list[0].attrName.substr(pos + 1, len))); + } + if (expression_list.size() > 0) { + ret = new LogicalProject(child, expression_list); + } + } + return ret; +} } // namespace logical_operator } // namespace claims diff --git a/logical_operator/logical_operator.h b/logical_operator/logical_operator.h index 28bfb8cdd..2127c9e6c 100755 --- a/logical_operator/logical_operator.h +++ b/logical_operator/logical_operator.h @@ -32,6 +32,9 @@ #include #include #include +#include +#include + #include "../common/ids.h" #include "../common/Schema/SchemaFix.h" #include "../logical_operator/plan_context.h" @@ -111,8 +114,10 @@ class LogicalOperator { const unsigned& block_size = 4096 * 1024){}; virtual void Print(int level = 0) const = 0; - + virtual void PruneProj(set& above_attrs) {} OperatorType get_operator_type() { return operator_type_; } + LogicalOperator* DecideAndCreateProject(set& attrs, + LogicalOperator* child); protected: Schema* GetSchema(const std::vector&) const; diff --git a/logical_operator/logical_outer_join.cpp b/logical_operator/logical_outer_join.cpp index dbc5d9ba3..4ef96e41d 100644 --- a/logical_operator/logical_outer_join.cpp +++ b/logical_operator/logical_outer_join.cpp @@ -79,7 +79,12 @@ LogicalOuterJoin::LogicalOuterJoin( join_policy_(kNull), plan_context_(NULL), join_type_(join_type), - join_condi_(join_condi) {} + join_condi_(join_condi) { + for (unsigned i = 0; i < joinpair_list.size(); ++i) { + left_join_key_list_.push_back(joinpair_list[i].left_join_attr_); + right_join_key_list_.push_back(joinpair_list[i].right_join_attr_); + } +} LogicalOuterJoin::~LogicalOuterJoin() { if (NULL != plan_context_) { delete plan_context_; @@ -133,9 +138,8 @@ void LogicalOuterJoin::DecideJoinPolicy(const PlanContext& left_dataflow, PlanContext LogicalOuterJoin::GetPlanContext() { lock_->acquire(); if (NULL != plan_context_) { - // the data flow has been computed*/ - lock_->release(); - return *plan_context_; + delete plan_context_; + plan_context_ = NULL; } /** @@ -143,12 +147,35 @@ PlanContext LogicalOuterJoin::GetPlanContext() { */ PlanContext left_dataflow = left_child_->GetPlanContext(); PlanContext right_dataflow = right_child_->GetPlanContext(); + + for (int i = 0, size = left_join_key_list_.size(); i < size; ++i) { + for (int j = 0, jsize = left_dataflow.attribute_list_.size(); j < jsize; + ++j) { + if (left_join_key_list_[i].attrName == + left_dataflow.attribute_list_[j].attrName) { + left_join_key_list_[i] = left_dataflow.attribute_list_[j]; + joinkey_pair_list_[i].left_join_attr_ = + left_dataflow.attribute_list_[j]; + } + } + for (int j = 0, jsize = right_dataflow.attribute_list_.size(); j < jsize; + ++j) { + if (right_join_key_list_[i].attrName == + right_dataflow.attribute_list_[j].attrName) { + right_join_key_list_[i] = right_dataflow.attribute_list_[j]; + joinkey_pair_list_[i].right_join_attr_ = + right_dataflow.attribute_list_[j]; + } + } + } PlanContext ret; DecideJoinPolicy(left_dataflow, right_dataflow); const Attribute left_partition_key = left_dataflow.plan_partitioner_.get_partition_key(); const Attribute right_partition_key = right_dataflow.plan_partitioner_.get_partition_key(); + + ret.attribute_list_.insert(ret.attribute_list_.end(), left_dataflow.attribute_list_.begin(), left_dataflow.attribute_list_.end()); @@ -263,9 +290,13 @@ PlanContext LogicalOuterJoin::GetPlanContext() { left_dataflow.plan_partitioner_.GetAggregatedDataSize(); ret.commu_cost_ += right_dataflow.plan_partitioner_.GetAggregatedDataSize(); + auto lt = left_dataflow.plan_partitioner_.get_partition_key(); + auto rt = right_dataflow.plan_partitioner_.get_partition_key(); + ret.plan_partitioner_ = DecideOutputDataflowProperty( left_dataflow, right_dataflow, join_type_); + auto it = ret.plan_partitioner_.get_partition_key(); // // QueryOptimizationLogging::log("[Complete_repartition // hash join] is not implemented, because I'm very lazy. -_- \n"); @@ -308,7 +339,9 @@ bool LogicalOuterJoin::CanOmitHashRepartition( const PlanPartitioner& partitoiner) const { Attribute attribute = partitoiner.get_partition_key(); for (unsigned i = 0; i < join_key_list.size(); i++) { - if (attribute == join_key_list[i]) return true; + if (attribute == join_key_list[i]) { + return true; + } } return false; } @@ -508,6 +541,7 @@ PhysicalOperatorBase* LogicalOuterJoin::GetPhysicalPlan( const Attribute left_partition_key = plan_context_->plan_partitioner_.get_partition_key(); + l_exchange_state.partition_schema_ = partition_schema::set_hash_partition(GetIdInAttributeList( dataflow_left.attribute_list_, left_partition_key)); @@ -810,6 +844,21 @@ double LogicalOuterJoin::PredictEqualJoinSelectivity( } return ret; } + +void LogicalOuterJoin::PruneProj(set& above_attrs) { + set above_attrs_copy = above_attrs; + + for (int i = 0, size = join_condi_.size(); i < size; ++i) { + join_condi_[i]->GetUniqueAttr(above_attrs_copy); + } + set above_attrs_right = above_attrs_copy; + left_child_->PruneProj(above_attrs_copy); + left_child_ = DecideAndCreateProject(above_attrs_copy, left_child_); + + right_child_->PruneProj(above_attrs_right); + right_child_ = DecideAndCreateProject(above_attrs_right, right_child_); +} + double LogicalOuterJoin::PredictEqualJoinSelectivityOnSingleJoinAttributePair( const Attribute& attr_left, const Attribute& attr_right) const { double ret; diff --git a/logical_operator/logical_outer_join.h b/logical_operator/logical_outer_join.h index 3d7fcaf15..f1b6cb468 100644 --- a/logical_operator/logical_outer_join.h +++ b/logical_operator/logical_outer_join.h @@ -110,6 +110,7 @@ class LogicalOuterJoin : public LogicalOperator { bool GetOptimalPhysicalPlan(Requirement requirement, PhysicalPlanDescriptor& physical_plan_descriptor, const unsigned& block_size = 4096 * 1024); + void PruneProj(set& above_attrs); private: std::vector GetLeftJoinKeyIds() const; diff --git a/logical_operator/logical_project.cpp b/logical_operator/logical_project.cpp index cffafc8c6..65e5a914d 100644 --- a/logical_operator/logical_project.cpp +++ b/logical_operator/logical_project.cpp @@ -70,14 +70,14 @@ LogicalProject::~LogicalProject() { delete child_; child_ = NULL; } + for (auto it = expr_list_.begin(); it != expr_list_.end() ; it++) { + delete *it; + } } // construct a PlanContext from child PlanContext LogicalProject::GetPlanContext() { lock_->acquire(); - if (NULL != plan_context_) { - lock_->release(); - return *plan_context_; - } + PlanContext ret; // get the PlanContext of child const PlanContext child_plan_context = child_->GetPlanContext(); @@ -137,7 +137,15 @@ PlanContext LogicalProject::GetPlanContext() { ret_attrs.clear(); LogicInitCnxt licnxt; licnxt.schema0_ = input_schema; - int mid_table_id = MIDINADE_TABLE_ID++; + int mid_table_id = 0; + if (plan_context_ == NULL) { + mid_table_id = MIDINADE_TABLE_ID++; + } else { + mid_table_id = + plan_context_->attribute_list_[0].table_id_; + DELETE_PTR(plan_context_); + plan_context_ == NULL; + } GetColumnToId(child_plan_context.attribute_list_, licnxt.column_id0_); for (int i = 0; i < expr_list_.size(); ++i) { licnxt.return_type_ = expr_list_[i]->actual_type_; @@ -153,7 +161,6 @@ PlanContext LogicalProject::GetPlanContext() { } } } - #endif // set the attribute list of the PlanContext to be returned ret.attribute_list_ = ret_attrs; @@ -185,6 +192,7 @@ PhysicalOperatorBase* LogicalProject::GetPhysicalPlan( // construct a schema from attribute list of PlanContext Schema* LogicalProject::GetOutputSchema() { Schema* schema = GetSchema(plan_context_->attribute_list_); + return schema; } @@ -216,6 +224,12 @@ void LogicalProject::Print(int level) const { #endif child_->Print(level); } +void LogicalProject::PruneProj(set& above_attrs) { + for (int i = 0, size = expr_list_.size(); i < size; ++i) { + expr_list_[i]->GetUniqueAttr(above_attrs); + } + child_->PruneProj(above_attrs); +} } // namespace logical_operator } // namespace claims diff --git a/logical_operator/logical_project.h b/logical_operator/logical_project.h index 23d5a7848..0a78fa8da 100644 --- a/logical_operator/logical_project.h +++ b/logical_operator/logical_project.h @@ -79,6 +79,7 @@ class LogicalProject : public LogicalOperator { * @param level:initialized to zero */ void Print(int level = 0) const; + void PruneProj(set& above_attrs); private: /** diff --git a/logical_operator/logical_query_plan_root.cpp b/logical_operator/logical_query_plan_root.cpp index 02a2260f5..d74b358e4 100644 --- a/logical_operator/logical_query_plan_root.cpp +++ b/logical_operator/logical_query_plan_root.cpp @@ -190,8 +190,8 @@ PhysicalOperatorBase* LogicalQueryPlanRoot::GetPhysicalPlan( PlanContext LogicalQueryPlanRoot::GetPlanContext() { lock_->acquire(); if (NULL != plan_context_) { - lock_->release(); - return *plan_context_; + delete plan_context_; + plan_context_ = NULL; } PlanContext ret = child_->GetPlanContext(); LOG(INFO) << "Communication cost: " << ret.commu_cost_ @@ -441,6 +441,9 @@ void LogicalQueryPlanRoot::Print(int level) const { << endl; child_->Print(level); } +void LogicalQueryPlanRoot::PruneProj(set& above_attrs) { + child_->PruneProj(above_attrs); +} } // namespace logical_operator } // namespace claims diff --git a/logical_operator/logical_query_plan_root.h b/logical_operator/logical_query_plan_root.h index 67c4bccd9..c0bc53b74 100644 --- a/logical_operator/logical_query_plan_root.h +++ b/logical_operator/logical_query_plan_root.h @@ -99,6 +99,7 @@ class LogicalQueryPlanRoot : public LogicalOperator { * @return void */ void Print(int level = 0) const; + void PruneProj(set& above_attrs); private: /** diff --git a/logical_operator/logical_scan.cpp b/logical_operator/logical_scan.cpp index 70e9c79bc..a969484c0 100644 --- a/logical_operator/logical_scan.cpp +++ b/logical_operator/logical_scan.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include "../catalog/catalog.h" #include "../IDsGenerator.h" @@ -46,6 +47,7 @@ using claims::physical_operator::ExchangeMerger; using claims::physical_operator::PhysicalProjectionScan; namespace claims { namespace logical_operator { +ProjectionOffset get_Max_projection(TableDescriptor* table); LogicalScan::LogicalScan(std::vector attribute_list) : LogicalOperator(kLogicalScan), scan_attribute_list_(attribute_list), @@ -70,15 +72,16 @@ LogicalScan::LogicalScan(ProjectionDescriptor* projection, scan_attribute_list_ = projection->getAttributeList(); target_projection_ = projection; } -LogicalScan::LogicalScan(ProjectionDescriptor* const projection, - string table_alias, const float sample_rate) - : LogicalOperator(kLogicalScan), +LogicalScan::LogicalScan(string table_alias, set columns, + string table_name, bool is_all, + const float sample_rate) + : columns_(columns), + table_name_(table_name), + LogicalOperator(kLogicalScan), table_alias_(table_alias), + is_all_(is_all), sample_rate_(sample_rate), plan_context_(NULL) { - scan_attribute_list_ = projection->getAttributeList(); - ChangeAliasAttr(); - target_projection_ = projection; } LogicalScan::LogicalScan( const TableID& table_id, @@ -124,75 +127,86 @@ void LogicalScan::ChangeAliasAttr() { PlanContext LogicalScan::GetPlanContext() { lock_->acquire(); if (NULL != plan_context_) { - lock_->release(); - return *plan_context_; + delete plan_context_; + plan_context_ = NULL; } plan_context_ = new PlanContext(); - - TableID table_id = scan_attribute_list_[0].table_id_; - TableDescriptor* table = Catalog::getInstance()->getTable(table_id); - - if (NULL == target_projection_) { - ProjectionOffset target_projection_off = -1; - unsigned int min_projection_cost = -1; - // TODO(KaiYu): get real need column as scan_attribute_list_, otherwise, - // optimization don't work - for (ProjectionOffset projection_off = 0; - projection_off < table->getNumberOfProjection(); projection_off++) { - ProjectionDescriptor* projection = table->getProjectoin(projection_off); - bool fail = false; - for (std::vector::iterator it = scan_attribute_list_.begin(); - it != scan_attribute_list_.end(); it++) { - if (!projection->hasAttribute(*it)) { - /*the attribute *it is not in the projection*/ - fail = true; - break; + TableDescriptor* table = Catalog::getInstance()->getTable(table_name_); + ProjectionOffset target_projection_off = -1; + unsigned int min_projection_cost = 65535; + if (is_all_ != true) { + if (columns_.find("*") != columns_.end()) { + // if is all, select tableA.* from tableA, give largest projection; + target_projection_off = get_Max_projection(table); + } else { + for (ProjectionOffset projection_off = 0; + projection_off < table->getNumberOfProjection(); + projection_off++) { + ProjectionDescriptor* projection = + table->getProjectoin(projection_off); + bool fail = false; + for (set::const_iterator it = columns_.begin(); + it != columns_.end(); it++) { + if (!projection->isExist1(table_name_+"."+*it)) { + /*the attribute *it is not in the projection*/ + fail = true; + break; + } + } + if (fail == true) { + continue; + } + unsigned int projection_cost = projection->getProjectionCost(); + if ( projection_off == 0 ) { + min_projection_cost = projection_cost; + target_projection_off = 0; + } + if (min_projection_cost > projection_cost) { + target_projection_off = projection_off; + min_projection_cost = projection_cost; + } + } + if (target_projection_off != -1) { + target_projection_ = table->getProjectoin(target_projection_off); } - } - if (fail == true) { - continue; - } - unsigned int projection_cost = projection->getProjectionCost(); - // get the projection with minimum cost - if (min_projection_cost > projection_cost) { - target_projection_off = projection_off; - min_projection_cost = projection_cost; - cout << "in " << table->getNumberOfProjection() << " projections, " - "projection " - << projection_off << " has less cost:" << projection_cost << endl; - } - } - if (target_projection_off == -1) { - // fail to find a projection that contains all the scan attribute - LOG(ERROR) << "The current implementation does not support the scanning " - "that involves more than one projection." << std::endl; - assert(false); } + } else { + // if is all, select * from tableA, give largest projection; + target_projection_off = get_Max_projection(table); target_projection_ = table->getProjectoin(target_projection_off); - cout << "in " << table->getNumberOfProjection() << " projections, " - "projection " - << target_projection_off << " has min cost:" << min_projection_cost - << endl; } - if (!target_projection_->AllPartitionBound()) { Catalog::getInstance()->getBindingModele()->BindingEntireProjection( target_projection_->getPartitioner(), DESIRIABLE_STORAGE_LEVEL); } - - /** - * @brief build the PlanContext - */ - - plan_context_->attribute_list_ = scan_attribute_list_; // attribute_list_ - + plan_context_->attribute_list_ = target_projection_->getAttributeList(); + for (auto &it : plan_context_->attribute_list_) { + it.attrName = table_alias_ + it.attrName.substr(it.attrName.find('.')); + } Partitioner* par = target_projection_->getPartitioner(); plan_context_->plan_partitioner_ = PlanPartitioner(*par); plan_context_->plan_partitioner_.UpdateTableNameOfPartitionKey(table_alias_); plan_context_->commu_cost_ = 0; + lock_->release(); return *plan_context_; } +ProjectionOffset get_Max_projection(TableDescriptor* table) { + unsigned int max_projection_cost = 0; + ProjectionOffset target_projection_off = -1; + for (ProjectionOffset projection_off = 0; + projection_off < table->getNumberOfProjection(); projection_off++) { + ProjectionDescriptor* projection = + table->getProjectoin(projection_off); + unsigned int projection_cost = projection->getProjectionCost(); + // get the projection with maximum cost + if (max_projection_cost < projection_cost) { + target_projection_off = projection_off; + max_projection_cost = projection_cost; + } + } + return target_projection_off; +} /** * @brief Set the value of class state_ and get instantiation of physical diff --git a/logical_operator/logical_scan.h b/logical_operator/logical_scan.h index 7e8a0e052..7dd46d3cf 100644 --- a/logical_operator/logical_scan.h +++ b/logical_operator/logical_scan.h @@ -31,6 +31,8 @@ #include #include +#include + #include "../common/ids.h" #include "../catalog/attribute.h" #include "../catalog/table.h" @@ -53,8 +55,8 @@ class LogicalScan : public LogicalOperator { LogicalScan(std::vector attribute_list); LogicalScan(const TableID&); LogicalScan(ProjectionDescriptor* projection, const float sample_rate_ = 1); - LogicalScan(ProjectionDescriptor* const projection, string table_alias, - const float sample_rate_ = 1); + LogicalScan(string table_alias, set columns, string table_name, + bool is_all, const float sample_rate_ = 1); LogicalScan(const TableID&, const std::vector& selected_attribute_index_list); @@ -65,7 +67,6 @@ class LogicalScan : public LogicalOperator { PhysicalPlanDescriptor& physical_plan_descriptor, const unsigned& kBlock_size = 4096 * 1024); void ChangeAliasAttr(); - private: /**check whether all the involved attributes are in the same projection.*/ bool IsInASingleProjection() const; @@ -76,7 +77,10 @@ class LogicalScan : public LogicalOperator { ProjectionDescriptor* target_projection_; PlanContext* plan_context_; string table_alias_; + string table_name_; float sample_rate_; + set columns_; + bool is_all_; }; } // namespace logical_operator diff --git a/logical_operator/logical_sort.cpp b/logical_operator/logical_sort.cpp index 56944012a..ae32b48d2 100644 --- a/logical_operator/logical_sort.cpp +++ b/logical_operator/logical_sort.cpp @@ -78,8 +78,8 @@ LogicalSort::~LogicalSort() { PlanContext LogicalSort::GetPlanContext() { lock_->acquire(); if (NULL != plan_context_) { - lock_->release(); - return *plan_context_; + delete plan_context_; + plan_context_ = NULL; } // Get the information from its child PlanContext child_plan_context_ = child_->GetPlanContext(); @@ -209,5 +209,15 @@ void LogicalSort::Print(int level) const { PrintOrderByAttr(level); child_->Print(level); } + +void LogicalSort::PruneProj(set &above_attrs) { + set above_attrs_copy = above_attrs; + for (int i = 0, size = order_by_attrs_.size(); i < size; ++i) { + order_by_attrs_[i].first->GetUniqueAttr(above_attrs_copy); + } + child_->PruneProj(above_attrs_copy); + child_ = DecideAndCreateProject(above_attrs_copy, child_); +} + } // namespace logical_operator } // namespace claims diff --git a/logical_operator/logical_sort.h b/logical_operator/logical_sort.h index b67e0453b..5d1d0e57f 100644 --- a/logical_operator/logical_sort.h +++ b/logical_operator/logical_sort.h @@ -110,6 +110,7 @@ class LogicalSort : public LogicalOperator { virtual bool GetOptimalPhysicalPlan( Requirement requirement, PhysicalPlanDescriptor &physical_plan_descriptor, const unsigned &block_size = 4096 * 1024) {} + void PruneProj(set &above_attrs); private: vector> order_by_attrs_; diff --git a/logical_operator/logical_subquery.cpp b/logical_operator/logical_subquery.cpp index feb5180f1..d2fe538f0 100644 --- a/logical_operator/logical_subquery.cpp +++ b/logical_operator/logical_subquery.cpp @@ -53,8 +53,8 @@ LogicalSubquery::~LogicalSubquery() { PlanContext LogicalSubquery::GetPlanContext() { lock_->acquire(); if (NULL != plan_context_) { - lock_->release(); - return *plan_context_; + delete plan_context_; + plan_context_ = NULL; } PlanContext ret; // get the PlanContext of child diff --git a/logical_operator/logical_subquery.h b/logical_operator/logical_subquery.h index 8a7404757..593edec7c 100644 --- a/logical_operator/logical_subquery.h +++ b/logical_operator/logical_subquery.h @@ -48,6 +48,7 @@ class LogicalSubquery : public LogicalOperator { PlanContext GetPlanContext(); PhysicalOperatorBase *GetPhysicalPlan(const unsigned &blocksize); void Print(int level = 0) const; + void PruneProj(set &above_attrs) { child_->PruneProj(above_attrs); } private: vector subquery_attrs_; diff --git a/node_manager/master_node.cpp b/node_manager/master_node.cpp index 9fca1758a..035288440 100644 --- a/node_manager/master_node.cpp +++ b/node_manager/master_node.cpp @@ -96,7 +96,6 @@ class MasterNodeActor : public event_based_actor { }, [=](HeartBeatAtom, unsigned int node_id_, string address_, uint16_t port_) -> caf::message { auto it = master_node_->node_id_to_heartbeat_.find(node_id_); - //有可能再重启后两个不同的ip使用相同的nodeID 所以要避免 if (it != master_node_->node_id_to_heartbeat_.end() && !(master_node_->node_id_to_addr_.find(node_id_)->second.first.compare(address_))){ //clear heartbeat count. diff --git a/sql_parser/ast_node/ast_node.cpp b/sql_parser/ast_node/ast_node.cpp index e0f9153d1..9e559daf3 100644 --- a/sql_parser/ast_node/ast_node.cpp +++ b/sql_parser/ast_node/ast_node.cpp @@ -56,6 +56,10 @@ void AstNode::Print(int level) const { cout << setw(level * 8) << " " << "This is an AST_NODE!" << endl; } + +RetCode AstNode::SetScanAttrList(SemanticContext* sem_cnxt) { + return rSuccess; +} RetCode AstNode::SemanticAnalisys(SemanticContext* sem_cnxt) { LOG(WARNING) << "This is AstNode's semantic analysis" << endl; return rSuccess; @@ -177,6 +181,7 @@ AstNode* AstNode::GetAndExpr(const set& expression) { } return NULL; } + void AstStmtList::Print(int level) const { cout << setw(level * 8) << " " << "|stmt list|" << endl; @@ -187,6 +192,17 @@ void AstStmtList::Print(int level) const { next_->Print(level++); } } + +RetCode AstStmtList::SetScanAttrList(SemanticContext *sem_cnxt) { + if (stmt_ != NULL) { + stmt_->SetScanAttrList(sem_cnxt); + } + if (next_ != NULL) { + next_->SetScanAttrList(sem_cnxt); + } + return rSuccess; +} + RetCode AstStmtList::SemanticAnalisys(SemanticContext* sem_cnxt) { RetCode ret = rSuccess; if (NULL != stmt_) { @@ -225,6 +241,7 @@ SemanticContext::SemanticContext() { clause_type_ = kNone; have_agg = false; select_expr_have_agg = false; + is_all = false; } SemanticContext::~SemanticContext() {} diff --git a/sql_parser/ast_node/ast_node.h b/sql_parser/ast_node/ast_node.h index 060a12567..2aaf4a072 100644 --- a/sql_parser/ast_node/ast_node.h +++ b/sql_parser/ast_node/ast_node.h @@ -185,13 +185,17 @@ class SemanticContext { vector select_expr_; vector index_; // for create projection execute function string error_msg_; - + // for remember all column we need to choose scan projection. + bool is_all; + std::unordered_map> table_to_column; private: set aggregation_; vector groupby_attrs_; set select_attrs_; multimap column_to_table_; set tables_; + + }; class PushDownConditionContext { public: @@ -290,6 +294,7 @@ class AstNode { return rSuccess; } AstNode* GetAndExpr(const set& expression); + virtual RetCode SetScanAttrList(SemanticContext *sem_cnxt); AstNodeType ast_node_type_; string expr_str_; }; @@ -311,6 +316,7 @@ class AstStmtList : public AstNode { AstStmtList(AstNodeType ast_node_type, AstNode* stmt, AstNode* next); ~AstStmtList(); void Print(int level = 0) const; + RetCode SetScanAttrList(SemanticContext *sem_cnxt); RetCode SemanticAnalisys(SemanticContext* sem_cnxt); RetCode PushDownCondition(PushDownConditionContext& pdccnxt); RetCode GetLogicalPlan(LogicalOperator*& logic_plan); diff --git a/sql_parser/ast_node/ast_select_stmt.cpp b/sql_parser/ast_node/ast_select_stmt.cpp index f1308a608..76ffa8176 100644 --- a/sql_parser/ast_node/ast_select_stmt.cpp +++ b/sql_parser/ast_node/ast_select_stmt.cpp @@ -111,6 +111,17 @@ void AstSelectList::Print(int level) const { next_->Print(level); } } + +RetCode AstSelectList::SetScanAttrList(SemanticContext* sem_cnxt) { + if (args_ != NULL) { + args_->SetScanAttrList(sem_cnxt); + } + if (next_ != NULL) { + next_->SetScanAttrList(sem_cnxt); + return rSuccess; + } +} + RetCode AstSelectList::SemanticAnalisys(SemanticContext* sem_cnxt) { RetCode ret = rSuccess; if (NULL != args_) { @@ -166,6 +177,9 @@ AstSelectExpr::AstSelectExpr(AstNodeType ast_node_type, std::string expr_alias, AstSelectExpr::~AstSelectExpr() { delete expr_; } + + + void AstSelectExpr::Print(int level) const { cout << setw(level * TAB_SIZE) << " " << "|select expr|" << endl; @@ -175,6 +189,14 @@ void AstSelectExpr::Print(int level) const { cout << setw(level * TAB_SIZE) << " " << "expr alias: " << expr_alias_ << endl; } + +RetCode AstSelectExpr::SetScanAttrList(SemanticContext* sem_cnxt) { + if (expr_ != NULL) { + expr_->SetScanAttrList(sem_cnxt); + } + return rSuccess; +} + // there is no need to eliminate alias conflict in top select, but in sub query, // the alias conflict will be checked by Ast. RetCode AstSelectExpr::SemanticAnalisys(SemanticContext* sem_cnxt) { @@ -261,6 +283,24 @@ void AstFromList::Print(int level) const { next_->Print(level); } } + +RetCode AstFromList::SetScanAttrList(SemanticContext* sem_cnxt) { + for (auto it = equal_join_condition_.begin(); + it != equal_join_condition_.end(); ++it) { + (*it)->SetScanAttrList(sem_cnxt); + } + for (auto it = normal_condition_.begin(); it != normal_condition_.end(); + ++it) { + (*it)->SetScanAttrList(sem_cnxt); + } + if (args_ != NULL) { + args_->SetScanAttrList(sem_cnxt); + } + if (next_ != NULL) { + next_->SetScanAttrList(sem_cnxt); + } + return rSuccess; +} RetCode AstFromList::SemanticAnalisys(SemanticContext* sem_cnxt) { sem_cnxt->clause_type_ = SemanticContext::kFromClause; RetCode ret = rSuccess; @@ -402,6 +442,30 @@ void AstTable::Print(int level) const { cout << setw(level * TAB_SIZE) << " " << "table_alias: " << table_alias_ << endl; } + +RetCode AstTable::SetScanAttrList(SemanticContext* sem_cnxt) { + for (auto it = equal_join_condition_.begin(); + it != equal_join_condition_.end(); ++it) { + (*it)->SetScanAttrList(sem_cnxt); + } + for (auto it = normal_condition_.begin(); it != normal_condition_.end(); + ++it) { + (*it)->SetScanAttrList(sem_cnxt); + } + // if sql is not select * + if (sem_cnxt->is_all != true) { + is_all_ = false; + if (sem_cnxt->table_to_column.find(table_name_) != + sem_cnxt->table_to_column.end()) { + columns_ = sem_cnxt->table_to_column[table_name_]; + return rSuccess; + } else { + return rTableNotExisted; + } + } else { + is_all_ = true; + } +} RetCode AstTable::SemanticAnalisys(SemanticContext* sem_cnxt) { RetCode ret = rSuccess; TableDescriptor* tbl = @@ -442,19 +506,12 @@ RetCode AstTable::GetLogicalPlan(LogicalOperator*& logic_plan) { ->getCatalog() ->getTable(table_name_) ->HasDeletedTuples()) { - LogicalOperator* base_table = new LogicalScan(Environment::getInstance() - ->getCatalog() - ->getTable(table_name_) - ->getProjectoin(0), - table_alias_); + LogicalOperator* base_table = new LogicalScan(table_alias_, columns_, table_name_, is_all_); + Attribute filter_base = base_table->GetPlanContext().plan_partitioner_.get_partition_key(); LogicalOperator* del_table = - new LogicalScan(Environment::getInstance() - ->getCatalog() - ->getTable(table_name_ + "_DEL") - ->getProjectoin(0), - table_alias_ + "_DEL"); + new LogicalScan(table_alias_ + "_DEL", columns_, table_name_ , is_all_); Attribute filter_del = del_table->GetPlanContext().plan_partitioner_.get_partition_key(); @@ -466,11 +523,7 @@ RetCode AstTable::GetLogicalPlan(LogicalOperator*& logic_plan) { logic_plan = new LogicalDeleteFilter(filter_pair, del_table, base_table); } else { - logic_plan = new LogicalScan(Environment::getInstance() - ->getCatalog() - ->getTable(table_name_) - ->getProjectoin(0), - table_alias_); + logic_plan = new LogicalScan(table_alias_, columns_, table_name_, is_all_); } if (equal_join_condition_.size() > 0) { LOG(ERROR) << "equal join condition shouldn't occur in a single table!" @@ -528,6 +581,22 @@ void AstSubquery::Print(int level) const { subquery_->Print(level); } } + +RetCode AstSubquery::SetScanAttrList(SemanticContext* sem_cnxt) { + for (auto it = equal_join_condition_.begin(); + it != equal_join_condition_.end(); ++it) { + (*it)->SetScanAttrList(sem_cnxt); + } + for (auto it = normal_condition_.begin(); it != normal_condition_.end(); + ++it) { + (*it)->SetScanAttrList(sem_cnxt); + } + if (subquery_ != NULL) { + subquery_->SetScanAttrList(sem_cnxt); + } + return rSuccess; +} + RetCode AstSubquery::SemanticAnalisys(SemanticContext* sem_cnxt) { SemanticContext sub_sem_cnxt; // // subquery_alias_ == existed_table? @@ -610,6 +679,14 @@ AstJoinCondition::AstJoinCondition(AstNodeType ast_node_type, condition_(condition) {} AstJoinCondition::~AstJoinCondition() { delete condition_; } + +RetCode AstJoinCondition::SetScanAttrList(SemanticContext* sem_cnxt) { + if (condition_ != NULL) { + condition_->SetScanAttrList(sem_cnxt); + } + return rSuccess; +} + void AstJoinCondition::Print(int level) const { cout << setw(level * TAB_SIZE) << " " << "|join condition| " << join_condition_type_ << endl; @@ -620,6 +697,7 @@ void AstJoinCondition::Print(int level) const { << "null" << endl; } } + RetCode AstJoinCondition::SemanticAnalisys(SemanticContext* sem_cnxt) { if (NULL != condition_) { return condition_->SemanticAnalisys(sem_cnxt); @@ -671,6 +749,20 @@ AstJoin::~AstJoin() { delete right_table_; delete join_condition_; } +RetCode AstJoin::SetScanAttrList(SemanticContext* sem_cnxt) { + for (auto it = equal_join_condition_.begin(); + it != equal_join_condition_.end(); ++it) { + (*it)->SetScanAttrList(sem_cnxt); + } + for (auto it = normal_condition_.begin(); it != normal_condition_.end(); + ++it) { + (*it)->SetScanAttrList(sem_cnxt); + } + if (left_table_ != NULL) left_table_->SetScanAttrList(sem_cnxt); + if (right_table_ != NULL) right_table_->SetScanAttrList(sem_cnxt); + if (join_condition_ != NULL) join_condition_->SetScanAttrList(sem_cnxt); + return rSuccess; +} void AstJoin::Print(int level) const { cout << setw(level++ * TAB_SIZE) << " " @@ -880,6 +972,12 @@ AstWhereClause::AstWhereClause(AstNodeType ast_node_type, AstNode* expr) AstWhereClause::~AstWhereClause() { delete expr_; } + +RetCode AstWhereClause::SetScanAttrList(SemanticContext* sem_cnxt) { + if (expr_ != NULL) expr_->SetScanAttrList(sem_cnxt); + return rSuccess; +} + void AstWhereClause::Print(int level) const { cout << setw(level * TAB_SIZE) << " " << "|where clause| " << endl; @@ -910,6 +1008,16 @@ AstGroupByList::~AstGroupByList() { delete next_; } +RetCode AstGroupByList::SetScanAttrList(SemanticContext* sem_cnxt) { + if (expr_ != NULL) { + expr_->SetScanAttrList(sem_cnxt); + } + if (next_ != NULL) { + next_->SetScanAttrList(sem_cnxt); + } + return rSuccess; +} + void AstGroupByList::Print(int level) const { cout << setw(level * TAB_SIZE) << " " << "|groupby list| " << expr_str_ << endl; @@ -989,6 +1097,10 @@ AstGroupByClause::AstGroupByClause(AstNodeType ast_node_type, AstGroupByClause::~AstGroupByClause() { delete groupby_list_; } +RetCode AstGroupByClause::SetScanAttrList(SemanticContext* sem_cnxt) { + groupby_list_->SetScanAttrList(sem_cnxt); + return rSuccess; +} void AstGroupByClause::Print(int level) const { cout << setw(level * TAB_SIZE) << " " << "|groupby clause| " @@ -1037,6 +1149,11 @@ AstOrderByList::~AstOrderByList() { delete next_; } +RetCode AstOrderByList::SetScanAttrList(SemanticContext* sem_cnxt) { + if (expr_ != NULL) expr_->SetScanAttrList(sem_cnxt); + if (next_ != NULL) next_->SetScanAttrList(sem_cnxt); + return rSuccess; +} void AstOrderByList::Print(int level) const { cout << setw(level * TAB_SIZE) << " " << "|orderby list| " << endl; @@ -1108,6 +1225,11 @@ AstOrderByClause::AstOrderByClause(AstNodeType ast_node_type, AstOrderByClause::~AstOrderByClause() { delete orderby_list_; } +RetCode AstOrderByClause::SetScanAttrList(SemanticContext *sem_cnxt) { + if (orderby_list_ != NULL) orderby_list_->SetScanAttrList(sem_cnxt); + return rSuccess; +} + void AstOrderByClause::Print(int level) const { cout << setw(level * TAB_SIZE) << " " << "|orderby clause| " << endl; @@ -1166,6 +1288,10 @@ AstHavingClause::AstHavingClause(AstNodeType ast_node_type, AstNode* expr) AstHavingClause::~AstHavingClause() { delete expr_; } +RetCode AstHavingClause::SetScanAttrList(SemanticContext* sem_cnxt) { + if (expr_ != NULL) expr_->SetScanAttrList(sem_cnxt); + return rSuccess; +} void AstHavingClause::Print(int level) const { cout << setw(level * TAB_SIZE) << " " << "|having clause| " << endl; @@ -1223,6 +1349,11 @@ AstLimitClause::~AstLimitClause() { delete row_count_; } +RetCode AstLimitClause::SetScanAttrList(SemanticContext* sem_cnxt) { + if (offset_ != NULL) offset_->SetScanAttrList(sem_cnxt); + if (row_count_ != NULL) row_count_->SetScanAttrList(sem_cnxt); + return rSuccess; +} void AstLimitClause::Print(int level) const { cout << setw(level * TAB_SIZE) << " " << "|limit clause| " << endl; @@ -1311,6 +1442,10 @@ AstColumn::AstColumn(AstColumn* node) } AstColumn::~AstColumn() { delete next_; } +RetCode AstColumn::SetScanAttrList(SemanticContext* sem_cnxt) { +// next_->SetScanAttrList(sem_cnxt); + return rSuccess; +} void AstColumn::Print(int level) const { cout << setw(level * TAB_SIZE) << " " << "|column| " << expr_str_ << endl; @@ -1328,6 +1463,7 @@ RetCode AstColumn::SemanticAnalisys(SemanticContext* sem_cnxt) { RetCode ret = rSuccess; if (AST_COLUMN_ALL_ALL == ast_node_type_) { if (SemanticContext::kSelectClause == sem_cnxt->clause_type_) { + sem_cnxt->is_all = true; return rSuccess; } else { return rColumnAllShouldNotInOtherClause; @@ -1342,6 +1478,9 @@ RetCode AstColumn::SemanticAnalisys(SemanticContext* sem_cnxt) { } else { return rColumnAllShouldNotInOtherClause; } + // insert * means all column in relation + sem_cnxt->is_all = false; + sem_cnxt->table_to_column[relation_name_].insert("*"); return rSuccess; } ret = sem_cnxt->IsColumnExist(relation_name_, column_name_); @@ -1352,6 +1491,8 @@ RetCode AstColumn::SemanticAnalisys(SemanticContext* sem_cnxt) { "column: '\e[1m" + column_name_ + "\e[0m' is invalid"; return ret; } + sem_cnxt->is_all = false; + sem_cnxt->table_to_column[relation_name_].insert(column_name_); if (NULL != next_) { return next_->SemanticAnalisys(sem_cnxt); } @@ -1394,6 +1535,7 @@ RetCode AstColumn::GetLogicalPlan(ExprNode*& logic_expr, LogicalOperator* const right_lplan) { Attribute ret_lattr = left_lplan->GetPlanContext().GetAttribute( string(relation_name_ + "." + column_name_)); + if (NULL != right_lplan) { Attribute ret_rattr = right_lplan->GetPlanContext().GetAttribute( string(relation_name_ + "." + column_name_)); @@ -1477,6 +1619,18 @@ AstSelectStmt::~AstSelectStmt() { delete select_into_clause_; } +RetCode AstSelectStmt::SetScanAttrList(SemanticContext* sem_cnxt) { + select_list_->SetScanAttrList(sem_cnxt); + if (from_list_ != NULL) from_list_->SetScanAttrList(sem_cnxt); + if (where_clause_ != NULL) where_clause_->SetScanAttrList(sem_cnxt); + if (groupby_clause_ != NULL) groupby_clause_->SetScanAttrList(sem_cnxt); + if (having_clause_ != NULL) having_clause_->SetScanAttrList(sem_cnxt); + if (orderby_clause_ != NULL) orderby_clause_->SetScanAttrList(sem_cnxt); + if (limit_clause_ != NULL) limit_clause_->SetScanAttrList(sem_cnxt); + if (select_into_clause_ != NULL) + select_into_clause_->SetScanAttrList(sem_cnxt); + return rSuccess; +} void AstSelectStmt::Print(int level) const { cout << setw(level * TAB_SIZE) << " " << "|select statement| " << endl; diff --git a/sql_parser/ast_node/ast_select_stmt.h b/sql_parser/ast_node/ast_select_stmt.h index cb3343f57..bc7b28863 100644 --- a/sql_parser/ast_node/ast_select_stmt.h +++ b/sql_parser/ast_node/ast_select_stmt.h @@ -53,7 +53,7 @@ class AstSelectList : public AstNode { void ReplaceAggregation(AstNode*& agg_column, set& agg_node, bool need_collect); RetCode GetLogicalPlan(LogicalOperator*& logic_plan); - + RetCode SetScanAttrList(SemanticContext* sem_cnxt); bool is_all_; AstNode* args_; AstNode* next_; @@ -70,6 +70,7 @@ class AstSelectExpr : public AstNode { void RecoverExprName(string& name); void ReplaceAggregation(AstNode*& agg_column, set& agg_node, bool need_collect); + RetCode SetScanAttrList(SemanticContext* sem_cnxt); string expr_alias_; AstNode* expr_; bool have_agg_func_; @@ -87,7 +88,7 @@ class AstFromList : public AstNode { RetCode SemanticAnalisys(SemanticContext* sem_cnxt); RetCode PushDownCondition(PushDownConditionContext& pdccnxt); RetCode GetLogicalPlan(LogicalOperator*& logic_plan); - + RetCode SetScanAttrList(SemanticContext* sem_cnxt); map table_joined_root; AstNode* args_; AstNode* next_; @@ -109,13 +110,15 @@ class AstTable : public AstNode { RetCode SemanticAnalisys(SemanticContext* sem_cnxt); RetCode PushDownCondition(PushDownConditionContext& pdccnxt); RetCode GetLogicalPlan(LogicalOperator*& logic_plan); - + RetCode SetScanAttrList(SemanticContext* sem_cnxt); vector equal_join_condition_; vector normal_condition_; string db_name_; string table_name_; string table_alias_; + set columns_; int table_id_; + bool is_all_; // AstNode* condition_; // }; /** @@ -132,7 +135,7 @@ class AstSubquery : public AstNode { RetCode SemanticAnalisys(SemanticContext* sem_cnxt); RetCode PushDownCondition(PushDownConditionContext& pdccnxt); RetCode GetLogicalPlan(LogicalOperator*& logic_plan); - + RetCode SetScanAttrList(SemanticContext* sem_cnxt); string subquery_alias_; AstNode* subquery_; vector equal_join_condition_; @@ -148,6 +151,7 @@ class AstJoinCondition : public AstNode { ~AstJoinCondition(); void Print(int level = 0) const; RetCode SemanticAnalisys(SemanticContext* sem_cnxt); + RetCode SetScanAttrList(SemanticContext* sem_cnxt); string join_condition_type_; AstNode* condition_; }; @@ -166,7 +170,7 @@ class AstJoin : public AstNode { RetCode PushDownCondition(PushDownConditionContext& pdccnxt); RetCode GetLogicalPlan(LogicalOperator*& logic_plan); RetCode GetFilterLogicalPlan(LogicalOperator*& logic_plan); - + RetCode SetScanAttrList(SemanticContext* sem_cnxt); string join_type_; AstNode* left_table_; AstNode* right_table_; @@ -184,6 +188,7 @@ class AstWhereClause : public AstNode { void Print(int level = 0) const; RetCode SemanticAnalisys(SemanticContext* sem_cnxt); void RecoverExprName(string& name); + RetCode SetScanAttrList(SemanticContext* sem_cnxt); AstNode* expr_; }; /** @@ -199,7 +204,7 @@ class AstGroupByList : public AstNode { RetCode SemanticAnalisys(SemanticContext* sem_cnxt); void RecoverExprName(string& name); RetCode SolveSelectAlias(SelectAliasSolver* const select_alias_solver); - + RetCode SetScanAttrList(SemanticContext* sem_cnxt); AstNode* expr_; AstNode* next_; }; @@ -215,6 +220,7 @@ class AstGroupByClause : public AstNode { RetCode SemanticAnalisys(SemanticContext* sem_cnxt); void RecoverExprName(string& name); RetCode SolveSelectAlias(SelectAliasSolver* const select_alias_solver); + RetCode SetScanAttrList(SemanticContext* sem_cnxt); AstGroupByList* groupby_list_; bool with_roolup_; }; @@ -232,6 +238,7 @@ class AstOrderByList : public AstNode { void ReplaceAggregation(AstNode*& agg_column, set& agg_node, bool need_collect); RetCode SolveSelectAlias(SelectAliasSolver* const select_alias_solver); + RetCode SetScanAttrList(SemanticContext* sem_cnxt); AstNode* expr_; string orderby_direction_; AstNode* next_; @@ -250,6 +257,7 @@ class AstOrderByClause : public AstNode { bool need_collect); RetCode GetLogicalPlan(LogicalOperator*& logic_plan); RetCode SolveSelectAlias(SelectAliasSolver* const select_alias_solver); + RetCode SetScanAttrList(SemanticContext* sem_cnxt); AstOrderByList* orderby_list_; }; /** @@ -266,6 +274,7 @@ class AstHavingClause : public AstNode { bool need_collect); RetCode GetLogicalPlan(LogicalOperator*& logic_plan); RetCode SolveSelectAlias(SelectAliasSolver* const select_alias_solver); + RetCode SetScanAttrList(SemanticContext* sem_cnxt); AstNode* expr_; }; /** @@ -279,6 +288,7 @@ class AstLimitClause : public AstNode { void Print(int level = 0) const; RetCode SemanticAnalisys(SemanticContext* sem_cnxt); RetCode GetLogicalPlan(LogicalOperator*& logic_plan); + RetCode SetScanAttrList(SemanticContext* sem_cnxt); AstNode* offset_; AstNode* row_count_; }; @@ -308,12 +318,12 @@ class AstColumn : public AstNode { RetCode SemanticAnalisys(SemanticContext* sem_cnxt); void RecoverExprName(string& name); void GetRefTable(set& ref_table); - RetCode GetLogicalPlan(ExprNode*& logic_expr, LogicalOperator* const left_lplan, LogicalOperator* const right_lplan); RetCode SolveSelectAlias(SelectAliasSolver* const select_alias_solver); AstNode* AstNodeCopy(); + RetCode SetScanAttrList(SemanticContext* sem_cnxt); string relation_name_; string column_name_; AstNode* next_; @@ -345,7 +355,7 @@ class AstSelectStmt : public AstNode { RetCode GetLogicalPlan(LogicalOperator*& logic_plan); RetCode GetLogicalPlanOfAggeration(LogicalOperator*& logic_plan); RetCode GetLogicalPlanOfProject(LogicalOperator*& logic_plan); - + RetCode SetScanAttrList(SemanticContext* sem_cnxt); SelectOpts select_opts_; AstNode* select_list_; AstNode* from_list_; diff --git a/stmt_handler/select_exec.cpp b/stmt_handler/select_exec.cpp index ddb048506..d45872b9f 100644 --- a/stmt_handler/select_exec.cpp +++ b/stmt_handler/select_exec.cpp @@ -36,6 +36,8 @@ #include #include #include +#include +#include #include "../common/error_define.h" #include "../common/ids.h" @@ -143,6 +145,7 @@ RetCode SelectExec::Execute() { LOG(ERROR) << "semantic analysis error result= : " << ret; return ret; } + #ifdef PRINTCONTEXT select_ast_->Print(); cout << "--------------begin push down condition ------------" << endl; @@ -157,10 +160,18 @@ RetCode SelectExec::Execute() { cout << stmt_exec_status_->get_exec_info(); return ret; } -#ifndef PRINTCONTEXT - select_ast_->Print(); +//#ifndef PRINTCONTEXT + ret = select_ast_->SetScanAttrList(&sem_cnxt); + if (rSuccess != ret) { + stmt_exec_status_->set_exec_info("semantic analysis error \n" + + sem_cnxt.error_msg_); + stmt_exec_status_->set_exec_status(StmtExecStatus::ExecStatus::kError); + LOG(ERROR) << " Set Scan Attribute list error result= : " << ret; + return ret; + } +// select_ast_->Print(); cout << "--------------begin logical plan -------------------" << endl; -#endif +//#endif LogicalOperator* logic_plan = NULL; ret = select_ast_->GetLogicalPlan(logic_plan); @@ -176,6 +187,12 @@ RetCode SelectExec::Execute() { logic_plan = new LogicalQueryPlanRoot(0, logic_plan, raw_sql_, LogicalQueryPlanRoot::kResultCollector); logic_plan->GetPlanContext(); + + if (Config::enable_prune_column) { + set attrs; + logic_plan->PruneProj(attrs); + } + logic_plan->GetPlanContext(); #ifndef PRINTCONTEXT logic_plan->Print(); cout << "--------------begin physical plan -------------------" << endl;