diff --git a/ydb/library/yql/core/common_opt/yql_co_finalizers.cpp b/ydb/library/yql/core/common_opt/yql_co_finalizers.cpp index b2929dccac22..3719a33cd92c 100644 --- a/ydb/library/yql/core/common_opt/yql_co_finalizers.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_finalizers.cpp @@ -50,7 +50,8 @@ IGraphTransformer::TStatus MultiUsageFlatMapOverJoin(const TExprNode::TPtr& node bool IsFilterMultiusageEnabled(const TOptimizeContext& optCtx) { YQL_ENSURE(optCtx.Types); static const TString multiUsageFlags = to_lower(TString("FilterPushdownEnableMultiusage")); - return optCtx.Types->OptimizerFlags.contains(multiUsageFlags); + static const TString noMultiUsageFlags = to_lower(TString("FilterPushdownDisableMultiusage")); + return optCtx.Types->OptimizerFlags.contains(multiUsageFlags) && !optCtx.Types->OptimizerFlags.contains(noMultiUsageFlags); } void FilterPushdownWithMultiusage(const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { diff --git a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp index 3a279cb35e65..99f9f8365391 100644 --- a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp @@ -22,6 +22,15 @@ bool AllowSubsetFieldsForNode(const TExprNode& node, const TOptimizeContext& opt return optCtx.IsSingleUsage(node) || optCtx.Types->OptimizerFlags.contains(multiUsageFlags); } +bool AllowComplexFiltersOverAggregatePushdown(const TOptimizeContext& optCtx) { + YQL_ENSURE(optCtx.Types); + static const TString pushdown = to_lower(TString("PushdownComplexFiltersOverAggregate")); + static const TString noPushdown = to_lower(TString("DisablePushdownComplexFiltersOverAggregate")); + return optCtx.Types->OptimizerFlags.contains(pushdown) && + !optCtx.Types->OptimizerFlags.contains(noPushdown) && + optCtx.Types->MaxAggPushdownPredicates > 0; +} + TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprContext& ctx, const TParentsMap& parentsMap) { auto inputType = node.Input().Ref().GetTypeAnn(); auto structType = inputType->GetKind() == ETypeAnnotationKind::List @@ -1040,16 +1049,281 @@ TExprNode::TPtr OptimizeCollect(const TExprNode::TPtr& node, TExprContext& ctx, return node; } -TExprBase FilterOverAggregate(const TCoFlatMapBase& node, TExprContext& ctx, const TParentsMap& parentsMap) { +enum ESubgraphType { + EXPR_CONST, + EXPR_KEYS, + EXPR_PAYLOADS, + EXPR_MIXED, +}; + +TNodeMap MarkSubgraphForAggregate(const TExprNode::TPtr& root, const TCoArgument& row, const THashSet& keys) { + TNodeMap result; + size_t insideDependsOn = 0; + VisitExpr(root, [&](const TExprNode::TPtr& node) { + if (node->IsComplete()) { + result[node.Get()] = EXPR_CONST; + return false; + } + if (node->IsCallable("DependsOn")) { + ++insideDependsOn; + return true; + } + + if (!insideDependsOn && node->IsCallable("Member") && &node->Head() == row.Raw()) { + result[node.Get()] = keys.contains(node->Child(1)->Content()) ? EXPR_KEYS : EXPR_PAYLOADS; + return false; + } + + if (node->IsArgument()) { + result[node.Get()] = node.Get() == row.Raw() ? EXPR_MIXED : EXPR_CONST; + return false; + } + + return true; + }, [&](const TExprNode::TPtr& node) { + if (node->IsCallable("DependsOn")) { + YQL_ENSURE(insideDependsOn); + --insideDependsOn; + } + if (result.contains(node.Get())) { + return true; + } + ESubgraphType derivedType = EXPR_CONST; + for (auto& child : node->ChildrenList()) { + auto it = result.find(child.Get()); + YQL_ENSURE(it != result.end()); + switch (it->second) { + case EXPR_CONST: + break; + case EXPR_KEYS: { + if (derivedType == EXPR_CONST) { + derivedType = EXPR_KEYS; + } else if (derivedType == EXPR_PAYLOADS) { + derivedType = EXPR_MIXED; + } + break; + } + case EXPR_PAYLOADS: { + if (derivedType == EXPR_CONST) { + derivedType = EXPR_PAYLOADS; + } else if (derivedType == EXPR_KEYS) { + derivedType = EXPR_MIXED; + } + break; + } + case EXPR_MIXED: + derivedType = EXPR_MIXED; + break; + } + } + YQL_ENSURE(result.insert({node.Get(), derivedType}).second); + return true; + }); + + return result; +} + +class ICalcualtor : public TThrRefBase { +public: + TMaybe Calculate() const { + if (!Cached_.Defined()) { + Cached_ = DoCalculate(); + } + return *Cached_; + } + + void DropCache() { + Cached_ = {}; + DropChildCaches(); + } + + using TPtr = TIntrusivePtr; +protected: + virtual TMaybe DoCalculate() const = 0; + virtual void DropChildCaches() = 0; +private: + mutable TMaybe> Cached_; +}; + +class TUnknownValue : public ICalcualtor { +public: + TUnknownValue() = default; +private: + TMaybe DoCalculate() const override { + return {}; + } + void DropChildCaches() override { + } +}; + +class TImmediateValue : public ICalcualtor { +public: + TImmediateValue(ui64& input, size_t index) + : Input_(&input) + , Index_(index) + { + YQL_ENSURE(index < 64); + } +private: + TMaybe DoCalculate() const override { + return ((*Input_) & (ui64(1) << Index_)) != 0; + } + void DropChildCaches() override { + } + + const ui64* const Input_; + const size_t Index_; +}; + +class TAndValue : public ICalcualtor { +public: + explicit TAndValue(TVector&& children) + : Children_(std::move(children)) + { + YQL_ENSURE(!Children_.empty()); + } +private: + TMaybe DoCalculate() const override { + bool allTrue = true; + for (auto& child : Children_) { + YQL_ENSURE(child); + auto val = child->Calculate(); + if (!val.Defined()) { + allTrue = false; + } else if (!*val) { + return false; + } + } + if (allTrue) { + return true; + } + return {}; + } + void DropChildCaches() override { + for (auto& child : Children_) { + child->DropCache(); + } + } + + const TVector Children_; +}; + +class TOrValue : public ICalcualtor { +public: + explicit TOrValue(TVector&& children) + : Children_(std::move(children)) + { + YQL_ENSURE(!Children_.empty()); + } +private: + TMaybe DoCalculate() const override { + bool allFalse = true; + for (auto& child : Children_) { + YQL_ENSURE(child); + auto val = child->Calculate(); + if (!val.Defined()) { + allFalse = false; + } else if (*val) { + return true; + } + } + if (allFalse) { + return false; + } + return {}; + } + + void DropChildCaches() override { + for (auto& child : Children_) { + child->DropCache(); + } + } + + const TVector Children_; +}; + +class TNotValue : public ICalcualtor { +public: + explicit TNotValue(ICalcualtor::TPtr child) + : Child_(std::move(child)) + { + YQL_ENSURE(Child_); + } +private: + TMaybe DoCalculate() const override { + auto val = Child_->Calculate(); + if (!val.Defined()) { + return val; + } + return !*val; + } + + void DropChildCaches() override { + Child_->DropCache(); + } + const ICalcualtor::TPtr Child_; +}; + +ICalcualtor::TPtr BuildProgram(const TExprNode::TPtr& node, const TNodeMap& markedGraph, + TNodeMap& calcCache, TExprNodeList& keyPredicates, ui64& inputs) +{ + auto cached = calcCache.find(node.Get()); + if (cached != calcCache.end()) { + return cached->second; + } + + if (node->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Data || node->GetTypeAnn()->Cast()->GetSlot() != EDataSlot::Bool) { + return nullptr; + } + + auto it = markedGraph.find(node.Get()); + YQL_ENSURE(it != markedGraph.end()); + ESubgraphType type = it->second; + + ICalcualtor::TPtr result; + if (type == EXPR_CONST || type == EXPR_PAYLOADS) { + result = new TUnknownValue(); + } else if (type == EXPR_KEYS) { + size_t index = keyPredicates.size(); + if (index >= 64) { + return nullptr; + } + result = new TImmediateValue(inputs, index); + keyPredicates.push_back(node); + } else if (node->IsCallable({"And", "Or", "Not"})) { + YQL_ENSURE(type == EXPR_MIXED); + YQL_ENSURE(node->ChildrenSize()); + TVector childCalcs; + childCalcs.reserve(node->ChildrenSize()); + for (auto& childNode : node->ChildrenList()) { + childCalcs.emplace_back(BuildProgram(childNode, markedGraph, calcCache, keyPredicates, inputs)); + if (!childCalcs.back()) { + return nullptr; + } + } + if (node->IsCallable("And")) { + result = new TAndValue(std::move(childCalcs)); + } else if (node->IsCallable("Or")) { + result = new TOrValue(std::move(childCalcs)); + } else { + result = new TNotValue(childCalcs.front()); + } + } + + if (result) { + calcCache[node.Get()] = result; + } + return result; +} + +TExprBase FilterOverAggregate(const TCoFlatMapBase& node, TExprContext& ctx, TOptimizeContext& optCtx) { + YQL_ENSURE(optCtx.ParentsMap); if (!TCoConditionalValueBase::Match(node.Lambda().Body().Raw())) { return node; } - TExprBase arg = node.Lambda().Args().Arg(0); + const TCoArgument arg = node.Lambda().Args().Arg(0); TCoConditionalValueBase body = node.Lambda().Body().Cast(); - if (HasDependsOn(body.Predicate().Ptr(), arg.Ptr())) { - return node; - } const TCoAggregate agg = node.Input().Cast(); THashSet keyColumns; @@ -1066,16 +1340,64 @@ TExprBase FilterOverAggregate(const TCoFlatMapBase& node, TExprContext& ctx, con TExprNodeList pushComponents; TExprNodeList restComponents; + size_t separableComponents = 0; for (auto& p : andComponents) { TSet usedFields; if (p->IsCallable("Likely") || - !HaveFieldsSubset(p, arg.Ref(), usedFields, parentsMap) || + HasDependsOn(p, arg.Ptr()) || + !HaveFieldsSubset(p, arg.Ref(), usedFields, *optCtx.ParentsMap) || !AllOf(usedFields, [&](TStringBuf field) { return keyColumns.contains(field); }) || - !IsStrict(p)) + !p->IsComplete() && !IsStrict(p)) { restComponents.push_back(p); } else { pushComponents.push_back(p); + ++separableComponents; + } + } + + size_t nonSeparableComponents = 0; + size_t maxKeyPredicates = 0; + if (AllowComplexFiltersOverAggregatePushdown(optCtx)) { + for (auto& p : restComponents) { + if (p->IsCallable("Likely")) { + continue; + } + const TNodeMap marked = MarkSubgraphForAggregate(p, arg, keyColumns); + auto rootIt = marked.find(p.Get()); + YQL_ENSURE(rootIt != marked.end()); + YQL_ENSURE(rootIt->second == EXPR_MIXED, "Key-only or const predicates should be handled earlier"); + + TNodeMap calcCache; + TExprNodeList keyPredicates; + ui64 inputs = 0; + + auto calculator = BuildProgram(p, marked, calcCache, keyPredicates, inputs); + if (!calculator || keyPredicates.empty() || keyPredicates.size() > optCtx.Types->MaxAggPushdownPredicates) { + continue; + } + + ui64 maxInputs = ui64(1) << keyPredicates.size(); + maxKeyPredicates = std::max(maxKeyPredicates, keyPredicates.size()); + bool canPush = false; + for (inputs = 0; inputs < maxInputs; ++inputs) { + // the goal is to find all keyPredicate values for which p yields False value irrespective of all constants and payloads + auto pResult = calculator->Calculate(); + if (pResult.Defined() && !*pResult) { + canPush = true; + TExprNodeList orItems; + for (size_t i = 0; i < keyPredicates.size(); ++i) { + // not (P1 == X and P2 == Y) -> (P1 != X or P2 != Y) + // P1 != X: (X is true -> not P1, X is false -> P1) + bool value = (inputs & (ui64(1) << i)) != 0; + orItems.emplace_back(ctx.WrapByCallableIf(value, "Not", TExprNode::TPtr(keyPredicates[i]))); + } + pushComponents.push_back(ctx.NewCallable(p->Pos(), "Or", std::move(orItems))); + } + calculator->DropCache(); + } + nonSeparableComponents += canPush; + p = ctx.WrapByCallableIf(canPush, "Likely", std::move(p)); } } @@ -1083,6 +1405,10 @@ TExprBase FilterOverAggregate(const TCoFlatMapBase& node, TExprContext& ctx, con return node; } + YQL_CLOG(DEBUG, Core) << "Filter over Aggregate : " << separableComponents << " separable, " + << nonSeparableComponents << " non-separable predicates out of " << andComponents.size() + << ". Pushed " << pushComponents.size() << " components. Maximum analyzed key predicates " << maxKeyPredicates; + TExprNode::TPtr pushPred = ctx.NewCallable(body.Predicate().Pos(), "And", std::move(pushComponents)); TExprNode::TPtr restPred = restComponents.empty() ? MakeBool(body.Predicate().Pos(), ctx) : @@ -1123,13 +1449,12 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) { } if (self.Input().Ref().IsCallable("Aggregate")) { - auto ret = FilterOverAggregate(self, ctx, *optCtx.ParentsMap); + auto ret = FilterOverAggregate(self, ctx, optCtx); if (!ret.Raw()) { return nullptr; } if (ret.Raw() != self.Raw()) { - YQL_CLOG(DEBUG, Core) << "Filter over Aggregate"; return ret.Ptr(); } } diff --git a/ydb/library/yql/core/yql_type_annotation.h b/ydb/library/yql/core/yql_type_annotation.h index da1c270bb98e..b225c5789d0b 100644 --- a/ydb/library/yql/core/yql_type_annotation.h +++ b/ydb/library/yql/core/yql_type_annotation.h @@ -353,6 +353,7 @@ struct TTypeAnnotationContext: public TThrRefBase { TColumnOrderStorage::TPtr ColumnOrderStorage = new TColumnOrderStorage; THashSet OptimizerFlags; bool StreamLookupJoin = false; + ui32 MaxAggPushdownPredicates = 6; // algorithm complexity is O(2^N) TMaybe LookupColumnOrder(const TExprNode& node) const; IGraphTransformer::TStatus SetColumnOrder(const TExprNode& node, const TColumnOrder& columnOrder, TExprContext& ctx); diff --git a/ydb/library/yql/providers/config/yql_config_provider.cpp b/ydb/library/yql/providers/config/yql_config_provider.cpp index 160536ac7b39..d1f67e904a74 100644 --- a/ydb/library/yql/providers/config/yql_config_provider.cpp +++ b/ydb/library/yql/providers/config/yql_config_provider.cpp @@ -957,6 +957,22 @@ namespace { return false; } Types.StreamLookupJoin = name == "_EnableStreamLookupJoin"; + } else if (name == "MaxAggPushdownPredicates") { + if (args.size() != 1) { + ctx.AddError(TIssue(pos, TStringBuilder() << "Expected single numeric argument, but got " << args.size())); + return false; + } + ui32 value; + if (!TryFromString(args[0], value)) { + ctx.AddError(TIssue(pos, TStringBuilder() << "Expected non-negative integer, but got: " << args[0])); + return false; + } + const ui32 hardLimit = 10; + if (value > hardLimit) { + ctx.AddError(TIssue(pos, TStringBuilder() << "Hard limit for setting MaxAggPushdownPredicates is " << hardLimit << ", but got: " << args[0])); + return false; + } + Types.MaxAggPushdownPredicates = value; } else { ctx.AddError(TIssue(pos, TStringBuilder() << "Unsupported command: " << name)); return false; diff --git a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json index 571dd7ad2254..9507f982a2a8 100644 --- a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json +++ b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json @@ -11031,6 +11031,13 @@ "uri": "https://{canondata_backend}/1784117/d56ae82ad9d30397a41490647be1bd2124718f98/resource.tar.gz#test_sql2yql.test_optimizers-passthrough_sortness_over_map_/sql.yql" } ], + "test_sql2yql.test[optimizers-pushdown_nonsep_over_aggregate]": [ + { + "checksum": "9754c147d630a173aa594d1cc3495c1f", + "size": 2418, + "uri": "https://{canondata_backend}/1899731/407203c9a441603d4ce4780c1c3daab84de46096/resource.tar.gz#test_sql2yql.test_optimizers-pushdown_nonsep_over_aggregate_/sql.yql" + } + ], "test_sql2yql.test[optimizers-sort_by_nonstrict_const]": [ { "checksum": "169c512d23e47b59ae22f678661c4ac2", @@ -30582,6 +30589,13 @@ "uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_optimizers-passthrough_sortness_over_map_/formatted.sql" } ], + "test_sql_format.test[optimizers-pushdown_nonsep_over_aggregate]": [ + { + "checksum": "59dcdde5e1a360b7a06ab4eb12d91f2e", + "size": 326, + "uri": "https://{canondata_backend}/1899731/407203c9a441603d4ce4780c1c3daab84de46096/resource.tar.gz#test_sql_format.test_optimizers-pushdown_nonsep_over_aggregate_/formatted.sql" + } + ], "test_sql_format.test[optimizers-sort_by_nonstrict_const]": [ { "checksum": "a8ef5fa0a5a869ab80571d8a9c27cb93", diff --git a/ydb/library/yql/tests/sql/suites/optimizers/input5.txt b/ydb/library/yql/tests/sql/suites/optimizers/input5.txt new file mode 100644 index 000000000000..b214aab0d929 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/optimizers/input5.txt @@ -0,0 +1,10 @@ +{"key"="023";"subkey"="3";"value"="aaa"}; +{"key"="037";"subkey"="5";"value"="ddd"}; +{"key"="075";"subkey"="1";"value"="abc"}; +{"key"="150";"subkey"="1";"value"="aaa"}; +{"key"="150";"subkey"="3";"value"="iii"}; +{"key"="150";"subkey"="8";"value"="zzz"}; +{"key"="200";"subkey"="7";"value"="qqq"}; +{"key"="527";"subkey"="4";"value"="bbb"}; +{"key"="761";"subkey"="6";"value"="ccc"}; +{"key"="911";"subkey"="2";"value"="kkk"}; diff --git a/ydb/library/yql/tests/sql/suites/optimizers/input5.txt.attr b/ydb/library/yql/tests/sql/suites/optimizers/input5.txt.attr new file mode 100644 index 000000000000..ed13e2022391 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/optimizers/input5.txt.attr @@ -0,0 +1,11 @@ +{"_yql_row_spec"={ + "Type"=["StructType";[ + ["key";["DataType";"String"]]; + ["subkey";["DataType";"String"]]; + ["value";["DataType";"String"]] + ]]; + "SortDirections"=[1;1;]; + "SortedBy"=["key";"subkey";]; + "SortedByTypes"=[["DataType";"String";];["DataType";"String";];]; + "SortMembers"=["key";"subkey";]; +}} diff --git a/ydb/library/yql/tests/sql/suites/optimizers/pushdown_nonsep_over_aggregate.cfg b/ydb/library/yql/tests/sql/suites/optimizers/pushdown_nonsep_over_aggregate.cfg new file mode 100644 index 000000000000..f3472752d8ff --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/optimizers/pushdown_nonsep_over_aggregate.cfg @@ -0,0 +1,3 @@ +in Input input5.txt +res result.txt +providers yt diff --git a/ydb/library/yql/tests/sql/suites/optimizers/pushdown_nonsep_over_aggregate.sql b/ydb/library/yql/tests/sql/suites/optimizers/pushdown_nonsep_over_aggregate.sql new file mode 100644 index 000000000000..b3469e9a3be9 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/optimizers/pushdown_nonsep_over_aggregate.sql @@ -0,0 +1,12 @@ +USE plato; + +pragma config.flags("OptimizerFlags", "PushdownComplexFiltersOverAggregate"); + +SELECT * FROM ( + SELECT + key as key, + min(value) as mv + FROM Input + GROUP BY key +) +WHERE AssumeNonStrict(200 > 100) and (2000 > 1000) and key != "911" and (key < "150" and mv != "ddd" or key > "200"); diff --git a/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json index c326cdf4e336..e9f05840e59f 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json @@ -1729,6 +1729,27 @@ "uri": "https://{canondata_backend}/1899731/c92a452de9406ab0662eb9deef9f0799dbd3673d/resource.tar.gz#test.test_optimizers-instant_contains_lookup-default.txt-Results_/results.txt" } ], + "test.test[optimizers-pushdown_nonsep_over_aggregate--Debug]": [ + { + "checksum": "5f5a85bd3f1e1232ecbfaf31d31aba9b", + "size": 3334, + "uri": "https://{canondata_backend}/1775319/ad87b75389e49e361d63dc9c035af466273c5ede/resource.tar.gz#test.test_optimizers-pushdown_nonsep_over_aggregate--Debug_/opt.yql" + } + ], + "test.test[optimizers-pushdown_nonsep_over_aggregate--Plan]": [ + { + "checksum": "d5941f47b04c3cfe085b48a7c72fe53a", + "size": 4910, + "uri": "https://{canondata_backend}/1775319/ad87b75389e49e361d63dc9c035af466273c5ede/resource.tar.gz#test.test_optimizers-pushdown_nonsep_over_aggregate--Plan_/plan.txt" + } + ], + "test.test[optimizers-pushdown_nonsep_over_aggregate--Results]": [ + { + "checksum": "46c155f8a1857a330ed66a432da3023a", + "size": 1303, + "uri": "https://{canondata_backend}/1775319/ad87b75389e49e361d63dc9c035af466273c5ede/resource.tar.gz#test.test_optimizers-pushdown_nonsep_over_aggregate--Results_/results.txt" + } + ], "test.test[optimizers-sort_constraint_in_left--Debug]": [ { "checksum": "02165f080ec4f429f4febd61ba335a30",