diff --git a/NOTICE b/NOTICE new file mode 100644 index 00000000000..3fbb523f72b --- /dev/null +++ b/NOTICE @@ -0,0 +1,195 @@ +// ------------------------------------------------------------------ +// NOTICE file corresponding to the section 4d of The Apache License, +// Version 2.0, in this case for +// ------------------------------------------------------------------ + +abseil-cpp +Revision: 215105818dfde3174fe799600bb0f3cae233d0bf +Address: https://github.com/abseil/abseil-cpp +License: https://github.com/abseil/abseil-cpp/blob/master/LICENSE +-------------------------------------- +benchmark +Revision: f91b6b42b1b9854772a90ae9501464a161707d1e +Address: https://github.com/google/benchmark +License: https://github.com/google/benchmark/blob/main/LICENSE +-------------------------------------- +boost +Revision: f7aa8cd1fea2dcd85b54542d19200c8f8957a0aa +Address: https://github.com/boostorg/boost +License: https://github.com/boostorg/boost/blob/master/LICENSE_1_0.txt +-------------------------------------- +boringssl +Revision: f6ef1c560ae5af51e2df5d8d2175bed207b28b8f +Address: https://github.com/google/boringssl +License: https://github.com/google/boringssl/blob/master/LICENSE +-------------------------------------- +cctz +Revision: 79e037678a5fce43eb7a3584508a67366dc2111d +Address: https://github.com/google/cctz +License: https://github.com/google/cctz/blob/master/LICENSE.txt +-------------------------------------- +client-c +Revision: 0c834b9fe189841e078046b7b66d4960acc339fc +Address: https://github.com/tikv/client-c +License: https://github.com/tikv/client-c/blob/master/LICENSE +-------------------------------------- +cpptoml +Revision: fededad7169e538ca47e11a9ee9251bc361a9a65 +Address: https://github.com/skystrife/cpptoml +License: https://github.com/skystrife/cpptoml/blob/master/LICENSE +-------------------------------------- +cpu_features +Revision: e38dc6d2a8a855dc4bcea421c23eaf68ddd671e6 +Address: https://github.com/google/cpu_features +License: 
https://github.com/google/cpu_features/blob/main/LICENSE +-------------------------------------- +curl +Revision: 801bd5138ce31aa0d906fa4e2eabfc599d74e793 +Address: https://github.com/curl/curl +License: https://github.com/curl/curl/blob/master/COPYING +-------------------------------------- +double-conversion +Revision: cf2f0f3d547dc73b4612028a155b80536902ba02 +Address: https://github.com/google/double-conversion +License: https://github.com/google/double-conversion/blob/master/LICENSE +-------------------------------------- +fmtlib +Revision: d141cdbeb0fb422a3fb7173b285fd38e0d1772dc +Address: https://github.com/fmtlib/fmt +License: https://github.com/fmtlib/fmt/blob/master/LICENSE.rst +-------------------------------------- +google-test +Revision: d175c8bf823e709d570772b038757fadf63bc632 +Address: https://github.com/google/googletest +License: https://github.com/google/googletest/blob/main/LICENSE +-------------------------------------- +grpc +Revision: de893acb6aef88484a427e64b96727e4926fdcfd +Address: https://github.com/grpc/grpc +License: https://github.com/grpc/grpc/blob/master/LICENSE +-------------------------------------- +jemalloc +Revision: ea6b3e973b477b8061e0076bb257dbd7f3faa756 +Address: https://github.com/jemalloc/jemalloc +License: https://github.com/jemalloc/jemalloc/blob/dev/COPYING +-------------------------------------- +junction +Revision: 5ad3be7ce1d3f16b9f7ed6065bbfeacd2d629a08 +Address: https://github.com/preshing/junction +License: https://github.com/preshing/junction/blob/master/LICENSE +-------------------------------------- +kvproto +Revision: 714e05ea3b18f9f89e88ae88db693bd5e0b383a2 +Address: https://github.com/pingcap/kvproto +License: https://github.com/pingcap/kvproto/blob/master/LICENSE +-------------------------------------- +btrie +Revision: ddb0dcd2fd4f093d9622f68ee48b61655348a512 +Address: https://github.com/ClickHouse/ClickHouse/tree/v1.1.54381-stable/contrib/libbtrie +License: 
https://github.com/ClickHouse/ClickHouse/blob/v1.1.54381-stable/contrib/libbtrie/LICENSE +-------------------------------------- +cityhash +Revision: caf83a650e88f5c1ac3da33dfc9409c5314f4554 +Address: https://github.com/ClickHouse/ClickHouse/tree/v1.1.54381-stable/contrib/libcityhash +License: https://github.com/ClickHouse/ClickHouse/blob/v1.1.54381-stable/contrib/libcityhash/COPYING +-------------------------------------- +cpuid +Revision: ac178bde1fa3c27adb97d8dfcae2992074c88caa +Address: https://github.com/anrieff/libcpuid +License: https://github.com/anrieff/libcpuid/blob/master/COPYING +-------------------------------------- +divide(libdivide) +Revision: fdbafd427f329b3c1ccafcdc13a354e88c596787 +Address: https://github.com/ridiculousfish/libdivide +License: https://github.com/ridiculousfish/libdivide/blob/master/LICENSE.txt +-------------------------------------- +farmhash +Revision: afdb2e459d030afd1f3b6116545397509326385e +Address: https://github.com/google/farmhash +License: https://github.com/google/farmhash/blob/master/COPYING +-------------------------------------- +pcg-random +Revision: 93f1e274fb29b6a7cb20676252085a19d8d5a62d +Address: https://github.com/ClickHouse/ClickHouse/tree/v1.1.54381-stable/contrib/libpcg-random +License: https://github.com/ClickHouse/ClickHouse/blob/v1.1.54381-stable/contrib/libpcg-random/LICENSE-APACHE.txt +-------------------------------------- +metrohash +Revision: d6833a0d5569a9dbd4d1cdf67d95015712f13cb6 +Address: https://github.com/ClickHouse/ClickHouse/tree/v1.1.54381-stable/contrib/libmetrohash +License: https://github.com/ClickHouse/ClickHouse/blob/v1.1.54381-stable/contrib/libmetrohash/LICENSE +-------------------------------------- +sparsehash +Revision: 21d37dbaa45742c0bd23cc1e5a70b52cbc27f809 +Address: https://github.com/ClickHouse/ClickHouse/tree/b13313eecca9455f4fdc923f597fe863df409742/contrib/libsparsehash +License: 
https://github.com/ClickHouse/ClickHouse/blob/b13313eecca9455f4fdc923f597fe863df409742/contrib/libsparsehash/COPYING +-------------------------------------- +tcmalloc +Revision: dde32f8bbc95312379f9f5a651799815bb6327c5 +Address: https://github.com/ClickHouse/ClickHouse/tree/b13313eecca9455f4fdc923f597fe863df409742/contrib/libtcmalloc +License: https://github.com/ClickHouse/ClickHouse/blob/b13313eecca9455f4fdc923f597fe863df409742/contrib/libtcmalloc/COPYING +-------------------------------------- +libunwind +Revision: 19a2c01b1e8ac20871ea09d20f596d425ba53aed +Address: https://github.com/libunwind/libunwind +License: https://github.com/libunwind/libunwind/blob/master/LICENSE +-------------------------------------- +lz4 +Revision: d44371841a2f1728a3f36839fd4b7e872d0927d3 +Address: https://github.com/lz4/lz4 +License: https://github.com/lz4/lz4/blob/dev/LICENSE +-------------------------------------- +mimalloc +Revision: dc6bce256d4f3ce87761f9337977dff3d8b1776c +Address: https://github.com/microsoft/mimalloc +License: https://github.com/microsoft/mimalloc/blob/master/LICENSE +-------------------------------------- +poco +Revision: e411ea34492bf26e76b4e4a03ff5813a0c779240 +Address: https://github.com/ClickHouse-Extras/poco +License: https://github.com/ClickHouse-Extras/poco/blob/clickhouse/LICENSE +-------------------------------------- +prometheus-cpp +Revision: ca1f3463e74d957d1cccddd4a1a29e3e5d34bd83 +Address: https://github.com/jupp0r/prometheus-cpp +License: https://github.com/jupp0r/prometheus-cpp/blob/master/LICENSE +-------------------------------------- +protobuf +Revision: 09745575a923640154bcf307fba8aedff47f240a +Address: https://github.com/protocolbuffers/protobuf +License: https://github.com/protocolbuffers/protobuf/blob/master/LICENSE +-------------------------------------- +re2 +Revision: 7cf8b88e8f70f97fd4926b56aa87e7f53b2717e0 +Address: https://github.com/google/re2 +License: https://github.com/google/re2/blob/main/LICENSE 
+-------------------------------------- +tiflash-proxy +Revision: 1e3f15fdb93a7ae41958d81c168d9e25ef3d4570 +Address: https://github.com/pingcap/tidb-engine-ext +License: https://github.com/pingcap/tidb-engine-ext/blob/raftstore-proxy/LICENSE +-------------------------------------- +tipb +Revision: 0e3817b1f556337705053dac55606d04030bf1a0 +Address: https://github.com/pingcap/tipb +License: https://github.com/pingcap/tipb/blob/master/LICENSE +-------------------------------------- +xxhash +Revision: 94e5f23e736f2bb67ebdf90727353e65344f9fc0 +Address: https://github.com/Cyan4973/xxHash +License: https://github.com/Cyan4973/xxHash/blob/dev/LICENSE +-------------------------------------- +zlib-ng +Revision: b56a2fd0b126cfe5f13e68ab9090cd4f6a773286 +Address: https://github.com/zlib-ng/zlib-ng +License: https://github.com/zlib-ng/zlib-ng/blob/develop/LICENSE.md +-------------------------------------- +zstd +Revision: 791626dfb92acf4a3d3ba0342636b0dd82848e01 +Address: https://github.com/facebook/zstd +License: https://github.com/facebook/zstd/blob/dev/LICENSE +-------------------------------------- +ClickHouse +Revision: 30fcaeb2a3fff1bf894aae9c776bed7fd83f783f +Address: https://github.com/ClickHouse/ClickHouse +License: https://github.com/ClickHouse/ClickHouse/blob/master/LICENSE + diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp index f8f52bdeefa..0ec728c3f18 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp @@ -35,9 +35,119 @@ DAGExpressionAnalyzer::DAGExpressionAnalyzer(std::vector source , settings(context.getSettingsRef()) {} +extern const String count_second_stage; + +namespace +{ +bool isUInt8Type(const DataTypePtr & type) +{ + return removeNullable(type)->getTypeId() == TypeIndex::UInt8; +} + +tipb::Expr constructTZExpr(const TimezoneInfo & dag_timezone_info) +{ + return dag_timezone_info.is_name_based + ? 
constructStringLiteralTiExpr(dag_timezone_info.timezone_name) + : constructInt64LiteralTiExpr(dag_timezone_info.timezone_offset); +} + +String getAggFuncName( + const tipb::Expr & expr, + const tipb::Aggregation & agg, + const Settings & settings) +{ + String agg_func_name = getAggFunctionName(expr); + + static const String count_distinct_func_name = "countDistinct"; + if (expr.has_distinct() && agg_func_name == count_distinct_func_name) + return settings.count_distinct_implementation; + + static const String sum_func_name = "sum"; + if (agg.group_by_size() == 0 && agg_func_name == sum_func_name && expr.has_field_type() + && !getDataTypeByFieldTypeForComputingLayer(expr.field_type())->isNullable()) + { + /// this is a little hack: if the query does not have group by column, and the result of sum is not nullable, then the sum + /// must be the second stage for count, in this case we should return 0 instead of null if the input is empty. + return count_second_stage; + } + + return agg_func_name; +} + +/// return `duplicated agg function`->getReturnType if duplicated. +/// or not return nullptr. +DataTypePtr findDuplicateAggFunc( + const String & func_string, + const AggregateDescriptions & aggregate_descriptions) +{ + for (const auto & aggregated : aggregate_descriptions) + { + if (aggregated.column_name == func_string) + { + auto return_type = aggregated.function->getReturnType(); + assert(return_type); + return return_type; + } + } + return nullptr; +} + +/// Generate AggregateDescription and append it to AggregateDescriptions if need. +/// And append output column to aggregated_columns. 
+void appendAggDescription( + const Names & arg_names, + const DataTypes & arg_types, + TiDB::TiDBCollators & arg_collators, + const String & agg_func_name, + AggregateDescriptions & aggregate_descriptions, + NamesAndTypes & aggregated_columns, + bool empty_input_as_null) +{ + assert(arg_names.size() == arg_collators.size() && arg_names.size() == arg_types.size()); + + AggregateDescription aggregate; + aggregate.argument_names = arg_names; + String func_string = genFuncString(agg_func_name, aggregate.argument_names, arg_collators); + if (auto duplicated_return_type = findDuplicateAggFunc(func_string, aggregate_descriptions)) + { + // agg function duplicate, don't need to build again. + aggregated_columns.emplace_back(func_string, duplicated_return_type); + return; + } + + aggregate.column_name = func_string; + aggregate.parameters = Array(); + aggregate.function = AggregateFunctionFactory::instance().get(agg_func_name, arg_types, {}, 0, empty_input_as_null); + aggregate.function->setCollators(arg_collators); + + DataTypePtr result_type = aggregate.function->getReturnType(); + aggregated_columns.emplace_back(func_string, aggregate.function->getReturnType()); + + aggregate_descriptions.push_back(std::move(aggregate)); +} +} // namespace + +ExpressionActionsChain::Step & DAGExpressionAnalyzer::initAndGetLastStep(ExpressionActionsChain & chain) const +{ + initChain(chain, getCurrentInputColumns()); + return chain.getLastStep(); +} + +void DAGExpressionAnalyzer::fillAggArgumentDetail( + const ExpressionActionsPtr & actions, + const tipb::Expr & arg, + Names & arg_names, + DataTypes & arg_types, + TiDB::TiDBCollators & arg_collators) +{ + arg_names.push_back(getActions(arg, actions)); + arg_types.push_back(actions->getSampleBlock().getByName(arg_names.back()).type); + arg_collators.push_back(removeNullable(arg_types.back())->isString() ? 
getCollatorFromExpr(arg) : nullptr); +} + void DAGExpressionAnalyzer::buildGroupConcat( const tipb::Expr & expr, - ExpressionActionsChain::Step & step, + const ExpressionActionsPtr & actions, const String & agg_func_name, AggregateDescriptions & aggregate_descriptions, NamesAndTypes & aggregated_columns, @@ -52,37 +162,32 @@ void DAGExpressionAnalyzer::buildGroupConcat( bool only_one_column = true; TiDB::TiDBCollators arg_collators; String arg_name; + DataTypes types; /// more than one args will be combined to one - DataTypes types(1); - aggregate.argument_names.resize(1); if (child_size == 1 && expr.order_by_size() == 0) { /// only one arg - arg_name = getActions(expr.children(0), step.actions); - types[0] = step.actions->getSampleBlock().getByName(arg_name).type; + Names arg_names; + fillAggArgumentDetail(actions, expr.children(0), arg_names, types, arg_collators); + arg_name = arg_names.back(); all_columns_names_and_types.emplace_back(arg_name, types[0]); - if (removeNullable(types[0])->isString()) - arg_collators.push_back(getCollatorFromExpr(expr.children(0))); - else - arg_collators.push_back(nullptr); } else { /// args... -> tuple(args...) 
- arg_name = buildTupleFunctionForGroupConcat(expr, sort_description, all_columns_names_and_types, arg_collators, step.actions); + arg_name = buildTupleFunctionForGroupConcat(expr, sort_description, all_columns_names_and_types, arg_collators, actions); only_one_column = false; - types[0] = step.actions->getSampleBlock().getByName(arg_name).type; + types.push_back(actions->getSampleBlock().getByName(arg_name).type); } - aggregate.argument_names[0] = arg_name; - step.required_output.push_back(arg_name); + aggregate.argument_names.push_back(arg_name); /// the separator - arg_name = getActions(expr.children(child_size), step.actions); + arg_name = getActions(expr.children(child_size), actions); if (expr.children(child_size).tp() == tipb::String) { const ColumnConst * col_delim - = checkAndGetColumnConstStringOrFixedString(step.actions->getSampleBlock().getByName(arg_name).column.get()); + = checkAndGetColumnConstStringOrFixedString(actions->getSampleBlock().getByName(arg_name).column.get()); if (col_delim == nullptr) { throw Exception("the separator of group concat should not be invalid!"); @@ -90,15 +195,12 @@ void DAGExpressionAnalyzer::buildGroupConcat( delimiter = col_delim->getValue(); } + String func_string = genFuncString(agg_func_name, aggregate.argument_names, arg_collators); /// return directly if the agg is duplicated - String func_string = DAGExpressionAnalyzerHelper::genFuncString(agg_func_name, aggregate.argument_names, arg_collators); - for (const auto & pre_agg : aggregate_descriptions) + if (auto duplicated_return_type = findDuplicateAggFunc(func_string, aggregate_descriptions)) { - if (pre_agg.column_name == func_string) - { - aggregated_columns.emplace_back(func_string, pre_agg.function->getReturnType()); - return; - } + aggregated_columns.emplace_back(func_string, duplicated_return_type); + return; } aggregate.column_name = func_string; @@ -154,111 +256,39 @@ void DAGExpressionAnalyzer::buildGroupConcat( aggregated_columns.emplace_back(func_string, 
result_type); } -extern const String count_second_stage; - -static String getAggFuncName( - const tipb::Expr & expr, - const tipb::Aggregation & agg, - const Settings & settings) -{ - String agg_func_name = getAggFunctionName(expr); - if (expr.has_distinct() && Poco::toLower(agg_func_name) == "countdistinct") - return settings.count_distinct_implementation; - if (agg.group_by_size() == 0 && agg_func_name == "sum" && expr.has_field_type() - && !getDataTypeByFieldTypeForComputingLayer(expr.field_type())->isNullable()) - { - /// this is a little hack: if the query does not have group by column, and the result of sum is not nullable, then the sum - /// must be the second stage for count, in this case we should return 0 instead of null if the input is empty. - return count_second_stage; - } - return agg_func_name; -} - void DAGExpressionAnalyzer::buildCommonAggFunc( const tipb::Expr & expr, - ExpressionActionsChain::Step & step, + const ExpressionActionsPtr & actions, const String & agg_func_name, AggregateDescriptions & aggregate_descriptions, NamesAndTypes & aggregated_columns, bool empty_input_as_null) { - AggregateDescription aggregate; auto child_size = expr.children_size(); - DataTypes types(child_size); + Names arg_names; + DataTypes arg_types; TiDB::TiDBCollators arg_collators; - aggregate.argument_names.resize(child_size); - for (Int32 i = 0; i < child_size; i++) - { - String arg_name = getActions(expr.children(i), step.actions); - types[i] = step.actions->getSampleBlock().getByName(arg_name).type; - if (removeNullable(types[i])->isString()) - arg_collators.push_back(getCollatorFromExpr(expr.children(i))); - else - arg_collators.push_back(nullptr); - aggregate.argument_names[i] = arg_name; - step.required_output.push_back(arg_name); - } - String func_string = DAGExpressionAnalyzerHelper::genFuncString(agg_func_name, aggregate.argument_names, arg_collators); - bool duplicate = false; - for (const auto & pre_agg : aggregate_descriptions) + for (Int32 i = 0; i < 
child_size; ++i) { - if (pre_agg.column_name == func_string) - { - aggregated_columns.emplace_back(func_string, pre_agg.function->getReturnType()); - duplicate = true; - break; - } + fillAggArgumentDetail(actions, expr.children(i), arg_names, arg_types, arg_collators); } - if (duplicate) - return; - aggregate.column_name = func_string; - aggregate.parameters = Array(); - aggregate.function = AggregateFunctionFactory::instance().get(agg_func_name, types, {}, 0, empty_input_as_null); - aggregate.function->setCollators(arg_collators); - aggregate_descriptions.push_back(aggregate); - DataTypePtr result_type = aggregate.function->getReturnType(); - // this is a temp result since implicit cast maybe added on these aggregated_columns - aggregated_columns.emplace_back(func_string, result_type); + + appendAggDescription(arg_names, arg_types, arg_collators, agg_func_name, aggregate_descriptions, aggregated_columns, empty_input_as_null); } -std::tuple DAGExpressionAnalyzer::appendAggregation( - ExpressionActionsChain & chain, - const tipb::Aggregation & agg, - bool group_by_collation_sensitive) +void DAGExpressionAnalyzer::buildAggGroupBy( + const google::protobuf::RepeatedPtrField & group_by, + const ExpressionActionsPtr & actions, + AggregateDescriptions & aggregate_descriptions, + NamesAndTypes & aggregated_columns, + Names & aggregation_keys, + std::unordered_set & agg_key_set, + bool group_by_collation_sensitive, + TiDB::TiDBCollators & collators) { - if (agg.group_by_size() == 0 && agg.agg_func_size() == 0) + for (const tipb::Expr & expr : group_by) { - //should not reach here - throw TiFlashException("Aggregation executor without group by/agg exprs", Errors::Coprocessor::BadRequest); - } - - Names aggregation_keys; - TiDB::TiDBCollators collators; - AggregateDescriptions aggregate_descriptions; - NamesAndTypes aggregated_columns; - - initChain(chain, getCurrentInputColumns()); - ExpressionActionsChain::Step & step = chain.steps.back(); - std::unordered_set agg_key_set; 
- - for (const tipb::Expr & expr : agg.agg_func()) - { - if (expr.tp() == tipb::ExprType::GroupConcat) - { - buildGroupConcat(expr, step, getAggFuncName(expr, agg, settings), aggregate_descriptions, aggregated_columns, agg.group_by().empty()); - } - else - { - /// if there is group by clause, there is no need to consider the empty input case - bool empty_input_as_null = agg.group_by().empty(); - buildCommonAggFunc(expr, step, getAggFuncName(expr, agg, settings), aggregate_descriptions, aggregated_columns, empty_input_as_null); - } - } - - for (const tipb::Expr & expr : agg.group_by()) - { - String name = getActions(expr, step.actions); - step.required_output.push_back(name); + String name = getActions(expr, actions); bool duplicated_key = agg_key_set.find(name) != agg_key_set.end(); if (!duplicated_key) { @@ -274,7 +304,7 @@ std::tuplegetSampleBlock().getByName(name).type; + auto type = actions->getSampleBlock().getByName(name).type; TiDB::TiDBCollatorPtr collator = nullptr; if (removeNullable(type)->isString()) collator = getCollatorFromExpr(expr); @@ -285,62 +315,85 @@ std::tuplegetReturnType()); - duplicate = true; - break; - } - } - if (duplicate) - continue; - aggregate.column_name = func_string; - aggregate.parameters = Array(); - aggregate.function = AggregateFunctionFactory::instance().get(agg_func_name, types, {}, 0, false); - aggregate.function->setCollators(arg_collators); - aggregate_descriptions.push_back(aggregate); - DataTypePtr result_type = aggregate.function->getReturnType(); - // this is a temp result since implicit cast maybe added on these aggregated_columns - aggregated_columns.emplace_back(func_string, result_type); + TiDB::TiDBCollators arg_collators{collator}; + appendAggDescription({name}, {type}, arg_collators, "any", aggregate_descriptions, aggregated_columns, false); } else { - aggregated_columns.emplace_back(name, step.actions->getSampleBlock().getByName(name).type); + aggregated_columns.emplace_back(name, 
actions->getSampleBlock().getByName(name).type); } } else { - // this is a temp result since implicit cast maybe added on these aggregated_columns - aggregated_columns.emplace_back(name, step.actions->getSampleBlock().getByName(name).type); + aggregated_columns.emplace_back(name, actions->getSampleBlock().getByName(name).type); } } - source_columns = aggregated_columns; +} + +void DAGExpressionAnalyzer::buildAggFuncs( + const tipb::Aggregation & aggregation, + const ExpressionActionsPtr & actions, + AggregateDescriptions & aggregate_descriptions, + NamesAndTypes & aggregated_columns) +{ + for (const tipb::Expr & expr : aggregation.agg_func()) + { + if (expr.tp() == tipb::ExprType::GroupConcat) + { + buildGroupConcat(expr, actions, getAggFuncName(expr, aggregation, settings), aggregate_descriptions, aggregated_columns, aggregation.group_by().empty()); + } + else + { + /// if there is group by clause, there is no need to consider the empty input case + bool empty_input_as_null = aggregation.group_by().empty(); + buildCommonAggFunc(expr, actions, getAggFuncName(expr, aggregation, settings), aggregate_descriptions, aggregated_columns, empty_input_as_null); + } + } +} + +std::tuple DAGExpressionAnalyzer::appendAggregation( + ExpressionActionsChain & chain, + const tipb::Aggregation & agg, + bool group_by_collation_sensitive) +{ + if (agg.group_by_size() == 0 && agg.agg_func_size() == 0) + { + //should not reach here + throw TiFlashException("Aggregation executor without group by/agg exprs", Errors::Coprocessor::BadRequest); + } + + auto & step = initAndGetLastStep(chain); + + NamesAndTypes aggregated_columns; + AggregateDescriptions aggregate_descriptions; + Names aggregation_keys; + TiDB::TiDBCollators collators; + std::unordered_set agg_key_set; + buildAggFuncs(agg, step.actions, aggregate_descriptions, aggregated_columns); + buildAggGroupBy(agg.group_by(), step.actions, aggregate_descriptions, aggregated_columns, aggregation_keys, agg_key_set, 
group_by_collation_sensitive, collators); + // set required output for agg funcs's arguments and group by keys. + for (const auto & aggregate_description : aggregate_descriptions) + { + for (const auto & argument_name : aggregate_description.argument_names) + step.required_output.push_back(argument_name); + } + for (const auto & aggregation_key : aggregation_keys) + step.required_output.push_back(aggregation_key); + + source_columns = std::move(aggregated_columns); auto before_agg = chain.getLastActions(); chain.finalize(); chain.clear(); - appendCastAfterAgg(chain, agg); - return {aggregation_keys, collators, aggregate_descriptions, before_agg}; -} -bool isUInt8Type(const DataTypePtr & type) -{ - auto non_nullable_type = type->isNullable() ? std::dynamic_pointer_cast(type)->getNestedType() : type; - return std::dynamic_pointer_cast(non_nullable_type) != nullptr; + initChain(chain, getCurrentInputColumns()); + const auto & actions = chain.getLastActions(); + appendCastAfterAgg(actions, agg); + // after appendCastAfterAgg, current input columns has been modified. 
+ for (const auto & column : getCurrentInputColumns()) + step.required_output.push_back(column.name); + + return {aggregation_keys, collators, aggregate_descriptions, before_agg}; } String DAGExpressionAnalyzer::applyFunction( @@ -349,7 +402,7 @@ String DAGExpressionAnalyzer::applyFunction( const ExpressionActionsPtr & actions, const TiDB::TiDBCollatorPtr & collator) { - String result_name = DAGExpressionAnalyzerHelper::genFuncString(func_name, arg_names, {collator}); + String result_name = genFuncString(func_name, arg_names, {collator}); if (actions->getSampleBlock().has(result_name)) return result_name; const FunctionBuilderPtr & function_builder = FunctionFactory::instance().get(func_name, context); @@ -393,8 +446,7 @@ String DAGExpressionAnalyzer::appendWhere( ExpressionActionsChain & chain, const std::vector & conditions) { - initChain(chain, getCurrentInputColumns()); - ExpressionActionsChain::Step & last_step = chain.steps.back(); + auto & last_step = initAndGetLastStep(chain); String filter_column_name = buildFilterColumn(last_step.actions, conditions); @@ -476,8 +528,7 @@ std::vector DAGExpressionAnalyzer::appendOrderBy( throw TiFlashException("TopN executor without order by exprs", Errors::Coprocessor::BadRequest); } - initChain(chain, getCurrentInputColumns()); - ExpressionActionsChain::Step & step = chain.steps.back(); + auto & step = initAndGetLastStep(chain); auto order_columns = buildOrderColumns(step.actions, topN.order_by()); assert(static_cast(order_columns.size()) == topN.order_by_size()); @@ -492,14 +543,6 @@ const std::vector & DAGExpressionAnalyzer::getCurrentInputColum return source_columns; } -tipb::Expr constructTZExpr(const TimezoneInfo & dag_timezone_info) -{ - if (dag_timezone_info.is_name_based) - return constructStringLiteralTiExpr(dag_timezone_info.timezone_name); - else - return constructInt64LiteralTiExpr(dag_timezone_info.timezone_offset); -} - String DAGExpressionAnalyzer::appendTimeZoneCast( const String & tz_col, const String & 
ts_col, @@ -562,8 +605,7 @@ bool DAGExpressionAnalyzer::appendExtraCastsAfterTS( const std::vector & need_cast_column, const tipb::TableScan & table_scan) { - initChain(chain, getCurrentInputColumns()); - ExpressionActionsChain::Step & step = chain.getLastStep(); + auto & step = initAndGetLastStep(chain); bool has_cast = buildExtraCastsAfterTS(step.actions, need_cast_column, table_scan.columns()); @@ -715,28 +757,23 @@ bool DAGExpressionAnalyzer::appendJoinKeyAndJoinFilters( } void DAGExpressionAnalyzer::appendCastAfterAgg( - ExpressionActionsChain & chain, + const ExpressionActionsPtr & actions, const tipb::Aggregation & aggregation) { - initChain(chain, getCurrentInputColumns()); - bool need_update_source_columns = false; std::vector updated_aggregated_columns; - ExpressionActionsChain::Step & step = chain.steps.back(); auto update_cast_column = [&](const tipb::Expr & expr, const NameAndTypePair & origin_column) { - String updated_name = appendCastIfNeeded(expr, step.actions, origin_column.name); + String updated_name = appendCastIfNeeded(expr, actions, origin_column.name); if (origin_column.name != updated_name) { - DataTypePtr type = step.actions->getSampleBlock().getByName(updated_name).type; + DataTypePtr type = actions->getSampleBlock().getByName(updated_name).type; updated_aggregated_columns.emplace_back(updated_name, type); - step.required_output.push_back(updated_name); need_update_source_columns = true; } else { updated_aggregated_columns.emplace_back(origin_column.name, origin_column.type); - step.required_output.push_back(origin_column.name); } }; @@ -762,15 +799,14 @@ NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForNonRootQueryBlock( ExpressionActionsChain & chain, const String & column_prefix) const { - const auto & current_columns = getCurrentInputColumns(); NamesWithAliases final_project; UniqueNameGenerator unique_name_generator; - for (const auto & element : current_columns) + for (const auto & element : getCurrentInputColumns()) 
final_project.emplace_back(element.name, unique_name_generator.toUniqueName(column_prefix + element.name)); - initChain(chain, current_columns); + auto & step = initAndGetLastStep(chain); for (const auto & name : final_project) - chain.steps.back().required_output.push_back(name.first); + step.required_output.push_back(name.first); return final_project; } @@ -820,8 +856,7 @@ NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock( { /// for all the columns that need to be returned, if the type is timestamp, then convert /// the timestamp column to UTC based, refer to appendTimeZoneCastsAfterTS for more details - initChain(chain, current_columns); - ExpressionActionsChain::Step & step = chain.steps.back(); + auto & step = initAndGetLastStep(chain); tipb::Expr tz_expr = constructTZExpr(context.getTimezoneInfo()); String tz_col; @@ -867,10 +902,10 @@ NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock( } } - initChain(chain, current_columns); + auto & step = initAndGetLastStep(chain); for (const auto & name : final_project) { - chain.steps.back().required_output.push_back(name.first); + step.required_output.push_back(name.first); } return final_project; } diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h index 1941dc34c1d..48bfe059e6d 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h @@ -53,6 +53,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable const tipb::TopN & topN); /// + /// May change the source columns. 
std::tuple appendAggregation( ExpressionActionsChain & chain, const tipb::Aggregation & agg, @@ -62,6 +63,8 @@ class DAGExpressionAnalyzer : private boost::noncopyable ExpressionActionsChain & chain, const std::vector & columns) const; + ExpressionActionsChain::Step & initAndGetLastStep(ExpressionActionsChain & chain) const; + void appendJoin( ExpressionActionsChain & chain, SubqueryForSet & join_query, @@ -119,7 +122,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable const ::google::protobuf::RepeatedPtrField & order_by); void appendCastAfterAgg( - ExpressionActionsChain & chain, + const ExpressionActionsPtr & actions, const tipb::Aggregation & agg); String buildTupleFunctionForGroupConcat( @@ -131,7 +134,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable void buildGroupConcat( const tipb::Expr & expr, - ExpressionActionsChain::Step & step, + const ExpressionActionsPtr & actions, const String & agg_func_name, AggregateDescriptions & aggregate_descriptions, NamesAndTypes & aggregated_columns, @@ -139,12 +142,35 @@ class DAGExpressionAnalyzer : private boost::noncopyable void buildCommonAggFunc( const tipb::Expr & expr, - ExpressionActionsChain::Step & step, + const ExpressionActionsPtr & actions, const String & agg_func_name, AggregateDescriptions & aggregate_descriptions, NamesAndTypes & aggregated_columns, bool empty_input_as_null); + void buildAggFuncs( + const tipb::Aggregation & aggregation, + const ExpressionActionsPtr & actions, + AggregateDescriptions & aggregate_descriptions, + NamesAndTypes & aggregated_columns); + + void buildAggGroupBy( + const google::protobuf::RepeatedPtrField & group_by, + const ExpressionActionsPtr & actions, + AggregateDescriptions & aggregate_descriptions, + NamesAndTypes & aggregated_columns, + Names & aggregation_keys, + std::unordered_set & agg_key_set, + bool group_by_collation_sensitive, + TiDB::TiDBCollators & collators); + + void fillAggArgumentDetail( + const ExpressionActionsPtr & actions, + const 
tipb::Expr & arg, + Names & arg_names, + DataTypes & arg_types, + TiDB::TiDBCollators & arg_collators); + void makeExplicitSet( const tipb::Expr & expr, const Block & sample_block, diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.cpp index 1585d6ffd79..397be528c70 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -5,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -56,25 +58,6 @@ const std::unordered_map DateSub::unit_to_func_name_map {"SECOND", "subtractSeconds"}}; } // namespace -String DAGExpressionAnalyzerHelper::genFuncString( - const String & func_name, - const Names & argument_names, - const TiDB::TiDBCollators & collators) -{ - assert(!collators.empty()); - FmtBuffer buf; - buf.fmtAppend("{}({})_collator", func_name, fmt::join(argument_names.begin(), argument_names.end(), ", ")); - for (const auto & collator : collators) - { - if (collator == nullptr) - buf.append("_0"); - else - buf.fmtAppend("_{}", collator->getCollatorId()); - } - buf.append(" "); - return buf.toString(); -} - String DAGExpressionAnalyzerHelper::buildMultiIfFunction( DAGExpressionAnalyzer * analyzer, const tipb::Expr & expr, diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.h b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.h index 83597d48a53..a474bf512b2 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.h +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.h @@ -69,11 +69,6 @@ class DAGExpressionAnalyzerHelper const tipb::Expr & expr, const ExpressionActionsPtr & actions); - static String genFuncString( - const String & func_name, - const Names & argument_names, - const TiDB::TiDBCollators & collators); - using FunctionBuilder = std::function; using FunctionBuilderMap = 
std::unordered_map; diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 1dd1b187c35..5ff09d6d8ca 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -956,8 +956,7 @@ void DAGQueryBlockInterpreter::handleProjection(DAGPipeline & pipeline, const ti input_columns.emplace_back(p.name, p.type); DAGExpressionAnalyzer dag_analyzer(std::move(input_columns), context); ExpressionActionsChain chain; - dag_analyzer.initChain(chain, dag_analyzer.getCurrentInputColumns()); - ExpressionActionsChain::Step & last_step = chain.steps.back(); + auto & last_step = dag_analyzer.initAndGetLastStep(chain); std::vector output_columns; NamesWithAliases project_cols; UniqueNameGenerator unique_name_generator; diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp index 9f3f129ae0e..1adab2d5dc0 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp +++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp @@ -1155,6 +1155,25 @@ SortDescription getSortDescription(const std::vector & order_co return order_descr; } +String genFuncString( + const String & func_name, + const Names & argument_names, + const TiDB::TiDBCollators & collators) +{ + assert(!collators.empty()); + FmtBuffer buf; + buf.fmtAppend("{}({})_collator", func_name, fmt::join(argument_names.begin(), argument_names.end(), ", ")); + for (const auto & collator : collators) + { + if (collator) + buf.fmtAppend("_{}", collator->getCollatorId()); + else + buf.append("_0"); + } + buf.append(" "); + return buf.toString(); +} + TiDB::TiDBCollatorPtr getCollatorFromFieldType(const tipb::FieldType & field_type) { if (field_type.collate() < 0) diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.h b/dbms/src/Flash/Coprocessor/DAGUtils.h index f17caf8c4ec..6c758dc4f80 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.h +++ b/dbms/src/Flash/Coprocessor/DAGUtils.h 
@@ -36,6 +36,10 @@ DataTypePtr inferDataType4Literal(const tipb::Expr & expr); SortDescription getSortDescription( const std::vector & order_columns, const google::protobuf::RepeatedPtrField & by_items); +String genFuncString( + const String & func_name, + const Names & argument_names, + const TiDB::TiDBCollators & collators); extern const Int8 VAR_SIZE;