@@ -64,67 +64,34 @@ type ArmadaServerReconciler struct {
64
64
// move the current state of the cluster closer to the desired state.
65
65
func (r * ArmadaServerReconciler ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
66
66
logger := log .FromContext (ctx ).WithValues ("namespace" , req .Namespace , "name" , req .Name )
67
+
67
68
started := time .Now ()
68
- logger .Info ("Reconciling ArmadaServer object" )
69
-
70
- logger .Info ("Fetching ArmadaServer object from cache" )
71
- var as installv1alpha1.ArmadaServer
72
- if err := r .Client .Get (ctx , req .NamespacedName , & as ); err != nil {
73
- if k8serrors .IsNotFound (err ) {
74
- logger .Info ("ArmadaServer not found in cache, ending reconcile..." , "namespace" , req .Namespace , "name" , req .Name )
75
- return ctrl.Result {}, nil
76
- }
69
+
70
+ logger .Info ("Reconciling object" )
71
+
72
+ var server installv1alpha1.ArmadaServer
73
+ if miss , err := getObject (ctx , r .Client , & server , req .NamespacedName , logger ); err != nil || miss {
77
74
return ctrl.Result {}, err
78
75
}
79
76
80
- pc , err := installv1alpha1 .BuildPortConfig (as .Spec .ApplicationConfig )
77
+ pc , err := installv1alpha1 .BuildPortConfig (server .Spec .ApplicationConfig )
81
78
if err != nil {
82
79
return ctrl.Result {}, err
83
80
}
84
- as .Spec .PortConfig = pc
81
+ server .Spec .PortConfig = pc
85
82
86
83
var components * CommonComponents
87
- components , err = generateArmadaServerInstallComponents (& as , r .Scheme )
84
+ components , err = generateArmadaServerInstallComponents (& server , r .Scheme )
88
85
if err != nil {
89
86
return ctrl.Result {}, err
90
87
}
91
88
92
- deletionTimestamp := as .ObjectMeta .DeletionTimestamp
93
- // examine DeletionTimestamp to determine if object is under deletion
94
- if deletionTimestamp .IsZero () {
95
- // The object is not being deleted, so if it does not have our finalizer,
96
- // then lets add the finalizer and update the object. This is equivalent
97
- // registering our finalizer.
98
- if ! controllerutil .ContainsFinalizer (& as , operatorFinalizer ) {
99
- logger .Info ("Attaching finalizer to As object" , "finalizer" , operatorFinalizer )
100
- controllerutil .AddFinalizer (& as , operatorFinalizer )
101
- if err := r .Update (ctx , & as ); err != nil {
102
- return ctrl.Result {}, err
103
- }
104
- }
105
- } else {
106
- logger .Info ("ArmadaServer object is being deleted" , "finalizer" , operatorFinalizer )
107
- logger .Info ("Namespace-scoped resources will be deleted by Kubernetes based on their OwnerReference" )
108
- // The object is being deleted
109
- if controllerutil .ContainsFinalizer (& as , operatorFinalizer ) {
110
- // our finalizer is present, so lets handle any external dependency
111
- logger .Info ("Running cleanup function for ArmadaServer cluster-scoped components" , "finalizer" , operatorFinalizer )
112
- if err := r .deleteExternalResources (ctx , components , logger ); err != nil {
113
- // if fail to delete the external dependency here, return with error
114
- // so that it can be retried
115
- return ctrl.Result {}, err
116
- }
117
-
118
- // remove our finalizer from the list and update it.
119
- logger .Info ("Removing finalizer from ArmadaServer object" , "finalizer" , operatorFinalizer )
120
- controllerutil .RemoveFinalizer (& as , operatorFinalizer )
121
- if err := r .Update (ctx , & as ); err != nil {
122
- return ctrl.Result {}, err
123
- }
124
- }
125
-
126
- // Stop reconciliation as the item is being deleted
127
- return ctrl.Result {}, nil
89
+ cleanupF := func (ctx context.Context ) error {
90
+ return r .deleteExternalResources (ctx , components , logger )
91
+ }
92
+ finish , err := checkAndHandleObjectDeletion (ctx , r .Client , & server , operatorFinalizer , cleanupF , logger )
93
+ if err != nil || finish {
94
+ return ctrl.Result {}, err
128
95
}
129
96
130
97
componentsCopy := components .DeepCopy ()
@@ -134,90 +101,58 @@ func (r *ArmadaServerReconciler) Reconcile(ctx context.Context, req ctrl.Request
134
101
return nil
135
102
}
136
103
137
- if components .ServiceAccount != nil {
138
- logger .Info ("Upserting ArmadaServer ServiceAccount object" )
139
- if _ , err := controllerutil .CreateOrUpdate (ctx , r .Client , components .ServiceAccount , mutateFn ); err != nil {
140
- return ctrl.Result {}, err
141
- }
104
+ if err := upsertObjectIfNeeded (ctx , r .Client , components .ServiceAccount , server .Kind , mutateFn , logger ); err != nil {
105
+ return ctrl.Result {}, err
142
106
}
143
107
144
- if components .Secret != nil {
145
- logger .Info ("Upserting ArmadaServer Secret object" )
146
- if _ , err := controllerutil .CreateOrUpdate (ctx , r .Client , components .Secret , mutateFn ); err != nil {
147
- return ctrl.Result {}, err
148
- }
108
+ if err := upsertObjectIfNeeded (ctx , r .Client , components .Secret , server .Kind , mutateFn , logger ); err != nil {
109
+ return ctrl.Result {}, err
149
110
}
150
111
151
- if as .Spec .PulsarInit {
152
- for idx := range components .Jobs {
153
- err = func () error {
154
- if components .Jobs [idx ] != nil {
155
- if _ , err := controllerutil .CreateOrUpdate (ctx , r .Client , components .Jobs [idx ], mutateFn ); err != nil {
156
- return err
157
- }
158
- ctxTimeout , cancel := context .WithTimeout (ctx , migrationTimeout )
159
- defer cancel ()
160
-
161
- err := waitForJob (ctxTimeout , r .Client , components .Jobs [idx ], migrationPollSleep )
162
- if err != nil {
163
- return err
164
- }
112
+ if server .Spec .PulsarInit {
113
+ for _ , job := range components .Jobs {
114
+ err = func (job * batchv1.Job ) error {
115
+ if err := upsertObjectIfNeeded (ctx , r .Client , job , server .Kind , mutateFn , logger ); err != nil {
116
+ return err
117
+ }
118
+
119
+ if err := waitForJob (ctx , r .Client , job , jobPollInterval , jobTimeout ); err != nil {
120
+ return err
165
121
}
166
122
return nil
167
- }()
123
+ }(job )
168
124
if err != nil {
169
125
return ctrl.Result {}, err
170
126
}
171
127
}
172
128
}
173
129
174
- if components .Deployment != nil {
175
- logger .Info ("Upserting ArmadaServer Deployment object" )
176
- if _ , err := controllerutil .CreateOrUpdate (ctx , r .Client , components .Deployment , mutateFn ); err != nil {
177
- return ctrl.Result {}, err
178
- }
130
+ if err := upsertObjectIfNeeded (ctx , r .Client , components .Deployment , server .Kind , mutateFn , logger ); err != nil {
131
+ return ctrl.Result {}, err
179
132
}
180
133
181
- if components .Service != nil {
182
- logger .Info ("Upserting ArmadaServer Service object" )
183
- if _ , err := controllerutil .CreateOrUpdate (ctx , r .Client , components .Service , mutateFn ); err != nil {
184
- return ctrl.Result {}, err
185
- }
134
+ if err := upsertObjectIfNeeded (ctx , r .Client , components .Service , server .Kind , mutateFn , logger ); err != nil {
135
+ return ctrl.Result {}, err
186
136
}
187
137
188
- if components .IngressGrpc != nil {
189
- logger .Info ("Upserting ArmadaServer GRPC Ingress object" )
190
- if _ , err := controllerutil .CreateOrUpdate (ctx , r .Client , components .IngressGrpc , mutateFn ); err != nil {
191
- return ctrl.Result {}, err
192
- }
138
+ if err := upsertObjectIfNeeded (ctx , r .Client , components .IngressGrpc , server .Kind , mutateFn , logger ); err != nil {
139
+ return ctrl.Result {}, err
193
140
}
194
141
195
- if components .IngressHttp != nil {
196
- logger .Info ("Upserting ArmadaServer REST Ingress object" )
197
- if _ , err := controllerutil .CreateOrUpdate (ctx , r .Client , components .IngressHttp , mutateFn ); err != nil {
198
- return ctrl.Result {}, err
199
- }
142
+ if err := upsertObjectIfNeeded (ctx , r .Client , components .IngressHttp , server .Kind , mutateFn , logger ); err != nil {
143
+ return ctrl.Result {}, err
200
144
}
201
145
202
- if components .PodDisruptionBudget != nil {
203
- logger .Info ("Upserting ArmadaServer PodDisruptionBudget object" )
204
- if _ , err := controllerutil .CreateOrUpdate (ctx , r .Client , components .PodDisruptionBudget , mutateFn ); err != nil {
205
- return ctrl.Result {}, err
206
- }
146
+ if err := upsertObjectIfNeeded (ctx , r .Client , components .PodDisruptionBudget , server .Kind , mutateFn , logger ); err != nil {
147
+ return ctrl.Result {}, err
207
148
}
208
149
209
- if components .PrometheusRule != nil {
210
- logger .Info ("Upserting ArmadaServer PrometheusRule object" )
211
- if _ , err := controllerutil .CreateOrUpdate (ctx , r .Client , components .PrometheusRule , mutateFn ); err != nil {
212
- return ctrl.Result {}, err
213
- }
150
+ if err := upsertObjectIfNeeded (ctx , r .Client , components .PrometheusRule , server .Kind , mutateFn , logger ); err != nil {
151
+ return ctrl.Result {}, err
214
152
}
215
153
216
- if components .ServiceMonitor != nil {
217
- logger .Info ("Upserting ArmadaServer ServiceMonitor object" )
218
- if _ , err := controllerutil .CreateOrUpdate (ctx , r .Client , components .ServiceMonitor , mutateFn ); err != nil {
219
- return ctrl.Result {}, err
220
- }
154
+ if err := upsertObjectIfNeeded (ctx , r .Client , components .ServiceMonitor , server .Kind , mutateFn , logger ); err != nil {
155
+ return ctrl.Result {}, err
221
156
}
222
157
223
158
logger .Info ("Successfully reconciled ArmadaServer object" , "durationMillis" , time .Since (started ).Milliseconds ())
@@ -283,13 +218,17 @@ func generateArmadaServerInstallComponents(as *installv1alpha1.ArmadaServer, sch
283
218
}
284
219
285
220
var pr * monitoringv1.PrometheusRule
221
+ var sm * monitoringv1.ServiceMonitor
286
222
if as .Spec .Prometheus != nil && as .Spec .Prometheus .Enabled {
287
223
pr = createServerPrometheusRule (as .Name , as .Namespace , as .Spec .Prometheus .ScrapeInterval , as .Spec .Labels , as .Spec .Prometheus .Labels )
288
- }
224
+ if err := controllerutil .SetOwnerReference (as , pr , scheme ); err != nil {
225
+ return nil , err
226
+ }
289
227
290
- sm := createServiceMonitor (as )
291
- if err := controllerutil .SetOwnerReference (as , sm , scheme ); err != nil {
292
- return nil , err
228
+ sm = createServiceMonitor (as )
229
+ if err := controllerutil .SetOwnerReference (as , sm , scheme ); err != nil {
230
+ return nil , err
231
+ }
293
232
}
294
233
295
234
jobs := []* batchv1.Job {{}}
@@ -331,12 +270,12 @@ func createArmadaServerMigrationJobs(as *installv1alpha1.ArmadaServer) ([]*batch
331
270
332
271
appConfig , err := builders .ConvertRawExtensionToYaml (as .Spec .ApplicationConfig )
333
272
if err != nil {
334
- return [] * batchv1. Job {} , err
273
+ return nil , err
335
274
}
336
275
var asConfig AppConfig
337
276
err = yaml .Unmarshal ([]byte (appConfig ), & asConfig )
338
277
if err != nil {
339
- return [] * batchv1. Job {} , err
278
+ return nil , err
340
279
}
341
280
342
281
// First job is to poll/wait for Pulsar to be fully started
0 commit comments