Skip to content

Commit

Permalink
make RewriteAsHoppingWindow last optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
APozdniakov committed Oct 15, 2024
1 parent f1035af commit a1c52bb
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 105 deletions.
104 changes: 104 additions & 0 deletions ydb/library/yql/core/common_opt/yql_co_last.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "yql_co.h"
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/core/yql_opt_hopping.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/utils/log/log.h>

Expand All @@ -9,6 +11,99 @@ namespace {

using namespace NNodes;

// Builds the replacement for a hopping-window Aggregate without applying any
// output column projection: a PartitionsByKeys over the aggregate input whose
// list handler lambda feeds its argument through MultiHoppingCore and then maps
// the result with the unpickle lambda produced by TKeysDescription.
//
// Returns nullptr when NHopping::ExtractHopTraits yields no traits.
TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) {
    const auto nodePos = aggregate.Pos();

    // NOTE(review): presumably rejects DISTINCT aggregation (inferred from the
    // helper's name only) -- confirm against NHopping.
    NHopping::EnsureNotDistinct(aggregate);

    const auto extractedTraits = NHopping::ExtractHopTraits(aggregate, ctx, false);
    if (!extractedTraits) {
        return nullptr;
    }
    const auto hop = *extractedTraits;

    // Row type of the aggregate input (type annotation of the first child).
    const auto inputRowType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast<TStructExprType>();
    NHopping::TKeysDescription keys(*inputRowType, aggregate.Keys(), hop.Column);

    // Handlers consumed by MultiHoppingCore / PartitionsByKeys below.
    const auto keySelector = keys.GetKeySelector(ctx, nodePos, inputRowType);
    const auto timeExtractor = NHopping::BuildTimeExtractor(hop.Traits, ctx);
    const auto initHandler = NHopping::BuildInitHopLambda(aggregate, ctx);
    const auto updateHandler = NHopping::BuildUpdateHopLambda(aggregate, ctx);
    const auto saveHandler = NHopping::BuildSaveHopLambda(aggregate, ctx);
    const auto loadHandler = NHopping::BuildLoadHopLambda(aggregate, ctx);
    const auto mergeHandler = NHopping::BuildMergeHopLambda(aggregate, ctx);
    const auto finishHandler = NHopping::BuildFinishHopLambda(aggregate, keys.GetActualGroupKeys(), hop.Column, ctx);

    // Argument of the list handler lambda.
    const auto partitionArg = Build<TCoArgument>(ctx, nodePos).Name("stream").Done();

    // Left unfinished on purpose: its Input is attached inside the list handler
    // body below, where the lambda argument is used.
    auto hoppingCore = Build<TCoMultiHoppingCore>(ctx, nodePos)
        .KeyExtractor(keySelector)
        .TimeExtractor(timeExtractor)
        .Hop(hop.Traits.Hop())
        .Interval(hop.Traits.Interval())
        .Delay(hop.Traits.Delay())
        .DataWatermarks(hop.Traits.DataWatermarks())
        .InitHandler(initHandler)
        .UpdateHandler(updateHandler)
        .MergeHandler(mergeHandler)
        .FinishHandler(finishHandler)
        .SaveHandler(saveHandler)
        .LoadHandler(loadHandler)
        .template WatermarkMode<TCoAtom>().Build(ToString(false));

    return Build<TCoPartitionsByKeys>(ctx, nodePos)
        .Input(aggregate.Input())
        .KeySelectorLambda(keySelector)
        .SortDirections<TCoBool>()
            .Literal()
                .Value("true")
                .Build()
            .Build()
        .SortKeySelectorLambda(timeExtractor)
        .ListHandlerLambda()
            .Args(partitionArg)
            .template Body<TCoForwardList>()
                .Stream(Build<TCoMap>(ctx, nodePos)
                    .Input(hoppingCore
                        .template Input<TCoIterator>()
                            .List(partitionArg)
                            .Build()
                        .Done())
                    .Lambda(keys.BuildUnpickleLambda(ctx, nodePos, *inputRowType))
                    .Done())
                .Build()
            .Build()
        .Done()
        .Ptr();
}

