Skip to content

Commit

Permalink
feat(cq): implement generic CQ
Browse files Browse the repository at this point in the history
  • Loading branch information
FujiZ committed Oct 27, 2024
1 parent f90a5ba commit 989d74e
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 19 deletions.
104 changes: 104 additions & 0 deletions src/verbs/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,14 @@ impl<'iter> BasicWorkCompletion<'iter> {
pub fn opcode(&self) -> u32 {
self.wc.opcode
}

pub fn vendor_err(&self) -> u32 {
self.wc.vendor_err
}

Check warning on line 403 in src/verbs/completion.rs

View check run for this annotation

Codecov / codecov/patch

src/verbs/completion.rs#L401-L403

Added lines #L401 - L403 were not covered by tests

pub fn byte_len(&self) -> u32 {
self.wc.byte_len
}

Check warning on line 407 in src/verbs/completion.rs

View check run for this annotation

Codecov / codecov/patch

src/verbs/completion.rs#L405-L407

Added lines #L405 - L407 were not covered by tests
}

pub struct ExtendedWorkCompletion<'iter> {
Expand Down Expand Up @@ -553,6 +561,102 @@ impl<'cq> Iterator for ExtendedPoller<'cq> {
}
}

#[derive(Debug)]
pub enum GenericCompletionQueue<'cq> {
/// Variant for a Basic CQ
Basic(BasicCompletionQueue<'cq>),
/// Variant for an Extended CQ
Extended(ExtendedCompletionQueue<'cq>),
}

impl CompletionQueue for GenericCompletionQueue<'_> {
unsafe fn cq(&self) -> NonNull<ibv_cq> {
match self {
GenericCompletionQueue::Basic(cq) => cq.cq(),
GenericCompletionQueue::Extended(cq) => cq.cq(),
}
}
}

pub enum GenericPoller<'cq> {
Basic(BasicPoller<'cq>),
Extended(ExtendedPoller<'cq>),
}

impl GenericCompletionQueue<'_> {
pub fn start_poll(&self) -> Result<GenericPoller<'_>, String> {
match self {
GenericCompletionQueue::Basic(cq) => cq.start_poll().map(GenericPoller::Basic),
GenericCompletionQueue::Extended(cq) => cq.start_poll().map(GenericPoller::Extended),
}
}
}

impl<'cq> Iterator for GenericPoller<'cq> {
type Item = GenericWorkCompletion<'cq>;

fn next(&mut self) -> Option<Self::Item> {
match self {
GenericPoller::Basic(poller) => poller.next().map(GenericWorkCompletion::Basic),
GenericPoller::Extended(poller) => poller.next().map(GenericWorkCompletion::Extended),
}
}
}

pub enum GenericWorkCompletion<'iter> {
Basic(BasicWorkCompletion<'iter>),
Extended(ExtendedWorkCompletion<'iter>),
}

impl<'iter> GenericWorkCompletion<'iter> {
pub fn wr_id(&self) -> u64 {
match self {
GenericWorkCompletion::Basic(wc) => wc.wr_id(),
GenericWorkCompletion::Extended(wc) => wc.wr_id(),
}
}

pub fn status(&self) -> u32 {
match self {
GenericWorkCompletion::Basic(wc) => wc.status(),
GenericWorkCompletion::Extended(wc) => wc.status(),
}
}

pub fn opcode(&self) -> u32 {
match self {
GenericWorkCompletion::Basic(wc) => wc.opcode(),
GenericWorkCompletion::Extended(wc) => wc.opcode(),
}
}

pub fn vendor_err(&self) -> u32 {
match self {
GenericWorkCompletion::Basic(wc) => wc.vendor_err(),
GenericWorkCompletion::Extended(wc) => wc.vendor_err(),

Check warning on line 636 in src/verbs/completion.rs

View check run for this annotation

Codecov / codecov/patch

src/verbs/completion.rs#L633-L636

Added lines #L633 - L636 were not covered by tests
}
}

Check warning on line 638 in src/verbs/completion.rs

View check run for this annotation

Codecov / codecov/patch

src/verbs/completion.rs#L638

Added line #L638 was not covered by tests

pub fn byte_len(&self) -> u32 {
match self {
GenericWorkCompletion::Basic(wc) => wc.byte_len(),
GenericWorkCompletion::Extended(wc) => wc.byte_len(),

Check warning on line 643 in src/verbs/completion.rs

View check run for this annotation

Codecov / codecov/patch

src/verbs/completion.rs#L640-L643

Added lines #L640 - L643 were not covered by tests
}
}

Check warning on line 645 in src/verbs/completion.rs

View check run for this annotation

Codecov / codecov/patch

src/verbs/completion.rs#L645

Added line #L645 was not covered by tests
}

impl<'cq> From<BasicCompletionQueue<'cq>> for GenericCompletionQueue<'cq> {
fn from(cq: BasicCompletionQueue<'cq>) -> Self {
GenericCompletionQueue::Basic(cq)
}
}

impl<'cq> From<ExtendedCompletionQueue<'cq>> for GenericCompletionQueue<'cq> {
fn from(cq: ExtendedCompletionQueue<'cq>) -> Self {
GenericCompletionQueue::Extended(cq)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
39 changes: 20 additions & 19 deletions tests/test_post_send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use core::time;
use std::{io::IoSlice, thread};

use sideway::verbs::completion::GenericCompletionQueue;
use sideway::verbs::queue_pair::GenericQueuePair;
use sideway::verbs::{
address::{AddressHandleAttribute, GidType},
Expand All @@ -18,9 +19,11 @@ use sideway::verbs::{
use rstest::rstest;

#[rstest]
#[case(true)]
#[case(false)]
fn main(#[case] use_qp_ex: bool) -> Result<(), Box<dyn std::error::Error>> {
#[case(true, true)]
#[case(false, true)]
#[case(true, false)]
#[case(false, false)]
fn main(#[case] use_qp_ex: bool, #[case] use_cq_ex: bool) -> Result<(), Box<dyn std::error::Error>> {
let device_list = device::DeviceList::new()?;
for device in &device_list {
let ctx = device.open().unwrap();
Expand All @@ -31,27 +34,25 @@ fn main(#[case] use_qp_ex: bool) -> Result<(), Box<dyn std::error::Error>> {

let _comp_channel = ctx.create_comp_channel().unwrap();
let mut cq_builder = ctx.create_cq_builder();
let sq = cq_builder.setup_cqe(128).build().unwrap();
let rq = cq_builder.setup_cqe(128).build().unwrap();
cq_builder.setup_cqe(128);
let sq : GenericCompletionQueue = if use_cq_ex {
cq_builder.build_ex().unwrap().into()
} else {
cq_builder.build().unwrap().into()
};
let rq : GenericCompletionQueue = if use_cq_ex {
cq_builder.build_ex().unwrap().into()
} else {
cq_builder.build().unwrap().into()
};

let mut builder = pd.create_qp_builder();
builder.setup_max_inline_data(128).setup_send_cq(&sq).setup_recv_cq(&rq);

let mut qp: GenericQueuePair = if use_qp_ex {
builder
.setup_max_inline_data(128)
.setup_send_cq(&sq)
.setup_recv_cq(&rq)
.build_ex()
.unwrap()
.into()
builder.build_ex().unwrap().into()
} else {
builder
.setup_max_inline_data(128)
.setup_send_cq(&sq)
.setup_recv_cq(&rq)
.build()
.unwrap()
.into()
builder.build().unwrap().into()
};

println!("qp pointer is {:?}", qp);
Expand Down

0 comments on commit 989d74e

Please sign in to comment.