Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cq): implement generic CQ #41

Merged
merged 1 commit into from
Nov 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions src/verbs/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,14 @@
pub fn opcode(&self) -> u32 {
self.wc.opcode
}

pub fn vendor_err(&self) -> u32 {
self.wc.vendor_err
}

Check warning on line 403 in src/verbs/completion.rs

View check run for this annotation

Codecov / codecov/patch

src/verbs/completion.rs#L401-L403

Added lines #L401 - L403 were not covered by tests

pub fn byte_len(&self) -> u32 {
self.wc.byte_len
}

Check warning on line 407 in src/verbs/completion.rs

View check run for this annotation

Codecov / codecov/patch

src/verbs/completion.rs#L405-L407

Added lines #L405 - L407 were not covered by tests
}

pub struct ExtendedWorkCompletion<'iter> {
Expand Down Expand Up @@ -553,6 +561,102 @@
}
}

#[derive(Debug)]
pub enum GenericCompletionQueue<'cq> {
/// Variant for a Basic CQ
Basic(BasicCompletionQueue<'cq>),
/// Variant for an Extended CQ
Extended(ExtendedCompletionQueue<'cq>),
}

impl CompletionQueue for GenericCompletionQueue<'_> {
unsafe fn cq(&self) -> NonNull<ibv_cq> {
match self {
GenericCompletionQueue::Basic(cq) => cq.cq(),
GenericCompletionQueue::Extended(cq) => cq.cq(),
}
}
}

pub enum GenericPoller<'cq> {
Basic(BasicPoller<'cq>),
Extended(ExtendedPoller<'cq>),
}

impl GenericCompletionQueue<'_> {
pub fn start_poll(&self) -> Result<GenericPoller<'_>, String> {
match self {
GenericCompletionQueue::Basic(cq) => cq.start_poll().map(GenericPoller::Basic),
GenericCompletionQueue::Extended(cq) => cq.start_poll().map(GenericPoller::Extended),
}
}
}

impl<'cq> Iterator for GenericPoller<'cq> {
type Item = GenericWorkCompletion<'cq>;

fn next(&mut self) -> Option<Self::Item> {
match self {
GenericPoller::Basic(poller) => poller.next().map(GenericWorkCompletion::Basic),
GenericPoller::Extended(poller) => poller.next().map(GenericWorkCompletion::Extended),
}
}
}

pub enum GenericWorkCompletion<'iter> {
Basic(BasicWorkCompletion<'iter>),
Extended(ExtendedWorkCompletion<'iter>),
}

impl<'iter> GenericWorkCompletion<'iter> {
pub fn wr_id(&self) -> u64 {
match self {
GenericWorkCompletion::Basic(wc) => wc.wr_id(),
GenericWorkCompletion::Extended(wc) => wc.wr_id(),
}
}

pub fn status(&self) -> u32 {
match self {
GenericWorkCompletion::Basic(wc) => wc.status(),
GenericWorkCompletion::Extended(wc) => wc.status(),
}
}

pub fn opcode(&self) -> u32 {
match self {
GenericWorkCompletion::Basic(wc) => wc.opcode(),
GenericWorkCompletion::Extended(wc) => wc.opcode(),
}
}

pub fn vendor_err(&self) -> u32 {
match self {
GenericWorkCompletion::Basic(wc) => wc.vendor_err(),
GenericWorkCompletion::Extended(wc) => wc.vendor_err(),

Check warning on line 636 in src/verbs/completion.rs

View check run for this annotation

Codecov / codecov/patch

src/verbs/completion.rs#L633-L636

Added lines #L633 - L636 were not covered by tests
}
}

Check warning on line 638 in src/verbs/completion.rs

View check run for this annotation

Codecov / codecov/patch

src/verbs/completion.rs#L638

Added line #L638 was not covered by tests

pub fn byte_len(&self) -> u32 {
match self {
GenericWorkCompletion::Basic(wc) => wc.byte_len(),
GenericWorkCompletion::Extended(wc) => wc.byte_len(),

Check warning on line 643 in src/verbs/completion.rs

View check run for this annotation

Codecov / codecov/patch

src/verbs/completion.rs#L640-L643

Added lines #L640 - L643 were not covered by tests
}
}

Check warning on line 645 in src/verbs/completion.rs

View check run for this annotation

Codecov / codecov/patch

src/verbs/completion.rs#L645

Added line #L645 was not covered by tests
}

impl<'cq> From<BasicCompletionQueue<'cq>> for GenericCompletionQueue<'cq> {
fn from(cq: BasicCompletionQueue<'cq>) -> Self {
GenericCompletionQueue::Basic(cq)
}
}

impl<'cq> From<ExtendedCompletionQueue<'cq>> for GenericCompletionQueue<'cq> {
fn from(cq: ExtendedCompletionQueue<'cq>) -> Self {
GenericCompletionQueue::Extended(cq)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
39 changes: 20 additions & 19 deletions tests/test_post_send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use core::time;
use std::{io::IoSlice, thread};

use sideway::verbs::completion::GenericCompletionQueue;
use sideway::verbs::queue_pair::GenericQueuePair;
use sideway::verbs::{
address::{AddressHandleAttribute, GidType},
Expand All @@ -18,9 +19,11 @@ use sideway::verbs::{
use rstest::rstest;

#[rstest]
#[case(true)]
#[case(false)]
fn main(#[case] use_qp_ex: bool) -> Result<(), Box<dyn std::error::Error>> {
#[case(true, true)]
#[case(false, true)]
#[case(true, false)]
#[case(false, false)]
fn main(#[case] use_qp_ex: bool, #[case] use_cq_ex: bool) -> Result<(), Box<dyn std::error::Error>> {
let device_list = device::DeviceList::new()?;
for device in &device_list {
let ctx = device.open().unwrap();
Expand All @@ -31,27 +34,25 @@ fn main(#[case] use_qp_ex: bool) -> Result<(), Box<dyn std::error::Error>> {

let _comp_channel = ctx.create_comp_channel().unwrap();
let mut cq_builder = ctx.create_cq_builder();
let sq = cq_builder.setup_cqe(128).build().unwrap();
let rq = cq_builder.setup_cqe(128).build().unwrap();
cq_builder.setup_cqe(128);
let sq : GenericCompletionQueue = if use_cq_ex {
cq_builder.build_ex().unwrap().into()
} else {
cq_builder.build().unwrap().into()
};
let rq : GenericCompletionQueue = if use_cq_ex {
cq_builder.build_ex().unwrap().into()
} else {
cq_builder.build().unwrap().into()
};

let mut builder = pd.create_qp_builder();
builder.setup_max_inline_data(128).setup_send_cq(&sq).setup_recv_cq(&rq);

let mut qp: GenericQueuePair = if use_qp_ex {
builder
.setup_max_inline_data(128)
.setup_send_cq(&sq)
.setup_recv_cq(&rq)
.build_ex()
.unwrap()
.into()
builder.build_ex().unwrap().into()
} else {
builder
.setup_max_inline_data(128)
.setup_send_cq(&sq)
.setup_recv_cq(&rq)
.build()
.unwrap()
.into()
builder.build().unwrap().into()
};

println!("qp pointer is {:?}", qp);
Expand Down