diff --git a/benches/support/tokiort.rs b/benches/support/tokiort.rs index 7692371630..a0bed9a3be 100644 --- a/benches/support/tokiort.rs +++ b/benches/support/tokiort.rs @@ -35,12 +35,6 @@ impl Timer for TokioTimer { inner: tokio::time::interval(period), }) } - - /* - fn timeout>(duration: Duration, future: T) -> Box + Unpin> { - tokio::time::timeout(duration, future) - } - */ } /// An Interval object that uses the tokio runtime. @@ -68,19 +62,6 @@ impl Future for TokioTimeout where T: Future { } -/* -impl Timeout for TokioTimeout -where - T: Future + Send + Sync, -{ - //type Output = ; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.inner.as_mut().poll(cx) - } -} -*/ - // Use TokioSleep to get tokio::time::Sleep to implement Unpin. // see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html pub(crate) struct TokioSleep { diff --git a/src/client/pool.rs b/src/client/pool.rs index 711d24c243..622a103966 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -3,13 +3,9 @@ use std::error::Error as StdError; use std::fmt; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Mutex, Weak}; - -#[cfg(not(feature = "runtime"))] use std::time::{Duration, Instant}; use futures_channel::oneshot; -#[cfg(feature = "runtime")] -use tokio::time::{Duration, Instant}; use tracing::{debug, trace}; use super::client::Ver; diff --git a/src/common/tim.rs b/src/common/tim.rs index 6d946fcb94..0b13647f4d 100644 --- a/src/common/tim.rs +++ b/src/common/tim.rs @@ -13,7 +13,6 @@ pub(crate) fn timeout(tim: Tim, future: F, duration: Duration) -> HyperTimeou HyperTimeout { sleep: tim.sleep(duration), future: future } } - pin_project_lite::pin_project! { pub(crate) struct HyperTimeout { sleep: Box, @@ -69,15 +68,4 @@ impl Timer for Tim { Some(ref t) => t.interval(period), } } - - /* - fn timeout(&self, duration: Duration, future: Box> + Unpin>) -> Box>> { - match *self { - None => { - panic!("You must supply a timer.") - } - Some(ref t) => t.timeout(duration, future), - } - } - */ } diff --git a/tests/client.rs b/tests/client.rs index c331511522..9fb8839423 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1346,6 +1346,7 @@ mod dispatch_impl { use hyper::body::HttpBody; use hyper::client::connect::{Connected, Connection}; use hyper::Client; + use hyper::rt::Timer; #[test] @@ -1390,7 +1391,7 @@ mod dispatch_impl { rt.block_on(async move { let (res, ()) = future::join(res, rx).await; res.unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; + TokioTimer.sleep(Duration::from_secs(1)).await; }); rt.block_on(closes.into_future()).0.expect("closes"); @@ -1446,7 +1447,7 @@ mod dispatch_impl { rt.block_on(async move { let (res, ()) = future::join(res, rx).await; res.unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; + TokioTimer.sleep(Duration::from_secs(1)).await; }); rt.block_on(closes.into_future()).0.expect("closes"); @@ -1513,7 +1514,7 @@ mod dispatch_impl { drop(client); // and wait a few ticks for the connections to close - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = TokioTimer.sleep(Duration::from_millis(100)).map(|_| panic!("time out")); futures_util::pin_mut!(t); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; @@ -1559,7 +1560,7 @@ mod dispatch_impl { future::select(res, rx1).await; // res now dropped - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = TokioTimer.sleep(Duration::from_millis(100)).map(|_| panic!("time out")); futures_util::pin_mut!(t); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; @@ -1612,7 +1613,7 @@ mod dispatch_impl { res.unwrap(); // and wait a few ticks to see the connection drop - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = TokioTimer.sleep(Duration::from_millis(100)).map(|_| panic!("time out")); futures_util::pin_mut!(t); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; @@ -1663,7 +1664,7 @@ mod dispatch_impl { let (res, ()) = future::join(res, rx).await; res.unwrap(); - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = TokioTimer.sleep(Duration::from_millis(100)).map(|_| panic!("time out")); futures_util::pin_mut!(t); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; @@ -1707,7 +1708,7 @@ mod dispatch_impl { let (res, ()) = future::join(res, rx).await; res.unwrap(); - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = TokioTimer.sleep(Duration::from_millis(100)).map(|_| panic!("time out")); futures_util::pin_mut!(t); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; @@ -1906,7 +1907,7 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::Relaxed), 0); let delayed_body = rx1 - .then(|_| tokio::time::sleep(Duration::from_millis(200))) + .then(|_| TokioTimer.sleep(Duration::from_millis(200))) .map(|_| Ok::<_, ()>(Bytes::from("hello a"))) .map_err(|_| -> std::convert::Infallible { panic!("rx1") }) .into_stream(); @@ -1921,7 +1922,7 @@ mod dispatch_impl { // req 1 let fut = future::join(client.request(req), rx) - .then(|_| tokio::time::sleep(Duration::from_millis(200))) + .then(|_| TokioTimer.sleep(Duration::from_millis(200))) // req 2 .then(move |()| { let rx = rx3.expect("thread panicked"); @@ -2008,7 +2009,7 @@ mod dispatch_impl { // sleep real quick to let the threadpool put connection in ready // state and back into client pool - tokio::time::sleep(Duration::from_millis(50)).await; + TokioTimer.sleep(Duration::from_millis(50)).await; let rx = rx2.expect("thread panicked"); let req = Request::builder() @@ -2370,6 +2371,7 @@ mod conn { use bytes::Buf; use futures_channel::oneshot; use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt}; + use hyper::rt::Timer; use hyper::upgrade::OnUpgrade; use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf}; use tokio::net::{TcpListener as TkTcpListener, TcpStream}; @@ -2526,7 +2528,7 @@ mod conn { }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); + let rx = rx.then(|_| TokioTimer.sleep(Duration::from_millis(200))); let chunk = rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); assert_eq!(chunk.len(), 5); } @@ -2621,7 +2623,7 @@ mod conn { concat(res) }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); + let rx = rx.then(|_| TokioTimer.sleep(Duration::from_millis(200))); rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); } @@ -2667,7 +2669,7 @@ mod conn { concat(res) }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); + let rx = rx.then(|_| TokioTimer.sleep(Duration::from_millis(200))); rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); } @@ -2719,7 +2721,7 @@ mod conn { }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); + let rx = rx.then(|_| TokioTimer.sleep(Duration::from_millis(200))); rt.block_on(future::join3(res1, res2, rx).map(|r| r.0)) .unwrap(); } @@ -2780,7 +2782,7 @@ mod conn { }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); + let rx = rx.then(|_| TokioTimer.sleep(Duration::from_millis(200))); rt.block_on(future::join3(until_upgrade, res, rx).map(|r| r.0)) .unwrap(); @@ -2871,7 +2873,7 @@ mod conn { }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); + let rx = rx.then(|_| TokioTimer.sleep(Duration::from_millis(200))); rt.block_on(future::join3(until_tunneled, res, rx).map(|r| r.0)) .unwrap(); @@ -2979,7 +2981,7 @@ mod conn { let _ = shdn_tx.send(true); // Allow time for graceful shutdown roundtrips... - tokio::time::sleep(Duration::from_millis(100)).await; + TokioTimer.sleep(Duration::from_millis(100)).await; // After graceful shutdown roundtrips, the client should be closed... future::poll_fn(|ctx| client.poll_ready(ctx)) @@ -3058,7 +3060,7 @@ mod conn { }); // sleep longer than keepalive would trigger - tokio::time::sleep(Duration::from_secs(4)).await; + TokioTimer.sleep(Duration::from_secs(4)).await; future::poll_fn(|ctx| client.poll_ready(ctx)) .await @@ -3166,7 +3168,7 @@ mod conn { let _resp = client.send_request(req1).await.expect("send_request"); // sleep longer than keepalive would trigger - tokio::time::sleep(Duration::from_secs(4)).await; + TokioTimer.sleep(Duration::from_secs(4)).await; future::poll_fn(|ctx| client.poll_ready(ctx)) .await diff --git a/tests/server.rs b/tests/server.rs index 3a73cfc2bd..e2d844a2af 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -21,6 +21,7 @@ use h2::client::SendRequest; use h2::{RecvStream, SendStream}; use http::header::{HeaderName, HeaderValue}; use http_body_util::{combinators::BoxBody, BodyExt, StreamBody}; +use hyper::rt::Timer; use support::TokioTimer; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -959,7 +960,7 @@ async fn expect_continue_waits_for_body_poll() { service_fn(|req| { assert_eq!(req.headers()["expect"], "100-continue"); // But! We're never going to poll the body! - tokio::time::sleep(Duration::from_millis(50)).map(move |_| { + TokioTimer.sleep(Duration::from_millis(50)).map(move |_| { // Move and drop the req, so we don't auto-close drop(req); Response::builder() @@ -1256,7 +1257,7 @@ async fn http1_allow_half_close() { .serve_connection( socket, service_fn(|_| { - tokio::time::sleep(Duration::from_millis(500)) + TokioTimer.sleep(Duration::from_millis(500)) .map(|_| Ok::<_, hyper::Error>(Response::new(Body::empty()))) }), ) @@ -1284,7 +1285,7 @@ async fn disconnect_after_reading_request_before_responding() { .serve_connection( socket, service_fn(|_| { - tokio::time::sleep(Duration::from_secs(2)).map( + TokioTimer.sleep(Duration::from_secs(2)).map( |_| -> Result, hyper::Error> { panic!("response future should have been dropped"); }, @@ -2531,7 +2532,7 @@ async fn http2_keep_alive_with_responsive_client() { conn.await.expect("client conn"); }); - tokio::time::sleep(Duration::from_secs(4)).await; + TokioTimer.sleep(Duration::from_secs(4)).await; let req = http::Request::new(hyper::Body::empty()); client.send_request(req).await.expect("client.send_request");