Skip to content

Commit

Permalink
fix: Validate empty array value of triggers in ScaledObject/ScaledJob…
Browse files Browse the repository at this point in the history
… creation (#5524)

Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>
  • Loading branch information
SpiritZhou authored Mar 4, 2024
1 parent d4cfb0d commit 6c94347
Show file tree
Hide file tree
Showing 15 changed files with 445 additions and 14 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Here is an overview of all new **experimental** features:

### Fixes

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Validate empty array value of triggers in ScaledObject/ScaledJob creation ([#5520](https://github.com/kedacore/keda/issues/5520))

### Deprecations

Expand Down
73 changes: 73 additions & 0 deletions apis/keda/v1alpha1/scaledjob_webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright 2024 The KEDA Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
"encoding/json"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

var scaledjoblog = logf.Log.WithName("scaledjob-validation-webhook")

func (s *ScaledJob) SetupWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(s).
Complete()
}

// +kubebuilder:webhook:path=/validate-keda-sh-v1alpha1-scaledjob,mutating=false,failurePolicy=ignore,sideEffects=None,groups=keda.sh,resources=scaledjobs,verbs=create;update,versions=v1alpha1,name=vscaledjob.kb.io,admissionReviewVersions=v1

var _ webhook.Validator = &ScaledJob{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (s *ScaledJob) ValidateCreate() (admission.Warnings, error) {
val, _ := json.MarshalIndent(s, "", " ")
scaledjoblog.Info(fmt.Sprintf("validating scaledjob creation for %s", string(val)))
return nil, verifyTriggers(s, "create", false)
}

func (s *ScaledJob) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
val, _ := json.MarshalIndent(s, "", " ")
scaledobjectlog.V(1).Info(fmt.Sprintf("validating scaledjob update for %s", string(val)))

oldTa := old.(*ScaledJob)
if isScaledJobRemovingFinalizer(s.ObjectMeta, oldTa.ObjectMeta, s.Spec, oldTa.Spec) {
scaledjoblog.V(1).Info("finalizer removal, skipping validation")
return nil, nil
}
return nil, verifyTriggers(s, "update", false)
}

func (s *ScaledJob) ValidateDelete() (admission.Warnings, error) {
return nil, nil
}

func isScaledJobRemovingFinalizer(om metav1.ObjectMeta, oldOm metav1.ObjectMeta, spec ScaledJobSpec, oldSpec ScaledJobSpec) bool {
taSpec, _ := json.MarshalIndent(spec, "", " ")
oldTaSpec, _ := json.MarshalIndent(oldSpec, "", " ")
taSpecString := string(taSpec)
oldTaSpecString := string(oldTaSpec)

return len(om.Finalizers) == 0 && len(oldOm.Finalizers) == 1 && taSpecString == oldTaSpecString
}
83 changes: 83 additions & 0 deletions apis/keda/v1alpha1/scaledjob_webhook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright 2024 The KEDA Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
"context"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = It("should validate empty triggers in ScaledJob", func() {

namespaceName := "scaledjob-empty-triggers-set"
namespace := createNamespace(namespaceName)

err := k8sClient.Create(context.Background(), namespace)
Expect(err).ToNot(HaveOccurred())

sj := createScaledJob(sjName, namespaceName, []ScaleTriggers{})

Eventually(func() error {
return k8sClient.Create(context.Background(), sj)
}).Should(HaveOccurred())
})

// -------------------------------------------------------------------------- //
// ----------------------------- HELP FUNCTIONS ----------------------------- //
// -------------------------------------------------------------------------- //
func createScaledJob(name string, namespace string, triggers []ScaleTriggers) *ScaledJob {
return &ScaledJob{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
TypeMeta: metav1.TypeMeta{
Kind: "ScaledJob",
APIVersion: "keda.sh",
},
Spec: ScaledJobSpec{
JobTargetRef: &batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": name,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": name,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: name,
Image: name,
},
},
},
},
},
Triggers: triggers,
},
}
}
36 changes: 31 additions & 5 deletions apis/keda/v1alpha1/scaledobject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func validateWorkload(so *ScaledObject, action string, dryRun bool) (admission.W

verifyFunctions := []func(*ScaledObject, string, bool) error{
verifyCPUMemoryScalers,
verifyTriggers,
verifyScaledObjects,
verifyHpas,
verifyReplicaCount,
Expand All @@ -145,6 +144,17 @@ func validateWorkload(so *ScaledObject, action string, dryRun bool) (admission.W
}
}

verifyCommonFunctions := []func(interface{}, string, bool) error{
verifyTriggers,
}

for i := range verifyCommonFunctions {
err := verifyCommonFunctions[i](so, action, dryRun)
if err != nil {
return nil, err
}
}

scaledobjectlog.V(1).Info(fmt.Sprintf("scaledobject %s is valid", so.Name))
return nil, nil
}
Expand All @@ -158,11 +168,27 @@ func verifyReplicaCount(incomingSo *ScaledObject, action string, _ bool) error {
return nil
}

