Skip to content

Commit

Permalink
Merge 8dc2fa2 into 6fcb3bd
Browse files Browse the repository at this point in the history
  • Loading branch information
APozdniakov authored Oct 17, 2024
2 parents 6fcb3bd + 8dc2fa2 commit 88b0b07
Show file tree
Hide file tree
Showing 25 changed files with 223 additions and 183 deletions.
111 changes: 111 additions & 0 deletions ydb/library/yql/core/common_opt/yql_co_last.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,120 @@
#include "yql_co.h"
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/core/yql_opt_hopping.h>
#include <ydb/library/yql/core/yql_opt_utils.h>

namespace NYql {

namespace {

using namespace NNodes;

TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) {
const auto pos = aggregate.Pos();

NHopping::EnsureNotDistinct(aggregate);

const auto maybeHopTraits = NHopping::ExtractHopTraits(aggregate, ctx, false);
if (!maybeHopTraits) {
return nullptr;
}
const auto hopTraits = *maybeHopTraits;

const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast<TStructExprType>();
NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column);

const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType);
const auto timeExtractorLambda = NHopping::BuildTimeExtractor(hopTraits.Traits, ctx);
const auto initLambda = NHopping::BuildInitHopLambda(aggregate, ctx);
const auto updateLambda = NHopping::BuildUpdateHopLambda(aggregate, ctx);
const auto saveLambda = NHopping::BuildSaveHopLambda(aggregate, ctx);
const auto loadLambda = NHopping::BuildLoadHopLambda(aggregate, ctx);
const auto mergeLambda = NHopping::BuildMergeHopLambda(aggregate, ctx);
const auto finishLambda = NHopping::BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx);

const auto streamArg = Build<TCoArgument>(ctx, pos).Name("stream").Done();
auto multiHoppingCoreBuilder = Build<TCoMultiHoppingCore>(ctx, pos)
.KeyExtractor(keyLambda)
.TimeExtractor(timeExtractorLambda)
.Hop(hopTraits.Traits.Hop())
.Interval(hopTraits.Traits.Interval())
.Delay(hopTraits.Traits.Delay())
.DataWatermarks(hopTraits.Traits.DataWatermarks())
.InitHandler(initLambda)
.UpdateHandler(updateLambda)
.MergeHandler(mergeLambda)
.FinishHandler(finishLambda)
.SaveHandler(saveLambda)
.LoadHandler(loadLambda)
.template WatermarkMode<TCoAtom>().Build(ToString(false));

return Build<TCoPartitionsByKeys>(ctx, pos)
.Input(aggregate.Input())
.KeySelectorLambda(keyLambda)
.SortDirections<TCoBool>()
.Literal()
.Value("true")
.Build()
.Build()
.SortKeySelectorLambda(timeExtractorLambda)
.ListHandlerLambda()
.Args(streamArg)
.template Body<TCoForwardList>()
.Stream(Build<TCoMap>(ctx, pos)
.Input(multiHoppingCoreBuilder
.template Input<TCoIterator>()
.List(streamArg)
.Build()
.Done())
.Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType))
.Done())
.Build()
.Build()
.Done()
.Ptr();
}

TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) {
const auto aggregate = TCoAggregate(node);

if (!IsPureIsolatedLambda(*aggregate.Ptr())) {
return nullptr;
}

if (!GetSetting(aggregate.Settings().Ref(), "hopping")) {
return nullptr;
}

auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx);
if (!result) {
return result;
}

auto outputColumnSetting = GetSetting(aggregate.Settings().Ref(), "output_columns");
if (!outputColumnSetting) {
return result;
}

return Build<TCoExtractMembers>(ctx, aggregate.Pos())
.Input(result)
.Members(outputColumnSetting->ChildPtr(1))
.Done()
.Ptr();
}

} // namespace

