From f5ba47aa8dbd3e502a365e42ec10ab5c453b29b7 Mon Sep 17 00:00:00 2001 From: Alexey Pozdniakov Date: Tue, 22 Oct 2024 20:41:24 +0300 Subject: [PATCH] [YQ-3621] support AFTER MATCH SKIP PAST LAST ROW (#10597) --- .../yql/core/sql_types/match_recognize.h | 15 ++ .../comp_nodes/mkql_match_recognize.cpp | 179 +++--------------- .../comp_nodes/mkql_match_recognize_nfa.h | 7 +- .../comp_nodes/ut/mkql_match_recognize_ut.cpp | 4 +- .../yql/minikql/mkql_program_builder.cpp | 7 +- .../yql/minikql/mkql_program_builder.h | 3 +- .../yql/minikql/mkql_runtime_version.h | 2 +- .../common/mkql/yql_provider_mkql.cpp | 11 +- ydb/library/yql/sql/v1/match_recognize.cpp | 4 +- ydb/library/yql/sql/v1/match_recognize.h | 21 +- .../yql/sql/v1/sql_match_recognize.cpp | 32 ++-- ydb/library/yql/sql/v1/sql_match_recognize.h | 2 +- .../sql/dq_file/part5/canondata/result.json | 22 +++ .../hybrid_file/part4/canondata/result.json | 14 ++ .../tests/sql/sql2yql/canondata/result.json | 38 ++-- .../after_match_skip_past_last_row.sql | 19 ++ .../match_recognize/alerts-streaming.sql | 1 + .../sql/suites/match_recognize/alerts.sql | 1 + .../match_recognize/alerts_without_order.sql | 1 + .../sql/suites/match_recognize/permute.sql | 1 + .../part5/canondata/result.json | 21 ++ 21 files changed, 199 insertions(+), 206 deletions(-) create mode 100644 ydb/library/yql/tests/sql/suites/match_recognize/after_match_skip_past_last_row.sql diff --git a/ydb/library/yql/core/sql_types/match_recognize.h b/ydb/library/yql/core/sql_types/match_recognize.h index e30e11eb2131..0c6105ad9413 100644 --- a/ydb/library/yql/core/sql_types/match_recognize.h +++ b/ydb/library/yql/core/sql_types/match_recognize.h @@ -8,6 +8,21 @@ namespace NYql::NMatchRecognize { +enum class EAfterMatchSkipTo { + NextRow, + PastLastRow, + ToFirst, + ToLast, + To +}; + +struct TAfterMatchSkipTo { + EAfterMatchSkipTo To; + TString Var; + + [[nodiscard]] bool operator==(const TAfterMatchSkipTo&) const noexcept = default; +}; + constexpr size_t MaxPatternNesting = 20; //Limit recursion for patterns constexpr size_t MaxPermutedItems = 6; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp index 7530cf06df25..772bd6cbd0f7 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp @@ -39,131 +39,13 @@ struct TMatchRecognizeProcessorParameters { TMeasureInputColumnOrder MeasureInputColumnOrder; TComputationNodePtrVector Measures; TOutputColumnOrder OutputColumnOrder; -}; - -class TBackTrackingMatchRecognize { - using TPartitionList = TSimpleList; - using TRange = TPartitionList::TRange; - using TMatchedVars = TMatchedVars; -public: - //TODO(YQL-16486): create a tree for backtracking(replace var names with indexes) - - struct TPatternConfiguration { - void Save(TMrOutputSerializer& /*serializer*/) const { - } - - void Load(TMrInputSerializer& /*serializer*/) { - } - - friend bool operator==(const TPatternConfiguration&, const TPatternConfiguration&) { - return true; - } - }; - - struct TPatternConfigurationBuilder { - using TPatternConfigurationPtr = std::shared_ptr; - static TPatternConfigurationPtr Create(const TRowPattern& pattern, const THashMap& varNameToIndex) { - Y_UNUSED(pattern); - Y_UNUSED(varNameToIndex); - return std::make_shared(); - } - }; - - TBackTrackingMatchRecognize( - NUdf::TUnboxedValue&& partitionKey, - const TMatchRecognizeProcessorParameters& parameters, - const TPatternConfigurationBuilder::TPatternConfigurationPtr pattern, - const TContainerCacheOnContext& cache - ) - : PartitionKey(std::move(partitionKey)) - , Parameters(parameters) - , Cache(cache) - , CurMatchedVars(parameters.Defines.size()) - , MatchNumber(0) - { - //TODO(YQL-16486) - Y_UNUSED(pattern); - } - - bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) { - Y_UNUSED(ctx); - Rows.Append(std::move(row)); - return false; - } - NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) { - if (Matches.empty()) - return NUdf::TUnboxedValue{}; - Parameters.MatchedVarsArg->SetValue(ctx, ToValue(ctx.HolderFactory, std::move(Matches.front()))); - Matches.pop_front(); - Parameters.MeasureInputDataArg->SetValue(ctx, ctx.HolderFactory.Create( - Parameters.InputDataArg->GetValue(ctx), - Parameters.MeasureInputColumnOrder, - Parameters.MatchedVarsArg->GetValue(ctx), - Parameters.VarNames, - ++MatchNumber - )); - NUdf::TUnboxedValue *itemsPtr = nullptr; - const auto result = Cache.NewArray(ctx, Parameters.OutputColumnOrder.size(), itemsPtr); - for (auto const& c: Parameters.OutputColumnOrder) { - switch(c.first) { - case EOutputColumnSource::Measure: - *itemsPtr++ = Parameters.Measures[c.second]->GetValue(ctx); - break; - case EOutputColumnSource::PartitionKey: - *itemsPtr++ = PartitionKey.GetElement(c.second); - break; - } - } - return result; - } - bool ProcessEndOfData(TComputationContext& ctx) { - //Assume, that data moved to IComputationExternalNode node, will not be modified or released - //till the end of the current function - auto rowsSize = Rows.Size(); - Parameters.InputDataArg->SetValue(ctx, ctx.HolderFactory.Create>(Rows)); - for (size_t i = 0; i != rowsSize; ++i) { - Parameters.CurrentRowIndexArg->SetValue(ctx, NUdf::TUnboxedValuePod(static_cast(i))); - for (size_t v = 0; v != Parameters.Defines.size(); ++v) { - const auto &d = Parameters.Defines[v]->GetValue(ctx); - if (d && d.GetOptionalValue().Get()) { - Extend(CurMatchedVars[v], TRange{i}); - } - } - //for the sake of dummy usage assume non-overlapped matches at every 5th row of any partition - if (i % 5 == 0) { - TMatchedVars temp; - temp.swap(CurMatchedVars); - Matches.emplace_back(std::move(temp)); - CurMatchedVars.resize(Parameters.Defines.size()); - } - } - return not Matches.empty(); - } - - void Save(TOutputSerializer& /*serializer*/) const { - // Not used in not streaming mode. - } - - void Load(TMrInputSerializer& /*serializer*/) { - // Not used in not streaming mode. - } - -private: - const NUdf::TUnboxedValue PartitionKey; - const TMatchRecognizeProcessorParameters& Parameters; - const TContainerCacheOnContext& Cache; - TSimpleList Rows; - TMatchedVars CurMatchedVars; - std::deque> Matches; - ui64 MatchNumber; + TAfterMatchSkipTo SkipTo; }; class TStreamingMatchRecognize { using TPartitionList = TSparseList; using TRange = TPartitionList::TRange; public: - using TPatternConfiguration = TNfaTransitionGraph; - using TPatternConfigurationBuilder = TNfaTransitionGraphBuilder; TStreamingMatchRecognize( NUdf::TUnboxedValue&& partitionKey, const TMatchRecognizeProcessorParameters& parameters, @@ -213,6 +95,9 @@ class TStreamingMatchRecognize { break; } } + if (EAfterMatchSkipTo::PastLastRow == Parameters.SkipTo.To) { + Nfa.Clear(); + } return result; } bool ProcessEndOfData(TComputationContext& ctx) { @@ -243,11 +128,9 @@ class TStreamingMatchRecognize { ui64 MatchNumber = 0; }; -template class TStateForNonInterleavedPartitions - : public TComputationValue> + : public TComputationValue { - using TRowPatternConfigurationBuilder = typename Algo::TPatternConfigurationBuilder; public: TStateForNonInterleavedPartitions( TMemoryUsageInfo* memInfo, @@ -265,7 +148,7 @@ class TStateForNonInterleavedPartitions , PartitionKey(partitionKey) , PartitionKeyPacker(true, partitionKeyType) , Parameters(parameters) - , RowPatternConfiguration(TRowPatternConfigurationBuilder::Create(parameters.Pattern, parameters.VarNamesLookup)) + , RowPatternConfiguration(TNfaTransitionGraphBuilder::Create(parameters.Pattern, parameters.VarNamesLookup)) , Cache(cache) , Terminating(false) , SerializerContext(ctx, rowType, rowPacker) @@ -301,7 +184,7 @@ class TStateForNonInterleavedPartitions bool validPartitionHandler = in.Read(); if (validPartitionHandler) { NUdf::TUnboxedValue key = PartitionKeyPacker.Unpack(CurPartitionPackedKey, SerializerContext.Ctx.HolderFactory); - PartitionHandler.reset(new Algo( + PartitionHandler.reset(new TStreamingMatchRecognize( std::move(key), Parameters, RowPatternConfiguration, @@ -313,7 +196,7 @@ class TStateForNonInterleavedPartitions if (validDelayedRow) { in(DelayedRow); } - auto restoredRowPatternConfiguration = std::make_shared(); + auto restoredRowPatternConfiguration = std::make_shared(); restoredRowPatternConfiguration->Load(in); MKQL_ENSURE(*restoredRowPatternConfiguration == *RowPatternConfiguration, "Restored and current RowPatternConfiguration is different"); MKQL_ENSURE(in.Empty(), "State is corrupted"); @@ -367,12 +250,11 @@ class TStateForNonInterleavedPartitions InputRowArg->SetValue(ctx, NUdf::TUnboxedValue(temp)); auto partitionKey = PartitionKey->GetValue(ctx); CurPartitionPackedKey = PartitionKeyPacker.Pack(partitionKey); - PartitionHandler.reset(new Algo( + PartitionHandler.reset(new TStreamingMatchRecognize( std::move(partitionKey), Parameters, RowPatternConfiguration, - Cache - )); + Cache)); PartitionHandler->ProcessInputRow(std::move(temp), ctx); } if (Terminating) { @@ -382,12 +264,12 @@ class TStateForNonInterleavedPartitions } private: TString CurPartitionPackedKey; - std::unique_ptr PartitionHandler; + std::unique_ptr PartitionHandler; IComputationExternalNode* InputRowArg; IComputationNode* PartitionKey; TValuePackerGeneric PartitionKeyPacker; const TMatchRecognizeProcessorParameters& Parameters; - const typename TRowPatternConfigurationBuilder::TPatternConfigurationPtr RowPatternConfiguration; + const TNfaTransitionGraph::TPtr RowPatternConfiguration; const TContainerCacheOnContext& Cache; NUdf::TUnboxedValue DelayedRow; bool Terminating; @@ -768,6 +650,11 @@ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputation defines.push_back(callable.GetInput(inputIndex++)); } const auto& streamingMode = callable.GetInput(inputIndex++); + NYql::NMatchRecognize::TAfterMatchSkipTo skipTo = {NYql::NMatchRecognize::EAfterMatchSkipTo::NextRow, ""}; + if (inputIndex + 2 <= callable.GetInputsCount()) { + skipTo.To = static_cast(AS_VALUE(TDataLiteral, callable.GetInput(inputIndex++))->AsValue().Get()); + skipTo.Var = AS_VALUE(TDataLiteral, callable.GetInput(inputIndex++))->AsValue().AsStringRef(); + } MKQL_ENSURE(callable.GetInputsCount() == inputIndex, "Wrong input count"); const auto& [vars, varsLookup] = ConvertListOfStrings(varNames); @@ -788,6 +675,7 @@ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputation ) , ConvertVectorOfCallables(measures, ctx) , GetOutputColumnOrder(partitionColumnIndexes, measureColumnIndexes) + , skipTo }; if (AS_VALUE(TDataLiteral, streamingMode)->AsValue().Get()) { return new TMatchRecognizeWrapper(ctx.Mutables @@ -800,28 +688,15 @@ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputation , rowType ); } else { - const bool useNfaForTables = true; //TODO(YQL-16486) get this flag from an optimizer - if (useNfaForTables) { - return new TMatchRecognizeWrapper>(ctx.Mutables - , GetValueRepresentation(inputFlow.GetStaticType()) - , LocateNode(ctx.NodeLocator, *inputFlow.GetNode()) - , static_cast(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode())) - , LocateNode(ctx.NodeLocator, *partitionKeySelector.GetNode()) - , partitionKeySelector.GetStaticType() - , std::move(parameters) - , rowType - ); - } else { - return new TMatchRecognizeWrapper>(ctx.Mutables - , GetValueRepresentation(inputFlow.GetStaticType()) - , LocateNode(ctx.NodeLocator, *inputFlow.GetNode()) - , static_cast(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode())) - , LocateNode(ctx.NodeLocator, *partitionKeySelector.GetNode()) - , partitionKeySelector.GetStaticType() - , std::move(parameters) - , rowType - ); - } + return new TMatchRecognizeWrapper(ctx.Mutables + , GetValueRepresentation(inputFlow.GetStaticType()) + , LocateNode(ctx.NodeLocator, *inputFlow.GetNode()) + , static_cast(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode())) + , LocateNode(ctx.NodeLocator, *partitionKeySelector.GetNode()) + , partitionKeySelector.GetStaticType() + , std::move(parameters) + , rowType + ); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize_nfa.h b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize_nfa.h index 77df6f1f66f5..398047e33c43 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize_nfa.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize_nfa.h @@ -283,8 +283,7 @@ class TNfaTransitionGraphBuilder { return {input, output}; } public: - using TPatternConfigurationPtr = TNfaTransitionGraph::TPtr; - static TPatternConfigurationPtr Create(const TRowPattern& pattern, const THashMap& varNameToIndex) { + static TNfaTransitionGraph::TPtr Create(const TRowPattern& pattern, const THashMap& varNameToIndex) { auto result = std::make_shared(); TNfaTransitionGraphBuilder builder(result); auto item = builder.BuildTerms(pattern, varNameToIndex); @@ -455,6 +454,10 @@ class TNfa { serializer.Read(EpsilonTransitionsLastRow); } + void Clear() { + ActiveStates.clear(); + } + private: //TODO (zverevgeny): Consider to change to std::vector for the sake of perf using TStateSet = std::set, TMKQLAllocator>; diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_ut.cpp index 08a675535d08..387762e9bd5e 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_ut.cpp @@ -115,7 +115,9 @@ namespace NKikimr { {NYql::NMatchRecognize::TRowPatternFactor{"A", 3, 3, false, false, false}} }, getDefines, - streamingMode); + streamingMode, + {NYql::NMatchRecognize::EAfterMatchSkipTo::NextRow, ""} + ); auto graph = setup.BuildGraph(pgmReturn); return graph; diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 2272ae640b3e..dccdf7ad5ff7 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -5902,7 +5902,8 @@ TRuntimeNode TProgramBuilder::MatchRecognizeCore( const TArrayRef>& getMeasures, const NYql::NMatchRecognize::TRowPattern& pattern, const TArrayRef>& getDefines, - bool streamingMode + bool streamingMode, + const NYql::NMatchRecognize::TAfterMatchSkipTo& skipTo ) { MKQL_ENSURE(RuntimeVersion >= 42, "MatchRecognize is not supported in runtime version " << RuntimeVersion); @@ -6056,6 +6057,10 @@ TRuntimeNode TProgramBuilder::MatchRecognizeCore( callableBuilder.Add(d); } callableBuilder.Add(NewDataLiteral(streamingMode)); + if (RuntimeVersion >= 52U) { + callableBuilder.Add(NewDataLiteral(static_cast(skipTo.To))); + callableBuilder.Add(NewDataLiteral(skipTo.Var)); + } return TRuntimeNode(callableBuilder.Build(), false); } diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 97daf163c586..34fe726a4f83 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -696,7 +696,8 @@ class TProgramBuilder : public TTypeBuilder { const TArrayRef>& getMeasures, const NYql::NMatchRecognize::TRowPattern& pattern, const TArrayRef>& getDefines, - bool streamingMode + bool streamingMode, + const NYql::NMatchRecognize::TAfterMatchSkipTo& skipTo ); TRuntimeNode TimeOrderRecover( diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h index 22072157e87f..bfd26216ab87 100644 --- a/ydb/library/yql/minikql/mkql_runtime_version.h +++ b/ydb/library/yql/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 50U +#define MKQL_RUNTIME_VERSION 52U #endif // History: diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index f72f6f32e1bb..39c3383607f5 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -19,6 +19,7 @@ #include #include +#include #include @@ -874,6 +875,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() { //explore params const auto& measures = params->ChildRef(0); + const auto& skipTo = params->ChildRef(2); const auto& pattern = params->ChildRef(3); const auto& defines = params->ChildRef(4); @@ -916,6 +918,12 @@ TMkqlCommonCallableCompiler::TShared::TShared() { }; } + auto stringTo = skipTo->Child(0)->Content(); + auto var = skipTo->Child(1)->Content(); + MKQL_ENSURE(stringTo.SkipPrefix("AfterMatchSkip_"), R"(MATCH_RECOGNIZE: should start with "AfterMatchSkip_")"); + NYql::NMatchRecognize::EAfterMatchSkipTo to; + MKQL_ENSURE(TryFromString(stringTo, to), "MATCH_RECOGNIZE: cannot parse AfterMatchSkipTo mode"); + const auto streamingMode = FromString(settings->Child(0)->Child(1)->Content()); return ctx.ProgramBuilder.MatchRecognizeCore( @@ -925,7 +933,8 @@ TMkqlCommonCallableCompiler::TShared::TShared() { getMeasures, NYql::NMatchRecognize::ConvertPattern(pattern, ctx.ExprCtx), getDefines, - streamingMode + streamingMode, + NYql::NMatchRecognize::TAfterMatchSkipTo{to, TString{var}} ); }); diff --git a/ydb/library/yql/sql/v1/match_recognize.cpp b/ydb/library/yql/sql/v1/match_recognize.cpp index 284654b097f8..47055e2f3d7b 100644 --- a/ydb/library/yql/sql/v1/match_recognize.cpp +++ b/ydb/library/yql/sql/v1/match_recognize.cpp @@ -22,7 +22,7 @@ class TMatchRecognize: public TAstListNode { std::pair>&& sortSpecs, std::pair>&& measures, std::pair&& rowsPerMatch, - std::pair&& skipTo, + std::pair&& skipTo, std::pair&& pattern, std::pair&& subset, std::pair>&& definitions @@ -57,7 +57,7 @@ class TMatchRecognize: public TAstListNode { std::pair>&& sortSpecs, std::pair>&& measures, std::pair&& rowsPerMatch, - std::pair&& skipTo, + std::pair&& skipTo, std::pair&& pattern, std::pair&& subset, std::pair>&& definitions diff --git a/ydb/library/yql/sql/v1/match_recognize.h b/ydb/library/yql/sql/v1/match_recognize.h index 5b64de823193..7945e86e5b8e 100644 --- a/ydb/library/yql/sql/v1/match_recognize.h +++ b/ydb/library/yql/sql/v1/match_recognize.h @@ -15,23 +15,6 @@ enum class ERowsPerMatch { AllRows }; -enum class EAfterMatchSkipTo { - NextRow, - PastLastRow, - ToFirst, - ToLast, - To -}; - -struct TAfterMatchSkipTo { - TAfterMatchSkipTo(EAfterMatchSkipTo to, const TStringBuf var = TStringBuf()) - : To(to) - , Var(var) - {} - EAfterMatchSkipTo To; - TString Var; -}; - class TMatchRecognizeBuilder: public TSimpleRefCount { public: TMatchRecognizeBuilder( @@ -40,7 +23,7 @@ class TMatchRecognizeBuilder: public TSimpleRefCount { std::pair>&& sortSpecs, std::pair>&& measures, std::pair&& rowsPerMatch, - std::pair&& skipTo, + std::pair&& skipTo, std::pair&& pattern, std::pair&& subset, std::pair>&& definitions @@ -63,7 +46,7 @@ class TMatchRecognizeBuilder: public TSimpleRefCount { std::pair> SortSpecs; std::pair> Measures; std::pair RowsPerMatch; - std::pair SkipTo; + std::pair SkipTo; std::pair Pattern; std::pair Subset; std::pair> Definitions; diff --git a/ydb/library/yql/sql/v1/sql_match_recognize.cpp b/ydb/library/yql/sql/v1/sql_match_recognize.cpp index c168f60bd494..5a2ff31c5fa0 100644 --- a/ydb/library/yql/sql/v1/sql_match_recognize.cpp +++ b/ydb/library/yql/sql/v1/sql_match_recognize.cpp @@ -78,13 +78,19 @@ TMatchRecognizeBuilderPtr TSqlMatchRecognizeClause::CreateBuilder(const NSQLv1Ge //this block is located before pattern block in grammar, // but depends on it, so it is processed after pattern block - std::pair skipTo { pos, TAfterMatchSkipTo{EAfterMatchSkipTo::NextRow, TString()} }; + std::pair skipTo { + pos, + NYql::NMatchRecognize::TAfterMatchSkipTo{ + NYql::NMatchRecognize::EAfterMatchSkipTo::PastLastRow, + TString() + } + }; if (commonSyntax.HasBlock1()){ skipTo = ParseAfterMatchSkipTo(commonSyntax.GetBlock1().GetRule_row_pattern_skip_to3()); const auto varRequired = - EAfterMatchSkipTo::ToFirst == skipTo.second.To || - EAfterMatchSkipTo::ToLast == skipTo.second.To || - EAfterMatchSkipTo::To == skipTo.second.To; + NYql::NMatchRecognize::EAfterMatchSkipTo::ToFirst == skipTo.second.To || + NYql::NMatchRecognize::EAfterMatchSkipTo::ToLast == skipTo.second.To || + NYql::NMatchRecognize::EAfterMatchSkipTo::To == skipTo.second.To; if (varRequired) { const auto& allVars = NYql::NMatchRecognize::GetPatternVars(pattern); if (allVars.find(skipTo.second.Var) == allVars.cend()) { @@ -186,39 +192,39 @@ std::pair TSqlMatchRecognizeClause::ParseRowsPerMatch( } } -std::pair TSqlMatchRecognizeClause::ParseAfterMatchSkipTo(const TRule_row_pattern_skip_to& skipToClause) { +std::pair TSqlMatchRecognizeClause::ParseAfterMatchSkipTo(const TRule_row_pattern_skip_to& skipToClause) { switch (skipToClause.GetAltCase()) { case TRule_row_pattern_skip_to::kAltRowPatternSkipTo1: return std::pair{ TokenPosition(skipToClause.GetAlt_row_pattern_skip_to1().GetToken1()), - TAfterMatchSkipTo{EAfterMatchSkipTo::NextRow} + NYql::NMatchRecognize::TAfterMatchSkipTo{NYql::NMatchRecognize::EAfterMatchSkipTo::NextRow, ""} }; case TRule_row_pattern_skip_to::kAltRowPatternSkipTo2: return std::pair{ TokenPosition(skipToClause.GetAlt_row_pattern_skip_to2().GetToken1()), - TAfterMatchSkipTo{EAfterMatchSkipTo::PastLastRow} + NYql::NMatchRecognize::TAfterMatchSkipTo{NYql::NMatchRecognize::EAfterMatchSkipTo::PastLastRow, ""} }; case TRule_row_pattern_skip_to::kAltRowPatternSkipTo3: return std::pair{ TokenPosition(skipToClause.GetAlt_row_pattern_skip_to3().GetToken1()), - TAfterMatchSkipTo{ - EAfterMatchSkipTo::ToFirst, + NYql::NMatchRecognize::TAfterMatchSkipTo{ + NYql::NMatchRecognize::EAfterMatchSkipTo::ToFirst, skipToClause.GetAlt_row_pattern_skip_to3().GetRule_row_pattern_skip_to_variable_name4().GetRule_row_pattern_variable_name1().GetRule_identifier1().GetToken1().GetValue() } }; case TRule_row_pattern_skip_to::kAltRowPatternSkipTo4: return std::pair{ TokenPosition(skipToClause.GetAlt_row_pattern_skip_to4().GetToken1()), - TAfterMatchSkipTo{ - EAfterMatchSkipTo::ToLast, + NYql::NMatchRecognize::TAfterMatchSkipTo{ + NYql::NMatchRecognize::EAfterMatchSkipTo::ToLast, skipToClause.GetAlt_row_pattern_skip_to4().GetRule_row_pattern_skip_to_variable_name4().GetRule_row_pattern_variable_name1().GetRule_identifier1().GetToken1().GetValue() } }; case TRule_row_pattern_skip_to::kAltRowPatternSkipTo5: return std::pair{ TokenPosition(skipToClause.GetAlt_row_pattern_skip_to5().GetToken1()), - TAfterMatchSkipTo{ - EAfterMatchSkipTo::To, + NYql::NMatchRecognize::TAfterMatchSkipTo{ + NYql::NMatchRecognize::EAfterMatchSkipTo::To, skipToClause.GetAlt_row_pattern_skip_to5().GetRule_row_pattern_skip_to_variable_name3().GetRule_row_pattern_variable_name1().GetRule_identifier1().GetToken1().GetValue() } }; diff --git a/ydb/library/yql/sql/v1/sql_match_recognize.h b/ydb/library/yql/sql/v1/sql_match_recognize.h index d8d618920aa4..6766acc95375 100644 --- a/ydb/library/yql/sql/v1/sql_match_recognize.h +++ b/ydb/library/yql/sql/v1/sql_match_recognize.h @@ -18,7 +18,7 @@ class TSqlMatchRecognizeClause: public TSqlTranslation { TNamedFunction ParseOneMeasure(const TRule_row_pattern_measure_definition& node); TVector ParseMeasures(const TRule_row_pattern_measure_list& node); std::pair ParseRowsPerMatch(const TRule_row_pattern_rows_per_match& rowsPerMatchClause); - std::pair ParseAfterMatchSkipTo(const TRule_row_pattern_skip_to& skipToClause); + std::pair ParseAfterMatchSkipTo(const TRule_row_pattern_skip_to& skipToClause); NYql::NMatchRecognize::TRowPatternTerm ParsePatternTerm(const TRule_row_pattern_term& node); NYql::NMatchRecognize::TRowPattern ParsePattern(const TRule_row_pattern& node); TNamedFunction ParseOneDefinition(const TRule_row_pattern_definition& node); diff --git a/ydb/library/yql/tests/sql/dq_file/part5/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part5/canondata/result.json index c9bce0e6d28c..5d4c70abf59d 100644 --- a/ydb/library/yql/tests/sql/dq_file/part5/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part5/canondata/result.json @@ -2039,6 +2039,28 @@ } ], "test.test[key_filter-utf8_with_legacy--Results]": [], + "test.test[match_recognize-after_match_skip_past_last_row-default.txt-Analyze]": [ + { + "checksum": "b4dd508a329723c74293d80f0278c705", + "size": 505, + "uri": "https://{canondata_backend}/1031349/a955c852651ea9f8124bef13bd770d8d15af6c2e/resource.tar.gz#test.test_match_recognize-after_match_skip_past_last_row-default.txt-Analyze_/plan.txt" + } + ], + "test.test[match_recognize-after_match_skip_past_last_row-default.txt-Debug]": [ + { + "checksum": "7911a3e7570665753b1e25827635db15", + "size": 1317, + "uri": "https://{canondata_backend}/1031349/a955c852651ea9f8124bef13bd770d8d15af6c2e/resource.tar.gz#test.test_match_recognize-after_match_skip_past_last_row-default.txt-Debug_/opt.yql_patched" + } + ], + "test.test[match_recognize-after_match_skip_past_last_row-default.txt-Plan]": [ + { + "checksum": "b4dd508a329723c74293d80f0278c705", + "size": 505, + "uri": "https://{canondata_backend}/1031349/a955c852651ea9f8124bef13bd770d8d15af6c2e/resource.tar.gz#test.test_match_recognize-after_match_skip_past_last_row-default.txt-Plan_/plan.txt" + } + ], + "test.test[match_recognize-after_match_skip_past_last_row-default.txt-Results]": [], "test.test[optimizers-test_lmap_opts--Analyze]": [ { "checksum": "a019f0e33bc55441f2581dc8345a6b9e", diff --git a/ydb/library/yql/tests/sql/hybrid_file/part4/canondata/result.json b/ydb/library/yql/tests/sql/hybrid_file/part4/canondata/result.json index ffa1661c069e..c810d141dc3f 100644 --- a/ydb/library/yql/tests/sql/hybrid_file/part4/canondata/result.json +++ b/ydb/library/yql/tests/sql/hybrid_file/part4/canondata/result.json @@ -1665,6 +1665,20 @@ "uri": "https://{canondata_backend}/1931696/8382830b676a61af36d1344910d51cd1bf39f3ef/resource.tar.gz#test.test_limit-empty_sort_calc_after_limit-default.txt-Plan_/plan.txt" } ], + "test.test[match_recognize-after_match_skip_past_last_row-default.txt-Debug]": [ + { + "checksum": "e6a51f8c3ed77a2c4dfdf2e55ec4517d", + "size": 1316, + "uri": "https://{canondata_backend}/1880306/5213fbc312a45950f1152a68258af55d6e4976a2/resource.tar.gz#test.test_match_recognize-after_match_skip_past_last_row-default.txt-Debug_/opt.yql_patched" + } + ], + "test.test[match_recognize-after_match_skip_past_last_row-default.txt-Plan]": [ + { + "checksum": "b4dd508a329723c74293d80f0278c705", + "size": 505, + "uri": "https://{canondata_backend}/1880306/5213fbc312a45950f1152a68258af55d6e4976a2/resource.tar.gz#test.test_match_recognize-after_match_skip_past_last_row-default.txt-Plan_/plan.txt" + } + ], "test.test[optimizers-nonselected_direct_row--Debug]": [ { "checksum": "8b5d70cb31c105309d443a31d1188534", diff --git a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json index 4b4262ce670a..47fe933e5280 100644 --- a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json +++ b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json @@ -10758,6 +10758,13 @@ "uri": "https://{canondata_backend}/1599023/66d0b07d601bb15f0e0b65bb7b4d493f89c1c283/resource.tar.gz#test_sql2yql.test_lineage-with_inline_/sql.yql" } ], + "test_sql2yql.test[match_recognize-after_match_skip_past_last_row]": [ + { + "checksum": "bb84286a97914c6cfd2e47288a49335e", + "size": 3122, + "uri": "https://{canondata_backend}/1130705/ab8dea65d8ef4022fe05bde8ae56bb987e245f04/resource.tar.gz#test_sql2yql.test_match_recognize-after_match_skip_past_last_row_/sql.yql" + } + ], "test_sql2yql.test[match_recognize-alerts-streaming]": [ { "checksum": "608ebe5a00413e80b8e74157944f0b65", @@ -30008,32 +30015,39 @@ "uri": "https://{canondata_backend}/1599023/66d0b07d601bb15f0e0b65bb7b4d493f89c1c283/resource.tar.gz#test_sql_format.test_lineage-with_inline_/formatted.sql" } ], + "test_sql_format.test[match_recognize-after_match_skip_past_last_row]": [ + { + "checksum": "100b9f9651315a432e18868c21b776d6", + "size": 412, + "uri": "https://{canondata_backend}/1130705/ab8dea65d8ef4022fe05bde8ae56bb987e245f04/resource.tar.gz#test_sql_format.test_match_recognize-after_match_skip_past_last_row_/formatted.sql" + } + ], "test_sql_format.test[match_recognize-alerts-streaming]": [ { - "checksum": "b8aa97680d42faf26e093c2a3ccb05f1", - "size": 2939, - "uri": "https://{canondata_backend}/1937001/da4215d5087e56eec0224ec5e7754dafd0b2bdcf/resource.tar.gz#test_sql_format.test_match_recognize-alerts-streaming_/formatted.sql" + "checksum": "d3a3fd90c8a6a758f0067dd66566d37a", + "size": 2968, + "uri": "https://{canondata_backend}/1130705/ab8dea65d8ef4022fe05bde8ae56bb987e245f04/resource.tar.gz#test_sql_format.test_match_recognize-alerts-streaming_/formatted.sql" } ], "test_sql_format.test[match_recognize-alerts]": [ { - "checksum": "585357811c1f0240f4c3207baf8d66f3", - "size": 2941, - "uri": "https://{canondata_backend}/1937001/da4215d5087e56eec0224ec5e7754dafd0b2bdcf/resource.tar.gz#test_sql_format.test_match_recognize-alerts_/formatted.sql" + "checksum": "26acb44218b8f1112df875867fe530ef", + "size": 2970, + "uri": "https://{canondata_backend}/1130705/ab8dea65d8ef4022fe05bde8ae56bb987e245f04/resource.tar.gz#test_sql_format.test_match_recognize-alerts_/formatted.sql" } ], "test_sql_format.test[match_recognize-alerts_without_order]": [ { - "checksum": "779c2c3a4eab619646509ce5008863e8", - "size": 2906, - "uri": "https://{canondata_backend}/1937001/f1ec239726ab3e2cf00695f3d10461ff9ef6c3b0/resource.tar.gz#test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql" + "checksum": "0e6e55207b31bb4597a16821c0b3ac34", + "size": 2935, + "uri": "https://{canondata_backend}/1130705/ab8dea65d8ef4022fe05bde8ae56bb987e245f04/resource.tar.gz#test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql" } ], "test_sql_format.test[match_recognize-permute]": [ { - "checksum": "998e6752ce413cc78e952b9958dfab74", - "size": 721, - "uri": "https://{canondata_backend}/1600758/90e7657ff4d9210d12f860921bc22e4e3c794cc5/resource.tar.gz#test_sql_format.test_match_recognize-permute_/formatted.sql" + "checksum": "97960de85a125f078b142f62ebfe938e", + "size": 750, + "uri": "https://{canondata_backend}/1130705/ab8dea65d8ef4022fe05bde8ae56bb987e245f04/resource.tar.gz#test_sql_format.test_match_recognize-permute_/formatted.sql" } ], "test_sql_format.test[match_recognize-simple_paritioning-streaming]": [ diff --git a/ydb/library/yql/tests/sql/suites/match_recognize/after_match_skip_past_last_row.sql b/ydb/library/yql/tests/sql/suites/match_recognize/after_match_skip_past_last_row.sql new file mode 100644 index 000000000000..79d9c7622000 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/match_recognize/after_match_skip_past_last_row.sql @@ -0,0 +1,19 @@ +pragma FeatureR010="prototype"; +pragma config.flags("MatchRecognizeStream", "disable"); + +$input = SELECT * FROM AS_TABLE([ + <|time:0|>, + <|time:1|>, + <|time:2|>, + <|time:3|>, +]); + +SELECT * FROM $input MATCH_RECOGNIZE( + ORDER BY CAST(time as Timestamp) + MEASURES + FIRST(X.time) as first_time, + LAST(X.time) as last_time + PATTERN (X{2}) + DEFINE + X as True +); diff --git a/ydb/library/yql/tests/sql/suites/match_recognize/alerts-streaming.sql b/ydb/library/yql/tests/sql/suites/match_recognize/alerts-streaming.sql index 5a282ca647ac..efa5bef17ae4 100644 --- a/ydb/library/yql/tests/sql/suites/match_recognize/alerts-streaming.sql +++ b/ydb/library/yql/tests/sql/suites/match_recognize/alerts-streaming.sql @@ -29,6 +29,7 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( LAST(LOGIN_SUCCESS_SAME_USER.user) as brutforce_login ONE ROW PER MATCH + AFTER MATCH SKIP TO NEXT ROW PATTERN ( LOGIN_SUCCESS_REMOTE ANY_ROW* (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) | (LOGIN_FAILED_SAME_USER ANY_ROW*){2,} LOGIN_SUCCESS_SAME_USER diff --git a/ydb/library/yql/tests/sql/suites/match_recognize/alerts.sql b/ydb/library/yql/tests/sql/suites/match_recognize/alerts.sql index dc5d70ddbd76..65aac91efdf3 100644 --- a/ydb/library/yql/tests/sql/suites/match_recognize/alerts.sql +++ b/ydb/library/yql/tests/sql/suites/match_recognize/alerts.sql @@ -29,6 +29,7 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( LAST(LOGIN_SUCCESS_SAME_USER.user) as brutforce_login ONE ROW PER MATCH + AFTER MATCH SKIP TO NEXT ROW PATTERN ( LOGIN_SUCCESS_REMOTE ANY_ROW* (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) | (LOGIN_FAILED_SAME_USER ANY_ROW*){2,} LOGIN_SUCCESS_SAME_USER diff --git a/ydb/library/yql/tests/sql/suites/match_recognize/alerts_without_order.sql b/ydb/library/yql/tests/sql/suites/match_recognize/alerts_without_order.sql index 7d92f0f18c7b..4773e16588f7 100644 --- a/ydb/library/yql/tests/sql/suites/match_recognize/alerts_without_order.sql +++ b/ydb/library/yql/tests/sql/suites/match_recognize/alerts_without_order.sql @@ -28,6 +28,7 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( LAST(LOGIN_SUCCESS_SAME_USER.user) as brutforce_login ONE ROW PER MATCH + AFTER MATCH SKIP TO NEXT ROW PATTERN ( LOGIN_SUCCESS_REMOTE ANY_ROW* (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) | (LOGIN_FAILED_SAME_USER ANY_ROW*){2,} LOGIN_SUCCESS_SAME_USER diff --git a/ydb/library/yql/tests/sql/suites/match_recognize/permute.sql b/ydb/library/yql/tests/sql/suites/match_recognize/permute.sql index ff48795cf791..614f5b77e0c7 100644 --- a/ydb/library/yql/tests/sql/suites/match_recognize/permute.sql +++ b/ydb/library/yql/tests/sql/suites/match_recognize/permute.sql @@ -22,6 +22,7 @@ FROM AS_TABLE($data) MATCH_RECOGNIZE( FIRST(B.dt) as b, FIRST(C.dt) as c ONE ROW PER MATCH + AFTER MATCH SKIP TO NEXT ROW PATTERN ( PERMUTE(A, B, C) ) diff --git a/ydb/library/yql/tests/sql/yt_native_file/part5/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part5/canondata/result.json index 1dd48fcb5e40..21197f390700 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part5/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part5/canondata/result.json @@ -2052,6 +2052,27 @@ "uri": "https://{canondata_backend}/1871182/1ba48914c21beb3df20272c8218b20981a428432/resource.tar.gz#test.test_lineage-window_tablerow-default.txt-Results_/Output.yqlrun.txt.attr" } ], + "test.test[match_recognize-after_match_skip_past_last_row-default.txt-Debug]": [ + { + "checksum": "53f5efe11cf530787e416b733c2b7f53", + "size": 1254, + "uri": "https://{canondata_backend}/1917492/b6d69ba0bdf3cfac8aa79db6bee1738a75d1edc4/resource.tar.gz#test.test_match_recognize-after_match_skip_past_last_row-default.txt-Debug_/opt.yql" + } + ], + "test.test[match_recognize-after_match_skip_past_last_row-default.txt-Plan]": [ + { + "checksum": "b4dd508a329723c74293d80f0278c705", + "size": 505, + "uri": "https://{canondata_backend}/1917492/b6d69ba0bdf3cfac8aa79db6bee1738a75d1edc4/resource.tar.gz#test.test_match_recognize-after_match_skip_past_last_row-default.txt-Plan_/plan.txt" + } + ], + "test.test[match_recognize-after_match_skip_past_last_row-default.txt-Results]": [ + { + "checksum": "6583532367519fd2d47e3f77357d5627", + "size": 1591, + "uri": "https://{canondata_backend}/1917492/b6d69ba0bdf3cfac8aa79db6bee1738a75d1edc4/resource.tar.gz#test.test_match_recognize-after_match_skip_past_last_row-default.txt-Results_/results.txt" + } + ], "test.test[optimizers-test_lmap_opts--Debug]": [ { "checksum": "e0c91cd592cbfa870bdc55a48f225229",