From 48a641ae6497aaae54c4479518154f4cabcf762e Mon Sep 17 00:00:00 2001 From: azevaykin <145343289+azevaykin@users.noreply.github.com> Date: Tue, 27 Feb 2024 10:28:15 +0300 Subject: [PATCH] New volatile EvWrite tests (#2253) --- ydb/core/tx/datashard/datashard_ut_order.cpp | 48 ++++++++++++++----- .../datashard/datashard_ut_read_iterator.cpp | 5 +- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index 5fe289860682..5b21f4d8e197 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -1446,7 +1446,7 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderLockLost, StreamLookup) { } } -Y_UNIT_TEST_TWIN(TestOutOfOrderReadOnlyAllowed, StreamLookup) { +Y_UNIT_TEST_QUAD(TestOutOfOrderReadOnlyAllowed, StreamLookup, EvWrite) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); @@ -1467,11 +1467,16 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderReadOnlyAllowed, StreamLookup) { InitRoot(server, sender); - CreateShardedTable(server, sender, "/Root", "table-1", 1); - CreateShardedTable(server, sender, "/Root", "table-2", 1); + auto [shards1, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1); + auto [shards2, tableId2] = CreateShardedTable(server, sender, "/Root", "table-2", 1); - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); + { + auto rows = EvWrite ? TEvWriteRows{{tableId1, {1, 1}}, {tableId2, {2, 1}}} : TEvWriteRows{}; + auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); + } TString sessionId = CreateSessionRPC(runtime); @@ -1499,11 +1504,16 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderReadOnlyAllowed, StreamLookup) { }; auto prevObserverFunc = runtime.SetObserverFunc(captureRS); + auto rows = EvWrite ? TEvWriteRows{{tableId1, {3, 2}}, {tableId2, {4, 2}}} : TEvWriteRows{}; + auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); + // Send a commit request, it would block on readset exchange auto f2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2); UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))"), sessionId, txId, true)); + evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{}; + // Wait until we captured both readsets const size_t expectedReadSets = usesVolatileTxs ? 4 : 2; { @@ -1550,7 +1560,7 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderReadOnlyAllowed, StreamLookup) { } } -Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) { +Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(StreamLookup); @@ -1570,11 +1580,16 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) { InitRoot(server, sender); - CreateShardedTable(server, sender, "/Root", "table-1", 1); - CreateShardedTable(server, sender, "/Root", "table-2", 1); + auto [shards1, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1); + auto [shards2, tableId2] = CreateShardedTable(server, sender, "/Root", "table-2", 1); - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); + { + auto rows = EvWrite ? TEvWriteRows{{tableId1, {1, 1}}, {tableId2, {2, 1}}} : TEvWriteRows{}; + auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); + } TString sessionId = CreateSessionRPC(runtime); @@ -1603,11 +1618,16 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) { }; auto prevObserverFunc = runtime.SetObserverFunc(captureRS); + auto rows = EvWrite ? TEvWriteRows{{tableId1, {3, 2}}, {tableId2, {4, 2}}} : TEvWriteRows{}; + auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(runtime, rows); + // Send a commit request, it would block on readset exchange auto f2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2); UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))"), sessionId, txId, true)); + evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{}; + // Wait until we captured both readsets const size_t expectedReadSets = usesVolatileTxs ? 4 : 2; if (readSets.size() < expectedReadSets) { @@ -1622,7 +1642,10 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) { // Now send non-conflicting upsert to both tables { - blockReadSets = false; // needed for volatile transactions + auto rows1 = EvWrite ? TEvWriteRows{{tableId1, {5, 3}}, {tableId2, {6, 3}}} : TEvWriteRows{}; + auto evWriteObservers1 = ReplaceEvProposeTransactionWithEvWrite(runtime, rows1); + + blockReadSets = false; // needed for volatile transactions auto result = KqpSimpleExec(runtime, Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3); UPSERT INTO `/Root/table-2` (key, value) VALUES (6, 3))")); @@ -1632,6 +1655,9 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) { // Check that immediate non-conflicting upsert is working too { + auto rows1 = EvWrite ? TEvWriteRows{{tableId1, {7, 4}}} : TEvWriteRows{}; + auto evWriteObservers1 = ReplaceEvProposeTransactionWithEvWrite(runtime, rows1); + auto result = KqpSimpleExec(runtime, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (7, 4)")); UNIT_ASSERT_VALUES_EQUAL(result, ""); } diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 6d1c266d7836..e7c66d488482 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -3698,7 +3698,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { helper.CheckLockBroken(tableName, 10, {11, 11, 11}, lockTxId, *readResult1); } - Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadRangeInvisibleRowSkips2) { + Y_UNIT_TEST_TWIN(ShouldReturnBrokenLockWhenReadRangeInvisibleRowSkips2, EvWrite) { // Almost the same as ShouldReturnBrokenLockWhenReadRangeInvisibleRowSkips: // 1. tx1: read some **non-existing** range1 // 2. tx2: upsert into range2 > range1 range and commit. @@ -3723,6 +3723,9 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.TxLocksSize(), 1); UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.BrokenTxLocksSize(), 0); + auto rows = EvWrite ? TEvWriteRows{{{300, 0, 0, 3000}}} : TEvWriteRows{}; + auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite(*helper.Server->GetRuntime(), rows); + // write new data above snapshot ExecSQL(helper.Server, helper.Sender, R"( SELECT * FROM `/Root/table-1` WHERE key1 == 300;