Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client, server) make timers generic #2904

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ server = []
runtime = [
"tokio/net",
"tokio/rt",
"tokio/time",
]

# C-API support (currently unstable (no semver))
Expand Down
3 changes: 3 additions & 0 deletions benches/support/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

mod tokiort;
pub use tokiort::TokioTimer;
92 changes: 92 additions & 0 deletions benches/support/tokiort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#![allow(dead_code)]
//! Various runtimes for hyper
use std::{
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};

use futures_util::Future;
use hyper::rt::{Interval, Sleep, Timer};

/// An Executor that uses the tokio runtime.
pub struct TokioExecutor;

/// A Timer that uses the tokio runtime.

#[derive(Clone, Debug)]
pub struct TokioTimer;

impl Timer for TokioTimer {
fn sleep(&self, duration: Duration) -> Box<dyn Sleep + Unpin> {
let s = tokio::time::sleep(duration);
let hs = TokioSleep { inner: Box::pin(s) };
return Box::new(hs);
}

fn sleep_until(&self, deadline: Instant) -> Box<dyn Sleep + Unpin> {
return Box::new(TokioSleep {
inner: Box::pin(tokio::time::sleep_until(deadline.into())),
});
}

fn interval(&self, period: Duration) -> Box<dyn Interval> {
Box::new(TokioInterval {
inner: tokio::time::interval(period),
})
}
}

/// An Interval object that uses the tokio runtime.
pub struct TokioInterval {
inner: tokio::time::Interval,
}

impl Interval for TokioInterval {
fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<std::time::Instant> {
let raw = tokio::time::Interval::poll_tick(&mut self.inner, cx);
raw.map(|a| a.into_std())
}
}

struct TokioTimeout<T> {
inner: Pin<Box<tokio::time::Timeout<T>>>,
}

impl<T> Future for TokioTimeout<T> where T: Future {
type Output = Result<T::Output, tokio::time::error::Elapsed>;

fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.as_mut().poll(context)
}

}

// Use TokioSleep to get tokio::time::Sleep to implement Unpin.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
pub(crate) struct TokioSleep {
pub(crate) inner: Pin<Box<tokio::time::Sleep>>,
}

impl Future for TokioSleep {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}

// Use HasSleep to get tokio::time::Sleep to implement Unpin.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html

impl Sleep for TokioSleep {
fn is_elapsed(&self) -> bool {
self.inner.is_elapsed()
}
fn deadline(&self) -> Instant {
self.inner.deadline().into()
}
fn reset(mut self: Pin<&mut Self>, deadline: Instant) {
self.inner.as_mut().reset(deadline.into())
}
}
12 changes: 11 additions & 1 deletion src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::common::{
Poll,
};
use crate::rt::Executor;
use crate::rt::Timer;

/// A Client to make outgoing HTTP requests.
///
Expand Down Expand Up @@ -1224,6 +1225,15 @@ impl Builder {
self
}

/// Provide a timer to time background `Connection` tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Self
where
M: Timer + Send + Sync + 'static,
{
self.conn_builder.timer(timer);
self
}

