Skip to content

Commit

Permalink
[yugabyte#17904] docdb: Prevent tserver heartbeats to master leader i…
Browse files Browse the repository at this point in the history
…n a different universe

Summary:
Currently, if a tserver heartbeats to a master leader in a different universe, it can successfully register even though it is part of a different universe. This can happen, for example, if tserver's `--tserver_master_addrs` are incorrectly set or if a master is wiped and added to a new cluster but not properly removed from the existing cluster. This can result in data loss scenarios, as tasks will be triggered to clean up orphaned tablets on these tservers.

We introduce new cluster config field `universe_uuid` that is only generated by the master leader (as opposed to cluster_uuid which can be passed in as a flag). The master leader will set `universe_uuid` on the VisitSysCatalog path (newly elected leader), and set `universe_uuid` in the cluster config if not already set.

We also add a similarly named field `universe_uuid` to the tserver instance metadata, indicating which cluster this tserver belongs to.

On the heartbeat path, the tserver sets the `universe_uuid` in the request if it is set in its instance metadata. Otherwise, it is left unset. Master leader checks the value of this passed in uuid against whats in the cluster config. Here are the scenarios:
1. If master's uuid is unset, return a TryAgain error to the tserver to retry until the uuid is set.
2. If both are set but mismatch, then fail since tserver is heartbeating to the wrong cluster.
3. If master is set but tserver is unset, then return the uuid to the tserver so it can set state in the instance metadata. The master heartbeat path will now wait for tserver to set universe_uuid before preceding with any logic.

####Upgrade Implications

This feature is gated by a kLocalPersisted autoflag `master_enable_universe_uuid_heartbeat_check = true`. When this flag is enabled, master both enables the uuid check on heartbeat, and sets the `universe_uuid` as part of catalog manager bg tasks. We need an auto flag here to guard against the following situation:
1. Master leader M1 on newer version replicates `universe_uuid` to followers on older version.
2. Older version master M2 becomes leader.
3. M2 replicates a cluster config change. `universe_uuid` is unset.

####Backport Plan

We want to backport this change down to 2.14 line. Due to the usage of autoflags we require a different backport plan for each line:

2.18+: Autoflags exist with autopromotion in YBA, so we will backport the change as is.
2.16: Autoflags exist, but there is no autopromotion in YBA. We will backport the change as is, but the user will have to manually promote this flag post-upgrade.
2.14: Autoflags do not exist at all. We will need to backport a change that just uses a gflag set to false. After upgrade, the user will have to manually set this flag to true.

Jira: DB-6983

Test Plan:
ybd debug --cxx-test master-test --gtest_filter *UniverseUuid*
ybd debug --cxx-test master_heartbeat-itest --gtest_filter *PreventHeartbeatWrongCluster*

Reviewers: hsunder, asrivastava, zdrudi

Reviewed By: hsunder, asrivastava, zdrudi

Subscribers: ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D27858
  • Loading branch information
rahuldesirazu committed Sep 27, 2023
1 parent e0f08da commit fb98e56
Show file tree
Hide file tree
Showing 17 changed files with 448 additions and 22 deletions.
11 changes: 11 additions & 0 deletions src/yb/fs/fs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,20 @@ message InstanceMetadataPB {
// to indicate whether we want to run initdb.
optional bool initdb_done_set_after_sys_catalog_restore = 3;

// Tserver specific metadata.
optional TserverInstanceMetadataPB tserver_instance_metadata = 4;

// TODO: add a "node type" (TS/Master?)
}

message TserverInstanceMetadataPB {
// The universe_uuid received from the master leader on the first heartbeat after creation
// or an upgrade. This is the identifier for the cluster this server belongs to. It is passed
// in every heartbeat request and compared with the value on master leader to ensure a match.
// Only set on the tserver.
optional string universe_uuid = 1;
}

// Describes a collection of filesystem path instances and the membership of a
// particular instance in the collection.
//
Expand Down
37 changes: 37 additions & 0 deletions src/yb/fs/fs_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@
#include "yb/util/env_util.h"
#include "yb/util/flags.h"
#include "yb/util/format.h"
#include "yb/util/logging.h"
#include "yb/util/metric_entity.h"
#include "yb/util/net/net_util.h"
#include "yb/util/oid_generator.h"
#include "yb/util/path_util.h"
#include "yb/util/pb_util.h"
#include "yb/util/result.h"
#include "yb/util/string_util.h"

DEFINE_UNKNOWN_bool(enable_data_block_fsync, true,
"Whether to enable fsync() of data blocks, metadata, and their parent directories. "
Expand Down Expand Up @@ -115,6 +117,7 @@ const char *FsManager::kWalsRecoveryDirSuffix = ".recovery";
const char *FsManager::kRocksDBDirName = "rocksdb";
const char *FsManager::kDataDirName = "data";

YB_STRONGLY_TYPED_UUID_IMPL(UniverseUuid);
namespace {

const char kRaftGroupMetadataDirName[] = "tablet-meta";
Expand Down Expand Up @@ -327,6 +330,22 @@ std::string FsManager::GetAutoFlagsConfigPath() const {
return auto_flags_config_path_;
}

Result<std::string> FsManager::GetUniverseUuidFromTserverInstanceMetadata() const {
std::lock_guard lock(metadata_mutex_);
SCHECK_NOTNULL(metadata_);
return metadata_->tserver_instance_metadata().universe_uuid();
}

Status FsManager::SetUniverseUuidOnTserverInstanceMetadata(
const UniverseUuid& universe_uuid) {
std::lock_guard lock(metadata_mutex_);
SCHECK_NOTNULL(metadata_);
metadata_->mutable_tserver_instance_metadata()->set_universe_uuid(universe_uuid.ToString());
auto instance_metadata_path = VERIFY_RESULT(GetExistingInstanceMetadataPath());
return pb_util::WritePBContainerToPath(
env_, instance_metadata_path, *metadata_.get(), pb_util::OVERWRITE, pb_util::SYNC);
}

Status FsManager::CheckAndOpenFileSystemRoots() {
RETURN_NOT_OK(Init());

Expand All @@ -335,6 +354,10 @@ Status FsManager::CheckAndOpenFileSystemRoots() {
}

bool create_roots = false;

// Currently, this path is only called on Init and does not race with any other threads trying
// to access metadata_. To future proof this however, we will still obtain a lock.
std::lock_guard lock(metadata_mutex_);
for (const string& root : canonicalized_all_fs_roots_) {
auto pb = std::make_unique<InstanceMetadataPB>();
auto read_result = pb_util::ReadPBContainerFromPath(env_, GetInstanceMetadataPath(root),
Expand Down Expand Up @@ -670,10 +693,12 @@ Status FsManager::CreateDirIfMissingAndSync(const std::string& path, bool* creat
}

const string& FsManager::uuid() const {
std::lock_guard lock(metadata_mutex_);
return CHECK_NOTNULL(metadata_.get())->uuid();
}

bool FsManager::initdb_done_set_after_sys_catalog_restore() const {
std::lock_guard lock(metadata_mutex_);
return CHECK_NOTNULL(metadata_.get())->initdb_done_set_after_sys_catalog_restore();
}

Expand Down Expand Up @@ -782,6 +807,18 @@ Result<std::vector<std::string>> FsManager::ListTabletIds() {
return tablet_ids;
}

Result<std::string> FsManager::GetExistingInstanceMetadataPath() const {
for (const string& root : canonicalized_all_fs_roots_) {
auto instance_metadata_path = GetInstanceMetadataPath(root);
if (env_->FileExists(GetInstanceMetadataPath(root))) {
return instance_metadata_path;
}
}
return STATUS(IllegalState,
Format("No instance metadata found in root dirs $0",
RangeToString(canonicalized_all_fs_roots_.begin(), canonicalized_all_fs_roots_.end())));
}

std::string FsManager::GetInstanceMetadataPath(const string& root) const {
return JoinPathSegments(GetServerTypeDataPath(root, server_type_), kInstanceMetadataFileName);
}
Expand Down
12 changes: 11 additions & 1 deletion src/yb/fs/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "yb/util/metrics.h"
#include "yb/util/path_util.h"
#include "yb/util/strongly_typed_bool.h"
#include "yb/util/strongly_typed_uuid.h"

DECLARE_bool(enable_data_block_fsync);

Expand All @@ -67,6 +68,8 @@ class ExternalMiniClusterFsInspector;
class InstanceMetadataPB;

YB_STRONGLY_TYPED_BOOL(ShouldDeleteLogs);
YB_STRONGLY_TYPED_UUID_DECL(UniverseUuid);


struct FsManagerOpts {
FsManagerOpts();
Expand Down Expand Up @@ -196,6 +199,10 @@ class FsManager {
// Return the tablet IDs in the metadata directory.
Result<std::vector<std::string>> ListTabletIds();

Result<std::string> GetUniverseUuidFromTserverInstanceMetadata() const;

Status SetUniverseUuidOnTserverInstanceMetadata(const UniverseUuid& universe_uuid);

// Return the path where InstanceMetadataPB is stored.
std::string GetInstanceMetadataPath(const std::string& root) const;

Expand Down Expand Up @@ -289,6 +296,8 @@ class FsManager {
const std::string& path,
const std::vector<std::string>& objects);

Result<std::string> GetExistingInstanceMetadataPath() const;

Env *env_;

// Set on the TabletServer::Init path.
Expand Down Expand Up @@ -322,7 +331,8 @@ class FsManager {
mutable std::mutex auto_flag_mutex_;
std::string auto_flags_config_path_ GUARDED_BY(auto_flag_mutex_);

std::unique_ptr<InstanceMetadataPB> metadata_;
std::unique_ptr<InstanceMetadataPB> metadata_ GUARDED_BY(metadata_mutex_);
mutable std::mutex metadata_mutex_;

// Keep references to counters, counters without reference will be retired.
std::vector<scoped_refptr<Counter>> counters_;
Expand Down
6 changes: 6 additions & 0 deletions src/yb/integration-tests/create-table-stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,10 +460,15 @@ TEST_F(CreateTableStressTest, TestHeartbeatDeadline) {
ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name,
FLAGS_num_test_tablets, &resp));

master::SysClusterConfigEntryPB config;
ASSERT_OK(cluster_->mini_master()->catalog_manager().GetClusterConfig(&config));
auto universe_uuid = config.universe_uuid();

// Grab TS#1 and Generate a Full Report for it.
auto ts_server = cluster_->mini_tablet_server(0)->server();
master::TSHeartbeatRequestPB hb_req;
hb_req.mutable_common()->mutable_ts_instance()->CopyFrom(ts_server->instance_pb());
hb_req.set_universe_uuid(universe_uuid);
ts_server->tablet_manager()->StartFullTabletReport(hb_req.mutable_tablet_report());
ASSERT_GT(hb_req.tablet_report().updated_tablets_size(),
FLAGS_heartbeat_rpc_timeout_ms / FLAGS_TEST_inject_latency_during_tablet_report_ms);
Expand All @@ -478,6 +483,7 @@ TEST_F(CreateTableStressTest, TestHeartbeatDeadline) {
master::TSHeartbeatResponsePB hb_resp;
hb_req.mutable_tablet_report()->set_is_incremental(true);
hb_req.mutable_tablet_report()->set_sequence_number(1);
hb_req.set_universe_uuid(universe_uuid);
Status heartbeat_status;
// Regression testbed often has stalls at this timing granularity. Allow a couple hiccups.
for (int tries = 0; tries < 3; ++tries) {
Expand Down
27 changes: 27 additions & 0 deletions src/yb/integration-tests/master_heartbeat-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "yb/master/catalog_manager_if.h"
#include "yb/master/catalog_entity_info.h"
#include "yb/master/master_cluster.proxy.h"
#include "yb/master/master_fwd.h"

#include "yb/rpc/messenger.h"
#include "yb/rpc/proxy.h"
Expand All @@ -44,6 +45,10 @@ DECLARE_bool(enable_load_balancing);
DECLARE_int32(heartbeat_interval_ms);
DECLARE_bool(TEST_pause_before_remote_bootstrap);
DECLARE_int32(committed_config_change_role_timeout_sec);
DECLARE_string(TEST_master_universe_uuid);
DECLARE_int32(TEST_mini_cluster_registration_wait_time_sec);
DECLARE_int32(tserver_unresponsive_timeout_ms);
DECLARE_bool(master_enable_universe_uuid_heartbeat_check);

namespace yb {

Expand All @@ -59,6 +64,28 @@ class MasterHeartbeatITest : public YBTableTestBase {
std::unique_ptr<rpc::ProxyCache> proxy_cache_;
};

TEST_F(MasterHeartbeatITest, PreventHeartbeatWrongCluster) {
// First ensure that if a tserver heartbeats to a different cluster, heartbeats fail and
// eventually, master marks servers as dead. Mock a different cluster by setting the flag
// TEST_master_universe_uuid.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_master_universe_uuid) = Uuid::Generate().ToString();
ANNOTATE_UNPROTECTED_WRITE(FLAGS_tserver_unresponsive_timeout_ms) = 10 * 1000;
master::TSDescriptorVector ts_descs;
ASSERT_OK(mini_cluster_->WaitForTabletServerCount(0, &ts_descs, true /* live_only */));

// When the flag is unset, ensure that master leader can register tservers.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_master_universe_uuid) = "";
ASSERT_OK(mini_cluster_->WaitForTabletServerCount(3, &ts_descs, true /* live_only */));

// Ensure that state for universe_uuid is persisted across restarts.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_master_universe_uuid) = Uuid::Generate().ToString();
for (int i = 0; i < 3; i++) {
ASSERT_OK(mini_cluster_->mini_tablet_server(i)->Restart());
}
ASSERT_OK(mini_cluster_->WaitForTabletServerCount(0, &ts_descs, true /* live_only */));
}


TEST_F(MasterHeartbeatITest, IgnorePeerNotInConfig) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_load_balancing) = false;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_pause_before_remote_bootstrap) = true;
Expand Down
11 changes: 7 additions & 4 deletions src/yb/integration-tests/mini_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -690,18 +690,19 @@ Status MiniCluster::WaitForAllTabletServers() {

Status MiniCluster::WaitForTabletServerCount(size_t count) {
vector<shared_ptr<master::TSDescriptor> > descs;
return WaitForTabletServerCount(count, &descs);
return WaitForTabletServerCount(count, &descs, false);
}

Status MiniCluster::WaitForTabletServerCount(size_t count,
vector<shared_ptr<TSDescriptor> >* descs) {
vector<shared_ptr<TSDescriptor> >* descs,
bool live_only) {
Stopwatch sw;
sw.start();
while (sw.elapsed().wall_seconds() < FLAGS_TEST_mini_cluster_registration_wait_time_sec) {
auto leader = GetLeaderMiniMaster();
if (leader.ok()) {
(*leader)->ts_manager().GetAllDescriptors(descs);
if (descs->size() == count) {
if (live_only || descs->size() == count) {
// GetAllDescriptors() may return servers that are no longer online.
// Do a second step of verification to verify that the descs that we got
// are aligned (same uuid/seqno) with the TSs that we have in the cluster.
Expand All @@ -711,7 +712,9 @@ Status MiniCluster::WaitForTabletServerCount(size_t count,
auto ts = mini_tablet_server->server();
if (ts->instance_pb().permanent_uuid() == desc->permanent_uuid() &&
ts->instance_pb().instance_seqno() == desc->latest_seqno()) {
match_count++;
if (!live_only || desc->IsLive()) {
match_count++;
}
break;
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/yb/integration-tests/mini_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,9 @@ class MiniCluster : public MiniClusterBase {
// count. Returns Status::TimedOut if the desired count is not achieved
// within kRegistrationWaitTimeSeconds.
Status WaitForTabletServerCount(size_t count);
Status WaitForTabletServerCount(
size_t count, std::vector<std::shared_ptr<master::TSDescriptor>>* descs);
Status WaitForTabletServerCount(size_t count,
std::vector<std::shared_ptr<master::TSDescriptor>>* descs,
bool live_only = false);

// Wait for all tablet servers to be registered. Returns Status::TimedOut if the desired count is
// not achieved within kRegistrationWaitTimeSeconds.
Expand Down
4 changes: 4 additions & 0 deletions src/yb/master/catalog_entity_info.proto
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ message SysClusterConfigEntryPB {
optional BlacklistPB leader_blacklist = 7;
optional EncryptionInfoPB encryption_info = 5;
optional cdc.ConsumerRegistryPB consumer_registry = 6;
// This field is only generated internally by the master leader in PrepareDefaultClusterConfig (as
// opposed to cluster_uuid which can be passed in as a flag). If not already set, it is set on
// the VisitSysCatalog path.
optional string universe_uuid = 8;
}

message SysXClusterConfigEntryPB {
Expand Down
32 changes: 32 additions & 0 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
#include "yb/gutil/sysinfo.h"
#include "yb/gutil/walltime.h"

#include "yb/master/leader_epoch.h"
#include "yb/master/master_fwd.h"
#include "yb/master/auto_flags_orchestrator.h"
#include "yb/master/async_rpc_tasks.h"
Expand Down Expand Up @@ -597,6 +598,8 @@ DEFINE_test_flag(string, block_alter_table, "",
"\"alter_schema\" (blocks the schema from being altered) and \"completion\","
"(blocks the service completion of the alter table request)");

DECLARE_bool(master_enable_universe_uuid_heartbeat_check);

DECLARE_int32(heartbeat_interval_ms);

DEFINE_RUNTIME_bool(master_join_existing_universe, false,
Expand Down Expand Up @@ -1547,6 +1550,12 @@ Status CatalogManager::PrepareDefaultClusterConfig(int64_t term) {
LOG_WITH_PREFIX(INFO)
<< "Setting cluster UUID to " << config.cluster_uuid() << " " << cluster_uuid_source;

if (GetAtomicFlag(&FLAGS_master_enable_universe_uuid_heartbeat_check)) {
auto universe_uuid = Uuid::Generate().ToString();
LOG_WITH_PREFIX(INFO) << Format("Setting universe_uuid to $0 on new universe", universe_uuid);
config.set_universe_uuid(universe_uuid);
}

// Create in memory object.
cluster_config_ = std::make_shared<ClusterConfigInfo>();

Expand All @@ -1561,6 +1570,29 @@ Status CatalogManager::PrepareDefaultClusterConfig(int64_t term) {
return Status::OK();
}

Status CatalogManager::SetUniverseUuidIfNeeded(const LeaderEpoch& epoch) {
if (!GetAtomicFlag(&FLAGS_master_enable_universe_uuid_heartbeat_check)) {
return Status::OK();
}

auto cluster_config = ClusterConfig();
SCHECK(cluster_config, IllegalState, "Cluster config is not initialized");

auto l = cluster_config->LockForWrite();
if (!l.data().pb.universe_uuid().empty()) {
return Status::OK();
}

auto universe_uuid = Uuid::Generate().ToString();
LOG_WITH_PREFIX(INFO) << Format("Setting universe_uuid to $0 on existing universe",
universe_uuid);

l.mutable_data()->pb.set_universe_uuid(universe_uuid);
RETURN_NOT_OK(sys_catalog_->Upsert(epoch, cluster_config_.get()));
l.Commit();
return Status::OK();
}

Status CatalogManager::PrepareDefaultXClusterConfig(int64_t term) {
if (xcluster_config_) {
LOG_WITH_PREFIX(INFO)
Expand Down
3 changes: 3 additions & 0 deletions src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

#include "yb/common/constants.h"
#include "yb/common/entity_ids.h"
#include "yb/master/leader_epoch.h"
#include "yb/master/restore_sys_catalog_state.h"
#include "yb/qlexpr/index.h"
#include "yb/dockv/partition.h"
Expand Down Expand Up @@ -1435,6 +1436,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf,

Status RunXClusterBgTasks();

Status SetUniverseUuidIfNeeded(const LeaderEpoch& epoch);

void StartCDCParentTabletDeletionTaskIfStopped();

void ScheduleCDCParentTabletDeletionTask();
Expand Down
4 changes: 4 additions & 0 deletions src/yb/master/catalog_manager_bg_tasks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ void CatalogManagerBgTasks::Run() {
// Abort inactive YSQL BackendsCatalogVersionJob jobs.
catalog_manager_->master_->ysql_backends_manager()->AbortInactiveJobs();

// Set the universe_uuid field in the cluster config if not already set.
WARN_NOT_OK(catalog_manager_->SetUniverseUuidIfNeeded(l.epoch()),
"Failed SetUniverseUuidIfNeeded Task");

was_leader_ = true;
} else {
// leader_status is not ok.
Expand Down
Loading

0 comments on commit fb98e56

Please sign in to comment.