From 2babd07ff8ae997861514c942d8266da103ea53e Mon Sep 17 00:00:00 2001 From: Icelk Date: Thu, 14 Sep 2023 21:11:25 +0200 Subject: [PATCH] Fix tokio-uring & QUIC to use GSO & GRO properly. Requires 'quinn' to be closed at ../quinn & at the latest version --- Cargo.toml | 7 +- src/lib.rs | 12 ++- src/uring_udp.rs | 263 +++++++++++++++++++++++++++++++++-------------- 3 files changed, 200 insertions(+), 82 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 08e0190..b643a94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,12 @@ futures-util = { version = "0.3", optional = true, default-features = false, fea # HTTP/3 h3 = { version = "0.0.2", optional = true } h3-quinn = { version = "0.0.3", optional = true } -quinn = { version = "0.10.1", default-features = false, features = ["tls-rustls", "log", "runtime-tokio"], optional = true } +quinn = { version = "0.10.2", default-features = false, features = ["tls-rustls", "log", "runtime-tokio"], optional = true } + +[replace] +"quinn-udp:0.4.0" = { path = "../quinn/quinn-udp" } +"quinn:0.10.2" = { path = "../quinn/quinn" } +"quinn-proto:0.10.4" = { path = "../quinn/quinn-proto" } [target.'cfg(unix)'.dependencies] libc = { version = "0.2", default-features = false } diff --git a/src/lib.rs b/src/lib.rs index efe5fa1..7c2b9ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -296,11 +296,13 @@ impl RunConfig { Some(h3_quinn::quinn::ServerConfig::with_crypto( descriptor.server_config.clone().unwrap(), )), - uring_udp::UringUdpSocket::new( - tokio_uring::net::UdpSocket::from_std(socket.into()), - address.is_ipv4(), - ) - .expect("failed to change socket settings"), + Arc::new( + uring_udp::UringUdpSocket::new( + tokio_uring::net::UdpSocket::from_std(socket.into()), + address.is_ipv4(), + ) + .expect("failed to change socket settings"), + ), h3_quinn::quinn::default_runtime().unwrap(), ) .unwrap(); diff --git a/src/uring_udp.rs b/src/uring_udp.rs index a0ad7b8..82b2a2c 100644 --- a/src/uring_udp.rs +++ b/src/uring_udp.rs @@ -1,5 +1,21 @@ #![allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] +//--------------------- +// A bunch of this file is straight up copied from +// https://github.com/quinn-rs/quinn/blob/main/quinn-udp/src/unix.rs +// and https://github.com/quinn-rs/quinn/blob/main/quinn-udp/src/cmsg.rs +// cross platform support is removed, since uring is only available on Linux +// +// Contains some slight modifications to simplify & make compatible. +// +// Mostly copied from the cfg(macos), since +// that code was simpler than the Linux code, +// and sufficient, since we don't have `sendmmesg`. +// +// Most comments are removed, so please see the original file for reference +// +// -------------------- + use std::cell::RefCell; use std::mem; use std::os::fd::AsRawFd; @@ -32,6 +48,13 @@ pub(crate) struct UringUdpSocket { >, socket: tokio_uring::net::UdpSocket, + + last_send_error: RefCell, + max_gso_segments: RefCell, + gro_segments: usize, + may_fragment: bool, + + sendmsg_einval: RefCell, } impl Debug for UringUdpSocket { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { @@ -40,34 +63,54 @@ impl Debug for UringUdpSocket { .field("sent", &*self.sent.borrow()) .field("recv_fut", &"[recv_fut]".as_clean()) .field("socket", &"[internal socket]".as_clean()) + .field("last_send_error", &*self.last_send_error.borrow()) + .field("max_gso_segments", &*self.max_gso_segments.borrow()) + .field("gro_segments", &self.gro_segments) + .field("may_fragment", &self.may_fragment) + .field("sendmsg_einval", &*self.sendmsg_einval.borrow()) .finish() } } impl UringUdpSocket { - pub(crate) fn new(socket: tokio_uring::net::UdpSocket, is_ipv4: bool) -> io::Result { - const OPTION_ON: libc::c_int = 1; - if let Err(err) = set_socket_option(&socket, libc::IPPROTO_IP, libc::IP_RECVTOS, OPTION_ON) - { - info!("Ignoring error setting IP_RECVTOS on socket: {err:?}"); + pub(crate) fn new(io: tokio_uring::net::UdpSocket, is_ipv4: bool) -> io::Result { + let mut cmsg_platform_space = 0; + cmsg_platform_space += + unsafe { libc::CMSG_SPACE(mem::size_of::() as _) as usize }; + + assert!( + CMSG_LEN + >= unsafe { libc::CMSG_SPACE(mem::size_of::() as _) as usize } + + cmsg_platform_space + ); + assert!( + mem::align_of::() <= mem::align_of::>(), + "control message buffers will be misaligned" + ); + + if let Err(err) = set_socket_option(&io, libc::IPPROTO_IP, libc::IP_RECVTOS, OPTION_ON) { + debug!("Ignoring error setting IP_RECVTOS on socket: {err:?}",); } + let mut may_fragment = false; { // opportunistically try to enable GRO. See gro::gro_segments(). - let _ = set_socket_option(&socket, libc::SOL_UDP, libc::UDP_GRO, OPTION_ON); + let _ = set_socket_option(&io, libc::SOL_UDP, libc::UDP_GRO, OPTION_ON); // Forbid IPv4 fragmentation. Set even for IPv6 to account for IPv6 mapped IPv4 addresses. - set_socket_option( - &socket, + // Set `may_fragment` to `true` if this option is not supported on the platform. + may_fragment |= !set_socket_option_supported( + &io, libc::IPPROTO_IP, libc::IP_MTU_DISCOVER, libc::IP_PMTUDISC_PROBE, )?; if is_ipv4 { - set_socket_option(&socket, libc::IPPROTO_IP, libc::IP_PKTINFO, OPTION_ON)?; + set_socket_option(&io, libc::IPPROTO_IP, libc::IP_PKTINFO, OPTION_ON)?; } else { - set_socket_option( - &socket, + // Set `may_fragment` to `true` if this option is not supported on the platform. + may_fragment |= !set_socket_option_supported( + &io, libc::IPPROTO_IPV6, libc::IPV6_MTU_DISCOVER, libc::IP_PMTUDISC_PROBE, @@ -76,65 +119,93 @@ impl UringUdpSocket { } // Options standardized in RFC 3542 if !is_ipv4 { - set_socket_option( - &socket, + set_socket_option(&io, libc::IPPROTO_IPV6, libc::IPV6_RECVPKTINFO, OPTION_ON)?; + set_socket_option(&io, libc::IPPROTO_IPV6, libc::IPV6_RECVTCLASS, OPTION_ON)?; + may_fragment |= !set_socket_option_supported( + &io, libc::IPPROTO_IPV6, - libc::IPV6_RECVPKTINFO, + libc::IPV6_DONTFRAG, OPTION_ON, )?; - set_socket_option( - &socket, - libc::IPPROTO_IPV6, - libc::IPV6_RECVTCLASS, - OPTION_ON, - )?; - // Linux's IP_PMTUDISC_PROBE allows us to operate under interface MTU rather than the - // kernel's path MTU guess, but actually disabling fragmentation requires this too. See - // __ip6_append_data in ip6_output.c. - set_socket_option(&socket, libc::IPPROTO_IPV6, libc::IPV6_DONTFRAG, OPTION_ON)?; } + let now = Instant::now(); + Ok(Self { send_fut: RefCell::new(Vec::new()), sent: RefCell::new(0), recv_fut: RefCell::new(None), - socket, + socket: io, + + last_send_error: RefCell::new( + now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now), + ), + max_gso_segments: RefCell::new(gso::max_gso_segments()), + gro_segments: gro::gro_segments(), + may_fragment, + sendmsg_einval: RefCell::new(false), }) } } // `tokio-uring` is a single-threaded runtime, therefore this will never be sent, anyway unsafe impl Send for UringUdpSocket {} +unsafe impl Sync for UringUdpSocket {} const CMSG_LEN: usize = 88; type IpTosTy = libc::c_int; +/// Log at most 1 IO error per minute +const IO_ERROR_LOG_INTERVAL: Duration = std::time::Duration::from_secs(60); +const OPTION_ON: libc::c_int = 1; +// Chosen somewhat arbitrarily; might benefit from additional tuning. +const BATCH_SIZE: usize = 32; impl quinn::AsyncUdpSocket for UringUdpSocket { fn poll_send( &self, - _state: &quinn::udp::UdpState, cx: &mut Context, transmits: &[quinn::udp::Transmit], ) -> Poll> { // in brackets so RefCell borrow doesn't leak into recursion { + let num_transmits = transmits.len().min(BATCH_SIZE); // check stored futures let mut vec = self.send_fut.borrow_mut(); let vec = &mut *vec; let empty = vec.is_empty(); // `TODO`: some prettier error handling? - let mut error = None; + let mut has_error = false; vec.retain_mut(|fut| { let poll = fut.poll_unpin(cx); let is_pending = poll.is_pending(); - if let Poll::Ready((Err(err), _, _)) = poll { - error = Some(err); + if let Poll::Ready((Err(e), _, _)) = poll { + if let Some(libc::EIO | libc::EINVAL) = e.raw_os_error() { + // Prevent new transmits from being scheduled using GSO. Existing GSO transmits + // may already be in the pipeline, so we need to tolerate additional failures. + if *self.max_gso_segments.borrow() > 1 { + error!("Your network card doesn't support certain optimizations (GSO or GRO)."); + self.max_gso_segments + .replace(1); + } + } + + if e.raw_os_error() == Some(libc::EINVAL) { + // Some arguments to `sendmsg` are not supported. + // Switch to fallback mode. + self.sendmsg_einval + .replace(true); + } + + if e.raw_os_error() != Some(libc::EMSGSIZE) { + log_sendmsg_error(&mut *self.last_send_error.borrow_mut(), &e, &transmits[0]); + } + has_error = true; } is_pending }); if !empty { - if let Some(err) = error { - return Poll::Ready(Err(err)); + if has_error { + return Poll::Ready(Ok(num_transmits.min(1))); } // became empty, everything is complete if vec.is_empty() { @@ -151,7 +222,14 @@ impl quinn::AsyncUdpSocket for UringUdpSocket { while sent < transmits.len() { let addr = socket2::SockAddr::from(transmits[sent].destination); - prepare_msg(&transmits[sent], &addr, &mut hdr, &mut iov, &mut ctrl); + prepare_msg( + &transmits[sent], + &addr, + &mut hdr, + &mut iov, + &mut ctrl, + *self.sendmsg_einval.borrow(), + ); let fut = self.socket.sendmsg( vec![transmits[sent].contents.clone()], Some(transmits[sent].destination), @@ -167,7 +245,7 @@ impl quinn::AsyncUdpSocket for UringUdpSocket { *self.sent.borrow_mut() = sent; } // make sure we actually poll the newly created future - self.poll_send(_state, cx, transmits) + self.poll_send(cx, transmits) } fn poll_recv( @@ -242,7 +320,13 @@ impl quinn::AsyncUdpSocket for UringUdpSocket { self.socket.local_addr() } fn may_fragment(&self) -> bool { - false + self.may_fragment + } + fn max_transmit_segments(&self) -> usize { + *self.max_gso_segments.borrow() + } + fn max_receive_segments(&self) -> usize { + self.gro_segments } } @@ -252,27 +336,13 @@ unsafe fn future_to_static_lifetime<'a, T>( mem::transmute(fut) } -//--------------------- -// The rest of this -// code is copied from -// https://github.com/quinn-rs/quinn/blob/main/quinn-udp/src/unix.rs -// and https://github.com/quinn-rs/quinn/blob/main/quinn-udp/src/cmsg.rs -// cross platform support is negligible, since uring is only available on Linux -// -// Contains some slight modifications to simplify & make compatible. -// -// Mostly copied from the cfg(macos), since -// that code was simpler than the Linux code, -// and sufficient, since we don't have `sendmmesg`. -// -// -------------------- - fn prepare_msg( transmit: &Transmit, dst_addr: &socket2::SockAddr, hdr: &mut libc::msghdr, iov: &mut libc::iovec, ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>, + sendmsg_einval: bool, ) { iov.iov_base = transmit.contents.as_ptr() as *mut _; iov.iov_len = transmit.contents.len(); @@ -294,20 +364,18 @@ fn prepare_msg( let mut encoder = unsafe { cmsg::Encoder::new(hdr) }; let ecn = transmit.ecn.map_or(0, |x| x as libc::c_int); if transmit.destination.is_ipv4() { - // if !sendmsg_einval { - // `TODO`: maybe breaks things - // see https://github.com/quinn-rs/quinn/issues/1609 - encoder.push(libc::IPPROTO_IP, libc::IP_TOS, ecn as IpTosTy); - // } + if !sendmsg_einval { + encoder.push(libc::IPPROTO_IP, libc::IP_TOS, ecn as IpTosTy); + } } else { encoder.push(libc::IPPROTO_IPV6, libc::IPV6_TCLASS, ecn); } // apparently, setting this might cause the UDP to fail to send... - // if let Some(segment_size) = transmit.segment_size { - // gso::set_segment_size(&mut encoder, segment_size as u16); - // } + if let Some(segment_size) = transmit.segment_size { + gso::set_segment_size(&mut encoder, segment_size as u16); + } if let Some(ip) = &transmit.src_ip { match ip { @@ -377,6 +445,54 @@ fn decode_recv(addr: SocketAddr, hdr: &libc::msghdr, len: usize) -> RecvMeta { } } +fn set_socket_option( + socket: &impl AsRawFd, + level: libc::c_int, + name: libc::c_int, + value: libc::c_int, +) -> Result<(), io::Error> { + let rc = unsafe { + libc::setsockopt( + socket.as_raw_fd(), + level, + name, + ptr::addr_of!(value).cast(), + mem::size_of_val(&value) as _, + ) + }; + + if rc == 0 { + Ok(()) + } else { + Err(io::Error::last_os_error()) + } +} +fn set_socket_option_supported( + socket: &impl AsRawFd, + level: libc::c_int, + name: libc::c_int, + value: libc::c_int, +) -> Result { + match set_socket_option(socket, level, name, value) { + Ok(()) => Ok(true), + Err(err) if err.raw_os_error() == Some(libc::ENOPROTOOPT) => Ok(false), + Err(err) => Err(err), + } +} +fn log_sendmsg_error( + last_send_error: &mut Instant, + err: impl core::fmt::Debug, + transmit: &Transmit, +) { + let now = Instant::now(); + if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL { + *last_send_error = now; + warn!( + "sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}", + err, transmit.destination, transmit.src_ip, transmit.ecn, transmit.contents.len(), transmit.segment_size); + } +} + mod gso { use super::{cmsg, set_socket_option}; @@ -405,26 +521,21 @@ mod gso { encoder.push(libc::SOL_UDP, libc::UDP_SEGMENT, segment_size); } } -fn set_socket_option( - socket: &impl AsRawFd, - level: libc::c_int, - name: libc::c_int, - value: libc::c_int, -) -> Result<(), io::Error> { - let rc = unsafe { - libc::setsockopt( - socket.as_raw_fd(), - level, - name, - ptr::addr_of!(value).cast(), - mem::size_of_val(&value) as _, - ) - }; +mod gro { + use super::{set_socket_option, OPTION_ON}; - if rc == 0 { - Ok(()) - } else { - Err(io::Error::last_os_error()) + pub(crate) fn gro_segments() -> usize { + let socket = match std::net::UdpSocket::bind("[::]:0") + .or_else(|_| std::net::UdpSocket::bind("127.0.0.1:0")) + { + Ok(socket) => socket, + Err(_) => return 1, + }; + + match set_socket_option(&socket, libc::SOL_UDP, libc::UDP_GRO, OPTION_ON) { + Ok(()) => 64, + Err(_) => 1, + } } }