Skip to content

Commit

Permalink
Allow marking UpgradeJobHooks disruptive
Browse files Browse the repository at this point in the history
  • Loading branch information
bastjan committed Sep 12, 2023
1 parent 49856db commit 93dd67c
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 18 deletions.
34 changes: 25 additions & 9 deletions api/v1beta1/upgradejobhook_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ type UpgradeJobHookSpec struct {
// More advanced failure policies can be handled through the upstream Job failure handling mechanisms.
// +kubebuilder:validation:Enum=Abort;Ignore
FailurePolicy string `json:"failurePolicy,omitempty"`
// Disruptive defines if the code run by the hook is potentially disruptive.
// Added to the job metrics and injected as an environment variable to all hooks matching the job.
// This is currently only informational, but can be used to make decisions in jobs.
// The default is `false`.
Disruptive bool `json:"disruptive,omitempty"`
// Selector is the label selector that determines which upgrade jobs the hook is executed for.
Selector metav1.LabelSelector `json:"selector,omitempty"`
// Template is the job template that is executed.
Expand Down Expand Up @@ -125,24 +130,17 @@ type UpgradeJobHook struct {
}

// Claim claims the hook for the given claimer.
// Returns true if the hook was claimed, false if it was already claimed.
// Returns true if the hook was claimed, or does not need to be claimed, false if it was already claimed.
// Second return value is true if the hooks status was updated.
func (u *UpgradeJobHook) Claim(claimer client.Object) (ok, updated bool) {
if u.Spec.GetRun() != RunNext {
return true, false
}

ref := ClaimReference{
APIVersion: claimer.GetObjectKind().GroupVersionKind().GroupVersion().String(),
Kind: claimer.GetObjectKind().GroupVersionKind().Kind,
Name: claimer.GetName(),
UID: claimer.GetUID(),
}

ref := buildClaimReference(claimer)
if u.Status.ClaimedBy == ref {
return true, false
}

if u.Status.ClaimedBy == (ClaimReference{}) {
u.Status.ClaimedBy = ref
return true, true
Expand All @@ -151,6 +149,24 @@ func (u *UpgradeJobHook) Claim(claimer client.Object) (ok, updated bool) {
return false, false
}

// WouldExecute returns true if the hook would be executed for the given claimer.
func (u *UpgradeJobHook) WouldExecute(claimer client.Object) bool {
if u.Spec.GetRun() != RunNext {
return true
}

return u.Status.ClaimedBy == (ClaimReference{}) || u.Status.ClaimedBy == buildClaimReference(claimer)
}

func buildClaimReference(claimer client.Object) ClaimReference {
return ClaimReference{
APIVersion: claimer.GetObjectKind().GroupVersionKind().GroupVersion().String(),
Kind: claimer.GetObjectKind().GroupVersionKind().Kind,
Name: claimer.GetName(),
UID: claimer.GetUID(),
}
}

//+kubebuilder:object:root=true

// UpgradeJobHookList contains a list of UpgradeJobHook
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ spec:
spec:
description: UpgradeJobHookSpec defines the desired state of UpgradeJobHook
properties:
disruptive:
description: Disruptive defines if the code run by the hook is potentially
disruptive. Added to the job metrics and injected as an environment
variable to all hooks matching the job. This is currently only informational,
but can be used to make decisions in jobs. The default is `false`.
type: boolean
events:
description: Events is the list of events to trigger the hook to be
executed. `Create`, `Start`, and `UpgradeComplete` are the events
Expand Down
36 changes: 32 additions & 4 deletions controllers/upgradejob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,11 +449,16 @@ func (r *UpgradeJobReconciler) executeHooks(ctx context.Context, uj *managedupgr
}
}

var hasMatchingDisruptiveHook bool
for _, hook := range hooks {
hasMatchingDisruptiveHook = hasMatchingDisruptiveHook || hook.Spec.Disruptive
}

activeJobs := []string{}
errors := []error{}
failedJobs := []string{}
for _, hook := range hooks {
jobs, err := r.jobForUpgradeJobAndHook(ctx, uj, hook, event)
jobs, err := r.jobForUpgradeJobAndHook(ctx, uj, hook, event, hookJobMeta{MatchesDisruptiveHooks: hasMatchingDisruptiveHook})
if err != nil {
errors = append(errors, err)
continue
Expand Down Expand Up @@ -496,7 +501,17 @@ func (r *UpgradeJobReconciler) executeHooks(ctx context.Context, uj *managedupgr
return true, nil
}

func (r *UpgradeJobReconciler) jobForUpgradeJobAndHook(ctx context.Context, uj *managedupgradev1beta1.UpgradeJob, hook managedupgradev1beta1.UpgradeJobHook, event managedupgradev1beta1.UpgradeEvent) ([]managedupgradev1beta1.HookJobTracker, error) {
type hookJobMeta struct {
MatchesDisruptiveHooks bool `json:"matchesDisruptiveHooks"`
}

func (r *UpgradeJobReconciler) jobForUpgradeJobAndHook(
ctx context.Context,
uj *managedupgradev1beta1.UpgradeJob,
hook managedupgradev1beta1.UpgradeJobHook,
event managedupgradev1beta1.UpgradeEvent,
meta hookJobMeta,
) ([]managedupgradev1beta1.HookJobTracker, error) {
var jobs batchv1.JobList
if err := r.List(ctx, &jobs, client.InNamespace(uj.Namespace), client.MatchingLabels(jobLabels(uj.Name, hook.Name, event))); err != nil {
return nil, err
Expand All @@ -518,15 +533,21 @@ func (r *UpgradeJobReconciler) jobForUpgradeJobAndHook(ctx context.Context, uj *
return jobStatus, nil
}

_, err := r.createHookJob(ctx, hook, uj, event)
_, err := r.createHookJob(ctx, hook, uj, event, meta)
return []managedupgradev1beta1.HookJobTracker{{
HookEvent: string(event),
UpgradeJobHookName: hook.Name,
Status: managedupgradev1beta1.HookJobTrackerStatusActive,
}}, err
}

func (r *UpgradeJobReconciler) createHookJob(ctx context.Context, hook managedupgradev1beta1.UpgradeJobHook, uj *managedupgradev1beta1.UpgradeJob, event managedupgradev1beta1.UpgradeEvent) (batchv1.Job, error) {
func (r *UpgradeJobReconciler) createHookJob(
ctx context.Context,
hook managedupgradev1beta1.UpgradeJobHook,
uj *managedupgradev1beta1.UpgradeJob,
event managedupgradev1beta1.UpgradeEvent,
meta hookJobMeta,
) (batchv1.Job, error) {
l := log.FromContext(ctx)
tmpl := hook.Spec.Template.DeepCopy()

Expand All @@ -543,13 +564,20 @@ func (r *UpgradeJobReconciler) createHookJob(ctx context.Context, hook managedup
return batchv1.Job{}, fmt.Errorf("failed to normalize upgrade job: %w", err)
}

normalizedMeta, err := normalizeAsJson(meta)
if err != nil {
return batchv1.Job{}, fmt.Errorf("failed to normalize hook job meta: %w", err)
}

evm := map[string]any{
"EVENT": normalizedEvent,
"JOB": normalizedUJ,
"META": normalizedMeta,
}

flattenInto("EVENT", normalizedEvent, evm)
flattenInto("JOB", normalizedUJ, evm)
flattenInto("META", normalizedMeta, evm)

envs := make([]corev1.EnvVar, 0, len(evm))
for k, v := range evm {
Expand Down
110 changes: 110 additions & 0 deletions controllers/upgradejob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,9 +516,119 @@ func Test_UpgradeJobReconciler_Reconcile_HookJobContainerEnv(t *testing.T) {
requireEnv(t, c.Env, "JOB_status_conditions_0_type", matchStr("\"Started\""))
requireEnv(t, c.Env, "EVENT", isJsonObj)
requireEnv(t, c.Env, "EVENT_name", matchStr("\"Create\""))
requireEnv(t, c.Env, "META", isJsonObj)
requireEnv(t, c.Env, "META_matchesDisruptiveHooks", matchStr("false"))
}
}

func Test_UpgradeJobReconciler_Reconcile_Disruptive(t *testing.T) {
clock := mockClock{now: time.Date(2022, 12, 4, 22, 45, 0, 0, time.UTC)}

upgradeJob := &managedupgradev1beta1.UpgradeJob{
ObjectMeta: metav1.ObjectMeta{
Name: "upgrade-1234-4-5-13",
Namespace: "appuio-openshift-upgrade-controller",
Labels: map[string]string{"test": "test"},
},
Spec: managedupgradev1beta1.UpgradeJobSpec{},
Status: managedupgradev1beta1.UpgradeJobStatus{
Conditions: []metav1.Condition{
{
Type: managedupgradev1beta1.UpgradeJobConditionStarted,
Status: metav1.ConditionTrue,
},
},
},
}
disruptiveJobHook := &managedupgradev1beta1.UpgradeJobHook{
ObjectMeta: metav1.ObjectMeta{
Name: "disruptive",
Namespace: "appuio-openshift-upgrade-controller",
},
Spec: managedupgradev1beta1.UpgradeJobHookSpec{
Selector: metav1.LabelSelector{
MatchLabels: upgradeJob.Labels,
},
FailurePolicy: managedupgradev1beta1.FailurePolicyAbort,
Events: []managedupgradev1beta1.UpgradeEvent{
managedupgradev1beta1.EventCreate,
},
Disruptive: true,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test",
},
},
},
},
},
},
},
}
otherJobHook := &managedupgradev1beta1.UpgradeJobHook{
ObjectMeta: metav1.ObjectMeta{
Name: "other",
Namespace: "appuio-openshift-upgrade-controller",
},
Spec: managedupgradev1beta1.UpgradeJobHookSpec{
Selector: metav1.LabelSelector{
MatchLabels: upgradeJob.Labels,
},
FailurePolicy: managedupgradev1beta1.FailurePolicyAbort,
Events: []managedupgradev1beta1.UpgradeEvent{
managedupgradev1beta1.EventCreate,
},
Disruptive: false,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test",
},
},
},
},
},
},
},
}

