Skip to content

Commit

Permalink
Merge 5a99902 into 9e693c9
Browse files Browse the repository at this point in the history
  • Loading branch information
avevad authored Feb 27, 2024
2 parents 9e693c9 + 5a99902 commit 6638adf
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 346 deletions.
100 changes: 14 additions & 86 deletions ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "mkql_skip.h"
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen_impl.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>

namespace NKikimr {
Expand Down Expand Up @@ -117,8 +118,8 @@ using TBaseComputation = TStatefulFlowCodegeneratorNode<TSkipFlowWrapper>;
IComputationNode* const Count;
};

class TWideSkipWrapper : public TStatefulWideFlowCodegeneratorNode<TWideSkipWrapper> {
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideSkipWrapper>;
class TWideSkipWrapper : public TSimpleStatefulWideFlowCodegeneratorNode<TWideSkipWrapper, ui64> {
using TBaseComputation = TSimpleStatefulWideFlowCodegeneratorNode<TWideSkipWrapper, ui64>;
public:
TWideSkipWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* count, ui32 size)
: TBaseComputation(mutables, flow, EValueRepresentation::Embedded)
Expand All @@ -127,95 +128,22 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideSkipWrapper>;
, StubsIndex(mutables.IncrementWideFieldsIndex(size))
{}

EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (state.IsInvalid()) {
state = Count->GetValue(ctx);
}
void InitState(ui64& count, TComputationContext& ctx) const {
count = Count->GetValue(ctx).Get<ui64>();
}

if (auto count = state.Get<ui64>()) {
do if (const auto result = Flow->FetchValues(ctx, ctx.WideFields.data() + StubsIndex); EFetchResult::One != result) {
state = NUdf::TUnboxedValuePod(count);
return result;
} while (--count);
NUdf::TUnboxedValue*const* PrepareInput(ui64& skipCount, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
return skipCount == 0 ? output : ctx.WideFields.data() + StubsIndex;
}

state = NUdf::TUnboxedValuePod::Zero();
EProcessResult DoProcess(ui64& skipCount, TComputationContext&, EFetchResult fetchRes, NUdf::TUnboxedValue*const*, NUdf::TUnboxedValue*const*) const {
if (fetchRes == EFetchResult::One && skipCount) {
skipCount--;
return EProcessResult::Fetch;
}

return Flow->FetchValues(ctx, output);
return static_cast<EProcessResult>(fetchRes);
}

#ifndef MKQL_DISABLE_CODEGEN
TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
auto& context = ctx.Codegen.GetContext();

const auto valueType = Type::getInt128Ty(context);

const auto init = BasicBlock::Create(context, "init", ctx.Func);
const auto main = BasicBlock::Create(context, "main", ctx.Func);

const auto load = new LoadInst(valueType, statePtr, "load", block);
const auto state = PHINode::Create(valueType, 2U, "state", main);
state->addIncoming(load, block);
BranchInst::Create(init, main, IsInvalid(load, block), block);

block = init;

GetNodeValue(statePtr, Count, ctx, block);
const auto save = new LoadInst(valueType, statePtr, "save", block);
state->addIncoming(save, block);
BranchInst::Create(main, block);

block = main;

const auto work = BasicBlock::Create(context, "work", ctx.Func);
const auto good = BasicBlock::Create(context, "good", ctx.Func);
const auto pass = BasicBlock::Create(context, "pass", ctx.Func);
const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
const auto skip = BasicBlock::Create(context, "skip", ctx.Func);
const auto done = BasicBlock::Create(context, "done", ctx.Func);

const auto resultType = Type::getInt32Ty(context);
const auto result = PHINode::Create(resultType, 2U, "result", done);

const auto trunc = GetterFor<ui64>(state, context, block);

const auto count = PHINode::Create(trunc->getType(), 2U, "count", work);
count->addIncoming(trunc, block);

const auto plus = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_UGT, trunc, ConstantInt::get(trunc->getType(), 0ULL), "plus", block);

BranchInst::Create(work, skip, plus, block);

block = work;
const auto status = GetNodeValues(Flow, ctx, block).first;
const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, status, ConstantInt::get(status->getType(), 0), "special", block);
BranchInst::Create(pass, good, special, block);

block = pass;
new StoreInst(SetterFor<ui64>(count, context, block), statePtr, block);
result->addIncoming(status, block);
BranchInst::Create(done, block);

block = good;

const auto decr = BinaryOperator::CreateSub(count, ConstantInt::get(count->getType(), 1ULL), "decr", block);
const auto next = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_UGT, decr, ConstantInt::get(decr->getType(), 0ULL), "next", block);
count->addIncoming(decr, block);
BranchInst::Create(work, exit, next, block);

