Skip to content

Commit

Permalink
feat(qp): add post_send implementation for basic qp
Browse files Browse the repository at this point in the history
Signed-off-by: Luke Yue <lukedyue@gmail.com>
  • Loading branch information
dragonJACson committed Sep 11, 2024
1 parent 9e0c959 commit 06fdc57
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 35 deletions.
115 changes: 115 additions & 0 deletions examples/test_ibv_wr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use core::time;
use std::thread;

use sideway::verbs::{
address::AddressHandleAttribute,
device,
device_context::Mtu,
queue_pair::{PostSendGuard, QueuePair, QueuePairAttribute, QueuePairState, SetInlineData},
AccessFlags,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
let device_list = device::DeviceList::new()?;
for device in &device_list {
let ctx = device.open().unwrap();

let pd = ctx.alloc_pd().unwrap();
let mr = pd.reg_managed_mr(64).unwrap();

let comp_channel = ctx.create_comp_channel().unwrap();
let mut cq_builder = ctx.create_cq_builder();
let mut sq = cq_builder.setup_cqe(128).build_ex().unwrap();
let rq = cq_builder.setup_cqe(128).build_ex().unwrap();

let mut builder = pd.create_qp_builder();

let mut qp = builder
.setup_max_inline_data(128)
.setup_send_cq(&sq)
.setup_recv_cq(&rq)
.build_ex()
.unwrap();

println!("qp pointer is {:?}", qp);
// modify QP to INIT state
let mut attr = QueuePairAttribute::new();
attr.setup_state(QueuePairState::Init)
.setup_pkey_index(0)
.setup_port(1)
.setup_access_flags(AccessFlags::LocalWrite.or(AccessFlags::RemoteWrite));
qp.modify(&attr).unwrap();

assert_eq!(QueuePairState::Init, qp.state());

// modify QP to RTR state, set dest qp as itself
let mut attr = QueuePairAttribute::new();
attr.setup_state(QueuePairState::ReadyToReceive)
.setup_path_mtu(Mtu::Mtu1024)
.setup_dest_qp_num(qp.qp_number())
.setup_rq_psn(1)
.setup_max_dest_read_atomic(0)
.setup_min_rnr_timer(0);
// setup address vector
let mut ah_attr = AddressHandleAttribute::new();
let gid_entries = ctx.query_gid_table().unwrap();

ah_attr
.setup_dest_lid(1)
.setup_port(1)
.setup_service_level(1)
.setup_grh_src_gid_index(gid_entries[0].gid_index().try_into().unwrap())
.setup_grh_dest_gid(&gid_entries[0].gid())
.setup_grh_hop_limit(64);
attr.setup_address_vector(&ah_attr);
qp.modify(&attr).unwrap();

assert_eq!(QueuePairState::ReadyToReceive, qp.state());

// modify QP to RTS state
let mut attr = QueuePairAttribute::new();
attr.setup_state(QueuePairState::ReadyToSend)
.setup_sq_psn(1)
.setup_timeout(12)
.setup_retry_cnt(7)
.setup_rnr_retry(7)
.setup_max_read_atomic(0);

qp.modify(&attr).unwrap();

assert_eq!(QueuePairState::ReadyToSend, qp.state());

let mut guard = qp.start_post_send().unwrap();
let buf = vec![0, 1, 2, 3];

let write_handle = guard
.construct_wr(233, 10)
.setup_write(mr.rkey(), mr.buf.data.as_ptr() as _);

// while holding a write handle, we can't build a send handle at the same time
// let send_handle = guard.construct_wr(2, 0).setup_send();

write_handle.setup_inline_data(&buf);

// while holding a post send guard, we can't build a post send guard at the same time
// let mut guard_2 = qp.start_post_send().unwrap();

let _err = guard.do_post().unwrap();

thread::sleep(time::Duration::from_millis(10));

// poll for the completion
{
let poller = sq.start_poll().unwrap();
println!("wr_id {}, status: {}", poller.wr_id(), poller.status(),);
assert_eq!(poller.wr_id(), 233);
}

unsafe {
let slice = std::slice::from_raw_parts(mr.buf.data.as_ptr(), mr.buf.len);
println!("Buffer contents: {:?}", slice);
}
}

Ok(())
}
10 changes: 3 additions & 7 deletions examples/test_post_send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.setup_max_inline_data(128)
.setup_send_cq(&sq)
.setup_recv_cq(&rq)
.build_ex()
.build()
.unwrap();

