Skip to content

Commit 0020e1e

Browse files
Matthias Einwagdjc
Matthias Einwag
authored andcommitted
Add a Bytes array based send interface to quinn-proto
This change modifies quinn-proto to allow callers to submit a list of one or more owned `Bytes` chunks for transmission instead of pure byte slices. This will provide a more efficient zero-copy interface for applications which already make use of owned `Bytes` buffers. The internals of quinn-proto have been refactored in order to keep the ability to pass `&[u8]` buffers and defer the conversion into `Bytes` as long as possible, in order to avoid unnecessary allocations if no data can be stored due to flow control.
1 parent eb1911d commit 0020e1e

File tree

5 files changed

+295
-22
lines changed

5 files changed

+295
-22
lines changed

quinn-proto/src/bytes_source.rs

+211
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
//! Defines types which can be lazily converted into `Bytes` chunks
2+
3+
use bytes::Bytes;
4+
5+
/// A source of one or more buffers which can be converted into `Bytes` buffers on demand
6+
///
7+
/// The purpose of this data type is to defer conversion as long as possible,
8+
/// so that no heap allocation is required in case no data is writable.
9+
pub trait BytesSource {
10+
/// Returns the next chunk from the source of owned chunks.
11+
///
12+
/// This method will consume parts of the source.
13+
/// Calling it will yield `Bytes` elements up to the configured `limit`.
14+
///
15+
/// The method returns a tuple:
16+
/// - The first item is the yielded `Bytes` element. The element will be
17+
/// empty if the limit is zero or no more data is available.
18+
/// - The second item returns how many complete chunks inside the source had
19+
/// had been consumed. This can be less than 1, if a chunk inside the
20+
/// source had been truncated in order to adhere to the limit. It can also
21+
/// be more than 1, if zero-length chunks had been skipped.
22+
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize);
23+
}
24+
25+
/// Indicates how many bytes and chunks had been transferred in a write operation
26+
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
27+
pub struct Written {
28+
/// The amount of bytes which had been written
29+
pub bytes: usize,
30+
/// The amount of full chunks which had been written
31+
///
32+
/// If a chunk was only partially written, it will not be counted by this field.
33+
pub chunks: usize,
34+
}
35+
36+
/// A [`BytesSource`] implementation for `&'a mut [Bytes]`
37+
///
38+
/// The type allows to dequeue [`Bytes`] chunks from an array of chunks, up to
39+
/// a configured limit.
40+
pub struct BytesArray<'a> {
41+
/// The wrapped slice of `Bytes`
42+
chunks: &'a mut [Bytes],
43+
/// The amount of chunks consumed from this source
44+
consumed: usize,
45+
}
46+
47+
impl<'a> BytesArray<'a> {
48+
pub fn from_chunks(chunks: &'a mut [Bytes]) -> Self {
49+
Self {
50+
chunks,
51+
consumed: 0,
52+
}
53+
}
54+
}
55+
56+
impl<'a> BytesSource for BytesArray<'a> {
57+
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
58+
// The loop exists to skip empty chunks while still marking them as
59+
// consumed
60+
let mut chunks_consumed = 0;
61+
62+
while self.consumed < self.chunks.len() {
63+
let chunk = &mut self.chunks[self.consumed];
64+
65+
if chunk.len() <= limit {
66+
let chunk = std::mem::take(chunk);
67+
self.consumed += 1;
68+
chunks_consumed += 1;
69+
if chunk.is_empty() {
70+
continue;
71+
}
72+
return (chunk, chunks_consumed);
73+
} else if limit > 0 {
74+
let chunk = chunk.split_to(limit);
75+
return (chunk, chunks_consumed);
76+
} else {
77+
break;
78+
}
79+
}
80+
81+
(Bytes::new(), chunks_consumed)
82+
}
83+
}
84+
85+
/// A [`BytesSource`] implementation for `&[u8]`
86+
///
87+
/// The type allows to dequeue a single [`Bytes`] chunk, which will be lazily
88+
/// created from a reference. This allows to defer the allocation until it is
89+
/// known how much data needs to be copied.
90+
pub struct ByteSlice<'a> {
91+
/// The wrapped byte slice
92+
data: &'a [u8],
93+
}
94+
95+
impl<'a> ByteSlice<'a> {
96+
pub fn from_slice(data: &'a [u8]) -> Self {
97+
Self { data }
98+
}
99+
}
100+
101+
impl<'a> BytesSource for ByteSlice<'a> {
102+
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
103+
let limit = limit.min(self.data.len());
104+
if limit == 0 {
105+
return (Bytes::new(), 0);
106+
}
107+
108+
let chunk = Bytes::from(self.data[..limit].to_owned());
109+
self.data = &self.data[chunk.len()..];
110+
111+
let chunks_consumed = if self.data.is_empty() { 1 } else { 0 };
112+
(chunk, chunks_consumed)
113+
}
114+
}
115+
116+
#[cfg(test)]
117+
mod tests {
118+
use super::*;
119+
120+
#[test]
121+
fn bytes_array() {
122+
let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
123+
for limit in 0..full.len() {
124+
let mut chunks = [
125+
Bytes::from_static(b""),
126+
Bytes::from_static(b"Hello "),
127+
Bytes::from_static(b"Wo"),
128+
Bytes::from_static(b""),
129+
Bytes::from_static(b"r"),
130+
Bytes::from_static(b"ld"),
131+
Bytes::from_static(b""),
132+
Bytes::from_static(b" 12345678"),
133+
Bytes::from_static(b"9 ABCDE"),
134+
Bytes::from_static(b"F"),
135+
Bytes::from_static(b"GHJIJKLMNOPQRSTUVWXYZ"),
136+
];
137+
let num_chunks = chunks.len();
138+
let last_chunk_len = chunks[chunks.len() - 1].len();
139+
140+
let mut array = BytesArray::from_chunks(&mut chunks);
141+
142+
let mut buf = Vec::new();
143+
let mut chunks_popped = 0;
144+
let mut chunks_consumed = 0;
145+
let mut remaining = limit;
146+
loop {
147+
let (chunk, consumed) = array.pop_chunk(remaining);
148+
chunks_consumed += consumed;
149+
150+
if !chunk.is_empty() {
151+
buf.extend_from_slice(&chunk);
152+
remaining -= chunk.len();
153+
chunks_popped += 1;
154+
} else {
155+
break;
156+
}
157+
}
158+
159+
assert_eq!(&buf[..], &full[..limit]);
160+
161+
if limit == full.len() {
162+
// Full consumption of the last chunk
163+
assert_eq!(chunks_consumed, num_chunks);
164+
// Since there are empty chunks, we consume more than there are popped
165+
assert_eq!(chunks_consumed, chunks_popped + 3);
166+
} else if limit > full.len() - last_chunk_len {
167+
// Partial consumption of the last chunk
168+
assert_eq!(chunks_consumed, num_chunks - 1);
169+
assert_eq!(chunks_consumed, chunks_popped + 2);
170+
}
171+
}
172+
}
173+
174+
#[test]
175+
fn byte_slice() {
176+
let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
177+
for limit in 0..full.len() {
178+
let mut array = ByteSlice::from_slice(&full[..]);
179+
180+
let mut buf = Vec::new();
181+
let mut chunks_popped = 0;
182+
let mut chunks_consumed = 0;
183+
let mut remaining = limit;
184+
loop {
185+
let (chunk, consumed) = array.pop_chunk(remaining);
186+
chunks_consumed += consumed;
187+
188+
if !chunk.is_empty() {
189+
buf.extend_from_slice(&chunk);
190+
remaining -= chunk.len();
191+
chunks_popped += 1;
192+
} else {
193+
break;
194+
}
195+
}
196+
197+
assert_eq!(&buf[..], &full[..limit]);
198+
if limit != 0 {
199+
assert_eq!(chunks_popped, 1);
200+
} else {
201+
assert_eq!(chunks_popped, 0);
202+
}
203+
204+
if limit == full.len() {
205+
assert_eq!(chunks_consumed, 1);
206+
} else {
207+
assert_eq!(chunks_consumed, 0);
208+
}
209+
}
210+
}
211+
}

