Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rt): add Timer trait #2974

Merged
merged 1 commit into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ server = []
runtime = [
"tokio/net",
"tokio/rt",
"tokio/time",
]

# C-API support (currently unstable (no semver))
Expand Down
3 changes: 3 additions & 0 deletions benches/support/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

mod tokiort;
pub use tokiort::TokioTimer;
66 changes: 66 additions & 0 deletions benches/support/tokiort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#![allow(dead_code)]
//! Various runtimes for hyper
use std::{
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};

use futures_util::Future;
use hyper::rt::{Sleep, Timer};

/// An Executor that uses the tokio runtime.
pub struct TokioExecutor;

/// A Timer that uses the tokio runtime.

#[derive(Clone, Debug)]
pub struct TokioTimer;

impl Timer for TokioTimer {
fn sleep(&self, duration: Duration) -> Box<dyn Sleep + Unpin> {
let s = tokio::time::sleep(duration);
let hs = TokioSleep { inner: Box::pin(s) };
return Box::new(hs);
}

fn sleep_until(&self, deadline: Instant) -> Box<dyn Sleep + Unpin> {
return Box::new(TokioSleep {
inner: Box::pin(tokio::time::sleep_until(deadline.into())),
});
}
}

struct TokioTimeout<T> {
inner: Pin<Box<tokio::time::Timeout<T>>>,
}

impl<T> Future for TokioTimeout<T>
where
T: Future,
{
type Output = Result<T::Output, tokio::time::error::Elapsed>;

fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.as_mut().poll(context)
}
}

// Use TokioSleep to get tokio::time::Sleep to implement Unpin.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
pub(crate) struct TokioSleep {
pub(crate) inner: Pin<Box<tokio::time::Sleep>>,
}

impl Future for TokioSleep {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}

// Use HasSleep to get tokio::time::Sleep to implement Unpin.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html

impl Sleep for TokioSleep {}
14 changes: 8 additions & 6 deletions src/client/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use tokio::io::{AsyncRead, AsyncWrite};

use crate::Recv;
use crate::body::Body;
use super::super::dispatch;
use crate::common::{
exec::{BoxSendFuture, Exec},
task, Future, Pin, Poll,
};
use crate::upgrade::Upgraded;
use crate::proto;
use crate::rt::Executor;
use super::super::dispatch;
use crate::rt::{Executor};
use crate::upgrade::Upgraded;

type Dispatcher<T, B> =
proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
Expand Down Expand Up @@ -120,7 +120,10 @@ where
/// before calling this method.
/// - Since absolute-form `Uri`s are not required, if received, they will
/// be serialized as-is.
pub fn send_request(&mut self, req: Request<B>) -> impl Future<Output = crate::Result<Response<Recv>>> {
pub fn send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = crate::Result<Response<Recv>>> {
let sent = self.dispatch.send(req);

async move {
Expand All @@ -130,7 +133,7 @@ where
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_canceled) => panic!("dispatch dropped without returning error"),
}
},
Err(_req) => {
tracing::debug!("connection was not ready");

Expand Down Expand Up @@ -476,4 +479,3 @@ impl Builder {
}
}
}

30 changes: 22 additions & 8 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ use tokio::io::{AsyncRead, AsyncWrite};

use crate::Recv;
use crate::body::Body;
use super::super::dispatch;
use crate::common::time::Time;
use crate::common::{
exec::{BoxSendFuture, Exec},
task, Future, Pin, Poll,
};
use crate::proto;
use crate::rt::Executor;
use super::super::dispatch;
use crate::rt::{Executor, Timer};

/// The sender side of an established connection.
pub struct SendRequest<B> {
Expand All @@ -44,6 +45,7 @@ where
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) exec: Exec,
pub(super) timer: Time,
h2_builder: proto::h2::client::Config,
}

Expand Down Expand Up @@ -114,7 +116,10 @@ where
/// before calling this method.
/// - Since absolute-form `Uri`s are not required, if received, they will
/// be serialized as-is.
pub fn send_request(&mut self, req: Request<B>) -> impl Future<Output = crate::Result<Response<Recv>>> {
pub fn send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = crate::Result<Response<Recv>>> {
let sent = self.dispatch.send(req);

async move {
Expand All @@ -124,7 +129,7 @@ where
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_canceled) => panic!("dispatch dropped without returning error"),
}
},
Err(_req) => {
tracing::debug!("connection was not ready");

Expand Down Expand Up @@ -207,6 +212,7 @@ impl Builder {
pub fn new() -> Builder {
Builder {
exec: Exec::Default,
timer: Time::Empty,
h2_builder: Default::default(),
}
}
Expand All @@ -220,6 +226,15 @@ impl Builder {
self
}

/// Provide a timer to execute background HTTP2 tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
where
M: Timer + Send + Sync + 'static,
{
self.timer = Time::Timer(Arc::new(timer));
self
}

/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
/// stream-level flow control.
///
Expand Down Expand Up @@ -398,14 +413,13 @@ impl Builder {
tracing::trace!("client handshake HTTP/1");

let (tx, rx) = dispatch::channel();
let h2 =
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec)
.await?;
let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec, opts.timer)
.await?;
Ok((
SendRequest { dispatch: tx.unbound() },
//SendRequest { dispatch: tx },
Connection { inner: (PhantomData, h2) },
))
}
}
}

