Skip to content

Commit

Permalink
support api
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <ihusharp@gmail.com>
  • Loading branch information
HuSharp committed Jul 12, 2024
1 parent 4d63f95 commit 33292f0
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 26 deletions.
15 changes: 9 additions & 6 deletions pkg/controller/pd_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,29 @@ func GetPDClient(pdControl pdapi.PDControlInterface, tc *v1alpha1.TidbCluster) p
}

// GetPDMSClient tries to return an available PDMSClient
func GetPDMSClient(pdControl pdapi.PDControlInterface, tc *v1alpha1.TidbCluster, serviceName string) error {
func GetPDMSClient(pdControl pdapi.PDControlInterface, tc *v1alpha1.TidbCluster, serviceName string) pdapi.PDMSClient {
pdMSClient := getPDMSClientFromService(pdControl, tc, serviceName)

err := pdMSClient.GetHealth()
if err == nil {
return nil
return pdMSClient
}

for _, service := range tc.Status.PDMS {
if service.Name != serviceName {
continue
}
for _, pdMember := range service.Members {
pdPeerClient := pdControl.GetPDMSClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), serviceName,
pdMSPeerClient := pdControl.GetPDMSClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), serviceName,
tc.IsTLSClusterEnabled(), pdapi.SpecifyClient(pdMember, pdMember))
err = pdPeerClient.GetHealth()
err = pdMSPeerClient.GetHealth()
if err == nil {
return nil
return pdMSPeerClient
}
}
}

return err
return nil
}

// NewFakePDClient creates a fake pdclient that is set as the pd client
Expand Down
114 changes: 97 additions & 17 deletions pkg/manager/member/pd_ms_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,23 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS status is nil, can not to be upgraded", ns, tcName)
}

componentName := controller.PDMSTrimName(newSet.Name)
klog.Infof("gracefulUpgrade pdMS trim name, componentName: %s", componentName)
if tc.Status.PDMS[componentName] == nil {
tc.Status.PDMS[componentName] = &v1alpha1.PDMSStatus{Name: componentName}
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS component is nil, can not to be upgraded, component: %s", ns, tcName, componentName)
curService := controller.PDMSTrimName(newSet.Name)
klog.Infof("gracefulUpgrade pdMS trim name, componentName: %s", curService)
if tc.Status.PDMS[curService] == nil {
tc.Status.PDMS[curService] = &v1alpha1.PDMSStatus{Name: curService}
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS component is nil, can not to be upgraded, component: %s", ns, tcName, curService)
}
if !tc.Status.PDMS[componentName].Synced {
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS status sync failed, can not to be upgraded, component: %s", ns, tcName, componentName)
if !tc.Status.PDMS[curService].Synced {
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS status sync failed, can not to be upgraded, component: %s", ns, tcName, curService)
}
oldTrimName := controller.PDMSTrimName(oldSet.Name)
if oldTrimName != componentName {
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS oldTrimName is %s, not equal to componentName: %s", ns, tcName, oldTrimName, componentName)
if oldTrimName != curService {
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS oldTrimName is %s, not equal to componentName: %s", ns, tcName, oldTrimName, curService)
}
klog.Infof("gracefulUpgrade pdMS trim name, oldTrimName: %s", oldTrimName)
if tc.PDMSScaling(oldTrimName) {
klog.Infof("TidbCluster: [%s/%s]'s pdMS status is %v, can not upgrade pdMS",
ns, tcName, tc.Status.PDMS[componentName].Phase)
ns, tcName, tc.Status.PDMS[curService].Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
return err
Expand All @@ -73,7 +73,7 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
return nil
}

tc.Status.PDMS[componentName].Phase = v1alpha1.UpgradePhase
tc.Status.PDMS[curService].Phase = v1alpha1.UpgradePhase
if !templateEqual(newSet, oldSet) {
return nil
}
Expand All @@ -84,12 +84,20 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
// If we encounter this situation, we will let the native statefulset controller do the upgrade completely, which may be unsafe for upgrading pdMS.
// Therefore, in the production environment, we should try to avoid modifying the pd statefulset update strategy directly.
newSet.Spec.UpdateStrategy = oldSet.Spec.UpdateStrategy
klog.Warningf("tidbcluster: [%s/%s] pdMS statefulset %s UpdateStrategy has been modified manually, componentName: %s", ns, tcName, oldSet.GetName(), componentName)
klog.Warningf("tidbcluster: [%s/%s] pdMS statefulset %s UpdateStrategy has been modified manually, componentName: %s", ns, tcName, oldSet.GetName(), curService)
return nil
}

mngerutils.SetUpgradePartition(newSet, *oldSet.Spec.UpdateStrategy.RollingUpdate.Partition)
podOrdinals := helper.GetPodOrdinals(*oldSet.Spec.Replicas, oldSet).List()

pdClient := controller.GetPDClient(u.deps.PDControl, tc)
// pdMS member
primary, err := pdClient.GetMSPrimary(curService)
if err != nil {
return err
}

for _i := len(podOrdinals) - 1; _i >= 0; _i-- {
i := podOrdinals[_i]
podName := PDMSPodName(tcName, i, oldTrimName)
Expand All @@ -103,30 +111,102 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pdMS pod: [%s] has no label: %s", ns, tcName, podName, apps.ControllerRevisionHashLabelKey)
}

if revision == tc.Status.PDMS[componentName].StatefulSet.UpdateRevision {
if revision == tc.Status.PDMS[curService].StatefulSet.UpdateRevision {
if !k8s.IsPodReady(pod) {
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s upgraded pdMS pod: [%s] is not ready", ns, tcName, podName)
}

var exist bool
for _, member := range tc.Status.PDMS[componentName].Members {
for _, member := range tc.Status.PDMS[curService].Members {
if strings.Contains(member, podName) {
exist = true
}
}
if !exist {
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pdMS upgraded pod: [%s] is not exist, all members: %v",
ns, tcName, podName, tc.Status.PDMS[componentName].Members)
ns, tcName, podName, tc.Status.PDMS[curService].Members)
}
continue
}
mngerutils.SetUpgradePartition(newSet, i)
return nil

return u.upgradePDMSPod(tc, i, newSet, primary, curService)
}

return nil
}

