diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index e25e12ced530..c65db6224a56 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -509,12 +509,18 @@ { "Name": "TKqlDeleteRows", "Base": "TKqlDeleteRowsBase", - "Match": {"Type": "Callable", "Name": "KqlDeleteRows"} + "Match": {"Type": "Callable", "Name": "KqlDeleteRows"}, + "Children": [ + {"Index": 2, "Name": "ReturningColumns", "Type": "TCoAtomList"} + ] }, { "Name": "TKqlDeleteRowsIndex", "Base": "TKqlDeleteRowsBase", - "Match": {"Type": "Callable", "Name": "KqlDeleteRowsIndex"} + "Match": {"Type": "Callable", "Name": "KqlDeleteRowsIndex"}, + "Children": [ + {"Index": 2, "Name": "ReturningColumns", "Type": "TCoAtomList"} + ] }, { "Name": "TKqpDeleteRows", diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index 40f2aef9b065..7e434d148403 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -830,7 +830,7 @@ TStatus AnnotateUpdateRows(const TExprNode::TPtr& node, TExprContext& ctx, const TStatus AnnotateDeleteRows(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster, const TKikimrTablesData& tablesData) { - if (!EnsureArgsCount(*node, 2, ctx)) { + if (!EnsureMaxArgsCount(*node, 3, ctx) && !EnsureMinArgsCount(*node, 2, ctx)) { return TStatus::Error; } diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index 90db2d10ccc9..7bac021959aa 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -429,6 +429,7 @@ TExprBase BuildDeleteTable(const TKiWriteTable& write, const TKikimrTableDescrip return Build(ctx, write.Pos()) .Table(BuildTableMeta(tableData, write.Pos(), ctx)) .Input(keysToDelete) + .ReturningColumns(write.ReturningColumns()) .Done(); } @@ -438,6 +439,7 @@ TExprBase BuildDeleteTableWithIndex(const TKiWriteTable& write, const TKikimrTab return Build(ctx, write.Pos()) .Table(BuildTableMeta(tableData, write.Pos(), ctx)) .Input(keysToDelete) + .ReturningColumns(write.ReturningColumns()) .Done(); } @@ -464,6 +466,7 @@ TExprBase BuildDeleteTable(const TKiDeleteTable& del, const TKikimrTableDescript return Build(ctx, del.Pos()) .Table(BuildTableMeta(tableData, del.Pos(), ctx)) .Input(keysToDelete) + .ReturningColumns().Build() .Done(); } @@ -483,6 +486,7 @@ TExprBase BuildDeleteTableWithIndex(const TKiDeleteTable& del, const TKikimrTabl auto tableDelete = Build(ctx, del.Pos()) .Table(BuildTableMeta(tableData, del.Pos(), ctx)) .Input(ProjectColumns(rowsToDelete, pk, ctx)) + .ReturningColumns().Build() .Done(); TVector effects; @@ -506,6 +510,7 @@ TExprBase BuildDeleteTableWithIndex(const TKiDeleteTable& del, const TKikimrTabl auto indexDelete = Build(ctx, del.Pos()) .Table(indexMeta) .Input(ProjectColumns(rowsToDelete, indexTableColumns, ctx)) + .ReturningColumns().Build() .Done(); effects.push_back(indexDelete); @@ -699,6 +704,7 @@ TExprBase BuildUpdateTableWithIndex(const TKiUpdateTable& update, const TKikimrT auto indexDelete = Build(ctx, update.Pos()) .Table(indexMeta) .Input(ProjectColumns(rowsToUpdate, indexTableColumns, ctx)) + .ReturningColumns().Build() .Done(); effects.push_back(indexDelete); diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp index aa4dfb187fde..6a28cad8ddad 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp @@ -97,6 +97,7 @@ TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TK return Build(ctx, deleteRows.Pos()) .Table(deleteRows.Table()) .Input(deleteInput.Cast()) + .ReturningColumns(deleteRows.ReturningColumns()) .Done(); } diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp index 38a1b572bce9..87cca8148424 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp @@ -81,6 +81,7 @@ TExprBase KqpBuildDeleteIndexStages(TExprBase node, TExprContext& ctx, const TKq auto tableDelete = Build(ctx, del.Pos()) .Table(del.Table()) .Input(lookupKeys) + .ReturningColumns(del.ReturningColumns()) .Done(); TVector effects; @@ -102,6 +103,7 @@ TExprBase KqpBuildDeleteIndexStages(TExprBase node, TExprContext& ctx, const TKq auto indexDelete = Build(ctx, del.Pos()) .Table(tableNode) .Input(deleteIndexKeys) + .ReturningColumns().Build() .Done(); effects.emplace_back(std::move(indexDelete)); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h index d35c4718d091..d31faa88f0e9 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h @@ -12,6 +12,9 @@ NYql::NNodes::TExprBase KqpBuildReturning(NYql::NNodes::TExprBase node, NYql::TE NYql::NNodes::TExprBase KqpRewriteReturningUpsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); +NYql::NNodes::TExprBase KqpRewriteReturningDelete(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, + const TKqpOptimizeContext& kqpCtx); + NYql::NNodes::TExprBase KqpRewriteGenerateIfInsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_returning.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_returning.cpp index 0487071dfaa8..5309fff6f201 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_returning.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_returning.cpp @@ -15,6 +15,33 @@ TCoAtomList MakeColumnsList(Container rows, TExprContext& ctx, TPositionHandle p return Build(ctx, pos).Add(columnsVector).Done(); } +template +TExprBase SelectFields(TExprBase node, Container fields, TExprContext& ctx, TPositionHandle pos) { + TVector items; + for (auto&& field : fields) { + TString name; + + if constexpr (std::is_same_v) { + name = field.Value(); + } else { + name = field; + } + + auto tuple = Build(ctx, pos) + .Name().Build(field) + .template Value() + .Struct(node) + .Name().Build(name) + .Build() + .Done(); + + items.emplace_back(tuple); + } + return Build(ctx, pos) + .Add(items) + .Done(); +} + TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { auto maybeReturning = node.Maybe(); if (!maybeReturning) { @@ -24,46 +51,60 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz auto returning = maybeReturning.Cast(); const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, returning.Table().Path()); - auto buildFromUpsert = [&](TMaybeNode upsert) -> TExprBase { - auto rows = upsert.Cast().Input(); - auto pos = upsert.Input().Cast().Pos(); + auto buildReturningRows = [&](TExprBase rows, TCoAtomList columns, TCoAtomList returningColumns) -> TExprBase { + auto pos = rows.Pos(); TSet inputColumns; TSet columnsToReadSet; - - for (auto&& column : upsert.Columns().Cast()) { + for (auto&& column : columns) { inputColumns.insert(TString(column.Value())); } - for (auto&& column : upsert.ReturningColumns().Cast()) { + for (auto&& column : returningColumns) { if (!inputColumns.contains(column) && !tableDesc.GetKeyColumnIndex(TString(column))) { columnsToReadSet.insert(TString(column)); } } - - TMaybeNode input = upsert.Input(); + TMaybeNode input = rows; if (!columnsToReadSet.empty()) { - TString upsertInputName = "upsertInput"; - TString tableInputName = "table"; + auto payloadSelectorArg = TCoArgument(ctx.NewArgument(pos, "payload_selector_row")); + TVector payloadTuples; + for (const auto& column : columns) { + payloadTuples.emplace_back( + Build(ctx, pos) + .Name(column) + .Value() + .Struct(payloadSelectorArg) + .Name(column) + .Build() + .Done()); + } + + auto payloadSelector = Build(ctx, pos) + .Args({payloadSelectorArg}) + .Body() + .Add(payloadTuples) + .Build() + .Done(); - auto payloadSelector = MakeRowsPayloadSelector(upsert.Columns().Cast(), tableDesc, pos, ctx); auto condenseResult = CondenseInputToDictByPk(input.Cast(), tableDesc, payloadSelector, ctx); if (!condenseResult) { return node; } auto inputDictAndKeys = PrecomputeDictAndKeys(*condenseResult, pos, ctx); - - TSet columnsToLookup = columnsToReadSet; for (auto&& column : tableDesc.Metadata->KeyColumnNames) { columnsToReadSet.insert(column); } - + TSet columnsToLookup = columnsToReadSet; for (auto&& column : tableDesc.Metadata->KeyColumnNames) { columnsToReadSet.erase(column); } TCoAtomList additionalColumnsToRead = MakeColumnsList(columnsToReadSet, ctx, pos); + TCoArgument existingRow = Build(ctx, node.Pos()) + .Name("existing_row") + .Done(); auto prepareUpdateStage = Build(ctx, pos) .Inputs() .Add(inputDictAndKeys.KeysPrecompute) @@ -80,7 +121,7 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz .Columns(MakeColumnsList(columnsToLookup, ctx, pos)) .Build() .Lambda() - .Args({"existingRow"}) + .Args({existingRow}) .Body() .Input() .Add() @@ -88,19 +129,13 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz .Value() // Key should always exist in the dict .Optional() .Collection("dict") - .Lookup() - .Input("existingRow") - .Members(MakeColumnsList(tableDesc.Metadata->KeyColumnNames, ctx, pos)) - .Build() + .Lookup(SelectFields(existingRow, tableDesc.Metadata->KeyColumnNames, ctx, pos)) .Build() .Build() .Build() .Add() .Name().Build("") - .Value() - .Input("existingRow") - .Members(additionalColumnsToRead) - .Build() + .Value(SelectFields(existingRow, additionalColumnsToRead, ctx, pos)) .Build() .Build() .Build() @@ -128,20 +163,27 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz for (auto item : maybeList.Cast()) { if (auto upsert = item.Maybe()) { if (upsert.Cast().Table().Raw() == returning.Table().Raw()) { - return buildFromUpsert(upsert); + return buildReturningRows(upsert.Input().Cast(), upsert.Columns().Cast(), returning.Columns()); + } + } + if (auto del = item.Maybe()) { + if (del.Cast().Table().Raw() == returning.Table().Raw()) { + return buildReturningRows(del.Input().Cast(), MakeColumnsList(tableDesc.Metadata->KeyColumnNames, ctx, node.Pos()), returning.Columns()); } } } } if (auto upsert = returning.Update().Maybe()) { - return buildFromUpsert(upsert); + return buildReturningRows(upsert.Input().Cast(), upsert.Columns().Cast(), returning.Columns()); + } + if (auto del = returning.Update().Maybe()) { + return buildReturningRows(del.Input().Cast(), MakeColumnsList(tableDesc.Metadata->KeyColumnNames, ctx, node.Pos()), returning.Columns()); } return node; } - TExprBase KqpRewriteReturningUpsert(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) { auto upsert = node.Cast(); if (upsert.ReturningColumns().Empty()) { @@ -164,4 +206,24 @@ TExprBase KqpRewriteReturningUpsert(TExprBase node, TExprContext& ctx, const TKq .Done(); } +TExprBase KqpRewriteReturningDelete(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) { + auto del = node.Cast(); + if (del.ReturningColumns().Empty()) { + return node; + } + + if (!del.Input().Maybe() && !del.Input().Maybe()) { + return node; + } + + return + Build(ctx, del.Pos()) + .Input() + .Input(del.Input()) + .Build() + .Table(del.Table()) + .ReturningColumns().Build() + .Done(); +} + } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 5a356dcbce93..b3a4cea9aeaf 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -123,10 +123,10 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase { AddHandler(2, &TDqStage::Match, HNDL(RewriteKqpReadTable)); AddHandler(2, &TDqStage::Match, HNDL(RewriteKqpLookupTable)); + AddHandler(2, &TKqlUpsertRows::Match, HNDL(RewriteReturningUpsert)); + AddHandler(2, &TKqlDeleteRows::Match, HNDL(RewriteReturningDelete)); - AddHandler(3, &TKqlUpsertRows::Match, HNDL(RewriteReturningUpsert)); - - AddHandler(4, &TKqlReturningList::Match, HNDL(BuildReturning)); + AddHandler(3, &TKqlReturningList::Match, HNDL(BuildReturning)); #undef HNDL SetGlobal(1u); @@ -145,6 +145,12 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase { return output; } + TMaybeNode RewriteReturningDelete(TExprBase node, TExprContext& ctx) { + TExprBase output = KqpRewriteReturningDelete(node, ctx, KqpCtx); + DumpAppliedRule("RewriteReturningDelete", node.Ptr(), output.Ptr(), ctx); + return output; + } + TMaybeNode RewriteGenerateIfInsert(TExprBase node, TExprContext& ctx) { TExprBase output = KqpRewriteGenerateIfInsert(node, ctx, KqpCtx); DumpAppliedRule("RewriteGenerateIfInsert", node.Ptr(), output.Ptr(), ctx); diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 9887464a6ae2..f981d60e2961 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -1534,13 +1534,24 @@ Y_UNIT_TEST_SUITE(KqpPg) { { const auto query = Q_(R"( --!syntax_pg - UPDATE ReturningTableExtraValue SET value2 = 3 where key = 2 RETURNING *; + DELETE FROM ReturningTableExtraValue WHERE key = 2 RETURNING key, value, value2; )"); auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT(result.IsSuccess()); CompareYson(R"([["2";"4";"3"]])", FormatResultSetYson(result.GetResultSet(0))); } + + { + const auto query = Q_(R"( + --!syntax_pg + DELETE FROM ReturningTable WHERE key <= 3 RETURNING key, value; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(result.IsSuccess()); + CompareYson(R"([["2";"2"];["3";"2"];["1";"3"]])", FormatResultSetYson(result.GetResultSet(0))); + } } Y_UNIT_TEST(DropTablePg) {