Skip to content

Commit

Permalink
Merge 33481ae into 76b6e78
Browse files Browse the repository at this point in the history
  • Loading branch information
ssmike authored Feb 7, 2024
2 parents 76b6e78 + 33481ae commit edff821
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 33 deletions.
10 changes: 8 additions & 2 deletions ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -509,12 +509,18 @@
{
"Name": "TKqlDeleteRows",
"Base": "TKqlDeleteRowsBase",
"Match": {"Type": "Callable", "Name": "KqlDeleteRows"}
"Match": {"Type": "Callable", "Name": "KqlDeleteRows"},
"Children": [
{"Index": 2, "Name": "ReturningColumns", "Type": "TCoAtomList"}
]
},
{
"Name": "TKqlDeleteRowsIndex",
"Base": "TKqlDeleteRowsBase",
"Match": {"Type": "Callable", "Name": "KqlDeleteRowsIndex"}
"Match": {"Type": "Callable", "Name": "KqlDeleteRowsIndex"},
"Children": [
{"Index": 2, "Name": "ReturningColumns", "Type": "TCoAtomList"}
]
},
{
"Name": "TKqpDeleteRows",
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ TStatus AnnotateUpdateRows(const TExprNode::TPtr& node, TExprContext& ctx, const
TStatus AnnotateDeleteRows(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
const TKikimrTablesData& tablesData)
{
if (!EnsureArgsCount(*node, 2, ctx)) {
if (!EnsureMaxArgsCount(*node, 3, ctx) && !EnsureMinArgsCount(*node, 2, ctx)) {
return TStatus::Error;
}

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/opt/kqp_opt_kql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ TExprBase BuildDeleteTable(const TKiWriteTable& write, const TKikimrTableDescrip
return Build<TKqlDeleteRows>(ctx, write.Pos())
.Table(BuildTableMeta(tableData, write.Pos(), ctx))
.Input(keysToDelete)
.ReturningColumns(write.ReturningColumns())
.Done();
}

Expand All @@ -438,6 +439,7 @@ TExprBase BuildDeleteTableWithIndex(const TKiWriteTable& write, const TKikimrTab
return Build<TKqlDeleteRowsIndex>(ctx, write.Pos())
.Table(BuildTableMeta(tableData, write.Pos(), ctx))
.Input(keysToDelete)
.ReturningColumns(write.ReturningColumns())
.Done();
}

Expand All @@ -464,6 +466,7 @@ TExprBase BuildDeleteTable(const TKiDeleteTable& del, const TKikimrTableDescript
return Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(BuildTableMeta(tableData, del.Pos(), ctx))
.Input(keysToDelete)
.ReturningColumns<TCoAtomList>().Build()
.Done();
}

Expand All @@ -483,6 +486,7 @@ TExprBase BuildDeleteTableWithIndex(const TKiDeleteTable& del, const TKikimrTabl
auto tableDelete = Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(BuildTableMeta(tableData, del.Pos(), ctx))
.Input(ProjectColumns(rowsToDelete, pk, ctx))
.ReturningColumns<TCoAtomList>().Build()
.Done();

TVector<TExprBase> effects;
Expand All @@ -506,6 +510,7 @@ TExprBase BuildDeleteTableWithIndex(const TKiDeleteTable& del, const TKikimrTabl
auto indexDelete = Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(indexMeta)
.Input(ProjectColumns(rowsToDelete, indexTableColumns, ctx))
.ReturningColumns<TCoAtomList>().Build()
.Done();

effects.push_back(indexDelete);
Expand Down Expand Up @@ -699,6 +704,7 @@ TExprBase BuildUpdateTableWithIndex(const TKiUpdateTable& update, const TKikimrT
auto indexDelete = Build<TKqlDeleteRows>(ctx, update.Pos())
.Table(indexMeta)
.Input(ProjectColumns(rowsToUpdate, indexTableColumns, ctx))
.ReturningColumns<TCoAtomList>().Build()
.Done();

effects.push_back(indexDelete);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TK
return Build<TKqlDeleteRows>(ctx, deleteRows.Pos())
.Table(deleteRows.Table())
.Input(deleteInput.Cast())
.ReturningColumns(deleteRows.ReturningColumns())
.Done();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ TExprBase KqpBuildDeleteIndexStages(TExprBase node, TExprContext& ctx, const TKq
auto tableDelete = Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(del.Table())
.Input(lookupKeys)
.ReturningColumns(del.ReturningColumns())
.Done();

TVector<TExprBase> effects;
Expand All @@ -102,6 +103,7 @@ TExprBase KqpBuildDeleteIndexStages(TExprBase node, TExprContext& ctx, const TKq
auto indexDelete = Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(tableNode)
.Input(deleteIndexKeys)
.ReturningColumns<TCoAtomList>().Build()
.Done();

effects.emplace_back(std::move(indexDelete));
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ NYql::NNodes::TExprBase KqpBuildReturning(NYql::NNodes::TExprBase node, NYql::TE
NYql::NNodes::TExprBase KqpRewriteReturningUpsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);

NYql::NNodes::TExprBase KqpRewriteReturningDelete(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);

NYql::NNodes::TExprBase KqpRewriteGenerateIfInsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);

Expand Down
114 changes: 88 additions & 26 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_returning.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,33 @@ TCoAtomList MakeColumnsList(Container rows, TExprContext& ctx, TPositionHandle p
return Build<TCoAtomList>(ctx, pos).Add(columnsVector).Done();
}

template<typename Container>
TExprBase SelectFields(TExprBase node, Container fields, TExprContext& ctx, TPositionHandle pos) {
TVector<TExprBase> items;
for (auto&& field : fields) {
TString name;

if constexpr (std::is_same_v<NYql::NNodes::TCoAtom&&, decltype(field)>) {
name = field.Value();
} else {
name = field;
}

auto tuple = Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(field)
.template Value<TCoMember>()
.Struct(node)
.Name().Build(name)
.Build()
.Done();

items.emplace_back(tuple);
}
return Build<TCoAsStruct>(ctx, pos)
.Add(items)
.Done();
}

TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
auto maybeReturning = node.Maybe<TKqlReturningList>();
if (!maybeReturning) {
Expand All @@ -24,46 +51,60 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
auto returning = maybeReturning.Cast();
const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, returning.Table().Path());

auto buildFromUpsert = [&](TMaybeNode<TKqlUpsertRows> upsert) -> TExprBase {
auto rows = upsert.Cast().Input();
auto pos = upsert.Input().Cast().Pos();
auto buildReturningRows = [&](TExprBase rows, TCoAtomList columns, TCoAtomList returningColumns) -> TExprBase {
auto pos = rows.Pos();

TSet<TString> inputColumns;
TSet<TString> columnsToReadSet;

for (auto&& column : upsert.Columns().Cast()) {
for (auto&& column : columns) {
inputColumns.insert(TString(column.Value()));
}
for (auto&& column : upsert.ReturningColumns().Cast()) {
for (auto&& column : returningColumns) {
if (!inputColumns.contains(column) && !tableDesc.GetKeyColumnIndex(TString(column))) {
columnsToReadSet.insert(TString(column));
}
}

TMaybeNode<TExprBase> input = upsert.Input();
TMaybeNode<TExprBase> input = rows;

if (!columnsToReadSet.empty()) {
TString upsertInputName = "upsertInput";
TString tableInputName = "table";
auto payloadSelectorArg = TCoArgument(ctx.NewArgument(pos, "payload_selector_row"));
TVector<TExprBase> payloadTuples;
for (const auto& column : columns) {
payloadTuples.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name(column)
.Value<TCoMember>()
.Struct(payloadSelectorArg)
.Name(column)
.Build()
.Done());
}

auto payloadSelector = Build<TCoLambda>(ctx, pos)
.Args({payloadSelectorArg})
.Body<TCoAsStruct>()
.Add(payloadTuples)
.Build()
.Done();

auto payloadSelector = MakeRowsPayloadSelector(upsert.Columns().Cast(), tableDesc, pos, ctx);
auto condenseResult = CondenseInputToDictByPk(input.Cast(), tableDesc, payloadSelector, ctx);
if (!condenseResult) {
return node;
}

auto inputDictAndKeys = PrecomputeDictAndKeys(*condenseResult, pos, ctx);

TSet<TString> columnsToLookup = columnsToReadSet;
for (auto&& column : tableDesc.Metadata->KeyColumnNames) {
columnsToReadSet.insert(column);
}

TSet<TString> columnsToLookup = columnsToReadSet;
for (auto&& column : tableDesc.Metadata->KeyColumnNames) {
columnsToReadSet.erase(column);
}
TCoAtomList additionalColumnsToRead = MakeColumnsList(columnsToReadSet, ctx, pos);

TCoArgument existingRow = Build<TCoArgument>(ctx, node.Pos())
.Name("existing_row")
.Done();
auto prepareUpdateStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(inputDictAndKeys.KeysPrecompute)
Expand All @@ -80,27 +121,21 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
.Columns(MakeColumnsList(columnsToLookup, ctx, pos))
.Build()
.Lambda()
.Args({"existingRow"})
.Args({existingRow})
.Body<TCoJust>()
.Input<TCoFlattenMembers>()
.Add()
.Name().Build("")
.Value<TCoUnwrap>() // Key should always exist in the dict
.Optional<TCoLookup>()
.Collection("dict")
.Lookup<TCoExtractMembers>()
.Input("existingRow")
.Members(MakeColumnsList(tableDesc.Metadata->KeyColumnNames, ctx, pos))
.Build()
.Lookup(SelectFields(existingRow, tableDesc.Metadata->KeyColumnNames, ctx, pos))
.Build()
.Build()
.Build()
.Add()
.Name().Build("")
.Value<TCoExtractMembers>()
.Input("existingRow")
.Members(additionalColumnsToRead)
.Build()
.Value(SelectFields(existingRow, additionalColumnsToRead, ctx, pos))
.Build()
.Build()
.Build()
Expand Down Expand Up @@ -128,20 +163,27 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
for (auto item : maybeList.Cast()) {
if (auto upsert = item.Maybe<TKqlUpsertRows>()) {
if (upsert.Cast().Table().Raw() == returning.Table().Raw()) {
return buildFromUpsert(upsert);
return buildReturningRows(upsert.Input().Cast(), upsert.Columns().Cast(), returning.Columns());
}
}
if (auto del = item.Maybe<TKqlDeleteRows>()) {
if (del.Cast().Table().Raw() == returning.Table().Raw()) {
return buildReturningRows(del.Input().Cast(), MakeColumnsList(tableDesc.Metadata->KeyColumnNames, ctx, node.Pos()), returning.Columns());
}
}
}
}

if (auto upsert = returning.Update().Maybe<TKqlUpsertRows>()) {
return buildFromUpsert(upsert);
return buildReturningRows(upsert.Input().Cast(), upsert.Columns().Cast(), returning.Columns());
}
if (auto del = returning.Update().Maybe<TKqlDeleteRows>()) {
return buildReturningRows(del.Input().Cast(), MakeColumnsList(tableDesc.Metadata->KeyColumnNames, ctx, node.Pos()), returning.Columns());
}

return node;
}


TExprBase KqpRewriteReturningUpsert(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) {
auto upsert = node.Cast<TKqlUpsertRows>();
if (upsert.ReturningColumns().Empty()) {
Expand All @@ -164,4 +206,24 @@ TExprBase KqpRewriteReturningUpsert(TExprBase node, TExprContext& ctx, const TKq
.Done();
}

TExprBase KqpRewriteReturningDelete(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) {
auto del = node.Cast<TKqlDeleteRows>();
if (del.ReturningColumns().Empty()) {
return node;
}

if (!del.Input().Maybe<TDqPrecompute>() && !del.Input().Maybe<TDqPhyPrecompute>()) {
return node;
}

return
Build<TKqlDeleteRows>(ctx, del.Pos())
.Input<TDqPrecompute>()
.Input(del.Input())
.Build()
.Table(del.Table())
.ReturningColumns<TCoAtomList>().Build()
.Done();
}

} // namespace NKikimr::NKqp::NOpt
12 changes: 9 additions & 3 deletions ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {

AddHandler(2, &TDqStage::Match, HNDL(RewriteKqpReadTable));
AddHandler(2, &TDqStage::Match, HNDL(RewriteKqpLookupTable));
AddHandler(2, &TKqlUpsertRows::Match, HNDL(RewriteReturningUpsert));
AddHandler(2, &TKqlDeleteRows::Match, HNDL(RewriteReturningDelete));

AddHandler(3, &TKqlUpsertRows::Match, HNDL(RewriteReturningUpsert));

AddHandler(4, &TKqlReturningList::Match, HNDL(BuildReturning));
AddHandler(3, &TKqlReturningList::Match, HNDL(BuildReturning));
#undef HNDL

SetGlobal(1u);
Expand All @@ -145,6 +145,12 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

TMaybeNode<TExprBase> RewriteReturningDelete(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpRewriteReturningDelete(node, ctx, KqpCtx);
DumpAppliedRule("RewriteReturningDelete", node.Ptr(), output.Ptr(), ctx);
return output;
}

TMaybeNode<TExprBase> RewriteGenerateIfInsert(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpRewriteGenerateIfInsert(node, ctx, KqpCtx);
DumpAppliedRule("RewriteGenerateIfInsert", node.Ptr(), output.Ptr(), ctx);
Expand Down
13 changes: 12 additions & 1 deletion ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1534,13 +1534,24 @@ Y_UNIT_TEST_SUITE(KqpPg) {
{
const auto query = Q_(R"(
--!syntax_pg
UPDATE ReturningTableExtraValue SET value2 = 3 where key = 2 RETURNING *;
DELETE FROM ReturningTableExtraValue WHERE key = 2 RETURNING key, value, value2;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(result.IsSuccess());
CompareYson(R"([["2";"4";"3"]])", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
DELETE FROM ReturningTable WHERE key <= 3 RETURNING key, value;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(result.IsSuccess());
CompareYson(R"([["2";"2"];["3";"2"];["1";"3"]])", FormatResultSetYson(result.GetResultSet(0)));
}
}

Y_UNIT_TEST(DropTablePg) {
Expand Down

0 comments on commit edff821

Please sign in to comment.