Skip to content

Commit c9b0b7f

Browse files
committed
Return UnknownStream from Connection::reset for closed/reset streams
Improves consistency with other stream interfaces. H3 logic was updated to ignore errors to preserve existing behavior, but future work might want to insert some unwraps there if static correctness guarantees are available.
1 parent b6c536a commit c9b0b7f

File tree

8 files changed

+37
-20
lines changed

8 files changed

+37
-20
lines changed

quinn-h3/src/connection.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ impl ConnectionInner {
233233
}
234234
Side::Server => {
235235
if self.inner.is_closing() {
236-
send.reset(ErrorCode::REQUEST_REJECTED.into());
236+
let _ = send.reset(ErrorCode::REQUEST_REJECTED.into());
237237
let _ = recv.stop(ErrorCode::REQUEST_REJECTED.into());
238238
} else {
239239
self.inner.request_initiated(send.id());

quinn-h3/src/data.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ where
9292
}
9393
_ => {
9494
if let Some(ref mut send) = self.send.take() {
95-
send.reset(ErrorCode::REQUEST_CANCELLED.into());
95+
let _ = send.reset(ErrorCode::REQUEST_CANCELLED.into());
9696
}
9797
}
9898
}

quinn-h3/src/frame.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ where
143143

144144
pub fn reset(&mut self, err_code: ErrorCode) {
145145
if let Some(ref mut s) = self.send {
146-
s.reset(err_code.into());
146+
let _ = s.reset(err_code.into());
147147
}
148148
}
149149
}

quinn-h3/src/server.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ impl RecvRequest {
573573
recv.reset(ErrorCode::REQUEST_REJECTED);
574574
}
575575
if let Some(mut send) = self.send.take() {
576-
send.reset(ErrorCode::REQUEST_REJECTED.into());
576+
let _ = send.reset(ErrorCode::REQUEST_REJECTED.into());
577577
}
578578
}
579579

