Skip to content

Commit

Permalink
rfc43: fix identity cache partition (#3566)
Browse files Browse the repository at this point in the history
## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here -->
Original issue: #3427
RFC: #3544

## Description
<!--- Describe your changes in detail -->

Fixes the `SharedCredentialsProvider` and `SharedTokenProvider` to
re-use a consistent cache partition by default.
`SdkConfig` does not create an identity cache by default still, that
will need to be a follow on PR that gates it behind a new behavior
version. Ideally we do that with the stalled stream protection work in
#3527


## Testing
<!--- Please describe in detail how you tested your changes -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->
Added new unit and integration tests.

## Checklist
<!--- If a checkbox below is not applicable, then please DELETE it
rather than leaving it unchecked -->
- [X] I have updated `CHANGELOG.next.toml` if I made changes to the
smithy-rs codegen or runtime crates


----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
  • Loading branch information
aajtodd authored Apr 10, 2024
1 parent d1bbd01 commit 692cdfe
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 10 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,25 @@
# references = ["smithy-rs#920"]
# meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client | server | all"}
# author = "rcoh"
[[aws-sdk-rust]]
message = """
Fixes the identity resolver types (`credentials_provider()` and `token_provider()`) from `SdkConfig` to have
a consistent identity cache partition when re-used across different clients.
"""
references = ["smithy-rs#3427"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
authors = ["aajtodd"]


[[smithy-rs]]
message = """
`SharedIdentityResolver` now respects an existing cache partition when the `ResolveIdentity` implementation
provides one already.
"""
references = ["smithy-rs#3427"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
authors = ["aajtodd"]

[[smithy-rs]]
message = """
Stalled stream protection now supports request upload streams. It is currently off by default, but will be enabled by default in a future release. To enable it now, you can do the following:
Expand Down
2 changes: 1 addition & 1 deletion aws/rust-runtime/aws-credential-types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aws-credential-types"
version = "1.1.8"
version = "1.2.0"
authors = ["AWS Rust SDK Team <aws-sdk-rust@amazon.com>"]
description = "Types for AWS SDK credentials."
edition = "2021"
Expand Down
34 changes: 30 additions & 4 deletions aws/rust-runtime/aws-credential-types/src/provider/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ construct credentials from hardcoded values.
//! ```
use crate::Credentials;
use aws_smithy_runtime_api::client::identity::{Identity, IdentityFuture, ResolveIdentity};
use aws_smithy_runtime_api::client::identity::{
Identity, IdentityCachePartition, IdentityFuture, ResolveIdentity,
};
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
use std::sync::Arc;
Expand Down Expand Up @@ -124,15 +126,15 @@ impl ProvideCredentials for Arc<dyn ProvideCredentials> {
/// Newtype wrapper around ProvideCredentials that implements Clone using an internal
/// Arc.
#[derive(Clone, Debug)]
pub struct SharedCredentialsProvider(Arc<dyn ProvideCredentials>);
pub struct SharedCredentialsProvider(Arc<dyn ProvideCredentials>, IdentityCachePartition);

impl SharedCredentialsProvider {
/// Create a new SharedCredentials provider from `ProvideCredentials`
///
/// The given provider will be wrapped in an internal `Arc`. If your
/// provider is already in an `Arc`, use `SharedCredentialsProvider::from(provider)` instead.
pub fn new(provider: impl ProvideCredentials + 'static) -> Self {
Self(Arc::new(provider))
Self(Arc::new(provider), IdentityCachePartition::new())
}
}

Expand All @@ -144,7 +146,7 @@ impl AsRef<dyn ProvideCredentials> for SharedCredentialsProvider {

impl From<Arc<dyn ProvideCredentials>> for SharedCredentialsProvider {
fn from(provider: Arc<dyn ProvideCredentials>) -> Self {
SharedCredentialsProvider(provider)
SharedCredentialsProvider(provider, IdentityCachePartition::new())
}
}

Expand Down Expand Up @@ -173,4 +175,28 @@ impl ResolveIdentity for SharedCredentialsProvider {
fn fallback_on_interrupt(&self) -> Option<Identity> {
ProvideCredentials::fallback_on_interrupt(self).map(|creds| creds.into())
}

fn cache_partition(&self) -> Option<IdentityCachePartition> {
Some(self.1)
}
}

#[cfg(test)]
mod tests {
use aws_smithy_runtime_api::client::identity::SharedIdentityResolver;

use super::*;

#[test]
fn reuses_cache_partition() {
let creds = Credentials::new("AKID", "SECRET", None, None, "test");
let provider = SharedCredentialsProvider::new(creds);
let partition = provider.cache_partition();
assert!(partition.is_some());

let identity_resolver = SharedIdentityResolver::new(provider);
let identity_partition = identity_resolver.cache_partition();

assert!(partition.unwrap() == identity_partition);
}
}
32 changes: 28 additions & 4 deletions aws/rust-runtime/aws-credential-types/src/provider/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use crate::{provider::error::TokenError, provider::future, Token};
use aws_smithy_runtime_api::client::{
identity::{IdentityFuture, ResolveIdentity},
identity::{IdentityCachePartition, IdentityFuture, ResolveIdentity},
runtime_components::RuntimeComponents,
};
use aws_smithy_runtime_api::impl_shared_conversions;
Expand Down Expand Up @@ -45,15 +45,15 @@ impl ProvideToken for Token {
///
/// Newtype wrapper around [`ProvideToken`] that implements `Clone` using an internal `Arc`.
#[derive(Clone, Debug)]
pub struct SharedTokenProvider(Arc<dyn ProvideToken>);
pub struct SharedTokenProvider(Arc<dyn ProvideToken>, IdentityCachePartition);

impl SharedTokenProvider {
/// Create a new [`SharedTokenProvider`] from [`ProvideToken`].
///
/// The given provider will be wrapped in an internal `Arc`. If your
/// provider is already in an `Arc`, use `SharedTokenProvider::from(provider)` instead.
pub fn new(provider: impl ProvideToken + 'static) -> Self {
Self(Arc::new(provider))
Self(Arc::new(provider), IdentityCachePartition::new())
}
}

Expand All @@ -65,7 +65,7 @@ impl AsRef<dyn ProvideToken> for SharedTokenProvider {

impl From<Arc<dyn ProvideToken>> for SharedTokenProvider {
fn from(provider: Arc<dyn ProvideToken>) -> Self {
SharedTokenProvider(provider)
SharedTokenProvider(provider, IdentityCachePartition::new())
}
}

Expand All @@ -86,6 +86,30 @@ impl ResolveIdentity for SharedTokenProvider {
) -> IdentityFuture<'a> {
IdentityFuture::new(async move { Ok(self.provide_token().await?.into()) })
}

fn cache_partition(&self) -> Option<IdentityCachePartition> {
Some(self.1)
}
}

impl_shared_conversions!(convert SharedTokenProvider from ProvideToken using SharedTokenProvider::new);

#[cfg(test)]
mod tests {
use aws_smithy_runtime_api::client::identity::SharedIdentityResolver;

use super::*;

#[test]
fn reuses_cache_partition() {
let token = Token::new("token", None);
let provider = SharedTokenProvider::new(token);
let partition = provider.cache_partition();
assert!(partition.is_some());

let identity_resolver = SharedIdentityResolver::new(provider);
let identity_partition = identity_resolver.cache_partition();

assert!(partition.unwrap() == identity_partition);
}
}
119 changes: 119 additions & 0 deletions aws/sdk/integration-tests/s3/tests/identity-cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;

use aws_config::{identity::IdentityCache, BehaviorVersion, Region};
use aws_credential_types::{
provider::{future::ProvideCredentials as ProvideCredentialsFuture, ProvideCredentials},
Credentials,
};
use aws_sdk_s3::Client;
use aws_smithy_runtime::client::http::test_util::infallible_client_fn;

// NOTE: These tests are _not_ S3 specific and would apply to any AWS SDK but due to the need to consume `aws-config`
// (which depends on relocated runtime crates) we can't make this an `awsSdkIntegrationTest(..)`.

#[tokio::test]
async fn test_identity_cache_reused_by_default() {
let http_client =
infallible_client_fn(|_req| http::Response::builder().status(200).body("OK!").unwrap());

let provider = TestCredProvider::new();
let cache = IdentityCache::lazy().build();
let config = aws_config::defaults(BehaviorVersion::latest())
.http_client(http_client)
.credentials_provider(provider.clone())
// TODO(rfc-43) - remove adding a cache when this is the new default
.identity_cache(cache)
.region(Region::new("us-west-2"))
.load()
.await;

let c1 = Client::new(&config);
let _ = c1.list_buckets().send().await;
assert_eq!(1, provider.invoke_count.load(Ordering::SeqCst));

let c2 = Client::new(&config);
let _ = c2.list_buckets().send().await;
assert_eq!(1, provider.invoke_count.load(Ordering::SeqCst));
}

// TODO(rfc-43) - add no_identity_cache() to ConfigLoader and re-enable test
// #[tokio::test]
// async fn test_identity_cache_explicit_unset() {
// let http_client =
// infallible_client_fn(|_req| http::Response::builder().status(200).body("OK!").unwrap());
//
// let provider = TestCredProvider::new();
//
// let config = aws_config::defaults(BehaviorVersion::latest())
// .no_identity_cache()
// .http_client(http_client)
// .credentials_provider(provider.clone())
// .region(Region::new("us-west-2"))
// .load()
// .await;
//
// let c1 = Client::new(&config);
// let _ = c1.list_buckets().send().await;
// assert_eq!(1, provider.invoke_count.load(Ordering::SeqCst));
//
// let c2 = Client::new(&config);
// let _ = c2.list_buckets().send().await;
// assert_eq!(2, provider.invoke_count.load(Ordering::SeqCst));
// }

#[tokio::test]
async fn test_identity_cache_ga_behavior_version() {
let http_client =
infallible_client_fn(|_req| http::Response::builder().status(200).body("OK!").unwrap());

let provider = TestCredProvider::new();

// no cache is defined in this behavior version by default so each client should get their own
let config = aws_config::defaults(BehaviorVersion::v2023_11_09())
.http_client(http_client)
.credentials_provider(provider.clone())
.region(Region::new("us-west-2"))
.load()
.await;

let c1 = Client::new(&config);
let _ = c1.list_buckets().send().await;
assert_eq!(1, provider.invoke_count.load(Ordering::SeqCst));

let c2 = Client::new(&config);
let _ = c2.list_buckets().send().await;
assert_eq!(2, provider.invoke_count.load(Ordering::SeqCst));
}

#[derive(Clone, Debug)]
struct TestCredProvider {
invoke_count: Arc<AtomicI32>,
creds: Credentials,
}

impl TestCredProvider {
fn new() -> Self {
TestCredProvider {
invoke_count: Arc::new(AtomicI32::default()),
creds: Credentials::for_tests(),
}
}
}

impl ProvideCredentials for TestCredProvider {
fn provide_credentials<'a>(
&'a self,
) -> aws_credential_types::provider::future::ProvideCredentials<'a>
where
Self: 'a,
{
self.invoke_count.fetch_add(1, Ordering::SeqCst);
ProvideCredentialsFuture::ready(Ok(self.creds.clone()))
}
}
20 changes: 19 additions & 1 deletion rust-runtime/aws-smithy-runtime-api/src/client/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ pub trait ResolveIdentity: Send + Sync + Debug {
fn cache_location(&self) -> IdentityCacheLocation {
IdentityCacheLocation::RuntimeComponents
}

/// Returns the identity cache partition associated with this identity resolver.
///
/// By default this returns `None` and cache partitioning is left up to `SharedIdentityResolver`.
fn cache_partition(&self) -> Option<IdentityCachePartition> {
None
}
}

/// Cache location for identity caching.
Expand Down Expand Up @@ -195,9 +202,16 @@ pub struct SharedIdentityResolver {
impl SharedIdentityResolver {
/// Creates a new [`SharedIdentityResolver`] from the given resolver.
pub fn new(resolver: impl ResolveIdentity + 'static) -> Self {
// NOTE: `IdentityCachePartition` is globally unique by construction so even
// custom implementations of `ResolveIdentity::cache_partition()` are unique.
let partition = match resolver.cache_partition() {
Some(p) => p,
None => IdentityCachePartition::new(),
};

Self {
inner: Arc::new(resolver),
cache_partition: IdentityCachePartition::new(),
cache_partition: partition,
}
}

Expand All @@ -222,6 +236,10 @@ impl ResolveIdentity for SharedIdentityResolver {
fn cache_location(&self) -> IdentityCacheLocation {
self.inner.cache_location()
}

fn cache_partition(&self) -> Option<IdentityCachePartition> {
Some(self.cache_partition())
}
}

impl_shared_conversions!(convert SharedIdentityResolver from ResolveIdentity using SharedIdentityResolver::new);
Expand Down

0 comments on commit 692cdfe

Please sign in to comment.