diff --git a/tower-async-http/CHANGELOG.md b/tower-async-http/CHANGELOG.md index addccda..9d4b0ab 100644 --- a/tower-async-http/CHANGELOG.md +++ b/tower-async-http/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 0.2.1 (December 19, 2023) + +- Update Axum example to make use of version `axum == 0.7.2`; + ## 0.2.0 (November 20, 2023) - Update to http-body 1.0; diff --git a/tower-async-http/Cargo.toml b/tower-async-http/Cargo.toml index 818f2ed..c2dba4f 100644 --- a/tower-async-http/Cargo.toml +++ b/tower-async-http/Cargo.toml @@ -4,7 +4,7 @@ description = """ Tower Async middleware and utilities for HTTP clients and servers. An "Async Trait" fork from the original Tower Library. """ -version = "0.2.0" +version = "0.2.1" authors = ["Glen De Cauwsemaecker "] edition = "2021" license = "MIT" @@ -42,7 +42,7 @@ tracing = { version = "0.1", default_features = false, optional = true } uuid = { version = "1.0", features = ["v4"], optional = true } [dev-dependencies] -axum = { git = "https://github.com/tokio-rs/axum", branch = "david/hyper-1.0-rc.x" } +axum = "0.7.2" brotli = "3" bytes = "1" clap = { version = "4.3", features = ["derive"] } @@ -55,7 +55,7 @@ serde_json = "1.0" sync_wrapper = "0.1" tokio = { version = "1", features = ["full"] } tower = { version = "0.4", features = ["util", "make", "timeout"] } -tower-async = { path = "../tower-async", features = ["full"] } +tower-async = { path = "../tower-async", features = ["full", "nightly"] } tower-async-bridge = { path = "../tower-async-bridge", features = ["full"] } tower-async-http = { path = ".", features = ["full"] } tower-async-hyper = { path = "../tower-async-hyper" } diff --git a/tower-async-http/examples/hyper-http-router/main.rs b/tower-async-http/examples/hyper-http-router/main.rs new file mode 100644 index 0000000..17ca57b --- /dev/null +++ b/tower-async-http/examples/hyper-http-router/main.rs @@ -0,0 +1,369 @@ +use std::{ + collections::HashMap, + convert::Infallible, + net::{Ipv4Addr, SocketAddr}, + sync::Arc, + time::Duration, +}; + +use bytes::Bytes; +use clap::Parser; +use http::{ + header::{self, CONTENT_TYPE}, + HeaderValue, Method, StatusCode, +}; +use http_body_util::Full; +use hyper::{Request, Response}; +use hyper_util::{ + rt::{TokioExecutor, TokioIo}, + server::conn::auto::Builder, +}; +use tokio::net::TcpListener; +use tower_async::{ + limit::policy::{ConcurrentPolicy, LimitReached}, + service_fn, + util::BoxCloneService, + BoxError, Service, ServiceBuilder, ServiceExt, +}; +use tower_async_http::{ + trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer}, + LatencyUnit, ServiceBuilderExt, +}; +use tower_async_hyper::{HyperBody, TowerHyperServiceExt}; + +/// Simple Hyper server with an HTTP API +#[derive(Debug, Parser)] +struct Config { + /// The port to listen on + #[clap(short = 'p', long, default_value = "8080")] + port: u16, +} + +pub type WebRequest = Request; +pub type WebResponse = Response>; + +pub trait IntoWebResponse { + fn into_web_response(self) -> WebResponse; +} + +impl IntoWebResponse for WebResponse { + fn into_web_response(self) -> WebResponse { + self + } +} + +impl IntoWebResponse for Infallible { + fn into_web_response(self) -> WebResponse { + panic!("BUG"); + } +} + +impl IntoWebResponse for StatusCode { + fn into_web_response(self) -> WebResponse { + Response::builder() + .status(self) + .body(Full::default()) + .expect("the web response to be build") + } +} + +impl IntoWebResponse for &'static str { + fn into_web_response(self) -> WebResponse { + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, HeaderValue::from_static("text/plain")) + .body(Full::from(self)) + .expect("the web response to be build") + } +} + +impl IntoWebResponse for String { + fn into_web_response(self) -> WebResponse { + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, HeaderValue::from_static("text/plain")) + .body(Full::from(self)) + .expect("the web response to be build") + } +} + +#[derive(Debug, Clone, Default)] +pub struct UriParams { + params: Option>, +} + +impl UriParams { + pub fn insert(&mut self, name: String, value: String) { + self.params + .get_or_insert_with(HashMap::new) + .insert(name, value); + } + + pub fn get(&self, name: impl AsRef) -> Option<&str> { + self.params + .as_ref() + .and_then(|params| params.get(name.as_ref())) + .map(String::as_str) + } +} + +#[derive(Debug)] +struct RouterEndpoint { + matcher: EndpointMatcher, + service: BoxCloneService, +} + +impl RouterEndpoint { + pub(crate) fn new( + method: Method, + path: &'static str, + service: BoxCloneService, + ) -> Self { + Self { + matcher: EndpointMatcher::new(method, path), + service, + } + } +} + +#[derive(Debug)] +enum PathFragment { + Literal(&'static str), + Param(&'static str), + // Note if you also want to support some kind of Glob (*) stuff, you can also do that, + // but let's keep it as simple as possible +} + +#[derive(Debug)] +struct EndpointMatcher { + fragments: Vec, + method: Method, +} + +impl EndpointMatcher { + pub fn new(method: Method, path: &'static str) -> Self { + let fragments: Vec = path + .split('/') + .filter_map(|s| { + if s.is_empty() { + return None; + } + if s.starts_with(':') { + Some(PathFragment::Param(s.trim_start_matches(':'))) + } else { + Some(PathFragment::Literal(s)) + } + }) + .collect(); + Self { fragments, method } + } + + pub fn match_request(&self, method: &Method, path: &str) -> Option { + if method != self.method { + return None; + } + + let fragments_iter = self + .fragments + .iter() + .map(Some) + .chain(std::iter::repeat(None)); + + let mut params = UriParams::default(); + + for (segment, fragment) in path.split('/').map(Some).zip(fragments_iter) { + match (segment, fragment) { + (Some(segment), Some(fragment)) => match fragment { + PathFragment::Literal(literal) => { + if !literal.eq_ignore_ascii_case(segment) { + return None; + } + } + PathFragment::Param(name) => { + params.insert(name.to_string(), segment.to_string()); + } + }, + (None, None) => { + break; + } + _ => { + return None; + } + } + } + + Some(params) + } +} + +#[derive(Debug, Default)] +pub struct Router { + endpoints: Vec, +} + +impl Router { + // NOTE: you would not change this function signature since my original PR, + // I Only changed this to make my example work + pub fn on(&mut self, method: Method, endpoint: &'static str, f: F) + where + F: Fn(WebRequest) -> Fut + Clone + Send + Sync + 'static, + Fut: std::future::Future> + Send + Sync + 'static, + E: IntoWebResponse + Send + Sync + 'static, + O: IntoWebResponse + Send + Sync + 'static, + { + let svc = service_fn(f) + .map_response(IntoWebResponse::into_web_response) + .map_err(IntoWebResponse::into_web_response); + let svc = BoxCloneService::new(svc); + self.endpoints + .push(RouterEndpoint::new(method, endpoint, svc)); + } + + pub fn into_service(self) -> RouterService { + RouterService { + endpoints: Arc::new(self.endpoints), + } + } +} + + +#[derive(Debug, Default)] +pub struct RouterService { + endpoints: Arc>, +} + +impl RouterService { + fn find_cloned_endpoint_service(&self, req: &mut WebRequest) -> Option> { + let method = req.method(); + let path = req.uri().path().trim_matches('/'); + + for endpoint in self.endpoints.iter() { + if let Some(params) = endpoint.matcher.match_request(method, path.as_ref()) { + req.extensions_mut().insert(params); + return Some(endpoint.service.clone()); + } + } + None + } +} + +impl Clone for RouterService { + fn clone(&self) -> Self { + Self { + endpoints: self.endpoints.clone(), + } + } +} + +impl Service for RouterService { + type Response = WebResponse; + type Error = Infallible; + + fn call( + &self, + mut req: WebRequest, + ) -> impl std::future::Future> + { + let maybe_service = self.find_cloned_endpoint_service(&mut req); + async move { + match maybe_service { + Some(service) => { + return match service.call(req).await { + Ok(res) => Ok(res), + Err(err) => Ok(err.into_web_response()), + } + } + None => Ok(StatusCode::NOT_FOUND.into_web_response()) + } + } + } +} + +async fn render_page_fast(_request: WebRequest) -> Result { + Ok(render_page("This was a fast response.")) +} + +async fn render_page_slow(_request: WebRequest) -> Result { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + Ok(render_page("This was a slow response.")) +} + +fn render_page(msg: &str) -> String { + format!( + r##" + + + + + Hyper Http Server Example + + +

