From 9ad93fb12119cc63a67162bb945e34640c75c343 Mon Sep 17 00:00:00 2001 From: "Sean P. Kelly" Date: Sat, 5 Aug 2023 01:00:54 +0000 Subject: [PATCH 1/7] Update VERSION file --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 795460fc..79127d85 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v1.1.0 +v1.2.0 From c2260cb08577cf99b5add3c1b35815236de42fba Mon Sep 17 00:00:00 2001 From: "Sean P. Kelly" Date: Fri, 4 Aug 2023 21:15:15 +0000 Subject: [PATCH 2/7] Apply clippy fixes --- agent/src/apiclient.rs | 32 +++++++++++++++++++++----------- apiserver/src/auth/middleware.rs | 2 +- controller/src/controller.rs | 12 ++++-------- controller/src/scheduler.rs | 22 +++++++++------------- models/src/node/crd/v1.rs | 9 ++------- models/src/node/crd/v2.rs | 9 ++------- models/src/node/drain.rs | 2 +- 7 files changed, 40 insertions(+), 48 deletions(-) diff --git a/agent/src/apiclient.rs b/agent/src/apiclient.rs index 6c78f73e..7597d885 100644 --- a/agent/src/apiclient.rs +++ b/agent/src/apiclient.rs @@ -62,24 +62,31 @@ enum CommandStatus { #[derive(Debug, Deserialize)] pub struct StagedImage { - image: Option, - next_to_boot: bool, + #[serde(rename = "image")] + _image: Option, + #[serde(rename = "next_to_boot")] + _next_to_boot: bool, } #[derive(Debug, Deserialize)] pub struct CommandResult { cmd_type: UpdateCommand, cmd_status: CommandStatus, - timestamp: String, - exit_status: u32, - stderr: String, + #[serde(rename = "timestamp")] + _timestamp: String, + #[serde(rename = "exit_status")] + _exit_status: u32, + #[serde(rename = "stderr")] + _stderr: String, } #[derive(Debug, Deserialize)] pub struct UpdateImage { - pub(super) arch: String, + #[serde(rename = "arch")] + pub(super) _arch: String, pub(super) version: Version, - pub(super) variant: String, + #[serde(rename = "variant")] + pub(super) _variant: String, } #[derive(Debug, Deserialize)] @@ -90,10 +97,13 @@ pub struct OsInfo { #[derive(Debug, Deserialize)] pub struct UpdateStatus { update_state: UpdateState, - available_updates: Vec, + #[serde(rename = "available_updates")] + _available_updates: Vec, chosen_update: Option, - active_partition: Option, - staging_partition: Option, + #[serde(rename = "active_partition")] + _active_partition: Option, + #[serde(rename = "staging_partition")] + _staging_partition: Option, most_recent_command: CommandResult, } @@ -150,7 +160,7 @@ async fn invoke_apiclient(args: Vec) -> Result { _ => { // API response was a non-transient error, bail out return apiclient_error::BadHttpResponseSnafu { - args: args, + args, error_content: &error_content, statuscode: error_statuscode, } diff --git a/apiserver/src/auth/middleware.rs b/apiserver/src/auth/middleware.rs index 6c1e009b..ee46ccb7 100644 --- a/apiserver/src/auth/middleware.rs +++ b/apiserver/src/auth/middleware.rs @@ -100,7 +100,7 @@ where let authorizor = self.authorizor.clone(); if self.exclude_paths.get(&request_path).is_some() { - Box::pin(async move { fut.await }) + Box::pin(fut) } else { Box::pin(async move { let apiserver_headers = maybe_apiserver_headers?; diff --git a/controller/src/controller.rs b/controller/src/controller.rs index 55bfa466..73ff545d 100644 --- a/controller/src/controller.rs +++ b/controller/src/controller.rs @@ -254,15 +254,11 @@ impl BrupopController { #[instrument(skip(self))] fn nodes_ready_to_update(&self) -> bool { - let mut shadows: Vec = self.all_brss(); - for brs in shadows.drain(..) { + self.all_brss().iter().any(|brs| { // If we determine that the spec should change, this node is a candidate to begin updating. - let next_spec = determine_next_node_spec(&brs); - if next_spec != brs.spec && is_initial_state(&brs) { - return true; - } - } - return false; + let next_spec = determine_next_node_spec(brs); + next_spec != brs.spec && is_initial_state(brs) + }) } /// Runs the event loop for the Brupop controller. diff --git a/controller/src/scheduler.rs b/controller/src/scheduler.rs index c1ea7373..18432f6a 100644 --- a/controller/src/scheduler.rs +++ b/controller/src/scheduler.rs @@ -67,7 +67,7 @@ impl LegacyUpdateWindow { Ok(_) => { let start_hour: u8 = self .start_time - .split(":") + .split(':') .next() .context(scheduler_error::InvalidTimeWindowSettingsSnafu {})? .parse() @@ -76,7 +76,7 @@ impl LegacyUpdateWindow { })?; let stop_hour: u8 = self .end_time - .split(":") + .split(':') .next() .context(scheduler_error::InvalidTimeWindowSettingsSnafu {})? .parse() @@ -90,9 +90,9 @@ impl LegacyUpdateWindow { format!("* * {}-23,0-{} * * * *", start_hour, stop_hour) }; - return Ok(cron_expression_hour); + Ok(cron_expression_hour) } - Err(_) => return scheduler_error::InvalidTimeWindowSettingsSnafu {}.fail(), + Err(_) => scheduler_error::InvalidTimeWindowSettingsSnafu {}.fail(), } } } @@ -204,7 +204,7 @@ pub enum ScheduleType { fn determine_schedule_type(schedule: &Schedule) -> Result { let duration_between_each_schedule_datetime = - duration_between_next_two_points(&schedule, Utc::now())?; + duration_between_next_two_points(schedule, Utc::now())?; Ok( if duration_between_each_schedule_datetime.num_seconds() == 1 { ScheduleType::Windowed @@ -230,20 +230,16 @@ fn duration_between_next_two_points( } fn std_duration(d: &chrono::Duration) -> Result { - Ok(d.to_std() - .context(scheduler_error::ConvertToStdDurationSnafu)?) + d.to_std() + .context(scheduler_error::ConvertToStdDurationSnafu) } fn get_cron_schedule_from_env() -> Result> { match env::var(SCHEDULER_CRON_EXPRESSION_ENV_VAR) { // SCHEDULER_CRON_EXPRESSION is set - Ok(scheduler_cron_expression) => { - return Ok(Some(scheduler_cron_expression)); - } + Ok(scheduler_cron_expression) => Ok(Some(scheduler_cron_expression)), // SCHEDULER_CRON_EXPRESSION is not set - Err(_) => { - return Ok(None); - } + Err(_) => Ok(None), } } diff --git a/models/src/node/crd/v1.rs b/models/src/node/crd/v1.rs index 4d10a041..d5f26d3a 100644 --- a/models/src/node/crd/v1.rs +++ b/models/src/node/crd/v1.rs @@ -18,10 +18,11 @@ use tokio::time::Duration; use validator::Validate; /// BottlerocketShadowState represents a node's state in the update state machine. -#[derive(Copy, Clone, Serialize, Deserialize, Debug, Eq, PartialEq, JsonSchema)] +#[derive(Copy, Clone, Serialize, Deserialize, Debug, Eq, PartialEq, JsonSchema, Default)] pub enum BottlerocketShadowState { /// Nodes in this state are waiting for new updates to become available. This is both the starting and terminal state /// in the update process. + #[default] Idle, /// Nodes in this state have staged a new update image and used the kubernetes cordon and drain APIs to remove /// running pods. @@ -36,12 +37,6 @@ pub enum BottlerocketShadowState { MonitoringUpdate, } -impl Default for BottlerocketShadowState { - fn default() -> Self { - BottlerocketShadowState::Idle - } -} - // These constants define the maximum amount of time to allow a machine to transition *into* this state. const STAGED_UPDATE_TIMEOUT: Option = Some(Duration::from_secs(600)); const PERFORMED_UPDATE_TIMEOUT: Option = Some(Duration::from_secs(120)); diff --git a/models/src/node/crd/v2.rs b/models/src/node/crd/v2.rs index 983f7c13..11f300c2 100644 --- a/models/src/node/crd/v2.rs +++ b/models/src/node/crd/v2.rs @@ -19,10 +19,11 @@ use std::str::FromStr; use tokio::time::Duration; use validator::Validate; /// BottlerocketShadowState represents a node's state in the update state machine. -#[derive(Copy, Clone, Serialize, Deserialize, Debug, Eq, PartialEq, JsonSchema)] +#[derive(Copy, Clone, Serialize, Deserialize, Debug, Eq, PartialEq, JsonSchema, Default)] pub enum BottlerocketShadowState { /// Nodes in this state are waiting for new updates to become available. This is both the starting, terminal and recovery state /// in the update process. + #[default] Idle, /// Nodes in this state have staged a new update image, have installed the new image, and have updated the partition table /// to mark it as the new active image. @@ -38,12 +39,6 @@ pub enum BottlerocketShadowState { ErrorReset, } -impl Default for BottlerocketShadowState { - fn default() -> Self { - BottlerocketShadowState::Idle - } -} - // These constants define the maximum amount of time to allow a machine to transition *into* this state. const STAGED_AND_PERFORMED_UPDATE_TIMEOUT: Option = Some(Duration::from_secs(720)); const REBOOTED_INTO_UPDATE_TIMEOUT: Option = Some(Duration::from_secs(600)); diff --git a/models/src/node/drain.rs b/models/src/node/drain.rs index fb8b07ee..ab4ef22d 100644 --- a/models/src/node/drain.rs +++ b/models/src/node/drain.rs @@ -198,7 +198,7 @@ async fn evict_pod(k8s_client: &kube::Client, pod: &Pod) -> Result<(), error::Ev break; } Err(kube::Error::Api(e)) => { - let status_code = StatusCode::from_u16(e.code as u16); + let status_code = StatusCode::from_u16(e.code); match status_code { Ok(StatusCode::TOO_MANY_REQUESTS) => { event!( From 9b69fe6bc4c3fab2dd6331f7a35503462fd63d3e Mon Sep 17 00:00:00 2001 From: "Sean P. Kelly" Date: Fri, 4 Aug 2023 22:36:37 +0000 Subject: [PATCH 3/7] agent: refactor for readability --- agent/src/agentclient.rs | 90 ++++++++++++++++++++-------------------- 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/agent/src/agentclient.rs b/agent/src/agentclient.rs index 3057d62a..48f92f8d 100644 --- a/agent/src/agentclient.rs +++ b/agent/src/agentclient.rs @@ -1,4 +1,4 @@ -use crate::apiclient::{boot_update, get_chosen_update, get_os_info, prepare, update}; +use crate::apiclient; use apiserver::{ client::APIServerClient, CordonAndDrainBottlerocketShadowRequest, ExcludeNodeFromLoadBalancerRequest, @@ -81,51 +81,51 @@ impl BrupopAgent { } } + /// Checks the kubernetes API for the shadow object associated with this node. #[instrument(skip(self), err)] - async fn check_node_shadow_exists( + async fn query_api_for_shadow( &self, ) -> std::result::Result { - // try to check if node associated BottlerocketShadow exist in the store first. If it's not present in the - // store(either associated BottlerocketShadow doesn't exist or store data delays), make the API call for second round check. + let bottlerocket_shadows: Api = + Api::namespaced(self.k8s_client.clone(), &self.namespace); - let associated_bottlerocketshadow = self.brs_reader.state().clone(); - if !associated_bottlerocketshadow.is_empty() { - Ok(true) - } else { - let bottlerocket_shadows: Api = - Api::namespaced(self.k8s_client.clone(), &self.namespace); - - // handle the special case which associated BottlerocketShadow does exist but communication with the k8s API fails for other errors. - if let Err(e) = bottlerocket_shadows - .get(&self.associated_bottlerocketshadow_name.clone()) - .await - { - match e { - // 404 not found response error is OK for this use, which means associated BottlerocketShadow doesn't exist - kube::Error::Api(error_response) => { - if error_response.code == 404 { - return Ok(false); - } else { - return agentclient_error::FetchBottlerocketShadowErrorCodeSnafu { - code: error_response.code, - } - .fail(); - } - } - // Any other type of errors can not present that associated BottlerocketShadow doesn't exist, need return error - _ => { - return Err(e).context( - agentclient_error::UnableFetchBottlerocketShadowSnafu { - node_name: &self.associated_bottlerocketshadow_name.clone(), - }, - ); - } + match bottlerocket_shadows + .get(&self.associated_bottlerocketshadow_name.clone()) + .await + { + // 404 not found response error is OK for this use, which means associated BottlerocketShadow doesn't exist + Err(kube::Error::Api(error_response)) if error_response.code == 404 => Ok(false), + + // It's not possible to know whether or not the BottlerocketShadow exists, so we return an error + Err(kube::Error::Api(error_response)) => { + agentclient_error::FetchBottlerocketShadowErrorCodeSnafu { + code: error_response.code, } + .fail() } + + Err(e) => Err(e).context(agentclient_error::UnableFetchBottlerocketShadowSnafu { + node_name: &self.associated_bottlerocketshadow_name.clone(), + }), + + Ok(_) => Ok(true), + } + } + + /// Returns whether or not a BottlerocketShadow exists for this node. + #[instrument(skip(self), err)] + async fn check_node_shadow_exists( + &self, + ) -> std::result::Result { + let local_cache_has_associated_shadow = !self.brs_reader.is_empty(); + if local_cache_has_associated_shadow { Ok(true) + } else { + self.query_api_for_shadow().await } } + /// Returns whether or not the BottlerocketShadow for this node has a .status. #[instrument(skip(self), err)] async fn check_shadow_status_exists(&self) -> Result { Ok(self.fetch_shadow().await?.status.is_some()) @@ -186,10 +186,10 @@ impl BrupopAgent { state: BottlerocketShadowState, shadow_error_info: ShadowErrorInfo, ) -> Result { - let os_info = get_os_info() + let os_info = apiclient::get_os_info() .await .context(agentclient_error::BottlerocketShadowStatusVersionSnafu)?; - let update_version = match get_chosen_update() + let update_version = match apiclient::get_chosen_update() .await .context(agentclient_error::BottlerocketShadowStatusChosenUpdateSnafu)? { @@ -430,14 +430,14 @@ impl BrupopAgent { }, BottlerocketShadowState::StagedAndPerformedUpdate => { event!(Level::INFO, "Preparing update"); - prepare() + apiclient::prepare() .await .context(agentclient_error::UpdateActionsSnafu { action: "Prepare".to_string(), })?; event!(Level::INFO, "Performing update"); - update() + apiclient::update() .await .context(agentclient_error::UpdateActionsSnafu { action: "Perform".to_string(), @@ -460,11 +460,11 @@ impl BrupopAgent { self.handle_recover().await?; } else { self.prepare_for_update().await?; - boot_update() - .await - .context(agentclient_error::UpdateActionsSnafu { + apiclient::boot_update().await.context( + agentclient_error::UpdateActionsSnafu { action: "Reboot".to_string(), - })?; + }, + )?; } } BottlerocketShadowState::MonitoringUpdate => { @@ -597,7 +597,7 @@ impl BrupopAgent { /// Check that the currently running version is the one requested by the controller. async fn running_desired_version(spec: &BottlerocketShadowSpec) -> Result { - let os_info = get_os_info() + let os_info = apiclient::get_os_info() .await .context(agentclient_error::BottlerocketShadowStatusVersionSnafu)?; Ok(match spec.version() { From 2cea49e634bf620637a634b9744063c94dfe4402 Mon Sep 17 00:00:00 2001 From: "Sean P. Kelly" Date: Fri, 4 Aug 2023 22:38:49 +0000 Subject: [PATCH 4/7] agent: use retries for check_node_shadow_exists --- agent/src/agentclient.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/agent/src/agentclient.rs b/agent/src/agentclient.rs index 48f92f8d..a8bbdf3a 100644 --- a/agent/src/agentclient.rs +++ b/agent/src/agentclient.rs @@ -117,12 +117,15 @@ impl BrupopAgent { async fn check_node_shadow_exists( &self, ) -> std::result::Result { - let local_cache_has_associated_shadow = !self.brs_reader.is_empty(); - if local_cache_has_associated_shadow { - Ok(true) - } else { - self.query_api_for_shadow().await - } + Retry::spawn(retry_strategy(), || async { + let local_cache_has_associated_shadow = !self.brs_reader.is_empty(); + if local_cache_has_associated_shadow { + Ok(true) + } else { + self.query_api_for_shadow().await + } + }) + .await } /// Returns whether or not the BottlerocketShadow for this node has a .status. From fb99ee0f7c62710a5fc883a5f678019957d8fa60 Mon Sep 17 00:00:00 2001 From: "Sean P. Kelly" Date: Fri, 4 Aug 2023 23:43:53 +0000 Subject: [PATCH 5/7] agent: hide low-level api methods in apiclient This hides away the details of the Bottlerocket API in a private module, then renames the public functions to be more descriptive. This commit also `cargo fmt`s the agent crate. --- agent/src/agentclient.rs | 18 +- agent/src/apiclient.rs | 519 ++++++++++++++++++++------------------- 2 files changed, 275 insertions(+), 262 deletions(-) diff --git a/agent/src/agentclient.rs b/agent/src/agentclient.rs index a8bbdf3a..1f45c058 100644 --- a/agent/src/agentclient.rs +++ b/agent/src/agentclient.rs @@ -433,18 +433,18 @@ impl BrupopAgent { }, BottlerocketShadowState::StagedAndPerformedUpdate => { event!(Level::INFO, "Preparing update"); - apiclient::prepare() - .await - .context(agentclient_error::UpdateActionsSnafu { + apiclient::prepare_update().await.context( + agentclient_error::UpdateActionsSnafu { action: "Prepare".to_string(), - })?; + }, + )?; event!(Level::INFO, "Performing update"); - apiclient::update() - .await - .context(agentclient_error::UpdateActionsSnafu { + apiclient::activate_update().await.context( + agentclient_error::UpdateActionsSnafu { action: "Perform".to_string(), - })?; + }, + )?; } BottlerocketShadowState::RebootedIntoUpdate => { event!(Level::INFO, "Rebooting node to complete update"); @@ -463,7 +463,7 @@ impl BrupopAgent { self.handle_recover().await?; } else { self.prepare_for_update().await?; - apiclient::boot_update().await.context( + apiclient::boot_into_update().await.context( agentclient_error::UpdateActionsSnafu { action: "Reboot".to_string(), }, diff --git a/agent/src/apiclient.rs b/agent/src/apiclient.rs index 7597d885..9c0914a8 100644 --- a/agent/src/apiclient.rs +++ b/agent/src/apiclient.rs @@ -6,258 +6,25 @@ Bottlerocket Update API: https://github.com/bottlerocket-os/bottlerocket/tree/de Bottlerocket apiclient: https://github.com/bottlerocket-os/bottlerocket/tree/develop/sources/api/apiclient */ -use semver::Version; -use serde::Deserialize; -use snafu::{ensure, ResultExt}; -use std::process::{Command, Output}; -use tokio::time::{sleep, Duration}; -use tracing::{event, Level}; - -const API_CLIENT_BIN: &str = "apiclient"; -const UPDATES_STATUS_URI: &str = "/updates/status"; -const OS_URI: &str = "/os"; -const REFRESH_UPDATES_URI: &str = "/actions/refresh-updates"; -const PREPARE_UPDATES_URI: &str = "/actions/prepare-update"; -const ACTIVATE_UPDATES_URI: &str = "/actions/activate-update"; -const REBOOT_URI: &str = "/actions/reboot"; -const MAX_ATTEMPTS: i8 = 5; -const UPDATE_API_SLEEP_DURATION: Duration = Duration::from_millis(10000); -const UPDATE_API_BUSY_STATUSCODE: &str = "423"; +use self::api::{CommandStatus, UpdateCommand, UpdateState}; +pub use self::api::{OsInfo, UpdateImage}; +use snafu::ensure; +use std::process::Output; /// The module-wide result type. pub type Result = std::result::Result; -/// UpdateState represents four states during system update process -#[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize)] -pub enum UpdateState { - // Idle: Unknown - Idle, - // Available: available versions system can update to - Available, - // Staged: processing system update (refresh, prepare, activate) - Staged, - // Ready: successfully complete update commands, ready to reboot - Ready, -} - -/// UpdateCommand represents three commands to update system -#[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize)] -#[serde(rename_all = "lowercase")] -pub enum UpdateCommand { - // Refresh: refresh the list of available updates - Refresh, - // Prepare: request that the update be downloaded and applied to disk - Prepare, - // Activate: proceed to "activate" the update - Activate, -} - -/// CommandStatus represents three status after running update command -#[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize)] -enum CommandStatus { - Success, - Failed, - Unknown, -} - -#[derive(Debug, Deserialize)] -pub struct StagedImage { - #[serde(rename = "image")] - _image: Option, - #[serde(rename = "next_to_boot")] - _next_to_boot: bool, -} - -#[derive(Debug, Deserialize)] -pub struct CommandResult { - cmd_type: UpdateCommand, - cmd_status: CommandStatus, - #[serde(rename = "timestamp")] - _timestamp: String, - #[serde(rename = "exit_status")] - _exit_status: u32, - #[serde(rename = "stderr")] - _stderr: String, -} - -#[derive(Debug, Deserialize)] -pub struct UpdateImage { - #[serde(rename = "arch")] - pub(super) _arch: String, - pub(super) version: Version, - #[serde(rename = "variant")] - pub(super) _variant: String, -} - -#[derive(Debug, Deserialize)] -pub struct OsInfo { - pub version_id: Version, -} - -#[derive(Debug, Deserialize)] -pub struct UpdateStatus { - update_state: UpdateState, - #[serde(rename = "available_updates")] - _available_updates: Vec, - chosen_update: Option, - #[serde(rename = "active_partition")] - _active_partition: Option, - #[serde(rename = "staging_partition")] - _staging_partition: Option, - most_recent_command: CommandResult, -} - -fn get_raw_args(mut args: Vec) -> Vec { - let mut subcommand_args = vec!["raw".to_string(), "-u".to_string()]; - subcommand_args.append(&mut args); - - subcommand_args -} - -/// Extract error statuscode from stderr string -/// Error Example: -/// "Failed POST request to '/actions/refresh-updates': Status 423 when POSTing /actions/refresh-updates: Update lock held\n" -fn extract_status_code_from_error(error: &str) -> &str { - let error_content_split_by_status: Vec<&str> = error.split("Status").collect(); - let error_content_split_by_whitespace: Vec<&str> = error_content_split_by_status[1] - .split_whitespace() - .collect(); - error_content_split_by_whitespace[0] -} - -/// Apiclient binary has been volume mounted into the agent container, so agent is able to -/// invoke `/bin apiclient` to interact with the Bottlerocket Update API. -/// This function helps to invoke apiclient raw command. -async fn invoke_apiclient(args: Vec) -> Result { - let mut attempts: i8 = 0; - // Retry up to 5 times in case the Update API is busy; Waiting 10 seconds between each attempt. - while attempts < MAX_ATTEMPTS { - let output = Command::new(API_CLIENT_BIN) - .args(&args) - .output() - .context(apiclient_error::ApiClientRawCommandSnafu { args: args.clone() })?; - - match output.status.success() { - true => return Ok(output), - false => { - // Return value `exit status` is Option. When the value has `some` value, we need extract error info from stderr and handle those errors. - // Otherwise, on Unix, this will return `None` if the process was terminated by a signal. Signal termination is not considered a success. - // Apiclient `Reboot` command will send signal to terminate the process, so we have to consider this situation and have extra logic to recognize - // return value `None` as success and terminate the process properly. - match output.status.code() { - // when return value has `some` code, this part will handle those errors properly. - Some(_code) => { - let error_content = String::from_utf8_lossy(&output.stderr).to_string(); - let error_statuscode = extract_status_code_from_error(&error_content); - - match error_statuscode { - UPDATE_API_BUSY_STATUSCODE => { - event!(Level::INFO, "API server busy, retrying later ..."); - // Retry after ten seconds if we get a 423 Locked response (update API busy) - sleep(UPDATE_API_SLEEP_DURATION).await; - attempts += 1; - } - _ => { - // API response was a non-transient error, bail out - return apiclient_error::BadHttpResponseSnafu { - args, - error_content: &error_content, - statuscode: error_statuscode, - } - .fail(); - } - }; - } - // when it returns `None`, this part will treat it as success and then gracefully exit brupop agent. - _ => { - event!( - Level::INFO, - "Bottlerocket node is terminated by reboot signal" - ); - std::process::exit(0) - } - } - } - } - } - // Update API is currently unavailable, bail out - Err(apiclient_error::Error::UpdateApiUnavailable { args }) -} - -pub async fn get_update_status() -> Result { - // Refresh list of updates and check if there are any available - refresh_updates().await?; - - let raw_args = vec![UPDATES_STATUS_URI.to_string()]; - - let update_status_output = invoke_apiclient(get_raw_args(raw_args)).await?; - - let update_status_string = String::from_utf8_lossy(&update_status_output.stdout).to_string(); - let update_status: UpdateStatus = serde_json::from_str(&update_status_string) - .context(apiclient_error::UpdateStatusContentSnafu)?; - - Ok(update_status) -} - -/// extract OS info from the local system. For now this is just the version id. +/// extract OS info from the local system. pub async fn get_os_info() -> Result { - let raw_args = vec![OS_URI.to_string()]; - - let os_info_output = invoke_apiclient(get_raw_args(raw_args)).await?; - - let os_info_content_string = String::from_utf8_lossy(&os_info_output.stdout).to_string(); - let os_info: OsInfo = - serde_json::from_str(&os_info_content_string).context(apiclient_error::OsContentSnafu)?; - - Ok(os_info) -} - -pub async fn refresh_updates() -> Result { - let raw_args = vec![ - REFRESH_UPDATES_URI.to_string(), - "-m".to_string(), - "POST".to_string(), - ]; - - invoke_apiclient(get_raw_args(raw_args)).await -} - -pub async fn prepare_update() -> Result<()> { - let raw_args = vec![ - PREPARE_UPDATES_URI.to_string(), - "-m".to_string(), - "POST".to_string(), - ]; - - invoke_apiclient(get_raw_args(raw_args)).await?; - - Ok(()) -} - -pub async fn activate_update() -> Result<()> { - let raw_args = vec![ - ACTIVATE_UPDATES_URI.to_string(), - "-m".to_string(), - "POST".to_string(), - ]; - - invoke_apiclient(get_raw_args(raw_args)).await?; - - Ok(()) -} - -pub async fn reboot() -> Result { - let raw_args = vec![REBOOT_URI.to_string(), "-m".to_string(), "POST".to_string()]; - - invoke_apiclient(get_raw_args(raw_args)).await + api::get_os_info().await } // get chosen update which contains latest Bottlerocket OS can update to. pub async fn get_chosen_update() -> Result> { // Refresh list of updates and check if there are any available - refresh_updates().await?; + api::refresh_updates().await?; - let update_status = get_update_status().await?; + let update_status = api::get_update_status().await?; // Raise error if failed to refresh update or update acton performed out of band ensure!( @@ -269,8 +36,8 @@ pub async fn get_chosen_update() -> Result> { Ok(update_status.chosen_update) } -pub async fn prepare() -> Result<()> { - let update_status = get_update_status().await?; +pub async fn prepare_update() -> Result<()> { + let update_status = api::get_update_status().await?; ensure!( update_status.update_state == UpdateState::Available @@ -282,10 +49,10 @@ pub async fn prepare() -> Result<()> { ); // Download the update and apply it to the inactive partition - prepare_update().await?; + api::prepare_update().await?; // Raise error if failed to prepare update or update action performed out of band - let recent_command = get_update_status().await?.most_recent_command; + let recent_command = api::get_update_status().await?.most_recent_command; ensure!( recent_command.cmd_type == UpdateCommand::Prepare || recent_command.cmd_status == CommandStatus::Success, @@ -295,8 +62,8 @@ pub async fn prepare() -> Result<()> { Ok(()) } -pub async fn update() -> Result<()> { - let update_status = get_update_status().await?; +pub async fn activate_update() -> Result<()> { + let update_status = api::get_update_status().await?; ensure!( update_status.update_state == UpdateState::Staged, @@ -307,10 +74,10 @@ pub async fn update() -> Result<()> { ); // Activate the prepared update - activate_update().await?; + api::activate_update().await?; // Raise error if failed to activate update or update action performed out of band - let recent_command = get_update_status().await?.most_recent_command; + let recent_command = api::get_update_status().await?.most_recent_command; ensure!( recent_command.cmd_type == UpdateCommand::Activate @@ -322,8 +89,8 @@ pub async fn update() -> Result<()> { } // Reboot the host into the activated update -pub async fn boot_update() -> Result { - let update_status = get_update_status().await?; +pub async fn boot_into_update() -> Result { + let update_status = api::get_update_status().await?; ensure!( update_status.update_state == UpdateState::Ready, @@ -333,11 +100,257 @@ pub async fn boot_update() -> Result { } ); - reboot().await + api::reboot().await } -pub mod apiclient_error { +pub(super) mod api { + //! Low-level Bottlerocket update API interactions + use super::{apiclient_error, Result}; + use semver::Version; + use serde::Deserialize; + use snafu::ResultExt; + use std::process::{Command, Output}; + use tokio::time::{sleep, Duration}; + use tracing::{event, Level}; + + const API_CLIENT_BIN: &str = "apiclient"; + const MAX_ATTEMPTS: i8 = 5; + const UPDATE_API_SLEEP_DURATION: Duration = Duration::from_millis(10000); + const UPDATE_API_BUSY_STATUSCODE: &str = "423"; + const ACTIVATE_UPDATES_URI: &str = "/actions/activate-update"; + const OS_URI: &str = "/os"; + const PREPARE_UPDATES_URI: &str = "/actions/prepare-update"; + const REBOOT_URI: &str = "/actions/reboot"; + const REFRESH_UPDATES_URI: &str = "/actions/refresh-updates"; + const UPDATES_STATUS_URI: &str = "/updates/status"; + + pub(super) fn get_raw_args(mut args: Vec) -> Vec { + let mut subcommand_args = vec!["raw".to_string(), "-u".to_string()]; + subcommand_args.append(&mut args); + + subcommand_args + } + + #[derive(Debug, Deserialize)] + pub struct UpdateStatus { + pub update_state: UpdateState, + #[serde(rename = "available_updates")] + pub _available_updates: Vec, + pub chosen_update: Option, + #[serde(rename = "active_partition")] + pub _active_partition: Option, + #[serde(rename = "staging_partition")] + pub _staging_partition: Option, + pub most_recent_command: CommandResult, + } + + /// UpdateState represents four states during system update process + #[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize)] + pub enum UpdateState { + // Idle: Unknown + Idle, + // Available: available versions system can update to + Available, + // Staged: processing system update (refresh, prepare, activate) + Staged, + // Ready: successfully complete update commands, ready to reboot + Ready, + } + #[derive(Debug, Deserialize)] + pub struct UpdateImage { + #[serde(rename = "arch")] + pub _arch: String, + pub version: Version, + #[serde(rename = "variant")] + pub _variant: String, + } + + /// UpdateCommand represents three commands to update system + #[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize)] + #[serde(rename_all = "lowercase")] + pub enum UpdateCommand { + // Refresh: refresh the list of available updates + Refresh, + // Prepare: request that the update be downloaded and applied to disk + Prepare, + // Activate: proceed to "activate" the update + Activate, + } + + /// CommandStatus represents three status after running update command + #[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize)] + pub enum CommandStatus { + Success, + Failed, + Unknown, + } + + #[derive(Debug, Deserialize)] + pub struct StagedImage { + #[serde(rename = "image")] + _image: Option, + #[serde(rename = "next_to_boot")] + _next_to_boot: bool, + } + + #[derive(Debug, Deserialize)] + pub struct CommandResult { + pub cmd_type: UpdateCommand, + pub cmd_status: CommandStatus, + #[serde(rename = "timestamp")] + _timestamp: String, + #[serde(rename = "exit_status")] + _exit_status: u32, + #[serde(rename = "stderr")] + _stderr: String, + } + + #[derive(Debug, Deserialize)] + pub struct OsInfo { + pub version_id: Version, + } + + /// Extract error statuscode from stderr string + /// Error Example: + /// "Failed POST request to '/actions/refresh-updates': Status 423 when POSTing /actions/refresh-updates: Update lock held\n" + fn extract_status_code_from_error(error: &str) -> &str { + let error_content_split_by_status: Vec<&str> = error.split("Status").collect(); + let error_content_split_by_whitespace: Vec<&str> = error_content_split_by_status[1] + .split_whitespace() + .collect(); + error_content_split_by_whitespace[0] + } + + /// Apiclient binary has been volume mounted into the agent container, so agent is able to + /// invoke `/bin apiclient` to interact with the Bottlerocket Update API. + /// This function helps to invoke apiclient raw command. + pub(super) async fn invoke_apiclient(args: Vec) -> Result { + let mut attempts: i8 = 0; + // Retry up to 5 times in case the Update API is busy; Waiting 10 seconds between each attempt. + while attempts < MAX_ATTEMPTS { + let output = Command::new(API_CLIENT_BIN) + .args(&args) + .output() + .context(apiclient_error::ApiClientRawCommandSnafu { args: args.clone() })?; + + match output.status.success() { + true => return Ok(output), + false => { + // Return value `exit status` is Option. When the value has `some` value, we need extract error info from stderr and handle those errors. + // Otherwise, on Unix, this will return `None` if the process was terminated by a signal. Signal termination is not considered a success. + // Apiclient `Reboot` command will send signal to terminate the process, so we have to consider this situation and have extra logic to recognize + // return value `None` as success and terminate the process properly. + match output.status.code() { + // when return value has `some` code, this part will handle those errors properly. + Some(_code) => { + let error_content = String::from_utf8_lossy(&output.stderr).to_string(); + let error_statuscode = extract_status_code_from_error(&error_content); + + match error_statuscode { + UPDATE_API_BUSY_STATUSCODE => { + event!(Level::INFO, "API server busy, retrying later ..."); + // Retry after ten seconds if we get a 423 Locked response (update API busy) + sleep(UPDATE_API_SLEEP_DURATION).await; + attempts += 1; + } + _ => { + // API response was a non-transient error, bail out + return apiclient_error::BadHttpResponseSnafu { + args, + error_content: &error_content, + statuscode: error_statuscode, + } + .fail(); + } + }; + } + // when it returns `None`, this part will treat it as success and then gracefully exit brupop agent. + _ => { + event!( + Level::INFO, + "Bottlerocket node is terminated by reboot signal" + ); + std::process::exit(0) + } + } + } + } + } + // Update API is currently unavailable, bail out + Err(apiclient_error::Error::UpdateApiUnavailable { args }) + } + + pub(super) async fn refresh_updates() -> Result { + let raw_args = vec![ + REFRESH_UPDATES_URI.to_string(), + "-m".to_string(), + "POST".to_string(), + ]; + + invoke_apiclient(get_raw_args(raw_args)).await + } + + pub(super) async fn prepare_update() -> Result<()> { + let raw_args = vec![ + PREPARE_UPDATES_URI.to_string(), + "-m".to_string(), + "POST".to_string(), + ]; + + invoke_apiclient(get_raw_args(raw_args)).await?; + + Ok(()) + } + + pub(super) async fn activate_update() -> Result<()> { + let raw_args = vec![ + ACTIVATE_UPDATES_URI.to_string(), + "-m".to_string(), + "POST".to_string(), + ]; + + invoke_apiclient(get_raw_args(raw_args)).await?; + + Ok(()) + } + + pub(super) async fn get_update_status() -> Result { + // Refresh list of updates and check if there are any available + refresh_updates().await?; + + let raw_args = vec![UPDATES_STATUS_URI.to_string()]; + + let update_status_output = invoke_apiclient(get_raw_args(raw_args)).await?; + + let update_status_string = + String::from_utf8_lossy(&update_status_output.stdout).to_string(); + let update_status: UpdateStatus = serde_json::from_str(&update_status_string) + .context(apiclient_error::UpdateStatusContentSnafu)?; + + Ok(update_status) + } + + pub(super) async fn reboot() -> Result { + let raw_args = vec![REBOOT_URI.to_string(), "-m".to_string(), "POST".to_string()]; + + invoke_apiclient(get_raw_args(raw_args)).await + } + + pub(super) async fn get_os_info() -> Result { + let raw_args = vec![OS_URI.to_string()]; + + let os_info_output = invoke_apiclient(get_raw_args(raw_args)).await?; + + let os_info_content_string = String::from_utf8_lossy(&os_info_output.stdout).to_string(); + let os_info: OsInfo = serde_json::from_str(&os_info_content_string) + .context(apiclient_error::OsContentSnafu)?; + + Ok(os_info) + } +} + +pub mod apiclient_error { use crate::apiclient::UpdateState; use snafu::Snafu; From 482a2ed3a57073117407153f5b5d48a3811677f7 Mon Sep 17 00:00:00 2001 From: "Sean P. Kelly" Date: Sat, 5 Aug 2023 00:36:39 +0000 Subject: [PATCH 6/7] agent: rate-limit Bottlerocket update API calls --- Cargo.lock | 72 ++++++++++++++++++++++++++++++++++++++++++ agent/Cargo.toml | 2 ++ agent/src/apiclient.rs | 43 +++++++++++++++++-------- 3 files changed, 103 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d8c3fe37..2ccfe948 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -243,8 +243,10 @@ dependencies = [ "chrono", "dotenv", "futures", + "governor", "k8s-openapi", "kube", + "lazy_static", "models", "semver", "serde", @@ -1445,6 +1447,12 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.28" @@ -1490,6 +1498,24 @@ version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" +[[package]] +name = "governor" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "821239e5672ff23e2a7060901fa622950bbd80b649cdaadd78d1c1767ed14eb4" +dependencies = [ + "cfg-if", + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "quanta", + "rand", + "smallvec", +] + [[package]] name = "h2" version = "0.3.19" @@ -2087,6 +2113,15 @@ version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de" +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "maplit" version = "1.0.2" @@ -2202,6 +2237,12 @@ dependencies = [ "validator", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -2212,6 +2253,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -2539,6 +2586,22 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.28" @@ -2578,6 +2641,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags", +] + [[package]] name = "redox_syscall" version = "0.2.16" diff --git a/agent/Cargo.toml b/agent/Cargo.toml index b0037cad..272e05b4 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -11,6 +11,8 @@ apiserver = { path = "../apiserver", version = "0.1.0", default-features = false dotenv = "0.15" futures = "0.3" +governor = "0.6" +lazy_static = "1" tracing = "0.1" # k8s-openapi must match the version required by kube and enable a k8s version feature diff --git a/agent/src/apiclient.rs b/agent/src/apiclient.rs index 9c0914a8..27344405 100644 --- a/agent/src/apiclient.rs +++ b/agent/src/apiclient.rs @@ -21,12 +21,8 @@ pub async fn get_os_info() -> Result { // get chosen update which contains latest Bottlerocket OS can update to. pub async fn get_chosen_update() -> Result> { - // Refresh list of updates and check if there are any available - api::refresh_updates().await?; - let update_status = api::get_update_status().await?; - // Raise error if failed to refresh update or update acton performed out of band ensure!( update_status.most_recent_command.cmd_type == UpdateCommand::Refresh && update_status.most_recent_command.cmd_status == CommandStatus::Success, @@ -106,6 +102,13 @@ pub async fn boot_into_update() -> Result { pub(super) mod api { //! Low-level Bottlerocket update API interactions use super::{apiclient_error, Result}; + use governor::{ + clock::DefaultClock, + middleware::NoOpMiddleware, + state::{InMemoryState, NotKeyed}, + Quota, RateLimiter, + }; + use lazy_static::lazy_static; use semver::Version; use serde::Deserialize; use snafu::ResultExt; @@ -124,6 +127,13 @@ pub(super) mod api { const REFRESH_UPDATES_URI: &str = "/actions/refresh-updates"; const UPDATES_STATUS_URI: &str = "/updates/status"; + type SimpleRateLimiter = RateLimiter; + + lazy_static! { + static ref UPDATE_API_RATE_LIMITER: SimpleRateLimiter = + RateLimiter::direct(Quota::with_period(Duration::from_secs(10)).unwrap()); + } + pub(super) fn get_raw_args(mut args: Vec) -> Vec { let mut subcommand_args = vec!["raw".to_string(), "-u".to_string()]; subcommand_args.append(&mut args); @@ -225,10 +235,16 @@ pub(super) mod api { /// Apiclient binary has been volume mounted into the agent container, so agent is able to /// invoke `/bin apiclient` to interact with the Bottlerocket Update API. /// This function helps to invoke apiclient raw command. - pub(super) async fn invoke_apiclient(args: Vec) -> Result { + pub(super) async fn invoke_apiclient( + args: Vec, + rate_limiter: Option<&SimpleRateLimiter>, + ) -> Result { let mut attempts: i8 = 0; // Retry up to 5 times in case the Update API is busy; Waiting 10 seconds between each attempt. while attempts < MAX_ATTEMPTS { + if let Some(rate_limiter) = rate_limiter { + rate_limiter.until_ready().await; + } let output = Command::new(API_CLIENT_BIN) .args(&args) .output() @@ -249,7 +265,7 @@ pub(super) mod api { match error_statuscode { UPDATE_API_BUSY_STATUSCODE => { - event!(Level::INFO, "API server busy, retrying later ..."); + event!(Level::INFO, "The lock for the update API is held by another process ..."); // Retry after ten seconds if we get a 423 Locked response (update API busy) sleep(UPDATE_API_SLEEP_DURATION).await; attempts += 1; @@ -288,7 +304,7 @@ pub(super) mod api { "POST".to_string(), ]; - invoke_apiclient(get_raw_args(raw_args)).await + invoke_apiclient(get_raw_args(raw_args), Some(&UPDATE_API_RATE_LIMITER)).await } pub(super) async fn prepare_update() -> Result<()> { @@ -298,7 +314,7 @@ pub(super) mod api { "POST".to_string(), ]; - invoke_apiclient(get_raw_args(raw_args)).await?; + invoke_apiclient(get_raw_args(raw_args), Some(&UPDATE_API_RATE_LIMITER)).await?; Ok(()) } @@ -310,18 +326,17 @@ pub(super) mod api { "POST".to_string(), ]; - invoke_apiclient(get_raw_args(raw_args)).await?; + invoke_apiclient(get_raw_args(raw_args), Some(&UPDATE_API_RATE_LIMITER)).await?; Ok(()) } pub(super) async fn get_update_status() -> Result { - // Refresh list of updates and check if there are any available refresh_updates().await?; let raw_args = vec![UPDATES_STATUS_URI.to_string()]; - - let update_status_output = invoke_apiclient(get_raw_args(raw_args)).await?; + let update_status_output = + invoke_apiclient(get_raw_args(raw_args), Some(&UPDATE_API_RATE_LIMITER)).await?; let update_status_string = String::from_utf8_lossy(&update_status_output.stdout).to_string(); @@ -334,13 +349,13 @@ pub(super) mod api { pub(super) async fn reboot() -> Result { let raw_args = vec![REBOOT_URI.to_string(), "-m".to_string(), "POST".to_string()]; - invoke_apiclient(get_raw_args(raw_args)).await + invoke_apiclient(get_raw_args(raw_args), None).await } pub(super) async fn get_os_info() -> Result { let raw_args = vec![OS_URI.to_string()]; - let os_info_output = invoke_apiclient(get_raw_args(raw_args)).await?; + let os_info_output = invoke_apiclient(get_raw_args(raw_args), None).await?; let os_info_content_string = String::from_utf8_lossy(&os_info_output.stdout).to_string(); let os_info: OsInfo = serde_json::from_str(&os_info_content_string) From 391599274cae0286d6dae4f4dc7cb6423680ee14 Mon Sep 17 00:00:00 2001 From: "Sean P. Kelly" Date: Tue, 8 Aug 2023 19:07:15 +0000 Subject: [PATCH 7/7] agent: use tokio-retry for Bottlerocket API --- agent/src/apiclient.rs | 105 +++++++++++++++++++++++------------------ 1 file changed, 59 insertions(+), 46 deletions(-) diff --git a/agent/src/apiclient.rs b/agent/src/apiclient.rs index 27344405..290d5cd6 100644 --- a/agent/src/apiclient.rs +++ b/agent/src/apiclient.rs @@ -113,12 +113,14 @@ pub(super) mod api { use serde::Deserialize; use snafu::ResultExt; use std::process::{Command, Output}; - use tokio::time::{sleep, Duration}; + use tokio::time::Duration; + use tokio_retry::{ + strategy::{jitter, FixedInterval}, + Retry, + }; use tracing::{event, Level}; const API_CLIENT_BIN: &str = "apiclient"; - const MAX_ATTEMPTS: i8 = 5; - const UPDATE_API_SLEEP_DURATION: Duration = Duration::from_millis(10000); const UPDATE_API_BUSY_STATUSCODE: &str = "423"; const ACTIVATE_UPDATES_URI: &str = "/actions/activate-update"; const OS_URI: &str = "/os"; @@ -232,6 +234,19 @@ pub(super) mod api { error_content_split_by_whitespace[0] } + /// Wait time between invoking the Bottlerocket API + const RETRY_DELAY: Duration = Duration::from_secs(10); + /// Number of retries while invoking the Bottlerocket API + const NUM_RETRIES: usize = 5; + + /// Retry strategy for invoking the Bottlerocket API. + /// Retries to the bottlerocket API occur on a fixed interval with jitter. + fn retry_strategy() -> impl Iterator { + FixedInterval::new(RETRY_DELAY) + .map(jitter) + .take(NUM_RETRIES) + } + /// Apiclient binary has been volume mounted into the agent container, so agent is able to /// invoke `/bin apiclient` to interact with the Bottlerocket Update API. /// This function helps to invoke apiclient raw command. @@ -239,9 +254,7 @@ pub(super) mod api { args: Vec, rate_limiter: Option<&SimpleRateLimiter>, ) -> Result { - let mut attempts: i8 = 0; - // Retry up to 5 times in case the Update API is busy; Waiting 10 seconds between each attempt. - while attempts < MAX_ATTEMPTS { + Retry::spawn(retry_strategy(), || async { if let Some(rate_limiter) = rate_limiter { rate_limiter.until_ready().await; } @@ -250,51 +263,51 @@ pub(super) mod api { .output() .context(apiclient_error::ApiClientRawCommandSnafu { args: args.clone() })?; - match output.status.success() { - true => return Ok(output), - false => { - // Return value `exit status` is Option. When the value has `some` value, we need extract error info from stderr and handle those errors. - // Otherwise, on Unix, this will return `None` if the process was terminated by a signal. Signal termination is not considered a success. - // Apiclient `Reboot` command will send signal to terminate the process, so we have to consider this situation and have extra logic to recognize - // return value `None` as success and terminate the process properly. - match output.status.code() { - // when return value has `some` code, this part will handle those errors properly. - Some(_code) => { - let error_content = String::from_utf8_lossy(&output.stderr).to_string(); - let error_statuscode = extract_status_code_from_error(&error_content); - - match error_statuscode { - UPDATE_API_BUSY_STATUSCODE => { - event!(Level::INFO, "The lock for the update API is held by another process ..."); - // Retry after ten seconds if we get a 423 Locked response (update API busy) - sleep(UPDATE_API_SLEEP_DURATION).await; - attempts += 1; - } - _ => { - // API response was a non-transient error, bail out - return apiclient_error::BadHttpResponseSnafu { - args, - error_content: &error_content, - statuscode: error_statuscode, - } - .fail(); + if output.status.success() { + Ok(output) + } else { + // Return value `exit status` is Option. When the value has `some` value, we need extract error info from stderr and handle those errors. + // Otherwise, on Unix, this will return `None` if the process was terminated by a signal. Signal termination is not considered a success. + // Apiclient `Reboot` command will send signal to terminate the process, so we have to consider this situation and have extra logic to recognize + // return value `None` as success and terminate the process properly. + match output.status.code() { + // when return value has `some` code, this part will handle those errors properly. + Some(_code) => { + let error_content = String::from_utf8_lossy(&output.stderr).to_string(); + let error_statuscode = extract_status_code_from_error(&error_content); + + match error_statuscode { + UPDATE_API_BUSY_STATUSCODE => { + event!( + Level::INFO, + "The lock for the update API is held by another process ..." + ); + apiclient_error::UpdateApiUnavailableSnafu { args: args.clone() } + .fail() + } + _ => { + // API response was a non-transient error, bail out + apiclient_error::BadHttpResponseSnafu { + args: args.clone(), + error_content: &error_content, + statuscode: error_statuscode, } - }; - } - // when it returns `None`, this part will treat it as success and then gracefully exit brupop agent. - _ => { - event!( - Level::INFO, - "Bottlerocket node is terminated by reboot signal" - ); - std::process::exit(0) + .fail() + } } } + // when it returns `None`, this part will treat it as success and then gracefully exit brupop agent. + _ => { + event!( + Level::INFO, + "Bottlerocket node is terminated by reboot signal" + ); + std::process::exit(0) + } } } - } - // Update API is currently unavailable, bail out - Err(apiclient_error::Error::UpdateApiUnavailable { args }) + }) + .await } pub(super) async fn refresh_updates() -> Result {