From ce5f0aae3fcf8533e519b799236d165b32332a62 Mon Sep 17 00:00:00 2001 From: John DiSanti Date: Thu, 5 Oct 2023 14:19:56 -0700 Subject: [PATCH] Port connection poisoning tests to orchestrator and fix issues discovered (#3029) Issues fixed: - `ConnectionPoisoningInterceptor` should use the `read_after_deserialization` hook so that it's possible to poison modeled transient errors. - aws-config should enable connection poisoning on its IMDS/ECS/HTTP clients. ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._ --- .../src/http_credential_provider.rs | 1 + .../aws-config/src/imds/client.rs | 1 + .../aws-config/src/imds/client/token.rs | 1 + .../client/interceptors/context/wrappers.rs | 104 +++++- .../client/connectors/connection_poisoning.rs | 8 +- .../src/client/orchestrator/operation.rs | 6 + .../tests/reconnect_on_transient_error.rs | 302 ++++++++++++++++++ 7 files changed, 415 insertions(+), 8 deletions(-) create mode 100644 rust-runtime/aws-smithy-runtime/tests/reconnect_on_transient_error.rs diff --git a/aws/rust-runtime/aws-config/src/http_credential_provider.rs b/aws/rust-runtime/aws-config/src/http_credential_provider.rs index 87950ea17c..2bb2cc25c7 100644 --- a/aws/rust-runtime/aws-config/src/http_credential_provider.rs +++ b/aws/rust-runtime/aws-config/src/http_credential_provider.rs @@ -115,6 +115,7 @@ impl Builder { .http_connector(SharedHttpConnector::new(DynConnectorAdapter::new( connector, ))) + .with_connection_poisoning() .endpoint_url(endpoint) .no_auth() .runtime_plugin(StaticRuntimePlugin::new().with_config({ diff --git a/aws/rust-runtime/aws-config/src/imds/client.rs b/aws/rust-runtime/aws-config/src/imds/client.rs index ed76fc8660..32aa772023 100644 --- a/aws/rust-runtime/aws-config/src/imds/client.rs +++ b/aws/rust-runtime/aws-config/src/imds/client.rs @@ -458,6 +458,7 @@ impl Builder { config.time_source(), self.token_ttl.unwrap_or(DEFAULT_TOKEN_TTL), )) + .with_connection_poisoning() .serializer(|path| { Ok(http::Request::builder() .uri(path) diff --git a/aws/rust-runtime/aws-config/src/imds/client/token.rs b/aws/rust-runtime/aws-config/src/imds/client/token.rs index 98e1a79a80..ec3ffe406d 100644 --- a/aws/rust-runtime/aws-config/src/imds/client/token.rs +++ b/aws/rust-runtime/aws-config/src/imds/client/token.rs @@ -132,6 +132,7 @@ impl TokenResolver { .operation_name("get-token") .runtime_plugin(common_plugin) .no_auth() + .with_connection_poisoning() .serializer(move |_| { Ok(http::Request::builder() .method("PUT") diff --git a/rust-runtime/aws-smithy-runtime-api/src/client/interceptors/context/wrappers.rs b/rust-runtime/aws-smithy-runtime-api/src/client/interceptors/context/wrappers.rs index 77f751e136..bead9491db 100644 --- a/rust-runtime/aws-smithy-runtime-api/src/client/interceptors/context/wrappers.rs +++ b/rust-runtime/aws-smithy-runtime-api/src/client/interceptors/context/wrappers.rs @@ -54,6 +54,14 @@ impl<'a, I, O, E> BeforeSerializationInterceptorContextRef<'a, I, O, E> { pub fn input(&self) -> &I { expect!(self, input) } + + /// Downgrade this wrapper struct, returning the underlying InterceptorContext. + /// + /// There's no good reason to use this unless you're writing tests or you have to + /// interact with an API that doesn't support the context wrapper structs. + pub fn inner(&self) -> &'_ InterceptorContext { + self.inner + } } // @@ -80,6 +88,22 @@ impl<'a, I, O, E> BeforeSerializationInterceptorContextMut<'a, I, O, E> { pub fn input_mut(&mut self) -> &mut I { expect!(self, input_mut) } + + /// Downgrade this wrapper struct, returning the underlying InterceptorContext. + /// + /// There's no good reason to use this unless you're writing tests or you have to + /// interact with an API that doesn't support the context wrapper structs. + pub fn inner(&self) -> &'_ InterceptorContext { + self.inner + } + + /// Downgrade this wrapper struct, returning the underlying InterceptorContext. + /// + /// There's no good reason to use this unless you're writing tests or you have to + /// interact with an API that doesn't support the context wrapper structs. + pub fn inner_mut(&mut self) -> &'_ mut InterceptorContext { + self.inner + } } // @@ -101,6 +125,14 @@ impl<'a, I, O, E> BeforeTransmitInterceptorContextRef<'a, I, O, E> { pub fn request(&self) -> &Request { expect!(self, request) } + + /// Downgrade this wrapper struct, returning the underlying InterceptorContext. + /// + /// There's no good reason to use this unless you're writing tests or you have to + /// interact with an API that doesn't support the context wrapper structs. + pub fn inner(&self) -> &'_ InterceptorContext { + self.inner + } } // @@ -127,6 +159,22 @@ impl<'a, I, O, E> BeforeTransmitInterceptorContextMut<'a, I, O, E> { pub fn request_mut(&mut self) -> &mut Request { expect!(self, request_mut) } + + /// Downgrade this wrapper struct, returning the underlying InterceptorContext. + /// + /// There's no good reason to use this unless you're writing tests or you have to + /// interact with an API that doesn't support the context wrapper structs. + pub fn inner(&self) -> &'_ InterceptorContext { + self.inner + } + + /// Downgrade this wrapper struct, returning the underlying InterceptorContext. + /// + /// There's no good reason to use this unless you're writing tests or you have to + /// interact with an API that doesn't support the context wrapper structs. + pub fn inner_mut(&mut self) -> &'_ mut InterceptorContext { + self.inner + } } // @@ -148,6 +196,14 @@ impl<'a, I, O, E> BeforeDeserializationInterceptorContextRef<'a, I, O, E> { pub fn response(&self) -> &Response { expect!(self, response) } + + /// Downgrade this wrapper struct, returning the underlying InterceptorContext. + /// + /// There's no good reason to use this unless you're writing tests or you have to + /// interact with an API that doesn't support the context wrapper structs. + pub fn inner(&self) -> &'_ InterceptorContext { + self.inner + } } // @@ -174,10 +230,18 @@ impl<'a, I, O, E> BeforeDeserializationInterceptorContextMut<'a, I, O, E> { expect!(self, response_mut) } - #[doc(hidden)] - /// Downgrade this helper struct, returning the underlying InterceptorContext. There's no good - /// reason to use this unless you're writing tests or you have to interact with an API that - /// doesn't support the helper structs. + /// Downgrade this wrapper struct, returning the underlying InterceptorContext. + /// + /// There's no good reason to use this unless you're writing tests or you have to + /// interact with an API that doesn't support the context wrapper structs. + pub fn inner(&self) -> &'_ InterceptorContext { + self.inner + } + + /// Downgrade this wrapper struct, returning the underlying InterceptorContext. + /// + /// There's no good reason to use this unless you're writing tests or you have to + /// interact with an API that doesn't support the context wrapper structs. pub fn inner_mut(&mut self) -> &'_ mut InterceptorContext { self.inner } @@ -206,6 +270,14 @@ impl<'a, I, O, E> AfterDeserializationInterceptorContextRef<'a, I, O, E> { pub fn output_or_error(&self) -> Result<&O, &OrchestratorError> { expect!(self, output_or_error) } + + /// Downgrade this wrapper struct, returning the underlying InterceptorContext. + /// + /// There's no good reason to use this unless you're writing tests or you have to + /// interact with an API that doesn't support the context wrapper structs. + pub fn inner(&self) -> &'_ InterceptorContext { + self.inner + } } // @@ -243,6 +315,14 @@ impl<'a, I, O, E> FinalizerInterceptorContextRef<'a, I, O, E> { pub fn output_or_error(&self) -> Option>> { self.inner.output_or_error.as_ref().map(|o| o.as_ref()) } + + /// Downgrade this wrapper struct, returning the underlying InterceptorContext. + /// + /// There's no good reason to use this unless you're writing tests or you have to + /// interact with an API that doesn't support the context wrapper structs. + pub fn inner(&self) -> &'_ InterceptorContext { + self.inner + } } // @@ -300,4 +380,20 @@ impl<'a, I, O, E> FinalizerInterceptorContextMut<'a, I, O, E> { pub fn output_or_error_mut(&mut self) -> Option<&mut Result>> { self.inner.output_or_error.as_mut() } + + /// Downgrade this wrapper struct, returning the underlying InterceptorContext. + /// + /// There's no good reason to use this unless you're writing tests or you have to + /// interact with an API that doesn't support the context wrapper structs. + pub fn inner(&self) -> &'_ InterceptorContext { + self.inner + } + + /// Downgrade this wrapper struct, returning the underlying InterceptorContext. + /// + /// There's no good reason to use this unless you're writing tests or you have to + /// interact with an API that doesn't support the context wrapper structs. + pub fn inner_mut(&mut self) -> &'_ mut InterceptorContext { + self.inner + } } diff --git a/rust-runtime/aws-smithy-runtime/src/client/connectors/connection_poisoning.rs b/rust-runtime/aws-smithy-runtime/src/client/connectors/connection_poisoning.rs index 5f6f4e7862..fe26f38520 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/connectors/connection_poisoning.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/connectors/connection_poisoning.rs @@ -6,7 +6,7 @@ use aws_smithy_http::connection::{CaptureSmithyConnection, ConnectionMetadata}; use aws_smithy_runtime_api::box_error::BoxError; use aws_smithy_runtime_api::client::interceptors::context::{ - BeforeDeserializationInterceptorContextMut, BeforeTransmitInterceptorContextMut, + AfterDeserializationInterceptorContextRef, BeforeTransmitInterceptorContextMut, }; use aws_smithy_runtime_api::client::interceptors::Interceptor; use aws_smithy_runtime_api::client::retries::{ClassifyRetry, RetryReason}; @@ -61,9 +61,9 @@ impl Interceptor for ConnectionPoisoningInterceptor { Ok(()) } - fn modify_before_deserialization( + fn read_after_deserialization( &self, - context: &mut BeforeDeserializationInterceptorContextMut<'_>, + context: &AfterDeserializationInterceptorContextRef<'_>, runtime_components: &RuntimeComponents, cfg: &mut ConfigBag, ) -> Result<(), BoxError> { @@ -77,7 +77,7 @@ impl Interceptor for ConnectionPoisoningInterceptor { .ok_or("retry classifiers are required for connection poisoning to work")?; let error_is_transient = retry_classifiers - .classify_retry(context.inner_mut()) + .classify_retry(context.inner()) .map(|reason| reason == RetryReason::Error(ErrorKind::TransientError)) .unwrap_or_default(); let connection_poisoning_is_enabled = diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs index ac61dc7222..3c483ca593 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs @@ -4,6 +4,7 @@ */ use crate::client::auth::no_auth::{NoAuthScheme, NO_AUTH_SCHEME_ID}; +use crate::client::connectors::connection_poisoning::ConnectionPoisoningInterceptor; use crate::client::identity::no_auth::NoAuthIdentityResolver; use crate::client::orchestrator::endpoints::StaticUriEndpointResolver; use crate::client::retries::strategy::{NeverRetryStrategy, StandardRetryStrategy}; @@ -254,6 +255,11 @@ impl OperationBuilder { self } + /// Registers the [`ConnectionPoisoningInterceptor`]. + pub fn with_connection_poisoning(self) -> Self { + self.interceptor(ConnectionPoisoningInterceptor::new()) + } + pub fn runtime_plugin(mut self, runtime_plugin: impl IntoShared) -> Self { self.runtime_plugins.push(runtime_plugin.into_shared()); self diff --git a/rust-runtime/aws-smithy-runtime/tests/reconnect_on_transient_error.rs b/rust-runtime/aws-smithy-runtime/tests/reconnect_on_transient_error.rs new file mode 100644 index 0000000000..bf1d8220d9 --- /dev/null +++ b/rust-runtime/aws-smithy-runtime/tests/reconnect_on_transient_error.rs @@ -0,0 +1,302 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#![cfg(all( + feature = "client", + feature = "wire-mock", + feature = "connector-hyper-0-14-x", +))] + +use ::aws_smithy_runtime::client::retries::classifier::{ + HttpStatusCodeClassifier, SmithyErrorClassifier, +}; +use ::aws_smithy_runtime_api::client::retries::RetryClassifiers; +use aws_smithy_async::rt::sleep::TokioSleep; +use aws_smithy_http::body::{BoxBody, SdkBody}; +use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder; +use aws_smithy_runtime::client::http::test_util::wire::{ + RecordedEvent, ReplayedEvent, WireMockServer, +}; +use aws_smithy_runtime::client::orchestrator::operation::Operation; +use aws_smithy_runtime::test_util::capture_test_logs::capture_test_logs; +use aws_smithy_runtime::{ev, match_events}; +use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext; +use aws_smithy_runtime_api::client::orchestrator::OrchestratorError; +use aws_smithy_runtime_api::client::retries::{ClassifyRetry, RetryReason}; +use aws_smithy_types::retry::{ErrorKind, ProvideErrorKind, ReconnectMode, RetryConfig}; +use aws_smithy_types::timeout::TimeoutConfig; +use hyper::client::Builder as HyperBuilder; +use std::fmt; +use std::time::Duration; + +const END_OF_TEST: &str = "end_of_test"; + +#[derive(Debug)] +struct OperationError(ErrorKind); + +impl fmt::Display for OperationError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +impl ProvideErrorKind for OperationError { + fn retryable_error_kind(&self) -> Option { + Some(self.0) + } + + fn code(&self) -> Option<&str> { + None + } +} + +impl std::error::Error for OperationError {} + +#[derive(Debug)] +struct TestRetryClassifier; + +impl ClassifyRetry for TestRetryClassifier { + fn classify_retry(&self, ctx: &InterceptorContext) -> Option { + tracing::info!("classifying retry for {ctx:?}"); + let classification = ctx.output_or_error().unwrap().err().and_then(|err| { + if let Some(err) = err.as_operation_error() { + tracing::info!("its an operation error: {err:?}"); + let err = err.downcast_ref::().unwrap(); + Some(RetryReason::Error(err.0)) + } else { + tracing::info!("its something else... using other classifiers"); + SmithyErrorClassifier::::new() + .classify_retry(ctx) + .or_else(|| HttpStatusCodeClassifier::default().classify_retry(ctx)) + } + }); + tracing::info!("classified as {classification:?}"); + classification + } + + fn name(&self) -> &'static str { + "test" + } +} + +async fn h1_and_h2(events: Vec, match_clause: impl Fn(&[RecordedEvent])) { + wire_level_test( + events.clone(), + |_b| {}, + ReconnectMode::ReconnectOnTransientError, + &match_clause, + ) + .await; + wire_level_test( + events, + |b| { + b.http2_only(true); + }, + ReconnectMode::ReconnectOnTransientError, + match_clause, + ) + .await; + tracing::info!("h2 ok!"); +} + +/// Repeatedly send test operation until `end_of_test` is received +/// +/// When the test is over, match_clause is evaluated +async fn wire_level_test( + events: Vec, + hyper_builder_settings: impl Fn(&mut HyperBuilder), + reconnect_mode: ReconnectMode, + match_clause: impl Fn(&[RecordedEvent]), +) { + let mut hyper_builder = hyper::Client::builder(); + hyper_builder_settings(&mut hyper_builder); + + let mock = WireMockServer::start(events).await; + let http_client = HyperClientBuilder::new() + .hyper_builder(hyper_builder) + .build(hyper::client::HttpConnector::new_with_resolver( + mock.dns_resolver(), + )); + + let operation = Operation::builder() + .service_name("test") + .operation_name("test") + .no_auth() + .endpoint_url(&mock.endpoint_url()) + .http_client(http_client) + .timeout_config( + TimeoutConfig::builder() + .operation_attempt_timeout(Duration::from_millis(100)) + .build(), + ) + .standard_retry(&RetryConfig::standard().with_reconnect_mode(reconnect_mode)) + .retry_classifiers(RetryClassifiers::new().with_classifier(TestRetryClassifier)) + .sleep_impl(TokioSleep::new()) + .with_connection_poisoning() + .serializer({ + let endpoint_url = mock.endpoint_url(); + move |_| { + let request = http::Request::builder() + .uri(endpoint_url.clone()) + // Make the body non-replayable since we don't actually want to retry + .body(SdkBody::from_dyn(BoxBody::new(SdkBody::from("body")))) + .unwrap(); + tracing::info!("serializing request: {request:?}"); + Ok(request) + } + }) + .deserializer(|response| { + tracing::info!("deserializing response: {:?}", response); + match response.status() { + s if s.is_success() => { + Ok(String::from_utf8(response.body().bytes().unwrap().into()).unwrap()) + } + s if s.is_client_error() => Err(OrchestratorError::operation(OperationError( + ErrorKind::ServerError, + ))), + s if s.is_server_error() => Err(OrchestratorError::operation(OperationError( + ErrorKind::TransientError, + ))), + _ => panic!("unexpected status: {}", response.status()), + } + }) + .build(); + + let mut iteration = 0; + loop { + tracing::info!("iteration {iteration}..."); + match operation.invoke(()).await { + Ok(resp) => { + tracing::info!("response: {:?}", resp); + if resp == END_OF_TEST { + break; + } + } + Err(e) => tracing::info!("error: {:?}", e), + } + iteration += 1; + if iteration > 50 { + panic!("probably an infinite loop; no satisfying 'end_of_test' response received"); + } + } + let events = mock.events(); + match_clause(&events); + mock.shutdown(); +} + +#[tokio::test] +async fn non_transient_errors_no_reconnect() { + let _logs = capture_test_logs(); + h1_and_h2( + vec![ + ReplayedEvent::status(400), + ReplayedEvent::with_body(END_OF_TEST), + ], + match_events!(ev!(dns), ev!(connect), ev!(http(400)), ev!(http(200))), + ) + .await +} + +#[tokio::test] +async fn reestablish_dns_on_503() { + let _logs = capture_test_logs(); + h1_and_h2( + vec![ + ReplayedEvent::status(503), + ReplayedEvent::status(503), + ReplayedEvent::status(503), + ReplayedEvent::with_body(END_OF_TEST), + ], + match_events!( + // first request + ev!(dns), + ev!(connect), + ev!(http(503)), + // second request + ev!(dns), + ev!(connect), + ev!(http(503)), + // third request + ev!(dns), + ev!(connect), + ev!(http(503)), + // all good + ev!(dns), + ev!(connect), + ev!(http(200)) + ), + ) + .await; +} + +#[tokio::test] +async fn connection_shared_on_success() { + let _logs = capture_test_logs(); + h1_and_h2( + vec![ + ReplayedEvent::ok(), + ReplayedEvent::ok(), + ReplayedEvent::status(503), + ReplayedEvent::with_body(END_OF_TEST), + ], + match_events!( + ev!(dns), + ev!(connect), + ev!(http(200)), + ev!(http(200)), + ev!(http(503)), + ev!(dns), + ev!(connect), + ev!(http(200)) + ), + ) + .await; +} + +#[tokio::test] +async fn no_reconnect_when_disabled() { + let _logs = capture_test_logs(); + wire_level_test( + vec![ + ReplayedEvent::status(503), + ReplayedEvent::with_body(END_OF_TEST), + ], + |_b| {}, + ReconnectMode::ReuseAllConnections, + match_events!(ev!(dns), ev!(connect), ev!(http(503)), ev!(http(200))), + ) + .await; +} + +#[tokio::test] +async fn connection_reestablished_after_timeout() { + let _logs = capture_test_logs(); + h1_and_h2( + vec![ + ReplayedEvent::ok(), + ReplayedEvent::Timeout, + ReplayedEvent::ok(), + ReplayedEvent::Timeout, + ReplayedEvent::with_body(END_OF_TEST), + ], + match_events!( + // first connection + ev!(dns), + ev!(connect), + ev!(http(200)), + // reuse but got a timeout + ev!(timeout), + // so we reconnect + ev!(dns), + ev!(connect), + ev!(http(200)), + ev!(timeout), + ev!(dns), + ev!(connect), + ev!(http(200)) + ), + ) + .await; +}