diff --git a/pkg/handler/scale_jobs.go b/pkg/handler/scale_jobs.go index 1f0c48420d3..2abecbc976f 100644 --- a/pkg/handler/scale_jobs.go +++ b/pkg/handler/scale_jobs.go @@ -7,20 +7,31 @@ import ( kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) func (h *ScaleHandler) scaleJobs(scaledObject *kedav1alpha1.ScaledObject, isActive bool, scaleTo int64, maxScale int64) { - // TODO: get current job count - h.logger.V(1).Info("Scaling Jobs") + runningJobCount := h.getRunningJobCount(scaledObject, maxScale) + h.logger.Info("Scaling Jobs", "Number of running Jobs ", runningJobCount) + + var effectiveMaxScale int64 + effectiveMaxScale = maxScale - runningJobCount + if effectiveMaxScale < 0 { + effectiveMaxScale = 0 + } + + h.logger.Info("Scaling Jobs") if isActive { h.logger.V(1).Info("At least one scaler is active") now := metav1.Now() scaledObject.Status.LastActiveTime = &now h.updateScaledObjectStatus(scaledObject) - h.createJobs(scaledObject, scaleTo, maxScale) + h.createJobs(scaledObject, scaleTo, effectiveMaxScale) + } else { h.logger.V(1).Info("No change in activity") } @@ -34,6 +45,8 @@ func (h *ScaleHandler) createJobs(scaledObject *kedav1alpha1.ScaledObject, scale } scaledObject.Spec.JobTargetRef.Template.Labels["scaledobject"] = scaledObject.GetName() + h.logger.Info("Creating jobs", "Effective number of max jobs", maxScale) + if scaleTo > maxScale { scaleTo = maxScale } @@ -95,3 +108,36 @@ func (h *ScaleHandler) parseJobAuthRef(triggerAuthRef *kedav1alpha1.ScaledObject return env[name] }) } + +func (h *ScaleHandler) isJobFinished(j *batchv1.Job) bool { + for _, c := range j.Status.Conditions { + if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue { + return true + } + } + return false +} + +func (h *ScaleHandler) getRunningJobCount(scaledObject *kedav1alpha1.ScaledObject, maxScale int64) int64 { + var runningJobs int64 + + opts := []client.ListOption{ + client.InNamespace(scaledObject.GetNamespace()), + client.MatchingLabels(map[string]string{"scaledobject": scaledObject.GetName()}), + } + + jobs := &batchv1.JobList{} + err := h.client.List(context.TODO(), jobs, opts...) + + if err != nil { + return 0 + } + + for _, job := range jobs.Items { + if !h.isJobFinished(&job) { + runningJobs++ + } + } + + return runningJobs +}