c := controllerClient(t, upgradeJob, disruptiveJobHook, otherJobHook,
&configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "version",
}})

subject := &UpgradeJobReconciler{
Client: c,
Scheme: c.Scheme(),

Clock: &clock,

ManagedUpstreamClusterVersionName: "version",
}

matchStr := func(str string) func(v string) (bool, error) {
return func(v string) (bool, error) {
return v == str, nil
}
}

job := checkAndCompleteHook(t, c, subject, upgradeJob, disruptiveJobHook, managedupgradev1beta1.EventCreate, true)
require.Equal(t, 1, len(job.Spec.Template.Spec.Containers))
requireEnv(t, job.Spec.Template.Spec.Containers[0].Env, "META_matchesDisruptiveHooks", matchStr("true"))

job = checkAndCompleteHook(t, c, subject, upgradeJob, otherJobHook, managedupgradev1beta1.EventCreate, true)
require.Equal(t, 1, len(job.Spec.Template.Spec.Containers))
requireEnv(t, job.Spec.Template.Spec.Containers[0].Env, "META_matchesDisruptiveHooks", matchStr("true"))
}

func Test_UpgradeJobReconciler_Reconcile_ClaimNextHook(t *testing.T) {
ctx := context.Background()
clock := mockClock{now: time.Date(2022, 12, 4, 22, 45, 0, 0, time.UTC)}
Expand Down
30 changes: 30 additions & 0 deletions controllers/upgrading_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
machineconfigurationv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
"github.com/prometheus/client_golang/prometheus"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

managedupgradev1beta1 "github.com/appuio/openshift-upgrade-controller/api/v1beta1"
"github.com/appuio/openshift-upgrade-controller/pkg/clusterversion"
Expand Down Expand Up @@ -46,6 +49,7 @@ var jobStates = prometheus.NewDesc(
"desired_version_image",
"desired_version_version",
"state",
"matches_disruptive_hooks",
},
nil,
)
Expand Down Expand Up @@ -106,6 +110,13 @@ func (m *ClusterUpgradingMetric) Collect(ch chan<- prometheus.Metric) {
var jobs managedupgradev1beta1.UpgradeJobList
if err := m.Client.List(ctx, &jobs); err != nil {
ch <- prometheus.NewInvalidMetric(jobStates, fmt.Errorf("failed to list upgrade jobs: %w", err))
return
}

var jobsHooks managedupgradev1beta1.UpgradeJobHookList
if err := m.Client.List(ctx, &jobsHooks); err != nil {
ch <- prometheus.NewInvalidMetric(jobStates, fmt.Errorf("failed to list upgrade job hooks: %w", err))
return
}

for _, job := range jobs.Items {
Expand All @@ -124,6 +135,7 @@ func (m *ClusterUpgradingMetric) Collect(ch chan<- prometheus.Metric) {
v.Image,
v.Version,
jobState(job),
strconv.FormatBool(jobHasMatchingDisruptiveHook(job, jobsHooks)),
)
}
}
Expand All @@ -145,3 +157,21 @@ func jobState(job managedupgradev1beta1.UpgradeJob) string {
}
return "pending"
}

func jobHasMatchingDisruptiveHook(job managedupgradev1beta1.UpgradeJob, hooks managedupgradev1beta1.UpgradeJobHookList) bool {
for _, hook := range hooks.Items {
sel, err := metav1.LabelSelectorAsSelector(&hook.Spec.Selector)
if err != nil {
log.Log.Error(err, "failed to parse hook selector")
continue
}
if !sel.Matches(labels.Set(job.Labels)) {
continue
}
if hook.WouldExecute(&job) && hook.Spec.Disruptive {
return true
}
}

return false
}
Loading

0 comments on commit 93dd67c

Please sign in to comment.