diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp index 05b94e3890a1..5bf76197abfc 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp @@ -101,7 +101,7 @@ struct TSpilledData { return AsyncWriteOperation; } - TAsyncReadOperation Read(TStorage &buffer, TComputationContext& ctx) { + TAsyncReadOperation Read(TStorage &buffer, const TComputationContext& ctx) { if (AsyncReadOperation) { if (AsyncReadOperation->HasValue()) { Spiller->AsyncReadCompleted(AsyncReadOperation->ExtractValue().value(), ctx.HolderFactory); @@ -134,7 +134,7 @@ class TSpilledUnboxedValuesIterator { TSpilledData::TPtr SpilledData; std::function LessFunc; ui32 Width_; - TComputationContext* Ctx; + const TComputationContext* Ctx; bool HasValue = false; public: @@ -142,7 +142,7 @@ class TSpilledUnboxedValuesIterator { const std::function& lessFunc, TSpilledData::TPtr spilledData, size_t dataWidth, - TComputationContext* ctx + const TComputationContext* ctx ) : SpilledData(spilledData) , LessFunc(lessFunc) @@ -251,32 +251,6 @@ using TBase = TComputationValue>; InputStatus = EFetchResult::Finish; } - virtual EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { - while (EFetchResult::Finish != InputStatus) { - switch (InputStatus = Flow->FetchValues(ctx, GetFields())) { - case EFetchResult::One: - Put(); - continue; - case EFetchResult::Yield: - return EFetchResult::Yield; - case EFetchResult::Finish: - Seal(); - break; - } - } - - if (auto extract = Extract()) { - for (const auto index : Indexes) - if (const auto to = output[index]) - *to = std::move(*extract++); - else - ++extract; - return EFetchResult::One; - } - - return EFetchResult::Finish; - } - NUdf::TUnboxedValue*const* GetFields() const { return Fields.data(); } @@ -383,7 +357,9 @@ using TBase = TComputationValue>; void ResetFields() { NUdf::TUnboxedValuePod* ptr; - if constexpr (!HasCount) { + if constexpr (HasCount) { + ptr = Tongue = Free.back(); + } else { auto pos = Storage.size(); Storage.insert(Storage.end(), Indexes.size(), {}); ptr = Storage.data() + pos; @@ -394,118 +370,133 @@ using TBase = TComputationValue>; public: TSpillingSupportState(TMemoryUsageInfo* memInfo, ui64 count, const bool* directons, size_t keyWidth, const TCompareFunc& compare, - const std::vector& indexes, IComputationWideFlowNode *const flow, TMultiType* tupleMultiType) + const std::vector& indexes, TMultiType* tupleMultiType, const TComputationContext& ctx) : TBase(memInfo) - , Flow(flow) , Count(count) , Indexes(indexes) , Directions(directons, directons + keyWidth) , LessFunc(std::bind(std::less(), std::bind(compare, Directions.data(), std::placeholders::_1, std::placeholders::_2), 0)) , Fields(Indexes.size(), nullptr) , TupleMultiType(tupleMultiType) + , Ctx(ctx) { if constexpr (!HasCount) { ResetFields(); return; } - throw yexception() << "Spilling doesn't support TopSort."; + + Storage.resize(GetStorageSize() * Indexes.size()); + Free.resize(GetStorageSize(), nullptr); + if (Count) { + Full.reserve(GetStorageSize()); + auto ptr = Storage.data(); + std::generate(Free.begin(), Free.end(), [&ptr, this]() { + const auto p = ptr; + ptr += Indexes.size(); + return p; + }); + ResetFields(); + } else { + InputStatus = EFetchResult::Finish; + } } - virtual EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { - while (true) { - switch(GetMode()) { - case EOperatingMode::InMemory: { - auto r = DoCalculateInMemory(ctx, output); - if (GetMode() == TSpillingSupportState::EOperatingMode::InMemory) { - return r; - } - break; + bool IsReadyToContinue() { + switch (GetMode()) { + case EOperatingMode::InMemory: + return true; + case EOperatingMode::Spilling: + { + if (!SpillState()) { + return false; } - case EOperatingMode::Spilling: { - DoCalculateWithSpilling(ctx); - if (GetMode() == EOperatingMode::Spilling) { - return EFetchResult::Yield; - } - break; + ResetFields(); + auto nextMode = (IsReadFromChannelFinished() ? EOperatingMode::ProcessSpilled : EOperatingMode::InMemory); + + YQL_LOG(INFO) << (nextMode == EOperatingMode::ProcessSpilled ? "Switching to ProcessSpilled" : "Switching to Memory mode"); + + SwitchMode(nextMode); + return IsReadyToContinue(); + } + case EOperatingMode::ProcessSpilled: + { + if (SpilledUnboxedValuesIterators.empty()) { + return true; } - case EOperatingMode::ProcessSpilled: { - return ProcessSpilledData(output); + for (auto &spilledUnboxedValuesIterator : SpilledUnboxedValuesIterators) { + if (!spilledUnboxedValuesIterator.CheckForInit()) { + return false; + } } - + return true; } } - Y_UNREACHABLE(); } -private: + bool IsFinished() const { + return IsReadFromChannelFinished() && SpilledUnboxedValuesIterators.empty(); + } - EFetchResult DoCalculateInMemory(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { - while (EFetchResult::Finish != InputStatus) { - switch (InputStatus = Flow->FetchValues(ctx, GetFields())) { - case EFetchResult::One: - if (Put()) { - if (ctx.SpillerFactory && !HasMemoryForProcessing()) { - const auto used = TlsAllocState->GetUsed(); - const auto limit = TlsAllocState->GetLimit(); + NUdf::TUnboxedValue*const* GetFields() const { + return Fields.data(); + } - YQL_LOG(INFO) << "yellow zone reached " << (used*100/limit) << "%=" << used << "/" << limit; + void Put() { + if constexpr (!HasCount) { + ResetFields(); + if (Ctx.SpillerFactory && !HasMemoryForProcessing()) { + const auto used = TlsAllocState->GetUsed(); + const auto limit = TlsAllocState->GetLimit(); - YQL_LOG(INFO) << "switching Memory mode to Spilling"; + YQL_LOG(INFO) << "Yellow zone reached " << (used*100/limit) << "%=" << used << "/" << limit; + YQL_LOG(INFO) << "Switching Memory mode to Spilling"; - SwitchMode(EOperatingMode::Spilling, ctx); - return EFetchResult::Yield; - } - } - continue; - case EFetchResult::Yield: - return EFetchResult::Yield; - case EFetchResult::Finish: - { - if (!SpilledStates.empty()) { - SwitchMode(EOperatingMode::Spilling, ctx); - return EFetchResult::Yield; - } - Seal(); - break; - } + SwitchMode(EOperatingMode::Spilling); } + return; } - if (auto extract = Extract()) { - for (const auto index : Indexes) - if (const auto to = output[index]) - *to = std::move(*extract++); - else - ++extract; - return EFetchResult::One; + if (Full.size() + 1U == GetStorageSize()) { + Free.pop_back(); + + NYql::FastNthElement(Full.begin(), Full.begin() + Count, Full.end(), LessFunc); + std::copy(Full.cbegin() + Count, Full.cend(), std::back_inserter(Free)); + Full.resize(Count); + + std::for_each(Free.cbegin(), Free.cend(), [this](NUdf::TUnboxedValuePod* ptr) { + std::fill_n(static_cast(ptr), Indexes.size(), NUdf::TUnboxedValuePod()); + }); + Free.emplace_back(Tongue); + Throat = nullptr; } - return EFetchResult::Finish; - } + if (Full.size() >= Count) { + if (!Throat) + Throat = *std::max_element(Full.cbegin(), Full.cend(), LessFunc); - EFetchResult DoCalculateWithSpilling(TComputationContext& ctx) { - if (!SpillState()) { - return EFetchResult::Yield; + if (!LessFunc(Tongue, Throat)) + return; } - ResetFields(); - auto nextMode = (IsReadFromChannelFinished() ? EOperatingMode::ProcessSpilled : EOperatingMode::InMemory); - YQL_LOG(INFO) << (nextMode == EOperatingMode::ProcessSpilled ? "switching to ProcessSpilled" : "switching to Memory mode"); + Full.emplace_back(Free.back()); + Free.pop_back(); + ResetFields(); + } - SwitchMode(nextMode, ctx); - return EFetchResult::Yield; + void Seal() { + if (!SpilledStates.empty()) { + SwitchMode(EOperatingMode::Spilling); + return; + } + SealInMemory(); } - EFetchResult ProcessSpilledData(NUdf::TUnboxedValue*const* output) { + NUdf::TUnboxedValue* Extract() { if (SpilledUnboxedValuesIterators.empty()) { - return EFetchResult::Finish; + // No spilled data + return ExtractInMemory(); } - for (auto &spilledUnboxedValuesIterator : SpilledUnboxedValuesIterators) { - if (!spilledUnboxedValuesIterator.CheckForInit()) { - return EFetchResult::Yield; - } - } if (!IsHeapBuilt) { std::make_heap(SpilledUnboxedValuesIterators.begin(), SpilledUnboxedValuesIterators.end()); IsHeapBuilt = true; @@ -515,61 +506,22 @@ using TBase = TComputationValue>; std::pop_heap(SpilledUnboxedValuesIterators.begin(), SpilledUnboxedValuesIterators.end()); auto ¤tIt = SpilledUnboxedValuesIterators.back(); - NKikimr::NUdf::TUnboxedValue* res = currentIt.GetValue(); - for (const auto index : Indexes) - { - if (const auto to = output[index]) - *to = std::move(*res++); - else - ++res; + return currentIt.GetValue(); + } + + void Clean() { + if (SpilledUnboxedValuesIterators.empty()) { + // No spilled data + return; } + auto ¤tIt = SpilledUnboxedValuesIterators.back(); currentIt.Pop(); if (currentIt.IsFinished()) { SpilledUnboxedValuesIterators.pop_back(); } - return EFetchResult::One; - } - - NUdf::TUnboxedValue*const* GetFields() const { - return Fields.data(); - } - - bool Put() { - if constexpr (!HasCount) { - ResetFields(); - return true; - } - - throw yexception() << "Spilling doesn't support TopSort."; - } - - void Seal() { - if constexpr (!HasCount) { - static_assert (Sort); - // Remove placeholder for new data - Storage.resize(Storage.size() - Indexes.size()); - - Full.reserve(Storage.size() / Indexes.size()); - for (auto it = Storage.begin(); it != Storage.end(); it += Indexes.size()) { - Full.emplace_back(&*it); - } - - std::sort(Full.rbegin(), Full.rend(), LessFunc); - return; - } - - throw yexception() << "Spilling doesn't support TopSort."; - } - - NUdf::TUnboxedValue* Extract() { - if (Full.empty()) - return nullptr; - - const auto ptr = Full.back(); - Full.pop_back(); - return static_cast(ptr); } +private: EOperatingMode GetMode() const { return Mode; } bool HasMemoryForProcessing() const { @@ -580,13 +532,13 @@ using TBase = TComputationValue>; return InputStatus == EFetchResult::Finish; } - void SwitchMode(EOperatingMode mode, TComputationContext& ctx) { + void SwitchMode(EOperatingMode mode) { switch(mode) { case EOperatingMode::InMemory: break; case EOperatingMode::Spilling: { - auto spiller = ctx.SpillerFactory->CreateSpiller(); + auto spiller = Ctx.SpillerFactory->CreateSpiller(); const size_t PACK_SIZE = 5_MB; SpilledStates.emplace_back(std::make_unique(spiller, TupleMultiType, PACK_SIZE)); break; @@ -595,7 +547,7 @@ using TBase = TComputationValue>; { SpilledUnboxedValuesIterators.reserve(SpilledStates.size()); for (auto &state: SpilledStates) { - SpilledUnboxedValuesIterators.emplace_back(LessFunc, &state, Indexes.size(), &ctx); + SpilledUnboxedValuesIterators.emplace_back(LessFunc, &state, Indexes.size(), &Ctx); } break; } @@ -613,7 +565,7 @@ using TBase = TComputationValue>; lastSpilledState.Spiller->AsyncWriteCompleted(lastSpilledState.AsyncWriteOperation->ExtractValue()); lastSpilledState.AsyncWriteOperation = std::nullopt; } else { - Seal(); + SealInMemory(); if (Full.empty()) { // Nothing to spill SpilledStates.pop_back(); @@ -621,7 +573,7 @@ using TBase = TComputationValue>; } } - while (auto extract = Extract()) { + while (auto extract = ExtractInMemory()) { auto writeOp = lastSpilledState.Write(extract, Indexes.size()); if (writeOp) { return false; @@ -637,8 +589,47 @@ using TBase = TComputationValue>; return true; } + NUdf::TUnboxedValue* ExtractInMemory() { + if (Full.empty()) + return nullptr; + + const auto ptr = Full.back(); + Full.pop_back(); + return static_cast(ptr); + } + + void SealInMemory() { + if constexpr (!HasCount) { + static_assert (Sort); + // Remove placeholder for new data + Storage.resize(Storage.size() - Indexes.size()); + + Full.reserve(Storage.size() / Indexes.size()); + for (auto it = Storage.begin(); it != Storage.end(); it += Indexes.size()) { + Full.emplace_back(&*it); + } + + std::sort(Full.rbegin(), Full.rend(), LessFunc); + return; + } + + Free.clear(); + Free.shrink_to_fit(); + + if (Full.size() > Count) { + NYql::FastNthElement(Full.begin(), Full.begin() + Count, Full.end(), LessFunc); + Full.resize(Count); + } + + if constexpr (Sort) { + std::sort(Full.rbegin(), Full.rend(), LessFunc); + } + } + +public: EFetchResult InputStatus = EFetchResult::One; - IComputationWideFlowNode *const Flow; + +private: const ui64 Count; const std::vector Indexes; const std::vector Directions; @@ -647,9 +638,12 @@ using TBase = TComputationValue>; TPointers Free, Full; TFields Fields; TMultiType* TupleMultiType; + const TComputationContext& Ctx; std::vector SpilledStates; EOperatingMode Mode = EOperatingMode::InMemory; std::vector SpilledUnboxedValuesIterators; + NUdf::TUnboxedValuePod* Tongue = nullptr; + NUdf::TUnboxedValuePod* Throat = nullptr; bool IsHeapBuilt = false; }; @@ -726,23 +720,42 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode dirs(Directions.size()); std::transform(Directions.cbegin(), Directions.cend(), dirs.begin(), [&ctx](IComputationNode* dir){ return dir->GetValue(ctx).Get(); }); - if (!ctx.ExecuteLLVM) { - MakeSpillingSupportState(ctx, state, count, dirs.data()); - } else { - MakeState(ctx, state, count, dirs.data()); - } + MakeSpillingSupportState(ctx, state, count, dirs.data()); } - // To avoid dynamic_cast implementation in LLVM implementation - // This is temporary solution. Final result will have just one state here. - if (!ctx.ExecuteLLVM) { - if (const auto ptr = static_cast*>(state.AsBoxed().Get())) { - return ptr->DoCalculate(ctx, output); + if (const auto ptr = static_cast*>(state.AsBoxed().Get())) { + while (EFetchResult::Finish != ptr->InputStatus) { + if (!ptr->IsReadyToContinue()) { + return EFetchResult::Yield; + } + switch (ptr->InputStatus = Flow->FetchValues(ctx, ptr->GetFields())) { + case EFetchResult::One: + ptr->Put(); + continue; + case EFetchResult::Yield: + return EFetchResult::Yield; + case EFetchResult::Finish: + ptr->Seal(); + break; + } } - } else { - if (const auto ptr = static_cast*>(state.AsBoxed().Get())) { - return ptr->DoCalculate(ctx, output); + + if (!ptr->IsReadyToContinue()) { + return EFetchResult::Yield; + } + + if (auto extract = ptr->Extract()) { + for (const auto index : Indexes) { + if (const auto to = output[index]) + *to = std::move(*extract++); + else + ++extract; + } + ptr->Clean(); + return EFetchResult::One; } + + return ptr->IsFinished() ? EFetchResult::Finish : EFetchResult::Yield; } Y_UNREACHABLE(); @@ -948,11 +961,11 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode>(count, directions, Directions.size(), TMyValueCompare(Keys), Indexes, Flow, TupleMultiType); - return; - } - state = ctx.HolderFactory.Create>(count, directions, Directions.size(), TMyValueCompare(Keys), Indexes, Flow); +#ifdef MKQL_DISABLE_CODEGEN + state = ctx.HolderFactory.Create>(count, directions, Directions.size(), TMyValueCompare(Keys), Indexes, TupleMultiType, ctx); +#else + state = ctx.HolderFactory.Create>(count, directions, Directions.size(), ctx.ExecuteLLVM && Compare ? TCompareFunc(Compare) : TCompareFunc(TMyValueCompare(Keys)), Indexes, TupleMultiType, ctx); +#endif } void RegisterDependencies() const final {