From e7d370abea4930ce7d58f841885783a279f1ba23 Mon Sep 17 00:00:00 2001 From: Peter Ke Date: Mon, 7 Oct 2024 01:30:27 -0700 Subject: [PATCH] [object_store] Retry S3 requests with 200 response with "Error" in body (#6508) * rebase * generalize * add test * fix lint * remove dep --------- Co-authored-by: Peter Ke --- object_store/src/aws/client.rs | 12 +++ object_store/src/client/retry.rs | 124 ++++++++++++++++++++++++++++++- 2 files changed, 134 insertions(+), 2 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index cc74f2dc3e3..4b4d0b6e3b4 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -266,6 +266,7 @@ pub(crate) struct Request<'a> { payload: Option, use_session_creds: bool, idempotent: bool, + retry_error_body: bool, } impl<'a> Request<'a> { @@ -292,6 +293,13 @@ impl<'a> Request<'a> { Self { idempotent, ..self } } + pub(crate) fn retry_error_body(self, retry_error_body: bool) -> Self { + Self { + retry_error_body, + ..self + } + } + pub(crate) fn with_encryption_headers(self) -> Self { let headers = self.config.encryption_headers.clone().into(); let builder = self.builder.headers(headers); @@ -379,6 +387,7 @@ impl<'a> Request<'a> { .with_aws_sigv4(credential.authorizer(), sha) .retryable(&self.config.retry_config) .idempotent(self.idempotent) + .retry_error_body(self.retry_error_body) .payload(self.payload) .send() .await @@ -413,6 +422,7 @@ impl S3Client { config: &self.config, use_session_creds: true, idempotent: false, + retry_error_body: false, } } @@ -559,6 +569,7 @@ impl S3Client { self.request(Method::PUT, to) .idempotent(true) + .retry_error_body(true) .header(&COPY_SOURCE_HEADER, &source) .headers(self.config.encryption_headers.clone().into()) .headers(copy_source_encryption_headers) @@ -648,6 +659,7 @@ impl S3Client { .with_aws_sigv4(credential.authorizer(), None) .retryable(&self.config.retry_config) .idempotent(true) + .retry_error_body(true) .send() .await 
.context(CompleteMultipartRequestSnafu)?; diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs index 2f2ba0ac76e..601bffdec15 100644 --- a/object_store/src/client/retry.rs +++ b/object_store/src/client/retry.rs @@ -33,6 +33,12 @@ pub enum Error { #[snafu(display("Received redirect without LOCATION, this normally indicates an incorrectly configured region"))] BareRedirect, + #[snafu(display("Server error, body contains Error, with status {status}: {}", body.as_deref().unwrap_or("No Body")))] + Server { + status: StatusCode, + body: Option, + }, + #[snafu(display("Client error with status {status}: {}", body.as_deref().unwrap_or("No Body")))] Client { status: StatusCode, @@ -54,6 +60,7 @@ impl Error { pub fn status(&self) -> Option { match self { Self::BareRedirect => None, + Self::Server { status, .. } => Some(*status), Self::Client { status, .. } => Some(*status), Self::Reqwest { source, .. } => source.status(), } @@ -63,6 +70,7 @@ impl Error { pub fn body(&self) -> Option<&str> { match self { Self::Client { body, .. } => body.as_deref(), + Self::Server { body, .. } => body.as_deref(), Self::BareRedirect => None, Self::Reqwest { .. 
} => None, } @@ -178,6 +186,10 @@ impl Default for RetryConfig { } } +fn body_contains_error(response_body: &str) -> bool { + response_body.contains("InternalError") || response_body.contains("SlowDown") +} + pub(crate) struct RetryableRequest { client: Client, request: Request, @@ -189,6 +201,8 @@ pub(crate) struct RetryableRequest { sensitive: bool, idempotent: Option, payload: Option, + + retry_error_body: bool, } impl RetryableRequest { @@ -216,6 +230,14 @@ impl RetryableRequest { Self { payload, ..self } } + #[allow(unused)] + pub(crate) fn retry_error_body(self, retry_error_body: bool) -> Self { + Self { + retry_error_body, + ..self + } + } + pub(crate) async fn send(self) -> Result { let max_retries = self.max_retries; let retry_timeout = self.retry_timeout; @@ -244,7 +266,57 @@ impl RetryableRequest { match self.client.execute(request).await { Ok(r) => match r.error_for_status_ref() { - Ok(_) if r.status().is_success() => return Ok(r), + Ok(_) if r.status().is_success() => { + // For certain S3 requests, 200 response may contain `InternalError` or + // `SlowDown` in the message. These responses should be handled similarly + // to 5xx errors. 
+ // More info here: https://repost.aws/knowledge-center/s3-resolve-200-internalerror + if !self.retry_error_body { + return Ok(r); + } + + let status = r.status(); + let headers = r.headers().clone(); + + let bytes = r.bytes().await.map_err(|e| Error::Reqwest { + retries, + max_retries, + elapsed: now.elapsed(), + retry_timeout, + source: e, + })?; + + let response_body = String::from_utf8_lossy(&bytes); + info!("Checking for error in response_body: {}", response_body); + + if !body_contains_error(&response_body) { + // Success response and no error, clone and return response + let mut success_response = hyper::Response::new(bytes); + *success_response.status_mut() = status; + *success_response.headers_mut() = headers; + + return Ok(reqwest::Response::from(success_response)); + } else { + // Retry as if this was a 5xx response + if retries == max_retries || now.elapsed() > retry_timeout { + return Err(Error::Server { + body: Some(response_body.into_owned()), + status, + }); + } + + let sleep = backoff.next(); + retries += 1; + info!( + "Encountered a response status of {} but body contains Error, backing off for {} seconds, retry {} of {}", + status, + sleep.as_secs_f32(), + retries, + max_retries, + ); + tokio::time::sleep(sleep).await; + } + } Ok(r) if r.status() == StatusCode::NOT_MODIFIED => { return Err(Error::Client { body: None, @@ -395,6 +467,7 @@ impl RetryExt for reqwest::RequestBuilder { idempotent: None, payload: None, sensitive: false, + retry_error_body: false, } } @@ -407,13 +480,27 @@ impl RetryExt for reqwest::RequestBuilder { #[cfg(test)] mod tests { use crate::client::mock_server::MockServer; - use crate::client::retry::{Error, RetryExt}; + use crate::client::retry::{body_contains_error, Error, RetryExt}; use crate::RetryConfig; use hyper::header::LOCATION; use hyper::Response; use reqwest::{Client, Method, StatusCode}; use std::time::Duration; + #[test] + fn test_body_contains_error() { + // Example error message provided by 
https://repost.aws/knowledge-center/s3-resolve-200-internalerror + let error_response = "AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 200; Error Code: InternalError; Request ID: 0EXAMPLE9AAEB265)"; + assert!(body_contains_error(error_response)); + + let error_response_2 = "SlowDownPlease reduce your request rate.123456"; + assert!(body_contains_error(error_response_2)); + + // Example success response from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html + let success_response = "2009-10-12T17:50:30.000Z\"9b2cf535f27731c974343645a3985328\""; + assert!(!body_contains_error(success_response)); + } + #[tokio::test] async fn test_retry() { let mock = MockServer::new().await; @@ -637,6 +724,39 @@ mod tests { let err = req.send().await.unwrap_err().to_string(); assert!(!err.contains("SENSITIVE"), "{err}"); + // Success response with error in body is retried + mock.push( + Response::builder() + .status(StatusCode::OK) + .body("InternalError".to_string()) + .unwrap(), + ); + let req = client + .request(Method::PUT, &url) + .retryable(&retry) + .idempotent(true) + .retry_error_body(true); + let r = req.send().await.unwrap(); + assert_eq!(r.status(), StatusCode::OK); + // Response with InternalError should have been retried + assert!(!r.text().await.unwrap().contains("InternalError")); + + // Should not retry success response with no error in body + mock.push( + Response::builder() + .status(StatusCode::OK) + .body("success".to_string()) + .unwrap(), + ); + let req = client + .request(Method::PUT, &url) + .retryable(&retry) + .idempotent(true) + .retry_error_body(true); + let r = req.send().await.unwrap(); + assert_eq!(r.status(), StatusCode::OK); + assert!(r.text().await.unwrap().contains("success")); + // Shutdown mock.shutdown().await }