From 0a4d81f5ba7edc94d4d6809c6f80c9db91be41e5 Mon Sep 17 00:00:00 2001 From: Dan Burkert Date: Wed, 8 Jul 2020 18:54:34 -0700 Subject: [PATCH] Add Endpoint::connect_lazy method Adds a 'lazy' constructor which will not attempt to connect to the endpoint until first use. This is useful in situations where the connection is created when the remote service may temporarily not be responding, such as server startup. Fixes #167 --- tonic/src/transport/channel/endpoint.rs | 19 +++++++++++++++++++ tonic/src/transport/channel/mod.rs | 21 ++++++++++++++++++--- tonic/src/transport/service/connection.rs | 20 ++++++++++++++++---- tonic/src/transport/service/discover.rs | 2 +- tonic/src/transport/service/reconnect.rs | 10 ++-------- 5 files changed, 56 insertions(+), 16 deletions(-) diff --git a/tonic/src/transport/channel/endpoint.rs b/tonic/src/transport/channel/endpoint.rs index 3934c2ea7..3988b9153 100644 --- a/tonic/src/transport/channel/endpoint.rs +++ b/tonic/src/transport/channel/endpoint.rs @@ -214,6 +214,25 @@ impl Endpoint { Channel::connect(connector, self.clone()).await } + /// Create a channel from this config. + /// + /// The channel returned by this method does not attempt to connect to the endpoint until first + /// use. + pub fn connect_lazy(&self) -> Result { + let mut http = hyper::client::connect::HttpConnector::new(); + http.enforce_http(false); + http.set_nodelay(self.tcp_nodelay); + http.set_keepalive(self.tcp_keepalive); + + #[cfg(feature = "tls")] + let connector = service::connector(http, self.tls.clone()); + + #[cfg(not(feature = "tls"))] + let connector = service::connector(http); + + Channel::new(connector, self.clone()) + } + /// Connect with a custom connector. pub async fn connect_with_connector(&self, connector: C) -> Result where diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index 773f8f20f..c8e374e34 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -130,7 +130,7 @@ impl Channel { (Self::balance(list, DEFAULT_BUFFER_SIZE), tx) } - pub(crate) async fn connect(connector: C, endpoint: Endpoint) -> Result + pub(crate) fn new(connector: C, endpoint: Endpoint) -> Result where C: Service + Send + 'static, C::Error: Into + Send, @@ -140,9 +140,24 @@ impl Channel { let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE); let svc = Connection::new(connector, endpoint) - .await - .map_err(|e| super::Error::from_source(e))?; + .map_err(super::Error::from_source)?; + let svc = Buffer::new(Either::A(svc), buffer_size); + Ok(Channel { svc }) + } + + pub(crate) async fn connect(connector: C, endpoint: Endpoint) -> Result + where + C: Service + Send + 'static, + C::Error: Into + Send, + C::Future: Unpin + Send, + C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + { + let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE); + + let svc = Connection::connect(connector, endpoint) + .await + .map_err(super::Error::from_source)?; let svc = Buffer::new(Either::A(svc), buffer_size); Ok(Channel { svc }) diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index dcb086f09..7b6386b01 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -17,6 +17,7 @@ use tower::{ timeout::TimeoutLayer, util::BoxService, ServiceBuilder, + ServiceExt, }; use tower_load::Load; use tower_service::Service; @@ -29,7 +30,7 @@ pub(crate) struct Connection { } impl Connection { - pub(crate) async fn new(connector: C, endpoint: Endpoint) -> Result + pub(crate) fn new(connector: C, endpoint: Endpoint) -> Result where C: Service + Send + 'static, C::Error: Into + Send, @@ -60,9 +61,8 @@ impl Connection { .optional_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d))) .into_inner(); - let mut connector = HyperConnect::new(connector, settings); - let initial_conn = connector.call(endpoint.uri.clone()).await?; - let conn = Reconnect::new(initial_conn, connector, endpoint.uri.clone()); + let connector = HyperConnect::new(connector, settings); + let conn = Reconnect::new(connector, endpoint.uri.clone()); let inner = stack.layer(conn); @@ -70,6 +70,18 @@ impl Connection { inner: BoxService::new(inner), }) } + + pub(crate) async fn connect(connector: C, endpoint: Endpoint) -> Result + where + C: Service + Send + 'static, + C::Error: Into + Send, + C::Future: Unpin + Send, + C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + { + let mut connection = Self::new(connector, endpoint)?; + connection.ready_and().await?; + Ok(connection) + } } impl Service for Connection { diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index bed7f1310..55a98cdc1 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -64,7 +64,7 @@ impl Discover for DynamicServiceStream { #[cfg(not(feature = "tls"))] let connector = service::connector(http); - let fut = Connection::new(connector, endpoint); + let fut = Connection::connect(connector, endpoint); self.connecting = Some((k, Box::pin(fut))); continue; } diff --git a/tonic/src/transport/service/reconnect.rs b/tonic/src/transport/service/reconnect.rs index 68a19d79a..3e6e7015f 100644 --- a/tonic/src/transport/service/reconnect.rs +++ b/tonic/src/transport/service/reconnect.rs @@ -31,16 +31,10 @@ impl Reconnect where M: Service, { - pub(crate) fn new(initial_connection: S, mk_service: M, target: Target) -> Self - where - M: Service, - S: Service, - Error: From + From, - Target: Clone, - { + pub(crate) fn new(mk_service: M, target: Target) -> Self { Reconnect { mk_service, - state: State::Connected(initial_connection), + state: State::Idle, target, error: None, }