Commit

Finish auto-scaler controller (pingcap#1731)
* finish auto-scaler controller

* revise compare tc
Yisaer authored and Song Gao committed Feb 20, 2020
1 parent c116b95 commit 9f4812d
Showing 7 changed files with 77 additions and 36 deletions.
2 changes: 1 addition & 1 deletion cmd/controller-manager/main.go
@@ -188,7 +188,7 @@ func main() {
tidbMonitorController := tidbmonitor.NewController(kubeCli, genericCli, informerFactory, kubeInformerFactory)
var autoScalerController *autoscaler.Controller
if features.DefaultFeatureGate.Enabled(features.AutoScaling) {
autoScalerController = autoscaler.NewController(kubeCli, genericCli, informerFactory, kubeInformerFactory)
autoScalerController = autoscaler.NewController(kubeCli, cli, informerFactory, kubeInformerFactory)
}
// Start informer factories after all controller are initialized.
informerFactory.Start(ctx.Done())
83 changes: 66 additions & 17 deletions pkg/autoscaler/autoscaler/autoscaler_manager.go
@@ -17,29 +17,41 @@ import (
"fmt"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
v1alpha1listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
promClient "github.com/prometheus/client_golang/api"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
kubeinformers "k8s.io/client-go/informers"
appslisters "k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog"
)

type autoScalerManager struct {
tcLister v1alpha1listers.TidbClusterLister
cli versioned.Interface
tcControl controller.TidbClusterControlInterface
taLister v1alpha1listers.TidbClusterAutoScalerLister
stsLister appslisters.StatefulSetLister
recorder record.EventRecorder
}

func NewAutoScalerManager(
cli versioned.Interface,
informerFactory informers.SharedInformerFactory,
kubeInformerFactory kubeinformers.SharedInformerFactory,
recorder record.EventRecorder) *autoScalerManager {
tcLister := informerFactory.Pingcap().V1alpha1().TidbClusters().Lister()
stsLister := kubeInformerFactory.Apps().V1().StatefulSets().Lister()
return &autoScalerManager{
tcLister: informerFactory.Pingcap().V1alpha1().TidbClusters().Lister(),
stsLister: kubeInformerFactory.Apps().V1().StatefulSets().Lister(),
cli: cli,
tcControl: controller.NewRealTidbClusterControl(cli, tcLister, recorder),
taLister: informerFactory.Pingcap().V1alpha1().TidbClusterAutoScalers().Lister(),
stsLister: stsLister,
recorder: recorder,
}
}
@@ -55,52 +67,89 @@ func (am *autoScalerManager) Sync(tac *v1alpha1.TidbClusterAutoScaler) error {
tac.Spec.Cluster.Namespace = tac.Namespace
}

tcNamespace := tac.Spec.Cluster.Namespace
tc, err := am.tcLister.TidbClusters(tcNamespace).Get(tcName)
tc, err := am.cli.PingcapV1alpha1().TidbClusters(tac.Spec.Cluster.Namespace).Get(tcName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
// Target TidbCluster Ref is deleted, empty the auto-scaling status
resetAutoScalingAnn(tac)
return nil
}
return err
}
checkAndUpdateTacAnn(tac)
oldTCSpec := tc.Spec.DeepCopy()
oldTc := tc.DeepCopy()
if err := am.syncAutoScaling(tc, tac); err != nil {
return err
}
if err := am.syncTidbClusterReplicas(tc, oldTCSpec); err != nil {
if err := am.syncTidbClusterReplicas(tc, oldTc); err != nil {
return err
}
return am.syncAutoScalingStatus(tc, oldTCSpec, tac)
return am.syncAutoScalingStatus(tc, oldTc, tac)
}

func (am *autoScalerManager) syncAutoScaling(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler) error {
if tac.Spec.MetricsUrl == nil {
return fmt.Errorf("tidbclusterAutoScaler[%s/%s]' metrics url should be defined explicitly", tac.Namespace, tac.Name)
}
client, err := promClient.NewClient(promClient.Config{Address: *tac.Spec.MetricsUrl})
c, err := promClient.NewClient(promClient.Config{Address: *tac.Spec.MetricsUrl})
if err != nil {
return err
}
defaultTAC(tac)
if err := am.syncTiKV(tc, tac, client); err != nil {
return err
oldTikvReplicas := tc.Spec.TiKV.Replicas
if err := am.syncTiKV(tc, tac, c); err != nil {
tc.Spec.TiKV.Replicas = oldTikvReplicas
klog.Errorf("tac[%s/%s] tikv sync failed, continue to sync next, err:%v", tac.Namespace, tac.Name, err)
}
if err := am.syncTiDB(tc, tac, client); err != nil {
return err
oldTidbReplicas := tc.Spec.TiDB.Replicas
if err := am.syncTiDB(tc, tac, c); err != nil {
tc.Spec.TiDB.Replicas = oldTidbReplicas
klog.Errorf("tac[%s/%s] tidb sync failed, continue to sync next, err:%v", tac.Namespace, tac.Name, err)
}
klog.Infof("tc[%s/%s]'s tac[%s/%s] synced", tc.Namespace, tc.Name, tac.Namespace, tac.Name)
return nil
}

//TODO: sync TidbCluster.Spec.Replicas
func (am *autoScalerManager) syncTidbClusterReplicas(tc *v1alpha1.TidbCluster, oldTCSpec *v1alpha1.TidbClusterSpec) error {
func (am *autoScalerManager) syncTidbClusterReplicas(tc *v1alpha1.TidbCluster, oldTc *v1alpha1.TidbCluster) error {
if tc.Spec.TiDB.Replicas == oldTc.Spec.TiDB.Replicas && tc.Spec.TiKV.Replicas == oldTc.Spec.TiKV.Replicas {
return nil
}
newTc := tc.DeepCopy()
_, err := am.tcControl.UpdateTidbCluster(newTc, &newTc.Status, &oldTc.Status)
if err != nil {
return err
}
return nil
}

//TODO: sync tac status
func (am *autoScalerManager) syncAutoScalingStatus(tc *v1alpha1.TidbCluster, oldTc *v1alpha1.TidbClusterSpec,
func (am *autoScalerManager) syncAutoScalingStatus(tc *v1alpha1.TidbCluster, oldTc *v1alpha1.TidbCluster,
tac *v1alpha1.TidbClusterAutoScaler) error {
return nil
return am.updateTidbClusterAutoScaler(tac)
}

func (am *autoScalerManager) updateTidbClusterAutoScaler(tac *v1alpha1.TidbClusterAutoScaler) error {

ns := tac.GetNamespace()
tacName := tac.GetName()
oldTac := tac.DeepCopy()

// don't wait due to limited number of clients, but backoff after the default number of steps
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
var updateErr error
_, updateErr = am.cli.PingcapV1alpha1().TidbClusterAutoScalers(ns).Update(tac)
if updateErr == nil {
klog.Infof("TidbClusterAutoScaler: [%s/%s] updated successfully", ns, tacName)
return nil
}
klog.Errorf("failed to update TidbClusterAutoScaler: [%s/%s], error: %v", ns, tacName, updateErr)
if updated, err := am.taLister.TidbClusterAutoScalers(ns).Get(tacName); err == nil {
// make a copy so we don't mutate the shared cache
tac = updated.DeepCopy()
tac.Annotations = oldTac.Annotations
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated TidbClusterAutoScaler %s/%s from lister: %v", ns, tacName, err))
}
return updateErr
})
}
4 changes: 2 additions & 2 deletions pkg/autoscaler/autoscaler/tidb_autoscaler.go
@@ -57,7 +57,7 @@ func syncTiDBAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbCluster
if recommendedReplicas > currentReplicas {
intervalSeconds = tac.Spec.TiDB.ScaleOutIntervalSeconds
}
ableToScale, err := checkStsAutoScalingInterval(tc, *intervalSeconds, v1alpha1.TiDBMemberType)
ableToScale, err := checkStsAutoScalingInterval(tac, *intervalSeconds, v1alpha1.TiDBMemberType)
if err != nil {
return err
}
@@ -70,7 +70,7 @@ }
}

func updateTcTiDBAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
tac.Annotations[label.AnnTiDBLastAutoScalingTimestamp] = time.Now().String()
tac.Annotations[label.AnnTiDBLastAutoScalingTimestamp] = fmt.Sprintf("%d", time.Now().Unix())
}

func calculateTidbMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, client promClient.Client, instances []string) (int32, error) {
4 changes: 2 additions & 2 deletions pkg/autoscaler/autoscaler/tikv_autoscaler.go
@@ -60,7 +60,7 @@ func syncTiKVAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbCluster
if recommendedReplicas > tc.Spec.TiKV.Replicas {
intervalSeconds = tac.Spec.TiKV.ScaleOutIntervalSeconds
}
ableToScale, err := checkStsAutoScalingInterval(tc, *intervalSeconds, v1alpha1.TiKVMemberType)
ableToScale, err := checkStsAutoScalingInterval(tac, *intervalSeconds, v1alpha1.TiKVMemberType)
if err != nil {
return err
}
@@ -84,7 +84,7 @@ func filterTiKVInstances(tc *v1alpha1.TidbCluster) []string {
}

func updateTcTiKVAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
tac.Annotations[label.AnnTiKVLastAutoScalingTimestamp] = time.Now().String()
tac.Annotations[label.AnnTiKVLastAutoScalingTimestamp] = fmt.Sprintf("%d", time.Now().Unix())
}

func calculateTikvMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, client promClient.Client, instances []string) (int32, error) {
2 changes: 1 addition & 1 deletion pkg/autoscaler/autoscaler/util.go
@@ -57,7 +57,7 @@ func checkStsAutoScalingPrerequisites(set *appsv1.StatefulSet) bool {
}

// checkStsAutoScalingInterval would check whether there is enough interval duration between every two auto-scaling
func checkStsAutoScalingInterval(tac *v1alpha1.TidbCluster, intervalSeconds int32, memberType v1alpha1.MemberType) (bool, error) {
func checkStsAutoScalingInterval(tac *v1alpha1.TidbClusterAutoScaler, intervalSeconds int32, memberType v1alpha1.MemberType) (bool, error) {
if tac.Annotations == nil {
tac.Annotations = map[string]string{}
}
5 changes: 1 addition & 4 deletions pkg/controller/autoscaler/tidbcluster_autoscaler_control.go
@@ -16,7 +16,6 @@ package autoscaler
import (
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/autoscaler"
"github.com/pingcap/tidb-operator/pkg/controller"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
)
@@ -25,17 +24,15 @@ type ControlInterface interface {
ResconcileAutoScaler(ta *v1alpha1.TidbClusterAutoScaler) error
}

func NewDefaultAutoScalerControl(recorder record.EventRecorder, ctrl controller.TypedControlInterface, asm autoscaler.AutoScalerManager) ControlInterface {
func NewDefaultAutoScalerControl(recorder record.EventRecorder, asm autoscaler.AutoScalerManager) ControlInterface {
return &defaultAutoScalerControl{
recoder: recorder,
typedControl: ctrl,
autoScalerManager: asm,
}
}

type defaultAutoScalerControl struct {
recoder record.EventRecorder
typedControl controller.TypedControlInterface
autoScalerManager autoscaler.AutoScalerManager
}

13 changes: 4 additions & 9 deletions pkg/controller/autoscaler/tidbcluster_autoscaler_controller.go
@@ -20,6 +20,7 @@ import (
perrors "github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/autoscaler/autoscaler"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
@@ -34,20 +35,17 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type Controller struct {
cli client.Client
control ControlInterface
taLister listers.TidbClusterAutoScalerLister
tcLister listers.TidbClusterLister
queue workqueue.RateLimitingInterface
}

func NewController(
kubeCli kubernetes.Interface,
genericCli client.Client,
cli versioned.Interface,
informerFactory informers.SharedInformerFactory,
kubeInformerFactory kubeinformers.SharedInformerFactory,
) *Controller {
@@ -56,14 +54,11 @@ func NewController(
eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{
Interface: eventv1.New(kubeCli.CoreV1().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(v1alpha1.Scheme, corev1.EventSource{Component: "tidbclusterautoscaler"})

autoScalerInformer := informerFactory.Pingcap().V1alpha1().TidbClusterAutoScalers()
typedControl := controller.NewTypedControl(controller.NewRealGenericControl(genericCli, recorder))
asm := autoscaler.NewAutoScalerManager(cli, informerFactory, kubeInformerFactory, recorder)

asm := autoscaler.NewAutoScalerManager(informerFactory, kubeInformerFactory, recorder)
tac := &Controller{
cli: genericCli,
control: NewDefaultAutoScalerControl(recorder, typedControl, asm),
control: NewDefaultAutoScalerControl(recorder, asm),
taLister: autoScalerInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(),
