
Finish auto-scaler controller #1731

Merged · 4 commits · Feb 20, 2020
Changes from 3 commits
2 changes: 1 addition & 1 deletion cmd/controller-manager/main.go
@@ -188,7 +188,7 @@ func main() {
tidbMonitorController := tidbmonitor.NewController(kubeCli, genericCli, informerFactory, kubeInformerFactory)
var autoScalerController *autoscaler.Controller
if features.DefaultFeatureGate.Enabled(features.AutoScaling) {
-autoScalerController = autoscaler.NewController(kubeCli, genericCli, informerFactory, kubeInformerFactory)
+autoScalerController = autoscaler.NewController(kubeCli, cli, informerFactory, kubeInformerFactory)
}
// Start informer factories after all controller are initialized.
informerFactory.Start(ctx.Done())
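Note on the change above: `autoscaler.NewController` now receives the typed CRD clientset (`cli`, a `versioned.Interface`) instead of the controller-runtime generic client. The clientset's construction is outside this hunk; the sketch below shows the usual wiring for such clients — names, helper, and error handling are assumptions for illustration, not the file's actual code:

```go
package main

import (
	"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/klog"
)

// mustClients builds the two clients a controller-manager typically hands to
// its controllers: the core kubernetes clientset and the typed clientset
// generated for the pingcap.com CRDs.
func mustClients() (kubernetes.Interface, versioned.Interface) {
	cfg, err := rest.InClusterConfig() // out-of-cluster setups would use clientcmd instead
	if err != nil {
		klog.Fatalf("failed to get kubernetes config: %v", err)
	}
	kubeCli, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("failed to create kubernetes clientset: %v", err)
	}
	cli, err := versioned.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("failed to create CRD clientset: %v", err)
	}
	return kubeCli, cli
}

func main() {
	kubeCli, cli := mustClients()
	_, _ = kubeCli, cli // pass these into NewController(...)
}
```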
80 changes: 63 additions & 17 deletions pkg/autoscaler/autoscaler/autoscaler_manager.go
@@ -17,29 +17,42 @@ import (
"fmt"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
v1alpha1listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
promClient "github.com/prometheus/client_golang/api"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
kubeinformers "k8s.io/client-go/informers"
appslisters "k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog"
)

type autoScalerManager struct {
-tcLister v1alpha1listers.TidbClusterLister
+cli versioned.Interface
+tcControl controller.TidbClusterControlInterface
+taLister v1alpha1listers.TidbClusterAutoScalerLister
stsLister appslisters.StatefulSetLister
recorder record.EventRecorder
}

func NewAutoScalerManager(
+cli versioned.Interface,
informerFactory informers.SharedInformerFactory,
kubeInformerFactory kubeinformers.SharedInformerFactory,
recorder record.EventRecorder) *autoScalerManager {
+tcLister := informerFactory.Pingcap().V1alpha1().TidbClusters().Lister()
+stsLister := kubeInformerFactory.Apps().V1().StatefulSets().Lister()
return &autoScalerManager{
-tcLister: informerFactory.Pingcap().V1alpha1().TidbClusters().Lister(),
-stsLister: kubeInformerFactory.Apps().V1().StatefulSets().Lister(),
+cli: cli,
+tcControl: controller.NewRealTidbClusterControl(cli, tcLister, recorder),
+taLister: informerFactory.Pingcap().V1alpha1().TidbClusterAutoScalers().Lister(),
+stsLister: stsLister,
recorder: recorder,
}
}
@@ -55,52 +68,85 @@ func (am *autoScalerManager) Sync(tac *v1alpha1.TidbClusterAutoScaler) error {
tac.Spec.Cluster.Namespace = tac.Namespace
}

-tcNamespace := tac.Spec.Cluster.Namespace
-tc, err := am.tcLister.TidbClusters(tcNamespace).Get(tcName)
+tc, err := am.cli.PingcapV1alpha1().TidbClusters(tac.Spec.Cluster.Namespace).Get(tcName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
// Target TidbCluster Ref is deleted, empty the auto-scaling status
resetAutoScalingAnn(tac)
return nil
}
return err
}
checkAndUpdateTacAnn(tac)
-oldTCSpec := tc.Spec.DeepCopy()
+oldTc := tc.DeepCopy()
if err := am.syncAutoScaling(tc, tac); err != nil {
return err
}
-if err := am.syncTidbClusterReplicas(tc, oldTCSpec); err != nil {
+if err := am.syncTidbClusterReplicas(tc, oldTc); err != nil {
return err
}
-return am.syncAutoScalingStatus(tc, oldTCSpec, tac)
+return am.syncAutoScalingStatus(tc, oldTc, tac)
}

func (am *autoScalerManager) syncAutoScaling(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler) error {
if tac.Spec.MetricsUrl == nil {
return fmt.Errorf("tidbclusterAutoScaler[%s/%s]' metrics url should be defined explicitly", tac.Namespace, tac.Name)
}
-client, err := promClient.NewClient(promClient.Config{Address: *tac.Spec.MetricsUrl})
+c, err := promClient.NewClient(promClient.Config{Address: *tac.Spec.MetricsUrl})
if err != nil {
return err
}
defaultTAC(tac)
-if err := am.syncTiKV(tc, tac, client); err != nil {
-return err
+if err := am.syncTiKV(tc, tac, c); err != nil {
+klog.Errorf("tac[%s/%s] tikv sync failed, continue to sync next, err:%v", tac.Namespace, tac.Name, err)
}
-if err := am.syncTiDB(tc, tac, client); err != nil {
-return err
+if err := am.syncTiDB(tc, tac, c); err != nil {
+klog.Errorf("tac[%s/%s] tidb sync failed, continue to sync next, err:%v", tac.Namespace, tac.Name, err)
}
klog.Infof("tc[%s/%s]'s tac[%s/%s] synced", tc.Namespace, tc.Name, tac.Namespace, tac.Name)
return nil
}

//TODO: sync TidbCluster.Spec.Replicas
-func (am *autoScalerManager) syncTidbClusterReplicas(tc *v1alpha1.TidbCluster, oldTCSpec *v1alpha1.TidbClusterSpec) error {
+func (am *autoScalerManager) syncTidbClusterReplicas(tc *v1alpha1.TidbCluster, oldTc *v1alpha1.TidbCluster) error {
+if apiequality.Semantic.DeepEqual(tc, oldTc) {
> [Contributor] Just compare the whole tc object?
> [Author] Any advice?
> [Author] comparing method updated.
return nil
+}
+newTc := tc.DeepCopy()
+_, err := am.tcControl.UpdateTidbCluster(newTc, &newTc.Status, &oldTc.Status)
+if err != nil {
+return err
+}
+return nil
}
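A side note on the review thread above: `apiequality.Semantic.DeepEqual` is `reflect.DeepEqual` extended with Kubernetes' semantic equality rules — notably, `resource.Quantity` values compare by numeric value rather than by spelling, so two specs that differ only in representation are treated as equal and skip the no-op update. A minimal standalone illustration (not code from this PR):

```go
package main

import (
	"fmt"

	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// "1024m" and "1.024" are two spellings of the same quantity.
	a := resource.MustParse("1024m")
	b := resource.MustParse("1.024")

	// Semantic equality compares parsed values, so a spec that only changes
	// the spelling of a quantity does not count as a real change.
	fmt.Println(apiequality.Semantic.DeepEqual(a, b)) // true
}
```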

//TODO: sync tac status
-func (am *autoScalerManager) syncAutoScalingStatus(tc *v1alpha1.TidbCluster, oldTc *v1alpha1.TidbClusterSpec,
+func (am *autoScalerManager) syncAutoScalingStatus(tc *v1alpha1.TidbCluster, oldTc *v1alpha1.TidbCluster,
tac *v1alpha1.TidbClusterAutoScaler) error {
-return nil
+return am.updateTidbClusterAutoScaler(tac)
}

+func (am *autoScalerManager) updateTidbClusterAutoScaler(tac *v1alpha1.TidbClusterAutoScaler) error {
+
+ns := tac.GetNamespace()
+tacName := tac.GetName()
+oldTac := tac.DeepCopy()
+
+// don't wait due to limited number of clients, but backoff after the default number of steps
+return retry.RetryOnConflict(retry.DefaultRetry, func() error {
+var updateErr error
+_, updateErr = am.cli.PingcapV1alpha1().TidbClusterAutoScalers(ns).Update(tac)
+if updateErr == nil {
+klog.Infof("TidbClusterAutoScaler: [%s/%s] updated successfully", ns, tacName)
+return nil
+}
+klog.Errorf("failed to update TidbClusterAutoScaler: [%s/%s], error: %v", ns, tacName, updateErr)
+if updated, err := am.taLister.TidbClusterAutoScalers(ns).Get(tacName); err == nil {
+// make a copy so we don't mutate the shared cache
+tac = updated.DeepCopy()
+tac.Annotations = oldTac.Annotations
+} else {
+utilruntime.HandleError(fmt.Errorf("error getting updated TidbClusterAutoScaler %s/%s from lister: %v", ns, tacName, err))
+}
+return updateErr
+})
+}
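The helper above is client-go's standard optimistic-concurrency pattern: `retry.RetryOnConflict` re-runs the closure only when the apiserver returns a 409 Conflict (a stale `resourceVersion`), and `retry.DefaultRetry` caps it at a handful of quick attempts; between attempts the manager refreshes the object from the lister while pinning the annotations it just computed. A self-contained sketch of the retry behavior — the group/resource and object name here are made up for illustration:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/util/retry"
)

func main() {
	gr := schema.GroupResource{Group: "pingcap.com", Resource: "tidbclusterautoscalers"}
	attempts := 0
	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		attempts++
		if attempts < 3 {
			// Simulate the apiserver rejecting a stale resourceVersion; a real
			// caller re-fetches the object here, as the manager does via its lister.
			return errors.NewConflict(gr, "example-tac", fmt.Errorf("object was modified"))
		}
		return nil // update accepted
	})
	fmt.Printf("attempts=%d err=%v\n", attempts, err) // attempts=3 err=<nil>
}
```

Any non-conflict error aborts the loop immediately and is returned to the caller, which is why the helper logs and returns `updateErr` unchanged.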
4 changes: 2 additions & 2 deletions pkg/autoscaler/autoscaler/tidb_autoscaler.go
@@ -57,7 +57,7 @@ func syncTiDBAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbCluster
if recommendedReplicas > currentReplicas {
intervalSeconds = tac.Spec.TiDB.ScaleOutIntervalSeconds
}
-ableToScale, err := checkStsAutoScalingInterval(tc, *intervalSeconds, v1alpha1.TiDBMemberType)
+ableToScale, err := checkStsAutoScalingInterval(tac, *intervalSeconds, v1alpha1.TiDBMemberType)
if err != nil {
return err
}
@@ -70,7 +70,7 @@ }
}

func updateTcTiDBAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
-tac.Annotations[label.AnnTiDBLastAutoScalingTimestamp] = time.Now().String()
+tac.Annotations[label.AnnTiDBLastAutoScalingTimestamp] = fmt.Sprintf("%d", time.Now().Unix())
> [Author] fix record timestamp bug
}
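Why the `%d`/`Unix()` form is a fix: the annotation is read back later by `checkStsAutoScalingInterval` (pkg/autoscaler/autoscaler/util.go) to compute the elapsed time, which requires parsing the value as an integer, and `time.Time.String()` produces a human-readable form that can never satisfy that parse. A small round-trip sketch, assuming the consumer uses `strconv.ParseInt`:

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

func main() {
	// Before the fix: the human-readable form is not an integer,
	// e.g. "2020-02-20 10:00:00.123456 +0800 CST m=+0.000000001".
	if _, err := strconv.ParseInt(time.Now().String(), 10, 64); err != nil {
		fmt.Println("old format is unparseable:", err)
	}

	// After the fix: Unix seconds round-trip cleanly.
	ann := fmt.Sprintf("%d", time.Now().Unix())
	last, err := strconv.ParseInt(ann, 10, 64)
	if err != nil {
		panic(err) // cannot happen for the new format
	}
	fmt.Println("seconds since last scaling:", time.Now().Unix()-last)
}
```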

func calculateTidbMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, client promClient.Client, instances []string) (int32, error) {
4 changes: 2 additions & 2 deletions pkg/autoscaler/autoscaler/tikv_autoscaler.go
@@ -60,7 +60,7 @@ func syncTiKVAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbCluster
if recommendedReplicas > tc.Spec.TiKV.Replicas {
intervalSeconds = tac.Spec.TiKV.ScaleOutIntervalSeconds
}
-ableToScale, err := checkStsAutoScalingInterval(tc, *intervalSeconds, v1alpha1.TiKVMemberType)
+ableToScale, err := checkStsAutoScalingInterval(tac, *intervalSeconds, v1alpha1.TiKVMemberType)
if err != nil {
return err
}
@@ -84,7 +84,7 @@ func filterTiKVInstances(tc *v1alpha1.TidbCluster) []string {
}

func updateTcTiKVAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
-tac.Annotations[label.AnnTiKVLastAutoScalingTimestamp] = time.Now().String()
+tac.Annotations[label.AnnTiKVLastAutoScalingTimestamp] = fmt.Sprintf("%d", time.Now().Unix())
> [Author] fix record timestamp bug (same round-trip issue as the TiDB annotation above)
}

func calculateTikvMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, client promClient.Client, instances []string) (int32, error) {
2 changes: 1 addition & 1 deletion pkg/autoscaler/autoscaler/util.go
@@ -57,7 +57,7 @@ func checkStsAutoScalingPrerequisites(set *appsv1.StatefulSet) bool {
}

// checkStsAutoScalingInterval would check whether there is enough interval duration between every two auto-scaling
-func checkStsAutoScalingInterval(tac *v1alpha1.TidbCluster, intervalSeconds int32, memberType v1alpha1.MemberType) (bool, error) {
+func checkStsAutoScalingInterval(tac *v1alpha1.TidbClusterAutoScaler, intervalSeconds int32, memberType v1alpha1.MemberType) (bool, error) {
if tac.Annotations == nil {
tac.Annotations = map[string]string{}
}
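The hunk shows only the signature change: the last-scaling timestamp is tracked on the TidbClusterAutoScaler's annotations, so the gate now takes the `tac` rather than the `TidbCluster`. The rest of the function sits outside the hunk; a gate of this shape typically parses the stored Unix timestamp and compares the elapsed seconds against `intervalSeconds`. A hedged sketch of that behavior, not the file's actual body:

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// ableToScale is an illustrative stand-in for the interval gate: it reports
// whether at least intervalSeconds have passed since the Unix timestamp stored
// in the annotation value. An absent value means no scaling has happened yet,
// so scaling is allowed.
func ableToScale(lastTS string, intervalSeconds int32) (bool, error) {
	if lastTS == "" {
		return true, nil
	}
	t, err := strconv.ParseInt(lastTS, 10, 64)
	if err != nil {
		return false, err
	}
	return int32(time.Since(time.Unix(t, 0)).Seconds()) >= intervalSeconds, nil
}

func main() {
	sixMinAgo := fmt.Sprintf("%d", time.Now().Add(-6*time.Minute).Unix())
	ok, _ := ableToScale(sixMinAgo, 300)
	fmt.Println(ok) // true: more than 300s have elapsed
}
```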
5 changes: 1 addition & 4 deletions pkg/controller/autoscaler/tidbcluster_autoscaler_control.go
@@ -16,7 +16,6 @@ package autoscaler
import (
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/autoscaler"
"github.com/pingcap/tidb-operator/pkg/controller"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
)
@@ -25,17 +24,15 @@ type ControlInterface interface {
ResconcileAutoScaler(ta *v1alpha1.TidbClusterAutoScaler) error
}

-func NewDefaultAutoScalerControl(recorder record.EventRecorder, ctrl controller.TypedControlInterface, asm autoscaler.AutoScalerManager) ControlInterface {
+func NewDefaultAutoScalerControl(recorder record.EventRecorder, asm autoscaler.AutoScalerManager) ControlInterface {
return &defaultAutoScalerControl{
recoder: recorder,
-typedControl: ctrl,
autoScalerManager: asm,
}
}

type defaultAutoScalerControl struct {
recoder record.EventRecorder
-typedControl controller.TypedControlInterface
autoScalerManager autoscaler.AutoScalerManager
}

13 changes: 4 additions & 9 deletions pkg/controller/autoscaler/tidbcluster_autoscaler_controller.go
@@ -20,6 +20,7 @@ import (
perrors "github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/autoscaler/autoscaler"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
@@ -34,20 +35,17 @@
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type Controller struct {
-cli client.Client
control ControlInterface
taLister listers.TidbClusterAutoScalerLister
tcLister listers.TidbClusterLister
queue workqueue.RateLimitingInterface
}

func NewController(
kubeCli kubernetes.Interface,
-genericCli client.Client,
+cli versioned.Interface,
informerFactory informers.SharedInformerFactory,
kubeInformerFactory kubeinformers.SharedInformerFactory,
) *Controller {
@@ -56,14 +54,11 @@ func NewController(
eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{
Interface: eventv1.New(kubeCli.CoreV1().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(v1alpha1.Scheme, corev1.EventSource{Component: "tidbclusterautoscaler"})

autoScalerInformer := informerFactory.Pingcap().V1alpha1().TidbClusterAutoScalers()
-typedControl := controller.NewTypedControl(controller.NewRealGenericControl(genericCli, recorder))
+asm := autoscaler.NewAutoScalerManager(cli, informerFactory, kubeInformerFactory, recorder)

-asm := autoscaler.NewAutoScalerManager(informerFactory, kubeInformerFactory, recorder)
tac := &Controller{
-cli: genericCli,
-control: NewDefaultAutoScalerControl(recorder, typedControl, asm),
+control: NewDefaultAutoScalerControl(recorder, asm),
taLister: autoScalerInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(),