Skip to content

Commit

Permalink
Add ability to adjust INITIAL_WINDOW_SIZE setting on an existing conn…
Browse files Browse the repository at this point in the history
…ection
  • Loading branch information
seanmonstar committed Oct 4, 2019
1 parent 367206b commit d7d3ff7
Show file tree
Hide file tree
Showing 10 changed files with 435 additions and 23 deletions.
19 changes: 19 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,25 @@ where
self.inner.set_target_window_size(size);
}

/// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
/// flow control for received data.
///
/// The `SETTINGS` will be sent to the remote, and only applied once the
/// remote acknowledges the change.
///
/// This can be used to increase or decrease the window size for existing
/// streams.
///
/// # Errors
///
/// Returns an error if a previous call is still pending acknowledgement
/// from the remote endpoint.
pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
assert!(size <= proto::MAX_WINDOW_SIZE);
self.inner.set_initial_window_size(size)?;
Ok(())
}

/// Takes a `PingPong` instance from the connection.
///
/// # Note
Expand Down
4 changes: 4 additions & 0 deletions src/codec/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ pub enum UserError {

/// Calls `PingPong::send_ping` before receiving a pong.
SendPingWhilePending,

/// Tries to update local SETTINGS while ACK has not been received.
SendSettingsWhilePending,
}

// ===== impl RecvError =====
Expand Down Expand Up @@ -140,6 +143,7 @@ impl error::Error for UserError {
MissingUriSchemeAndAuthority => "request URI missing scheme and authority",
PollResetAfterSendResponse => "poll_reset after send_response is illegal",
SendPingWhilePending => "send_ping before received previous pong",
SendSettingsWhilePending => "sending SETTINGS before received previous ACK",
}
}
}
Expand Down
19 changes: 14 additions & 5 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::codec::RecvError;
use crate::codec::{RecvError, UserError};
use crate::frame::{Reason, StreamId};
use crate::{client, frame, proto, server};

Expand Down Expand Up @@ -99,16 +99,24 @@ where
codec,
go_away: GoAway::new(),
ping_pong: PingPong::new(),
settings: Settings::new(),
settings: Settings::new(config.settings),
streams,
_phantom: PhantomData,
}
}

pub fn set_target_window_size(&mut self, size: WindowSize) {
/// connection flow control
pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
self.streams.set_target_connection_window_size(size);
}

/// Send a new SETTINGS frame with an updated initial window size.
pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
let mut settings = frame::Settings::default();
settings.set_initial_window_size(Some(size));
self.settings.send_settings(settings)
}

/// Returns `Ready` when the connection is ready to receive a frame.
///
/// Returns `RecvError` as this may raise errors that are caused by delayed
Expand All @@ -119,7 +127,7 @@ where
ready!(self.ping_pong.send_pending_ping(cx, &mut self.codec))?;
ready!(self
.settings
.send_pending_ack(cx, &mut self.codec, &mut self.streams))?;
.poll_send(cx, &mut self.codec, &mut self.streams))?;
ready!(self.streams.send_pending_refusal(cx, &mut self.codec))?;

