From 638cd498c0c86e277716cc4da58fcfdbfca7a250 Mon Sep 17 00:00:00 2001 From: Centaurus99 Date: Mon, 4 Mar 2024 13:18:11 +0000 Subject: [PATCH] refactor(device): split queue from bandwidth device module Refs: #6 --- rattan-cli/src/docker.rs | 2 +- rattan-cli/src/main.rs | 2 +- rattan/benches/bandwidth.rs | 2 +- rattan/src/devices/bandwidth/mod.rs | 259 ++++++++++++++++++ .../{bandwidth.rs => bandwidth/queue.rs} | 254 +---------------- rattan/tests/integration/bandwidth.rs | 7 +- rattan/tests/integration/compound.rs | 2 +- rattan/tests/integration/http.rs | 2 +- 8 files changed, 271 insertions(+), 259 deletions(-) create mode 100644 rattan/src/devices/bandwidth/mod.rs rename rattan/src/devices/{bandwidth.rs => bandwidth/queue.rs} (63%) diff --git a/rattan-cli/src/docker.rs b/rattan-cli/src/docker.rs index 580d76b..d9e3ea0 100644 --- a/rattan-cli/src/docker.rs +++ b/rattan-cli/src/docker.rs @@ -9,7 +9,7 @@ use rand::{rngs::StdRng, SeedableRng}; use rattan::{ core::{RattanMachine, RattanMachineConfig}, devices::{ - bandwidth::{BwDevice, BwDeviceConfig, InfiniteQueue, MAX_BANDWIDTH}, + bandwidth::{queue::InfiniteQueue, BwDevice, BwDeviceConfig, MAX_BANDWIDTH}, external::VirtualEthernet, }, env::get_container_env, diff --git a/rattan-cli/src/main.rs b/rattan-cli/src/main.rs index 4119dfd..db54cf0 100644 --- a/rattan-cli/src/main.rs +++ b/rattan-cli/src/main.rs @@ -2,7 +2,7 @@ use clap::Parser; use rand::rngs::StdRng; use rand::SeedableRng; use rattan::core::{RattanMachine, RattanMachineConfig}; -use rattan::devices::bandwidth::{BwDevice, BwDeviceConfig, InfiniteQueue, MAX_BANDWIDTH}; +use rattan::devices::bandwidth::{queue::InfiniteQueue, BwDevice, BwDeviceConfig, MAX_BANDWIDTH}; use rattan::devices::delay::{DelayDevice, DelayDeviceConfig}; use rattan::devices::external::VirtualEthernet; use rattan::devices::loss::{IIDLossDevice, IIDLossDeviceConfig}; diff --git a/rattan/benches/bandwidth.rs b/rattan/benches/bandwidth.rs index 56e000d..5a1a766 100644 --- a/rattan/benches/bandwidth.rs +++ b/rattan/benches/bandwidth.rs @@ -4,7 +4,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use rattan::{ core::{RattanMachine, RattanMachineConfig}, devices::{ - bandwidth::{BwDevice, InfiniteQueue, MAX_BANDWIDTH}, + bandwidth::{queue::InfiniteQueue, BwDevice, MAX_BANDWIDTH}, external::VirtualEthernet, StdPacket, }, diff --git a/rattan/src/devices/bandwidth/mod.rs b/rattan/src/devices/bandwidth/mod.rs new file mode 100644 index 0000000..745f20b --- /dev/null +++ b/rattan/src/devices/bandwidth/mod.rs @@ -0,0 +1,259 @@ +use crate::devices::bandwidth::queue::PacketQueue; +use crate::devices::{Device, Packet}; +use crate::error::Error; +use crate::metal::timer::Timer; +use crate::utils::sync::AtomicRawCell; +use async_trait::async_trait; +use netem_trace::{Bandwidth, Delay}; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::time::Instant; +use tracing::{debug, info}; + +use super::{ControlInterface, Egress, Ingress}; + +pub mod queue; + +pub const MAX_BANDWIDTH: Bandwidth = Bandwidth::from_bps(u64::MAX); + +pub struct BwDeviceIngress

