diff --git a/api/v1beta1/upgradejobhook_types.go b/api/v1beta1/upgradejobhook_types.go index 23bead4..a5dfc31 100644 --- a/api/v1beta1/upgradejobhook_types.go +++ b/api/v1beta1/upgradejobhook_types.go @@ -70,6 +70,11 @@ type UpgradeJobHookSpec struct { // More advanced failure policies can be handled through the upstream Job failure handling mechanisms. // +kubebuilder:validation:Enum=Abort;Ignore FailurePolicy string `json:"failurePolicy,omitempty"` + // Disruptive defines if the code run by the hook is potentially disruptive. + // Added to the job metrics and injected as an environment variable to all hooks matching the job. + // This is currently only informational, but can be used to make decisions in jobs. + // The default is `false`. + Disruptive bool `json:"disruptive,omitempty"` // Selector is the label selector that determines which upgrade jobs the hook is executed for. Selector metav1.LabelSelector `json:"selector,omitempty"` // Template is the job template that is executed. @@ -125,24 +130,17 @@ type UpgradeJobHook struct { } // Claim claims the hook for the given claimer. -// Returns true if the hook was claimed, false if it was already claimed. +// Returns true if the hook was claimed, or does not need to be claimed, false if it was already claimed. // Second return value is true if the hooks status was updated. func (u *UpgradeJobHook) Claim(claimer client.Object) (ok, updated bool) { if u.Spec.GetRun() != RunNext { return true, false } - ref := ClaimReference{ - APIVersion: claimer.GetObjectKind().GroupVersionKind().GroupVersion().String(), - Kind: claimer.GetObjectKind().GroupVersionKind().Kind, - Name: claimer.GetName(), - UID: claimer.GetUID(), - } - + ref := buildClaimReference(claimer) if u.Status.ClaimedBy == ref { return true, false } - if u.Status.ClaimedBy == (ClaimReference{}) { u.Status.ClaimedBy = ref return true, true @@ -151,6 +149,24 @@ func (u *UpgradeJobHook) Claim(claimer client.Object) (ok, updated bool) { return false, false } +// WouldExecute returns true if the hook would be executed for the given claimer. +func (u *UpgradeJobHook) WouldExecute(claimer client.Object) bool { + if u.Spec.GetRun() != RunNext { + return true + } + + return u.Status.ClaimedBy == (ClaimReference{}) || u.Status.ClaimedBy == buildClaimReference(claimer) +} + +func buildClaimReference(claimer client.Object) ClaimReference { + return ClaimReference{ + APIVersion: claimer.GetObjectKind().GroupVersionKind().GroupVersion().String(), + Kind: claimer.GetObjectKind().GroupVersionKind().Kind, + Name: claimer.GetName(), + UID: claimer.GetUID(), + } +} + //+kubebuilder:object:root=true // UpgradeJobHookList contains a list of UpgradeJobHook diff --git a/config/crd/bases/managedupgrade.appuio.io_upgradejobhooks.yaml b/config/crd/bases/managedupgrade.appuio.io_upgradejobhooks.yaml index 9ddec69..b3ea934 100644 --- a/config/crd/bases/managedupgrade.appuio.io_upgradejobhooks.yaml +++ b/config/crd/bases/managedupgrade.appuio.io_upgradejobhooks.yaml @@ -34,6 +34,12 @@ spec: spec: description: UpgradeJobHookSpec defines the desired state of UpgradeJobHook properties: + disruptive: + description: Disruptive defines if the code run by the hook is potentially + disruptive. Added to the job metrics and injected as an environment + variable to all hooks matching the job. This is currently only informational, + but can be used to make decisions in jobs. The default is `false`. + type: boolean events: description: Events is the list of events to trigger the hook to be executed. `Create`, `Start`, and `UpgradeComplete` are the events diff --git a/controllers/upgradejob_controller.go b/controllers/upgradejob_controller.go index f5a0302..d3c1550 100644 --- a/controllers/upgradejob_controller.go +++ b/controllers/upgradejob_controller.go @@ -449,11 +449,16 @@ func (r *UpgradeJobReconciler) executeHooks(ctx context.Context, uj *managedupgr } } + var hasMatchingDisruptiveHook bool + for _, hook := range hooks { + hasMatchingDisruptiveHook = hasMatchingDisruptiveHook || hook.Spec.Disruptive + } + activeJobs := []string{} errors := []error{} failedJobs := []string{} for _, hook := range hooks { - jobs, err := r.jobForUpgradeJobAndHook(ctx, uj, hook, event) + jobs, err := r.jobForUpgradeJobAndHook(ctx, uj, hook, event, hookJobMeta{MatchesDisruptiveHooks: hasMatchingDisruptiveHook}) if err != nil { errors = append(errors, err) continue @@ -496,7 +501,17 @@ func (r *UpgradeJobReconciler) executeHooks(ctx context.Context, uj *managedupgr return true, nil } -func (r *UpgradeJobReconciler) jobForUpgradeJobAndHook(ctx context.Context, uj *managedupgradev1beta1.UpgradeJob, hook managedupgradev1beta1.UpgradeJobHook, event managedupgradev1beta1.UpgradeEvent) ([]managedupgradev1beta1.HookJobTracker, error) { +type hookJobMeta struct { + MatchesDisruptiveHooks bool `json:"matchesDisruptiveHooks"` +} + +func (r *UpgradeJobReconciler) jobForUpgradeJobAndHook( + ctx context.Context, + uj *managedupgradev1beta1.UpgradeJob, + hook managedupgradev1beta1.UpgradeJobHook, + event managedupgradev1beta1.UpgradeEvent, + meta hookJobMeta, +) ([]managedupgradev1beta1.HookJobTracker, error) { var jobs batchv1.JobList if err := r.List(ctx, &jobs, client.InNamespace(uj.Namespace), client.MatchingLabels(jobLabels(uj.Name, hook.Name, event))); err != nil { return nil, err @@ -518,7 +533,7 @@ func (r *UpgradeJobReconciler) jobForUpgradeJobAndHook(ctx context.Context, uj * return jobStatus, nil } - _, err := r.createHookJob(ctx, hook, uj, event) + _, err := r.createHookJob(ctx, hook, uj, event, meta) return []managedupgradev1beta1.HookJobTracker{{ HookEvent: string(event), UpgradeJobHookName: hook.Name, @@ -526,7 +541,13 @@ func (r *UpgradeJobReconciler) jobForUpgradeJobAndHook(ctx context.Context, uj * }}, err } -func (r *UpgradeJobReconciler) createHookJob(ctx context.Context, hook managedupgradev1beta1.UpgradeJobHook, uj *managedupgradev1beta1.UpgradeJob, event managedupgradev1beta1.UpgradeEvent) (batchv1.Job, error) { +func (r *UpgradeJobReconciler) createHookJob( + ctx context.Context, + hook managedupgradev1beta1.UpgradeJobHook, + uj *managedupgradev1beta1.UpgradeJob, + event managedupgradev1beta1.UpgradeEvent, + meta hookJobMeta, +) (batchv1.Job, error) { l := log.FromContext(ctx) tmpl := hook.Spec.Template.DeepCopy() @@ -543,13 +564,20 @@ func (r *UpgradeJobReconciler) createHookJob(ctx context.Context, hook managedup return batchv1.Job{}, fmt.Errorf("failed to normalize upgrade job: %w", err) } + normalizedMeta, err := normalizeAsJson(meta) + if err != nil { + return batchv1.Job{}, fmt.Errorf("failed to normalize hook job meta: %w", err) + } + evm := map[string]any{ "EVENT": normalizedEvent, "JOB": normalizedUJ, + "META": normalizedMeta, } flattenInto("EVENT", normalizedEvent, evm) flattenInto("JOB", normalizedUJ, evm) + flattenInto("META", normalizedMeta, evm) envs := make([]corev1.EnvVar, 0, len(evm)) for k, v := range evm { diff --git a/controllers/upgradejob_controller_test.go b/controllers/upgradejob_controller_test.go index b3eaddc..79ff049 100644 --- a/controllers/upgradejob_controller_test.go +++ b/controllers/upgradejob_controller_test.go @@ -516,9 +516,119 @@ func Test_UpgradeJobReconciler_Reconcile_HookJobContainerEnv(t *testing.T) { requireEnv(t, c.Env, "JOB_status_conditions_0_type", matchStr("\"Started\"")) requireEnv(t, c.Env, "EVENT", isJsonObj) requireEnv(t, c.Env, "EVENT_name", matchStr("\"Create\"")) + requireEnv(t, c.Env, "META", isJsonObj) + requireEnv(t, c.Env, "META_matchesDisruptiveHooks", matchStr("false")) } } +func Test_UpgradeJobReconciler_Reconcile_Disruptive(t *testing.T) { + clock := mockClock{now: time.Date(2022, 12, 4, 22, 45, 0, 0, time.UTC)} + + upgradeJob := &managedupgradev1beta1.UpgradeJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "upgrade-1234-4-5-13", + Namespace: "appuio-openshift-upgrade-controller", + Labels: map[string]string{"test": "test"}, + }, + Spec: managedupgradev1beta1.UpgradeJobSpec{}, + Status: managedupgradev1beta1.UpgradeJobStatus{ + Conditions: []metav1.Condition{ + { + Type: managedupgradev1beta1.UpgradeJobConditionStarted, + Status: metav1.ConditionTrue, + }, + }, + }, + } + disruptiveJobHook := &managedupgradev1beta1.UpgradeJobHook{ + ObjectMeta: metav1.ObjectMeta{ + Name: "disruptive", + Namespace: "appuio-openshift-upgrade-controller", + }, + Spec: managedupgradev1beta1.UpgradeJobHookSpec{ + Selector: metav1.LabelSelector{ + MatchLabels: upgradeJob.Labels, + }, + FailurePolicy: managedupgradev1beta1.FailurePolicyAbort, + Events: []managedupgradev1beta1.UpgradeEvent{ + managedupgradev1beta1.EventCreate, + }, + Disruptive: true, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + }, + }, + }, + }, + }, + }, + }, + } + otherJobHook := &managedupgradev1beta1.UpgradeJobHook{ + ObjectMeta: metav1.ObjectMeta{ + Name: "other", + Namespace: "appuio-openshift-upgrade-controller", + }, + Spec: managedupgradev1beta1.UpgradeJobHookSpec{ + Selector: metav1.LabelSelector{ + MatchLabels: upgradeJob.Labels, + }, + FailurePolicy: managedupgradev1beta1.FailurePolicyAbort, + Events: []managedupgradev1beta1.UpgradeEvent{ + managedupgradev1beta1.EventCreate, + }, + Disruptive: false, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + }, + }, + }, + }, + }, + }, + }, + } + + c := controllerClient(t, upgradeJob, disruptiveJobHook, otherJobHook, + &configv1.ClusterVersion{ + ObjectMeta: metav1.ObjectMeta{ + Name: "version", + }}) + + subject := &UpgradeJobReconciler{ + Client: c, + Scheme: c.Scheme(), + + Clock: &clock, + + ManagedUpstreamClusterVersionName: "version", + } + + matchStr := func(str string) func(v string) (bool, error) { + return func(v string) (bool, error) { + return v == str, nil + } + } + + job := checkAndCompleteHook(t, c, subject, upgradeJob, disruptiveJobHook, managedupgradev1beta1.EventCreate, true) + require.Equal(t, 1, len(job.Spec.Template.Spec.Containers)) + requireEnv(t, job.Spec.Template.Spec.Containers[0].Env, "META_matchesDisruptiveHooks", matchStr("true")) + + job = checkAndCompleteHook(t, c, subject, upgradeJob, otherJobHook, managedupgradev1beta1.EventCreate, true) + require.Equal(t, 1, len(job.Spec.Template.Spec.Containers)) + requireEnv(t, job.Spec.Template.Spec.Containers[0].Env, "META_matchesDisruptiveHooks", matchStr("true")) +} + func Test_UpgradeJobReconciler_Reconcile_ClaimNextHook(t *testing.T) { ctx := context.Background() clock := mockClock{now: time.Date(2022, 12, 4, 22, 45, 0, 0, time.UTC)} diff --git a/controllers/upgrading_metrics.go b/controllers/upgrading_metrics.go index 3dbdaf2..74b087e 100644 --- a/controllers/upgrading_metrics.go +++ b/controllers/upgrading_metrics.go @@ -10,8 +10,11 @@ import ( machineconfigurationv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" "github.com/prometheus/client_golang/prometheus" apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" managedupgradev1beta1 "github.com/appuio/openshift-upgrade-controller/api/v1beta1" "github.com/appuio/openshift-upgrade-controller/pkg/clusterversion" @@ -46,6 +49,7 @@ var jobStates = prometheus.NewDesc( "desired_version_image", "desired_version_version", "state", + "matches_disruptive_hooks", }, nil, ) @@ -106,6 +110,13 @@ func (m *ClusterUpgradingMetric) Collect(ch chan<- prometheus.Metric) { var jobs managedupgradev1beta1.UpgradeJobList if err := m.Client.List(ctx, &jobs); err != nil { ch <- prometheus.NewInvalidMetric(jobStates, fmt.Errorf("failed to list upgrade jobs: %w", err)) + return + } + + var jobsHooks managedupgradev1beta1.UpgradeJobHookList + if err := m.Client.List(ctx, &jobsHooks); err != nil { + ch <- prometheus.NewInvalidMetric(jobStates, fmt.Errorf("failed to list upgrade job hooks: %w", err)) + return } for _, job := range jobs.Items { @@ -124,6 +135,7 @@ func (m *ClusterUpgradingMetric) Collect(ch chan<- prometheus.Metric) { v.Image, v.Version, jobState(job), + strconv.FormatBool(jobHasMatchingDisruptiveHook(job, jobsHooks)), ) } } @@ -145,3 +157,21 @@ func jobState(job managedupgradev1beta1.UpgradeJob) string { } return "pending" } + +func jobHasMatchingDisruptiveHook(job managedupgradev1beta1.UpgradeJob, hooks managedupgradev1beta1.UpgradeJobHookList) bool { + for _, hook := range hooks.Items { + sel, err := metav1.LabelSelectorAsSelector(&hook.Spec.Selector) + if err != nil { + log.Log.Error(err, "failed to parse hook selector") + continue + } + if !sel.Matches(labels.Set(job.Labels)) { + continue + } + if hook.WouldExecute(&job) && hook.Spec.Disruptive { + return true + } + } + + return false +} diff --git a/controllers/upgrading_metrics_test.go b/controllers/upgrading_metrics_test.go index 7ec48f3..ce30fcd 100644 --- a/controllers/upgrading_metrics_test.go +++ b/controllers/upgrading_metrics_test.go @@ -66,6 +66,72 @@ func Test_ClusterUpgradingMetric(t *testing.T) { }, }, } + disruptiveJob := &managedupgradev1beta1.UpgradeJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "disruptive", + Labels: map[string]string{ + "upgrade": "disruptive", + }, + }, + } + disruptiveJobHook := &managedupgradev1beta1.UpgradeJobHook{ + ObjectMeta: metav1.ObjectMeta{ + Name: disruptiveJob.Name, + }, + Spec: managedupgradev1beta1.UpgradeJobHookSpec{ + Selector: metav1.LabelSelector{ + MatchLabels: disruptiveJob.Labels, + }, + Disruptive: true, + }, + } + disruptiveUnclaimedNextJob := &managedupgradev1beta1.UpgradeJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "disruptive-unclaimed-next", + Labels: map[string]string{ + "upgrade": "disruptive-unclaimed-next", + }, + }, + } + disruptiveUnclaimedNextJobHook := &managedupgradev1beta1.UpgradeJobHook{ + ObjectMeta: metav1.ObjectMeta{ + Name: disruptiveUnclaimedNextJob.Name, + }, + Spec: managedupgradev1beta1.UpgradeJobHookSpec{ + Selector: metav1.LabelSelector{ + MatchLabels: disruptiveUnclaimedNextJob.Labels, + }, + Disruptive: true, + Run: managedupgradev1beta1.RunNext, + }, + } + disruptiveClaimedNextJob := &managedupgradev1beta1.UpgradeJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "disruptive-claimed-next", + Labels: map[string]string{ + "upgrade": "disruptive-claimed-next", + }, + }, + } + disruptiveClaimedNextJobHook := &managedupgradev1beta1.UpgradeJobHook{ + ObjectMeta: metav1.ObjectMeta{ + Name: disruptiveClaimedNextJob.Name, + }, + Spec: managedupgradev1beta1.UpgradeJobHookSpec{ + Selector: metav1.LabelSelector{ + MatchLabels: disruptiveClaimedNextJob.Labels, + }, + Disruptive: true, + Run: managedupgradev1beta1.RunNext, + }, + Status: managedupgradev1beta1.UpgradeJobHookStatus{ + ClaimedBy: managedupgradev1beta1.ClaimReference{ + APIVersion: "v1", + Kind: "Pod", + Name: "someone else", + }, + }, + } activeJob := &managedupgradev1beta1.UpgradeJob{ ObjectMeta: metav1.ObjectMeta{ Name: "active", @@ -111,7 +177,10 @@ func Test_ClusterUpgradingMetric(t *testing.T) { }, }, } - c := controllerClient(t, version, masterPool, workerPool, pendingJob, activeJob, succeededJob, failedJob) + c := controllerClient(t, version, masterPool, workerPool, pendingJob, activeJob, succeededJob, failedJob, + disruptiveJob, disruptiveUnclaimedNextJob, disruptiveClaimedNextJob, + disruptiveJobHook, disruptiveUnclaimedNextJobHook, disruptiveClaimedNextJobHook, + ) subject := &ClusterUpgradingMetric{ Client: c, @@ -157,10 +226,14 @@ openshift_upgrade_controller_machine_config_pools_upgrading{pool="master"} %d openshift_upgrade_controller_machine_config_pools_upgrading{pool="worker"} %d # HELP openshift_upgrade_controller_upgradejob_state Returns the state of jobs in the cluster. 'pending', 'active', 'succeeded', or 'failed' are possible states. # TYPE openshift_upgrade_controller_upgradejob_state gauge -openshift_upgrade_controller_upgradejob_state{desired_version_force="false",desired_version_image="",desired_version_version="",start_after="0001-01-01T00:00:00Z",start_before="0001-01-01T00:00:00Z",state="active",upgradejob="active"} 1 -openshift_upgrade_controller_upgradejob_state{desired_version_force="false",desired_version_image="",desired_version_version="",start_after="0001-01-01T00:00:00Z",start_before="0001-01-01T00:00:00Z",state="failed",upgradejob="failed"} 1 -openshift_upgrade_controller_upgradejob_state{desired_version_force="false",desired_version_image="",desired_version_version="",start_after="0001-01-01T00:00:00Z",start_before="0001-01-01T00:00:00Z",state="succeeded",upgradejob="succeeded"} 1 -openshift_upgrade_controller_upgradejob_state{desired_version_force="true",desired_version_image="quay.io/openshift-release-dev/ocp-release@sha256:26f6d10b18",desired_version_version="4.11.23",start_after="2020-01-20T20:00:00Z",start_before="2020-01-20T21:00:00Z",state="pending",upgradejob="pending"} 1 +openshift_upgrade_controller_upgradejob_state{desired_version_force="false",desired_version_image="",desired_version_version="",matches_disruptive_hooks="false",start_after="0001-01-01T00:00:00Z",start_before="0001-01-01T00:00:00Z",state="active",upgradejob="active"} 1 +openshift_upgrade_controller_upgradejob_state{desired_version_force="false",desired_version_image="",desired_version_version="",matches_disruptive_hooks="false",start_after="0001-01-01T00:00:00Z",start_before="0001-01-01T00:00:00Z",state="failed",upgradejob="failed"} 1 +openshift_upgrade_controller_upgradejob_state{desired_version_force="false",desired_version_image="",desired_version_version="",matches_disruptive_hooks="false",start_after="0001-01-01T00:00:00Z",start_before="0001-01-01T00:00:00Z",state="succeeded",upgradejob="succeeded"} 1 +openshift_upgrade_controller_upgradejob_state{desired_version_force="true",desired_version_image="quay.io/openshift-release-dev/ocp-release@sha256:26f6d10b18",desired_version_version="4.11.23",matches_disruptive_hooks="false",start_after="2020-01-20T20:00:00Z",start_before="2020-01-20T21:00:00Z",state="pending",upgradejob="pending"} 1 + +openshift_upgrade_controller_upgradejob_state{desired_version_force="false",desired_version_image="",desired_version_version="",matches_disruptive_hooks="true",start_after="0001-01-01T00:00:00Z",start_before="0001-01-01T00:00:00Z",state="pending",upgradejob="disruptive"} 1 +openshift_upgrade_controller_upgradejob_state{desired_version_force="false",desired_version_image="",desired_version_version="",matches_disruptive_hooks="true",start_after="0001-01-01T00:00:00Z",start_before="0001-01-01T00:00:00Z",state="pending",upgradejob="disruptive-unclaimed-next"} 1 +openshift_upgrade_controller_upgradejob_state{desired_version_force="false",desired_version_image="",desired_version_version="",matches_disruptive_hooks="false",start_after="0001-01-01T00:00:00Z",start_before="0001-01-01T00:00:00Z",state="pending",upgradejob="disruptive-claimed-next"} 1 ` return strings.NewReader( fmt.Sprintf(metrics, b2i(upgrading), b2i(masterUpgrading), b2i(workerUpgrading)),