diff --git a/src/frame/headers.rs b/src/frame/headers.rs
index 92adffb75..5c18ef8c3 100644
--- a/src/frame/headers.rs
+++ b/src/frame/headers.rs
@@ -3,7 +3,7 @@ use crate::frame::{Error, Frame, Head, Kind};
 use crate::hpack;
 
 use http::header::{self, HeaderName, HeaderValue};
-use http::{uri, HeaderMap, Method, StatusCode, Uri};
+use http::{uri, HeaderMap, Method, Request, StatusCode, Uri};
 
 use bytes::{Bytes, BytesMut};
 use string::String;
@@ -300,9 +300,92 @@ impl fmt::Debug for Headers {
     }
 }
 
+// ===== util =====
+
+pub fn parse_u64(src: &[u8]) -> Result<u64, ()> {
+    if src.len() > 19 {
+        // At danger for overflow...
+        return Err(());
+    }
+
+    let mut ret = 0;
+
+    for &d in src {
+        if d < b'0' || d > b'9' {
+            return Err(());
+        }
+
+        ret *= 10;
+        ret += (d - b'0') as u64;
+    }
+
+    Ok(ret)
+}
+
 // ===== impl PushPromise =====
 
+#[derive(Debug)]
+pub enum PushPromiseHeaderError {
+    InvalidContentLength(Result<u64, ()>),
+    NotSafeAndCacheable,
+}
+
 impl PushPromise {
+    pub fn new(
+        stream_id: StreamId,
+        promised_id: StreamId,
+        pseudo: Pseudo,
+        fields: HeaderMap,
+    ) -> Self {
+        PushPromise {
+            flags: PushPromiseFlag::default(),
+            header_block: HeaderBlock {
+                fields,
+                is_over_size: false,
+                pseudo,
+            },
+            promised_id,
+            stream_id,
+        }
+    }
+
+    pub fn validate_request(req: &Request<()>) -> Result<(), PushPromiseHeaderError> {
+        use PushPromiseHeaderError::*;
+        // The spec has some requirements for promised request headers
+        // [https://httpwg.org/specs/rfc7540.html#PushRequests]
+
+        // A promised request "that indicates the presence of a request body
+        // MUST reset the promised stream with a stream error"
+        if let Some(content_length) = req.headers().get(header::CONTENT_LENGTH) {
+            let parsed_length = parse_u64(content_length.as_bytes());
+            if parsed_length != Ok(0) {
+                return Err(InvalidContentLength(parsed_length));
+            }
+        }
+        // "The server MUST include a method in the :method pseudo-header field
+        // that is safe and cacheable"
+        if !Self::safe_and_cacheable(req.method()) {
+            return Err(NotSafeAndCacheable);
+        }
+
+        Ok(())
+    }
+
+    fn safe_and_cacheable(method: &Method) -> bool {
+        // Cacheable: https://httpwg.org/specs/rfc7231.html#cacheable.methods
+        // Safe: https://httpwg.org/specs/rfc7231.html#safe.methods
+        return method == Method::GET || method == Method::HEAD;
+    }
+
+    pub fn fields(&self) -> &HeaderMap {
+        &self.header_block.fields
+    }
+
+    #[cfg(feature = "unstable")]
+    pub fn into_fields(self) -> HeaderMap {
+        self.header_block.fields
+    }
+
     /// Loads the push promise frame but doesn't actually do HPACK decoding.
     ///
     /// HPACK decoding is done in the `load_hpack` step.
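As an illustration of the rules `PushPromise::validate_request` above enforces (RFC 7540: a promised request must use a safe, cacheable method and must not indicate a request body), here is a standalone sketch; `is_valid_promise` is a hypothetical helper written for this note, not part of the patch:

```rust
use http::{header, Method, Request};

// Hypothetical stand-in for the crate-internal PushPromise::validate_request:
// accept only GET/HEAD (safe and cacheable) with no request body
// (content-length, if present, must be "0").
fn is_valid_promise(req: &Request<()>) -> bool {
    let safe_and_cacheable = req.method() == Method::GET || req.method() == Method::HEAD;
    let no_body = match req.headers().get(header::CONTENT_LENGTH) {
        None => true,
        Some(v) => v == "0",
    };
    safe_and_cacheable && no_body
}

fn main() {
    let get = Request::get("https://example.com/style.css").body(()).unwrap();
    assert!(is_valid_promise(&get));

    let post = Request::post("https://example.com/submit").body(()).unwrap();
    assert!(!is_valid_promise(&post)); // POST is neither safe nor cacheable
}
```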
@@ -401,44 +484,13 @@ impl PushPromise {
     fn head(&self) -> Head {
         Head::new(Kind::PushPromise, self.flags.into(), self.stream_id)
     }
-}
 
-impl PushPromise {
     /// Consume `self`, returning the parts of the frame
     pub fn into_parts(self) -> (Pseudo, HeaderMap) {
         (self.header_block.pseudo, self.header_block.fields)
     }
 }
 
-#[cfg(feature = "unstable")]
-impl PushPromise {
-    pub fn new(
-        stream_id: StreamId,
-        promised_id: StreamId,
-        pseudo: Pseudo,
-        fields: HeaderMap,
-    ) -> Self {
-        PushPromise {
-            flags: PushPromiseFlag::default(),
-            header_block: HeaderBlock {
-                fields,
-                is_over_size: false,
-                pseudo,
-            },
-            promised_id,
-            stream_id,
-        }
-    }
-
-    pub fn fields(&self) -> &HeaderMap {
-        &self.header_block.fields
-    }
-
-    pub fn into_fields(self) -> HeaderMap {
-        self.header_block.fields
-    }
-}
-
 impl<T> From<PushPromise> for Frame<T> {
     fn from(src: PushPromise) -> Self {
         Frame::PushPromise(src)
diff --git a/src/frame/mod.rs b/src/frame/mod.rs
index ee371eeda..06d4d657c 100644
--- a/src/frame/mod.rs
+++ b/src/frame/mod.rs
@@ -42,7 +42,9 @@ mod window_update;
 pub use self::data::Data;
 pub use self::go_away::GoAway;
 pub use self::head::{Head, Kind};
-pub use self::headers::{Continuation, Headers, Pseudo, PushPromise};
+pub use self::headers::{
+    parse_u64, Continuation, Headers, Pseudo, PushPromise, PushPromiseHeaderError,
+};
 pub use self::ping::Ping;
 pub use self::priority::{Priority, StreamDependency};
 pub use self::reason::Reason;
diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs
index ce39b76bf..03e487511 100644
--- a/src/proto/streams/prioritize.rs
+++ b/src/proto/streams/prioritize.rs
@@ -111,7 +111,7 @@ impl Prioritize {
 
     pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
         // If the stream is waiting to be opened, nothing more to do.
-        if !stream.is_pending_open {
+        if stream.is_send_ready() {
             log::trace!("schedule_send; {:?}", stream.id);
             // Queue the stream
             self.pending_send.push(stream);
@@ -445,19 +445,9 @@ impl Prioritize {
             self.pending_capacity.push(stream);
         }
 
-        // If data is buffered and the stream is not pending open, then
+        // If data is buffered and the stream is send ready, then
         // schedule the stream for execution
-        //
-        // Why do we not push into pending_send when the stream is in pending_open?
-        //
-        // We allow users to call send_request() which schedules a stream to be pending_open
-        // if there is no room according to the concurrency limit (max_send_streams), and we
-        // also allow data to be buffered for send with send_data() if there is no capacity for
-        // the stream to send the data, which attempts to place the stream in pending_send.
-        // If the stream is not open, we don't want the stream to be scheduled for
-        // execution (pending_send). Note that if the stream is in pending_open, it will be
-        // pushed to pending_send when there is room for an open stream.
-        if stream.buffered_send_data > 0 && !stream.is_pending_open {
+        if stream.buffered_send_data > 0 && stream.is_send_ready() {
            // TODO: This assertion isn't *exactly* correct. There can still be
            // buffered send data while the stream's pending send queue is
           // empty. This can happen when a large data frame is in the process
@@ -766,6 +756,22 @@ impl Prioritize {
                     stream: stream.key(),
                 }))
             }
+            Some(Frame::PushPromise(pp)) => {
+                let mut pushed =
+                    stream.store_mut().find_mut(&pp.promised_id()).unwrap();
+                pushed.is_pending_push = false;
+                // Transition stream from pending_push to pending_open
+                // if possible
+                if !pushed.pending_send.is_empty() {
+                    if counts.can_inc_num_send_streams() {
+                        counts.inc_num_send_streams(&mut pushed);
+                        self.pending_send.push(&mut pushed);
+                    } else {
+                        self.queue_open(&mut pushed);
+                    }
+                }
+                Frame::PushPromise(pp)
+            }
             Some(frame) => frame.map(|_| {
                 unreachable!(
                     "Frame::map closure will only be called \
diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs
index 27e8049a6..4295aaa02 100644
--- a/src/proto/streams/recv.rs
+++ b/src/proto/streams/recv.rs
@@ -1,10 +1,10 @@
 use super::*;
 use crate::codec::{RecvError, UserError};
-use crate::frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE};
+use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
 use crate::{frame, proto};
 use std::task::Context;
 
-use http::{HeaderMap, Method, Request, Response};
+use http::{HeaderMap, Request, Response};
 
 use std::io;
 use std::task::{Poll, Waker};
@@ -178,7 +178,7 @@ impl Recv {
         use http::header;
 
         if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
-            let content_length = match parse_u64(content_length.as_bytes()) {
+            let content_length = match frame::parse_u64(content_length.as_bytes()) {
                 Ok(v) => v,
                 Err(()) => {
                     proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
@@ -632,44 +632,31 @@ impl Recv {
         }
 
         let promised_id = frame.promised_id();
-        use http::header;
         let (pseudo, fields) = frame.into_parts();
         let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
 
-        // The spec has some requirements for promised request headers
-        // [https://httpwg.org/specs/rfc7540.html#PushRequests]
-
-        // A promised request "that indicates the presence of a request body
-        // MUST reset the promised stream with a stream error"
-        if let Some(content_length) = req.headers().get(header::CONTENT_LENGTH) {
-            match parse_u64(content_length.as_bytes()) {
-                Ok(0) => {}
-                otherwise => {
-                    proto_err!(stream:
-                        "recv_push_promise; promised request has content-length {:?}; promised_id={:?}",
-                        otherwise,
-                        promised_id,
-                    );
-                    return Err(RecvError::Stream {
-                        id: promised_id,
-                        reason: Reason::PROTOCOL_ERROR,
-                    });
-                }
+
+        if let Err(e) = frame::PushPromise::validate_request(&req) {
+            use PushPromiseHeaderError::*;
+            match e {
+                NotSafeAndCacheable => proto_err!(
+                    stream:
+                    "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
+                    req.method(),
+                    promised_id,
+                ),
+                InvalidContentLength(e) => proto_err!(
+                    stream:
+                    "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
+                    e,
+                    promised_id,
+                ),
             }
-        }
-        // "The server MUST include a method in the :method pseudo-header field
-        // that is safe and cacheable"
-        if !Self::safe_and_cacheable(req.method()) {
-            proto_err!(
-                stream:
-                "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
-                req.method(),
-                promised_id,
-            );
             return Err(RecvError::Stream {
                 id: promised_id,
                 reason: Reason::PROTOCOL_ERROR,
             });
         }
+
         use super::peer::PollMessage::*;
         stream
             .pending_recv
@@ -678,12 +665,6 @@ impl Recv {
         Ok(())
     }
 
-    fn safe_and_cacheable(method: &Method) -> bool {
-        // Cacheable: https://httpwg.org/specs/rfc7231.html#cacheable.methods
-        // Safe: https://httpwg.org/specs/rfc7231.html#safe.methods
-        method == Method::GET || method == Method::HEAD
-    }
-
     /// Ensures that `id` is not in the `Idle` state.
     pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
         if let Ok(next) = self.next_stream_id {
@@ -1057,25 +1038,3 @@ impl<T> From<RecvError> for RecvHeaderBlockError<T> {
         RecvHeaderBlockError::State(err)
     }
 }
-
-// ===== util =====
-
-fn parse_u64(src: &[u8]) -> Result<u64, ()> {
-    if src.len() > 19 {
-        // At danger for overflow...
-        return Err(());
-    }
-
-    let mut ret = 0;
-
-    for &d in src {
-        if d < b'0' || d > b'9' {
-            return Err(());
-        }
-
-        ret *= 10;
-        ret += u64::from(d - b'0');
-    }
-
-    Ok(ret)
-}
diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs
index 70e0fe12c..0e114aef8 100644
--- a/src/proto/streams/send.rs
+++ b/src/proto/streams/send.rs
@@ -53,6 +53,53 @@ impl Send {
         Ok(stream_id)
     }
 
+    pub fn reserve_local(&mut self) -> Result<StreamId, UserError> {
+        let stream_id = self.ensure_next_stream_id()?;
+        self.next_stream_id = stream_id.next_id();
+        Ok(stream_id)
+    }
+
+    fn check_headers(fields: &http::HeaderMap) -> Result<(), UserError> {
+        // 8.1.2.2. Connection-Specific Header Fields
+        if fields.contains_key(http::header::CONNECTION)
+            || fields.contains_key(http::header::TRANSFER_ENCODING)
+            || fields.contains_key(http::header::UPGRADE)
+            || fields.contains_key("keep-alive")
+            || fields.contains_key("proxy-connection")
+        {
+            log::debug!("illegal connection-specific headers found");
+            return Err(UserError::MalformedHeaders);
+        } else if let Some(te) = fields.get(http::header::TE) {
+            if te != "trailers" {
+                log::debug!("illegal connection-specific headers found");
+                return Err(UserError::MalformedHeaders);
+            }
+        }
+        Ok(())
+    }
+
+    pub fn send_push_promise<B>(
+        &mut self,
+        frame: frame::PushPromise,
+        buffer: &mut Buffer<Frame<B>>,
+        stream: &mut store::Ptr,
+        task: &mut Option<Waker>,
+    ) -> Result<(), UserError> {
+        log::trace!(
+            "send_push_promise; frame={:?}; init_window={:?}",
+            frame,
+            self.init_window_sz
+        );
+
+        Self::check_headers(frame.fields())?;
+
+        // Queue the frame for sending
+        self.prioritize
+            .queue_frame(frame.into(), buffer, stream, task);
+
+        Ok(())
+    }
+
     pub fn send_headers<B>(
         &mut self,
         frame: frame::Headers,
@@ -67,21 +114,7 @@ impl Send {
             self.init_window_sz
         );
 
-        // 8.1.2.2. Connection-Specific Header Fields
-        if frame.fields().contains_key(http::header::CONNECTION)
-            || frame.fields().contains_key(http::header::TRANSFER_ENCODING)
-            || frame.fields().contains_key(http::header::UPGRADE)
-            || frame.fields().contains_key("keep-alive")
-            || frame.fields().contains_key("proxy-connection")
-        {
-            log::debug!("illegal connection-specific headers found");
-            return Err(UserError::MalformedHeaders);
-        } else if let Some(te) = frame.fields().get(http::header::TE) {
-            if te != "trailers" {
-                log::debug!("illegal connection-specific headers found");
-                return Err(UserError::MalformedHeaders);
-            }
-        }
+        Self::check_headers(frame.fields())?;
 
         if frame.has_too_big_field() {
             return Err(UserError::HeaderTooBig);
@@ -93,10 +126,14 @@ impl Send {
         stream.state.send_open(end_stream)?;
 
         if counts.peer().is_local_init(frame.stream_id()) {
-            if counts.can_inc_num_send_streams() {
-                counts.inc_num_send_streams(stream);
-            } else {
-                self.prioritize.queue_open(stream);
+            // If we're waiting on a PushPromise anyway,
+            // handle potentially queueing the stream at that point
+            if !stream.is_pending_push {
+                if counts.can_inc_num_send_streams() {
+                    counts.inc_num_send_streams(stream);
+                } else {
+                    self.prioritize.queue_open(stream);
+                }
             }
         }
 
diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs
index cd162bea7..26323124d 100644
--- a/src/proto/streams/state.rs
+++ b/src/proto/streams/state.rs
@@ -57,7 +57,7 @@ pub struct State {
 enum Inner {
     Idle,
     // TODO: these states shouldn't count against concurrency limits:
-    //ReservedLocal,
+    ReservedLocal,
     ReservedRemote,
     Open { local: Peer, remote: Peer },
     HalfClosedLocal(Peer), // TODO: explicitly name this value
@@ -114,7 +114,7 @@ impl State {
                     Open { local, remote }
                 }
             }
-            HalfClosedRemote(AwaitingHeaders) => {
+            HalfClosedRemote(AwaitingHeaders) | ReservedLocal => {
                 if eos {
                     Closed(Cause::EndStream)
                 } else {
@@ -200,6 +200,17 @@ impl State {
         }
     }
 
+    /// Transition from Idle -> ReservedLocal
+    pub fn reserve_local(&mut self) -> Result<(), UserError> {
+        match self.inner {
+            Idle => {
+                self.inner = ReservedLocal;
+                Ok(())
+            }
+            _ => Err(UserError::UnexpectedFrameType),
+        }
+    }
+
     /// Indicates that the remote side will not send more data to the local.
     pub fn recv_close(&mut self) -> Result<(), RecvError> {
         match self.inner {
@@ -384,7 +395,7 @@ impl State {
 
     pub fn is_recv_closed(&self) -> bool {
         match self.inner {
-            Closed(..) | HalfClosedRemote(..) => true,
+            Closed(..) | HalfClosedRemote(..) | ReservedLocal => true,
             _ => false,
         }
     }
@@ -410,7 +421,7 @@ impl State {
             | Closed(Cause::LocallyReset(reason))
             | Closed(Cause::Scheduled(reason)) => Err(proto::Error::Proto(reason)),
             Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())),
-            Closed(Cause::EndStream) | HalfClosedRemote(..) => Ok(false),
+            Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false),
             _ => Ok(true),
         }
     }
diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs
index 075d71fe1..398672049 100644
--- a/src/proto/streams/stream.rs
+++ b/src/proto/streams/stream.rs
@@ -69,6 +69,9 @@ pub(super) struct Stream {
     /// Set to true when the stream is pending to be opened
     pub is_pending_open: bool,
 
+    /// Set to true when a push is pending for this stream
+    pub is_pending_push: bool,
+
     // ===== Fields related to receiving =====
     /// Next node in the accept linked list
     pub next_pending_accept: Option<store::Key>,
@@ -165,6 +168,7 @@ impl Stream {
             send_capacity_inc: false,
             is_pending_open: false,
             next_open: None,
+            is_pending_push: false,
 
             // ===== Fields related to receiving =====
             next_pending_accept: None,
@@ -200,6 +204,26 @@ impl Stream {
         self.reset_at.is_some()
     }
 
+    /// Returns true if frames for this stream are ready to be sent over the wire
+    pub fn is_send_ready(&self) -> bool {
+        // Why do we check pending_open?
+        //
+        // We allow users to call send_request() which schedules a stream to be pending_open
+        // if there is no room according to the concurrency limit (max_send_streams), and we
+        // also allow data to be buffered for send with send_data() if there is no capacity for
+        // the stream to send the data, which attempts to place the stream in pending_send.
+        // If the stream is not open, we don't want the stream to be scheduled for
+        // execution (pending_send). Note that if the stream is in pending_open, it will be
+        // pushed to pending_send when there is room for an open stream.
+        //
+        // In pending_push we track whether a PushPromise still needs to be sent
+        // from a different stream before we can start sending frames on this one.
+        // This is different from the "open" check because reserved streams don't count
+        // toward the concurrency limit.
+        // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2
+        !self.is_pending_open && !self.is_pending_push
+    }
+
     /// Returns true if the stream is closed
     pub fn is_closed(&self) -> bool {
         // The state has fully transitioned to closed.
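To make the gate that `is_send_ready` adds above concrete, here is a minimal standalone model (hypothetical types, not the real h2 internals): a stream with buffered frames is only queued for the wire once it is neither waiting on the concurrency limit (`pending_open`) nor waiting for its PUSH_PROMISE to go out on the parent stream (`pending_push`).

```rust
// Hypothetical model of the scheduling decision in prioritize.rs / stream.rs.
struct StreamModel {
    buffered_send_data: usize,
    is_pending_open: bool,
    is_pending_push: bool,
}

impl StreamModel {
    fn is_send_ready(&self) -> bool {
        !self.is_pending_open && !self.is_pending_push
    }

    fn should_schedule(&self) -> bool {
        // Mirrors `stream.buffered_send_data > 0 && stream.is_send_ready()` above.
        self.buffered_send_data > 0 && self.is_send_ready()
    }
}

fn main() {
    let reserved = StreamModel { buffered_send_data: 8, is_pending_open: false, is_pending_push: true };
    let open = StreamModel { buffered_send_data: 8, is_pending_open: false, is_pending_push: false };
    assert!(!reserved.should_schedule()); // wait until the PUSH_PROMISE has gone out
    assert!(open.should_schedule());
}
```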
diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs
index 4662195e3..da12dafb4 100644
--- a/src/proto/streams/streams.rs
+++ b/src/proto/streams/streams.rs
@@ -973,6 +973,56 @@ impl<B> StreamRef<B> {
         })
     }
 
+    pub fn send_push_promise(&mut self, request: Request<()>) -> Result<StreamRef<B>, UserError> {
+        let mut me = self.opaque.inner.lock().unwrap();
+        let me = &mut *me;
+
+        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
+        let send_buffer = &mut *send_buffer;
+
+        let actions = &mut me.actions;
+        let promised_id = actions.send.reserve_local()?;
+
+        let child_key = {
+            let mut child_stream = me.store.insert(
+                promised_id,
+                Stream::new(
+                    promised_id,
+                    actions.send.init_window_sz(),
+                    actions.recv.init_window_sz(),
+                ),
+            );
+            child_stream.state.reserve_local()?;
+            child_stream.is_pending_push = true;
+            child_stream.key()
+        };
+
+        let pushed = {
+            let mut stream = me.store.resolve(self.opaque.key);
+
+            let frame = crate::server::Peer::convert_push_message(stream.id, promised_id, request)?;
+
+            actions
+                .send
+                .send_push_promise(frame, send_buffer, &mut stream, &mut actions.task)
+        };
+
+        if let Err(err) = pushed {
+            let mut child_stream = me.store.resolve(child_key);
+            child_stream.unlink();
+            child_stream.remove();
+            return Err(err.into());
+        }
+
+        let opaque =
+            OpaqueStreamRef::new(self.opaque.inner.clone(), &mut me.store.resolve(child_key));
+
+        Ok(StreamRef {
+            opaque,
+            send_buffer: self.send_buffer.clone(),
+        })
+    }
+
     /// Called by the server after the stream is accepted. Given that clients
     /// initialize streams by sending HEADERS, the request will always be
     /// available.
diff --git a/src/server.rs b/src/server.rs
index c7cdc66d6..a71ad21a1 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -115,8 +115,8 @@
 //! [`SendStream`]: ../struct.SendStream.html
 //! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html
 
-use crate::codec::{Codec, RecvError};
-use crate::frame::{self, Pseudo, Reason, Settings, StreamId};
+use crate::codec::{Codec, RecvError, UserError};
+use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
 use crate::proto::{self, Config, Prioritized};
 use crate::{PingPong, RecvStream, ReleaseCapacity, SendStream};
 
@@ -249,8 +249,7 @@ pub struct Builder {
 /// explicitly reset the stream with a custom reason.
 ///
 /// It will also be used to initiate push promises linked with the associated
-/// stream. This is [not yet
-/// implemented](https://github.com/hyperium/h2/issues/185).
+/// stream.
 ///
 /// If the `SendResponse` instance is dropped without sending a response, then
 /// the HTTP/2.0 stream will be reset.
@@ -263,6 +262,34 @@ pub struct SendResponse<B: IntoBuf> {
     inner: proto::StreamRef<B::Buf>,
 }
 
+/// Send a response to a promised request
+///
+/// A `SendPushedResponse` instance is provided when promising a request and is used
+/// to send the associated response to the client. It is also used to
+/// explicitly reset the stream with a custom reason.
+///
+/// It cannot be used to initiate push promises.
+///
+/// If the `SendPushedResponse` instance is dropped without sending a response, then
+/// the HTTP/2.0 stream will be reset.
+///
+/// See [module] level docs for more details.
+///
+/// [module]: index.html
+pub struct SendPushedResponse<B: IntoBuf> {
+    inner: SendResponse<B>,
+}
+
+// Manual implementation necessary because of rust-lang/rust#26925
+impl<B: IntoBuf> fmt::Debug for SendPushedResponse<B>
+where
+    <B as IntoBuf>::Buf: std::fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
+    }
+}
+
 /// Stages of an in-progress handshake.
 enum Handshaking<T, B: IntoBuf> {
     /// State 1. Connection is flushing pending SETTINGS frame.
@@ -924,6 +951,23 @@ impl<B: IntoBuf> SendResponse<B> {
             .map_err(Into::into)
     }
 
+    /// Push a request and response to the client
+    ///
+    /// On success, a [`SendPushedResponse`] instance is returned.
+    ///
+    /// [`SendPushedResponse`]: #
+    pub fn push_request(
+        &mut self,
+        request: Request<()>,
+    ) -> Result<SendPushedResponse<B>, crate::Error> {
+        self.inner
+            .send_push_promise(request)
+            .map(|inner| SendPushedResponse {
+                inner: SendResponse { inner },
+            })
+            .map_err(Into::into)
+    }
+
     /// Send a stream reset to the peer.
     ///
     /// This essentially cancels the stream, including any inbound or outbound
@@ -967,8 +1011,78 @@ impl<B: IntoBuf> SendResponse<B> {
     pub fn stream_id(&self) -> crate::StreamId {
         crate::StreamId::from_internal(self.inner.stream_id())
     }
+}
+
+// ===== impl SendPushedResponse =====
+
+impl<B: IntoBuf> SendPushedResponse<B> {
+    /// Send a response to a promised request.
+    ///
+    /// On success, a [`SendStream`] instance is returned. This instance can be
+    /// used to stream the response body and send trailers.
+    ///
+    /// If a body or trailers will be sent on the returned [`SendStream`]
+    /// instance, then `end_of_stream` must be set to `false` when calling this
+    /// function.
+    ///
+    /// The [`SendPushedResponse`] instance is associated with a promised
+    /// request. This function may only be called once per instance and only if
+    /// [`send_reset`] has not been previously called.
+    ///
+    /// [`SendPushedResponse`]: #
+    /// [`SendStream`]: ../struct.SendStream.html
+    /// [`send_reset`]: #method.send_reset
+    pub fn send_response(
+        &mut self,
+        response: Response<()>,
+        end_of_stream: bool,
+    ) -> Result<SendStream<B>, crate::Error> {
+        self.inner.send_response(response, end_of_stream)
+    }
+
+    /// Send a stream reset to the peer.
+    ///
+    /// This essentially cancels the stream, including any inbound or outbound
+    /// data streams.
+    ///
+    /// If this function is called before [`send_response`], a call to
+    /// [`send_response`] will result in an error.
+    ///
+    /// If this function is called while a [`SendStream`] instance is active,
+    /// any further use of the instance will result in an error.
+    ///
+    /// This function should only be called once.
+    ///
+    /// [`send_response`]: #method.send_response
+    /// [`SendStream`]: ../struct.SendStream.html
+    pub fn send_reset(&mut self, reason: Reason) {
+        self.inner.send_reset(reason)
+    }
+
+    /// Polls to be notified when the client resets this stream.
+    ///
+    /// If stream is still open, this returns `Poll::Pending`, and
+    /// registers the task to be notified if a `RST_STREAM` is received.
+    ///
+    /// If a `RST_STREAM` frame is received for this stream, calling this
+    /// method will yield the `Reason` for the reset.
+    ///
+    /// # Error
+    ///
+    /// Calling this method after having called `send_response` will return
+    /// a user error.
+    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
+        self.inner.poll_reset(cx)
+    }
 
-    // TODO: Support reserving push promises.
+    /// Returns the stream ID of the response stream.
+    ///
+    /// # Panics
+    ///
+    /// If the lock on the stream store has been poisoned.
+    pub fn stream_id(&self) -> crate::StreamId {
+        self.inner.stream_id()
+    }
 }
 
 // ===== impl Flush =====
@@ -1153,6 +1267,51 @@ impl Peer {
 
         frame
     }
+
+    pub fn convert_push_message(
+        stream_id: StreamId,
+        promised_id: StreamId,
+        request: Request<()>,
+    ) -> Result<frame::PushPromise, UserError> {
+        use http::request::Parts;
+
+        if let Err(e) = frame::PushPromise::validate_request(&request) {
+            use PushPromiseHeaderError::*;
+            match e {
+                NotSafeAndCacheable => log::debug!(
+                    "convert_push_message: method {} is not safe and cacheable; promised_id={:?}",
+                    request.method(),
+                    promised_id,
+                ),
+                InvalidContentLength(e) => log::debug!(
+                    "convert_push_message; promised request has invalid content-length {:?}; promised_id={:?}",
+                    e,
+                    promised_id,
+                ),
+            }
+            return Err(UserError::MalformedHeaders);
+        }
+
+        // Extract the components of the HTTP request
+        let (
+            Parts {
+                method,
+                uri,
+                headers,
+                ..
+            },
+            _,
+        ) = request.into_parts();
+
+        let pseudo = Pseudo::request(method, uri);
+
+        Ok(frame::PushPromise::new(
+            stream_id,
+            promised_id,
+            pseudo,
+            headers,
+        ))
+    }
 }
 
 impl proto::Peer for Peer {
diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs
index b49a17d33..25bbd73cf 100644
--- a/tests/h2-tests/tests/server.rs
+++ b/tests/h2-tests/tests/server.rs
@@ -105,6 +105,307 @@ async fn serve_request() {
     join(client, srv).await;
 }
 
+#[tokio::test]
+async fn push_request() {
+    let _ = env_logger::try_init();
+    let (io, mut client) = mock::new();
+
+    let client = async move {
+        client
+            .assert_server_handshake_with_settings(frames::settings().max_concurrent_streams(100))
+            .await;
+        client
+            .send_frame(
+                frames::headers(1)
+                    .request("GET", "https://example.com/")
+                    .eos(),
+            )
+            .await;
+        client
+            .recv_frame(
+                frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"),
+            )
+            .await;
+        client
+            .recv_frame(frames::headers(2).response(200).eos())
+            .await;
+        client
+            .recv_frame(
+                frames::push_promise(1, 4).request("GET", "https://http2.akamai.com/style2.css"),
+            )
+            .await;
+        client
+            .recv_frame(frames::headers(4).response(200).eos())
+            .await;
+        client
+            .recv_frame(frames::headers(1).response(200).eos())
+            .await;
+    };
+
+    let srv = async move {
+        let mut srv = server::handshake(io).await.expect("handshake");
+        let (req, mut stream) = srv.next().await.unwrap().unwrap();
+
+        assert_eq!(req.method(), &http::Method::GET);
+
+        // Promise stream 2
+        let mut pushed_s2 = {
+            let req = http::Request::builder()
+                .method("GET")
+                .uri("https://http2.akamai.com/style.css")
+                .body(())
+                .unwrap();
+            stream.push_request(req).unwrap()
+        };
+
+        // Promise stream 4 and push response headers
+        {
+            let req = http::Request::builder()
+                .method("GET")
+                .uri("https://http2.akamai.com/style2.css")
+                .body(())
+                .unwrap();
+            let rsp = http::Response::builder().status(200).body(()).unwrap();
+            stream
+                .push_request(req)
+                .unwrap()
+                .send_response(rsp, true)
+                .unwrap();
+        }
+
+        // Push response to stream 2
+        {
+            let rsp = http::Response::builder().status(200).body(()).unwrap();
+            pushed_s2.send_response(rsp, true).unwrap();
+        }
+
+        // Send response for stream 1
+        let rsp = http::Response::builder().status(200).body(()).unwrap();
+        stream.send_response(rsp, true).unwrap();
+
+        assert!(srv.next().await.is_none());
+    };
+
+    join(client, srv).await;
+}
+
+#[tokio::test]
+async fn push_request_against_concurrency() {
+    let _ = env_logger::try_init();
+    let (io, mut client) = mock::new();
+
+    let client = async move {
+        client
+            .assert_server_handshake_with_settings(frames::settings().max_concurrent_streams(1))
+            .await;
+        client
+            .send_frame(
+                frames::headers(1)
+                    .request("GET", "https://example.com/")
+                    .eos(),
+            )
+            .await;
+        client
+            .recv_frame(
+                frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"),
+            )
+            .await;
+        client.recv_frame(frames::headers(2).response(200)).await;
+        client
+            .recv_frame(
+                frames::push_promise(1, 4).request("GET", "https://http2.akamai.com/style2.css"),
+            )
+            .await;
+        client.recv_frame(frames::data(2, &b""[..]).eos()).await;
+        client
+            .recv_frame(frames::headers(1).response(200).eos())
+            .await;
+        client
+            .recv_frame(frames::headers(4).response(200).eos())
+            .await;
+    };
+
+    let srv = async move {
+        let mut srv = server::handshake(io).await.expect("handshake");
+        let (req, mut stream) = srv.next().await.unwrap().unwrap();
+
+        assert_eq!(req.method(), &http::Method::GET);
+
+        // Promise stream 2 and start response (concurrency limit reached)
+        let mut s2_tx = {
+            let req = http::Request::builder()
+                .method("GET")
+                .uri("https://http2.akamai.com/style.css")
+                .body(())
+                .unwrap();
+            let mut pushed_stream = stream.push_request(req).unwrap();
+            let rsp = http::Response::builder().status(200).body(()).unwrap();
+            pushed_stream.send_response(rsp, false).unwrap()
+        };
+
+        // Promise stream 4 and push response
+        {
+            let pushed_req = http::Request::builder()
+                .method("GET")
+                .uri("https://http2.akamai.com/style2.css")
+                .body(())
+                .unwrap();
+            let rsp = http::Response::builder().status(200).body(()).unwrap();
+            stream
+                .push_request(pushed_req)
+                .unwrap()
+                .send_response(rsp, true)
+                .unwrap();
+        }
+
+        // Send and finish response for stream 1
+        {
+            let rsp = http::Response::builder().status(200).body(()).unwrap();
+            stream.send_response(rsp, true).unwrap();
+        }
+
+        // Finish response for stream 2 (at which point stream 4 will be sent)
+        s2_tx.send_data(vec![0; 0].into(), true).unwrap();
+
+        assert!(srv.next().await.is_none());
+    };
+
+    join(client, srv).await;
+}
+
+#[tokio::test]
+async fn push_request_with_data() {
+    let _ = env_logger::try_init();
+    let (io, mut client) = mock::new();
+
+    let client = async move {
+        client
+            .assert_server_handshake_with_settings(frames::settings().max_concurrent_streams(100))
+            .await;
+        client
+            .send_frame(
+                frames::headers(1)
+                    .request("GET", "https://example.com/")
+                    .eos(),
+            )
+            .await;
+        client.recv_frame(frames::headers(1).response(200)).await;
+        client
+            .recv_frame(
+                frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"),
+            )
+            .await;
+        client.recv_frame(frames::headers(2).response(200)).await;
+        client.recv_frame(frames::data(1, &b""[..]).eos()).await;
+        client.recv_frame(frames::data(2, &b"\x00"[..]).eos()).await;
+    };
+
+    let srv = async move {
+        let mut srv = server::handshake(io).await.expect("handshake");
+        let (req, mut stream) = srv.next().await.unwrap().unwrap();
+
+        assert_eq!(req.method(), &http::Method::GET);
+
+        // Start response to stream 1
+        let mut s1_tx = {
+            let rsp = http::Response::builder().status(200).body(()).unwrap();
+            stream.send_response(rsp, false).unwrap()
+        };
+
+        // Promise stream 2, push response headers and send data
+        {
+            let pushed_req = http::Request::builder()
+                .method("GET")
+                .uri("https://http2.akamai.com/style.css")
+                .body(())
+                .unwrap();
+            let rsp = http::Response::builder().status(200).body(()).unwrap();
+            let mut push_tx = stream
+                .push_request(pushed_req)
+                .unwrap()
+                .send_response(rsp, false)
+                .unwrap();
+            // Make sure nothing can queue our pushed stream before we have the PushPromise sent
+            push_tx.send_data(vec![0; 1].into(), true).unwrap();
+            push_tx.reserve_capacity(1);
+        }
+
+        // End response for stream 1
+        s1_tx.send_data(vec![0; 0].into(), true).unwrap();
+
+        assert!(srv.next().await.is_none());
+    };
+
+    join(client, srv).await;
+}
+
+#[tokio::test]
+async fn push_request_between_data() {
+    let _ = env_logger::try_init();
+    let (io, mut client) = mock::new();
+
+    let client = async move {
+        client
+            .assert_server_handshake_with_settings(frames::settings().max_concurrent_streams(100))
+            .await;
+        client
+            .send_frame(
+                frames::headers(1)
+                    .request("GET", "https://example.com/")
+                    .eos(),
+            )
+            .await;
+        client.recv_frame(frames::headers(1).response(200)).await;
+        client.recv_frame(frames::data(1, &b""[..])).await;
+        client
+            .recv_frame(
+                frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"),
+            )
+            .await;
+        client
+            .recv_frame(frames::headers(2).response(200).eos())
+            .await;
+        client.recv_frame(frames::data(1, &b""[..]).eos()).await;
+    };
+
+    let srv = async move {
+        let mut srv = server::handshake(io).await.expect("handshake");
+        let (req, mut stream) = srv.next().await.unwrap().unwrap();
+
+        assert_eq!(req.method(), &http::Method::GET);
+
+        // Push response to stream 1 and send some data
+        let mut s1_tx = {
+            let rsp = http::Response::builder().status(200).body(()).unwrap();
+            let mut tx = stream.send_response(rsp, false).unwrap();
+            tx.send_data(vec![0; 0].into(), false).unwrap();
+            tx
+        };
+
+        // Promise stream 2 and push response headers
+        {
+            let pushed_req = http::Request::builder()
+                .method("GET")
+                .uri("https://http2.akamai.com/style.css")
+                .body(())
+                .unwrap();
+            let rsp = http::Response::builder().status(200).body(()).unwrap();
+            stream
+                .push_request(pushed_req)
+                .unwrap()
+                .send_response(rsp, true)
+                .unwrap();
+        }
+
+        // End response for stream 1
+        s1_tx.send_data(vec![0; 0].into(), true).unwrap();
+
+        assert!(srv.next().await.is_none());
+    };
+
+    join(client, srv).await;
+}
+
 #[test]
 #[ignore]
 fn accept_with_pending_connections_after_socket_close() {
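Taken together, the tests above exercise the new `push_request`/`SendPushedResponse` surface added in src/server.rs roughly as sketched below. This is a hedged usage sketch, not part of the patch: error handling is elided, the `Bytes` buffer type, URLs, and the `handle` function name are chosen for illustration only.

```rust
use bytes::Bytes;
use http::{Request, Response};

// Handle one accepted request, pushing a stylesheet alongside the response.
async fn handle(mut respond: h2::server::SendResponse<Bytes>) -> Result<(), h2::Error> {
    // Promise a safe, cacheable, body-less request (see PushPromise::validate_request).
    let promised = Request::get("https://example.com/style.css").body(()).unwrap();
    let mut pushed = respond.push_request(promised)?;

    // Respond to the original request.
    let response = Response::builder().status(200).body(()).unwrap();
    let mut body = respond.send_response(response, false)?;
    body.send_data(Bytes::from_static(b"<html>...</html>"), true)?;

    // Fulfil the promise on the reserved (pushed) stream.
    let pushed_response = Response::builder().status(200).body(()).unwrap();
    let mut pushed_body = pushed.send_response(pushed_response, false)?;
    pushed_body.send_data(Bytes::from_static(b"body { margin: 0 }"), true)?;

    Ok(())
}
```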