Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reapply "Support returning list in delete statements (#1684)" (#1705) #1706

Merged
merged 2 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -509,12 +509,18 @@
{
"Name": "TKqlDeleteRows",
"Base": "TKqlDeleteRowsBase",
"Match": {"Type": "Callable", "Name": "KqlDeleteRows"}
"Match": {"Type": "Callable", "Name": "KqlDeleteRows"},
"Children": [
{"Index": 2, "Name": "ReturningColumns", "Type": "TCoAtomList"}
]
},
{
"Name": "TKqlDeleteRowsIndex",
"Base": "TKqlDeleteRowsBase",
"Match": {"Type": "Callable", "Name": "KqlDeleteRowsIndex"}
"Match": {"Type": "Callable", "Name": "KqlDeleteRowsIndex"},
"Children": [
{"Index": 2, "Name": "ReturningColumns", "Type": "TCoAtomList"}
]
},
{
"Name": "TKqpDeleteRows",
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ TStatus AnnotateUpdateRows(const TExprNode::TPtr& node, TExprContext& ctx, const
TStatus AnnotateDeleteRows(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
const TKikimrTablesData& tablesData)
{
if (!EnsureArgsCount(*node, 2, ctx)) {
if (!EnsureMaxArgsCount(*node, 3, ctx) && !EnsureMinArgsCount(*node, 2, ctx)) {
return TStatus::Error;
}

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/opt/kqp_opt_kql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ TExprBase BuildDeleteTable(const TKiWriteTable& write, const TKikimrTableDescrip
return Build<TKqlDeleteRows>(ctx, write.Pos())
.Table(BuildTableMeta(tableData, write.Pos(), ctx))
.Input(keysToDelete)
.ReturningColumns(write.ReturningColumns())
.Done();
}

Expand All @@ -438,6 +439,7 @@ TExprBase BuildDeleteTableWithIndex(const TKiWriteTable& write, const TKikimrTab
return Build<TKqlDeleteRowsIndex>(ctx, write.Pos())
.Table(BuildTableMeta(tableData, write.Pos(), ctx))
.Input(keysToDelete)
.ReturningColumns(write.ReturningColumns())
.Done();
}

Expand All @@ -464,6 +466,7 @@ TExprBase BuildDeleteTable(const TKiDeleteTable& del, const TKikimrTableDescript
return Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(BuildTableMeta(tableData, del.Pos(), ctx))
.Input(keysToDelete)
.ReturningColumns<TCoAtomList>().Build()
.Done();
}

Expand All @@ -483,6 +486,7 @@ TExprBase BuildDeleteTableWithIndex(const TKiDeleteTable& del, const TKikimrTabl
auto tableDelete = Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(BuildTableMeta(tableData, del.Pos(), ctx))
.Input(ProjectColumns(rowsToDelete, pk, ctx))
.ReturningColumns<TCoAtomList>().Build()
.Done();

TVector<TExprBase> effects;
Expand All @@ -506,6 +510,7 @@ TExprBase BuildDeleteTableWithIndex(const TKiDeleteTable& del, const TKikimrTabl
auto indexDelete = Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(indexMeta)
.Input(ProjectColumns(rowsToDelete, indexTableColumns, ctx))
.ReturningColumns<TCoAtomList>().Build()
.Done();

effects.push_back(indexDelete);
Expand Down Expand Up @@ -699,6 +704,7 @@ TExprBase BuildUpdateTableWithIndex(const TKiUpdateTable& update, const TKikimrT
auto indexDelete = Build<TKqlDeleteRows>(ctx, update.Pos())
.Table(indexMeta)
.Input(ProjectColumns(rowsToUpdate, indexTableColumns, ctx))
.ReturningColumns<TCoAtomList>().Build()
Copy link
Collaborator

@qrort qrort Feb 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

а можно не добоавлять чайлда, наверное?

Copy link
Collaborator Author

@ssmike ssmike Feb 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ну совершенно точно это нужная штука. Надо будет поправить правила про ExtractMembers логические к примеру

.Done();

effects.push_back(indexDelete);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TK
return Build<TKqlDeleteRows>(ctx, deleteRows.Pos())
.Table(deleteRows.Table())
.Input(deleteInput.Cast())
.ReturningColumns(deleteRows.ReturningColumns())
.Done();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ TExprBase KqpBuildDeleteIndexStages(TExprBase node, TExprContext& ctx, const TKq
auto tableDelete = Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(del.Table())
.Input(lookupKeys)
.ReturningColumns(del.ReturningColumns())
.Done();

TVector<TExprBase> effects;
Expand All @@ -102,6 +103,7 @@ TExprBase KqpBuildDeleteIndexStages(TExprBase node, TExprContext& ctx, const TKq
auto indexDelete = Build<TKqlDeleteRows>(ctx, del.Pos())
.Table(tableNode)
.Input(deleteIndexKeys)
.ReturningColumns<TCoAtomList>().Build()
.Done();

effects.emplace_back(std::move(indexDelete));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ NYql::NNodes::TExprBase KqpBuildReturning(NYql::NNodes::TExprBase node, NYql::TE
NYql::NNodes::TExprBase KqpRewriteReturningUpsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);

NYql::NNodes::TExprBase KqpRewriteReturningDelete(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);

NYql::NNodes::TExprBase KqpRewriteGenerateIfInsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);

Expand Down
114 changes: 88 additions & 26 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_returning.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,33 @@ TCoAtomList MakeColumnsList(Container rows, TExprContext& ctx, TPositionHandle p
return Build<TCoAtomList>(ctx, pos).Add(columnsVector).Done();
}

template<typename Container>
TExprBase SelectFields(TExprBase node, Container fields, TExprContext& ctx, TPositionHandle pos) {
TVector<TExprBase> items;
for (auto&& field : fields) {
TString name;

if constexpr (std::is_same_v<NYql::NNodes::TCoAtom&&, decltype(field)>) {
name = field.Value();
} else {
name = field;
}

auto tuple = Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(field)
.template Value<TCoMember>()
.Struct(node)
.Name().Build(name)
.Build()
.Done();

items.emplace_back(tuple);
}
return Build<TCoAsStruct>(ctx, pos)
.Add(items)
.Done();
}

TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
auto maybeReturning = node.Maybe<TKqlReturningList>();
if (!maybeReturning) {
Expand All @@ -24,46 +51,60 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
auto returning = maybeReturning.Cast();
const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, returning.Table().Path());

auto buildFromUpsert = [&](TMaybeNode<TKqlUpsertRows> upsert) -> TExprBase {
auto rows = upsert.Cast().Input();
auto pos = upsert.Input().Cast().Pos();
auto buildReturningRows = [&](TExprBase rows, TCoAtomList columns, TCoAtomList returningColumns) -> TExprBase {
auto pos = rows.Pos();

TSet<TString> inputColumns;
TSet<TString> columnsToReadSet;

for (auto&& column : upsert.Columns().Cast()) {
for (auto&& column : columns) {
inputColumns.insert(TString(column.Value()));
}
for (auto&& column : upsert.ReturningColumns().Cast()) {
for (auto&& column : returningColumns) {
if (!inputColumns.contains(column) && !tableDesc.GetKeyColumnIndex(TString(column))) {
columnsToReadSet.insert(TString(column));
}
}

TMaybeNode<TExprBase> input = upsert.Input();
TMaybeNode<TExprBase> input = rows;

if (!columnsToReadSet.empty()) {
TString upsertInputName = "upsertInput";
TString tableInputName = "table";
auto payloadSelectorArg = TCoArgument(ctx.NewArgument(pos, "payload_selector_row"));
TVector<TExprBase> payloadTuples;
for (const auto& column : columns) {
payloadTuples.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name(column)
.Value<TCoMember>()
.Struct(payloadSelectorArg)
.Name(column)
.Build()
.Done());
}

auto payloadSelector = Build<TCoLambda>(ctx, pos)
.Args({payloadSelectorArg})
.Body<TCoAsStruct>()
.Add(payloadTuples)
.Build()
.Done();

auto payloadSelector = MakeRowsPayloadSelector(upsert.Columns().Cast(), tableDesc, pos, ctx);
auto condenseResult = CondenseInputToDictByPk(input.Cast(), tableDesc, payloadSelector, ctx);
if (!condenseResult) {
return node;
}

auto inputDictAndKeys = PrecomputeDictAndKeys(*condenseResult, pos, ctx);

TSet<TString> columnsToLookup = columnsToReadSet;
for (auto&& column : tableDesc.Metadata->KeyColumnNames) {
columnsToReadSet.insert(column);
}

TSet<TString> columnsToLookup = columnsToReadSet;
for (auto&& column : tableDesc.Metadata->KeyColumnNames) {
columnsToReadSet.erase(column);
}
TCoAtomList additionalColumnsToRead = MakeColumnsList(columnsToReadSet, ctx, pos);

TCoArgument existingRow = Build<TCoArgument>(ctx, node.Pos())
.Name("existing_row")
.Done();
auto prepareUpdateStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(inputDictAndKeys.KeysPrecompute)
Expand All @@ -80,27 +121,21 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
.Columns(MakeColumnsList(columnsToLookup, ctx, pos))
.Build()
.Lambda()
.Args({"existingRow"})
.Args({existingRow})
.Body<TCoJust>()
.Input<TCoFlattenMembers>()
.Add()
.Name().Build("")
.Value<TCoUnwrap>() // Key should always exist in the dict
.Optional<TCoLookup>()
.Collection("dict")
.Lookup<TCoExtractMembers>()
.Input("existingRow")
.Members(MakeColumnsList(tableDesc.Metadata->KeyColumnNames, ctx, pos))
.Build()
.Lookup(SelectFields(existingRow, tableDesc.Metadata->KeyColumnNames, ctx, pos))
.Build()
.Build()
.Build()
.Add()
.Name().Build("")
.Value<TCoExtractMembers>()
.Input("existingRow")
.Members(additionalColumnsToRead)
.Build()
.Value(SelectFields(existingRow, additionalColumnsToRead, ctx, pos))
.Build()
.Build()
.Build()
Expand Down Expand Up @@ -128,20 +163,27 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
for (auto item : maybeList.Cast()) {
if (auto upsert = item.Maybe<TKqlUpsertRows>()) {
if (upsert.Cast().Table().Raw() == returning.Table().Raw()) {
return buildFromUpsert(upsert);
return buildReturningRows(upsert.Input().Cast(), upsert.Columns().Cast(), returning.Columns());
}
}
if (auto del = item.Maybe<TKqlDeleteRows>()) {
if (del.Cast().Table().Raw() == returning.Table().Raw()) {
return buildReturningRows(del.Input().Cast(), MakeColumnsList(tableDesc.Metadata->KeyColumnNames, ctx, node.Pos()), returning.Columns());
}
}
}
}

if (auto upsert = returning.Update().Maybe<TKqlUpsertRows>()) {
return buildFromUpsert(upsert);
return buildReturningRows(upsert.Input().Cast(), upsert.Columns().Cast(), returning.Columns());
}
if (auto del = returning.Update().Maybe<TKqlDeleteRows>()) {
return buildReturningRows(del.Input().Cast(), MakeColumnsList(tableDesc.Metadata->KeyColumnNames, ctx, node.Pos()), returning.Columns());
}

return node;
}


TExprBase KqpRewriteReturningUpsert(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) {
auto upsert = node.Cast<TKqlUpsertRows>();
if (upsert.ReturningColumns().Empty()) {
Expand All @@ -164,4 +206,24 @@ TExprBase KqpRewriteReturningUpsert(TExprBase node, TExprContext& ctx, const TKq
.Done();
}

TExprBase KqpRewriteReturningDelete(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) {
auto del = node.Cast<TKqlDeleteRows>();
if (del.ReturningColumns().Empty()) {
return node;
}

if (!del.Input().Maybe<TDqPrecompute>() && !del.Input().Maybe<TDqPhyPrecompute>()) {
return node;
}

return
Build<TKqlDeleteRows>(ctx, del.Pos())
.Input<TDqPrecompute>()
.Input(del.Input())
.Build()
.Table(del.Table())
.ReturningColumns<TCoAtomList>().Build()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

К сожалению, не очень понял: мы не передаем сюда del.ReturningColumns(), а конструируем пустой лист?

.Done();
}

} // namespace NKikimr::NKqp::NOpt
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,7 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
auto indexDelete = Build<TKqlDeleteRows>(ctx, pos)
.Table(tableNode)
.Input(deleteIndexKeys)
.ReturningColumns<TCoAtomList>().Build()
.Done();

effects.emplace_back(indexDelete);
Expand Down
12 changes: 9 additions & 3 deletions ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {

AddHandler(2, &TDqStage::Match, HNDL(RewriteKqpReadTable));
AddHandler(2, &TDqStage::Match, HNDL(RewriteKqpLookupTable));
AddHandler(2, &TKqlUpsertRows::Match, HNDL(RewriteReturningUpsert));
AddHandler(2, &TKqlDeleteRows::Match, HNDL(RewriteReturningDelete));

AddHandler(3, &TKqlUpsertRows::Match, HNDL(RewriteReturningUpsert));

AddHandler(4, &TKqlReturningList::Match, HNDL(BuildReturning));
AddHandler(3, &TKqlReturningList::Match, HNDL(BuildReturning));
#undef HNDL

SetGlobal(1u);
Expand All @@ -145,6 +145,12 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

TMaybeNode<TExprBase> RewriteReturningDelete(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpRewriteReturningDelete(node, ctx, KqpCtx);
DumpAppliedRule("RewriteReturningDelete", node.Ptr(), output.Ptr(), ctx);
return output;
}

TMaybeNode<TExprBase> RewriteGenerateIfInsert(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpRewriteGenerateIfInsert(node, ctx, KqpCtx);
DumpAppliedRule("RewriteGenerateIfInsert", node.Ptr(), output.Ptr(), ctx);
Expand Down
13 changes: 12 additions & 1 deletion ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1534,13 +1534,24 @@ Y_UNIT_TEST_SUITE(KqpPg) {
{
const auto query = Q_(R"(
--!syntax_pg
UPDATE ReturningTableExtraValue SET value2 = 3 where key = 2 RETURNING *;
DELETE FROM ReturningTableExtraValue WHERE key = 2 RETURNING key, value, value2;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(result.IsSuccess());
CompareYson(R"([["2";"4";"3"]])", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
DELETE FROM ReturningTable WHERE key <= 3 RETURNING key, value;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(result.IsSuccess());
CompareYson(R"([["2";"2"];["3";"2"];["1";"3"]])", FormatResultSetYson(result.GetResultSet(0)));
}
}

Y_UNIT_TEST(DropTablePg) {
Expand Down
Loading