Skip to content

Commit

Permalink
pass all tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert-Cunningham committed Aug 5, 2022
1 parent 50005a9 commit 1aca08f
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 5 deletions.
3 changes: 3 additions & 0 deletions benches/support/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

mod tokiort;
pub use tokiort::TokioTimer;
111 changes: 111 additions & 0 deletions benches/support/tokiort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#![allow(dead_code)]
//! Various runtimes for hyper
use std::{
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};

use futures_util::Future;
use hyper::rt::{Interval, Sleep, Timer};

/// An Executor that uses the tokio runtime.
pub struct TokioExecutor;

/// A Timer that uses the tokio runtime.
#[derive(Clone, Debug)]
pub struct TokioTimer;

impl Timer for TokioTimer {
fn sleep(&self, duration: Duration) -> Box<dyn Sleep + Unpin> {
let s = tokio::time::sleep(duration);
let hs = TokioSleep { inner: Box::pin(s) };
return Box::new(hs);
}

fn sleep_until(&self, deadline: Instant) -> Box<dyn Sleep + Unpin> {
return Box::new(TokioSleep {
inner: Box::pin(tokio::time::sleep_until(deadline.into())),
});
}

fn interval(&self, period: Duration) -> Box<dyn Interval> {
Box::new(TokioInterval {
inner: tokio::time::interval(period),
})
}

/*
fn timeout<O, T: Future<Output = O>>(duration: Duration, future: T) -> Box<dyn Timeout<O> + Unpin> {
tokio::time::timeout(duration, future)
}
*/
}

/// An Interval object that uses the tokio runtime.
pub struct TokioInterval {
inner: tokio::time::Interval,
}

impl Interval for TokioInterval {
fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<std::time::Instant> {
let raw = tokio::time::Interval::poll_tick(&mut self.inner, cx);
raw.map(|a| a.into_std())
}
}

struct TokioTimeout<T> {
inner: Pin<Box<tokio::time::Timeout<T>>>,
}

impl<T> Future for TokioTimeout<T> where T: Future {
type Output = Result<T::Output, tokio::time::error::Elapsed>;

fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.as_mut().poll(context)
}

}

/*
impl<T> Timeout<T> for TokioTimeout<T>
where
T: Future + Send + Sync,
{
//type Output = ;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Elapsed>> {
self.inner.as_mut().poll(cx)
}
}
*/

// Use TokioSleep to get tokio::time::Sleep to implement Unpin.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
pub(crate) struct TokioSleep {
pub(crate) inner: Pin<Box<tokio::time::Sleep>>,
}

impl Future for TokioSleep {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}

// Use HasSleep to get tokio::time::Sleep to implement Unpin.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html

impl Sleep for TokioSleep {
fn is_elapsed(&self) -> bool {
self.inner.is_elapsed()
}
fn deadline(&self) -> Instant {
self.inner.deadline().into()
}
fn reset(mut self: Pin<&mut Self>, deadline: Instant) {
self.inner.as_mut().reset(deadline.into())
}
}
10 changes: 10 additions & 0 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::common::{
Poll,
};
use crate::rt::Executor;
use crate::rt::Timer;

/// A Client to make outgoing HTTP requests.
///
Expand Down Expand Up @@ -1224,6 +1225,15 @@ impl Builder {
self
}

/// Provide a timer to time background `Connection` tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Self
where
M: Timer + Send + Sync + 'static,
{
self.conn_builder.timer(timer);
self
}

