Skip to content

Commit

Permalink
[WIP] futex
Browse files Browse the repository at this point in the history
  • Loading branch information
SidongYang authored and realwakka committed Sep 9, 2024
1 parent 0156395 commit 8fd2efd
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ keywords = ["async", "fs", "io-uring"]
tokio = { version = "1.2", features = ["net", "rt", "sync"] }
slab = "0.4.2"
libc = "0.2.80"
io-uring = "0.6.0"
io-uring = "0.6.4"
socket2 = { version = "0.4.4", features = ["all"] }
bytes = { version = "1.0", optional = true }
futures-util = { version = "0.3.26", default-features = false, features = ["std"] }
Expand Down
15 changes: 15 additions & 0 deletions src/fs/futex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use crate::io::futex::{UnsubmittedFutexWait, UnsubmittedFutexWake};

#[allow(missing_docs)]
pub struct Futex;
impl Futex {
#[allow(missing_docs)]
pub fn wait(futex: *const u32, val: u64, mask: u64, futex_flags: u32) -> UnsubmittedFutexWait {
UnsubmittedFutexWait::wait(futex, val, mask, futex_flags)
}

#[allow(missing_docs)]
pub fn wake(futex: *const u32, val: u64, mask: u64, futex_flags: u32) -> UnsubmittedFutexWake {
UnsubmittedFutexWake::wake(futex, val, mask, futex_flags)
}
}
3 changes: 3 additions & 0 deletions src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ mod statx;
pub use statx::is_dir_regfile;
pub use statx::statx;
pub use statx::StatxBuilder;

mod futex;
pub use futex::Futex;
69 changes: 69 additions & 0 deletions src/io/futex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use io_uring::{cqueue::Entry, opcode};

use crate::{OneshotOutputTransform, UnsubmittedOneshot};

#[allow(missing_docs)]
pub type UnsubmittedFutexWait = UnsubmittedOneshot<FutexWaitData, FutexWaitTransform>;

#[allow(missing_docs)]
pub struct FutexWaitData {}

#[allow(missing_docs)]
pub struct FutexWaitTransform {}

impl OneshotOutputTransform for FutexWaitTransform {
type Output = Result<(), std::io::Error>;
type StoredData = FutexWaitData;

fn transform_oneshot_output(self, _data: Self::StoredData, cqe: Entry) -> Self::Output {
let n = cqe.result();
if n >= 0 {
Ok(())
} else {
Err(std::io::Error::from_raw_os_error(-n))
}
}
}

impl UnsubmittedFutexWait {
pub(crate) fn wait(futex: *const u32, val: u64, mask: u64, futex_flags: u32) -> Self {
Self::new(
FutexWaitData {},
FutexWaitTransform {},
opcode::FutexWait::new(futex, val, mask, futex_flags).build(),
)
}
}

#[allow(missing_docs)]
pub type UnsubmittedFutexWake = UnsubmittedOneshot<FutexWakeData, FutexWakeTransform>;

#[allow(missing_docs)]
pub struct FutexWakeData {}

#[allow(missing_docs)]
pub struct FutexWakeTransform {}

impl OneshotOutputTransform for FutexWakeTransform {
type Output = Result<i32, std::io::Error>;
type StoredData = FutexWakeData;

fn transform_oneshot_output(self, _data: Self::StoredData, cqe: Entry) -> Self::Output {
let n = cqe.result();
if n >= 0 {
Ok(n)
} else {
Err(std::io::Error::from_raw_os_error(-n))
}
}
}

impl UnsubmittedFutexWake {
pub(crate) fn wake(futex: *const u32, val: u64, mask: u64, futex_flags: u32) -> Self {
Self::new(
FutexWakeData {},
FutexWakeTransform {},
opcode::FutexWake::new(futex, val, mask, futex_flags).build(),
)
}
}
2 changes: 2 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,5 @@ pub(crate) mod writev;

mod writev_all;
pub(crate) use writev_all::writev_at_all;

pub mod futex;
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub mod buf;
pub mod fs;
pub mod net;

pub use io::futex::*;
pub use io::read::*;
pub use io::readv::*;
pub use io::write::*;
Expand Down
30 changes: 30 additions & 0 deletions tests/futex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use libc::FUTEX_BITSET_MATCH_ANY;
use tokio::task::spawn_local;
use tokio_uring::Submit;

const FUTEX2_SIZE_U32: u32 = 0x2;

#[test]
fn mutex_test() {
tokio_uring::start(async {
let f = Box::new(0u32);
let ptr: *const u32 = &*f;

let ptr_clone = ptr.clone();
let mask = FUTEX_BITSET_MATCH_ANY as u32 as u64;
let handle = spawn_local(async move {
tokio_uring::fs::Futex::wait(ptr_clone, 0, mask, FUTEX2_SIZE_U32)
.submit()
.await
.unwrap();
});
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let res = tokio_uring::fs::Futex::wake(ptr, 1, mask, FUTEX2_SIZE_U32)
.submit()
.await
.unwrap();
assert!(res == 1);

handle.await.unwrap();
});
}

0 comments on commit 8fd2efd

Please sign in to comment.