func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, newSet *apps.StatefulSet, primary, curService string) error {
ns := tc.GetNamespace()
tcName := tc.GetName()
upgradePdName := PDMSName(tcName, ordinal, tc.Namespace, tc.Spec.ClusterDomain, tc.Spec.AcrossK8s, curService)
upgradePodName := PDMSPodName(tcName, ordinal, curService)

klog.Infof("[TODO] pdms upgrader: check primary: %s, upgradePdName: %s, upgradePodName: %s", primary, upgradePdName, upgradePodName)

// If current pdms is primary, transfer primary to other pdms pod
if strings.Contains(primary, upgradePodName) || strings.Contains(primary, upgradePdName) {
targetName := ""

if tc.PDStsActualReplicas() > 1 {
targetName = choosePDMSToTransferFromMembers(tc, newSet, ordinal)
}

if targetName != "" {
klog.Infof("[TODO] pdms upgrader: transfer pdms primary to: %s", targetName)
err := u.transferPDMSLeaderTo(tc, targetName, curService)
if err != nil {
klog.Errorf("pdms upgrader: failed to transfer pdms primary to: %s, %v", targetName, err)
return err
}
klog.Infof("pdms upgrader: transfer pdms primary to: %s successfully", targetName)
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pd member: [%s] is transferring leader to pd member: [%s]", ns, tcName, upgradePdName, targetName)
} else {
klog.Warningf("pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd")
}
}

mngerutils.SetUpgradePartition(newSet, ordinal)
return nil
}

func (u *pdMSUpgrader) transferPDMSLeaderTo(tc *v1alpha1.TidbCluster, targetName, curService string) error {
return controller.GetPDClient(u.deps.PDControl, tc).TransferPrimary(curService, targetName)
}

