Skip to content

Commit

Permalink
Merge f0f7d4b into 71d6ac0
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Aug 6, 2024
2 parents 71d6ac0 + f0f7d4b commit 6fe0478
Show file tree
Hide file tree
Showing 23 changed files with 12,796 additions and 8,180 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
Config->FeatureFlags = AppData(ctx)->FeatureFlags;

KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
FederatedQuerySetup, UserToken, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState));
FederatedQuerySetup, UserToken, QueryServiceConfig, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState));

IKqpHost::TPrepareSettings prepareSettings;
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ydb/library/yql/core/services/mounts/yql_mounts.h>

#include <library/cpp/protobuf/util/pb_io.h>
#include <ydb/core/protos/config.pb.h>

namespace NKikimr {
namespace NKqp {
Expand All @@ -28,7 +29,7 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr<IKqpGateway>
IModuleResolver::TPtr moduleResolver;
UNIT_ASSERT(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver));

auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, nullptr, false, false, nullptr, actorSystem);
auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, NKikimrConfig::TQueryServiceConfig(), nullptr, false, false, nullptr, actorSystem);
auto result = qp->SyncPrepareDataQuery(sql, IKqpHost::TPrepareSettings());
result.Issues().PrintTo(Cerr);
UNIT_ASSERT(result.Success());
Expand Down
20 changes: 20 additions & 0 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -955,9 +955,15 @@ class TKqpHost : public IKqpHost {
TKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, const TString& database,
TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
<<<<<<< HEAD
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NActors::TActorSystem* actorSystem = nullptr)
=======
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, NActors::TActorSystem* actorSystem = nullptr,
NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig())
>>>>>>> 18b7d766e7... YDB-2568 Enable match_recognize in ydb (#6807)
: Gateway(gateway)
, Cluster(cluster)
, ExprCtx(new TExprContext())
Expand All @@ -972,6 +978,7 @@ class TKqpHost : public IKqpHost {
, FakeWorld(ExprCtx->NewWorld(TPosition()))
, ExecuteCtx(MakeIntrusive<TExecuteContext>())
, ActorSystem(actorSystem ? actorSystem : NActors::TActivationContext::ActorSystem())
, QueryServiceConfig(queryServiceConfig)
{
if (funcRegistry) {
FuncRegistry = funcRegistry;
Expand Down Expand Up @@ -1605,10 +1612,21 @@ class TKqpHost : public IKqpHost {
|| settingName == "Warning"
|| settingName == "UseBlocks"
|| settingName == "BlockEngine"
<<<<<<< HEAD
=======
|| settingName == "FilterPushdownOverJoinOptionalSide"
|| settingName == "DisableFilterPushdownOverJoinOptionalSide"
|| settingName == "RotateJoinTree"
|| settingName == "TimeOrderRecoverDelay"
|| settingName == "TimeOrderRecoverAhead"
|| settingName == "TimeOrderRecoverRowLimit"
|| settingName == "MatchRecognizeStream"
>>>>>>> 18b7d766e7... YDB-2568 Enable match_recognize in ydb (#6807)
;
};
auto configProvider = CreateConfigProvider(*TypesCtx, gatewaysConfig, {}, allowSettings);
TypesCtx->AddDataSource(ConfigProviderName, configProvider);
TypesCtx->MatchRecognize = QueryServiceConfig.GetEnableMatchRecognize();

YQL_ENSURE(TypesCtx->Initialize(*ExprCtx));

Expand Down Expand Up @@ -1701,6 +1719,7 @@ class TKqpHost : public IKqpHost {

TKqpTempTablesState::TConstPtr TempTablesState;
NActors::TActorSystem* ActorSystem = nullptr;
NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
};

} // namespace
Expand All @@ -1721,6 +1740,7 @@ Ydb::Table::QueryStatsCollection::Mode GetStatsMode(NYql::EKikimrStatsMode stats
TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem)
{
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/host/kqp_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ class IKqpHost : public TThrRefBase {

TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, const TString& database, NYql::TKikimrConfiguration::TPtr config, NYql::IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/);

Expand Down
11 changes: 10 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>

#include <ydb/library/yql/core/yql_opt_match_recognize.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/dq/opt/dq_opt_join.h>
#include <ydb/library/yql/dq/opt/dq_opt_log.h>
Expand Down Expand Up @@ -54,7 +55,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(0, &TKqlLookupTableBase::Match, HNDL(ApplyExtractMembersToLookupTable<false>));
AddHandler(0, &TCoTop::Match, HNDL(TopSortOverExtend));
AddHandler(0, &TCoTopSort::Match, HNDL(TopSortOverExtend));

AddHandler(0, &TCoMatchRecognize::Match, HNDL(MatchRecognize));
AddHandler(1, &TCoFlatMap::Match, HNDL(LatePushExtractedPredicateToReadTable));
AddHandler(1, &TCoTop::Match, HNDL(RewriteTopSortOverIndexRead));
AddHandler(1, &TCoTopSort::Match, HNDL(RewriteTopSortOverIndexRead));
Expand Down Expand Up @@ -255,6 +256,14 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

TMaybeNode<TExprBase> MatchRecognize(TExprBase node, TExprContext& ctx) {
auto output = ExpandMatchRecognize(node.Ptr(), ctx, TypesCtx);
if (output) {
DumpAppliedRule("MatchRecognize", node.Ptr(), output, ctx);
}
return output;
}

template <bool IsGlobal>
TMaybeNode<TExprBase> ApplyExtractMembersToReadTable(TExprBase node, TExprContext& ctx,
const TGetParents& getParents)
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_worker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
Config->FeatureFlags = AppData(ctx)->FeatureFlags;

KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver,
FederatedQuerySetup, QueryState->RequestEv->GetUserToken(), AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);
FederatedQuerySetup, QueryState->RequestEv->GetUserToken(), QueryServiceConfig, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);

auto& queryRequest = QueryState->RequestEv;
QueryState->ProxyRequestId = proxyRequestId;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga

auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}});
return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver,
federatedQuerySetup, nullptr, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem);
federatedQuerySetup, nullptr, NKikimrConfig::TQueryServiceConfig(), funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem);
}