+where + P: Packet, +{ + ingress: mpsc::UnboundedSender

, +} + +impl

Clone for BwDeviceIngress

+where + P: Packet, +{ + fn clone(&self) -> Self { + Self { + ingress: self.ingress.clone(), + } + } +} + +impl

Ingress

for BwDeviceIngress

+where + P: Packet + Send, +{ + fn enqueue(&self, packet: P) -> Result<(), Error> { + // XXX(minhuw): handle possible error here + self.ingress.send(packet).unwrap(); + Ok(()) + } +} + +// Requires the bandwidth to be less than 2^64 bps +fn transfer_time(length: usize, bandwidth: Bandwidth) -> Delay { + let bits = length * 8; + let capacity = bandwidth.as_bps() as u64; + let seconds = bits as f64 / capacity as f64; + Delay::from_secs_f64(seconds) +} + +pub struct BwDeviceEgress +where + P: Packet, + Q: PacketQueue

, +{ + egress: mpsc::UnboundedReceiver

, + packet_queue: Q, + queue_config: Arc>, + bandwidth: Arc>, + inner_bandwidth: Box, + next_available: Instant, + timer: Timer, +} + +#[async_trait] +impl Egress

for BwDeviceEgress +where + P: Packet + Send + Sync, + Q: PacketQueue

, +{ + async fn dequeue(&mut self) -> Option

{ + // wait until next_available + let now = Instant::now(); + if now < self.next_available { + self.timer.sleep(self.next_available - now).await.unwrap(); + } + // update queue config + if let Some(queue_config) = self.queue_config.swap_null() { + debug!(?queue_config, "Set queue config:"); + self.packet_queue.configure(*queue_config); + } + // process the packets received during this time + while let Ok(new_packet) = self.egress.try_recv() { + self.packet_queue.enqueue(new_packet); + } + + let mut packet = self.packet_queue.dequeue(); + while packet.is_none() { + // the queue is empty, wait for the next packet + match self.egress.recv().await { + Some(new_packet) => { + // update queue config + if let Some(queue_config) = self.queue_config.swap_null() { + debug!(?queue_config, "Set queue config:"); + self.packet_queue.configure(*queue_config); + } + self.packet_queue.enqueue(new_packet); + packet = self.packet_queue.dequeue(); + } + None => { + return None; + } + } + } + + // update bandwidth config + if let Some(bandwidth) = self.bandwidth.swap_null() { + self.inner_bandwidth = bandwidth; + debug!(?self.inner_bandwidth, "Set inner bandwidth:"); + } + + let packet = packet.unwrap(); + let transfer_time = transfer_time(packet.length(), *self.inner_bandwidth); + if packet.get_timestamp() >= self.next_available { + // the packet arrives after next_available + self.next_available = packet.get_timestamp() + transfer_time; + } else { + // the packet arrives before next_available and now >= self.next_available + self.next_available += transfer_time; + } + Some(packet) + } +} + +#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))] +#[derive(Debug)] +pub struct BwDeviceConfig +where + P: Packet, + Q: PacketQueue

, +{ + bandwidth: Option, + queue_config: Option, +} + +impl BwDeviceConfig +where + P: Packet, + Q: PacketQueue

, +{ + pub fn new>, U: Into>>( + bandwidth: T, + queue_config: U, + ) -> Self { + Self { + bandwidth: bandwidth.into(), + queue_config: queue_config.into(), + } + } +} + +pub struct BwDeviceControlInterface +where + P: Packet, + Q: PacketQueue

, +{ + bandwidth: Arc>, + queue_config: Arc>, +} + +impl ControlInterface for BwDeviceControlInterface +where + P: Packet, + Q: PacketQueue

+ 'static, +{ + type Config = BwDeviceConfig; + + fn set_config(&self, config: Self::Config) -> Result<(), Error> { + if config.bandwidth.is_none() && config.queue_config.is_none() { + // This ensures that incorrect HTTP requests will return errors. + return Err(Error::ConfigError( + "At least one of bandwidth and queue_config should be set".to_string(), + )); + } + if let Some(bandwidth) = config.bandwidth { + if bandwidth > MAX_BANDWIDTH { + return Err(Error::ConfigError( + "Bandwidth should be less than 2^64 bps".to_string(), + )); + } + info!("Setting bandwidth to: {:?}", bandwidth); + self.bandwidth.store(Box::new(bandwidth)); + } + if let Some(queue_config) = config.queue_config { + info!("Setting queue config to: {:?}", queue_config); + self.queue_config.store(Box::new(queue_config)); + } + Ok(()) + } +} + +pub struct BwDevice> { + ingress: Arc>, + egress: BwDeviceEgress, + control_interface: Arc>, +} + +impl Device

for BwDevice +where + P: Packet + Send + Sync + 'static, + Q: PacketQueue

+ 'static, +{ + type IngressType = BwDeviceIngress

; + type EgressType = BwDeviceEgress; + type ControlInterfaceType = BwDeviceControlInterface; + + fn sender(&self) -> Arc { + self.ingress.clone() + } + + fn receiver(&mut self) -> &mut Self::EgressType { + &mut self.egress + } + + fn into_receiver(self) -> Self::EgressType { + self.egress + } + + fn control_interface(&self) -> Arc { + self.control_interface.clone() + } +} + +impl BwDevice +where + P: Packet, + Q: PacketQueue

, +{ + pub fn new(bandwidth: Bandwidth, packet_queue: Q) -> BwDevice { + debug!("New BwDevice"); + let (rx, tx) = mpsc::unbounded_channel(); + let bandwidth = Arc::new(AtomicRawCell::new(Box::new(bandwidth))); + let queue_config = Arc::new(AtomicRawCell::new_null()); + BwDevice { + ingress: Arc::new(BwDeviceIngress { ingress: rx }), + egress: BwDeviceEgress { + egress: tx, + packet_queue, + queue_config: Arc::clone(&queue_config), + bandwidth: Arc::clone(&bandwidth), + inner_bandwidth: Box::new(MAX_BANDWIDTH), + next_available: Instant::now(), + timer: Timer::new().unwrap(), + }, + control_interface: Arc::new(BwDeviceControlInterface { + bandwidth, + queue_config, + }), + } + } +} diff --git a/rattan/src/devices/bandwidth.rs b/rattan/src/devices/bandwidth/queue.rs similarity index 63% rename from rattan/src/devices/bandwidth.rs rename to rattan/src/devices/bandwidth/queue.rs index 1ec00d9..1c2c17c 100644 --- a/rattan/src/devices/bandwidth.rs +++ b/rattan/src/devices/bandwidth/queue.rs @@ -1,260 +1,10 @@ -use crate::devices::{Device, Packet}; -use crate::error::Error; -use crate::metal::timer::Timer; -use crate::utils::sync::AtomicRawCell; -use async_trait::async_trait; -use netem_trace::{Bandwidth, Delay}; +use crate::devices::Packet; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::fmt::Debug; -use std::sync::Arc; -use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; -use tracing::{debug, info, trace}; - -use super::{ControlInterface, Egress, Ingress}; - -pub const MAX_BANDWIDTH: Bandwidth = Bandwidth::from_bps(u64::MAX); - -pub struct BwDeviceIngress

-where - P: Packet, -{ - ingress: mpsc::UnboundedSender

, -} - -impl

Clone for BwDeviceIngress

-where - P: Packet, -{ - fn clone(&self) -> Self { - Self { - ingress: self.ingress.clone(), - } - } -} - -impl

Ingress

for BwDeviceIngress

-where - P: Packet + Send, -{ - fn enqueue(&self, packet: P) -> Result<(), Error> { - // XXX(minhuw): handle possible error here - self.ingress.send(packet).unwrap(); - Ok(()) - } -} - -// Requires the bandwidth to be less than 2^64 bps -fn transfer_time(length: usize, bandwidth: Bandwidth) -> Delay { - let bits = length * 8; - let capacity = bandwidth.as_bps() as u64; - let seconds = bits as f64 / capacity as f64; - Delay::from_secs_f64(seconds) -} - -pub struct BwDeviceEgress -where - P: Packet, - Q: PacketQueue

, -{ - egress: mpsc::UnboundedReceiver

, - packet_queue: Q, - queue_config: Arc>, - bandwidth: Arc>, - inner_bandwidth: Box, - next_available: Instant, - timer: Timer, -} - -#[async_trait] -impl Egress

for BwDeviceEgress -where - P: Packet + Send + Sync, - Q: PacketQueue

, -{ - async fn dequeue(&mut self) -> Option

{ - // wait until next_available - let now = Instant::now(); - if now < self.next_available { - self.timer.sleep(self.next_available - now).await.unwrap(); - } - // update queue config - if let Some(queue_config) = self.queue_config.swap_null() { - debug!(?queue_config, "Set queue config:"); - self.packet_queue.configure(*queue_config); - } - // process the packets received during this time - while let Ok(new_packet) = self.egress.try_recv() { - self.packet_queue.enqueue(new_packet); - } - - let mut packet = self.packet_queue.dequeue(); - while packet.is_none() { - // the queue is empty, wait for the next packet - match self.egress.recv().await { - Some(new_packet) => { - // update queue config - if let Some(queue_config) = self.queue_config.swap_null() { - debug!(?queue_config, "Set queue config:"); - self.packet_queue.configure(*queue_config); - } - self.packet_queue.enqueue(new_packet); - packet = self.packet_queue.dequeue(); - } - None => { - return None; - } - } - } - - // update bandwidth config - if let Some(bandwidth) = self.bandwidth.swap_null() { - self.inner_bandwidth = bandwidth; - debug!(?self.inner_bandwidth, "Set inner bandwidth:"); - } - - let packet = packet.unwrap(); - let transfer_time = transfer_time(packet.length(), *self.inner_bandwidth); - if packet.get_timestamp() >= self.next_available { - // the packet arrives after next_available - self.next_available = packet.get_timestamp() + transfer_time; - } else { - // the packet arrives before next_available and now >= self.next_available - self.next_available += transfer_time; - } - Some(packet) - } -} - -#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))] -#[derive(Debug)] -pub struct BwDeviceConfig -where - P: Packet, - Q: PacketQueue

, -{ - bandwidth: Option, - queue_config: Option, -} - -impl BwDeviceConfig -where - P: Packet, - Q: PacketQueue

, -{ - pub fn new>, U: Into>>( - bandwidth: T, - queue_config: U, - ) -> Self { - Self { - bandwidth: bandwidth.into(), - queue_config: queue_config.into(), - } - } -} - -pub struct BwDeviceControlInterface -where - P: Packet, - Q: PacketQueue

, -{ - bandwidth: Arc>, - queue_config: Arc>, -} - -impl ControlInterface for BwDeviceControlInterface -where - P: Packet, - Q: PacketQueue

+ 'static, -{ - type Config = BwDeviceConfig; - - fn set_config(&self, config: Self::Config) -> Result<(), Error> { - if config.bandwidth.is_none() && config.queue_config.is_none() { - // This ensures that incorrect HTTP requests will return errors. - return Err(Error::ConfigError( - "At least one of bandwidth and queue_config should be set".to_string(), - )); - } - if let Some(bandwidth) = config.bandwidth { - if bandwidth > MAX_BANDWIDTH { - return Err(Error::ConfigError( - "Bandwidth should be less than 2^64 bps".to_string(), - )); - } - info!("Setting bandwidth to: {:?}", bandwidth); - self.bandwidth.store(Box::new(bandwidth)); - } - if let Some(queue_config) = config.queue_config { - info!("Setting queue config to: {:?}", queue_config); - self.queue_config.store(Box::new(queue_config)); - } - Ok(()) - } -} - -pub struct BwDevice> { - ingress: Arc>, - egress: BwDeviceEgress, - control_interface: Arc>, -} - -impl Device

for BwDevice -where - P: Packet + Send + Sync + 'static, - Q: PacketQueue

+ 'static, -{ - type IngressType = BwDeviceIngress

; - type EgressType = BwDeviceEgress; - type ControlInterfaceType = BwDeviceControlInterface; - - fn sender(&self) -> Arc { - self.ingress.clone() - } - - fn receiver(&mut self) -> &mut Self::EgressType { - &mut self.egress - } - - fn into_receiver(self) -> Self::EgressType { - self.egress - } - - fn control_interface(&self) -> Arc { - self.control_interface.clone() - } -} - -impl BwDevice -where - P: Packet, - Q: PacketQueue

, -{ - pub fn new(bandwidth: Bandwidth, packet_queue: Q) -> BwDevice { - debug!("New BwDevice"); - let (rx, tx) = mpsc::unbounded_channel(); - let bandwidth = Arc::new(AtomicRawCell::new(Box::new(bandwidth))); - let queue_config = Arc::new(AtomicRawCell::new_null()); - BwDevice { - ingress: Arc::new(BwDeviceIngress { ingress: rx }), - egress: BwDeviceEgress { - egress: tx, - packet_queue, - queue_config: Arc::clone(&queue_config), - bandwidth: Arc::clone(&bandwidth), - inner_bandwidth: Box::new(MAX_BANDWIDTH), - next_available: Instant::now(), - timer: Timer::new().unwrap(), - }, - control_interface: Arc::new(BwDeviceControlInterface { - bandwidth, - queue_config, - }), - } - } -} +use tracing::trace; pub trait PacketQueue

: Send where diff --git a/rattan/tests/integration/bandwidth.rs b/rattan/tests/integration/bandwidth.rs index 662fec6..e0d33b7 100644 --- a/rattan/tests/integration/bandwidth.rs +++ b/rattan/tests/integration/bandwidth.rs @@ -3,8 +3,11 @@ use netem_trace::Bandwidth; use rattan::core::{RattanMachine, RattanMachineConfig}; use rattan::devices::bandwidth::{ - BwDevice, BwDeviceConfig, CODELQueue, CODELQueueConfig, DropHeadQueue, DropTailQueue, - FiniteQueueConfig, InfiniteQueue, MAX_BANDWIDTH, + queue::{ + CODELQueue, CODELQueueConfig, DropHeadQueue, DropTailQueue, FiniteQueueConfig, + InfiniteQueue, + }, + BwDevice, BwDeviceConfig, MAX_BANDWIDTH, }; use rattan::devices::external::VirtualEthernet; use rattan::devices::{ControlInterface, Device, StdPacket}; diff --git a/rattan/tests/integration/compound.rs b/rattan/tests/integration/compound.rs index 8fc446b..626afe0 100644 --- a/rattan/tests/integration/compound.rs +++ b/rattan/tests/integration/compound.rs @@ -4,7 +4,7 @@ use netem_trace::Bandwidth; use rand::rngs::StdRng; use rand::SeedableRng; use rattan::core::{RattanMachine, RattanMachineConfig}; -use rattan::devices::bandwidth::{BwDevice, BwDeviceConfig, InfiniteQueue, MAX_BANDWIDTH}; +use rattan::devices::bandwidth::{queue::InfiniteQueue, BwDevice, BwDeviceConfig, MAX_BANDWIDTH}; use rattan::devices::delay::{DelayDevice, DelayDeviceConfig}; use rattan::devices::loss::{LossDevice, LossDeviceConfig}; diff --git a/rattan/tests/integration/http.rs b/rattan/tests/integration/http.rs index 1b30210..472e2d3 100644 --- a/rattan/tests/integration/http.rs +++ b/rattan/tests/integration/http.rs @@ -4,7 +4,7 @@ use netem_trace::Bandwidth; use rand::rngs::StdRng; use rand::SeedableRng; use rattan::core::{RattanMachine, RattanMachineConfig}; -use rattan::devices::bandwidth::{BwDevice, BwDeviceConfig, InfiniteQueue, MAX_BANDWIDTH}; +use rattan::devices::bandwidth::{queue::InfiniteQueue, BwDevice, BwDeviceConfig, MAX_BANDWIDTH}; use rattan::devices::delay::{DelayDevice, DelayDeviceConfig}; use rattan::devices::loss::{LossDevice, LossDeviceConfig};