// choosePDMSToTransferFromMembers choose a pdms to transfer primary from members
//
// Assume that current primary ordinal is x, and range is [0, n]
// 1. Find the max suitable ordinal in (x, n], because they have been upgraded
// 2. If no suitable ordinal, find the min suitable ordinal in [0, x) to reduce the count of transfer
func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, ordinal int32) string {
tcName := tc.GetName()
ordinals := helper.GetPodOrdinals(*newSet.Spec.Replicas, newSet)

// set ordinal to max ordinal if ordinal isn't exist
if !ordinals.Has(ordinal) {
ordinal = helper.GetMaxPodOrdinal(*newSet.Spec.Replicas, newSet)
}

targetName := ""
list := ordinals.List()

// find the max ordinal which is larger than ordinal
for i := len(list) - 1; i >= 0 && list[i] > ordinal; i-- {
targetName = PDMSPodName(tcName, list[i], controller.PDMSTrimName(newSet.Name))
break
}

if targetName == "" {
// find the min ordinal which is less than ordinal
for i := 0; i < len(list) && list[i] < ordinal; i++ {
targetName = PDMSPodName(tcName, list[i], controller.PDMSTrimName(newSet.Name))
}
}

klog.Infof("pd ms upgrader: choose pd ms to transfer leader from members, targetName: %s", targetName)
return targetName
}

type fakePDMSUpgrader struct{}

// NewFakePDMSUpgrader returns a fakePDUpgrader
Expand Down
10 changes: 10 additions & 0 deletions pkg/manager/member/startscript/render_start_script.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package startscript
import (
"errors"

"github.com/Masterminds/semver"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
v1 "github.com/pingcap/tidb-operator/pkg/manager/member/startscript/v1"
v2 "github.com/pingcap/tidb-operator/pkg/manager/member/startscript/v2"
Expand Down Expand Up @@ -116,3 +117,12 @@ func RenderTiProxyStartScript(tc *v1alpha1.TidbCluster) (string, error) {
return "", ErrVersionNotFound
}
}

// PDMSSupportMicroServicesWithName returns true if the given version of PDMS supports microservices with name.
func PDMSSupportMicroServicesWithName(version string) (bool, error) {
v, err := semver.NewVersion(version)
if err != nil {
return true, err
}
return v.Major() >= 8 && v.Minor() >= 3 && v.Patch() >= 0, nil
}
11 changes: 11 additions & 0 deletions pkg/manager/member/startscript/v2/pd_start_script.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package v2

