diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 4639c09e825f..9f02ab0b0967 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -27,6 +27,10 @@ ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad ydb/core/kqp/ut/service KqpQueryService.TableSink_OlapRWQueries ydb/core/kqp/ut/service KqpQueryService.TableSink_Htap ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+withSink +ydb/core/kqp/ut/tx KqpSinkMvcc.OltpNamedStatement +ydb/core/kqp/ut/tx KqpSinkMvcc.OlapNamedStatement +ydb/core/kqp/ut/tx KqpSinkMvcc.OltpMultiSinks +ydb/core/kqp/ut/tx KqpSinkMvcc.OlapMultiSinks ydb/core/persqueue/ut [*/*]* ydb/core/persqueue/ut TPQTest.*DirectRead* ydb/core/persqueue/ut/ut_with_sdk [*/*]* diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp index 87cca8148424..aade00fdafd3 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp @@ -88,14 +88,18 @@ TExprBase KqpBuildDeleteIndexStages(TExprBase node, TExprContext& ctx, const TKq effects.emplace_back(tableDelete); for (const auto& [tableNode, indexDesc] : indexes) { - THashSet indexTableColumns; + THashSet indexTableColumnsSet; + TVector indexTableColumns; for (const auto& column : indexDesc->KeyColumns) { - YQL_ENSURE(indexTableColumns.emplace(column).second); + YQL_ENSURE(indexTableColumnsSet.emplace(column).second); + indexTableColumns.emplace_back(column); } for (const auto& column : pk) { - indexTableColumns.insert(column); + if (indexTableColumnsSet.insert(column).second) { + indexTableColumns.emplace_back(column); + } } auto deleteIndexKeys = MakeRowsFromDict(lookupDict.Cast(), pk, indexTableColumns, del.Pos(), ctx); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h index bbb42ef50236..c56ab9096fb4 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h @@ -62,11 +62,11 @@ NYql::NNodes::TCoLambda MakeRowsPayloadSelector(const NYql::NNodes::TCoAtomList& const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx); NYql::NNodes::TExprBase MakeRowsFromDict(const NYql::NNodes::TDqPhyPrecompute& dict, const TVector& dictKeys, - const THashSet& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); + const TVector& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); // Same as MakeRowsFromDict but skip rows which marked as non changed (true in second tuple) NYql::NNodes::TExprBase MakeRowsFromTupleDict(const NYql::NNodes::TDqPhyPrecompute& dict, const TVector& dictKeys, - const THashSet& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); + const TVector& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); NYql::NNodes::TMaybeNode MakeConditionalInsertRows(const NYql::NNodes::TExprBase& input, const NYql::TKikimrTableDescription& table, const TMaybe>& inputColumn, bool abortOnError, diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp index 3bb2e9a50dc2..891ae7164183 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp @@ -238,7 +238,7 @@ TMaybeNode PrecomputeTableLookupDict(const TDqPhyPrecompute& l } TExprBase MakeRowsFromDict(const TDqPhyPrecompute& dict, const TVector& dictKeys, - const THashSet& columns, TPositionHandle pos, TExprContext& ctx) + const TVector& columns, TPositionHandle pos, TExprContext& ctx) { THashSet dictKeysSet(dictKeys.begin(), dictKeys.end()); auto dictTupleArg = TCoArgument(ctx.NewArgument(pos, "dict_tuple")); @@ -296,7 +296,7 @@ TExprBase MakeRowsFromDict(const TDqPhyPrecompute& dict, const TVector& } TExprBase MakeRowsFromTupleDict(const TDqPhyPrecompute& dict, const TVector& dictKeys, - const THashSet& columns, TPositionHandle pos, TExprContext& ctx) + const TVector& columns, TPositionHandle pos, TExprContext& ctx) { THashSet dictKeysSet(dictKeys.begin(), dictKeys.end()); auto dictTupleArg = TCoArgument(ctx.NewArgument(pos, "dict_tuple")); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp index 6553dc98514b..b158dbb9ab73 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp @@ -12,7 +12,7 @@ using namespace NYql::NNodes; namespace { TExprBase MakeInsertIndexRows(const TDqPhyPrecompute& inputRows, const TKikimrTableDescription& table, - const THashSet& inputColumns, const THashSet& indexColumns, + const THashSet& inputColumns, const TVector& indexColumns, TPositionHandle pos, TExprContext& ctx) { auto inputRowArg = TCoArgument(ctx.NewArgument(pos, "input_row")); @@ -113,19 +113,24 @@ TExprBase KqpBuildInsertIndexStages(TExprBase node, TExprContext& ctx, const TKq effects.emplace_back(upsertTable); for (const auto& [tableNode, indexDesc] : indexes) { - THashSet indexTableColumns; + THashSet indexTableColumnsSet; + TVector indexTableColumns; for (const auto& column : indexDesc->KeyColumns) { - YQL_ENSURE(indexTableColumns.emplace(column).second); + YQL_ENSURE(indexTableColumnsSet.emplace(column).second); + indexTableColumns.emplace_back(column); } for (const auto& column : table.Metadata->KeyColumnNames) { - indexTableColumns.insert(column); + if (indexTableColumnsSet.insert(column).second) { + indexTableColumns.emplace_back(column); + } } for (const auto& column : indexDesc->DataColumns) { if (inputColumnsSet.contains(column)) { - YQL_ENSURE(indexTableColumns.emplace(column).second); + YQL_ENSURE(indexTableColumnsSet.emplace(column).second); + indexTableColumns.emplace_back(column); } } diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp index 032bff0e69e9..6fa52f0e5cde 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp @@ -150,7 +150,7 @@ TExprBase MakeNonexistingRowsFilter(const TDqPhyPrecompute& inputRows, const TDq TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecompute& inputRows, const TDqPhyPrecompute& lookupDict, const THashSet& inputColumns, - const THashSet& indexColumns, const TKikimrTableDescription& table, TPositionHandle pos, + const TVector& indexColumns, const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx, bool opt) { // Check if we can update index table from just input data @@ -686,9 +686,11 @@ TMaybeNode KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, for (const auto& [tableNode, indexDesc] : indexes) { bool indexKeyColumnsUpdated = false; - THashSet indexTableColumns; + THashSet indexTableColumnsSet; + TVector indexTableColumns; for (const auto& column : indexDesc->KeyColumns) { - YQL_ENSURE(indexTableColumns.emplace(column).second); + YQL_ENSURE(indexTableColumnsSet.emplace(column).second); + indexTableColumns.emplace_back(column); if (mode == TKqpPhyUpsertIndexMode::UpdateOn && table.GetKeyColumnIndex(column)) { // Table PK cannot be updated, so don't consider PK columns update as index update @@ -701,7 +703,9 @@ TMaybeNode KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, } for (const auto& column : pk) { - indexTableColumns.insert(column); + if (indexTableColumnsSet.insert(column).second) { + indexTableColumns.emplace_back(column); + } } auto indexTableColumnsWithoutData = indexTableColumns; @@ -710,7 +714,8 @@ TMaybeNode KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, bool optUpsert = true; for (const auto& column : indexDesc->DataColumns) { // TODO: Conder not fetching/updating data columns without input value. - YQL_ENSURE(indexTableColumns.emplace(column).second); + YQL_ENSURE(indexTableColumnsSet.emplace(column).second); + indexTableColumns.emplace_back(column); if (inputColumnsSet.contains(column)) { indexDataColumnsUpdated = true; diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp index 18a439af0c21..779788fbad9c 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp @@ -83,6 +83,10 @@ TCoAtomList BuildColumnsList(const THashSet& columns, TPositionHandl return BuildColumnsListImpl(columns, pos, ctx); } +TCoAtomList BuildColumnsList(const TVector& columns, NYql::TPositionHandle pos,NYql::TExprContext& ctx) { + return BuildColumnsListImpl(columns, pos, ctx); +} + TCoAtomList BuildColumnsList(const TVector& columns, TPositionHandle pos, TExprContext& ctx) { return BuildColumnsListImpl(columns, pos, ctx); } diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h index 0e22dbac8e4f..867694931fa9 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h @@ -19,6 +19,9 @@ NYql::NNodes::TMaybeNode BuildLookupKeysPrecompu NYql::NNodes::TCoAtomList BuildColumnsList(const THashSet& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); +NYql::NNodes::TCoAtomList BuildColumnsList(const TVector& columns, NYql::TPositionHandle pos, + NYql::TExprContext& ctx); + NYql::NNodes::TCoAtomList BuildColumnsList(const TVector& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); diff --git a/ydb/core/kqp/ut/tx/kqp_sink_common.h b/ydb/core/kqp/ut/tx/kqp_sink_common.h index 9dfcad60ed43..80dae769b18f 100644 --- a/ydb/core/kqp/ut/tx/kqp_sink_common.h +++ b/ydb/core/kqp/ut/tx/kqp_sink_common.h @@ -18,12 +18,13 @@ class TTableDataModificationTester { std::unique_ptr Kikimr; YDB_ACCESSOR(bool, IsOlap, false); YDB_ACCESSOR(bool, FastSnapshotExpiration, false); + YDB_ACCESSOR(bool, DisableSinks, false); virtual void DoExecute() = 0; public: void Execute() { - AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); - AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(!DisableSinks); + AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(!DisableSinks); AppConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); auto settings = TKikimrSettings().SetAppConfig(AppConfig).SetWithSampleTables(false); if (FastSnapshotExpiration) { diff --git a/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp index 159d8154a544..83a62f070faf 100644 --- a/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp @@ -299,6 +299,95 @@ Y_UNIT_TEST_SUITE(KqpSinkMvcc) { tester.SetIsOlap(true); tester.Execute(); } + + class TNamedStatement : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + + { + auto result = session1.ExecuteQuery(Q_(R"( + $data = SELECT * FROM `/Root/KV`; + DELETE FROM `/Root/KV` WHERE 1=1; + SELECT COUNT(*) FROM `/Root/KV`; + SELECT COUNT(*) FROM $data; + DELETE FROM `/Root/KV` ON SELECT 424242u AS Key, "One" As Value; + UPSERT INTO `/Root/KV` (Key, Value) VALUES (424242u, "One"); + SELECT COUNT(*) FROM `/Root/KV`; + SELECT COUNT(*) FROM $data; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[0u]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[0u]])", FormatResultSetYson(result.GetResultSet(1))); + CompareYson(R"([[1u]])", FormatResultSetYson(result.GetResultSet(2))); + CompareYson(R"([[1u]])", FormatResultSetYson(result.GetResultSet(3))); + } + } + }; + + Y_UNIT_TEST(OltpNamedStatementNoSink) { + TNamedStatement tester; + tester.SetDisableSinks(true); + tester.SetIsOlap(false); + tester.Execute(); + } + + Y_UNIT_TEST(OltpNamedStatement) { + TNamedStatement tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + Y_UNIT_TEST(OlapNamedStatement) { + TNamedStatement tester; + tester.SetIsOlap(true); + tester.Execute(); + } + + class TMultiSinks: public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + + { + auto result = session1.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "1"); + UPSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "2"); + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = session1.ExecuteQuery(Q_(R"( + SELECT Value FROM `/Root/KV` WHERE Key = 1u; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[["2"]]])", FormatResultSetYson(result.GetResultSet(0))); + } + } + }; + + Y_UNIT_TEST(OltpMultiSinksNoSinks) { + TMultiSinks tester; + tester.SetDisableSinks(true); + tester.SetIsOlap(false); + tester.Execute(); + } + + Y_UNIT_TEST(OltpMultiSinks) { + TMultiSinks tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + Y_UNIT_TEST(OlapMultiSinks) { + TMultiSinks tester; + tester.SetIsOlap(true); + tester.Execute(); + } } } // namespace NKqp