Skip to content

Commit

Permalink
Expose iterator-like read API
Browse files Browse the repository at this point in the history
  • Loading branch information
djc authored and Ralith committed Feb 25, 2021
1 parent dd23094 commit 24cf82e
Show file tree
Hide file tree
Showing 6 changed files with 379 additions and 360 deletions.
42 changes: 7 additions & 35 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ pub use stats::ConnectionStats;

mod streams;
pub use streams::Streams;
pub use streams::{FinishError, ReadError, ShouldTransmit, StreamEvent, UnknownStream, WriteError};
pub use streams::{
Chunks, FinishError, ReadError, ReadableError, ShouldTransmit, StreamEvent, UnknownStream,
WriteError,
};

mod timer;
use timer::{Timer, TimerTable};
Expand Down Expand Up @@ -1088,40 +1091,9 @@ where
/// control window is filled. On any given stream, you can switch from ordered to unordered
/// reads, but ordered reads on streams that have seen previous unordered reads will return
/// `ReadError::IllegalOrderedRead`.
pub fn read(
&mut self,
id: StreamId,
max_length: usize,
ordered: bool,
) -> Result<Option<Chunk>, ReadError> {
let result = self.streams.read(id, max_length, ordered);
self.post_read(id, &result);
Ok(result?.map(|x| x.result))
}

/// Read the next ordered chunks from the given recv stream
pub fn read_chunks(
&mut self,
id: StreamId,
bufs: &mut [Bytes],
) -> Result<Option<usize>, ReadError> {
let result = self.streams.read_chunks(id, bufs);
self.post_read(id, &result);
Ok(result?.map(|x| x.result.bufs))
}

fn post_read<T>(&mut self, id: StreamId, result: &streams::ReadResult<T>) {
let (did_read, max_data, max_stream_data) = match result {
Ok(Some(did)) => (true, did.max_data, did.max_stream_data),
_ => (false, ShouldTransmit::default(), ShouldTransmit::default()),
};

let max_dirty = self.streams.take_max_streams_dirty(id.dir());
if did_read || max_dirty {
self.spaces[SpaceId::Data]
.pending
.post_read(id, max_data, max_stream_data, max_dirty);
}
pub fn read(&mut self, id: StreamId, ordered: bool) -> Result<Chunks, ReadableError> {
self.streams
.read(id, ordered, &mut self.spaces[SpaceId::Data].pending)
}

/// Send data on the given stream
Expand Down
76 changes: 20 additions & 56 deletions quinn-proto/src/connection/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ use std::{
mem,
};

use bytes::{BufMut, Bytes};
use bytes::BufMut;
use thiserror::Error;
use tracing::{debug, trace};

use super::assembler::Chunk;
use super::spaces::{Retransmits, ThinRetransmits};
use crate::{
coding::BufMutExt,
Expand All @@ -20,9 +19,8 @@ use crate::{
};

mod recv;
pub use recv::ReadError;
use recv::{BytesRead, ReadChunks, Recv, StreamReadResult};
pub(super) use recv::{DidRead, ReadResult};
use recv::Recv;
pub use recv::{Chunks, ReadError, ReadableError};

mod send;
pub use send::{FinishError, WriteError};
Expand Down Expand Up @@ -202,55 +200,13 @@ impl Streams {
self.connection_blocked.clear();
}

pub(crate) fn read(
&mut self,
pub(crate) fn read<'a>(
&'a mut self,
id: StreamId,
max_length: usize,
ordered: bool,
) -> ReadResult<Chunk> {
self.try_read(id, |rs| rs.read(max_length, ordered))
}

pub(crate) fn read_chunks(
&mut self,
id: StreamId,
bufs: &mut [Bytes],
) -> ReadResult<ReadChunks> {
self.try_read(id, |rs| rs.read_chunks(bufs))
}

fn try_read<F, O>(&mut self, id: StreamId, mut read: F) -> ReadResult<O>
where
F: FnMut(&mut Recv) -> StreamReadResult<O>,
O: BytesRead,
{
let mut entry = match self.recv.entry(id) {
hash_map::Entry::Vacant(_) => return Err(ReadError::UnknownStream),
hash_map::Entry::Occupied(e) => e,
};
let rs = entry.get_mut();
match read(rs) {
Ok(Some(out)) => {
let (_, max_stream_data) = rs.max_stream_data(self.stream_receive_window);
let max_data = self.add_read_credits(out.bytes_read());
Ok(Some(DidRead {
result: out,
max_stream_data,
max_data,
}))
}
Ok(None) => {
entry.remove_entry();
self.stream_freed(id, StreamHalf::Recv);
Ok(None)
}
Err(e @ ReadError::Reset { .. }) => {
entry.remove_entry();
self.stream_freed(id, StreamHalf::Recv);
Err(e)
}
Err(e) => Err(e),
}
pending: &'a mut Retransmits,
) -> Result<Chunks<'a>, ReadableError> {
Chunks::new(id, ordered, self, pending)
}

/// Queue `data` to be written for `stream`
Expand Down Expand Up @@ -1077,6 +1033,7 @@ enum StreamHalf {
mod tests {
use super::*;
use crate::TransportErrorCode;
use bytes::Bytes;

fn make(side: Side) -> Streams {
Streams::new(
Expand All @@ -1094,6 +1051,7 @@ mod tests {
let mut client = make(Side::Client);
let id = StreamId::new(Side::Server, Dir::Uni, 0);
let initial_max = client.local_max_data;
let mut pending = Retransmits::default();
assert_eq!(
client
.received(
Expand All @@ -1110,7 +1068,9 @@ mod tests {
);
assert_eq!(client.data_recvd, 2048);
assert_eq!(client.local_max_data - initial_max, 0);
client.read(id, 1024, true).unwrap();
let mut chunks = client.read(id, true, &mut pending).unwrap();
chunks.next(1024).unwrap();
let _ = chunks.finalize();
assert_eq!(client.local_max_data - initial_max, 1024);
assert_eq!(
client
Expand Down Expand Up @@ -1193,6 +1153,7 @@ mod tests {
fn recv_stopped() {
let mut client = make(Side::Client);
let id = StreamId::new(Side::Server, Dir::Uni, 0);
let mut pending = Retransmits::default();
let initial_max = client.local_max_data;
assert_eq!(
client
Expand All @@ -1217,10 +1178,13 @@ mod tests {
}
);
assert!(client.stop(id).is_err());
assert_eq!(client.read(id, 0, true), Err(ReadError::UnknownStream));
assert_eq!(
client.read(id, usize::MAX, false),
Err(ReadError::UnknownStream)
client.read(id, true, &mut pending).err(),
Some(ReadableError::UnknownStream)
);
assert_eq!(
client.read(id, false, &mut pending).err(),
Some(ReadableError::UnknownStream)
);
assert_eq!(client.local_max_data - initial_max, 32);
assert_eq!(
Expand Down
Loading

0 comments on commit 24cf82e

Please sign in to comment.