Skip to content

Commit

Permalink
feat(admin-cli): update admin-cli import depository
Browse files Browse the repository at this point in the history
use "apache/incubator-pegasus/collector" to replcae "pegasus-kv/collector"
  • Loading branch information
lupengfan1 committed Jan 8, 2025
1 parent 1091da5 commit 3d1eb58
Show file tree
Hide file tree
Showing 21 changed files with 256 additions and 307 deletions.
2 changes: 1 addition & 1 deletion admin-cli/executor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

"github.com/apache/incubator-pegasus/admin-cli/client"
"github.com/apache/incubator-pegasus/admin-cli/util"
"github.com/pegasus-kv/collector/aggregate"
"github.com/apache/incubator-pegasus/collector/aggregate"
)

// Client represents as a manager of various SDKs that
Expand Down
2 changes: 1 addition & 1 deletion admin-cli/executor/nodes_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ package executor
import (
"github.com/apache/incubator-pegasus/admin-cli/tabular"
"github.com/apache/incubator-pegasus/admin-cli/util"
"github.com/pegasus-kv/collector/aggregate"
"github.com/apache/incubator-pegasus/collector/aggregate"
)

var nodeStatsTemplate = `---
Expand Down
2 changes: 1 addition & 1 deletion admin-cli/executor/partition_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"sort"

"github.com/apache/incubator-pegasus/admin-cli/tabular"
"github.com/pegasus-kv/collector/aggregate"
"github.com/apache/incubator-pegasus/collector/aggregate"
)

var partitionStatsTemplate = `---
Expand Down
2 changes: 1 addition & 1 deletion admin-cli/executor/table_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"github.com/apache/incubator-pegasus/admin-cli/tabular"
"github.com/apache/incubator-pegasus/admin-cli/util"
"github.com/pegasus-kv/collector/aggregate"
"github.com/apache/incubator-pegasus/collector/aggregate"
)

