Skip to content

Commit

Permalink
[BACKPORT 2024.1][#18192] YSQL: Introduce interface to mock tserver r…
Browse files Browse the repository at this point in the history
…esponse in MiniCluster tests

Summary:
Original commit: f8e73e9 / D34698
This revision introduces the ability to mock tserver responses to pggate RPCs in pg_client_service.
The goal is to be able to test hard-to-reproduce failure modes between pggate and the tserver deterministically by adding mocks.
As an example, it is now possible to emulate scenarios such as "Introduce network failure for FinishTransaction RPCs in Session X after successful completion of CreateTable RPC" which would
previously have required tinkering with a lot of gflags and concurrency constructs.

 All RPCs in `src/yb/tserver/pg_client.proto` are now mock-able.
Jira: DB-7215

Test Plan:
Run the following sample test:
```
./yb_build.sh --cxx-test pgwrapper_pg_mini-test --gtest-filter 'PgRecursiveAbortTest.AbortAfterTserverShutdown'
```

Reviewers: dmitry, pjain

Reviewed By: dmitry

Subscribers: yql, smishra, pjain, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36278
  • Loading branch information
karthik-ramanathan-3006 committed Jul 17, 2024
1 parent e16bea8 commit 3534847
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 13 deletions.
63 changes: 63 additions & 0 deletions src/yb/tserver/pg_client_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1956,4 +1956,67 @@ void PgClientServiceImpl::method( \
BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_METHOD_DEFINE, ~, YB_PG_CLIENT_METHODS);
BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_ASYNC_METHOD_DEFINE, ~, YB_PG_CLIENT_ASYNC_METHODS);

PgClientServiceMockImpl::PgClientServiceMockImpl(
const scoped_refptr<MetricEntity>& entity, PgClientServiceIf* impl)
: PgClientServiceIf(entity), impl_(impl) {}

PgClientServiceMockImpl::Handle PgClientServiceMockImpl::SetMock(
const std::string& method, SharedFunctor&& mock) {
{
std::lock_guard lock(mutex_);
mocks_[method] = mock;
}

return Handle{std::move(mock)};
}

Result<bool> PgClientServiceMockImpl::DispatchMock(
const std::string& method, const void* req, void* resp, rpc::RpcContext* context) {
SharedFunctor mock;
{
SharedLock lock(mutex_);
auto it = mocks_.find(method);
if (it != mocks_.end()) {
mock = it->second.lock();
}
}

if (!mock) {
return false;
}
RETURN_NOT_OK((*mock)(req, resp, context));
return true;
}

#define YB_PG_CLIENT_MOCK_METHOD_DEFINE(r, data, method) \
void PgClientServiceMockImpl::method( \
const BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB) * req, \
BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) * resp, rpc::RpcContext context) { \
auto result = DispatchMock(BOOST_PP_STRINGIZE(method), req, resp, &context); \
if (!result.ok() || *result) { \
Respond(ResultToStatus(result), resp, &context); \
return; \
} \
impl_->method(req, resp, std::move(context)); \
}

template <class Req, class Resp>
auto MakeSharedFunctor(const std::function<Status(const Req*, Resp*, rpc::RpcContext*)>& func) {
return std::make_shared<PgClientServiceMockImpl::Functor>(
[func](const void* req, void* resp, rpc::RpcContext* context) {
return func(pointer_cast<const Req*>(req), pointer_cast<Resp*>(resp), context);
});
}

#define YB_PG_CLIENT_MOCK_METHOD_SETTER_DEFINE(r, data, method) \
PgClientServiceMockImpl::Handle BOOST_PP_CAT(PgClientServiceMockImpl::Mock, method)( \
const std::function<Status( \
const BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)*, \
BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB)*, rpc::RpcContext*)>& mock) { \
return SetMock(BOOST_PP_STRINGIZE(method), MakeSharedFunctor(mock)); \
}

BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_MOCK_METHOD_DEFINE, ~, YB_PG_CLIENT_MOCKABLE_METHODS);
BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_MOCK_METHOD_SETTER_DEFINE, ~, YB_PG_CLIENT_MOCKABLE_METHODS);

} // namespace yb::tserver
45 changes: 45 additions & 0 deletions src/yb/tserver/pg_client_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <future>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>

#include "yb/client/client_fwd.h"

Expand Down Expand Up @@ -135,5 +137,48 @@ class PgClientServiceImpl : public PgClientServiceIf {
std::unique_ptr<Impl> impl_;
};

#define YB_PG_CLIENT_MOCKABLE_METHODS \
(Perform) \
YB_PG_CLIENT_METHODS \
YB_PG_CLIENT_ASYNC_METHODS \
/**/

// PgClientServiceMockImpl implements the PgClientService interface to allow for mocking of tserver
// responses in MiniCluster tests. This implementation defaults to forwarding calls to
// PgClientServiceImpl if a suitable mock is not available. Usage of this implementation can be
// toggled via the test tserver gflag 'FLAGS_TEST_enable_pg_client_mock'.
class PgClientServiceMockImpl : public PgClientServiceIf {
public:
using Functor = std::function<Status(const void*, void*, rpc::RpcContext*)>;
using SharedFunctor = std::shared_ptr<Functor>;

PgClientServiceMockImpl(const scoped_refptr<MetricEntity>& entity, PgClientServiceIf* impl);

class Handle {
explicit Handle(SharedFunctor&& mock) : mock_(std::move(mock)) {}
SharedFunctor mock_;

friend class PgClientServiceMockImpl;
};

#define YB_PG_CLIENT_MOCK_METHOD_SETTER_DECLARE(r, data, method) \
[[nodiscard]] Handle BOOST_PP_CAT(Mock, method)( \
const std::function<Status( \
const BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)*, \
BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB)*, rpc::RpcContext*)>& mock);

BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_METHOD_DECLARE, ~, YB_PG_CLIENT_MOCKABLE_METHODS);
BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_MOCK_METHOD_SETTER_DECLARE, ~, YB_PG_CLIENT_MOCKABLE_METHODS);

private:
PgClientServiceIf* impl_;
std::unordered_map<std::string, SharedFunctor::weak_type> mocks_;
rw_spinlock mutex_;

Result<bool> DispatchMock(
const std::string& method, const void* req, void* resp, rpc::RpcContext* context);
Handle SetMock(const std::string& method, SharedFunctor&& mock);
};

} // namespace tserver
} // namespace yb
38 changes: 27 additions & 11 deletions src/yb/tserver/tablet_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ DEFINE_RUNTIME_uint32(ysql_min_new_version_ignored_count, 10,
"Minimum consecutive number of times that a tserver is allowed to ignore an older catalog "
"version that is retrieved from a tserver-master heartbeat response.");

DEFINE_test_flag(bool, enable_pg_client_mock, false, "Enable mocking of PgClient service in tests");

