Skip to content

Commit

Permalink
UpdateRow & WriteRow (ydb-platform#663)
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored and marsaly79 committed Dec 22, 2023
1 parent d0c4b61 commit 22b0759
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 63 deletions.
16 changes: 8 additions & 8 deletions ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ Y_UNIT_TEST_SUITE(TBackupTests) {

Backup(runtime, ToString(Codec), R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)", [](TTestBasicRuntime& runtime) {
WriteRow(runtime, "a", "valueA");
UpdateRow(runtime, "Table", 1, "valueA");
});
}

Expand All @@ -86,17 +86,17 @@ Y_UNIT_TEST_SUITE(TBackupTests) {

Backup(runtime, ToString(Codec), R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
SplitBoundary {
KeyPrefix {
Tuple { Optional { Text: "b" } }
Tuple { Optional { Uint32: 2 } }
}
}
)", [](TTestBasicRuntime& runtime) {
WriteRow(runtime, "a", "valueA", TTestTxConfig::FakeHiveTablets + 0);
WriteRow(runtime, "b", "valueb", TTestTxConfig::FakeHiveTablets + 1);
UpdateRow(runtime, "Table", 1, "valueA", TTestTxConfig::FakeHiveTablets + 0);
UpdateRow(runtime, "Table", 2, "valueb", TTestTxConfig::FakeHiveTablets + 1);
});
}

Expand All @@ -107,12 +107,12 @@ Y_UNIT_TEST_SUITE(TBackupTests) {

const auto actualResult = Backup(runtime, ToString(Codec), R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)", [](TTestBasicRuntime& runtime) {
for (ui32 i = 0; i < 100 * batchSize; ++i) {
WriteRow(runtime, Sprintf("a%d", i), "valueA");
UpdateRow(runtime, "Table", i, "valueA");
}
}, batchSize, minWriteBatchSize);

Expand Down
20 changes: 10 additions & 10 deletions ydb/core/tx/schemeshard/ut_export/ut_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,14 +687,14 @@ partitioning_settings {

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

WriteRow(runtime, "a", "valueA");
WriteRow(runtime, "b", "valueB");
UpdateRow(runtime, "Table", 1, "valueA");
UpdateRow(runtime, "Table", 2, "valueB");

runtime.SetLogPriority(NKikimrServices::S3_WRAPPER, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
Expand Down Expand Up @@ -827,14 +827,14 @@ partitioning_settings {

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

for (int i = 1; i < 500; ++i) {
WriteRow(runtime, Sprintf("a%i", i), "value");
UpdateRow(runtime, "Table", i, "value");
}

// trigger memtable's compaction
Expand Down Expand Up @@ -925,13 +925,13 @@ partitioning_settings {

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

WriteRow(runtime, "a", "valueA");
UpdateRow(runtime, "Table", 1, "valueA");

TPortManager portManager;
const ui16 port = portManager.GetPort();
Expand Down Expand Up @@ -1277,14 +1277,14 @@ partitioning_settings {

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

WriteRow(runtime, "a", "valueA");
WriteRow(runtime, "b", "valueB");
UpdateRow(runtime, "Table", 1, "valueA");
UpdateRow(runtime, "Table", 2, "valueB");
runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_DEBUG);

TPortManager portManager;
Expand Down
48 changes: 39 additions & 9 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <ydb/core/engine/mkql_proto.h>
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
#include <ydb/core/tx/data_events/events.h>
#include <ydb/core/tx/data_events/payload_helper.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/persqueue/events/global.h>
Expand Down Expand Up @@ -2256,31 +2258,31 @@ namespace NSchemeShardUT_Private {
NKikimr::NPQ::CmdWrite(&runtime, tabletId, edge, partitionId, "sourceid0", msgSeqNo, data, false, {}, true, cookie, 0);
}

void WriteRow(TTestActorRuntime& runtime, const TString& key, const TString& value, ui64 tabletId) {
void UpdateRow(TTestActorRuntime& runtime, const TString& table, const ui32 key, const TString& value, ui64 tabletId) {
NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"(
(
(let key '( '('key (Utf8 '%s) ) ) )
(let key '( '('key (Uint32 '%d) ) ) )
(let row '( '('value (Utf8 '%s) ) ) )
(return (AsList (UpdateRow '__user__Table key row) ))
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", key.c_str(), value.c_str()), result, error);
)", key, value.c_str(), table.c_str()), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
}

void WriteRowPg(TTestActorRuntime& runtime, const TString& key, ui32 value, ui64 tabletId) {
void UpdateRowPg(TTestActorRuntime& runtime, const TString& table, const ui32 key, ui32 value, ui64 tabletId) {
NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"(
(
(let key '( '('key (Utf8 '%s) ) ) )
(let key '( '('key (Utf8 '%d) ) ) )
(let row '( '('value (PgConst '%u (PgType 'int4)) ) ) )
(return (AsList (UpdateRow '__user__Table key row) ))
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", key.c_str(), value), result, error);
)", key, value, table.c_str()), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
Expand All @@ -2291,6 +2293,7 @@ namespace NSchemeShardUT_Private {
auto tableDesc = DescribePath(runtime, tablePath, true, true);
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
UNIT_ASSERT(partitionIdx < tablePartitions.size());
const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId();

auto ev = MakeHolder<TEvDataShard::TEvUploadRowsRequest>();
ev->Record.SetTableId(tableDesc.GetPathId());
Expand All @@ -2314,7 +2317,34 @@ namespace NSchemeShardUT_Private {
}

const auto& sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, tablePartitions[partitionIdx].GetDatashardId(), sender, ev.Release());
ForwardToTablet(runtime, datashardTabletId, sender, ev.Release());
runtime.GrabEdgeEvent<TEvDataShard::TEvUploadRowsResponse>(sender);
}

void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected) {
auto tableDesc = DescribePath(runtime, tablePath, true, true);
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
UNIT_ASSERT(partitionIdx < tablePartitions.size());
const ui64 tableId = tableDesc.GetPathId();
const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId();

const auto& sender = runtime.AllocateEdgeActor();

std::vector<ui32> columnIds{1, 2};

TVector<TCell> cells{TCell((const char*)&key, sizeof(ui32)), TCell(value.c_str(), value.size())};

TSerializedCellMatrix matrix(cells, 1, 2);

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(matrix.ReleaseBuffer()));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

ForwardToTablet(runtime, datashardTabletId, sender, evWrite.release());

auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
auto status = ev->Get()->Record.GetStatus();

UNIT_ASSERT_C(successIsExpected == (status == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED), "Status: " << ev->Get()->Record.GetStatus() << " Issues: " << ev->Get()->Record.GetIssues());
}
}
5 changes: 3 additions & 2 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,9 @@ namespace NSchemeShardUT_Private {

void SendTEvPeriodicTopicStats(TTestActorRuntime& runtime, ui64 topicId, ui64 generation, ui64 round, ui64 dataSize, ui64 usedReserveSize);
void WriteToTopic(TTestActorRuntime& runtime, const TString& path, ui32& msgSeqNo, const TString& message);
void WriteRow(TTestActorRuntime& runtime, const TString& key, const TString& value, ui64 tabletId = TTestTxConfig::FakeHiveTablets);
void WriteRowPg(TTestActorRuntime& runtime, const TString& key, ui32 value, ui64 tabletId = TTestTxConfig::FakeHiveTablets);
void UpdateRow(TTestActorRuntime& runtime, const TString& table, const ui32 key, const TString& value, ui64 tabletId = TTestTxConfig::FakeHiveTablets);
void UpdateRowPg(TTestActorRuntime& runtime, const TString& table, const ui32 key, ui32 value, ui64 tabletId = TTestTxConfig::FakeHiveTablets);
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<ui32>& recordIds);
void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true);

} //NSchemeShardUT_Private
38 changes: 4 additions & 34 deletions ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2816,21 +2816,6 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
TTestEnv env(runtime, opts);
const auto sender = runtime.AllocateEdgeActor();

