Skip to content

Commit

Permalink
WideCombiner with spilling better buffer pass
Browse files Browse the repository at this point in the history
  • Loading branch information
lll-phill-lll committed Jul 23, 2024
1 parent 3822341 commit 390ef80
Showing 1 changed file with 39 additions and 45 deletions.
84 changes: 39 additions & 45 deletions ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,13 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
enum class ETasteResult: i8 {
Init = -1,
Update,
ConsumeRawData,
ExtractRawData
ConsumeRawData
};

enum class EUpdateResult: i8 {
Yield = -1,
ExtractRawData,
None
};
TSpillingSupportState(
TMemoryUsageInfo* memInfo,
Expand Down Expand Up @@ -398,28 +403,28 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
bool IsProcessingRequired() const {
if (InputStatus != EFetchResult::Finish) return true;

return HasRawDataToExtract || HasDataForProcessing;
return !SpilledBuckets.empty() && SpilledBuckets.front().BucketState != TSpilledBucket::EBucketState::InMemory;
}

bool UpdateAndWait() {
EUpdateResult Update() {
switch (GetMode()) {
case EOperatingMode::InMemory: {
if (CheckMemoryAndSwitchToSpilling()) {
return UpdateAndWait();
return Update();
}
return false;
return EUpdateResult::None;
}

case EOperatingMode::ProcessSpilled:
return ProcessSpilledDataAndWait();
case EOperatingMode::Spilling: {
UpdateSpillingBuckets();

if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return true;
if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return EUpdateResult::Yield;

if (BufferForUsedInputItems.size()) {
auto& bucket = SpilledBuckets[BufferForUsedInputItemsBucketId];
if (bucket.AsyncWriteOperation.has_value()) return true;
if (bucket.AsyncWriteOperation.has_value()) return EUpdateResult::Yield;

bucket.AsyncWriteOperation = bucket.SpilledData->WriteWideItem(BufferForUsedInputItems);
BufferForUsedInputItems.resize(0); //for freeing allocated key value asap
Expand All @@ -429,7 +434,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {

// Prepare buffer for reading new key
BufferForKeyAndState.resize(KeyWidth);
return false;
return EUpdateResult::None;
}
}
}
Expand All @@ -442,14 +447,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
return isNew ? ETasteResult::Init : ETasteResult::Update;
}
if (GetMode() == EOperatingMode::ProcessSpilled) {
if (HasRawDataToExtract) {
// Tongue not used here.
Throat = BufferForUsedInputItems.data();
HasRawDataToExtract = false;
HasDataForProcessing = true;
return ETasteResult::ExtractRawData;
}
HasDataForProcessing = false;
// while restoration we process buckets one by one starting from the first in a queue
bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt();
Throat = SpilledBuckets.front().InMemoryProcessingState->Throat;
Expand All @@ -476,8 +473,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
MKQL_ENSURE(BufferForUsedInputItems.size() == 0, "Internal logic error");
BufferForUsedInputItems.resize(ItemNodesSize);
BufferForUsedInputItemsBucketId = bucketId;

Throat = BufferForUsedInputItems.data();

Tongue = nullptr;


return ETasteResult::ConsumeRawData;
}

Expand All @@ -503,7 +503,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
BufferForKeyAndState.resize(0);
}

bool FlushSpillingBuffersAndWait() {
EUpdateResult FlushSpillingBuffersAndWait() {
UpdateSpillingBuckets();

ui64 finishedCount = 0;
Expand All @@ -519,7 +519,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
}
}

if (finishedCount != SpilledBuckets.size()) return true;
if (finishedCount != SpilledBuckets.size()) return EUpdateResult::Yield;

SwitchMode(EOperatingMode::ProcessSpilled);

Expand Down Expand Up @@ -628,11 +628,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
return false;
}

bool ProcessSpilledDataAndWait() {
if (SpilledBuckets.empty()) return false;
EUpdateResult ProcessSpilledDataAndWait() {
if (SpilledBuckets.empty()) return EUpdateResult::None;

if (AsyncReadOperation) {
if (!AsyncReadOperation->HasValue()) return true;
if (!AsyncReadOperation->HasValue()) return EUpdateResult::Yield;
if (RecoverState) {
SpilledBuckets[0].SpilledState->AsyncReadCompleted(AsyncReadOperation->ExtractValue().value(), Ctx.HolderFactory);
} else {
Expand All @@ -642,20 +642,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
}

auto& bucket = SpilledBuckets.front();
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false;
if (HasDataForProcessing) {
Tongue = bucket.InMemoryProcessingState->Tongue;
Throat = bucket.InMemoryProcessingState->Throat;
return false;
}
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return EUpdateResult::None;

//recover spilled state
while(!bucket.SpilledState->Empty()) {
RecoverState = true;
BufferForKeyAndState.resize(KeyAndStateType->GetElementsCount());
AsyncReadOperation = bucket.SpilledState->ExtractWideItem(BufferForKeyAndState);
if (AsyncReadOperation) {
BufferForKeyAndState.resize(0);
return true;
return EUpdateResult::Yield;
}
for (size_t i = 0; i< KeyWidth; ++i) {
//jumping into unsafe world, refusing ownership
Expand All @@ -675,18 +671,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount());
AsyncReadOperation = bucket.SpilledData->ExtractWideItem(BufferForUsedInputItems);
if (AsyncReadOperation) {
return true;
return EUpdateResult::Yield;
}

Throat = BufferForUsedInputItems.data();
Tongue = bucket.InMemoryProcessingState->Tongue;
Throat = bucket.InMemoryProcessingState->Throat;

HasRawDataToExtract = true;
return false;
return EUpdateResult::ExtractRawData;
}
bucket.BucketState = TSpilledBucket::EBucketState::InMemory;
HasDataForProcessing = false;
return false;
return EUpdateResult::None;
}

EOperatingMode GetMode() const {
Expand Down Expand Up @@ -744,10 +738,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
private:
ui64 NextBucketToSpill = 0;

bool HasDataForProcessing = false;

bool HasRawDataToExtract = false;

TState InMemoryProcessingState;
const TMultiType* const UsedInputItemType;
const TMultiType* const KeyAndStateType;
Expand Down Expand Up @@ -1259,6 +1249,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
, AllowSpilling(allowSpilling)
{}

// MARK: DoCalculate
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (!state.HasValue()) {
MakeState(ctx, state);
Expand All @@ -1268,8 +1259,14 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
auto **fields = ctx.WideFields.data() + WideFieldsIndex;

while (true) {
if (ptr->UpdateAndWait()) {
return EFetchResult::Yield;
switch(ptr->Update()) {
case TSpillingSupportState::EUpdateResult::Yield:
return EFetchResult::Yield;
case TSpillingSupportState::EUpdateResult::ExtractRawData:
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
break;
case TSpillingSupportState::EUpdateResult::None:
break;
}
if (ptr->InputStatus != EFetchResult::Finish) {
for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
Expand Down Expand Up @@ -1297,9 +1294,6 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
case TSpillingSupportState::ETasteResult::ConsumeRawData:
Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
break;
case TSpillingSupportState::ETasteResult::ExtractRawData:
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
break;
}
continue;
}
Expand Down

0 comments on commit 390ef80

Please sign in to comment.