From c6bff197b236441109d271e9c3d16e6e4c457b3e Mon Sep 17 00:00:00 2001 From: Centaurus99 Date: Wed, 28 Feb 2024 11:26:15 +0000 Subject: [PATCH 1/2] feat(device): add packet queues to bandwidth device (#6) --- rattan-cli/src/docker.rs | 10 +- rattan-cli/src/main.rs | 13 +- rattan/benches/bandwidth.rs | 10 +- rattan/src/devices/bandwidth.rs | 205 --------- rattan/src/devices/bandwidth/mod.rs | 286 +++++++++++++ rattan/src/devices/bandwidth/queue.rs | 444 ++++++++++++++++++++ rattan/src/devices/delay.rs | 53 ++- rattan/src/devices/mod.rs | 19 +- rattan/src/env.rs | 2 +- rattan/src/metal/io.rs | 3 +- rattan/src/metal/timer.rs | 5 + rattan/src/utils/sync.rs | 7 + rattan/tests/external/af_packet.rs | 1 - rattan/tests/integration/bandwidth.rs | 571 +++++++++++++++++++++++++- rattan/tests/integration/compound.rs | 10 +- rattan/tests/integration/delay.rs | 2 + rattan/tests/integration/http.rs | 26 +- rattan/tests/integration/loss.rs | 5 +- 18 files changed, 1407 insertions(+), 265 deletions(-) delete mode 100644 rattan/src/devices/bandwidth.rs create mode 100644 rattan/src/devices/bandwidth/mod.rs create mode 100644 rattan/src/devices/bandwidth/queue.rs diff --git a/rattan-cli/src/docker.rs b/rattan-cli/src/docker.rs index c071308..d9e3ea0 100644 --- a/rattan-cli/src/docker.rs +++ b/rattan-cli/src/docker.rs @@ -9,7 +9,7 @@ use rand::{rngs::StdRng, SeedableRng}; use rattan::{ core::{RattanMachine, RattanMachineConfig}, devices::{ - bandwidth::{BwDevice, BwDeviceConfig}, + bandwidth::{queue::InfiniteQueue, BwDevice, BwDeviceConfig, MAX_BANDWIDTH}, external::VirtualEthernet, }, env::get_container_env, @@ -169,15 +169,15 @@ pub fn docker_main(opts: CommandArgs) -> anyhow::Result<()> { let mut left_fd = vec![left_device_rx]; let mut right_fd = vec![right_device_rx]; if let Some(bandwidth) = bandwidth { - let left_bw_device = BwDevice::::new(); - let right_bw_device = BwDevice::::new(); + let left_bw_device = BwDevice::new(MAX_BANDWIDTH, InfiniteQueue::new()); + let right_bw_device = BwDevice::new(MAX_BANDWIDTH, InfiniteQueue::new()); let left_bw_ctl = left_bw_device.control_interface(); let right_bw_ctl = right_bw_device.control_interface(); left_bw_ctl - .set_config(BwDeviceConfig::new(bandwidth)) + .set_config(BwDeviceConfig::new(bandwidth, None)) .unwrap(); right_bw_ctl - .set_config(BwDeviceConfig::new(bandwidth)) + .set_config(BwDeviceConfig::new(bandwidth, None)) .unwrap(); let (left_bw_rx, left_bw_tx) = machine.add_device(left_bw_device); info!(left_bw_rx, left_bw_tx); diff --git a/rattan-cli/src/main.rs b/rattan-cli/src/main.rs index c518e73..db54cf0 100644 --- a/rattan-cli/src/main.rs +++ b/rattan-cli/src/main.rs @@ -2,7 +2,7 @@ use clap::Parser; use rand::rngs::StdRng; use rand::SeedableRng; use rattan::core::{RattanMachine, RattanMachineConfig}; -use rattan::devices::bandwidth::{BwDevice, BwDeviceConfig}; +use rattan::devices::bandwidth::{queue::InfiniteQueue, BwDevice, BwDeviceConfig, MAX_BANDWIDTH}; use rattan::devices::delay::{DelayDevice, DelayDeviceConfig}; use rattan::devices::external::VirtualEthernet; use rattan::devices::loss::{IIDLossDevice, IIDLossDeviceConfig}; @@ -93,15 +93,15 @@ fn main() { let mut left_fd = vec![left_device_rx]; let mut right_fd = vec![right_device_rx]; if let Some(bandwidth) = bandwidth { - let left_bw_device = BwDevice::::new(); - let right_bw_device = BwDevice::::new(); + let left_bw_device = BwDevice::new(MAX_BANDWIDTH, InfiniteQueue::new()); + let right_bw_device = BwDevice::new(MAX_BANDWIDTH, InfiniteQueue::new()); let left_bw_ctl = left_bw_device.control_interface(); let right_bw_ctl = right_bw_device.control_interface(); left_bw_ctl - .set_config(BwDeviceConfig::new(bandwidth)) + .set_config(BwDeviceConfig::new(bandwidth, None)) .unwrap(); right_bw_ctl - .set_config(BwDeviceConfig::new(bandwidth)) + .set_config(BwDeviceConfig::new(bandwidth, None)) .unwrap(); let (left_bw_rx, left_bw_tx) = machine.add_device(left_bw_device); info!(left_bw_rx, left_bw_tx); @@ -187,7 +187,8 @@ fn main() { let output = handle.wait_with_output().unwrap(); let stdout = String::from_utf8(output.stdout).unwrap(); stdout.contains("time=") - }; match res { + }; + match res { true => { info!("ping test passed"); left_ns.enter().unwrap(); diff --git a/rattan/benches/bandwidth.rs b/rattan/benches/bandwidth.rs index 6dc8580..5a1a766 100644 --- a/rattan/benches/bandwidth.rs +++ b/rattan/benches/bandwidth.rs @@ -3,7 +3,11 @@ use std::{sync::Arc, thread::JoinHandle}; use criterion::{criterion_group, criterion_main, Criterion}; use rattan::{ core::{RattanMachine, RattanMachineConfig}, - devices::{bandwidth::BwDevice, external::VirtualEthernet, StdPacket}, + devices::{ + bandwidth::{queue::InfiniteQueue, BwDevice, MAX_BANDWIDTH}, + external::VirtualEthernet, + StdPacket, + }, env::{get_std_env, StdNetEnvConfig}, metal::{io::AfPacketDriver, netns::NetNs}, }; @@ -26,8 +30,8 @@ fn prepare_env() -> (JoinHandle<()>, CancellationToken, Arc, Arc) .unwrap(); runtime.block_on(async move { - let left_bw_device = BwDevice::::new(); - let right_bw_device = BwDevice::::new(); + let left_bw_device = BwDevice::new(MAX_BANDWIDTH, InfiniteQueue::new()); + let right_bw_device = BwDevice::new(MAX_BANDWIDTH, InfiniteQueue::new()); let left_device = VirtualEthernet::::new(_std_env.left_pair.right.clone()); let right_device = diff --git a/rattan/src/devices/bandwidth.rs b/rattan/src/devices/bandwidth.rs deleted file mode 100644 index 78c3d39..0000000 --- a/rattan/src/devices/bandwidth.rs +++ /dev/null @@ -1,205 +0,0 @@ -use crate::devices::{Device, Packet}; -use crate::error::Error; -use crate::metal::timer::Timer; -use crate::utils::sync::AtomicRawCell; -use async_trait::async_trait; -use netem_trace::{Bandwidth, Delay}; -#[cfg(feature = "serde")] -use serde::{Deserialize, Serialize}; -use std::fmt::Debug; -use std::sync::Arc; -use tokio::sync::mpsc; -use tokio::time::Instant; -use tracing::{debug, info}; - -use super::{ControlInterface, Egress, Ingress}; - -const MAX_BANDWIDTH: Bandwidth = Bandwidth::from_bps(u64::MAX); - -#[derive(Debug)] -struct BwPacket

-where - P: Packet, -{ - ingress_time: Instant, - packet: P, -} - -pub struct BwDeviceIngress

-where - P: Packet, -{ - ingress: mpsc::UnboundedSender>, -} - -impl

Clone for BwDeviceIngress

-where - P: Packet, -{ - fn clone(&self) -> Self { - Self { - ingress: self.ingress.clone(), - } - } -} - -impl

Ingress

for BwDeviceIngress

-where - P: Packet + Send, -{ - fn enqueue(&self, packet: P) -> Result<(), Error> { - // XXX(minhuw): handle possible error here - self.ingress - .send(BwPacket { - ingress_time: Instant::now(), - packet, - }) - .unwrap(); - Ok(()) - } -} - -// Requires the bandwidth to be less than 2^64 bps -fn transfer_time(length: usize, bandwidth: Bandwidth) -> Delay { - let bits = length * 8; - let capacity = bandwidth.as_bps() as u64; - let seconds = bits as f64 / capacity as f64; - Delay::from_secs_f64(seconds) -} - -pub struct BwDeviceEgress

-where - P: Packet, -{ - egress: mpsc::UnboundedReceiver>, - bandwidth: Arc>, - inner_bandwidth: Box, - next_available: Instant, - timer: Timer, -} - -#[async_trait] -impl

Egress

for BwDeviceEgress

-where - P: Packet + Send + Sync, -{ - async fn dequeue(&mut self) -> Option

{ - let packet = self.egress.recv().await.unwrap(); - if let Some(bandwidth) = self.bandwidth.swap_null() { - self.inner_bandwidth = bandwidth; - debug!(?self.inner_bandwidth, "Set inner bandwidth:"); - } - let now = Instant::now(); - if packet.ingress_time >= self.next_available { - // no need to wait, since the packet arrives after next_available - let transfer_time = transfer_time(packet.packet.length(), *self.inner_bandwidth); - self.next_available = packet.ingress_time + transfer_time; - } else if now >= self.next_available { - // the current time is already after next_available - let transfer_time = transfer_time(packet.packet.length(), *self.inner_bandwidth); - self.next_available += transfer_time; - } else { - // wait until next_available - let wait_time = self.next_available - now; - self.timer.sleep(wait_time).await.unwrap(); - let transfer_time = transfer_time(packet.packet.length(), *self.inner_bandwidth); - self.next_available += transfer_time; - } - Some(packet.packet) - } -} - -#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))] -#[derive(Debug)] -pub struct BwDeviceConfig { - bandwidth: Bandwidth, -} - -impl BwDeviceConfig { - pub fn new>(bandwidth: T) -> Self { - Self { - bandwidth: bandwidth.into(), - } - } -} - -pub struct BwDeviceControlInterface { - bandwidth: Arc>, -} - -impl ControlInterface for BwDeviceControlInterface { - type Config = BwDeviceConfig; - - fn set_config(&self, config: Self::Config) -> Result<(), Error> { - if config.bandwidth > MAX_BANDWIDTH { - return Err(Error::ConfigError( - "Bandwidth should be less than 2^64 bps".to_string(), - )); - } - info!("Setting bandwidth to: {:?}", config.bandwidth); - self.bandwidth.store(Box::new(config.bandwidth)); - Ok(()) - } -} - -pub struct BwDevice { - ingress: Arc>, - egress: BwDeviceEgress

, - control_interface: Arc, -} - -impl

Device

for BwDevice

-where - P: Packet + Send + Sync + 'static, -{ - type IngressType = BwDeviceIngress

; - type EgressType = BwDeviceEgress

; - type ControlInterfaceType = BwDeviceControlInterface; - - fn sender(&self) -> Arc { - self.ingress.clone() - } - - fn receiver(&mut self) -> &mut Self::EgressType { - &mut self.egress - } - - fn into_receiver(self) -> Self::EgressType { - self.egress - } - - fn control_interface(&self) -> Arc { - self.control_interface.clone() - } -} - -impl

BwDevice

-where - P: Packet, -{ - pub fn new() -> BwDevice

{ - debug!("New BwDevice"); - let (rx, tx) = mpsc::unbounded_channel(); - let bandwidth = Arc::new(AtomicRawCell::new(Box::new(Bandwidth::from_bps(u64::MAX)))); - BwDevice { - ingress: Arc::new(BwDeviceIngress { ingress: rx }), - egress: BwDeviceEgress { - egress: tx, - bandwidth: Arc::clone(&bandwidth), - inner_bandwidth: Box::new(Bandwidth::from_bps(u64::MAX)), - next_available: Instant::now(), - timer: Timer::new().unwrap(), - }, - control_interface: Arc::new(BwDeviceControlInterface { bandwidth }), - } - } -} - -impl

Default for BwDevice

-where - P: Packet, -{ - fn default() -> Self { - Self::new() - } -} diff --git a/rattan/src/devices/bandwidth/mod.rs b/rattan/src/devices/bandwidth/mod.rs new file mode 100644 index 0000000..4444071 --- /dev/null +++ b/rattan/src/devices/bandwidth/mod.rs @@ -0,0 +1,286 @@ +use crate::devices::bandwidth::queue::PacketQueue; +use crate::devices::{Device, Packet}; +use crate::error::Error; +use crate::metal::timer::Timer; +use async_trait::async_trait; +use netem_trace::{Bandwidth, Delay}; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::time::Instant; +use tracing::{debug, info}; + +use super::{ControlInterface, Egress, Ingress}; + +pub mod queue; + +pub const MAX_BANDWIDTH: Bandwidth = Bandwidth::from_bps(u64::MAX); + +pub struct BwDeviceIngress

+where + P: Packet, +{ + ingress: mpsc::UnboundedSender

, +} + +impl

Clone for BwDeviceIngress

+where + P: Packet, +{ + fn clone(&self) -> Self { + Self { + ingress: self.ingress.clone(), + } + } +} + +impl

Ingress

for BwDeviceIngress

+where + P: Packet + Send, +{ + fn enqueue(&self, mut packet: P) -> Result<(), Error> { + packet.set_timestamp(Instant::now()); + // XXX(minhuw): handle possible error here + self.ingress.send(packet).unwrap(); + Ok(()) + } +} + +// Requires the bandwidth to be less than 2^64 bps +fn transfer_time(length: usize, bandwidth: Bandwidth) -> Delay { + let bits = length * 8; + let capacity = bandwidth.as_bps() as u64; + let seconds = bits as f64 / capacity as f64; + Delay::from_secs_f64(seconds) +} + +pub struct BwDeviceEgress +where + P: Packet, + Q: PacketQueue

, +{ + egress: mpsc::UnboundedReceiver

, + bandwidth: Bandwidth, + packet_queue: Q, + next_available: Instant, + config_rx: mpsc::UnboundedReceiver>, + timer: Timer, +} + +impl BwDeviceEgress +where + P: Packet + Send + Sync, + Q: PacketQueue

, +{ + fn set_config(&mut self, config: BwDeviceConfig) { + if let Some(bandwidth) = config.bandwidth { + debug!( + "Previous next_available distance: {:?}", + self.next_available - Instant::now() + ); + self.next_available = self.next_available + + (self.next_available - Instant::now()) + .mul_f64(self.bandwidth.as_bps() as f64 / bandwidth.as_bps() as f64); + debug!( + before = ?self.bandwidth, + after = ?bandwidth, + "Set inner bandwidth:" + ); + debug!( + "Now next_available distance: {:?}", + self.next_available - Instant::now() + ); + self.bandwidth = bandwidth; + } + if let Some(queue_config) = config.queue_config { + debug!(?queue_config, "Set inner queue config:"); + self.packet_queue.configure(queue_config); + } + } +} + +#[async_trait] +impl Egress

for BwDeviceEgress +where + P: Packet + Send + Sync, + Q: PacketQueue

, +{ + async fn dequeue(&mut self) -> Option

{ + // wait until next_available + loop { + tokio::select! { + biased; + Some(config) = self.config_rx.recv() => { + self.set_config(config); + } + _ = self.timer.sleep(self.next_available - Instant::now()) => { + break; + } + } + } + // process the packets received during sleep time + while let Ok(new_packet) = self.egress.try_recv() { + self.packet_queue.enqueue(new_packet); + } + + let mut packet = self.packet_queue.dequeue(); + while packet.is_none() { + // the queue is empty, wait for the next packet + tokio::select! { + biased; + Some(config) = self.config_rx.recv() => { + self.set_config(config); + } + recv_packet = self.egress.recv() => { + match recv_packet { + Some(new_packet) => { + self.packet_queue.enqueue(new_packet); + packet = self.packet_queue.dequeue(); + } + None => { + // channel closed + return None; + } + } + } + } + } + + // send the packet + let packet = packet.unwrap(); + let transfer_time = transfer_time(packet.length(), self.bandwidth); + if packet.get_timestamp() >= self.next_available { + // the packet arrives after next_available + self.next_available = packet.get_timestamp() + transfer_time; + } else { + // the packet arrives before next_available and now >= self.next_available + self.next_available += transfer_time; + } + Some(packet) + } +} + +#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))] +#[derive(Debug)] +pub struct BwDeviceConfig +where + P: Packet, + Q: PacketQueue

, +{ + bandwidth: Option, + queue_config: Option, +} + +impl BwDeviceConfig +where + P: Packet, + Q: PacketQueue

, +{ + pub fn new>, U: Into>>( + bandwidth: T, + queue_config: U, + ) -> Self { + Self { + bandwidth: bandwidth.into(), + queue_config: queue_config.into(), + } + } +} + +pub struct BwDeviceControlInterface +where + P: Packet, + Q: PacketQueue

, +{ + config_tx: mpsc::UnboundedSender>, +} + +impl ControlInterface for BwDeviceControlInterface +where + P: Packet, + Q: PacketQueue

+ 'static, +{ + type Config = BwDeviceConfig; + + fn set_config(&self, config: Self::Config) -> Result<(), Error> { + if config.bandwidth.is_none() && config.queue_config.is_none() { + // This ensures that incorrect HTTP requests will return errors. + return Err(Error::ConfigError( + "At least one of bandwidth and queue_config should be set".to_string(), + )); + } + if let Some(bandwidth) = config.bandwidth { + if bandwidth > MAX_BANDWIDTH { + return Err(Error::ConfigError( + "Bandwidth should be less than 2^64 bps".to_string(), + )); + } + info!("Setting bandwidth to: {:?}", bandwidth); + } + if let Some(queue_config) = config.queue_config.as_ref() { + info!("Setting queue config to: {:?}", queue_config); + } + self.config_tx + .send(config) + .map_err(|_| Error::ConfigError("Control channel is closed.".to_string()))?; + Ok(()) + } +} + +pub struct BwDevice> { + ingress: Arc>, + egress: BwDeviceEgress, + control_interface: Arc>, +} + +impl Device

for BwDevice +where + P: Packet + Send + Sync + 'static, + Q: PacketQueue

+ 'static, +{ + type IngressType = BwDeviceIngress

; + type EgressType = BwDeviceEgress; + type ControlInterfaceType = BwDeviceControlInterface; + + fn sender(&self) -> Arc { + self.ingress.clone() + } + + fn receiver(&mut self) -> &mut Self::EgressType { + &mut self.egress + } + + fn into_receiver(self) -> Self::EgressType { + self.egress + } + + fn control_interface(&self) -> Arc { + self.control_interface.clone() + } +} + +impl BwDevice +where + P: Packet, + Q: PacketQueue

, +{ + pub fn new(bandwidth: Bandwidth, packet_queue: Q) -> BwDevice { + debug!("New BwDevice"); + let (rx, tx) = mpsc::unbounded_channel(); + let (config_tx, config_rx) = mpsc::unbounded_channel(); + BwDevice { + ingress: Arc::new(BwDeviceIngress { ingress: rx }), + egress: BwDeviceEgress { + egress: tx, + bandwidth, + packet_queue, + next_available: Instant::now(), + config_rx, + timer: Timer::new().unwrap(), + }, + control_interface: Arc::new(BwDeviceControlInterface { config_tx }), + } + } +} diff --git a/rattan/src/devices/bandwidth/queue.rs b/rattan/src/devices/bandwidth/queue.rs new file mode 100644 index 0000000..a2c7f77 --- /dev/null +++ b/rattan/src/devices/bandwidth/queue.rs @@ -0,0 +1,444 @@ +use crate::devices::Packet; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; +use std::fmt::Debug; +use tokio::time::{Duration, Instant}; +use tracing::trace; + +pub trait PacketQueue

: Send +where + P: Packet, +{ + #[cfg(feature = "serde")] + type Config: for<'a> Deserialize<'a> + Serialize + Send + Debug; + #[cfg(not(feature = "serde"))] + type Config: Send + Debug; + + fn configure(&mut self, config: Self::Config); + + fn enqueue(&mut self, packet: P); + + // If the queue is empty, return `None` + fn dequeue(&mut self) -> Option

; +} + +#[derive(Debug)] +pub struct InfiniteQueue

{ + queue: VecDeque

, +} + +impl

InfiniteQueue

{ + pub fn new() -> Self { + Self { + queue: VecDeque::new(), + } + } +} + +impl

Default for InfiniteQueue

{ + fn default() -> Self { + Self::new() + } +} + +impl

PacketQueue

for InfiniteQueue

+where + P: Packet, +{ + type Config = (); + + fn configure(&mut self, _config: Self::Config) {} + + fn enqueue(&mut self, packet: P) { + self.queue.push_back(packet); + } + + fn dequeue(&mut self) -> Option

{ + self.queue.pop_front() + } +} + +#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))] +#[derive(Debug)] +pub struct DropTailQueueConfig { + pub packet_limit: Option, + pub byte_limit: Option, +} + +impl DropTailQueueConfig { + pub fn new>, B: Into>>( + packet_limit: A, + byte_limit: B, + ) -> Self { + Self { + packet_limit: packet_limit.into(), + byte_limit: byte_limit.into(), + } + } +} + +#[derive(Debug)] +pub struct DropTailQueue

{ + queue: VecDeque

, + packet_limit: Option, + byte_limit: Option, + now_bytes: usize, +} + +impl

DropTailQueue

{ + pub fn new>, B: Into>>( + packet_limit: A, + byte_limit: B, + ) -> Self { + Self { + queue: VecDeque::new(), + packet_limit: packet_limit.into(), + byte_limit: byte_limit.into(), + now_bytes: 0, + } + } +} + +impl

PacketQueue

for DropTailQueue

+where + P: Packet, +{ + type Config = DropTailQueueConfig; + + fn configure(&mut self, config: Self::Config) { + self.packet_limit = config.packet_limit; + self.byte_limit = config.byte_limit; + } + + fn enqueue(&mut self, packet: P) { + if self + .packet_limit + .map_or(true, |limit| self.queue.len() < limit) + && self + .byte_limit + .map_or(true, |limit| self.now_bytes + packet.length() <= limit) + { + self.now_bytes += packet.length(); + self.queue.push_back(packet); + } else { + trace!( + queue_len = self.queue.len(), + now_bytes = self.now_bytes, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(len: {}) when enqueue", packet.length() + ); + } + } + + fn dequeue(&mut self) -> Option

{ + match self.queue.pop_front() { + Some(packet) => { + self.now_bytes -= packet.length(); + Some(packet) + } + None => None, + } + } +} + +#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))] +#[derive(Debug)] +pub struct DropHeadQueueConfig { + pub packet_limit: Option, + pub byte_limit: Option, +} + +impl DropHeadQueueConfig { + pub fn new>, B: Into>>( + packet_limit: A, + byte_limit: B, + ) -> Self { + Self { + packet_limit: packet_limit.into(), + byte_limit: byte_limit.into(), + } + } +} + +#[derive(Debug)] +pub struct DropHeadQueue

{ + queue: VecDeque

, + packet_limit: Option, + byte_limit: Option, + now_bytes: usize, +} + +impl

DropHeadQueue

{ + pub fn new>, B: Into>>( + packet_limit: A, + byte_limit: B, + ) -> Self { + Self { + queue: VecDeque::new(), + packet_limit: packet_limit.into(), + byte_limit: byte_limit.into(), + now_bytes: 0, + } + } +} + +impl

PacketQueue

for DropHeadQueue

+where + P: Packet, +{ + type Config = DropHeadQueueConfig; + + fn configure(&mut self, config: Self::Config) { + self.packet_limit = config.packet_limit; + self.byte_limit = config.byte_limit; + } + + fn enqueue(&mut self, packet: P) { + self.now_bytes += packet.length(); + self.queue.push_back(packet); + while self + .packet_limit + .map_or(false, |limit| self.queue.len() > limit) + || self + .byte_limit + .map_or(false, |limit| self.now_bytes > limit) + { + let packet = self.dequeue().unwrap(); + trace!( + after_queue_len = self.queue.len(), + after_now_bytes = self.now_bytes, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(len: {}) when enqueue another packet", packet.length() + ) + } + } + + fn dequeue(&mut self) -> Option

{ + match self.queue.pop_front() { + Some(packet) => { + self.now_bytes -= packet.length(); + Some(packet) + } + None => None, + } + } +} + +// CODEL Queue Implementation Reference: +// https://github.com/torvalds/linux/blob/v6.6/include/net/codel.h +// https://github.com/ravinet/mahimahi/blob/0bd12164388bc109bbbd8ffa03a09e94adcbec5a/src/packet/codel_packet_queue.cc + +#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))] +#[derive(Debug)] +pub struct CODELQueueConfig { + pub packet_limit: Option, // the maximum number of packets in the queue, or None for unlimited + pub byte_limit: Option, // the maximum number of bytes in the queue, or None for unlimited + pub interval: Duration, // width of moving time window + pub target: Duration, // target queue delay + pub mtu: u32, // device MTU, or minimal queue backlog in bytes +} + +impl CODELQueueConfig { + pub fn new>, B: Into>>( + packet_limit: A, + byte_limit: B, + interval: Duration, + target: Duration, + mtu: u32, + ) -> Self { + Self { + packet_limit: packet_limit.into(), + byte_limit: byte_limit.into(), + interval, + target, + mtu, + } + } +} +#[derive(Debug)] +pub struct CODELQueue

{ + queue: VecDeque

, + config: CODELQueueConfig, + now_bytes: usize, // the current number of bytes in the queue + + count: u32, // how many drops we've done since the last time we entered dropping state + lastcount: u32, // count at entry to dropping state + dropping: bool, // set to true if in dropping state + first_above_time: Option, // when we went (or will go) continuously above target for interval + drop_next: Instant, // time to drop next packet, or when we dropped last + ldelay: Duration, // sojourn time of last dequeued packet +} + +impl

CODELQueue

{ + pub fn new(config: CODELQueueConfig) -> Self { + Self { + queue: VecDeque::new(), + config, + now_bytes: 0, + count: 0, + lastcount: 0, + dropping: false, + first_above_time: None, + drop_next: Instant::now(), + ldelay: Duration::ZERO, + } + } +} + +impl

CODELQueue

+where + P: Packet, +{ + fn should_drop(&mut self, packet: &P) -> bool { + self.ldelay = Instant::now() - packet.get_timestamp(); + if self.ldelay < self.config.target || self.now_bytes <= self.config.mtu as usize { + self.first_above_time = None; + false + } else { + let mut ok_to_drop = false; + match self.first_above_time { + Some(first_above_time) => { + if Instant::now() >= first_above_time { + ok_to_drop = true; + } + } + None => { + self.first_above_time = Some(Instant::now() + self.config.interval); + } + } + ok_to_drop + } + } + + fn control_law(&self, t: Instant) -> Instant { + t + self.config.interval.div_f64(f64::sqrt(self.count as f64)) + } +} + +impl

PacketQueue

for CODELQueue

+where + P: Packet, +{ + type Config = CODELQueueConfig; + + fn configure(&mut self, config: Self::Config) { + self.config = config; + } + + fn enqueue(&mut self, packet: P) { + if self + .config + .packet_limit + .map_or(true, |limit| self.queue.len() < limit) + && self + .config + .byte_limit + .map_or(true, |limit| self.now_bytes + packet.length() <= limit) + { + self.now_bytes += packet.length(); + self.queue.push_back(packet); + } else { + trace!( + queue_len = self.queue.len(), + now_bytes = self.now_bytes, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(len: {}) when enqueue", packet.length() + ); + } + } + + fn dequeue(&mut self) -> Option

{ + match self.queue.pop_front() { + Some(mut packet) => { + self.now_bytes -= packet.length(); + let now = Instant::now(); + let drop = self.should_drop(&packet); + trace!( + drop, + ldelay = ?self.ldelay, + count = self.count, + lastcount = self.lastcount, + dropping = self.dropping, + first_above_time_from_now = ?self.first_above_time.map(|t| t - Instant::now()), + drop_next_from_now = ?self.drop_next - Instant::now(), + after_queue_len = self.queue.len(), + after_now_bytes = self.now_bytes, + "dequeueing a new packet" + ); + if self.dropping { + if !drop { + self.dropping = false; + trace!("Exit dropping state since packet should not drop"); + } else { + while self.dropping && now >= self.drop_next { + self.count += 1; + trace!( + ldelay = ?self.ldelay, + count = self.count, + after_queue_len = self.queue.len(), + after_now_bytes = self.now_bytes, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(len: {}) since should drop and now >= self.drop_next", packet.length() + ); + let new_packet = self.queue.pop_front(); + if new_packet.is_none() { + self.dropping = false; + trace!("Exit dropping state since queue is empty"); + return None; + } + packet = new_packet.unwrap(); + self.now_bytes -= packet.length(); + + if self.should_drop(&packet) { + self.drop_next = self.control_law(self.drop_next); + trace!(drop_next_from_now = ?self.drop_next - Instant::now()); + } else { + self.dropping = false; + trace!("Exit dropping state since packet should not drop"); + } + } + } + } else if drop { + trace!( + ldelay = ?self.ldelay, + after_queue_len = self.queue.len(), + after_now_bytes = self.now_bytes, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(len: {}) as the first", packet.length() + ); + let new_packet = self.queue.pop_front(); + if new_packet.is_none() { + self.dropping = false; + trace!("Exit dropping state since queue is empty"); + return None; + } + packet = new_packet.unwrap(); + self.now_bytes -= packet.length(); + + self.dropping = true; + let delta = self.count - self.lastcount; + if delta > 1 && now - self.drop_next < 16 * self.config.interval { + self.count = delta; + } else { + self.count = 1; + } + self.lastcount = self.count; + self.drop_next = self.control_law(now); + trace!( + count = self.count, + delta, + drop_next_from_now = ?self.drop_next - Instant::now(), + "Enter dropping state" + ); + } + Some(packet) + } + None => { + self.dropping = false; + trace!("Exit dropping state since queue is empty"); + None + } + } + } +} diff --git a/rattan/src/devices/delay.rs b/rattan/src/devices/delay.rs index 0b15997..f92040a 100644 --- a/rattan/src/devices/delay.rs +++ b/rattan/src/devices/delay.rs @@ -1,7 +1,6 @@ use crate::devices::{Device, Packet}; use crate::error::Error; use crate::metal::timer::Timer; -use crate::utils::sync::AtomicRawCell; use async_trait::async_trait; use netem_trace::Delay; #[cfg(feature = "serde")] @@ -62,11 +61,25 @@ where P: Packet, { egress: mpsc::UnboundedReceiver>, - delay: Arc>, - inner_delay: Box, + delay: Delay, + config_rx: mpsc::UnboundedReceiver, timer: Timer, } +impl

DelayDeviceEgress

+where + P: Packet + Send + Sync, +{ + fn set_config(&mut self, config: DelayDeviceConfig) { + debug!( + before = ?self.delay, + after = ?config.delay, + "Set inner delay:" + ); + self.delay = config.delay; + } +} + #[async_trait] impl

Egress

for DelayDeviceEgress

where @@ -74,16 +87,16 @@ where { async fn dequeue(&mut self) -> Option

{ let packet = self.egress.recv().await.unwrap(); - if let Some(delay) = self.delay.swap_null() { - self.inner_delay = delay; - debug!(?self.inner_delay, "Set inner delay:"); - } - let queuing_delay = Instant::now() - packet.ingress_time; - if queuing_delay < *self.inner_delay { - self.timer - .sleep(*self.inner_delay - queuing_delay) - .await - .unwrap(); + loop { + tokio::select! { + biased; + Some(config) = self.config_rx.recv() => { + self.set_config(config); + } + _ = self.timer.sleep(packet.ingress_time + self.delay - Instant::now()) => { + break; + } + } } Some(packet.packet) } @@ -104,7 +117,7 @@ impl DelayDeviceConfig { } pub struct DelayDeviceControlInterface { - delay: Arc>, + config_tx: mpsc::UnboundedSender, } impl ControlInterface for DelayDeviceControlInterface { @@ -112,7 +125,9 @@ impl ControlInterface for DelayDeviceControlInterface { fn set_config(&self, config: Self::Config) -> Result<(), Error> { info!("Setting delay to {:?}", config.delay); - self.delay.store(Box::new(config.delay)); + self.config_tx + .send(config) + .map_err(|_| Error::ConfigError("Control channel is closed.".to_string()))?; Ok(()) } } @@ -155,16 +170,16 @@ where pub fn new() -> DelayDevice

{ debug!("New DelayDevice"); let (rx, tx) = mpsc::unbounded_channel(); - let delay = Arc::new(AtomicRawCell::new(Box::default())); + let (config_tx, config_rx) = mpsc::unbounded_channel(); DelayDevice { ingress: Arc::new(DelayDeviceIngress { ingress: rx }), egress: DelayDeviceEgress { egress: tx, - delay: Arc::clone(&delay), - inner_delay: Box::default(), + delay: Delay::default(), + config_rx, timer: Timer::new().unwrap(), }, - control_interface: Arc::new(DelayDeviceControlInterface { delay }), + control_interface: Arc::new(DelayDeviceControlInterface { config_tx }), } } } diff --git a/rattan/src/devices/mod.rs b/rattan/src/devices/mod.rs index 7973735..33cb9a1 100644 --- a/rattan/src/devices/mod.rs +++ b/rattan/src/devices/mod.rs @@ -3,6 +3,7 @@ use etherparse::{Ethernet2Header, Ipv4Header}; #[cfg(feature = "serde")] use serde::Deserialize; use std::{fmt::Debug, sync::Arc}; +use tokio::time::Instant; use crate::error::Error; @@ -19,22 +20,30 @@ pub trait Packet: Debug + 'static + Send { fn as_raw_buffer(&mut self) -> &mut [u8]; fn ether_hdr(&self) -> Option; fn ip_hdr(&self) -> Option; + + fn get_timestamp(&self) -> Instant; + fn set_timestamp(&mut self, timestamp: Instant); } #[derive(Debug)] pub struct StdPacket { buf: Vec, + timestamp: Instant, } impl Packet for StdPacket { fn empty(maximum: usize) -> Self { Self { buf: Vec::with_capacity(maximum), + timestamp: Instant::now(), } } fn from_raw_buffer(buf: &[u8]) -> Self { - Self { buf: buf.to_vec() } + Self { + buf: buf.to_vec(), + timestamp: Instant::now(), + } } fn length(&self) -> usize { @@ -61,6 +70,14 @@ impl Packet for StdPacket { fn ether_hdr(&self) -> Option { etherparse::Ethernet2Header::from_slice(self.buf.as_slice()).map_or(None, |x| Some(x.0)) } + + fn get_timestamp(&self) -> Instant { + self.timestamp + } + + fn set_timestamp(&mut self, timestamp: Instant) { + self.timestamp = timestamp; + } } pub trait Ingress

: Send + Sync diff --git a/rattan/src/env.rs b/rattan/src/env.rs index aecf61a..bf7a6cd 100644 --- a/rattan/src/env.rs +++ b/rattan/src/env.rs @@ -289,7 +289,7 @@ pub fn get_container_env() -> anyhow::Result { { for address_attr in address_msg.attributes { if let AddressAttribute::Address(address) = address_attr { - ip_addr = Some((address, address_msg.header.prefix_len)); + ip_addr = Some((address, address_msg.header.prefix_len)); } } if ip_addr.is_none() { diff --git a/rattan/src/metal/io.rs b/rattan/src/metal/io.rs index fe52f85..8016f24 100644 --- a/rattan/src/metal/io.rs +++ b/rattan/src/metal/io.rs @@ -228,7 +228,8 @@ where std::mem::size_of::() as u32, )) } - }; match res { + }; + match res { Ok(_) => break, Err(e) => { times -= 1; diff --git a/rattan/src/metal/timer.rs b/rattan/src/metal/timer.rs index b692c19..15ba3d8 100644 --- a/rattan/src/metal/timer.rs +++ b/rattan/src/metal/timer.rs @@ -8,6 +8,7 @@ use tokio::io::unix::AsyncFd; use crate::metal::error::MetalError; +// High-resolution timer pub struct Timer { timer: AsyncFd, } @@ -23,6 +24,10 @@ impl Timer { } pub async fn sleep(&mut self, duration: std::time::Duration) -> Result<(), MetalError> { + // Set TimerFd to 0 will disable it. We need to handle this case. + if duration.as_nanos() == 0 { + return Ok(()); + } self.timer.get_mut().set( Expiration::OneShot(TimeSpec::from_duration(duration)), TimerSetTimeFlags::empty(), diff --git a/rattan/src/utils/sync.rs b/rattan/src/utils/sync.rs index 0692aaa..230de42 100644 --- a/rattan/src/utils/sync.rs +++ b/rattan/src/utils/sync.rs @@ -25,6 +25,13 @@ impl AtomicRawCell { } } + pub fn new_null() -> AtomicRawCell { + AtomicRawCell { + ptr: AtomicPtr::new(std::ptr::null_mut()), + phantom: PhantomData, + } + } + fn swap_raw(&self, ptr: *mut T, order: Ordering) -> *mut T { self.ptr.swap(ptr, order) } diff --git a/rattan/tests/external/af_packet.rs b/rattan/tests/external/af_packet.rs index 5fe915f..73a42d5 100644 --- a/rattan/tests/external/af_packet.rs +++ b/rattan/tests/external/af_packet.rs @@ -1,6 +1,5 @@ /// This test need to be run as root (CAP_NET_ADMIN, CAP_SYS_ADMIN and CAP_SYS_RAW) /// CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_RUNNER='sudo -E' cargo test af_packet -- --ignored --nocapture - use libc::{c_void, size_t, sockaddr, sockaddr_ll, socklen_t}; use nix::errno::Errno; use nix::sys::epoll::{epoll_create, epoll_ctl, epoll_wait, EpollEvent, EpollFlags}; diff --git a/rattan/tests/integration/bandwidth.rs b/rattan/tests/integration/bandwidth.rs index cd06dd9..8fec609 100644 --- a/rattan/tests/integration/bandwidth.rs +++ b/rattan/tests/integration/bandwidth.rs @@ -2,16 +2,24 @@ /// RUST_LOG=info CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_RUNNER='sudo -E' cargo test bandwidth --all-features -- --nocapture use netem_trace::Bandwidth; use rattan::core::{RattanMachine, RattanMachineConfig}; -use rattan::devices::bandwidth::{BwDevice, BwDeviceConfig}; +use rattan::devices::bandwidth::{ + queue::{ + CODELQueue, CODELQueueConfig, DropHeadQueue, DropHeadQueueConfig, DropTailQueue, + DropTailQueueConfig, InfiniteQueue, + }, + BwDevice, BwDeviceConfig, MAX_BANDWIDTH, +}; use rattan::devices::external::VirtualEthernet; use rattan::devices::{ControlInterface, Device, StdPacket}; use rattan::env::{get_std_env, StdNetEnvConfig}; use rattan::metal::io::AfPacketDriver; use rattan::metal::netns::NetNsGuard; use regex::Regex; +use std::sync::mpsc; use std::thread::sleep; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument, span, warn, Instrument, Level}; #[instrument] @@ -40,8 +48,8 @@ fn test_bandwidth() { runtime.block_on( async move { - let left_bw_device = BwDevice::::new(); - let right_bw_device = BwDevice::::new(); + let left_bw_device = BwDevice::new(MAX_BANDWIDTH, InfiniteQueue::new()); + let right_bw_device = BwDevice::new(MAX_BANDWIDTH, InfiniteQueue::new()); let left_control_interface = left_bw_device.control_interface(); let right_control_interface = right_bw_device.control_interface(); if let Err(_) = control_tx.send((left_control_interface, right_control_interface)) { @@ -70,7 +78,7 @@ fn test_bandwidth() { let config = RattanMachineConfig { original_ns, - port: 8081, + port: 8090, }; machine.core_loop(config).await } @@ -123,14 +131,18 @@ fn test_bandwidth() { warn!("{}", stderr); } let re = Regex::new(r#""bits_per_second":\s*(\d+)"#).unwrap(); - let bandwidth = re + let mut bandwidth = re .captures_iter(&stdout) .flat_map(|cap| cap[1].parse::()) .step_by(2) .take(10) .collect::>(); - info!("bandwidth: {:?}", bandwidth); + info!(?bandwidth); assert!(!bandwidth.is_empty()); + + bandwidth.drain(0..4); + let bitrate = bandwidth.iter().sum::() / bandwidth.len() as u64; + info!("bitrate: {:?}", Bandwidth::from_bps(bitrate)); } // info!("===================================================="); // After set the BwDevice, the bandwidth should be between 90-100Mbps @@ -139,10 +151,10 @@ fn test_bandwidth() { let _span = span!(Level::INFO, "iperf_with_limit").entered(); info!("try to iperf with bandwidth limit set to 100Mbps"); left_control_interface - .set_config(BwDeviceConfig::new(Bandwidth::from_mbps(100))) + .set_config(BwDeviceConfig::new(Bandwidth::from_mbps(100), None)) .unwrap(); right_control_interface - .set_config(BwDeviceConfig::new(Bandwidth::from_mbps(100))) + .set_config(BwDeviceConfig::new(Bandwidth::from_mbps(100), None)) .unwrap(); let handle = { let _right_ns_guard = NetNsGuard::new(right_ns.clone()).unwrap(); @@ -188,13 +200,552 @@ fn test_bandwidth() { .step_by(2) .take(10) .collect::>(); + info!(?bandwidth); bandwidth.drain(0..4); let bitrate = bandwidth.iter().sum::() / bandwidth.len() as u64; - info!("bitrate: {}", bitrate); + info!("bitrate: {:?}", Bandwidth::from_bps(bitrate)); assert!(bitrate > 90000000 && bitrate < 100000000); } cancel_token.cancel(); rattan_thread.join().unwrap(); } + +#[instrument] +#[test_log::test] +fn test_droptail_queue() { + let _std_env = get_std_env(StdNetEnvConfig::default()).unwrap(); + let left_ns = _std_env.left_ns.clone(); + let right_ns = _std_env.right_ns.clone(); + + let mut machine = RattanMachine::::new(); + let cancel_token = machine.cancel_token(); + + let (control_tx, control_rx) = oneshot::channel(); + + let rattan_thread_span = span!(Level::DEBUG, "rattan_thread").or_current(); + let rattan_thread = std::thread::spawn(move || { + let _entered = rattan_thread_span.entered(); + let original_ns = _std_env.rattan_ns.enter().unwrap(); + let _left_pair_guard = _std_env.left_pair.clone(); + let _right_pair_guard = _std_env.right_pair.clone(); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + + runtime.block_on( + async move { + let left_bw_device = BwDevice::new(MAX_BANDWIDTH, DropTailQueue::new(10, None)); + let right_bw_device = BwDevice::new(MAX_BANDWIDTH, DropTailQueue::new(10, None)); + let left_control_interface = left_bw_device.control_interface(); + let right_control_interface = right_bw_device.control_interface(); + if let Err(_) = control_tx.send((left_control_interface, right_control_interface)) { + error!("send control interface failed"); + } + let left_device = VirtualEthernet::::new( + _std_env.left_pair.right.clone(), + ); + let right_device = VirtualEthernet::::new( + _std_env.right_pair.left.clone(), + ); + + let (left_bw_rx, left_bw_tx) = machine.add_device(left_bw_device); + info!(left_bw_rx, left_bw_tx); + let (right_bw_rx, right_bw_tx) = machine.add_device(right_bw_device); + info!(right_bw_rx, right_bw_tx); + let (left_device_rx, left_device_tx) = machine.add_device(left_device); + info!(left_device_rx, left_device_tx); + let (right_device_rx, right_device_tx) = machine.add_device(right_device); + info!(right_device_rx, right_device_tx); + + machine.link_device(left_device_rx, left_bw_tx); + machine.link_device(left_bw_rx, right_device_tx); + machine.link_device(right_device_rx, right_bw_tx); + machine.link_device(right_bw_rx, left_device_tx); + + let config = RattanMachineConfig { + original_ns, + port: 8091, + }; + machine.core_loop(config).await + } + .in_current_span(), + ); + }); + + let (left_control_interface, _right_control_interface) = control_rx.blocking_recv().unwrap(); + + { + let _span = span!(Level::INFO, "run_test").entered(); + info!("Test DropTailQueue"); + + let (msg_tx, msg_rx) = mpsc::channel(); + + let cancel_token_inner = CancellationToken::new(); + let server_cancel_token = cancel_token_inner.clone(); + { + let _right_ns_guard = NetNsGuard::new(right_ns.clone()).unwrap(); + std::thread::spawn(|| { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + runtime.block_on(async move { + let server_socket = tokio::net::UdpSocket::bind("0.0.0.0:54321").await.unwrap(); + let mut buf = [0; 1024]; + loop { + tokio::select! { + _ = cancel_token_inner.cancelled() => { + break; + } + Ok((size, _)) = server_socket.recv_from(&mut buf) => { + msg_tx.send((buf[0], size, Instant::now())).unwrap(); + } + } + } + }); + }) + }; + + let _left_ns_guard = NetNsGuard::new(left_ns.clone()).unwrap(); + sleep(Duration::from_millis(500)); + + let client_socket = std::net::UdpSocket::bind("0.0.0.0:54321").unwrap(); + client_socket.connect("192.168.12.1:54321").unwrap(); + + info!("Test DropTailQueue (10 packets limit)"); + info!("Set bandwidth to 800kbps (1000B per 10ms)"); + left_control_interface + .set_config(BwDeviceConfig::new(Bandwidth::from_kbps(800), None)) + .unwrap(); + info!("Send 30 packets(1000B) with 1.05ms interval"); + let mut next_time = Instant::now(); + for i in 0..30 { + client_socket.send(&[i as u8; 1000 - 42]).unwrap(); + next_time += Duration::from_micros(1050); + sleep(next_time - Instant::now()); + } + sleep(Duration::from_millis(200)); + let mut recv_indexs = Vec::new(); + while let Ok((index, _size, _timestamp)) = msg_rx.try_recv() { + recv_indexs.push(index); + } + info!(?recv_indexs); + assert!(recv_indexs.len() == 14); + for i in 0..12 { + assert!(recv_indexs[i] == i as u8); + } + assert!(19 <= recv_indexs[12] && recv_indexs[12] <= 20); + assert!(27 <= recv_indexs[13] && recv_indexs[13] <= 30); + + info!("Test DropTailQueue (500 Bytes limit)"); + info!("Set bandwidth to 40kbps (50B per 10ms)"); + left_control_interface + .set_config(BwDeviceConfig::new( + Bandwidth::from_kbps(40), + DropTailQueueConfig::new(None, 500), + )) + .unwrap(); + info!("Send 30 packets(50B) with 1.05ms interval"); + let mut next_time = Instant::now(); + for i in 0..30 { + client_socket.send(&[i as u8; 50 - 42]).unwrap(); + next_time += Duration::from_micros(1050); + sleep(next_time - Instant::now()); + } + sleep(Duration::from_millis(200)); + let mut recv_indexs = Vec::new(); + while let Ok((index, _size, _timestamp)) = msg_rx.try_recv() { + recv_indexs.push(index); + } + info!(?recv_indexs); + assert!(recv_indexs.len() == 14); + for i in 0..12 { + assert!(recv_indexs[i] == i as u8); + } + assert!(19 <= recv_indexs[12] && recv_indexs[12] <= 20); + assert!(27 <= recv_indexs[13] && recv_indexs[13] <= 30); + + server_cancel_token.cancel(); + } + + cancel_token.cancel(); + rattan_thread.join().unwrap(); +} + +#[instrument] +#[test_log::test] +fn test_drophead_queue() { + let _std_env = get_std_env(StdNetEnvConfig::default()).unwrap(); + let left_ns = _std_env.left_ns.clone(); + let right_ns = _std_env.right_ns.clone(); + + let mut machine = RattanMachine::::new(); + let cancel_token = machine.cancel_token(); + + let (control_tx, control_rx) = oneshot::channel(); + + let rattan_thread_span = span!(Level::DEBUG, "rattan_thread").or_current(); + let rattan_thread = std::thread::spawn(move || { + let _entered = rattan_thread_span.entered(); + let original_ns = _std_env.rattan_ns.enter().unwrap(); + let _left_pair_guard = _std_env.left_pair.clone(); + let _right_pair_guard = _std_env.right_pair.clone(); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + + runtime.block_on( + async move { + let left_bw_device = BwDevice::new(MAX_BANDWIDTH, DropHeadQueue::new(10, None)); + let right_bw_device = BwDevice::new(MAX_BANDWIDTH, DropHeadQueue::new(10, None)); + let left_control_interface = left_bw_device.control_interface(); + let right_control_interface = right_bw_device.control_interface(); + if let Err(_) = control_tx.send((left_control_interface, right_control_interface)) { + error!("send control interface failed"); + } + let left_device = VirtualEthernet::::new( + _std_env.left_pair.right.clone(), + ); + let right_device = VirtualEthernet::::new( + _std_env.right_pair.left.clone(), + ); + + let (left_bw_rx, left_bw_tx) = machine.add_device(left_bw_device); + info!(left_bw_rx, left_bw_tx); + let (right_bw_rx, right_bw_tx) = machine.add_device(right_bw_device); + info!(right_bw_rx, right_bw_tx); + let (left_device_rx, left_device_tx) = machine.add_device(left_device); + info!(left_device_rx, left_device_tx); + let (right_device_rx, right_device_tx) = machine.add_device(right_device); + info!(right_device_rx, right_device_tx); + + machine.link_device(left_device_rx, left_bw_tx); + machine.link_device(left_bw_rx, right_device_tx); + machine.link_device(right_device_rx, right_bw_tx); + machine.link_device(right_bw_rx, left_device_tx); + + let config = RattanMachineConfig { + original_ns, + port: 8092, + }; + machine.core_loop(config).await + } + .in_current_span(), + ); + }); + + let (left_control_interface, _right_control_interface) = control_rx.blocking_recv().unwrap(); + + { + let _span = span!(Level::INFO, "run_test").entered(); + info!("Test DropHeadQueue"); + + let (msg_tx, msg_rx) = mpsc::channel(); + + let cancel_token_inner = CancellationToken::new(); + let server_cancel_token = cancel_token_inner.clone(); + { + let _right_ns_guard = NetNsGuard::new(right_ns.clone()).unwrap(); + std::thread::spawn(|| { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + runtime.block_on(async move { + let server_socket = tokio::net::UdpSocket::bind("0.0.0.0:54321").await.unwrap(); + let mut buf = [0; 1024]; + loop { + tokio::select! { + _ = cancel_token_inner.cancelled() => { + break; + } + Ok((size, _)) = server_socket.recv_from(&mut buf) => { + msg_tx.send((buf[0], size, Instant::now())).unwrap(); + } + } + } + }); + }) + }; + + let _left_ns_guard = NetNsGuard::new(left_ns.clone()).unwrap(); + sleep(Duration::from_millis(500)); + + let client_socket = std::net::UdpSocket::bind("0.0.0.0:54321").unwrap(); + client_socket.connect("192.168.12.1:54321").unwrap(); + + info!("Test DropHeadQueue (10 packets limit)"); + info!("Set bandwidth to 800kbps (1000B per 10ms)"); + left_control_interface + .set_config(BwDeviceConfig::new(Bandwidth::from_kbps(800), None)) + .unwrap(); + info!("Send 30 packets(1000B) with 1.05ms interval"); + let mut next_time = Instant::now(); + for i in 0..30 { + client_socket.send(&[i as u8; 1000 - 42]).unwrap(); + next_time += Duration::from_micros(1050); + sleep(next_time - Instant::now()); + } + sleep(Duration::from_millis(200)); + let mut recv_indexs = Vec::new(); + while let Ok((index, _size, _timestamp)) = msg_rx.try_recv() { + recv_indexs.push(index); + } + info!(?recv_indexs); + assert!(recv_indexs.len() == 14); + assert!(recv_indexs[0] == 0); + assert!(recv_indexs[1] == 1); + assert!(8 <= recv_indexs[2] && recv_indexs[2] <= 10); + assert!(18 <= recv_indexs[3] && recv_indexs[3] <= 19); + for i in 4..14 { + assert!(recv_indexs[i] == 16 + i as u8); + } + + info!("Test DropHeadQueue (500 Bytes limit)"); + info!("Set bandwidth to 40kbps (50B per 10ms)"); + left_control_interface + .set_config(BwDeviceConfig::new( + Bandwidth::from_kbps(40), + DropHeadQueueConfig::new(None, 500), + )) + .unwrap(); + info!("Send 30 packets(50B) with 1.05ms interval"); + let mut next_time = Instant::now(); + for i in 0..30 { + client_socket.send(&[i as u8; 50 - 42]).unwrap(); + next_time += Duration::from_micros(1050); + sleep(next_time - Instant::now()); + } + sleep(Duration::from_millis(200)); + let mut recv_indexs = Vec::new(); + while let Ok((index, _size, _timestamp)) = msg_rx.try_recv() { + recv_indexs.push(index); + } + info!(?recv_indexs); + assert!(recv_indexs.len() == 14); + assert!(recv_indexs[0] == 0); + assert!(recv_indexs[1] == 1); + assert!(8 <= recv_indexs[2] && recv_indexs[2] <= 10); + assert!(18 <= recv_indexs[3] && recv_indexs[3] <= 19); + for i in 4..14 { + assert!(recv_indexs[i] == 16 + i as u8); + } + + server_cancel_token.cancel(); + } + + cancel_token.cancel(); + rattan_thread.join().unwrap(); +} + +#[instrument] +#[test_log::test] +fn test_codel_queue() { + let _std_env = get_std_env(StdNetEnvConfig::default()).unwrap(); + let left_ns = _std_env.left_ns.clone(); + let right_ns = _std_env.right_ns.clone(); + + let mut machine = RattanMachine::::new(); + let cancel_token = machine.cancel_token(); + + let (control_tx, control_rx) = oneshot::channel(); + + let rattan_thread_span = span!(Level::DEBUG, "rattan_thread").or_current(); + let rattan_thread = std::thread::spawn(move || { + let _entered = rattan_thread_span.entered(); + let original_ns = _std_env.rattan_ns.enter().unwrap(); + let _left_pair_guard = _std_env.left_pair.clone(); + let _right_pair_guard = _std_env.right_pair.clone(); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + + runtime.block_on( + async move { + let left_bw_device = BwDevice::new( + MAX_BANDWIDTH, + CODELQueue::new(CODELQueueConfig::new( + 60, + None, + Duration::from_millis(104), + Duration::from_millis(50), + 1500, + )), + ); + let right_bw_device = BwDevice::new(MAX_BANDWIDTH, InfiniteQueue::new()); + let left_control_interface = left_bw_device.control_interface(); + let right_control_interface = right_bw_device.control_interface(); + if let Err(_) = control_tx.send((left_control_interface, right_control_interface)) { + error!("send control interface failed"); + } + let left_device = VirtualEthernet::::new( + _std_env.left_pair.right.clone(), + ); + let right_device = VirtualEthernet::::new( + _std_env.right_pair.left.clone(), + ); + + let (left_bw_rx, left_bw_tx) = machine.add_device(left_bw_device); + info!(left_bw_rx, left_bw_tx); + let (right_bw_rx, right_bw_tx) = machine.add_device(right_bw_device); + info!(right_bw_rx, right_bw_tx); + let (left_device_rx, left_device_tx) = machine.add_device(left_device); + info!(left_device_rx, left_device_tx); + let (right_device_rx, right_device_tx) = machine.add_device(right_device); + info!(right_device_rx, right_device_tx); + + machine.link_device(left_device_rx, left_bw_tx); + machine.link_device(left_bw_rx, right_device_tx); + machine.link_device(right_device_rx, right_bw_tx); + machine.link_device(right_bw_rx, left_device_tx); + + let config = RattanMachineConfig { + original_ns, + port: 8093, + }; + machine.core_loop(config).await + } + .in_current_span(), + ); + }); + + let (left_control_interface, _right_control_interface) = control_rx.blocking_recv().unwrap(); + + { + let _span = span!(Level::INFO, "run_test").entered(); + info!("Test CODELQueue"); + + let (msg_tx, msg_rx) = mpsc::channel(); + + let cancel_token_inner = CancellationToken::new(); + let server_cancel_token = cancel_token_inner.clone(); + { + let _right_ns_guard = NetNsGuard::new(right_ns.clone()).unwrap(); + std::thread::spawn(|| { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + runtime.block_on(async move { + let server_socket = tokio::net::UdpSocket::bind("0.0.0.0:54321").await.unwrap(); + let mut buf = [0; 1024]; + loop { + tokio::select! { + _ = cancel_token_inner.cancelled() => { + break; + } + Ok((size, _)) = server_socket.recv_from(&mut buf) => { + msg_tx.send((buf[0], size, Instant::now())).unwrap(); + } + } + } + }); + }) + }; + + let _left_ns_guard = NetNsGuard::new(left_ns.clone()).unwrap(); + sleep(Duration::from_millis(500)); + + let client_socket = std::net::UdpSocket::bind("0.0.0.0:54321").unwrap(); + client_socket.connect("192.168.12.1:54321").unwrap(); + + info!("Test CODELQueue (60 packets limit, target 50ms, interval 104ms, mtu 1500)"); + info!("Set bandwidth to 800kbps (1000B per 10ms)"); + left_control_interface + .set_config(BwDeviceConfig::new(Bandwidth::from_kbps(800), None)) + .unwrap(); + info!("Send 80 packets(1000B) with 2.05ms interval"); + let mut next_time = Instant::now(); + for i in 0..80 { + client_socket.send(&[i as u8; 1000 - 42]).unwrap(); + next_time += Duration::from_micros(2050); + sleep(next_time - Instant::now()); + } + sleep(Duration::from_millis(1000)); + let mut recv_indexs = Vec::new(); + while let Ok((index, _size, _timestamp)) = msg_rx.try_recv() { + recv_indexs.push(index); + } + info!(?recv_indexs); + let mut dropped_indexs = Vec::new(); + for i in 0..80 { + if !recv_indexs.contains(&(i as u8)) { + dropped_indexs.push(i as u8); + } + } + info!(?dropped_indexs); + // A reference result: [18, 30, 38, 45, 51, 57, 62, 67, 72, 76, 77, 78] (len=12) + // This result is same with mahimahi + assert!(11 <= dropped_indexs.len() && dropped_indexs.len() <= 13); + assert!(17 <= dropped_indexs[0] && dropped_indexs[0] <= 19); + let pattarn = [12, 8, 7, 6, 6, 5, 5, 4]; + for i in 0..pattarn.len() { + assert!( + pattarn[i] - 1 <= dropped_indexs[i + 1] - dropped_indexs[i] + && dropped_indexs[i + 1] - dropped_indexs[i] <= pattarn[i] + 1 + ); + } + + info!("Test CODELQueue (500 Bytes limit, target 50ms, interval 104ms, mtu 80)"); + info!("Use last cycle as a good starting point"); + info!("Set bandwidth to 40kbps (50B per 10ms)"); + left_control_interface + .set_config(BwDeviceConfig::new( + Bandwidth::from_kbps(40), + CODELQueueConfig::new( + None, + 3000, + Duration::from_millis(104), + Duration::from_millis(50), + 80, + ), + )) + .unwrap(); + info!("Send 80 packets(50B) with 2.05ms interval"); + let mut next_time = Instant::now(); + for i in 0..80 { + client_socket.send(&[i as u8; 50 - 42]).unwrap(); + next_time += Duration::from_micros(2050); + sleep(next_time - Instant::now()); + } + sleep(Duration::from_millis(1000)); + let mut recv_indexs = Vec::new(); + while let Ok((index, _size, _timestamp)) = msg_rx.try_recv() { + recv_indexs.push(index); + } + info!(?recv_indexs); + let mut dropped_indexs = Vec::new(); + for i in 0..80 { + if !recv_indexs.contains(&(i as u8)) { + dropped_indexs.push(i as u8); + } + } + info!(?dropped_indexs); + // A reference result: [18, 23, 28, 32, 36, 40, 44, 48, 51, 55, 59, 62, 65, 69, 72, 76, 77, 78] (len=18) + assert!(16 <= dropped_indexs.len() && dropped_indexs.len() <= 20); + assert!(16 <= dropped_indexs[0] && dropped_indexs[0] <= 20); + let pattarn = [5, 5, 4, 4, 4, 4, 3, 4, 4, 3, 3, 4, 3]; + for i in 0..pattarn.len() { + assert!( + pattarn[i] - 1 <= dropped_indexs[i + 1] - dropped_indexs[i] + && dropped_indexs[i + 1] - dropped_indexs[i] <= pattarn[i] + 1 + ); + } + + server_cancel_token.cancel(); + } + + cancel_token.cancel(); + rattan_thread.join().unwrap(); +} diff --git a/rattan/tests/integration/compound.rs b/rattan/tests/integration/compound.rs index 6c2605b..626afe0 100644 --- a/rattan/tests/integration/compound.rs +++ b/rattan/tests/integration/compound.rs @@ -4,7 +4,7 @@ use netem_trace::Bandwidth; use rand::rngs::StdRng; use rand::SeedableRng; use rattan::core::{RattanMachine, RattanMachineConfig}; -use rattan::devices::bandwidth::{BwDevice, BwDeviceConfig}; +use rattan::devices::bandwidth::{queue::InfiniteQueue, BwDevice, BwDeviceConfig, MAX_BANDWIDTH}; use rattan::devices::delay::{DelayDevice, DelayDeviceConfig}; use rattan::devices::loss::{LossDevice, LossDeviceConfig}; @@ -48,8 +48,8 @@ fn test_compound() { runtime.block_on( async move { let rng = StdRng::seed_from_u64(42); - let left_bw_device = BwDevice::::new(); - let right_bw_device = BwDevice::::new(); + let left_bw_device = BwDevice::new(MAX_BANDWIDTH, InfiniteQueue::new()); + let right_bw_device = BwDevice::new(MAX_BANDWIDTH, InfiniteQueue::new()); let left_delay_device = DelayDevice::::new(); let right_delay_device = DelayDevice::::new(); let left_loss_device = LossDevice::::new(rng.clone()); @@ -178,10 +178,10 @@ fn test_compound() { { info!("try to iperf with bandwidth limit set to 100Mbps"); left_bw_ctl - .set_config(BwDeviceConfig::new(Bandwidth::from_mbps(100))) + .set_config(BwDeviceConfig::new(Bandwidth::from_mbps(100), None)) .unwrap(); right_bw_ctl - .set_config(BwDeviceConfig::new(Bandwidth::from_mbps(100))) + .set_config(BwDeviceConfig::new(Bandwidth::from_mbps(100), None)) .unwrap(); left_delay_ctl .set_config(DelayDeviceConfig::new(Duration::from_millis(50))) diff --git a/rattan/tests/integration/delay.rs b/rattan/tests/integration/delay.rs index ac28c96..4d63808 100644 --- a/rattan/tests/integration/delay.rs +++ b/rattan/tests/integration/delay.rs @@ -98,6 +98,7 @@ fn test_delay() { .captures_iter(&stdout) .flat_map(|cap| cap[1].parse::()) .collect::>(); + info!(?latency); assert_eq!(latency.len(), 10); latency.drain(0..5); @@ -130,6 +131,7 @@ fn test_delay() { .captures_iter(&stdout) .flat_map(|cap| cap[1].parse::()) .collect::>(); + info!(?latency); assert_eq!(latency.len(), 10); latency.drain(0..5); diff --git a/rattan/tests/integration/http.rs b/rattan/tests/integration/http.rs index 5d4e030..472e2d3 100644 --- a/rattan/tests/integration/http.rs +++ b/rattan/tests/integration/http.rs @@ -4,7 +4,7 @@ use netem_trace::Bandwidth; use rand::rngs::StdRng; use rand::SeedableRng; use rattan::core::{RattanMachine, RattanMachineConfig}; -use rattan::devices::bandwidth::{BwDevice, BwDeviceConfig}; +use rattan::devices::bandwidth::{queue::InfiniteQueue, BwDevice, BwDeviceConfig, MAX_BANDWIDTH}; use rattan::devices::delay::{DelayDevice, DelayDeviceConfig}; use rattan::devices::loss::{LossDevice, LossDeviceConfig}; @@ -48,8 +48,8 @@ fn test_http() { runtime.block_on( async move { let rng = StdRng::seed_from_u64(42); - let left_bw_device = BwDevice::::new(); - let right_bw_device = BwDevice::::new(); + let left_bw_device = BwDevice::new(MAX_BANDWIDTH, InfiniteQueue::new()); + let right_bw_device = BwDevice::new(MAX_BANDWIDTH, InfiniteQueue::new()); let left_delay_device = DelayDevice::::new(); let right_delay_device = DelayDevice::::new(); let left_loss_device = LossDevice::::new(rng.clone()); @@ -73,7 +73,10 @@ fn test_http() { info!(left_delay_rx, left_delay_tx); let (right_delay_rx, right_delay_tx) = machine.add_device(right_delay_device); info!(right_delay_rx, right_delay_tx); - if delay_control_tx.send((left_delay_tx, right_delay_tx)).is_err() { + if delay_control_tx + .send((left_delay_tx, right_delay_tx)) + .is_err() + { error!("send control interface failed"); } let (left_loss_rx, left_loss_tx) = machine.add_device(left_loss_device); @@ -172,7 +175,10 @@ fn test_http() { // Test wrong http config let resp = client .post(format!("http://127.0.0.1:8087/control/{}", left_loss_ctl)) - .json(&BwDeviceConfig::new(Bandwidth::from_mbps(100))) + .json(&BwDeviceConfig::>::new( + Bandwidth::from_mbps(100), + None, + )) .send() .unwrap(); assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST); @@ -197,13 +203,19 @@ fn test_http() { // Test right http config let resp = client .post(format!("http://127.0.0.1:8087/control/{}", left_bw_ctl)) - .json(&BwDeviceConfig::new(Bandwidth::from_mbps(100))) + .json(&BwDeviceConfig::>::new( + Bandwidth::from_mbps(100), + None, + )) .send() .unwrap(); assert!(resp.status().is_success()); let resp = client .post(format!("http://127.0.0.1:8087/control/{}", right_bw_ctl)) - .json(&BwDeviceConfig::new(Bandwidth::from_mbps(100))) + .json(&BwDeviceConfig::>::new( + Bandwidth::from_mbps(100), + None, + )) .send() .unwrap(); assert!(resp.status().is_success()); diff --git a/rattan/tests/integration/loss.rs b/rattan/tests/integration/loss.rs index a6475b6..6937d7f 100644 --- a/rattan/tests/integration/loss.rs +++ b/rattan/tests/integration/loss.rs @@ -166,7 +166,10 @@ fn test_iid_loss() { let right_loss_device = IIDLossDevice::::new(rng); let left_control_interface = left_loss_device.control_interface(); let right_control_interface = right_loss_device.control_interface(); - if control_tx.send((left_control_interface, right_control_interface)).is_err() { + if control_tx + .send((left_control_interface, right_control_interface)) + .is_err() + { error!("send control interface failed"); } let left_device = VirtualEthernet::::new( From f4325a8e2c0b12018bd0b03223038d3cc099fd29 Mon Sep 17 00:00:00 2001 From: Centaurus99 Date: Mon, 4 Mar 2024 11:48:30 +0000 Subject: [PATCH 2/2] build: set cargo resolver to 2 --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 0a5c8e9..fd3434d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,3 @@ [workspace] members = ['rattan', 'rattan-cli'] +resolver = "2"