Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): remove common::Exec::Default #3135

Merged
merged 3 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,10 @@ impl Opts {
let mut client = rt.block_on(async {
if self.http2 {
let io = tokio::net::TcpStream::connect(&addr).await.unwrap();
let (tx, conn) = hyper::client::conn::http2::Builder::new()
let (tx, conn) = hyper::client::conn::http2::Builder::new(support::TokioExecutor)
.initial_stream_window_size(self.http2_stream_window)
.initial_connection_window_size(self.http2_conn_window)
.adaptive_window(self.http2_adaptive_window)
.executor(support::TokioExecutor)
.handshake(io)
.await
.unwrap();
Expand Down
22 changes: 9 additions & 13 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,18 @@ pub struct Builder {
///
/// This is a shortcut for `Builder::new().handshake(io)`.
/// See [`client::conn`](crate::client::conn) for more.
pub async fn handshake<T, B>(
pub async fn handshake<E, T, B>(
exec: E,
io: T,
) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
Builder::new().handshake(io).await
Builder::new(exec).handshake(io).await
}

// ===== impl SendRequest
Expand Down Expand Up @@ -244,23 +246,17 @@ where
impl Builder {
/// Creates a new connection builder.
#[inline]
pub fn new() -> Builder {
pub fn new<E>(exec: E) -> Builder
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
{
Builder {
exec: Exec::Default,
exec: Exec::new(exec),
timer: Time::Empty,
h2_builder: Default::default(),
}
}

/// Provide an executor to execute background HTTP2 tasks.
pub fn executor<E>(&mut self, exec: E) -> &mut Builder
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
{
self.exec = Exec::Executor(Arc::new(exec));
self
}

/// Provide a timer to execute background HTTP2 tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
where
Expand Down
25 changes: 10 additions & 15 deletions src/common/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,25 @@ pub trait ConnStreamExec<F, B: Body>: Clone {

pub(crate) type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

// Either the user provides an executor for background tasks, or we panic.
// TODO: with the `runtime`feature, `Exec::Default` used `tokio::spawn`. With the
// removal of the opt-in default runtime, this should be refactored.
// Executor must be provided by the user
#[derive(Clone)]
pub(crate) enum Exec {
Default,
Executor(Arc<dyn Executor<BoxSendFuture> + Send + Sync>),
}
pub(crate) struct Exec(Arc<dyn Executor<BoxSendFuture> + Send + Sync>);

// ===== impl Exec =====

impl Exec {
pub(crate) fn new<E>(exec: E) -> Self
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
{
Self(Arc::new(exec))
}

pub(crate) fn execute<F>(&self, fut: F)
where
F: Future<Output = ()> + Send + 'static,
{
match *self {
Exec::Default => {
panic!("executor must be set");
}
Exec::Executor(ref e) => {
e.execute(Box::pin(fut));
}
}
self.0.execute(Box::pin(fut))
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/ffi/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ ffi_fn! {
#[cfg(feature = "http2")]
{
if options.http2 {
return conn::http2::Builder::new()
.executor(options.exec.clone())
return conn::http2::Builder::new(options.exec.clone())
.handshake::<_, crate::body::Incoming>(io)
.await
.map(|(tx, conn)| {
Expand Down
21 changes: 7 additions & 14 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1922,8 +1922,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.handshake(io)
.await
.expect("http handshake");
Expand Down Expand Up @@ -1979,8 +1978,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (_client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (_client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.keep_alive_interval(Duration::from_secs(1))
.keep_alive_timeout(Duration::from_secs(1))
Expand Down Expand Up @@ -2008,8 +2006,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.keep_alive_interval(Duration::from_secs(1))
.keep_alive_timeout(Duration::from_secs(1))
Expand Down Expand Up @@ -2040,8 +2037,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.keep_alive_interval(Duration::from_secs(1))
.keep_alive_timeout(Duration::from_secs(1))
Expand Down Expand Up @@ -2100,8 +2096,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.keep_alive_interval(Duration::from_secs(1))
.keep_alive_timeout(Duration::from_secs(1))
Expand Down Expand Up @@ -2156,8 +2151,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.handshake(io)
.await
.expect("http handshake");
Expand Down Expand Up @@ -2207,8 +2201,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.handshake::<_, Empty<Bytes>>(io)
.await
.expect("http handshake");
Expand Down
6 changes: 2 additions & 4 deletions tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2389,8 +2389,7 @@ async fn http2_keep_alive_with_responsive_client() {
});

let tcp = connect_async(addr).await;
let (mut client, conn) = hyper::client::conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = hyper::client::conn::http2::Builder::new(TokioExecutor)
.handshake(tcp)
.await
.expect("http handshake");
Expand Down Expand Up @@ -3017,8 +3016,7 @@ impl TestClient {
.unwrap();

if self.http2_only {
let (mut sender, conn) = hyper::client::conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut sender, conn) = hyper::client::conn::http2::Builder::new(TokioExecutor)
.handshake(stream)
.await
.unwrap();
Expand Down
13 changes: 6 additions & 7 deletions tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,7 @@ async fn async_test(cfg: __TestConfig) {
let stream = TcpStream::connect(addr).await.unwrap();

let res = if http2_only {
let (mut sender, conn) = hyper::client::conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut sender, conn) = hyper::client::conn::http2::Builder::new(TokioExecutor)
.handshake(stream)
.await
.unwrap();
Expand Down Expand Up @@ -526,11 +525,11 @@ async fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future<Output = ()>)
.unwrap();

let resp = if http2_only {
let (mut sender, conn) = hyper::client::conn::http2::Builder::new()
.executor(TokioExecutor)
.handshake(stream)
.await
.unwrap();
let (mut sender, conn) =
hyper::client::conn::http2::Builder::new(TokioExecutor)
.handshake(stream)
.await
.unwrap();

tokio::task::spawn(async move {
if let Err(err) = conn.await {
Expand Down