Skip to content

Commit

Permalink
YDB-2568 Enable match_recognize in ydb / q-stable-ydb-24-2 (ydb-platf…
Browse files Browse the repository at this point in the history
  • Loading branch information
uzhastik committed Sep 12, 2024
1 parent bd7afda commit 63dbb3b
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 114 deletions.
23 changes: 23 additions & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,29 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

auto prepareSettings = PrepareCompilationSettings(ctx);

Config->FeatureFlags = AppData(ctx)->FeatureFlags;

KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
FederatedQuerySetup, UserToken, QueryServiceConfig, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState));

IKqpHost::TPrepareSettings prepareSettings;
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
prepareSettings.IsInternalCall = QueryId.Settings.IsInternalCall;

switch (QueryId.Settings.Syntax) {
case Ydb::Query::Syntax::SYNTAX_YQL_V1:
prepareSettings.UsePgParser = false;
prepareSettings.SyntaxVersion = 1;
break;

case Ydb::Query::Syntax::SYNTAX_PG:
prepareSettings.UsePgParser = true;
break;

default:
break;
}

NCpuTime::TCpuTimer timer(CompileCpuTime);

switch (QueryId.Settings.QueryType) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr<IKqpGateway>
IModuleResolver::TPtr moduleResolver;
UNIT_ASSERT(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver));

auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, nullptr, NKikimrConfig::TQueryServiceConfig(), Nothing(), nullptr, nullptr, false, false, nullptr, actorSystem, nullptr);
auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, NKikimrConfig::TQueryServiceConfig(), nullptr, false, false, nullptr, actorSystem);
auto result = qp->SyncPrepareDataQuery(sql, IKqpHost::TPrepareSettings());
result.Issues().PrintTo(Cerr);
UNIT_ASSERT(result.Success());
Expand Down
25 changes: 12 additions & 13 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1031,9 +1031,10 @@ class TKqpHost : public IKqpHost {
TKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, const TString& database, const TGUCSettings::TPtr& gUCSettings,
const TMaybe<TString>& applicationName, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, NActors::TActorSystem* actorSystem = nullptr,
NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig())
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NActors::TActorSystem* actorSystem = nullptr)
: Gateway(gateway)
, Cluster(cluster)
, GUCSettings(gUCSettings)
Expand Down Expand Up @@ -1824,9 +1825,6 @@ class TKqpHost : public IKqpHost {
|| settingName == "Warning"
|| settingName == "UseBlocks"
|| settingName == "BlockEngine"
|| settingName == "FilterPushdownOverJoinOptionalSide"
|| settingName == "DisableFilterPushdownOverJoinOptionalSide"
|| settingName == "RotateJoinTree"
|| settingName == "TimeOrderRecoverDelay"
|| settingName == "TimeOrderRecoverAhead"
|| settingName == "TimeOrderRecoverRowLimit"
Expand Down Expand Up @@ -1955,14 +1953,15 @@ Ydb::Table::QueryStatsCollection::Mode GetStatsMode(NYql::EKikimrStatsMode stats
}
}

TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TGUCSettings::TPtr& gUCSettings,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe<TString>& applicationName, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx)
TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem)
{
return MakeIntrusive<TKqpHost>(gateway, cluster, database, gUCSettings, applicationName, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry,
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx, queryServiceConfig);
return MakeIntrusive<TKqpHost>(gateway, cluster, database, config, moduleResolver, federatedQuerySetup, userToken, queryServiceConfig,
funcRegistry, keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem);
}

} // namespace NKqp
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/host/kqp_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ class IKqpHost : public TThrRefBase {

TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, const TString& database, NYql::TKikimrConfiguration::TPtr config, NYql::IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TGUCSettings::TPtr& gUCSettings,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe<TString>& applicationName = Nothing(), const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/,
NYql::TExprContext* ctx = nullptr);
Expand Down
73 changes: 0 additions & 73 deletions ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(0, &TKqlLookupTableBase::Match, HNDL(ApplyExtractMembersToLookupTable<false>));
AddHandler(0, &TCoTop::Match, HNDL(TopSortOverExtend));
AddHandler(0, &TCoTopSort::Match, HNDL(TopSortOverExtend));
AddHandler(0, &TCoUnorderedBase::Match, HNDL(UnorderedOverDqReadWrap));
AddHandler(0, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqReadWrap));
AddHandler(0, &TCoCountBase::Match, HNDL(TakeOrSkipOverDqReadWrap));
AddHandler(0, &TCoExtendBase::Match, HNDL(ExtendOverDqReadWrap));
AddHandler(0, &TCoNarrowMap::Match, HNDL(DqReadWideWrapFieldSubset));
AddHandler(0, &TCoNarrowFlatMap::Match, HNDL(DqReadWideWrapFieldSubset));
AddHandler(0, &TCoNarrowMultiMap::Match, HNDL(DqReadWideWrapFieldSubset));
AddHandler(0, &TCoWideMap::Match, HNDL(DqReadWideWrapFieldSubset));
AddHandler(0, &TCoMatchRecognize::Match, HNDL(MatchRecognize));