println!("qp pointer is {:?}", qp);
Expand Down Expand Up @@ -101,12 +101,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// poll for the completion
{
let poller = sq.start_poll().unwrap();
println!(
"wr_id {}, status: {}, vendor err: {}",
poller.wr_id(),
poller.status(),
poller.vendor_err()
);
println!("wr_id {}, status: {}", poller.wr_id(), poller.status(),);
assert_eq!(poller.wr_id(), 233);
}

unsafe {
Expand Down
125 changes: 97 additions & 28 deletions src/verbs/queue_pair.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use bitmask_enum::bitmask;
use rdma_mummy_sys::{
ibv_access_flags, ibv_create_qp, ibv_create_qp_ex, ibv_destroy_qp, ibv_modify_qp, ibv_qp, ibv_qp_attr,
ibv_qp_attr_mask, ibv_qp_cap, ibv_qp_create_send_ops_flags, ibv_qp_ex, ibv_qp_init_attr, ibv_qp_init_attr_ex,
ibv_qp_init_attr_mask, ibv_qp_state, ibv_qp_to_qp_ex, ibv_qp_type, ibv_rx_hash_conf, ibv_wr_abort, ibv_wr_complete,
ibv_wr_opcode, ibv_wr_rdma_read, ibv_wr_rdma_write, ibv_wr_send, ibv_wr_set_inline_data, ibv_wr_start,
ibv_access_flags, ibv_create_qp, ibv_create_qp_ex, ibv_destroy_qp, ibv_modify_qp, ibv_post_send, ibv_qp,
ibv_qp_attr, ibv_qp_attr_mask, ibv_qp_cap, ibv_qp_create_send_ops_flags, ibv_qp_ex, ibv_qp_init_attr,
ibv_qp_init_attr_ex, ibv_qp_init_attr_mask, ibv_qp_state, ibv_qp_to_qp_ex, ibv_qp_type, ibv_rx_hash_conf,
ibv_send_wr, ibv_sge, ibv_wr_abort, ibv_wr_complete, ibv_wr_opcode, ibv_wr_rdma_read, ibv_wr_rdma_write,
ibv_wr_send, ibv_wr_set_inline_data, ibv_wr_start, imm_data_invalidated_rkey_union_t, rdma_t, wr_t,
};
use std::{
fmt::format,
io::{self, IoSlice},
marker::PhantomData,
mem::MaybeUninit,
ptr::{null_mut, NonNull},
ptr::{null, null_mut, NonNull},
};

use super::{
Expand Down Expand Up @@ -164,7 +165,11 @@ pub trait PostSendGuard {
// every qp should hold only one WorkRequestHandle at the same time
fn construct_wr<'g>(&'g mut self, wr_id: u64, wr_flags: u32) -> WorkRequestHandle<'g, Self>;

unsafe fn qp(&self) -> NonNull<ibv_qp>;
fn setup_send(&mut self);

fn setup_write(&mut self, rkey: u32, remote_addr: u64);

fn setup_inline_data(&mut self, buf: &[u8]);

fn do_post(self) -> Result<(), String>;
}
Expand Down Expand Up @@ -193,7 +198,14 @@ impl QueuePair for BasicQueuePair<'_> {
where
'qp: 'g,
{
todo!()
let guard = BasicPostSendGuard {
qp: self.qp,
wrs: Vec::with_capacity(0),
sges: Vec::with_capacity(0),
_phantom: PhantomData,
};

Ok(guard)
}
}

Expand Down Expand Up @@ -226,7 +238,7 @@ impl QueuePair for ExtendedQueuePair<'_> {
}

let guard = ExtendedPostSendGuard {
qp_ex: unsafe { self.qp() },
qp_ex: self.qp_ex,
_phantom: PhantomData,
};

Expand Down Expand Up @@ -517,9 +529,7 @@ pub struct WriteHandle<'g, G: PostSendGuard> {

