Skip to content

Commit

Permalink
[#18192] YSQL: Introduce interface to mock tserver response in MiniCl…
Browse files Browse the repository at this point in the history
…uster tests

Summary:
This revision introduces the ability to mock tserver responses to pggate RPCs in pg_client_service.
The goal is to be able to test hard-to-reproduce failure modes between pggate and the tserver deterministically by adding mocks.
As an example, it is now possible to emulate scenarios such as "Introduce network failure for FinishTransaction RPCs in Session X after successful completion of CreateTable RPC" which would
previously have required tinkering with a lot of gflags and concurrency constructs.

 All RPCs in `src/yb/tserver/pg_client.proto` are now mock-able.
Jira: DB-7215

Test Plan:
Run the following sample test:
```
./yb_build.sh --cxx-test pgwrapper_pg_mini-test --gtest-filter 'PgRecursiveAbortTest.AbortAfterTserverShutdown'
```

Reviewers: dmitry, pjain

Reviewed By: dmitry

Subscribers: ybase, pjain, smishra, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D34698
  • Loading branch information
karthik-ramanathan-3006 committed Jul 1, 2024
1 parent a446f27 commit f8e73e9
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 13 deletions.
63 changes: 63 additions & 0 deletions src/yb/tserver/pg_client_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1954,4 +1954,67 @@ void PgClientServiceImpl::method( \
BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_METHOD_DEFINE, ~, YB_PG_CLIENT_METHODS);
BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_ASYNC_METHOD_DEFINE, ~, YB_PG_CLIENT_ASYNC_METHODS);

PgClientServiceMockImpl::PgClientServiceMockImpl(
const scoped_refptr<MetricEntity>& entity, PgClientServiceIf* impl)
: PgClientServiceIf(entity), impl_(impl) {}

PgClientServiceMockImpl::Handle PgClientServiceMockImpl::SetMock(
const std::string& method, SharedFunctor&& mock) {
{
std::lock_guard lock(mutex_);
mocks_[method] = mock;
}

return Handle{std::move(mock)};
}

Result<bool> PgClientServiceMockImpl::DispatchMock(
const std::string& method, const void* req, void* resp, rpc::RpcContext* context) {
SharedFunctor mock;
{
SharedLock lock(mutex_);
auto it = mocks_.find(method);
if (it != mocks_.end()) {
mock = it->second.lock();
}
}

if (!mock) {
return false;
}
RETURN_NOT_OK((*mock)(req, resp, context));
return true;
}

#define YB_PG_CLIENT_MOCK_METHOD_DEFINE(r, data, method) \
void PgClientServiceMockImpl::method( \
const BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB) * req, \
BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) * resp, rpc::RpcContext context) { \
auto result = DispatchMock(BOOST_PP_STRINGIZE(method), req, resp, &context); \
if (!result.ok() || *result) { \
Respond(ResultToStatus(result), resp, &context); \
return; \
} \
impl_->method(req, resp, std::move(context)); \
}

template <class Req, class Resp>
auto MakeSharedFunctor(const std::function<Status(const Req*, Resp*, rpc::RpcContext*)>& func) {
return std::make_shared<PgClientServiceMockImpl::Functor>(
[func](const void* req, void* resp, rpc::RpcContext* context) {
return func(pointer_cast<const Req*>(req), pointer_cast<Resp*>(resp), context);
});
}

#define YB_PG_CLIENT_MOCK_METHOD_SETTER_DEFINE(r, data, method) \
PgClientServiceMockImpl::Handle BOOST_PP_CAT(PgClientServiceMockImpl::Mock, method)( \
const std::function<Status( \
const BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)*, \
BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB)*, rpc::RpcContext*)>& mock) { \
return SetMock(BOOST_PP_STRINGIZE(method), MakeSharedFunctor(mock)); \
}

BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_MOCK_METHOD_DEFINE, ~, YB_PG_CLIENT_MOCKABLE_METHODS);
BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_MOCK_METHOD_SETTER_DEFINE, ~, YB_PG_CLIENT_MOCKABLE_METHODS);

} // namespace yb::tserver
45 changes: 45 additions & 0 deletions src/yb/tserver/pg_client_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <future>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>

#include "yb/client/client_fwd.h"

Expand Down Expand Up @@ -135,5 +137,48 @@ class PgClientServiceImpl : public PgClientServiceIf {
std::unique_ptr<Impl> impl_;
};

#define YB_PG_CLIENT_MOCKABLE_METHODS \
(Perform) \
YB_PG_CLIENT_METHODS \
YB_PG_CLIENT_ASYNC_METHODS \
/**/

// PgClientServiceMockImpl implements the PgClientService interface to allow for mocking of tserver
// responses in MiniCluster tests. This implementation defaults to forwarding calls to
// PgClientServiceImpl if a suitable mock is not available. Usage of this implementation can be
// toggled via the test tserver gflag 'FLAGS_TEST_enable_pg_client_mock'.
class PgClientServiceMockImpl : public PgClientServiceIf {
public:
using Functor = std::function<Status(const void*, void*, rpc::RpcContext*)>;
using SharedFunctor = std::shared_ptr<Functor>;

PgClientServiceMockImpl(const scoped_refptr<MetricEntity>& entity, PgClientServiceIf* impl);

class Handle {
explicit Handle(SharedFunctor&& mock) : mock_(std::move(mock)) {}
SharedFunctor mock_;

friend class PgClientServiceMockImpl;
};

#define YB_PG_CLIENT_MOCK_METHOD_SETTER_DECLARE(r, data, method) \
[[nodiscard]] Handle BOOST_PP_CAT(Mock, method)( \
const std::function<Status( \
const BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)*, \
BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB)*, rpc::RpcContext*)>& mock);

BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_METHOD_DECLARE, ~, YB_PG_CLIENT_MOCKABLE_METHODS);
BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_MOCK_METHOD_SETTER_DECLARE, ~, YB_PG_CLIENT_MOCKABLE_METHODS);

private:
PgClientServiceIf* impl_;
std::unordered_map<std::string, SharedFunctor::weak_type> mocks_;
rw_spinlock mutex_;