@@ -604,7 +604,7 @@ impl RecvRequest {
604604
r.reset(ErrorCode::REQUEST_REJECTED);
605605
}
606606
if let Some(s) = &mut self.send {
607-
s.reset(ErrorCode::REQUEST_REJECTED.into());
607+
let _ = s.reset(ErrorCode::REQUEST_REJECTED.into());
608608
}
609609
return Err(Error::peer(format!(
610610
"Tried an non indempotent method in 0-RTT: {}",
@@ -718,7 +718,8 @@ impl Sender {
718718
/// Cancelling a request means that some request data have been processed by the application, which
719719
/// decided to abandon the response.
720720
pub fn cancel(&mut self) {
721-
self.send
721+
let _ = self
722+
.send
722723
.as_mut()
723724
.unwrap()
724725
.reset(ErrorCode::REQUEST_CANCELLED.into());

quinn-proto/src/connection/mod.rs

+12-5
Original file line numberDiff line numberDiff line change
@@ -1304,24 +1304,30 @@ where
13041304
///
13051305
/// # Panics
13061306
/// - when applied to a receive stream or an unopened send stream
1307-
pub fn reset(&mut self, stream_id: StreamId, error_code: VarInt) {
1308-
self.reset_inner(stream_id, error_code, false);
1307+
pub fn reset(&mut self, stream_id: StreamId, error_code: VarInt) -> Result<(), UnknownStream> {
1308+
self.reset_inner(stream_id, error_code, false)
13091309
}
13101310

13111311
/// `stopped` should be set iff this is an internal implicit reset due to `STOP_SENDING`
1312-
fn reset_inner(&mut self, stream_id: StreamId, error_code: VarInt, stopped: bool) {
1312+
fn reset_inner(
1313+
&mut self,
1314+
stream_id: StreamId,
1315+
error_code: VarInt,
1316+
stopped: bool,
1317+
) -> Result<(), UnknownStream> {
13131318
assert!(
13141319
stream_id.dir() == Dir::Bi || stream_id.initiator() == self.side,
13151320
"only streams supporting outgoing data may be reset"
13161321
);
13171322

13181323
let stop_reason = if stopped { Some(error_code) } else { None };
1319-
self.streams.reset(stream_id, stop_reason);
1324+
self.streams.reset(stream_id, stop_reason)?;
13201325

13211326
self.spaces[SpaceId::Data as usize]
13221327
.pending
13231328
.reset_stream
13241329
.push((stream_id, error_code));
1330+
Ok(())
13251331
}
13261332

13271333
/// Handle the already-decrypted first packet from the client
@@ -2106,7 +2112,8 @@ where
21062112
"STOP_SENDING on unopened stream",
21072113
));
21082114
}
2109-
self.reset_inner(id, error_code, true);
2115+
// Ignore errors from the stream already being gone
2116+
let _ = self.reset_inner(id, error_code, true);
21102117
}
21112118
Frame::RetireConnectionId { sequence } => {
21122119
if self.endpoint_config.local_cid_len == 0 {

quinn-proto/src/connection/streams.rs

+10-4
Original file line numberDiff line numberDiff line change
@@ -344,15 +344,19 @@ impl Streams {
344344
///
345345
/// Does not cause the actual RESET_STREAM frame to be sent, just updates internal
346346
/// state.
347-
pub fn reset(&mut self, id: StreamId, stop_reason: Option<VarInt>) {
347+
pub fn reset(
348+
&mut self,
349+
id: StreamId,
350+
stop_reason: Option<VarInt>,
351+
) -> Result<(), UnknownStream> {
348352
let stream = match self.send.get_mut(&id) {
349353
Some(ss) => ss,
350-
None => return,
354+
None => return Err(UnknownStream { _private: () }),
351355
};
352356

353357
if matches!(stream.state, SendState::ResetSent { .. } | SendState::ResetRecvd { .. }) {
354-
// Ignore redundant reset calls
355-
return;
358+
// Redundant reset call
359+
return Err(UnknownStream { _private: () });
356360
}
357361

358362
// Restore the portion of the send window consumed by the data that we aren't about to
@@ -367,6 +371,8 @@ impl Streams {
367371
if stop_reason.is_some() && !stream.is_closed() {
368372
self.on_stream_frame(false, id);
369373
}
374+
375+
Ok(())
370376
}
371377

372378
pub fn reset_acked(&mut self, id: StreamId) {

quinn-proto/src/tests/mod.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ fn reset_stream() {
231231

232232
info!("resetting stream");
233233
const ERROR: VarInt = VarInt(42);
234-
pair.client_conn_mut(client_ch).reset(s, ERROR);
234+
pair.client_conn_mut(client_ch).reset(s, ERROR).unwrap();
235235
pair.drive();
236236

237237
assert_matches!(
@@ -811,7 +811,9 @@ fn test_flow_control(config: TransportConfig, window_size: usize) {
811811
Err(WriteError::Blocked)
812812
);
813813
pair.drive();
814-
pair.client_conn_mut(client_conn).reset(s, VarInt(42));
814+
pair.client_conn_mut(client_conn)
815+
.reset(s, VarInt(42))
816+
.unwrap();
815817
pair.drive();
816818
assert_eq!(
817819
pair.server_conn_mut(server_conn).read(s, &mut buf),

quinn/src/streams.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,14 @@ where
139139
/// No new data can be written after calling this method. Locally buffered data is dropped, and
140140
/// previously transmitted data will no longer be retransmitted if lost. If an attempt has
141141
/// already been made to finish the stream, the peer may still receive all written data.
142-
pub fn reset(&mut self, error_code: VarInt) {
142+
pub fn reset(&mut self, error_code: VarInt) -> Result<(), UnknownStream> {
143143
let mut conn = self.conn.lock().unwrap();
144144
if self.is_0rtt && conn.check_0rtt().is_err() {
145-
return;
145+
return Ok(());
146146
}
147-
conn.inner.reset(self.stream, error_code);
147+
conn.inner.reset(self.stream, error_code)?;
148148
conn.wake();
149+
Ok(())
149150
}
150151

151152
#[doc(hidden)]

0 commit comments

Comments
 (0)