diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index cf7662ed7843..b87faad03708 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -364,8 +364,15 @@ class TSpillingSupportState : public TComputationValue { enum class ETasteResult: i8 { Init = -1, Update, - ConsumeRawData, - ExtractRawData + ConsumeRawData + }; + + enum class EUpdateResult: i8 { + Yield = -1, + ExtractRawData, + ReadInput, + Extract, + Finish }; TSpillingSupportState( TMemoryUsageInfo* memInfo, @@ -398,28 +405,29 @@ class TSpillingSupportState : public TComputationValue { bool IsProcessingRequired() const { if (InputStatus != EFetchResult::Finish) return true; - return HasRawDataToExtract || HasDataForProcessing; + return !SpilledBuckets.empty() && SpilledBuckets.front().BucketState != TSpilledBucket::EBucketState::InMemory; } - bool UpdateAndWait() { + EUpdateResult Update() { + if (IsEverythingExtracted) return EUpdateResult::Finish; + switch (GetMode()) { case EOperatingMode::InMemory: { if (CheckMemoryAndSwitchToSpilling()) { - return UpdateAndWait(); + return Update(); } - return false; + if (InputStatus == EFetchResult::Finish) return EUpdateResult::Extract; + + return EUpdateResult::ReadInput; } - - case EOperatingMode::ProcessSpilled: - return ProcessSpilledDataAndWait(); case EOperatingMode::Spilling: { UpdateSpillingBuckets(); - if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return true; + if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return EUpdateResult::Yield; if (BufferForUsedInputItems.size()) { auto& bucket = SpilledBuckets[BufferForUsedInputItemsBucketId]; - if (bucket.AsyncWriteOperation.has_value()) return true; + if (bucket.AsyncWriteOperation.has_value()) return EUpdateResult::Yield; bucket.AsyncWriteOperation 
= bucket.SpilledData->WriteWideItem(BufferForUsedInputItems); BufferForUsedInputItems.resize(0); //for freeing allocated key value asap @@ -429,8 +437,10 @@ class TSpillingSupportState : public TComputationValue { // Prepare buffer for reading new key BufferForKeyAndState.resize(KeyWidth); - return false; + return EUpdateResult::ReadInput; } + case EOperatingMode::ProcessSpilled: + return ProcessSpilledData(); } } @@ -442,14 +452,6 @@ class TSpillingSupportState : public TComputationValue { return isNew ? ETasteResult::Init : ETasteResult::Update; } if (GetMode() == EOperatingMode::ProcessSpilled) { - if (HasRawDataToExtract) { - // Tongue not used here. - Throat = BufferForUsedInputItems.data(); - HasRawDataToExtract = false; - HasDataForProcessing = true; - return ETasteResult::ExtractRawData; - } - HasDataForProcessing = false; // while restoration we process buckets one by one starting from the first in a queue bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt(); Throat = SpilledBuckets.front().InMemoryProcessingState->Throat; @@ -476,20 +478,27 @@ class TSpillingSupportState : public TComputationValue { MKQL_ENSURE(BufferForUsedInputItems.size() == 0, "Internal logic error"); BufferForUsedInputItems.resize(ItemNodesSize); BufferForUsedInputItemsBucketId = bucketId; + Throat = BufferForUsedInputItems.data(); - + return ETasteResult::ConsumeRawData; } NUdf::TUnboxedValuePod* Extract() { - if (GetMode() == EOperatingMode::InMemory) return static_cast(InMemoryProcessingState.Extract()); + NUdf::TUnboxedValue* value = nullptr; + if (GetMode() == EOperatingMode::InMemory) { + value = static_cast(InMemoryProcessingState.Extract()); + if (!value) IsEverythingExtracted = true; + return value; + } MKQL_ENSURE(SpilledBuckets.front().BucketState == TSpilledBucket::EBucketState::InMemory, "Internal logic error"); MKQL_ENSURE(SpilledBuckets.size() > 0, "Internal logic error"); - auto value = 
static_cast(SpilledBuckets.front().InMemoryProcessingState->Extract()); + value = static_cast(SpilledBuckets.front().InMemoryProcessingState->Extract()); if (!value) { SpilledBuckets.pop_front(); + if (SpilledBuckets.empty()) IsEverythingExtracted = true; } return value; @@ -503,7 +512,7 @@ class TSpillingSupportState : public TComputationValue { BufferForKeyAndState.resize(0); } - bool FlushSpillingBuffersAndWait() { + EUpdateResult FlushSpillingBuffersAndWait() { UpdateSpillingBuckets(); ui64 finishedCount = 0; @@ -519,11 +528,11 @@ class TSpillingSupportState : public TComputationValue { } } - if (finishedCount != SpilledBuckets.size()) return true; + if (finishedCount != SpilledBuckets.size()) return EUpdateResult::Yield; SwitchMode(EOperatingMode::ProcessSpilled); - return ProcessSpilledDataAndWait(); + return ProcessSpilledData(); } void SplitStateIntoBuckets() { @@ -628,11 +637,9 @@ class TSpillingSupportState : public TComputationValue { return false; } - bool ProcessSpilledDataAndWait() { - if (SpilledBuckets.empty()) return false; - + EUpdateResult ProcessSpilledData() { if (AsyncReadOperation) { - if (!AsyncReadOperation->HasValue()) return true; + if (!AsyncReadOperation->HasValue()) return EUpdateResult::Yield; if (RecoverState) { SpilledBuckets[0].SpilledState->AsyncReadCompleted(AsyncReadOperation->ExtractValue().value(), Ctx.HolderFactory); } else { @@ -642,12 +649,8 @@ class TSpillingSupportState : public TComputationValue { } auto& bucket = SpilledBuckets.front(); - if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false; - if (HasDataForProcessing) { - Tongue = bucket.InMemoryProcessingState->Tongue; - Throat = bucket.InMemoryProcessingState->Throat; - return false; - } + if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return EUpdateResult::Extract; + //recover spilled state while(!bucket.SpilledState->Empty()) { RecoverState = true; @@ -655,7 +658,7 @@ class TSpillingSupportState : public 
TComputationValue { AsyncReadOperation = bucket.SpilledState->ExtractWideItem(BufferForKeyAndState); if (AsyncReadOperation) { BufferForKeyAndState.resize(0); - return true; + return EUpdateResult::Yield; } for (size_t i = 0; i< KeyWidth; ++i) { //jumping into unsafe world, refusing ownership @@ -675,18 +678,16 @@ class TSpillingSupportState : public TComputationValue { BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount()); AsyncReadOperation = bucket.SpilledData->ExtractWideItem(BufferForUsedInputItems); if (AsyncReadOperation) { - return true; + return EUpdateResult::Yield; } + Throat = BufferForUsedInputItems.data(); Tongue = bucket.InMemoryProcessingState->Tongue; - Throat = bucket.InMemoryProcessingState->Throat; - HasRawDataToExtract = true; - return false; + return EUpdateResult::ExtractRawData; } bucket.BucketState = TSpilledBucket::EBucketState::InMemory; - HasDataForProcessing = false; - return false; + return EUpdateResult::Extract; } EOperatingMode GetMode() const { @@ -744,9 +745,7 @@ class TSpillingSupportState : public TComputationValue { private: ui64 NextBucketToSpill = 0; - bool HasDataForProcessing = false; - - bool HasRawDataToExtract = false; + bool IsEverythingExtracted = false; TState InMemoryProcessingState; const TMultiType* const UsedInputItemType; @@ -1268,50 +1267,49 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNodeUpdateAndWait()) { - return EFetchResult::Yield; - } - if (ptr->InputStatus != EFetchResult::Finish) { - for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) - fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i); - switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) { - case EFetchResult::One: - break; - case EFetchResult::Finish: - continue; - case EFetchResult::Yield: - return EFetchResult::Yield; + switch(ptr->Update()) { + case TSpillingSupportState::EUpdateResult::ReadInput: { + for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) + fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, 
i); + switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) { + case EFetchResult::One: + break; + case EFetchResult::Finish: + continue; + case EFetchResult::Yield: + return EFetchResult::Yield; + } + break; } + case TSpillingSupportState::EUpdateResult::Yield: + return EFetchResult::Yield; + case TSpillingSupportState::EUpdateResult::ExtractRawData: + Nodes.ExtractValues(ctx, static_cast(ptr->Throat), fields); + break; + case TSpillingSupportState::EUpdateResult::Extract: + if (const auto values = static_cast(ptr->Extract())) { + Nodes.FinishItem(ctx, values, output); + return EFetchResult::One; + } + continue; + case TSpillingSupportState::EUpdateResult::Finish: + return EFetchResult::Finish; } - if (ptr->IsProcessingRequired()) { - Nodes.ExtractKey(ctx, fields, static_cast(ptr->Tongue)); + Nodes.ExtractKey(ctx, fields, static_cast(ptr->Tongue)); - switch(ptr->TasteIt()) { - case TSpillingSupportState::ETasteResult::Init: - Nodes.ProcessItem(ctx, nullptr, static_cast(ptr->Throat)); - break; - case TSpillingSupportState::ETasteResult::Update: - Nodes.ProcessItem(ctx, static_cast(ptr->Tongue), static_cast(ptr->Throat)); - break; - case TSpillingSupportState::ETasteResult::ConsumeRawData: - Nodes.ExtractValues(ctx, fields, static_cast(ptr->Throat)); - break; - case TSpillingSupportState::ETasteResult::ExtractRawData: - Nodes.ExtractValues(ctx, static_cast(ptr->Throat), fields); - break; - } - continue; - } - - if (const auto values = static_cast(ptr->Extract())) { - Nodes.FinishItem(ctx, values, output); - return EFetchResult::One; + switch(ptr->TasteIt()) { + case TSpillingSupportState::ETasteResult::Init: + Nodes.ProcessItem(ctx, nullptr, static_cast(ptr->Throat)); + break; + case TSpillingSupportState::ETasteResult::Update: + Nodes.ProcessItem(ctx, static_cast(ptr->Tongue), static_cast(ptr->Throat)); + break; + case TSpillingSupportState::ETasteResult::ConsumeRawData: + Nodes.ExtractValues(ctx, fields, static_cast(ptr->Throat)); + break; } - if 
(!ptr->HasAnyData()) { - return EFetchResult::Finish; - } } } Y_UNREACHABLE(); @@ -1366,13 +1364,18 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNodegetType()}, false); + const auto updateFuncPtr = CastInst::Create(Instruction::IntToPtr, updateFunc, PointerType::getUnqual(updateType), "update_func", block); + const auto update = CallInst::Create(updateType, updateFuncPtr, { stateArg }, "update", block); result->addIncoming(ConstantInt::get(statusType, static_cast(EFetchResult::Yield)), block); - BranchInst::Create(over, test, waitMore, block); + const auto updateWay = SwitchInst::Create(update, test, 3U, block); + updateWay->addCase(ConstantInt::get(wayType, static_cast(TSpillingSupportState::EUpdateResult::Yield)), over); + // TODO add extraction code and jump there + updateWay->addCase(ConstantInt::get(wayType, static_cast(TSpillingSupportState::EUpdateResult::ExtractRawData)), test); + updateWay->addCase(ConstantInt::get(wayType, static_cast(TSpillingSupportState::EUpdateResult::Extract)), test); block = test;