Skip to content

Commit

Permalink
ready-cache: Ensure cancelation can be observed (#668)
Browse files Browse the repository at this point in the history
`tokio::task` enforces a cooperative scheduling regime that can cause
`oneshot::Receiver::poll` to return pending after the sender has sent an
update. `ReadyCache` uses a oneshot to notify pending services that they
should not become ready. When a cancelation is not observed, the ready
cache return service instances that should have been canceled, which
breaks assumptions and causes an invalid state.

This branch replaces the use of `tokio::sync::oneshot` for canceling 
pending futures with a custom cancelation handle using an `AtomicBool`
and `futures::task::AtomicWaker`. This ensures that canceled `Pending`
services are always woken even when the task's budget is exceeded.
Additionally, cancelation status is now always known to the `Pending`
future, by checking the `AtomicBool` immediately on polls, even in cases
where the canceled `Pending` future was woken by the inner `Service`
becoming ready, rather than by the cancelation.

Fixes #415

Signed-off-by: Oliver Gould <ver@buoyant.io>
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
olix0r and hawkw committed Jun 17, 2022
1 parent a2305b3 commit 051b094
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 34 deletions.
101 changes: 68 additions & 33 deletions tower/src/ready_cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

use super::error;
use futures_core::Stream;
use futures_util::stream::FuturesUnordered;
use futures_util::{stream::FuturesUnordered, task::AtomicWaker};
pub use indexmap::Equivalent;
use indexmap::IndexMap;
use std::fmt;
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::oneshot;
use tower_service::Service;
use tracing::{debug, trace};

Expand Down Expand Up @@ -75,8 +76,18 @@ where
// Safety: This is safe because we do not use `Pin::new_unchecked`.
impl<S, K: Eq + Hash, Req> Unpin for ReadyCache<K, S, Req> {}

type CancelRx = oneshot::Receiver<()>;
type CancelTx = oneshot::Sender<()>;
#[derive(Debug)]
struct Cancel {
waker: AtomicWaker,
canceled: AtomicBool,
}

#[derive(Debug)]
struct CancelRx(Arc<Cancel>);

#[derive(Debug)]
struct CancelTx(Arc<Cancel>);

type CancelPair = (CancelTx, CancelRx);

#[derive(Debug)]
Expand Down Expand Up @@ -195,7 +206,7 @@ where
/// must be called to cause the service to be dropped.
pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
c.send(()).expect("cancel receiver lost");
c.cancel();
true
} else {
false
Expand Down Expand Up @@ -226,14 +237,14 @@ where
///
/// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
pub fn push(&mut self, key: K, svc: S) {
let cancel = oneshot::channel();
let cancel = cancelable();
self.push_pending(key, svc, cancel);
}

fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
// If there is already a service for this key, cancel it.
c.send(()).expect("cancel receiver lost");
c.cancel();
}
self.pending.push(Pending {
key: Some(key),
Expand Down Expand Up @@ -270,21 +281,10 @@ where
// recreated after the service is used.
self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
} else {
// This should not technically be possible. We must have decided to cancel
// a Service (by sending on the CancelTx), yet that same service then
// returns Ready. Since polling a Pending _first_ polls the CancelRx, that
// _should_ always see our CancelTx send. Yet empirically, that isn't true:
//
// https://github.com/tower-rs/tower/issues/415
//
// So, we instead detect the endpoint as canceled at this point. That
// should be fine, since the oneshot is only really there to ensure that
// the Pending is polled again anyway.
//
// We assert that this can't happen in debug mode so that hopefully one day
// we can find a test that triggers this reliably.
debug_assert!(cancel_tx.is_some());
debug!("canceled endpoint removed when ready");
assert!(
cancel_tx.is_some(),
"services that become ready must have a pending cancelation"
);
}
}
Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
Expand All @@ -294,13 +294,11 @@ where
}
Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
if cancel_tx.is_some() {
return Err(error::Failed(key, e.into())).into();
} else {
// See comment for the same clause under Ready(Some(Ok)).
debug_assert!(cancel_tx.is_some());
debug!("canceled endpoint removed on error");
}
assert!(
cancel_tx.is_some(),
"services that return an error must have a pending cancelation"
);
return Err(error::Failed(key, e.into())).into();
}
}
}
Expand Down Expand Up @@ -400,6 +398,28 @@ where
}
}

