Skip to content

Commit

Permalink
vsock: pop all the received packets
Browse files Browse the repository at this point in the history
After receiving from vsock transport layer, pop all the vsock packet
from device and try to fill the caller's buffer as much as possible.

Signed-off-by: Jiaqi Gao <jiaqi.gao@intel.com>
  • Loading branch information
gaojiaqi7 committed Oct 23, 2024
1 parent 49f8137 commit 7bf7394
Showing 1 changed file with 52 additions and 36 deletions.
88 changes: 52 additions & 36 deletions src/devices/vsock/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,60 +326,76 @@ impl VsockStream {
}

if self.data_queue.is_empty() {
let recv = VSOCK_DEVICE
.lock()
.get_mut()
.ok_or(VsockError::DeviceNotAvailable)?
.transport
.dequeue(self, DEFAULT_TIMEOUT)?;
let packet = Packet::new_checked(recv.as_slice())?;

if packet.op() == field::OP_SHUTDOWN {
self.shutdown()?;
return Ok(0);
}
if packet.op() == field::OP_RST {
self.reset()?;
return Err(VsockError::Illegal);
}
if packet.op() != field::OP_RW {
return Err(VsockError::Illegal);
}

if packet.data_len() > 0 {
let mut recv = VSOCK_DEVICE
loop {
let recv = VSOCK_DEVICE
.lock()
.get_mut()
.ok_or(VsockError::DeviceNotAvailable)?
.transport
.dequeue(self, DEFAULT_TIMEOUT)?;

self.rx_cnt += packet.data_len();
if packet.data_len() as usize <= recv.len() {
recv.truncate(packet.data_len() as usize);
} else {
let packet = Packet::new_checked(recv.as_slice())?;

if packet.op() == field::OP_SHUTDOWN {
self.shutdown()?;
return Ok(0);
}
if packet.op() == field::OP_RST {
self.reset()?;
return Err(VsockError::Illegal);
}
if packet.op() != field::OP_RW {
return Err(VsockError::Illegal);
}

if packet.data_len() > 0 {
let mut recv = VSOCK_DEVICE
.lock()
.get_mut()
.ok_or(VsockError::DeviceNotAvailable)?
.transport
.dequeue(self, DEFAULT_TIMEOUT)?;

self.rx_cnt += packet.data_len();
if packet.data_len() as usize <= recv.len() {
recv.truncate(packet.data_len() as usize);
} else {
return Err(VsockError::Illegal);
}

self.data_queue.push_back(recv);
self.data_queue.push_back(recv);
}

// If there are received vsock packets, continue to pop them out and insert to the
// `data_queue`. If there is no vsock packet left in the device, break the loop.
if !VSOCK_DEVICE
.lock()
.get_mut()
.ok_or(VsockError::DeviceNotAvailable)?
.transport
.can_recv()
{
break;
}
}
}

let mut recvd = 0;
if !self.data_queue.is_empty() {
let mut used = 0;
while !self.data_queue.is_empty() && used < buf.len() {
let head = self.data_queue.front_mut().unwrap();
if head.len() <= buf.len() {
buf[..head.len()].copy_from_slice(head);
recvd = head.len();
let free = buf.len() - used;
if head.len() <= free {
buf[used..used + head.len()].copy_from_slice(head);
used += head.len();
self.data_queue.pop_front();
} else {
buf.copy_from_slice(&head[..buf.len()]);
recvd = buf.len();
head.drain(..buf.len());
buf[used..].copy_from_slice(&head[..free]);
used += free;
head.drain(..free);
}
}

Ok(recvd)
Ok(used)
}

fn reset(&mut self) -> Result {
Expand Down

0 comments on commit 7bf7394

Please sign in to comment.