Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow marking UpgradeJobHooks disruptive #45

Merged
merged 1 commit into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions api/v1beta1/upgradejobhook_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ type UpgradeJobHookSpec struct {
// More advanced failure policies can be handled through the upstream Job failure handling mechanisms.
// +kubebuilder:validation:Enum=Abort;Ignore
FailurePolicy string `json:"failurePolicy,omitempty"`
// Disruptive defines if the code run by the hook is potentially disruptive.
// Added to the job metrics and injected as an environment variable to all hooks matching the job.
// This is currently only informational, but can be used to make decisions in jobs.
// The default is `false`.
Disruptive bool `json:"disruptive,omitempty"`
// Selector is the label selector that determines which upgrade jobs the hook is executed for.
Selector metav1.LabelSelector `json:"selector,omitempty"`
// Template is the job template that is executed.
Expand Down Expand Up @@ -125,24 +130,17 @@ type UpgradeJobHook struct {
}

// Claim claims the hook for the given claimer.
// Returns true if the hook was claimed, false if it was already claimed.
// Returns true if the hook was claimed, or does not need to be claimed, false if it was already claimed.
// Second return value is true if the hooks status was updated.
func (u *UpgradeJobHook) Claim(claimer client.Object) (ok, updated bool) {
if u.Spec.GetRun() != RunNext {
return true, false
}

ref := ClaimReference{
APIVersion: claimer.GetObjectKind().GroupVersionKind().GroupVersion().String(),
Kind: claimer.GetObjectKind().GroupVersionKind().Kind,
Name: claimer.GetName(),
UID: claimer.GetUID(),
}

ref := buildClaimReference(claimer)
if u.Status.ClaimedBy == ref {
return true, false
}

if u.Status.ClaimedBy == (ClaimReference{}) {
u.Status.ClaimedBy = ref
return true, true
Expand All @@ -151,6 +149,24 @@ func (u *UpgradeJobHook) Claim(claimer client.Object) (ok, updated bool) {
return false, false
}

// WouldExecute returns true if the hook would be executed for the given claimer.
func (u *UpgradeJobHook) WouldExecute(claimer client.Object) bool {
if u.Spec.GetRun() != RunNext {
return true
}

return u.Status.ClaimedBy == (ClaimReference{}) || u.Status.ClaimedBy == buildClaimReference(claimer)
}

func buildClaimReference(claimer client.Object) ClaimReference {
return ClaimReference{
APIVersion: claimer.GetObjectKind().GroupVersionKind().GroupVersion().String(),
Kind: claimer.GetObjectKind().GroupVersionKind().Kind,
Name: claimer.GetName(),
UID: claimer.GetUID(),
}
}

//+kubebuilder:object:root=true

// UpgradeJobHookList contains a list of UpgradeJobHook
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ spec:
spec:
description: UpgradeJobHookSpec defines the desired state of UpgradeJobHook
properties:
disruptive:
description: Disruptive defines if the code run by the hook is potentially
disruptive. Added to the job metrics and injected as an environment
variable to all hooks matching the job. This is currently only informational,
but can be used to make decisions in jobs. The default is `false`.
type: boolean
events:
description: Events is the list of events to trigger the hook to be
executed. `Create`, `Start`, and `UpgradeComplete` are the events
Expand Down
36 changes: 32 additions & 4 deletions controllers/upgradejob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,11 +449,16 @@ func (r *UpgradeJobReconciler) executeHooks(ctx context.Context, uj *managedupgr
}
}

var hasMatchingDisruptiveHook bool
for _, hook := range hooks {
hasMatchingDisruptiveHook = hasMatchingDisruptiveHook || hook.Spec.Disruptive
}

activeJobs := []string{}
errors := []error{}
failedJobs := []string{}
for _, hook := range hooks {
jobs, err := r.jobForUpgradeJobAndHook(ctx, uj, hook, event)
jobs, err := r.jobForUpgradeJobAndHook(ctx, uj, hook, event, hookJobMeta{MatchesDisruptiveHooks: hasMatchingDisruptiveHook})
if err != nil {
errors = append(errors, err)
continue
Expand Down Expand Up @@ -496,7 +501,17 @@ func (r *UpgradeJobReconciler) executeHooks(ctx context.Context, uj *managedupgr
return true, nil
}

func (r *UpgradeJobReconciler) jobForUpgradeJobAndHook(ctx context.Context, uj *managedupgradev1beta1.UpgradeJob, hook managedupgradev1beta1.UpgradeJobHook, event managedupgradev1beta1.UpgradeEvent) ([]managedupgradev1beta1.HookJobTracker, error) {
type hookJobMeta struct {
MatchesDisruptiveHooks bool `json:"matchesDisruptiveHooks"`
}

func (r *UpgradeJobReconciler) jobForUpgradeJobAndHook(
ctx context.Context,
uj *managedupgradev1beta1.UpgradeJob,
hook managedupgradev1beta1.UpgradeJobHook,
event managedupgradev1beta1.UpgradeEvent,
meta hookJobMeta,
) ([]managedupgradev1beta1.HookJobTracker, error) {
var jobs batchv1.JobList
if err := r.List(ctx, &jobs, client.InNamespace(uj.Namespace), client.MatchingLabels(jobLabels(uj.Name, hook.Name, event))); err != nil {
return nil, err
Expand All @@ -518,15 +533,21 @@ func (r *UpgradeJobReconciler) jobForUpgradeJobAndHook(ctx context.Context, uj *
return jobStatus, nil
}

_, err := r.createHookJob(ctx, hook, uj, event)
_, err := r.createHookJob(ctx, hook, uj, event, meta)
return []managedupgradev1beta1.HookJobTracker{{
HookEvent: string(event),
UpgradeJobHookName: hook.Name,
Status: managedupgradev1beta1.HookJobTrackerStatusActive,
}}, err
}

func (r *UpgradeJobReconciler) createHookJob(ctx context.Context, hook managedupgradev1beta1.UpgradeJobHook, uj *managedupgradev1beta1.UpgradeJob, event managedupgradev1beta1.UpgradeEvent) (batchv1.Job, error) {
func (r *UpgradeJobReconciler) createHookJob(
ctx context.Context,
hook managedupgradev1beta1.UpgradeJobHook,
uj *managedupgradev1beta1.UpgradeJob,
event managedupgradev1beta1.UpgradeEvent,
meta hookJobMeta,
) (batchv1.Job, error) {
l := log.FromContext(ctx)
tmpl := hook.Spec.Template.DeepCopy()

Expand All @@ -543,13 +564,20 @@ func (r *UpgradeJobReconciler) createHookJob(ctx context.Context, hook managedup
return batchv1.Job{}, fmt.Errorf("failed to normalize upgrade job: %w", err)
}

normalizedMeta, err := normalizeAsJson(meta)
if err != nil {
return batchv1.Job{}, fmt.Errorf("failed to normalize hook job meta: %w", err)
}

evm := map[string]any{
"EVENT": normalizedEvent,
"JOB": normalizedUJ,
"META": normalizedMeta,
}

flattenInto("EVENT", normalizedEvent, evm)
flattenInto("JOB", normalizedUJ, evm)
flattenInto("META", normalizedMeta, evm)

envs := make([]corev1.EnvVar, 0, len(evm))
for k, v := range evm {
Expand Down
110 changes: 110 additions & 0 deletions controllers/upgradejob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,9 +516,119 @@ func Test_UpgradeJobReconciler_Reconcile_HookJobContainerEnv(t *testing.T) {
requireEnv(t, c.Env, "JOB_status_conditions_0_type", matchStr("\"Started\""))
requireEnv(t, c.Env, "EVENT", isJsonObj)
requireEnv(t, c.Env, "EVENT_name", matchStr("\"Create\""))
requireEnv(t, c.Env, "META", isJsonObj)
requireEnv(t, c.Env, "META_matchesDisruptiveHooks", matchStr("false"))
}
}

func Test_UpgradeJobReconciler_Reconcile_Disruptive(t *testing.T) {
clock := mockClock{now: time.Date(2022, 12, 4, 22, 45, 0, 0, time.UTC)}

upgradeJob := &managedupgradev1beta1.UpgradeJob{
ObjectMeta: metav1.ObjectMeta{
Name: "upgrade-1234-4-5-13",
Namespace: "appuio-openshift-upgrade-controller",
Labels: map[string]string{"test": "test"},
},
Spec: managedupgradev1beta1.UpgradeJobSpec{},
Status: managedupgradev1beta1.UpgradeJobStatus{
Conditions: []metav1.Condition{
{
Type: managedupgradev1beta1.UpgradeJobConditionStarted,
Status: metav1.ConditionTrue,
},
},
},
}
disruptiveJobHook := &managedupgradev1beta1.UpgradeJobHook{
ObjectMeta: metav1.ObjectMeta{
Name: "disruptive",
Namespace: "appuio-openshift-upgrade-controller",
},
Spec: managedupgradev1beta1.UpgradeJobHookSpec{
Selector: metav1.LabelSelector{
MatchLabels: upgradeJob.Labels,
},
FailurePolicy: managedupgradev1beta1.FailurePolicyAbort,
Events: []managedupgradev1beta1.UpgradeEvent{
managedupgradev1beta1.EventCreate,
},
Disruptive: true,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test",
},
},
},
},
},
},
},
}
otherJobHook := &managedupgradev1beta1.UpgradeJobHook{
ObjectMeta: metav1.ObjectMeta{
Name: "other",
Namespace: "appuio-openshift-upgrade-controller",
},
Spec: managedupgradev1beta1.UpgradeJobHookSpec{
Selector: metav1.LabelSelector{
MatchLabels: upgradeJob.Labels,
},
FailurePolicy: managedupgradev1beta1.FailurePolicyAbort,
Events: []managedupgradev1beta1.UpgradeEvent{
managedupgradev1beta1.EventCreate,
},
Disruptive: false,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test",
},
},
},
},
},
},
},
}

