Skip to content

Commit

Permalink
Allow UPDATE ON, UPSERT with partial set of input...
Browse files Browse the repository at this point in the history
columns in case of unique index.

Example:
  table: pk, fk1, fk2. Uniq index: fk1, fk2.

To perform UPSERT INTO table (pk, fk1) we
need to read missed value fk2 from main table to
create lookup uniq index key.

Right now we have two lookups in to main table
- first one has been described above
- second we need to make to remove old value
from index. It will be fixed in a next step.
  • Loading branch information
dcherednik committed Jan 9, 2024
1 parent 4ff35fe commit 5831cb3
Show file tree
Hide file tree
Showing 9 changed files with 1,106 additions and 73 deletions.
1 change: 0 additions & 1 deletion ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ TMaybe<TCondenseInputResult> CondenseInput(const TExprBase& input, TExprContext&
TVector<TCoArgument> stageArguments;

if (IsDqPureExpr(input)) {
YQL_ENSURE(input.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::List, "" << input.Ref().Dump());
auto stream = Build<TCoToStream>(ctx, input.Pos())
.Input<TCoJust>()
.Input(input)
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ TMaybe<TCondenseInputResult> CondenseInputToDictByPk(const NYql::NNodes::TExprBa
const NYql::TKikimrTableDescription& table, const NYql::NNodes::TCoLambda& payloadSelector,
NYql::TExprContext& ctx);

NYql::NNodes::TMaybeNode<NYql::NNodes::TDqPhyPrecompute> PrecomputeTableLookupDict(
const NYql::NNodes::TDqPhyPrecompute& lookupKeys, const NYql::TKikimrTableDescription& table,
const TVector<NYql::NNodes::TExprBase>& columnsList,
NYql::TPositionHandle pos, NYql::TExprContext& ctx, bool fixLookupKeys);

NYql::NNodes::TMaybeNode<NYql::NNodes::TDqPhyPrecompute> PrecomputeTableLookupDict(
const NYql::NNodes::TDqPhyPrecompute& lookupKeys, const NYql::TKikimrTableDescription& table,
const THashSet<TString>& dataColumns, const THashSet<TString>& keyColumns, NYql::TPositionHandle pos,
NYql::TExprContext& ctx);

NYql::NNodes::TDqPhyPrecompute PrecomputeCondenseInputResult(const TCondenseInputResult& condenseResult,
NYql::TPositionHandle pos, NYql::TExprContext& ctx);

// Creates key selector using PK of given table
NYql::NNodes::TCoLambda MakeTableKeySelector(const NYql::TKikimrTableMetadataPtr tableMeta, NYql::TPositionHandle pos,
NYql::TExprContext& ctx, TMaybe<int> tupleId = {});
Expand Down
91 changes: 78 additions & 13 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ TVector<TExprBase> CreateColumnsToSelectToUpdateIndex(
return columnsToSelect;
}

TDqPhyPrecompute PrecomputeDict(const TCondenseInputResult& condenseResult, TPositionHandle pos, TExprContext& ctx) {
} // namespace

TDqPhyPrecompute PrecomputeCondenseInputResult(const TCondenseInputResult& condenseResult,
TPositionHandle pos, TExprContext& ctx)
{
auto computeDictStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(condenseResult.StageInputs)
Expand All @@ -70,8 +74,6 @@ TDqPhyPrecompute PrecomputeDict(const TCondenseInputResult& condenseResult, TPos
.Done();
}

} // namespace

TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> BuildSecondaryIndexVector(
const TKikimrTableDescription& table,
TPositionHandle pos,
Expand Down Expand Up @@ -127,26 +129,79 @@ TSecondaryIndexes BuildSecondaryIndexVector(const TKikimrTableDescription& table
}

TMaybeNode<TDqPhyPrecompute> PrecomputeTableLookupDict(const TDqPhyPrecompute& lookupKeys,
const TKikimrTableDescription& table, const THashSet<TString>& dataColumns,
const THashSet<TString>& keyColumns, TPositionHandle pos, TExprContext& ctx)
const TKikimrTableDescription& table, const TVector<TExprBase>& columnsList,
TPositionHandle pos, TExprContext& ctx, bool fixLookupKeys)
{
auto lookupColumns = CreateColumnsToSelectToUpdateIndex(table.Metadata->KeyColumnNames, dataColumns,
keyColumns, pos, ctx);

auto lookupColumnsList = Build<TCoAtomList>(ctx, pos)
.Add(lookupColumns)
.Add(columnsList)
.Done();

TExprNode::TPtr keys;

if (fixLookupKeys) {
auto keyArg = TCoArgument(ctx.NewArgument(pos, "key"));
auto keysList = TCoArgument(ctx.NewArgument(pos, "keys_list"));
TVector<TExprBase> keyLookupTuples;
keyLookupTuples.reserve(table.Metadata->KeyColumnNames.size());

for (const auto& key : table.Metadata->KeyColumnNames) {
keyLookupTuples.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(key)
.Value<TCoMember>()
.Struct(keyArg)
.Name().Build(key)
.Build()
.Done());
}

auto list = Build<TCoToStream>(ctx, pos)
.Input<TCoJust>()
.Input<TCoMap>()
.Input(keysList)
.Lambda()
.Args({keyArg})
.Body<TCoAsStruct>()
.Add(keyLookupTuples)
.Build()
.Build()
.Build()
.Build()
.Done().Ptr();

keys = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(lookupKeys)
.Build()
.Program()
.Args({keysList})
.Body(list)
.Build()
.Settings().Build()
.Done().Ptr();

keys = Build<TDqPhyPrecompute>(ctx, pos)
.Connection<TDqCnValue>()
.Output()
.Stage(keys)
.Index().Build("0")
.Build()
.Build()
.Done().Ptr();
} else {
keys = lookupKeys.Ptr();
}

auto lookupStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(lookupKeys)
.Add(keys)
.Build()
.Program()
.Args({"keys_list"})
.Args({"keys_stage_arg"})
.Body<TKqpLookupTable>()
.Table(BuildTableMeta(table, pos, ctx))
.LookupKeys<TCoIterator>()
.List("keys_list")
.List("keys_stage_arg")
.Build()
.Columns(lookupColumnsList)
.Build()
Expand All @@ -167,7 +222,17 @@ TMaybeNode<TDqPhyPrecompute> PrecomputeTableLookupDict(const TDqPhyPrecompute& l
return {};
}

return PrecomputeDict(*condenseLookupResult, lookupKeys.Pos(), ctx);
return PrecomputeCondenseInputResult(*condenseLookupResult, lookupKeys.Pos(), ctx);
}

TMaybeNode<TDqPhyPrecompute> PrecomputeTableLookupDict(const TDqPhyPrecompute& lookupKeys,
const TKikimrTableDescription& table, const THashSet<TString>& dataColumns,
const THashSet<TString>& keyColumns, TPositionHandle pos, TExprContext& ctx)
{
auto lookupColumns = CreateColumnsToSelectToUpdateIndex(table.Metadata->KeyColumnNames, dataColumns,
keyColumns, pos, ctx);

return PrecomputeTableLookupDict(lookupKeys, table, lookupColumns, pos, ctx, false);
}

TExprBase MakeRowsFromDict(const TDqPhyPrecompute& dict, const TVector<TString>& dictKeys,
Expand Down
55 changes: 45 additions & 10 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,17 +264,17 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr
// Compatibility with PG semantic - allow multiple null in columns with unique constaint
TVector<TCoAtom> skipNullColumns;
skipNullColumns.reserve(table.Metadata->Indexes[i].KeyColumns.size());

bool used = false;
for (const auto& column : table.Metadata->Indexes[i].KeyColumns) {
if (!inputColumns || inputColumns->contains(column)) {
TCoAtom atom(ctx.NewAtom(pos, column));
skipNullColumns.emplace_back(atom);
}
used |= (!inputColumns || inputColumns->contains(column));
TCoAtom atom(ctx.NewAtom(pos, column));
skipNullColumns.emplace_back(atom);
}

//no columns to skip -> no index columns to check -> skip check
if (skipNullColumns.empty()) {
continue;
}
// Just to doublecheck we are not trying to update index without data to update
YQL_ENSURE(used, "Index is used but not input columns for update. Probably it's a bug."
" Index: " << table.Metadata->Indexes[i].Name);

auto skipNull = Build<TCoSkipNullMembers>(ctx, pos)
.Input(rowsListArg)
Expand All @@ -292,11 +292,46 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr
return checks;
}

static TExprNode::TPtr CreateRowsToPass(const TCoArgument& rowsListArg, const THashSet<TStringBuf>* inputColumns,
TPositionHandle pos, TExprContext& ctx)
{
if (!inputColumns) {
return rowsListArg.Ptr();
}

auto arg = TCoArgument(ctx.NewArgument(pos, "arg"));

TVector<TExprBase> columns;
columns.reserve(inputColumns->size());

for (const auto x : *inputColumns) {
columns.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(x)
.Value<TCoMember>()
.Struct(arg)
.Name().Build(x)
.Build()
.Done());
}

return Build<TCoMap>(ctx, pos)
.Input(rowsListArg)
.Lambda()
.Args({arg})
.Body<TCoAsStruct>()
.Add(columns)
.Build()
.Build()
.Done().Ptr();
}

TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes,
TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
: RowsListArg(ctx.NewArgument(pos, "rows_list"))
, False(MakeBool(pos, false, ctx))
, Checks(Prepare(RowsListArg, table, inputColumns, usedIndexes, pos, ctx, skipPkCheck))
, RowsToPass(CreateRowsToPass(RowsListArg, inputColumns, pos, ctx))
{}

TUniqBuildHelper::TUniqCheckNodes TUniqBuildHelper::MakeUniqCheckNodes(const TCoLambda& selector,
Expand Down Expand Up @@ -340,7 +375,7 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co

types.emplace_back(
Build<TCoTypeOf>(ctx, pos)
.Value(RowsListArg)
.Value(RowsToPass)
.Done()
);

Expand Down Expand Up @@ -376,7 +411,7 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co

variants.emplace_back(
Build<TCoVariant>(ctx, pos)
.Item(RowsListArg)
.Item(RowsToPass)
.Index().Build("0")
.VarType(variantType)
.Done()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class TUniqBuildHelper {
const NYql::TExprNode::TPtr False;
private:
const TChecks Checks;
const NYql::TExprNode::TPtr RowsToPass;
};

TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table,
Expand Down
Loading

0 comments on commit 5831cb3

Please sign in to comment.