Skip to content

Commit

Permalink
WIP: implement poll_cq for basic cq
Browse files Browse the repository at this point in the history
Signed-off-by: Luke Yue <lukedyue@gmail.com>
  • Loading branch information
dragonJACson committed Sep 18, 2024
1 parent 6168b85 commit f9dfeb7
Showing 1 changed file with 103 additions and 3 deletions.
106 changes: 103 additions & 3 deletions src/verbs/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use std::os::raw::c_void;
use std::ptr;
use std::ptr::NonNull;
use std::{marker::PhantomData, mem::MaybeUninit};
use core::slice::Iter;

use super::device_context::DeviceContext;
use rdma_mummy_sys::{
ibv_comp_channel, ibv_cq, ibv_cq_ex, ibv_cq_init_attr_ex, ibv_create_comp_channel, ibv_create_cq, ibv_create_cq_ex,
ibv_destroy_comp_channel, ibv_destroy_cq, ibv_end_poll, ibv_next_poll, ibv_pd, ibv_poll_cq_attr, ibv_start_poll,
ibv_wc_read_byte_len, ibv_wc_read_completion_ts, ibv_wc_read_opcode, ibv_wc_read_vendor_err,
ibv_comp_channel, ibv_cq, ibv_cq_ex, ibv_wc, ibv_cq_init_attr_ex, ibv_create_comp_channel, ibv_create_cq, ibv_create_cq_ex, ibv_destroy_comp_channel, ibv_destroy_cq, ibv_end_poll, ibv_next_poll, ibv_pd, ibv_poll_cq, ibv_poll_cq_attr, ibv_start_poll, ibv_wc_read_byte_len, ibv_wc_read_completion_ts, ibv_wc_read_opcode, ibv_wc_read_vendor_err
};

#[derive(Debug)]
Expand Down Expand Up @@ -64,6 +63,28 @@ impl CompletionQueue for BasicCompletionQueue<'_> {
}
}

impl BasicCompletionQueue<'_> {
pub fn start_poll<'cq>(&'cq self) -> Result<BasicPoller<'cq>, String> {
let mut cqes = Vec::<ibv_wc>::with_capacity(32);

let ret = unsafe {
ibv_poll_cq(self.cq.as_ptr(), 32, cqes.as_mut_ptr())
};

unsafe {
match ret {
0 => Err(format!("no valid cqes")),
err if err < 0 => Err(format!("ibv_poll_cq failed, ret={err}")),
res => return Ok(BasicPoller {
cq: NonNull::new(self.cq().as_ptr()).unwrap_unchecked(),
cqes: { cqes.truncate(res as _); cqes },
_phantom: PhantomData,
})
}
}
}
}

#[derive(Debug)]
pub struct ExtendedCompletionQueue<'res> {
pub(crate) cq_ex: NonNull<ibv_cq_ex>,
Expand Down Expand Up @@ -191,6 +212,32 @@ impl<'res> CompletionQueueBuilder<'res> {

// TODO trait for both cq and cq_ex?

pub struct BasicWorkCompletion<'cq> {
current: usize,
poller: &'cq mut BasicPoller<'cq>,
_phantom: PhantomData<&'cq ()>,
}

impl<'cq> BasicWorkCompletion<'cq> {
pub fn wr_id(&self) -> u64 {
unsafe {
self.poller.cqes.get(self.current).unwrap_unchecked().wr_id
}
}

pub fn status(&self) -> u32 {
unsafe {
self.poller.cqes.get(self.current).unwrap_unchecked().status
}
}

pub fn opcode(&self) -> u32 {
unsafe {
self.poller.cqes.get(self.current).unwrap_unchecked().opcode
}
}
}

pub struct ExtendedWorkCompletion<'cq> {
cq: NonNull<ibv_cq_ex>,
_phantom: PhantomData<&'cq ()>,
Expand Down Expand Up @@ -222,6 +269,59 @@ impl<'cq> ExtendedWorkCompletion<'cq> {
}
}

pub struct BasicPoller<'cq> {
cq: NonNull<ibv_cq>,
cqes: Vec<ibv_wc>,
_phantom: PhantomData<&'cq ()>,
}

impl<'cq> BasicPoller<'cq> {
pub fn iter_mut(&'cq mut self) -> BasicWorkCompletion<'cq> {
BasicWorkCompletion {
current: 0,
poller: self,
_phantom: PhantomData,
}
}
}

impl<'cq> IntoIterator for &'cq mut BasicPoller<'cq> {
type Item = &'cq ibv_wc;
type IntoIter = BasicWorkCompletion<'cq>;

fn into_iter(self) -> Self::IntoIter {
self.iter_mut()
}
}

impl<'cq> Iterator for BasicWorkCompletion<'cq> {
type Item = &'cq ibv_wc;

fn next(&mut self) -> Option<Self::Item> {
if self.current < self.poller.cqes.len() {
let result = Some(&self.poller.cqes[self.current]);
self.current += 1;
result
} else {
self.poller.cqes.clear();
let ret = unsafe {
ibv_poll_cq(self.poller.cq.as_ptr(), 32, self.poller.cqes.as_mut_ptr())
};

if ret > 0 {
unsafe {
self.poller.cqes.set_len(ret as usize);
}
self.current = 1;
// Use a pointer cast to extend the lifetime
Some(unsafe { &*(self.poller.cqes.get_unchecked(0) as *const ibv_wc) })
} else {
None
}
}
}
}

pub struct ExtendedPoller<'cq> {
cq: NonNull<ibv_cq_ex>,
_phantom: PhantomData<&'cq ()>,
Expand Down

0 comments on commit f9dfeb7

Please sign in to comment.