Skip to content

Commit

Permalink
Total running Jobs must not exceed maxScale - Running jobs (#528)
Browse files Browse the repository at this point in the history
* total jobs to scale must not be more than the maxScale - runningJobs

* maxScale becomes maxScale - runningJobs

* effectiveMaxScale should not be less than zero.
  • Loading branch information
balchua authored and ahmelsayed committed Jan 15, 2020
1 parent 19dd440 commit 87a777a
Showing 1 changed file with 49 additions and 3 deletions.
52 changes: 49 additions & 3 deletions pkg/handler/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,31 @@ import (
kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

func (h *ScaleHandler) scaleJobs(scaledObject *kedav1alpha1.ScaledObject, isActive bool, scaleTo int64, maxScale int64) {
// TODO: get current job count
h.logger.V(1).Info("Scaling Jobs")
runningJobCount := h.getRunningJobCount(scaledObject, maxScale)
h.logger.Info("Scaling Jobs", "Number of running Jobs ", runningJobCount)

var effectiveMaxScale int64
effectiveMaxScale = maxScale - runningJobCount
if effectiveMaxScale < 0 {
effectiveMaxScale = 0
}

h.logger.Info("Scaling Jobs")

if isActive {
h.logger.V(1).Info("At least one scaler is active")
now := metav1.Now()
scaledObject.Status.LastActiveTime = &now
h.updateScaledObjectStatus(scaledObject)
h.createJobs(scaledObject, scaleTo, maxScale)
h.createJobs(scaledObject, scaleTo, effectiveMaxScale)

} else {
h.logger.V(1).Info("No change in activity")
}
Expand All @@ -34,6 +45,8 @@ func (h *ScaleHandler) createJobs(scaledObject *kedav1alpha1.ScaledObject, scale
}
scaledObject.Spec.JobTargetRef.Template.Labels["scaledobject"] = scaledObject.GetName()

h.logger.Info("Creating jobs", "Effective number of max jobs", maxScale)

if scaleTo > maxScale {
scaleTo = maxScale
}
Expand Down Expand Up @@ -95,3 +108,36 @@ func (h *ScaleHandler) parseJobAuthRef(triggerAuthRef *kedav1alpha1.ScaledObject
return env[name]
})
}

func (h *ScaleHandler) isJobFinished(j *batchv1.Job) bool {
for _, c := range j.Status.Conditions {
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {
return true
}
}
return false
}

func (h *ScaleHandler) getRunningJobCount(scaledObject *kedav1alpha1.ScaledObject, maxScale int64) int64 {
var runningJobs int64

opts := []client.ListOption{
client.InNamespace(scaledObject.GetNamespace()),
client.MatchingLabels(map[string]string{"scaledobject": scaledObject.GetName()}),
}

jobs := &batchv1.JobList{}
err := h.client.List(context.TODO(), jobs, opts...)

if err != nil {
return 0
}

for _, job := range jobs.Items {
if !h.isJobFinished(&job) {
runningJobs++
}
}

return runningJobs
}

0 comments on commit 87a777a

Please sign in to comment.