Skip to content

Commit e420412

Browse files
Matthias EinwagMatthias247
Matthias Einwag
authored andcommitted
Add a Bytes array based send interface to quinn-proto
This change modifies quinn-proto to allow callers to submit a list of one or more owned `Bytes` chunks for transmission instead of pure byte slices. This will provide a more efficient zero-copy interface for applications which already make use of owned `Bytes` buffers. The internals of quinn-proto have been refactored in order to keep the ability to pass `&[u8]` buffers and defer the conversion into `Bytes` as long as possible, in order to avoid unnecessary allocations if no data can be stored due to flow control.
1 parent d46d320 commit e420412

File tree

5 files changed

+302
-21
lines changed

5 files changed

+302
-21
lines changed

quinn-proto/src/bytes_source.rs

+231
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
//! Defines types which can be lazily converted into `Bytes` chunks
2+
3+
use bytes::Bytes;
4+
5+
/// A source of one or more buffers which can be converted into `Bytes` buffers on demand
6+
///
7+
/// The purpose of this data type is to defer conversion as long as possible,
8+
/// so that no heap allocation is required in case no data is writable.
9+
pub trait BytesSource {
10+
// Returns the total amount of bytes to write
11+
fn len(&self) -> usize;
12+
/// Limits the source to the new size
13+
fn limit(&mut self, new_len: usize);
14+
/// The amount of bytes and chunks consumed from this source
15+
fn consumed(&self) -> Written;
16+
/// Returns the next chunk from the source of owned chunks.
17+
///
18+
/// This method will consume parts of the source.
19+
// Calling it will yield `Bytes` elements up to the configured `limit`.
20+
fn pop_chunk(&mut self) -> Option<Bytes>;
21+
}
22+
23+
/// Indicates how many bytes and chunks had been transferred in a write operation
24+
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
25+
pub struct Written {
26+
/// The amount of bytes which had been written
27+
pub bytes: usize,
28+
/// The amount of full chunks which had been written
29+
///
30+
/// If a chunk was only partially written, it will not be counted by this field.
31+
pub chunks: usize,
32+
}
33+
34+
/// A [`BytesSource`] implementation for `&'a mut [Bytes]`
35+
///
36+
/// The type allows to dequeue [`Bytes`] chunks from an array of chunks, up to
37+
/// a configured limit.
38+
pub struct BytesArray<'a> {
39+
/// The wrapped slice of `Bytes`
40+
chunks: &'a mut [Bytes],
41+
/// The amount of bytes which are still available for consumption
42+
len: usize,
43+
/// The amount of bytes and chunks consumed from this source
44+
consumed: Written,
45+
}
46+
47+
impl<'a> BytesArray<'a> {
48+
pub fn from_chunks(chunks: &'a mut [Bytes]) -> Self {
49+
let len = chunks.iter().map(|chunk| chunk.len()).sum();
50+
Self {
51+
chunks,
52+
len,
53+
consumed: Written::default(),
54+
}
55+
}
56+
}
57+
58+
impl<'a> BytesSource for BytesArray<'a> {
59+
fn len(&self) -> usize {
60+
self.len
61+
}
62+
63+
fn limit(&mut self, new_len: usize) {
64+
self.len = self.len.min(new_len);
65+
}
66+
67+
fn consumed(&self) -> Written {
68+
self.consumed
69+
}
70+
71+
fn pop_chunk(&mut self) -> Option<Bytes> {
72+
// The loop exists to skip empty chunks while still marking them as
73+
// consumed
74+
while self.consumed.chunks < self.chunks.len() {
75+
let chunk = &mut self.chunks[self.consumed.chunks];
76+
77+
if chunk.len() <= self.len {
78+
let chunk = std::mem::take(chunk);
79+
self.consumed.chunks += 1;
80+
if chunk.is_empty() {
81+
continue;
82+
}
83+
self.len -= chunk.len();
84+
self.consumed.bytes += chunk.len();
85+
return Some(chunk);
86+
} else if self.len > 0 {
87+
let chunk = chunk.split_to(self.len);
88+
self.len = 0;
89+
self.consumed.bytes += chunk.len();
90+
return Some(chunk);
91+
} else {
92+
return None;
93+
}
94+
}
95+
96+
None
97+
}
98+
}
99+
100+
/// A [`BytesSource`] implementation for `&[u8]`
101+
///
102+
/// The type allows to dequeue a single [`Bytes`] chunk, which will be lazily
103+
/// created from a reference. This allows to defer the allocation until it is
104+
/// known how much data needs to be copied.
105+
pub struct ByteSlice<'a> {
106+
/// The wrapped byte slice
107+
data: &'a [u8],
108+
/// The amount of bytes which are still available for consumption
109+
len: usize,
110+
/// The amount of bytes and chunks consumed from this source
111+
consumed: Written,
112+
}
113+
114+
impl<'a> ByteSlice<'a> {
115+
pub fn from_slice(data: &'a [u8]) -> Self {
116+
let len = data.len();
117+
Self {
118+
data,
119+
len,
120+
consumed: Written::default(),
121+
}
122+
}
123+
}
124+
125+
impl<'a> BytesSource for ByteSlice<'a> {
126+
fn len(&self) -> usize {
127+
self.len
128+
}
129+
130+
fn limit(&mut self, new_len: usize) {
131+
self.len = self.len.min(new_len);
132+
}
133+
134+
fn consumed(&self) -> Written {
135+
self.consumed
136+
}
137+
138+
fn pop_chunk(&mut self) -> Option<Bytes> {
139+
if self.len == 0 {
140+
return None;
141+
}
142+
143+
let chunk = Bytes::from(self.data[..self.len].to_owned());
144+
self.consumed.bytes += chunk.len();
145+
if self.len == self.data.len() {
146+
self.consumed.chunks += 1;
147+
}
148+
self.len = 0;
149+
Some(chunk)
150+
}
151+
}
152+
153+
#[cfg(test)]
154+
mod tests {
155+
use super::*;
156+
157+
#[test]
158+
fn bytes_array() {
159+
let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
160+
for limit in 0..full.len() {
161+
let mut chunks = [
162+
Bytes::from_static(b""),
163+
Bytes::from_static(b"Hello "),
164+
Bytes::from_static(b"Wo"),
165+
Bytes::from_static(b""),
166+
Bytes::from_static(b"r"),
167+
Bytes::from_static(b"ld"),
168+
Bytes::from_static(b""),
169+
Bytes::from_static(b" 12345678"),
170+
Bytes::from_static(b"9 ABCDE"),
171+
Bytes::from_static(b"F"),
172+
Bytes::from_static(b"GHJIJKLMNOPQRSTUVWXYZ"),
173+
];
174+
let num_chunks = chunks.len();
175+
let last_chunk_len = chunks[chunks.len() - 1].len();
176+
177+
let mut array = BytesArray::from_chunks(&mut chunks);
178+
assert_eq!(array.len(), full.len());
179+
180+
array.limit(limit);
181+
assert_eq!(array.len(), limit);
182+
183+
let mut buf = Vec::new();
184+
let mut chunks_popped = 0;
185+
while let Some(chunk) = array.pop_chunk() {
186+
buf.extend_from_slice(&chunk);
187+
chunks_popped += 1;
188+
}
189+
190+
assert_eq!(&buf[..], &full[..limit]);
191+
assert_eq!(array.consumed().bytes, limit);
192+
193+
if limit == full.len() {
194+
// Full consumption of the last chunk
195+
assert_eq!(array.consumed().chunks, num_chunks);
196+
// Since there are empty chunks, we consume more than there are popped
197+
assert_eq!(array.consumed().chunks, chunks_popped + 3);
198+
} else if limit > full.len() - last_chunk_len {
199+
// Partial consumption of the last chunk
200+
assert_eq!(array.consumed().chunks, num_chunks - 1);
201+
assert_eq!(array.consumed().chunks, chunks_popped + 2);
202+
}
203+
}
204+
}
205+
206+
#[test]
207+
fn byte_slice() {
208+
let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
209+
for limit in 0..full.len() {
210+
let mut array = ByteSlice::from_slice(&full[..]);
211+
assert_eq!(array.len(), full.len());
212+
213+
array.limit(limit);
214+
assert_eq!(array.len(), limit);
215+
216+
let mut buf = Vec::new();
217+
while let Some(chunk) = array.pop_chunk() {
218+
buf.extend_from_slice(&chunk);
219+
}
220+
221+
assert_eq!(&buf[..], &full[..limit]);
222+
assert_eq!(array.consumed().bytes, limit);
223+
224+
if limit == full.len() {
225+
assert_eq!(array.consumed().chunks, 1);
226+
} else {
227+
assert_eq!(array.consumed().chunks, 0);
228+
}
229+
}
230+
}
231+
}