impl<'g, G: PostSendGuard> SetInlineData for WriteHandle<'g, G> {
fn setup_inline_data(self, buf: &[u8]) {
unsafe {
ibv_wr_set_inline_data(self.guard.qp().as_ptr() as _, buf.as_ptr() as _, buf.len());
}
self.guard.setup_inline_data(buf);
}

fn setup_inline_data_list(self, buf: &[IoSlice<'_>]) {
Expand All @@ -529,41 +539,88 @@ impl<'g, G: PostSendGuard> SetInlineData for WriteHandle<'g, G> {

impl<'g, G: PostSendGuard> WorkRequestHandle<'g, G> {
pub fn setup_send(self) -> SendHandle<'g, G> {
unsafe {
ibv_wr_send(self.guard.qp().as_ptr() as _);
}
self.guard.setup_send();
SendHandle { guard: self.guard }
}

pub fn setup_write(self, rkey: u32, remote_addr: u64) -> WriteHandle<'g, G> {
unsafe {
ibv_wr_rdma_write(self.guard.qp().as_ptr() as _, rkey, remote_addr);
}
self.guard.setup_write(rkey, remote_addr);
WriteHandle { guard: self.guard }
}
}

pub struct BasicPostSendGuard<'qp> {
qp: NonNull<ibv_qp>,
wrs: Vec<ibv_send_wr>,
sges: Vec<ibv_sge>,
_phantom: PhantomData<&'qp ()>,
}

impl PostSendGuard for BasicPostSendGuard<'_> {
fn construct_wr<'qp>(&'qp mut self, wr_id: u64, wr_flags: u32) -> WorkRequestHandle<'qp, Self> {
todo!()
// let mut new_wr = Box::new(MaybeUninit::<ibv_send_wr>::zeroed());
self.wrs.push(ibv_send_wr {
wr_id,
next: null_mut(),
sg_list: null_mut(),
num_sge: 0,
opcode: 0,
send_flags: wr_flags,
..unsafe { MaybeUninit::zeroed().assume_init() }
});

WorkRequestHandle { guard: self }
}

fn do_post(self) -> Result<(), String> {
todo!()
fn setup_send(&mut self) {
self.wrs.last_mut().unwrap().opcode = WorkRequestOperationType::Send as _;
}

unsafe fn qp(&self) -> NonNull<ibv_qp> {
self.qp
fn setup_write(&mut self, rkey: u32, remote_addr: u64) {
self.wrs.last_mut().unwrap().opcode = WorkRequestOperationType::Write as _;
self.wrs.last_mut().unwrap().wr.rdma.remote_addr = remote_addr;
self.wrs.last_mut().unwrap().wr.rdma.rkey = rkey;
}

fn setup_inline_data(&mut self, buf: &[u8]) {
self.sges.push(ibv_sge {
addr: buf.as_ptr() as u64,
length: buf.len() as u32,
lkey: 0,
});

self.wrs.last_mut().unwrap().num_sge += 1;
}

fn do_post(mut self) -> Result<(), String> {
let mut sge_index = 0;

for i in 0..self.wrs.len() {
// Set up the linked list
if i < self.wrs.len() - 1 {
self.wrs[i].next = &mut self.wrs[i + 1] as *mut _;
} else {
self.wrs[i].next = null_mut();
}

// Set up the sg_list
if self.wrs[i].num_sge > 0 {
self.wrs[i].sg_list = &mut self.sges[sge_index] as *mut _;
sge_index += self.wrs[i].num_sge as usize;
}
}

let mut bad_wr: *mut ibv_send_wr = null_mut();
let ret = unsafe { ibv_post_send(self.qp.as_ptr(), self.wrs.as_mut_ptr(), &mut bad_wr) };
match ret {
0 => Ok(()),
err => Err(format!("ibv_post_send failed, ret={err}")),
}
}
}

pub struct ExtendedPostSendGuard<'qp> {
qp_ex: NonNull<ibv_qp>,
qp_ex: NonNull<ibv_qp_ex>,
_phantom: PhantomData<&'qp ()>,
}

Expand All @@ -576,18 +633,30 @@ impl PostSendGuard for ExtendedPostSendGuard<'_> {
WorkRequestHandle { guard: self }
}

fn setup_send(&mut self) {
unsafe {
ibv_wr_send(self.qp_ex.as_ptr());
}
}

fn setup_write(&mut self, rkey: u32, remote_addr: u64) {
unsafe {
ibv_wr_rdma_write(self.qp_ex.as_ptr(), rkey, remote_addr);
}
}

fn setup_inline_data(&mut self, buf: &[u8]) {
unsafe { ibv_wr_set_inline_data(self.qp_ex.as_ptr(), buf.as_ptr() as _, buf.len()) }
}

fn do_post(self) -> Result<(), String> {
let ret: i32 = unsafe { ibv_wr_complete(self.qp_ex.as_ptr() as _) };
let ret: i32 = unsafe { ibv_wr_complete(self.qp_ex.as_ptr()) };

match ret {
0 => Ok(()),
err => Err(format!("failed to ibv_wr_complete: ret {err}")),
}
}

unsafe fn qp(&self) -> NonNull<ibv_qp> {
self.qp_ex
}
}

impl Drop for ExtendedPostSendGuard<'_> {
Expand Down

0 comments on commit 06fdc57

Please sign in to comment.