Hello!

+

{msg}

+ + +"## + ) +} + +#[tokio::main] +async fn main() { + // Setup tracing + tracing_subscriber::fmt::init(); + + // Parse command line arguments + let config = Config::parse(); + + let sensitive_headers: Arc<[_]> = vec![header::AUTHORIZATION, header::COOKIE].into(); + + let mut router = Router::default(); + router.on(Method::GET, "/fast", render_page_fast); + router.on(Method::GET, "/slow", render_page_slow); + + let web_service = ServiceBuilder::new() + .map_request_body(HyperBody::from) + .compression() + .sensitive_request_headers(sensitive_headers.clone()) + .layer( + TraceLayer::new_for_http() + .on_body_chunk(|chunk: &Bytes, latency: Duration, _: &tracing::Span| { + tracing::trace!(size_bytes = chunk.len(), latency = ?latency, "sending body chunk") + }) + .make_span_with(DefaultMakeSpan::new().include_headers(true)) + .on_response(DefaultOnResponse::new().include_headers(true).latency_unit(LatencyUnit::Micros)), + ) + .sensitive_response_headers(sensitive_headers) + .timeout(Duration::from_secs(10)) + .map_result(map_limit_result) + .limit(ConcurrentPolicy::new(1)) + .service(router.into_service()) + .into_hyper_service(); + + let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, config.port)); + tracing::info!("Listening on {}", addr); + + let listener = TcpListener::bind(addr).await.unwrap(); + + loop { + let (stream, _) = listener.accept().await.unwrap(); + let service = web_service.clone(); + tokio::spawn(async move { + let stream = TokioIo::new(stream); + let result = Builder::new(TokioExecutor::new()) + .serve_connection(stream, service) + .await; + if let Err(e) = result { + eprintln!("server connection error: {}", e); + } + }); + } +} + +fn map_limit_result(result: Result) -> Result { + if let Err(err) = &result { + if err.is::() { + return Ok(StatusCode::TOO_MANY_REQUESTS.into_web_response()); + } + } + result +} diff --git a/tower-async/CHANGELOG.md b/tower-async/CHANGELOG.md index b28a557..687fad7 100644 --- a/tower-async/CHANGELOG.md +++ b/tower-async/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 0.2.1 (December 19, 2023) + +- Reintroduce boxed support for Services (`util::{BoxService, BoxCloneService}`): + - `ServiceBuilder::{boxed, boxed_clone}`; + - `ServiceExt::{boxed, boxed_clone}`; + - Achieved using erase concept as defined in . + ## 0.2.0 (November 20, 2023) - Adapt to new `tower_async::Service` contract: diff --git a/tower-async/Cargo.toml b/tower-async/Cargo.toml index 876b603..e8f9713 100644 --- a/tower-async/Cargo.toml +++ b/tower-async/Cargo.toml @@ -6,7 +6,7 @@ name = "tower-async" # - README.md # - Update CHANGELOG.md. # - Create "vX.X.X" git tag. -version = "0.2.0" +version = "0.2.1" authors = ["Glen De Cauwsemaecker "] license = "MIT" readme = "README.md" @@ -43,6 +43,8 @@ timeout = ["tokio/time", "tokio/macros"] util = ["__common", "futures-util"] util-tokio = ["util", "tokio/time"] +nightly = [] + [dependencies] tower-async-layer = { version = "0.2", path = "../tower-async-layer" } tower-async-service = { version = "0.2", path = "../tower-async-service" } diff --git a/tower-async/src/builder/mod.rs b/tower-async/src/builder/mod.rs index 7d465f1..4a48e35 100644 --- a/tower-async/src/builder/mod.rs +++ b/tower-async/src/builder/mod.rs @@ -522,6 +522,127 @@ impl ServiceBuilder { } } +#[cfg(feature = "nightly")] +impl ServiceBuilder { + /// This wraps the inner service with the [`Layer`] returned by [`BoxService::layer()`]. + /// + /// See that method for more details. + /// + /// # Example + /// + /// ``` + /// use tower_async::{Service, ServiceBuilder, BoxError, util::BoxService}; + /// use std::time::Duration; + /// # + /// # struct Request; + /// # struct Response; + /// # impl Response { + /// # fn new() -> Self { Self } + /// # } + /// + /// let service: BoxService = ServiceBuilder::new() + /// .boxed() + /// .timeout(Duration::from_secs(10)) + /// .service_fn(|req: Request| async { + /// Ok::<_, BoxError>(Response::new()) + /// }); + /// # let service = assert_service(service); + /// # fn assert_service(svc: S) -> S + /// # where S: Service { svc } + /// ``` + /// + /// [`BoxService::layer()`]: crate::util::BoxService::layer() + #[cfg(all(feature = "util", feature = "nightly"))] + pub fn boxed( + self, + ) -> ServiceBuilder< + Stack< + tower_async_layer::LayerFn< + fn( + L::Service, + ) -> crate::util::BoxService< + R, + >::Response, + >::Error, + >, + >, + L, + >, + > + where + L: Layer, + L::Service: Service + Send + Sync + 'static, + >::Response: Send + Sync + 'static, + >::Error: Send + Sync + 'static, + R: Send + 'static, + { + self.layer(crate::util::BoxService::layer()) + } + + /// This wraps the inner service with the [`Layer`] returned by [`BoxCloneService::layer()`]. + /// + /// This is similar to the [`boxed`] method, but it requires that `Self` implement + /// [`Clone`], and the returned boxed service implements [`Clone`]. + /// + /// See [`BoxCloneService`] for more details. + /// + /// # Example + /// + /// ``` + /// use tower_async::{Service, ServiceBuilder, BoxError, util::BoxCloneService}; + /// use std::time::Duration; + /// # + /// # struct Request; + /// # struct Response; + /// # impl Response { + /// # fn new() -> Self { Self } + /// # } + /// + /// let service: BoxCloneService = ServiceBuilder::new() + /// .boxed_clone() + /// .timeout(Duration::from_secs(10)) + /// .service_fn(|req: Request| async { + /// Ok::<_, BoxError>(Response::new()) + /// }); + /// # let service = assert_service(service); + /// + /// // The boxed service can still be cloned. + /// service.clone(); + /// # fn assert_service(svc: S) -> S + /// # where S: Service { svc } + /// ``` + /// + /// [`BoxCloneService::layer()`]: crate::util::BoxCloneService::layer() + /// [`BoxCloneService`]: crate::util::BoxCloneService + /// [`boxed`]: Self::boxed + #[cfg(all(feature = "util", feature = "nightly"))] + pub fn boxed_clone( + self, + ) -> ServiceBuilder< + Stack< + tower_async_layer::LayerFn< + fn( + L::Service, + ) -> crate::util::BoxCloneService< + R, + >::Response, + >::Error, + >, + >, + L, + >, + > + where + L: Layer, + L::Service: Service + Clone + Send + Sync + 'static, + >::Response: Send + Sync + 'static, + >::Error: Send + Sync + 'static, + R: Send + 'static, + { + self.layer(crate::util::BoxCloneService::layer()) + } +} + impl fmt::Debug for ServiceBuilder { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("ServiceBuilder").field(&self.layer).finish() diff --git a/tower-async/src/lib.rs b/tower-async/src/lib.rs index 04605d2..f0fc38b 100644 --- a/tower-async/src/lib.rs +++ b/tower-async/src/lib.rs @@ -8,6 +8,10 @@ #![allow(elided_lifetimes_in_paths, clippy::type_complexity)] #![cfg_attr(test, allow(clippy::float_cmp))] #![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))] + +#![cfg_attr(feature = "nightly", allow(incomplete_features))] +#![cfg_attr(feature = "nightly", feature(associated_type_bounds, return_type_notation))] + // `rustdoc::broken_intra_doc_links` is checked on CI //! `async fn(Request) -> Result` @@ -245,6 +249,10 @@ pub mod layer; #[doc(inline)] pub use self::util::{service_fn, ServiceExt}; +#[cfg(all(feature = "util", feature = "nightly"))] +#[doc(inline)] +pub use self::util::NightlyServiceExt; + #[doc(inline)] pub use crate::builder::ServiceBuilder; #[cfg(feature = "make")] diff --git a/tower-async/src/util/boxed/erase.rs b/tower-async/src/util/boxed/erase.rs new file mode 100644 index 0000000..1a17f8d --- /dev/null +++ b/tower-async/src/util/boxed/erase.rs @@ -0,0 +1,32 @@ +use std::future::Future; +use std::pin::Pin; + +use tower_async_service::Service; + +pub trait ServiceDyn { + type Response; + type Error; + + fn call( + &self, + req: Request, + ) -> Pin> + Send + Sync + '_>>; +} + +impl ServiceDyn for T +where + T: Service + Send + Sync + 'static, + T::Response: Send + Sync + 'static, + T::Error: Send + Sync + 'static, + Request: Send + 'static, +{ + type Response = T::Response; + type Error = T::Error; + + fn call( + &self, + req: Request, + ) -> Pin> + Send + Sync + '_>> { + Box::pin(>::call(self, req)) + } +} diff --git a/tower-async/src/util/boxed/layer.rs b/tower-async/src/util/boxed/layer.rs new file mode 100644 index 0000000..039208d --- /dev/null +++ b/tower-async/src/util/boxed/layer.rs @@ -0,0 +1,98 @@ +use crate::util::BoxService; +use std::{fmt, sync::Arc}; +use tower_async_layer::{layer_fn, Layer}; +use tower_async_service::Service; + +/// A boxed [`Layer`] trait object. +/// +/// [`BoxLayer`] turns a layer into a trait object, allowing both the [`Layer`] itself +/// and the output [`Service`] to be dynamic, while having consistent types. +/// +/// This [`Layer`] produces [`BoxService`] instances erasing the type of the +/// [`Service`] produced by the wrapped [`Layer`]. +/// +/// # Example +/// +/// `BoxLayer` can, for example, be useful to create layers dynamically that otherwise wouldn't have +/// the same types. In this example, we include a [`Timeout`] layer +/// only if an environment variable is set. We can use `BoxLayer` +/// to return a consistent type regardless of runtime configuration: +/// +/// ``` +/// use std::time::Duration; +/// use tower_async::{Service, ServiceBuilder, BoxError, util::BoxLayer}; +/// +/// fn common_layer() -> BoxLayer +/// where +/// S: Service + Send + 'static, +/// S::Error: Into + 'static, +/// T: 'static, +/// { +/// let builder = ServiceBuilder::new(); +/// +/// if std::env::var("SET_TIMEOUT").is_ok() { +/// let layer = builder +/// .timeout(Duration::from_secs(30)) +/// .into_inner(); +/// +/// BoxLayer::new(layer) +/// } else { +/// let layer = builder +/// .map_err(Into::into) +/// .into_inner(); +/// +/// BoxLayer::new(layer) +/// } +/// } +/// ``` +/// +/// [`Layer`]: tower_async_layer::Layer +/// [`Service`]: tower_async_service::Service +/// [`BoxService`]: super::BoxService +/// [`Timeout`]: crate::timeout +pub struct BoxLayer { + boxed: Arc> + Send + Sync + 'static>, +} + +impl BoxLayer { + /// Create a new [`BoxLayer`]. + pub fn new(inner_layer: L) -> Self + where + L: Layer + Send + Sync + 'static, + L::Service: Service + Send + Sync + 'static, + U: Send + Sync + 'static, + E: Send + Sync + 'static, + T: Send + 'static, + { + let layer = layer_fn(move |inner: In| { + let out = inner_layer.layer(inner); + BoxService::new(out) + }); + + Self { + boxed: Arc::new(layer), + } + } +} + +impl Layer for BoxLayer { + type Service = BoxService; + + fn layer(&self, inner: In) -> Self::Service { + self.boxed.layer(inner) + } +} + +impl Clone for BoxLayer { + fn clone(&self) -> Self { + Self { + boxed: Arc::clone(&self.boxed), + } + } +} + +impl fmt::Debug for BoxLayer { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BoxLayer").finish() + } +} diff --git a/tower-async/src/util/boxed/layer_clone.rs b/tower-async/src/util/boxed/layer_clone.rs new file mode 100644 index 0000000..c60d00b --- /dev/null +++ b/tower-async/src/util/boxed/layer_clone.rs @@ -0,0 +1,129 @@ +use crate::util::BoxCloneService; +use std::{fmt, sync::Arc}; +use tower_async_layer::{layer_fn, Layer}; +use tower_async_service::Service; + +/// A [`Clone`] + [`Send`] boxed [`Layer`]. +/// +/// [`BoxCloneServiceLayer`] turns a layer into a trait object, allowing both the [`Layer`] itself +/// and the output [`Service`] to be dynamic, while having consistent types. +/// +/// This [`Layer`] produces [`BoxCloneService`] instances erasing the type of the +/// [`Service`] produced by the wrapped [`Layer`]. +/// +/// This is similar to [`BoxLayer`](super::BoxLayer) except the layer and resulting +/// service implements [`Clone`]. +/// +/// # Example +/// +/// `BoxCloneServiceLayer` can, for example, be useful to create layers dynamically that otherwise wouldn't have +/// the same types, when the underlying service must be clone (for example, when building a MakeService) +/// In this example, we include a [`Timeout`] layer only if an environment variable is set. We can use +/// `BoxCloneService` to return a consistent type regardless of runtime configuration: +/// +/// ``` +/// use std::time::Duration; +/// use tower_async::{Service, ServiceBuilder, BoxError}; +/// use tower_async::util::{BoxCloneServiceLayer, BoxCloneService}; +/// +/// # +/// # struct Request; +/// # struct Response; +/// # impl Response { +/// # fn new() -> Self { Self } +/// # } +/// +/// fn common_layer() -> BoxCloneServiceLayer +/// where +/// S: Service + Clone + Send + 'static, +/// S::Error: Into + 'static, +/// T: 'static, +/// { +/// let builder = ServiceBuilder::new(); +/// +/// if std::env::var("SET_TIMEOUT").is_ok() { +/// let layer = builder +/// .timeout(Duration::from_secs(30)) +/// .into_inner(); +/// +/// BoxCloneServiceLayer::new(layer) +/// } else { +/// let layer = builder +/// .map_err(Into::into) +/// .into_inner(); +/// +/// BoxCloneServiceLayer::new(layer) +/// } +/// } +/// +/// // We can clone the layer (this is true of BoxLayer as well) +/// let boxed_clone_layer = common_layer(); +/// +/// let cloned_layer = boxed_clone_layer.clone(); +/// +/// // Using the `BoxCloneServiceLayer` we can create a `BoxCloneService` +/// let service: BoxCloneService = ServiceBuilder::new().layer(boxed_clone_layer) +/// .service_fn(|req: Request| async { +/// Ok::<_, BoxError>(Response::new()) +/// }); +/// +/// # let service = assert_service(service); +/// +/// // And we can still clone the service +/// let cloned_service = service.clone(); +/// # +/// # fn assert_service(svc: S) -> S +/// # where S: Service { svc } +/// +/// ``` +/// +/// [`Layer`]: tower_async_layer::Layer +/// [`Service`]: tower_async_service::Service +/// [`BoxService`]: super::BoxService +/// [`Timeout`]: crate::timeout +pub struct BoxCloneServiceLayer { + boxed: Arc> + Send + 'static>, +} + +impl BoxCloneServiceLayer { + /// Create a new [`BoxCloneServiceLayer`]. + pub fn new(inner_layer: L) -> Self + where + L: Layer + Send + 'static, + L::Service: Service + Send + Sync + Clone + 'static, + U: Send + Sync + 'static, + E: Send + Sync + 'static, + T: Send + 'static, + { + let layer = layer_fn(move |inner: In| { + let out = inner_layer.layer(inner); + BoxCloneService::new(out) + }); + + Self { + boxed: Arc::new(layer), + } + } +} + +impl Layer for BoxCloneServiceLayer { + type Service = BoxCloneService; + + fn layer(&self, inner: In) -> Self::Service { + self.boxed.layer(inner) + } +} + +impl Clone for BoxCloneServiceLayer { + fn clone(&self) -> Self { + Self { + boxed: Arc::clone(&self.boxed), + } + } +} + +impl fmt::Debug for BoxCloneServiceLayer { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BoxCloneServiceLayer").finish() + } +} diff --git a/tower-async/src/util/boxed/mod.rs b/tower-async/src/util/boxed/mod.rs new file mode 100644 index 0000000..bd8ef94 --- /dev/null +++ b/tower-async/src/util/boxed/mod.rs @@ -0,0 +1,7 @@ +pub(crate) mod erase; +mod layer; +mod layer_clone; +mod sync; + +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::{layer::BoxLayer, layer_clone::BoxCloneServiceLayer, sync::BoxService}; diff --git a/tower-async/src/util/boxed/sync.rs b/tower-async/src/util/boxed/sync.rs new file mode 100644 index 0000000..f77a15d --- /dev/null +++ b/tower-async/src/util/boxed/sync.rs @@ -0,0 +1,90 @@ +use super::erase::ServiceDyn; +use tower_async_layer::{layer_fn, LayerFn}; +use tower_async_service::Service; + +use std::fmt; +use std::future::Future; + +/// A boxed `Service + Send` trait object. +/// +/// [`BoxService`] turns a service into a trait object, allowing the response +/// future type to be dynamic. This type requires both the service and the +/// response future to be [`Send`]. +/// +/// If you need a boxed [`Service`] that implements [`Clone`] consider using +/// [`BoxCloneService`](crate::util::BoxCloneService). +/// +/// Dynamically dispatched [`Service`] objects allow for erasing the underlying +/// [`Service`] type and using the `Service` instances as opaque handles. This can +/// be useful when the service instance cannot be explicitly named for whatever +/// reason. +/// +/// # Examples +/// +/// ``` +/// use futures_util::future::ready; +/// # use tower_async_service::Service; +/// # use tower_async::util::{BoxService, service_fn}; +/// // Respond to requests using a closure, but closures cannot be named... +/// # pub fn main() { +/// let svc = service_fn(|mut request: String| async move { +/// request.push_str(" response"); +/// Ok(request) +/// }); +/// +/// let service: BoxService = BoxService::new(svc); +/// # drop(service); +/// } +/// ``` +/// +/// [`Service`]: crate::Service +/// [`Rc`]: std::rc::Rc +pub struct BoxService { + inner: Box + Send + Sync + 'static>, +} + +impl BoxService { + #[allow(missing_docs)] + pub fn new(inner: S) -> Self + where + S: ServiceDyn + Send + Sync + 'static, + { + // rust can't infer the type + let inner: Box + Send + Sync + 'static> = Box::new(inner); + BoxService { inner } + } + + /// Returns a [`Layer`] for wrapping a [`Service`] in a [`BoxService`] + /// middleware. + /// + /// [`Layer`]: crate::Layer + pub fn layer() -> LayerFn Self> + where + S: Service + Send + Sync + 'static, + U: Send + Sync + 'static, + E: Send + Sync + 'static, + T: Send + 'static, + { + layer_fn(Self::new) + } +} + +impl Service for BoxService + where + U: Send + Sync + 'static, + E: Send + Sync + 'static, + T: Send + 'static, +{ + type Response = U; + type Error = E; + + fn call(&self, request: T) -> impl Future> { + self.inner.call(request) + } +} + +impl fmt::Debug for BoxService { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BoxService").finish() + } +} diff --git a/tower-async/src/util/boxed_clone.rs b/tower-async/src/util/boxed_clone.rs new file mode 100644 index 0000000..87d2d78 --- /dev/null +++ b/tower-async/src/util/boxed_clone.rs @@ -0,0 +1,117 @@ +use std::fmt; +use std::future::Future; +use tower_async_layer::{layer_fn, LayerFn}; +use tower_async_service::Service; + +use super::boxed::erase::ServiceDyn; + +/// A [`Clone`] + [`Send`] boxed [`Service`]. +/// +/// [`BoxCloneService`] turns a service into a trait object, allowing the +/// response future type to be dynamic, and allowing the service to be cloned. +/// +/// This is similar to [`BoxService`](super::BoxService) except the resulting +/// service implements [`Clone`]. +/// +/// # Example +/// +/// ``` +/// use tower_async::{Service, ServiceBuilder, BoxError, util::BoxCloneService}; +/// use std::time::Duration; +/// # +/// # struct Request; +/// # struct Response; +/// # impl Response { +/// # fn new() -> Self { Self } +/// # } +/// +/// // This service has a complex type that is hard to name +/// let service = ServiceBuilder::new() +/// .map_request(|req| { +/// println!("received request"); +/// req +/// }) +/// .map_response(|res| { +/// println!("response produced"); +/// res +/// }) +/// .timeout(Duration::from_secs(10)) +/// .service_fn(|req: Request| async { +/// Ok::<_, BoxError>(Response::new()) +/// }); +/// # let service = assert_service(service); +/// +/// // `BoxCloneService` will erase the type so it's nameable +/// let service: BoxCloneService = BoxCloneService::new(service); +/// # let service = assert_service(service); +/// +/// // And we can still clone the service +/// let cloned_service = service.clone(); +/// # +/// # fn assert_service(svc: S) -> S +/// # where S: Service { svc } +/// ``` +pub struct BoxCloneService(Box + Send>); + +impl BoxCloneService { + /// Create a new `BoxCloneService`. + pub fn new(inner: S) -> Self + where + S: ServiceDyn + Clone + Send + 'static, + { + BoxCloneService(Box::new(inner)) + } + + /// Returns a [`Layer`] for wrapping a [`Service`] in a [`BoxCloneService`] + /// middleware. + /// + /// [`Layer`]: crate::Layer + pub fn layer() -> LayerFn Self> + where + S: Service + Clone + Send + Sync + 'static, + U: Send + Sync + 'static, + E: Send + Sync + 'static, + T: Send + 'static, + { + layer_fn(Self::new) + } +} + +impl Service for BoxCloneService { + type Response = U; + type Error = E; + + #[inline] + fn call(&self, request: T) -> impl Future> { + self.0.call(request) + } +} + +impl Clone for BoxCloneService { + fn clone(&self) -> Self { + Self(self.0.clone_box()) + } +} + +trait CloneService: ServiceDyn { + fn clone_box( + &self, + ) -> Box + Send>; +} + +impl CloneService for T +where + T: ServiceDyn + Send + Clone + 'static, +{ + fn clone_box( + &self, + ) -> Box + Send> { + Box::new(self.clone()) + } +} + +impl fmt::Debug for BoxCloneService { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BoxCloneService").finish() + } +} diff --git a/tower-async/src/util/mod.rs b/tower-async/src/util/mod.rs index ff155c3..9093813 100644 --- a/tower-async/src/util/mod.rs +++ b/tower-async/src/util/mod.rs @@ -3,6 +3,11 @@ mod and_then; mod either; +#[cfg(feature = "nightly")] +mod boxed; +#[cfg(feature = "nightly")] +mod boxed_clone; + mod map_err; mod map_request; mod map_response; @@ -25,6 +30,12 @@ pub use self::{ then::{Then, ThenLayer}, }; +#[cfg(feature = "nightly")] +pub use self::{ + boxed::{BoxCloneServiceLayer, BoxLayer, BoxService}, + boxed_clone::BoxCloneService, +}; + use std::future::Future; use crate::layer::util::Identity; @@ -708,6 +719,115 @@ pub trait ServiceExt: tower_async_service::Service { } } +/// An extension trait for `Service`s that provides a variety of convenient +/// adapters, available in nightly edition only +#[cfg(feature = "nightly")] +pub trait NightlyServiceExt: + tower_async_service::Service +{ + /// Convert the service into a [`Service`] + [`Send`] trait object. + /// + /// See [`BoxService`] for more details. + /// + /// If `Self` implements the [`Clone`] trait, the [`boxed_clone`] method + /// can be used instead, to produce a boxed service which will also + /// implement [`Clone`]. + /// + /// # Example + /// + /// ``` + /// use tower_async::{Service, ServiceExt, BoxError, service_fn, util::BoxService}; + /// # + /// # struct Request; + /// # struct Response; + /// # impl Response { + /// # fn new() -> Self { Self } + /// # } + /// + /// let service = service_fn(|req: Request| async { + /// Ok::<_, BoxError>(Response::new()) + /// }); + /// + /// let service: BoxService = service + /// .map_request(|req| { + /// println!("received request"); + /// req + /// }) + /// .map_response(|res| { + /// println!("response produced"); + /// res + /// }) + /// .boxed(); + /// # let service = assert_service(service); + /// # fn assert_service(svc: S) -> S + /// # where S: Service { svc } + /// ``` + /// + /// [`Service`]: crate::Service + /// [`boxed_clone`]: Self::boxed_clone + fn boxed(self) -> BoxService + where + Self: Sized + Send + Sync + 'static, + Self::Response: Send + Sync + 'static, + Self::Error: Send + Sync + 'static, + Request: Send + 'static, + { + BoxService::new(self) + } + + #[cfg(feature = "nightly")] + /// Convert the service into a [`Service`] + [`Clone`] + [`Send`] trait object. + /// + /// This is similar to the [`boxed`] method, but it requires that `Self` implement + /// [`Clone`], and the returned boxed service implements [`Clone`]. + /// See [`BoxCloneService`] for more details. + /// + /// # Example + /// + /// ``` + /// use tower_async::{Service, ServiceExt, BoxError, service_fn, util::BoxCloneService}; + /// # + /// # struct Request; + /// # struct Response; + /// # impl Response { + /// # fn new() -> Self { Self } + /// # } + /// + /// let service = service_fn(|req: Request| async { + /// Ok::<_, BoxError>(Response::new()) + /// }); + /// + /// let service: BoxCloneService = service + /// .map_request(|req| { + /// println!("received request"); + /// req + /// }) + /// .map_response(|res| { + /// println!("response produced"); + /// res + /// }) + /// .boxed_clone(); + /// + /// // The boxed service can still be cloned. + /// service.clone(); + /// # let service = assert_service(service); + /// # fn assert_service(svc: S) -> S + /// # where S: Service { svc } + /// ``` + /// + /// [`Service`]: crate::Service + /// [`boxed`]: Self::boxed + fn boxed_clone(self) -> BoxCloneService + where + Self: Clone + Sized + Send + Sync + 'static, + Self::Response: Send + Sync + 'static, + Self::Error: Send + Sync + 'static, + Request: Send + 'static, + { + BoxCloneService::new(self) + } +} + impl ServiceExt for T where T: tower_async_service::Service {} /// Convert an `Option` into a [`Layer`].