Skip to content

Commit

Permalink
refactor(device): split queue from bandwidth device module
Browse files Browse the repository at this point in the history
Refs: #6
  • Loading branch information
Centaurus99 committed Mar 4, 2024
1 parent c339e0e commit 638cd49
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 259 deletions.
2 changes: 1 addition & 1 deletion rattan-cli/src/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use rand::{rngs::StdRng, SeedableRng};
use rattan::{
core::{RattanMachine, RattanMachineConfig},
devices::{
bandwidth::{BwDevice, BwDeviceConfig, InfiniteQueue, MAX_BANDWIDTH},
bandwidth::{queue::InfiniteQueue, BwDevice, BwDeviceConfig, MAX_BANDWIDTH},
external::VirtualEthernet,
},
env::get_container_env,
Expand Down
2 changes: 1 addition & 1 deletion rattan-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::Parser;
use rand::rngs::StdRng;
use rand::SeedableRng;
use rattan::core::{RattanMachine, RattanMachineConfig};
use rattan::devices::bandwidth::{BwDevice, BwDeviceConfig, InfiniteQueue, MAX_BANDWIDTH};
use rattan::devices::bandwidth::{queue::InfiniteQueue, BwDevice, BwDeviceConfig, MAX_BANDWIDTH};
use rattan::devices::delay::{DelayDevice, DelayDeviceConfig};
use rattan::devices::external::VirtualEthernet;
use rattan::devices::loss::{IIDLossDevice, IIDLossDeviceConfig};
Expand Down
2 changes: 1 addition & 1 deletion rattan/benches/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use rattan::{
core::{RattanMachine, RattanMachineConfig},
devices::{
bandwidth::{BwDevice, InfiniteQueue, MAX_BANDWIDTH},
bandwidth::{queue::InfiniteQueue, BwDevice, MAX_BANDWIDTH},
external::VirtualEthernet,
StdPacket,
},
Expand Down
259 changes: 259 additions & 0 deletions rattan/src/devices/bandwidth/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
use crate::devices::bandwidth::queue::PacketQueue;
use crate::devices::{Device, Packet};
use crate::error::Error;
use crate::metal::timer::Timer;
use crate::utils::sync::AtomicRawCell;
use async_trait::async_trait;
use netem_trace::{Bandwidth, Delay};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tracing::{debug, info};

use super::{ControlInterface, Egress, Ingress};

pub mod queue;

pub const MAX_BANDWIDTH: Bandwidth = Bandwidth::from_bps(u64::MAX);

pub struct BwDeviceIngress<P>
where
P: Packet,
{
ingress: mpsc::UnboundedSender<P>,
}

impl<P> Clone for BwDeviceIngress<P>
where
P: Packet,
{
fn clone(&self) -> Self {
Self {
ingress: self.ingress.clone(),
}
}
}

impl<P> Ingress<P> for BwDeviceIngress<P>
where
P: Packet + Send,
{
fn enqueue(&self, packet: P) -> Result<(), Error> {
// XXX(minhuw): handle possible error here
self.ingress.send(packet).unwrap();
Ok(())
}
}

// Requires the bandwidth to be less than 2^64 bps
fn transfer_time(length: usize, bandwidth: Bandwidth) -> Delay {
let bits = length * 8;
let capacity = bandwidth.as_bps() as u64;
let seconds = bits as f64 / capacity as f64;
Delay::from_secs_f64(seconds)
}

pub struct BwDeviceEgress<P, Q>
where
P: Packet,
Q: PacketQueue<P>,
{
egress: mpsc::UnboundedReceiver<P>,
packet_queue: Q,
queue_config: Arc<AtomicRawCell<Q::Config>>,
bandwidth: Arc<AtomicRawCell<Bandwidth>>,
inner_bandwidth: Box<Bandwidth>,
next_available: Instant,
timer: Timer,
}

#[async_trait]
impl<P, Q> Egress<P> for BwDeviceEgress<P, Q>
where
P: Packet + Send + Sync,
Q: PacketQueue<P>,
{
async fn dequeue(&mut self) -> Option<P> {
// wait until next_available
let now = Instant::now();
if now < self.next_available {
self.timer.sleep(self.next_available - now).await.unwrap();
}
// update queue config
if let Some(queue_config) = self.queue_config.swap_null() {
debug!(?queue_config, "Set queue config:");
self.packet_queue.configure(*queue_config);
}
// process the packets received during this time
while let Ok(new_packet) = self.egress.try_recv() {
self.packet_queue.enqueue(new_packet);
}

let mut packet = self.packet_queue.dequeue();
while packet.is_none() {
// the queue is empty, wait for the next packet
match self.egress.recv().await {
Some(new_packet) => {
// update queue config
if let Some(queue_config) = self.queue_config.swap_null() {
debug!(?queue_config, "Set queue config:");
self.packet_queue.configure(*queue_config);
}
self.packet_queue.enqueue(new_packet);
packet = self.packet_queue.dequeue();
}
None => {
return None;
}
}
}

// update bandwidth config
if let Some(bandwidth) = self.bandwidth.swap_null() {
self.inner_bandwidth = bandwidth;
debug!(?self.inner_bandwidth, "Set inner bandwidth:");
}

let packet = packet.unwrap();
let transfer_time = transfer_time(packet.length(), *self.inner_bandwidth);
if packet.get_timestamp() >= self.next_available {
// the packet arrives after next_available
self.next_available = packet.get_timestamp() + transfer_time;
} else {
// the packet arrives before next_available and now >= self.next_available
self.next_available += transfer_time;
}
Some(packet)
}
}

#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
#[derive(Debug)]
pub struct BwDeviceConfig<P, Q>
where
P: Packet,
Q: PacketQueue<P>,
{
bandwidth: Option<Bandwidth>,
queue_config: Option<Q::Config>,
}

impl<P, Q> BwDeviceConfig<P, Q>
where
P: Packet,
Q: PacketQueue<P>,
{
pub fn new<T: Into<Option<Bandwidth>>, U: Into<Option<Q::Config>>>(
bandwidth: T,
queue_config: U,
) -> Self {
Self {
bandwidth: bandwidth.into(),
queue_config: queue_config.into(),
}
}
}

pub struct BwDeviceControlInterface<P, Q>
where
P: Packet,
Q: PacketQueue<P>,
{
bandwidth: Arc<AtomicRawCell<Bandwidth>>,
queue_config: Arc<AtomicRawCell<Q::Config>>,
}

impl<P, Q> ControlInterface for BwDeviceControlInterface<P, Q>
where
P: Packet,
Q: PacketQueue<P> + 'static,
{
type Config = BwDeviceConfig<P, Q>;

fn set_config(&self, config: Self::Config) -> Result<(), Error> {
if config.bandwidth.is_none() && config.queue_config.is_none() {
// This ensures that incorrect HTTP requests will return errors.
return Err(Error::ConfigError(
"At least one of bandwidth and queue_config should be set".to_string(),
));
}
if let Some(bandwidth) = config.bandwidth {
if bandwidth > MAX_BANDWIDTH {
return Err(Error::ConfigError(
"Bandwidth should be less than 2^64 bps".to_string(),
));
}
info!("Setting bandwidth to: {:?}", bandwidth);
self.bandwidth.store(Box::new(bandwidth));
}
if let Some(queue_config) = config.queue_config {
info!("Setting queue config to: {:?}", queue_config);
self.queue_config.store(Box::new(queue_config));
}
Ok(())
}
}

pub struct BwDevice<P: Packet, Q: PacketQueue<P>> {
ingress: Arc<BwDeviceIngress<P>>,
egress: BwDeviceEgress<P, Q>,
control_interface: Arc<BwDeviceControlInterface<P, Q>>,
}

impl<P, Q> Device<P> for BwDevice<P, Q>
where
P: Packet + Send + Sync + 'static,
Q: PacketQueue<P> + 'static,
{
type IngressType = BwDeviceIngress<P>;
type EgressType = BwDeviceEgress<P, Q>;
type ControlInterfaceType = BwDeviceControlInterface<P, Q>;

fn sender(&self) -> Arc<Self::IngressType> {
self.ingress.clone()
}

fn receiver(&mut self) -> &mut Self::EgressType {
&mut self.egress
}

fn into_receiver(self) -> Self::EgressType {
self.egress
}

fn control_interface(&self) -> Arc<Self::ControlInterfaceType> {
self.control_interface.clone()
}
}

impl<P, Q> BwDevice<P, Q>
where
P: Packet,
Q: PacketQueue<P>,
{
pub fn new(bandwidth: Bandwidth, packet_queue: Q) -> BwDevice<P, Q> {
debug!("New BwDevice");
let (rx, tx) = mpsc::unbounded_channel();
let bandwidth = Arc::new(AtomicRawCell::new(Box::new(bandwidth)));
let queue_config = Arc::new(AtomicRawCell::new_null());
BwDevice {
ingress: Arc::new(BwDeviceIngress { ingress: rx }),
egress: BwDeviceEgress {
egress: tx,
packet_queue,
queue_config: Arc::clone(&queue_config),
bandwidth: Arc::clone(&bandwidth),
inner_bandwidth: Box::new(MAX_BANDWIDTH),
next_available: Instant::now(),
timer: Timer::new().unwrap(),
},
control_interface: Arc::new(BwDeviceControlInterface {
bandwidth,
queue_config,
}),
}
}
}
Loading

0 comments on commit 638cd49

Please sign in to comment.