Skip to content

Commit

Permalink
ready_cache: just use pin_project for Pending (#667)
Browse files Browse the repository at this point in the history
This gets rid of the `Unpin` impl with the weird comment on it.

Alternatively, we could just put a `S: Unpin` bound on `Pending`, but
this changes the public API to require that the service type is `Unpin`.
In practice, it will be, but we could also just avoid the trait bound.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Jun 17, 2022
1 parent b0e7a71 commit a2305b3
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 20 deletions.
2 changes: 1 addition & 1 deletion tower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ limit = ["__common", "tokio/time", "tokio/sync", "tokio-util", "tracing"]
load = ["__common", "tokio/time", "tracing"]
load-shed = ["__common"]
make = ["futures-util", "pin-project-lite", "tokio/io-std"]
ready-cache = ["futures-core", "futures-util", "indexmap", "tokio/sync", "tracing"]
ready-cache = ["futures-core", "futures-util", "indexmap", "tokio/sync", "tracing", "pin-project-lite"]
reconnect = ["make", "tokio/io-std", "tracing"]
retry = ["__common", "tokio/time"]
spawn-ready = ["__common", "futures-util", "tokio/sync", "tokio/rt", "util", "tracing"]
Expand Down
38 changes: 19 additions & 19 deletions tower/src/ready_cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,16 @@ enum PendingError<K, E> {
Inner(K, E),
}

/// A [`Future`] that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e. if the service is evicted from the balancer.
struct Pending<K, S, Req> {
key: Option<K>,
cancel: Option<CancelRx>,
ready: Option<S>,
_pd: std::marker::PhantomData<Req>,
pin_project_lite::pin_project! {
/// A [`Future`] that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e. if the service is evicted from the balancer.
struct Pending<K, S, Req> {
key: Option<K>,
cancel: Option<CancelRx>,
ready: Option<S>,
_pd: std::marker::PhantomData<Req>,
}
}

// === ReadyCache ===
Expand Down Expand Up @@ -400,37 +402,35 @@ where

// === Pending ===

// Safety: No use unsafe access therefore this is safe.
impl<K, S, Req> Unpin for Pending<K, S, Req> {}

impl<K, S, Req> Future for Pending<K, S, Req>
where
S: Service<Req>,
{
type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut fut = self.cancel.as_mut().expect("polled after complete");
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut fut = this.cancel.as_mut().expect("polled after complete");
if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) {
assert!(r.is_ok(), "cancel sender lost");
let key = self.key.take().expect("polled after complete");
let key = this.key.take().expect("polled after complete");
return Err(PendingError::Canceled(key)).into();
}

match self
match this
.ready
.as_mut()
.expect("polled after ready")
.poll_ready(cx)
{
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => {
let key = self.key.take().expect("polled after complete");
let cancel = self.cancel.take().expect("polled after complete");
Ok((key, self.ready.take().expect("polled after ready"), cancel)).into()
let key = this.key.take().expect("polled after complete");
let cancel = this.cancel.take().expect("polled after complete");
Ok((key, this.ready.take().expect("polled after ready"), cancel)).into()
}
Poll::Ready(Err(e)) => {
let key = self.key.take().expect("polled after compete");
let key = this.key.take().expect("polled after compete");
Err(PendingError::Inner(key, e)).into()
}
}
Expand Down

0 comments on commit a2305b3

Please sign in to comment.