import (
"fmt"
"github.com/pingcap/tidb-operator/pkg/manager/member/startscript"
"path/filepath"
"slices"
"strings"
Expand Down Expand Up @@ -45,6 +46,7 @@ type PDMSStartScriptModel struct {
PDStartTimeout int
PDAddresses string

PDMSName string
PDMSDomain string
ListenAddr string
AdvertiseListenAddr string
Expand Down Expand Up @@ -132,6 +134,13 @@ func renderPDMSStartScript(tc *v1alpha1.TidbCluster, name string) (string, error
m.PDMSDomain = m.PDMSDomain + "." + tc.Spec.ClusterDomain
}

if ok, err := startscript.PDMSSupportMicroServicesWithName(tc.PDMSVersion(name)); ok && err == nil {
m.PDMSName = "${PDMS_POD_NAME}"
if tc.Spec.ClusterDomain != "" {
m.PDMSName = m.PDMSDomain
}
}

m.PDStartTimeout = tc.PDStartTimeout()

preferPDAddressesOverDiscovery := slices.Contains(
Expand Down Expand Up @@ -283,6 +292,8 @@ ARGS="` + pdEnableMicroService + `--listen-addr={{ .ListenAddr }} \
--config=/etc/pd/pd.toml \
"
{{- if .PDMSName}} ARGS="${ARGS} --name={{.Name}} \" {{- end }}
echo "starting pd-server ..."
sleep $((RANDOM % 10))
echo "/pd-server ${ARGS}"
Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/member/tikv_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ func (m *tikvMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
// Check if all PD Micro Services are available
if tc.Spec.PDMS != nil && (tc.Spec.PD != nil && tc.Spec.PD.Mode == "ms") {
for _, pdms := range tc.Spec.PDMS {
if err = controller.GetPDMSClient(m.deps.PDControl, tc, pdms.Name); err != nil {
if cli := controller.GetPDMSClient(m.deps.PDControl, tc, pdms.Name); cli == nil {
return controller.RequeueErrorf("PDMS component %s for TidbCluster: [%s/%s], "+
"waiting for PD micro service cluster running, error: %v", pdms.Name, ns, tcName, err)
"waiting for PD micro service cluster running", pdms.Name, ns, tcName)
}
}
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/manager/member/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ func PdName(tcName string, ordinal int32, namespace string, clusterDomain string
return PdPodName(tcName, ordinal)
}

// PDMSName should match the start arg `--name` of pd-server
// See the start script of PDMS in pkg/manager/member/startscript/v2.renderPDMSStartScript
func PDMSName(tcName string, ordinal int32, namespace, clusterDomain string, acrossK8s bool, component string) string {
if len(clusterDomain) > 0 {
return fmt.Sprintf("%s.%s-%s-peer.%s.svc.%s", PdPodName(tcName, ordinal), tcName, component, namespace, clusterDomain)
}

// clusterDomain is not set
if acrossK8s {
return fmt.Sprintf("%s.%s-%s-peer.%s.svc", PdPodName(tcName, ordinal), component, tcName, namespace)
}

return PDMSPodName(tcName, ordinal, component)
}

// NeedForceUpgrade check if force upgrade is necessary
func NeedForceUpgrade(ann map[string]string) bool {
// Check if annotation 'pingcap.com/force-upgrade: "true"' is set
Expand Down
20 changes: 20 additions & 0 deletions pkg/pdapi/fake_pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
GetClusterActionType ActionType = "GetCluster"
GetMembersActionType ActionType = "GetMembers"
GetPDMSMembersActionType ActionType = "GetPDMSMembers"
GetPDMSPrimaryActionType ActionType = "GetPDMSPrimary"
TransferPrimaryActionType ActionType = "TransferPrimary"
GetStoresActionType ActionType = "GetStores"
GetTombStoneStoresActionType ActionType = "GetTombStoneStores"
GetStoreActionType ActionType = "GetStore"
Expand Down Expand Up @@ -78,6 +80,24 @@ func (c *FakePDClient) GetMSMembers(_ string) ([]string, error) {
return result.([]string), nil
}

func (c *FakePDClient) GetMSPrimary(_ string) (string, error) {
action := &Action{}
result, err := c.fakeAPI(GetPDMSPrimaryActionType, action)
if err != nil {
return "", err
}
return result.(string), nil
}

func (c *FakePDClient) TransferPrimary(_, _ string) error {
action := &Action{}
_, err := c.fakeAPI(TransferPrimaryActionType, action)
if err != nil {
return err
}
return nil
}

func NewFakePDClient() *FakePDClient {
return &FakePDClient{reactions: map[ActionType]Reaction{}}
}
Expand Down
36 changes: 35 additions & 1 deletion pkg/pdapi/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@ type PDClient interface {
GetAutoscalingPlans(strategy Strategy) ([]Plan, error)
// GetRecoveringMark return the pd recovering mark
GetRecoveringMark() (bool, error)
// GetMSMembers returns all PD members service-addr from cluster by specific Micro Service
// GetMSMembers returns all PDMS members service-addr from cluster by specific Micro Service
GetMSMembers(service string) ([]string, error)
// GetMSPrimary returns the primary PDMS member service-addr from cluster by specific Micro Service
GetMSPrimary(service string) (string, error)
// TransferPrimary transfers the primary PDMS member service-addr from cluster by specific Micro Service
TransferPrimary(service, newPrimary string) error
}

var (
Expand Down Expand Up @@ -341,6 +345,36 @@ func (c *pdClient) GetMSMembers(service string) ([]string, error) {
return addrs, nil
}

func (c *pdClient) GetMSPrimary(service string) (string, error) {
apiURL := fmt.Sprintf("%s/%s/primary/%s", c.url, MicroServicePrefix, service)
body, err := httputil.GetBodyOK(c.httpClient, apiURL)
if err != nil {
return "", err
}
var primary string
err = json.Unmarshal(body, &primary)
if err != nil {
return "", err
}

return primary, nil
}

func (c *pdClient) TransferPrimary(service, newPrimary string) error {
apiURL := fmt.Sprintf("%s/%s/primary/transfer/%s", c.url, MicroServicePrefix, service)
data, err := json.Marshal(struct {
NewPrimary string `json:"new_primary"`
}{
NewPrimary: newPrimary,
})
_, err = httputil.PostBodyOK(c.httpClient, apiURL, bytes.NewBuffer(data))
if err != nil {
return err
}

return nil
}

func (c *pdClient) getStores(apiURL string) (*StoresInfo, error) {
body, err := httputil.GetBodyOK(c.httpClient, apiURL)
if err != nil {
Expand Down
Loading

0 comments on commit 33292f0

Please sign in to comment.