@@ -53,7 +53,7 @@ pub async fn do_config_watch(
53
53
) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync + ' static > > {
54
54
info ! ( "do_config_watch - enter" ) ;
55
55
let config_map: ConfigMap = Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ;
56
- let kube_interface = k8s:: KubeImpl :: new ( ) . await ?;
56
+ let kube_interface = Arc :: new ( k8s:: KubeImpl :: new ( ) . await ?) ;
57
57
let mut tasks = Vec :: new ( ) ;
58
58
59
59
// Handle pre-existing configs
@@ -62,9 +62,10 @@ pub async fn do_config_watch(
62
62
let config_map = config_map. clone ( ) ;
63
63
let discovery_handler_map = discovery_handler_map. clone ( ) ;
64
64
let new_discovery_handler_sender = new_discovery_handler_sender. clone ( ) ;
65
+ let new_kube_interface = kube_interface. clone ( ) ;
65
66
tasks. push ( tokio:: spawn ( async move {
66
67
handle_config_add (
67
- Arc :: new ( k8s :: KubeImpl :: new ( ) . await . unwrap ( ) ) ,
68
+ new_kube_interface ,
68
69
& config,
69
70
config_map,
70
71
discovery_handler_map,
@@ -78,7 +79,7 @@ pub async fn do_config_watch(
78
79
// Watch for new configs and changes
79
80
tasks. push ( tokio:: spawn ( async move {
80
81
watch_for_config_changes (
81
- & kube_interface,
82
+ kube_interface,
82
83
config_map,
83
84
discovery_handler_map,
84
85
new_discovery_handler_sender,
@@ -94,7 +95,7 @@ pub async fn do_config_watch(
94
95
95
96
/// This watches for Configuration events
96
97
async fn watch_for_config_changes (
97
- kube_interface : & impl KubeInterface ,
98
+ kube_interface : Arc < dyn KubeInterface > ,
98
99
config_map : ConfigMap ,
99
100
discovery_handler_map : RegisteredDiscoveryHandlerMap ,
100
101
new_discovery_handler_sender : broadcast:: Sender < String > ,
@@ -115,7 +116,7 @@ async fn watch_for_config_changes(
115
116
} ;
116
117
let new_discovery_handler_sender = new_discovery_handler_sender. clone ( ) ;
117
118
handle_config (
118
- kube_interface,
119
+ kube_interface. clone ( ) ,
119
120
event,
120
121
config_map. clone ( ) ,
121
122
discovery_handler_map. clone ( ) ,
@@ -129,7 +130,7 @@ async fn watch_for_config_changes(
129
130
/// This takes an event off the Configuration stream and delegates it to the
130
131
/// correct function based on the event type.
131
132
async fn handle_config (
132
- kube_interface : & impl KubeInterface ,
133
+ kube_interface : Arc < dyn KubeInterface > ,
133
134
event : Event < Configuration > ,
134
135
config_map : ConfigMap ,
135
136
discovery_handler_map : RegisteredDiscoveryHandlerMap ,
@@ -157,7 +158,7 @@ async fn handle_config(
157
158
config. metadata . name . clone ( ) . unwrap ( ) ,
158
159
) ;
159
160
info ! ( "handle_config - deleted Configuration {:?}" , config_id, ) ;
160
- handle_config_delete ( kube_interface, config_id, config_map) . await ?;
161
+ handle_config_delete ( kube_interface. as_ref ( ) , config_id, config_map) . await ?;
161
162
}
162
163
Event :: Restarted ( configs) => {
163
164
let new_configs: HashSet < ConfigId > = configs
@@ -171,11 +172,16 @@ async fn handle_config(
171
172
. collect ( ) ;
172
173
let old_configs: HashSet < ConfigId > = config_map. read ( ) . await . keys ( ) . cloned ( ) . collect ( ) ;
173
174
for config_id in old_configs. difference ( & new_configs) {
174
- handle_config_delete ( kube_interface, config_id. clone ( ) , config_map. clone ( ) ) . await ?;
175
+ handle_config_delete (
176
+ kube_interface. as_ref ( ) ,
177
+ config_id. clone ( ) ,
178
+ config_map. clone ( ) ,
179
+ )
180
+ . await ?;
175
181
}
176
182
for config in configs {
177
183
handle_config_apply (
178
- kube_interface,
184
+ kube_interface. clone ( ) ,
179
185
config,
180
186
config_map. clone ( ) ,
181
187
discovery_handler_map. clone ( ) ,
@@ -189,7 +195,7 @@ async fn handle_config(
189
195
}
190
196
191
197
async fn handle_config_apply (
192
- kube_interface : & impl KubeInterface ,
198
+ kube_interface : Arc < dyn KubeInterface > ,
193
199
config : Configuration ,
194
200
config_map : ConfigMap ,
195
201
discovery_handler_map : RegisteredDiscoveryHandlerMap ,
@@ -215,12 +221,12 @@ async fn handle_config_apply(
215
221
"handle_config - modified Configuration {:?}" ,
216
222
config. metadata. name,
217
223
) ;
218
- handle_config_delete ( kube_interface, config_id, config_map. clone ( ) ) . await ?;
224
+ handle_config_delete ( kube_interface. as_ref ( ) , config_id, config_map. clone ( ) ) . await ?;
219
225
}
220
226
221
227
tokio:: spawn ( async move {
222
228
handle_config_add (
223
- Arc :: new ( k8s :: KubeImpl :: new ( ) . await . unwrap ( ) ) ,
229
+ kube_interface ,
224
230
& config,
225
231
config_map,
226
232
discovery_handler_map,
@@ -282,7 +288,7 @@ async fn handle_config_add(
282
288
/// Then, for each of the Configuration's Instances, it signals the DevicePluginService to shutdown,
283
289
/// and deletes the Instance CRD.
284
290
async fn handle_config_delete (
285
- kube_interface : & impl KubeInterface ,
291
+ kube_interface : & dyn KubeInterface ,
286
292
config_id : ConfigId ,
287
293
config_map : ConfigMap ,
288
294
) -> anyhow:: Result < ( ) > {
@@ -363,7 +369,7 @@ async fn should_recreate_config(
363
369
364
370
/// This shuts down all a Configuration's Instances and terminates the associated Device Plugins
365
371
pub async fn delete_all_instances_in_map (
366
- kube_interface : & impl k8s:: KubeInterface ,
372
+ kube_interface : & dyn k8s:: KubeInterface ,
367
373
instance_map : InstanceMap ,
368
374
( namespace, name) : ConfigId ,
369
375
) -> anyhow:: Result < ( ) > {
@@ -409,12 +415,13 @@ mod config_action_tests {
409
415
config. metadata . namespace . clone ( ) . unwrap ( ) ,
410
416
config. metadata . name . clone ( ) . unwrap ( ) ,
411
417
) ;
418
+ let kube_interface = Arc :: new ( MockKubeInterface :: new ( ) ) ;
412
419
413
420
let config_map = Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ;
414
421
let dh_map = Arc :: new ( std:: sync:: Mutex :: new ( HashMap :: new ( ) ) ) ;
415
422
let ( tx, mut _rx1) = broadcast:: channel ( 1 ) ;
416
423
assert ! ( handle_config(
417
- & MockKubeInterface :: new ( ) ,
424
+ kube_interface . clone ( ) ,
418
425
Event :: Restarted ( vec![ config] ) ,
419
426
config_map. clone( ) ,
420
427
dh_map. clone( ) ,
@@ -429,7 +436,7 @@ mod config_action_tests {
429
436
assert ! ( config_map. read( ) . await . contains_key( & config_id) ) ;
430
437
431
438
assert ! ( handle_config(
432
- & MockKubeInterface :: new ( ) ,
439
+ kube_interface ,
433
440
Event :: Restarted ( Vec :: new( ) ) ,
434
441
config_map. clone( ) ,
435
442
dh_map,
0 commit comments