Skip to content

Commit

Permalink
Add Endpoint::connect_lazy method
Browse files Browse the repository at this point in the history
Adds a 'lazy' constructor which will not attempt to connect to the
endpoint until first use. This is useful in situations where the
connection is created when the remote service may temporarily not be
responding, such as server startup.

Fixes hyperium#167
  • Loading branch information
danburkert committed Jul 9, 2020
1 parent ea7fe66 commit 0a4d81f
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 16 deletions.
19 changes: 19 additions & 0 deletions tonic/src/transport/channel/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,25 @@ impl Endpoint {
Channel::connect(connector, self.clone()).await
}

/// Create a channel from this config.
///
/// The channel returned by this method does not attempt to connect to the endpoint until first
/// use.
pub fn connect_lazy(&self) -> Result<Channel, Error> {
let mut http = hyper::client::connect::HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(self.tcp_nodelay);
http.set_keepalive(self.tcp_keepalive);

#[cfg(feature = "tls")]
let connector = service::connector(http, self.tls.clone());

#[cfg(not(feature = "tls"))]
let connector = service::connector(http);

Channel::new(connector, self.clone())
}

/// Connect with a custom connector.
pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
where
Expand Down
21 changes: 18 additions & 3 deletions tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Channel {
(Self::balance(list, DEFAULT_BUFFER_SIZE), tx)
}

pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
where
C: Service<Uri> + Send + 'static,
C::Error: Into<crate::Error> + Send,
Expand All @@ -140,9 +140,24 @@ impl Channel {
let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE);

let svc = Connection::new(connector, endpoint)
.await
.map_err(|e| super::Error::from_source(e))?;
.map_err(super::Error::from_source)?;
let svc = Buffer::new(Either::A(svc), buffer_size);

Ok(Channel { svc })
}

pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
where
C: Service<Uri> + Send + 'static,
C::Error: Into<crate::Error> + Send,
C::Future: Unpin + Send,
C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
{
let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE);

let svc = Connection::connect(connector, endpoint)
.await
.map_err(super::Error::from_source)?;
let svc = Buffer::new(Either::A(svc), buffer_size);

Ok(Channel { svc })
Expand Down
20 changes: 16 additions & 4 deletions tonic/src/transport/service/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tower::{
timeout::TimeoutLayer,
util::BoxService,
ServiceBuilder,
ServiceExt,
};
use tower_load::Load;
use tower_service::Service;
Expand All @@ -29,7 +30,7 @@ pub(crate) struct Connection {
}

impl Connection {
pub(crate) async fn new<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error>
pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error>
where
C: Service<Uri> + Send + 'static,
C::Error: Into<crate::Error> + Send,
Expand Down Expand Up @@ -60,16 +61,27 @@ impl Connection {
.optional_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
.into_inner();

let mut connector = HyperConnect::new(connector, settings);
let initial_conn = connector.call(endpoint.uri.clone()).await?;
let conn = Reconnect::new(initial_conn, connector, endpoint.uri.clone());
let connector = HyperConnect::new(connector, settings);
let conn = Reconnect::new(connector, endpoint.uri.clone());

let inner = stack.layer(conn);

Ok(Self {
inner: BoxService::new(inner),
})
}

pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error>
where
C: Service<Uri> + Send + 'static,
C::Error: Into<crate::Error> + Send,
C::Future: Unpin + Send,
C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
{
let mut connection = Self::new(connector, endpoint)?;
connection.ready_and().await?;
Ok(connection)
}
}

impl Service<Request> for Connection {
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/service/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl<K: Hash + Eq + Clone> Discover for DynamicServiceStream<K> {

#[cfg(not(feature = "tls"))]
let connector = service::connector(http);
let fut = Connection::new(connector, endpoint);
let fut = Connection::connect(connector, endpoint);
self.connecting = Some((k, Box::pin(fut)));
continue;
}
Expand Down
10 changes: 2 additions & 8 deletions tonic/src/transport/service/reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,10 @@ impl<M, Target> Reconnect<M, Target>
where
M: Service<Target>,
{
pub(crate) fn new<S, Request>(initial_connection: S, mk_service: M, target: Target) -> Self
where
M: Service<Target, Response = S>,
S: Service<Request>,
Error: From<M::Error> + From<S::Error>,
Target: Clone,
{
pub(crate) fn new(mk_service: M, target: Target) -> Self {
Reconnect {
mk_service,
state: State::Connected(initial_connection),
state: State::Idle,
target,
error: None,
}
Expand Down

0 comments on commit 0a4d81f

Please sign in to comment.