From cbaebe40905fb2ab18ac6e64b9c299d558f098d5 Mon Sep 17 00:00:00 2001 From: champly Date: Wed, 1 Jun 2022 11:17:38 +0800 Subject: [PATCH 1/6] fix scaleobject ready condition 'True'->'False/Unknow' requeue Signed-off-by: champly --- controllers/keda/scaledobject_controller.go | 6 +++- controllers/keda/util/predicate.go | 32 +++++++++++++++++++++ pkg/scaling/executor/scale_scaledobjects.go | 2 +- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index ffa6f5fabb6..ec54b6d0dc7 100644 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -123,7 +123,11 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont // (in this case metadata.Generation does not change) // so reconcile loop is not started on Status updates For(&kedav1alpha1.ScaledObject{}, builder.WithPredicates( - predicate.Or(kedacontrollerutil.PausedReplicasPredicate{}, predicate.GenerationChangedPredicate{}), + predicate.Or( + kedacontrollerutil.PausedReplicasPredicate{}, + kedacontrollerutil.ScaleObjectReadyConditionPredicate{}, + predicate.GenerationChangedPredicate{}, + ), )). Owns(&autoscalingv2beta2.HorizontalPodAutoscaler{}). Complete(r) diff --git a/controllers/keda/util/predicate.go b/controllers/keda/util/predicate.go index dd3349182e7..cfca789a397 100644 --- a/controllers/keda/util/predicate.go +++ b/controllers/keda/util/predicate.go @@ -1,6 +1,7 @@ package util import ( + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" ) @@ -28,3 +29,34 @@ func (PausedReplicasPredicate) Update(e event.UpdateEvent) bool { } return false } + +type ScaleObjectReadyConditionPredicate struct { + predicate.Funcs +} + +func (ScaleObjectReadyConditionPredicate) Update(e event.UpdateEvent) bool { + if e.ObjectOld == nil || e.ObjectNew == nil { + return false + } + + var newReadyCondition, oldReadyCondition kedav1alpha1.Condition + + oldObj, ok := e.ObjectOld.(*kedav1alpha1.ScaledObject) + if !ok { + return false + } + oldReadyCondition = oldObj.Status.Conditions.GetReadyCondition() + + newObj, ok := e.ObjectNew.(*kedav1alpha1.ScaledObject) + if !ok { + return false + } + newReadyCondition = newObj.Status.Conditions.GetReadyCondition() + + // False/Unknown -> True + if !oldReadyCondition.IsTrue() && newReadyCondition.IsTrue() { + return true + } + + return false +} diff --git a/pkg/scaling/executor/scale_scaledobjects.go b/pkg/scaling/executor/scale_scaledobjects.go index b3e2db9fcb8..580ab071d1d 100644 --- a/pkg/scaling/executor/scale_scaledobjects.go +++ b/pkg/scaling/executor/scale_scaledobjects.go @@ -75,7 +75,7 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al // but ScaledObject.Status.ReadyCondition is set not set to 'true' -> set it back to 'true' readyCondition := scaledObject.Status.Conditions.GetReadyCondition() if !isError && !readyCondition.IsTrue() { - if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionFalse, + if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil { logger.Error(err, "error setting ready condition") } From 03302c1cfa30964109c1ff5fa6b71fc4ef4441c5 Mon Sep 17 00:00:00 2001 From: champly Date: Wed, 1 Jun 2022 11:55:57 +0800 Subject: [PATCH 2/6] fix ci Signed-off-by: champly --- controllers/keda/util/predicate.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/controllers/keda/util/predicate.go b/controllers/keda/util/predicate.go index cfca789a397..03aeb16c0bf 100644 --- a/controllers/keda/util/predicate.go +++ b/controllers/keda/util/predicate.go @@ -1,9 +1,10 @@ package util import ( - kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas" From 9098222ec13751be96aaf2b0010f47488ad72d15 Mon Sep 17 00:00:00 2001 From: champly Date: Thu, 2 Jun 2022 20:45:10 +0800 Subject: [PATCH 3/6] add change log Signed-off-by: champly --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 414eafd464b..469a997b4d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md - **General:** Use metricName from GetMetricsSpec in ScaledJobs instead of `queueLength` ([#3032](https://github.com/kedacore/keda/issue/3032)) - **General:** Refactor adapter startup to ensure proper log initilization. ([2316](https://github.com/kedacore/keda/issues/2316)) - **Azure Eventhub Scaler:** KEDA operator crashes on nil memory panic if the eventhub connectionstring for Azure Eventhub Scaler contains an invalid character ([#3082](https://github.com/kedacore/keda/issues/3082)) +- **General:** Scaleobject ready condition 'False/Unknow' to 'True' requeue([#3096](https://github.com/kedacore/keda/issues/3096)) ### Deprecations From fe3497dc38a1cc1c5949b92c2cbbac7c4394332b Mon Sep 17 00:00:00 2001 From: champly Date: Tue, 7 Jun 2022 15:45:39 +0800 Subject: [PATCH 4/6] add ut Signed-off-by: champly --- .../keda/scaledobject_controller_test.go | 112 ++++++++++++++++++ pkg/scalers/azure_servicebus_scaler.go | 0 pkg/scalers/azure_servicebus_scaler_test.go | 0 pkg/scalers/external_mock_scaler.go | 91 ++++++++++++++ pkg/scaling/scale_handler.go | 3 + 5 files changed, 206 insertions(+) mode change 100755 => 100644 pkg/scalers/azure_servicebus_scaler.go mode change 100755 => 100644 pkg/scalers/azure_servicebus_scaler_test.go create mode 100644 pkg/scalers/external_mock_scaler.go diff --git a/controllers/keda/scaledobject_controller_test.go b/controllers/keda/scaledobject_controller_test.go index 394a02fe962..3ad4317e7bd 100644 --- a/controllers/keda/scaledobject_controller_test.go +++ b/controllers/keda/scaledobject_controller_test.go @@ -19,6 +19,7 @@ package keda import ( "context" "fmt" + "sync/atomic" "time" "github.com/golang/mock/gomock" @@ -27,6 +28,7 @@ import ( appsv1 "k8s.io/api/apps/v1" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -598,6 +600,116 @@ var _ = Describe("ScaledObjectController", func() { }, 20*time.Second).Should(Equal(metav1.ConditionFalse)) }) }) + + It("scaleobject ready condition 'False/Unknow' to 'True' will requeue", func() { + var ( + deploymentName = "conditionchange" + soName = "so-" + deploymentName + min int32 = 1 + max int32 = 5 + pollingInterVal int32 = 1 + ) + + // Create the scaling target. + err := k8sClient.Create(context.Background(), generateDeployment(deploymentName)) + Expect(err).ToNot(HaveOccurred()) + + so := &kedav1alpha1.ScaledObject{ + ObjectMeta: metav1.ObjectMeta{Name: soName, Namespace: "default"}, + Spec: kedav1alpha1.ScaledObjectSpec{ + ScaleTargetRef: &kedav1alpha1.ScaleTarget{ + Name: deploymentName, + }, + MinReplicaCount: &min, + MaxReplicaCount: &max, + PollingInterval: &pollingInterVal, + Triggers: []kedav1alpha1.ScaleTriggers{ + { + Type: "cpu", + MetricType: autoscalingv2beta2.UtilizationMetricType, + Metadata: map[string]string{ + "value": "50", + }, + }, + { + Type: "externalMock", + MetricType: autoscalingv2beta2.AverageValueMetricType, + Metadata: map[string]string{}, + }, + }, + }, + } + err = k8sClient.Create(context.Background(), so) + Expect(err).ToNot(HaveOccurred()) + + // wait so's ready condition Ready + Eventually(func() metav1.ConditionStatus { + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so) + Expect(err).ToNot(HaveOccurred()) + return so.Status.Conditions.GetReadyCondition().Status + }, 5*time.Second).Should(Equal(metav1.ConditionTrue)) + + // check hpa + hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{} + Eventually(func() int { + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa) + Expect(err).ToNot(HaveOccurred()) + return len(hpa.Spec.Metrics) + }, 1*time.Second).Should(Equal(2)) + + // mock external server offline + atomic.StoreInt32(&scalers.MockExternalServerStatus, scalers.MockExternalServerStatusOffline) + + // wait so's ready condition not + Eventually(func() metav1.ConditionStatus { + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so) + Expect(err).ToNot(HaveOccurred()) + return so.Status.Conditions.GetReadyCondition().Status + }, 5*time.Second).Should(Or(Equal(metav1.ConditionFalse), Equal(metav1.ConditionUnknown))) + + // mock kube-controller-manager request v1beta1.custom.metrics.k8s.io api GetMetrics + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa) + Expect(err).ToNot(HaveOccurred()) + hpa.Status.CurrentMetrics = []autoscalingv2beta2.MetricStatus{ + { + Type: autoscalingv2beta2.ResourceMetricSourceType, + Resource: &autoscalingv2beta2.ResourceMetricStatus{ + Name: corev1.ResourceCPU, + Current: autoscalingv2beta2.MetricValueStatus{ + Value: resource.NewQuantity(int64(100), resource.DecimalSI), + }, + }, + }, + } + err = k8sClient.Status().Update(ctx, hpa) + Expect(err).ToNot(HaveOccurred()) + + // hpa metrics will only left CPU metric + Eventually(func() int { + hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{} + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa) + Expect(err).ToNot(HaveOccurred()) + return len(hpa.Spec.Metrics) + }, 5*time.Second).Should(Equal(1)) + + // mock external server online + atomic.StoreInt32(&scalers.MockExternalServerStatus, scalers.MockExternalServerStatusOnline) + + // wait so's ready condition Ready + Eventually(func() metav1.ConditionStatus { + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so) + Expect(err).ToNot(HaveOccurred()) + return so.Status.Conditions.GetReadyCondition().Status + }, 5*time.Second).Should(Equal(metav1.ConditionTrue)) + + // hpa will recover + Eventually(func() int { + hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{} + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa) + Expect(err).ToNot(HaveOccurred()) + return len(hpa.Spec.Metrics) + }, 5*time.Second).Should(Equal(2)) + }) }) func generateDeployment(name string) *appsv1.Deployment { diff --git a/pkg/scalers/azure_servicebus_scaler.go b/pkg/scalers/azure_servicebus_scaler.go old mode 100755 new mode 100644 diff --git a/pkg/scalers/azure_servicebus_scaler_test.go b/pkg/scalers/azure_servicebus_scaler_test.go old mode 100755 new mode 100644 diff --git a/pkg/scalers/external_mock_scaler.go b/pkg/scalers/external_mock_scaler.go new file mode 100644 index 00000000000..34d408275ff --- /dev/null +++ b/pkg/scalers/external_mock_scaler.go @@ -0,0 +1,91 @@ +package scalers + +import ( + "context" + "errors" + "sync/atomic" + + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +const ( + MockExternalServerStatusOffline int32 = 0 + MockExternalServerStatusOnline int32 = 1 +) + +var ( + MockExternalServerStatus = MockExternalServerStatusOnline + ErrMock = errors.New("mock error") + MockMetricName = "mockMetricName" + MockMetricTarget int64 = 50 + MockMetricValue int64 = 100 +) + +type externalMockScaler struct{} + +func NewExternalMockScaler(config *ScalerConfig) (Scaler, error) { + return &externalMockScaler{}, nil +} + +// IsActive implements Scaler +func (*externalMockScaler) IsActive(ctx context.Context) (bool, error) { + if atomic.LoadInt32(&MockExternalServerStatus) != MockExternalServerStatusOnline { + return false, ErrMock + } + + return true, nil +} + +// Close implements Scaler +func (*externalMockScaler) Close(ctx context.Context) error { + return nil +} + +// GetMetricSpecForScaling implements Scaler +func (*externalMockScaler) GetMetricSpecForScaling(ctx context.Context) []v2beta2.MetricSpec { + if atomic.LoadInt32(&MockExternalServerStatus) != MockExternalServerStatusOnline { + return nil + } + + return getMockMetricsSpecs() +} + +// GetMetrics implements Scaler +func (*externalMockScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + if atomic.LoadInt32(&MockExternalServerStatus) != MockExternalServerStatusOnline { + return nil, ErrMock + } + + return getMockExternalMetricsValue(), nil +} + +func getMockMetricsSpecs() []v2beta2.MetricSpec { + return []v2beta2.MetricSpec{ + { + Type: v2beta2.ExternalMetricSourceType, + External: &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: MockMetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.ValueMetricType, + Value: resource.NewQuantity(MockMetricValue, resource.DecimalSI), + }, + }, + }, + } +} + +func getMockExternalMetricsValue() []external_metrics.ExternalMetricValue { + return []external_metrics.ExternalMetricValue{ + { + MetricName: MockMetricName, + Value: *resource.NewQuantity(MockMetricValue, resource.DecimalSI), + Timestamp: metav1.Now(), + }, + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 3d5c39ff8ad..4fe37760a6d 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -391,6 +391,9 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewElasticsearchScaler(config) case "external": return scalers.NewExternalScaler(config) + // TODO: use other way for test. + case "externalMock": + return scalers.NewExternalMockScaler(config) case "external-push": return scalers.NewExternalPushScaler(config) case "gcp-pubsub": From 809be5c94db8ff920033ab9958fb070dc207c077 Mon Sep 17 00:00:00 2001 From: champly Date: Tue, 7 Jun 2022 16:11:43 +0800 Subject: [PATCH 5/6] fix static check with sort Signed-off-by: champly --- controllers/keda/scaledobject_controller_test.go | 2 +- pkg/scaling/scale_handler.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/controllers/keda/scaledobject_controller_test.go b/controllers/keda/scaledobject_controller_test.go index 3ad4317e7bd..c3730848317 100644 --- a/controllers/keda/scaledobject_controller_test.go +++ b/controllers/keda/scaledobject_controller_test.go @@ -632,7 +632,7 @@ var _ = Describe("ScaledObjectController", func() { }, }, { - Type: "externalMock", + Type: "external-mock", MetricType: autoscalingv2beta2.AverageValueMetricType, Metadata: map[string]string{}, }, diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 4fe37760a6d..b90851f3241 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -392,7 +392,7 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, case "external": return scalers.NewExternalScaler(config) // TODO: use other way for test. - case "externalMock": + case "external-mock": return scalers.NewExternalMockScaler(config) case "external-push": return scalers.NewExternalPushScaler(config) From b176e8bd3ad0f4519ce80cc9170c7c5cb9525253 Mon Sep 17 00:00:00 2001 From: champly Date: Tue, 7 Jun 2022 16:37:11 +0800 Subject: [PATCH 6/6] revert file permission Signed-off-by: champly --- pkg/scalers/azure_servicebus_scaler.go | 0 pkg/scalers/azure_servicebus_scaler_test.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 pkg/scalers/azure_servicebus_scaler.go mode change 100644 => 100755 pkg/scalers/azure_servicebus_scaler_test.go diff --git a/pkg/scalers/azure_servicebus_scaler.go b/pkg/scalers/azure_servicebus_scaler.go old mode 100644 new mode 100755 diff --git a/pkg/scalers/azure_servicebus_scaler_test.go b/pkg/scalers/azure_servicebus_scaler_test.go old mode 100644 new mode 100755