Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQL-17330 Generalize stateful wide flow nodes #1203

Merged
merged 36 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
1740763
First iteration of copy-paste removal
avevad Jan 30, 2024
af580c8
State initialization with LLVM
avevad Feb 2, 2024
1d8e5d7
Generalize DoCalculate of wide Skip() and Take()
avevad Feb 5, 2024
ec7b9cd
Generalize DoGenGetValues of wide Skip() and Take()
avevad Feb 12, 2024
9ad27aa
Separate simple wide node implementation from general headers
avevad Feb 20, 2024
bbc8ce6
Generalize DoGenGetValues of wide Filter() with limit
avevad Feb 21, 2024
b114099
Generalize DoGenGetValues of wide While-variations
avevad Feb 21, 2024
9a443c2
Fix build errors after rebase
avevad Feb 21, 2024
e660beb
Improve interface of simple stateful wide flow nodes
avevad Feb 27, 2024
bcacebd
Fix introduced bugs
avevad Feb 27, 2024
9886096
Adapt LLVM to new interface of simple nodes
avevad Feb 28, 2024
e6963c4
Add support for Boxed states in simple wide codegen nodes
avevad Mar 4, 2024
ed9e1cb
Generalize wide combiner node
avevad Mar 4, 2024
48b98b4
Fix incorrect LLVM logic in simple wide nodes
avevad Mar 4, 2024
905ed6e
Improve interface of simple wide codegen nodes
avevad Mar 7, 2024
ae1edaf
Adapt LLVM part of simple wide codegen nodes
avevad Mar 7, 2024
53b0c3e
Fix typo bug
avevad Mar 11, 2024
cfb0f06
Fix DoCalculate() protocol inconsistency bug
avevad Mar 11, 2024
9cda2be
Fix another typo bug
avevad Mar 11, 2024
d878d47
Fix LLVM codegen interface inconsistency bug
avevad Mar 12, 2024
37518cc
Revert WideCombine refactoring temporarily
avevad Mar 13, 2024
234a26e
Revert WideTake temporarily
avevad Mar 14, 2024
07a38b9
Add custom codegen functionality to simple wide nodes, implement it i…
avevad Mar 14, 2024
5ebe02b
Fix build errors
avevad Mar 14, 2024
3ca043a
Fix more build errors
avevad Mar 14, 2024
aabad50
Fix build errors again
avevad Mar 14, 2024
1f1722a
Eliminate some code bloat
avevad Mar 18, 2024
87d1679
Fix build error
avevad Mar 18, 2024
b53b5cf
Fix segfault in LLVM code
avevad Mar 19, 2024
cdce6c2
Fix errors after rebase
avevad Apr 15, 2024
977eded
Add JIT substitution for TWideSkipWhileWrapper
avevad Apr 26, 2024
89c7211
Add JIT substitution for WideTakeWhile
avevad May 27, 2024
c6fee41
Add LLVM substitution for WideFilterWithLimit
avevad May 27, 2024
8278540
Small fixes
avevad May 27, 2024
235e9b6
Move helper files to computation
avevad May 30, 2024
a8c24aa
Fix clang14 build error
avevad May 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 35 additions & 83 deletions ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "mkql_skip.h"
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
#include <ydb/library/yql/minikql/computation/mkql_simple_codegen.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>

namespace NKikimr {
Expand Down Expand Up @@ -117,105 +118,56 @@ using TBaseComputation = TStatefulFlowCodegeneratorNode<TSkipFlowWrapper>;
IComputationNode* const Count;
};

