From b62189ad11cbe7ca4e55dc229b165eac4fd730cb Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Mon, 2 Dec 2024 14:58:50 +0800 Subject: [PATCH 1/7] refactor: support close admin-cli client https://github.com/apache/incubator-pegasus/issues/2160 Current admin-cli client does not provide an interface to close replica server TCP connections. And the QueryAllNodesDiskInfo function is difficult to use. SendQueryDiskInfoRequest function is private. --- admin-cli/executor/client.go | 45 +++++++++++++++++++++++++++++++++ admin-cli/executor/disk_info.go | 33 +++++++++++++++++++++--- admin-cli/util/pegasus_node.go | 22 ++++++++++++++++ 3 files changed, 97 insertions(+), 3 deletions(-) diff --git a/admin-cli/executor/client.go b/admin-cli/executor/client.go index b41c7189f0..edaee5504b 100644 --- a/admin-cli/executor/client.go +++ b/admin-cli/executor/client.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "os" + "strings" "github.com/apache/incubator-pegasus/admin-cli/client" "github.com/apache/incubator-pegasus/admin-cli/util" @@ -45,6 +46,7 @@ type Client struct { } // NewClient creates a client for accessing Pegasus cluster for use of admin-cli. +// This function will call os.Exit. func NewClient(writer io.Writer, metaAddrs []string) *Client { meta := client.NewRPCBasedMeta(metaAddrs) @@ -67,3 +69,46 @@ func NewClient(writer io.Writer, metaAddrs []string) *Client { Perf: aggregate.NewPerfClient(metaAddrs), } } + +// NewClientWithoutExit creates a client for accessing Pegasus cluster for use of admin-cli. +// This function will not call os.Exit. +func NewClientWithoutExit(writer io.Writer, metaAddrs []string) (*Client, error) { + meta := client.NewRPCBasedMeta(metaAddrs) + + nodes, err := meta.ListNodes() + if err != nil { + fmt.Fprintf(writer, "fatal: failed to list nodes [%s]\n", err) + return nil, fmt.Errorf("fatal: failed to list nodes [%s]", err) + } + + var replicaAddrs []string + for _, node := range nodes { + replicaAddrs = append(replicaAddrs, node.Address.GetAddress()) + } + + return &Client{ + Writer: writer, + Meta: meta, + Nodes: util.NewPegasusNodeManager(metaAddrs, replicaAddrs), + Perf: aggregate.NewPerfClient(metaAddrs), + }, nil +} + +func CloseClient(writer io.Writer, client *Client) error { + var errorStrings []string + err := client.Meta.Close() + if err != nil { + fmt.Fprintf(writer, "fatal: failed to close meta session [%s]\n", err) + errorStrings = append(errorStrings, err.Error()) + } + + client.Perf.Close() + + err = client.Nodes.CloseAllNodes() + if err != nil { + fmt.Fprintf(writer, "fatal: failed to close nodes session [%s]\n", err) + errorStrings = append(errorStrings, err.Error()) + } + + return fmt.Errorf("%s", strings.Join(errorStrings, "\n")) +} diff --git a/admin-cli/executor/disk_info.go b/admin-cli/executor/disk_info.go index a7793bb4d1..09c3e08eef 100644 --- a/admin-cli/executor/disk_info.go +++ b/admin-cli/executor/disk_info.go @@ -22,10 +22,12 @@ package executor import ( "context" "fmt" + "strings" "time" "github.com/apache/incubator-pegasus/admin-cli/tabular" "github.com/apache/incubator-pegasus/admin-cli/util" + "github.com/apache/incubator-pegasus/go-client/idl/admin" "github.com/apache/incubator-pegasus/go-client/idl/base" "github.com/apache/incubator-pegasus/go-client/idl/radmin" "github.com/apache/incubator-pegasus/go-client/session" @@ -45,7 +47,7 @@ func QueryDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, } func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, tableName string, diskTag string, print bool) ([]interface{}, error) { - resp, err := sendQueryDiskInfoRequest(client, replicaServer, tableName) + resp, err := SendQueryDiskInfoRequest(client, replicaServer, tableName) if err != nil { return nil, err } @@ -60,7 +62,7 @@ func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, ta } } -func sendQueryDiskInfoRequest(client *Client, replicaServer string, tableName string) (*radmin.QueryDiskInfoResponse, error) { +func SendQueryDiskInfoRequest(client *Client, replicaServer string, tableName string) (*radmin.QueryDiskInfoResponse, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -88,7 +90,7 @@ func QueryAllNodesDiskInfo(client *Client, tableName string) (map[string]*radmin } for _, nodeInfo := range nodeInfos { address := nodeInfo.GetAddress().GetAddress() - resp, err := sendQueryDiskInfoRequest(client, address, tableName) + resp, err := SendQueryDiskInfoRequest(client, address, tableName) if err != nil { return respMap, err } @@ -97,6 +99,31 @@ func QueryAllNodesDiskInfo(client *Client, tableName string) (map[string]*radmin return respMap, nil } +func QueryAliveNodesDiskInfo(client *Client, tableName string) (map[string]*radmin.QueryDiskInfoResponse, error) { + respMap := make(map[string]*radmin.QueryDiskInfoResponse) + nodeInfos, err := client.Meta.ListNodes() + if err != nil { + return respMap, err + } + for _, nodeInfo := range nodeInfos { + if nodeInfo.Status != admin.NodeStatus_NS_ALIVE { + continue + } + address := nodeInfo.GetAddress().GetAddress() + resp, err := SendQueryDiskInfoRequest(client, address, tableName) + if err != nil { + // this replica server haven't the table partition. + if strings.Contains(err.Error(), "ERR_OBJECT_NOT_FOUND") { + continue + } else { + return respMap, err + } + } + respMap[address] = resp + } + return respMap, nil +} + type DiskCapacityStruct struct { Disk string `json:"disk"` Capacity int64 `json:"capacity"` diff --git a/admin-cli/util/pegasus_node.go b/admin-cli/util/pegasus_node.go index 5f7320820c..cd856ce843 100644 --- a/admin-cli/util/pegasus_node.go +++ b/admin-cli/util/pegasus_node.go @@ -84,6 +84,14 @@ func (n *PegasusNode) RPCAddress() *base.RPCAddress { return base.NewRPCAddress(n.IP, n.Port) } +func (n *PegasusNode) Close() error { + if n.session != nil { + return n.session.Close() + } else { + return nil + } +} + // NewNodeFromTCPAddr creates a node from tcp address. // NOTE: // - Will not initialize TCP connection unless needed. @@ -211,3 +219,17 @@ func (m *PegasusNodeManager) GetPerfSession(addr string, ntype session.NodeType) return aggregate.WrapPerf(addr, node.session) } + +func (m *PegasusNodeManager) CloseAllNodes() error { + var errorStrings []string + for _, n := range m.nodes { + err := n.Close() + if err != nil { + errorStrings = append(errorStrings, err.Error()) + } + } + if len(errorStrings) != 0 { + return fmt.Errorf("%s", strings.Join(errorStrings, "\n")) + } + return nil +} From caf9a1553c1311c1f4cd42c4a9a113a2241e4a5b Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Tue, 3 Dec 2024 10:58:49 +0800 Subject: [PATCH 2/7] follow golang-lint --- admin-cli/util/pegasus_node.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/admin-cli/util/pegasus_node.go b/admin-cli/util/pegasus_node.go index cd856ce843..e0554008f3 100644 --- a/admin-cli/util/pegasus_node.go +++ b/admin-cli/util/pegasus_node.go @@ -87,9 +87,8 @@ func (n *PegasusNode) RPCAddress() *base.RPCAddress { func (n *PegasusNode) Close() error { if n.session != nil { return n.session.Close() - } else { - return nil } + return nil } // NewNodeFromTCPAddr creates a node from tcp address. From 764cafb7ac0ead7ec3f24bae52c50b056833fcd1 Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Fri, 6 Dec 2024 11:00:10 +0800 Subject: [PATCH 3/7] follow acelyc111 advice --- admin-cli/cmd/init.go | 2 +- admin-cli/executor/client.go | 35 ++++--------------- .../toolkits/tablemigrator/switcher.go | 3 +- 3 files changed, 10 insertions(+), 30 deletions(-) diff --git a/admin-cli/cmd/init.go b/admin-cli/cmd/init.go index f910d67eb7..50fcb28395 100644 --- a/admin-cli/cmd/init.go +++ b/admin-cli/cmd/init.go @@ -34,5 +34,5 @@ var pegasusClient *executor.Client func Init(metaList []string) { globalMetaList = metaList - pegasusClient = executor.NewClient(os.Stdout, globalMetaList) + pegasusClient, _ = executor.NewClient(os.Stdout, globalMetaList, true) } diff --git a/admin-cli/executor/client.go b/admin-cli/executor/client.go index edaee5504b..f94deed0e5 100644 --- a/admin-cli/executor/client.go +++ b/admin-cli/executor/client.go @@ -46,39 +46,18 @@ type Client struct { } // NewClient creates a client for accessing Pegasus cluster for use of admin-cli. -// This function will call os.Exit. -func NewClient(writer io.Writer, metaAddrs []string) *Client { +// When listing nodes fails, willExit == true means call os.Exit(). +func NewClient(writer io.Writer, metaAddrs []string, willExit bool) (*Client, error) { meta := client.NewRPCBasedMeta(metaAddrs) // TODO(wutao): initialize replica-nodes lazily nodes, err := meta.ListNodes() if err != nil { - fmt.Fprintf(writer, "fatal: failed to list nodes [%s]\n", err) - os.Exit(1) - } - - var replicaAddrs []string - for _, node := range nodes { - replicaAddrs = append(replicaAddrs, node.Address.GetAddress()) - } - - return &Client{ - Writer: writer, - Meta: meta, - Nodes: util.NewPegasusNodeManager(metaAddrs, replicaAddrs), - Perf: aggregate.NewPerfClient(metaAddrs), - } -} - -// NewClientWithoutExit creates a client for accessing Pegasus cluster for use of admin-cli. -// This function will not call os.Exit. -func NewClientWithoutExit(writer io.Writer, metaAddrs []string) (*Client, error) { - meta := client.NewRPCBasedMeta(metaAddrs) - - nodes, err := meta.ListNodes() - if err != nil { - fmt.Fprintf(writer, "fatal: failed to list nodes [%s]\n", err) - return nil, fmt.Errorf("fatal: failed to list nodes [%s]", err) + fmt.Printf("Error: failed to list nodes [%s]\n", err) + if willExit { + os.Exit(1) + } + return nil, fmt.Errorf("failed to list nodes [%s]", err) } var replicaAddrs []string diff --git a/admin-cli/executor/toolkits/tablemigrator/switcher.go b/admin-cli/executor/toolkits/tablemigrator/switcher.go index 76c223a98d..41f2be6d05 100644 --- a/admin-cli/executor/toolkits/tablemigrator/switcher.go +++ b/admin-cli/executor/toolkits/tablemigrator/switcher.go @@ -59,7 +59,8 @@ func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, tabl originMeta := client.Meta targetAddrList := strings.Split(targetAddrs, ",") - targetMeta := executor.NewClient(os.Stdout, targetAddrList).Meta + pegasusClient, _ := executor.NewClient(os.Stdout, targetAddrList, true) + targetMeta := pegasusClient.Meta env := map[string]string{ "replica.deny_client_request": "reconfig*all", } From b8a06c3b7318916e17799f8eea15762a6de5ee7b Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Tue, 10 Dec 2024 09:43:02 +0800 Subject: [PATCH 4/7] follow acelyc111 advice --- admin-cli/executor/disk_info.go | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/admin-cli/executor/disk_info.go b/admin-cli/executor/disk_info.go index 09c3e08eef..83c49511d9 100644 --- a/admin-cli/executor/disk_info.go +++ b/admin-cli/executor/disk_info.go @@ -22,12 +22,10 @@ package executor import ( "context" "fmt" - "strings" "time" "github.com/apache/incubator-pegasus/admin-cli/tabular" "github.com/apache/incubator-pegasus/admin-cli/util" - "github.com/apache/incubator-pegasus/go-client/idl/admin" "github.com/apache/incubator-pegasus/go-client/idl/base" "github.com/apache/incubator-pegasus/go-client/idl/radmin" "github.com/apache/incubator-pegasus/go-client/session" @@ -99,31 +97,6 @@ func QueryAllNodesDiskInfo(client *Client, tableName string) (map[string]*radmin return respMap, nil } -func QueryAliveNodesDiskInfo(client *Client, tableName string) (map[string]*radmin.QueryDiskInfoResponse, error) { - respMap := make(map[string]*radmin.QueryDiskInfoResponse) - nodeInfos, err := client.Meta.ListNodes() - if err != nil { - return respMap, err - } - for _, nodeInfo := range nodeInfos { - if nodeInfo.Status != admin.NodeStatus_NS_ALIVE { - continue - } - address := nodeInfo.GetAddress().GetAddress() - resp, err := SendQueryDiskInfoRequest(client, address, tableName) - if err != nil { - // this replica server haven't the table partition. - if strings.Contains(err.Error(), "ERR_OBJECT_NOT_FOUND") { - continue - } else { - return respMap, err - } - } - respMap[address] = resp - } - return respMap, nil -} - type DiskCapacityStruct struct { Disk string `json:"disk"` Capacity int64 `json:"capacity"` From 52ee381c0543ccd5b4a6c121cbbf6dd0f27aa402 Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Fri, 13 Dec 2024 14:29:14 +0800 Subject: [PATCH 5/7] remove use fmt.Fprintf to print error in CloseClient --- admin-cli/executor/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/admin-cli/executor/client.go b/admin-cli/executor/client.go index f94deed0e5..be41f858c1 100644 --- a/admin-cli/executor/client.go +++ b/admin-cli/executor/client.go @@ -73,11 +73,11 @@ func NewClient(writer io.Writer, metaAddrs []string, willExit bool) (*Client, er }, nil } -func CloseClient(writer io.Writer, client *Client) error { +func CloseClient(client *Client) error { var errorStrings []string err := client.Meta.Close() if err != nil { - fmt.Fprintf(writer, "fatal: failed to close meta session [%s]\n", err) + fmt.Printf("Error: failed to close meta session [%s]\n", err) errorStrings = append(errorStrings, err.Error()) } @@ -85,7 +85,7 @@ func CloseClient(writer io.Writer, client *Client) error { err = client.Nodes.CloseAllNodes() if err != nil { - fmt.Fprintf(writer, "fatal: failed to close nodes session [%s]\n", err) + fmt.Printf("Error: failed to close nodes session [%s]\n", err) errorStrings = append(errorStrings, err.Error()) } From 6af31cc87728d402855c696ff5afc555765e7211 Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Tue, 31 Dec 2024 14:19:18 +0800 Subject: [PATCH 6/7] follow empiredan advice --- admin-cli/executor/client.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/admin-cli/executor/client.go b/admin-cli/executor/client.go index be41f858c1..79cea9e5d6 100644 --- a/admin-cli/executor/client.go +++ b/admin-cli/executor/client.go @@ -88,6 +88,8 @@ func CloseClient(client *Client) error { fmt.Printf("Error: failed to close nodes session [%s]\n", err) errorStrings = append(errorStrings, err.Error()) } - - return fmt.Errorf("%s", strings.Join(errorStrings, "\n")) + if len(errorStrings) != 0 { + return fmt.Errorf("%s", strings.Join(errorStrings, "\n")) + } + return nil } From c40ebcf8b27a00befebf1a065a7ad0c635f3405e Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Tue, 31 Dec 2024 14:30:40 +0800 Subject: [PATCH 7/7] follow golang lint --- admin-cli/executor/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/admin-cli/executor/client.go b/admin-cli/executor/client.go index 79cea9e5d6..62998177a2 100644 --- a/admin-cli/executor/client.go +++ b/admin-cli/executor/client.go @@ -77,7 +77,7 @@ func CloseClient(client *Client) error { var errorStrings []string err := client.Meta.Close() if err != nil { - fmt.Printf("Error: failed to close meta session [%s]\n", err) + fmt.Printf("Error: failed to close meta session [%s].\n", err) errorStrings = append(errorStrings, err.Error()) } @@ -85,7 +85,7 @@ func CloseClient(client *Client) error { err = client.Nodes.CloseAllNodes() if err != nil { - fmt.Printf("Error: failed to close nodes session [%s]\n", err) + fmt.Printf("Error: failed to close nodes session [%s].\n", err) errorStrings = append(errorStrings, err.Error()) } if len(errorStrings) != 0 {