Skip to content

Commit

Permalink
implement backoff for watcher - for #577
Browse files Browse the repository at this point in the history
Signed-off-by: clux <sszynrae@gmail.com>
  • Loading branch information
clux committed Nov 11, 2021
1 parent afc8540 commit 6a6cbb3
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tracing = "0.1.29"
json-patch = "0.2.6"
serde_json = "1.0.68"
thiserror = "1.0.29"
backoff = { version = "0.3.0", features = ["futures", "tokio_1" ] }

[dependencies.k8s-openapi]
version = "0.13.1"
Expand Down
24 changes: 21 additions & 3 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Watches a Kubernetes Resource for changes, with error recovery
use backoff::{backoff::Backoff, ExponentialBackoff};
use derivative::Derivative;
use futures::{stream::BoxStream, Stream, StreamExt};
use kube_client::{
Expand Down Expand Up @@ -229,11 +230,28 @@ pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
api: Api<K>,
list_params: ListParams,
) -> impl Stream<Item = Result<Event<K>>> + Send {
let back = ExponentialBackoff::default();
futures::stream::unfold(
(api, list_params, State::Empty),
|(api, list_params, state)| async {
(api, list_params, back, State::Empty),
|(api, list_params, mut back, state)| async {
let (event, state) = step(&api, &list_params, state).await;
Some((event, (api, list_params, state)))
if event.is_err() {
if let Some(wait_time) = back.next_backoff() {
if wait_time.as_secs() > 8 {
tracing::warn!("watch cancelled, reached max backoff interval");
return None;
} else {
tracing::debug!("watch waiting {}ms until retrying", wait_time.as_millis()); // TODO: kind name here
tokio::time::sleep(wait_time).await
}
} else {
tracing::warn!("watch cancelled, strategy returned none");
return None;
}
} else {
back.reset();
}
Some((event, (api, list_params, back, state)))
},
)
}
Expand Down

0 comments on commit 6a6cbb3

Please sign in to comment.