diff --git a/ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp b/ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp index 53bd14fa3576..0327648b86d3 100644 --- a/ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp +++ b/ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp @@ -73,11 +73,11 @@ Y_UNIT_TEST_SUITE(TBackupTests) { Backup(runtime, ToString(Codec), R"( Name: "Table" - Columns { Name: "key" Type: "Utf8" } + Columns { Name: "key" Type: "Uint32" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )", [](TTestBasicRuntime& runtime) { - WriteRow(runtime, "a", "valueA"); + UpdateRow(runtime, "Table", 1, "valueA"); }); } @@ -86,17 +86,17 @@ Y_UNIT_TEST_SUITE(TBackupTests) { Backup(runtime, ToString(Codec), R"( Name: "Table" - Columns { Name: "key" Type: "Utf8" } + Columns { Name: "key" Type: "Uint32" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] SplitBoundary { KeyPrefix { - Tuple { Optional { Text: "b" } } + Tuple { Optional { Uint32: 2 } } } } )", [](TTestBasicRuntime& runtime) { - WriteRow(runtime, "a", "valueA", TTestTxConfig::FakeHiveTablets + 0); - WriteRow(runtime, "b", "valueb", TTestTxConfig::FakeHiveTablets + 1); + UpdateRow(runtime, "Table", 1, "valueA", TTestTxConfig::FakeHiveTablets + 0); + UpdateRow(runtime, "Table", 2, "valueb", TTestTxConfig::FakeHiveTablets + 1); }); } @@ -107,12 +107,12 @@ Y_UNIT_TEST_SUITE(TBackupTests) { const auto actualResult = Backup(runtime, ToString(Codec), R"( Name: "Table" - Columns { Name: "key" Type: "Utf8" } + Columns { Name: "key" Type: "Uint32" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )", [](TTestBasicRuntime& runtime) { for (ui32 i = 0; i < 100 * batchSize; ++i) { - WriteRow(runtime, Sprintf("a%d", i), "valueA"); + UpdateRow(runtime, "Table", i, "valueA"); } }, batchSize, minWriteBatchSize); diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index 0c0e396adf9b..4a8ad8b32170 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -687,14 +687,14 @@ partitioning_settings { TestCreateTable(runtime, ++txId, "/MyRoot", R"( Name: "Table" - Columns { Name: "key" Type: "Utf8" } + Columns { Name: "key" Type: "Uint32" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); env.TestWaitNotification(runtime, txId); - WriteRow(runtime, "a", "valueA"); - WriteRow(runtime, "b", "valueB"); + UpdateRow(runtime, "Table", 1, "valueA"); + UpdateRow(runtime, "Table", 2, "valueB"); runtime.SetLogPriority(NKikimrServices::S3_WRAPPER, NActors::NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); @@ -827,14 +827,14 @@ partitioning_settings { TestCreateTable(runtime, ++txId, "/MyRoot", R"( Name: "Table" - Columns { Name: "key" Type: "Utf8" } + Columns { Name: "key" Type: "Uint32" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); env.TestWaitNotification(runtime, txId); for (int i = 1; i < 500; ++i) { - WriteRow(runtime, Sprintf("a%i", i), "value"); + UpdateRow(runtime, "Table", i, "value"); } // trigger memtable's compaction @@ -925,13 +925,13 @@ partitioning_settings { TestCreateTable(runtime, ++txId, "/MyRoot", R"( Name: "Table" - Columns { Name: "key" Type: "Utf8" } + Columns { Name: "key" Type: "Uint32" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); env.TestWaitNotification(runtime, txId); - WriteRow(runtime, "a", "valueA"); + UpdateRow(runtime, "Table", 1, "valueA"); TPortManager portManager; const ui16 port = portManager.GetPort(); @@ -1277,14 +1277,14 @@ partitioning_settings { TestCreateTable(runtime, ++txId, "/MyRoot", R"( Name: "Table" - Columns { Name: "key" Type: "Utf8" } + Columns { Name: "key" Type: "Uint32" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] )"); env.TestWaitNotification(runtime, txId); - WriteRow(runtime, "a", "valueA"); - WriteRow(runtime, "b", "valueB"); + UpdateRow(runtime, "Table", 1, "valueA"); + UpdateRow(runtime, "Table", 2, "valueB"); runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_DEBUG); TPortManager portManager; diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index 8f292a6810de..76879b55d259 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -2,6 +2,8 @@ #include #include +#include +#include #include #include #include @@ -2256,31 +2258,31 @@ namespace NSchemeShardUT_Private { NKikimr::NPQ::CmdWrite(&runtime, tabletId, edge, partitionId, "sourceid0", msgSeqNo, data, false, {}, true, cookie, 0); } - void WriteRow(TTestActorRuntime& runtime, const TString& key, const TString& value, ui64 tabletId) { + void UpdateRow(TTestActorRuntime& runtime, const TString& table, const ui32 key, const TString& value, ui64 tabletId) { NKikimrMiniKQL::TResult result; TString error; NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"( ( - (let key '( '('key (Utf8 '%s) ) ) ) + (let key '( '('key (Uint32 '%d) ) ) ) (let row '( '('value (Utf8 '%s) ) ) ) - (return (AsList (UpdateRow '__user__Table key row) )) + (return (AsList (UpdateRow '__user__%s key row) )) ) - )", key.c_str(), value.c_str()), result, error); + )", key, value.c_str(), table.c_str()), result, error); UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error); UNIT_ASSERT_VALUES_EQUAL(error, ""); } - void WriteRowPg(TTestActorRuntime& runtime, const TString& key, ui32 value, ui64 tabletId) { + void UpdateRowPg(TTestActorRuntime& runtime, const TString& table, const ui32 key, ui32 value, ui64 tabletId) { NKikimrMiniKQL::TResult result; TString error; NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"( ( - (let key '( '('key (Utf8 '%s) ) ) ) + (let key '( '('key (Utf8 '%d) ) ) ) (let row '( '('value (PgConst '%u (PgType 'int4)) ) ) ) - (return (AsList (UpdateRow '__user__Table key row) )) + (return (AsList (UpdateRow '__user__%s key row) )) ) - )", key.c_str(), value), result, error); + )", key, value, table.c_str()), result, error); UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error); UNIT_ASSERT_VALUES_EQUAL(error, ""); @@ -2291,6 +2293,7 @@ namespace NSchemeShardUT_Private { auto tableDesc = DescribePath(runtime, tablePath, true, true); const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions(); UNIT_ASSERT(partitionIdx < tablePartitions.size()); + const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId(); auto ev = MakeHolder(); ev->Record.SetTableId(tableDesc.GetPathId()); @@ -2314,7 +2317,34 @@ namespace NSchemeShardUT_Private { } const auto& sender = runtime.AllocateEdgeActor(); - ForwardToTablet(runtime, tablePartitions[partitionIdx].GetDatashardId(), sender, ev.Release()); + ForwardToTablet(runtime, datashardTabletId, sender, ev.Release()); runtime.GrabEdgeEvent(sender); } + + void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected) { + auto tableDesc = DescribePath(runtime, tablePath, true, true); + const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions(); + UNIT_ASSERT(partitionIdx < tablePartitions.size()); + const ui64 tableId = tableDesc.GetPathId(); + const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId(); + + const auto& sender = runtime.AllocateEdgeActor(); + + std::vector columnIds{1, 2}; + + TVector cells{TCell((const char*)&key, sizeof(ui32)), TCell(value.c_str(), value.size())}; + + TSerializedCellMatrix matrix(cells, 1, 2); + + auto evWrite = std::make_unique(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); + ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper(*evWrite).AddDataToPayload(std::move(matrix.ReleaseBuffer())); + evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC); + + ForwardToTablet(runtime, datashardTabletId, sender, evWrite.release()); + + auto ev = runtime.GrabEdgeEventRethrow(sender); + auto status = ev->Get()->Record.GetStatus(); + + UNIT_ASSERT_C(successIsExpected == (status == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED), "Status: " << ev->Get()->Record.GetStatus() << " Issues: " << ev->Get()->Record.GetIssues()); + } } diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index 960f228fa3ef..6dd806cfd179 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -546,8 +546,9 @@ namespace NSchemeShardUT_Private { void SendTEvPeriodicTopicStats(TTestActorRuntime& runtime, ui64 topicId, ui64 generation, ui64 round, ui64 dataSize, ui64 usedReserveSize); void WriteToTopic(TTestActorRuntime& runtime, const TString& path, ui32& msgSeqNo, const TString& message); - void WriteRow(TTestActorRuntime& runtime, const TString& key, const TString& value, ui64 tabletId = TTestTxConfig::FakeHiveTablets); - void WriteRowPg(TTestActorRuntime& runtime, const TString& key, ui32 value, ui64 tabletId = TTestTxConfig::FakeHiveTablets); + void UpdateRow(TTestActorRuntime& runtime, const TString& table, const ui32 key, const TString& value, ui64 tabletId = TTestTxConfig::FakeHiveTablets); + void UpdateRowPg(TTestActorRuntime& runtime, const TString& table, const ui32 key, ui32 value, ui64 tabletId = TTestTxConfig::FakeHiveTablets); void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector& keyTags, const TVector& valueTags, const TVector& recordIds); + void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true); } //NSchemeShardUT_Private diff --git a/ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp b/ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp index af8d11da6462..bbda860bc133 100644 --- a/ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp +++ b/ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp @@ -2816,21 +2816,6 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) { TTestEnv env(runtime, opts); const auto sender = runtime.AllocateEdgeActor(); - auto writeRow = [&](ui64 tabletId, ui32 key, const TString& value, const char* table) { - NKikimrMiniKQL::TResult result; - TString error; - NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"( - ( - (let key '( '('key (Uint32 '%u ) ) ) ) - (let row '( '('value (Utf8 '%s) ) ) ) - (return (AsList (UpdateRow '__user__%s key row) )) - ) - )", key, value.c_str(), table), result, error); - - UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error); - UNIT_ASSERT_VALUES_EQUAL(error, ""); - }; - auto waitForTableStats = [&](ui32 shards) { TDispatchOptions options; options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvDataShard::EvPeriodicTableStats, shards)); @@ -2865,7 +2850,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) { )", {NKikimrScheme::StatusAccepted}); env.TestWaitNotification(runtime, txId); - writeRow(tabletId, 1, "value1", "Table1"); + UpdateRow(runtime, "Table1", 1, "value1", tabletId); waitForTableStats(1); auto du = getDiskSpaceUsage(); @@ -2888,8 +2873,8 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) { )", {NKikimrScheme::StatusAccepted}); env.TestWaitNotification(runtime, txId); - writeRow(tabletId + 0, 1, "value1", "Table2"); - writeRow(tabletId + 1, 2, "value2", "Table2"); + UpdateRow(runtime, "Table2", 1, "value1", tabletId + 0); + UpdateRow(runtime, "Table2", 2, "value2", tabletId + 1); waitForTableStats(1 /* Table1 */ + 2 /* Table2 */); auto du = getDiskSpaceUsage(); @@ -2910,21 +2895,6 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) { TTestEnv env(runtime, opts); ui64 txId = 100; - auto writeRow = [&](ui64 tabletId, ui32 key, const TString& value, const char* table) { - NKikimrMiniKQL::TResult result; - TString error; - NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"( - ( - (let key '( '('key (Uint32 '%u ) ) ) ) - (let row '( '('value (Utf8 '%s) ) ) ) - (return (AsList (UpdateRow '__user__%s key row) )) - ) - )", key, value.c_str(), table), result, error); - - UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error); - UNIT_ASSERT_VALUES_EQUAL(error, ""); - }; - auto waitForTableStats = [&](ui32 shards) { TDispatchOptions options; options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvDataShard::EvPeriodicTableStats, shards)); @@ -2972,7 +2942,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) { )", {NKikimrScheme::StatusAccepted}); env.TestWaitNotification(runtime, txId); - writeRow(tabletId, 1, "value1", "Table1"); + UpdateRow(runtime, "Table1", 1, "value1", tabletId); waitForTableStats(1); TestDescribeResult(DescribePath(runtime, "/MyRoot/USER_0"),