[#25441] DocDB: Fix drive aware cluster balance
Summary:
The cluster balancer tries to pick tablets to move in a way that guarantees even disk load after the move.
But a move consists of two steps: in the first step we add the tablet to a new replica.
In the second step we remove the over-replicated tablet from some replica.
So there is no guarantee that the tablet will be removed from the replica where it resides on the most loaded disk.

Fixed by taking disk load into account when picking the replica from which to remove the tablet.
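
As a rough standalone illustration of the idea (a sketch with hypothetical names and data, not the actual YugabyteDB API), a removal candidate can be ranked by how many tablets sit on the drive that hosts the over-replicated tablet, so the replica on the most loaded disk is dropped first. In the change below, this role is played by PerTableLoadState::GetTabletDriveLoad and the LoadComparator used in HandleRemoveReplicas.

#include <algorithm>
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

using TabletId = std::string;
using TabletServerId = std::string;

// Hypothetical per-tserver metadata: drive path -> tablets placed on that drive.
struct TServerMeta {
  std::map<std::string, std::set<TabletId>> path_to_tablets;
};

// Load of the drive that holds tablet_id on this tserver (0 if not found).
size_t TabletDriveLoad(const TServerMeta& ts, const TabletId& tablet_id) {
  for (const auto& [path, tablets] : ts.path_to_tablets) {
    if (tablets.count(tablet_id) > 0) {
      return tablets.size();
    }
  }
  return 0;
}

int main() {
  std::map<TabletServerId, TServerMeta> meta;
  meta["ts-1"].path_to_tablets = {{"/mnt/d0", {"t1", "t2", "t3"}}, {"/mnt/d1", {"t4"}}};
  meta["ts-2"].path_to_tablets = {{"/mnt/d0", {"t1"}}, {"/mnt/d1", {"t5", "t6"}}};

  // Tablet "t1" is over-replicated on ts-1 and ts-2. Prefer removing the replica
  // whose hosting drive carries the most tablets, so disk load evens out.
  std::vector<TabletServerId> candidates = {"ts-1", "ts-2"};
  auto victim = *std::max_element(
      candidates.begin(), candidates.end(),
      [&](const TabletServerId& a, const TabletServerId& b) {
        return TabletDriveLoad(meta[a], "t1") < TabletDriveLoad(meta[b], "t1");
      });
  std::cout << "Remove the replica of t1 from " << victim << std::endl;  // prints ts-1
  return 0;
}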

Also, the test LoadBalancerMiniClusterTest.CheckLoadBalanceDriveAware could fail because it times out waiting for the cluster to balance.
This happens because we cannot balance leaders: we have to wait 20s before retrying a step-down if the protege already lost the leader election.
Fixed by decreasing this interval to 0s in this test.
Jira: DB-14682

Test Plan: ./yb_build.sh release -n 800 --cxx-test integration-tests_load_balancer_mini_cluster-test --gtest_filter LoadBalancerMiniClusterTest.CheckLoadBalanceDriveAware -- -p 8

Reviewers: zdrudi, asrivastava

Reviewed By: asrivastava

Subscribers: ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D40922
spolitov committed Dec 28, 2024
1 parent 60fae6a commit b4cd95d
Showing 6 changed files with 124 additions and 95 deletions.
92 changes: 47 additions & 45 deletions src/yb/integration-tests/load_balancer_mini_cluster-test.cc
@@ -46,18 +46,19 @@ METRIC_DECLARE_event_stats(load_balancer_duration);
METRIC_DECLARE_gauge_uint32(tablets_in_wrong_placement);
METRIC_DECLARE_gauge_uint32(total_table_load_difference);

DECLARE_int32(catalog_manager_bg_task_wait_ms);
DECLARE_bool(TEST_fail_async_delete_replica_task);
DECLARE_bool(enable_load_balancing);
DECLARE_string(instance_uuid_override);
DECLARE_bool(load_balancer_drive_aware);
DECLARE_bool(tserver_heartbeat_metrics_add_drive_data);
DECLARE_int32(TEST_load_balancer_wait_after_count_pending_tasks_ms);
DECLARE_int32(TEST_load_balancer_wait_ms);
DECLARE_int32(TEST_slowdown_master_async_rpc_tasks_by_ms);
DECLARE_int32(catalog_manager_bg_task_wait_ms);
DECLARE_int32(load_balancer_max_concurrent_moves);
DECLARE_int32(min_leader_stepdown_retry_interval_ms);
DECLARE_int32(replication_factor);
DECLARE_int32(TEST_slowdown_master_async_rpc_tasks_by_ms);
DECLARE_int32(TEST_load_balancer_wait_ms);
DECLARE_int32(TEST_load_balancer_wait_after_count_pending_tasks_ms);
DECLARE_bool(tserver_heartbeat_metrics_add_drive_data);
DECLARE_int32(tserver_heartbeat_metrics_interval_ms);
DECLARE_bool(TEST_fail_async_delete_replica_task);
DECLARE_string(instance_uuid_override);

using namespace std::literals;

@@ -137,40 +138,34 @@ void WaitLoadBalancerIdle(
msg));
}

typedef std::unordered_map<std::string,
std::pair<std::unordered_map<std::string, int>, int>> DriveStats;
struct TSDriveStats {
std::unordered_map<std::string, std::vector<TabletId>> dir_tablets;
int num_leaders = 0;
};

using DriveStats = std::unordered_map<TabletServerId, TSDriveStats>;

Status GetTabletsDriveStats(DriveStats* stats,
yb::MiniCluster* mini_cluster,
const yb::client::YBTableName& table_name) {
Result<DriveStats> GetTabletsDriveStats(
MiniCluster* mini_cluster, const client::YBTableName& table_name) {
DriveStats result;
scoped_refptr<master::TableInfo> tbl_info =
VERIFY_RESULT(mini_cluster->GetLeaderMiniMaster())->catalog_manager().
GetTableInfoFromNamespaceNameAndTableName(table_name.namespace_type(),
table_name.namespace_name(),
table_name.table_name());
for (const auto& tablet : VERIFY_RESULT(tbl_info->GetTablets())) {
auto replica_map = tablet->GetReplicaLocations();
for (const auto& replica : *replica_map.get()) {
auto ts = stats->find(replica.first);
if (ts == stats->end()) {
ts = stats->insert({replica.first,
std::make_pair(std::unordered_map<std::string, int>(), 0)}).first;
for (const auto& [ts_uuid, replica] : *replica_map) {
auto& ts = result[ts_uuid];
if (replica.role == PeerRole::LEADER) {
++ts.num_leaders;
}
if (replica.second.role == PeerRole::LEADER) {
++ts->second.second;
}
if (!replica.second.fs_data_dir.empty()) {
auto& ts_map = ts->second.first;
auto path = ts_map.find(replica.second.fs_data_dir);
if (path == ts_map.end()) {
ts_map.insert({replica.second.fs_data_dir, 1});
} else {
++path->second;
}
if (!replica.fs_data_dir.empty()) {
ts.dir_tablets[replica.fs_data_dir].push_back(tablet->id());
}
}
}
return Status::OK();
return result;
}

} // namespace
@@ -606,12 +601,13 @@ TEST_F_EX(LoadBalancerMiniClusterTest, CheckLoadBalanceWithoutDriveData,
}

TEST_F(LoadBalancerMiniClusterTest, CheckLoadBalanceDriveAware) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_min_leader_stepdown_retry_interval_ms) = 0;

// Wait LB to move leaders
SleepFor(MonoDelta::FromMilliseconds(FLAGS_catalog_manager_bg_task_wait_ms * 2));
WaitLoadBalancerIdle(client_.get());

DriveStats before;
ASSERT_OK(GetTabletsDriveStats(&before, mini_cluster(), table_name()));
auto before = ASSERT_RESULT(GetTabletsDriveStats(mini_cluster(), table_name()));

// Add new tserver to force load balancer moves.
auto new_ts_index = mini_cluster()->num_tablet_servers();
@@ -623,37 +619,43 @@ TEST_F(LoadBalancerMiniClusterTest, CheckLoadBalanceDriveAware) {
WaitLoadBalancerActive(client_.get());
WaitLoadBalancerIdle(client_.get());

DriveStats after;
ASSERT_OK(GetTabletsDriveStats(&after, mini_cluster(), table_name()));
auto after = ASSERT_RESULT(GetTabletsDriveStats(mini_cluster(), table_name()));

bool found = false;
int balanced_servers = 0;
for (size_t ts_index = 0; ts_index < new_ts_index; ++ts_index) {
const auto ts_uuid = mini_cluster()->mini_tablet_server(ts_index)->server()->permanent_uuid();
std::vector<std::string> drives;
auto& ts_before = before[ts_uuid];
auto& ts_after = after[ts_uuid];
for (const auto& drive : ts_before.first) {
for (const auto& drive : ts_before.dir_tablets) {
drives.emplace_back(drive.first);
}
ASSERT_FALSE(drives.empty());
std::sort(drives.begin(), drives.end());
LOG(INFO) << "P " << ts_uuid;
LOG(INFO) << "Leaders before: " << ts_before.second << " after: " << ts_after.second;
LOG(INFO) << "Leaders before: " << ts_before.num_leaders << " after: " << ts_after.num_leaders;

int tablets = ts_after.first[drives.front()];
bool expected_move = true;
size_t tablets = ts_after.dir_tablets[drives.front()].size();
bool drives_balanced = true;
for (const auto& drive : drives) {
if (ts_after.first[drive] != tablets) {
expected_move = false;
LOG(INFO) << drive << " before: " << AsString(ts_before.dir_tablets[drive]) <<
" after: " << AsString(ts_after.dir_tablets[drive]);
if (ts_after.dir_tablets[drive].size() != tablets) {
drives_balanced = false;
}
LOG(INFO) << drive << " before: " << ts_before.first[drive] <<
" after: " << ts_after.first[drive];
}
if (expected_move) {
found = true;
if (drives_balanced) {
++balanced_servers;
}
}
ASSERT_TRUE(found);

// There are 4 tablets in this test and each tserver has 3 disks.
// The goal of the test is to get 1 tablet on each disk after adding a new tablet server.
// Before adding a tserver we have 2 tablets on some disk for all tservers, and 1 tablet on the
// other disks. It could happen that the same two tablets share a disk on all tservers.
// In this case we cannot guarantee even disk load, but at least 2 original tservers should have
// a balanced disks after load balancing.
ASSERT_GE(balanced_servers, 2);
}

TEST_F(LoadBalancerMiniClusterTest, ClearPendingDeletesOnFailure) {
63 changes: 30 additions & 33 deletions src/yb/master/cluster_balance.cc
@@ -171,20 +171,19 @@ namespace master {

namespace {

std::vector<std::set<TabletId>> GetTabletsOnTSToMove(
bool drive_aware, const CBTabletServerMetadata& from_ts_meta) {
std::vector<std::set<TabletId>> all_tablets;
auto GetTServerTabletsByDrive(bool drive_aware, const CBTabletServerMetadata& from_ts_meta) {
std::vector<std::pair<std::string, std::set<TabletId>>> all_tablets;
if (drive_aware) {
for (const auto& path : from_ts_meta.sorted_path_load_by_tablets_count) {
auto path_list = from_ts_meta.path_to_tablets.find(path);
if (path_list == from_ts_meta.path_to_tablets.end()) {
LOG(INFO) << "Found uninitialized path: " << path;
continue;
}
all_tablets.push_back(path_list->second);
all_tablets.emplace_back(path, path_list->second);
}
} else {
all_tablets.push_back(from_ts_meta.running_tablets);
all_tablets.emplace_back("", from_ts_meta.running_tablets);
}
return all_tablets;
}
@@ -1117,11 +1116,13 @@ Result<bool> ClusterLoadBalancer::GetLoadToMove(
}

// If we don't find a tablet_id to move between these two TSs, advance the state.
if (VERIFY_RESULT(GetTabletToMove(high_load_uuid, low_load_uuid, moving_tablet_id))) {
auto tablet_to_move = VERIFY_RESULT(GetTabletToMove(high_load_uuid, low_load_uuid));
if (tablet_to_move) {
// If we got this far, we have the candidate we want, so fill in the output params and
// return. The tablet_id is filled in from GetTabletToMove.
*from_ts = high_load_uuid;
*to_ts = low_load_uuid;
*moving_tablet_id = *tablet_to_move;
VLOG(3) << "Found tablet " << *moving_tablet_id << " to move from "
<< *from_ts << " to ts " << *to_ts;
RETURN_NOT_OK(MoveReplica(*moving_tablet_id, high_load_uuid, low_load_uuid));
@@ -1138,16 +1139,16 @@ Result<bool> ClusterLoadBalancer::GetLoadToMove(
return STATUS(IllegalState, "Load balancing algorithm reached illegal state.");
}

Result<bool> ClusterLoadBalancer::GetTabletToMove(
const TabletServerId& from_ts, const TabletServerId& to_ts, TabletId* moving_tablet_id) {
Result<std::optional<TabletId>> ClusterLoadBalancer::GetTabletToMove(
const TabletServerId& from_ts, const TabletServerId& to_ts) {
const auto& from_ts_meta = state_->per_ts_meta_[from_ts];
// If drive aware, all_tablets is sorted by decreasing drive load.
std::vector<std::set<TabletId>> all_tablets_by_drive =
GetTabletsOnTSToMove(global_state_->drive_aware_, from_ts_meta);
std::vector<std::set<TabletId>> all_filtered_tablets_by_drive;
for (const std::set<TabletId>& drive_tablets : all_tablets_by_drive) {
std::set<TabletId> filtered_drive_tablets;
for (const TabletId& tablet_id : drive_tablets) {
auto all_tablets_by_drive = GetTServerTabletsByDrive(global_state_->drive_aware_, from_ts_meta);
decltype(all_tablets_by_drive) all_filtered_tablets_by_drive;
for (const auto& [drive, tablets] : all_tablets_by_drive) {
all_filtered_tablets_by_drive.emplace_back(drive, decltype(tablets)());
auto& filtered_drive_tablets = all_filtered_tablets_by_drive.back().second;
for (const TabletId& tablet_id : tablets) {
// We don't want to add a new replica to an already over-replicated tablet.
//
// TODO(bogdan): should make sure we pick tablets that this TS is not a leader of, so we
@@ -1160,25 +1161,22 @@ Result<bool> ClusterLoadBalancer::GetTabletToMove(
continue;
}

if (VERIFY_RESULT(
state_->CanAddTabletToTabletServer(tablet_id, to_ts))) {
if (VERIFY_RESULT(state_->CanAddTabletToTabletServer(tablet_id, to_ts))) {
filtered_drive_tablets.insert(tablet_id);
}
}
all_filtered_tablets_by_drive.push_back(filtered_drive_tablets);
}

// Below, we choose a tablet to move. We first filter out any tablets which cannot be moved
// because of placement limitations. Then, we prioritize moving a tablet whose leader is in the
// same zone/region it is moving to (for faster remote bootstrapping).
for (const std::set<TabletId>& drive_tablets : all_filtered_tablets_by_drive) {
VLOG(3) << Format("All tablets being considered for movement from ts $0 to ts $1 for this "
"drive are: $2", from_ts, to_ts, drive_tablets);

bool found_tablet_to_move = false;
CatalogManagerUtil::CloudInfoSimilarity chosen_tablet_ci_similarity =
CatalogManagerUtil::NO_MATCH;
for (const TabletId& tablet_id : drive_tablets) {
for (const auto& [drive, tablets] : all_filtered_tablets_by_drive) {
VLOG(3) << Format("All tablets being considered for movement from ts $0 to ts $1 for drive $2 "
" are: $3", from_ts, to_ts, drive, tablets);

std::optional<TableId> result;
auto chosen_tablet_ci_similarity = CatalogManagerUtil::NO_MATCH;
for (const TabletId& tablet_id : tablets) {
// TODO(#15853): this should be augmented as well to allow dropping by one replica, if still
// leaving us with more than the minimum.
//
@@ -1212,25 +1210,24 @@ Result<bool> ClusterLoadBalancer::GetTabletToMove(
ci_similarity = CatalogManagerUtil::ComputeCloudInfoSimilarity(leader_ci, to_ts_ci);
}

if (found_tablet_to_move && ci_similarity <= chosen_tablet_ci_similarity) {
if (result && ci_similarity <= chosen_tablet_ci_similarity) {
continue;
}
// This is the best tablet to move, so far.
found_tablet_to_move = true;
*moving_tablet_id = tablet_id;
result = tablet_id;
chosen_tablet_ci_similarity = ci_similarity;
}

// If there is any tablet we can move from this drive, choose it and return.
if (found_tablet_to_move) {
VLOG(3) << "Found tablet " << *moving_tablet_id << " for moving from ts " << from_ts
if (result) {
VLOG(3) << "Found tablet " << *result << " for moving from ts " << from_ts
<< " to ts " << to_ts;
return true;
return result;
}
}

VLOG(3) << Format("Did not find any tablets to move from $0 to $1", from_ts, to_ts);
return false;
return std::nullopt;
}

Result<bool> ClusterLoadBalancer::GetLeaderToMoveWithinAffinitizedPriorities(
@@ -1443,7 +1440,7 @@ Result<bool> ClusterLoadBalancer::HandleRemoveReplicas(

const auto& tablet_meta = state_->per_tablet_meta_[tablet_id];
const auto& tablet_servers = tablet_meta.over_replicated_tablet_servers;
auto comparator = PerTableLoadState::Comparator(state_);
auto comparator = PerTableLoadState::LoadComparator(state_, tablet_id);
std::vector<TabletServerId> sorted_ts;
// Don't include any tservers where this tablet is still starting.
std::copy_if(
4 changes: 2 additions & 2 deletions src/yb/master/cluster_balance.h
@@ -280,8 +280,8 @@ class ClusterLoadBalancer {
TabletId* moving_tablet_id, TabletServerId* from_ts, TabletServerId* to_ts)
REQUIRES_SHARED(catalog_manager_->mutex_);

Result<bool> GetTabletToMove(
const TabletServerId& from_ts, const TabletServerId& to_ts, TabletId* moving_tablet_id)
Result<std::optional<TabletId>> GetTabletToMove(
const TabletServerId& from_ts, const TabletServerId& to_ts)
REQUIRES_SHARED(catalog_manager_->mutex_);

// Issue the change config and modify the in-memory state for moving a replica from one tablet
40 changes: 30 additions & 10 deletions src/yb/master/cluster_balance_util.cc
@@ -126,25 +126,46 @@ bool PerTableLoadState::LeaderLoadComparator::operator()(
return a_load < b_load;
}

bool PerTableLoadState::CompareByUuid(const TabletServerId& a, const TabletServerId& b) {
bool PerTableLoadState::CompareLoad(
const TabletServerId& a, const TabletServerId& b, optional_ref<const TabletId> tablet_id) {
auto load_a = GetLoad(a);
auto load_b = GetLoad(b);
if (load_a == load_b) {
// Use global load as a heuristic to help break ties.
load_a = global_state_->GetGlobalLoad(a);
load_b = global_state_->GetGlobalLoad(b);
if (load_a == load_b) {
return a < b;
if (load_a != load_b) {
return load_a < load_b;
}
if (tablet_id) {
load_a = GetTabletDriveLoad(a, *tablet_id);
load_b = GetTabletDriveLoad(b, *tablet_id);
if (load_a != load_b) {
return load_a < load_b;
}
}
return load_a < load_b;
// Use global load as a heuristic to help break ties.
load_a = global_state_->GetGlobalLoad(a);
load_b = global_state_->GetGlobalLoad(b);
if (load_a != load_b) {
return load_a < load_b;
}
return a < b;
}

size_t PerTableLoadState::GetLoad(const TabletServerId& ts_uuid) const {
const auto& ts_meta = per_ts_meta_.at(ts_uuid);
return ts_meta.starting_tablets.size() + ts_meta.running_tablets.size();
}

size_t PerTableLoadState::GetTabletDriveLoad(
const TabletServerId& ts_uuid, const TabletId& tablet_id) const {
const auto& ts_meta = per_ts_meta_.at(ts_uuid);
for (const auto& [_, tablets] : ts_meta.path_to_tablets) {
if (tablets.contains(tablet_id)) {
return tablets.size();
}
}
LOG(DFATAL) << "Did not find tablet " << tablet_id << " on tserver " << ts_uuid;
return 0;
}

size_t PerTableLoadState::GetLeaderLoad(const TabletServerId& ts_uuid) const {
return per_ts_meta_.at(ts_uuid).leaders.size();
}
@@ -702,8 +723,7 @@ Status PerTableLoadState::RemoveReplica(const TabletId& tablet_id, const TabletS
}

void PerTableLoadState::SortLoad() {
auto comparator = Comparator(this);
sort(sorted_load_.begin(), sorted_load_.end(), comparator);
sort(sorted_load_.begin(), sorted_load_.end(), LoadComparator(this, std::nullopt));

if (global_state_->drive_aware_) {
SortDriveLoad();