quinn-proto/src/connection/mod.rs

+26-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use thiserror::Error;
1414
use tracing::{debug, error, trace, trace_span, warn};
1515

1616
use crate::{
17+
bytes_source::{ByteSlice, BytesArray, BytesSource},
1718
cid_generator::ConnectionIdGenerator,
1819
cid_queue::CidQueue,
1920
coding::BufMutExt,
@@ -29,7 +30,7 @@ use crate::{
2930
EndpointEventInner, IssuedCid,
3031
},
3132
transport_parameters::TransportParameters,
32-
Dir, Frame, Side, StreamId, Transmit, TransportError, TransportErrorCode, VarInt,
33+
Dir, Frame, Side, StreamId, Transmit, TransportError, TransportErrorCode, VarInt, Written,
3334
MAX_STREAM_COUNT, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, TIMER_GRANULARITY,
3435
};
3536

@@ -1100,6 +1101,30 @@ where
11001101
///
11011102
/// Returns the number of bytes successfully written.
11021103
pub fn write(&mut self, stream: StreamId, data: &[u8]) -> Result<usize, WriteError> {
1104+
let mut source = ByteSlice::from_slice(data);
1105+
Ok(self.write_source(stream, &mut source)?.bytes)
1106+
}
1107+
1108+
/// Send data on the given stream
1109+
///
1110+
/// Returns the number of bytes and chunks successfully written.
1111+
/// Note that this method might also write a partial chunk. In this case
1112+
/// [`Written::chunks`] will not count this chunk as fully written. However
1113+
/// the chunk will be advanced and contain only non-written data after the call.
1114+
pub fn write_chunks(
1115+
&mut self,
1116+
stream: StreamId,
1117+
data: &mut [Bytes],
1118+
) -> Result<Written, WriteError> {
1119+
let mut source = BytesArray::from_chunks(data);
1120+
Ok(self.write_source(stream, &mut source)?)
1121+
}
1122+
1123+
fn write_source<B: BytesSource>(
1124+
&mut self,
1125+
stream: StreamId,
1126+
data: &mut B,
1127+
) -> Result<Written, WriteError> {
11031128
assert!(stream.dir() == Dir::Bi || stream.initiator() == self.side);
11041129
if self.state.is_closed() {
11051130
trace!(%stream, "write blocked; connection draining");

quinn-proto/src/connection/streams.rs

+30-14
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ use tracing::{debug, trace};
1111

1212
use super::spaces::{Retransmits, ThinRetransmits};
1313
use crate::{
14+
bytes_source::BytesSource,
1415
coding::BufMutExt,
1516
connection::stats::FrameStats,
1617
frame::{self, FrameStruct, StreamMetaVec},
1718
transport_parameters::TransportParameters,
18-
Dir, Side, StreamId, TransportError, VarInt, MAX_STREAM_COUNT,
19+
Dir, Side, StreamId, TransportError, VarInt, Written, MAX_STREAM_COUNT,
1920
};
2021

2122
mod recv;
@@ -210,7 +211,11 @@ impl Streams {
210211
}
211212

212213
/// Queue `data` to be written for `stream`
213-
pub fn write(&mut self, id: StreamId, data: &[u8]) -> Result<usize, WriteError> {
214+
pub fn write<S: BytesSource>(
215+
&mut self,
216+
id: StreamId,
217+
source: &mut S,
218+
) -> Result<Written, WriteError> {
214219
let limit = (self.max_data - self.data_sent).min(self.send_window - self.unacked_data);
215220
let stream = self.send.get_mut(&id).ok_or(WriteError::UnknownStream)?;
216221
if limit == 0 {
@@ -223,15 +228,14 @@ impl Streams {
223228
}
224229

225230
let was_pending = stream.is_pending();
226-
let len = (data.len() as u64).min(limit) as usize;
227-
let len = stream.write(&data[0..len])?;
228-
self.data_sent += len as u64;
229-
self.unacked_data += len as u64;
230-
trace!(stream = %id, "wrote {} bytes", len);
231+
let written = stream.write(source, limit)?;
232+
self.data_sent += written.bytes as u64;
233+
self.unacked_data += written.bytes as u64;
234+
trace!(stream = %id, "wrote {} bytes", written.bytes);
231235
if !was_pending {
232236
push_pending(&mut self.pending, id, stream.priority);
233237
}
234-
Ok(len)
238+
Ok(written)
235239
}
236240

237241
/// Process incoming stream frame
@@ -1032,7 +1036,7 @@ enum StreamHalf {
10321036
#[cfg(test)]
10331037
mod tests {
10341038
use super::*;
1035-
use crate::TransportErrorCode;
1039+
use crate::{connection::ByteSlice, TransportErrorCode};
10361040
use bytes::Bytes;
10371041

10381042
fn make(side: Side) -> Streams {
@@ -1258,9 +1262,15 @@ mod tests {
12581262
let id = server.open(Dir::Uni).unwrap();
12591263
let reason = 0u32.into();
12601264
server.received_stop_sending(id, reason);
1261-
assert_eq!(server.write(id, &[]), Err(WriteError::Stopped(reason)));
1265+
assert_eq!(
1266+
server.write(id, &mut ByteSlice::from_slice(&[])),
1267+
Err(WriteError::Stopped(reason))
1268+
);
12621269
server.reset(id).unwrap();
1263-
assert_eq!(server.write(id, &[]), Err(WriteError::UnknownStream));
1270+
assert_eq!(
1271+
server.write(id, &mut ByteSlice::from_slice(&[])),
1272+
Err(WriteError::UnknownStream)
1273+
);
12641274
}
12651275

12661276
#[test]
@@ -1293,9 +1303,15 @@ mod tests {
12931303
let id_low = server.open(Dir::Bi).unwrap();
12941304
server.set_priority(id_low, -1).unwrap();
12951305
server.set_priority(id_high, 1).unwrap();
1296-
server.write(id_mid, b"mid").unwrap();
1297-
server.write(id_low, b"low").unwrap();
1298-
server.write(id_high, b"high").unwrap();
1306+
server
1307+
.write(id_mid, &mut ByteSlice::from_slice(b"mid"))
1308+
.unwrap();
1309+
server
1310+
.write(id_low, &mut ByteSlice::from_slice(b"low"))
1311+
.unwrap();
1312+
server
1313+
.write(id_high, &mut ByteSlice::from_slice(b"high"))
1314+
.unwrap();
12991315
let mut buf = Vec::with_capacity(40);
13001316
let meta = server.write_stream_frames(&mut buf, 40);
13011317
assert_eq!(meta[0].id, id_high);

0 commit comments

Comments
 (0)