/// Combine the configuration of this builder with a connector to create a `Client`.
pub fn build<C, B>(&self, connector: C) -> Client<C, B>
where
Expand All @@ -1235,7 +1245,7 @@ impl Builder {
config: self.client_config,
conn_builder: self.conn_builder.clone(),
connector,
pool: Pool::new(self.pool_config, &self.conn_builder.exec),
pool: Pool::new(self.pool_config, &self.conn_builder.exec, &self.conn_builder.timer),
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions src/client/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ use tokio::io::{AsyncRead, AsyncWrite};

use crate::Body;
use crate::body::HttpBody;
use crate::common::tim::Tim;
use crate::common::{
exec::{BoxSendFuture, Exec},
task, Future, Pin, Poll,
};
use crate::upgrade::Upgraded;
use crate::proto;
use crate::rt::Executor;
use crate::rt::{Executor, Timer};
use super::super::dispatch;

type Dispatcher<T, B> =
Expand Down Expand Up @@ -46,6 +47,7 @@ where
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) exec: Exec,
pub(super) timer: Tim,
h09_responses: bool,
h1_parser_config: ParserConfig,
h1_writev: Option<bool>,
Expand Down Expand Up @@ -247,6 +249,7 @@ impl Builder {
pub fn new() -> Builder {
Builder {
exec: Exec::Default,
timer: None,
h09_responses: false,
h1_writev: None,
h1_read_buf_exact_size: None,
Expand All @@ -268,6 +271,15 @@ impl Builder {
self
}

/// Provide a timer to execute background HTTP2 tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
where
M: Timer + Send + Sync + 'static,
{
self.timer = Some(Arc::new(timer));
self
}

/// Set whether HTTP/0.9 responses should be tolerated.
///
/// Default is false.
Expand Down Expand Up @@ -462,7 +474,7 @@ impl Builder {
tracing::trace!("client handshake HTTP/1");

let (tx, rx) = dispatch::channel();
let mut conn = proto::Conn::new(io);
let mut conn = proto::Conn::new(io, opts.timer);
conn.set_h1_parser_config(opts.h1_parser_config);
if let Some(writev) = opts.h1_writev {
if writev {
Expand Down
17 changes: 15 additions & 2 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ use tokio::io::{AsyncRead, AsyncWrite};

use crate::Body;
use crate::body::HttpBody;
use crate::common::tim::Tim;
use crate::common::{
exec::{BoxSendFuture, Exec},
task, Future, Pin, Poll,
};
use crate::proto;
use crate::rt::Executor;
use crate::rt::{Executor, Timer};
use super::super::dispatch;

/// The sender side of an established connection.
Expand All @@ -44,6 +45,7 @@ where
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) exec: Exec,
pub(super) timer: Tim,
h2_builder: proto::h2::client::Config,
}

Expand Down Expand Up @@ -228,6 +230,7 @@ impl Builder {
pub fn new() -> Builder {
Builder {
exec: Exec::Default,
timer: None,
h2_builder: Default::default(),
}
}
Expand All @@ -239,6 +242,16 @@ impl Builder {
{
self.exec = Exec::Executor(Arc::new(exec));
self

}

/// Provide a timer to execute background HTTP2 tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
where
M: Timer + Send + Sync + 'static,
{
self.timer = Some(Arc::new(timer));
self
}

/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
Expand Down Expand Up @@ -420,7 +433,7 @@ impl Builder {

let (tx, rx) = dispatch::channel();
let h2 =
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec)
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec, opts.timer)
.await?;
Ok((
SendRequest { dispatch: tx },
Expand Down
17 changes: 14 additions & 3 deletions src/client/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use tower_service::Service;
use tracing::{debug, trace};

use super::dispatch;
use crate::body::HttpBody;
use crate::{body::HttpBody, common::tim::Tim, rt::Timer};
#[cfg(not(all(feature = "http1", feature = "http2")))]
use crate::common::Never;
use crate::common::{
Expand Down Expand Up @@ -156,6 +156,7 @@ where
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) exec: Exec,
pub(super) timer: Tim,
h09_responses: bool,
h1_parser_config: ParserConfig,
h1_writev: Option<bool>,
Expand Down Expand Up @@ -559,6 +560,7 @@ impl Builder {
pub fn new() -> Builder {
Builder {
exec: Exec::Default,
timer: None,
h09_responses: false,
h1_writev: None,
h1_read_buf_exact_size: None,
Expand Down Expand Up @@ -588,6 +590,15 @@ impl Builder {
self
}

/// Provide a timer to execute background HTTP2 tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
where
M: Timer + Send + Sync + 'static,
{
self.timer = Some(Arc::new(timer));
self
}

/// Set whether HTTP/0.9 responses should be tolerated.
///
/// Default is false.
Expand Down Expand Up @@ -960,7 +971,7 @@ impl Builder {
let proto = match opts.version {
#[cfg(feature = "http1")]
Proto::Http1 => {
let mut conn = proto::Conn::new(io);
let mut conn = proto::Conn::new(io, opts.timer);
conn.set_h1_parser_config(opts.h1_parser_config);
if let Some(writev) = opts.h1_writev {
if writev {
Expand Down Expand Up @@ -999,7 +1010,7 @@ impl Builder {
#[cfg(feature = "http2")]
Proto::Http2 => {
let h2 =
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone(), opts.timer.clone())
.await?;
ProtoClient::H2 { h2 }
}
Expand Down
Loading