Skip to content

Commit

Permalink
WideCombiner with spilling better buffer pass (#7022)
Browse files Browse the repository at this point in the history
  • Loading branch information
lll-phill-lll authored Jul 25, 2024
1 parent d3fa399 commit 833f929
Showing 1 changed file with 91 additions and 88 deletions.
179 changes: 91 additions & 88 deletions ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,15 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
enum class ETasteResult: i8 {
Init = -1,
Update,
ConsumeRawData,
ExtractRawData
ConsumeRawData
};

// Outcome of TSpillingSupportState::Update(); tells the fetch loop which action to take next.
// The underlying type is i8: the generated-code path builds its switch cases by casting
// these enumerators to i8, so the numeric values are part of the codegen contract.
enum class EUpdateResult: i8 {
// An asynchronous spilling operation (read or write) is still pending or memory has to be
// reduced first; the fetch loop returns EFetchResult::Yield to its caller.
Yield = -1,
// A raw input item was read back from a spilled bucket; the fetch loop unpacks the values
// pointed to by Throat into the input fields (Nodes.ExtractValues) and then processes the
// item through the usual key extraction / TasteIt path.
ExtractRawData,
// The state is ready for a new row; the fetch loop pulls the next values from the input flow.
ReadInput,
// Aggregated data is ready; the fetch loop calls Extract() and emits the item via FinishItem,
// or loops again when Extract() returns null.
Extract,
// Everything has been extracted (IsEverythingExtracted is set); the fetch loop returns
// EFetchResult::Finish.
Finish
};
TSpillingSupportState(
TMemoryUsageInfo* memInfo,
Expand Down Expand Up @@ -398,28 +405,29 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
bool IsProcessingRequired() const {
if (InputStatus != EFetchResult::Finish) return true;

return HasRawDataToExtract || HasDataForProcessing;
return !SpilledBuckets.empty() && SpilledBuckets.front().BucketState != TSpilledBucket::EBucketState::InMemory;
}

bool UpdateAndWait() {
EUpdateResult Update() {
if (IsEverythingExtracted) return EUpdateResult::Finish;

switch (GetMode()) {
case EOperatingMode::InMemory: {
if (CheckMemoryAndSwitchToSpilling()) {
return UpdateAndWait();
return Update();
}
return false;
if (InputStatus == EFetchResult::Finish) return EUpdateResult::Extract;

return EUpdateResult::ReadInput;
}

case EOperatingMode::ProcessSpilled:
return ProcessSpilledDataAndWait();
case EOperatingMode::Spilling: {
UpdateSpillingBuckets();

if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return true;
if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return EUpdateResult::Yield;

if (BufferForUsedInputItems.size()) {
auto& bucket = SpilledBuckets[BufferForUsedInputItemsBucketId];
if (bucket.AsyncWriteOperation.has_value()) return true;
if (bucket.AsyncWriteOperation.has_value()) return EUpdateResult::Yield;

bucket.AsyncWriteOperation = bucket.SpilledData->WriteWideItem(BufferForUsedInputItems);
BufferForUsedInputItems.resize(0); //for freeing allocated key value asap
Expand All @@ -429,8 +437,10 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {

// Prepare buffer for reading new key
BufferForKeyAndState.resize(KeyWidth);
return false;
return EUpdateResult::ReadInput;
}
case EOperatingMode::ProcessSpilled:
return ProcessSpilledData();
}
}

Expand All @@ -442,14 +452,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
return isNew ? ETasteResult::Init : ETasteResult::Update;
}
if (GetMode() == EOperatingMode::ProcessSpilled) {
if (HasRawDataToExtract) {
// Tongue not used here.
Throat = BufferForUsedInputItems.data();
HasRawDataToExtract = false;
HasDataForProcessing = true;
return ETasteResult::ExtractRawData;
}
HasDataForProcessing = false;
// while restoration we process buckets one by one starting from the first in a queue
bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt();
Throat = SpilledBuckets.front().InMemoryProcessingState->Throat;
Expand All @@ -476,20 +478,27 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
MKQL_ENSURE(BufferForUsedInputItems.size() == 0, "Internal logic error");
BufferForUsedInputItems.resize(ItemNodesSize);
BufferForUsedInputItemsBucketId = bucketId;

Throat = BufferForUsedInputItems.data();

return ETasteResult::ConsumeRawData;
}

NUdf::TUnboxedValuePod* Extract() {
if (GetMode() == EOperatingMode::InMemory) return static_cast<NUdf::TUnboxedValue*>(InMemoryProcessingState.Extract());
NUdf::TUnboxedValue* value = nullptr;
if (GetMode() == EOperatingMode::InMemory) {
value = static_cast<NUdf::TUnboxedValue*>(InMemoryProcessingState.Extract());
if (!value) IsEverythingExtracted = true;
return value;
}

MKQL_ENSURE(SpilledBuckets.front().BucketState == TSpilledBucket::EBucketState::InMemory, "Internal logic error");
MKQL_ENSURE(SpilledBuckets.size() > 0, "Internal logic error");

auto value = static_cast<NUdf::TUnboxedValue*>(SpilledBuckets.front().InMemoryProcessingState->Extract());
value = static_cast<NUdf::TUnboxedValue*>(SpilledBuckets.front().InMemoryProcessingState->Extract());
if (!value) {
SpilledBuckets.pop_front();
if (SpilledBuckets.empty()) IsEverythingExtracted = true;
}

return value;
Expand All @@ -503,7 +512,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
BufferForKeyAndState.resize(0);
}

bool FlushSpillingBuffersAndWait() {
EUpdateResult FlushSpillingBuffersAndWait() {
UpdateSpillingBuckets();

ui64 finishedCount = 0;
Expand All @@ -519,11 +528,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
}
}

if (finishedCount != SpilledBuckets.size()) return true;
if (finishedCount != SpilledBuckets.size()) return EUpdateResult::Yield;

SwitchMode(EOperatingMode::ProcessSpilled);

return ProcessSpilledDataAndWait();
return ProcessSpilledData();
}

void SplitStateIntoBuckets() {
Expand Down Expand Up @@ -628,11 +637,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
return false;
}

bool ProcessSpilledDataAndWait() {
if (SpilledBuckets.empty()) return false;

EUpdateResult ProcessSpilledData() {
if (AsyncReadOperation) {
if (!AsyncReadOperation->HasValue()) return true;
if (!AsyncReadOperation->HasValue()) return EUpdateResult::Yield;
if (RecoverState) {
SpilledBuckets[0].SpilledState->AsyncReadCompleted(AsyncReadOperation->ExtractValue().value(), Ctx.HolderFactory);
} else {
Expand All @@ -642,20 +649,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
}

auto& bucket = SpilledBuckets.front();
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false;
if (HasDataForProcessing) {
Tongue = bucket.InMemoryProcessingState->Tongue;
Throat = bucket.InMemoryProcessingState->Throat;
return false;
}
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return EUpdateResult::Extract;

//recover spilled state
while(!bucket.SpilledState->Empty()) {
RecoverState = true;
BufferForKeyAndState.resize(KeyAndStateType->GetElementsCount());
AsyncReadOperation = bucket.SpilledState->ExtractWideItem(BufferForKeyAndState);
if (AsyncReadOperation) {
BufferForKeyAndState.resize(0);
return true;
return EUpdateResult::Yield;
}
for (size_t i = 0; i< KeyWidth; ++i) {
//jumping into unsafe world, refusing ownership
Expand All @@ -675,18 +678,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount());
AsyncReadOperation = bucket.SpilledData->ExtractWideItem(BufferForUsedInputItems);
if (AsyncReadOperation) {
return true;
return EUpdateResult::Yield;
}

Throat = BufferForUsedInputItems.data();
Tongue = bucket.InMemoryProcessingState->Tongue;
Throat = bucket.InMemoryProcessingState->Throat;

HasRawDataToExtract = true;
return false;
return EUpdateResult::ExtractRawData;
}
bucket.BucketState = TSpilledBucket::EBucketState::InMemory;
HasDataForProcessing = false;
return false;
return EUpdateResult::Extract;
}

EOperatingMode GetMode() const {
Expand Down Expand Up @@ -744,9 +745,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
private:
ui64 NextBucketToSpill = 0;

bool HasDataForProcessing = false;

bool HasRawDataToExtract = false;
bool IsEverythingExtracted = false;

TState InMemoryProcessingState;
const TMultiType* const UsedInputItemType;
Expand Down Expand Up @@ -1268,50 +1267,49 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
auto **fields = ctx.WideFields.data() + WideFieldsIndex;

while (true) {
if (ptr->UpdateAndWait()) {
return EFetchResult::Yield;
}
if (ptr->InputStatus != EFetchResult::Finish) {
for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);
switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) {
case EFetchResult::One:
break;
case EFetchResult::Finish:
continue;
case EFetchResult::Yield:
return EFetchResult::Yield;
switch(ptr->Update()) {
case TSpillingSupportState::EUpdateResult::ReadInput: {
for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);
switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) {
case EFetchResult::One:
break;
case EFetchResult::Finish:
continue;
case EFetchResult::Yield:
return EFetchResult::Yield;
}
break;
}
case TSpillingSupportState::EUpdateResult::Yield:
return EFetchResult::Yield;
case TSpillingSupportState::EUpdateResult::ExtractRawData:
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
break;
case TSpillingSupportState::EUpdateResult::Extract:
if (const auto values = static_cast<NUdf::TUnboxedValue*>(ptr->Extract())) {
Nodes.FinishItem(ctx, values, output);
return EFetchResult::One;
}
continue;
case TSpillingSupportState::EUpdateResult::Finish:
return EFetchResult::Finish;
}

if (ptr->IsProcessingRequired()) {
Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));

switch(ptr->TasteIt()) {
case TSpillingSupportState::ETasteResult::Init:
Nodes.ProcessItem(ctx, nullptr, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
break;
case TSpillingSupportState::ETasteResult::Update:
Nodes.ProcessItem(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
break;
case TSpillingSupportState::ETasteResult::ConsumeRawData:
Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
break;
case TSpillingSupportState::ETasteResult::ExtractRawData:
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
break;
}
continue;
}

if (const auto values = static_cast<NUdf::TUnboxedValue*>(ptr->Extract())) {
Nodes.FinishItem(ctx, values, output);
return EFetchResult::One;
switch(ptr->TasteIt()) {
case TSpillingSupportState::ETasteResult::Init:
Nodes.ProcessItem(ctx, nullptr, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
break;
case TSpillingSupportState::ETasteResult::Update:
Nodes.ProcessItem(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
break;
case TSpillingSupportState::ETasteResult::ConsumeRawData:
Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
break;
}

if (!ptr->HasAnyData()) {
return EFetchResult::Finish;
}
}
}
Y_UNREACHABLE();
Expand Down Expand Up @@ -1366,13 +1364,18 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra

block = more;

const auto waitMoreFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSpillingSupportState::UpdateAndWait));
const auto waitMoreFuncPtr = CastInst::Create(Instruction::IntToPtr, waitMoreFunc, PointerType::getUnqual(boolFuncType), "wait_more_func", block);
const auto waitMore = CallInst::Create(boolFuncType, waitMoreFuncPtr, { stateArg }, "wait_more", block);
const auto updateFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSpillingSupportState::Update));
const auto updateType = FunctionType::get(wayType, {stateArg->getType()}, false);
const auto updateFuncPtr = CastInst::Create(Instruction::IntToPtr, updateFunc, PointerType::getUnqual(updateType), "update_func", block);
const auto update = CallInst::Create(updateType, updateFuncPtr, { stateArg }, "update", block);

result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), block);

BranchInst::Create(over, test, waitMore, block);
const auto updateWay = SwitchInst::Create(update, test, 3U, block);
updateWay->addCase(ConstantInt::get(wayType, static_cast<i8>(TSpillingSupportState::EUpdateResult::Yield)), over);
// TODO: add extraction code and jump there
updateWay->addCase(ConstantInt::get(wayType, static_cast<i8>(TSpillingSupportState::EUpdateResult::ExtractRawData)), test);
updateWay->addCase(ConstantInt::get(wayType, static_cast<i8>(TSpillingSupportState::EUpdateResult::Extract)), test);

block = test;

Expand Down

0 comments on commit 833f929

Please sign in to comment.