/// Combine the configuration of this builder with a connector to create a `Client`.
pub fn build<C, B>(&self, connector: C) -> Client<C, B>
where
Expand Down
22 changes: 17 additions & 5 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1342,10 +1342,12 @@ mod dispatch_impl {
use tokio::net::TcpStream;

use super::support;
use support::TokioTimer;
use hyper::body::HttpBody;
use hyper::client::connect::{Connected, Connection};
use hyper::Client;


#[test]
fn drop_body_before_eof_closes_connection() {
// https://github.com/hyperium/hyper/issues/1353
Expand Down Expand Up @@ -1485,7 +1487,7 @@ mod dispatch_impl {
support::runtime().block_on(client_drop_rx.into_future())
});

let client = Client::builder().build(DebugConnector::with_closes(closes_tx));
let client = Client::builder().timer(TokioTimer).build(DebugConnector::with_closes(closes_tx));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
Expand Down Expand Up @@ -1743,7 +1745,7 @@ mod dispatch_impl {
let connector = DebugConnector::new();
let connects = connector.connects.clone();

let client = Client::builder().build(connector);
let client = Client::builder().timer(TokioTimer).build(connector);

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
Expand Down Expand Up @@ -1872,7 +1874,7 @@ mod dispatch_impl {
let connector = DebugConnector::new();
let connects = connector.connects.clone();

let client = Client::builder().build(connector);
let client = Client::builder().timer(TokioTimer).build(connector);

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
Expand Down Expand Up @@ -1948,7 +1950,7 @@ mod dispatch_impl {
let connector = DebugConnector::new();
let connects = connector.connects.clone();

let client = Client::builder().build(connector);
let client = Client::builder().timer(TokioTimer).build(connector);

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
Expand Down Expand Up @@ -2171,6 +2173,8 @@ mod dispatch_impl {
use hyper::Response;
use tokio::net::TcpListener;

use support::TokioTimer;

let _ = pretty_env_logger::try_init();
let rt = support::runtime();
let listener = rt
Expand All @@ -2181,11 +2185,12 @@ mod dispatch_impl {
connector.alpn_h2 = true;
let connects = connector.connects.clone();

let client = Client::builder().build::<_, ::hyper::Body>(connector);
let client = Client::builder().timer(TokioTimer).build::<_, ::hyper::Body>(connector);

rt.spawn(async move {
let (socket, _addr) = listener.accept().await.expect("accept");
Http::new()
.with_timer(TokioTimer)
.http2_only(true)
.serve_connection(
socket,
Expand Down Expand Up @@ -2375,6 +2380,8 @@ mod conn {

use super::{concat, s, support, tcp_connect, FutureHyperExt};

use support::TokioTimer;

#[tokio::test]
async fn get() {
let _ = ::pretty_env_logger::try_init();
Expand Down Expand Up @@ -3004,6 +3011,7 @@ mod conn {

let io = tcp_connect(&addr).await.expect("tcp connect");
let (_client, conn) = conn::Builder::new()
.timer(TokioTimer)
.http2_only(true)
.http2_keep_alive_interval(Duration::from_secs(1))
.http2_keep_alive_timeout(Duration::from_secs(1))
Expand Down Expand Up @@ -3037,6 +3045,7 @@ mod conn {

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::Builder::new()
.timer(TokioTimer)
.http2_only(true)
.http2_keep_alive_interval(Duration::from_secs(1))
.http2_keep_alive_timeout(Duration::from_secs(1))
Expand Down Expand Up @@ -3073,6 +3082,7 @@ mod conn {

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::Builder::new()
.timer(TokioTimer)
.http2_only(true)
.http2_keep_alive_interval(Duration::from_secs(1))
.http2_keep_alive_timeout(Duration::from_secs(1))
Expand Down Expand Up @@ -3119,6 +3129,7 @@ mod conn {
tokio::spawn(async move {
let sock = listener.accept().await.unwrap().0;
hyper::server::conn::Http::new()
.with_timer(TokioTimer)
.http2_only(true)
.serve_connection(
sock,
Expand All @@ -3137,6 +3148,7 @@ mod conn {

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::Builder::new()
.timer(TokioTimer)
.http2_only(true)
.http2_keep_alive_interval(Duration::from_secs(1))
.http2_keep_alive_timeout(Duration::from_secs(1))
Expand Down
6 changes: 6 additions & 0 deletions tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use h2::client::SendRequest;
use h2::{RecvStream, SendStream};
use http::header::{HeaderName, HeaderValue};
use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
use support::TokioTimer;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpStream};
Expand Down Expand Up @@ -1376,6 +1377,7 @@ async fn header_read_timeout_slow_writes() {

let (socket, _) = listener.accept().await.unwrap();
let conn = Http::new()
.with_timer(TokioTimer)
.http1_header_read_timeout(Duration::from_secs(5))
.serve_connection(
socket,
Expand Down Expand Up @@ -1451,6 +1453,7 @@ async fn header_read_timeout_slow_writes_multiple_requests() {

let (socket, _) = listener.accept().await.unwrap();
let conn = Http::new()
.with_timer(TokioTimer)
.http1_header_read_timeout(Duration::from_secs(5))
.serve_connection(
socket,
Expand Down Expand Up @@ -2486,6 +2489,7 @@ async fn http2_keep_alive_detects_unresponsive_client() {
let (socket, _) = listener.accept().await.expect("accept");

let err = Http::new()
.with_timer(TokioTimer)
.http2_only(true)
.http2_keep_alive_interval(Duration::from_secs(1))
.http2_keep_alive_timeout(Duration::from_secs(1))
Expand All @@ -2507,6 +2511,7 @@ async fn http2_keep_alive_with_responsive_client() {
let (socket, _) = listener.accept().await.expect("accept");

Http::new()
.with_timer(TokioTimer)
.http2_only(true)
.http2_keep_alive_interval(Duration::from_secs(1))
.http2_keep_alive_timeout(Duration::from_secs(1))
Expand Down Expand Up @@ -2574,6 +2579,7 @@ async fn http2_keep_alive_count_server_pings() {
let (socket, _) = listener.accept().await.expect("accept");

Http::new()
.with_timer(TokioTimer)
.http2_only(true)
.http2_keep_alive_interval(Duration::from_secs(1))
.http2_keep_alive_timeout(Duration::from_secs(1))
Expand Down
3 changes: 3 additions & 0 deletions tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub use futures_util::{
pub use hyper::{HeaderMap, StatusCode};
pub use std::net::SocketAddr;

mod tokiort;
pub use tokiort::TokioTimer;

#[allow(unused_macros)]
macro_rules! t {
(
Expand Down
1 change: 1 addition & 0 deletions tests/support/tokiort.rs

0 comments on commit 1aca08f

Please sign in to comment.