namespace yb::tserver {

namespace {
Expand Down Expand Up @@ -643,13 +645,25 @@ Status TabletServer::RegisterServices() {
remote_bootstrap_service.get();
RETURN_NOT_OK(RegisterService(
FLAGS_ts_remote_bootstrap_svc_queue_length, std::move(remote_bootstrap_service)));
auto pg_client_service = std::make_shared<PgClientServiceImpl>(
*this, tablet_manager_->client_future(), clock(),
std::bind(&TabletServer::TransactionPool, this), mem_tracker(), metric_entity(), messenger(),
permanent_uuid(), &options(), xcluster_context_.get(), &pg_node_level_mutation_counter_);
pg_client_service_ = pg_client_service;
LOG(INFO) << "yb::tserver::PgClientServiceImpl created at " << pg_client_service.get();
RETURN_NOT_OK(RegisterService(FLAGS_pg_client_svc_queue_length, std::move(pg_client_service)));

auto pg_client_service_holder = std::make_shared<PgClientServiceHolder>(
*this, tablet_manager_->client_future(), clock(),
std::bind(&TabletServer::TransactionPool, this), mem_tracker(), metric_entity(),
messenger(), permanent_uuid(), &options(), xcluster_context_.get(),
&pg_node_level_mutation_counter_);
PgClientServiceIf* pg_client_service_if = &pg_client_service_holder->impl;
LOG(INFO) << "yb::tserver::PgClientServiceImpl created at " << pg_client_service_if;

if (PREDICT_FALSE(FLAGS_TEST_enable_pg_client_mock)) {
pg_client_service_holder->mock.emplace(metric_entity(), pg_client_service_if);
pg_client_service_if = &pg_client_service_holder->mock.value();
LOG(INFO) << "Mock created for yb::tserver::PgClientServiceImpl";
}

pg_client_service_ = pg_client_service_holder;
RETURN_NOT_OK(RegisterService(
FLAGS_pg_client_svc_queue_length, std::shared_ptr<PgClientServiceIf>(
std::move(pg_client_service_holder), pg_client_service_if)));

if (FLAGS_TEST_echo_service_enabled) {
auto test_echo_service = std::make_unique<stateful_service::TestEchoService>(
Expand Down Expand Up @@ -1236,10 +1250,12 @@ Status TabletServer::ListMasterServers(const ListMasterServersRequestPB* req,

void TabletServer::InvalidatePgTableCache() {
auto pg_client_service = pg_client_service_.lock();
if (pg_client_service) {
LOG(INFO) << "Invalidating all PgTableCache caches since catalog version incremented";
pg_client_service->InvalidateTableCache();
if (!pg_client_service) {
return;
}

LOG(INFO) << "Invalidating the entire PgTableCache cache since catalog version incremented";
pg_client_service->impl.InvalidateTableCache();
}

void TabletServer::InvalidatePgTableCache(
Expand All @@ -1255,7 +1271,7 @@ void TabletServer::InvalidatePgTableCache(
msg += Format("databases $0 are removed", yb::ToString(db_oids_deleted));
}
LOG(INFO) << msg;
pg_client_service->InvalidateTableCache(db_oids_updated, db_oids_deleted);
pg_client_service->impl.InvalidateTableCache(db_oids_updated, db_oids_deleted);
}
}
Status TabletServer::SetupMessengerBuilder(rpc::MessengerBuilder* builder) {
Expand Down
19 changes: 17 additions & 2 deletions src/yb/tserver/tablet_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include "yb/master/master_heartbeat.pb.h"
#include "yb/server/webserver_options.h"
#include "yb/tserver/db_server_base.h"
#include "yb/tserver/pg_client_service.h"
#include "yb/tserver/pg_mutation_counter.h"
#include "yb/tserver/remote_bootstrap_service.h"
#include "yb/tserver/tserver_shared_mem.h"
Expand Down Expand Up @@ -329,7 +330,13 @@ class TabletServer : public DbServerBase, public TabletServerIf {
std::string GetCertificateDetails() override;

PgClientServiceImpl* TEST_GetPgClientService() {
return pg_client_service_.lock().get();
auto holder = pg_client_service_.lock();
return holder ? &holder->impl : nullptr;
}

PgClientServiceMockImpl* TEST_GetPgClientServiceMock() {
auto holder = pg_client_service_.lock();
return holder && holder->mock.has_value() ? &holder->mock.value() : nullptr;
}

RemoteBootstrapServiceImpl* GetRemoteBootstrapService() {
Expand All @@ -356,6 +363,14 @@ class TabletServer : public DbServerBase, public TabletServerIf {

Result<std::vector<tablet::TabletStatusPB>> GetLocalTabletsMetadata() const override;

struct PgClientServiceHolder {
template<class... Args>
explicit PgClientServiceHolder(Args&&... args) : impl(std::forward<Args>(args)...) {}

PgClientServiceImpl impl;
std::optional<PgClientServiceMockImpl> mock;
};

protected:
virtual Status RegisterServices();

Expand Down Expand Up @@ -457,7 +472,7 @@ class TabletServer : public DbServerBase, public TabletServerIf {

// An instance to pg client service. This pointer is no longer valid after RpcAndWebServerBase
// is shut down.
std::weak_ptr<PgClientServiceImpl> pg_client_service_;
std::weak_ptr<PgClientServiceHolder> pg_client_service_;

// Key to shared memory for ysql connection manager stats
key_t ysql_conn_mgr_stats_shmem_key_ = 0;
Expand Down
55 changes: 55 additions & 0 deletions src/yb/yql/pgwrapper/pg_mini-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
#include "yb/util/test_thread_holder.h"
#include "yb/util/tsan_util.h"

#include "yb/rpc/rpc_context.h"

#include "yb/yql/pggate/pggate_flags.h"

#include "yb/yql/pgwrapper/pg_mini_test_base.h"
Expand All @@ -77,6 +79,7 @@ DECLARE_bool(enable_tracing);
DECLARE_bool(flush_rocksdb_on_shutdown);
DECLARE_bool(enable_wait_queues);
DECLARE_bool(ysql_yb_enable_replica_identity);
DECLARE_bool(TEST_enable_pg_client_mock);

DECLARE_double(TEST_respond_write_failed_probability);
DECLARE_double(TEST_transaction_ignore_applying_probability);
Expand Down Expand Up @@ -2123,4 +2126,56 @@ TEST_F(PgMiniTest, BloomFilterBackwardScanTest) {
ASSERT_EQ(after_blooms_checked, before_blooms_checked + 1);
}

Status MockAbortFailure(
const yb::tserver::PgFinishTransactionRequestPB* req,
yb::tserver::PgFinishTransactionResponsePB* resp, yb::rpc::RpcContext* context) {
LOG(INFO) << "FinishTransaction called for session: " << req->session_id();

if (req->session_id() == 1) {
context->CloseConnection();
// The return status should not matter here.
return Status::OK();
} else if (req->session_id() == 2) {
return STATUS(NetworkError, "Mocking network failure on FinishTransaction");
}

return Status::OK();
}

class PgRecursiveAbortTest : public PgMiniTestSingleNode {
public:
void SetUp() override {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_enable_pg_client_mock) = true;
PgMiniTest::SetUp();
}

template <class F>
tserver::PgClientServiceMockImpl::Handle MockFinishTransaction(const F& mock) {
auto* client = cluster_->mini_tablet_server(0)->server()->TEST_GetPgClientServiceMock();
return client->MockFinishTransaction(mock);
}
};

TEST_F(PgRecursiveAbortTest, AbortOnTserverFailure) {
PGConn conn1 = ASSERT_RESULT(Connect());
ASSERT_OK(conn1.Execute("CREATE TABLE t1 (k INT)"));

// Validate that "connection refused" from tserver during a transaction does not produce a PANIC.
ASSERT_OK(conn1.StartTransaction(SNAPSHOT_ISOLATION));
// Run a command to ensure that the transaction is created in the backend.
ASSERT_OK(conn1.Execute("INSERT INTO t1 VALUES (1)"));
auto handle = MockFinishTransaction(MockAbortFailure);
auto status = conn1.Execute("CREATE TABLE t2 (k INT)");
ASSERT_TRUE(status.IsNetworkError());
ASSERT_EQ(conn1.ConnStatus(), CONNECTION_BAD);

// Validate that aborting a transaction does not produce a PANIC.
PGConn conn2 = ASSERT_RESULT(Connect());
ASSERT_OK(conn2.StartTransaction(SNAPSHOT_ISOLATION));
ASSERT_OK(conn2.Execute("INSERT INTO t1 VALUES (1)"));
status = conn2.Execute("ABORT");
ASSERT_TRUE(status.IsNetworkError());
ASSERT_EQ(conn1.ConnStatus(), CONNECTION_BAD);
}

} // namespace yb::pgwrapper

0 comments on commit 3534847

Please sign in to comment.