void RegisterCoFinalCallables(TCallableOptimizerMap& map) {
map["Aggregate"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& /*optCtx*/) {
if (auto hopping = RewriteAsHoppingWindow(node, ctx)) {
YQL_CLOG(DEBUG, Core) << "RewriteAsHoppingWindow";
return hopping;
}

return node;
};

map["UnorderedSubquery"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
Y_UNUSED(optCtx);
if (node->Head().IsCallable("Sort")) {
Expand Down
99 changes: 0 additions & 99 deletions ydb/library/yql/core/common_opt/yql_co_simple1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <ydb/library/yql/core/yql_atom_enums.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/core/yql_join.h>
#include <ydb/library/yql/core/yql_opt_hopping.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/core/yql_opt_window.h>
#include <ydb/library/yql/core/yql_type_helpers.h>
Expand Down Expand Up @@ -3229,99 +3228,6 @@ TExprNode::TPtr RemoveDeadPayloadColumns(const TCoAggregate& aggr, TExprContext&
return aggr.Ptr();
}

TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) {
const auto pos = aggregate.Pos();

NHopping::EnsureNotDistinct(aggregate);

const auto maybeHopTraits = NHopping::ExtractHopTraits(aggregate, ctx, false);
if (!maybeHopTraits) {
return nullptr;
}
const auto hopTraits = *maybeHopTraits;

const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast<TStructExprType>();
NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column);

const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType);
const auto timeExtractorLambda = NHopping::BuildTimeExtractor(hopTraits.Traits, ctx);
const auto initLambda = NHopping::BuildInitHopLambda(aggregate, ctx);
const auto updateLambda = NHopping::BuildUpdateHopLambda(aggregate, ctx);
const auto saveLambda = NHopping::BuildSaveHopLambda(aggregate, ctx);
const auto loadLambda = NHopping::BuildLoadHopLambda(aggregate, ctx);
const auto mergeLambda = NHopping::BuildMergeHopLambda(aggregate, ctx);
const auto finishLambda = NHopping::BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx);

const auto streamArg = Build<TCoArgument>(ctx, pos).Name("stream").Done();
auto multiHoppingCoreBuilder = Build<TCoMultiHoppingCore>(ctx, pos)
.KeyExtractor(keyLambda)
.TimeExtractor(timeExtractorLambda)
.Hop(hopTraits.Traits.Hop())
.Interval(hopTraits.Traits.Interval())
.Delay(hopTraits.Traits.Delay())
.DataWatermarks(hopTraits.Traits.DataWatermarks())
.InitHandler(initLambda)
.UpdateHandler(updateLambda)
.MergeHandler(mergeLambda)
.FinishHandler(finishLambda)
.SaveHandler(saveLambda)
.LoadHandler(loadLambda)
.template WatermarkMode<TCoAtom>().Build(ToString(false));

return Build<TCoPartitionsByKeys>(ctx, pos)
.Input(aggregate.Input())
.KeySelectorLambda(keyLambda)
.SortDirections<TCoBool>()
.Literal()
.Value("true")
.Build()
.Build()
.SortKeySelectorLambda(timeExtractorLambda)
.ListHandlerLambda()
.Args(streamArg)
.template Body<TCoForwardList>()
.Stream(Build<TCoMap>(ctx, pos)
.Input(multiHoppingCoreBuilder
.template Input<TCoIterator>()
.List(streamArg)
.Build()
.Done())
.Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType))
.Done())
.Build()
.Build()
.Done()
.Ptr();
}

TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) {
const auto aggregate = TCoAggregate(node);

if (!IsPureIsolatedLambda(*aggregate.Ptr())) {
return nullptr;
}

if (!GetSetting(aggregate.Settings().Ref(), "hopping")) {
return nullptr;
}

auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx);
if (!result) {
return result;
}

auto outputColumnSetting = GetSetting(aggregate.Settings().Ref(), "output_columns");
if (!outputColumnSetting) {
return result;
}

return Build<TCoExtractMembers>(ctx, aggregate.Pos())
.Input(result)
.Members(outputColumnSetting->ChildPtr(1))
.Done()
.Ptr();
}

TExprNode::TPtr PullAssumeColumnOrderOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
TVector<ui32> withAssume;
for (ui32 i = 0; i < node->ChildrenSize() - 2; i++) {
Expand Down Expand Up @@ -5101,11 +5007,6 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
return clean;
}

if (auto hopping = RewriteAsHoppingWindow(node, ctx)) {
YQL_CLOG(DEBUG, Core) << "RewriteAsHoppingWindow";
return hopping;
}

return DropReorder<false>(node, ctx);
};

Expand Down
38 changes: 12 additions & 26 deletions ydb/library/yql/dq/opt/dq_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,32 +182,18 @@ static void CollectSinkStages(const NNodes::TDqQuery& dqQuery, THashSet<TExprNod
}

