From dd23094007e70592c12aa912f7b156f017ebaef1 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Wed, 24 Feb 2021 09:44:14 +0100 Subject: [PATCH] quinn-proto: remove unnecessary RecvState::Closed --- quinn-proto/src/connection/streams.rs | 5 ++-- quinn-proto/src/connection/streams/recv.rs | 29 ++++++++-------------- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/quinn-proto/src/connection/streams.rs b/quinn-proto/src/connection/streams.rs index a8f3a687a..4cedd7325 100644 --- a/quinn-proto/src/connection/streams.rs +++ b/quinn-proto/src/connection/streams.rs @@ -306,7 +306,8 @@ impl Streams { return Ok(ShouldTransmit(false)); } - let new_bytes = rs.ingest(frame, payload_len, self.data_recvd, self.local_max_data)?; + let (new_bytes, closed) = + rs.ingest(frame, payload_len, self.data_recvd, self.local_max_data)?; self.data_recvd = self.data_recvd.saturating_add(new_bytes); if !rs.stopped { @@ -315,7 +316,7 @@ impl Streams { } // Stopped streams become closed instantly on FIN, so check whether we need to clean up - if rs.is_closed() { + if closed { self.recv.remove(&stream); self.stream_freed(stream, StreamHalf::Recv); } diff --git a/quinn-proto/src/connection/streams/recv.rs b/quinn-proto/src/connection/streams/recv.rs index e846bc77e..56a8f658b 100644 --- a/quinn-proto/src/connection/streams/recv.rs +++ b/quinn-proto/src/connection/streams/recv.rs @@ -26,13 +26,16 @@ impl Recv { } } + /// Process a STREAM frame + /// + /// Return value is `(number_of_new_bytes_ingested, stream_is_closed)` pub(super) fn ingest( &mut self, frame: frame::Stream, payload_len: usize, received: u64, max_data: u64, - ) -> Result { + ) -> Result<(u64, bool), TransportError> { let end = frame.offset + frame.data.len() as u64; if end >= 2u64.pow(62) { return Err(TransportError::FLOW_CONTROL_ERROR( @@ -49,12 +52,10 @@ impl Recv { let new_bytes = self.credit_consumed_by(end, received, max_data)?; - if frame.fin { - if self.stopped { - // Stopped streams don't need to wait for the actual data, they just need to know - // how much there was. - self.state = RecvState::Closed; - } else if let RecvState::Recv { ref mut size } = self.state { + // Stopped streams don't need to wait for the actual data, they just need to know + // how much there was. + if frame.fin && !self.stopped { + if let RecvState::Recv { ref mut size } = self.state { *size = Some(end); } } @@ -66,7 +67,7 @@ impl Recv { self.assembler.set_bytes_read(end); } - Ok(new_bytes) + Ok((new_bytes, frame.fin && self.stopped)) } pub(super) fn read(&mut self, max_length: usize, ordered: bool) -> StreamReadResult { @@ -131,13 +132,10 @@ impl Recv { fn read_blocked(&mut self) -> Result<(), ReadError> { match self.state { RecvState::ResetRecvd { error_code, .. } => { - self.state = RecvState::Closed; Err(ReadError::Reset(error_code)) } - RecvState::Closed => Err(ReadError::UnknownStream), RecvState::Recv { size } => { if size == Some(self.end) && self.assembler.bytes_read() == self.end { - self.state = RecvState::Closed; Ok(()) } else { Err(ReadError::Blocked) @@ -188,16 +186,10 @@ impl Recv { matches!(self.state, RecvState::Recv { .. }) } - /// All data read by application - pub(super) fn is_closed(&self) -> bool { - self.state == self::RecvState::Closed - } - fn final_offset(&self) -> Option { match self.state { RecvState::Recv { size } => size, RecvState::ResetRecvd { size, .. } => Some(size), - _ => None, } } @@ -221,7 +213,7 @@ impl Recv { } self.credit_consumed_by(final_offset.into(), received, max_data)?; - if matches!(self.state, RecvState::ResetRecvd { .. } | RecvState::Closed) { + if matches!(self.state, RecvState::ResetRecvd { .. }) { return Ok(false); } self.state = RecvState::ResetRecvd { @@ -331,7 +323,6 @@ impl From for ReadError { enum RecvState { Recv { size: Option }, ResetRecvd { size: u64, error_code: VarInt }, - Closed, } impl Default for RecvState {