NYql::NNodes::TExprBase GetExpr(const TString& ast, NYql::TExprContext& ctx, NYql::IModuleResolver* moduleResolver) {
Expand Down
87 changes: 87 additions & 0 deletions ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,93 @@ Y_UNIT_TEST_SUITE(KqpPragma) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_C(result.GetIssues().Empty(), result.GetIssues().ToString());
}

Y_UNIT_TEST(MatchRecognizeWithTimeOrderRecoverer) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableQueryServiceConfig()->SetEnableMatchRecognize(true);
settings.SetAppConfig(appConfig);

TKikimrRunner kikimr(settings);
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());

auto result = client.ExecuteYqlScript(R"(
PRAGMA FeatureR010="prototype";
CREATE TABLE `/Root/NewTable` (
dt Uint64,
value String,
PRIMARY KEY (dt)
);
COMMIT;
INSERT INTO `/Root/NewTable` (dt, value) VALUES
(1, 'value1'), (2, 'value2'), (3, 'value3'), (4, 'value4');
COMMIT;
SELECT * FROM (SELECT dt, value FROM `/Root/NewTable`)
MATCH_RECOGNIZE(
ORDER BY CAST(dt as Timestamp)
MEASURES
LAST(V1.dt) as v1,
LAST(V4.dt) as v4
ONE ROW PER MATCH
PATTERN (V1 V* V4)
DEFINE
V1 as V1.value = "value1",
V as True,
V4 as V4.value = "value4"
);
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
[[1u];[4u]];
])", FormatResultSetYson(result.GetResultSet(0)));
}

Y_UNIT_TEST(MatchRecognizeWithoutTimeOrderRecoverer) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableQueryServiceConfig()->SetEnableMatchRecognize(true);
settings.SetAppConfig(appConfig);

TKikimrRunner kikimr(settings);
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());

auto result = client.ExecuteYqlScript(R"(
PRAGMA FeatureR010="prototype";
PRAGMA config.flags("MatchRecognizeStream", "disable");
CREATE TABLE `/Root/NewTable` (
dt Uint64,
value String,
PRIMARY KEY (dt)
);
COMMIT;
INSERT INTO `/Root/NewTable` (dt, value) VALUES
(1, 'value1'), (2, 'value2'), (3, 'value3'), (4, 'value4');
COMMIT;
SELECT * FROM (SELECT dt, value FROM `/Root/NewTable`)
MATCH_RECOGNIZE(
ORDER BY CAST(dt as Timestamp)
MEASURES
LAST(V1.dt) as v1,
LAST(V4.dt) as v4
ONE ROW PER MATCH
PATTERN (V1 V* V4)
DEFINE
V1 as V1.value = "value1",
V as True,
V4 as V4.value = "value4"
);
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
[[1u];[4u]];
])", FormatResultSetYson(result.GetResultSet(0)));
}
}

} // namspace NKqp
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,7 @@ message TQueryServiceConfig {
optional NYql.TGenericGatewayConfig Generic = 11;
optional TFinalizeScriptServiceConfig FinalizeScriptServiceConfig = 12;
optional uint64 ProgressStatsPeriodMs = 14 [default = 0]; // 0 = disabled
optional bool EnableMatchRecognize = 20 [default = false];
}

// Config describes immediate controls and allows
Expand Down
42 changes: 10 additions & 32 deletions ydb/library/yql/core/yql_opt_match_recognize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext&
ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);
TExprNode::TPtr result;
if (isStreaming) {
YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE on streams");
YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE");
const auto reordered = ctx.Builder(pos)
.Lambda()
.Param("partition")
Expand Down Expand Up @@ -216,37 +216,15 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext&
.Seal()
.Build();
} else { //non-streaming
if (partitionColumns->ChildrenSize() != 0) {
result = ctx.Builder(pos)
.Callable("PartitionsByKeys")
.Add(0, input)
.Add(1, partitionKeySelector)
.Add(2, sortOrder)
.Add(3, sortKey)
.Add(4, matchRecognize)
.Seal()
.Build();
} else {
if (sortOrder->IsCallable("Void")) {
result = ctx.Builder(pos)
.Apply(matchRecognize)
.With(0, input)
.Seal()
.Build();;
} else {
result = ctx.Builder(pos)
.Apply(matchRecognize)
.With(0)
.Callable("Sort")
.Add(0, input)
.Add(1, sortOrder)
.Add(2, sortKey)
.Seal()
.Done()
.Seal()
.Build();
}
}
result = ctx.Builder(pos)
.Callable("PartitionsByKeys")
.Add(0, input)
.Add(1, partitionKeySelector)
.Add(2, sortOrder)
.Add(3, sortKey)
.Add(4, matchRecognize)
.Seal()
.Build();
}
YQL_CLOG(INFO, Core) << "Expanded MatchRecognize";
return result;
Expand Down
Loading

0 comments on commit 6fe0478

Please sign in to comment.