diff --git a/rattan/src/devices/bandwidth.rs b/rattan/src/devices/bandwidth.rs index aa1fcc9..5f2075f 100644 --- a/rattan/src/devices/bandwidth.rs +++ b/rattan/src/devices/bandwidth.rs @@ -68,66 +68,63 @@ where timer: Timer, } -impl
BwDeviceEgress
+#[async_trait] +impl
Egress
for BwDeviceEgress
where - P: Packet, + P: Packet + Send + Sync, Q: PacketQueue
, { - // Drain the channel and enqueue the packets - // Do this after each await to ensure the correctness of the enqueue - fn drain_channel(&mut self) { + async fn dequeue(&mut self) -> Option
{ + // wait until next_available + let now = Instant::now(); + if now < self.next_available { + self.timer.sleep(self.next_available - now).await.unwrap(); + } + // update queue config if let Some(queue_config) = self.queue_config.swap_null() { debug!(?queue_config, "Set queue config:"); self.packet_queue.configure(*queue_config); } + // process the packets received during this time while let Ok(new_packet) = self.egress.try_recv() { self.packet_queue.enqueue(new_packet); } - } -} -#[async_trait] -impl
Egress
for BwDeviceEgress
-where - P: Packet + Send + Sync, - Q: PacketQueue
, -{ - async fn dequeue(&mut self) -> Option
{ - // await until the packet queue is not empty - while self.packet_queue.peek().is_none() { - if let Some(new_packet) = self.egress.recv().await { - self.packet_queue.enqueue(new_packet); + let mut packet = self.packet_queue.dequeue(); + while packet.is_none() { + // the queue is empty, wait for the next packet + match self.egress.recv().await { + Some(new_packet) => { + // update queue config + if let Some(queue_config) = self.queue_config.swap_null() { + debug!(?queue_config, "Set queue config:"); + self.packet_queue.configure(*queue_config); + } + self.packet_queue.enqueue(new_packet); + packet = self.packet_queue.dequeue(); + } + None => { + return None; + } } } - self.drain_channel(); + // update bandwidth config if let Some(bandwidth) = self.bandwidth.swap_null() { self.inner_bandwidth = bandwidth; debug!(?self.inner_bandwidth, "Set inner bandwidth:"); } - let now = Instant::now(); - if self.packet_queue.peek().unwrap().get_timestamp() >= self.next_available { - // no need to wait, since the packet arrives after next_available - let packet = self.packet_queue.dequeue().unwrap(); - let transfer_time = transfer_time(packet.length(), *self.inner_bandwidth); + + let packet = packet.unwrap(); + let transfer_time = transfer_time(packet.length(), *self.inner_bandwidth); + if packet.get_timestamp() >= self.next_available { + // the packet arrives after next_available self.next_available = packet.get_timestamp() + transfer_time; - Some(packet) - } else if now >= self.next_available { - // the current time is already after next_available - let packet = self.packet_queue.dequeue().unwrap(); - let transfer_time = transfer_time(packet.length(), *self.inner_bandwidth); - self.next_available += transfer_time; - Some(packet) } else { - // wait until next_available - let wait_time = self.next_available - now; - self.timer.sleep(wait_time).await.unwrap(); - self.drain_channel(); - let packet = self.packet_queue.dequeue().unwrap(); // get the real outgoing packet - let transfer_time = transfer_time(packet.length(), *self.inner_bandwidth); + // the packet arrives before next_available and now >= self.next_available self.next_available += transfer_time; - Some(packet) } + Some(packet) } } @@ -274,11 +271,6 @@ where // If the queue is empty, return `None` fn dequeue(&mut self) -> Option
; - - // Get the reference to the first packet in the queue without removing it. - // This it useful for the BwDeviceEgress to check the next packet infomation without dequeue it actually. - // If the queue is empty, return `None` - fn peek(&self) -> Option<&P>; } #[derive(Debug)] @@ -315,10 +307,6 @@ where fn dequeue(&mut self) -> Option
{ self.queue.pop_front() } - - fn peek(&self) -> Option<&P> { - self.queue.front() - } } #[cfg_attr(feature = "serde", derive(Deserialize, Serialize))] @@ -392,10 +380,6 @@ where None => None, } } - - fn peek(&self) -> Option<&P> { - self.queue.front() - } } #[derive(Debug)] @@ -451,8 +435,4 @@ where None => None, } } - - fn peek(&self) -> Option<&P> { - self.queue.front() - } } diff --git a/rattan/tests/integration/bandwidth.rs b/rattan/tests/integration/bandwidth.rs index 504cec7..40b054a 100644 --- a/rattan/tests/integration/bandwidth.rs +++ b/rattan/tests/integration/bandwidth.rs @@ -338,8 +338,8 @@ fn test_droptail_queue() { for i in 0..12 { assert!(recv_indexs[i] == i as u8); } - assert!(18 <= recv_indexs[12] && recv_indexs[12] <= 20); - assert!(26 <= recv_indexs[13] && recv_indexs[13] <= 30); + assert!(17 <= recv_indexs[12] && recv_indexs[12] <= 20); + assert!(25 <= recv_indexs[13] && recv_indexs[13] <= 30); info!("Test DropTailQueue (500 Bytes limit)"); info!("Set bandwidth to 40kbps (50B per 10ms)"); @@ -364,8 +364,8 @@ fn test_droptail_queue() { for i in 0..12 { assert!(recv_indexs[i] == i as u8); } - assert!(18 <= recv_indexs[12] && recv_indexs[12] <= 20); - assert!(26 <= recv_indexs[13] && recv_indexs[13] <= 30); + assert!(17 <= recv_indexs[12] && recv_indexs[12] <= 20); + assert!(25 <= recv_indexs[13] && recv_indexs[13] <= 30); server_cancel_token.cancel(); }