AddHandler(1, &TCoFlatMap::Match, HNDL(LatePushExtractedPredicateToReadTable));
AddHandler(1, &TCoTop::Match, HNDL(RewriteTopSortOverIndexRead));
AddHandler(1, &TCoTopSort::Match, HNDL(RewriteTopSortOverIndexRead));
Expand Down Expand Up @@ -273,46 +264,6 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

TMaybeNode<TExprBase> UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
auto output = NDq::UnorderedOverDqReadWrap(node, ctx, getParents, true, TypesCtx);
if (output) {
DumpAppliedRule("UnorderedOverDqReadWrap", node.Ptr(), output.Cast().Ptr(), ctx);
}
return output;
}

TMaybeNode<TExprBase> ExtractMembersOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
auto output = NDq::ExtractMembersOverDqReadWrap(node, ctx, getParents, true, TypesCtx);
if (output) {
DumpAppliedRule("ExtractMembersOverDqReadWrap", node.Ptr(), output.Cast().Ptr(), ctx);
}
return output;
}

TMaybeNode<TExprBase> TakeOrSkipOverDqReadWrap(TExprBase node, TExprContext& ctx) {
auto output = NDq::TakeOrSkipOverDqReadWrap(node, ctx, TypesCtx);
if (output) {
DumpAppliedRule("TakeOrSkipOverDqReadWrap", node.Ptr(), output.Cast().Ptr(), ctx);
}
return output;
}

TMaybeNode<TExprBase> ExtendOverDqReadWrap(TExprBase node, TExprContext& ctx) {
auto output = NDq::ExtendOverDqReadWrap(node, ctx, TypesCtx);
if (output) {
DumpAppliedRule("ExtendOverDqReadWrap", node.Ptr(), output.Cast().Ptr(), ctx);
}
return output;
}

TMaybeNode<TExprBase> DqReadWideWrapFieldSubset(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
auto output = NDq::DqReadWideWrapFieldSubset(node, ctx, getParents, TypesCtx);
if (output) {
DumpAppliedRule("DqReadWideWrapFieldSubset", node.Ptr(), output.Cast().Ptr(), ctx);
}
return output;
}

TMaybeNode<TExprBase> MatchRecognize(TExprBase node, TExprContext& ctx) {
auto output = ExpandMatchRecognize(node.Ptr(), ctx, TypesCtx);
if (output) {
Expand All @@ -321,30 +272,6 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

TMaybeNode<TExprBase> DqReadWrapByProvider(TExprBase node, TExprContext& ctx) {
auto output = NDq::DqReadWrapByProvider(node, ctx, TypesCtx);
if (output) {
DumpAppliedRule("DqReadWrapByProvider", node.Ptr(), output.Cast().Ptr(), ctx);
}
return output;
}

TMaybeNode<TExprBase> ExtractMembersOverDqReadWrapMultiUsage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
auto output = NDq::ExtractMembersOverDqReadWrapMultiUsage(node, ctx, optCtx, getParents, TypesCtx);
if (output) {
DumpAppliedRule("ExtractMembersOverDqReadWrapMultiUsage", node.Ptr(), output.Cast().Ptr(), ctx);
}
return output;
}

TMaybeNode<TExprBase> UnorderedOverDqReadWrapMultiUsage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
auto output = NDq::UnorderedOverDqReadWrapMultiUsage(node, ctx, optCtx, getParents, TypesCtx);
if (output) {
DumpAppliedRule("UnorderedOverDqReadWrapMultiUsage", node.Ptr(), output.Cast().Ptr(), ctx);
}
return output;
}

