Skip to content

Commit

Permalink
Move some tokio::time::* over to TokioTimer.*
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert-Cunningham committed Aug 5, 2022
1 parent 1aca08f commit ef358ac
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 58 deletions.
19 changes: 0 additions & 19 deletions benches/support/tokiort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ impl Timer for TokioTimer {
inner: tokio::time::interval(period),
})
}

/*
fn timeout<O, T: Future<Output = O>>(duration: Duration, future: T) -> Box<dyn Timeout<O> + Unpin> {
tokio::time::timeout(duration, future)
}
*/
}

/// An Interval object that uses the tokio runtime.
Expand Down Expand Up @@ -68,19 +62,6 @@ impl<T> Future for TokioTimeout<T> where T: Future {

}

/*
impl<T> Timeout<T> for TokioTimeout<T>
where
T: Future + Send + Sync,
{
//type Output = ;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Elapsed>> {
self.inner.as_mut().poll(cx)
}
}
*/

// Use TokioSleep to get tokio::time::Sleep to implement Unpin.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
pub(crate) struct TokioSleep {
Expand Down
4 changes: 0 additions & 4 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,9 @@ use std::error::Error as StdError;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, Weak};

#[cfg(not(feature = "runtime"))]
use std::time::{Duration, Instant};

use futures_channel::oneshot;
#[cfg(feature = "runtime")]
use tokio::time::{Duration, Instant};
use tracing::{debug, trace};

use super::client::Ver;
Expand Down
12 changes: 0 additions & 12 deletions src/common/tim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub(crate) fn timeout<F>(tim: Tim, future: F, duration: Duration) -> HyperTimeou
HyperTimeout { sleep: tim.sleep(duration), future: future }
}
pin_project_lite::pin_project! {
pub(crate) struct HyperTimeout<F> {
sleep: Box<dyn Sleep>,
Expand Down Expand Up @@ -69,15 +68,4 @@ impl Timer for Tim {
Some(ref t) => t.interval(period),
}
}

/*
fn timeout(&self, duration: Duration, future: Box<dyn Future<Output = Box<dyn Any>> + Unpin>) -> Box<tokio::time::Timeout<Box<dyn Any>>> {
match *self {
None => {
panic!("You must supply a timer.")
}
Some(ref t) => t.timeout(duration, future),
}
}
*/
}
40 changes: 21 additions & 19 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,7 @@ mod dispatch_impl {
use hyper::body::HttpBody;
use hyper::client::connect::{Connected, Connection};
use hyper::Client;
use hyper::rt::Timer;


#[test]
Expand Down Expand Up @@ -1390,7 +1391,7 @@ mod dispatch_impl {
rt.block_on(async move {
let (res, ()) = future::join(res, rx).await;
res.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
TokioTimer.sleep(Duration::from_secs(1)).await;
});

rt.block_on(closes.into_future()).0.expect("closes");
Expand Down Expand Up @@ -1446,7 +1447,7 @@ mod dispatch_impl {
rt.block_on(async move {
let (res, ()) = future::join(res, rx).await;
res.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
TokioTimer.sleep(Duration::from_secs(1)).await;
});

rt.block_on(closes.into_future()).0.expect("closes");
Expand Down Expand Up @@ -1513,7 +1514,7 @@ mod dispatch_impl {
drop(client);

// and wait a few ticks for the connections to close
let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
let t = TokioTimer.sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
futures_util::pin_mut!(t);
let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
future::select(t, close).await;
Expand Down Expand Up @@ -1559,7 +1560,7 @@ mod dispatch_impl {
future::select(res, rx1).await;

// res now dropped
let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
let t = TokioTimer.sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
futures_util::pin_mut!(t);
let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
future::select(t, close).await;
Expand Down Expand Up @@ -1612,7 +1613,7 @@ mod dispatch_impl {
res.unwrap();

// and wait a few ticks to see the connection drop
let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
let t = TokioTimer.sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
futures_util::pin_mut!(t);
let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
future::select(t, close).await;
Expand Down Expand Up @@ -1663,7 +1664,7 @@ mod dispatch_impl {
let (res, ()) = future::join(res, rx).await;
res.unwrap();

let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
let t = TokioTimer.sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
futures_util::pin_mut!(t);
let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
future::select(t, close).await;
Expand Down Expand Up @@ -1707,7 +1708,7 @@ mod dispatch_impl {
let (res, ()) = future::join(res, rx).await;
res.unwrap();

let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
let t = TokioTimer.sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
futures_util::pin_mut!(t);
let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
future::select(t, close).await;
Expand Down Expand Up @@ -1906,7 +1907,7 @@ mod dispatch_impl {
assert_eq!(connects.load(Ordering::Relaxed), 0);

let delayed_body = rx1
.then(|_| tokio::time::sleep(Duration::from_millis(200)))
.then(|_| TokioTimer.sleep(Duration::from_millis(200)))
.map(|_| Ok::<_, ()>(Bytes::from("hello a")))
.map_err(|_| -> std::convert::Infallible { panic!("rx1") })
.into_stream();
Expand All @@ -1921,7 +1922,7 @@ mod dispatch_impl {

// req 1
let fut = future::join(client.request(req), rx)
.then(|_| tokio::time::sleep(Duration::from_millis(200)))
.then(|_| TokioTimer.sleep(Duration::from_millis(200)))
// req 2
.then(move |()| {
let rx = rx3.expect("thread panicked");
Expand Down Expand Up @@ -2008,7 +2009,7 @@ mod dispatch_impl {

// sleep real quick to let the threadpool put connection in ready
// state and back into client pool
tokio::time::sleep(Duration::from_millis(50)).await;
TokioTimer.sleep(Duration::from_millis(50)).await;

let rx = rx2.expect("thread panicked");
let req = Request::builder()
Expand Down Expand Up @@ -2370,6 +2371,7 @@ mod conn {
use bytes::Buf;
use futures_channel::oneshot;
use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt};
use hyper::rt::Timer;
use hyper::upgrade::OnUpgrade;
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf};
use tokio::net::{TcpListener as TkTcpListener, TcpStream};
Expand Down Expand Up @@ -2526,7 +2528,7 @@ mod conn {
});

let rx = rx1.expect("thread panicked");
let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200)));
let rx = rx.then(|_| TokioTimer.sleep(Duration::from_millis(200)));
let chunk = rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
assert_eq!(chunk.len(), 5);
}
Expand Down Expand Up @@ -2621,7 +2623,7 @@ mod conn {
concat(res)
});
let rx = rx1.expect("thread panicked");
let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200)));
let rx = rx.then(|_| TokioTimer.sleep(Duration::from_millis(200)));
rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
}

Expand Down Expand Up @@ -2667,7 +2669,7 @@ mod conn {
concat(res)
});
let rx = rx1.expect("thread panicked");
let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200)));
let rx = rx.then(|_| TokioTimer.sleep(Duration::from_millis(200)));
rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
}

