Skip to content

Commit

Permalink
Merge ceecfe7 into 3d8e652
Browse files Browse the repository at this point in the history
  • Loading branch information
igormunkin authored May 30, 2024
2 parents 3d8e652 + ceecfe7 commit dd42bf3
Showing 1 changed file with 19 additions and 6 deletions.
25 changes: 19 additions & 6 deletions ydb/library/yql/minikql/mkql_program_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1887,13 +1887,20 @@ TRuntimeNode TProgramBuilder::BuildWideTopOrSort(const std::string_view& callabl

TRuntimeNode TProgramBuilder::Top(TRuntimeNode flow, TRuntimeNode count, TRuntimeNode ascending, const TUnaryLambda& keyExtractor) {
if (const auto flowType = flow.GetStaticType(); flowType->IsFlow() || flowType->IsStream()) {
const TUnaryLambda getKey = [&](TRuntimeNode item) { return Nth(item, 0U); };
const TUnaryLambda getItem = [&](TRuntimeNode item) { return Nth(item, 1U); };
const TUnaryLambda cacheKeyExtractor = [&](TRuntimeNode item) {
return NewTuple({keyExtractor(item), item});
};

return FlatMap(Condense1(flow,
return FlatMap(Condense1(Map(flow, cacheKeyExtractor),
[&](TRuntimeNode item) { return AsList(item); },
[this](TRuntimeNode, TRuntimeNode) { return NewDataLiteral<bool>(false); },
[&](TRuntimeNode item, TRuntimeNode state) { return KeepTop(count, state, item, ascending, keyExtractor); }
[&](TRuntimeNode item, TRuntimeNode state) {
return KeepTop(count, state, item, ascending, getKey);
}
),
[&](TRuntimeNode list) { return Top(list, count, ascending, keyExtractor); }
[&](TRuntimeNode list) { return Map(Top(list, count, ascending, getKey), getItem); }
);
}

Expand All @@ -1902,14 +1909,20 @@ TRuntimeNode TProgramBuilder::Top(TRuntimeNode flow, TRuntimeNode count, TRuntim

TRuntimeNode TProgramBuilder::TopSort(TRuntimeNode flow, TRuntimeNode count, TRuntimeNode ascending, const TUnaryLambda& keyExtractor) {
if (const auto flowType = flow.GetStaticType(); flowType->IsFlow() || flowType->IsStream()) {
return FlatMap(Condense1(flow,
const TUnaryLambda getKey = [&](TRuntimeNode item) { return Nth(item, 0U); };
const TUnaryLambda getItem = [&](TRuntimeNode item) { return Nth(item, 1U); };
const TUnaryLambda cacheKeyExtractor = [&](TRuntimeNode item) {
return NewTuple({keyExtractor(item), item});
};

return FlatMap(Condense1(Map(flow, cacheKeyExtractor),
[&](TRuntimeNode item) { return AsList(item); },
[this](TRuntimeNode, TRuntimeNode) { return NewDataLiteral<bool>(false); },
[&](TRuntimeNode item, TRuntimeNode state) {
return KeepTop(count, state, item, ascending, keyExtractor);
return KeepTop(count, state, item, ascending, getKey);
}
),
[&](TRuntimeNode list) { return TopSort(list, count, ascending, keyExtractor); }
[&](TRuntimeNode list) { return Map(TopSort(list, count, ascending, getKey), getItem); }
);
}

Expand Down

0 comments on commit dd42bf3

Please sign in to comment.