Skip to content

Commit

Permalink
Merge 1a26206 into bc843db
Browse files Browse the repository at this point in the history
  • Loading branch information
avevad authored Jan 26, 2024
2 parents bc843db + 1a26206 commit 72cc40a
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 58 deletions.
34 changes: 16 additions & 18 deletions ydb/library/yql/minikql/comp_nodes/mkql_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,37 @@ class TState : public TComputationValue<TState> {
};
#endif

class TExtendWideFlowWrapper : public TStatefulWideFlowCodegeneratorNode<TExtendWideFlowWrapper> {
class TExtendWideFlowWrapper : public TStatefulWideFlowCodegeneratorNodeImpl<TExtendWideFlowWrapper, TState, true> {
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TExtendWideFlowWrapper>;
public:
TExtendWideFlowWrapper(TComputationMutables& mutables, TComputationWideFlowNodePtrVector&& flows, size_t width)
: TBaseComputation(mutables, this, EValueRepresentation::Boxed)
: TStatefulWideFlowCodegeneratorNodeImpl(mutables, this)
, Flows_(std::move(flows)), Width_(width)
{
#ifdef MKQL_DISABLE_CODEGEN
Y_UNUSED(Width_);
#endif
}

EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
auto& s = GetState(state, ctx);
while (s.Index >= 0) {
switch (Flows_[s.Index]->FetchValues(ctx, output)) {
void InitState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
state = ctx.HolderFactory.Create<TState>(Flows_.size());
}

// Codegen compatibility
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
InitState(state, ctx);
}

TMaybe<EFetchResult> DoFetch(TState &state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
while (state.Index >= 0) {
switch (Flows_[state.Index]->FetchValues(ctx, output)) {
case EFetchResult::One:
return EFetchResult::One;
case EFetchResult::Yield:
s.NextFlow();
state.NextFlow();
return EFetchResult::Yield;
case EFetchResult::Finish:
s.FlowOver();
state.FlowOver();
break;
}
}
Expand Down Expand Up @@ -204,16 +212,6 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TExtendWideFlowWrapp
}
}

void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
state = ctx.HolderFactory.Create<TState>(Flows_.size());
}

TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
if (!state.HasValue())
MakeState(ctx, state);
return *static_cast<TState*>(state.AsBoxed().Get());
}

const TComputationWideFlowNodePtrVector Flows_;
const size_t Width_;
};
Expand Down
15 changes: 7 additions & 8 deletions ydb/library/yql/minikql/comp_nodes/mkql_if.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,22 +167,21 @@ using TBaseComputation = TStatefulFlowCodegeneratorNode<TFlowIfWrapper<IsOptiona
IComputationNode* const ElseBranch;
};

class TWideIfWrapper : public TStatefulWideFlowCodegeneratorNode<TWideIfWrapper> {
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideIfWrapper>;
class TWideIfWrapper : public TStatefulWideFlowCodegeneratorNodeImpl<TWideIfWrapper, bool>{
public:
TWideIfWrapper(TComputationMutables& mutables, IComputationNode* predicate, IComputationWideFlowNode* thenBranch, IComputationWideFlowNode* elseBranch)
: TBaseComputation(mutables, nullptr, EValueRepresentation::Embedded)
: TStatefulWideFlowCodegeneratorNodeImpl<TWideIfWrapper, bool>(mutables, nullptr)
, Predicate(predicate)
, ThenBranch(thenBranch)
, ElseBranch(elseBranch)
{}

EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (state.IsInvalid()) {
state = Predicate->GetValue(ctx);
}
void InitState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
state = Predicate->GetValue(ctx);
}

return (state.Get<bool>() ? ThenBranch : ElseBranch)->FetchValues(ctx, output);
TMaybe<EFetchResult> DoFetch(bool pred, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
return (pred ? ThenBranch : ElseBranch)->FetchValues(ctx, output);
}
#ifndef MKQL_DISABLE_CODEGEN
TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
Expand Down
26 changes: 10 additions & 16 deletions ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,31 +117,25 @@ using TBaseComputation = TStatefulFlowCodegeneratorNode<TSkipFlowWrapper>;
IComputationNode* const Count;
};