var tableStatsTemplate = `---
Expand Down
2 changes: 1 addition & 1 deletion admin-cli/executor/toolkits/tablemigrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"github.com/apache/incubator-pegasus/admin-cli/executor"
"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
"github.com/apache/incubator-pegasus/admin-cli/util"
"github.com/apache/incubator-pegasus/collector/aggregate"
"github.com/apache/incubator-pegasus/go-client/session"
"github.com/pegasus-kv/collector/aggregate"
)

var pendingMutationThreshold = 100000.0
Expand Down
10 changes: 5 additions & 5 deletions admin-cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ module github.com/apache/incubator-pegasus/admin-cli
go 1.18

require (
github.com/apache/incubator-pegasus/collector v0.0.0-20250103031858-1091da5fbec8
github.com/apache/incubator-pegasus/go-client v0.0.0-20220617101220-e49a69d25a52
github.com/cheggaaa/pb/v3 v3.0.6
github.com/desertbit/grumble v1.1.1
github.com/dustin/go-humanize v1.0.0
github.com/go-resty/resty/v2 v2.6.0
github.com/go-zookeeper/zk v1.0.2
github.com/olekukonko/tablewriter v0.0.5
github.com/pegasus-kv/collector v0.0.0-20220526124628-023287923c32
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.8.2
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/apimachinery v0.16.13
Expand All @@ -44,7 +44,7 @@ require (
github.com/desertbit/go-shlex v0.1.1 // indirect
github.com/desertbit/readline v1.5.1 // indirect
github.com/fatih/color v1.10.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand All @@ -69,7 +69,7 @@ require (
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect
gopkg.in/yaml.v3 v3.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
311 changes: 17 additions & 294 deletions admin-cli/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion admin-cli/util/pegasus_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
"strings"
"sync"

"github.com/apache/incubator-pegasus/collector/aggregate"
"github.com/apache/incubator-pegasus/go-client/idl/base"
"github.com/apache/incubator-pegasus/go-client/session"
"github.com/pegasus-kv/collector/aggregate"
)

// PegasusNode is a representation of MetaServer and ReplicaServer.
Expand Down
2 changes: 1 addition & 1 deletion admin-cli/util/perf_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ package util
import (
"strings"

"github.com/pegasus-kv/collector/aggregate"
"github.com/apache/incubator-pegasus/collector/aggregate"
)

func GetPartitionStat(perfSession *aggregate.PerfSession, counter string) map[string]float64 {
Expand Down
12 changes: 12 additions & 0 deletions idl/replica_admin.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ struct query_replica_decree_response
2:i64 last_decree;
}

struct reset_ddd_partition_request
{
1:metadata.replica_configuration config;
2:optional bool disable_reserve = false;
}

struct reset_ddd_partition_response
{
1:dsn.error_code err;
2:string hint_message;
}

struct query_replica_info_request
{
1:dsn.rpc_address node1;
Expand Down
18 changes: 17 additions & 1 deletion src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
#include <type_traits>

#include "backup_types.h"
#include "common//duplication_common.h"
#include "common/replication_other_types.h"
#include "common/duplication_common.h"
#include "common/backup_common.h"
#include "common/bulk_load_common.h"
#include "common/gpid.h"
Expand Down Expand Up @@ -1594,6 +1595,21 @@ replication_ddl_client::ddd_diagnose(gpid pid, std::vector<ddd_partition_info> &
return dsn::ERR_OK;
}

void replication_ddl_client::ddd_reserve_replica(const partition_configuration &pc, const std::vector<dsn::host_port> &targets ,
/*out*/ std::map<dsn::host_port, error_with<reset_ddd_partition_response>> &resps, bool disable_reserve){
std::map<dsn::host_port, reset_ddd_partition_rpc> reset_ddd_partition_rpcs;
for (const auto &target : targets) {
dsn::replication::replica_configuration rconfig;
replica_helper::get_replica_config(pc, target, rconfig);
auto request = std::make_unique<reset_ddd_partition_request>();
request->config = rconfig;
request->disable_reserve = disable_reserve;
reset_ddd_partition_rpcs.emplace(target,
reset_ddd_partition_rpc(std::move(request), RPC_DDD_RESET_PARTITION));
}
call_rpcs_sync(reset_ddd_partition_rpcs, resps);
}

void replication_ddl_client::query_disk_info(
const std::vector<dsn::host_port> &targets,
const std::string &app_name,
Expand Down
3 changes: 3 additions & 0 deletions src/client/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <utility>
#include <vector>

#include "common/replication_common.h"
#include "bulk_load_types.h"
#include "dsn.layer2_types.h"
#include "duplication_types.h"
Expand Down Expand Up @@ -215,6 +216,8 @@ class replication_ddl_client

dsn::error_code ddd_diagnose(gpid pid, std::vector<ddd_partition_info> &ddd_partitions);

void ddd_reserve_replica(const partition_configuration &pc, const std::vector<dsn::host_port> &targets ,
/*out*/ std::map<dsn::host_port, error_with<reset_ddd_partition_response>> &resps, bool disable_reserve);
void
query_disk_info(const std::vector<dsn::host_port> &targets,
const std::string &app_name,
Expand Down
1 change: 1 addition & 0 deletions src/common/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ MAKE_EVENT_CODE(LPC_QUERY_CONFIGURATION_ALL, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE(LPC_MEM_RELEASE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_CREATE_CHILD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_QUERY_DISK_INFO, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_DDD_RESET_PARTITION, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_REPLICA_DISK_MIGRATE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_DETECT_HOTKEY, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_ADD_NEW_DISK, TASK_PRIORITY_COMMON)
Expand Down
3 changes: 3 additions & 0 deletions src/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class query_app_info_request;
class query_app_info_response;
class query_replica_info_request;
class query_replica_info_response;
class reset_ddd_partition_request;
class reset_ddd_partition_response;

typedef std::unordered_map<::dsn::host_port, partition_status::type> node_statuses;

Expand All @@ -53,6 +55,7 @@ typedef rpc_holder<configuration_update_app_env_request, configuration_update_ap
update_app_env_rpc;
typedef rpc_holder<query_app_info_request, query_app_info_response> query_app_info_rpc;
typedef rpc_holder<query_replica_info_request, query_replica_info_response> query_replica_info_rpc;
typedef rpc_holder<reset_ddd_partition_request,reset_ddd_partition_response> reset_ddd_partition_rpc;

class replication_options
{
Expand Down
1 change: 1 addition & 0 deletions src/meta/meta_rpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ typedef rpc_holder<ddd_diagnose_request, ddd_diagnose_response> ddd_diagnose_rpc
typedef rpc_holder<configuration_query_by_node_request, configuration_query_by_node_response>
configuration_query_by_node_rpc;
typedef rpc_holder<query_cfg_request, query_cfg_response> configuration_query_by_index_rpc;
typedef rpc_holder<query_cfg_by_gpid_request, query_cfg_by_gpid_response> configuration_query_by_gpid_rpc;
typedef rpc_holder<configuration_list_apps_request, configuration_list_apps_response>
configuration_list_apps_rpc;
typedef rpc_holder<configuration_list_nodes_request, configuration_list_nodes_response>
Expand Down
5 changes: 5 additions & 0 deletions src/meta/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,11 @@ void meta_service::on_query_cluster_info(configuration_cluster_info_rpc rpc)
response.err = dsn::ERR_OK;
}

void meta_service::on_query_configuration_by_gpid(configuration_query_by_gpid_rpc rpc)
{

}

// client => meta server
void meta_service::on_query_configuration_by_index(configuration_query_by_index_rpc rpc)
{
Expand Down
85 changes: 85 additions & 0 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,89 @@ void replica_stub::on_query_last_checkpoint(query_last_checkpoint_info_rpc rpc)
}
}

void replica_stub::on_reset_ddd_replica(dsn::replication::reset_ddd_partition_rpc rpc) {
const reset_ddd_partition_request & request = rpc.request();
reset_ddd_partition_response & response = rpc.response();

if(request.disable_reserve){
response.err = ERR_OK;
response.hint_message = fmt::format("connect replica success");
return ;
}

replica_ptr rep = get_replica(request.config.pid);
if(rep!=nullptr && (rep->status()!= partition_status::PS_INVALID &&
rep->status() != partition_status::PS_INACTIVE &&
rep->status() != partition_status::PS_ERROR)){
ddebug_f("[{}@{}]: received reset partition request, ballot = {}",
request.config.pid,
_primary_address_str,
request.config.ballot);
response.err = ERR_OPERATION_DISABLED;
response.hint_message = fmt::format("Cannot reset partition, because cur state={}", enum_to_string(rep->status()));
return ;
}

ddebug_f("[{}@{}]: received reset partition request",
request.config.pid,
_primary_address_str);

std::vector<std::string> replica_dirs;
std::string pid_str = fmt::format("{}.",request.config.pid);
for (auto &dir : _fs_manager.get_available_data_dirs()) {
std::vector<std::string> tmp_list;
if (!dsn::utils::filesystem::get_subdirectories(dir, tmp_list, false)) {
dassert(false, "Fail to get subdirectories in %s.", dir.c_str());
}
for(auto str: tmp_list){
if(str.find(pid_str)!=std::string::npos){
ddebug_f("find_replica {} reset partition",str);
replica_dirs.emplace_back(str);
}
}
}

if (replica_dirs.empty()){
response.err = ERR_OBJECT_NOT_FOUND;
response.hint_message = fmt::format("Cannot reset partition, because not find gpid({}) dir", request.config.pid);
return ;
}

auto rename_func = [](std::vector<std::string> &replica_dirs,std::string &pid_str){
for(int i=0;i<replica_dirs.size();++i){
std::string replica_name = utils::filesystem::get_file_name(replica_dirs[i]);
std::string replica_dir = utils::filesystem::remove_file_name(replica_dirs[i]);
if (replica_dirs[i].find(kFolderSuffixRet) != std::string::npos){
continue;
}
// reset target_replica_dir: /replica_path/gpid.app_type.{err\bak}.{num}.res
auto target_replica_dir = fmt::format("{}/{}.{}{}", replica_dir,replica_name,i, kFolderSuffixRet);
if (utils::filesystem::directory_exists(target_replica_dir)) {
ddebug_f("disk reset(origin={}) target replica dir({}) has existed, delete it now", replica_dirs[i].c_str(),
target_replica_dir.c_str());
utils::filesystem::remove_path(target_replica_dir);
}
ddebug_f("reset partition, rename dir ({}) to ({})", replica_dirs[i].c_str(),
target_replica_dir.c_str());
utils::filesystem::rename_path(replica_dirs[i],target_replica_dir);
}
};

if(rep!= nullptr){
ddebug_f("start close replica");
rep->disk_migrator()->set_status(disk_migration_status::MOVED);
auto task = begin_close_replica(rep,true);
task->wait();
_fs_manager.remove_replica(request.config.pid);
rename_func(replica_dirs,pid_str);
}else {
rename_func(replica_dirs,pid_str);
}

response.err = ERR_OK;
response.hint_message = fmt::format("Path {}",replica_dirs.back());
}

// ThreadPool: THREAD_POOL_DEFAULT
void replica_stub::on_query_disk_info(query_disk_info_rpc rpc)
{
Expand Down Expand Up @@ -2502,6 +2585,8 @@ void replica_stub::open_service()
&replica_stub::on_query_last_checkpoint);
register_rpc_handler_with_rpc_holder(
RPC_QUERY_DISK_INFO, "query_disk_info", &replica_stub::on_query_disk_info);
register_rpc_handler_with_rpc_holder(
RPC_DDD_RESET_PARTITION, "reset_dead_partition", &replica_stub::on_reset_ddd_replica);
register_rpc_handler_with_rpc_holder(
RPC_REPLICA_DISK_MIGRATE, "disk_migrate_replica", &replica_stub::on_disk_migrate);
register_rpc_handler_with_rpc_holder(
Expand Down
2 changes: 2 additions & 0 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
gpid &pid,
std::string &hint_message);

void on_reset_ddd_replica(reset_ddd_partition_rpc rpc);

private:
enum replica_node_state
{
Expand Down
2 changes: 2 additions & 0 deletions src/shell/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ bool recover(command_executor *e, shell_context *sc, arguments args);

bool ddd_diagnose(command_executor *e, shell_context *sc, arguments args);

bool ddd_reserve(command_executor *e, shell_context *sc, arguments args);

// == cold backup (see 'commands/cold_backup.cpp') == //

bool add_backup_policy(command_executor *e, shell_context *sc, arguments args);
Expand Down
Loading

0 comments on commit 3d1eb58

Please sign in to comment.