c := controllerClient(t, upgradeJob, disruptiveJobHook, otherJobHook,
&configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "version",
}})

subject := &UpgradeJobReconciler{
Client: c,
Scheme: c.Scheme(),

Clock: &clock,

ManagedUpstreamClusterVersionName: "version",
}

matchStr := func(str string) func(v string) (bool, error) {
return func(v string) (bool, error) {
return v == str, nil
}
}

job := checkAndCompleteHook(t, c, subject, upgradeJob, disruptiveJobHook, managedupgradev1beta1.EventCreate, true)
require.Equal(t, 1, len(job.Spec.Template.Spec.Containers))
requireEnv(t, job.Spec.Template.Spec.Containers[0].Env, "META_matchesDisruptiveHooks", matchStr("true"))

job = checkAndCompleteHook(t, c, subject, upgradeJob, otherJobHook, managedupgradev1beta1.EventCreate, true)
require.Equal(t, 1, len(job.Spec.Template.Spec.Containers))
requireEnv(t, job.Spec.Template.Spec.Containers[0].Env, "META_matchesDisruptiveHooks", matchStr("true"))
}

func Test_UpgradeJobReconciler_Reconcile_ClaimNextHook(t *testing.T) {
ctx := context.Background()
clock := mockClock{now: time.Date(2022, 12, 4, 22, 45, 0, 0, time.UTC)}
Expand Down
30 changes: 30 additions & 0 deletions controllers/upgrading_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
machineconfigurationv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
"github.com/prometheus/client_golang/prometheus"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

managedupgradev1beta1 "github.com/appuio/openshift-upgrade-controller/api/v1beta1"
"github.com/appuio/openshift-upgrade-controller/pkg/clusterversion"
Expand Down Expand Up @@ -46,6 +49,7 @@ var jobStates = prometheus.NewDesc(
"desired_version_image",
"desired_version_version",
"state",
"matches_disruptive_hooks",
},
nil,
)
Expand Down Expand Up @@ -106,6 +110,13 @@ func (m *ClusterUpgradingMetric) Collect(ch chan<- prometheus.Metric) {
var jobs managedupgradev1beta1.UpgradeJobList
if err := m.Client.List(ctx, &jobs); err != nil {
ch <- prometheus.NewInvalidMetric(jobStates, fmt.Errorf("failed to list upgrade jobs: %w", err))
return
}

var jobsHooks managedupgradev1beta1.UpgradeJobHookList
if err := m.Client.List(ctx, &jobsHooks); err != nil {
ch <- prometheus.NewInvalidMetric(jobStates, fmt.Errorf("failed to list upgrade job hooks: %w", err))
return
}

for _, job := range jobs.Items {
Expand All @@ -124,6 +135,7 @@ func (m *ClusterUpgradingMetric) Collect(ch chan<- prometheus.Metric) {
v.Image,
v.Version,
jobState(job),
strconv.FormatBool(jobHasMatchingDisruptiveHook(job, jobsHooks)),
)
}
}
Expand All @@ -145,3 +157,21 @@ func jobState(job managedupgradev1beta1.UpgradeJob) string {
}
return "pending"
}

func jobHasMatchingDisruptiveHook(job managedupgradev1beta1.UpgradeJob, hooks managedupgradev1beta1.UpgradeJobHookList) bool {
for _, hook := range hooks.Items {
sel, err := metav1.LabelSelectorAsSelector(&hook.Spec.Selector)
if err != nil {
log.Log.Error(err, "failed to parse hook selector")
continue
}
if !sel.Matches(labels.Set(job.Labels)) {
continue
}
if hook.WouldExecute(&job) && hook.Spec.Disruptive {
return true
}
}

return false
}
Loading