Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WideCombiner with spilling better buffer pass #7022

Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 46 additions & 49 deletions ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,13 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
enum class ETasteResult: i8 {
Init = -1,
Update,
ConsumeRawData,
ExtractRawData
ConsumeRawData
};

enum class EUpdateResult: i8 {
Yield = -1,
ExtractRawData,
None
lll-phill-lll marked this conversation as resolved.
Show resolved Hide resolved
};
TSpillingSupportState(
TMemoryUsageInfo* memInfo,
Expand Down Expand Up @@ -398,28 +403,28 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
bool IsProcessingRequired() const {
if (InputStatus != EFetchResult::Finish) return true;

return HasRawDataToExtract || HasDataForProcessing;
return !SpilledBuckets.empty() && SpilledBuckets.front().BucketState != TSpilledBucket::EBucketState::InMemory;
}

bool UpdateAndWait() {
EUpdateResult Update() {
switch (GetMode()) {
case EOperatingMode::InMemory: {
if (CheckMemoryAndSwitchToSpilling()) {
return UpdateAndWait();
return Update();
}
return false;
return EUpdateResult::None;
}

case EOperatingMode::ProcessSpilled:
return ProcessSpilledDataAndWait();
case EOperatingMode::Spilling: {
UpdateSpillingBuckets();

if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return true;
if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return EUpdateResult::Yield;

if (BufferForUsedInputItems.size()) {
auto& bucket = SpilledBuckets[BufferForUsedInputItemsBucketId];
if (bucket.AsyncWriteOperation.has_value()) return true;
if (bucket.AsyncWriteOperation.has_value()) return EUpdateResult::Yield;

bucket.AsyncWriteOperation = bucket.SpilledData->WriteWideItem(BufferForUsedInputItems);
BufferForUsedInputItems.resize(0); //for freeing allocated key value asap
Expand All @@ -429,7 +434,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {

// Prepare buffer for reading new key
BufferForKeyAndState.resize(KeyWidth);
return false;
return EUpdateResult::None;
}
}
}
Expand All @@ -442,14 +447,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
return isNew ? ETasteResult::Init : ETasteResult::Update;
}
if (GetMode() == EOperatingMode::ProcessSpilled) {
if (HasRawDataToExtract) {
// Tongue not used here.
Throat = BufferForUsedInputItems.data();
HasRawDataToExtract = false;
HasDataForProcessing = true;
return ETasteResult::ExtractRawData;
}
HasDataForProcessing = false;
// while restoration we process buckets one by one starting from the first in a queue
bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt();
Throat = SpilledBuckets.front().InMemoryProcessingState->Throat;
Expand All @@ -476,8 +473,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
MKQL_ENSURE(BufferForUsedInputItems.size() == 0, "Internal logic error");
BufferForUsedInputItems.resize(ItemNodesSize);
BufferForUsedInputItemsBucketId = bucketId;

Throat = BufferForUsedInputItems.data();

return ETasteResult::ConsumeRawData;
}

Expand All @@ -503,7 +501,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
BufferForKeyAndState.resize(0);
}

bool FlushSpillingBuffersAndWait() {
EUpdateResult FlushSpillingBuffersAndWait() {
UpdateSpillingBuckets();

ui64 finishedCount = 0;
Expand All @@ -519,7 +517,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
}
}

if (finishedCount != SpilledBuckets.size()) return true;
if (finishedCount != SpilledBuckets.size()) return EUpdateResult::Yield;

SwitchMode(EOperatingMode::ProcessSpilled);

Expand Down Expand Up @@ -628,11 +626,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
return false;
}

bool ProcessSpilledDataAndWait() {
if (SpilledBuckets.empty()) return false;
EUpdateResult ProcessSpilledDataAndWait() {
if (SpilledBuckets.empty()) return EUpdateResult::None;

if (AsyncReadOperation) {
if (!AsyncReadOperation->HasValue()) return true;
if (!AsyncReadOperation->HasValue()) return EUpdateResult::Yield;
if (RecoverState) {
SpilledBuckets[0].SpilledState->AsyncReadCompleted(AsyncReadOperation->ExtractValue().value(), Ctx.HolderFactory);
} else {
Expand All @@ -642,20 +640,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
}

auto& bucket = SpilledBuckets.front();
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false;
if (HasDataForProcessing) {
Tongue = bucket.InMemoryProcessingState->Tongue;
Throat = bucket.InMemoryProcessingState->Throat;
return false;
}
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return EUpdateResult::None;

//recover spilled state
while(!bucket.SpilledState->Empty()) {
RecoverState = true;
BufferForKeyAndState.resize(KeyAndStateType->GetElementsCount());
AsyncReadOperation = bucket.SpilledState->ExtractWideItem(BufferForKeyAndState);
if (AsyncReadOperation) {
BufferForKeyAndState.resize(0);
return true;
return EUpdateResult::Yield;
}
for (size_t i = 0; i< KeyWidth; ++i) {
//jumping into unsafe world, refusing ownership
Expand All @@ -675,18 +669,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount());
AsyncReadOperation = bucket.SpilledData->ExtractWideItem(BufferForUsedInputItems);
if (AsyncReadOperation) {
return true;
return EUpdateResult::Yield;
}

Throat = BufferForUsedInputItems.data();
Tongue = bucket.InMemoryProcessingState->Tongue;
Throat = bucket.InMemoryProcessingState->Throat;

HasRawDataToExtract = true;
return false;
return EUpdateResult::ExtractRawData;
}
bucket.BucketState = TSpilledBucket::EBucketState::InMemory;
HasDataForProcessing = false;
return false;
return EUpdateResult::None;
}

EOperatingMode GetMode() const {
Expand Down Expand Up @@ -744,10 +736,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
private:
ui64 NextBucketToSpill = 0;

bool HasDataForProcessing = false;

bool HasRawDataToExtract = false;

TState InMemoryProcessingState;
const TMultiType* const UsedInputItemType;
const TMultiType* const KeyAndStateType;
Expand Down Expand Up @@ -1259,6 +1247,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
, AllowSpilling(allowSpilling)
{}

// MARK: DoCalculate
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (!state.HasValue()) {
MakeState(ctx, state);
Expand All @@ -1268,8 +1257,14 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
auto **fields = ctx.WideFields.data() + WideFieldsIndex;

while (true) {
if (ptr->UpdateAndWait()) {
return EFetchResult::Yield;
switch(ptr->Update()) {
case TSpillingSupportState::EUpdateResult::Yield:
return EFetchResult::Yield;
case TSpillingSupportState::EUpdateResult::ExtractRawData:
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
break;
case TSpillingSupportState::EUpdateResult::None:
break;
}
if (ptr->InputStatus != EFetchResult::Finish) {
for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
Expand Down Expand Up @@ -1297,9 +1292,6 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
case TSpillingSupportState::ETasteResult::ConsumeRawData:
Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
break;
case TSpillingSupportState::ETasteResult::ExtractRawData:
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
break;
}
continue;
}
Expand Down Expand Up @@ -1366,13 +1358,18 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra

block = more;

const auto waitMoreFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSpillingSupportState::UpdateAndWait));
const auto waitMoreFuncPtr = CastInst::Create(Instruction::IntToPtr, waitMoreFunc, PointerType::getUnqual(boolFuncType), "wait_more_func", block);
const auto waitMore = CallInst::Create(boolFuncType, waitMoreFuncPtr, { stateArg }, "wait_more", block);
const auto updateFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSpillingSupportState::Update));
const auto updateType = FunctionType::get(wayType, {stateArg->getType()}, false);
const auto updateFuncPtr = CastInst::Create(Instruction::IntToPtr, updateFunc, PointerType::getUnqual(updateType), "update_func", block);
const auto update = CallInst::Create(updateType, updateFuncPtr, { stateArg }, "update", block);

result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), block);

BranchInst::Create(over, test, waitMore, block);
const auto updateWay = SwitchInst::Create(update, test, 3U, block);
updateWay->addCase(ConstantInt::get(wayType, static_cast<i8>(TSpillingSupportState::EUpdateResult::Yield)), over);
// TODO add exctraction code and jmp there
updateWay->addCase(ConstantInt::get(wayType, static_cast<i8>(TSpillingSupportState::EUpdateResult::ExtractRawData)), test);
updateWay->addCase(ConstantInt::get(wayType, static_cast<i8>(TSpillingSupportState::EUpdateResult::None)), test);

block = test;

Expand Down
Loading