Skip to content

Commit a6314c0

Browse files
author
Matthias Einwag
committed
Add a Bytes array based send interface to quinn-proto
This change modifies quinn-proto to allow callers to submit a list of one or more owned `Bytes` chunks for transmission instead of pure byte slices. This will provide a more efficient zero-copy interface for applications which already make use of owned `Bytes` buffers. The internals of quinn-proto have been refactored in order to keep the ability to pass `&[u8]` buffers and defer the conversion into `Bytes` as long as possible, in order to avoid unnecessary allocations if no data can be stored due to flow control.
1 parent 04fcf9b commit a6314c0

File tree

5 files changed

+295
-22
lines changed

5 files changed

+295
-22
lines changed

quinn-proto/src/bytes_source.rs

+211
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
//! Defines types which can be lazily converted into `Bytes` chunks
2+
3+
use bytes::Bytes;
4+
5+
/// A source of one or more buffers which can be converted into `Bytes` buffers on demand
6+
///
7+
/// The purpose of this data type is to defer conversion as long as possible,
8+
/// so that no heap allocation is required in case no data is writable.
9+
pub trait BytesSource {
10+
/// Returns the next chunk from the source of owned chunks.
11+
///
12+
/// This method will consume parts of the source.
13+
/// Calling it will yield `Bytes` elements up to the configured `limit`.
14+
///
15+
/// The method returns a tuple:
16+
/// - The first item is the yielded `Bytes` element. The element will be
17+
/// empty if the limit is zero or no more data is available.
18+
/// - The second item returns how many complete chunks inside the source had
19+
/// had been consumed. This can be less than 1, if a chunk inside the
20+
/// source had been truncated in order to adhere to the limit. It can also
21+
/// be more than 1, if zero-length chunks had been skipped.
22+
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize);
23+
}
24+
25+
/// Indicates how many bytes and chunks had been transferred in a write operation
26+
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
27+
pub struct Written {
28+
/// The amount of bytes which had been written
29+
pub bytes: usize,
30+
/// The amount of full chunks which had been written
31+
///
32+
/// If a chunk was only partially written, it will not be counted by this field.
33+
pub chunks: usize,
34+
}
35+
36+
/// A [`BytesSource`] implementation for `&'a mut [Bytes]`
37+
///
38+
/// The type allows to dequeue [`Bytes`] chunks from an array of chunks, up to
39+
/// a configured limit.
40+
pub struct BytesArray<'a> {
41+
/// The wrapped slice of `Bytes`
42+
chunks: &'a mut [Bytes],
43+
/// The amount of chunks consumed from this source
44+
consumed: usize,
45+
}
46+
47+
impl<'a> BytesArray<'a> {
48+
pub fn from_chunks(chunks: &'a mut [Bytes]) -> Self {
49+
Self {
50+
chunks,
51+
consumed: 0,
52+
}
53+
}
54+
}
55+
56+
impl<'a> BytesSource for BytesArray<'a> {
57+
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
58+
// The loop exists to skip empty chunks while still marking them as
59+
// consumed
60+
let mut chunks_consumed = 0;
61+
62+
while self.consumed < self.chunks.len() {
63+
let chunk = &mut self.chunks[self.consumed];
64+
65+
if chunk.len() <= limit {
66+
let chunk = std::mem::take(chunk);
67+
self.consumed += 1;
68+
chunks_consumed += 1;
69+
if chunk.is_empty() {
70+
continue;
71+
}
72+
return (chunk, chunks_consumed);
73+
} else if limit > 0 {
74+
let chunk = chunk.split_to(limit);
75+
return (chunk, chunks_consumed);
76+
} else {
77+
break;
78+
}
79+
}
80+
81+
(Bytes::new(), chunks_consumed)
82+
}
83+
}
84+
85+
/// A [`BytesSource`] implementation for `&[u8]`
86+
///
87+
/// The type allows to dequeue a single [`Bytes`] chunk, which will be lazily
88+
/// created from a reference. This allows to defer the allocation until it is
89+
/// known how much data needs to be copied.
90+
pub struct ByteSlice<'a> {
91+
/// The wrapped byte slice
92+
data: &'a [u8],
93+
}
94+
95+
impl<'a> ByteSlice<'a> {
96+
pub fn from_slice(data: &'a [u8]) -> Self {
97+
Self { data }
98+
}
99+
}
100+
101+
impl<'a> BytesSource for ByteSlice<'a> {
102+
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
103+
let limit = limit.min(self.data.len());
104+
if limit == 0 {
105+
return (Bytes::new(), 0);
106+
}
107+
108+
let chunk = Bytes::from(self.data[..limit].to_owned());
109+
self.data = &self.data[chunk.len()..];
110+
111+
let chunks_consumed = if self.data.is_empty() { 1 } else { 0 };
112+
(chunk, chunks_consumed)
113+
}
114+
}
115+
116+
#[cfg(test)]
117+
mod tests {
118+
use super::*;
119+
120+
#[test]
121+
fn bytes_array() {
122+
let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
123+
for limit in 0..full.len() {
124+
let mut chunks = [
125+
Bytes::from_static(b""),
126+
Bytes::from_static(b"Hello "),
127+
Bytes::from_static(b"Wo"),
128+
Bytes::from_static(b""),
129+
Bytes::from_static(b"r"),
130+
Bytes::from_static(b"ld"),
131+
Bytes::from_static(b""),
132+
Bytes::from_static(b" 12345678"),
133+
Bytes::from_static(b"9 ABCDE"),
134+
Bytes::from_static(b"F"),
135+
Bytes::from_static(b"GHJIJKLMNOPQRSTUVWXYZ"),
136+
];
137+
let num_chunks = chunks.len();
138+
let last_chunk_len = chunks[chunks.len() - 1].len();
139+
140+
let mut array = BytesArray::from_chunks(&mut chunks);
141+
142+
let mut buf = Vec::new();
143+
let mut chunks_popped = 0;
144+
let mut chunks_consumed = 0;
145+
let mut remaining = limit;
146+
loop {
147+
let (chunk, consumed) = array.pop_chunk(remaining);
148+
chunks_consumed += consumed;
149+
150+
if !chunk.is_empty() {
151+
buf.extend_from_slice(&chunk);
152+
remaining -= chunk.len();
153+
chunks_popped += 1;
154+
} else {
155+
break;
156+
}
157+
}
158+
159+
assert_eq!(&buf[..], &full[..limit]);
160+
161+
if limit == full.len() {
162+
// Full consumption of the last chunk
163+
assert_eq!(chunks_consumed, num_chunks);
164+
// Since there are empty chunks, we consume more than there are popped
165+
assert_eq!(chunks_consumed, chunks_popped + 3);
166+
} else if limit > full.len() - last_chunk_len {
167+
// Partial consumption of the last chunk
168+
assert_eq!(chunks_consumed, num_chunks - 1);
169+
assert_eq!(chunks_consumed, chunks_popped + 2);
170+
}
171+
}
172+
}
173+
174+
#[test]
175+
fn byte_slice() {
176+
let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
177+
for limit in 0..full.len() {
178+
let mut array = ByteSlice::from_slice(&full[..]);
179+
180+
let mut buf = Vec::new();
181+
let mut chunks_popped = 0;
182+
let mut chunks_consumed = 0;
183+
let mut remaining = limit;
184+
loop {
185+
let (chunk, consumed) = array.pop_chunk(remaining);
186+
chunks_consumed += consumed;
187+
188+
if !chunk.is_empty() {
189+
buf.extend_from_slice(&chunk);
190+
remaining -= chunk.len();
191+
chunks_popped += 1;
192+
} else {
193+
break;
194+
}
195+
}
196+
197+
assert_eq!(&buf[..], &full[..limit]);
198+
if limit != 0 {
199+
assert_eq!(chunks_popped, 1);
200+
} else {
201+
assert_eq!(chunks_popped, 0);
202+
}
203+
204+
if limit == full.len() {
205+
assert_eq!(chunks_consumed, 1);
206+
} else {
207+
assert_eq!(chunks_consumed, 0);
208+
}
209+
}
210+
}
211+
}