Result<bool> DispatchMock(
const std::string& method, const void* req, void* resp, rpc::RpcContext* context);
Handle SetMock(const std::string& method, SharedFunctor&& mock);
};

} // namespace tserver
} // namespace yb
38 changes: 27 additions & 11 deletions src/yb/tserver/tablet_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ DEFINE_RUNTIME_uint32(ysql_min_new_version_ignored_count, 10,

DECLARE_bool(enable_pg_cron);

DEFINE_test_flag(bool, enable_pg_client_mock, false, "Enable mocking of PgClient service in tests");

namespace yb::tserver {

namespace {
Expand Down Expand Up @@ -615,13 +617,25 @@ Status TabletServer::RegisterServices() {
remote_bootstrap_service.get();
RETURN_NOT_OK(RegisterService(
FLAGS_ts_remote_bootstrap_svc_queue_length, std::move(remote_bootstrap_service)));
auto pg_client_service = std::make_shared<PgClientServiceImpl>(
*this, tablet_manager_->client_future(), clock(),
std::bind(&TabletServer::TransactionPool, this), mem_tracker(), metric_entity(), messenger(),
permanent_uuid(), &options(), xcluster_context_.get(), &pg_node_level_mutation_counter_);
pg_client_service_ = pg_client_service;
LOG(INFO) << "yb::tserver::PgClientServiceImpl created at " << pg_client_service.get();
RETURN_NOT_OK(RegisterService(FLAGS_pg_client_svc_queue_length, std::move(pg_client_service)));

auto pg_client_service_holder = std::make_shared<PgClientServiceHolder>(
*this, tablet_manager_->client_future(), clock(),
std::bind(&TabletServer::TransactionPool, this), mem_tracker(), metric_entity(),
messenger(), permanent_uuid(), &options(), xcluster_context_.get(),
&pg_node_level_mutation_counter_);
PgClientServiceIf* pg_client_service_if = &pg_client_service_holder->impl;
LOG(INFO) << "yb::tserver::PgClientServiceImpl created at " << pg_client_service_if;

if (PREDICT_FALSE(FLAGS_TEST_enable_pg_client_mock)) {
pg_client_service_holder->mock.emplace(metric_entity(), pg_client_service_if);
pg_client_service_if = &pg_client_service_holder->mock.value();
LOG(INFO) << "Mock created for yb::tserver::PgClientServiceImpl";
}

pg_client_service_ = pg_client_service_holder;
RETURN_NOT_OK(RegisterService(
FLAGS_pg_client_svc_queue_length, std::shared_ptr<PgClientServiceIf>(
std::move(pg_client_service_holder), pg_client_service_if)));

if (FLAGS_TEST_echo_service_enabled) {
auto test_echo_service = std::make_unique<stateful_service::TestEchoService>(
Expand Down Expand Up @@ -1220,10 +1234,12 @@ Status TabletServer::ListMasterServers(const ListMasterServersRequestPB* req,

void TabletServer::InvalidatePgTableCache() {
auto pg_client_service = pg_client_service_.lock();
if (pg_client_service) {
LOG(INFO) << "Invalidating all PgTableCache caches since catalog version incremented";
pg_client_service->InvalidateTableCache();
if (!pg_client_service) {
return;
}

LOG(INFO) << "Invalidating the entire PgTableCache cache since catalog version incremented";
pg_client_service->impl.InvalidateTableCache();
}

void TabletServer::InvalidatePgTableCache(
Expand All @@ -1239,7 +1255,7 @@ void TabletServer::InvalidatePgTableCache(
msg += Format("databases $0 are removed", yb::ToString(db_oids_deleted));
}
LOG(INFO) << msg;
pg_client_service->InvalidateTableCache(db_oids_updated, db_oids_deleted);
pg_client_service->impl.InvalidateTableCache(db_oids_updated, db_oids_deleted);
}
}
Status TabletServer::SetupMessengerBuilder(rpc::MessengerBuilder* builder) {
Expand Down
19 changes: 17 additions & 2 deletions src/yb/tserver/tablet_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include "yb/master/master_heartbeat.pb.h"
#include "yb/server/webserver_options.h"
#include "yb/tserver/db_server_base.h"
#include "yb/tserver/pg_client_service.h"
#include "yb/tserver/pg_mutation_counter.h"
#include "yb/tserver/remote_bootstrap_service.h"
#include "yb/tserver/tserver_shared_mem.h"
Expand Down Expand Up @@ -337,7 +338,13 @@ class TabletServer : public DbServerBase, public TabletServerIf {
std::string GetCertificateDetails() override;

PgClientServiceImpl* TEST_GetPgClientService() {
return pg_client_service_.lock().get();
auto holder = pg_client_service_.lock();
return holder ? &holder->impl : nullptr;
}

PgClientServiceMockImpl* TEST_GetPgClientServiceMock() {
auto holder = pg_client_service_.lock();
return holder && holder->mock.has_value() ? &holder->mock.value() : nullptr;
}

RemoteBootstrapServiceImpl* GetRemoteBootstrapService() {
Expand Down Expand Up @@ -366,6 +373,14 @@ class TabletServer : public DbServerBase, public TabletServerIf {

void TEST_SetIsCronLeader(bool is_cron_leader);

struct PgClientServiceHolder {
template<class... Args>
explicit PgClientServiceHolder(Args&&... args) : impl(std::forward<Args>(args)...) {}

PgClientServiceImpl impl;
std::optional<PgClientServiceMockImpl> mock;
};

protected:
virtual Status RegisterServices();

Expand Down Expand Up @@ -469,7 +484,7 @@ class TabletServer : public DbServerBase, public TabletServerIf {

// An instance to pg client service. This pointer is no longer valid after RpcAndWebServerBase
// is shut down.
std::weak_ptr<PgClientServiceImpl> pg_client_service_;
std::weak_ptr<PgClientServiceHolder> pg_client_service_;

// Key to shared memory for ysql connection manager stats
key_t ysql_conn_mgr_stats_shmem_key_ = 0;
Expand Down
55 changes: 55 additions & 0 deletions src/yb/yql/pgwrapper/pg_mini-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
#include "yb/util/test_thread_holder.h"
#include "yb/util/tsan_util.h"

#include "yb/rpc/rpc_context.h"

#include "yb/yql/pggate/pggate_flags.h"

#include "yb/yql/pgwrapper/pg_mini_test_base.h"
Expand All @@ -79,6 +81,7 @@ DECLARE_bool(flush_rocksdb_on_shutdown);
DECLARE_bool(enable_wait_queues);
DECLARE_bool(pg_client_use_shared_memory);
DECLARE_bool(ysql_yb_enable_replica_identity);
DECLARE_bool(TEST_enable_pg_client_mock);

DECLARE_double(TEST_respond_write_failed_probability);
DECLARE_double(TEST_transaction_ignore_applying_probability);
Expand Down Expand Up @@ -2232,4 +2235,56 @@ TEST_F_EX(PgMiniTest, DISABLED_ReadsDuringRBS, PgMiniStreamCompressionTest) {
thread_holder.Stop();
}

Status MockAbortFailure(
const yb::tserver::PgFinishTransactionRequestPB* req,
yb::tserver::PgFinishTransactionResponsePB* resp, yb::rpc::RpcContext* context) {
LOG(INFO) << "FinishTransaction called for session: " << req->session_id();

if (req->session_id() == 1) {
context->CloseConnection();
// The return status should not matter here.
return Status::OK();
} else if (req->session_id() == 2) {
return STATUS(NetworkError, "Mocking network failure on FinishTransaction");
}

return Status::OK();
}

class PgRecursiveAbortTest : public PgMiniTestSingleNode {
public:
void SetUp() override {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_enable_pg_client_mock) = true;
PgMiniTest::SetUp();
}

template <class F>
tserver::PgClientServiceMockImpl::Handle MockFinishTransaction(const F& mock) {
auto* client = cluster_->mini_tablet_server(0)->server()->TEST_GetPgClientServiceMock();
return client->MockFinishTransaction(mock);
}
};

TEST_F(PgRecursiveAbortTest, AbortOnTserverFailure) {
PGConn conn1 = ASSERT_RESULT(Connect());
ASSERT_OK(conn1.Execute("CREATE TABLE t1 (k INT)"));

// Validate that "connection refused" from tserver during a transaction does not produce a PANIC.
ASSERT_OK(conn1.StartTransaction(SNAPSHOT_ISOLATION));
// Run a command to ensure that the transaction is created in the backend.
ASSERT_OK(conn1.Execute("INSERT INTO t1 VALUES (1)"));
auto handle = MockFinishTransaction(MockAbortFailure);
auto status = conn1.Execute("CREATE TABLE t2 (k INT)");
ASSERT_TRUE(status.IsNetworkError());
ASSERT_EQ(conn1.ConnStatus(), CONNECTION_BAD);

// Validate that aborting a transaction does not produce a PANIC.
PGConn conn2 = ASSERT_RESULT(Connect());
ASSERT_OK(conn2.StartTransaction(SNAPSHOT_ISOLATION));
ASSERT_OK(conn2.Execute("INSERT INTO t1 VALUES (1)"));
status = conn2.Execute("ABORT");
ASSERT_TRUE(status.IsNetworkError());
ASSERT_EQ(conn1.ConnStatus(), CONNECTION_BAD);
}

} // namespace yb::pgwrapper

0 comments on commit f8e73e9

Please sign in to comment.