template <bool IsGlobal>
TMaybeNode<TExprBase> ApplyExtractMembersToReadTable(TExprBase node, TExprContext& ctx,
const TGetParents& getParents)
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/session_actor/kqp_worker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {

Config->FeatureFlags = AppData(ctx)->FeatureFlags;

KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup,
QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false, nullptr, nullptr, nullptr);
KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver,
FederatedQuerySetup, QueryState->RequestEv->GetUserToken(), QueryServiceConfig, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);

auto& queryRequest = QueryState->RequestEv;
QueryState->ProxyRequestId = proxyRequestId;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga

auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}});
return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver,
federatedQuerySetup, nullptr, nullptr, NKikimrConfig::TQueryServiceConfig(), {}, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem, nullptr);
federatedQuerySetup, nullptr, NKikimrConfig::TQueryServiceConfig(), funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem);
}

NYql::NNodes::TExprBase GetExpr(const TString& ast, NYql::TExprContext& ctx, NYql::IModuleResolver* moduleResolver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1892,9 +1892,9 @@
],
"test.test[match_recognize-test_type-default.txt-Debug]": [
{
"checksum": "648119cc488bae598a0936f9d2c82b7e",
"size": 3458,
"uri": "https://{canondata_backend}/1942173/c4d7dbc720e57397caf847cd2616b1362110ddd2/resource.tar.gz#test.test_match_recognize-test_type-default.txt-Debug_/opt.yql_patched"
"checksum": "d5867efa618053b3a7c823ca3c65ac62",
"size": 3444,
"uri": "https://{canondata_backend}/1597364/713adc15e86f968fe8142e7d100829cc68d825bb/resource.tar.gz#test.test_match_recognize-test_type-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[match_recognize-test_type-default.txt-Plan]": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1717,9 +1717,9 @@
],
"test.test[match_recognize-alerts-default.txt-Debug]": [
{
"checksum": "c8b1e13d6da573f8a1afd415db1d00e7",
"size": 5787,
"uri": "https://{canondata_backend}/1917492/86ab0de654a60bf1e3145a3d8e3d7eae4a9f26b8/resource.tar.gz#test.test_match_recognize-alerts-default.txt-Debug_/opt.yql_patched"
"checksum": "36044ee7a7ae01d0b976600d0fb6112e",
"size": 5756,
"uri": "https://{canondata_backend}/212715/6b165614609e516560d0c88a6f8ecd84824ba506/resource.tar.gz#test.test_match_recognize-alerts-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[match_recognize-alerts-default.txt-Plan]": [
Expand Down
10 changes: 5 additions & 5 deletions ydb/library/yql/tests/sql/dq_file/part19/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -1788,21 +1788,21 @@
{
"checksum": "b4dd508a329723c74293d80f0278c705",
"size": 505,
"uri": "https://{canondata_backend}/1917492/ef839f70e5a2f493427f7f92ed00d26a993f6d4a/resource.tar.gz#test.test_match_recognize-alerts_without_order-default.txt-Analyze_/plan.txt"
"uri": "https://{canondata_backend}/1597364/fc135efcabe2a4c94deee0dd810e591fa1b56eef/resource.tar.gz#test.test_match_recognize-alerts_without_order-default.txt-Analyze_/plan.txt"
}
],
"test.test[match_recognize-alerts_without_order-default.txt-Debug]": [
{
"checksum": "17c5c1f84ac65b6a82234cd0b0a41a68",
"size": 5699,
"uri": "https://{canondata_backend}/1917492/ef839f70e5a2f493427f7f92ed00d26a993f6d4a/resource.tar.gz#test.test_match_recognize-alerts_without_order-default.txt-Debug_/opt.yql_patched"
"checksum": "9583095b37a579b3d8f782ca1410d648",
"size": 5669,
"uri": "https://{canondata_backend}/1597364/fc135efcabe2a4c94deee0dd810e591fa1b56eef/resource.tar.gz#test.test_match_recognize-alerts_without_order-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[match_recognize-alerts_without_order-default.txt-Plan]": [
{
"checksum": "b4dd508a329723c74293d80f0278c705",
"size": 505,
"uri": "https://{canondata_backend}/1917492/ef839f70e5a2f493427f7f92ed00d26a993f6d4a/resource.tar.gz#test.test_match_recognize-alerts_without_order-default.txt-Plan_/plan.txt"
"uri": "https://{canondata_backend}/1597364/fc135efcabe2a4c94deee0dd810e591fa1b56eef/resource.tar.gz#test.test_match_recognize-alerts_without_order-default.txt-Plan_/plan.txt"
}
],
"test.test[match_recognize-alerts_without_order-default.txt-Results]": [],
Expand Down
16 changes: 8 additions & 8 deletions ydb/library/yql/tests/sql/sql2yql/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -10781,9 +10781,9 @@
],
"test_sql2yql.test[match_recognize-alerts_without_order]": [
{
"checksum": "4a7d1c9ca704a076217e529b5489ad87",
"size": 8780,
"uri": "https://{canondata_backend}/1937001/f1ec239726ab3e2cf00695f3d10461ff9ef6c3b0/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_without_order_/sql.yql"
"checksum": "ef289fff70d333859534243df7451fab",
"size": 8759,
"uri": "https://{canondata_backend}/1031349/a543dabda3236eb2bb759444c05037e62724fa5f/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_without_order_/sql.yql"
}
],
"test_sql2yql.test[match_recognize-permute]": [
Expand Down Expand Up @@ -10816,9 +10816,9 @@
],
"test_sql2yql.test[match_recognize-test_type]": [
{
"checksum": "0a5812e84f194b487eae4084027bd170",
"size": 10249,
"uri": "https://{canondata_backend}/1936842/c0fac16b134e7c8f865a197ac63738ced4fac271/resource.tar.gz#test_sql2yql.test_match_recognize-test_type_/sql.yql"
"checksum": "64d8e2edbef833724049eac26329ff12",
"size": 10144,
"uri": "https://{canondata_backend}/1031349/a543dabda3236eb2bb759444c05037e62724fa5f/resource.tar.gz#test_sql2yql.test_match_recognize-test_type_/sql.yql"
}
],
"test_sql2yql.test[match_recognize-test_type_predicate]": [
Expand Down Expand Up @@ -30047,7 +30047,7 @@
{
"checksum": "779c2c3a4eab619646509ce5008863e8",
"size": 2906,
"uri": "https://{canondata_backend}/1937001/f1ec239726ab3e2cf00695f3d10461ff9ef6c3b0/resource.tar.gz#test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql"
"uri": "https://{canondata_backend}/1031349/a543dabda3236eb2bb759444c05037e62724fa5f/resource.tar.gz#test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql"
}
],
"test_sql_format.test[match_recognize-permute]": [
Expand Down Expand Up @@ -30082,7 +30082,7 @@
{
"checksum": "36104b385f3b9986c22f409931b80564",
"size": 1302,
"uri": "https://{canondata_backend}/1936842/c0fac16b134e7c8f865a197ac63738ced4fac271/resource.tar.gz#test_sql_format.test_match_recognize-test_type_/formatted.sql"
"uri": "https://{canondata_backend}/1031349/a543dabda3236eb2bb759444c05037e62724fa5f/resource.tar.gz#test_sql_format.test_match_recognize-test_type_/formatted.sql"
}
],
"test_sql_format.test[match_recognize-test_type_predicate]": [
Expand Down
3 changes: 0 additions & 3 deletions ydb/library/yql/tools/dqrun/examples/gateways.conf
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ HttpGateway {
}

YqlCore {
Flags {
Name: "_EnableStreamLookupJoin"
}
Flags {
Name: "_EnableMatchRecognize"
}
Expand Down

0 comments on commit 63dbb3b

Please sign in to comment.