diff --git a/examples/client.rs b/examples/client.rs index bafb6ba7fe..b5df77e25d 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -40,7 +40,7 @@ fn main() { println!("Response: {}", res.status()); println!("Headers: {:#?}", res.headers()); - res.into_parts().1.for_each(|chunk| { + res.into_parts().1.into_stream().for_each(|chunk| { io::stdout().write_all(&chunk).map_err(From::from) }) }).map(|_| { diff --git a/examples/params.rs b/examples/params.rs index ff51b91e3c..c632daa88e 100644 --- a/examples/params.rs +++ b/examples/params.rs @@ -30,7 +30,7 @@ impl Service for ParamExample { Box::new(futures::future::ok(Response::new(INDEX.into()))) }, (&Method::POST, "/post") => { - Box::new(req.into_parts().1.concat2().map(|b| { + Box::new(req.into_parts().1.into_stream().concat2().map(|b| { // Parse the request body. form_urlencoded::parse // always succeeds, but in general parsing may // fail (for example, an invalid post of json), so diff --git a/examples/send_file.rs b/examples/send_file.rs index 71351ef975..7e507fe33f 100644 --- a/examples/send_file.rs +++ b/examples/send_file.rs @@ -3,15 +3,15 @@ extern crate futures; extern crate hyper; extern crate pretty_env_logger; -use futures::{Future, Sink}; +use futures::{Future/*, Sink*/}; use futures::sync::oneshot; -use hyper::{Body, Chunk, Method, Request, Response, StatusCode}; +use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode}; use hyper::error::Error; use hyper::server::{Http, Service}; use std::fs::File; -use std::io::{self, copy, Read}; +use std::io::{self, copy/*, Read*/}; use std::thread; static NOTFOUND: &[u8] = b"Not Found"; @@ -80,7 +80,7 @@ impl Service for ResponseExamples { // a small test file. let (tx, rx) = oneshot::channel(); thread::spawn(move || { - let mut file = match File::open(INDEX) { + let _file = match File::open(INDEX) { Ok(f) => f, Err(_) => { tx.send(Response::builder() @@ -91,9 +91,10 @@ impl Service for ResponseExamples { return; }, }; - let (mut tx_body, rx_body) = Body::pair(); + let (_tx_body, rx_body) = Body::channel(); let res = Response::new(rx_body.into()); tx.send(res).expect("Send error on successful file read"); + /* TODO: fix once we have futures 0.2 Sink working let mut buf = [0u8; 16]; loop { match file.read(&mut buf) { @@ -104,7 +105,7 @@ impl Service for ResponseExamples { break; } else { let chunk: Chunk = buf[0..n].to_vec().into(); - match tx_body.send(Ok(chunk)).wait() { + match tx_body.send_data(chunk).wait() { Ok(t) => { tx_body = t; }, Err(_) => { break; } }; @@ -113,6 +114,7 @@ impl Service for ResponseExamples { Err(_) => { break; } } } + */ }); Box::new(rx.map_err(|e| Error::from(io::Error::new(io::ErrorKind::Other, e)))) diff --git a/examples/server.rs b/examples/server.rs index 7c7b3c510b..4c8cba4229 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -24,7 +24,7 @@ impl Service for Echo { Response::new(INDEX.into()) }, (&Method::POST, "/echo") => { - Response::new(req.into_parts().1) + Response::new(req.into_body()) }, _ => { let mut res = Response::new(Body::empty()); diff --git a/examples/web_api.rs b/examples/web_api.rs index d7fdb46e7b..566dad4c49 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -7,7 +7,6 @@ extern crate tokio_core; use futures::{Future, Stream}; use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode}; -use hyper::error::Error; use hyper::server::{Http, Service}; #[allow(unused)] @@ -18,20 +17,18 @@ static URL: &str = "http://127.0.0.1:1337/web_api"; static INDEX: &[u8] = b"test.html"; static LOWERCASE: &[u8] = b"i am a lower case string"; -pub type ResponseStream = Box>; - struct ResponseExamples(tokio_core::reactor::Handle); impl Service for ResponseExamples { type Request = Request; - type Response = Response; + type Response = Response; type Error = hyper::Error; type Future = Box>; fn call(&self, req: Self::Request) -> Self::Future { match (req.method(), req.uri().path()) { (&Method::GET, "/") | (&Method::GET, "/index.html") => { - let body: ResponseStream = Box::new(Body::from(INDEX)); + let body = Body::from(INDEX); Box::new(futures::future::ok(Response::new(body))) }, (&Method::GET, "/test.html") => { @@ -45,7 +42,7 @@ impl Service for ResponseExamples { let web_res_future = client.request(req); Box::new(web_res_future.map(|web_res| { - let body: ResponseStream = Box::new(web_res.into_parts().1.map(|b| { + let body = Body::wrap_stream(web_res.into_body().into_stream().map(|b| { Chunk::from(format!("before: '{:?}'
after: '{:?}'", std::str::from_utf8(LOWERCASE).unwrap(), std::str::from_utf8(&b).unwrap())) @@ -55,7 +52,7 @@ impl Service for ResponseExamples { }, (&Method::POST, "/web_api") => { // A web api to run against. Simple upcasing of the body. - let body: ResponseStream = Box::new(req.into_parts().1.map(|chunk| { + let body = Body::wrap_stream(req.into_body().into_stream().map(|chunk| { let upper = chunk.iter().map(|byte| byte.to_ascii_uppercase()) .collect::>(); Chunk::from(upper) @@ -63,7 +60,7 @@ impl Service for ResponseExamples { Box::new(futures::future::ok(Response::new(body))) }, _ => { - let body: ResponseStream = Box::new(Body::from(NOTFOUND)); + let body = Body::from(NOTFOUND); Box::new(futures::future::ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(body) diff --git a/src/client/conn.rs b/src/client/conn.rs index 91fdeeaa36..2a01d0135b 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -11,11 +11,12 @@ use std::fmt; use std::marker::PhantomData; use bytes::Bytes; -use futures::{Async, Future, Poll, Stream}; +use futures::{Async, Future, Poll}; use futures::future::{self, Either}; use tokio_io::{AsyncRead, AsyncWrite}; use proto; +use proto::body::Entity; use super::dispatch; use {Body, Request, Response, StatusCode}; @@ -44,14 +45,13 @@ pub struct SendRequest { pub struct Connection where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { inner: proto::dispatch::Dispatcher< proto::dispatch::Client, B, T, - B::Item, + B::Data, proto::ClientUpgradeTransaction, >, } @@ -134,8 +134,7 @@ impl SendRequest impl SendRequest where - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { /// Sends a `Request` on the associated connection. /// @@ -152,7 +151,7 @@ where /// the `Host` header based on it. You must add a `Host` header yourself /// before calling this method. /// - Since absolute-form `Uri`s are not required, if received, they will - /// be serialized as-is, irregardless of calling `Request::set_proxy`. + /// be serialized as-is. /// /// # Example /// @@ -185,19 +184,6 @@ where /// # fn main() {} /// ``` pub fn send_request(&mut self, req: Request) -> ResponseFuture { - /* TODO? - // The Connection API does less things automatically than the Client - // API does. For instance, right here, we always assume set_proxy, so - // that if an absolute-form URI is provided, it is serialized as-is. - // - // Part of the reason for this is to prepare for the change to `http` - // types, where there is no more set_proxy. - // - // It's important that this method isn't called directly from the - // `Client`, so that `set_proxy` there is still respected. - req.set_proxy(true); - */ - let inner = match self.dispatch.send(req) { Ok(rx) => { Either::A(rx.then(move |res| { @@ -269,8 +255,7 @@ impl fmt::Debug for SendRequest { impl Connection where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { /// Return the inner IO object, and additional information. pub fn into_parts(self) -> Parts { @@ -297,8 +282,7 @@ where impl Future for Connection where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { type Item = (); type Error = ::Error; @@ -311,8 +295,7 @@ where impl fmt::Debug for Connection where T: AsyncRead + AsyncWrite + fmt::Debug, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Connection") @@ -341,8 +324,7 @@ impl Builder { pub fn handshake(&self, io: T) -> Handshake where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { Handshake { inner: HandshakeInner { @@ -356,8 +338,7 @@ impl Builder { pub(super) fn handshake_no_upgrades(&self, io: T) -> HandshakeNoUpgrades where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { HandshakeNoUpgrades { inner: HandshakeInner { @@ -374,8 +355,7 @@ impl Builder { impl Future for Handshake where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { type Item = (SendRequest, Connection); type Error = ::Error; @@ -400,14 +380,13 @@ impl fmt::Debug for Handshake { impl Future for HandshakeNoUpgrades where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { type Item = (SendRequest, proto::dispatch::Dispatcher< proto::dispatch::Client, B, T, - B::Item, + B::Data, proto::ClientTransaction, >); type Error = ::Error; @@ -420,8 +399,7 @@ where impl Future for HandshakeInner where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, R: proto::Http1Transaction< Incoming=StatusCode, Outgoing=proto::RequestLine, @@ -431,7 +409,7 @@ where proto::dispatch::Client, B, T, - B::Item, + B::Data, R, >); type Error = ::Error; @@ -485,16 +463,16 @@ impl AssertSendSync for SendRequest {} impl AssertSend for Connection where T: AsyncRead + AsyncWrite, - B: Stream, - B::Item: AsRef<[u8]> + Send, + B: Entity + 'static, + B::Data: Send + 'static, {} #[doc(hidden)] impl AssertSendSync for Connection where T: AsyncRead + AsyncWrite, - B: Stream, - B::Item: AsRef<[u8]> + Send + Sync, + B: Entity + 'static, + B::Data: Send + Sync + 'static, {} #[doc(hidden)] diff --git a/src/client/mod.rs b/src/client/mod.rs index 51eecd8256..57880eea53 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -7,14 +7,15 @@ use std::rc::Rc; use std::sync::Arc; use std::time::Duration; -use futures::{Async, Future, Poll, Stream}; +use futures::{Async, Future, Poll}; use futures::future::{self, Executor}; use http::{Method, Request, Response, Uri, Version}; use http::header::{Entry, HeaderValue, HOST}; use tokio::reactor::Handle; pub use tokio_service::Service; -use proto::{self, Body}; +use proto::body::{Body, Entity}; +use proto; use self::pool::Pool; pub use self::connect::{HttpConnector, Connect}; @@ -101,8 +102,7 @@ impl Client { impl Client where C: Connect, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { /// Send a constructed Request using this Client. #[inline] @@ -160,13 +160,13 @@ where C: Connect, let client = self.clone(); //TODO: let is_proxy = req.is_proxy(); - //let uri = req.uri().clone(); + let uri = req.uri().clone(); let fut = RetryableSendRequest { client: client, future: self.send_request(req, &domain), domain: domain, //is_proxy: is_proxy, - //uri: uri, + uri: uri, }; FutureResponse(Box::new(fut)) } @@ -272,8 +272,7 @@ where C: Connect, impl Service for Client where C: Connect, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { type Request = Request; type Response = Response; @@ -329,14 +328,13 @@ struct RetryableSendRequest { domain: String, future: Box, Error=ClientError>>, //is_proxy: bool, - //uri: Uri, + uri: Uri, } impl Future for RetryableSendRequest where C: Connect, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { type Item = Response; type Error = ::Error; @@ -349,7 +347,7 @@ where Err(ClientError::Normal(err)) => return Err(err), Err(ClientError::Canceled { connection_reused, - req, + mut req, reason, }) => { if !self.client.retry_canceled_requests || !connection_reused { @@ -359,6 +357,7 @@ where } trace!("unstarted request canceled, trying again (reason={:?})", reason); + *req.uri_mut() = self.uri.clone(); self.future = self.client.send_request(req, &self.domain); } } @@ -526,8 +525,7 @@ impl Config { impl Config where C: Connect, - B: Stream, - B::Item: AsRef<[u8]>, + B: Entity, { /// Construct the Client with this configuration. #[inline] @@ -548,8 +546,7 @@ where C: Connect, } impl Config -where B: Stream, - B::Item: AsRef<[u8]>, +where B: Entity, { /// Construct the Client with this configuration. #[inline] diff --git a/src/headers.rs b/src/headers.rs index 1e98ee4ce2..1cad1ba9e0 100644 --- a/src/headers.rs +++ b/src/headers.rs @@ -1,7 +1,14 @@ +use std::fmt::Write; + +use bytes::BytesMut; use http::HeaderMap; -use http::header::{CONNECTION, CONTENT_LENGTH, EXPECT, HeaderValue, TRANSFER_ENCODING}; +use http::header::{CONNECTION, CONTENT_LENGTH, EXPECT, TRANSFER_ENCODING}; +use http::header::{HeaderValue, OccupiedEntry, ValueIter}; use unicase; +/// Maximum number of bytes needed to serialize a u64 into ASCII decimal. +const MAX_DECIMAL_U64_BYTES: usize = 20; + pub fn connection_keep_alive(headers: &HeaderMap) -> bool { for line in headers.get_all(CONNECTION) { if let Ok(s) = line.to_str() { @@ -31,13 +38,15 @@ pub fn connection_close(headers: &HeaderMap) -> bool { } pub fn content_length_parse(headers: &HeaderMap) -> Option { + content_length_parse_all(headers.get_all(CONTENT_LENGTH).into_iter()) +} + +pub fn content_length_parse_all(values: ValueIter) -> Option { // If multiple Content-Length headers were sent, everything can still // be alright if they all contain the same value, and all parse // correctly. If not, then it's an error. - let values = headers.get_all(CONTENT_LENGTH); let folded = values - .into_iter() .fold(None, |prev, line| match prev { Some(Ok(prev)) => { Some(line @@ -66,12 +75,25 @@ pub fn content_length_zero(headers: &mut HeaderMap) { headers.insert(CONTENT_LENGTH, HeaderValue::from_static("0")); } +pub fn content_length_value(len: u64) -> HeaderValue { + let mut len_buf = BytesMut::with_capacity(MAX_DECIMAL_U64_BYTES); + write!(len_buf, "{}", len) + .expect("BytesMut can hold a decimal u64"); + // safe because u64 Display is ascii numerals + unsafe { + HeaderValue::from_shared_unchecked(len_buf.freeze()) + } +} + pub fn expect_continue(headers: &HeaderMap) -> bool { Some(&b"100-continue"[..]) == headers.get(EXPECT).map(|v| v.as_bytes()) } pub fn transfer_encoding_is_chunked(headers: &HeaderMap) -> bool { - let mut encodings = headers.get_all(TRANSFER_ENCODING).into_iter(); + is_chunked(headers.get_all(TRANSFER_ENCODING).into_iter()) +} + +pub fn is_chunked(mut encodings: ValueIter) -> bool { // chunked must always be the last encoding, according to spec if let Some(line) = encodings.next_back() { if let Ok(s) = line.to_str() { @@ -83,3 +105,33 @@ pub fn transfer_encoding_is_chunked(headers: &HeaderMap) -> bool { false } + +pub fn add_chunked(mut entry: OccupiedEntry) { + const CHUNKED: &'static str = "chunked"; + + if let Some(line) = entry.iter_mut().next_back() { + // + 2 for ", " + let new_cap = line.as_bytes().len() + CHUNKED.len() + 2; + let mut buf = BytesMut::with_capacity(new_cap); + buf.copy_from_slice(line.as_bytes()); + buf.copy_from_slice(b", "); + buf.copy_from_slice(CHUNKED.as_bytes()); + + *line = HeaderValue::from_shared(buf.freeze()) + .expect("original header value plus ascii is valid"); + return; + } + + entry.insert(HeaderValue::from_static(CHUNKED)); +} + +#[cfg(test)] +mod tests { + #[test] + fn assert_max_decimal_u64_bytes() { + assert_eq!( + super::MAX_DECIMAL_U64_BYTES, + ::std::u64::MAX.to_string().len() + ); + } +} diff --git a/src/lib.rs b/src/lib.rs index 18d13c2bfc..66a0fa9a72 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,7 +44,7 @@ pub use http::{ pub use client::Client; pub use error::{Result, Error}; -pub use proto::{Body, Chunk}; +pub use proto::{body, Body, Chunk}; pub use server::Server; mod common; diff --git a/src/proto/body.rs b/src/proto/body.rs index 28acd51f67..23eea59e7b 100644 --- a/src/proto/body.rs +++ b/src/proto/body.rs @@ -1,50 +1,236 @@ +//! Streaming bodies for Requests and Responses +use std::borrow::Cow; use std::fmt; use bytes::Bytes; -use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream}; +use futures::{Async, Future, Poll, Stream}; use futures::sync::{mpsc, oneshot}; -use std::borrow::Cow; +use http::HeaderMap; use super::Chunk; -pub type BodySender = mpsc::Sender>; +type BodySender = mpsc::Sender>; + +/// This trait represents a streaming body of a `Request` or `Response`. +pub trait Entity { + /// A buffer of bytes representing a single chunk of a body. + type Data: AsRef<[u8]>; + + /// The error type of this stream. + //TODO: add bounds Into<::error::User> (or whatever it is called) + type Error; + + /// Poll for a `Data` buffer. + /// + /// Similar to `Stream::poll_next`, this yields `Some(Data)` until + /// the body ends, when it yields `None`. + fn poll_data(&mut self) -> Poll, Self::Error>; + + /// Poll for an optional **single** `HeaderMap` of trailers. + /// + /// This should **only** be called after `poll_data` has ended. + /// + /// Note: Trailers aren't currently used for HTTP/1, only for HTTP/2. + fn poll_trailers(&mut self) -> Poll, Self::Error> { + Ok(Async::Ready(None)) + } + + /// A hint that the `Body` is complete, and doesn't need to be polled more. + /// + /// This can be useful to determine if the there is any body or trailers + /// without having to poll. An empty `Body` could return `true` and hyper + /// would be able to know that only the headers need to be sent. Or, it can + /// also be checked after each `poll_data` call, to allow hyper to try to + /// end the underlying stream with the last chunk, instead of needing to + /// send an extra `DATA` frame just to mark the stream as finished. + /// + /// As a hint, it is used to try to optimize, and thus is OK for a default + /// implementation to return `false`. + fn is_end_stream(&self) -> bool { + false + } + + /// Return a length of the total bytes that will be streamed, if known. + /// + /// If an exact size of bytes is known, this would allow hyper to send a + /// `Content-Length` header automatically, not needing to fall back to + /// `Transfer-Encoding: chunked`. + /// + /// This does not need to be kept updated after polls, it will only be + /// called once to create the headers. + fn content_length(&self) -> Option { + None + } +} + +impl Entity for Box { + type Data = E::Data; + type Error = E::Error; + + fn poll_data(&mut self) -> Poll, Self::Error> { + (**self).poll_data() + } + + fn poll_trailers(&mut self) -> Poll, Self::Error> { + (**self).poll_trailers() + } -/// A `Stream` for `Chunk`s used in requests and responses. + fn is_end_stream(&self) -> bool { + (**self).is_end_stream() + } + + fn content_length(&self) -> Option { + (**self).content_length() + } +} + +/// A wrapper to consume an `Entity` as a futures `Stream`. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct EntityStream { + is_data_eof: bool, + entity: E, +} + +impl Stream for EntityStream { + type Item = E::Data; + type Error = E::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + if self.is_data_eof { + return self.entity.poll_trailers() + .map(|async| { + async.map(|_opt| { + // drop the trailers and return that Stream is done + None + }) + }); + } + + let opt = try_ready!(self.entity.poll_data()); + if let Some(data) = opt { + return Ok(Async::Ready(Some(data))); + } else { + self.is_data_eof = true; + } + } + } +} + +/// An `Entity` of `Chunk`s, used when receiving bodies. +/// +/// Also a good default `Entity` to use in many applications. #[must_use = "streams do nothing unless polled"] pub struct Body { kind: Kind, } -#[derive(Debug)] enum Kind { Chan { - close_tx: oneshot::Sender, + _close_tx: oneshot::Sender<()>, rx: mpsc::Receiver>, }, + Wrapped(Box + Send>), Once(Option), Empty, } -//pub(crate) +/// A sender half used with `Body::channel()`. #[derive(Debug)] -pub struct ChunkSender { - close_rx: oneshot::Receiver, - close_rx_check: bool, +pub struct Sender { + close_rx: oneshot::Receiver<()>, tx: BodySender, } impl Body { - /// Return an empty body stream + /// Create an empty `Body` stream. + /// + /// # Example + /// + /// ``` + /// use hyper::{Body, Request}; + /// + /// // create a `GET /` request + /// let get = Request::new(Body::empty()); + /// ``` #[inline] pub fn empty() -> Body { Body::new(Kind::Empty) } - /// Return a body stream with an associated sender half + /// Create a `Body` stream with an associated sender half. + #[inline] + pub fn channel() -> (Sender, Body) { + let (tx, rx) = mpsc::channel(0); + let (close_tx, close_rx) = oneshot::channel(); + + let tx = Sender { + close_rx: close_rx, + tx: tx, + }; + let rx = Body::new(Kind::Chan { + _close_tx: close_tx, + rx: rx, + }); + + (tx, rx) + } + + /// Wrap a futures `Stream` in a box inside `Body`. + /// + /// # Example + /// + /// ``` + /// # extern crate futures; + /// # extern crate hyper; + /// # use hyper::Body; + /// # fn main() { + /// let chunks = vec![ + /// "hello", + /// " ", + /// "world", + /// ]; + /// let stream = futures::stream::iter_ok(chunks); + /// + /// let body = Body::wrap_stream(stream); + /// # } + /// ``` + pub fn wrap_stream(stream: S) -> Body + where + S: Stream + Send + 'static, + Chunk: From, + { + Body::new(Kind::Wrapped(Box::new(stream.map(Chunk::from)))) + } + + /// Convert this `Body` into a `Stream`. + /// + /// # Example + /// + /// ``` + /// # extern crate futures; + /// # extern crate hyper; + /// # use futures::{Future, Stream}; + /// # use hyper::{Body, Request}; + /// # fn request_concat(some_req: Request) { + /// let req: Request = some_req; + /// let body = req.into_body(); + /// + /// let stream = body.into_stream(); + /// stream.concat2() + /// .map(|buf| { + /// println!("body length: {}", buf.len()); + /// }); + /// # } + /// # fn main() {} + /// ``` #[inline] - pub fn pair() -> (mpsc::Sender>, Body) { - let (tx, rx) = channel(); - (tx.tx, rx) + pub fn into_stream(self) -> EntityStream { + EntityStream { + is_data_eof: false, + entity: self, + } } /// Returns if this body was constructed via `Body::empty()`. @@ -68,8 +254,20 @@ impl Body { kind: kind, } } +} + +impl Default for Body { + #[inline] + fn default() -> Body { + Body::empty() + } +} + +impl Entity for Body { + type Data = Chunk; + type Error = ::Error; - fn poll_inner(&mut self) -> Poll, ::Error> { + fn poll_data(&mut self) -> Poll, Self::Error> { match self.kind { Kind::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") { Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))), @@ -77,85 +275,75 @@ impl Body { Async::Ready(None) => Ok(Async::Ready(None)), Async::NotReady => Ok(Async::NotReady), }, + Kind::Wrapped(ref mut s) => s.poll(), Kind::Once(ref mut val) => Ok(Async::Ready(val.take())), Kind::Empty => Ok(Async::Ready(None)), } } -} -impl Default for Body { - #[inline] - fn default() -> Body { - Body::empty() + fn is_end_stream(&self) -> bool { + match self.kind { + Kind::Chan { .. } => false, + Kind::Wrapped(..) => false, + Kind::Once(ref val) => val.is_none(), + Kind::Empty => true + } } -} -impl Stream for Body { - type Item = Chunk; - type Error = ::Error; - - #[inline] - fn poll(&mut self) -> Poll, ::Error> { - self.poll_inner() + fn content_length(&self) -> Option { + match self.kind { + Kind::Chan { .. } => None, + Kind::Wrapped(..) => None, + Kind::Once(Some(ref val)) => Some(val.len() as u64), + Kind::Once(None) => None, + Kind::Empty => Some(0) + } } } impl fmt::Debug for Body { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_tuple("Body") - .field(&self.kind) + f.debug_struct("Body") .finish() } } -//pub(crate) -pub fn channel() -> (ChunkSender, Body) { - let (tx, rx) = mpsc::channel(0); - let (close_tx, close_rx) = oneshot::channel(); - - let tx = ChunkSender { - close_rx: close_rx, - close_rx_check: true, - tx: tx, - }; - let rx = Body::new(Kind::Chan { - close_tx: close_tx, - rx: rx, - }); - - (tx, rx) -} - -impl ChunkSender { +impl Sender { + /// Check to see if this `Sender` can send more data. pub fn poll_ready(&mut self) -> Poll<(), ()> { - if self.close_rx_check { - match self.close_rx.poll() { - Ok(Async::Ready(true)) | Err(_) => return Err(()), - Ok(Async::Ready(false)) => { - // needed to allow converting into a plain mpsc::Receiver - // if it has been, the tx will send false to disable this check - self.close_rx_check = false; - } - Ok(Async::NotReady) => (), - } + match self.close_rx.poll() { + Ok(Async::Ready(())) | Err(_) => return Err(()), + Ok(Async::NotReady) => (), } self.tx.poll_ready().map_err(|_| ()) } - pub fn start_send(&mut self, msg: Result) -> StartSend<(), ()> { - match self.tx.start_send(msg) { - Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready), - Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(())), - Err(_) => Err(()), - } + /// Sends data on this channel. + /// + /// This should be called after `poll_ready` indicated the channel + /// could accept another `Chunk`. + /// + /// Returns `Err(Chunk)` if the channel could not (currently) accept + /// another `Chunk`. + pub fn send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> { + self.tx.try_send(Ok(chunk)) + .map_err(|err| err.into_inner().expect("just sent Ok")) + } + + pub(crate) fn send_error(&mut self, err: ::Error) { + let _ = self.tx.try_send(Err(err)); } } impl From for Body { #[inline] - fn from (chunk: Chunk) -> Body { - Body::new(Kind::Once(Some(chunk))) + fn from(chunk: Chunk) -> Body { + if chunk.is_empty() { + Body::empty() + } else { + Body::new(Kind::Once(Some(chunk))) + } } } @@ -214,13 +402,6 @@ impl From> for Body { } } -impl From> for Body { - #[inline] - fn from (body: Option) -> Body { - body.unwrap_or_default() - } -} - fn _assert_send_sync() { fn _assert_send() {} fn _assert_sync() {} @@ -232,15 +413,14 @@ fn _assert_send_sync() { #[test] fn test_body_stream_concat() { - use futures::{Sink, Stream, Future}; - let (tx, body) = Body::pair(); + use futures::{Stream, Future}; - ::std::thread::spawn(move || { - let tx = tx.send(Ok("hello ".into())).wait().unwrap(); - tx.send(Ok("world".into())).wait().unwrap(); - }); + let body = Body::from("hello world"); - let total = body.concat2().wait().unwrap(); + let total = body.into_stream() + .concat2() + .wait() + .unwrap(); assert_eq!(total.as_ref(), b"hello world"); } diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 30f9df87f3..9a3419f6a2 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -8,7 +8,7 @@ use futures::task::Task; use http::{Method, Version}; use tokio_io::{AsyncRead, AsyncWrite}; -use proto::{Chunk, Decode, Http1Transaction, MessageHead}; +use proto::{BodyLength, Chunk, Decode, Http1Transaction, MessageHead}; use super::io::{Cursor, Buffered}; use super::{EncodedBuf, Encoder, Decoder}; @@ -418,7 +418,7 @@ where I: AsyncRead + AsyncWrite, self.io.can_buffer() } - pub fn write_head(&mut self, mut head: MessageHead, body: bool) { + pub fn write_head(&mut self, mut head: MessageHead, body: Option) { debug_assert!(self.can_write_head()); if !T::should_read_first() { @@ -541,7 +541,7 @@ where I: AsyncRead + AsyncWrite, match self.state.writing { Writing::Init => { if let Some(msg) = T::on_error(&err) { - self.write_head(msg, false); + self.write_head(msg, None); self.state.error = Some(err); return Ok(()); } diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 31539e3887..dc77d45e88 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -1,17 +1,18 @@ use std::io; use bytes::Bytes; -use futures::{Async, AsyncSink, Future, Poll, Stream}; +use futures::{Async, Future, Poll, Stream}; use http::{Request, Response, StatusCode}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_service::Service; -use proto::{Body, Conn, Http1Transaction, MessageHead, RequestHead, RequestLine, ResponseHead}; +use proto::body::Entity; +use proto::{Body, BodyLength, Conn, Http1Transaction, MessageHead, RequestHead, RequestLine, ResponseHead}; pub struct Dispatcher { conn: Conn, dispatch: D, - body_tx: Option<::proto::body::ChunkSender>, + body_tx: Option<::proto::body::Sender>, body_rx: Option, is_closing: bool, } @@ -46,7 +47,7 @@ where I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, T: Http1Transaction, - Bs: Stream, + Bs: Entity, { pub fn new(dispatch: D, conn: Conn) -> Self { Dispatcher { @@ -130,13 +131,10 @@ where } match self.conn.read_body() { Ok(Async::Ready(Some(chunk))) => { - match body.start_send(Ok(chunk)) { - Ok(AsyncSink::Ready) => { + match body.send_data(chunk) { + Ok(()) => { self.body_tx = Some(body); }, - Ok(AsyncSink::NotReady(_chunk)) => { - unreachable!("mpsc poll_ready was ready, start_send was not"); - } Err(_canceled) => { if self.conn.can_read_body() { trace!("body receiver dropped before eof, closing"); @@ -154,7 +152,7 @@ where return Ok(Async::NotReady); } Err(e) => { - let _ = body.start_send(Err(::Error::Io(e))); + body.send_error(::Error::Io(e)); } } } else { @@ -181,7 +179,7 @@ where match self.conn.read_head() { Ok(Async::Ready(Some((head, has_body)))) => { let body = if has_body { - let (mut tx, rx) = ::proto::body::channel(); + let (mut tx, rx) = Body::channel(); let _ = tx.poll_ready(); // register this task if rx is dropped self.body_tx = Some(tx); rx @@ -213,7 +211,12 @@ where return Ok(Async::Ready(())); } else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() { if let Some((head, body)) = try_ready!(self.dispatch.poll_msg()) { - self.conn.write_head(head, body.is_some()); + let body_type = body.as_ref().map(|body| { + body.content_length() + .map(BodyLength::Known) + .unwrap_or(BodyLength::Unknown) + }); + self.conn.write_head(head, body_type); self.body_rx = body; } else { self.close(); @@ -222,7 +225,7 @@ where } else if !self.conn.can_buffer_body() { try_ready!(self.poll_flush()); } else if let Some(mut body) = self.body_rx.take() { - let chunk = match body.poll()? { + let chunk = match body.poll_data()? { Async::Ready(Some(chunk)) => { self.body_rx = Some(body); chunk @@ -291,7 +294,7 @@ where I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, T: Http1Transaction, - Bs: Stream, + Bs: Entity, { type Item = (); type Error = ::Error; @@ -316,8 +319,7 @@ impl Server where S: Service { impl Dispatch for Server where S: Service, Response=Response, Error=::Error>, - Bs: Stream, - Bs::Item: AsRef<[u8]>, + Bs: Entity, { type PollItem = MessageHead; type PollBody = Bs; @@ -338,7 +340,12 @@ where subject: parts.status, headers: parts.headers, }; - Ok(Async::Ready(Some((head, Some(body))))) + let body = if body.is_end_stream() { + None + } else { + Some(body) + }; + Ok(Async::Ready(Some((head, body)))) } else { unreachable!("poll_msg shouldn't be called if no inflight"); } @@ -382,8 +389,7 @@ impl Client { impl Dispatch for Client where - B: Stream, - B::Item: AsRef<[u8]>, + B: Entity, { type PollItem = RequestHead; type PollBody = B; @@ -405,8 +411,14 @@ where subject: RequestLine(parts.method, parts.uri), headers: parts.headers, }; + + let body = if body.is_end_stream() { + None + } else { + Some(body) + }; self.callback = Some(cb); - Ok(Async::Ready(Some((head, Some(body))))) + Ok(Async::Ready(Some((head, body)))) } } }, diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index 587bbda2ca..9ac241bf98 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -1,12 +1,12 @@ use std::fmt::{self, Write}; use bytes::{BytesMut, Bytes}; -use http::header::{CONTENT_LENGTH, DATE, HeaderName, HeaderValue, TRANSFER_ENCODING}; +use http::header::{CONTENT_LENGTH, DATE, Entry, HeaderName, HeaderValue, TRANSFER_ENCODING}; use http::{HeaderMap, Method, StatusCode, Uri, Version}; use httparse; use headers; -use proto::{Decode, MessageHead, Http1Transaction, ParseResult, RequestLine, RequestHead}; +use proto::{BodyLength, Decode, MessageHead, Http1Transaction, ParseResult, RequestLine, RequestHead}; use proto::h1::{Encoder, Decoder, date}; const MAX_HEADERS: usize = 100; @@ -122,8 +122,13 @@ where } - fn encode(mut head: MessageHead, has_body: bool, method: &mut Option, dst: &mut Vec) -> ::Result { - trace!("Server::encode has_body={}, method={:?}", has_body, method); + fn encode( + mut head: MessageHead, + body: Option, + method: &mut Option, + dst: &mut Vec, + ) -> ::Result { + trace!("Server::encode body={:?}, method={:?}", body, method); // hyper currently doesn't support returning 1xx status codes as a Response // This is because Service only allows returning a single Response, and @@ -132,7 +137,7 @@ where let ret = if StatusCode::SWITCHING_PROTOCOLS == head.subject { T::on_encode_upgrade(&mut head) .map(|_| { - let mut enc = Server::set_length(&mut head, has_body, method.as_ref()); + let mut enc = Server::set_length(&mut head, body, method.as_ref()); enc.set_last(); enc }) @@ -143,7 +148,7 @@ where headers::content_length_zero(&mut head.headers); Err(::Error::Status) } else { - Ok(Server::set_length(&mut head, has_body, method.as_ref())) + Ok(Server::set_length(&mut head, body, method.as_ref())) }; @@ -160,6 +165,7 @@ where extend(dst, head.subject.as_str().as_bytes()); extend(dst, b" "); + // a reason MUST be written, as many parsers will expect it. extend(dst, head.subject.canonical_reason().unwrap_or("").as_bytes()); extend(dst, b"\r\n"); } @@ -207,7 +213,7 @@ where } impl Server<()> { - fn set_length(head: &mut MessageHead, has_body: bool, method: Option<&Method>) -> Encoder { + fn set_length(head: &mut MessageHead, body: Option, method: Option<&Method>) -> Encoder { // these are here thanks to borrowck // `if method == Some(&Method::Get)` says the RHS doesn't live long enough const HEAD: Option<&'static Method> = Some(&Method::HEAD); @@ -229,8 +235,8 @@ impl Server<()> { } }; - if has_body && can_have_body { - set_length(&mut head.headers, head.version == Version::HTTP_11) + if let (Some(body), true) = (body, can_have_body) { + set_length(&mut head.headers, body, head.version == Version::HTTP_11) } else { head.headers.remove(TRANSFER_ENCODING); if can_have_body { @@ -354,12 +360,17 @@ where } } - fn encode(mut head: MessageHead, has_body: bool, method: &mut Option, dst: &mut Vec) -> ::Result { - trace!("Client::encode has_body={}, method={:?}", has_body, method); + fn encode( + mut head: MessageHead, + body: Option, + method: &mut Option, + dst: &mut Vec, + ) -> ::Result { + trace!("Client::encode body={:?}, method={:?}", body, method); *method = Some(head.subject.0.clone()); - let body = Client::set_length(&mut head, has_body); + let body = Client::set_length(&mut head, body); let init_cap = 30 + head.headers.len() * AVERAGE_HEADER_SIZE; dst.reserve(init_cap); @@ -398,33 +409,143 @@ where } impl Client<()> { - fn set_length(head: &mut RequestHead, has_body: bool) -> Encoder { - if has_body { + fn set_length(head: &mut RequestHead, body: Option) -> Encoder { + if let Some(body) = body { let can_chunked = head.version == Version::HTTP_11 && (head.subject.0 != Method::HEAD) && (head.subject.0 != Method::GET) && (head.subject.0 != Method::CONNECT); - set_length(&mut head.headers, can_chunked) + set_length(&mut head.headers, body, can_chunked) } else { - head.headers.remove(CONTENT_LENGTH); head.headers.remove(TRANSFER_ENCODING); Encoder::length(0) } } } -fn set_length(headers: &mut HeaderMap, can_chunked: bool) -> Encoder { - let len = headers::content_length_parse(&headers); +fn set_length(headers: &mut HeaderMap, body: BodyLength, can_chunked: bool) -> Encoder { + // If the user already set specific headers, we should respect them, regardless + // of what the Entity knows about itself. They set them for a reason. + + // Because of the borrow checker, we can't check the for an existing + // Content-Length header while holding an `Entry` for the Transfer-Encoding + // header, so unfortunately, we must do the check here, first. + + let existing_con_len = headers::content_length_parse(headers); + let mut should_remove_con_len = false; + + if can_chunked { + // If the user set a transfer-encoding, respect that. Let's just + // make sure `chunked` is the final encoding. + let encoder = match headers.entry(TRANSFER_ENCODING) + .expect("TRANSFER_ENCODING is valid HeaderName") { + Entry::Occupied(te) => { + should_remove_con_len = true; + if headers::is_chunked(te.iter()) { + Some(Encoder::chunked()) + } else { + warn!("user provided transfer-encoding does not end in 'chunked'"); + + // There's a Transfer-Encoding, but it doesn't end in 'chunked'! + // An example that could trigger this: + // + // Transfer-Encoding: gzip + // + // This can be bad, depending on if this is a request or a + // response. + // + // - A request is illegal if there is a `Transfer-Encoding` + // but it doesn't end in `chunked`. + // - A response that has `Transfer-Encoding` but doesn't + // end in `chunked` isn't illegal, it just forces this + // to be close-delimited. + // + // We can try to repair this, by adding `chunked` ourselves. + + headers::add_chunked(te); + Some(Encoder::chunked()) + } + }, + Entry::Vacant(te) => { + if let Some(len) = existing_con_len { + Some(Encoder::length(len)) + } else if let BodyLength::Unknown = body { + should_remove_con_len = true; + te.insert(HeaderValue::from_static("chunked")); + Some(Encoder::chunked()) + } else { + None + } + }, + }; + + // This is because we need a second mutable borrow to remove + // content-length header. + if let Some(encoder) = encoder { + if should_remove_con_len && existing_con_len.is_some() { + headers.remove(CONTENT_LENGTH); + } + return encoder; + } + + // User didn't set transfer-encoding, AND we know body length, + // so we can just set the Content-Length automatically. + + let len = if let BodyLength::Known(len) = body { + len + } else { + unreachable!("BodyLength::Unknown would set chunked"); + }; - if let Some(len) = len { - Encoder::length(len) - } else if can_chunked { - //TODO: maybe not overwrite existing transfer-encoding - headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked")); - Encoder::chunked() + set_content_length(headers, len) } else { - headers.remove(TRANSFER_ENCODING); - Encoder::eof() + // Chunked isn't legal, so if it is set, we need to remove it. + // Also, if it *is* set, then we shouldn't replace with a length, + // since the user tried to imply there isn't a length. + let encoder = if headers.remove(TRANSFER_ENCODING).is_some() { + trace!("removing illegal transfer-encoding header"); + should_remove_con_len = true; + Encoder::eof() + } else if let Some(len) = existing_con_len { + Encoder::length(len) + } else if let BodyLength::Known(len) = body { + set_content_length(headers, len) + } else { + Encoder::eof() + }; + + if should_remove_con_len && existing_con_len.is_some() { + headers.remove(CONTENT_LENGTH); + } + + encoder + } +} + +fn set_content_length(headers: &mut HeaderMap, len: u64) -> Encoder { + // At this point, there should not be a valid Content-Length + // header. However, since we'll be indexing in anyways, we can + // warn the user if there was an existing illegal header. + + match headers.entry(CONTENT_LENGTH) + .expect("CONTENT_LENGTH is valid HeaderName") { + Entry::Occupied(mut cl) => { + // Uh oh, the user set `Content-Length` headers, but set bad ones. + // This would be an illegal message anyways, so let's try to repair + // with our known good length. + warn!("user provided content-length header was invalid"); + + // Internal sanity check, we should have already determined + // that the header was illegal before calling this function. + debug_assert!(headers::content_length_parse_all(cl.iter()).is_none()); + + cl.insert(headers::content_length_value(len)); + Encoder::length(len) + }, + Entry::Vacant(cl) => { + cl.insert(headers::content_length_value(len)); + Encoder::length(len) + } } } @@ -572,6 +693,7 @@ mod tests { } } + #[test] fn test_parse_request() { extern crate pretty_env_logger; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 68b7be0cf3..b0e2a9d531 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -8,7 +8,7 @@ pub use self::body::Body; pub use self::chunk::Chunk; pub use self::h1::{dispatch, Conn}; -mod body; +pub mod body; mod chunk; mod h1; //mod h2; @@ -72,7 +72,12 @@ pub trait Http1Transaction { type Outgoing: Default; fn parse(bytes: &mut BytesMut) -> ParseResult; fn decoder(head: &MessageHead, method: &mut Option) -> ::Result; - fn encode(head: MessageHead, has_body: bool, method: &mut Option, dst: &mut Vec) -> ::Result; + fn encode( + head: MessageHead, + body: Option, + method: &mut Option, + dst: &mut Vec, + ) -> ::Result; fn on_error(err: &::Error) -> Option>; fn should_error_on_parse_eof() -> bool; @@ -81,6 +86,15 @@ pub trait Http1Transaction { pub type ParseResult = ::Result, usize)>>; +#[derive(Debug)] +pub enum BodyLength { + /// Content-Length + Known(u64), + /// Transfer-Encoding: chunked (if h1) + Unknown, +} + + #[derive(Debug)] pub enum Decode { /// Decode normally. diff --git a/src/server/conn.rs b/src/server/conn.rs index 389e28c708..4b34351cad 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -11,10 +11,11 @@ use std::fmt; use bytes::Bytes; -use futures::{Future, Poll, Stream}; +use futures::{Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; -use proto::{self, Body}; +use proto; +use proto::body::{Body, Entity}; use super::{HyperService, Request, Response, Service}; /// A future binding a connection with a Service. @@ -24,14 +25,13 @@ use super::{HyperService, Request, Response, Service}; pub struct Connection where S: HyperService, - S::ResponseBody: Stream, - ::Item: AsRef<[u8]>, + S::ResponseBody: Entity, { pub(super) conn: proto::dispatch::Dispatcher< proto::dispatch::Server, S::ResponseBody, I, - ::Item, + ::Data, proto::ServerTransaction, >, } @@ -61,8 +61,7 @@ pub struct Parts { impl Connection where S: Service, Response = Response, Error = ::Error> + 'static, I: AsyncRead + AsyncWrite + 'static, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { /// Disables keep-alive for this connection. pub fn disable_keep_alive(&mut self) { @@ -99,8 +98,7 @@ where S: Service, Response = Response, Error = ::Erro impl Future for Connection where S: Service, Response = Response, Error = ::Error> + 'static, I: AsyncRead + AsyncWrite + 'static, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { type Item = (); type Error = ::Error; @@ -113,8 +111,7 @@ where S: Service, Response = Response, Error = ::Erro impl fmt::Debug for Connection where S: HyperService, - S::ResponseBody: Stream, - ::Item: AsRef<[u8]>, + S::ResponseBody: Entity, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Connection") diff --git a/src/server/mod.rs b/src/server/mod.rs index 0c2c93100d..18b0d0e7cb 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -23,7 +23,8 @@ use tokio::reactor::{Core, Handle, Timeout}; use tokio::net::TcpListener; pub use tokio_service::{NewService, Service}; -use proto::{self, Body}; +use proto::body::{Body, Entity}; +use proto; use self::addr_stream::AddrStream; use self::hyper_service::HyperService; @@ -48,10 +49,10 @@ pub struct Http { /// This server is intended as a convenience for creating a TCP listener on an /// address and then serving TCP connections accepted with the service provided. pub struct Server -where B: Stream, - B::Item: AsRef<[u8]>, +where + B: Entity, { - protocol: Http, + protocol: Http, new_service: S, reactor: Core, listener: TcpListener, @@ -90,7 +91,6 @@ pub struct AddrIncoming { timeout: Option, } - // ===== impl Http ===== impl + 'static> Http { @@ -154,7 +154,7 @@ impl + 'static> Http { /// actually run the server. pub fn bind(&self, addr: &SocketAddr, new_service: S) -> ::Result> where S: NewService, Response = Response, Error = ::Error> + 'static, - Bd: Stream, + Bd: Entity, { let core = try!(Core::new()); let handle = core.handle(); @@ -179,7 +179,7 @@ impl + 'static> Http { /// connection. pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result> where S: NewService, Response = Response, Error = ::Error>, - Bd: Stream, + Bd: Entity, { let listener = TcpListener::bind(addr, &handle)?; let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors)?; @@ -196,7 +196,7 @@ impl + 'static> Http { where I: Stream, I::Item: AsyncRead + AsyncWrite, S: NewService, Response = Response, Error = ::Error>, - Bd: Stream, + Bd: Entity, { Serve { incoming: incoming, @@ -246,10 +246,8 @@ impl + 'static> Http { /// ``` pub fn serve_connection(&self, io: I, service: S) -> Connection where S: Service, Response = Response, Error = ::Error>, - Bd: Stream, - Bd::Item: AsRef<[u8]>, + Bd: Entity, I: AsyncRead + AsyncWrite, - { let mut conn = proto::Conn::new(io); if !self.keep_alive { @@ -290,8 +288,7 @@ impl fmt::Debug for Http { impl Server where S: NewService, Response = Response, Error = ::Error> + 'static, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { /// Returns the local address that this server is bound to. pub fn local_addr(&self) -> ::Result { @@ -407,8 +404,7 @@ impl Server } } -impl> fmt::Debug for Server -where B::Item: AsRef<[u8]> +impl> fmt::Debug for Server { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Server") @@ -445,8 +441,7 @@ where I: Stream, I::Item: AsyncRead + AsyncWrite, S: NewService, Response=Response, Error=::Error>, - B: Stream, - B::Item: AsRef<[u8]>, + B: Entity, { type Item = Connection; type Error = ::Error; @@ -720,7 +715,7 @@ impl Future for WaitUntilZero { } mod hyper_service { - use super::{Body, Request, Response, Service, Stream}; + use super::{Body, Entity, Request, Response, Service}; /// A "trait alias" for any type that implements `Service` with hyper's /// Request, Response, and Error types, and a streaming body. /// @@ -751,8 +746,7 @@ mod hyper_service { Response=Response, Error=::Error, >, - B: Stream, - B::Item: AsRef<[u8]>, + B: Entity, {} impl HyperService for S @@ -763,8 +757,7 @@ mod hyper_service { Error=::Error, >, S: Sealed, - B: Stream, - B::Item: AsRef<[u8]>, + B: Entity, { type ResponseBody = B; type Sealed = Opaque; diff --git a/tests/client.rs b/tests/client.rs index a5d6b51076..43bfbd18c7 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -34,7 +34,6 @@ macro_rules! test { url: $client_url:expr, headers: { $($request_header_name:expr => $request_header_val:expr,)* }, body: $request_body:expr, - proxy: $request_proxy:expr, response: status: $client_status:ident, @@ -53,7 +52,6 @@ macro_rules! test { url: $client_url, headers: { $($request_header_name => $request_header_val,)* }, body: $request_body, - proxy: $request_proxy, response: status: $client_status, @@ -73,7 +71,6 @@ macro_rules! test { url: $client_url:expr, headers: { $($request_header_name:expr => $request_header_val:expr,)* }, body: $request_body:expr, - proxy: $request_proxy:expr, response: status: $client_status:ident, @@ -99,7 +96,6 @@ macro_rules! test { url: $client_url, headers: { $($request_header_name => $request_header_val,)* }, body: $request_body, - proxy: $request_proxy, }.unwrap(); @@ -108,7 +104,7 @@ macro_rules! test { assert_eq!(res.headers()[$response_header_name], $response_header_val); )* - let body = core.run(res.into_parts().1.concat2()).unwrap(); + let body = core.run(res.into_body().into_stream().concat2()).unwrap(); let expected_res_body = Option::<&[u8]>::from($response_body) .unwrap_or_default(); @@ -126,7 +122,6 @@ macro_rules! test { url: $client_url:expr, headers: { $($request_header_name:expr => $request_header_val:expr,)* }, body: $request_body:expr, - proxy: $request_proxy:expr, error: $err:expr, ) => ( @@ -149,7 +144,6 @@ macro_rules! test { url: $client_url, headers: { $($request_header_name => $request_header_val,)* }, body: $request_body, - proxy: $request_proxy, }.unwrap_err(); if !$err(&err) { panic!("unexpected error: {:?}", err) @@ -171,7 +165,6 @@ macro_rules! test { url: $client_url:expr, headers: { $($request_header_name:expr => $request_header_val:expr,)* }, body: $request_body:expr, - proxy: $request_proxy:expr, ) => ({ let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); @@ -183,32 +176,21 @@ macro_rules! test { } let client = config.build(&core.handle()); - let mut is_empty = false; let body = if let Some(body) = $request_body { let body: &'static str = body; body.into() } else { - is_empty = true; Body::empty() }; - let mut req = Request::builder(); - req + let req = Request::builder() .method(Method::$client_method) + .uri(&*format!($client_url, addr=addr)) $( .header($request_header_name, $request_header_val) )* - .uri(&*format!($client_url, addr=addr)); - - //TODO: remove when client bodies are fixed - if is_empty { - req.header("content-length", "0"); - } - - let req = req.body(body) + .body(body) .unwrap(); - // req.set_proxy($request_proxy); - let res = client.request(req); let (tx, rx) = oneshot::channel(); @@ -257,7 +239,6 @@ test! { url: "http://{addr}/", headers: {}, body: None, - proxy: false, response: status: OK, headers: { @@ -279,7 +260,6 @@ test! { url: "http://{addr}/foo?key=val#dont_send_me", headers: {}, body: None, - proxy: false, response: status: OK, headers: { @@ -301,7 +281,6 @@ test! { url: "http://{addr}/", headers: {}, body: Some(""), - proxy: false, response: status: OK, headers: { @@ -331,7 +310,6 @@ test! { "Content-Length" => "7", }, body: Some("foo bar"), - proxy: false, response: status: OK, headers: {}, @@ -361,7 +339,6 @@ test! { "Transfer-Encoding" => "chunked", }, body: Some("foo bar baz"), - proxy: false, response: status: OK, headers: {}, @@ -387,14 +364,14 @@ test! { headers: { "Content-Length" => "0", }, - body: Some(""), - proxy: false, + body: None, response: status: OK, headers: {}, body: None, } +/*TODO: when new Connect trait allows stating connection is proxied test! { name: client_http_proxy, @@ -407,18 +384,18 @@ test! { reply: REPLY_OK, client: + proxy: true, request: method: GET, url: "http://{addr}/proxy", headers: {}, body: None, - proxy: true, response: status: OK, headers: {}, body: None, } - +*/ test! { name: client_head_ignores_body, @@ -442,7 +419,6 @@ test! { url: "http://{addr}/head", headers: {}, body: None, - proxy: false, response: status: OK, headers: {}, @@ -473,7 +449,6 @@ test! { url: "http://{addr}/pipe", headers: {}, body: None, - proxy: false, response: status: OK, headers: {}, @@ -500,7 +475,6 @@ test! { url: "http://{addr}/err", headers: {}, body: None, - proxy: false, error: |err| match err { &hyper::Error::Incomplete => true, _ => false, @@ -527,7 +501,6 @@ test! { url: "http://{addr}/err", headers: {}, body: None, - proxy: false, error: |err| match err { &hyper::Error::Version => true, _ => false, @@ -562,7 +535,6 @@ test! { "Content-Length" => "7", }, body: Some("foo bar"), - proxy: false, response: status: OK, headers: {}, @@ -592,7 +564,6 @@ test! { url: "http://{addr}/upgrade", headers: {}, body: None, - proxy: false, error: |err| match err { &hyper::Error::Upgrade => true, _ => false, @@ -618,7 +589,6 @@ test! { url: "http://{addr}/", headers: {}, body: None, - proxy: false, error: |err| match err { &hyper::Error::Method => true, _ => false, @@ -648,7 +618,6 @@ test! { url: "http://{addr}/no-host/{addr}", headers: {}, body: None, - proxy: false, response: status: OK, headers: {}, @@ -755,7 +724,7 @@ mod dispatch_impl { .unwrap(); client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_parts().1.concat2() + res.into_body().into_stream().concat2() }).and_then(|_| { Timeout::new(Duration::from_secs(1), &handle).unwrap() .from_err() @@ -808,7 +777,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_parts().1.concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); core.run(res.join(rx).map(|r| r.0)).unwrap(); @@ -975,7 +944,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_parts().1.concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); core.run(res.join(rx).map(|r| r.0)).unwrap(); @@ -1023,7 +992,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_parts().1.concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); core.run(res.join(rx).map(|r| r.0)).unwrap(); @@ -1078,7 +1047,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_parts().1.concat2() + res.into_body().into_stream().concat2() }); core.run(res).unwrap(); @@ -1130,7 +1099,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_parts().1.concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); @@ -1215,8 +1184,6 @@ mod dispatch_impl { let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) - //TODO: remove this header when auto lengths are fixed - .header("content-length", "0") .body(Body::empty()) .unwrap(); let res = client.request(req); @@ -1227,8 +1194,6 @@ mod dispatch_impl { let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); let req = Request::builder() .uri(&*format!("http://{}/b", addr)) - //TODO: remove this header when auto lengths are fixed - .header("content-length", "0") .body(Body::empty()) .unwrap(); let res = client.request(req); @@ -1281,8 +1246,6 @@ mod dispatch_impl { let req = Request::builder() .method("HEAD") .uri(&*format!("http://{}/a", addr)) - //TODO: remove this header when auto lengths are fixed - .header("content-length", "0") .body(Body::empty()) .unwrap(); let res = client.request(req); @@ -1293,8 +1256,6 @@ mod dispatch_impl { let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); let req = Request::builder() .uri(&*format!("http://{}/b", addr)) - //TODO: remove this header when auto lengths are fixed - .header("content-length", "0") .body(Body::empty()) .unwrap(); let res = client.request(req); @@ -1434,7 +1395,7 @@ mod conn { .unwrap(); let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); @@ -1481,7 +1442,7 @@ mod conn { let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); @@ -1522,7 +1483,7 @@ mod conn { .unwrap(); let res1 = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().into_stream().concat2() }); // pipelined request will hit NotReady, and thus should return an Error::Cancel @@ -1599,7 +1560,7 @@ mod conn { let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::SWITCHING_PROTOCOLS); assert_eq!(res.headers()["Upgrade"], "foobar"); - res.into_body().concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); @@ -1680,7 +1641,7 @@ mod conn { let res = client.send_request(req) .and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().into_stream().concat2() }) .map(|body| { assert_eq!(body.as_ref(), b""); diff --git a/tests/server.rs b/tests/server.rs index 2bfc80b6e2..d5ce55746e 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -99,8 +99,8 @@ fn get_implicitly_empty() { type Future = Box>; fn call(&self, req: Request) -> Self::Future { - Box::new(req.into_parts() - .1 + Box::new(req.into_body() + .into_stream() .concat2() .map(|buf| { assert!(buf.is_empty()); @@ -110,112 +110,188 @@ fn get_implicitly_empty() { } } -#[test] -fn get_fixed_response() { - let foo_bar = b"foo bar baz"; - let server = serve(); - server.reply() - .header("content-length", foo_bar.len().to_string()) - .body(foo_bar); - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Connection: close\r\n\ - \r\n\ - ").unwrap(); - let mut body = String::new(); - req.read_to_string(&mut body).unwrap(); - let n = body.find("\r\n\r\n").unwrap() + 4; +mod response_body_lengths { + use super::*; - assert_eq!(&body[n..], "foo bar baz"); -} + struct TestCase { + version: usize, + headers: &'static [(&'static str, &'static str)], + body: Bd, + expects_chunked: bool, + expects_con_len: bool, + } -#[test] -fn get_chunked_response() { - let foo_bar = b"foo bar baz"; - let server = serve(); - server.reply() - .header("transfer-encoding", "chunked") - .body(foo_bar); - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Connection: close\r\n\ - \r\n\ - ").unwrap(); - let mut body = String::new(); - req.read_to_string(&mut body).unwrap(); - let n = body.find("\r\n\r\n").unwrap() + 4; + enum Bd { + Known(&'static str), + Unknown(&'static str), + } - assert_eq!(&body[n..], "B\r\nfoo bar baz\r\n0\r\n\r\n"); -} + fn run_test(case: TestCase) { + assert!(case.version == 0 || case.version == 1, "TestCase.version must 0 or 1"); -#[test] -fn get_auto_response() { - let foo_bar = b"foo bar baz"; - let server = serve(); - server.reply() - .body(foo_bar); - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Connection: close\r\n\ - \r\n\ - ").unwrap(); - let mut body = String::new(); - req.read_to_string(&mut body).unwrap(); + let server = serve(); - assert!(has_header(&body, "Transfer-Encoding: chunked")); + let mut reply = server.reply(); + for header in case.headers { + reply = reply.header(header.0, header.1); + } - let n = body.find("\r\n\r\n").unwrap() + 4; - assert_eq!(&body[n..], "B\r\nfoo bar baz\r\n0\r\n\r\n"); -} + let body_str = match case.body { + Bd::Known(b) => { + reply.body(b); + b + }, + Bd::Unknown(b) => { + let (mut tx, body) = hyper::Body::channel(); + tx.send_data(b.into()).expect("send_data"); + reply.body_stream(body); + b + }, + }; + + let mut req = connect(server.addr()); + write!(req, "\ + GET / HTTP/1.{}\r\n\ + Host: example.domain\r\n\ + Connection: close\r\n\ + \r\n\ + ", case.version).expect("request write"); + let mut body = String::new(); + req.read_to_string(&mut body).unwrap(); + + assert_eq!( + case.expects_chunked, + has_header(&body, "transfer-encoding:"), + "expects_chunked" + ); + assert_eq!( + case.expects_con_len, + has_header(&body, "content-length:"), + "expects_con_len" + ); + + let n = body.find("\r\n\r\n").unwrap() + 4; + + if case.expects_chunked { + let len = body.len(); + assert_eq!(&body[n + 1..n + 3], "\r\n", "expected body chunk size header"); + assert_eq!(&body[n + 3..len - 7], body_str, "expected body"); + assert_eq!(&body[len - 7..], "\r\n0\r\n\r\n", "expected body final chunk size header"); + } else { + assert_eq!(&body[n..], body_str, "expected body"); + } + } -#[test] -fn http_10_get_auto_response() { - let foo_bar = b"foo bar baz"; - let server = serve(); - server.reply() - .body(foo_bar); - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.0\r\n\ - Host: example.domain\r\n\ - \r\n\ - ").unwrap(); - let mut body = String::new(); - req.read_to_string(&mut body).unwrap(); + #[test] + fn get_fixed_response_known() { + run_test(TestCase { + version: 1, + headers: &[("content-length", "11")], + body: Bd::Known("foo bar baz"), + expects_chunked: false, + expects_con_len: true, + }); + } - assert!(!has_header(&body, "Transfer-Encoding:")); + #[test] + fn get_fixed_response_unknown() { + run_test(TestCase { + version: 1, + headers: &[("content-length", "11")], + body: Bd::Unknown("foo bar baz"), + expects_chunked: false, + expects_con_len: true, + }); + } - let n = body.find("\r\n\r\n").unwrap() + 4; - assert_eq!(&body[n..], "foo bar baz"); -} + #[test] + fn get_chunked_response_known() { + run_test(TestCase { + version: 1, + headers: &[("transfer-encoding", "chunked")], + // even though we know the length, don't strip user's TE header + body: Bd::Known("foo bar baz"), + expects_chunked: true, + expects_con_len: false, + }); + } -#[test] -fn http_10_get_chunked_response() { - let foo_bar = b"foo bar baz"; - let server = serve(); - server.reply() - // this header should actually get removed - .header("transfer-encoding", "chunked") - .body(foo_bar); - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.0\r\n\ - Host: example.domain\r\n\ - \r\n\ - ").unwrap(); - let mut body = String::new(); - req.read_to_string(&mut body).unwrap(); + #[test] + fn get_chunked_response_unknown() { + run_test(TestCase { + version: 1, + headers: &[("transfer-encoding", "chunked")], + body: Bd::Unknown("foo bar baz"), + expects_chunked: true, + expects_con_len: false, + }); + } - assert!(!has_header(&body, "Transfer-Encoding:")); + #[test] + fn get_chunked_response_trumps_length() { + run_test(TestCase { + version: 1, + headers: &[ + ("transfer-encoding", "chunked"), + // both headers means content-length is stripped + ("content-length", "11"), + ], + body: Bd::Known("foo bar baz"), + expects_chunked: true, + expects_con_len: false, + }); + } - let n = body.find("\r\n\r\n").unwrap() + 4; - assert_eq!(&body[n..], "foo bar baz"); + #[test] + fn get_auto_response_with_entity_unknown_length() { + run_test(TestCase { + version: 1, + // no headers means trying to guess from Entity + headers: &[], + body: Bd::Unknown("foo bar baz"), + expects_chunked: true, + expects_con_len: false, + }); + } + + #[test] + fn get_auto_response_with_entity_known_length() { + run_test(TestCase { + version: 1, + // no headers means trying to guess from Entity + headers: &[], + body: Bd::Known("foo bar baz"), + expects_chunked: false, + expects_con_len: true, + }); + } + + + #[test] + fn http_10_get_auto_response_with_entity_unknown_length() { + run_test(TestCase { + version: 0, + // no headers means trying to guess from Entity + headers: &[], + body: Bd::Unknown("foo bar baz"), + expects_chunked: false, + expects_con_len: false, + }); + } + + + #[test] + fn http_10_get_chunked_response() { + run_test(TestCase { + version: 0, + // http/1.0 should strip this header + headers: &[("transfer-encoding", "chunked")], + // even when we don't know the length + body: Bd::Unknown("foo bar baz"), + expects_chunked: false, + expects_con_len: false, + }); + } } #[test] @@ -314,67 +390,6 @@ fn post_with_incomplete_body() { req.read(&mut [0; 256]).expect("read"); } -#[test] -fn empty_response_chunked() { - let server = serve(); - - server.reply() - .body(""); - - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Content-Length: 0\r\n\ - Connection: close\r\n\ - \r\n\ - ").unwrap(); - - - let mut response = String::new(); - req.read_to_string(&mut response).unwrap(); - - assert!(response.contains("Transfer-Encoding: chunked\r\n")); - - let mut lines = response.lines(); - assert_eq!(lines.next(), Some("HTTP/1.1 200 OK")); - - let mut lines = lines.skip_while(|line| !line.is_empty()); - assert_eq!(lines.next(), Some("")); - // 0\r\n\r\n - assert_eq!(lines.next(), Some("0")); - assert_eq!(lines.next(), Some("")); - assert_eq!(lines.next(), None); -} - -#[test] -fn empty_response_chunked_without_body_should_set_content_length() { - extern crate pretty_env_logger; - let _ = pretty_env_logger::try_init(); - let server = serve(); - server.reply() - .header("transfer-encoding", "chunked"); - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Connection: close\r\n\ - \r\n\ - ").unwrap(); - - let mut response = String::new(); - req.read_to_string(&mut response).unwrap(); - - assert!(!response.contains("Transfer-Encoding: chunked\r\n")); - assert!(response.contains("Content-Length: 0\r\n")); - - let mut lines = response.lines(); - assert_eq!(lines.next(), Some("HTTP/1.1 200 OK")); - - let mut lines = lines.skip_while(|line| !line.is_empty()); - assert_eq!(lines.next(), Some("")); - assert_eq!(lines.next(), None); -} #[test] fn head_response_can_send_content_length() { @@ -394,7 +409,7 @@ fn head_response_can_send_content_length() { let mut response = String::new(); req.read_to_string(&mut response).unwrap(); - assert!(response.contains("Content-Length: 1024\r\n")); + assert!(response.contains("content-length: 1024\r\n")); let mut lines = response.lines(); assert_eq!(lines.next(), Some("HTTP/1.1 200 OK")); @@ -423,7 +438,7 @@ fn response_does_not_set_chunked_if_body_not_allowed() { let mut response = String::new(); req.read_to_string(&mut response).unwrap(); - assert!(!response.contains("Transfer-Encoding")); + assert!(!response.contains("transfer-encoding")); let mut lines = response.lines(); assert_eq!(lines.next(), Some("HTTP/1.1 304 Not Modified")); @@ -691,13 +706,13 @@ fn pipeline_enabled() { { let mut lines = buf.split(|&b| b == b'\n'); assert_eq!(s(lines.next().unwrap()), "HTTP/1.1 200 OK\r"); - assert_eq!(s(lines.next().unwrap()), "Content-Length: 12\r"); + assert_eq!(s(lines.next().unwrap()), "content-length: 12\r"); lines.next().unwrap(); // Date assert_eq!(s(lines.next().unwrap()), "\r"); assert_eq!(s(lines.next().unwrap()), "Hello World"); assert_eq!(s(lines.next().unwrap()), "HTTP/1.1 200 OK\r"); - assert_eq!(s(lines.next().unwrap()), "Content-Length: 12\r"); + assert_eq!(s(lines.next().unwrap()), "content-length: 12\r"); lines.next().unwrap(); // Date assert_eq!(s(lines.next().unwrap()), "\r"); assert_eq!(s(lines.next().unwrap()), "Hello World"); @@ -720,7 +735,7 @@ fn http_10_request_receives_http_10_response() { \r\n\ ").unwrap(); - let expected = "HTTP/1.0 200 OK\r\nContent-Length: 0\r\n"; + let expected = "HTTP/1.0 200 OK\r\ncontent-length: 0\r\n"; let mut buf = [0; 256]; let n = req.read(&mut buf).unwrap(); assert!(n >= expected.len(), "read: {:?} >= {:?}", n, expected.len()); @@ -729,6 +744,7 @@ fn http_10_request_receives_http_10_response() { #[test] fn disable_keep_alive_mid_request() { + let mut core = Core::new().unwrap(); let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); let addr = listener.local_addr().unwrap(); @@ -773,6 +789,7 @@ fn disable_keep_alive_mid_request() { #[test] fn disable_keep_alive_post_request() { + let _ = pretty_env_logger::try_init(); let mut core = Core::new().unwrap(); let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); let addr = listener.local_addr().unwrap(); @@ -790,10 +807,11 @@ fn disable_keep_alive_post_request() { let mut buf = [0; 1024 * 8]; loop { let n = req.read(&mut buf).expect("reading 1"); - if n < buf.len() { - if &buf[n - HELLO.len()..n] == HELLO.as_bytes() { - break; - } + if &buf[n - HELLO.len()..n] == HELLO.as_bytes() { + break; + } + if n == 0 { + panic!("unexpected eof"); } } @@ -1113,16 +1131,14 @@ fn streaming_body() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let (socket, _) = item.unwrap(); - Http::<& &'static [u8]>::new() + Http::::new() .keep_alive(false) .serve_connection(socket, service_fn(|_| { static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _; - let b = ::futures::stream::iter_ok(S.iter()); + let b = ::futures::stream::iter_ok(S.into_iter()) + .map(|&s| s); + let b = hyper::Body::wrap_stream(b); Ok(Response::new(b)) - /* - Ok(Response::, ::hyper::Error>>::new() - .with_body(b)) - */ })) .map(|_| ()) }); @@ -1195,7 +1211,12 @@ impl<'a> ReplyBuilder<'a> { } fn body>(self, body: T) { - self.tx.send(Reply::Body(body.as_ref().into())).unwrap(); + self.tx.send(Reply::Body(body.as_ref().to_vec().into())).unwrap(); + } + + fn body_stream(self, body: Body) + { + self.tx.send(Reply::Body(body)).unwrap(); } } @@ -1219,11 +1240,11 @@ struct TestService { _timeout: Option, } -#[derive(Clone, Debug)] +#[derive(Debug)] enum Reply { Status(hyper::StatusCode), Header(HeaderName, HeaderValue), - Body(Vec), + Body(hyper::Body), End, } @@ -1257,7 +1278,7 @@ impl Service for TestService { let tx2 = self.tx.clone(); let replies = self.reply.clone(); - Box::new(req.into_parts().1.for_each(move |chunk| { + Box::new(req.into_body().into_stream().for_each(move |chunk| { tx1.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap(); Ok(()) }).then(move |result| { @@ -1278,7 +1299,7 @@ impl Service for TestService { res.headers_mut().insert(name, value); }, Reply::Body(body) => { - *res.body_mut() = body.into(); + *res.body_mut() = body; }, Reply::End => break, }