Skip to content

Commit

Permalink
Delete aws_smithy_http::ResolveEndpoint and point usages to service…
Browse files Browse the repository at this point in the history
…-specific trait (#3078)

## Motivation and Context
- Fixes #3043

As a follow up to #3072 this removes the old endpoint resolver
interfaces in favor of creating a per-service resolver trait.

This trait defines a `into_shared_resolver()` method which converts the
local trait into a global resolver that can be used with the
orchestrator.

## Description
<!--- Describe your changes in detail -->

## Testing
<!--- Please describe in detail how you tested your changes -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->

## Checklist
<!--- If a checkbox below is not applicable, then please DELETE it
rather than leaving it unchecked -->
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the
smithy-rs codegen or runtime crates
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the AWS
SDK, generated SDK code, or SDK runtime crates

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
  • Loading branch information
rcoh authored Oct 18, 2023
1 parent bcfc211 commit 12fa4d3
Show file tree
Hide file tree
Showing 20 changed files with 206 additions and 458 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,15 @@ message = "The `idempotency_provider` field has been removed from config as a pu
references = ["smithy-rs#3072"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "rcoh"

[[smithy-rs]]
message = "The `config::Builder::endpoint_resolver` method no longer accepts `&'static str`. Use `config::Builder::endpoint_url` instead."
references = ["smithy-rs#3078"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "rcoh"

[[smithy-rs]]
message = "**This change has [detailed upgrade guidance](https://github.com/awslabs/smithy-rs/discussions/3079).** <br><br>The endpoint interfaces from `aws-smithy-http` have been removed. Service-specific endpoint resolver traits have been added."
references = ["smithy-rs#3043", "smithy-rs#3078"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "rcoh"
59 changes: 34 additions & 25 deletions aws/rust-runtime/aws-inlineable/src/endpoint_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
use aws_smithy_async::future::BoxFuture;
use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
use aws_smithy_async::time::SharedTimeSource;
use aws_smithy_http::endpoint::{ResolveEndpoint, ResolveEndpointError};
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::endpoint::{
EndpointFuture, EndpointResolverParams, ResolveEndpoint,
};
use aws_smithy_types::endpoint::Endpoint;
use std::fmt::{Debug, Formatter};
use std::future::Future;
Expand All @@ -20,11 +23,9 @@ use tokio::sync::oneshot::{Receiver, Sender};
/// Endpoint reloader
#[must_use]
pub struct ReloadEndpoint {
loader: Box<
dyn Fn() -> BoxFuture<'static, (Endpoint, SystemTime), ResolveEndpointError> + Send + Sync,
>,
loader: Box<dyn Fn() -> BoxFuture<'static, (Endpoint, SystemTime), BoxError> + Send + Sync>,
endpoint: Arc<Mutex<Option<ExpiringEndpoint>>>,
error: Arc<Mutex<Option<ResolveEndpointError>>>,
error: Arc<Mutex<Option<BoxError>>>,
rx: Receiver<()>,
sleep: SharedAsyncSleep,
time: SharedTimeSource,
Expand Down Expand Up @@ -79,14 +80,14 @@ impl ReloadEndpoint {

#[derive(Debug, Clone)]
pub(crate) struct EndpointCache {
error: Arc<Mutex<Option<ResolveEndpointError>>>,
error: Arc<Mutex<Option<BoxError>>>,
endpoint: Arc<Mutex<Option<ExpiringEndpoint>>>,
// When the sender is dropped, this allows the reload loop to stop
_drop_guard: Arc<Sender<()>>,
}

impl<T> ResolveEndpoint<T> for EndpointCache {
fn resolve_endpoint(&self, _params: &T) -> aws_smithy_http::endpoint::Result {
impl ResolveEndpoint for EndpointCache {
fn resolve_endpoint<'a>(&'a self, _params: &'a EndpointResolverParams) -> EndpointFuture<'a> {
self.resolve_endpoint()
}
}
Expand All @@ -111,9 +112,9 @@ pub(crate) async fn create_cache<F>(
loader_fn: impl Fn() -> F + Send + Sync + 'static,
sleep: SharedAsyncSleep,
time: SharedTimeSource,
) -> Result<(EndpointCache, ReloadEndpoint), ResolveEndpointError>
) -> Result<(EndpointCache, ReloadEndpoint), BoxError>
where
F: Future<Output = Result<(Endpoint, SystemTime), ResolveEndpointError>> + Send + 'static,
F: Future<Output = Result<(Endpoint, SystemTime), BoxError>> + Send + 'static,
{
let error_holder = Arc::new(Mutex::new(None));
let endpoint_holder = Arc::new(Mutex::new(None));
Expand All @@ -135,25 +136,24 @@ where
reloader.reload_once().await;
// if we didn't successfully get an endpoint, bail out so the client knows
// configuration failed to work
cache.resolve_endpoint()?;
cache.resolve_endpoint().await?;
Ok((cache, reloader))
}

impl EndpointCache {
fn resolve_endpoint(&self) -> aws_smithy_http::endpoint::Result {
fn resolve_endpoint(&self) -> EndpointFuture<'_> {
tracing::trace!("resolving endpoint from endpoint discovery cache");
self.endpoint
let ep = self
.endpoint
.lock()
.unwrap()
.as_ref()
.map(|e| e.endpoint.clone())
.ok_or_else(|| {
self.error
.lock()
.unwrap()
.take()
.unwrap_or_else(|| ResolveEndpointError::message("no endpoint loaded"))
})
let error: Option<BoxError> = self.error.lock().unwrap().take();
error.unwrap_or_else(|| "Failed to resolve endpoint".into())
});
EndpointFuture::ready(ep)
}
}

Expand Down Expand Up @@ -215,21 +215,21 @@ mod test {
.await
.expect("returns an endpoint");
assert_eq!(
cache.resolve_endpoint().expect("ok").url(),
cache.resolve_endpoint().await.expect("ok").url(),
"http://foo.com/1"
);
// 120 second buffer
reloader
.reload_increment(expiry - Duration::from_secs(240))
.await;
assert_eq!(
cache.resolve_endpoint().expect("ok").url(),
cache.resolve_endpoint().await.expect("ok").url(),
"http://foo.com/1"
);

reloader.reload_increment(expiry).await;
assert_eq!(
cache.resolve_endpoint().expect("ok").url(),
cache.resolve_endpoint().await.expect("ok").url(),
"http://foo.com/2"
);
}
Expand Down Expand Up @@ -266,18 +266,27 @@ mod test {
gate.expect_sleep().await.duration(),
Duration::from_secs(60)
);
assert_eq!(cache.resolve_endpoint().unwrap().url(), "http://foo.com/1");
assert_eq!(
cache.resolve_endpoint().await.unwrap().url(),
"http://foo.com/1"
);
// t = 60

let sleep = gate.expect_sleep().await;
// we're still holding the drop guard, so we haven't expired yet.
assert_eq!(cache.resolve_endpoint().unwrap().url(), "http://foo.com/1");
assert_eq!(
cache.resolve_endpoint().await.unwrap().url(),
"http://foo.com/1"
);
assert_eq!(sleep.duration(), Duration::from_secs(60));
sleep.allow_progress();
// t = 120

let sleep = gate.expect_sleep().await;
assert_eq!(cache.resolve_endpoint().unwrap().url(), "http://foo.com/2");
assert_eq!(
cache.resolve_endpoint().await.unwrap().url(),
"http://foo.com/2"
);
sleep.allow_progress();

let sleep = gate.expect_sleep().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,9 @@ class TimestreamDecorator : ClientCodegenDecorator {
// helper function to resolve an endpoint given a base client
rustTemplate(
"""
async fn resolve_endpoint(client: &crate::Client) -> Result<(#{Endpoint}, #{SystemTime}), #{ResolveEndpointError}> {
async fn resolve_endpoint(client: &crate::Client) -> Result<(#{Endpoint}, #{SystemTime}), #{BoxError}> {
let describe_endpoints =
client.describe_endpoints().send().await.map_err(|e| {
#{ResolveEndpointError}::from_source("failed to call describe_endpoints", e)
})?;
client.describe_endpoints().send().await?;
let endpoint = describe_endpoints.endpoints().get(0).unwrap();
let expiry = client.config().time_source().expect("checked when ep discovery was enabled").now()
+ #{Duration}::from_secs(endpoint.cache_period_in_minutes() as u64 * 60);
Expand All @@ -75,7 +73,7 @@ class TimestreamDecorator : ClientCodegenDecorator {
/// Enable endpoint discovery for this client
///
/// This method MUST be called to construct a working client.
pub async fn with_endpoint_discovery_enabled(self) -> #{Result}<(Self, #{endpoint_discovery}::ReloadEndpoint), #{ResolveEndpointError}> {
pub async fn with_endpoint_discovery_enabled(self) -> #{Result}<(Self, #{endpoint_discovery}::ReloadEndpoint), #{BoxError}> {
let handle = self.handle.clone();
// The original client without endpoint discover gets moved into the endpoint discovery
Expand All @@ -92,22 +90,22 @@ class TimestreamDecorator : ClientCodegenDecorator {
.expect("endpoint discovery requires the client config to have a time source"),
).await?;
let client_with_discovery = crate::Client::from_conf(
handle.conf.to_builder()
.endpoint_resolver(#{SharedEndpointResolver}::new(resolver))
.build()
);
use #{IntoShared};
let mut conf = handle.conf.to_builder();
conf.set_endpoint_resolver(Some(resolver.into_shared()));
let client_with_discovery = crate::Client::from_conf(conf.build());
Ok((client_with_discovery, reloader))
}
}
""",
*RuntimeType.preludeScope,
"Arc" to RuntimeType.Arc,
"Duration" to RuntimeType.std.resolve("time::Duration"),
"SharedEndpointResolver" to RuntimeType.smithyHttp(codegenContext.runtimeConfig)
.resolve("endpoint::SharedEndpointResolver"),
"SystemTime" to RuntimeType.std.resolve("time::SystemTime"),
"endpoint_discovery" to endpointDiscovery.toType(),
"BoxError" to RuntimeType.boxError(codegenContext.runtimeConfig),
"IntoShared" to RuntimeType.smithyRuntimeApi(codegenContext.runtimeConfig).resolve("shared::IntoShared"),
*Types(codegenContext.runtimeConfig).toArray(),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class ClientModuleDocProvider(
ClientRustModule.Config.endpoint -> strDoc("Types needed to configure endpoint resolution.")
ClientRustModule.Config.retry -> strDoc("Retry configuration.")
ClientRustModule.Config.timeout -> strDoc("Timeout configuration.")
ClientRustModule.Config.interceptors -> strDoc("Types needed to implement [`Interceptor`](crate::config::Interceptor).")
ClientRustModule.Config.interceptors -> strDoc("Types needed to implement [`Intercept`](crate::config::Intercept).")
ClientRustModule.Error -> strDoc("Common errors and error handling utilities.")
ClientRustModule.Operation -> strDoc("All operations that this crate can perform.")
ClientRustModule.Meta -> strDoc("Information about this crate.")
Expand Down
Loading

0 comments on commit 12fa4d3

Please sign in to comment.