Skip to content

Commit

Permalink
impl HttpBody and Stream for Body
Browse files Browse the repository at this point in the history
  • Loading branch information
shouya committed Dec 8, 2023
1 parent 4756241 commit 508f0e9
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 27 deletions.
19 changes: 6 additions & 13 deletions src/async_impl/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ pub struct Body {
inner: Inner,
}

// The `Stream` trait isn't stable, so the impl isn't public.
pub(crate) struct ImplStream(Body);

enum Inner {
Reusable(Bytes),
Streaming {
Expand Down Expand Up @@ -153,10 +150,6 @@ impl Body {
}
}

pub(crate) fn into_stream(self) -> ImplStream {
ImplStream(self)
}

#[cfg(feature = "multipart")]
pub(crate) fn content_length(&self) -> Option<u64> {
match self.inner {
Expand Down Expand Up @@ -228,17 +221,17 @@ impl fmt::Debug for Body {
}
}

// ===== impl ImplStream =====
// ===== impl Body =====

impl HttpBody for ImplStream {
impl HttpBody for Body {
type Data = Bytes;
type Error = crate::Error;

fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let opt_try_chunk = match self.0.inner {
let opt_try_chunk = match self.inner {
Inner::Streaming {
ref mut body,
ref mut timeout,
Expand Down Expand Up @@ -271,14 +264,14 @@ impl HttpBody for ImplStream {
}

fn is_end_stream(&self) -> bool {
match self.0.inner {
match self.inner {
Inner::Streaming { ref body, .. } => body.is_end_stream(),
Inner::Reusable(ref bytes) => bytes.is_empty(),
}
}

fn size_hint(&self) -> http_body::SizeHint {
match self.0.inner {
match self.inner {
Inner::Streaming { ref body, .. } => body.size_hint(),
Inner::Reusable(ref bytes) => {
let mut hint = http_body::SizeHint::default();
Expand All @@ -289,7 +282,7 @@ impl HttpBody for ImplStream {
}
}

impl Stream for ImplStream {
impl Stream for Body {
type Item = Result<Bytes, crate::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
Expand Down
10 changes: 4 additions & 6 deletions src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1645,7 +1645,7 @@ impl ClientBuilder {
}
}

type HyperClient = hyper::Client<Connector, super::body::ImplStream>;
type HyperClient = hyper::Client<Connector, super::body::Body>;

impl Default for Client {
fn default() -> Self {
Expand Down Expand Up @@ -1822,9 +1822,7 @@ impl Client {
ResponseFuture::H3(self.inner.h3_client.as_ref().unwrap().request(req))
}
_ => {
let mut req = builder
.body(body.into_stream())
.expect("valid request parts");
let mut req = builder.body(body).expect("valid request parts");
*req.headers_mut() = headers.clone();
ResponseFuture::Default(self.inner.hyper.request(req))
}
Expand Down Expand Up @@ -2188,7 +2186,7 @@ impl PendingRequest {
let mut req = hyper::Request::builder()
.method(self.method.clone())
.uri(uri)
.body(body.into_stream())
.body(body)
.expect("valid request parts");
*req.headers_mut() = self.headers.clone();
ResponseFuture::Default(self.client.hyper.request(req))
Expand Down Expand Up @@ -2413,7 +2411,7 @@ impl Future for PendingRequest {
let mut req = hyper::Request::builder()
.method(self.method.clone())
.uri(uri.clone())
.body(body.into_stream())
.body(body)
.expect("valid request parts");
*req.headers_mut() = headers.clone();
std::mem::swap(self.as_mut().headers(), &mut headers);
Expand Down
13 changes: 5 additions & 8 deletions src/async_impl/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type PeekableIoStreamReader = StreamReader<PeekableIoStream, Bytes>;

enum Inner {
/// A `PlainText` decoder just returns the response content as is.
PlainText(super::body::ImplStream),
PlainText(super::body::Body),

/// A `Gzip` decoder will uncompress the gzipped response content before returning it.
#[cfg(feature = "gzip")]
Expand All @@ -72,7 +72,7 @@ enum Inner {
/// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
struct Pending(PeekableIoStream, DecoderType);

struct IoStream(super::body::ImplStream);
struct IoStream(super::body::Body);

enum DecoderType {
#[cfg(feature = "gzip")]
Expand Down Expand Up @@ -102,7 +102,7 @@ impl Decoder {
/// This decoder will emit the underlying chunks as-is.
fn plain_text(body: Body) -> Decoder {
Decoder {
inner: Inner::PlainText(body.into_stream()),
inner: Inner::PlainText(body),
}
}

Expand Down Expand Up @@ -303,13 +303,10 @@ impl Future for Pending {
.expect("just peeked Some")
.unwrap_err()));
}
None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))),
None => return Poll::Ready(Ok(Inner::PlainText(Body::empty()))),
};

let _body = std::mem::replace(
&mut self.0,
IoStream(Body::empty().into_stream()).peekable(),
);
let _body = std::mem::replace(&mut self.0, IoStream(Body::empty()).peekable());

match self.1 {
#[cfg(feature = "brotli")]
Expand Down

0 comments on commit 508f0e9

Please sign in to comment.