class TWideSkipWrapper : public TStatefulWideFlowCodegeneratorNode<TWideSkipWrapper> {
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideSkipWrapper>;
class TWideSkipWrapper : public TSimpleStatefulWideFlowCodegeneratorNode<TWideSkipWrapper, ui64> {
using TBaseComputation = TSimpleStatefulWideFlowCodegeneratorNode<TWideSkipWrapper, ui64>;
public:
TWideSkipWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* count, ui32 size)
: TBaseComputation(mutables, flow, EValueRepresentation::Embedded)
: TBaseComputation(mutables, flow, size, size)
, Flow(flow)
, Count(count)
, StubsIndex(mutables.IncrementWideFieldsIndex(size))
{}

EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (state.IsInvalid()) {
state = Count->GetValue(ctx);
}
void InitState(NUdf::TUnboxedValue& cntToSkip, TComputationContext& ctx) const {
cntToSkip = Count->GetValue(ctx);
}

if (auto count = state.Get<ui64>()) {
do if (const auto result = Flow->FetchValues(ctx, ctx.WideFields.data() + StubsIndex); EFetchResult::One != result) {
state = NUdf::TUnboxedValuePod(count);
return result;
} while (--count);
NUdf::TUnboxedValue*const* PrepareInput(NUdf::TUnboxedValue& cntToSkip, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
return cntToSkip.Get<ui64>() ? ctx.WideFields.data() + StubsIndex : output;
}

state = NUdf::TUnboxedValuePod::Zero();
TMaybeFetchResult DoProcess(NUdf::TUnboxedValue& cntToSkip, TComputationContext&, TMaybeFetchResult fetchRes, NUdf::TUnboxedValue*const*) const {
if (fetchRes.Get() == EFetchResult::One && cntToSkip.Get<ui64>()) {
cntToSkip = NUdf::TUnboxedValuePod(cntToSkip.Get<ui64>() - 1);
return TMaybeFetchResult::None();
}

return Flow->FetchValues(ctx, output);
return fetchRes;
}

#ifndef MKQL_DISABLE_CODEGEN
TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
TGenerateResult GenFetchProcess(Value* statePtrVal, const TCodegenContext& ctx, const TResultCodegenerator& fetchGenerator, BasicBlock*& block) const override {
auto& context = ctx.Codegen.GetContext();

const auto valueType = Type::getInt128Ty(context);

const auto init = BasicBlock::Create(context, "init", ctx.Func);
const auto main = BasicBlock::Create(context, "main", ctx.Func);

const auto load = new LoadInst(valueType, statePtr, "load", block);
const auto state = PHINode::Create(valueType, 2U, "state", main);
state->addIncoming(load, block);
BranchInst::Create(init, main, IsInvalid(load, block), block);

block = init;

GetNodeValue(statePtr, Count, ctx, block);
const auto save = new LoadInst(valueType, statePtr, "save", block);
state->addIncoming(save, block);
BranchInst::Create(main, block);

block = main;

const auto work = BasicBlock::Create(context, "work", ctx.Func);
const auto good = BasicBlock::Create(context, "good", ctx.Func);
const auto pass = BasicBlock::Create(context, "pass", ctx.Func);
const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
const auto skip = BasicBlock::Create(context, "skip", ctx.Func);
const auto done = BasicBlock::Create(context, "done", ctx.Func);

const auto resultType = Type::getInt32Ty(context);
const auto result = PHINode::Create(resultType, 2U, "result", done);

const auto trunc = GetterFor<ui64>(state, context, block);

const auto count = PHINode::Create(trunc->getType(), 2U, "count", work);
count->addIncoming(trunc, block);

const auto plus = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_UGT, trunc, ConstantInt::get(trunc->getType(), 0ULL), "plus", block);

BranchInst::Create(work, skip, plus, block);

block = work;
const auto status = GetNodeValues(Flow, ctx, block).first;
const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, status, ConstantInt::get(status->getType(), 0), "special", block);
BranchInst::Create(pass, good, special, block);

block = pass;
new StoreInst(SetterFor<ui64>(count, context, block), statePtr, block);
result->addIncoming(status, block);
BranchInst::Create(done, block);

block = good;

const auto decr = BinaryOperator::CreateSub(count, ConstantInt::get(count->getType(), 1ULL), "decr", block);
const auto next = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_UGT, decr, ConstantInt::get(decr->getType(), 0ULL), "next", block);
count->addIncoming(decr, block);
BranchInst::Create(work, exit, next, block);

block = exit;
new StoreInst(SetterFor<ui64>(decr, context, block), statePtr, block);
BranchInst::Create(skip, block);

block = skip;
auto getres = GetNodeValues(Flow, ctx, block);
result->addIncoming(getres.first, block);
BranchInst::Create(done, block);

block = done;
return {result, std::move(getres.second)};
const auto decr = BasicBlock::Create(context, "decr", ctx.Func);
const auto end = BasicBlock::Create(context, "end", ctx.Func);

const auto fetched = fetchGenerator(ctx, block);
const auto cntToSkipVal = GetterFor<ui64>(new LoadInst(IntegerType::getInt128Ty(context), statePtrVal, "unboxed_state", block), context, block);
const auto needSkipCond = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_UGT, cntToSkipVal, ConstantInt::get(cntToSkipVal->getType(), 0), "need_skip", block);
const auto gotOneCond = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, fetched.first, ConstantInt::get(fetched.first->getType(), 1), "got_one", block);
const auto willSkipCond = BinaryOperator::Create(Instruction::And, needSkipCond, gotOneCond, "will_skip", block);
BranchInst::Create(decr, end, willSkipCond, block);

block = decr;
const auto cntToSkipNewVal = BinaryOperator::CreateSub(cntToSkipVal, ConstantInt::get(cntToSkipVal->getType(), 1), "decr", block);
new StoreInst(SetterFor<ui64>(cntToSkipNewVal, context, block), statePtrVal, block);
BranchInst::Create(end, block);

block = end;
const auto result = SelectInst::Create(willSkipCond, TMaybeFetchResult::None().LLVMConst(context), TMaybeFetchResult::LLVMFromFetchResult(fetched.first, "fetch_res_ext", block), "result", block);
return {result, fetched.second};
}
#endif

private:
void RegisterDependencies() const final {
if (const auto flow = FlowDependsOn(Flow))
Expand Down
Loading
Loading