// === impl Cancel ===

/// Creates a cancelation sender and receiver.
///
/// A `tokio::sync::oneshot` is NOT used, as a `Receiver` is not guaranteed to
/// observe results as soon as a `Sender` fires. Using an `AtomicBool` allows
/// the state to be observed as soon as the cancelation is triggered.
fn cancelable() -> CancelPair {
let cx = Arc::new(Cancel {
waker: AtomicWaker::new(),
canceled: AtomicBool::new(false),
});
(CancelTx(cx.clone()), CancelRx(cx))
}

impl CancelTx {
fn cancel(self) {
self.0.canceled.store(true, Ordering::SeqCst);
self.0.waker.wake();
}
}

// === Pending ===

impl<K, S, Req> Future for Pending<K, S, Req>
Expand All @@ -410,9 +430,10 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut fut = this.cancel.as_mut().expect("polled after complete");
if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) {
assert!(r.is_ok(), "cancel sender lost");
// Before checking whether the service is ready, check to see whether
// readiness has been canceled.
let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
if cancel.canceled.load(Ordering::SeqCst) {
let key = this.key.take().expect("polled after complete");
return Err(PendingError::Canceled(key)).into();
}
Expand All @@ -423,7 +444,21 @@ where
.expect("polled after ready")
.poll_ready(cx)
{
Poll::Pending => Poll::Pending,
Poll::Pending => {
// Before returning Pending, register interest in cancelation so
// that this future is polled again if the state changes.
let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
cancel.waker.register(cx.waker());
// Because both the cancel receiver and cancel sender are held
// by the `ReadyCache` (i.e., on a single task), then it must
// not be possible for the cancelation state to change while
// polling a `Pending` service.
assert!(
!cancel.canceled.load(Ordering::SeqCst),
"cancelation cannot be notified while polling a pending service"
);
Poll::Pending
}
Poll::Ready(Ok(())) => {
let key = this.key.take().expect("polled after complete");
let cancel = this.cancel.take().expect("polled after complete");
Expand Down
32 changes: 31 additions & 1 deletion tower/tests/ready_cache/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
#[path = "../support.rs"]
mod support;

use std::pin::Pin;
use tokio_test::{assert_pending, assert_ready, task};
use tower::ready_cache::ReadyCache;
use tower::ready_cache::{error, ReadyCache};
use tower_test::mock;

type Req = &'static str;
Expand Down Expand Up @@ -191,3 +192,32 @@ fn duplicate_key_by_index() {
// _and_ service 0 should now be callable
assert!(task.enter(|cx, _| cache.check_ready(cx, &0)).unwrap());
}

// Tests https://github.com/tower-rs/tower/issues/415
#[tokio::test(flavor = "current_thread")]
async fn cancelation_observed() {
let mut cache = ReadyCache::default();
let mut handles = vec![];

// NOTE This test passes at 129 items, but fails at 130 items (if coop
// schedulding interferes with cancelation).
for _ in 0..130 {
let (svc, mut handle) = tower_test::mock::pair::<(), ()>();
handle.allow(1);
cache.push("ep0", svc);
handles.push(handle);
}

struct Ready(ReadyCache<&'static str, tower_test::mock::Mock<(), ()>, ()>);
impl Unpin for Ready {}
impl std::future::Future for Ready {
type Output = Result<(), error::Failed<&'static str>>;
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.get_mut().0.poll_pending(cx)
}
}
Ready(cache).await.unwrap();
}

0 comments on commit 051b094

Please sign in to comment.