Skip to content

Commit

Permalink
quinn-proto: remove unnecessary RecvState::Closed
Browse files Browse the repository at this point in the history
  • Loading branch information
djc authored and Ralith committed Feb 25, 2021
1 parent 241cc9c commit dd23094
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 21 deletions.
5 changes: 3 additions & 2 deletions quinn-proto/src/connection/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ impl Streams {
return Ok(ShouldTransmit(false));
}

let new_bytes = rs.ingest(frame, payload_len, self.data_recvd, self.local_max_data)?;
let (new_bytes, closed) =
rs.ingest(frame, payload_len, self.data_recvd, self.local_max_data)?;
self.data_recvd = self.data_recvd.saturating_add(new_bytes);

if !rs.stopped {
Expand All @@ -315,7 +316,7 @@ impl Streams {
}

// Stopped streams become closed instantly on FIN, so check whether we need to clean up
if rs.is_closed() {
if closed {
self.recv.remove(&stream);
self.stream_freed(stream, StreamHalf::Recv);
}
Expand Down
29 changes: 10 additions & 19 deletions quinn-proto/src/connection/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ impl Recv {
}
}

/// Process a STREAM frame
///
/// Return value is `(number_of_new_bytes_ingested, stream_is_closed)`
pub(super) fn ingest(
&mut self,
frame: frame::Stream,
payload_len: usize,
received: u64,
max_data: u64,
) -> Result<u64, TransportError> {
) -> Result<(u64, bool), TransportError> {
let end = frame.offset + frame.data.len() as u64;
if end >= 2u64.pow(62) {
return Err(TransportError::FLOW_CONTROL_ERROR(
Expand All @@ -49,12 +52,10 @@ impl Recv {

let new_bytes = self.credit_consumed_by(end, received, max_data)?;

if frame.fin {
if self.stopped {
// Stopped streams don't need to wait for the actual data, they just need to know
// how much there was.
self.state = RecvState::Closed;
} else if let RecvState::Recv { ref mut size } = self.state {
// Stopped streams don't need to wait for the actual data, they just need to know
// how much there was.
if frame.fin && !self.stopped {
if let RecvState::Recv { ref mut size } = self.state {
*size = Some(end);
}
}
Expand All @@ -66,7 +67,7 @@ impl Recv {
self.assembler.set_bytes_read(end);
}

Ok(new_bytes)
Ok((new_bytes, frame.fin && self.stopped))
}

pub(super) fn read(&mut self, max_length: usize, ordered: bool) -> StreamReadResult<Chunk> {
Expand Down Expand Up @@ -131,13 +132,10 @@ impl Recv {
fn read_blocked(&mut self) -> Result<(), ReadError> {
match self.state {
RecvState::ResetRecvd { error_code, .. } => {
self.state = RecvState::Closed;
Err(ReadError::Reset(error_code))
}
RecvState::Closed => Err(ReadError::UnknownStream),
RecvState::Recv { size } => {
if size == Some(self.end) && self.assembler.bytes_read() == self.end {
self.state = RecvState::Closed;
Ok(())
} else {
Err(ReadError::Blocked)
Expand Down Expand Up @@ -188,16 +186,10 @@ impl Recv {
matches!(self.state, RecvState::Recv { .. })
}

/// All data read by application
pub(super) fn is_closed(&self) -> bool {
self.state == self::RecvState::Closed
}

fn final_offset(&self) -> Option<u64> {
match self.state {
RecvState::Recv { size } => size,
RecvState::ResetRecvd { size, .. } => Some(size),
_ => None,
}
}

Expand All @@ -221,7 +213,7 @@ impl Recv {
}
self.credit_consumed_by(final_offset.into(), received, max_data)?;

if matches!(self.state, RecvState::ResetRecvd { .. } | RecvState::Closed) {
if matches!(self.state, RecvState::ResetRecvd { .. }) {
return Ok(false);
}
self.state = RecvState::ResetRecvd {
Expand Down Expand Up @@ -331,7 +323,6 @@ impl From<IllegalOrderedRead> for ReadError {
enum RecvState {
Recv { size: Option<u64> },
ResetRecvd { size: u64, error_code: VarInt },
Closed,
}

impl Default for RecvState {
Expand Down

0 comments on commit dd23094

Please sign in to comment.