Skip to content

Commit

Permalink
[#21430] DocDB: Update wait-queue unit tests to exercise smaller batc…
Browse files Browse the repository at this point in the history
…h size

Summary:
Running a couple of existing unit test would have uncovered issues like #18394 and #21736. Hence, enhancing the wait-queue test suite to trigger a few of the existing tests with smaller batch size.

Most of the `PgWaitQueuesTest` tests execute point updates/lookups that end up in the wait-queue. Re-running these tests with a smaller batch size doesn't give us any useful signal. To minimize test times, the diff exercises the smaller batch size codepath only for select `PgWaitQueuesTest` test with bulk updates/lookups. This approach also helps preserve test stability history.

For existing `PgWaitQueueRF1Test` tests, 2/3 execute bulk updates. Hence going with `INSTANTIATE_TEST_SUITE_P` approach for these tests, making all `PgWaitQueueRF1Test` tests execute under both small and normal batch sizes.
Jira: DB-10311

Test Plan:
Jenkins

./yb_build.sh --cxx-test pgwrapper_pg_wait_on_conflict-test

Reviewers: rsami, rthallam

Reviewed By: rsami

Subscribers: ybase, yql

Differential Revision: https://phorge.dev.yugabyte.com/D34454
  • Loading branch information
basavaraj29 committed May 1, 2024
1 parent d89a7e2 commit eccd4a8
Showing 1 changed file with 60 additions and 16 deletions.
76 changes: 60 additions & 16 deletions src/yb/yql/pgwrapper/pg_wait_on_conflict-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,30 @@ DECLARE_uint64(TEST_delay_rpc_status_req_callback_ms);
DECLARE_int32(TEST_txn_participant_inject_delay_on_start_shutdown_ms);
DECLARE_string(ysql_pg_conf_csv);
DECLARE_uint64(transaction_heartbeat_usec);
DECLARE_uint64(ysql_session_max_batch_size);

using namespace std::literals;

namespace yb {
namespace pgwrapper {

YB_STRONGLY_TYPED_BOOL(UseMaxBatchSize1);

class PgWaitQueuesTest : public PgMiniTestBase {
protected:
void SetUp() override {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_wait_queues) = true;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_select_all_status_tablets) = true;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_force_single_shard_waiter_retry_ms) = 10000;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_pg_conf_csv) = MaxQueryLayerRetriesConf(0);
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_pg_conf_csv) = GetYsqlPgConf();
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_read_committed_isolation) = true;
PgMiniTestBase::SetUp();
}

virtual std::string GetYsqlPgConf() const {
return MaxQueryLayerRetriesConf(0);
}

CoarseTimePoint GetDeadlockDetectedDeadline() const {
return CoarseMonoClock::Now() + 5s;
}
Expand All @@ -101,12 +108,21 @@ class PgWaitQueuesTest : public PgMiniTestBase {
}

void TestDeadlockWithWrites() const;
void TestParallelUpdatesDetectDeadlock() const;
void TestMultiTabletFairness() const;

virtual IsolationLevel GetIsolationLevel() const {
return SNAPSHOT_ISOLATION;
}
};

class PgWaitQueuesMaxBatchSize1Test : public PgWaitQueuesTest {
protected:
std::string GetYsqlPgConf() const override {
return Format("$0,ysql_session_max_batch_size=1", PgWaitQueuesTest::GetYsqlPgConf());
}
};

auto GetBlockerIdx(auto idx, auto cycle_length) {
return (idx / cycle_length) * cycle_length + (idx + 1) % cycle_length;
}
Expand Down Expand Up @@ -1049,7 +1065,7 @@ TEST_F(
status_future.get().ToString(), "could not serialize access due to concurrent update");
}

TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(ParallelUpdatesDetectDeadlock)) {
void PgWaitQueuesTest::TestParallelUpdatesDetectDeadlock() const {
// Tests that wait-for dependencies of a distributed waiter txn waiting at different tablets, and
// possibly different tablet servers, are not overwritten at the deadlock detector.
constexpr int kNumKeys = 20;
Expand Down Expand Up @@ -1111,6 +1127,14 @@ TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(ParallelUpdatesDetectDeadlock))
}
}

TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(ParallelUpdatesDetectDeadlock)) {
TestParallelUpdatesDetectDeadlock();
}

TEST_F(PgWaitQueuesMaxBatchSize1Test, YB_DISABLE_TEST_IN_TSAN(ParallelUpdatesDetectDeadlock)) {
TestParallelUpdatesDetectDeadlock();
}

TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(DeadlockResolvesYoungestTxn)) {
// Tests that in a large cyclic deadlock, the youngest transaction is always the *only*
// transaction which is aborted.
Expand Down Expand Up @@ -1184,7 +1208,7 @@ TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(DeadlockResolvesYoungestTxn)) {
}
}

TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(MultiTabletFairness)) {
void PgWaitQueuesTest::TestMultiTabletFairness() const {
constexpr int kNumUpdateConns = 20;
constexpr int kNumKeys = 40;
// This test specifically ensures 2 aspects when transactions simultaneously contend on
Expand Down Expand Up @@ -1236,12 +1260,20 @@ TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(MultiTabletFairness)) {
auto values = ASSERT_RESULT(
setup_conn.FetchRows<std::string>(update_analyze_query));
const auto storage_write_requests_text = Format("Storage Write Requests: $0", kNumKeys);
uint64 batch_size = std::stoi(ASSERT_RESULT(
setup_conn.FetchRow<std::string>("SHOW ysql_session_max_batch_size;")));
// When the guc reflects the default value of 0, the actual batch size is controlled by the gflag.
if (!batch_size) {
batch_size = ANNOTATE_UNPROTECTED_READ(FLAGS_ysql_session_max_batch_size);
}
auto num_flushes = batch_size >= kNumKeys ? 1 : ceil(kNumKeys / (1.0 * batch_size));
auto flush_requests_text = Format("Storage Flush Requests: $0", num_flushes);
bool found_flush_requests_line = false;
bool found_write_requests_line = false;
for (const auto& value : values) {
if (value.find(storage_write_requests_text) != std::string::npos) {
found_write_requests_line = true;
} else if (value.find("Storage Flush Requests: 1") != std::string::npos) {
} else if (value.find(flush_requests_text) != std::string::npos) {
found_flush_requests_line = true;
}
}
Expand Down Expand Up @@ -1336,6 +1368,14 @@ TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(MultiTabletFairness)) {
}
}

TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(MultiTabletFairness)) {
TestMultiTabletFairness();
}

TEST_F(PgWaitQueuesMaxBatchSize1Test, YB_DISABLE_TEST_IN_TSAN(MultiTabletFairness)) {
TestMultiTabletFairness();
}

#ifndef NDEBUG
TEST_F(PgWaitQueuesTest, TestDDLsNotBlockedOnWaiters) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_refresh_waiter_timeout_ms) = 120000;
Expand Down Expand Up @@ -1427,15 +1467,27 @@ TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(TestMultipleRequestsPerTxn)) {
ASSERT_EQ(value.get(), 2);
}

