Skip to content

Commit

Permalink
YQ-2735 Hopping on analytic (v2) (ydb-platform#1029)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored and EgorkaZ committed Apr 8, 2024
1 parent 35c7dad commit aeb799f
Show file tree
Hide file tree
Showing 8 changed files with 1,002 additions and 757 deletions.
25 changes: 23 additions & 2 deletions ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/dq/opt/dq_opt_join.h>
#include <ydb/library/yql/dq/opt/dq_opt_log.h>
#include <ydb/library/yql/dq/opt/dq_opt_hopping.h>
#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>

Expand Down Expand Up @@ -111,8 +112,28 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
}

// Optimizer rule for aggregate nodes.
//
// An aggregate that carries a "hopping" setting is rewritten through
// NHopping::RewriteAsHoppingWindow; every other aggregate goes through
// DqRewriteAggregate.
//
// Returns:
//   - the rewritten node when a rewrite was produced;
//   - the original node, untouched, when a hopping aggregate's input is not
//     a TDqConnection;
//   - whatever (possibly empty) TMaybeNode the hopping rewrite returned.
TMaybeNode<TExprBase> RewriteAggregate(TExprBase node, TExprContext& ctx) {
    TMaybeNode<TExprBase> output;

    auto aggregate = node.Cast<TCoAggregateBase>();
    auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping");
    if (hopSetting) {
        // The hopping rewrite is only attempted on top of a DQ connection.
        auto input = aggregate.Input().Maybe<TDqConnection>();
        if (!input) {
            return node;
        }

        output = NHopping::RewriteAsHoppingWindow(
            node,
            ctx,
            input.Cast(),
            false, // analyticsHopping
            TDuration::MilliSeconds(TDqSettings::TDefault::WatermarksLateArrivalDelayMs),
            true, // defaultWatermarksMode
            true); // syncActor
    } else {
        output = DqRewriteAggregate(
            node,
            ctx,
            TypesCtx,
            false,
            KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(),
            KqpCtx.Config->HasOptUseFinalizeByKey());
    }

    // Only report the rule as applied when a rewrite result actually exists.
    if (output) {
        DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Cast().Ptr(), ctx);
    }
    return output;
}

Expand Down
101 changes: 101 additions & 0 deletions ydb/core/kqp/ut/opt/kqp_agg_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,107 @@ Y_UNIT_TEST_SUITE(KqpAgg) {
[["Value3"];[1]]
])", FormatResultSetYson(result.GetResultSet(0)));
}

// Runs a data query that groups EightShard rows by a HOP window plus Text,
// and checks both the query status and the resulting aggregates.
Y_UNIT_TEST(AggWithHop) {
    TKikimrRunner kikimr;
    auto tableClient = kikimr.GetTableClient();
    auto session = tableClient.CreateSession().GetValueSync().GetSession();

    auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
SELECT
Text,
CAST(COUNT(*) as Int32) as Count,
SUM(Data)
FROM EightShard
GROUP BY HOP(CAST(Key AS Timestamp?), "PT1M", "PT1M", "PT1M"), Text
ORDER BY Text;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();

    // Surface the query issues in the failure message if the status is not SUCCESS.
    UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

    // Compare the first (and only expected) result set against the reference rows.
    const auto actualYson = FormatResultSetYson(result.GetResultSet(0));
    CompareYson(R"([
[["Value1"];[8];[15]];
[["Value2"];[8];[16]];
[["Value3"];[8];[17]]
])", actualYson);
}

// Exercises the GroupByLimit pragma on a ROLLUP query over a freshly created
// three-row table: with an explicit limit of 32 the query is expected to fail
// with GENERIC_ERROR, and without the pragma it is expected to succeed and
// return the rolled-up sums.
Y_UNIT_TEST(GroupByLimit) {
    TKikimrRunner kikimr;
    auto db = kikimr.GetTableClient();
    auto session = db.CreateSession().GetValueSync().GetSession();

    // Create the table used by both sub-cases.
    AssertSuccessResult(session.ExecuteSchemeQuery(R"(
--!syntax_v1
CREATE TABLE `TestTable` (
a Uint64,
b Uint64,
c Uint64,
d Uint64,
e Uint64,
PRIMARY KEY (a, b, c)
);
)").GetValueSync());

    // Fill it with three rows.
    AssertSuccessResult(session.ExecuteDataQuery(R"(
REPLACE INTO `TestTable` (a, b, c, d, e) VALUES
(1, 11, 21, 31, 41),
(2, 12, 22, 32, 42),
(3, 13, 23, 33, 43);
)", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync());


    { // query with 36 groups and limit 32
        auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
PRAGMA GroupByLimit = '32';
SELECT a, b, c, d, SUM(e) Data FROM TestTable
GROUP BY ROLLUP(a, b, c, d, a * b AS ab, b * c AS bc, c * d AS cd, a + b AS sum)
ORDER BY a, b, c, d;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
        // Attach the issues so an unexpected status is diagnosable from the test log.
        UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
    }

    { // query with 36 groups (without explicit limit)
        auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
SELECT a, b, c, d, SUM(e) Data FROM TestTable
GROUP BY ROLLUP(a, b, c, d, a * b AS ab, b * c AS bc, c * d AS cd, a + b AS sum)
ORDER BY a, b, c, d;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
        UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
        CompareYson(R"([
[#;#;#;#;[126u]];
[[1u];#;#;#;[41u]];
[[1u];[11u];#;#;[41u]];
[[1u];[11u];[21u];#;[41u]];
[[1u];[11u];[21u];[31u];[41u]];
[[1u];[11u];[21u];[31u];[41u]];
[[1u];[11u];[21u];[31u];[41u]];
[[1u];[11u];[21u];[31u];[41u]];
[[1u];[11u];[21u];[31u];[41u]];
[[2u];#;#;#;[42u]];
[[2u];[12u];#;#;[42u]];
[[2u];[12u];[22u];#;[42u]];
[[2u];[12u];[22u];[32u];[42u]];
[[2u];[12u];[22u];[32u];[42u]];
[[2u];[12u];[22u];[32u];[42u]];
[[2u];[12u];[22u];[32u];[42u]];
[[2u];[12u];[22u];[32u];[42u]];
[[3u];#;#;#;[43u]];
[[3u];[13u];#;#;[43u]];
[[3u];[13u];[23u];#;[43u]];
[[3u];[13u];[23u];[33u];[43u]];
[[3u];[13u];[23u];[33u];[43u]];
[[3u];[13u];[23u];[33u];[43u]];
[[3u];[13u];[23u];[33u];[43u]];
[[3u];[13u];[23u];[33u];[43u]]
])", FormatResultSetYson(result.GetResultSet(0)));
    }
}
}

} // namespace NKikimr::NKqp
Loading

0 comments on commit aeb799f

Please sign in to comment.