Skip to content

Commit

Permalink
feat(device): remove peek function in PacketQueue
Browse files Browse the repository at this point in the history
Ref: #6
  • Loading branch information
Centaurus99 committed Mar 2, 2024
1 parent a866a45 commit d6d499d
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 59 deletions.
90 changes: 35 additions & 55 deletions rattan/src/devices/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,66 +68,63 @@ where
timer: Timer,
}

impl<P, Q> BwDeviceEgress<P, Q>
#[async_trait]
impl<P, Q> Egress<P> for BwDeviceEgress<P, Q>
where
P: Packet,
P: Packet + Send + Sync,
Q: PacketQueue<P>,
{
// Drain the channel and enqueue the packets
// Do this after each await to ensure the correctness of the enqueue
fn drain_channel(&mut self) {
async fn dequeue(&mut self) -> Option<P> {
// wait until next_available
let now = Instant::now();
if now < self.next_available {
self.timer.sleep(self.next_available - now).await.unwrap();
}
// update queue config
if let Some(queue_config) = self.queue_config.swap_null() {
debug!(?queue_config, "Set queue config:");
self.packet_queue.configure(*queue_config);
}
// process the packets received during this time
while let Ok(new_packet) = self.egress.try_recv() {
self.packet_queue.enqueue(new_packet);
}
}
}

#[async_trait]
impl<P, Q> Egress<P> for BwDeviceEgress<P, Q>
where
P: Packet + Send + Sync,
Q: PacketQueue<P>,
{
async fn dequeue(&mut self) -> Option<P> {
// await until the packet queue is not empty
while self.packet_queue.peek().is_none() {
if let Some(new_packet) = self.egress.recv().await {
self.packet_queue.enqueue(new_packet);
let mut packet = self.packet_queue.dequeue();
while packet.is_none() {
// the queue is empty, wait for the next packet
match self.egress.recv().await {
Some(new_packet) => {
// update queue config
if let Some(queue_config) = self.queue_config.swap_null() {
debug!(?queue_config, "Set queue config:");
self.packet_queue.configure(*queue_config);
}
self.packet_queue.enqueue(new_packet);
packet = self.packet_queue.dequeue();
}
None => {
return None;
}
}
}
self.drain_channel();

// update bandwidth config
if let Some(bandwidth) = self.bandwidth.swap_null() {
self.inner_bandwidth = bandwidth;
debug!(?self.inner_bandwidth, "Set inner bandwidth:");
}
let now = Instant::now();
if self.packet_queue.peek().unwrap().get_timestamp() >= self.next_available {
// no need to wait, since the packet arrives after next_available
let packet = self.packet_queue.dequeue().unwrap();
let transfer_time = transfer_time(packet.length(), *self.inner_bandwidth);

let packet = packet.unwrap();
let transfer_time = transfer_time(packet.length(), *self.inner_bandwidth);
if packet.get_timestamp() >= self.next_available {
// the packet arrives after next_available
self.next_available = packet.get_timestamp() + transfer_time;
Some(packet)
} else if now >= self.next_available {
// the current time is already after next_available
let packet = self.packet_queue.dequeue().unwrap();
let transfer_time = transfer_time(packet.length(), *self.inner_bandwidth);
self.next_available += transfer_time;
Some(packet)
} else {
// wait until next_available
let wait_time = self.next_available - now;
self.timer.sleep(wait_time).await.unwrap();
self.drain_channel();
let packet = self.packet_queue.dequeue().unwrap(); // get the real outgoing packet
let transfer_time = transfer_time(packet.length(), *self.inner_bandwidth);
// the packet arrives before next_available and now >= self.next_available
self.next_available += transfer_time;
Some(packet)
}
Some(packet)
}
}

Expand Down Expand Up @@ -274,11 +271,6 @@ where

// If the queue is empty, return `None`
fn dequeue(&mut self) -> Option<P>;

// Get the reference to the first packet in the queue without removing it.
// This it useful for the BwDeviceEgress to check the next packet infomation without dequeue it actually.
// If the queue is empty, return `None`
fn peek(&self) -> Option<&P>;
}

#[derive(Debug)]
Expand Down Expand Up @@ -315,10 +307,6 @@ where
fn dequeue(&mut self) -> Option<P> {
self.queue.pop_front()
}

fn peek(&self) -> Option<&P> {
self.queue.front()
}
}

#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
Expand Down Expand Up @@ -392,10 +380,6 @@ where
None => None,
}
}

fn peek(&self) -> Option<&P> {
self.queue.front()
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -451,8 +435,4 @@ where
None => None,
}
}

fn peek(&self) -> Option<&P> {
self.queue.front()
}
}
8 changes: 4 additions & 4 deletions rattan/tests/integration/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,8 @@ fn test_droptail_queue() {
for i in 0..12 {
assert!(recv_indexs[i] == i as u8);
}
assert!(18 <= recv_indexs[12] && recv_indexs[12] <= 20);
assert!(26 <= recv_indexs[13] && recv_indexs[13] <= 30);
assert!(17 <= recv_indexs[12] && recv_indexs[12] <= 20);
assert!(25 <= recv_indexs[13] && recv_indexs[13] <= 30);

info!("Test DropTailQueue (500 Bytes limit)");
info!("Set bandwidth to 40kbps (50B per 10ms)");
Expand All @@ -364,8 +364,8 @@ fn test_droptail_queue() {
for i in 0..12 {
assert!(recv_indexs[i] == i as u8);
}
assert!(18 <= recv_indexs[12] && recv_indexs[12] <= 20);
assert!(26 <= recv_indexs[13] && recv_indexs[13] <= 30);
assert!(17 <= recv_indexs[12] && recv_indexs[12] <= 20);
assert!(25 <= recv_indexs[13] && recv_indexs[13] <= 30);

server_cancel_token.cancel();
}
Expand Down

0 comments on commit d6d499d

Please sign in to comment.