func verifyTriggers(incomingSo *ScaledObject, action string, _ bool) error {
err := ValidateTriggers(incomingSo.Spec.Triggers)
func verifyTriggers(incomingObject interface{}, action string, _ bool) error {
var triggers []ScaleTriggers
var name string
var namespace string
switch obj := incomingObject.(type) {
case *ScaledObject:
triggers = obj.Spec.Triggers
name = obj.Name
namespace = obj.Namespace
case *ScaledJob:
triggers = obj.Spec.Triggers
name = obj.Name
namespace = obj.Namespace
default:
return fmt.Errorf("unknown scalable object type %v", incomingObject)
}

err := ValidateTriggers(triggers)
if err != nil {
scaledobjectlog.WithValues("name", incomingSo.Name).Error(err, "validation error")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "incorrect-triggers")
scaledobjectlog.WithValues("name", name).Error(err, "validation error")
metricscollector.RecordScaledObjectValidatingErrors(namespace, action, "incorrect-triggers")
}
return err
}
Expand Down
24 changes: 24 additions & 0 deletions apis/keda/v1alpha1/scaledobject_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,30 @@ var _ = It("shouldn't create so when stabilizationWindowSeconds exceeds 3600", f
Should(HaveOccurred())
})

var _ = It("should validate empty triggers in ScaledObject", func() {

namespaceName := "empty-triggers-set"
namespace := createNamespace(namespaceName)

err := k8sClient.Create(context.Background(), namespace)
Expect(err).ToNot(HaveOccurred())

workload := createDeployment(namespaceName, false, false)
workload.Spec.Template.Spec.Containers[0].Resources.Limits = v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
}

err = k8sClient.Create(context.Background(), workload)
Expect(err).ToNot(HaveOccurred())

so := createScaledObject(soName, namespaceName, workloadName, "apps/v1", "Deployment", false, map[string]string{}, "")
so.Spec.Triggers = []ScaleTriggers{}

Eventually(func() error {
return k8sClient.Create(context.Background(), so)
}).Should(HaveOccurred())
})

// ============================ SCALING MODIFIERS ============================ \\
// =========================================================================== \\

