Skip to content

Commit

Permalink
Backport to 2.2: [docdb] Setup a global leader balance threshold whil…
Browse files Browse the repository at this point in the history
…e allowing progress to be made across tables (#5021) (#5181)

Summary:
Adding new flag `load_balancer_max_concurrent_moves_per_table` to limit the number of leader moves per table. This flag is meant to be used with `load_balancer_max_concurrent_moves` in order to improve performance for these moves.
Also fixing issue #5181, where having pending leader moves on subsequent LB runs can lead to the same tablet being told to move twice, thus leading to a check failure.  This was caused from `AnalyzeTabletsUnlocked` not properly updating the state for leader stepdowns, and has been fixed by storing new_leader_uuid instead of change_config_ts_uuid for pending leader stepdown tasks.

Test Plan: Jenkins: rebase: 2.2

Reviewers: bogdan, rahuldesirazu

Reviewed By: rahuldesirazu

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D9083
  • Loading branch information
hulien22 committed Aug 4, 2020
1 parent 3e32b62 commit ce8d77c
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 39 deletions.
1 change: 1 addition & 0 deletions src/yb/integration-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ ADD_YB_TEST(redis_table-test)
ADD_YB_TEST(update_scan_delta_compact-test)
ADD_YB_TEST(log_version-test)
ADD_YB_TEST(load_balancer-test)
ADD_YB_TEST(load_balancer_multi_table-test)
ADD_YB_TEST(load_balancer_respect_affinity-test)

set(YB_TEST_LINK_LIBS_SAVED ${YB_TEST_LINK_LIBS})
Expand Down
47 changes: 47 additions & 0 deletions src/yb/integration-tests/load_balancer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "yb/rpc/messenger.h"
#include "yb/rpc/rpc_controller.h"
#include "yb/tools/yb-admin_client.h"
#include "yb/util/monotime.h"

namespace yb {
namespace integration_tests {
Expand Down Expand Up @@ -164,5 +165,51 @@ TEST_F(LoadBalancerTest, IsLoadBalancerIdle) {
}, MonoDelta::FromMilliseconds(10000), "IsLoadBalancerActive"));
}

// This regression test is to check that we don't hit the CHECK in cluster_balance.cc
// state_->pending_stepdown_leader_tasks_[tablet->table()->id()].count(tablet->tablet_id()) == 0
// This CHECK was previously hit when load_balancer_max_concurrent_moves was set to a value > 1
// and multiple stepdown tasks were sent to the same tablet on subsequent LB runs.
TEST_F(LoadBalancerTest, PendingLeaderStepdownRegressTest) {
const int test_bg_task_wait_ms = 1000;
ASSERT_OK(yb_admin_client_->ModifyPlacementInfo("c.r.z0,c.r.z1,c.r.z2", 3, ""));
ASSERT_OK(yb_admin_client_->SetPreferredZones({"c.r.z1"}));

// Move all leaders to one tablet.
ASSERT_OK(WaitFor([&]() {
return AreLeadersOnPreferredOnly();
}, MonoDelta::FromMilliseconds(kDefaultTimeoutMillis), "AreLeadersOnPreferredOnly"));

// Allow for multiple leader moves per table.
for (int i = 0; i < num_masters(); ++i) {
ASSERT_OK(external_mini_cluster_->SetFlag(external_mini_cluster_->master(i),
"load_balancer_max_concurrent_moves", "10"));
ASSERT_OK(external_mini_cluster_->SetFlag(external_mini_cluster_->master(i),
"load_balancer_max_concurrent_moves_per_table", "5"));
ASSERT_OK(external_mini_cluster_->SetFlag(external_mini_cluster_->master(i),
"catalog_manager_bg_task_wait_ms",
std::to_string(test_bg_task_wait_ms)));
}
// Add stepdown delay of 2 * catalog_manager_bg_task_wait_ms.
// This ensures that we will have pending stepdown tasks during a subsequent LB run.
for (int i = 0; i < num_tablet_servers(); ++i) {
ASSERT_OK(external_mini_cluster_->SetFlag(external_mini_cluster_->tablet_server(i),
"TEST_leader_stepdown_delay_ms",
std::to_string(2 * test_bg_task_wait_ms)));
}

// Trigger leader balancing.
ASSERT_OK(yb_admin_client_->SetPreferredZones({"c.r.z0", "c.r.z1", "c.r.z2"}));

// Wait for load balancing to complete.
ASSERT_OK(WaitFor([&]() -> Result<bool> {
bool is_idle = VERIFY_RESULT(client_->IsLoadBalancerIdle());
return !is_idle;
}, MonoDelta::FromMilliseconds(kDefaultTimeoutMillis * 2), "IsLoadBalancerActive"));

ASSERT_OK(WaitFor([&]() -> Result<bool> {
return client_->IsLoadBalancerIdle();
}, MonoDelta::FromMilliseconds(kDefaultTimeoutMillis * 2), "IsLoadBalancerIdle"));
}

} // namespace integration_tests
} // namespace yb
208 changes: 208 additions & 0 deletions src/yb/integration-tests/load_balancer_multi_table-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include <gtest/gtest.h>

