Skip to content

Commit

Permalink
Wake waiting tower-batch tasks on drop
Browse files Browse the repository at this point in the history
When other tower-batch tasks drop, wake any tasks that are waiting for
a semaphore permit. Otherwise, tower-batch can hang.

We currently pin tower in our workspace to:
d4d1c67 hedge: use auto-resizing histograms (tower-rs/tower#484)

Copy tower/src/semaphore.rs from that commit, to pick up
tower-rs/tower#480.
  • Loading branch information
teor2345 committed Feb 17, 2021
1 parent 47bcf63 commit 526ccfc
Showing 1 changed file with 46 additions and 5 deletions.
51 changes: 46 additions & 5 deletions tower-batch/src/semaphore.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copied from tower/src/semaphore.rs
// When/if tower-batch is upstreamed, delete this file
// and use the common tower semaphore implementation
// Copied from tower/src/semaphore.rs, commit:
// d4d1c67 hedge: use auto-resizing histograms (#484)
//
// When we upgrade to tower 0.4, we can use tokio's PollSemaphore, like tower's:
// ccfaffc buffer, limit: use `tokio-util`'s `PollSemaphore` (#556)

pub(crate) use self::sync::OwnedSemaphorePermit as Permit;
use futures_core::ready;
Expand All @@ -9,7 +11,7 @@ use std::{
future::Future,
mem,
pin::Pin,
sync::Arc,
sync::{Arc, Weak},
task::{Context, Poll},
};
use tokio::sync;
Expand All @@ -20,13 +22,32 @@ pub(crate) struct Semaphore {
state: State,
}

#[derive(Debug)]
pub(crate) struct Close {
semaphore: Weak<sync::Semaphore>,
permits: usize,
}

enum State {
Waiting(Pin<Box<dyn Future<Output = Permit> + Send + Sync + 'static>>),
Waiting(Pin<Box<dyn Future<Output = Permit> + Send + 'static>>),
Ready(Permit),
Empty,
}

impl Semaphore {
pub(crate) fn new_with_close(permits: usize) -> (Self, Close) {
let semaphore = Arc::new(sync::Semaphore::new(permits));
let close = Close {
semaphore: Arc::downgrade(&semaphore),
permits,
};
let semaphore = Self {
semaphore,
state: State::Empty,
};
(semaphore, close)
}

pub(crate) fn new(permits: usize) -> Self {
Self {
semaphore: Arc::new(sync::Semaphore::new(permits)),
Expand Down Expand Up @@ -76,3 +97,23 @@ impl fmt::Debug for State {
}
}
}

impl Close {
/// Close the semaphore, waking any remaining tasks currently awaiting a permit.
pub(crate) fn close(self) {
// The maximum number of permits that a `tokio::sync::Semaphore`
// can hold is usize::MAX >> 3. If we attempt to add more than that
// number of permits, the semaphore will panic.
// XXX(eliza): another shift is kinda janky but if we add (usize::MAX
// > 3 - initial permits) the semaphore impl panics (I think due to a
// bug in tokio?).
// TODO(eliza): Tokio should _really_ just expose `Semaphore::close`
// publicly so we don't have to do this nonsense...
const MAX: usize = std::usize::MAX >> 4;
if let Some(semaphore) = self.semaphore.upgrade() {
// If we added `MAX - available_permits`, any tasks that are
// currently holding permits could drop them, overflowing the max.
semaphore.add_permits(MAX - self.permits);
}
}
}

0 comments on commit 526ccfc

Please sign in to comment.