// Rewrites an Aggregate node that carries a "hopping" setting into its
// hopping-window form.
//
// Returns nullptr (no rewrite) when the node fails the IsPureIsolatedLambda
// check, has no "hopping" setting, or RewriteAsHoppingWindowFullOutput produces
// nothing. When an "output_columns" setting is present, the rewritten node is
// wrapped in ExtractMembers using the second child of that setting.
TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) {
    const TCoAggregate aggregate(node);

    // Same check order as before: purity first, then the "hopping" setting.
    if (!IsPureIsolatedLambda(*aggregate.Ptr()) || !GetSetting(aggregate.Settings().Ref(), "hopping")) {
        return nullptr;
    }

    auto fullOutput = RewriteAsHoppingWindowFullOutput(aggregate, ctx);
    if (!fullOutput) {
        return fullOutput;
    }

    if (const auto outputColumns = GetSetting(aggregate.Settings().Ref(), "output_columns")) {
        // Narrow the result to the columns listed in the setting.
        return Build<TCoExtractMembers>(ctx, aggregate.Pos())
            .Input(fullOutput)
            .Members(outputColumns->ChildPtr(1))
            .Done()
            .Ptr();
    }

    return fullOutput;
}

std::unordered_set<ui32> GetUselessSortedJoinInputs(const TCoEquiJoin& equiJoin) {
std::unordered_map<std::string_view, std::tuple<ui32, const TSortedConstraintNode*, const TChoppedConstraintNode*>> sorteds(equiJoin.ArgCount() - 2U);
for (ui32 i = 0U; i + 2U < equiJoin.ArgCount(); ++i) {
Expand Down Expand Up @@ -56,6 +151,15 @@ std::unordered_set<ui32> GetUselessSortedJoinInputs(const TCoEquiJoin& equiJoin)
} // namespace

void RegisterCoFinalCallables(TCallableOptimizerMap& map) {
map["Aggregate"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& /*optCtx*/) {
if (auto hopping = RewriteAsHoppingWindow(node, ctx)) {
YQL_CLOG(DEBUG, Core) << "RewriteAsHoppingWindow";
return hopping;
}

return node;
};

map["UnorderedSubquery"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
Y_UNUSED(optCtx);
if (node->Head().IsCallable("Sort")) {
Expand Down
99 changes: 0 additions & 99 deletions ydb/library/yql/core/common_opt/yql_co_simple1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <ydb/library/yql/core/yql_atom_enums.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/core/yql_join.h>
#include <ydb/library/yql/core/yql_opt_hopping.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/core/yql_opt_window.h>
#include <ydb/library/yql/core/yql_type_helpers.h>
Expand Down Expand Up @@ -3302,99 +3301,6 @@ TExprNode::TPtr RemoveDeadPayloadColumns(const TCoAggregate& aggr, TExprContext&
return aggr.Ptr();
}

// Builds the replacement for a hopping-window Aggregate without applying any
// output column projection: a PartitionsByKeys over the aggregate input whose
// list handler lambda feeds its argument through MultiHoppingCore and then maps
// the result with the unpickle lambda produced by TKeysDescription.
//
// Returns nullptr when NHopping::ExtractHopTraits yields no traits.
TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) {
    const auto nodePos = aggregate.Pos();

    // NOTE(review): presumably rejects DISTINCT aggregation (inferred from the
    // helper's name only) -- confirm against NHopping.
    NHopping::EnsureNotDistinct(aggregate);

    const auto extractedTraits = NHopping::ExtractHopTraits(aggregate, ctx, false);
    if (!extractedTraits) {
        return nullptr;
    }
    const auto hop = *extractedTraits;

    // Row type of the aggregate input (type annotation of the first child).
    const auto inputRowType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast<TStructExprType>();
    NHopping::TKeysDescription keys(*inputRowType, aggregate.Keys(), hop.Column);

    // Handlers consumed by MultiHoppingCore / PartitionsByKeys below.
    const auto keySelector = keys.GetKeySelector(ctx, nodePos, inputRowType);
    const auto timeExtractor = NHopping::BuildTimeExtractor(hop.Traits, ctx);
    const auto initHandler = NHopping::BuildInitHopLambda(aggregate, ctx);
    const auto updateHandler = NHopping::BuildUpdateHopLambda(aggregate, ctx);
    const auto saveHandler = NHopping::BuildSaveHopLambda(aggregate, ctx);
    const auto loadHandler = NHopping::BuildLoadHopLambda(aggregate, ctx);
    const auto mergeHandler = NHopping::BuildMergeHopLambda(aggregate, ctx);
    const auto finishHandler = NHopping::BuildFinishHopLambda(aggregate, keys.GetActualGroupKeys(), hop.Column, ctx);

    // Argument of the list handler lambda.
    const auto partitionArg = Build<TCoArgument>(ctx, nodePos).Name("stream").Done();

    // Left unfinished on purpose: its Input is attached inside the list handler
    // body below, where the lambda argument is used.
    auto hoppingCore = Build<TCoMultiHoppingCore>(ctx, nodePos)
        .KeyExtractor(keySelector)
        .TimeExtractor(timeExtractor)
        .Hop(hop.Traits.Hop())
        .Interval(hop.Traits.Interval())
        .Delay(hop.Traits.Delay())
        .DataWatermarks(hop.Traits.DataWatermarks())
        .InitHandler(initHandler)
        .UpdateHandler(updateHandler)
        .MergeHandler(mergeHandler)
        .FinishHandler(finishHandler)
        .SaveHandler(saveHandler)
        .LoadHandler(loadHandler)
        .template WatermarkMode<TCoAtom>().Build(ToString(false));

    return Build<TCoPartitionsByKeys>(ctx, nodePos)
        .Input(aggregate.Input())
        .KeySelectorLambda(keySelector)
        .SortDirections<TCoBool>()
            .Literal()
                .Value("true")
                .Build()
            .Build()
        .SortKeySelectorLambda(timeExtractor)
        .ListHandlerLambda()
            .Args(partitionArg)
            .template Body<TCoForwardList>()
                .Stream(Build<TCoMap>(ctx, nodePos)
                    .Input(hoppingCore
                        .template Input<TCoIterator>()
                            .List(partitionArg)
                            .Build()
                        .Done())
                    .Lambda(keys.BuildUnpickleLambda(ctx, nodePos, *inputRowType))
                    .Done())
                .Build()
            .Build()
        .Done()
        .Ptr();
}

// Rewrites an Aggregate node that carries a "hopping" setting into its
// hopping-window form.
//
// Returns nullptr (no rewrite) when the node fails the IsPureIsolatedLambda
// check, has no "hopping" setting, or RewriteAsHoppingWindowFullOutput produces
// nothing. When an "output_columns" setting is present, the rewritten node is
// wrapped in ExtractMembers using the second child of that setting.
TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) {
    const TCoAggregate aggregate(node);

    // Same check order as before: purity first, then the "hopping" setting.
    if (!IsPureIsolatedLambda(*aggregate.Ptr()) || !GetSetting(aggregate.Settings().Ref(), "hopping")) {
        return nullptr;
    }

    auto fullOutput = RewriteAsHoppingWindowFullOutput(aggregate, ctx);
    if (!fullOutput) {
        return fullOutput;
    }

    if (const auto outputColumns = GetSetting(aggregate.Settings().Ref(), "output_columns")) {
        // Narrow the result to the columns listed in the setting.
        return Build<TCoExtractMembers>(ctx, aggregate.Pos())
            .Input(fullOutput)
            .Members(outputColumns->ChildPtr(1))
            .Done()
            .Ptr();
    }

    return fullOutput;
}

TExprNode::TPtr PullAssumeColumnOrderOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
TVector<ui32> withAssume;
for (ui32 i = 0; i < node->ChildrenSize() - 2; i++) {
Expand Down Expand Up @@ -5130,11 +5036,6 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
return clean;
}

if (auto hopping = RewriteAsHoppingWindow(node, ctx)) {
YQL_CLOG(DEBUG, Core) << "RewriteAsHoppingWindow";
return hopping;
}

return DropReorder<false>(node, ctx);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,9 @@
],
"test.test[aggregate-group_by_hop_static-default.txt-Debug]": [
{
"checksum": "07d9a8f046f4661ba479dbaf70979aac",
"size": 1630,
"uri": "https://{canondata_backend}/1689644/763d9bd4404423a24deab02585b884f08692c90b/resource.tar.gz#test.test_aggregate-group_by_hop_static-default.txt-Debug_/opt.yql_patched"
"checksum": "caee4b9d62738134c8863de61acd0435",
"size": 1684,
"uri": "https://{canondata_backend}/1937027/3fc60ac9717c508baa95f9138fadc7c68a1ddf69/resource.tar.gz#test.test_aggregate-group_by_hop_static-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[aggregate-group_by_hop_static-default.txt-Plan]": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,9 @@
],
"test.test[aggregate-group_by_hop_static_list_key-default.txt-Debug]": [
{
"checksum": "41d48b8937d3e4bcc583915a7460727d",
"size": 1946,
"uri": "https://{canondata_backend}/1925821/6132b4b967a7c6d2d9c522d4a344e781b4121793/resource.tar.gz#test.test_aggregate-group_by_hop_static_list_key-default.txt-Debug_/opt.yql_patched"
"checksum": "4be5a233eebd00cc365b1441345d3cf3",
"size": 2214,
"uri": "https://{canondata_backend}/1775319/6040330bbfecf8df32d9fd6118d6de5e8b4aa38e/resource.tar.gz#test.test_aggregate-group_by_hop_static_list_key-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[aggregate-group_by_hop_static_list_key-default.txt-Plan]": [
Expand Down

0 comments on commit a1c52bb

Please sign in to comment.