NNodes::TExprBase DqMergeQueriesWithSinks(NNodes::TExprBase dqQueryNode, TExprContext& ctx) {
NNodes::TDqQuery dqQuery = dqQueryNode.Cast<NNodes::TDqQuery>();

THashSet<TExprNode::TPtr, TExprNode::TPtrHash> sinkStages;
CollectSinkStages(dqQuery, sinkStages);
TOptimizeExprSettings settings{nullptr};
settings.VisitLambdas = false;
bool deletedDqQueryChild = false;
TExprNode::TPtr newDqQueryNode;
auto status = OptimizeExpr(dqQueryNode.Ptr(), newDqQueryNode, [&sinkStages, &deletedDqQueryChild](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
for (ui32 childIndex = 0; childIndex < node->ChildrenSize(); ++childIndex) {
TExprNode* child = node->Child(childIndex);
if (child->IsCallable(NNodes::TDqQuery::CallableName())) {
NNodes::TDqQuery dqQueryChild(child);
CollectSinkStages(dqQueryChild, sinkStages);
deletedDqQueryChild = true;
return ctx.ChangeChild(*node, childIndex, dqQueryChild.World().Ptr());
}
}
return node;
}, ctx, settings);
YQL_ENSURE(status != IGraphTransformer::TStatus::Error, "Failed to merge DqQuery nodes: " << status);

if (deletedDqQueryChild) {
auto dqQueryBuilder = Build<TDqQuery>(ctx, dqQuery.Pos());
dqQueryBuilder.World(newDqQueryNode->ChildPtr(TDqQuery::idx_World));

auto maybeDqQuery = dqQueryNode.Maybe<NNodes::TDqQuery>();
YQL_ENSURE(maybeDqQuery, "Expected DqQuery!");
auto dqQuery = maybeDqQuery.Cast();

if (auto maybeDqQueryChild = dqQuery.World().Maybe<NNodes::TDqQuery>()) {
auto dqQueryChild = maybeDqQueryChild.Cast();
auto dqQueryBuilder = Build<TDqQuery>(ctx, dqQuery.Pos())
.World(dqQueryChild.World());

THashSet<TExprNode::TPtr, TExprNode::TPtrHash> sinkStages;
CollectSinkStages(dqQuery, sinkStages);
CollectSinkStages(maybeDqQueryChild.Cast(), sinkStages);
auto sinkStagesBuilder = dqQueryBuilder.SinkStages();
for (const TExprNode::TPtr& stage : sinkStages) {
sinkStagesBuilder.Add(stage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "ClSourceSettings"},
"Children": [
{"Index": 0, "Name": "Table", "Type": "TCoAtom"},
{"Index": 1, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 2, "Name": "Columns", "Type": "TCoAtomList"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Table", "Type": "TCoAtom"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 3, "Name": "Columns", "Type": "TCoAtomList"}
]
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ class TClickHouseDataSourceTypeAnnotationTransformer : public TVisitorTransforme
}

TStatus HandleSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureArgsCount(*input, 3U, ctx)) {
if (!EnsureArgsCount(*input, 4, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TClSourceSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {

return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TClSourceSettings>()
.World(clReadTable.World())
.Table(clReadTable.Table())
.Token<TCoSecureParam>()
.Name().Build(token)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "GenSourceSettings"},
"Children": [
{"Index": 0, "Name": "Cluster", "Type": "TCoAtom"},
{"Index": 1, "Name": "Table", "Type": "TCoAtom"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 3, "Name": "Columns", "Type": "TCoAtomList"},
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Cluster", "Type": "TCoAtom"},
{"Index": 2, "Name": "Table", "Type": "TCoAtom"},
{"Index": 3, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 4, "Name": "Columns", "Type": "TCoAtomList"},
{"Index": 5, "Name": "FilterPredicate", "Type": "TCoLambda"}
]
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ namespace NYql {
}

TStatus HandleSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureArgsCount(*input, 5, ctx)) {
if (!EnsureArgsCount(*input, 6, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TGenSourceSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ namespace NYql {
// clang-format off
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TGenSourceSettings>()
.World(genReadTable.World())
.Cluster(genReadTable.DataSource().Cluster())
.Table(genReadTable.Table())
.Token<TCoSecureParam>()
Expand Down
11 changes: 6 additions & 5 deletions ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "DqPqTopicSource"},
"Children": [
{"Index": 0, "Name": "Topic", "Type": "TPqTopic"},
{"Index": 1, "Name": "Columns", "Type": "TExprBase"},
{"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"},
{"Index": 3, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Topic", "Type": "TPqTopic"},
{"Index": 2, "Name": "Columns", "Type": "TExprBase"},
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"},
{"Index": 4, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 5, "Name": "FilterPredicate", "Type": "TCoLambda"}
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,16 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

TStatus HandleDqTopicSource(TExprBase input, TExprContext& ctx) {
if (!EnsureArgsCount(input.Ref(), 5, ctx)) {
if (!EnsureArgsCount(input.Ref(), 6, ctx)) {
return TStatus::Error;
}

TDqPqTopicSource topicSource = input.Cast<TDqPqTopicSource>();

if (!EnsureWorldType(topicSource.World().Ref(), ctx)) {
return TStatus::Error;
}

TPqTopic topic = topicSource.Topic();

if (!EnsureCallable(topic.Ref(), ctx)) {
Expand Down
Loading

0 comments on commit 88b0b07

Please sign in to comment.