quinn-proto/src/connection/mod.rs

+28-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use thiserror::Error;
1414
use tracing::{debug, error, trace, trace_span, warn};
1515

1616
use crate::{
17+
bytes_source::{ByteSlice, BytesArray, BytesSource},
1718
cid_generator::ConnectionIdGenerator,
1819
cid_queue::CidQueue,
1920
coding::BufMutExt,
@@ -29,7 +30,7 @@ use crate::{
2930
EndpointEventInner, IssuedCid,
3031
},
3132
transport_parameters::TransportParameters,
32-
Dir, Frame, Side, StreamId, Transmit, TransportError, TransportErrorCode, VarInt,
33+
Dir, Frame, Side, StreamId, Transmit, TransportError, TransportErrorCode, VarInt, Written,
3334
MAX_STREAM_COUNT, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, TIMER_GRANULARITY,
3435
};
3536

@@ -948,6 +949,32 @@ where
948949
///
949950
/// Returns the number of bytes successfully written.
950951
pub fn write(&mut self, stream: StreamId, data: &[u8]) -> Result<usize, WriteError> {
952+
let mut source = ByteSlice::from_slice(data);
953+
self.write_source(stream, &mut source)?;
954+
Ok(source.consumed().bytes)
955+
}
956+
957+
/// Send data on the given stream
958+
///
959+
/// Returns the number of bytes and chunks successfully written.
960+
/// Note that this method might also write a partial chunk. In this case
961+
/// [`Written::chunks`] will not count this chunk as fully written. However
962+
/// the chunk will be advanced and contain only non-written data after the call.
963+
pub fn write_chunks(
964+
&mut self,
965+
stream: StreamId,
966+
data: &mut [Bytes],
967+
) -> Result<Written, WriteError> {
968+
let mut source = BytesArray::from_chunks(data);
969+
self.write_source(stream, &mut source)?;
970+
Ok(source.consumed())
971+
}
972+
973+
fn write_source<B: BytesSource>(
974+
&mut self,
975+
stream: StreamId,
976+
data: &mut B,
977+
) -> Result<(), WriteError> {
951978
assert!(stream.dir() == Dir::Bi || stream.initiator() == self.side);
952979
if self.state.is_closed() {
953980
trace!(%stream, "write blocked; connection draining");

quinn-proto/src/connection/streams.rs

+31-13
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use tracing::{debug, trace};
1212
use super::assembler::Chunk;
1313
use super::spaces::{Retransmits, ThinRetransmits};
1414
use crate::{
15+
bytes_source::BytesSource,
1516
coding::BufMutExt,
1617
connection::stats::FrameStats,
1718
frame::{self, FrameStruct, StreamMetaVec},
@@ -254,7 +255,11 @@ impl Streams {
254255
}
255256

256257
/// Queue `data` to be written for `stream`
257-
pub fn write(&mut self, id: StreamId, data: &[u8]) -> Result<usize, WriteError> {
258+
pub fn write<S: BytesSource>(
259+
&mut self,
260+
id: StreamId,
261+
source: &mut S,
262+
) -> Result<(), WriteError> {
258263
let limit = (self.max_data - self.data_sent).min(self.send_window - self.unacked_data);
259264
let stream = self.send.get_mut(&id).ok_or(WriteError::UnknownStream)?;
260265
if limit == 0 {
@@ -267,15 +272,16 @@ impl Streams {
267272
}
268273

269274
let was_pending = stream.is_pending();
270-
let len = (data.len() as u64).min(limit) as usize;
271-
let len = stream.write(&data[0..len])?;
272-
self.data_sent += len as u64;
273-
self.unacked_data += len as u64;
274-
trace!(stream = %id, "wrote {} bytes", len);
275+
source.limit(limit as usize);
276+
stream.write(source)?;
277+
let consumed = source.consumed().bytes as u64;
278+
self.data_sent += consumed;
279+
self.unacked_data += consumed;
280+
trace!(stream = %id, "wrote {} bytes", consumed);
275281
if !was_pending {
276282
push_pending(&mut self.pending, id, stream.priority);
277283
}
278-
Ok(len)
284+
Ok(())
279285
}
280286

281287
/// Process incoming stream frame
@@ -1068,7 +1074,7 @@ enum StreamHalf {
10681074
#[cfg(test)]
10691075
mod tests {
10701076
use super::*;
1071-
use crate::TransportErrorCode;
1077+
use crate::{connection::ByteSlice, TransportErrorCode};
10721078

10731079
fn make(side: Side) -> Streams {
10741080
Streams::new(
@@ -1286,9 +1292,15 @@ mod tests {
12861292
let id = server.open(Dir::Uni).unwrap();
12871293
let reason = 0u32.into();
12881294
server.received_stop_sending(id, reason);
1289-
assert_eq!(server.write(id, &[]), Err(WriteError::Stopped(reason)));
1295+
assert_eq!(
1296+
server.write(id, &mut ByteSlice::from_slice(&[])),
1297+
Err(WriteError::Stopped(reason))
1298+
);
12901299
server.reset(id).unwrap();
1291-
assert_eq!(server.write(id, &[]), Err(WriteError::UnknownStream));
1300+
assert_eq!(
1301+
server.write(id, &mut ByteSlice::from_slice(&[])),
1302+
Err(WriteError::UnknownStream)
1303+
);
12921304
}
12931305

12941306
#[test]
@@ -1321,9 +1333,15 @@ mod tests {
13211333
let id_low = server.open(Dir::Bi).unwrap();
13221334
server.set_priority(id_low, -1).unwrap();
13231335
server.set_priority(id_high, 1).unwrap();
1324-
server.write(id_mid, b"mid").unwrap();
1325-
server.write(id_low, b"low").unwrap();
1326-
server.write(id_high, b"high").unwrap();
1336+
server
1337+
.write(id_mid, &mut ByteSlice::from_slice(b"mid"))
1338+
.unwrap();
1339+
server
1340+
.write(id_low, &mut ByteSlice::from_slice(b"low"))
1341+
.unwrap();
1342+
server
1343+
.write(id_high, &mut ByteSlice::from_slice(b"high"))
1344+
.unwrap();
13271345
let mut buf = Vec::with_capacity(40);
13281346
let meta = server.write_stream_frames(&mut buf, 40);
13291347
assert_eq!(meta[0].id, id_high);

0 commit comments

Comments
 (0)