diff --git a/src/client.rs b/src/client.rs index a6c649811..17c302dbf 100644 --- a/src/client.rs +++ b/src/client.rs @@ -534,6 +534,11 @@ where pub fn is_extended_connect_protocol_enabled(&self) -> bool { self.inner.is_extended_connect_protocol_enabled() } + + /// Returns negotiated max send streams + pub fn max_send_streams(&self) -> usize { + self.inner.max_send_streams() + } } impl fmt::Debug for SendRequest diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index 70dfc7851..d348ec6d8 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -1,5 +1,6 @@ use super::*; +use std::task::{Context, Waker}; use std::usize; #[derive(Debug)] @@ -25,6 +26,11 @@ pub(super) struct Counts { /// Current number of pending locally reset streams num_reset_streams: usize, + + /// If remote settings were applied + remote_settings_applied: bool, + + remote_settings_applied_task: Option, } impl Counts { @@ -38,6 +44,8 @@ impl Counts { num_recv_streams: 0, max_reset_streams: config.local_reset_max, num_reset_streams: 0, + remote_settings_applied: false, + remote_settings_applied_task: None, } } @@ -108,6 +116,8 @@ impl Counts { if let Some(val) = settings.max_concurrent_streams() { self.max_send_streams = val as usize; } + self.remote_settings_applied = true; + self.notify_remote_settings_applied() } /// Run a block of code that could potentially transition a stream's state. @@ -173,6 +183,16 @@ impl Counts { self.max_send_streams } + /// Returns if remote settings were applied + pub(crate) fn remote_settings_applied(&self) -> bool { + self.remote_settings_applied + } + + /// Sets waker task for remote settings being set + pub(crate) fn wait_remote_settings_applied(&mut self, cx: &Context) { + self.remote_settings_applied_task = Some(cx.waker().clone()); + } + /// Returns the maximum number of streams that can be initiated by the /// remote peer. pub(crate) fn max_recv_streams(&self) -> usize { @@ -197,6 +217,12 @@ impl Counts { assert!(self.num_reset_streams > 0); self.num_reset_streams -= 1; } + + fn notify_remote_settings_applied(&mut self) { + if let Some(task) = self.remote_settings_applied_task.take() { + task.wake(); + } + } } impl Drop for Counts { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 3e7ae97d9..ede96ca36 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -922,6 +922,10 @@ where me.actions.ensure_no_conn_error()?; me.actions.send.ensure_next_stream_id()?; + if !me.counts.remote_settings_applied() { + me.counts.wait_remote_settings_applied(cx); + return Poll::Pending; + } if let Some(pending) = pending { let mut stream = me.store.resolve(pending.key);