Skip to content

Commit

Permalink
New volatile EvWrite tests (#2253)
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored Feb 27, 2024
1 parent bd3cbeb commit 48a641a
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 12 deletions.
48 changes: 37 additions & 11 deletions ydb/core/tx/datashard/datashard_ut_order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1446,7 +1446,7 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderLockLost, StreamLookup) {
}
}

Y_UNIT_TEST_TWIN(TestOutOfOrderReadOnlyAllowed, StreamLookup) {
Y_UNIT_TEST_QUAD(TestOutOfOrderReadOnlyAllowed, StreamLookup, EvWrite) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
Expand All @@ -1467,11 +1467,16 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderReadOnlyAllowed, StreamLookup) {

InitRoot(server, sender);

CreateShardedTable(server, sender, "/Root", "table-1", 1);
CreateShardedTable(server, sender, "/Root", "table-2", 1);
auto [shards1, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1);
auto [shards2, tableId2] = CreateShardedTable(server, sender, "/Root", "table-2", 1);

ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));
{
auto rows = EvWrite ? TEvWriteRows{{tableId1, {1, 1}}, {tableId2, {2, 1}}} : TEvWriteRows{};
auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);

ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));
}

TString sessionId = CreateSessionRPC(runtime);

Expand Down Expand Up @@ -1499,11 +1504,16 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderReadOnlyAllowed, StreamLookup) {
};
auto prevObserverFunc = runtime.SetObserverFunc(captureRS);

auto rows = EvWrite ? TEvWriteRows{{tableId1, {3, 2}}, {tableId2, {4, 2}}} : TEvWriteRows{};
auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);

// Send a commit request, it would block on readset exchange
auto f2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2);
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))"), sessionId, txId, true));

evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{};

// Wait until we captured both readsets
const size_t expectedReadSets = usesVolatileTxs ? 4 : 2;
{
Expand Down Expand Up @@ -1550,7 +1560,7 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderReadOnlyAllowed, StreamLookup) {
}
}

Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) {
Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(StreamLookup);
Expand All @@ -1570,11 +1580,16 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) {

InitRoot(server, sender);

CreateShardedTable(server, sender, "/Root", "table-1", 1);
CreateShardedTable(server, sender, "/Root", "table-2", 1);
auto [shards1, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1);
auto [shards2, tableId2] = CreateShardedTable(server, sender, "/Root", "table-2", 1);

ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));
{
auto rows = EvWrite ? TEvWriteRows{{tableId1, {1, 1}}, {tableId2, {2, 1}}} : TEvWriteRows{};
auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);

ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));
}

TString sessionId = CreateSessionRPC(runtime);

Expand Down Expand Up @@ -1603,11 +1618,16 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) {
};
auto prevObserverFunc = runtime.SetObserverFunc(captureRS);

auto rows = EvWrite ? TEvWriteRows{{tableId1, {3, 2}}, {tableId2, {4, 2}}} : TEvWriteRows{};
auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);

// Send a commit request, it would block on readset exchange
auto f2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2);
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))"), sessionId, txId, true));

evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{};

// Wait until we captured both readsets
const size_t expectedReadSets = usesVolatileTxs ? 4 : 2;
if (readSets.size() < expectedReadSets) {
Expand All @@ -1622,7 +1642,10 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) {

// Now send non-conflicting upsert to both tables
{
blockReadSets = false; // needed for volatile transactions
auto rows1 = EvWrite ? TEvWriteRows{{tableId1, {5, 3}}, {tableId2, {6, 3}}} : TEvWriteRows{};
auto evWriteObservers1 = ReplaceEvProposeTransactionWithEvWrite(runtime, rows1);

blockReadSets = false; // needed for volatile transactions
auto result = KqpSimpleExec(runtime, Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3);
UPSERT INTO `/Root/table-2` (key, value) VALUES (6, 3))"));
Expand All @@ -1632,6 +1655,9 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) {

// Check that immediate non-conflicting upsert is working too
{
auto rows1 = EvWrite ? TEvWriteRows{{tableId1, {7, 4}}} : TEvWriteRows{};
auto evWriteObservers1 = ReplaceEvProposeTransactionWithEvWrite(runtime, rows1);

auto result = KqpSimpleExec(runtime, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (7, 4)"));
UNIT_ASSERT_VALUES_EQUAL(result, "<empty>");
}
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3698,7 +3698,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
helper.CheckLockBroken(tableName, 10, {11, 11, 11}, lockTxId, *readResult1);
}

Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadRangeInvisibleRowSkips2) {
Y_UNIT_TEST_TWIN(ShouldReturnBrokenLockWhenReadRangeInvisibleRowSkips2, EvWrite) {
// Almost the same as ShouldReturnBrokenLockWhenReadRangeInvisibleRowSkips:
// 1. tx1: read some **non-existing** range1
// 2. tx2: upsert into range2 > range1 range and commit.
Expand All @@ -3723,6 +3723,9 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.TxLocksSize(), 1);
UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.BrokenTxLocksSize(), 0);

auto rows = EvWrite ? TEvWriteRows{{{300, 0, 0, 3000}}} : TEvWriteRows{};
auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(*helper.Server->GetRuntime(), rows);

// write new data above snapshot
ExecSQL(helper.Server, helper.Sender, R"(
SELECT * FROM `/Root/table-1` WHERE key1 == 300;
Expand Down

0 comments on commit 48a641a

Please sign in to comment.