Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UpdateRow & WriteRow #663

Merged
merged 1 commit into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ Y_UNIT_TEST_SUITE(TBackupTests) {

Backup(runtime, ToString(Codec), R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)", [](TTestBasicRuntime& runtime) {
WriteRow(runtime, "a", "valueA");
UpdateRow(runtime, "Table", 1, "valueA");
});
}

Expand All @@ -86,17 +86,17 @@ Y_UNIT_TEST_SUITE(TBackupTests) {

Backup(runtime, ToString(Codec), R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
SplitBoundary {
KeyPrefix {
Tuple { Optional { Text: "b" } }
Tuple { Optional { Uint32: 2 } }
}
}
)", [](TTestBasicRuntime& runtime) {
WriteRow(runtime, "a", "valueA", TTestTxConfig::FakeHiveTablets + 0);
WriteRow(runtime, "b", "valueb", TTestTxConfig::FakeHiveTablets + 1);
UpdateRow(runtime, "Table", 1, "valueA", TTestTxConfig::FakeHiveTablets + 0);
UpdateRow(runtime, "Table", 2, "valueb", TTestTxConfig::FakeHiveTablets + 1);
});
}

Expand All @@ -107,12 +107,12 @@ Y_UNIT_TEST_SUITE(TBackupTests) {

const auto actualResult = Backup(runtime, ToString(Codec), R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)", [](TTestBasicRuntime& runtime) {
for (ui32 i = 0; i < 100 * batchSize; ++i) {
WriteRow(runtime, Sprintf("a%d", i), "valueA");
UpdateRow(runtime, "Table", i, "valueA");
}
}, batchSize, minWriteBatchSize);

Expand Down
20 changes: 10 additions & 10 deletions ydb/core/tx/schemeshard/ut_export/ut_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,14 +687,14 @@ partitioning_settings {

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

WriteRow(runtime, "a", "valueA");
WriteRow(runtime, "b", "valueB");
UpdateRow(runtime, "Table", 1, "valueA");
UpdateRow(runtime, "Table", 2, "valueB");

runtime.SetLogPriority(NKikimrServices::S3_WRAPPER, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
Expand Down Expand Up @@ -827,14 +827,14 @@ partitioning_settings {

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

for (int i = 1; i < 500; ++i) {
WriteRow(runtime, Sprintf("a%i", i), "value");
UpdateRow(runtime, "Table", i, "value");
}

// trigger memtable's compaction
Expand Down Expand Up @@ -925,13 +925,13 @@ partitioning_settings {

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

WriteRow(runtime, "a", "valueA");
UpdateRow(runtime, "Table", 1, "valueA");

TPortManager portManager;
const ui16 port = portManager.GetPort();
Expand Down Expand Up @@ -1277,14 +1277,14 @@ partitioning_settings {

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

WriteRow(runtime, "a", "valueA");
WriteRow(runtime, "b", "valueB");
UpdateRow(runtime, "Table", 1, "valueA");
UpdateRow(runtime, "Table", 2, "valueB");
runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_DEBUG);

TPortManager portManager;
Expand Down
48 changes: 39 additions & 9 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <ydb/core/engine/mkql_proto.h>
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
#include <ydb/core/tx/data_events/events.h>
#include <ydb/core/tx/data_events/payload_helper.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/persqueue/events/global.h>
Expand Down Expand Up @@ -2256,31 +2258,31 @@ namespace NSchemeShardUT_Private {
NKikimr::NPQ::CmdWrite(&runtime, tabletId, edge, partitionId, "sourceid0", msgSeqNo, data, false, {}, true, cookie, 0);
}

void WriteRow(TTestActorRuntime& runtime, const TString& key, const TString& value, ui64 tabletId) {
void UpdateRow(TTestActorRuntime& runtime, const TString& table, const ui32 key, const TString& value, ui64 tabletId) {
NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"(
(
(let key '( '('key (Utf8 '%s) ) ) )
(let key '( '('key (Uint32 '%d) ) ) )
(let row '( '('value (Utf8 '%s) ) ) )
(return (AsList (UpdateRow '__user__Table key row) ))
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", key.c_str(), value.c_str()), result, error);
)", key, value.c_str(), table.c_str()), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
}

void WriteRowPg(TTestActorRuntime& runtime, const TString& key, ui32 value, ui64 tabletId) {
void UpdateRowPg(TTestActorRuntime& runtime, const TString& table, const ui32 key, ui32 value, ui64 tabletId) {
NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"(
(
(let key '( '('key (Utf8 '%s) ) ) )
(let key '( '('key (Utf8 '%d) ) ) )
(let row '( '('value (PgConst '%u (PgType 'int4)) ) ) )
(return (AsList (UpdateRow '__user__Table key row) ))
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", key.c_str(), value), result, error);
)", key, value, table.c_str()), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
Expand All @@ -2291,6 +2293,7 @@ namespace NSchemeShardUT_Private {
auto tableDesc = DescribePath(runtime, tablePath, true, true);
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
UNIT_ASSERT(partitionIdx < tablePartitions.size());
const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId();

auto ev = MakeHolder<TEvDataShard::TEvUploadRowsRequest>();
ev->Record.SetTableId(tableDesc.GetPathId());
Expand All @@ -2314,7 +2317,34 @@ namespace NSchemeShardUT_Private {
}

const auto& sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, tablePartitions[partitionIdx].GetDatashardId(), sender, ev.Release());
ForwardToTablet(runtime, datashardTabletId, sender, ev.Release());
runtime.GrabEdgeEvent<TEvDataShard::TEvUploadRowsResponse>(sender);
}

void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected) {
auto tableDesc = DescribePath(runtime, tablePath, true, true);
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
UNIT_ASSERT(partitionIdx < tablePartitions.size());
const ui64 tableId = tableDesc.GetPathId();
const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId();

const auto& sender = runtime.AllocateEdgeActor();

std::vector<ui32> columnIds{1, 2};

TVector<TCell> cells{TCell((const char*)&key, sizeof(ui32)), TCell(value.c_str(), value.size())};

TSerializedCellMatrix matrix(cells, 1, 2);

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(matrix.ReleaseBuffer()));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

ForwardToTablet(runtime, datashardTabletId, sender, evWrite.release());

auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
auto status = ev->Get()->Record.GetStatus();

UNIT_ASSERT_C(successIsExpected == (status == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED), "Status: " << ev->Get()->Record.GetStatus() << " Issues: " << ev->Get()->Record.GetIssues());
}
}
5 changes: 3 additions & 2 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,9 @@ namespace NSchemeShardUT_Private {

void SendTEvPeriodicTopicStats(TTestActorRuntime& runtime, ui64 topicId, ui64 generation, ui64 round, ui64 dataSize, ui64 usedReserveSize);
void WriteToTopic(TTestActorRuntime& runtime, const TString& path, ui32& msgSeqNo, const TString& message);
void WriteRow(TTestActorRuntime& runtime, const TString& key, const TString& value, ui64 tabletId = TTestTxConfig::FakeHiveTablets);
void WriteRowPg(TTestActorRuntime& runtime, const TString& key, ui32 value, ui64 tabletId = TTestTxConfig::FakeHiveTablets);
void UpdateRow(TTestActorRuntime& runtime, const TString& table, const ui32 key, const TString& value, ui64 tabletId = TTestTxConfig::FakeHiveTablets);
void UpdateRowPg(TTestActorRuntime& runtime, const TString& table, const ui32 key, ui32 value, ui64 tabletId = TTestTxConfig::FakeHiveTablets);
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<ui32>& recordIds);
void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true);

} //NSchemeShardUT_Private
38 changes: 4 additions & 34 deletions ydb/core/tx/schemeshard/ut_subdomain/ut_subdomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2816,21 +2816,6 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
TTestEnv env(runtime, opts);
const auto sender = runtime.AllocateEdgeActor();

