diff --git a/src/yb/yql/pgwrapper/pg_wait_on_conflict-test.cc b/src/yb/yql/pgwrapper/pg_wait_on_conflict-test.cc index acc5c4550669..cbece716dbe2 100644 --- a/src/yb/yql/pgwrapper/pg_wait_on_conflict-test.cc +++ b/src/yb/yql/pgwrapper/pg_wait_on_conflict-test.cc @@ -67,23 +67,30 @@ DECLARE_uint64(TEST_delay_rpc_status_req_callback_ms); DECLARE_int32(TEST_txn_participant_inject_delay_on_start_shutdown_ms); DECLARE_string(ysql_pg_conf_csv); DECLARE_uint64(transaction_heartbeat_usec); +DECLARE_uint64(ysql_session_max_batch_size); using namespace std::literals; namespace yb { namespace pgwrapper { +YB_STRONGLY_TYPED_BOOL(UseMaxBatchSize1); + class PgWaitQueuesTest : public PgMiniTestBase { protected: void SetUp() override { ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_wait_queues) = true; ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_select_all_status_tablets) = true; ANNOTATE_UNPROTECTED_WRITE(FLAGS_force_single_shard_waiter_retry_ms) = 10000; - ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_pg_conf_csv) = MaxQueryLayerRetriesConf(0); + ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_pg_conf_csv) = GetYsqlPgConf(); ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_read_committed_isolation) = true; PgMiniTestBase::SetUp(); } + virtual std::string GetYsqlPgConf() const { + return MaxQueryLayerRetriesConf(0); + } + CoarseTimePoint GetDeadlockDetectedDeadline() const { return CoarseMonoClock::Now() + 5s; } @@ -101,12 +108,21 @@ class PgWaitQueuesTest : public PgMiniTestBase { } void TestDeadlockWithWrites() const; + void TestParallelUpdatesDetectDeadlock() const; + void TestMultiTabletFairness() const; virtual IsolationLevel GetIsolationLevel() const { return SNAPSHOT_ISOLATION; } }; +class PgWaitQueuesMaxBatchSize1Test : public PgWaitQueuesTest { + protected: + std::string GetYsqlPgConf() const override { + return Format("$0,ysql_session_max_batch_size=1", PgWaitQueuesTest::GetYsqlPgConf()); + } +}; + auto GetBlockerIdx(auto idx, auto cycle_length) { return (idx / cycle_length) * cycle_length + (idx + 1) % cycle_length; } @@ -1049,7 +1065,7 @@ TEST_F( status_future.get().ToString(), "could not serialize access due to concurrent update"); } -TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(ParallelUpdatesDetectDeadlock)) { +void PgWaitQueuesTest::TestParallelUpdatesDetectDeadlock() const { // Tests that wait-for dependencies of a distributed waiter txn waiting at different tablets, and // possibly different tablet servers, are not overwritten at the deadlock detector. constexpr int kNumKeys = 20; @@ -1111,6 +1127,14 @@ TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(ParallelUpdatesDetectDeadlock)) } } +TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(ParallelUpdatesDetectDeadlock)) { + TestParallelUpdatesDetectDeadlock(); +} + +TEST_F(PgWaitQueuesMaxBatchSize1Test, YB_DISABLE_TEST_IN_TSAN(ParallelUpdatesDetectDeadlock)) { + TestParallelUpdatesDetectDeadlock(); +} + TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(DeadlockResolvesYoungestTxn)) { // Tests that in a large cyclic deadlock, the youngest transaction is always the *only* // transaction which is aborted. @@ -1184,7 +1208,7 @@ TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(DeadlockResolvesYoungestTxn)) { } } -TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(MultiTabletFairness)) { +void PgWaitQueuesTest::TestMultiTabletFairness() const { constexpr int kNumUpdateConns = 20; constexpr int kNumKeys = 40; // This test specifically ensures 2 aspects when transactions simultaneously contend on @@ -1236,12 +1260,20 @@ TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(MultiTabletFairness)) { auto values = ASSERT_RESULT( setup_conn.FetchRows(update_analyze_query)); const auto storage_write_requests_text = Format("Storage Write Requests: $0", kNumKeys); + uint64 batch_size = std::stoi(ASSERT_RESULT( + setup_conn.FetchRow("SHOW ysql_session_max_batch_size;"))); + // When the guc reflects the default value of 0, the actual batch size is controlled by the gflag. + if (!batch_size) { + batch_size = ANNOTATE_UNPROTECTED_READ(FLAGS_ysql_session_max_batch_size); + } + auto num_flushes = batch_size >= kNumKeys ? 1 : ceil(kNumKeys / (1.0 * batch_size)); + auto flush_requests_text = Format("Storage Flush Requests: $0", num_flushes); bool found_flush_requests_line = false; bool found_write_requests_line = false; for (const auto& value : values) { if (value.find(storage_write_requests_text) != std::string::npos) { found_write_requests_line = true; - } else if (value.find("Storage Flush Requests: 1") != std::string::npos) { + } else if (value.find(flush_requests_text) != std::string::npos) { found_flush_requests_line = true; } } @@ -1336,6 +1368,14 @@ TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(MultiTabletFairness)) { } } +TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(MultiTabletFairness)) { + TestMultiTabletFairness(); +} + +TEST_F(PgWaitQueuesMaxBatchSize1Test, YB_DISABLE_TEST_IN_TSAN(MultiTabletFairness)) { + TestMultiTabletFairness(); +} + #ifndef NDEBUG TEST_F(PgWaitQueuesTest, TestDDLsNotBlockedOnWaiters) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_refresh_waiter_timeout_ms) = 120000; @@ -1427,15 +1467,27 @@ TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(TestMultipleRequestsPerTxn)) { ASSERT_EQ(value.get(), 2); } -class PgWaitQueueRF1Test : public PgWaitQueuesTest { +class PgWaitQueueRF1Test + : public PgWaitQueuesMaxBatchSize1Test, public testing::WithParamInterface { protected: + std::string GetYsqlPgConf() const override { + if (auto use_max_batch_size_as_1 = GetParam()) { + return PgWaitQueuesMaxBatchSize1Test::GetYsqlPgConf(); + } + return PgWaitQueuesTest::GetYsqlPgConf(); + } + size_t NumTabletServers() override { return 1; } }; +INSTANTIATE_TEST_SUITE_P(, PgWaitQueueRF1Test, ::testing::Values(UseMaxBatchSize1::kFalse)); +INSTANTIATE_TEST_SUITE_P( + MaxBatchSize1, PgWaitQueueRF1Test, ::testing::Values(UseMaxBatchSize1::kTrue)); + #ifndef NDEBUG -TEST_F(PgWaitQueueRF1Test, TestResumingWaitersDoesntBlockTabletShutdown) { +TEST_P(PgWaitQueueRF1Test, TestResumingWaitersDoesntBlockTabletShutdown) { static const char* sync_points[1][4] = { {"WaitQueue::Impl::SetupWaiterUnlocked:1", "PgWaitQueueRF1Test::CommitConnection1:1", "TabletPeer::StartShutdown:1", "WaiterData::Impl::InvokeCallback:1"}}; @@ -1484,7 +1536,7 @@ TEST_F(PgWaitQueueRF1Test, TestResumingWaitersDoesntBlockTabletShutdown) { // when multiple wait-for dependencies of the the same transaction are forwarded from different // tablets (or from different rpc requests at the same tablet), and validates that the detector // detects deadlocks without the need for waiter requests to re-enter the wait-queue. -TEST_F(PgWaitQueueRF1Test, YB_DISABLE_TEST_IN_TSAN(TestDeadlockAcrossMultipleTablets)) { +TEST_P(PgWaitQueueRF1Test, YB_DISABLE_TEST_IN_TSAN(TestDeadlockAcrossMultipleTablets)) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_refresh_waiter_timeout_ms) = 30000; auto setup_conn = ASSERT_RESULT(Connect()); ASSERT_OK(setup_conn.Execute("CREATE TABLE foo (k INT PRIMARY KEY, v INT)")); @@ -1492,16 +1544,10 @@ TEST_F(PgWaitQueueRF1Test, YB_DISABLE_TEST_IN_TSAN(TestDeadlockAcrossMultipleTab auto conn1 = ASSERT_RESULT(Connect()); ASSERT_OK(conn1.StartTransaction(IsolationLevel::READ_COMMITTED)); - // Decrease the batch size so as to simulate multiple requests at the same tablet. Helps - // assert the logic being tested better. - ASSERT_OK(conn1.Execute("SET ysql_session_max_batch_size=1")); - ASSERT_OK(conn1.Execute("SET ysql_max_in_flight_ops=20")); ASSERT_OK(conn1.Execute("UPDATE foo SET v=1 WHERE k=1")); auto conn2 = ASSERT_RESULT(Connect()); ASSERT_OK(conn2.StartTransaction(IsolationLevel::READ_COMMITTED)); - ASSERT_OK(conn2.Execute("SET ysql_session_max_batch_size=1")); - ASSERT_OK(conn2.Execute("SET ysql_max_in_flight_ops=20")); ASSERT_OK(conn2.Execute("UPDATE foo SET v=2 WHERE k=2")); auto conn3 = ASSERT_RESULT(Connect()); @@ -1526,7 +1572,7 @@ TEST_F(PgWaitQueueRF1Test, YB_DISABLE_TEST_IN_TSAN(TestDeadlockAcrossMultipleTab conn2.CommitTransaction().ok()); } -TEST_F(PgWaitQueueRF1Test, YB_DISABLE_TEST_IN_TSAN(TestDetectorPreservesBlockerSubtxnInfo)) { +TEST_P(PgWaitQueueRF1Test, YB_DISABLE_TEST_IN_TSAN(TestDetectorPreservesBlockerSubtxnInfo)) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_refresh_waiter_timeout_ms) = 30000; ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_waiter_resumption_on_blocking_subtxn_rollback) = true; auto setup_conn = ASSERT_RESULT(Connect()); @@ -1535,8 +1581,6 @@ TEST_F(PgWaitQueueRF1Test, YB_DISABLE_TEST_IN_TSAN(TestDetectorPreservesBlockerS auto conn1 = ASSERT_RESULT(Connect()); ASSERT_OK(conn1.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION)); - ASSERT_OK(conn1.Execute("SET ysql_session_max_batch_size=1")); - ASSERT_OK(conn1.Execute("SET ysql_max_in_flight_ops=20")); ASSERT_OK(conn1.Execute("UPDATE foo SET v=1 WHERE k=1")); auto conn2 = ASSERT_RESULT(Connect());