Expand Down Expand Up @@ -2719,7 +2721,7 @@ mod conn {
});

let rx = rx1.expect("thread panicked");
let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200)));
let rx = rx.then(|_| TokioTimer.sleep(Duration::from_millis(200)));
rt.block_on(future::join3(res1, res2, rx).map(|r| r.0))
.unwrap();
}
Expand Down Expand Up @@ -2780,7 +2782,7 @@ mod conn {
});

let rx = rx1.expect("thread panicked");
let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200)));
let rx = rx.then(|_| TokioTimer.sleep(Duration::from_millis(200)));
rt.block_on(future::join3(until_upgrade, res, rx).map(|r| r.0))
.unwrap();

Expand Down Expand Up @@ -2871,7 +2873,7 @@ mod conn {
});

let rx = rx1.expect("thread panicked");
let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200)));
let rx = rx.then(|_| TokioTimer.sleep(Duration::from_millis(200)));
rt.block_on(future::join3(until_tunneled, res, rx).map(|r| r.0))
.unwrap();

Expand Down Expand Up @@ -2979,7 +2981,7 @@ mod conn {
let _ = shdn_tx.send(true);

// Allow time for graceful shutdown roundtrips...
tokio::time::sleep(Duration::from_millis(100)).await;
TokioTimer.sleep(Duration::from_millis(100)).await;

// After graceful shutdown roundtrips, the client should be closed...
future::poll_fn(|ctx| client.poll_ready(ctx))
Expand Down Expand Up @@ -3058,7 +3060,7 @@ mod conn {
});

// sleep longer than keepalive would trigger
tokio::time::sleep(Duration::from_secs(4)).await;
TokioTimer.sleep(Duration::from_secs(4)).await;

future::poll_fn(|ctx| client.poll_ready(ctx))
.await
Expand Down Expand Up @@ -3166,7 +3168,7 @@ mod conn {
let _resp = client.send_request(req1).await.expect("send_request");

// sleep longer than keepalive would trigger
tokio::time::sleep(Duration::from_secs(4)).await;
TokioTimer.sleep(Duration::from_secs(4)).await;

future::poll_fn(|ctx| client.poll_ready(ctx))
.await
Expand Down
9 changes: 5 additions & 4 deletions tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use h2::client::SendRequest;
use h2::{RecvStream, SendStream};
use http::header::{HeaderName, HeaderValue};
use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
use hyper::rt::Timer;
use support::TokioTimer;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
Expand Down Expand Up @@ -959,7 +960,7 @@ async fn expect_continue_waits_for_body_poll() {
service_fn(|req| {
assert_eq!(req.headers()["expect"], "100-continue");
// But! We're never going to poll the body!
tokio::time::sleep(Duration::from_millis(50)).map(move |_| {
TokioTimer.sleep(Duration::from_millis(50)).map(move |_| {
// Move and drop the req, so we don't auto-close
drop(req);
Response::builder()
Expand Down Expand Up @@ -1256,7 +1257,7 @@ async fn http1_allow_half_close() {
.serve_connection(
socket,
service_fn(|_| {
tokio::time::sleep(Duration::from_millis(500))
TokioTimer.sleep(Duration::from_millis(500))
.map(|_| Ok::<_, hyper::Error>(Response::new(Body::empty())))
}),
)
Expand Down Expand Up @@ -1284,7 +1285,7 @@ async fn disconnect_after_reading_request_before_responding() {
.serve_connection(
socket,
service_fn(|_| {
tokio::time::sleep(Duration::from_secs(2)).map(
TokioTimer.sleep(Duration::from_secs(2)).map(
|_| -> Result<Response<Body>, hyper::Error> {
panic!("response future should have been dropped");
},
Expand Down Expand Up @@ -2531,7 +2532,7 @@ async fn http2_keep_alive_with_responsive_client() {
conn.await.expect("client conn");
});

tokio::time::sleep(Duration::from_secs(4)).await;
TokioTimer.sleep(Duration::from_secs(4)).await;

let req = http::Request::new(hyper::Body::empty());
client.send_request(req).await.expect("client.send_request");
Expand Down

0 comments on commit ef358ac

Please sign in to comment.