auto writeRow = [&](ui64 tabletId, ui32 key, const TString& value, const char* table) {
NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"(
(
(let key '( '('key (Uint32 '%u ) ) ) )
(let row '( '('value (Utf8 '%s) ) ) )
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", key, value.c_str(), table), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
};

auto waitForTableStats = [&](ui32 shards) {
TDispatchOptions options;
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvDataShard::EvPeriodicTableStats, shards));
Expand Down Expand Up @@ -2865,7 +2850,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
)", {NKikimrScheme::StatusAccepted});
env.TestWaitNotification(runtime, txId);

writeRow(tabletId, 1, "value1", "Table1");
UpdateRow(runtime, "Table1", 1, "value1", tabletId);
waitForTableStats(1);

auto du = getDiskSpaceUsage();
Expand All @@ -2888,8 +2873,8 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
)", {NKikimrScheme::StatusAccepted});
env.TestWaitNotification(runtime, txId);

writeRow(tabletId + 0, 1, "value1", "Table2");
writeRow(tabletId + 1, 2, "value2", "Table2");
UpdateRow(runtime, "Table2", 1, "value1", tabletId + 0);
UpdateRow(runtime, "Table2", 2, "value2", tabletId + 1);
waitForTableStats(1 /* Table1 */ + 2 /* Table2 */);

auto du = getDiskSpaceUsage();
Expand All @@ -2910,21 +2895,6 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
TTestEnv env(runtime, opts);
ui64 txId = 100;

auto writeRow = [&](ui64 tabletId, ui32 key, const TString& value, const char* table) {
NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"(
(
(let key '( '('key (Uint32 '%u ) ) ) )
(let row '( '('value (Utf8 '%s) ) ) )
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", key, value.c_str(), table), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
};

auto waitForTableStats = [&](ui32 shards) {
TDispatchOptions options;
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvDataShard::EvPeriodicTableStats, shards));
Expand Down Expand Up @@ -2972,7 +2942,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
)", {NKikimrScheme::StatusAccepted});
env.TestWaitNotification(runtime, txId);

writeRow(tabletId, 1, "value1", "Table1");
UpdateRow(runtime, "Table1", 1, "value1", tabletId);
waitForTableStats(1);

TestDescribeResult(DescribePath(runtime, "/MyRoot/USER_0"),
Expand Down
Loading