diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp index 7ec98aa7f569..4665639536dd 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -111,8 +112,28 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase { } TMaybeNode RewriteAggregate(TExprBase node, TExprContext& ctx) { - TExprBase output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey()); - DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Ptr(), ctx); + TMaybeNode output; + auto aggregate = node.Cast(); + auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); + if (hopSetting) { + auto input = aggregate.Input().Maybe(); + if (!input) { + return node; + } + output = NHopping::RewriteAsHoppingWindow( + node, + ctx, + input.Cast(), + false, // analyticsHopping + TDuration::MilliSeconds(TDqSettings::TDefault::WatermarksLateArrivalDelayMs), + true, // defaultWatermarksMode + true); // syncActor + } else { + output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey()); + } + if (output) { + DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Cast().Ptr(), ctx); + } return output; } diff --git a/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp b/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp index 2000c73ad025..6aa539e02e5f 100644 --- a/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp @@ -89,6 +89,107 @@ Y_UNIT_TEST_SUITE(KqpAgg) { [["Value3"];[1]] ])", FormatResultSetYson(result.GetResultSet(0))); } + + Y_UNIT_TEST(AggWithHop) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT + Text, + CAST(COUNT(*) as Int32) as Count, + SUM(Data) + FROM EightShard + GROUP BY HOP(CAST(Key AS Timestamp?), "PT1M", "PT1M", "PT1M"), Text + ORDER BY Text; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [["Value1"];[8];[15]]; + [["Value2"];[8];[16]]; + [["Value3"];[8];[17]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + + Y_UNIT_TEST(GroupByLimit) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + + CREATE TABLE `TestTable` ( + a Uint64, + b Uint64, + c Uint64, + d Uint64, + e Uint64, + PRIMARY KEY (a, b, c) + ); + )").GetValueSync()); + + AssertSuccessResult(session.ExecuteDataQuery(R"( + REPLACE INTO `TestTable` (a, b, c, d, e) VALUES + (1, 11, 21, 31, 41), + (2, 12, 22, 32, 42), + (3, 13, 23, 33, 43); + )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync()); + + + { // query with 36 groups and limit 32 + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + PRAGMA GroupByLimit = '32'; + + SELECT a, b, c, d, SUM(e) Data FROM TestTable + GROUP BY ROLLUP(a, b, c, d, a * b AS ab, b * c AS bc, c * d AS cd, a + b AS sum) + ORDER BY a, b, c, d; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR); + } + + { // query with 36 groups (without explicit limit) + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT a, b, c, d, SUM(e) Data FROM TestTable + GROUP BY ROLLUP(a, b, c, d, a * b AS ab, b * c AS bc, c * d AS cd, a + b AS sum) + ORDER BY a, b, c, d; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [#;#;#;#;[126u]]; + [[1u];#;#;#;[41u]]; + [[1u];[11u];#;#;[41u]]; + [[1u];[11u];[21u];#;[41u]]; + [[1u];[11u];[21u];[31u];[41u]]; + [[1u];[11u];[21u];[31u];[41u]]; + [[1u];[11u];[21u];[31u];[41u]]; + [[1u];[11u];[21u];[31u];[41u]]; + [[1u];[11u];[21u];[31u];[41u]]; + [[2u];#;#;#;[42u]]; + [[2u];[12u];#;#;[42u]]; + [[2u];[12u];[22u];#;[42u]]; + [[2u];[12u];[22u];[32u];[42u]]; + [[2u];[12u];[22u];[32u];[42u]]; + [[2u];[12u];[22u];[32u];[42u]]; + [[2u];[12u];[22u];[32u];[42u]]; + [[2u];[12u];[22u];[32u];[42u]]; + [[3u];#;#;#;[43u]]; + [[3u];[13u];#;#;[43u]]; + [[3u];[13u];[23u];#;[43u]]; + [[3u];[13u];[23u];[33u];[43u]]; + [[3u];[13u];[23u];[33u];[43u]]; + [[3u];[13u];[23u];[33u];[43u]]; + [[3u];[13u];[23u];[33u];[43u]]; + [[3u];[13u];[23u];[33u];[43u]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } } } // namespace NKikimr::NKqp diff --git a/ydb/library/yql/dq/opt/dq_opt_hopping.cpp b/ydb/library/yql/dq/opt/dq_opt_hopping.cpp new file mode 100644 index 000000000000..ff84188dc929 --- /dev/null +++ b/ydb/library/yql/dq/opt/dq_opt_hopping.cpp @@ -0,0 +1,793 @@ +#include "dq_opt_hopping.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace NYql::NDq::NHopping { + +using namespace NYql; +using namespace NYql::NDq; +using namespace NYql::NNodes; + +struct THoppingTraits { + TString Column; + TCoHoppingTraits Traits; + ui64 Hop; + ui64 Interval; + ui64 Delay; +}; + + struct TKeysDescription { + TVector PickleKeys; + TVector MemberKeys; + TVector FakeKeys; + + TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys, const TString& hoppingColumn) { + for (const auto& key : keys) { + if (key.StringValue() == hoppingColumn) { + FakeKeys.emplace_back(key.StringValue()); + continue; + } + + const auto index = rowType.FindItem(key.StringValue()); + Y_ENSURE(index); + + auto itemType = rowType.GetItems()[*index]->GetItemType(); + if (RemoveOptionalType(itemType)->GetKind() == ETypeAnnotationKind::Data) { + MemberKeys.emplace_back(key.StringValue()); + continue; + } + + PickleKeys.emplace_back(key.StringValue()); + } + } + + TExprNode::TPtr BuildPickleLambda(TExprContext& ctx, TPositionHandle pos) const { + TCoArgument arg = Build(ctx, pos) + .Name("item") + .Done(); + + TExprBase body = arg; + + for (const auto& key : PickleKeys) { + const auto member = Build(ctx, pos) + .Name().Build(key) + .Struct(arg) + .Done() + .Ptr(); + + body = Build(ctx, pos) + .Struct(body) + .Name().Build(key) + .Item(ctx.NewCallable(pos, "StablePickle", { member })) + .Done(); + } + + return Build(ctx, pos) + .Args({arg}) + .Body(body) + .Done() + .Ptr(); + } + + TExprNode::TPtr BuildUnpickleLambda(TExprContext& ctx, TPositionHandle pos, const TStructExprType& rowType) { + TCoArgument arg = Build(ctx, pos) + .Name("item") + .Done(); + + TExprBase body = arg; + + for (const auto& key : PickleKeys) { + const auto index = rowType.FindItem(key); + Y_ENSURE(index); + + auto itemType = rowType.GetItems().at(*index)->GetItemType(); + const auto member = Build(ctx, pos) + .Name().Build(key) + .Struct(arg) + .Done() + .Ptr(); + + body = Build(ctx, pos) + .Struct(body) + .Name().Build(key) + .Item(ctx.NewCallable(pos, "Unpickle", { ExpandType(pos, *itemType, ctx), member })) + .Done(); + } + + return Build(ctx, pos) + .Args({arg}) + .Body(body) + .Done() + .Ptr(); + } + + TVector GetKeysList(TExprContext& ctx, TPositionHandle pos) const { + TVector res; + res.reserve(PickleKeys.size() + MemberKeys.size()); + + for (const auto& pickleKey : PickleKeys) { + res.emplace_back(Build(ctx, pos).Value(pickleKey).Done()); + } + for (const auto& memberKey : MemberKeys) { + res.emplace_back(Build(ctx, pos).Value(memberKey).Done()); + } + return res; + } + + TVector GetActualGroupKeys() { + TVector result; + result.reserve(PickleKeys.size() + MemberKeys.size()); + result.insert(result.end(), PickleKeys.begin(), PickleKeys.end()); + result.insert(result.end(), MemberKeys.begin(), MemberKeys.end()); + return result; + } + + bool NeedPickle() const { + return !PickleKeys.empty(); + } + + TExprNode::TPtr GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType) { + auto builder = Build(ctx, pos); + for (auto key : GetKeysList(ctx, pos)) { + builder.Add(std::move(key)); + } + return BuildKeySelector(pos, *rowType, builder.Build().Value().Ptr(), ctx); + } +}; + +TString BuildColumnName(const TExprBase& column) { + if (const auto columnName = column.Maybe()) { + return columnName.Cast().StringValue(); + } + + if (const auto columnNames = column.Maybe()) { + TStringBuilder columnNameBuilder; + for (const auto columnName : columnNames.Cast()) { + columnNameBuilder.append(columnName.StringValue()); + columnNameBuilder.append("_"); + } + return columnNameBuilder; + } + + YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: " + << column.Ptr()->Dump()); +} + +bool IsLegacyHopping(const TExprNode::TPtr& hoppingSetting) { + return !hoppingSetting->Child(1)->IsList(); +} + +void EnsureNotDistinct(const TCoAggregate& aggregate) { + const auto& aggregateHandlers = aggregate.Handlers(); + + YQL_ENSURE( + AllOf(aggregateHandlers, [](const auto& t){ return !t.DistinctName(); }), + "Distinct is not supported for aggregation with hop"); +} + +TMaybe ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx, bool analyticsMode) { + const auto pos = aggregate.Pos(); + + const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); + if (!hopSetting) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Aggregate over stream must have 'hopping' setting")); + return Nothing(); + } + + const auto hoppingColumn = IsLegacyHopping(hopSetting) + ? "_yql_time" + : TString(hopSetting->Child(1)->Child(0)->Content()); + + const auto traitsNode = IsLegacyHopping(hopSetting) + ? hopSetting->Child(1) + : hopSetting->Child(1)->Child(1); + + const auto maybeTraits = TMaybeNode(traitsNode); + if (!maybeTraits) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Invalid 'hopping' setting in Aggregate")); + return Nothing(); + } + + const auto traits = maybeTraits.Cast(); + + const auto checkIntervalParam = [&] (TExprBase param) -> ui64 { + if (param.Maybe()) { + param = param.Cast().Input(); + } + if (!param.Maybe()) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Not an interval data ctor")); + return 0; + } + auto value = FromString(param.Cast().Literal().Value()); + if (value <= 0) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval value must be positive")); + return 0; + } + return (ui64)value; + }; + + const auto hop = checkIntervalParam(traits.Hop()); + if (!hop) { + return Nothing(); + } + const auto interval = checkIntervalParam(traits.Interval()); + if (!interval) { + return Nothing(); + } + const auto delay = checkIntervalParam(traits.Delay()); + if (!delay) { + return Nothing(); + } + + if (interval < hop) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval must be greater or equal then hop")); + return Nothing(); + } + if (delay < hop) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Delay must be greater or equal then hop")); + return Nothing(); + } + + const auto newTraits = Build(ctx, aggregate.Pos()) + .InitFrom(traits) + .DataWatermarks(analyticsMode + ? ctx.NewAtom(aggregate.Pos(), "false") + : traits.DataWatermarks().Ptr()) + .Done(); + + return THoppingTraits { + hoppingColumn, + newTraits, + hop, + interval, + delay + }; +} + +TExprNode::TPtr BuildTimeExtractor(const TCoHoppingTraits& hoppingTraits, TExprContext& ctx) { + const auto pos = hoppingTraits.Pos(); + + if (hoppingTraits.ItemType().Ref().GetTypeAnn()->Cast()->GetType()->Cast()->GetSize() == 0) { + // The case when no fields are used in lambda. F.e. when it has only DependsOn. + return ctx.DeepCopyLambda(hoppingTraits.TimeExtractor().Ref()); + } + + return Build(ctx, pos) + .Args({"item"}) + .Body() + .Apply(hoppingTraits.TimeExtractor()) + .With(0) + .Type(hoppingTraits.ItemType()) + .Value("item") + .Build() + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildInitHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto& aggregateHandlers = aggregate.Handlers(); + + const auto initItemArg = Build(ctx, pos).Name("item").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + ui32 index = 0; + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + + TMaybeNode applier; + if (tuple.Trait().Cast().InitHandler().Args().Size() == 1) { + applier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().InitHandler()) + .With(0, initItemArg) + .Done(); + } else { + applier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().InitHandler()) + .With(0, initItemArg) + .With(1) + .Literal().Build(ToString(index)) + .Build() + .Done(); + } + + structItems.push_back(Build(ctx, pos) + .Name().Build(BuildColumnName(tuple.ColumnName())) + .Value(applier) + .Done()); + ++index; + } + + return Build(ctx, pos) + .Args({initItemArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildUpdateHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto aggregateHandlers = aggregate.Handlers(); + + const auto updateItemArg = Build(ctx, pos).Name("item").Done(); + const auto updateStateArg = Build(ctx, pos).Name("state").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + i32 index = 0; + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString columnName = BuildColumnName(tuple.ColumnName()); + + const auto member = Build(ctx, pos) + .Struct(updateStateArg) + .Name().Build(columnName) + .Done(); + + TMaybeNode applier; + if (tuple.Trait().Cast().UpdateHandler().Args().Size() == 2) { + applier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().UpdateHandler()) + .With(0, updateItemArg) + .With(1, member) + .Done(); + } else { + applier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().UpdateHandler()) + .With(0, updateItemArg) + .With(1, member) + .With(2) + .Literal().Build(ToString(index)) + .Build() + .Done(); + } + + structItems.push_back(Build(ctx, pos) + .Name().Build(columnName) + .Value(applier) + .Done()); + ++index; + } + + return Build(ctx, pos) + .Args({updateItemArg, updateStateArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr WrapToShuffle( + const TKeysDescription& keysDescription, + const TCoAggregate& aggregate, + const TDqConnection& input, + TExprContext& ctx) +{ + auto pos = aggregate.Pos(); + + TDqStageBase mappedInput = input.Output().Stage(); + if (keysDescription.NeedPickle()) { + mappedInput = Build(ctx, pos) + .Inputs() + .Add() + .Output() + .Stage(input.Output().Stage()) + .Index(input.Output().Index()) + .Build() + .Build() + .Build() + .Program() + .Args({"stream"}) + .Body() + .Input("stream") + .Lambda(keysDescription.BuildPickleLambda(ctx, pos)) + .Build() + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, pos)) + .Done(); + } + + return Build(ctx, pos) + .Output() + .Stage(mappedInput) + .Index().Value("0").Build() + .Build() + .KeyColumns() + .Add(keysDescription.GetKeysList(ctx, pos)) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildMergeHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto& aggregateHandlers = aggregate.Handlers(); + + const auto mergeState1Arg = Build(ctx, pos).Name("state1").Done(); + const auto mergeState2Arg = Build(ctx, pos).Name("state2").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString columnName = BuildColumnName(tuple.ColumnName()); + + const auto member1 = Build(ctx, pos) + .Struct(mergeState1Arg) + .Name().Build(columnName) + .Done(); + const auto member2 = Build(ctx, pos) + .Struct(mergeState2Arg) + .Name().Build(columnName) + .Done(); + + structItems.push_back(Build(ctx, pos) + .Name().Build(columnName) + .Value() + .Apply(tuple.Trait().Cast().MergeHandler()) + .With(0, member1) + .With(1, member2) + .Build() + .Done()); + } + + return Build(ctx, pos) + .Args({mergeState1Arg, mergeState2Arg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildFinishHopLambda( + const TCoAggregate& aggregate, + const TVector& actualGroupKeys, + const TString& hoppingColumn, + TExprContext& ctx) +{ + const auto pos = aggregate.Pos(); + const auto aggregateHandlers = aggregate.Handlers(); + + const auto finishKeyArg = Build(ctx, pos).Name("key").Done(); + const auto finishStateArg = Build(ctx, pos).Name("state").Done(); + const auto finishTimeArg = Build(ctx, pos).Name("time").Done(); + + TVector structItems; + structItems.reserve(actualGroupKeys.size() + aggregateHandlers.Size() + 1); + + if (actualGroupKeys.size() == 1) { + structItems.push_back(Build(ctx, pos) + .Name().Build(actualGroupKeys[0]) + .Value(finishKeyArg) + .Done()); + } else { + for (size_t i = 0; i < actualGroupKeys.size(); ++i) { + structItems.push_back(Build(ctx, pos) + .Name().Build(actualGroupKeys[i]) + .Value() + .Tuple(finishKeyArg) + .Index() + .Value(ToString(i)) + .Build() + .Build() + .Done()); + } + } + + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString compoundColumnName = BuildColumnName(tuple.ColumnName()); + + const auto member = Build(ctx, pos) + .Struct(finishStateArg) + .Name().Build(compoundColumnName) + .Done(); + + if (tuple.ColumnName().Maybe()) { + structItems.push_back(Build(ctx, pos) + .Name().Build(compoundColumnName) + .Value() + .Apply(tuple.Trait().Cast().FinishHandler()) + .With(0, member) + .Build() + .Done()); + + continue; + } + + if (const auto namesList = tuple.ColumnName().Maybe()) { + const auto expApplier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().FinishHandler()) + .With(0, member) + .Done(); + + int index = 0; + for (const auto columnName : namesList.Cast()) { + const auto extracter = Build(ctx, pos) + .Tuple(expApplier) + .Index().Build(index++) + .Done(); + + structItems.push_back(Build(ctx, pos) + .Name(columnName) + .Value(extracter) + .Done()); + } + + continue; + } + + YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: " + << tuple.ColumnName().Ptr()->Dump()); + } + + structItems.push_back(Build(ctx, pos) + .Name().Build(hoppingColumn) + .Value(finishTimeArg) + .Done()); + + return Build(ctx, pos) + .Args({finishKeyArg, finishStateArg, finishTimeArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildSaveHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto aggregateHandlers = aggregate.Handlers(); + + const auto saveStateArg = Build(ctx, pos).Name("state").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString columnName = BuildColumnName(tuple.ColumnName()); + + const auto member = Build(ctx, pos) + .Struct(saveStateArg) + .Name().Build(columnName) + .Done(); + + structItems.push_back(Build(ctx, pos) + .Name().Build(columnName) + .Value() + .Apply(tuple.Trait().Cast().SaveHandler()) + .With(0, member) + .Build() + .Done()); + } + + return Build(ctx, pos) + .Args({saveStateArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildLoadHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto aggregateHandlers = aggregate.Handlers(); + + TCoArgument loadStateArg = Build(ctx, pos).Name("state").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString columnName = BuildColumnName(tuple.ColumnName()); + + const auto member = Build(ctx, pos) + .Struct(loadStateArg) + .Name().Build(columnName) + .Done(); + + structItems.push_back(Build(ctx, pos) + .Name().Build(columnName) + .Value() + .Apply(tuple.Trait().Cast().LoadHandler()) + .With(0, member) + .Build() + .Done()); + } + + return Build(ctx, pos) + .Args({loadStateArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TMaybe BuildWatermarkMode( + const TCoAggregate& aggregate, + const TCoHoppingTraits& hoppingTraits, + TExprContext& ctx, + bool analyticsMode, + bool defaultWatermarksMode, + bool syncActor) +{ + const bool enableWatermarks = !analyticsMode && + defaultWatermarksMode && + hoppingTraits.Version().Cast().StringValue() == "v2"; + if (enableWatermarks && syncActor) { + ctx.AddError(TIssue(ctx.GetPosition(aggregate.Pos()), "Watermarks should be used only with async compute actor")); + return Nothing(); + } + + if (hoppingTraits.Version().Cast().StringValue() == "v2" && !enableWatermarks) { + ctx.AddError(TIssue( + ctx.GetPosition(aggregate.Pos()), + "HoppingWindow requires watermarks to be enabled. If you don't want to do that, you can use HOP instead.")); + return Nothing(); + } + + return enableWatermarks; +} + +TMaybeNode RewriteAsHoppingWindow( + const TExprBase node, + TExprContext& ctx, + const TDqConnection& input, + bool analyticsMode, + TDuration lateArrivalDelay, + bool defaultWatermarksMode, + bool syncActor) { + const auto aggregate = node.Cast(); + const auto pos = aggregate.Pos(); + + YQL_CLOG(DEBUG, ProviderDq) << "OptimizeStreamingAggregate"; + + EnsureNotDistinct(aggregate); + + const auto maybeHopTraits = ExtractHopTraits(aggregate, ctx, analyticsMode); + if (!maybeHopTraits) { + return nullptr; + } + const auto hopTraits = *maybeHopTraits; + + const auto aggregateInputType = GetSeqItemType(*node.Ptr()->Head().GetTypeAnn()).Cast(); + TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column); + + if (keysDescription.NeedPickle()) { + return Build(ctx, pos) + .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) + .Input() + .InitFrom(aggregate) + .Input() + .Lambda(keysDescription.BuildPickleLambda(ctx, pos)) + .Input(input) + .Build() + .Build() + .Done(); + } + + const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType); + const auto timeExtractorLambda = BuildTimeExtractor(hopTraits.Traits, ctx); + const auto initLambda = BuildInitHopLambda(aggregate, ctx); + const auto updateLambda = BuildUpdateHopLambda(aggregate, ctx); + const auto saveLambda = BuildSaveHopLambda(aggregate, ctx); + const auto loadLambda = BuildLoadHopLambda(aggregate, ctx); + const auto mergeLambda = BuildMergeHopLambda(aggregate, ctx); + const auto finishLambda = BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx); + const auto enableWatermarks = BuildWatermarkMode(aggregate, hopTraits.Traits, ctx, analyticsMode, defaultWatermarksMode, syncActor); + if (!enableWatermarks) { + return nullptr; + } + + const auto streamArg = Build(ctx, pos).Name("stream").Done(); + auto multiHoppingCoreBuilder = Build(ctx, pos) + .KeyExtractor(keyLambda) + .TimeExtractor(timeExtractorLambda) + .Hop(hopTraits.Traits.Hop()) + .Interval(hopTraits.Traits.Interval()) + .DataWatermarks(hopTraits.Traits.DataWatermarks()) + .InitHandler(initLambda) + .UpdateHandler(updateLambda) + .MergeHandler(mergeLambda) + .FinishHandler(finishLambda) + .SaveHandler(saveLambda) + .LoadHandler(loadLambda) + .template WatermarkMode().Build(ToString(*enableWatermarks)); + + if (*enableWatermarks) { + const auto hop = TDuration::MicroSeconds(hopTraits.Hop); + multiHoppingCoreBuilder.template Delay() + .Literal().Build(ToString(Max(hop, lateArrivalDelay).MicroSeconds())) + .Build(); + } else { + multiHoppingCoreBuilder.Delay(hopTraits.Traits.Delay()); + } + + if (analyticsMode) { + return Build(ctx, node.Pos()) + .Input(input.Ptr()) + .KeySelectorLambda(keyLambda) + .SortDirections() + .Literal() + .Value("true") + .Build() + .Build() + .SortKeySelectorLambda(timeExtractorLambda) + .ListHandlerLambda() + .Args(streamArg) + .template Body() + .Stream(multiHoppingCoreBuilder + .template Input() + .List(streamArg) + .Build() + .Done()) + .Build() + .Build() + .Done(); + } else { + auto wrappedInput = input.Ptr(); + if (!keysDescription.MemberKeys.empty()) { + // Shuffle input connection by keys + wrappedInput = WrapToShuffle(keysDescription, aggregate, input, ctx); + if (!wrappedInput) { + return nullptr; + } + } + + const auto stage = Build(ctx, node.Pos()) + .Inputs() + .Add(wrappedInput) + .Build() + .Program() + .Args(streamArg) + .Body() + .Input(multiHoppingCoreBuilder + .template Input() + .Input(streamArg) + .Build() + .Done()) + .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) + .Build() + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, node.Pos())) + .Done(); + + return Build(ctx, node.Pos()) + .Output() + .Stage(stage) + .Index().Build(0) + .Build() + .Done(); + } +} + + +} // NYql::NDq::NHopping diff --git a/ydb/library/yql/dq/opt/dq_opt_hopping.h b/ydb/library/yql/dq/opt/dq_opt_hopping.h new file mode 100644 index 000000000000..7d690f6ab2fa --- /dev/null +++ b/ydb/library/yql/dq/opt/dq_opt_hopping.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +#include + +namespace NYql::NDq::NHopping { + +NNodes::TMaybeNode RewriteAsHoppingWindow( + const NNodes::TExprBase node, + TExprContext& ctx, + const NNodes::TDqConnection& input, + bool analyticsHopping, + TDuration lateArrivalDelay, + bool defaultWatermarksMode, + bool syncActor); + +} // namespace NYql::NDq::NHopping diff --git a/ydb/library/yql/dq/opt/ya.make b/ydb/library/yql/dq/opt/ya.make index 15c6c03dafa4..b1c6a87b94c8 100644 --- a/ydb/library/yql/dq/opt/ya.make +++ b/ydb/library/yql/dq/opt/ya.make @@ -15,6 +15,7 @@ SRCS( dq_opt.cpp dq_opt_build.cpp dq_opt_join.cpp + dq_opt_hopping.cpp dq_opt_log.cpp dq_opt_peephole.cpp dq_opt_phy_finalizing.cpp diff --git a/ydb/library/yql/providers/dq/common/ya.make b/ydb/library/yql/providers/dq/common/ya.make index 82704ed75da4..b5a953a629ca 100644 --- a/ydb/library/yql/providers/dq/common/ya.make +++ b/ydb/library/yql/providers/dq/common/ya.make @@ -8,6 +8,7 @@ PEERDIR( ydb/library/yql/utils/log ydb/library/yql/dq/actors ydb/library/yql/dq/proto + ydb/library/yql/dq/integration ) GENERATE_ENUM_SERIALIZATION(yql_dq_settings.h) diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp index b294d57a6180..797e52937106 100644 --- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -23,33 +24,6 @@ using namespace NYql; using namespace NYql::NDq; using namespace NYql::NNodes; -namespace { - - -TString BuildColumnName(const TExprBase column) { - if (const auto columnName = column.Maybe()) { - return columnName.Cast().StringValue(); - } - - if (const auto columnNames = column.Maybe()) { - TStringBuilder columnNameBuilder; - for (const auto columnName : columnNames.Cast()) { - columnNameBuilder.append(columnName.StringValue()); - columnNameBuilder.append("_"); - } - return columnNameBuilder; - } - - YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: " - << column.Ptr()->Dump()); -} - -bool IsLegacyHopping(const TExprNode::TPtr& hoppingSetting) { - return !hoppingSetting->Child(1)->IsList(); -} - -} - class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase { public: TDqsLogicalOptProposalTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config) @@ -336,7 +310,13 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase { auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); if (input) { if (hopSetting) { - return RewriteAsHoppingWindow(node, ctx, input.Cast()); + bool analyticsHopping = Config->AnalyticsHopping.Get().GetOrElse(false); + const auto lateArrivalDelay = TDuration::MilliSeconds(Config->WatermarksLateArrivalDelayMs + .Get() + .GetOrElse(TDqSettings::TDefault::WatermarksLateArrivalDelayMs)); + bool defaultWatermarksMode = Config->WatermarksMode.Get() == "default"; + bool syncActor = Config->ComputeActorType.Get() != "async"; + return NHopping::RewriteAsHoppingWindow(node, ctx, input.Cast(), analyticsHopping, lateArrivalDelay, defaultWatermarksMode, syncActor); } else { return DqRewriteAggregate(node, ctx, TypesCtx, true, Config->UseAggPhases.Get().GetOrElse(false), Config->UseFinalizeByKey.Get().GetOrElse(false)); } @@ -508,382 +488,6 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase { } private: - TMaybeNode RewriteAsHoppingWindow(const TExprBase node, TExprContext& ctx, const TDqConnection& input) { - const auto aggregate = node.Cast(); - const auto pos = aggregate.Pos(); - - YQL_CLOG(DEBUG, ProviderDq) << "OptimizeStreamingAggregate"; - - EnsureNotDistinct(aggregate); - - const auto maybeHopTraits = ExtractHopTraits(aggregate, ctx); - if (!maybeHopTraits) { - return nullptr; - } - const auto hopTraits = *maybeHopTraits; - - const auto aggregateInputType = GetSeqItemType(*node.Ptr()->Head().GetTypeAnn()).Cast(); - TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column); - - if (keysDescription.NeedPickle()) { - return Build(ctx, pos) - .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) - .Input() - .InitFrom(aggregate) - .Input() - .Lambda(keysDescription.BuildPickleLambda(ctx, pos)) - .Input(input) - .Build() - .Build() - .Done(); - } - - const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType); - const auto timeExtractorLambda = BuildTimeExtractor(hopTraits.Traits, ctx); - const auto initLambda = BuildInitHopLambda(aggregate, ctx); - const auto updateLambda = BuildUpdateHopLambda(aggregate, ctx); - const auto saveLambda = BuildSaveHopLambda(aggregate, ctx); - const auto loadLambda = BuildLoadHopLambda(aggregate, ctx); - const auto mergeLambda = BuildMergeHopLambda(aggregate, ctx); - const auto finishLambda = BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx); - const auto enableWatermarks = BuildWatermarkMode(aggregate, hopTraits.Traits, ctx); - if (!enableWatermarks) { - return nullptr; - } - - const auto streamArg = Build(ctx, pos).Name("stream").Done(); - auto multiHoppingCoreBuilder = Build(ctx, pos) - .KeyExtractor(keyLambda) - .TimeExtractor(timeExtractorLambda) - .Hop(hopTraits.Traits.Hop()) - .Interval(hopTraits.Traits.Interval()) - .DataWatermarks(hopTraits.Traits.DataWatermarks()) - .InitHandler(initLambda) - .UpdateHandler(updateLambda) - .MergeHandler(mergeLambda) - .FinishHandler(finishLambda) - .SaveHandler(saveLambda) - .LoadHandler(loadLambda) - .WatermarkMode().Build(ToString(*enableWatermarks)); - - if (*enableWatermarks) { - const auto hop = TDuration::MicroSeconds(hopTraits.Hop); - const auto lateArrivalDelay = TDuration::MilliSeconds(Config->WatermarksLateArrivalDelayMs - .Get() - .GetOrElse(TDqSettings::TDefault::WatermarksLateArrivalDelayMs)); - - multiHoppingCoreBuilder.Delay() - .Literal().Build(ToString(Max(hop, lateArrivalDelay).MicroSeconds())) - .Build(); - } else { - multiHoppingCoreBuilder.Delay(hopTraits.Traits.Delay()); - } - - if (Config->AnalyticsHopping.Get().GetOrElse(false)) { - return Build(ctx, node.Pos()) - .Input(input.Ptr()) - .KeySelectorLambda(keyLambda) - .SortDirections() - .Literal() - .Value("true") - .Build() - .Build() - .SortKeySelectorLambda(timeExtractorLambda) - .ListHandlerLambda() - .Args(streamArg) - .Body() - .Stream(multiHoppingCoreBuilder - .Input() - .List(streamArg) - .Build() - .Done()) - .Build() - .Build() - .Done(); - } else { - auto wrappedInput = input.Ptr(); - if (!keysDescription.MemberKeys.empty()) { - // Shuffle input connection by keys - wrappedInput = WrapToShuffle(keysDescription, aggregate, input, ctx); - if (!wrappedInput) { - return nullptr; - } - } - - const auto stage = Build(ctx, node.Pos()) - .Inputs() - .Add(wrappedInput) - .Build() - .Program() - .Args(streamArg) - .Body() - .Input(multiHoppingCoreBuilder - .Input() - .Input(streamArg) - .Build() - .Done()) - .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) - .Build() - .Build() - .Settings(TDqStageSettings().BuildNode(ctx, node.Pos())) - .Done(); - - return Build(ctx, node.Pos()) - .Output() - .Stage(stage) - .Index().Build(0) - .Build() - .Done(); - } - } - - struct THoppingTraits { - TString Column; - TCoHoppingTraits Traits; - ui64 Hop; - ui64 Interval; - ui64 Delay; - }; - - TMaybe ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - - const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); - if (!hopSetting) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Aggregate over stream must have 'hopping' setting")); - return Nothing(); - } - - const auto hoppingColumn = IsLegacyHopping(hopSetting) - ? "_yql_time" - : TString(hopSetting->Child(1)->Child(0)->Content()); - - const auto traitsNode = IsLegacyHopping(hopSetting) - ? hopSetting->Child(1) - : hopSetting->Child(1)->Child(1); - - const auto maybeTraits = TMaybeNode(traitsNode); - if (!maybeTraits) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Invalid 'hopping' setting in Aggregate")); - return Nothing(); - } - - const auto traits = maybeTraits.Cast(); - - const auto checkIntervalParam = [&] (TExprBase param) -> ui64 { - if (param.Maybe()) { - param = param.Cast().Input(); - } - if (!param.Maybe()) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Not an interval data ctor")); - return 0; - } - auto value = FromString(param.Cast().Literal().Value()); - if (value <= 0) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval value must be positive")); - return 0; - } - return (ui64)value; - }; - - const auto hop = checkIntervalParam(traits.Hop()); - if (!hop) { - return Nothing(); - } - const auto interval = checkIntervalParam(traits.Interval()); - if (!interval) { - return Nothing(); - } - const auto delay = checkIntervalParam(traits.Delay()); - if (!delay) { - return Nothing(); - } - - if (interval < hop) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval must be greater or equal then hop")); - return Nothing(); - } - if (delay < hop) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Delay must be greater or equal then hop")); - return Nothing(); - } - - const auto newTraits = Build(ctx, aggregate.Pos()) - .InitFrom(traits) - .DataWatermarks(Config->AnalyticsHopping.Get().GetOrElse(false) - ? ctx.NewAtom(aggregate.Pos(), "false") - : traits.DataWatermarks().Ptr()) - .Done(); - - return THoppingTraits { - hoppingColumn, - newTraits, - hop, - interval, - delay - }; - } - - struct TKeysDescription { - TVector PickleKeys; - TVector MemberKeys; - TVector FakeKeys; - - explicit TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys, const TString& hoppingColumn) { - for (const auto& key : keys) { - if (key.StringValue() == hoppingColumn) { - FakeKeys.emplace_back(key.StringValue()); - continue; - } - - const auto index = rowType.FindItem(key.StringValue()); - Y_ENSURE(index); - - auto itemType = rowType.GetItems()[*index]->GetItemType(); - if (RemoveOptionalType(itemType)->GetKind() == ETypeAnnotationKind::Data) { - MemberKeys.emplace_back(key.StringValue()); - continue; - } - - PickleKeys.emplace_back(key.StringValue()); - } - } - - TExprNode::TPtr BuildPickleLambda(TExprContext& ctx, TPositionHandle pos) const { - TCoArgument arg = Build(ctx, pos) - .Name("item") - .Done(); - - TExprBase body = arg; - - for (const auto& key : PickleKeys) { - const auto member = Build(ctx, pos) - .Name().Build(key) - .Struct(arg) - .Done() - .Ptr(); - - body = Build(ctx, pos) - .Struct(body) - .Name().Build(key) - .Item(ctx.NewCallable(pos, "StablePickle", { member })) - .Done(); - } - - return Build(ctx, pos) - .Args({arg}) - .Body(body) - .Done() - .Ptr(); - } - - TExprNode::TPtr BuildUnpickleLambda(TExprContext& ctx, TPositionHandle pos, const TStructExprType& rowType) { - TCoArgument arg = Build(ctx, pos) - .Name("item") - .Done(); - - TExprBase body = arg; - - for (const auto& key : PickleKeys) { - const auto index = rowType.FindItem(key); - Y_ENSURE(index); - - auto itemType = rowType.GetItems().at(*index)->GetItemType(); - const auto member = Build(ctx, pos) - .Name().Build(key) - .Struct(arg) - .Done() - .Ptr(); - - body = Build(ctx, pos) - .Struct(body) - .Name().Build(key) - .Item(ctx.NewCallable(pos, "Unpickle", { ExpandType(pos, *itemType, ctx), member })) - .Done(); - } - - return Build(ctx, pos) - .Args({arg}) - .Body(body) - .Done() - .Ptr(); - } - - TVector GetKeysList(TExprContext& ctx, TPositionHandle pos) const { - TVector res; - res.reserve(PickleKeys.size() + MemberKeys.size()); - - for (const auto& pickleKey : PickleKeys) { - res.emplace_back(Build(ctx, pos).Value(pickleKey).Done()); - } - for (const auto& memberKey : MemberKeys) { - res.emplace_back(Build(ctx, pos).Value(memberKey).Done()); - } - return res; - } - - TVector GetActualGroupKeys() { - TVector result; - result.reserve(PickleKeys.size() + MemberKeys.size()); - result.insert(result.end(), PickleKeys.begin(), PickleKeys.end()); - result.insert(result.end(), MemberKeys.begin(), MemberKeys.end()); - return result; - } - - bool NeedPickle() const { - return !PickleKeys.empty(); - } - - TExprNode::TPtr GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType) { - auto builder = Build(ctx, pos); - for (auto key : GetKeysList(ctx, pos)) { - builder.Add(std::move(key)); - } - return BuildKeySelector(pos, *rowType, builder.Build().Value().Ptr(), ctx); - } - }; - - TExprNode::TPtr WrapToShuffle( - const TKeysDescription& keysDescription, - const TCoAggregate& aggregate, - const TDqConnection& input, - TExprContext& ctx) - { - auto pos = aggregate.Pos(); - - TDqStageBase mappedInput = input.Output().Stage(); - if (keysDescription.NeedPickle()) { - mappedInput = Build(ctx, pos) - .Inputs() - .Add() - .Output() - .Stage(input.Output().Stage()) - .Index(input.Output().Index()) - .Build() - .Build() - .Build() - .Program() - .Args({"stream"}) - .Body() - .Input("stream") - .Lambda(keysDescription.BuildPickleLambda(ctx, pos)) - .Build() - .Build() - .Settings(TDqStageSettings().BuildNode(ctx, pos)) - .Done(); - } - - return Build(ctx, pos) - .Output() - .Stage(mappedInput) - .Index().Value("0").Build() - .Build() - .KeyColumns() - .Add(keysDescription.GetKeysList(ctx, pos)) - .Build() - .Done() - .Ptr(); - } void EnsureNotDistinct(const TCoAggregate& aggregate) { const auto& aggregateHandlers = aggregate.Handlers(); @@ -893,358 +497,14 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase { "Distinct is not supported for aggregation with hop"); } - TExprNode::TPtr BuildTimeExtractor(const TCoHoppingTraits& hoppingTraits, TExprContext& ctx) { - const auto pos = hoppingTraits.Pos(); - - if (hoppingTraits.ItemType().Ref().GetTypeAnn()->Cast()->GetType()->Cast()->GetSize() == 0) { - // The case when no fields are used in lambda. F.e. when it has only DependsOn. - return ctx.DeepCopyLambda(hoppingTraits.TimeExtractor().Ref()); - } - - return Build(ctx, pos) - .Args({"item"}) - .Body() - .Apply(hoppingTraits.TimeExtractor()) - .With(0) - .Type(hoppingTraits.ItemType()) - .Value("item") - .Build() - .Build() - .Done() - .Ptr(); - } - - TExprNode::TPtr BuildInitHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto& aggregateHandlers = aggregate.Handlers(); - - const auto initItemArg = Build(ctx, pos).Name("item").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - ui32 index = 0; - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - - TMaybeNode applier; - if (tuple.Trait().Cast().InitHandler().Args().Size() == 1) { - applier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().InitHandler()) - .With(0, initItemArg) - .Done(); - } else { - applier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().InitHandler()) - .With(0, initItemArg) - .With(1) - .Literal().Build(ToString(index)) - .Build() - .Done(); - } - - structItems.push_back(Build(ctx, pos) - .Name().Build(BuildColumnName(tuple.ColumnName())) - .Value(applier) - .Done()); - ++index; - } - - return Build(ctx, pos) - .Args({initItemArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); - } - - TExprNode::TPtr BuildUpdateHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto aggregateHandlers = aggregate.Handlers(); - - const auto updateItemArg = Build(ctx, pos).Name("item").Done(); - const auto updateStateArg = Build(ctx, pos).Name("state").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - i32 index = 0; - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString columnName = BuildColumnName(tuple.ColumnName()); - - const auto member = Build(ctx, pos) - .Struct(updateStateArg) - .Name().Build(columnName) - .Done(); - - TMaybeNode applier; - if (tuple.Trait().Cast().UpdateHandler().Args().Size() == 2) { - applier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().UpdateHandler()) - .With(0, updateItemArg) - .With(1, member) - .Done(); - } else { - applier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().UpdateHandler()) - .With(0, updateItemArg) - .With(1, member) - .With(2) - .Literal().Build(ToString(index)) - .Build() - .Done(); - } - - structItems.push_back(Build(ctx, pos) - .Name().Build(columnName) - .Value(applier) - .Done()); - ++index; - } - - return Build(ctx, pos) - .Args({updateItemArg, updateStateArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); - } - - TExprNode::TPtr BuildMergeHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto& aggregateHandlers = aggregate.Handlers(); - - const auto mergeState1Arg = Build(ctx, pos).Name("state1").Done(); - const auto mergeState2Arg = Build(ctx, pos).Name("state2").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString columnName = BuildColumnName(tuple.ColumnName()); - - const auto member1 = Build(ctx, pos) - .Struct(mergeState1Arg) - .Name().Build(columnName) - .Done(); - const auto member2 = Build(ctx, pos) - .Struct(mergeState2Arg) - .Name().Build(columnName) - .Done(); - - structItems.push_back(Build(ctx, pos) - .Name().Build(columnName) - .Value() - .Apply(tuple.Trait().Cast().MergeHandler()) - .With(0, member1) - .With(1, member2) - .Build() - .Done()); - } - - return Build(ctx, pos) - .Args({mergeState1Arg, mergeState2Arg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); - } - - TExprNode::TPtr BuildFinishHopLambda( - const TCoAggregate& aggregate, - const TVector& actualGroupKeys, - const TString& hoppingColumn, - TExprContext& ctx) - { - const auto pos = aggregate.Pos(); - const auto aggregateHandlers = aggregate.Handlers(); - - const auto finishKeyArg = Build(ctx, pos).Name("key").Done(); - const auto finishStateArg = Build(ctx, pos).Name("state").Done(); - const auto finishTimeArg = Build(ctx, pos).Name("time").Done(); - - TVector structItems; - structItems.reserve(actualGroupKeys.size() + aggregateHandlers.Size() + 1); - - if (actualGroupKeys.size() == 1) { - structItems.push_back(Build(ctx, pos) - .Name().Build(actualGroupKeys[0]) - .Value(finishKeyArg) - .Done()); - } else { - for (size_t i = 0; i < actualGroupKeys.size(); ++i) { - structItems.push_back(Build(ctx, pos) - .Name().Build(actualGroupKeys[i]) - .Value() - .Tuple(finishKeyArg) - .Index() - .Value(ToString(i)) - .Build() - .Build() - .Done()); - } - } - - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString compoundColumnName = BuildColumnName(tuple.ColumnName()); - - const auto member = Build(ctx, pos) - .Struct(finishStateArg) - .Name().Build(compoundColumnName) - .Done(); - - if (tuple.ColumnName().Maybe()) { - structItems.push_back(Build(ctx, pos) - .Name().Build(compoundColumnName) - .Value() - .Apply(tuple.Trait().Cast().FinishHandler()) - .With(0, member) - .Build() - .Done()); - - continue; - } - - if (const auto namesList = tuple.ColumnName().Maybe()) { - const auto expApplier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().FinishHandler()) - .With(0, member) - .Done(); - - int index = 0; - for (const auto columnName : namesList.Cast()) { - const auto extracter = Build(ctx, pos) - .Tuple(expApplier) - .Index().Build(index++) - .Done(); - - structItems.push_back(Build(ctx, pos) - .Name(columnName) - .Value(extracter) - .Done()); - } - - continue; - } - - YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: " - << tuple.ColumnName().Ptr()->Dump()); - } - - structItems.push_back(Build(ctx, pos) - .Name().Build(hoppingColumn) - .Value(finishTimeArg) - .Done()); - - return Build(ctx, pos) - .Args({finishKeyArg, finishStateArg, finishTimeArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); - } - - TExprNode::TPtr BuildSaveHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto aggregateHandlers = aggregate.Handlers(); - - const auto saveStateArg = Build(ctx, pos).Name("state").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString columnName = BuildColumnName(tuple.ColumnName()); - - const auto member = Build(ctx, pos) - .Struct(saveStateArg) - .Name().Build(columnName) - .Done(); - - structItems.push_back(Build(ctx, pos) - .Name().Build(columnName) - .Value() - .Apply(tuple.Trait().Cast().SaveHandler()) - .With(0, member) - .Build() - .Done()); - } - - return Build(ctx, pos) - .Args({saveStateArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); - } - - TExprNode::TPtr BuildLoadHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto aggregateHandlers = aggregate.Handlers(); - - TCoArgument loadStateArg = Build(ctx, pos).Name("state").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString columnName = BuildColumnName(tuple.ColumnName()); - - const auto member = Build(ctx, pos) - .Struct(loadStateArg) - .Name().Build(columnName) - .Done(); - - structItems.push_back(Build(ctx, pos) - .Name().Build(columnName) - .Value() - .Apply(tuple.Trait().Cast().LoadHandler()) - .With(0, member) - .Build() - .Done()); - } - - return Build(ctx, pos) - .Args({loadStateArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); - } - - TMaybe BuildWatermarkMode( - const TCoAggregate& aggregate, - const TCoHoppingTraits& hoppingTraits, - TExprContext& ctx) - { - const auto analyticsMode = Config->AnalyticsHopping.Get().GetOrElse(false); - const bool enableWatermarks = !analyticsMode && - Config->WatermarksMode.Get() == "default" && - hoppingTraits.Version().Cast().StringValue() == "v2"; - if (enableWatermarks && Config->ComputeActorType.Get() != "async") { - ctx.AddError(TIssue(ctx.GetPosition(aggregate.Pos()), "Watermarks should be used only with async compute actor")); - return Nothing(); - } - - if (hoppingTraits.Version().Cast().StringValue() == "v2" && !enableWatermarks) { - ctx.AddError(TIssue( - ctx.GetPosition(aggregate.Pos()), - "HoppingWindow requires watermarks to be enabled. If you don't want to do that, you can use HOP instead.")); - return Nothing(); + IDqOptimization* GetDqOptCallback(const TExprBase& providerRead) const { + if (providerRead.Ref().ChildrenSize() > 1 && TCoDataSource::Match(providerRead.Ref().Child(1))) { + auto dataSourceName = providerRead.Ref().Child(1)->Child(0)->Content(); + auto datasource = TypesCtx.DataSourceMap.FindPtr(dataSourceName); + YQL_ENSURE(datasource); + return (*datasource)->GetDqOptimization(); } - - return enableWatermarks; + return nullptr; } IDqOptimization* GetDqOptCallback(const TExprBase& providerRead) const { diff --git a/ydb/tests/fq/s3/test_s3.py b/ydb/tests/fq/s3/test_s3.py index e9b555ee0fbb..5db3f15b7a97 100644 --- a/ydb/tests/fq/s3/test_s3.py +++ b/ydb/tests/fq/s3/test_s3.py @@ -83,6 +83,56 @@ def test_csv(self, kikimr, s3, client, runtime_listing, yq_version): assert result_set.rows[2].items[2].int32_value == 33 assert sum(kikimr.control_plane.get_metering()) == 10 + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_csv_with_hopping(self, kikimr, s3, client): + resource = boto3.resource( + "s3", + endpoint_url=s3.s3_url, + aws_access_key_id="key", + aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("fbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", + endpoint_url=s3.s3_url, + aws_access_key_id="key", + aws_secret_access_key="secret_key" + ) + + fruits = R'''Time,Fruit,Price +0,Banana,3 +1,Apple,2 +2,Pear,15''' + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain') + kikimr.control_plane.wait_bootstrap(1) + client.create_storage_connection("fruitbucket", "fbucket") + + sql = R''' + SELECT COUNT(*) as count, + FROM fruitbucket.`fruits.csv` + WITH (format=csv_with_names, SCHEMA ( + Time UInt64 NOT NULL, + Fruit String NOT NULL, + Price Int NOT NULL + )) + GROUP BY HOP(CAST(Time AS Timestamp?), "PT1M", "PT1M", "PT1M") + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 1 + assert len(result_set.rows) == 1 + assert result_set.rows[0].items[0].uint64_value == 3 + @yq_all @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) @pytest.mark.parametrize("runtime_listing", [False, True])