block = exit;
new StoreInst(SetterFor<ui64>(decr, context, block), statePtr, block);
BranchInst::Create(skip, block);

block = skip;
auto getres = GetNodeValues(Flow, ctx, block);
result->addIncoming(getres.first, block);
BranchInst::Create(done, block);

block = done;
return {result, std::move(getres.second)};
}
#endif
private:
void RegisterDependencies() const final {
if (const auto flow = FlowDependsOn(Flow))
Expand Down
85 changes: 16 additions & 69 deletions ydb/library/yql/minikql/comp_nodes/mkql_take.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "mkql_take.h"
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen_impl.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>

namespace NKikimr {
Expand Down Expand Up @@ -95,84 +96,30 @@ using TBaseComputation = TStatefulFlowCodegeneratorNode<TTakeFlowWrapper>;
IComputationNode* const Count;
};

class TWideTakeWrapper : public TStatefulWideFlowCodegeneratorNode<TWideTakeWrapper> {
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideTakeWrapper>;
class TWideTakeWrapper : public TSimpleStatefulWideFlowCodegeneratorNode<TWideTakeWrapper, ui64> {
using TBaseComputation = TSimpleStatefulWideFlowCodegeneratorNode<TWideTakeWrapper, ui64>;
public:
TWideTakeWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* count)
: TBaseComputation(mutables, flow, EValueRepresentation::Embedded), Flow(flow), Count(count)
{}

EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (state.IsInvalid()) {
state = Count->GetValue(ctx);
}

if (auto count = state.Get<ui64>()) {
if (const auto result = Flow->FetchValues(ctx, output); EFetchResult::One == result) {
state = NUdf::TUnboxedValuePod(--count);
return EFetchResult::One;
} else {
return result;
}
}

return EFetchResult::Finish;
void InitState(ui64 &count, TComputationContext& ctx) const {
count = Count->GetValue(ctx).Get<ui64>();
}
#ifndef MKQL_DISABLE_CODEGEN
TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
auto& context = ctx.Codegen.GetContext();

const auto valueType = Type::getInt128Ty(context);

const auto init = BasicBlock::Create(context, "init", ctx.Func);
const auto main = BasicBlock::Create(context, "main", ctx.Func);

const auto load = new LoadInst(valueType, statePtr, "load", block);
const auto state = PHINode::Create(load->getType(), 2U, "state", main);
state->addIncoming(load, block);

BranchInst::Create(init, main, IsInvalid(load, block), block);

block = init;

GetNodeValue(statePtr, Count, ctx, block);
const auto save = new LoadInst(valueType, statePtr, "save", block);
state->addIncoming(save, block);
BranchInst::Create(main, block);

block = main;

const auto work = BasicBlock::Create(context, "work", ctx.Func);
const auto good = BasicBlock::Create(context, "good", ctx.Func);
const auto done = BasicBlock::Create(context, "done", ctx.Func);

const auto resultType = Type::getInt32Ty(context);
const auto result = PHINode::Create(resultType, 3U, "result", done);
result->addIncoming(ConstantInt::get(resultType, static_cast<i32>(EFetchResult::Finish)), block);

const auto trunc = GetterFor<ui64>(state, context, block);

const auto plus = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_UGT, trunc, ConstantInt::get(trunc->getType(), 0ULL), "plus", block);

BranchInst::Create(work, done, plus, block);

block = work;
const auto getres = GetNodeValues(Flow, ctx, block);
const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, getres.first, ConstantInt::get(getres.first->getType(), 0), "special", block);
result->addIncoming(getres.first, block);
BranchInst::Create(done, good, special, block);

block = good;

const auto decr = BinaryOperator::CreateSub(trunc, ConstantInt::get(trunc->getType(), 1ULL), "decr", block);
new StoreInst(SetterFor<ui64>(decr, context, block), statePtr, block);
result->addIncoming(getres.first, block);
BranchInst::Create(done, block);
NUdf::TUnboxedValue*const* PrepareInput(ui64& takeCount, TComputationContext&, NUdf::TUnboxedValue*const* output) const {
return takeCount != 0 ? output : nullptr;
}

block = done;
return {result, std::move(getres.second)};
EProcessResult DoProcess(ui64& takeCount, TComputationContext& , EFetchResult fetchRes, NUdf::TUnboxedValue*const*, NUdf::TUnboxedValue*const*) const {
if (takeCount == 0) {
return EProcessResult::Finish;
} else if (fetchRes == EFetchResult::One) {
takeCount--;
}
return static_cast<EProcessResult>(fetchRes);
}
#endif

private:
void RegisterDependencies() const final {
if (const auto flow = FlowDependsOn(Flow))
Expand Down
Loading

0 comments on commit 6638adf

Please sign in to comment.