Poll::Ready(Ok(()))
Expand Down Expand Up @@ -327,7 +335,8 @@ where
}
Some(Settings(frame)) => {
log::trace!("recv SETTINGS; frame={:?}", frame);
self.settings.recv_settings(frame);
self.settings
.recv_settings(frame, &mut self.codec, &mut self.streams)?;
}
Some(GoAway(frame)) => {
log::trace!("recv GOAWAY; frame={:?}", frame);
Expand Down
109 changes: 94 additions & 15 deletions src/proto/settings.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,98 @@
use crate::codec::RecvError;
use crate::codec::{RecvError, UserError};
use crate::error::Reason;
use crate::frame;
use crate::proto::*;
use std::task::{Context, Poll};

#[derive(Debug)]
pub(crate) struct Settings {
/// Our local SETTINGS sync state with the remote.
local: Local,
/// Received SETTINGS frame pending processing. The ACK must be written to
/// the socket first then the settings applied **before** receiving any
/// further frames.
pending: Option<frame::Settings>,
remote: Option<frame::Settings>,
}

#[derive(Debug)]
enum Local {
/// We want to send these SETTINGS to the remote when the socket is ready.
ToSend(frame::Settings),
/// We have sent these SETTINGS and are waiting for the remote to ACK
/// before we apply them.
WaitingAck(frame::Settings),
/// Our local settings are in sync with the remote.
Synced,
}

impl Settings {
pub fn new() -> Self {
Settings { pending: None }
pub(crate) fn new(local: frame::Settings) -> Self {
Settings {
// We assume the initial local SETTINGS were flushed during
// the handshake process.
local: Local::WaitingAck(local),
remote: None,
}
}

pub fn recv_settings(&mut self, frame: frame::Settings) {
pub(crate) fn recv_settings<T, B, C, P>(
&mut self,
frame: frame::Settings,
codec: &mut Codec<T, B>,
streams: &mut Streams<C, P>,
) -> Result<(), RecvError>
where
T: AsyncWrite + Unpin,
B: Buf + Unpin,
C: Buf + Unpin,
P: Peer,
{
if frame.is_ack() {
log::debug!("received remote settings ack");
// TODO: handle acks
match &self.local {
Local::WaitingAck(local) => {
log::debug!("received settings ACK; applying {:?}", local);

if let Some(max) = local.max_frame_size() {
codec.set_max_recv_frame_size(max as usize);
}

if let Some(max) = local.max_header_list_size() {
codec.set_max_recv_header_list_size(max as usize);
}

streams.apply_local_settings(local)?;
self.local = Local::Synced;
Ok(())
}
Local::ToSend(..) | Local::Synced => {
// We haven't sent any SETTINGS frames to be ACKed, so
// this is very bizarre! Remote is either buggy or malicious.
proto_err!(conn: "received unexpected settings ack");
Err(RecvError::Connection(Reason::PROTOCOL_ERROR))
}
}
} else {
assert!(self.pending.is_none());
self.pending = Some(frame);
// We always ACK before reading more frames, so `remote` should
// always be none!
assert!(self.remote.is_none());
self.remote = Some(frame);
Ok(())
}
}

pub(crate) fn send_settings(&mut self, frame: frame::Settings) -> Result<(), UserError> {
assert!(!frame.is_ack());
match &self.local {
Local::ToSend(..) | Local::WaitingAck(..) => Err(UserError::SendSettingsWhilePending),
Local::Synced => {
log::trace!("queue to send local settings: {:?}", frame);
self.local = Local::ToSend(frame);
Ok(())
}
}
}

pub fn send_pending_ack<T, B, C, P>(
pub(crate) fn poll_send<T, B, C, P>(
&mut self,
cx: &mut Context,
dst: &mut Codec<T, B>,
Expand All @@ -38,11 +104,8 @@ impl Settings {
C: Buf + Unpin,
P: Peer,
{
log::trace!("send_pending_ack; pending={:?}", self.pending);

if let Some(settings) = &self.pending {
if let Some(settings) = &self.remote {
if !dst.poll_ready(cx)?.is_ready() {
log::trace!("failed to send ACK");
return Poll::Pending;
}

Expand All @@ -61,7 +124,23 @@ impl Settings {
streams.apply_remote_settings(settings)?;
}

self.pending = None;
self.remote = None;

match &self.local {
Local::ToSend(settings) => {
if !dst.poll_ready(cx)?.is_ready() {
return Poll::Pending;
}

// Buffer the settings frame
dst.buffer(settings.clone().into())
.expect("invalid settings frame");
log::trace!("local settings sent; waiting for ack: {:?}", settings);

self.local = Local::WaitingAck(settings.clone());
}
Local::WaitingAck(..) | Local::Synced => {}
}

Poll::Ready(Ok(()))
}
Expand Down
20 changes: 18 additions & 2 deletions src/proto/streams/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ impl FlowControl {
Ok(())
}

/// Decrement the window size.
/// Decrement the send-side window size.
///
/// This is called after receiving a SETTINGS frame with a lower
/// INITIAL_WINDOW_SIZE value.
pub fn dec_window(&mut self, sz: WindowSize) {
pub fn dec_send_window(&mut self, sz: WindowSize) {
log::trace!(
"dec_window; sz={}; window={}, available={}",
sz,
Expand All @@ -146,6 +146,22 @@ impl FlowControl {
self.window_size -= sz;
}

/// Decrement the recv-side window size.
///
/// This is called after receiving a SETTINGS ACK frame with a lower
/// INITIAL_WINDOW_SIZE value.
pub fn dec_recv_window(&mut self, sz: WindowSize) {
log::trace!(
"dec_recv_window; sz={}; window={}, available={}",
sz,
self.window_size,
self.available
);
// This should not be able to overflow `window_size` from the bottom.
self.window_size -= sz;
self.available -= sz;
}

/// Decrements the window reflecting data has actually been sent. The caller
/// must ensure that the window has capacity.
pub fn send_data(&mut self, sz: WindowSize) {
Expand Down
61 changes: 61 additions & 0 deletions src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,67 @@ impl Recv {
}
}

pub(crate) fn apply_local_settings(
&mut self,
settings: &frame::Settings,
store: &mut Store,
) -> Result<(), RecvError> {
let target = if let Some(val) = settings.initial_window_size() {
val
} else {
return Ok(());
};

let old_sz = self.init_window_sz;
self.init_window_sz = target;

log::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);

// Per RFC 7540 §6.9.2:
//
// In addition to changing the flow-control window for streams that are
// not yet active, a SETTINGS frame can alter the initial flow-control
// window size for streams with active flow-control windows (that is,
// streams in the "open" or "half-closed (remote)" state). When the
// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
// the size of all stream flow-control windows that it maintains by the
// difference between the new value and the old value.
//
// A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
// space in a flow-control window to become negative. A sender MUST
// track the negative flow-control window and MUST NOT send new
// flow-controlled frames until it receives WINDOW_UPDATE frames that
// cause the flow-control window to become positive.

if target < old_sz {
// We must decrease the (local) window on every open stream.
let dec = old_sz - target;
log::trace!("decrementing all windows; dec={}", dec);

store.for_each(|mut stream| {
stream.recv_flow.dec_recv_window(dec);
Ok(())
})
} else if target > old_sz {
// We must increase the (local) window on every open stream.
let inc = target - old_sz;
log::trace!("incrementing all windows; inc={}", inc);
store.for_each(|mut stream| {
// XXX: Shouldn't the peer have already noticed our
// overflow and sent us a GOAWAY?
stream
.recv_flow
.inc_window(inc)
.map_err(RecvError::Connection)?;
stream.recv_flow.assign_capacity(inc);
Ok(())
})
} else {
// size is the same... so do nothing
Ok(())
}
}

pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
if !stream.state.is_recv_closed() {
return false;
Expand Down
2 changes: 1 addition & 1 deletion src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ impl Send {
store.for_each(|mut stream| {
let stream = &mut *stream;

stream.send_flow.dec_window(dec);
stream.send_flow.dec_send_window(dec);

// It's possible that decreasing the window causes
// `window_size` (the stream-specific window) to fall below
Expand Down
7 changes: 7 additions & 0 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,13 @@ where
)
}

pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), RecvError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

me.actions.recv.apply_local_settings(frame, &mut me.store)
}

pub fn send_request(
&mut self,
request: Request<()>,
Expand Down
19 changes: 19 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,25 @@ where
self.connection.set_target_window_size(size);
}

/// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
/// flow control for received data.
///
/// The `SETTINGS` will be sent to the remote, and only applied once the
/// remote acknowledges the change.
///
/// This can be used to increase or decrease the window size for existing
/// streams.
///
/// # Errors
///
/// Returns an error if a previous call is still pending acknowledgement
/// from the remote endpoint.
pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
assert!(size <= proto::MAX_WINDOW_SIZE);
self.connection.set_initial_window_size(size)?;
Ok(())
}

/// Returns `Ready` when the underlying connection has closed.
///
/// If any new inbound streams are received during a call to `poll_closed`,
Expand Down
Loading

0 comments on commit d7d3ff7

Please sign in to comment.