quinn-proto/src/connection/mod.rs

+26-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use thiserror::Error;
1414
use tracing::{debug, error, trace, trace_span, warn};
1515

1616
use crate::{
17+
bytes_source::{ByteSlice, BytesArray, BytesSource},
1718
cid_generator::ConnectionIdGenerator,
1819
cid_queue::CidQueue,
1920
coding::BufMutExt,
@@ -29,7 +30,7 @@ use crate::{
2930
EndpointEventInner, IssuedCid,
3031
},
3132
transport_parameters::TransportParameters,
32-
Dir, Frame, Side, StreamId, Transmit, TransportError, TransportErrorCode, VarInt,
33+
Dir, Frame, Side, StreamId, Transmit, TransportError, TransportErrorCode, VarInt, Written,
3334
MAX_STREAM_COUNT, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, TIMER_GRANULARITY,
3435
};
3536

@@ -1128,6 +1129,30 @@ where
11281129
///
11291130
/// Returns the number of bytes successfully written.
11301131
pub fn write(&mut self, stream: StreamId, data: &[u8]) -> Result<usize, WriteError> {
1132+
let mut source = ByteSlice::from_slice(data);
1133+
Ok(self.write_source(stream, &mut source)?.bytes)
1134+
}
1135+
1136+
/// Send data on the given stream
1137+
///
1138+
/// Returns the number of bytes and chunks successfully written.
1139+
/// Note that this method might also write a partial chunk. In this case
1140+
/// [`Written::chunks`] will not count this chunk as fully written. However
1141+
/// the chunk will be advanced and contain only non-written data after the call.
1142+
pub fn write_chunks(
1143+
&mut self,
1144+
stream: StreamId,
1145+
data: &mut [Bytes],
1146+
) -> Result<Written, WriteError> {
1147+
let mut source = BytesArray::from_chunks(data);
1148+
Ok(self.write_source(stream, &mut source)?)
1149+
}
1150+
1151+
fn write_source<B: BytesSource>(
1152+
&mut self,
1153+
stream: StreamId,
1154+
data: &mut B,
1155+
) -> Result<Written, WriteError> {
11311156
assert!(stream.dir() == Dir::Bi || stream.initiator() == self.side);
11321157
if self.state.is_closed() {
11331158
trace!(%stream, "write blocked; connection draining");

quinn-proto/src/connection/streams.rs

+30-14
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ use tracing::{debug, trace};
1212
use super::assembler::Chunk;
1313
use super::spaces::{Retransmits, ThinRetransmits};
1414
use crate::{
15+
bytes_source::BytesSource,
1516
coding::BufMutExt,
1617
connection::stats::FrameStats,
1718
frame::{self, FrameStruct, StreamMetaVec},
1819
transport_parameters::TransportParameters,
19-
Dir, Side, StreamId, TransportError, VarInt, MAX_STREAM_COUNT,
20+
Dir, Side, StreamId, TransportError, VarInt, Written, MAX_STREAM_COUNT,
2021
};
2122

2223
mod recv;
@@ -254,7 +255,11 @@ impl Streams {
254255
}
255256

256257
/// Queue `data` to be written for `stream`
257-
pub fn write(&mut self, id: StreamId, data: &[u8]) -> Result<usize, WriteError> {
258+
pub fn write<S: BytesSource>(
259+
&mut self,
260+
id: StreamId,
261+
source: &mut S,
262+
) -> Result<Written, WriteError> {
258263
let limit = (self.max_data - self.data_sent).min(self.send_window - self.unacked_data);
259264
let stream = self.send.get_mut(&id).ok_or(WriteError::UnknownStream)?;
260265
if limit == 0 {
@@ -267,15 +272,14 @@ impl Streams {
267272
}
268273

269274
let was_pending = stream.is_pending();
270-
let len = (data.len() as u64).min(limit) as usize;
271-
let len = stream.write(&data[0..len])?;
272-
self.data_sent += len as u64;
273-
self.unacked_data += len as u64;
274-
trace!(stream = %id, "wrote {} bytes", len);
275+
let written = stream.write(source, limit)?;
276+
self.data_sent += written.bytes as u64;
277+
self.unacked_data += written.bytes as u64;
278+
trace!(stream = %id, "wrote {} bytes", written.bytes);
275279
if !was_pending {
276280
push_pending(&mut self.pending, id, stream.priority);
277281
}
278-
Ok(len)
282+
Ok(written)
279283
}
280284

281285
/// Process incoming stream frame
@@ -1068,7 +1072,7 @@ enum StreamHalf {
10681072
#[cfg(test)]
10691073
mod tests {
10701074
use super::*;
1071-
use crate::TransportErrorCode;
1075+
use crate::{connection::ByteSlice, TransportErrorCode};
10721076

10731077
fn make(side: Side) -> Streams {
10741078
Streams::new(
@@ -1286,9 +1290,15 @@ mod tests {
12861290
let id = server.open(Dir::Uni).unwrap();
12871291
let reason = 0u32.into();
12881292
server.received_stop_sending(id, reason);
1289-
assert_eq!(server.write(id, &[]), Err(WriteError::Stopped(reason)));
1293+
assert_eq!(
1294+
server.write(id, &mut ByteSlice::from_slice(&[])),
1295+
Err(WriteError::Stopped(reason))
1296+
);
12901297
server.reset(id).unwrap();
1291-
assert_eq!(server.write(id, &[]), Err(WriteError::UnknownStream));
1298+
assert_eq!(
1299+
server.write(id, &mut ByteSlice::from_slice(&[])),
1300+
Err(WriteError::UnknownStream)
1301+
);
12921302
}
12931303

12941304
#[test]
@@ -1321,9 +1331,15 @@ mod tests {
13211331
let id_low = server.open(Dir::Bi).unwrap();
13221332
server.set_priority(id_low, -1).unwrap();
13231333
server.set_priority(id_high, 1).unwrap();
1324-
server.write(id_mid, b"mid").unwrap();
1325-
server.write(id_low, b"low").unwrap();
1326-
server.write(id_high, b"high").unwrap();
1334+
server
1335+
.write(id_mid, &mut ByteSlice::from_slice(b"mid"))
1336+
.unwrap();
1337+
server
1338+
.write(id_low, &mut ByteSlice::from_slice(b"low"))
1339+
.unwrap();
1340+
server
1341+
.write(id_high, &mut ByteSlice::from_slice(b"high"))
1342+
.unwrap();
13271343
let mut buf = Vec::with_capacity(40);
13281344
let meta = server.write_stream_frames(&mut buf, 40);
13291345
assert_eq!(meta[0].id, id_high);

0 commit comments

Comments
 (0)