From f95228d9303ddd054023479a1ecc1c9d8b81a3a2 Mon Sep 17 00:00:00 2001 From: Igor Munkin Date: Wed, 29 May 2024 06:50:45 +0000 Subject: [PATCH 1/2] YQL-16402: Reimplement key extractor for Top node --- ydb/library/yql/minikql/mkql_program_builder.cpp | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 778aecf8e833..6c49f3af5392 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -1887,13 +1887,20 @@ TRuntimeNode TProgramBuilder::BuildWideTopOrSort(const std::string_view& callabl TRuntimeNode TProgramBuilder::Top(TRuntimeNode flow, TRuntimeNode count, TRuntimeNode ascending, const TUnaryLambda& keyExtractor) { if (const auto flowType = flow.GetStaticType(); flowType->IsFlow() || flowType->IsStream()) { + const TUnaryLambda getKey = [&](TRuntimeNode item) { return Nth(item, 0U); }; + const TUnaryLambda getItem = [&](TRuntimeNode item) { return Nth(item, 1U); }; + const TUnaryLambda cacheKeyExtractor = [&](TRuntimeNode item) { + return NewTuple({keyExtractor(item), item}); + }; - return FlatMap(Condense1(flow, + return FlatMap(Condense1(Map(flow, cacheKeyExtractor), [&](TRuntimeNode item) { return AsList(item); }, [this](TRuntimeNode, TRuntimeNode) { return NewDataLiteral(false); }, - [&](TRuntimeNode item, TRuntimeNode state) { return KeepTop(count, state, item, ascending, keyExtractor); } + [&](TRuntimeNode item, TRuntimeNode state) { + return KeepTop(count, state, item, ascending, getKey); + } ), - [&](TRuntimeNode list) { return Top(list, count, ascending, keyExtractor); } + [&](TRuntimeNode list) { return Map(Top(list, count, ascending, getKey), getItem); } ); } From ceecfe7974c8d904bf8ccffb2d4b2e19d585bb84 Mon Sep 17 00:00:00 2001 From: Igor Munkin Date: Wed, 29 May 2024 06:50:51 +0000 Subject: [PATCH 2/2] YQL-16402: Reimplement key extractor for TopSort node --- ydb/library/yql/minikql/mkql_program_builder.cpp | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 6c49f3af5392..7e4ad0cdb437 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -1909,14 +1909,20 @@ TRuntimeNode TProgramBuilder::Top(TRuntimeNode flow, TRuntimeNode count, TRuntim TRuntimeNode TProgramBuilder::TopSort(TRuntimeNode flow, TRuntimeNode count, TRuntimeNode ascending, const TUnaryLambda& keyExtractor) { if (const auto flowType = flow.GetStaticType(); flowType->IsFlow() || flowType->IsStream()) { - return FlatMap(Condense1(flow, + const TUnaryLambda getKey = [&](TRuntimeNode item) { return Nth(item, 0U); }; + const TUnaryLambda getItem = [&](TRuntimeNode item) { return Nth(item, 1U); }; + const TUnaryLambda cacheKeyExtractor = [&](TRuntimeNode item) { + return NewTuple({keyExtractor(item), item}); + }; + + return FlatMap(Condense1(Map(flow, cacheKeyExtractor), [&](TRuntimeNode item) { return AsList(item); }, [this](TRuntimeNode, TRuntimeNode) { return NewDataLiteral(false); }, [&](TRuntimeNode item, TRuntimeNode state) { - return KeepTop(count, state, item, ascending, keyExtractor); + return KeepTop(count, state, item, ascending, getKey); } ), - [&](TRuntimeNode list) { return TopSort(list, count, ascending, keyExtractor); } + [&](TRuntimeNode list) { return Map(TopSort(list, count, ascending, getKey), getItem); } ); }