Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage hot region label for the tikv created by auto-scaler #1801

Merged
merged 17 commits into from
Feb 27, 2020
Merged
13 changes: 7 additions & 6 deletions pkg/autoscaler/autoscaler/tidb_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ func (am *autoScalerManager) syncTiDB(tc *v1alpha1.TidbCluster, tac *v1alpha1.Ti
if targetReplicas == tc.Spec.TiDB.Replicas {
return nil
}
return syncTiDBAfterCalculated(tc, tac, currentReplicas, targetReplicas)
return syncTiDBAfterCalculated(tc, tac, currentReplicas, targetReplicas, sts)
}

// syncTiDBAfterCalculated would check the Consecutive count to avoid jitter, and it would also check the interval
// duration between each auto-scaling. If either of them is not meet, the auto-scaling would be rejected.
// If the auto-scaling is permitted, the timestamp would be recorded and the Consecutive count would be zeroed.
func syncTiDBAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32) error {
func syncTiDBAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32, sts *appsv1.StatefulSet) error {
intervalSeconds := tac.Spec.TiDB.ScaleInIntervalSeconds
if recommendedReplicas > currentReplicas {
intervalSeconds = tac.Spec.TiDB.ScaleOutIntervalSeconds
Expand All @@ -64,13 +64,14 @@ func syncTiDBAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbCluster
if !ableToScale {
return nil
}
updateTcTiDBAnnIfScale(tac)
tc.Spec.TiDB.Replicas = recommendedReplicas
return nil
return updateTcTiDBIfScale(tc, tac, recommendedReplicas)
}

func updateTcTiDBAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
// Currently we didnt' record the auto-scaling out slot for tidb, because it is pointless for now.
func updateTcTiDBIfScale(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, recommendedReplicas int32) error {
tac.Annotations[label.AnnTiDBLastAutoScalingTimestamp] = fmt.Sprintf("%d", time.Now().Unix())
tc.Spec.TiDB.Replicas = recommendedReplicas
return nil
}

func calculateTidbMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, instances []string) (int32, error) {
Expand Down
28 changes: 22 additions & 6 deletions pkg/autoscaler/autoscaler/tikv_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"time"

"github.com/pingcap/advanced-statefulset/pkg/apis/apps/v1/helper"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/autoscaler/autoscaler/calculate"
"github.com/pingcap/tidb-operator/pkg/label"
Expand Down Expand Up @@ -46,15 +47,15 @@ func (am *autoScalerManager) syncTiKV(tc *v1alpha1.TidbCluster, tac *v1alpha1.Ti
if targetReplicas == tc.Spec.TiKV.Replicas {
return nil
}
return syncTiKVAfterCalculated(tc, tac, currentReplicas, targetReplicas)
return syncTiKVAfterCalculated(tc, tac, currentReplicas, targetReplicas, sts)
}

// syncTiKVAfterCalculated would check the Consecutive count to avoid jitter, and it would also check the interval
// duration between each auto-scaling. If either of them is not meet, the auto-scaling would be rejected.
// If the auto-scaling is permitted, the timestamp would be recorded and the Consecutive count would be zeroed.
// The currentReplicas of TiKV calculated in auto-scaling is the count of the StateUp TiKV instance, so we need to
// add the number of other state tikv instance replicas when we update the TidbCluster.Spec.TiKV.Replicas
func syncTiKVAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32) error {
func syncTiKVAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32, sts *appsv1.StatefulSet) error {

intervalSeconds := tac.Spec.TiKV.ScaleInIntervalSeconds
if recommendedReplicas > tc.Spec.TiKV.Replicas {
Expand All @@ -67,9 +68,7 @@ func syncTiKVAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbCluster
if !ableToScale {
return nil
}
updateTcTiKVAnnIfScale(tac)
tc.Spec.TiKV.Replicas = recommendedReplicas
return nil
return updateTcTiKVIfScale(tc, tac, currentReplicas, recommendedReplicas, sts)
}

//TODO: fetch tikv instances info from pdapi in future
Expand All @@ -83,8 +82,25 @@ func filterTiKVInstances(tc *v1alpha1.TidbCluster) []string {
return instances
}

func updateTcTiKVAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
// we record the auto-scaling out slot for tikv, in order to add special hot labels when they are created
func updateTcTiKVIfScale(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, currentReplicas, recommendedReplicas int32, sts *appsv1.StatefulSet) error {
tac.Annotations[label.AnnTiKVLastAutoScalingTimestamp] = fmt.Sprintf("%d", time.Now().Unix())
if recommendedReplicas > currentReplicas {
newlyScaleOutOrdinalSets := helper.GetPodOrdinals(recommendedReplicas, sts).Difference(helper.GetPodOrdinals(currentReplicas, sts))
if newlyScaleOutOrdinalSets.Len() > 0 {
if tc.Annotations == nil {
tc.Annotations = map[string]string{}
}
existed := operatorUtils.GetAutoScalingOutSlots(tc, v1alpha1.TiKVMemberType)
v, err := operatorUtils.Encode(newlyScaleOutOrdinalSets.Union(existed).List())
if err != nil {
return err
}
tc.Annotations[label.AnnTiKVAutoScalingOutOrdinals] = v
}
}
tc.Spec.TiKV.Replicas = recommendedReplicas
return nil
}

func calculateTikvMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, instances []string) (int32, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/label/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ const (
AnnAutoScalingTargetName = "auto-scaling.tidb.pingcap.com/target-name"
// AnnAutoScalingTargetNamespace describes the target TidbCluster Ref Namespace for the TidbCluserAutoScaler
AnnAutoScalingTargetNamespace = "auto-scaling.tidb.pingcap.com/target-namespace"
// AnnTiKVAutoScalingOutOrdinals describe the tikv pods' ordinal list which is created by auto-scaling out
AnnTiKVAutoScalingOutOrdinals = "tikv.tidb.pingcap.com/scale-out-ordinals"
// AnnTiDBAutoScalingOutOrdinals describe the tidb pods' ordinal list which is created by auto-scaling out
AnnTiDBAutoScalingOutOrdinals = "tidb.tidb.pingcap.com/scale-out-ordinals"

// PDLabelVal is PD label value
PDLabelVal string = "pd"
Expand Down
10 changes: 9 additions & 1 deletion pkg/manager/member/pd_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (psd *pdScaler) Scale(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, n
} else if scaling < 0 {
return psd.ScaleIn(tc, oldSet, newSet)
}
return nil
return psd.SyncAutoScalerAnn(tc, oldSet)
}

func (psd *pdScaler) ScaleOut(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, newSet *apps.StatefulSet) error {
Expand Down Expand Up @@ -167,6 +167,10 @@ func (psd *pdScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet,
return nil
}

func (psd *pdScaler) SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error {
return nil
}

type fakePDScaler struct{}

// NewFakePDScaler returns a fake Scaler
Expand All @@ -192,3 +196,7 @@ func (fsd *fakePDScaler) ScaleIn(_ *v1alpha1.TidbCluster, oldSet *apps.StatefulS
setReplicasAndDeleteSlots(newSet, *oldSet.Spec.Replicas-1, nil)
return nil
}

func (fsd *fakePDScaler) SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error {
return nil
}
9 changes: 9 additions & 0 deletions pkg/manager/member/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ const (
skipReasonScalerAnnDeferDeletingIsEmpty = "scaler: pvc annotations defer deleting is empty"
)

// TODO: add document to explain the hot region label
var (
hostRegionLabel = map[string]string{
"specialUse": "hotRegion",
Copy link
Contributor

@cofyc cofyc Feb 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

describe the purpose of this special label here? why we need to add this special label on newly created TiKV instances. If PD has public docs about this label, we can link too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, I will add TODO to mark there should be some docs about this label after I finish the entire auto-scaling document.

Copy link
Contributor

@cofyc cofyc Feb 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great! comments describing the purpose or reason behind will make it easier to understand the code.

}
)

// Scaler implements the logic for scaling out or scaling in the cluster.
type Scaler interface {
// Scale scales the cluster. It does nothing if scaling is not needed.
Expand All @@ -43,6 +50,8 @@ type Scaler interface {
ScaleOut(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet, desired *apps.StatefulSet) error
// ScaleIn scales in the cluster
ScaleIn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet, desired *apps.StatefulSet) error
// SyncAutoScalerAnn would sync Ann created by AutoScaler
SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error
}

type generalScaler struct {
Expand Down
58 changes: 57 additions & 1 deletion pkg/manager/member/tikv_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"strconv"
"time"

"github.com/pingcap/advanced-statefulset/pkg/apis/apps/v1/helper"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"github.com/pingcap/tidb-operator/pkg/util"
apps "k8s.io/api/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
glog "k8s.io/klog"
Expand All @@ -48,7 +50,8 @@ func (tsd *tikvScaler) Scale(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet,
} else if scaling < 0 {
return tsd.ScaleIn(tc, oldSet, newSet)
}
return nil
// we only sync auto scaler annotations when we are finishing syncing scaling
return tsd.SyncAutoScalerAnn(tc, oldSet)
}

func (tsd *tikvScaler) ScaleOut(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, newSet *apps.StatefulSet) error {
Expand Down Expand Up @@ -190,6 +193,55 @@ func (tsd *tikvScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
return fmt.Errorf("TiKV %s/%s not found in cluster", ns, podName)
}

// SyncAutoScalerAnn would reclaim the auto-scaling out slots if the target pod is no longer existed
// For the auto-scaling slots, we would add the special hot region label to the store with pdapi.
func (tsd *tikvScaler) SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error {
currentScalingSlots := util.GetAutoScalingOutSlots(tc, v1alpha1.TiKVMemberType)
if currentScalingSlots.Len() < 1 {
return nil
}
currentOrdinals := helper.GetPodOrdinals(tc.Spec.TiKV.Replicas, actual)

// reclaim the auto-scaling out slots if the target pod is no longer existed
if !currentOrdinals.HasAll(currentScalingSlots.List()...) {
reclaimedSlots := currentScalingSlots.Difference(currentOrdinals)
currentScalingSlots = currentScalingSlots.Delete(reclaimedSlots.List()...)
if currentScalingSlots.Len() < 1 {
delete(tc.Annotations, label.AnnTiKVAutoScalingOutOrdinals)
return nil
}
v, err := util.Encode(currentScalingSlots.List())
if err != nil {
return err
}
tc.Annotations[label.AnnTiKVAutoScalingOutOrdinals] = v
return nil
}

// For the auto-scaling slots, we would add the special hot region label to the store with pdapi.
pdClient := tsd.pdControl.GetPDClient(pdapi.Namespace(tc.Namespace), tc.Name, *tc.Spec.EnableTLSCluster)
for k := range currentScalingSlots {
podName := util.GetPodName(tc, v1alpha1.TiKVMemberType, k)
for _, store := range tc.Status.TiKV.Stores {
DanielZhangQD marked this conversation as resolved.
Show resolved Hide resolved
if store.PodName == podName {
id, err := strconv.ParseUint(store.ID, 10, 64)
if err != nil {
return err
}
ok, err := pdClient.SetStoreLabels(id, hostRegionLabel)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to check if the label is already set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary.

if err != nil {
return err
}
if !ok {
return fmt.Errorf("tc[%s/%s]'s pod[%s] failed to add special hot region label", tc.Namespace, tc.Name, podName)
}
break
}
}
}
return nil
}

type fakeTiKVScaler struct{}

// NewFakeTiKVScaler returns a fake tikv Scaler
Expand All @@ -215,3 +267,7 @@ func (fsd *fakeTiKVScaler) ScaleIn(_ *v1alpha1.TidbCluster, oldSet *apps.Statefu
setReplicasAndDeleteSlots(newSet, *oldSet.Spec.Replicas-1, nil)
return nil
}

func (fsd *fakeTiKVScaler) SyncAutoScalerAnn(tc *v1alpha1.TidbCluster, actual *apps.StatefulSet) error {
return nil
}
11 changes: 2 additions & 9 deletions pkg/manager/member/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/pkg/util"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -71,7 +72,7 @@ func statefulSetIsUpgrading(set *apps.StatefulSet) bool {

// SetStatefulSetLastAppliedConfigAnnotation set last applied config to Statefulset's annotation
func SetStatefulSetLastAppliedConfigAnnotation(set *apps.StatefulSet) error {
setApply, err := encode(set.Spec)
setApply, err := util.Encode(set.Spec)
if err != nil {
return err
}
Expand All @@ -97,14 +98,6 @@ func GetLastAppliedConfig(set *apps.StatefulSet) (*apps.StatefulSetSpec, *corev1
return spec, &spec.Template.Spec, nil
}

func encode(obj interface{}) (string, error) {
b, err := json.Marshal(obj)
if err != nil {
return "", err
}
return string(b), nil
}

// statefulSetEqual compares the new Statefulset's spec with old Statefulset's last applied config
func statefulSetEqual(new apps.StatefulSet, old apps.StatefulSet) bool {
if !apiequality.Semantic.DeepEqual(new.Annotations, old.Annotations) {
Expand Down
37 changes: 37 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,40 @@ func IsStatefulSetScaling(set *appsv1.StatefulSet) bool {
func GetStatefulSetName(tc *v1alpha1.TidbCluster, memberType v1alpha1.MemberType) string {
return fmt.Sprintf("%s-%s", tc.Name, memberType.String())
}

func GetAutoScalingOutSlots(tc *v1alpha1.TidbCluster, memberType v1alpha1.MemberType) sets.Int32 {
s := sets.Int32{}
l := ""
switch memberType {
case v1alpha1.PDMemberType:
return s
case v1alpha1.TiKVMemberType:
l = label.AnnTiKVAutoScalingOutOrdinals
case v1alpha1.TiDBMemberType:
l = label.AnnTiDBAutoScalingOutOrdinals
default:
return s
}
if tc.Annotations == nil {
return s
}
v, existed := tc.Annotations[l]
if !existed {
return s
}
var slice []int32
err := json.Unmarshal([]byte(v), &slice)
if err != nil {
return s
}
s.Insert(slice...)
return s
}

func Encode(obj interface{}) (string, error) {
b, err := json.Marshal(obj)
if err != nil {
return "", err
}
return string(b), nil
}