From 390ef8009f7a491055a576bffd990de63b417919 Mon Sep 17 00:00:00 2001 From: Filitov Mikhail Date: Tue, 23 Jul 2024 22:50:22 +0300 Subject: [PATCH 1/5] WideCombiner with spilling better buffer pass --- .../minikql/comp_nodes/mkql_wide_combine.cpp | 84 +++++++++---------- 1 file changed, 39 insertions(+), 45 deletions(-) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index cf7662ed7843..19228ae95a1e 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -364,8 +364,13 @@ class TSpillingSupportState : public TComputationValue { enum class ETasteResult: i8 { Init = -1, Update, - ConsumeRawData, - ExtractRawData + ConsumeRawData + }; + + enum class EUpdateResult: i8 { + Yield = -1, + ExtractRawData, + None }; TSpillingSupportState( TMemoryUsageInfo* memInfo, @@ -398,16 +403,16 @@ class TSpillingSupportState : public TComputationValue { bool IsProcessingRequired() const { if (InputStatus != EFetchResult::Finish) return true; - return HasRawDataToExtract || HasDataForProcessing; + return !SpilledBuckets.empty() && SpilledBuckets.front().BucketState != TSpilledBucket::EBucketState::InMemory; } - bool UpdateAndWait() { + EUpdateResult Update() { switch (GetMode()) { case EOperatingMode::InMemory: { if (CheckMemoryAndSwitchToSpilling()) { - return UpdateAndWait(); + return Update(); } - return false; + return EUpdateResult::None; } case EOperatingMode::ProcessSpilled: @@ -415,11 +420,11 @@ class TSpillingSupportState : public TComputationValue { case EOperatingMode::Spilling: { UpdateSpillingBuckets(); - if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return true; + if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return EUpdateResult::Yield; if (BufferForUsedInputItems.size()) { auto& bucket = SpilledBuckets[BufferForUsedInputItemsBucketId]; - if (bucket.AsyncWriteOperation.has_value()) return true; + if (bucket.AsyncWriteOperation.has_value()) return EUpdateResult::Yield; bucket.AsyncWriteOperation = bucket.SpilledData->WriteWideItem(BufferForUsedInputItems); BufferForUsedInputItems.resize(0); //for freeing allocated key value asap @@ -429,7 +434,7 @@ class TSpillingSupportState : public TComputationValue { // Prepare buffer for reading new key BufferForKeyAndState.resize(KeyWidth); - return false; + return EUpdateResult::None; } } } @@ -442,14 +447,6 @@ class TSpillingSupportState : public TComputationValue { return isNew ? ETasteResult::Init : ETasteResult::Update; } if (GetMode() == EOperatingMode::ProcessSpilled) { - if (HasRawDataToExtract) { - // Tongue not used here. - Throat = BufferForUsedInputItems.data(); - HasRawDataToExtract = false; - HasDataForProcessing = true; - return ETasteResult::ExtractRawData; - } - HasDataForProcessing = false; // while restoration we process buckets one by one starting from the first in a queue bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt(); Throat = SpilledBuckets.front().InMemoryProcessingState->Throat; @@ -476,8 +473,11 @@ class TSpillingSupportState : public TComputationValue { MKQL_ENSURE(BufferForUsedInputItems.size() == 0, "Internal logic error"); BufferForUsedInputItems.resize(ItemNodesSize); BufferForUsedInputItemsBucketId = bucketId; + Throat = BufferForUsedInputItems.data(); - + Tongue = nullptr; + + return ETasteResult::ConsumeRawData; } @@ -503,7 +503,7 @@ class TSpillingSupportState : public TComputationValue { BufferForKeyAndState.resize(0); } - bool FlushSpillingBuffersAndWait() { + EUpdateResult FlushSpillingBuffersAndWait() { UpdateSpillingBuckets(); ui64 finishedCount = 0; @@ -519,7 +519,7 @@ class TSpillingSupportState : public TComputationValue { } } - if (finishedCount != SpilledBuckets.size()) return true; + if (finishedCount != SpilledBuckets.size()) return EUpdateResult::Yield; SwitchMode(EOperatingMode::ProcessSpilled); @@ -628,11 +628,11 @@ class TSpillingSupportState : public TComputationValue { return false; } - bool ProcessSpilledDataAndWait() { - if (SpilledBuckets.empty()) return false; + EUpdateResult ProcessSpilledDataAndWait() { + if (SpilledBuckets.empty()) return EUpdateResult::None; if (AsyncReadOperation) { - if (!AsyncReadOperation->HasValue()) return true; + if (!AsyncReadOperation->HasValue()) return EUpdateResult::Yield; if (RecoverState) { SpilledBuckets[0].SpilledState->AsyncReadCompleted(AsyncReadOperation->ExtractValue().value(), Ctx.HolderFactory); } else { @@ -642,12 +642,8 @@ class TSpillingSupportState : public TComputationValue { } auto& bucket = SpilledBuckets.front(); - if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false; - if (HasDataForProcessing) { - Tongue = bucket.InMemoryProcessingState->Tongue; - Throat = bucket.InMemoryProcessingState->Throat; - return false; - } + if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return EUpdateResult::None; + //recover spilled state while(!bucket.SpilledState->Empty()) { RecoverState = true; @@ -655,7 +651,7 @@ class TSpillingSupportState : public TComputationValue { AsyncReadOperation = bucket.SpilledState->ExtractWideItem(BufferForKeyAndState); if (AsyncReadOperation) { BufferForKeyAndState.resize(0); - return true; + return EUpdateResult::Yield; } for (size_t i = 0; i< KeyWidth; ++i) { //jumping into unsafe world, refusing ownership @@ -675,18 +671,16 @@ class TSpillingSupportState : public TComputationValue { BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount()); AsyncReadOperation = bucket.SpilledData->ExtractWideItem(BufferForUsedInputItems); if (AsyncReadOperation) { - return true; + return EUpdateResult::Yield; } + Throat = BufferForUsedInputItems.data(); Tongue = bucket.InMemoryProcessingState->Tongue; - Throat = bucket.InMemoryProcessingState->Throat; - HasRawDataToExtract = true; - return false; + return EUpdateResult::ExtractRawData; } bucket.BucketState = TSpilledBucket::EBucketState::InMemory; - HasDataForProcessing = false; - return false; + return EUpdateResult::None; } EOperatingMode GetMode() const { @@ -744,10 +738,6 @@ class TSpillingSupportState : public TComputationValue { private: ui64 NextBucketToSpill = 0; - bool HasDataForProcessing = false; - - bool HasRawDataToExtract = false; - TState InMemoryProcessingState; const TMultiType* const UsedInputItemType; const TMultiType* const KeyAndStateType; @@ -1259,6 +1249,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNodeUpdateAndWait()) { - return EFetchResult::Yield; + switch(ptr->Update()) { + case TSpillingSupportState::EUpdateResult::Yield: + return EFetchResult::Yield; + case TSpillingSupportState::EUpdateResult::ExtractRawData: + Nodes.ExtractValues(ctx, static_cast(ptr->Throat), fields); + break; + case TSpillingSupportState::EUpdateResult::None: + break; } if (ptr->InputStatus != EFetchResult::Finish) { for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) @@ -1297,9 +1294,6 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode(ptr->Throat)); break; - case TSpillingSupportState::ETasteResult::ExtractRawData: - Nodes.ExtractValues(ctx, static_cast(ptr->Throat), fields); - break; } continue; } From a128cead720ab67c8d3b9ffa69deeb73ef3a7877 Mon Sep 17 00:00:00 2001 From: Filitov Mikhail Date: Wed, 24 Jul 2024 14:46:56 +0300 Subject: [PATCH 2/5] fixup + llvm code --- ydb/library/yql/minikql/aligned_page_pool.cpp | 2 +- .../yql/minikql/comp_nodes/mkql_wide_combine.cpp | 15 +++++++++------ .../dq/worker_manager/local_worker_manager.h | 2 +- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/ydb/library/yql/minikql/aligned_page_pool.cpp b/ydb/library/yql/minikql/aligned_page_pool.cpp index 72e08113df6b..e3e87d357308 100644 --- a/ydb/library/yql/minikql/aligned_page_pool.cpp +++ b/ydb/library/yql/minikql/aligned_page_pool.cpp @@ -505,7 +505,7 @@ template void TAlignedPagePoolImpl::UpdateMemoryYellowZone() { if (Limit == 0) return; if (IsMemoryYellowZoneForcefullyChanged) return; - if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return; + // if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return; ui8 usedMemoryPercent = 100 * GetUsed() / Limit; if (usedMemoryPercent >= EnableMemoryYellowZoneThreshold) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index 19228ae95a1e..c23a21f843b6 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -475,8 +475,6 @@ class TSpillingSupportState : public TComputationValue { BufferForUsedInputItemsBucketId = bucketId; Throat = BufferForUsedInputItems.data(); - Tongue = nullptr; - return ETasteResult::ConsumeRawData; } @@ -1360,13 +1358,18 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNodegetType()}, false); + const auto updateFuncPtr = CastInst::Create(Instruction::IntToPtr, updateFunc, PointerType::getUnqual(updateType), "update_func", block); + const auto update = CallInst::Create(updateType, updateFuncPtr, { stateArg }, "update", block); result->addIncoming(ConstantInt::get(statusType, static_cast(EFetchResult::Yield)), block); - BranchInst::Create(over, test, waitMore, block); + const auto updateWay = SwitchInst::Create(update, test, 3U, block); + updateWay->addCase(ConstantInt::get(wayType, static_cast(TSpillingSupportState::EUpdateResult::Yield)), over); + // TODO add exctraction code and jmp there + updateWay->addCase(ConstantInt::get(wayType, static_cast(TSpillingSupportState::EUpdateResult::ExtractRawData)), test); + updateWay->addCase(ConstantInt::get(wayType, static_cast(TSpillingSupportState::EUpdateResult::None)), test); block = test; diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h index 9185c43c485f..6078318b5e34 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h @@ -30,7 +30,7 @@ namespace NYql::NDqs { NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory; THashMap ClusterNamesMapping; - ui64 MkqlInitialMemoryLimit = 8_GB; + ui64 MkqlInitialMemoryLimit = 50_MB; ui64 MkqlTotalMemoryLimit = 0; ui64 MkqlMinAllocSize = 30_MB; ui64 MkqlProgramHardMemoryLimit = 0; From a0dfb94fc8332e338d41ac7dc6baf61032c289c4 Mon Sep 17 00:00:00 2001 From: Filitov Mikhail Date: Wed, 24 Jul 2024 14:53:40 +0300 Subject: [PATCH 3/5] fixup --- ydb/library/yql/minikql/aligned_page_pool.cpp | 2 +- .../yql/providers/dq/worker_manager/local_worker_manager.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/library/yql/minikql/aligned_page_pool.cpp b/ydb/library/yql/minikql/aligned_page_pool.cpp index e3e87d357308..72e08113df6b 100644 --- a/ydb/library/yql/minikql/aligned_page_pool.cpp +++ b/ydb/library/yql/minikql/aligned_page_pool.cpp @@ -505,7 +505,7 @@ template void TAlignedPagePoolImpl::UpdateMemoryYellowZone() { if (Limit == 0) return; if (IsMemoryYellowZoneForcefullyChanged) return; - // if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return; + if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return; ui8 usedMemoryPercent = 100 * GetUsed() / Limit; if (usedMemoryPercent >= EnableMemoryYellowZoneThreshold) { diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h index 6078318b5e34..9185c43c485f 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h @@ -30,7 +30,7 @@ namespace NYql::NDqs { NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory; THashMap ClusterNamesMapping; - ui64 MkqlInitialMemoryLimit = 50_MB; + ui64 MkqlInitialMemoryLimit = 8_GB; ui64 MkqlTotalMemoryLimit = 0; ui64 MkqlMinAllocSize = 30_MB; ui64 MkqlProgramHardMemoryLimit = 0; From 21b7d15befc02c94cd088155505b669638bd03f1 Mon Sep 17 00:00:00 2001 From: Filitov Mikhail Date: Wed, 24 Jul 2024 15:01:48 +0300 Subject: [PATCH 4/5] fixup --- ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index c23a21f843b6..76d80ec4f15a 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -1247,7 +1247,6 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode Date: Wed, 24 Jul 2024 16:49:32 +0300 Subject: [PATCH 5/5] more refactoring --- .../minikql/comp_nodes/mkql_wide_combine.cpp | 109 ++++++++++-------- 1 file changed, 58 insertions(+), 51 deletions(-) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index 76d80ec4f15a..b87faad03708 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -370,7 +370,9 @@ class TSpillingSupportState : public TComputationValue { enum class EUpdateResult: i8 { Yield = -1, ExtractRawData, - None + ReadInput, + Extract, + Finish }; TSpillingSupportState( TMemoryUsageInfo* memInfo, @@ -407,16 +409,17 @@ class TSpillingSupportState : public TComputationValue { } EUpdateResult Update() { + if (IsEverythingExtracted) return EUpdateResult::Finish; + switch (GetMode()) { case EOperatingMode::InMemory: { if (CheckMemoryAndSwitchToSpilling()) { return Update(); } - return EUpdateResult::None; + if (InputStatus == EFetchResult::Finish) return EUpdateResult::Extract; + + return EUpdateResult::ReadInput; } - - case EOperatingMode::ProcessSpilled: - return ProcessSpilledDataAndWait(); case EOperatingMode::Spilling: { UpdateSpillingBuckets(); @@ -434,8 +437,10 @@ class TSpillingSupportState : public TComputationValue { // Prepare buffer for reading new key BufferForKeyAndState.resize(KeyWidth); - return EUpdateResult::None; + return EUpdateResult::ReadInput; } + case EOperatingMode::ProcessSpilled: + return ProcessSpilledData(); } } @@ -480,14 +485,20 @@ class TSpillingSupportState : public TComputationValue { } NUdf::TUnboxedValuePod* Extract() { - if (GetMode() == EOperatingMode::InMemory) return static_cast(InMemoryProcessingState.Extract()); + NUdf::TUnboxedValue* value = nullptr; + if (GetMode() == EOperatingMode::InMemory) { + value = static_cast(InMemoryProcessingState.Extract()); + if (!value) IsEverythingExtracted = true; + return value; + } MKQL_ENSURE(SpilledBuckets.front().BucketState == TSpilledBucket::EBucketState::InMemory, "Internal logic error"); MKQL_ENSURE(SpilledBuckets.size() > 0, "Internal logic error"); - auto value = static_cast(SpilledBuckets.front().InMemoryProcessingState->Extract()); + value = static_cast(SpilledBuckets.front().InMemoryProcessingState->Extract()); if (!value) { SpilledBuckets.pop_front(); + if (SpilledBuckets.empty()) IsEverythingExtracted = true; } return value; @@ -521,7 +532,7 @@ class TSpillingSupportState : public TComputationValue { SwitchMode(EOperatingMode::ProcessSpilled); - return ProcessSpilledDataAndWait(); + return ProcessSpilledData(); } void SplitStateIntoBuckets() { @@ -626,9 +637,7 @@ class TSpillingSupportState : public TComputationValue { return false; } - EUpdateResult ProcessSpilledDataAndWait() { - if (SpilledBuckets.empty()) return EUpdateResult::None; - + EUpdateResult ProcessSpilledData() { if (AsyncReadOperation) { if (!AsyncReadOperation->HasValue()) return EUpdateResult::Yield; if (RecoverState) { @@ -640,7 +649,7 @@ class TSpillingSupportState : public TComputationValue { } auto& bucket = SpilledBuckets.front(); - if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return EUpdateResult::None; + if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return EUpdateResult::Extract; //recover spilled state while(!bucket.SpilledState->Empty()) { @@ -678,7 +687,7 @@ class TSpillingSupportState : public TComputationValue { return EUpdateResult::ExtractRawData; } bucket.BucketState = TSpilledBucket::EBucketState::InMemory; - return EUpdateResult::None; + return EUpdateResult::Extract; } EOperatingMode GetMode() const { @@ -736,6 +745,8 @@ class TSpillingSupportState : public TComputationValue { private: ui64 NextBucketToSpill = 0; + bool IsEverythingExtracted = false; + TState InMemoryProcessingState; const TMultiType* const UsedInputItemType; const TMultiType* const KeyAndStateType; @@ -1257,52 +1268,48 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNodeUpdate()) { + case TSpillingSupportState::EUpdateResult::ReadInput: { + for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) + fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i); + switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) { + case EFetchResult::One: + break; + case EFetchResult::Finish: + continue; + case EFetchResult::Yield: + return EFetchResult::Yield; + } + break; + } case TSpillingSupportState::EUpdateResult::Yield: return EFetchResult::Yield; case TSpillingSupportState::EUpdateResult::ExtractRawData: Nodes.ExtractValues(ctx, static_cast(ptr->Throat), fields); break; - case TSpillingSupportState::EUpdateResult::None: - break; - } - if (ptr->InputStatus != EFetchResult::Finish) { - for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) - fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i); - switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) { - case EFetchResult::One: - break; - case EFetchResult::Finish: - continue; - case EFetchResult::Yield: - return EFetchResult::Yield; - } + case TSpillingSupportState::EUpdateResult::Extract: + if (const auto values = static_cast(ptr->Extract())) { + Nodes.FinishItem(ctx, values, output); + return EFetchResult::One; + } + continue; + case TSpillingSupportState::EUpdateResult::Finish: + return EFetchResult::Finish; } - if (ptr->IsProcessingRequired()) { - Nodes.ExtractKey(ctx, fields, static_cast(ptr->Tongue)); + Nodes.ExtractKey(ctx, fields, static_cast(ptr->Tongue)); - switch(ptr->TasteIt()) { - case TSpillingSupportState::ETasteResult::Init: - Nodes.ProcessItem(ctx, nullptr, static_cast(ptr->Throat)); - break; - case TSpillingSupportState::ETasteResult::Update: - Nodes.ProcessItem(ctx, static_cast(ptr->Tongue), static_cast(ptr->Throat)); - break; - case TSpillingSupportState::ETasteResult::ConsumeRawData: - Nodes.ExtractValues(ctx, fields, static_cast(ptr->Throat)); - break; - } - continue; - } - - if (const auto values = static_cast(ptr->Extract())) { - Nodes.FinishItem(ctx, values, output); - return EFetchResult::One; + switch(ptr->TasteIt()) { + case TSpillingSupportState::ETasteResult::Init: + Nodes.ProcessItem(ctx, nullptr, static_cast(ptr->Throat)); + break; + case TSpillingSupportState::ETasteResult::Update: + Nodes.ProcessItem(ctx, static_cast(ptr->Tongue), static_cast(ptr->Throat)); + break; + case TSpillingSupportState::ETasteResult::ConsumeRawData: + Nodes.ExtractValues(ctx, fields, static_cast(ptr->Throat)); + break; } - if (!ptr->HasAnyData()) { - return EFetchResult::Finish; - } } } Y_UNREACHABLE(); @@ -1368,7 +1375,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNodeaddCase(ConstantInt::get(wayType, static_cast(TSpillingSupportState::EUpdateResult::Yield)), over); // TODO add exctraction code and jmp there updateWay->addCase(ConstantInt::get(wayType, static_cast(TSpillingSupportState::EUpdateResult::ExtractRawData)), test); - updateWay->addCase(ConstantInt::get(wayType, static_cast(TSpillingSupportState::EUpdateResult::None)), test); + updateWay->addCase(ConstantInt::get(wayType, static_cast(TSpillingSupportState::EUpdateResult::Extract)), test); block = test;