class TWideSkipWrapper : public TStatefulWideFlowCodegeneratorNode<TWideSkipWrapper> {
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideSkipWrapper>;
class TWideSkipWrapper : public TStatefulWideFlowCodegeneratorNodeImpl<TWideSkipWrapper, ui64> {
public:
TWideSkipWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* count, ui32 size)
: TBaseComputation(mutables, flow, EValueRepresentation::Embedded)
: TStatefulWideFlowCodegeneratorNodeImpl<TWideSkipWrapper, ui64>(mutables, flow)
, Flow(flow)
, Count(count)
, StubsIndex(mutables.IncrementWideFieldsIndex(size))
{}

EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (state.IsInvalid()) {
state = Count->GetValue(ctx);
}

if (auto count = state.Get<ui64>()) {
do if (const auto result = Flow->FetchValues(ctx, ctx.WideFields.data() + StubsIndex); EFetchResult::One != result) {
state = NUdf::TUnboxedValuePod(count);
return result;
} while (--count);
void InitState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
state = Count->GetValue(ctx);
}

state = NUdf::TUnboxedValuePod::Zero();
TMaybe<EFetchResult> DoFetch(ui64 &count, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (auto res = Flow->FetchValues(ctx, output); res != EFetchResult::One || count == 0) {
return res;
}

return Flow->FetchValues(ctx, output);
count--;
return Nothing();
}

#ifndef MKQL_DISABLE_CODEGEN
Expand Down
26 changes: 10 additions & 16 deletions ydb/library/yql/minikql/comp_nodes/mkql_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,22 @@ using TBaseComputation = TStatefulFlowCodegeneratorNode<TTakeFlowWrapper>;
IComputationNode* const Count;
};

class TWideTakeWrapper : public TStatefulWideFlowCodegeneratorNode<TWideTakeWrapper> {
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideTakeWrapper>;
class TWideTakeWrapper : public TStatefulWideFlowCodegeneratorNodeImpl<TWideTakeWrapper, ui64> {
public:
TWideTakeWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* count)
: TBaseComputation(mutables, flow, EValueRepresentation::Embedded), Flow(flow), Count(count)
: TStatefulWideFlowCodegeneratorNodeImpl<TWideTakeWrapper, ui64>(mutables, flow), Flow(flow), Count(count)
{}

EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (state.IsInvalid()) {
state = Count->GetValue(ctx);
}
void InitState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
state = Count->GetValue(ctx);
}

if (auto count = state.Get<ui64>()) {
if (const auto result = Flow->FetchValues(ctx, output); EFetchResult::One == result) {
state = NUdf::TUnboxedValuePod(--count);
return EFetchResult::One;
} else {
return result;
}
TMaybe<EFetchResult> DoFetch(ui64 &count, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (count == 0) {
return EFetchResult::Finish;
}

return EFetchResult::Finish;
count--;
return Flow->FetchValues(ctx, output);
}
#ifndef MKQL_DISABLE_CODEGEN
TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1486,3 +1486,37 @@ Value* CheckAdjustedMemLimit(ui64 limit, Value* init, const TCodegenContext& ctx
}
}
#endif

namespace NKikimr::NMiniKQL {

template<typename TDerivedWrapper, typename TState, bool BoxedState = false>
class TStatefulWideFlowCodegeneratorNodeImpl : public TStatefulWideFlowCodegeneratorNode<TDerivedWrapper> {
public:
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TDerivedWrapper>;

TStatefulWideFlowCodegeneratorNodeImpl(TComputationMutables &mutables, IComputationWideFlowNode *flow)
: TBaseComputation(mutables, flow, BoxedState ? EValueRepresentation::Boxed : EValueRepresentation::Embedded) {}

EFetchResult
DoCalculate(NUdf::TUnboxedValue &state, TComputationContext &ctx, NUdf::TUnboxedValue *const *output) const {
if (state.IsInvalid()) {
static_cast<const TDerivedWrapper *>(this)->InitState(state, ctx);
}

TState *ptr = nullptr;
if constexpr (BoxedState) {
ptr = static_cast<TState *>(state.AsBoxed().Get());
} else {
ptr = static_cast<TState *>(state.GetRawPtr());
}

while (true) {
TMaybe<EFetchResult> result = static_cast<const TDerivedWrapper *>(this)->DoFetch(*ptr, ctx, output);
if (!result.Empty()) {
return result.GetRef();
}
}
}
};

}

0 comments on commit 72cc40a

Please sign in to comment.