Expand Down
7 changes: 6 additions & 1 deletion apis/keda/v1alpha1/scaletriggers_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ type AuthenticationRef struct {
// - useCachedMetrics is defined only for a supported triggers
func ValidateTriggers(triggers []ScaleTriggers) error {
triggersCount := len(triggers)

if triggersCount == 0 {
return fmt.Errorf("no triggers defined in the ScaledObject/ScaledJob")
}

if triggers != nil && triggersCount > 0 {
triggerNames := make(map[string]bool, triggersCount)
for i := 0; i < triggersCount; i++ {
Expand All @@ -66,7 +71,7 @@ func ValidateTriggers(triggers []ScaleTriggers) error {
if name != "" {
if _, found := triggerNames[name]; found {
// found duplicate name
return fmt.Errorf("triggerName %q is defined multiple times in the ScaledObject, but it must be unique", name)
return fmt.Errorf("triggerName %q is defined multiple times in the ScaledObject/ScaledJob, but it must be unique", name)
}
triggerNames[name] = true
}
Expand Down
7 changes: 6 additions & 1 deletion apis/keda/v1alpha1/scaletriggers_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestValidateTriggers(t *testing.T) {
Type: "prometheus",
},
},
expectedErrMsg: "triggerName \"trigger1\" is defined multiple times in the ScaledObject, but it must be unique",
expectedErrMsg: "triggerName \"trigger1\" is defined multiple times in the ScaledObject/ScaledJob, but it must be unique",
},
{
name: "unsupported useCachedMetrics property for cpu scaler",
Expand Down Expand Up @@ -84,6 +84,11 @@ func TestValidateTriggers(t *testing.T) {
},
expectedErrMsg: "",
},
{
name: "empty triggers array should be blocked",
triggers: []ScaleTriggers{},
expectedErrMsg: "no triggers defined in the ScaledObject/ScaledJob",
},
}

for _, test := range tests {
Expand Down
3 changes: 3 additions & 0 deletions apis/keda/v1alpha1/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var cancel context.CancelFunc
const (
workloadName = "deployment-name"
soName = "test-so"
sjName = "test-sj"
)

func TestAPIs(t *testing.T) {
Expand Down Expand Up @@ -119,6 +120,8 @@ var _ = BeforeSuite(func() {

err = (&ScaledObject{}).SetupWebhookWithManager(mgr)
Expect(err).NotTo(HaveOccurred())
err = (&ScaledJob{}).SetupWebhookWithManager(mgr)
Expect(err).NotTo(HaveOccurred())
err = (&TriggerAuthentication{}).SetupWebhookWithManager(mgr)
Expect(err).NotTo(HaveOccurred())
err = (&ClusterTriggerAuthentication{}).SetupWebhookWithManager(mgr)
Expand Down
4 changes: 4 additions & 0 deletions cmd/webhooks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,8 @@ func setupWebhook(mgr manager.Manager) {
setupLog.Error(err, "unable to create webhook", "webhook", "ClusterTriggerAuthentication")
os.Exit(1)
}
if err := (&kedav1alpha1.ScaledJob{}).SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "ScaledJob")
os.Exit(1)
}
}
24 changes: 24 additions & 0 deletions config/webhooks/validation_webhooks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,30 @@ webhooks:
- scaledobjects
sideEffects: None
timeoutSeconds: 10
- admissionReviewVersions:
- v1
clientConfig:
service:
name: keda-admission-webhooks
namespace: keda
path: /validate-keda-sh-v1alpha1-scaledjob
failurePolicy: Ignore
matchPolicy: Equivalent
name: vscaledjob.kb.io
namespaceSelector: {}
objectSelector: {}
rules:
- apiGroups:
- keda.sh
apiVersions:
- v1alpha1
operations:
- CREATE
- UPDATE
resources:
- scaledjobs
sideEffects: None
timeoutSeconds: 10
- admissionReviewVersions:
- v1
clientConfig:
Expand Down
5 changes: 5 additions & 0 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log
return "ScaledJob is paused, skipping reconcile loop", err
}

err = kedav1alpha1.ValidateTriggers(scaledJob.Spec.Triggers)
if err != nil {
return "ScaledJob doesn't have correct triggers specification", err
}

// nosemgrep: trailofbits.go.invalid-usage-of-modified-variable.invalid-usage-of-modified-variable
msg, err := r.deletePreviousVersionScaleJobs(ctx, logger, scaledJob)
if err != nil {
Expand Down
Loading

0 comments on commit 6c94347

Please sign in to comment.