23 changes: 20 additions & 3 deletions src/client/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ use crate::rt::Executor;
#[cfg(feature = "http1")]
use crate::upgrade::Upgraded;
use crate::{Recv, Request, Response};
use crate::{common::time::Time, rt::Timer};

#[cfg(feature = "http1")]
pub mod http1;
Expand Down Expand Up @@ -161,6 +162,7 @@ where
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) exec: Exec,
pub(super) timer: Time,
h09_responses: bool,
h1_parser_config: ParserConfig,
h1_writev: Option<bool>,
Expand Down Expand Up @@ -418,6 +420,7 @@ impl Builder {
pub fn new() -> Builder {
Builder {
exec: Exec::Default,
timer: Time::Empty,
h09_responses: false,
h1_writev: None,
h1_read_buf_exact_size: None,
Expand Down Expand Up @@ -447,6 +450,15 @@ impl Builder {
self
}

/// Provide a timer to execute background HTTP2 tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
where
M: Timer + Send + Sync + 'static,
{
self.timer = Time::Timer(Arc::new(timer));
self
}

/// Set whether HTTP/0.9 responses should be tolerated.
///
/// Default is false.
Expand Down Expand Up @@ -857,9 +869,14 @@ impl Builder {
}
#[cfg(feature = "http2")]
Proto::Http2 => {
let h2 =
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
.await?;
let h2 = proto::h2::client::handshake(
io,
rx,
&opts.h2_builder,
opts.exec.clone(),
opts.timer.clone(),
)
.await?;
ProtoClient::H2 { h2 }
}
};
Expand Down
9 changes: 9 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub(crate) mod exec;
pub(crate) mod io;
mod never;
pub(crate) mod task;
#[cfg(any(feature = "http1", feature = "http2", feature = "server"))]
pub(crate) mod time;
pub(crate) mod watch;

#[cfg(any(feature = "http1", feature = "http2", feature = "runtime"))]
Expand All @@ -26,3 +28,10 @@ cfg_proto! {
pub(crate) use std::marker::Unpin;
}
pub(crate) use std::{future::Future, pin::Pin};

pub(crate) fn into_pin<T: ?Sized>(boxed: Box<T>) -> Pin<Box<T>> {
// It's not possible to move or replace the insides of a `Pin<Box<T>>`
// when `T: !Unpin`, so it's safe to pin it directly without any
// additional requirements.
unsafe { Pin::new_unchecked(boxed) }
}
87 changes: 87 additions & 0 deletions src/common/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::{fmt, sync::Arc};
#[cfg(all(feature = "server", feature = "runtime"))]
use std::{
pin::Pin,
time::{Duration, Instant},
};

#[cfg(all(feature = "server", feature = "runtime"))]
use crate::rt::Sleep;
use crate::rt::Timer;

/// A user-provided timer to time background tasks.
#[derive(Clone)]
pub(crate) enum Time {
Timer(Arc<dyn Timer + Send + Sync>),
Empty,
}

impl fmt::Debug for Time {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Time").finish()
}
}

/*
pub(crate) fn timeout<F>(tim: Tim, future: F, duration: Duration) -> HyperTimeout<F> {
HyperTimeout { sleep: tim.sleep(duration), future: future }
}

pin_project_lite::pin_project! {
pub(crate) struct HyperTimeout<F> {
sleep: Box<dyn Sleep>,
#[pin]
future: F
}
}

pub(crate) struct Timeout;

impl<F> Future for HyperTimeout<F> where F: Future {

type Output = Result<F::Output, Timeout>;

fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output>{
let mut this = self.project();
if let Poll::Ready(v) = this.future.poll(ctx) {
return Poll::Ready(Ok(v));
}

if let Poll::Ready(_) = Pin::new(&mut this.sleep).poll(ctx) {
return Poll::Ready(Err(Timeout));
}

return Poll::Pending;
}
}
*/

#[cfg(all(feature = "server", feature = "runtime"))]
impl Time {
pub(crate) fn sleep(&self, duration: Duration) -> Box<dyn Sleep + Unpin> {
match *self {
Time::Empty => {
panic!("You must supply a timer.")
}
Time::Timer(ref t) => t.sleep(duration),
}
}

pub(crate) fn sleep_until(&self, deadline: Instant) -> Box<dyn Sleep + Unpin> {
match *self {
Time::Empty => {
panic!("You must supply a timer.")
}
Time::Timer(ref t) => t.sleep_until(deadline),
}
}

pub(crate) fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
match *self {
Time::Empty => {
panic!("You must supply a timer.")
}
Time::Timer(ref t) => t.reset(sleep, new_deadline),
}
}
}
Loading