diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index f8956d4056..6973df7985 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -107,7 +107,7 @@ impl Sender { impl UnboundedSender { pub fn is_ready(&self) -> bool { - self.giver.is_wanting() + !self.giver.is_canceled() } pub fn is_closed(&self) -> bool { diff --git a/src/client/mod.rs b/src/client/mod.rs index 4829c7a3be..06e0a2de15 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -318,6 +318,15 @@ where C: Connect + Sync + 'static, Ok(()) }) ); + } else { + // There's no body to delay, but the connection isn't + // ready yet. Only re-insert when it's ready + executor.execute( + future::poll_fn(move || { + pooled.poll_ready() + }) + .then(|_| Ok(())) + ); } Ok(res) }); @@ -441,6 +450,13 @@ impl PoolClient { PoolTx::Http2(ref tx) => tx.is_ready(), } } + + fn is_closed(&self) -> bool { + match self.tx { + PoolTx::Http1(ref tx) => tx.is_closed(), + PoolTx::Http2(ref tx) => tx.is_closed(), + } + } } impl PoolClient { @@ -460,10 +476,10 @@ impl Poolable for PoolClient where B: 'static, { - fn is_closed(&self) -> bool { + fn is_open(&self) -> bool { match self.tx { - PoolTx::Http1(ref tx) => tx.is_closed(), - PoolTx::Http2(ref tx) => tx.is_closed(), + PoolTx::Http1(ref tx) => tx.is_ready(), + PoolTx::Http2(ref tx) => tx.is_ready(), } } diff --git a/src/client/pool.rs b/src/client/pool.rs index c9b39ffe44..0d4c8902df 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -21,7 +21,7 @@ pub(super) struct Pool { // // See https://github.com/hyperium/hyper/issues/1429 pub(super) trait Poolable: Sized { - fn is_closed(&self) -> bool; + fn is_open(&self) -> bool; /// Reserve this connection. /// /// Allows for HTTP/2 to return a shared reservation. @@ -236,7 +236,7 @@ impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { while let Some(entry) = self.list.pop() { // If the connection has been closed, or is older than our idle // timeout, simply drop it and keep looking... - if entry.value.is_closed() { + if !entry.value.is_open() { trace!("removing closed connection for {:?}", self.key); continue; } @@ -377,7 +377,7 @@ impl PoolInner { self.idle.retain(|key, values| { values.retain(|entry| { - if entry.value.is_closed() { + if !entry.value.is_open() { trace!("idle interval evicting closed for {:?}", key); return false; } @@ -475,7 +475,7 @@ impl DerefMut for Pooled { impl Drop for Pooled { fn drop(&mut self) { if let Some(value) = self.value.take() { - if value.is_closed() { + if !value.is_open() { // If we *already* know the connection is done here, // it shouldn't be re-inserted back into the pool. return; @@ -519,7 +519,7 @@ impl Checkout { if let Some(mut rx) = self.waiter.take() { match rx.poll() { Ok(Async::Ready(value)) => { - if !value.is_closed() { + if value.is_open() { Ok(Async::Ready(Some(self.pool.reuse(&self.key, value)))) } else { Err(::Error::new_canceled(Some(CANCELED))) @@ -662,8 +662,8 @@ mod tests { struct Uniq(T); impl Poolable for Uniq { - fn is_closed(&self) -> bool { - false + fn is_open(&self) -> bool { + true } fn reserve(self) -> Reservation { @@ -671,21 +671,6 @@ mod tests { } } - /* - #[derive(Debug, PartialEq, Eq, Clone, Copy)] - struct Share(T); - - impl Poolable for Share { - fn is_closed(&self) -> bool { - false - } - - fn reserve(self) -> Reservation { - Reservation::Shared(self.clone(), self) - } - } - */ - fn c(key: Key) -> Connecting { Connecting { key, @@ -817,8 +802,8 @@ mod tests { } impl Poolable for CanClose { - fn is_closed(&self) -> bool { - self.closed + fn is_open(&self) -> bool { + !self.closed } fn reserve(self) -> Reservation { diff --git a/src/client/tests.rs b/src/client/tests.rs index 6ea9ce5812..94091b584d 100644 --- a/src/client/tests.rs +++ b/src/client/tests.rs @@ -88,7 +88,7 @@ fn conn_reset_after_write() { } // sleep to allow some time for the connection to return to the pool - thread::sleep(Duration::from_secs(1)); + thread::sleep(Duration::from_millis(50)); let req = Request::builder() .uri("http://mock.local/a")