auto writeRow = [&](ui64 tabletId, ui32 key, const TString& value, const char* table) {
NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"(
(
(let key '( '('key (Uint32 '%u ) ) ) )
(let row '( '('value (Utf8 '%s) ) ) )
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", key, value.c_str(), table), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
};

auto waitForTableStats = [&](ui32 shards) {
TDispatchOptions options;
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvDataShard::EvPeriodicTableStats, shards));
Expand Down Expand Up @@ -2865,7 +2850,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
)", {NKikimrScheme::StatusAccepted});
env.TestWaitNotification(runtime, txId);

writeRow(tabletId, 1, "value1", "Table1");
UpdateRow(runtime, "Table1", 1, "value1", tabletId);
waitForTableStats(1);

auto du = getDiskSpaceUsage();
Expand All @@ -2888,8 +2873,8 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
)", {NKikimrScheme::StatusAccepted});
env.TestWaitNotification(runtime, txId);

writeRow(tabletId + 0, 1, "value1", "Table2");
writeRow(tabletId + 1, 2, "value2", "Table2");
UpdateRow(runtime, "Table2", 1, "value1", tabletId + 0);
UpdateRow(runtime, "Table2", 2, "value2", tabletId + 1);
waitForTableStats(1 /* Table1 */ + 2 /* Table2 */);

auto du = getDiskSpaceUsage();
Expand All @@ -2910,21 +2895,6 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
TTestEnv env(runtime, opts);
ui64 txId = 100;

auto writeRow = [&](ui64 tabletId, ui32 key, const TString& value, const char* table) {
NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"(
(
(let key '( '('key (Uint32 '%u ) ) ) )
(let row '( '('value (Utf8 '%s) ) ) )
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", key, value.c_str(), table), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
};

auto waitForTableStats = [&](ui32 shards) {
TDispatchOptions options;
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvDataShard::EvPeriodicTableStats, shards));
Expand Down Expand Up @@ -2972,7 +2942,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
)", {NKikimrScheme::StatusAccepted});
env.TestWaitNotification(runtime, txId);

writeRow(tabletId, 1, "value1", "Table1");
UpdateRow(runtime, "Table1", 1, "value1", tabletId);
waitForTableStats(1);

TestDescribeResult(DescribePath(runtime, "/MyRoot/USER_0"),
Expand Down

0 comments on commit 22b0759

Please sign in to comment.