This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: add function to get cluster migration info (#866)
levy5307 authored Aug 2, 2021
1 parent 8530b9f commit 4bddefe
Showing 5 changed files with 422 additions and 11 deletions.
161 changes: 154 additions & 7 deletions src/meta/greedy_load_balancer.cpp
@@ -39,6 +39,44 @@ namespace replication {
DSN_DEFINE_bool("meta_server", balance_cluster, false, "whether to enable cluster balancer");
DSN_TAG_VARIABLE(balance_cluster, FT_MUTABLE);

uint32_t get_count(const node_state &ns, cluster_balance_type type, int32_t app_id)
{
uint32_t count = 0;
switch (type) {
case cluster_balance_type::Secondary:
if (app_id > 0) {
count = ns.partition_count(app_id) - ns.primary_count(app_id);
} else {
count = ns.partition_count() - ns.primary_count();
}
break;
case cluster_balance_type::Primary:
if (app_id > 0) {
count = ns.primary_count(app_id);
} else {
count = ns.primary_count();
}
break;
default:
break;
}
return count;
}

uint32_t get_skew(const std::map<rpc_address, uint32_t> &count_map)
{
uint32_t min = UINT_MAX, max = 0;
for (const auto &kv : count_map) {
if (kv.second < min) {
min = kv.second;
}
if (kv.second > max) {
max = kv.second;
}
}
return max - min;
}
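
To make these helpers concrete: get_count fills one entry of such a count map per node, and get_skew reports the gap between the most and least loaded node. A minimal sketch with made-up addresses and counts (illustrative only, not part of this commit; it assumes rpc_address's (host, port) constructor, as used elsewhere in rDSN):

// Three nodes hold 5, 7 and 12 replicas of interest, so the skew is 12 - 5 = 7.
std::map<rpc_address, uint32_t> counts;
counts[rpc_address("10.0.0.1", 34801)] = 5;
counts[rpc_address("10.0.0.2", 34801)] = 7;
counts[rpc_address("10.0.0.3", 34801)] = 12;
uint32_t skew = get_skew(counts); // == 7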

greedy_load_balancer::greedy_load_balancer(meta_service *_svc)
: simple_load_balancer(_svc),
_ctrl_balancer_ignored_apps(nullptr),
@@ -941,17 +979,16 @@ void greedy_load_balancer::balance_cluster()
}
}

-bool need_continue = cluster_replica_balance(cluster_balance_type::Secondary);
+bool need_continue = cluster_replica_balance(t_global_view, cluster_balance_type::Secondary);
if (!need_continue) {
return;
}

// TODO(zlw): copy primary
}

-bool greedy_load_balancer::cluster_replica_balance(const cluster_balance_type type)
+bool greedy_load_balancer::cluster_replica_balance(const meta_view *global_view,
+                                                   const cluster_balance_type type)
{
-bool enough_information = do_cluster_replica_balance(type);
+bool enough_information = do_cluster_replica_balance(global_view, type);
if (!enough_information) {
return false;
}
@@ -963,10 +1000,120 @@ bool greedy_load_balancer::cluster_replica_balance(const cluster_balance_type ty
return true;
}

-bool greedy_load_balancer::do_cluster_replica_balance(const cluster_balance_type type)
+bool greedy_load_balancer::do_cluster_replica_balance(const meta_view *global_view,
+                                                      const cluster_balance_type type)
{
-return true;
cluster_migration_info cluster_info;
if (!get_cluster_migration_info(global_view, type, cluster_info)) {
return false;
}

/// TBD(zlw)
return true;
}

bool greedy_load_balancer::get_cluster_migration_info(const meta_view *global_view,
const cluster_balance_type type,
/*out*/ cluster_migration_info &cluster_info)
{
const node_mapper &nodes = *global_view->nodes;
if (nodes.size() < 3) {
return false;
}
for (const auto &kv : nodes) {
if (!all_replica_infos_collected(kv.second)) {
return false;
}
}

const app_mapper &all_apps = *global_view->apps;
app_mapper apps;
for (const auto &kv : all_apps) {
const std::shared_ptr<app_state> &app = kv.second;
if (is_ignored_app(app->app_id) || app->is_bulk_loading || app->splitting()) {
return false;
}
if (app->status == app_status::AS_AVAILABLE) {
apps[app->app_id] = app;
}
}

for (const auto &kv : apps) {
std::shared_ptr<app_state> app = kv.second;
app_migration_info info;
if (!get_app_migration_info(app, nodes, type, info)) {
return false;
}
cluster_info.apps_info.emplace(kv.first, std::move(info));
cluster_info.apps_skew[kv.first] = get_skew(info.replicas_count);
}

for (const auto &kv : nodes) {
const node_state &ns = kv.second;
node_migration_info info;
get_node_migration_info(ns, apps, info);
cluster_info.nodes_info.emplace(kv.first, std::move(info));

// a non-positive app_id makes get_count() count replicas across all apps
auto count = get_count(ns, type, -1);
cluster_info.replicas_count[kv.first] = count;
}

cluster_info.type = type;
return true;
}

bool greedy_load_balancer::get_app_migration_info(std::shared_ptr<app_state> app,
const node_mapper &nodes,
const cluster_balance_type type,
app_migration_info &info)
{
info.app_id = app->app_id;
info.app_name = app->app_name;
info.partitions.resize(app->partitions.size());
for (size_t i = 0; i < app->partitions.size(); ++i) {
std::map<rpc_address, partition_status::type> pstatus_map;
pstatus_map[app->partitions[i].primary] = partition_status::PS_PRIMARY;
if (app->partitions[i].secondaries.size() != app->partitions[i].max_replica_count - 1) {
// partition is unhealthy
return false;
}
for (const auto &addr : app->partitions[i].secondaries) {
pstatus_map[addr] = partition_status::PS_SECONDARY;
}
info.partitions[i] = pstatus_map;
}

for (const auto &it : nodes) {
const node_state &ns = it.second;
auto count = get_count(ns, type, app->app_id);
info.replicas_count[ns.addr()] = count;
}

return true;
}
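
The health rule above is strict: an app only qualifies when every partition has a primary plus exactly max_replica_count - 1 secondaries. A hypothetical standalone illustration (field names are from rDSN's partition_configuration; the snippet itself is not part of this commit):

partition_configuration pc;
pc.max_replica_count = 3;
pc.primary = rpc_address("10.0.0.1", 34801);       // made-up addresses
pc.secondaries = {rpc_address("10.0.0.2", 34801)}; // one secondary is missing
// 1 secondary != 3 - 1, so get_app_migration_info would return false for the
// whole app and this balance round would give up.
bool healthy =
    static_cast<int32_t>(pc.secondaries.size()) == pc.max_replica_count - 1; // false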

void greedy_load_balancer::get_node_migration_info(const node_state &ns,
const app_mapper &apps,
/*out*/ node_migration_info &info)
{
info.address = ns.addr();
for (const auto &iter : apps) {
std::shared_ptr<app_state> app = iter.second;
for (const auto &context : app->helpers->contexts) {
std::string disk_tag;
if (!context.get_disk_tag(ns.addr(), disk_tag)) {
continue;
}
auto pid = context.config_owner->pid;
// std::map::operator[] default-constructs an empty partition_set on first
// access, so a plain insert covers both the new and the existing disk_tag
info.partitions[disk_tag].insert(pid);
}
}
}
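
get_node_migration_info thus groups a node's serving replicas by disk tag. A self-contained sketch of the same map-of-sets idiom with plain standard types (names are illustrative, not the real rDSN types):

#include <cassert>
#include <map>
#include <set>
#include <string>
#include <utility>

int main()
{
    // (app_id, partition_index) stands in for dsn::gpid here.
    using part_id = std::pair<int, int>;
    std::map<std::string, std::set<part_id>> by_disk;
    // operator[] default-constructs the empty set on first access,
    // so no find-before-insert is needed.
    by_disk["ssd1"].insert({1, 0});
    by_disk["ssd2"].insert({1, 1});
    by_disk["ssd2"].insert({2, 0});
    assert(by_disk.at("ssd2").size() == 2);
    return 0;
}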

bool greedy_load_balancer::balance(meta_view view, migration_list &list)
80 changes: 78 additions & 2 deletions src/meta/greedy_load_balancer.h
@@ -50,6 +50,9 @@ ENUM_REG(cluster_balance_type::Primary)
ENUM_REG(cluster_balance_type::Secondary)
ENUM_END(cluster_balance_type)

uint32_t get_count(const node_state &ns, cluster_balance_type type, int32_t app_id);
uint32_t get_skew(const std::map<rpc_address, uint32_t> &count_map);

class greedy_load_balancer : public simple_load_balancer
{
public:
@@ -150,9 +153,75 @@ class greedy_load_balancer : public simple_load_balancer

void balance_cluster();

-bool cluster_replica_balance(const cluster_balance_type type);
+bool cluster_replica_balance(const meta_view *global_view, const cluster_balance_type type);

bool do_cluster_replica_balance(const meta_view *global_view, const cluster_balance_type type);

struct app_migration_info
{
int32_t app_id;
std::string app_name;
std::vector<std::map<rpc_address, partition_status::type>> partitions;
std::map<rpc_address, uint32_t> replicas_count;
bool operator<(const app_migration_info &another) const
{
return app_id < another.app_id;
}
bool operator==(const app_migration_info &another) const
{
return app_id == another.app_id;
}
partition_status::type get_partition_status(int32_t pidx, rpc_address addr)
{
for (const auto &kv : partitions[pidx]) {
if (kv.first == addr) {
return kv.second;
}
}
return partition_status::PS_INACTIVE;
}
};
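
How get_partition_status behaves, as a short hypothetical sketch (made-up values; PS_INACTIVE is the fallback for a node that serves no replica of the partition):

app_migration_info info;
info.partitions.resize(1); // a single partition
rpc_address n1("10.0.0.1", 34801), n2("10.0.0.2", 34801); // made-up nodes
info.partitions[0][n1] = partition_status::PS_PRIMARY;
auto s1 = info.get_partition_status(0, n1); // PS_PRIMARY
auto s2 = info.get_partition_status(0, n2); // PS_INACTIVE: n2 holds no replica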

struct node_migration_info
{
rpc_address address;
std::map<std::string, partition_set> partitions;
partition_set future_partitions;
bool operator<(const node_migration_info &another) const
{
return address < another.address;
}
bool operator==(const node_migration_info &another) const
{
return address == another.address;
}
};

struct cluster_migration_info
{
cluster_balance_type type;
std::map<int32_t, uint32_t> apps_skew;
std::map<int32_t, app_migration_info> apps_info;
std::map<rpc_address, node_migration_info> nodes_info;
std::map<rpc_address, uint32_t> replicas_count;
};
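
cluster_migration_info bundles everything one balance round needs: the balance type, per-app skew, per-app and per-node placement, and per-node replica totals. The balancer logic that consumes it is still TBD in this commit; one plausible first step (purely hypothetical, assuming <algorithm> is included and apps_skew is non-empty) would be to pick the app with the largest skew:

auto most_skewed = std::max_element(
    cluster_info.apps_skew.begin(),
    cluster_info.apps_skew.end(),
    [](const std::pair<const int32_t, uint32_t> &l,
       const std::pair<const int32_t, uint32_t> &r) { return l.second < r.second; });
int32_t candidate_app_id = most_skewed->first; // balance this app first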

-bool do_cluster_replica_balance(const cluster_balance_type type);
bool get_cluster_migration_info(const meta_view *global_view,
const cluster_balance_type type,
/*out*/ cluster_migration_info &cluster_info);

bool get_app_migration_info(std::shared_ptr<app_state> app,
const node_mapper &nodes,
const cluster_balance_type type,
/*out*/ app_migration_info &info);

void get_node_migration_info(const node_state &ns,
const app_mapper &all_apps,
/*out*/ node_migration_info &info);

bool all_replica_infos_collected(const node_state &ns);
// uses t_global_view to get the disk_tag of a node's pid
@@ -178,6 +247,13 @@ class greedy_load_balancer : public simple_load_balancer
std::string clear_balancer_ignored_app_ids();

bool is_ignored_app(app_id app_id);

FRIEND_TEST(greedy_load_balancer, app_migration_info);
FRIEND_TEST(greedy_load_balancer, node_migration_info);
FRIEND_TEST(greedy_load_balancer, get_skew);
FRIEND_TEST(greedy_load_balancer, get_count);
FRIEND_TEST(greedy_load_balancer, get_app_migration_info);
FRIEND_TEST(greedy_load_balancer, get_node_migration_info);
};

inline configuration_proposal_action
14 changes: 12 additions & 2 deletions src/meta/meta_data.cpp
@@ -357,6 +357,16 @@ void config_context::adjust_proposal(const rpc_address &node, const replica_info
lb_actions.track_current_learner(node, info);
}

bool config_context::get_disk_tag(const rpc_address &node, /*out*/ std::string &disk_tag) const
{
auto iter = find_from_serving(node);
if (iter == serving.end()) {
return false;
}
disk_tag = iter->disk_tag;
return true;
}
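
A caller-side sketch (hypothetical locals cc and node_addr; the real caller is get_node_migration_info in greedy_load_balancer.cpp):

std::string disk_tag;
if (cc.get_disk_tag(node_addr, disk_tag)) {
    // node_addr serves this partition from the disk named in disk_tag
} else {
    // node_addr holds no serving replica of this partition; skip it
}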

void app_state_helper::on_init_partitions()
{
config_context context;
@@ -546,5 +556,5 @@ partition_status::type node_state::served_as(const gpid &pid) const
return partition_status::PS_SECONDARY;
return partition_status::PS_INACTIVE;
}
-}
-}
+} // namespace replication
+} // namespace dsn
2 changes: 2 additions & 0 deletions src/meta/meta_data.h
@@ -245,6 +245,8 @@ class config_context

void adjust_proposal(const dsn::rpc_address &node, const replica_info &info);

bool get_disk_tag(const rpc_address &node, /*out*/ std::string &disk_tag) const;

public:
// initialized to 4 statically,
// and will be set by the load-balancer module
