From a1c52bbd21e664ab526bb7430ca85cb2145a0601 Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Tue, 15 Oct 2024 13:19:30 +0000 Subject: [PATCH] make RewriteAsHoppingWindow last optimization --- .../yql/core/common_opt/yql_co_last.cpp | 104 ++++++++++++++++++ .../yql/core/common_opt/yql_co_simple1.cpp | 99 ----------------- .../sql/dq_file/part10/canondata/result.json | 6 +- .../sql/dq_file/part18/canondata/result.json | 6 +- 4 files changed, 110 insertions(+), 105 deletions(-) diff --git a/ydb/library/yql/core/common_opt/yql_co_last.cpp b/ydb/library/yql/core/common_opt/yql_co_last.cpp index 8b3f011e05e6..fe941b772e1e 100644 --- a/ydb/library/yql/core/common_opt/yql_co_last.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_last.cpp @@ -1,5 +1,7 @@ #include "yql_co.h" #include +#include +#include #include #include @@ -9,6 +11,99 @@ namespace { using namespace NNodes; +TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + + NHopping::EnsureNotDistinct(aggregate); + + const auto maybeHopTraits = NHopping::ExtractHopTraits(aggregate, ctx, false); + if (!maybeHopTraits) { + return nullptr; + } + const auto hopTraits = *maybeHopTraits; + + const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast(); + NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column); + + const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType); + const auto timeExtractorLambda = NHopping::BuildTimeExtractor(hopTraits.Traits, ctx); + const auto initLambda = NHopping::BuildInitHopLambda(aggregate, ctx); + const auto updateLambda = NHopping::BuildUpdateHopLambda(aggregate, ctx); + const auto saveLambda = NHopping::BuildSaveHopLambda(aggregate, ctx); + const auto loadLambda = NHopping::BuildLoadHopLambda(aggregate, ctx); + const auto mergeLambda = NHopping::BuildMergeHopLambda(aggregate, ctx); + const auto finishLambda = NHopping::BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx); + + const auto streamArg = Build(ctx, pos).Name("stream").Done(); + auto multiHoppingCoreBuilder = Build(ctx, pos) + .KeyExtractor(keyLambda) + .TimeExtractor(timeExtractorLambda) + .Hop(hopTraits.Traits.Hop()) + .Interval(hopTraits.Traits.Interval()) + .Delay(hopTraits.Traits.Delay()) + .DataWatermarks(hopTraits.Traits.DataWatermarks()) + .InitHandler(initLambda) + .UpdateHandler(updateLambda) + .MergeHandler(mergeLambda) + .FinishHandler(finishLambda) + .SaveHandler(saveLambda) + .LoadHandler(loadLambda) + .template WatermarkMode().Build(ToString(false)); + + return Build(ctx, pos) + .Input(aggregate.Input()) + .KeySelectorLambda(keyLambda) + .SortDirections() + .Literal() + .Value("true") + .Build() + .Build() + .SortKeySelectorLambda(timeExtractorLambda) + .ListHandlerLambda() + .Args(streamArg) + .template Body() + .Stream(Build(ctx, pos) + .Input(multiHoppingCoreBuilder + .template Input() + .List(streamArg) + .Build() + .Done()) + .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) + .Done()) + .Build() + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) { + const auto aggregate = TCoAggregate(node); + + if (!IsPureIsolatedLambda(*aggregate.Ptr())) { + return nullptr; + } + + if (!GetSetting(aggregate.Settings().Ref(), "hopping")) { + return nullptr; + } + + auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx); + if (!result) { + return result; + } + + auto outputColumnSetting = GetSetting(aggregate.Settings().Ref(), "output_columns"); + if (!outputColumnSetting) { + return result; + } + + return Build(ctx, aggregate.Pos()) + .Input(result) + .Members(outputColumnSetting->ChildPtr(1)) + .Done() + .Ptr(); +} + std::unordered_set GetUselessSortedJoinInputs(const TCoEquiJoin& equiJoin) { std::unordered_map> sorteds(equiJoin.ArgCount() - 2U); for (ui32 i = 0U; i + 2U < equiJoin.ArgCount(); ++i) { @@ -56,6 +151,15 @@ std::unordered_set GetUselessSortedJoinInputs(const TCoEquiJoin& equiJoin) } // namespace void RegisterCoFinalCallables(TCallableOptimizerMap& map) { + map["Aggregate"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& /*optCtx*/) { + if (auto hopping = RewriteAsHoppingWindow(node, ctx)) { + YQL_CLOG(DEBUG, Core) << "RewriteAsHoppingWindow"; + return hopping; + } + + return node; + }; + map["UnorderedSubquery"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { Y_UNUSED(optCtx); if (node->Head().IsCallable("Sort")) { diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index 74369b736332..f1776842c531 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -3302,99 +3301,6 @@ TExprNode::TPtr RemoveDeadPayloadColumns(const TCoAggregate& aggr, TExprContext& return aggr.Ptr(); } -TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - - NHopping::EnsureNotDistinct(aggregate); - - const auto maybeHopTraits = NHopping::ExtractHopTraits(aggregate, ctx, false); - if (!maybeHopTraits) { - return nullptr; - } - const auto hopTraits = *maybeHopTraits; - - const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast(); - NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column); - - const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType); - const auto timeExtractorLambda = NHopping::BuildTimeExtractor(hopTraits.Traits, ctx); - const auto initLambda = NHopping::BuildInitHopLambda(aggregate, ctx); - const auto updateLambda = NHopping::BuildUpdateHopLambda(aggregate, ctx); - const auto saveLambda = NHopping::BuildSaveHopLambda(aggregate, ctx); - const auto loadLambda = NHopping::BuildLoadHopLambda(aggregate, ctx); - const auto mergeLambda = NHopping::BuildMergeHopLambda(aggregate, ctx); - const auto finishLambda = NHopping::BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx); - - const auto streamArg = Build(ctx, pos).Name("stream").Done(); - auto multiHoppingCoreBuilder = Build(ctx, pos) - .KeyExtractor(keyLambda) - .TimeExtractor(timeExtractorLambda) - .Hop(hopTraits.Traits.Hop()) - .Interval(hopTraits.Traits.Interval()) - .Delay(hopTraits.Traits.Delay()) - .DataWatermarks(hopTraits.Traits.DataWatermarks()) - .InitHandler(initLambda) - .UpdateHandler(updateLambda) - .MergeHandler(mergeLambda) - .FinishHandler(finishLambda) - .SaveHandler(saveLambda) - .LoadHandler(loadLambda) - .template WatermarkMode().Build(ToString(false)); - - return Build(ctx, pos) - .Input(aggregate.Input()) - .KeySelectorLambda(keyLambda) - .SortDirections() - .Literal() - .Value("true") - .Build() - .Build() - .SortKeySelectorLambda(timeExtractorLambda) - .ListHandlerLambda() - .Args(streamArg) - .template Body() - .Stream(Build(ctx, pos) - .Input(multiHoppingCoreBuilder - .template Input() - .List(streamArg) - .Build() - .Done()) - .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) - .Done()) - .Build() - .Build() - .Done() - .Ptr(); -} - -TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) { - const auto aggregate = TCoAggregate(node); - - if (!IsPureIsolatedLambda(*aggregate.Ptr())) { - return nullptr; - } - - if (!GetSetting(aggregate.Settings().Ref(), "hopping")) { - return nullptr; - } - - auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx); - if (!result) { - return result; - } - - auto outputColumnSetting = GetSetting(aggregate.Settings().Ref(), "output_columns"); - if (!outputColumnSetting) { - return result; - } - - return Build(ctx, aggregate.Pos()) - .Input(result) - .Members(outputColumnSetting->ChildPtr(1)) - .Done() - .Ptr(); -} - TExprNode::TPtr PullAssumeColumnOrderOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { TVector withAssume; for (ui32 i = 0; i < node->ChildrenSize() - 2; i++) { @@ -5130,11 +5036,6 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { return clean; } - if (auto hopping = RewriteAsHoppingWindow(node, ctx)) { - YQL_CLOG(DEBUG, Core) << "RewriteAsHoppingWindow"; - return hopping; - } - return DropReorder(node, ctx); }; diff --git a/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json index 18ce95e625d2..d44dc10eb9de 100644 --- a/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json @@ -303,9 +303,9 @@ ], "test.test[aggregate-group_by_hop_static-default.txt-Debug]": [ { - "checksum": "07d9a8f046f4661ba479dbaf70979aac", - "size": 1630, - "uri": "https://{canondata_backend}/1689644/763d9bd4404423a24deab02585b884f08692c90b/resource.tar.gz#test.test_aggregate-group_by_hop_static-default.txt-Debug_/opt.yql_patched" + "checksum": "caee4b9d62738134c8863de61acd0435", + "size": 1684, + "uri": "https://{canondata_backend}/1937027/3fc60ac9717c508baa95f9138fadc7c68a1ddf69/resource.tar.gz#test.test_aggregate-group_by_hop_static-default.txt-Debug_/opt.yql_patched" } ], "test.test[aggregate-group_by_hop_static-default.txt-Plan]": [ diff --git a/ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json index ec21342b17d3..cd2f0765f7d6 100644 --- a/ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json @@ -359,9 +359,9 @@ ], "test.test[aggregate-group_by_hop_static_list_key-default.txt-Debug]": [ { - "checksum": "41d48b8937d3e4bcc583915a7460727d", - "size": 1946, - "uri": "https://{canondata_backend}/1925821/6132b4b967a7c6d2d9c522d4a344e781b4121793/resource.tar.gz#test.test_aggregate-group_by_hop_static_list_key-default.txt-Debug_/opt.yql_patched" + "checksum": "4be5a233eebd00cc365b1441345d3cf3", + "size": 2214, + "uri": "https://{canondata_backend}/1775319/6040330bbfecf8df32d9fd6118d6de5e8b4aa38e/resource.tar.gz#test.test_aggregate-group_by_hop_static_list_key-default.txt-Debug_/opt.yql_patched" } ], "test.test[aggregate-group_by_hop_static_list_key-default.txt-Plan]": [