Skip to content

Commit

Permalink
Revert "Support returning list in delete statements (#1684)"
Browse files Browse the repository at this point in the history
This reverts commit 7f54b71.
  • Loading branch information
ssmike authored Feb 8, 2024
1 parent 9f3b6ae commit 8481ddb
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 130 deletions.
10 changes: 2 additions & 8 deletions ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -509,18 +509,12 @@
{
"Name": "TKqlDeleteRows",
"Base": "TKqlDeleteRowsBase",
"Match": {"Type": "Callable", "Name": "KqlDeleteRows"},
"Children": [
{"Index": 2, "Name": "ReturningColumns", "Type": "TCoAtomList"}
]
"Match": {"Type": "Callable", "Name": "KqlDeleteRows"}
},
{
"Name": "TKqlDeleteRowsIndex",
"Base": "TKqlDeleteRowsBase",
"Match": {"Type": "Callable", "Name": "KqlDeleteRowsIndex"},
"Children": [
{"Index": 2, "Name": "ReturningColumns", "Type": "TCoAtomList"}
]
"Match": {"Type": "Callable", "Name": "KqlDeleteRowsIndex"}
},
{
"Name": "TKqpDeleteRows",
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ TStatus AnnotateUpdateRows(const TExprNode::TPtr& node, TExprContext& ctx, const
TStatus AnnotateDeleteRows(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
const TKikimrTablesData& tablesData)
{
if (!EnsureMaxArgsCount(*node, 3, ctx) && !EnsureMinArgsCount(*node, 2, ctx)) {
if (!EnsureArgsCount(*node, 2, ctx)) {
return TStatus::Error;
}

Expand Down
6 changes: 0 additions & 6 deletions ydb/core/kqp/opt/kqp_opt_kql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ TExprBase BuildDeleteTable(const TKiWriteTable& write, const TKikimrTableDescrip
return Build<TKqlDeleteRows>(ctx, write.Pos())
.Table(BuildTableMeta(tableData, write.Pos(), ctx))
.Input(keysToDelete)
.ReturningColumns(write.ReturningColumns())
.Done();
}

Expand All @@ -439,7 +438,6 @@ TExprBase BuildDeleteTableWithIndex(const TKiWriteTable& write, const TKikimrTab
return Build<TKqlDeleteRowsIndex>(ctx, write.Pos())
.Table(BuildTableMeta(tableData, write.Pos(), ctx))
.Input(keysToDelete)
.ReturningColumns(write.ReturningColumns())
.Done();
}

Expand All @@ -466,7 +464,6 @@ TExprBase BuildDeleteTable(const TKiDeleteTable& del, const TKikimrTableDescript
return Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(BuildTableMeta(tableData, del.Pos(), ctx))
.Input(keysToDelete)
.ReturningColumns<TCoAtomList>().Build()
.Done();
}

Expand All @@ -486,7 +483,6 @@ TExprBase BuildDeleteTableWithIndex(const TKiDeleteTable& del, const TKikimrTabl
auto tableDelete = Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(BuildTableMeta(tableData, del.Pos(), ctx))
.Input(ProjectColumns(rowsToDelete, pk, ctx))
.ReturningColumns<TCoAtomList>().Build()
.Done();

TVector<TExprBase> effects;
Expand All @@ -510,7 +506,6 @@ TExprBase BuildDeleteTableWithIndex(const TKiDeleteTable& del, const TKikimrTabl
auto indexDelete = Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(indexMeta)
.Input(ProjectColumns(rowsToDelete, indexTableColumns, ctx))
.ReturningColumns<TCoAtomList>().Build()
.Done();

effects.push_back(indexDelete);
Expand Down Expand Up @@ -704,7 +699,6 @@ TExprBase BuildUpdateTableWithIndex(const TKiUpdateTable& update, const TKikimrT
auto indexDelete = Build<TKqlDeleteRows>(ctx, update.Pos())
.Table(indexMeta)
.Input(ProjectColumns(rowsToUpdate, indexTableColumns, ctx))
.ReturningColumns<TCoAtomList>().Build()
.Done();

effects.push_back(indexDelete);
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TK
return Build<TKqlDeleteRows>(ctx, deleteRows.Pos())
.Table(deleteRows.Table())
.Input(deleteInput.Cast())
.ReturningColumns(deleteRows.ReturningColumns())
.Done();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ TExprBase KqpBuildDeleteIndexStages(TExprBase node, TExprContext& ctx, const TKq
auto tableDelete = Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(del.Table())
.Input(lookupKeys)
.ReturningColumns(del.ReturningColumns())
.Done();

TVector<TExprBase> effects;
Expand All @@ -103,7 +102,6 @@ TExprBase KqpBuildDeleteIndexStages(TExprBase node, TExprContext& ctx, const TKq
auto indexDelete = Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(tableNode)
.Input(deleteIndexKeys)
.ReturningColumns<TCoAtomList>().Build()
.Done();

effects.emplace_back(std::move(indexDelete));
Expand Down
3 changes: 0 additions & 3 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ NYql::NNodes::TExprBase KqpBuildReturning(NYql::NNodes::TExprBase node, NYql::TE
NYql::NNodes::TExprBase KqpRewriteReturningUpsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);

NYql::NNodes::TExprBase KqpRewriteReturningDelete(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);

NYql::NNodes::TExprBase KqpRewriteGenerateIfInsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);

Expand Down
114 changes: 26 additions & 88 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_returning.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,6 @@ TCoAtomList MakeColumnsList(Container rows, TExprContext& ctx, TPositionHandle p
return Build<TCoAtomList>(ctx, pos).Add(columnsVector).Done();
}

template<typename Container>
TExprBase SelectFields(TExprBase node, Container fields, TExprContext& ctx, TPositionHandle pos) {
TVector<TExprBase> items;
for (auto&& field : fields) {
TString name;

if constexpr (std::is_same_v<NYql::NNodes::TCoAtom&&, decltype(field)>) {
name = field.Value();
} else {
name = field;
}

auto tuple = Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(field)
.template Value<TCoMember>()
.Struct(node)
.Name().Build(name)
.Build()
.Done();

items.emplace_back(tuple);
}
return Build<TCoAsStruct>(ctx, pos)
.Add(items)
.Done();
}

TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
auto maybeReturning = node.Maybe<TKqlReturningList>();
if (!maybeReturning) {
Expand All @@ -51,60 +24,46 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
auto returning = maybeReturning.Cast();
const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, returning.Table().Path());

auto buildReturningRows = [&](TExprBase rows, TCoAtomList columns, TCoAtomList returningColumns) -> TExprBase {
auto pos = rows.Pos();
auto buildFromUpsert = [&](TMaybeNode<TKqlUpsertRows> upsert) -> TExprBase {
auto rows = upsert.Cast().Input();
auto pos = upsert.Input().Cast().Pos();

TSet<TString> inputColumns;
TSet<TString> columnsToReadSet;
for (auto&& column : columns) {

for (auto&& column : upsert.Columns().Cast()) {
inputColumns.insert(TString(column.Value()));
}
for (auto&& column : returningColumns) {
for (auto&& column : upsert.ReturningColumns().Cast()) {
if (!inputColumns.contains(column) && !tableDesc.GetKeyColumnIndex(TString(column))) {
columnsToReadSet.insert(TString(column));
}
}
TMaybeNode<TExprBase> input = rows;

if (!columnsToReadSet.empty()) {
auto payloadSelectorArg = TCoArgument(ctx.NewArgument(pos, "payload_selector_row"));
TVector<TExprBase> payloadTuples;
for (const auto& column : columns) {
payloadTuples.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name(column)
.Value<TCoMember>()
.Struct(payloadSelectorArg)
.Name(column)
.Build()
.Done());
}
TMaybeNode<TExprBase> input = upsert.Input();

auto payloadSelector = Build<TCoLambda>(ctx, pos)
.Args({payloadSelectorArg})
.Body<TCoAsStruct>()
.Add(payloadTuples)
.Build()
.Done();
if (!columnsToReadSet.empty()) {
TString upsertInputName = "upsertInput";
TString tableInputName = "table";

auto payloadSelector = MakeRowsPayloadSelector(upsert.Columns().Cast(), tableDesc, pos, ctx);
auto condenseResult = CondenseInputToDictByPk(input.Cast(), tableDesc, payloadSelector, ctx);
if (!condenseResult) {
return node;
}

auto inputDictAndKeys = PrecomputeDictAndKeys(*condenseResult, pos, ctx);

TSet<TString> columnsToLookup = columnsToReadSet;
for (auto&& column : tableDesc.Metadata->KeyColumnNames) {
columnsToReadSet.insert(column);
}
TSet<TString> columnsToLookup = columnsToReadSet;

for (auto&& column : tableDesc.Metadata->KeyColumnNames) {
columnsToReadSet.erase(column);
}
TCoAtomList additionalColumnsToRead = MakeColumnsList(columnsToReadSet, ctx, pos);

TCoArgument existingRow = Build<TCoArgument>(ctx, node.Pos())
.Name("existing_row")
.Done();
auto prepareUpdateStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(inputDictAndKeys.KeysPrecompute)
Expand All @@ -121,21 +80,27 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
.Columns(MakeColumnsList(columnsToLookup, ctx, pos))
.Build()
.Lambda()
.Args({existingRow})
.Args({"existingRow"})
.Body<TCoJust>()
.Input<TCoFlattenMembers>()
.Add()
.Name().Build("")
.Value<TCoUnwrap>() // Key should always exist in the dict
.Optional<TCoLookup>()
.Collection("dict")
.Lookup(SelectFields(existingRow, tableDesc.Metadata->KeyColumnNames, ctx, pos))
.Lookup<TCoExtractMembers>()
.Input("existingRow")
.Members(MakeColumnsList(tableDesc.Metadata->KeyColumnNames, ctx, pos))
.Build()
.Build()
.Build()
.Build()
.Add()
.Name().Build("")
.Value(SelectFields(existingRow, additionalColumnsToRead, ctx, pos))
.Value<TCoExtractMembers>()
.Input("existingRow")
.Members(additionalColumnsToRead)
.Build()
.Build()
.Build()
.Build()
Expand Down Expand Up @@ -163,27 +128,20 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
for (auto item : maybeList.Cast()) {
if (auto upsert = item.Maybe<TKqlUpsertRows>()) {
if (upsert.Cast().Table().Raw() == returning.Table().Raw()) {
return buildReturningRows(upsert.Input().Cast(), upsert.Columns().Cast(), returning.Columns());
}
}
if (auto del = item.Maybe<TKqlDeleteRows>()) {
if (del.Cast().Table().Raw() == returning.Table().Raw()) {
return buildReturningRows(del.Input().Cast(), MakeColumnsList(tableDesc.Metadata->KeyColumnNames, ctx, node.Pos()), returning.Columns());
return buildFromUpsert(upsert);
}
}
}
}

if (auto upsert = returning.Update().Maybe<TKqlUpsertRows>()) {
return buildReturningRows(upsert.Input().Cast(), upsert.Columns().Cast(), returning.Columns());
}
if (auto del = returning.Update().Maybe<TKqlDeleteRows>()) {
return buildReturningRows(del.Input().Cast(), MakeColumnsList(tableDesc.Metadata->KeyColumnNames, ctx, node.Pos()), returning.Columns());
return buildFromUpsert(upsert);
}

return node;
}


TExprBase KqpRewriteReturningUpsert(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) {
auto upsert = node.Cast<TKqlUpsertRows>();
if (upsert.ReturningColumns().Empty()) {
Expand All @@ -206,24 +164,4 @@ TExprBase KqpRewriteReturningUpsert(TExprBase node, TExprContext& ctx, const TKq
.Done();
}

TExprBase KqpRewriteReturningDelete(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) {
auto del = node.Cast<TKqlDeleteRows>();
if (del.ReturningColumns().Empty()) {
return node;
}

if (!del.Input().Maybe<TDqPrecompute>() && !del.Input().Maybe<TDqPhyPrecompute>()) {
return node;
}

return
Build<TKqlDeleteRows>(ctx, del.Pos())
.Input<TDqPrecompute>()
.Input(del.Input())
.Build()
.Table(del.Table())
.ReturningColumns<TCoAtomList>().Build()
.Done();
}

} // namespace NKikimr::NKqp::NOpt
12 changes: 3 additions & 9 deletions ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {

AddHandler(2, &TDqStage::Match, HNDL(RewriteKqpReadTable));
AddHandler(2, &TDqStage::Match, HNDL(RewriteKqpLookupTable));
AddHandler(2, &TKqlUpsertRows::Match, HNDL(RewriteReturningUpsert));
AddHandler(2, &TKqlDeleteRows::Match, HNDL(RewriteReturningDelete));

AddHandler(3, &TKqlReturningList::Match, HNDL(BuildReturning));
AddHandler(3, &TKqlUpsertRows::Match, HNDL(RewriteReturningUpsert));

AddHandler(4, &TKqlReturningList::Match, HNDL(BuildReturning));
#undef HNDL

SetGlobal(1u);
Expand All @@ -145,12 +145,6 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

TMaybeNode<TExprBase> RewriteReturningDelete(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpRewriteReturningDelete(node, ctx, KqpCtx);
DumpAppliedRule("RewriteReturningDelete", node.Ptr(), output.Ptr(), ctx);
return output;
}

TMaybeNode<TExprBase> RewriteGenerateIfInsert(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpRewriteGenerateIfInsert(node, ctx, KqpCtx);
DumpAppliedRule("RewriteGenerateIfInsert", node.Ptr(), output.Ptr(), ctx);
Expand Down
13 changes: 1 addition & 12 deletions ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1534,24 +1534,13 @@ Y_UNIT_TEST_SUITE(KqpPg) {
{
const auto query = Q_(R"(
--!syntax_pg
DELETE FROM ReturningTableExtraValue WHERE key = 2 RETURNING key, value, value2;
UPDATE ReturningTableExtraValue SET value2 = 3 where key = 2 RETURNING *;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(result.IsSuccess());
CompareYson(R"([["2";"4";"3"]])", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
DELETE FROM ReturningTable WHERE key <= 3 RETURNING key, value;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(result.IsSuccess());
CompareYson(R"([["2";"2"];["3";"2"];["1";"3"]])", FormatResultSetYson(result.GetResultSet(0)));
}
}

Y_UNIT_TEST(DropTablePg) {
Expand Down

0 comments on commit 8481ddb

Please sign in to comment.