Skip to content

Commit

Permalink
[object_store] Retry S3 requests with 200 response with "Error" in bo…
Browse files Browse the repository at this point in the history
…dy (#6508)

* rebase

* generalize

* add test

* fix lint

* remove dep

---------

Co-authored-by: Peter Ke <peter.ke@neuralink.com>
  • Loading branch information
PeterKeDer and Peter Ke authored Oct 7, 2024
1 parent b7381de commit e7d370a
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 2 deletions.
12 changes: 12 additions & 0 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ pub(crate) struct Request<'a> {
payload: Option<PutPayload>,
use_session_creds: bool,
idempotent: bool,
retry_error_body: bool,
}

impl<'a> Request<'a> {
Expand All @@ -292,6 +293,13 @@ impl<'a> Request<'a> {
Self { idempotent, ..self }
}

pub(crate) fn retry_error_body(self, retry_error_body: bool) -> Self {
Self {
retry_error_body,
..self
}
}

pub(crate) fn with_encryption_headers(self) -> Self {
let headers = self.config.encryption_headers.clone().into();
let builder = self.builder.headers(headers);
Expand Down Expand Up @@ -379,6 +387,7 @@ impl<'a> Request<'a> {
.with_aws_sigv4(credential.authorizer(), sha)
.retryable(&self.config.retry_config)
.idempotent(self.idempotent)
.retry_error_body(self.retry_error_body)
.payload(self.payload)
.send()
.await
Expand Down Expand Up @@ -413,6 +422,7 @@ impl S3Client {
config: &self.config,
use_session_creds: true,
idempotent: false,
retry_error_body: false,
}
}

Expand Down Expand Up @@ -559,6 +569,7 @@ impl S3Client {

self.request(Method::PUT, to)
.idempotent(true)
.retry_error_body(true)
.header(&COPY_SOURCE_HEADER, &source)
.headers(self.config.encryption_headers.clone().into())
.headers(copy_source_encryption_headers)
Expand Down Expand Up @@ -648,6 +659,7 @@ impl S3Client {
.with_aws_sigv4(credential.authorizer(), None)
.retryable(&self.config.retry_config)
.idempotent(true)
.retry_error_body(true)
.send()
.await
.context(CompleteMultipartRequestSnafu)?;
Expand Down
124 changes: 122 additions & 2 deletions object_store/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ pub enum Error {
#[snafu(display("Received redirect without LOCATION, this normally indicates an incorrectly configured region"))]
BareRedirect,

#[snafu(display("Server error, body contains Error, with status {status}: {}", body.as_deref().unwrap_or("No Body")))]
Server {
status: StatusCode,
body: Option<String>,
},

#[snafu(display("Client error with status {status}: {}", body.as_deref().unwrap_or("No Body")))]
Client {
status: StatusCode,
Expand All @@ -54,6 +60,7 @@ impl Error {
pub fn status(&self) -> Option<StatusCode> {
match self {
Self::BareRedirect => None,
Self::Server { status, .. } => Some(*status),
Self::Client { status, .. } => Some(*status),
Self::Reqwest { source, .. } => source.status(),
}
Expand All @@ -63,6 +70,7 @@ impl Error {
pub fn body(&self) -> Option<&str> {
match self {
Self::Client { body, .. } => body.as_deref(),
Self::Server { body, .. } => body.as_deref(),
Self::BareRedirect => None,
Self::Reqwest { .. } => None,
}
Expand Down Expand Up @@ -178,6 +186,10 @@ impl Default for RetryConfig {
}
}

fn body_contains_error(response_body: &str) -> bool {
response_body.contains("InternalError") || response_body.contains("SlowDown")
}

pub(crate) struct RetryableRequest {
client: Client,
request: Request,
Expand All @@ -189,6 +201,8 @@ pub(crate) struct RetryableRequest {
sensitive: bool,
idempotent: Option<bool>,
payload: Option<PutPayload>,

retry_error_body: bool,
}

impl RetryableRequest {
Expand Down Expand Up @@ -216,6 +230,14 @@ impl RetryableRequest {
Self { payload, ..self }
}

#[allow(unused)]
pub(crate) fn retry_error_body(self, retry_error_body: bool) -> Self {
Self {
retry_error_body,
..self
}
}

pub(crate) async fn send(self) -> Result<Response> {
let max_retries = self.max_retries;
let retry_timeout = self.retry_timeout;
Expand Down Expand Up @@ -244,7 +266,57 @@ impl RetryableRequest {

match self.client.execute(request).await {
Ok(r) => match r.error_for_status_ref() {
Ok(_) if r.status().is_success() => return Ok(r),
Ok(_) if r.status().is_success() => {
// For certain S3 requests, 200 response may contain `InternalError` or
// `SlowDown` in the message. These responses should be handled similarly
// to r5xx errors.
// More info here: https://repost.aws/knowledge-center/s3-resolve-200-internalerror
if !self.retry_error_body {
return Ok(r);
}

let status = r.status();
let headers = r.headers().clone();

let bytes = r.bytes().await.map_err(|e| Error::Reqwest {
retries,
max_retries,
elapsed: now.elapsed(),
retry_timeout,
source: e,
})?;

let response_body = String::from_utf8_lossy(&bytes);
info!("Checking for error in response_body: {}", response_body);

if !body_contains_error(&response_body) {
// Success response and no error, clone and return response
let mut success_response = hyper::Response::new(bytes);
*success_response.status_mut() = status;
*success_response.headers_mut() = headers;

return Ok(reqwest::Response::from(success_response));
} else {
// Retry as if this was a 5xx response
if retries == max_retries || now.elapsed() > retry_timeout {
return Err(Error::Server {
body: Some(response_body.into_owned()),
status,
});
}

let sleep = backoff.next();
retries += 1;
info!(
"Encountered a response status of {} but body contains Error, backing off for {} seconds, retry {} of {}",
status,
sleep.as_secs_f32(),
retries,
max_retries,
);
tokio::time::sleep(sleep).await;
}
}
Ok(r) if r.status() == StatusCode::NOT_MODIFIED => {
return Err(Error::Client {
body: None,
Expand Down Expand Up @@ -395,6 +467,7 @@ impl RetryExt for reqwest::RequestBuilder {
idempotent: None,
payload: None,
sensitive: false,
retry_error_body: false,
}
}

Expand All @@ -407,13 +480,27 @@ impl RetryExt for reqwest::RequestBuilder {
#[cfg(test)]
mod tests {
use crate::client::mock_server::MockServer;
use crate::client::retry::{Error, RetryExt};
use crate::client::retry::{body_contains_error, Error, RetryExt};
use crate::RetryConfig;
use hyper::header::LOCATION;
use hyper::Response;
use reqwest::{Client, Method, StatusCode};
use std::time::Duration;

#[test]
fn test_body_contains_error() {
// Example error message provided by https://repost.aws/knowledge-center/s3-resolve-200-internalerror
let error_response = "AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 200; Error Code: InternalError; Request ID: 0EXAMPLE9AAEB265)";
assert!(body_contains_error(error_response));

let error_response_2 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>SlowDown</Code><Message>Please reduce your request rate.</Message><RequestId>123</RequestId><HostId>456</HostId></Error>";
assert!(body_contains_error(error_response_2));

// Example success response from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
let success_response = "<CopyObjectResult><LastModified>2009-10-12T17:50:30.000Z</LastModified><ETag>\"9b2cf535f27731c974343645a3985328\"</ETag></CopyObjectResult>";
assert!(!body_contains_error(success_response));
}

#[tokio::test]
async fn test_retry() {
let mock = MockServer::new().await;
Expand Down Expand Up @@ -637,6 +724,39 @@ mod tests {
let err = req.send().await.unwrap_err().to_string();
assert!(!err.contains("SENSITIVE"), "{err}");

// Success response with error in body is retried
mock.push(
Response::builder()
.status(StatusCode::OK)
.body("InternalError".to_string())
.unwrap(),
);
let req = client
.request(Method::PUT, &url)
.retryable(&retry)
.idempotent(true)
.retry_error_body(true);
let r = req.send().await.unwrap();
assert_eq!(r.status(), StatusCode::OK);
// Response with InternalError should have been retried
assert!(!r.text().await.unwrap().contains("InternalError"));

// Should not retry success response with no error in body
mock.push(
Response::builder()
.status(StatusCode::OK)
.body("success".to_string())
.unwrap(),
);
let req = client
.request(Method::PUT, &url)
.retryable(&retry)
.idempotent(true)
.retry_error_body(true);
let r = req.send().await.unwrap();
assert_eq!(r.status(), StatusCode::OK);
assert!(r.text().await.unwrap().contains("success"));

// Shutdown
mock.shutdown().await
}
Expand Down

0 comments on commit e7d370a

Please sign in to comment.