class PgWaitQueueRF1Test : public PgWaitQueuesTest {
class PgWaitQueueRF1Test
: public PgWaitQueuesMaxBatchSize1Test, public testing::WithParamInterface<UseMaxBatchSize1> {
protected:
std::string GetYsqlPgConf() const override {
if (auto use_max_batch_size_as_1 = GetParam()) {
return PgWaitQueuesMaxBatchSize1Test::GetYsqlPgConf();
}
return PgWaitQueuesTest::GetYsqlPgConf();
}

size_t NumTabletServers() override {
return 1;
}
};

INSTANTIATE_TEST_SUITE_P(, PgWaitQueueRF1Test, ::testing::Values(UseMaxBatchSize1::kFalse));
INSTANTIATE_TEST_SUITE_P(
MaxBatchSize1, PgWaitQueueRF1Test, ::testing::Values(UseMaxBatchSize1::kTrue));

#ifndef NDEBUG
TEST_F(PgWaitQueueRF1Test, TestResumingWaitersDoesntBlockTabletShutdown) {
TEST_P(PgWaitQueueRF1Test, TestResumingWaitersDoesntBlockTabletShutdown) {
static const char* sync_points[1][4] = {
{"WaitQueue::Impl::SetupWaiterUnlocked:1", "PgWaitQueueRF1Test::CommitConnection1:1",
"TabletPeer::StartShutdown:1", "WaiterData::Impl::InvokeCallback:1"}};
Expand Down Expand Up @@ -1484,24 +1536,18 @@ TEST_F(PgWaitQueueRF1Test, TestResumingWaitersDoesntBlockTabletShutdown) {
// when multiple wait-for dependencies of the the same transaction are forwarded from different
// tablets (or from different rpc requests at the same tablet), and validates that the detector
// detects deadlocks without the need for waiter requests to re-enter the wait-queue.
TEST_F(PgWaitQueueRF1Test, YB_DISABLE_TEST_IN_TSAN(TestDeadlockAcrossMultipleTablets)) {
TEST_P(PgWaitQueueRF1Test, YB_DISABLE_TEST_IN_TSAN(TestDeadlockAcrossMultipleTablets)) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_refresh_waiter_timeout_ms) = 30000;
auto setup_conn = ASSERT_RESULT(Connect());
ASSERT_OK(setup_conn.Execute("CREATE TABLE foo (k INT PRIMARY KEY, v INT)"));
ASSERT_OK(setup_conn.Execute("INSERT INTO foo SELECT generate_series(0, 20), 0"));

auto conn1 = ASSERT_RESULT(Connect());
ASSERT_OK(conn1.StartTransaction(IsolationLevel::READ_COMMITTED));
// Decrease the batch size so as to simulate multiple requests at the same tablet. Helps
// assert the logic being tested better.
ASSERT_OK(conn1.Execute("SET ysql_session_max_batch_size=1"));
ASSERT_OK(conn1.Execute("SET ysql_max_in_flight_ops=20"));
ASSERT_OK(conn1.Execute("UPDATE foo SET v=1 WHERE k=1"));

auto conn2 = ASSERT_RESULT(Connect());
ASSERT_OK(conn2.StartTransaction(IsolationLevel::READ_COMMITTED));
ASSERT_OK(conn2.Execute("SET ysql_session_max_batch_size=1"));
ASSERT_OK(conn2.Execute("SET ysql_max_in_flight_ops=20"));
ASSERT_OK(conn2.Execute("UPDATE foo SET v=2 WHERE k=2"));

auto conn3 = ASSERT_RESULT(Connect());
Expand All @@ -1526,7 +1572,7 @@ TEST_F(PgWaitQueueRF1Test, YB_DISABLE_TEST_IN_TSAN(TestDeadlockAcrossMultipleTab
conn2.CommitTransaction().ok());
}

TEST_F(PgWaitQueueRF1Test, YB_DISABLE_TEST_IN_TSAN(TestDetectorPreservesBlockerSubtxnInfo)) {
TEST_P(PgWaitQueueRF1Test, YB_DISABLE_TEST_IN_TSAN(TestDetectorPreservesBlockerSubtxnInfo)) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_refresh_waiter_timeout_ms) = 30000;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_waiter_resumption_on_blocking_subtxn_rollback) = true;
auto setup_conn = ASSERT_RESULT(Connect());
Expand All @@ -1535,8 +1581,6 @@ TEST_F(PgWaitQueueRF1Test, YB_DISABLE_TEST_IN_TSAN(TestDetectorPreservesBlockerS

auto conn1 = ASSERT_RESULT(Connect());
ASSERT_OK(conn1.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
ASSERT_OK(conn1.Execute("SET ysql_session_max_batch_size=1"));
ASSERT_OK(conn1.Execute("SET ysql_max_in_flight_ops=20"));
ASSERT_OK(conn1.Execute("UPDATE foo SET v=1 WHERE k=1"));

auto conn2 = ASSERT_RESULT(Connect());
Expand Down

0 comments on commit eccd4a8

Please sign in to comment.