#include "yb/integration-tests/yb_table_test_base.h"

#include "yb/client/schema.h"
#include "yb/client/table_creator.h"
#include "yb/client/yb_table_name.h"
#include "yb/consensus/consensus.pb.h"
#include "yb/consensus/consensus.proxy.h"
#include "yb/gutil/strings/join.h"
#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/external_mini_cluster.h"
#include "yb/integration-tests/cluster_verifier.h"
#include "yb/master/master.h"
#include "yb/master/master-test-util.h"
#include "yb/master/sys_catalog.h"
#include "yb/master/master.proxy.h"
#include "yb/rpc/messenger.h"
#include "yb/rpc/rpc_controller.h"
#include "yb/tools/yb-admin_client.h"
#include "yb/util/monotime.h"

using namespace std::literals;

namespace yb {
namespace integration_tests {

constexpr uint32_t kDefaultTimeoutMillis = 30000;
constexpr int kNumTables = 3;
constexpr int kMovesPerTable = 1;

// We need multiple tables in order to test load_balancer_max_concurrent_moves_per_table.
class LoadBalancerMultiTableTest : public YBTableTestBase {
protected:
void SetUp() override {
YBTableTestBase::SetUp();

yb_admin_client_ = std::make_unique<tools::enterprise::ClusterAdminClient>(
external_mini_cluster()->GetMasterAddresses(), kDefaultTimeoutMillis);

ASSERT_OK(yb_admin_client_->Init());
}

bool use_external_mini_cluster() override { return true; }

bool enable_ysql() override {
// Do not create the transaction status table.
return false;
}

int num_tablets() override {
return 8;
}

client::YBTableName table_name() override {
return table_names_[0];
}

void CustomizeExternalMiniCluster(ExternalMiniClusterOptions* opts) override {
opts->extra_tserver_flags.push_back("--placement_cloud=c");
opts->extra_tserver_flags.push_back("--placement_region=r");
opts->extra_tserver_flags.push_back("--placement_zone=z${index}");
opts->extra_master_flags.push_back("--load_balancer_skip_leader_as_remove_victim=false");
opts->extra_master_flags.push_back("--load_balancer_max_concurrent_moves=10");
opts->extra_master_flags.push_back("--load_balancer_max_concurrent_moves_per_table="
+ std::to_string(kMovesPerTable));
}

void CreateTables() {
for (int i = 1; i <= kNumTables; ++i) {
table_names_.emplace_back(YQL_DATABASE_CQL,
"my_keyspace-" + std::to_string(i),
"kv-table-test-" + std::to_string(i));
}

for (const auto& tn : table_names_) {
ASSERT_OK(client_->CreateNamespaceIfNotExists(tn.namespace_name(), tn.namespace_type()));

client::YBSchemaBuilder b;
b.AddColumn("k")->Type(BINARY)->NotNull()->HashPrimaryKey();
b.AddColumn("v")->Type(BINARY)->NotNull();
ASSERT_OK(b.Build(&schema_));

ASSERT_OK(NewTableCreator()->table_name(tn).schema(&schema_).Create());
}
}

void DeleteTables() {
for (const auto& tn : table_names_) {
ASSERT_OK(client_->DeleteTable(tn));
}
table_names_.clear();
}

void CreateTable() override {
if (!table_exists_) {
CreateTables();
table_exists_ = true;
}
}

void DeleteTable() override {
if (table_exists_) {
DeleteTables();
table_exists_ = false;
}
}

Result<bool> AreLeadersOnPreferredOnly() {
master::AreLeadersOnPreferredOnlyRequestPB req;
master::AreLeadersOnPreferredOnlyResponsePB resp;
rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromSeconds(kDefaultTimeoutMillis));
auto proxy = VERIFY_RESULT(GetMasterLeaderProxy());
RETURN_NOT_OK(proxy->AreLeadersOnPreferredOnly(req, &resp, &rpc));
return !resp.has_error();
}

Result<std::shared_ptr<master::MasterServiceProxy>> GetMasterLeaderProxy() {
int idx;
RETURN_NOT_OK(external_mini_cluster()->GetLeaderMasterIndex(&idx));
return external_mini_cluster()->master_proxy(idx);
}

std::unique_ptr<tools::enterprise::ClusterAdminClient> yb_admin_client_;
vector<client::YBTableName> table_names_;
};

TEST_F(LoadBalancerMultiTableTest, MultipleLeaderTabletMovesPerTable) {
const int test_bg_task_wait_ms = 5000;

// Start with 3 tables each with 8 tablets on 3 servers.
ASSERT_OK(yb_admin_client_->ModifyPlacementInfo("c.r.z0,c.r.z1,c.r.z2", 3, ""));

// Disable leader balancing.
for (int i = 0; i < num_masters(); ++i) {
ASSERT_OK(external_mini_cluster_->SetFlag(external_mini_cluster_->master(i),
"load_balancer_max_concurrent_moves", "0"));
}

// Add new tserver.
std::vector<std::string> extra_opts;
extra_opts.push_back("--placement_cloud=c");
extra_opts.push_back("--placement_region=r");
extra_opts.push_back("--placement_zone=z1");
ASSERT_OK(external_mini_cluster()->AddTabletServer(true, extra_opts));
ASSERT_OK(external_mini_cluster()->WaitForTabletServerCount(num_tablet_servers() + 1,
MonoDelta::FromMilliseconds(kDefaultTimeoutMillis)));

// Wait for load balancing to complete.
ASSERT_OK(WaitFor([&]() -> Result<bool> {
bool is_idle = VERIFY_RESULT(client_->IsLoadBalancerIdle());
return !is_idle;
}, MonoDelta::FromMilliseconds(kDefaultTimeoutMillis * 2), "IsLoadBalancerActive"));

ASSERT_OK(WaitFor([&]() -> Result<bool> {
return client_->IsLoadBalancerIdle();
}, MonoDelta::FromMilliseconds(kDefaultTimeoutMillis * 2), "IsLoadBalancerIdle"));

// Get current leader counts.
unordered_map<string, unordered_map<string, int>> initial_leader_counts;
for (const auto& tn : table_names_) {
initial_leader_counts[tn.table_name()] = ASSERT_RESULT(yb_admin_client_->GetLeaderCounts(tn));
}

// Enable leader balancing and also increase LB run delay.
LOG(INFO) << "Re-enabling leader balancing.";
for (int i = 0; i < num_masters(); ++i) {
ASSERT_OK(external_mini_cluster_->SetFlag(external_mini_cluster_->master(i),
"load_balancer_max_concurrent_moves", "10"));
ASSERT_OK(external_mini_cluster_->SetFlag(external_mini_cluster_->master(i),
"catalog_manager_bg_task_wait_ms",
std::to_string(test_bg_task_wait_ms)));
}

// Wait for one run of the load balancer to complete
SleepFor(MonoDelta::FromMilliseconds(test_bg_task_wait_ms));

// Check new leader counts.
int num_leader_moves = 0;
for (const auto& tn : table_names_) {
const auto new_leader_counts = ASSERT_RESULT(yb_admin_client_->GetLeaderCounts(tn));
// Only count increases in leaders
for (const auto& lc : new_leader_counts) {
num_leader_moves += max(0, lc.second - initial_leader_counts[tn.table_name()][lc.first]);
}
}

// Ensure that we moved one run's worth of leaders.
LOG(INFO) << "Moved " << num_leader_moves << " leaders in total.";
ASSERT_EQ(num_leader_moves, kMovesPerTable * kNumTables);
}

} // namespace integration_tests
} // namespace yb
2 changes: 1 addition & 1 deletion src/yb/integration-tests/yb_table_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class YBTableTestBase : public YBTest {
void CreateRedisTable(const client::YBTableName& table_name);
virtual void CreateTable();
void OpenTable();
void DeleteTable();
virtual void DeleteTable();
virtual void PutKeyValue(yb::client::YBSession* session, string key, string value);
virtual void PutKeyValue(string key, string value);
void RestartCluster();
Expand Down
8 changes: 5 additions & 3 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6295,18 +6295,20 @@ void CatalogManager::GetPendingServerTasksUnlocked(
const TableId &table_uuid,
TabletToTabletServerMap *add_replica_tasks_map,
TabletToTabletServerMap *remove_replica_tasks_map,
TabletToTabletServerMap *stepdown_leader_tasks) {
TabletToTabletServerMap *stepdown_leader_tasks_map) {

auto table = GetTableInfoUnlocked(table_uuid);
for (const auto& task : table->GetTasks()) {
TabletToTabletServerMap* outputMap = nullptr;
TabletId tablet_id;
if (task->type() == MonitoredTask::ASYNC_ADD_SERVER) {
outputMap = add_replica_tasks_map;
} else if (task->type() == MonitoredTask::ASYNC_REMOVE_SERVER) {
outputMap = remove_replica_tasks_map;
} else if (task->type() == MonitoredTask::ASYNC_TRY_STEP_DOWN) {
outputMap = stepdown_leader_tasks;
// Store new_leader_uuid instead of change_config_ts_uuid.
auto raft_task = static_cast<AsyncTryStepDown*>(task.get());
(*stepdown_leader_tasks_map)[raft_task->tablet_id()] = raft_task->new_leader_uuid();
continue;
}
if (outputMap) {
auto raft_task = static_cast<CommonInfoForRaftTask*>(task.get());
Expand Down
23 changes: 19 additions & 4 deletions src/yb/master/cluster_balance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ DEFINE_int32(load_balancer_max_concurrent_tablet_remote_bootstraps_per_table,
"number of remote bootstraps across the cluster is still limited by the flag "
"load_balancer_max_concurrent_tablet_remote_bootstraps. This flag is meant to prevent "
"a single table use all the available remote bootstrap sessions and starving other "
"tables");
"tables.");

DEFINE_int32(load_balancer_max_over_replicated_tablets,
1,
Expand All @@ -75,9 +75,17 @@ DEFINE_int32(load_balancer_max_concurrent_removals,
"load balancer.");

DEFINE_int32(load_balancer_max_concurrent_moves,
10,
"Maximum number of tablet leaders on tablet servers (across the cluster) to move in "
"any one run of the load balancer.");

DEFINE_int32(load_balancer_max_concurrent_moves_per_table,
1,
"Maximum number of tablet leaders on tablet servers to move in any one run of the "
"load balancer.");
"Maximum number of tablet leaders per table to move in any one run of the load "
"balancer. The maximum number of tablet leader moves across the cluster is still "
"limited by the flag load_balancer_max_concurrent_moves. This flag is meant to "
"prevent a single table from using all of the leader moves quota and starving "
"other tables.");

DEFINE_int32(load_balancer_num_idle_runs,
5,
Expand Down Expand Up @@ -330,7 +338,14 @@ void ClusterLoadBalancer::RunLoadBalancer(Options* options) {
}

// Handle tablet servers with too many leaders.
for ( ; remaining_leader_moves > 0; --remaining_leader_moves) {
// Check the current pending tasks per table to ensure we don't trigger the same task.
int table_remaining_leader_moves = state_->options_->kMaxConcurrentLeaderMovesPerTable;
set_remaining(state_->pending_stepdown_leader_tasks_[table.first].size(),
&table_remaining_leader_moves);
// Keep track of both the global and per table limit on number of moves.
for ( ;
remaining_leader_moves > 0 && table_remaining_leader_moves > 0;
--remaining_leader_moves, --table_remaining_leader_moves) {
auto handle_leader = HandleLeaderMoves(&out_tablet_id, &out_from_ts, &out_to_ts);
if (!handle_leader.ok()) {
LOG(WARNING) << "Skipping leader moves for " << table.first << ": "
Expand Down
10 changes: 8 additions & 2 deletions src/yb/master/cluster_balance_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ DECLARE_int32(load_balancer_max_concurrent_removals);

DECLARE_int32(load_balancer_max_concurrent_moves);

DECLARE_int32(load_balancer_max_concurrent_moves_per_table);

namespace yb {
namespace master {

Expand Down Expand Up @@ -168,7 +170,7 @@ struct Options {
// this.
int kMaxTabletRemoteBootstraps = FLAGS_load_balancer_max_concurrent_tablet_remote_bootstraps;

// Max number of tablets being remote bootstrapped for a specific tabe, if we enable limiting
// Max number of tablets being remote bootstrapped for a specific table, if we enable limiting
// this.
int kMaxTabletRemoteBootstrapsPerTable =
FLAGS_load_balancer_max_concurrent_tablet_remote_bootstraps_per_table;
Expand All @@ -186,9 +188,13 @@ struct Options {
// Max number of tablet peer replicas to add in any one run of the load balancer.
int kMaxConcurrentAdds = FLAGS_load_balancer_max_concurrent_adds;

// Max number of tablet leaders on tablet servers to move in any one run of the load balancer.
// Max number of tablet leaders on tablet servers (across the cluster) to move in any one run of
// the load balancer.
int kMaxConcurrentLeaderMoves = FLAGS_load_balancer_max_concurrent_moves;

// Max number of tablet leaders per table to move in any one run of the load balancer.
int kMaxConcurrentLeaderMovesPerTable = FLAGS_load_balancer_max_concurrent_moves_per_table;

// TODO(bogdan): add state for leaders starting remote bootstraps, to limit on that end too.
};

Expand Down
Loading

0 comments on commit ce8d77c

Please sign in to comment.