From 23c6ada413fa86e11d6a9e39b7b51cb2d516e747 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 13 Dec 2019 20:01:11 -0500 Subject: [PATCH] fix(transport): Fix lazily reconnecting Closes #167 --- tonic/src/transport/service/reconnect.rs | 54 +++++++++++++++++++----- 1 file changed, 43 insertions(+), 11 deletions(-) diff --git a/tonic/src/transport/service/reconnect.rs b/tonic/src/transport/service/reconnect.rs index 344f962e5..55cc58cbd 100644 --- a/tonic/src/transport/service/reconnect.rs +++ b/tonic/src/transport/service/reconnect.rs @@ -1,5 +1,5 @@ use crate::Error; -use pin_project::pin_project; +use pin_project::{pin_project, project}; use std::fmt; use std::{ future::Future, @@ -17,6 +17,7 @@ where mk_service: M, state: State, target: Target, + error: Option, } #[derive(Debug)] @@ -41,6 +42,7 @@ where mk_service, state: State::Connected(initial_connection), target, + error: None, } } } @@ -55,10 +57,9 @@ where { type Response = S::Response; type Error = Error; - type Future = ResponseFuture; + type Future = ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let ret; let mut state; loop { @@ -90,7 +91,7 @@ where Poll::Ready(Err(e)) => { trace!("poll_ready; error"); state = State::Idle; - ret = Err(e.into()); + self.error = Some(e.into()); break; } } @@ -118,10 +119,14 @@ where } self.state = state; - Poll::Ready(ret) + Poll::Ready(Ok(())) } fn call(&mut self, request: Request) -> Self::Future { + if let Some(error) = self.error.take() { + return ResponseFuture::error(error); + } + let service = match self.state { State::Connected(ref mut service) => service, _ => panic!("service not ready; poll_ready must be called first"), @@ -148,27 +153,54 @@ where } } +/// Future that resolves to the response or failure to connect. #[pin_project] #[derive(Debug)] -pub(crate) struct ResponseFuture { +pub(crate) struct ResponseFuture { #[pin] - inner: F, + inner: Inner, +} + +#[pin_project] +#[derive(Debug)] +enum Inner { + Future(#[pin] F), + Error(Option), } -impl ResponseFuture { +impl ResponseFuture { pub(crate) fn new(inner: F) -> Self { - ResponseFuture { inner } + ResponseFuture { + inner: Inner::Future(inner), + } + } + + pub(crate) fn error(error: E) -> Self { + ResponseFuture { + inner: Inner::Error(Some(error)), + } } } -impl Future for ResponseFuture +impl Future for ResponseFuture where F: Future>, E: Into, + ME: Into, { type Output = Result; + #[project] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().inner.poll(cx).map_err(Into::into) + //self.project().inner.poll(cx).map_err(Into::into) + let me = self.project(); + #[project] + match me.inner.project() { + Inner::Future(fut) => fut.poll(cx).map_err(Into::into), + Inner::Error(e) => { + let e = e.take().expect("Polled after ready.").into(); + Poll::Ready(Err(e)) + } + } } }