Skip to content

Commit

Permalink
use Patch to set FallbackCondition on ScaledObject.Status (kedacore#2037
Browse files Browse the repository at this point in the history
)

Signed-off-by: nilayasiktoprak <nilayasiktoprak@gmail.com>
  • Loading branch information
zroubalik authored and nilayasiktoprak committed Oct 23, 2021
1 parent 27952d1 commit b56c1b2
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 37 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

### Other

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- Use Patch to set FallbackCondition on ScaledObject.Status ([#2037](https://github.com/kedacore/keda/pull/2037))

## v2.4.0

Expand Down
49 changes: 28 additions & 21 deletions pkg/provider/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,43 @@ import (
"context"
"fmt"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers"
)

func isFallbackEnabled(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) bool {
return scaledObject.Spec.Fallback != nil && metricSpec.External.Target.Type == v2beta2.AverageValueMetricType
}

func (p *KedaProvider) getMetricsWithFallback(scaler scalers.Scaler, metricName string, metricSelector labels.Selector, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) ([]external_metrics.ExternalMetricValue, error) {
initHealthStatus(scaledObject)
status := scaledObject.Status.DeepCopy()

initHealthStatus(status)
metrics, err := scaler.GetMetrics(context.TODO(), metricName, metricSelector)
healthStatus := getHealthStatus(scaledObject, metricName)
healthStatus := getHealthStatus(status, metricName)

if err == nil {
zero := int32(0)
healthStatus.NumberOfFailures = &zero
healthStatus.Status = kedav1alpha1.HealthStatusHappy
scaledObject.Status.Health[metricName] = *healthStatus
status.Health[metricName] = *healthStatus

p.updateStatus(scaledObject, metricSpec)
p.updateStatus(scaledObject, status, metricSpec)
return metrics, nil
}

healthStatus.Status = kedav1alpha1.HealthStatusFailing
*healthStatus.NumberOfFailures++
scaledObject.Status.Health[metricName] = *healthStatus
status.Health[metricName] = *healthStatus

p.updateStatus(scaledObject, metricSpec)
p.updateStatus(scaledObject, status, metricSpec)

switch {
case !isFallbackEnabled(scaledObject, metricSpec):
Expand Down Expand Up @@ -84,37 +88,40 @@ func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.Metr
return fallbackMetrics
}

func (p *KedaProvider) updateStatus(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) {
func (p *KedaProvider) updateStatus(scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus, metricSpec v2beta2.MetricSpec) {
patch := runtimeclient.MergeFrom(scaledObject.DeepCopy())

if fallbackExistsInScaledObject(scaledObject, metricSpec) {
scaledObject.Status.Conditions.SetFallbackCondition(metav1.ConditionTrue, "FallbackExists", "At least one trigger is falling back on this scaled object")
status.Conditions.SetFallbackCondition(metav1.ConditionTrue, "FallbackExists", "At least one trigger is falling back on this scaled object")
} else {
scaledObject.Status.Conditions.SetFallbackCondition(metav1.ConditionFalse, "NoFallbackFound", "No fallbacks are active on this scaled object")
status.Conditions.SetFallbackCondition(metav1.ConditionFalse, "NoFallbackFound", "No fallbacks are active on this scaled object")
}

err := p.client.Status().Update(context.TODO(), scaledObject)
scaledObject.Status = *status
err := p.client.Status().Patch(context.TODO(), scaledObject, patch)
if err != nil {
logger.Error(err, "Error updating ScaledObject status", "Error")
logger.Error(err, "Failed to patch ScaledObjects Status")
}
}

func getHealthStatus(scaledObject *kedav1alpha1.ScaledObject, metricName string) *kedav1alpha1.HealthStatus {
func getHealthStatus(status *kedav1alpha1.ScaledObjectStatus, metricName string) *kedav1alpha1.HealthStatus {
// Get health status for a specific metric
_, healthStatusExists := scaledObject.Status.Health[metricName]
_, healthStatusExists := status.Health[metricName]
if !healthStatusExists {
zero := int32(0)
status := kedav1alpha1.HealthStatus{
healthStatus := kedav1alpha1.HealthStatus{
NumberOfFailures: &zero,
Status: kedav1alpha1.HealthStatusHappy,
}
scaledObject.Status.Health[metricName] = status
status.Health[metricName] = healthStatus
}
healthStatus := scaledObject.Status.Health[metricName]
healthStatus := status.Health[metricName]
return &healthStatus
}

func initHealthStatus(scaledObject *kedav1alpha1.ScaledObject) {
func initHealthStatus(status *kedav1alpha1.ScaledObjectStatus) {
// Init health status if missing
if scaledObject.Status.Health == nil {
scaledObject.Status.Health = make(map[string]kedav1alpha1.HealthStatus)
if status.Health == nil {
status.Health = make(map[string]kedav1alpha1.HealthStatus)
}
}
31 changes: 16 additions & 15 deletions pkg/provider/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ import (

"github.com/go-logr/logr"
"github.com/golang/mock/gomock"
kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/mock/mock_client"
mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler"
"github.com/kedacore/keda/v2/pkg/mock/mock_scaling"
"github.com/kubernetes-sigs/custom-metrics-apiserver/pkg/provider"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand All @@ -20,6 +16,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/mock/mock_client"
mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler"
"github.com/kedacore/keda/v2/pkg/mock/mock_scaling"
)

const metricName = "some_metric_name"
Expand Down Expand Up @@ -67,7 +68,7 @@ var _ = Describe("fallback", func() {
primeGetMetrics(scaler, expectedMetricValue)
so := buildScaledObject(nil, nil)
metricSpec := createMetricSpec(3)
expectStatusUpdate(ctrl, client)
expectStatusPatch(ctrl, client)

metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec)

Expand Down Expand Up @@ -97,7 +98,7 @@ var _ = Describe("fallback", func() {
)

metricSpec := createMetricSpec(3)
expectStatusUpdate(ctrl, client)
expectStatusPatch(ctrl, client)

metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec)

Expand All @@ -112,7 +113,7 @@ var _ = Describe("fallback", func() {

so := buildScaledObject(nil, nil)
metricSpec := createMetricSpec(3)
expectStatusUpdate(ctrl, client)
expectStatusPatch(ctrl, client)

_, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec)

Expand Down Expand Up @@ -140,7 +141,7 @@ var _ = Describe("fallback", func() {
)

metricSpec := createMetricSpec(10)
expectStatusUpdate(ctrl, client)
expectStatusPatch(ctrl, client)

_, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec)

Expand Down Expand Up @@ -169,7 +170,7 @@ var _ = Describe("fallback", func() {
},
)
metricSpec := createMetricSpec(10)
expectStatusUpdate(ctrl, client)
expectStatusPatch(ctrl, client)

metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec)

Expand Down Expand Up @@ -223,7 +224,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)

statusWriter := mock_client.NewMockStatusWriter(ctrl)
statusWriter.EXPECT().Update(gomock.Any(), gomock.Any()).Return(errors.New("Some error"))
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("Some error"))
client.EXPECT().Status().Return(statusWriter)

metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec)
Expand Down Expand Up @@ -253,7 +254,7 @@ var _ = Describe("fallback", func() {
},
)
metricSpec := createMetricSpec(10)
expectStatusUpdate(ctrl, client)
expectStatusPatch(ctrl, client)

_, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec)

Expand Down Expand Up @@ -286,7 +287,7 @@ var _ = Describe("fallback", func() {
},
)
metricSpec := createMetricSpec(10)
expectStatusUpdate(ctrl, client)
expectStatusPatch(ctrl, client)

_, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -319,7 +320,7 @@ var _ = Describe("fallback", func() {
},
)
metricSpec := createMetricSpec(10)
expectStatusUpdate(ctrl, client)
expectStatusPatch(ctrl, client)

_, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec)
Expect(err).ShouldNot(BeNil())
Expand Down Expand Up @@ -365,9 +366,9 @@ func (h *healthStatusMatcher) NegatedFailureMessage(actual interface{}) (message
}
}

func expectStatusUpdate(ctrl *gomock.Controller, client *mock_client.MockClient) {
func expectStatusPatch(ctrl *gomock.Controller, client *mock_client.MockClient) {
statusWriter := mock_client.NewMockStatusWriter(ctrl)
statusWriter.EXPECT().Update(gomock.Any(), gomock.Any())
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())
client.EXPECT().Status().Return(statusWriter)
}

Expand Down

0 comments on commit b56c1b2

Please sign in to comment.