From f19a9da9b8f79a7c448246475539c03e1bb58d7e Mon Sep 17 00:00:00 2001 From: Zelda Hessler Date: Tue, 2 Apr 2024 14:02:08 -0500 Subject: [PATCH] make it possible to retry any response (#3389) ## Motivation and Context https://github.com/awsdocs/aws-doc-sdk-examples/pull/6021 ## Description This change makes it possible to retry requests that were successfully deserialized into an output. ## Testing I wrote a test ## Checklist - [x] I have updated `CHANGELOG.next.toml` if I made changes to the smithy-rs codegen or runtime crates - [x] I have updated `CHANGELOG.next.toml` if I made changes to the AWS SDK, generated SDK code, or SDK runtime crates ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._ --- CHANGELOG.next.toml | 12 + .../src/client/retries.rs | 9 + .../src/client/retries/classifiers.rs | 8 + .../src/client/retries/strategy/standard.rs | 210 +++++++++++++----- 4 files changed, 186 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index 5543049a42..86762ed1b2 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -71,6 +71,18 @@ references = ["smithy-rs#3493"] meta = { "breaking" = false, "tada" = true, "bug" = false } author = "Velfi" +[[smithy-rs]] +message = "All requests are now retryable, even if they are deserialized successfully. Previously, this was not allowed." +references = ["smithy-rs#3389"] +meta = { "breaking" = false, "tada" = false, "bug" = false } +authors = ["Velfi"] + +[[aws-sdk-rust]] +message = "All requests are now retryable, even if they are deserialized successfully. Previously, this was not allowed." +references = ["smithy-rs#3389"] +meta = { "breaking" = false, "tada" = false, "bug" = false } +author = "Velfi" + [[smithy-rs]] message = "Fix bug in Hyper 1.0 support where https URLs returned an error" references = ["smithy-rs#3539"] diff --git a/rust-runtime/aws-smithy-runtime-api/src/client/retries.rs b/rust-runtime/aws-smithy-runtime-api/src/client/retries.rs index 036371ddc1..d122420a50 100644 --- a/rust-runtime/aws-smithy-runtime-api/src/client/retries.rs +++ b/rust-runtime/aws-smithy-runtime-api/src/client/retries.rs @@ -44,6 +44,15 @@ impl ShouldAttempt { _ => panic!("Expected this to be the `YesAfterDelay` variant but it was the `{self:?}` variant instead"), } } + + /// If this isn't a `No` variant, panic. + pub fn expect_no(self) { + if ShouldAttempt::No == self { + return; + } + + panic!("Expected this to be the `No` variant but it was the `{self:?}` variant instead"); + } } impl_shared_conversions!(convert SharedRetryStrategy from RetryStrategy using SharedRetryStrategy::new); diff --git a/rust-runtime/aws-smithy-runtime-api/src/client/retries/classifiers.rs b/rust-runtime/aws-smithy-runtime-api/src/client/retries/classifiers.rs index a99c377e1e..cd2a2cfe6c 100644 --- a/rust-runtime/aws-smithy-runtime-api/src/client/retries/classifiers.rs +++ b/rust-runtime/aws-smithy-runtime-api/src/client/retries/classifiers.rs @@ -103,6 +103,14 @@ impl RetryAction { pub fn client_error() -> Self { Self::retryable_error(ErrorKind::ClientError) } + + /// Check if a retry is indicated. + pub fn should_retry(&self) -> bool { + match self { + Self::NoActionIndicated | Self::RetryForbidden => false, + Self::RetryIndicated(_) => true, + } + } } /// The reason for a retry. diff --git a/rust-runtime/aws-smithy-runtime/src/client/retries/strategy/standard.rs b/rust-runtime/aws-smithy-runtime/src/client/retries/strategy/standard.rs index de74b31c2b..fc360715da 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/retries/strategy/standard.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/retries/strategy/standard.rs @@ -3,6 +3,20 @@ * SPDX-License-Identifier: Apache-2.0 */ +use std::sync::Mutex; +use std::time::{Duration, SystemTime}; + +use tokio::sync::OwnedSemaphorePermit; +use tracing::debug; + +use aws_smithy_runtime_api::box_error::BoxError; +use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext; +use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason}; +use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt}; +use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents; +use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace}; +use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode}; + use crate::client::retries::classifiers::run_classifiers_on_ctx; use crate::client::retries::client_rate_limiter::{ClientRateLimiter, RequestReason}; use crate::client::retries::strategy::standard::ReleaseResult::{ @@ -11,17 +25,6 @@ use crate::client::retries::strategy::standard::ReleaseResult::{ use crate::client::retries::token_bucket::TokenBucket; use crate::client::retries::{ClientRateLimiterPartition, RetryPartition}; use crate::static_partition_map::StaticPartitionMap; -use aws_smithy_runtime_api::box_error::BoxError; -use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext; -use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason}; -use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt}; -use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents; -use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace}; -use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode}; -use std::sync::Mutex; -use std::time::{Duration, SystemTime}; -use tokio::sync::OwnedSemaphorePermit; -use tracing::debug; static CLIENT_RATE_LIMITER: StaticPartitionMap = StaticPartitionMap::new(); @@ -56,7 +59,7 @@ impl StandardRetryStrategy { fn set_retry_permit(&self, new_retry_permit: OwnedSemaphorePermit) { let mut old_retry_permit = self.retry_permit.lock().unwrap(); if let Some(p) = old_retry_permit.replace(new_retry_permit) { - // Whenever we set a new retry permit and it replaces the old one, we need to "forget" + // Whenever we set a new retry permit, and it replaces the old one, we need to "forget" // the old permit, removing it from the bucket forever. p.forget() } @@ -141,7 +144,7 @@ impl StandardRetryStrategy { // Get the backoff time multiplier in seconds (with fractional seconds) retry_cfg.initial_backoff().as_secs_f64(), // `self.local.attempts` tracks number of requests made including the initial request - // The initial attempt shouldn't count towards backoff calculations so we subtract it + // The initial attempt shouldn't count towards backoff calculations, so we subtract it request_attempts - 1, ); Ok(Duration::from_secs_f64(backoff).min(retry_cfg.max_backoff())) @@ -194,27 +197,6 @@ impl RetryStrategy for StandardRetryStrategy { cfg: &ConfigBag, ) -> Result { let retry_cfg = cfg.load::().expect("retry config is required"); - // Look a the result. If it's OK then we're done; No retry required. Otherwise, we need to inspect it - let output_or_error = ctx.output_or_error().expect( - "This must never be called without reaching the point where the result exists.", - ); - let token_bucket = cfg.load::(); - if output_or_error.is_ok() { - debug!("request succeeded, no retry necessary"); - if let Some(tb) = token_bucket { - // If this retry strategy is holding any permits, release them back to the bucket. - if let NoPermitWasReleased = self.release_retry_permit() { - // In the event that there was no retry permit to release, we generate new - // permits from nothing. We do this to make up for permits we had to "forget". - // Otherwise, repeated retries would empty the bucket and nothing could fill it - // back up again. - tb.regenerate_a_token(); - } - } - update_rate_limiter_if_exists(runtime_components, cfg, false); - - return Ok(ShouldAttempt::No); - } // Check if we're out of attempts let request_attempts = cfg @@ -236,19 +218,40 @@ impl RetryStrategy for StandardRetryStrategy { let retry_classifiers = runtime_components.retry_classifiers(); let classifier_result = run_classifiers_on_ctx(retry_classifiers, ctx); - // Calculate the appropriate backoff time. - let backoff = - match self.calculate_backoff(runtime_components, cfg, retry_cfg, &classifier_result) { + if classifier_result.should_retry() { + // Calculate the appropriate backoff time. + let backoff = match self.calculate_backoff( + runtime_components, + cfg, + retry_cfg, + &classifier_result, + ) { Ok(value) => value, // In some cases, backoff calculation will decide that we shouldn't retry at all. Err(value) => return Ok(value), }; - debug!( - "attempt #{request_attempts} failed with {:?}; retrying after {:?}", - classifier_result, backoff, - ); + debug!( + "attempt #{request_attempts} failed with {:?}; retrying after {:?}", + classifier_result, backoff, + ); + + Ok(ShouldAttempt::YesAfterDelay(backoff)) + } else { + debug!("attempt #{request_attempts} succeeded, no retry necessary"); + if let Some(tb) = cfg.load::() { + // If this retry strategy is holding any permits, release them back to the bucket. + if let NoPermitWasReleased = self.release_retry_permit() { + // In the event that there was no retry permit to release, we generate new + // permits from nothing. We do this to make up for permits we had to "forget". + // Otherwise, repeated retries would empty the bucket and nothing could fill it + // back up again. + tb.regenerate_a_token(); + } + } + update_rate_limiter_if_exists(runtime_components, cfg, false); - Ok(ShouldAttempt::YesAfterDelay(backoff)) + Ok(ShouldAttempt::No) + } } } @@ -305,34 +308,43 @@ fn get_seconds_since_unix_epoch(runtime_components: &RuntimeComponents) -> f64 { #[cfg(test)] mod tests { - use super::*; + use std::fmt; + use std::sync::Mutex; + use std::time::Duration; + + use aws_smithy_runtime_api::client::interceptors::context::{ + Input, InterceptorContext, Output, + }; use aws_smithy_runtime_api::client::orchestrator::OrchestratorError; use aws_smithy_runtime_api::client::retries::classifiers::{ ClassifyRetry, RetryAction, SharedRetryClassifier, }; - use aws_smithy_runtime_api::client::retries::{AlwaysRetry, RetryStrategy}; - use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder; - use aws_smithy_types::config_bag::Layer; - use aws_smithy_types::retry::{ErrorKind, ProvideErrorKind}; - use std::fmt; - use std::sync::Mutex; - use std::time::Duration; + use aws_smithy_runtime_api::client::retries::{ + AlwaysRetry, RequestAttempts, RetryStrategy, ShouldAttempt, + }; + use aws_smithy_runtime_api::client::runtime_components::{ + RuntimeComponents, RuntimeComponentsBuilder, + }; + use aws_smithy_types::config_bag::{ConfigBag, Layer}; + use aws_smithy_types::retry::{ErrorKind, ProvideErrorKind, RetryConfig}; + use super::{calculate_exponential_backoff, StandardRetryStrategy}; #[cfg(feature = "test-util")] - use crate::client::retries::token_bucket::TokenBucket; - use aws_smithy_runtime_api::client::interceptors::context::{Input, Output}; + use crate::client::retries::TokenBucket; #[test] fn no_retry_necessary_for_ok_result() { let cfg = ConfigBag::of_layers(vec![{ let mut layer = Layer::new("test"); layer.store_put(RetryConfig::standard()); + layer.store_put(RequestAttempts::new(1)); layer }]); let rc = RuntimeComponentsBuilder::for_tests().build().unwrap(); let mut ctx = InterceptorContext::new(Input::doesnt_matter()); let strategy = StandardRetryStrategy::default(); ctx.set_output_or_error(Ok(Output::doesnt_matter())); + let actual = strategy .should_attempt_retry(&ctx, &rc, &cfg) .expect("method is infallible for this use"); @@ -441,7 +453,7 @@ mod tests { #[cfg(feature = "test-util")] impl PresetReasonRetryClassifier { fn new(mut retry_reasons: Vec) -> Self { - // We'll pop the retry_reasons in reverse order so we reverse the list to fix that. + // We'll pop the retry_reasons in reverse order, so we reverse the list to fix that. retry_reasons.reverse(); Self { retry_actions: Mutex::new(retry_reasons), @@ -557,6 +569,98 @@ mod tests { assert_eq!(token_bucket.available_permits(), 490); } + #[cfg(feature = "test-util")] + #[test] + fn successful_request_and_deser_should_be_retryable() { + #[derive(Clone, Copy, Debug)] + enum LongRunningOperationStatus { + Running, + Complete, + } + + #[derive(Debug)] + struct LongRunningOperationOutput { + status: Option, + } + + impl LongRunningOperationOutput { + fn status(&self) -> Option { + self.status + } + } + + struct WaiterRetryClassifier {} + + impl WaiterRetryClassifier { + fn new() -> Self { + WaiterRetryClassifier {} + } + } + + impl fmt::Debug for WaiterRetryClassifier { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "WaiterRetryClassifier") + } + } + impl ClassifyRetry for WaiterRetryClassifier { + fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction { + let status: Option = + ctx.output_or_error().and_then(|res| { + res.ok().and_then(|output| { + output + .downcast_ref::() + .and_then(|output| output.status()) + }) + }); + + if let Some(LongRunningOperationStatus::Running) = status { + return RetryAction::server_error(); + }; + + RetryAction::NoActionIndicated + } + + fn name(&self) -> &'static str { + "waiter retry classifier" + } + } + + let retry_config = RetryConfig::standard() + .with_use_static_exponential_base(true) + .with_max_attempts(5); + + let rc = RuntimeComponentsBuilder::for_tests() + .with_retry_classifier(SharedRetryClassifier::new(WaiterRetryClassifier::new())) + .build() + .unwrap(); + let mut layer = Layer::new("test"); + layer.store_put(retry_config); + let mut cfg = ConfigBag::of_layers(vec![layer]); + let mut ctx = InterceptorContext::new(Input::doesnt_matter()); + let strategy = StandardRetryStrategy::new(); + + ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput { + status: Some(LongRunningOperationStatus::Running), + }))); + + cfg.interceptor_state().store_put(TokenBucket::new(5)); + let token_bucket = cfg.load::().unwrap().clone(); + + cfg.interceptor_state().store_put(RequestAttempts::new(1)); + let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap(); + let dur = should_retry.expect_delay(); + assert_eq!(dur, Duration::from_secs(1)); + assert_eq!(token_bucket.available_permits(), 0); + + ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput { + status: Some(LongRunningOperationStatus::Complete), + }))); + cfg.interceptor_state().store_put(RequestAttempts::new(2)); + let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap(); + should_retry.expect_no(); + assert_eq!(token_bucket.available_permits(), 5); + } + #[cfg(feature = "test-util")] #[test] fn no_quota() {