Skip to content

Commit

Permalink
feat: use BytesMut for Transmit content
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire authored and djc committed May 9, 2023
1 parent 37e7b74 commit 89b527c
Show file tree
Hide file tree
Showing 10 changed files with 35 additions and 38 deletions.
4 changes: 2 additions & 2 deletions quinn-proto/src/connection/datagrams.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::VecDeque;

use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use thiserror::Error;
use tracing::{debug, trace};

Expand Down Expand Up @@ -127,7 +127,7 @@ impl DatagramState {
Ok(was_empty)
}

pub(super) fn write(&mut self, buf: &mut Vec<u8>, max_size: usize) -> bool {
pub(super) fn write(&mut self, buf: &mut BytesMut, max_size: usize) -> bool {
let datagram = match self.outgoing.pop_front() {
Some(x) => x,
None => return false,
Expand Down
8 changes: 4 additions & 4 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ impl Connection {
SpaceId::Data,
"PATH_CHALLENGE queued without 1-RTT keys"
);
let mut buf = Vec::with_capacity(self.path.current_mtu() as usize);
let mut buf = BytesMut::with_capacity(self.path.current_mtu() as usize);
let buf_capacity = self.path.current_mtu() as usize;

let mut builder = PacketBuilder::new(
Expand Down Expand Up @@ -500,7 +500,7 @@ impl Connection {
_ => false,
};

let mut buf = Vec::new();
let mut buf = BytesMut::new();
// Reserving capacity can provide more capacity than we asked for.
// However we are not allowed to write more than MTU size. Therefore
// the maximum capacity is tracked separately.
Expand Down Expand Up @@ -2809,7 +2809,7 @@ impl Connection {
fn populate_packet(
&mut self,
space_id: SpaceId,
buf: &mut Vec<u8>,
buf: &mut BytesMut,
max_size: usize,
) -> SentFrames {
let mut sent = SentFrames::default();
Expand Down Expand Up @@ -2981,7 +2981,7 @@ impl Connection {
receiving_ecn: bool,
sent: &mut SentFrames,
space: &mut PacketSpace,
buf: &mut Vec<u8>,
buf: &mut BytesMut,
stats: &mut ConnectionStats,
) {
debug_assert!(!space.pending_acks.ranges().is_empty());
Expand Down
8 changes: 4 additions & 4 deletions quinn-proto/src/connection/packet_builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Instant;

use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use rand::Rng;
use tracing::{trace, trace_span};

Expand Down Expand Up @@ -32,7 +32,7 @@ impl PacketBuilder {
pub(super) fn new(
now: Instant,
space_id: SpaceId,
buffer: &mut Vec<u8>,
buffer: &mut BytesMut,
buffer_capacity: usize,
datagram_start: usize,
ack_eliciting: bool,
Expand Down Expand Up @@ -167,7 +167,7 @@ impl PacketBuilder {
now: Instant,
conn: &mut Connection,
sent: Option<SentFrames>,
buffer: &mut Vec<u8>,
buffer: &mut BytesMut,
) {
let ack_eliciting = self.ack_eliciting;
let exact_number = self.exact_number;
Expand Down Expand Up @@ -210,7 +210,7 @@ impl PacketBuilder {
}

/// Encrypt packet, returning the length of the packet and whether padding was added
pub(super) fn finish(self, conn: &mut Connection, buffer: &mut Vec<u8>) -> (usize, bool) {
pub(super) fn finish(self, conn: &mut Connection, buffer: &mut BytesMut) -> (usize, bool) {
let pad = buffer.len() < self.min_size;
if pad {
trace!("PADDING * {}", self.min_size - buffer.len());
Expand Down
12 changes: 6 additions & 6 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
mem,
};

use bytes::BufMut;
use bytes::{BufMut, BytesMut};
use rustc_hash::FxHashMap;
use tracing::{debug, trace};

Expand Down Expand Up @@ -349,7 +349,7 @@ impl StreamsState {

pub(in crate::connection) fn write_control_frames(
&mut self,
buf: &mut Vec<u8>,
buf: &mut BytesMut,
pending: &mut Retransmits,
retransmits: &mut ThinRetransmits,
stats: &mut FrameStats,
Expand Down Expand Up @@ -475,7 +475,7 @@ impl StreamsState {

pub(crate) fn write_stream_frames(
&mut self,
buf: &mut Vec<u8>,
buf: &mut BytesMut,
max_buf_size: usize,
) -> StreamMetaVec {
let mut stream_frames = StreamMetaVec::new();
Expand Down Expand Up @@ -874,7 +874,7 @@ mod tests {
connection::State as ConnState, connection::Streams, ReadableError, RecvStream, SendStream,
TransportErrorCode, WriteError,
};
use bytes::Bytes;
use bytes::{Bytes, BytesMut};

fn make(side: Side) -> StreamsState {
StreamsState::new(
Expand Down Expand Up @@ -1266,7 +1266,7 @@ mod tests {
high.set_priority(1).unwrap();
high.write(b"high").unwrap();

let mut buf = Vec::with_capacity(40);
let mut buf = BytesMut::with_capacity(40);
let meta = server.write_stream_frames(&mut buf, 40);
assert_eq!(meta[0].id, id_high);
assert_eq!(meta[1].id, id_mid);
Expand Down Expand Up @@ -1325,7 +1325,7 @@ mod tests {
};
high.set_priority(-1).unwrap();

let mut buf = Vec::with_capacity(1000);
let mut buf = BytesMut::with_capacity(1000);
let meta = server.write_stream_frames(&mut buf, 40);
assert_eq!(meta.len(), 1);
assert_eq!(meta[0].id, id_high);
Expand Down
10 changes: 5 additions & 5 deletions quinn-proto/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl Endpoint {
}
trace!("sending version negotiation");
// Negotiate versions
let mut buf = Vec::<u8>::new();
let mut buf = BytesMut::new();
Header::VersionNegotiate {
random: self.rng.gen::<u8>() | 0x40,
src_cid: dst_cid,
Expand Down Expand Up @@ -340,15 +340,15 @@ impl Endpoint {
"sending stateless reset for {} to {}",
dst_cid, addresses.remote
);
let mut buf = Vec::<u8>::new();
let mut buf = BytesMut::new();
// Resets with at least this much padding can't possibly be distinguished from real packets
const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
max_padding_len
} else {
self.rng.gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
};
buf.reserve_exact(padding_len + RESET_TOKEN_SIZE);
buf.reserve(padding_len + RESET_TOKEN_SIZE);
buf.resize(padding_len, 0);
self.rng.fill_bytes(&mut buf[0..padding_len]);
buf[0] = 0b0100_0000 | buf[0] >> 2;
Expand Down Expand Up @@ -538,7 +538,7 @@ impl Endpoint {
version,
};

let mut buf = Vec::new();
let mut buf = BytesMut::new();
let encode = header.encode(&mut buf);
buf.put_slice(&token);
buf.extend_from_slice(&server_config.crypto.retry_tag(version, &dst_cid, &buf));
Expand Down Expand Up @@ -689,7 +689,7 @@ impl Endpoint {
version,
};

let mut buf = Vec::<u8>::new();
let mut buf = BytesMut::new();
let partial_encode = header.encode(&mut buf);
let max_len =
INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
Expand Down
3 changes: 2 additions & 1 deletion quinn-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ mod tests;
pub mod transport_parameters;
mod varint;

use bytes::BytesMut;
pub use varint::{VarInt, VarIntBoundsExceeded};

mod connection;
Expand Down Expand Up @@ -277,7 +278,7 @@ pub struct Transmit {
/// Explicit congestion notification bits to set on the packet
pub ecn: Option<EcnCodepoint>,
/// Contents of the datagram
pub contents: Vec<u8>,
pub contents: BytesMut,
/// The segment size if this transmission contains multiple datagrams.
/// This is `None` if the transmit only contains a single datagram
pub segment_size: Option<usize>,
Expand Down
6 changes: 3 additions & 3 deletions quinn-proto/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ pub(crate) enum Header {
}

impl Header {
pub(crate) fn encode(&self, w: &mut Vec<u8>) -> PartialEncode {
pub(crate) fn encode(&self, w: &mut BytesMut) -> PartialEncode {
use self::Header::*;
let start = w.len();
match *self {
Expand Down Expand Up @@ -844,7 +844,7 @@ mod tests {

let dcid = ConnectionId::new(&hex!("06b858ec6f80452b"));
let client = initial_keys(Version::V1, &dcid, Side::Client);
let mut buf = Vec::new();
let mut buf = BytesMut::new();
let header = Header::Initial {
number: PacketNumber::U8(0),
src_cid: ConnectionId::new(&[]),
Expand Down Expand Up @@ -875,7 +875,7 @@ mod tests {

let server = initial_keys(Version::V1, &dcid, Side::Server);
let supported_versions = DEFAULT_SUPPORTED_VERSIONS.to_vec();
let decode = PartialDecode::new(buf.as_slice().into(), 0, &supported_versions, false)
let decode = PartialDecode::new(buf, 0, &supported_versions, false)
.unwrap()
.0;
let mut packet = decode.finish(Some(&*server.header.remote)).unwrap();
Expand Down
18 changes: 6 additions & 12 deletions quinn-proto/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ pub(super) struct TestEndpoint {
timeout: Option<Instant>,
pub(super) outbound: VecDeque<Transmit>,
delayed: VecDeque<Transmit>,
pub(super) inbound: VecDeque<(Instant, Option<EcnCodepoint>, Vec<u8>)>,
pub(super) inbound: VecDeque<(Instant, Option<EcnCodepoint>, BytesMut)>,
accepted: Option<ConnectionHandle>,
pub(super) connections: HashMap<ConnectionHandle, Connection>,
conn_events: HashMap<ConnectionHandle, VecDeque<ConnectionEvent>>,
Expand Down Expand Up @@ -305,10 +305,7 @@ impl TestEndpoint {

while self.inbound.front().map_or(false, |x| x.0 <= now) {
let (recv_time, ecn, packet) = self.inbound.pop_front().unwrap();
if let Some((ch, event)) =
self.endpoint
.handle(recv_time, remote, None, ecn, packet.as_slice().into())
{
if let Some((ch, event)) = self.endpoint.handle(recv_time, remote, None, ecn, packet) {
match event {
DatagramEvent::NewConnection(conn) => {
self.connections.insert(ch, conn);
Expand Down Expand Up @@ -482,27 +479,24 @@ pub(super) fn min_opt<T: Ord>(x: Option<T>, y: Option<T>) -> Option<T> {
/// The maximum of datagrams TestEndpoint will produce via `poll_transmit`
const MAX_DATAGRAMS: usize = 10;

fn split_transmit(transmit: Transmit) -> Vec<Transmit> {
fn split_transmit(mut transmit: Transmit) -> Vec<Transmit> {
let segment_size = match transmit.segment_size {
Some(segment_size) => segment_size,
_ => return vec![transmit],
};

let mut offset = 0;
let mut transmits = Vec::new();
while offset < transmit.contents.len() {
let end = (offset + segment_size).min(transmit.contents.len());
while !transmit.contents.is_empty() {
let end = segment_size.min(transmit.contents.len());

let contents = transmit.contents[offset..end].to_vec();
let contents = transmit.contents.split_to(end);
transmits.push(Transmit {
destination: transmit.destination,
ecn: transmit.ecn,
contents,
segment_size: None,
src_ip: transmit.src_ip,
});

offset = end;
}

transmits
Expand Down
1 change: 1 addition & 0 deletions quinn-udp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ log = ["tracing/log"]
maintenance = { status = "experimental" }

[dependencies]
bytes = "1"
libc = "0.2.113"
socket2 = "0.4" # 0.5.1 has an MSRV of 1.63
tracing = "0.1.10"
Expand Down
3 changes: 2 additions & 1 deletion quinn-udp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::{
time::{Duration, Instant},
};

use bytes::BytesMut;
use tracing::warn;

#[cfg(unix)]
Expand Down Expand Up @@ -138,7 +139,7 @@ pub struct Transmit {
/// Explicit congestion notification bits to set on the packet
pub ecn: Option<EcnCodepoint>,
/// Contents of the datagram
pub contents: Vec<u8>,
pub contents: BytesMut,
/// The segment size if this transmission contains multiple datagrams.
/// This is `None` if the transmit only contains a single datagram
pub segment_size: Option<usize>,
Expand Down

0 comments on commit 89b527c

Please sign in to comment.