Skip to content

Commit

Permalink
Manage hot region label for the tikv created by auto-scaler (pingcap#…
Browse files Browse the repository at this point in the history
…1801)

* Manage hot region label for the tikv created by auto-scaler

Co-authored-by: DanielZhangQD <36026334+DanielZhangQD@users.noreply.github.com>
  • Loading branch information
2 people authored and Song Gao committed Mar 5, 2020
1 parent 0d11094 commit e4aee67
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 23 deletions.
13 changes: 7 additions & 6 deletions pkg/autoscaler/autoscaler/tidb_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ func (am *autoScalerManager) syncTiDB(tc *v1alpha1.TidbCluster, tac *v1alpha1.Ti
if targetReplicas == tc.Spec.TiDB.Replicas {
return nil
}
return syncTiDBAfterCalculated(tc, tac, currentReplicas, targetReplicas)
return syncTiDBAfterCalculated(tc, tac, currentReplicas, targetReplicas, sts)
}

// syncTiDBAfterCalculated would check the Consecutive count to avoid jitter, and it would also check the interval
// duration between each auto-scaling. If either of them is not meet, the auto-scaling would be rejected.
// If the auto-scaling is permitted, the timestamp would be recorded and the Consecutive count would be zeroed.
func syncTiDBAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32) error {
func syncTiDBAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32, sts *appsv1.StatefulSet) error {
intervalSeconds := tac.Spec.TiDB.ScaleInIntervalSeconds
if recommendedReplicas > currentReplicas {
intervalSeconds = tac.Spec.TiDB.ScaleOutIntervalSeconds
Expand All @@ -64,13 +64,14 @@ func syncTiDBAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbCluster
if !ableToScale {
return nil
}
updateTcTiDBAnnIfScale(tac)
tc.Spec.TiDB.Replicas = recommendedReplicas
return nil
return updateTcTiDBIfScale(tc, tac, recommendedReplicas)
}

func updateTcTiDBAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
// Currently we didnt' record the auto-scaling out slot for tidb, because it is pointless for now.
func updateTcTiDBIfScale(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, recommendedReplicas int32) error {
tac.Annotations[label.AnnTiDBLastAutoScalingTimestamp] = fmt.Sprintf("%d", time.Now().Unix())
tc.Spec.TiDB.Replicas = recommendedReplicas
return nil
}

func calculateTidbMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, instances []string) (int32, error) {
Expand Down
28 changes: 22 additions & 6 deletions pkg/autoscaler/autoscaler/tikv_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"time"

"github.com/pingcap/advanced-statefulset/pkg/apis/apps/v1/helper"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/autoscaler/autoscaler/calculate"
"github.com/pingcap/tidb-operator/pkg/label"
Expand Down Expand Up @@ -46,15 +47,15 @@ func (am *autoScalerManager) syncTiKV(tc *v1alpha1.TidbCluster, tac *v1alpha1.Ti
if targetReplicas == tc.Spec.TiKV.Replicas {
return nil
}
return syncTiKVAfterCalculated(tc, tac, currentReplicas, targetReplicas)
return syncTiKVAfterCalculated(tc, tac, currentReplicas, targetReplicas, sts)
}

// syncTiKVAfterCalculated would check the Consecutive count to avoid jitter, and it would also check the interval
// duration between each auto-scaling. If either of them is not meet, the auto-scaling would be rejected.
// If the auto-scaling is permitted, the timestamp would be recorded and the Consecutive count would be zeroed.
// The currentReplicas of TiKV calculated in auto-scaling is the count of the StateUp TiKV instance, so we need to
// add the number of other state tikv instance replicas when we update the TidbCluster.Spec.TiKV.Replicas
func syncTiKVAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32) error {
func syncTiKVAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32, sts *appsv1.StatefulSet) error {

intervalSeconds := tac.Spec.TiKV.ScaleInIntervalSeconds
if recommendedReplicas > tc.Spec.TiKV.Replicas {
Expand All @@ -67,9 +68,7 @@ func syncTiKVAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbCluster
if !ableToScale {
return nil
}
updateTcTiKVAnnIfScale(tac)
tc.Spec.TiKV.Replicas = recommendedReplicas
return nil
return updateTcTiKVIfScale(tc, tac, currentReplicas, recommendedReplicas, sts)
}

//TODO: fetch tikv instances info from pdapi in future
Expand All @@ -83,8 +82,25 @@ func filterTiKVInstances(tc *v1alpha1.TidbCluster) []string {
return instances
}

func updateTcTiKVAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
// we record the auto-scaling out slot for tikv, in order to add special hot labels when they are created
func updateTcTiKVIfScale(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32, sts *appsv1.StatefulSet) error {
tac.Annotations[label.AnnTiKVLastAutoScalingTimestamp] = fmt.Sprintf("%d", time.Now().Unix())
if recommendedReplicas > currentReplicas {
newlyScaleOutOrdinalSets := helper.GetPodOrdinals(recommendedReplicas, sts).Difference(helper.GetPodOrdinals(currentReplicas, sts))
if newlyScaleOutOrdinalSets.Len() > 0 {
if tc.Annotations == nil {
tc.Annotations = map[string]string{}
}
existed := operatorUtils.GetAutoScalingOutSlots(tc, v1alpha1.TiKVMemberType)
v, err := operatorUtils.Encode(newlyScaleOutOrdinalSets.Union(existed).List())
if err != nil {
return err
}
tc.Annotations[label.AnnTiKVAutoScalingOutOrdinals] = v
}
}
tc.Spec.TiKV.Replicas = recommendedReplicas
return nil
}

func calculateTikvMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, instances []string) (int32, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/label/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ const (
AnnAutoScalingTargetName = "auto-scaling.tidb.pingcap.com/target-name"
// AnnAutoScalingTargetNamespace describes the target TidbCluster Ref Namespace for the TidbCluserAutoScaler
AnnAutoScalingTargetNamespace = "auto-scaling.tidb.pingcap.com/target-namespace"
// AnnTiKVAutoScalingOutOrdinals describe the tikv pods' ordinal list which is created by auto-scaling out
AnnTiKVAutoScalingOutOrdinals = "tikv.tidb.pingcap.com/scale-out-ordinals"
// AnnTiDBAutoScalingOutOrdinals describe the tidb pods' ordinal list which is created by auto-scaling out
AnnTiDBAutoScalingOutOrdinals = "tidb.tidb.pingcap.com/scale-out-ordinals"

// PDLabelVal is PD label value
PDLabelVal string = "pd"
Expand Down
10 changes: 9 additions & 1 deletion pkg/manager/member/pd_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (psd *pdScaler) Scale(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, n
} else if scaling < 0 {
return psd.ScaleIn(tc, oldSet, newSet)
}
return nil
return psd.SyncAutoScalerAnn(tc, oldSet)
}

func (psd *pdScaler) ScaleOut(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, newSet *apps.StatefulSet) error {
Expand Down Expand Up @@ -167,6 +167,10 @@ func (psd *pdScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet,
return nil
}

func (psd *pdScaler) SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error {
return nil
}

type fakePDScaler struct{}

// NewFakePDScaler returns a fake Scaler
Expand All @@ -192,3 +196,7 @@ func (fsd *fakePDScaler) ScaleIn(_ *v1alpha1.TidbCluster, oldSet *apps.StatefulS
setReplicasAndDeleteSlots(newSet, *oldSet.Spec.Replicas-1, nil)
return nil
}

func (fsd *fakePDScaler) SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error {
return nil
}
9 changes: 9 additions & 0 deletions pkg/manager/member/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ const (
skipReasonScalerAnnDeferDeletingIsEmpty = "scaler: pvc annotations defer deleting is empty"
)

// TODO: add document to explain the hot region label
var (
hostRegionLabel = map[string]string{
"specialUse": "hotRegion",
}
)

// Scaler implements the logic for scaling out or scaling in the cluster.
type Scaler interface {
// Scale scales the cluster. It does nothing if scaling is not needed.
Expand All @@ -43,6 +50,8 @@ type Scaler interface {
ScaleOut(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet, desired *apps.StatefulSet) error
// ScaleIn scales in the cluster
ScaleIn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet, desired *apps.StatefulSet) error
// SyncAutoScalerAnn would sync Ann created by AutoScaler
SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error
}

type generalScaler struct {
Expand Down
58 changes: 57 additions & 1 deletion pkg/manager/member/tikv_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"strconv"
"time"

"github.com/pingcap/advanced-statefulset/pkg/apis/apps/v1/helper"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"github.com/pingcap/tidb-operator/pkg/util"
apps "k8s.io/api/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog"
Expand All @@ -48,7 +50,8 @@ func (tsd *tikvScaler) Scale(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet,
} else if scaling < 0 {
return tsd.ScaleIn(tc, oldSet, newSet)
}
return nil
// we only sync auto scaler annotations when we are finishing syncing scaling
return tsd.SyncAutoScalerAnn(tc, oldSet)
}

func (tsd *tikvScaler) ScaleOut(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, newSet *apps.StatefulSet) error {
Expand Down Expand Up @@ -190,6 +193,55 @@ func (tsd *tikvScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
return fmt.Errorf("TiKV %s/%s not found in cluster", ns, podName)
}

// SyncAutoScalerAnn would reclaim the auto-scaling out slots if the target pod is no longer existed
// For the auto-scaling slots, we would add the special hot region label to the store with pdapi.
func (tsd *tikvScaler) SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error {
currentScalingSlots := util.GetAutoScalingOutSlots(tc, v1alpha1.TiKVMemberType)
if currentScalingSlots.Len() < 1 {
return nil
}
currentOrdinals := helper.GetPodOrdinals(tc.Spec.TiKV.Replicas, actual)

// reclaim the auto-scaling out slots if the target pod is no longer existed
if !currentOrdinals.HasAll(currentScalingSlots.List()...) {
reclaimedSlots := currentScalingSlots.Difference(currentOrdinals)
currentScalingSlots = currentScalingSlots.Delete(reclaimedSlots.List()...)
if currentScalingSlots.Len() < 1 {
delete(tc.Annotations, label.AnnTiKVAutoScalingOutOrdinals)
return nil
}
v, err := util.Encode(currentScalingSlots.List())
if err != nil {
return err
}
tc.Annotations[label.AnnTiKVAutoScalingOutOrdinals] = v
return nil
}

// For the auto-scaling slots, we would add the special hot region label to the store with pdapi.
pdClient := tsd.pdControl.GetPDClient(pdapi.Namespace(tc.Namespace), tc.Name, *tc.Spec.EnableTLSCluster)
for k := range currentScalingSlots {
podName := util.GetPodName(tc, v1alpha1.TiKVMemberType, k)
for _, store := range tc.Status.TiKV.Stores {
if store.PodName == podName {
id, err := strconv.ParseUint(store.ID, 10, 64)
if err != nil {
return err
}
ok, err := pdClient.SetStoreLabels(id, hostRegionLabel)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("tc[%s/%s]'s pod[%s] failed to add special hot region label", tc.Namespace, tc.Name, podName)
}
break
}
}
}
return nil
}

type fakeTiKVScaler struct{}

// NewFakeTiKVScaler returns a fake tikv Scaler
Expand All @@ -215,3 +267,7 @@ func (fsd *fakeTiKVScaler) ScaleIn(_ *v1alpha1.TidbCluster, oldSet *apps.Statefu
setReplicasAndDeleteSlots(newSet, *oldSet.Spec.Replicas-1, nil)
return nil
}

func (fsd *fakeTiKVScaler) SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error {
return nil
}
11 changes: 2 additions & 9 deletions pkg/manager/member/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/pkg/util"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -71,7 +72,7 @@ func statefulSetIsUpgrading(set *apps.StatefulSet) bool {

// SetStatefulSetLastAppliedConfigAnnotation set last applied config to Statefulset's annotation
func SetStatefulSetLastAppliedConfigAnnotation(set *apps.StatefulSet) error {
setApply, err := encode(set.Spec)
setApply, err := util.Encode(set.Spec)
if err != nil {
return err
}
Expand All @@ -97,14 +98,6 @@ func GetLastAppliedConfig(set *apps.StatefulSet) (*apps.StatefulSetSpec, *corev1
return spec, &spec.Template.Spec, nil
}

func encode(obj interface{}) (string, error) {
b, err := json.Marshal(obj)
if err != nil {
return "", err
}
return string(b), nil
}

// statefulSetEqual compares the new Statefulset's spec with old Statefulset's last applied config
func statefulSetEqual(new apps.StatefulSet, old apps.StatefulSet) bool {
if !apiequality.Semantic.DeepEqual(new.Annotations, old.Annotations) {
Expand Down
37 changes: 37 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,40 @@ func IsStatefulSetScaling(set *appsv1.StatefulSet) bool {
func GetStatefulSetName(tc *v1alpha1.TidbCluster, memberType v1alpha1.MemberType) string {
return fmt.Sprintf("%s-%s", tc.Name, memberType.String())
}

func GetAutoScalingOutSlots(tc *v1alpha1.TidbCluster, memberType v1alpha1.MemberType) sets.Int32 {
s := sets.Int32{}
l := ""
switch memberType {
case v1alpha1.PDMemberType:
return s
case v1alpha1.TiKVMemberType:
l = label.AnnTiKVAutoScalingOutOrdinals
case v1alpha1.TiDBMemberType:
l = label.AnnTiDBAutoScalingOutOrdinals
default:
return s
}
if tc.Annotations == nil {
return s
}
v, existed := tc.Annotations[l]
if !existed {
return s
}
var slice []int32
err := json.Unmarshal([]byte(v), &slice)
if err != nil {
return s
}
s.Insert(slice...)
return s
}

func Encode(obj interface{}) (string, error) {
b, err := json.Marshal(obj)
if err != nil {
return "", err
}
return string(b), nil
}

0 comments on commit e4aee67

Please sign in to comment.