@@ -16,7 +16,7 @@ use akri_shared::{
16
16
use futures:: { StreamExt , TryStreamExt } ;
17
17
use kube:: api:: { Api , ListParams , WatchEvent } ;
18
18
use log:: { info, trace} ;
19
- use std:: { collections:: HashMap , sync:: Arc } ;
19
+ use std:: { collections:: HashMap , option :: Option , sync:: Arc } ;
20
20
use tokio:: sync:: { broadcast, mpsc, Mutex } ;
21
21
22
22
type ConfigMap = Arc < Mutex < HashMap < String , ConfigInfo > > > ;
@@ -34,6 +34,10 @@ pub struct ConfigInfo {
34
34
/// Receives notification that all `DiscoveryOperators` threads have completed and a Configuration's Instances
35
35
/// can be safely deleted and the associated `DevicePluginServices` terminated.
36
36
finished_discovery_receiver : mpsc:: Receiver < ( ) > ,
37
+ /// Tracks the last generation of the `Configuration` resource (i.e. `.metadata.generation`).
38
+ /// This is used to determine if the `Configuration` actually changed, or if only the metadata changed.
39
+ /// The `.metadata.generation` value is incremented for all changes, except for changes to `.metadata` or `.status`.
40
+ last_generation : Option < i64 > ,
37
41
}
38
42
39
43
/// This handles pre-existing Configurations and invokes an internal method that watches for Configuration events.
@@ -154,6 +158,16 @@ async fn handle_config(
154
158
"handle_config - modified Configuration {:?}" ,
155
159
config. metadata. name,
156
160
) ;
161
+ let do_recreate = should_recreate_config ( & config, config_map. clone ( ) )
162
+ . await
163
+ . unwrap ( ) ;
164
+ if !do_recreate {
165
+ trace ! (
166
+ "handle_config - config {:?} has not changed. ignoring config modified event." ,
167
+ config. metadata. name,
168
+ ) ;
169
+ return Ok ( ( ) ) ;
170
+ }
157
171
handle_config_delete ( kube_interface, & config, config_map. clone ( ) ) . await ?;
158
172
tokio:: spawn ( async move {
159
173
handle_config_add (
@@ -196,6 +210,7 @@ async fn handle_config_add(
196
210
instance_map : instance_map. clone ( ) ,
197
211
stop_discovery_sender : stop_discovery_sender. clone ( ) ,
198
212
finished_discovery_receiver,
213
+ last_generation : config. metadata . generation ,
199
214
} ;
200
215
config_map
201
216
. lock ( )
@@ -276,6 +291,24 @@ async fn handle_config_delete(
276
291
Ok ( ( ) )
277
292
}
278
293
294
+ /// Checks to see if the configuration needs to be recreated.
295
+ /// At present, this just checks to see if the `.metadata.generation` has changed.
296
+ /// The `.metadata.generation` value is incremented for all changes, except for changes to `.metadata` or `.status`.
297
+ async fn should_recreate_config ( config : & Configuration , config_map : ConfigMap ) -> Result < bool , ( ) > {
298
+ let name = config. metadata . name . clone ( ) . unwrap ( ) ;
299
+ let last_generation = config_map. lock ( ) . await . get ( & name) . unwrap ( ) . last_generation ;
300
+ trace ! (
301
+ "should_recreate_config - checking if config {} needs to be recreated" ,
302
+ name,
303
+ ) ;
304
+
305
+ if config. metadata . generation == last_generation {
306
+ return Ok ( false ) ;
307
+ }
308
+
309
+ Ok ( true )
310
+ }
311
+
279
312
/// This shuts down all a Configuration's Instances and terminates the associated Device Plugins
280
313
pub async fn delete_all_instances_in_map (
281
314
kube_interface : & impl k8s:: KubeInterface ,
@@ -341,6 +374,7 @@ mod config_action_tests {
341
374
stop_discovery_sender,
342
375
instance_map : instance_map. clone ( ) ,
343
376
finished_discovery_receiver,
377
+ last_generation : config. metadata . generation ,
344
378
} ,
345
379
) ;
346
380
let config_map: ConfigMap = Arc :: new ( Mutex :: new ( map) ) ;
0 commit comments