diff --git a/pkg/autoscaler/autoscaler/tidb_autoscaler.go b/pkg/autoscaler/autoscaler/tidb_autoscaler.go
index 19179b3b61..ef81ab94e3 100644
--- a/pkg/autoscaler/autoscaler/tidb_autoscaler.go
+++ b/pkg/autoscaler/autoscaler/tidb_autoscaler.go
@@ -46,13 +46,13 @@ func (am *autoScalerManager) syncTiDB(tc *v1alpha1.TidbCluster, tac *v1alpha1.Ti
 	if targetReplicas == tc.Spec.TiDB.Replicas {
 		return nil
 	}
-	return syncTiDBAfterCalculated(tc, tac, currentReplicas, targetReplicas)
+	return syncTiDBAfterCalculated(tc, tac, currentReplicas, targetReplicas, sts)
 }
 
 // syncTiDBAfterCalculated would check the Consecutive count to avoid jitter, and it would also check the interval
 // duration between each auto-scaling. If either of them is not met, the auto-scaling would be rejected.
 // If the auto-scaling is permitted, the timestamp would be recorded and the Consecutive count would be zeroed.
-func syncTiDBAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32) error {
+func syncTiDBAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32, sts *appsv1.StatefulSet) error {
 	intervalSeconds := tac.Spec.TiDB.ScaleInIntervalSeconds
 	if recommendedReplicas > currentReplicas {
 		intervalSeconds = tac.Spec.TiDB.ScaleOutIntervalSeconds
@@ -64,13 +64,14 @@ func syncTiDBAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbCluster
 	if !ableToScale {
 		return nil
 	}
-	updateTcTiDBAnnIfScale(tac)
-	tc.Spec.TiDB.Replicas = recommendedReplicas
-	return nil
+	return updateTcTiDBIfScale(tc, tac, recommendedReplicas)
 }
 
-func updateTcTiDBAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
+// Currently we don't record the auto-scaling-out slots for tidb, because it is pointless for now.
+func updateTcTiDBIfScale(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, recommendedReplicas int32) error {
 	tac.Annotations[label.AnnTiDBLastAutoScalingTimestamp] = fmt.Sprintf("%d", time.Now().Unix())
+	tc.Spec.TiDB.Replicas = recommendedReplicas
+	return nil
 }
 
 func calculateTidbMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, instances []string) (int32, error) {
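Note: the ableToScale flag consumed above comes from an interval check that is not part of this diff. Below is a minimal sketch of how such a guard presumably works, assuming the last auto-scaling timestamp is stored as a Unix-seconds string under AnnTiDBLastAutoScalingTimestamp; the helper name ableToScaleAfterInterval is hypothetical, not the operator's actual function.

package sketch

import (
	"strconv"
	"time"
)

// ableToScaleAfterInterval is a hypothetical stand-in for the check that
// produces the ableToScale flag above: scaling is only permitted once
// intervalSeconds have elapsed since the recorded timestamp.
func ableToScaleAfterInterval(anns map[string]string, annKey string, intervalSeconds int32) (bool, error) {
	last, existed := anns[annKey]
	if !existed {
		// no previous auto-scaling recorded, so the action is permitted
		return true, nil
	}
	t, err := strconv.ParseInt(last, 10, 64)
	if err != nil {
		return false, err
	}
	return time.Now().Unix()-t > int64(intervalSeconds), nil
}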
diff --git a/pkg/autoscaler/autoscaler/tikv_autoscaler.go b/pkg/autoscaler/autoscaler/tikv_autoscaler.go
index f20de7de4c..0d5b519b54 100644
--- a/pkg/autoscaler/autoscaler/tikv_autoscaler.go
+++ b/pkg/autoscaler/autoscaler/tikv_autoscaler.go
@@ -17,6 +17,7 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/pingcap/advanced-statefulset/pkg/apis/apps/v1/helper"
 	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
 	"github.com/pingcap/tidb-operator/pkg/autoscaler/autoscaler/calculate"
 	"github.com/pingcap/tidb-operator/pkg/label"
@@ -46,7 +47,7 @@ func (am *autoScalerManager) syncTiKV(tc *v1alpha1.TidbCluster, tac *v1alpha1.Ti
 	if targetReplicas == tc.Spec.TiKV.Replicas {
 		return nil
 	}
-	return syncTiKVAfterCalculated(tc, tac, currentReplicas, targetReplicas)
+	return syncTiKVAfterCalculated(tc, tac, currentReplicas, targetReplicas, sts)
 }
 
 // syncTiKVAfterCalculated would check the Consecutive count to avoid jitter, and it would also check the interval
@@ -54,7 +55,7 @@ func (am *autoScalerManager) syncTiKV(tc *v1alpha1.TidbCluster, tac *v1alpha1.Ti
 // If the auto-scaling is permitted, the timestamp would be recorded and the Consecutive count would be zeroed.
 // The currentReplicas of TiKV calculated in auto-scaling is the count of the StateUp TiKV instances, so we need to
 // add the number of TiKV replicas in other states when we update TidbCluster.Spec.TiKV.Replicas
-func syncTiKVAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32) error {
+func syncTiKVAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32, sts *appsv1.StatefulSet) error {
 	intervalSeconds := tac.Spec.TiKV.ScaleInIntervalSeconds
 
 	if recommendedReplicas > tc.Spec.TiKV.Replicas {
@@ -67,9 +68,7 @@ func syncTiKVAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbCluster
 	if !ableToScale {
 		return nil
 	}
-	updateTcTiKVAnnIfScale(tac)
-	tc.Spec.TiKV.Replicas = recommendedReplicas
-	return nil
+	return updateTcTiKVIfScale(tc, tac, currentReplicas, recommendedReplicas, sts)
 }
 
 //TODO: fetch tikv instances info from pdapi in future
@@ -83,8 +82,25 @@ func filterTiKVInstances(tc *v1alpha1.TidbCluster) []string {
 	return instances
 }
 
-func updateTcTiKVAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
+// We record the auto-scaling-out slots for tikv, in order to add the special hot-region labels when the stores are created
+func updateTcTiKVIfScale(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32, sts *appsv1.StatefulSet) error {
 	tac.Annotations[label.AnnTiKVLastAutoScalingTimestamp] = fmt.Sprintf("%d", time.Now().Unix())
+	if recommendedReplicas > currentReplicas {
+		newlyScaleOutOrdinalSets := helper.GetPodOrdinals(recommendedReplicas, sts).Difference(helper.GetPodOrdinals(currentReplicas, sts))
+		if newlyScaleOutOrdinalSets.Len() > 0 {
+			if tc.Annotations == nil {
+				tc.Annotations = map[string]string{}
+			}
+			existed := operatorUtils.GetAutoScalingOutSlots(tc, v1alpha1.TiKVMemberType)
+			v, err := operatorUtils.Encode(newlyScaleOutOrdinalSets.Union(existed).List())
+			if err != nil {
+				return err
+			}
+			tc.Annotations[label.AnnTiKVAutoScalingOutOrdinals] = v
+		}
+	}
+	tc.Spec.TiKV.Replicas = recommendedReplicas
+	return nil
 }
 
 func calculateTikvMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, instances []string) (int32, error) {
diff --git a/pkg/label/label.go b/pkg/label/label.go
index c846952314..b67fa2339f 100644
--- a/pkg/label/label.go
+++ b/pkg/label/label.go
@@ -114,6 +114,10 @@ const (
 	AnnAutoScalingTargetName = "auto-scaling.tidb.pingcap.com/target-name"
 	// AnnAutoScalingTargetNamespace describes the target TidbCluster Ref Namespace for the TidbClusterAutoScaler
 	AnnAutoScalingTargetNamespace = "auto-scaling.tidb.pingcap.com/target-namespace"
+	// AnnTiKVAutoScalingOutOrdinals describes the ordinal list of the tikv pods created by auto-scaling out
+	AnnTiKVAutoScalingOutOrdinals = "tikv.tidb.pingcap.com/scale-out-ordinals"
+	// AnnTiDBAutoScalingOutOrdinals describes the ordinal list of the tidb pods created by auto-scaling out
+	AnnTiDBAutoScalingOutOrdinals = "tidb.tidb.pingcap.com/scale-out-ordinals"
 
 	// PDLabelVal is PD label value
 	PDLabelVal string = "pd"
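The two new annotations hold a JSON-encoded, sorted list of pod ordinals. For illustration: with no delete slots in play, scaling TiKV from 3 to 5 replicas marks ordinals 3 and 4 as the newly scaled-out slots. A runnable sketch of the set arithmetic and encoding used by updateTcTiKVIfScale, using the same apimachinery set type:

package main

import (
	"encoding/json"
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Pod ordinals before and after the scale-out (assuming no delete slots,
	// helper.GetPodOrdinals would return exactly these sets).
	current := sets.NewInt32(0, 1, 2)
	recommended := sets.NewInt32(0, 1, 2, 3, 4)

	newlyScaledOut := recommended.Difference(current) // {3, 4}
	b, err := json.Marshal(newlyScaledOut.List())     // List() returns a sorted slice
	if err != nil {
		panic(err)
	}
	// Prints [3,4] -- the value stored under tikv.tidb.pingcap.com/scale-out-ordinals.
	fmt.Println(string(b))
}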
diff --git a/pkg/manager/member/pd_scaler.go b/pkg/manager/member/pd_scaler.go
index a84acb8a25..47c11f4f74 100644
--- a/pkg/manager/member/pd_scaler.go
+++ b/pkg/manager/member/pd_scaler.go
@@ -47,7 +47,7 @@ func (psd *pdScaler) Scale(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, n
 	} else if scaling < 0 {
 		return psd.ScaleIn(tc, oldSet, newSet)
 	}
-	return nil
+	return psd.SyncAutoScalerAnn(tc, oldSet)
 }
 
 func (psd *pdScaler) ScaleOut(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, newSet *apps.StatefulSet) error {
@@ -167,6 +167,10 @@ func (psd *pdScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet,
 	return nil
 }
 
+func (psd *pdScaler) SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error {
+	return nil
+}
+
 type fakePDScaler struct{}
 
 // NewFakePDScaler returns a fake Scaler
@@ -192,3 +196,7 @@ func (fsd *fakePDScaler) ScaleIn(_ *v1alpha1.TidbCluster, oldSet *apps.StatefulS
 	setReplicasAndDeleteSlots(newSet, *oldSet.Spec.Replicas-1, nil)
 	return nil
 }
+
+func (fsd *fakePDScaler) SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error {
+	return nil
+}
diff --git a/pkg/manager/member/scaler.go b/pkg/manager/member/scaler.go
index 47a5f0dc0b..428aa2d6a6 100644
--- a/pkg/manager/member/scaler.go
+++ b/pkg/manager/member/scaler.go
@@ -35,6 +35,13 @@ const (
 	skipReasonScalerAnnDeferDeletingIsEmpty = "scaler: pvc annotations defer deleting is empty"
 )
 
+// TODO: add documentation to explain the hot region label
+var (
+	hotRegionLabel = map[string]string{
+		"specialUse": "hotRegion",
+	}
+)
+
 // Scaler implements the logic for scaling out or scaling in the cluster.
 type Scaler interface {
 	// Scale scales the cluster. It does nothing if scaling is not needed.
@@ -43,6 +50,8 @@
 	ScaleOut(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet, desired *apps.StatefulSet) error
 	// ScaleIn scales in the cluster
 	ScaleIn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet, desired *apps.StatefulSet) error
+	// SyncAutoScalerAnn would sync the annotations created by the auto-scaler
+	SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error
 }
 
 type generalScaler struct {
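Both updateTcTiKVIfScale above and tikvScaler.SyncAutoScalerAnn below depend on helper.GetPodOrdinals from the advanced-statefulset library. A hedged stand-in for its behavior follows; it is a simplification, since the real helper derives the delete slots from the StatefulSet's annotation rather than taking them as a parameter:

package sketch

import (
	"k8s.io/apimachinery/pkg/util/sets"
)

// getPodOrdinals approximates helper.GetPodOrdinals: it returns `replicas`
// live ordinals counted from 0, skipping any ordinal recorded as a delete
// slot, so the ordinal range can extend past replicas-1.
func getPodOrdinals(replicas int32, deleteSlots sets.Int32) sets.Int32 {
	ordinals := sets.NewInt32()
	for i, added := int32(0), int32(0); added < replicas; i++ {
		if deleteSlots.Has(i) {
			continue // this slot is excluded; the range extends past it
		}
		ordinals.Insert(i)
		added++
	}
	return ordinals
}

With 4 replicas and slot 1 deleted, the live ordinals are {0, 2, 3, 4}; this is why the autoscaler tracks scaled-out slots as sets rather than as contiguous ranges.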
diff --git a/pkg/manager/member/tikv_scaler.go b/pkg/manager/member/tikv_scaler.go
index d59a1a2495..bdcf06da05 100644
--- a/pkg/manager/member/tikv_scaler.go
+++ b/pkg/manager/member/tikv_scaler.go
@@ -18,10 +18,12 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/pingcap/advanced-statefulset/pkg/apis/apps/v1/helper"
 	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
 	"github.com/pingcap/tidb-operator/pkg/controller"
 	"github.com/pingcap/tidb-operator/pkg/label"
 	"github.com/pingcap/tidb-operator/pkg/pdapi"
+	"github.com/pingcap/tidb-operator/pkg/util"
 	apps "k8s.io/api/apps/v1"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/klog"
@@ -48,7 +50,8 @@ func (tsd *tikvScaler) Scale(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet,
 	} else if scaling < 0 {
 		return tsd.ScaleIn(tc, oldSet, newSet)
 	}
-	return nil
+	// we only sync the auto-scaler annotations once scaling has finished
+	return tsd.SyncAutoScalerAnn(tc, oldSet)
 }
 
 func (tsd *tikvScaler) ScaleOut(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, newSet *apps.StatefulSet) error {
@@ -190,6 +193,55 @@ func (tsd *tikvScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
 	return fmt.Errorf("TiKV %s/%s not found in cluster", ns, podName)
 }
 
+// SyncAutoScalerAnn would reclaim the auto-scaling-out slots whose target pods no longer exist.
+// For the remaining auto-scaling slots, we would add the special hot-region label to their stores through the PD API.
+func (tsd *tikvScaler) SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error {
+	currentScalingSlots := util.GetAutoScalingOutSlots(tc, v1alpha1.TiKVMemberType)
+	if currentScalingSlots.Len() < 1 {
+		return nil
+	}
+	currentOrdinals := helper.GetPodOrdinals(tc.Spec.TiKV.Replicas, actual)
+
+	// reclaim the auto-scaling-out slots whose target pods no longer exist
+	if !currentOrdinals.HasAll(currentScalingSlots.List()...) {
+		reclaimedSlots := currentScalingSlots.Difference(currentOrdinals)
+		currentScalingSlots = currentScalingSlots.Delete(reclaimedSlots.List()...)
+		if currentScalingSlots.Len() < 1 {
+			delete(tc.Annotations, label.AnnTiKVAutoScalingOutOrdinals)
+			return nil
+		}
+		v, err := util.Encode(currentScalingSlots.List())
+		if err != nil {
+			return err
+		}
+		tc.Annotations[label.AnnTiKVAutoScalingOutOrdinals] = v
+		return nil
+	}
+
+	// for the auto-scaling slots, add the special hot-region label to their stores through the PD API
+	pdClient := tsd.pdControl.GetPDClient(pdapi.Namespace(tc.Namespace), tc.Name, *tc.Spec.EnableTLSCluster)
+	for k := range currentScalingSlots {
+		podName := util.GetPodName(tc, v1alpha1.TiKVMemberType, k)
+		for _, store := range tc.Status.TiKV.Stores {
+			if store.PodName == podName {
+				id, err := strconv.ParseUint(store.ID, 10, 64)
+				if err != nil {
+					return err
+				}
+				ok, err := pdClient.SetStoreLabels(id, hotRegionLabel)
+				if err != nil {
+					return err
+				}
+				if !ok {
+					return fmt.Errorf("tc[%s/%s]'s pod[%s] failed to add special hot region label", tc.Namespace, tc.Name, podName)
+				}
+				break
+			}
+		}
+	}
+	return nil
+}
+
 type fakeTiKVScaler struct{}
 
 // NewFakeTiKVScaler returns a fake tikv Scaler
@@ -215,3 +267,7 @@ func (fsd *fakeTiKVScaler) ScaleIn(_ *v1alpha1.TidbCluster, oldSet *apps.Statefu
 	setReplicasAndDeleteSlots(newSet, *oldSet.Spec.Replicas-1, nil)
 	return nil
 }
+
+func (fsd *fakeTiKVScaler) SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error {
+	return nil
+}
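A worked example of the reclaim branch in SyncAutoScalerAnn: suppose the annotation records slots {3, 4}, but the cluster has since been scaled back to 4 replicas, so only ordinal 3 still exists. The slot arithmetic then plays out as below (a self-contained sketch using the same set operations):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	currentScalingSlots := sets.NewInt32(3, 4)   // decoded from the annotation
	currentOrdinals := sets.NewInt32(0, 1, 2, 3) // pods that actually exist

	if !currentOrdinals.HasAll(currentScalingSlots.List()...) {
		reclaimed := currentScalingSlots.Difference(currentOrdinals) // {4}
		currentScalingSlots.Delete(reclaimed.List()...)
		// Prints [3]; this list is re-encoded into the annotation, or the
		// annotation is deleted outright once the set becomes empty.
		fmt.Println(currentScalingSlots.List())
	}
}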
diff --git a/pkg/manager/member/utils.go b/pkg/manager/member/utils.go
index 97492ecc07..a72f32e078 100644
--- a/pkg/manager/member/utils.go
+++ b/pkg/manager/member/utils.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
 	"github.com/pingcap/tidb-operator/pkg/controller"
 	"github.com/pingcap/tidb-operator/pkg/label"
+	"github.com/pingcap/tidb-operator/pkg/util"
 	apps "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
@@ -71,7 +72,7 @@ func statefulSetIsUpgrading(set *apps.StatefulSet) bool {
 
 // SetStatefulSetLastAppliedConfigAnnotation set last applied config to Statefulset's annotation
 func SetStatefulSetLastAppliedConfigAnnotation(set *apps.StatefulSet) error {
-	setApply, err := encode(set.Spec)
+	setApply, err := util.Encode(set.Spec)
 	if err != nil {
 		return err
 	}
@@ -97,14 +98,6 @@ func GetLastAppliedConfig(set *apps.StatefulSet) (*apps.StatefulSetSpec, *corev1
 	return spec, &spec.Template.Spec, nil
 }
 
-func encode(obj interface{}) (string, error) {
-	b, err := json.Marshal(obj)
-	if err != nil {
-		return "", err
-	}
-	return string(b), nil
-}
-
 // statefulSetEqual compares the new Statefulset's spec with old Statefulset's last applied config
 func statefulSetEqual(new apps.StatefulSet, old apps.StatefulSet) bool {
 	if !apiequality.Semantic.DeepEqual(new.Annotations, old.Annotations) {
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 87ffd8450b..6dd83b57cb 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -126,3 +126,40 @@ func IsStatefulSetScaling(set *appsv1.StatefulSet) bool {
 func GetStatefulSetName(tc *v1alpha1.TidbCluster, memberType v1alpha1.MemberType) string {
 	return fmt.Sprintf("%s-%s", tc.Name, memberType.String())
 }
+
+func GetAutoScalingOutSlots(tc *v1alpha1.TidbCluster, memberType v1alpha1.MemberType) sets.Int32 {
+	s := sets.Int32{}
+	l := ""
+	switch memberType {
+	case v1alpha1.PDMemberType:
+		return s
+	case v1alpha1.TiKVMemberType:
+		l = label.AnnTiKVAutoScalingOutOrdinals
+	case v1alpha1.TiDBMemberType:
+		l = label.AnnTiDBAutoScalingOutOrdinals
+	default:
+		return s
+	}
+	if tc.Annotations == nil {
+		return s
+	}
+	v, existed := tc.Annotations[l]
+	if !existed {
+		return s
+	}
+	var slice []int32
+	err := json.Unmarshal([]byte(v), &slice)
+	if err != nil {
+		return s
+	}
+	s.Insert(slice...)
+	return s
+}
+
+func Encode(obj interface{}) (string, error) {
+	b, err := json.Marshal(obj)
+	if err != nil {
+		return "", err
+	}
+	return string(b), nil
+}
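A test-style sketch of the annotation round trip through the two helpers above; the TidbCluster fixture and the test name are illustrative, not part of this change:

package util_test

import (
	"testing"

	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
	"github.com/pingcap/tidb-operator/pkg/label"
	"github.com/pingcap/tidb-operator/pkg/util"
)

func TestAutoScalingOutSlotsRoundTrip(t *testing.T) {
	// Encode the scaled-out ordinals the way updateTcTiKVIfScale does...
	v, err := util.Encode([]int32{3, 4})
	if err != nil {
		t.Fatal(err)
	}
	tc := &v1alpha1.TidbCluster{}
	tc.Annotations = map[string]string{label.AnnTiKVAutoScalingOutOrdinals: v}

	// ...and decode them the way tikvScaler.SyncAutoScalerAnn does.
	got := util.GetAutoScalingOutSlots(tc, v1alpha1.TiKVMemberType)
	if got.Len() != 2 || !got.HasAll(3, 4) {
		t.Fatalf("expected slots [3 4], got %v", got.List())
	}
}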