Skip to content

Commit

Permalink
Merge pull request #101292 from AliceZhang2016/job_controller_metrics
Browse files Browse the repository at this point in the history
Graduate indexed job to beta
  • Loading branch information
k8s-ci-robot authored May 7, 2021
2 parents c9bd08a + 0c99f29 commit 548fb43
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 23 deletions.
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions pkg/apis/batch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,11 @@ type JobSpec struct {
// for each index.
// When value is `Indexed`, .spec.completions must be specified and
// `.spec.parallelism` must be less than or equal to 10^5.
// In addition, The Pod name takes the form
// `$(job-name)-$(index)-$(random-string)`,
// the Pod hostname takes the form `$(job-name)-$(index)`.
//
// This field is alpha-level and is only honored by servers that enable the
// IndexedJob feature gate. More completion modes can be added in the future.
// This field is beta-level. More completion modes can be added in the future.
// If the Job controller observes a mode that it doesn't recognize, the
// controller skips updates for the Job.
// +optional
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/batch/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ func ValidateJob(job *batch.Job, opts apivalidation.PodValidationOptions) field.
allErrs := apivalidation.ValidateObjectMeta(&job.ObjectMeta, true, apivalidation.ValidateReplicationControllerName, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateGeneratedSelector(job)...)
allErrs = append(allErrs, ValidateJobSpec(&job.Spec, field.NewPath("spec"), opts)...)
if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion && job.Spec.Completions != nil && *job.Spec.Completions > 0 {
// For indexed job, the job controller appends a suffix (`-$INDEX`)
// to the pod hostname when indexed job create pods.
// The index could be maximum `.spec.completions-1`
// If we don't validate this here, the indexed job will fail to create pods later.
maximumPodHostname := fmt.Sprintf("%s-%d", job.ObjectMeta.Name, *job.Spec.Completions-1)
if errs := apimachineryvalidation.IsDNS1123Label(maximumPodHostname); len(errs) > 0 {
allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("name"), job.ObjectMeta.Name, fmt.Sprintf("will not able to create pod with invalid DNS label: %s", maximumPodHostname)))
}
}
return allErrs
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/job/indexed_job_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func getCompletionIndex(annotations map[string]string) int {
if annotations == nil {
return unknownCompletionIndex
}
v, ok := annotations[batch.JobCompletionIndexAnnotationAlpha]
v, ok := annotations[batch.JobCompletionIndexAnnotation]
if !ok {
return unknownCompletionIndex
}
Expand Down Expand Up @@ -203,7 +203,7 @@ func addCompletionIndexEnvVariable(container *v1.Container) {
Name: completionIndexEnvName,
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotationAlpha),
FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation),
},
},
})
Expand All @@ -213,7 +213,7 @@ func addCompletionIndexAnnotation(template *v1.PodTemplateSpec, index int) {
if template.Annotations == nil {
template.Annotations = make(map[string]string, 1)
}
template.Annotations[batch.JobCompletionIndexAnnotationAlpha] = strconv.Itoa(index)
template.Annotations[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index)
}

type byCompletionIndex []*v1.Pod
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/job/indexed_job_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func hollowPodsWithIndexPhase(descs []indexPhase) []*v1.Pod {
}
if desc.Index != noIndex {
p.Annotations = map[string]string{
batch.JobCompletionIndexAnnotationAlpha: desc.Index,
batch.JobCompletionIndexAnnotation: desc.Index,
}
}
pods = append(pods, p)
Expand All @@ -297,7 +297,7 @@ func toIndexPhases(pods []*v1.Pod) []indexPhase {
for i, p := range pods {
index := noIndex
if p.Annotations != nil {
index = p.Annotations[batch.JobCompletionIndexAnnotationAlpha]
index = p.Annotations[batch.JobCompletionIndexAnnotation]
}
result[i] = indexPhase{index, p.Status.Phase}
}
Expand Down
34 changes: 31 additions & 3 deletions pkg/controller/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/job/metrics"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/integer"
)
Expand All @@ -60,7 +61,8 @@ var (
// DefaultJobBackOff is the default backoff period, exported for the e2e test
DefaultJobBackOff = 10 * time.Second
// MaxJobBackOff is the max backoff period, exported for the e2e test
MaxJobBackOff = 360 * time.Second
MaxJobBackOff = 360 * time.Second
maxPodCreateDeletePerSync = 500
)

// Controller ensures that all Job objects have corresponding pods to
Expand Down Expand Up @@ -139,6 +141,8 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
jm.updateHandler = jm.updateJobStatus
jm.syncHandler = jm.syncJob

metrics.Register()

return jm
}

Expand Down Expand Up @@ -440,7 +444,7 @@ func (jm *Controller) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *Controller) syncJob(key string) (bool, error) {
func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
Expand Down Expand Up @@ -480,6 +484,21 @@ func (jm *Controller) syncJob(key string) (bool, error) {
return false, nil
}

completionMode := string(batch.NonIndexedCompletion)
if isIndexedJob(&job) {
completionMode = string(batch.IndexedCompletion)
}

defer func() {
result := "success"
if rErr != nil {
result = "error"
}

metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result).Observe(time.Since(startTime).Seconds())
metrics.JobSyncNum.WithLabelValues(completionMode, result).Inc()
}()

// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
Expand Down Expand Up @@ -546,6 +565,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, v1.ConditionTrue, failureReason, failureMessage))
jobConditionsChanged = true
jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
metrics.JobFinishedNum.WithLabelValues(completionMode, "failed").Inc()
} else {
if jobNeedsSync && job.DeletionTimestamp == nil {
active, manageJobErr = jm.manageJob(&job, activePods, succeeded, pods)
Expand Down Expand Up @@ -581,6 +601,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
now := metav1.Now()
job.Status.CompletionTime = &now
jm.recorder.Event(&job, v1.EventTypeNormal, "Completed", "Job completed")
metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded").Inc()
} else if utilfeature.DefaultFeatureGate.Enabled(features.SuspendJob) && manageJobCalled {
// Update the conditions / emit events only if manageJob was called in
// this syncJob. Otherwise wait for the right syncJob call to make
Expand Down Expand Up @@ -613,7 +634,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
}
}

forget := false
forget = false
// Check if the number of jobs succeeded increased since the last check. If yes "forget" should be true
// This logic is linked to the issue: https://github.com/kubernetes/kubernetes/issues/56853 that aims to
// improve the Job backoff policy when parallelism > 1 and few Jobs failed but others succeed.
Expand Down Expand Up @@ -783,6 +804,9 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
rmAtLeast = 0
}
podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast))
if len(podsToDelete) > maxPodCreateDeletePerSync {
podsToDelete = podsToDelete[:maxPodCreateDeletePerSync]
}
if len(podsToDelete) > 0 {
jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", parallelism)
Expand All @@ -803,6 +827,10 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
return active, nil
}

if diff > int32(maxPodCreateDeletePerSync) {
diff = int32(maxPodCreateDeletePerSync)
}

jm.expectations.ExpectCreations(jobKey, int(diff))
errCh := make(chan error, diff)
klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status
p.Status = v1.PodStatus{Phase: s.Phase}
if s.Index != noIndex {
p.Annotations = map[string]string{
batch.JobCompletionIndexAnnotationAlpha: s.Index,
batch.JobCompletionIndexAnnotation: s.Index,
}
}
podIndexer.Add(p)
Expand Down Expand Up @@ -2176,7 +2176,7 @@ func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) {
Name: "JOB_COMPLETION_INDEX",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotationAlpha),
FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation),
},
},
},
Expand Down
75 changes: 75 additions & 0 deletions pkg/controller/job/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"sync"

"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)

// JobControllerSubsystem - subsystem name used for this controller.
const JobControllerSubsystem = "job_controller"

var (
// JobSyncDurationSeconds tracks the latency of job syncs as
// completion_mode = Indexed / NonIndexed and result = success / error.
JobSyncDurationSeconds = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: JobControllerSubsystem,
Name: "job_sync_duration_seconds",
Help: "The time it took to sync a job",
StabilityLevel: metrics.ALPHA,
Buckets: metrics.ExponentialBuckets(0.001, 2, 15),
},
[]string{"completion_mode", "result"},
)
// JobSyncNum tracks the number of job syncs as
// completion_mode = Indexed / NonIndexed and result = success / error.
JobSyncNum = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: JobControllerSubsystem,
Name: "job_sync_total",
Help: "The number of job syncs",
StabilityLevel: metrics.ALPHA,
},
[]string{"completion_mode", "result"},
)
// JobFinishedNum tracks the number of jobs that finish as
// completion_mode = Indexed / NonIndexed and result = failed / succeeded.
JobFinishedNum = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: JobControllerSubsystem,
Name: "job_finished_total",
Help: "The number of finished job",
StabilityLevel: metrics.ALPHA,
},
[]string{"completion_mode", "result"},
)
)

var registerMetrics sync.Once

// Register registers Job controller metrics.
func Register() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(JobSyncDurationSeconds)
legacyregistry.MustRegister(JobSyncNum)
legacyregistry.MustRegister(JobFinishedNum)
})
}
3 changes: 2 additions & 1 deletion pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ const (

// owner: @alculquicondor
// alpha: v1.21
// beta: v1.22
//
// Allows Job controller to manage Pod completions per completion index.
IndexedJob featuregate.Feature = "IndexedJob"
Expand Down Expand Up @@ -779,7 +780,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
NetworkPolicyEndPort: {Default: false, PreRelease: featuregate.Alpha},
ProcMountType: {Default: false, PreRelease: featuregate.Alpha},
TTLAfterFinished: {Default: true, PreRelease: featuregate.Beta},
IndexedJob: {Default: false, PreRelease: featuregate.Alpha},
IndexedJob: {Default: true, PreRelease: featuregate.Beta},
KubeletPodResources: {Default: true, PreRelease: featuregate.Beta},
LocalStorageCapacityIsolationFSQuotaMonitoring: {Default: false, PreRelease: featuregate.Alpha},
NonPreemptingPriority: {Default: true, PreRelease: featuregate.Beta},
Expand Down
6 changes: 4 additions & 2 deletions staging/src/k8s.io/api/batch/v1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions staging/src/k8s.io/api/batch/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const JobCompletionIndexAnnotationAlpha = "batch.kubernetes.io/job-completion-index"
const JobCompletionIndexAnnotation = "batch.kubernetes.io/job-completion-index"

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down Expand Up @@ -162,9 +162,11 @@ type JobSpec struct {
// for each index.
// When value is `Indexed`, .spec.completions must be specified and
// `.spec.parallelism` must be less than or equal to 10^5.
// In addition, The Pod name takes the form
// `$(job-name)-$(index)-$(random-string)`,
// the Pod hostname takes the form `$(job-name)-$(index)`.
//
// This field is alpha-level and is only honored by servers that enable the
// IndexedJob feature gate. More completion modes can be added in the future.
// This field is beta-level. More completion modes can be added in the future.
// If the Job controller observes a mode that it doesn't recognize, the
// controller skips updates for the Job.
// +optional
Expand Down
Loading

0 comments on commit 548fb43

Please sign in to comment.