Skip to content
This repository has been archived by the owner on Feb 4, 2021. It is now read-only.

Commit

Permalink
fix(transport): Fix lazily reconnecting (hyperium#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioFranco authored and rabbitinspace committed Jan 1, 2020
1 parent f19bc87 commit f8323b3
Showing 1 changed file with 43 additions and 11 deletions.
54 changes: 43 additions & 11 deletions tonic/src/transport/service/reconnect.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::Error;
use pin_project::pin_project;
use pin_project::{pin_project, project};
use std::fmt;
use std::{
future::Future,
Expand All @@ -17,6 +17,7 @@ where
mk_service: M,
state: State<M::Future, M::Response>,
target: Target,
error: Option<M::Error>,
}

#[derive(Debug)]
Expand All @@ -41,6 +42,7 @@ where
mk_service,
state: State::Connected(initial_connection),
target,
error: None,
}
}
}
Expand All @@ -55,10 +57,9 @@ where
{
type Response = S::Response;
type Error = Error;
type Future = ResponseFuture<S::Future>;
type Future = ResponseFuture<S::Future, M::Error>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ret;
let mut state;

loop {
Expand Down Expand Up @@ -90,7 +91,7 @@ where
Poll::Ready(Err(e)) => {
trace!("poll_ready; error");
state = State::Idle;
ret = Err(e.into());
self.error = Some(e.into());
break;
}
}
Expand Down Expand Up @@ -118,10 +119,14 @@ where
}

self.state = state;
Poll::Ready(ret)
Poll::Ready(Ok(()))
}

fn call(&mut self, request: Request) -> Self::Future {
if let Some(error) = self.error.take() {
return ResponseFuture::error(error);
}

let service = match self.state {
State::Connected(ref mut service) => service,
_ => panic!("service not ready; poll_ready must be called first"),
Expand All @@ -148,27 +153,54 @@ where
}
}

/// Future that resolves to the response or failure to connect.
#[pin_project]
#[derive(Debug)]
pub(crate) struct ResponseFuture<F> {
pub(crate) struct ResponseFuture<F, E> {
#[pin]
inner: F,
inner: Inner<F, E>,
}

#[pin_project]
#[derive(Debug)]
enum Inner<F, E> {
Future(#[pin] F),
Error(Option<E>),
}

impl<F> ResponseFuture<F> {
impl<F, E> ResponseFuture<F, E> {
pub(crate) fn new(inner: F) -> Self {
ResponseFuture { inner }
ResponseFuture {
inner: Inner::Future(inner),
}
}

pub(crate) fn error(error: E) -> Self {
ResponseFuture {
inner: Inner::Error(Some(error)),
}
}
}

impl<F, T, E> Future for ResponseFuture<F>
impl<F, T, E, ME> Future for ResponseFuture<F, ME>
where
F: Future<Output = Result<T, E>>,
E: Into<Error>,
ME: Into<Error>,
{
type Output = Result<T, Error>;

#[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx).map_err(Into::into)
//self.project().inner.poll(cx).map_err(Into::into)
let me = self.project();
#[project]
match me.inner.project() {
Inner::Future(fut) => fut.poll(cx).map_err(Into::into),
Inner::Error(e) => {
let e = e.take().expect("Polled after ready.").into();
Poll::Ready(Err(e))
}
}
}
}

0 comments on commit f8323b3

Please sign in to comment.