Skip to content

Commit 6eeda35

Browse files
committed
Read multiple chunks into a slice of Bytes
1 parent ff0b4e9 commit 6eeda35

File tree

4 files changed

+181
-1
lines changed

4 files changed

+181
-1
lines changed

quinn-proto/src/connection/mod.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -868,7 +868,7 @@ where
868868
///
869869
/// While stream data is typically processed by applications in-order, unordered reads improve
870870
/// performance when packet loss occurs and data cannot be retransmitted before the flow control
871-
/// window is filled. When in-order delivery is required, the sibling `read()` or `read_chunk()`
871+
/// window is filled. When in-order delivery is required, the sibling `read()` or `read_chunk[s]()`
872872
/// methods should be used.
873873
///
874874
/// The return value if `Ok` contains the bytes and their offset in the stream.
@@ -887,6 +887,18 @@ where
887887
}))
888888
}
889889

890+
/// Read the next ordered chunks from the given recv stream
891+
pub fn read_chunks(
892+
&mut self,
893+
id: StreamId,
894+
bufs: &mut [Bytes],
895+
) -> Result<Option<usize>, ReadError> {
896+
Ok(self.streams.read_chunks(id, bufs)?.map(|result| {
897+
self.add_read_credits(id, result.max_stream_data, result.max_data);
898+
result.n_bufs
899+
}))
900+
}
901+
890902
/// Read from the given recv stream
891903
pub fn read(&mut self, id: StreamId, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
892904
Ok(self.streams.read(id, buf)?.map(|result| {

quinn-proto/src/connection/streams.rs

+48
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,23 @@ impl Streams {
213213
}))
214214
}
215215

216+
pub fn read_chunks(
217+
&mut self,
218+
id: StreamId,
219+
bufs: &mut [Bytes],
220+
) -> Result<Option<ReadChunksResult>, ReadError> {
221+
let res = self.try_read(id, |rs: &mut Recv| rs.read_chunks(bufs))?;
222+
Ok(res.map(|(n_bufs, max_stream_data)| {
223+
let len = bufs[..n_bufs].iter().fold(0, |t, b| t + b.len());
224+
let max_data = self.add_read_credits(len as u64);
225+
ReadChunksResult {
226+
n_bufs,
227+
max_stream_data,
228+
max_data,
229+
}
230+
}))
231+
}
232+
216233
fn try_read<T, U>(
217234
&mut self,
218235
id: StreamId,
@@ -1019,6 +1036,15 @@ pub struct ReadChunkResult {
10191036
pub max_data: ShouldTransmit,
10201037
}
10211038

1039+
/// Result of a `Streams::read_chunks` call in case the stream had not ended yet
1040+
#[derive(Debug, Eq, PartialEq)]
1041+
#[must_use = "A frame might need to be enqueued"]
1042+
pub struct ReadChunksResult {
1043+
pub n_bufs: usize,
1044+
pub max_stream_data: ShouldTransmit,
1045+
pub max_data: ShouldTransmit,
1046+
}
1047+
10221048
/// Result of a `Streams::read` call in case the stream had not ended yet
10231049
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
10241050
#[must_use = "A frame might need to be enqueued"]
@@ -1147,6 +1173,28 @@ impl Recv {
11471173
}
11481174
}
11491175

1176+
fn read_chunks(&mut self, chunks: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
1177+
if chunks.is_empty() {
1178+
return Ok(Some(0));
1179+
}
1180+
1181+
let mut size = 0;
1182+
while let Some(bytes) = self.assembler.read_chunk()? {
1183+
chunks[size] = bytes;
1184+
size += 1;
1185+
1186+
if size >= chunks.len() {
1187+
return Ok(Some(size));
1188+
}
1189+
}
1190+
1191+
if size > 0 {
1192+
return Ok(Some(size));
1193+
}
1194+
1195+
self.read_blocked().map(|()| None)
1196+
}
1197+
11501198
fn read_blocked(&mut self) -> Result<(), ReadError> {
11511199
match self.state {
11521200
RecvState::ResetRecvd { error_code, .. } => {

quinn-proto/src/tests/mod.rs

+77
Original file line numberDiff line numberDiff line change
@@ -1632,3 +1632,80 @@ fn repeated_request_response() {
16321632
assert_matches!(pair.client_conn_mut(client_ch).read_unordered(s), Ok(None));
16331633
}
16341634
}
1635+
1636+
#[test]
1637+
fn read_chunks() {
1638+
let _guard = subscribe();
1639+
let server = ServerConfig {
1640+
transport: Arc::new(TransportConfig {
1641+
stream_window_bidi: 1u32.into(),
1642+
..TransportConfig::default()
1643+
}),
1644+
..server_config()
1645+
};
1646+
let mut pair = Pair::new(Default::default(), server);
1647+
let (client_ch, server_ch) = pair.connect();
1648+
let mut empty = vec![];
1649+
let mut chunks = vec![Bytes::new(), Bytes::new()];
1650+
const ONE: &[u8] = b"ONE";
1651+
const TWO: &[u8] = b"TWO";
1652+
const THREE: &[u8] = b"THREE";
1653+
for _ in 0..3 {
1654+
let s = pair.client_conn_mut(client_ch).open(Dir::Bi).unwrap();
1655+
1656+
pair.client_conn_mut(client_ch).write(s, ONE).unwrap();
1657+
pair.drive();
1658+
pair.client_conn_mut(client_ch).write(s, TWO).unwrap();
1659+
pair.drive();
1660+
pair.client_conn_mut(client_ch).write(s, THREE).unwrap();
1661+
1662+
pair.drive();
1663+
1664+
assert_eq!(pair.server_conn_mut(server_ch).accept(Dir::Bi), Some(s));
1665+
1666+
// Read into an empty slice can't do much you, but doesn't crash
1667+
assert_eq!(
1668+
pair.server_conn_mut(server_ch).read_chunks(s, &mut empty),
1669+
Ok(Some(0))
1670+
);
1671+
1672+
// Read until `chunks` is filled
1673+
assert_eq!(
1674+
pair.server_conn_mut(server_ch).read_chunks(s, &mut chunks),
1675+
Ok(Some(2))
1676+
);
1677+
assert_eq!(&chunks, &[ONE, TWO]);
1678+
1679+
// Read the rest
1680+
assert_eq!(
1681+
pair.server_conn_mut(server_ch).read_chunks(s, &mut chunks),
1682+
Ok(Some(1))
1683+
);
1684+
assert_eq!(&chunks[..1], &[THREE]);
1685+
1686+
// We've read everything, stream is now blocked
1687+
assert_eq!(
1688+
pair.server_conn_mut(server_ch).read_chunks(s, &mut chunks),
1689+
Err(ReadError::Blocked)
1690+
);
1691+
1692+
// Read a new chunk after we've been blocked
1693+
pair.client_conn_mut(client_ch).write(s, ONE).unwrap();
1694+
pair.drive();
1695+
assert_eq!(
1696+
pair.server_conn_mut(server_ch).read_chunks(s, &mut chunks),
1697+
Ok(Some(1))
1698+
);
1699+
assert_eq!(&chunks[..1], &[ONE]);
1700+
1701+
// Stream finishes by yeilding `Ok(None)`
1702+
pair.client_conn_mut(client_ch).finish(s).unwrap();
1703+
pair.drive();
1704+
assert_matches!(
1705+
pair.server_conn_mut(server_ch).read_chunks(s, &mut chunks),
1706+
Ok(None)
1707+
);
1708+
1709+
pair.drive();
1710+
}
1711+
}

quinn/src/streams.rs

+43
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,27 @@ where
389389
self.poll_read_generic(cx, |conn, stream| conn.inner.read_chunk(stream))
390390
}
391391

392+
/// Read the next segments of data
393+
///
394+
/// Fills `bufs` with the segments of data beginning immediately after the
395+
/// last data yielded by `read` or `read_chunk`, or `None` if the stream was
396+
/// finished.
397+
///
398+
/// Slightly more efficient than `read` due to not copying. Chunk boundaries
399+
/// do not correspond to peer writes, and hence cannot be used as framing.
400+
pub fn read_chunks<'a>(&'a mut self, bufs: &'a mut [Bytes]) -> ReadChunks<'a, S> {
401+
ReadChunks { stream: self, bufs }
402+
}
403+
404+
/// Foundation of [`read_chunks()`]: RecvStream::read_chunks
405+
fn poll_read_chunks(
406+
&mut self,
407+
cx: &mut Context,
408+
bufs: &mut [Bytes],
409+
) -> Poll<Result<Option<usize>, ReadError>> {
410+
self.poll_read_generic(cx, |conn, stream| conn.inner.read_chunks(stream, bufs))
411+
}
412+
392413
/// Convenience method to read all remaining data into a buffer
393414
///
394415
/// The returned future fails with [`ReadToEndError::TooLong`] if it's longer than `size_limit`
@@ -795,6 +816,28 @@ where
795816
}
796817
}
797818

819+
/// Future produced by [`RecvStream::read_chunks()`].
820+
///
821+
/// [`RecvStream::read_chunks()`]: crate::generic::RecvStream::read_chunks
822+
pub struct ReadChunks<'a, S>
823+
where
824+
S: proto::crypto::Session,
825+
{
826+
stream: &'a mut RecvStream<S>,
827+
bufs: &'a mut [Bytes],
828+
}
829+
830+
impl<'a, S> Future for ReadChunks<'a, S>
831+
where
832+
S: proto::crypto::Session,
833+
{
834+
type Output = Result<Option<usize>, ReadError>;
835+
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
836+
let this = self.get_mut();
837+
this.stream.poll_read_chunks(cx, this.bufs)
838+
}
839+
}
840+
798841
/// Future produced by [`SendStream::write()`].
799842
///
800843
/// [`SendStream::write()`]: crate::generic::SendStream::write

0 commit comments

Comments
 (0)