Skip to content

Commit

Permalink
Merge pull request #10 from jiangliu/tokio-runtime
Browse files Browse the repository at this point in the history
file: use the extended tokio-uring crate
  • Loading branch information
jiangliu authored Jul 6, 2022
2 parents 03469ef + e489c3e commit ec8d63d
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 33 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ vmm-sys-util = "0.9.0"
tokio = { version = "1", features = ["macros"] }

[target.'cfg(target_os = "linux")'.dependencies]
tokio-uring = "0.3"
tokio-uring = { git = "https://github.com/jiangliu/tokio-uring.git", branch = "dbs-fuse" }

[features]
async-io = ["async-trait", "futures", "tokio/fs", "tokio/sync"]
Expand Down
60 changes: 40 additions & 20 deletions src/async_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl File {
let ty = CURRENT_RUNTIME.with(|rt| match rt {
Runtime::Tokio(_) => 1,
#[cfg(target_os = "linux")]
Runtime::Uring => 2,
Runtime::Uring(_) => 2,
});

match ty {
Expand Down Expand Up @@ -91,12 +91,7 @@ impl File {
(res, bufs)
}
#[cfg(target_os = "linux")]
File::Uring(_) => {
// TODO: enhance tokio-uring to support readv_at
let file = self.as_tokio_uring_file();
let res = preadv(file.as_raw_fd(), &mut bufs, offset);
(res, bufs)
}
File::Uring(_) => self.as_tokio_uring_file().readv_at(bufs, offset).await,
}
}

Expand All @@ -115,11 +110,7 @@ impl File {
(res, bufs[0])
}
#[cfg(target_os = "linux")]
File::Uring(_) => {
self.as_tokio_uring_file()
.write_at(buf, offset as u64)
.await
}
File::Uring(_) => self.as_tokio_uring_file().write_at(buf, offset).await,
}
}

Expand All @@ -137,12 +128,7 @@ impl File {
(res, bufs)
}
#[cfg(target_os = "linux")]
File::Uring(_) => {
// TODO: enhance tokio-uring to support writev_at
let file = self.as_tokio_uring_file();
let res = pwritev(file.as_raw_fd(), &bufs, offset);
(res, bufs)
}
File::Uring(_) => self.as_tokio_uring_file().writev_at(bufs, offset).await,
}
}

Expand Down Expand Up @@ -170,8 +156,19 @@ impl File {
match self {
File::Tokio(f) => f.try_clone().await.map(File::Tokio),
#[cfg(target_os = "linux")]
// TODO
File::Uring(_f) => unimplemented!(),
File::Uring(_) => {
let file = self.as_tokio_uring_file();
// Safe because file.as_raw_fd() is valid RawFd and we have checked the result.
let fd = unsafe { libc::dup(file.as_raw_fd()) };
if fd < 0 {
Err(std::io::Error::last_os_error())
} else {
// Safe because the fd is valid.
let f = unsafe { tokio_uring::fs::File::from_raw_fd(fd) };
let p = Box::into_raw(Box::new(f)) as usize;
Ok(File::Uring(p))
}
}
}
}

Expand Down Expand Up @@ -416,4 +413,27 @@ mod tests {
assert_eq!(&res, "test");
});
}

#[test]
fn test_async_try_clone() {
let dir = TempDir::new().unwrap();
let path = dir.as_path().to_path_buf();

block_on(async {
let file = File::async_open(path.join("test.txt"), true, true)
.await
.unwrap();

let file2 = file.async_try_clone().await.unwrap();
drop(file);

let buffer = b"test";
let buf = unsafe {
FileVolatileBuf::from_raw(buffer.as_ptr() as *mut u8, buffer.len(), buffer.len())
};
let (res, buf) = file2.async_write_at(buf, 0).await;
assert_eq!(res.unwrap(), 4);
assert_eq!(buf.len(), 4);
});
}
}
17 changes: 5 additions & 12 deletions src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub enum Runtime {
Tokio(tokio::runtime::Runtime),
#[cfg(target_os = "linux")]
/// Tokio-uring Runtime.
Uring,
Uring(std::sync::Mutex<tokio_uring::Runtime>),
}

impl Runtime {
Expand All @@ -23,16 +23,11 @@ impl Runtime {
/// Panic if failed to create the Runtime object.
pub fn new() -> Self {
// Check whether io-uring is available.
// TODO: use io-uring probe to detect supported operations.
#[cfg(target_os = "linux")]
{
let ok = tokio_uring::start(async {
tokio_uring::fs::File::open("/proc/self/mounts")
.await
.is_ok()
});
if ok {
return Runtime::Uring;
// TODO: use io-uring probe to detect supported operations.
if let Ok(rt) = tokio_uring::Runtime::new() {
return Runtime::Uring(std::sync::Mutex::new(rt));
}
}

Expand All @@ -48,10 +43,8 @@ impl Runtime {
pub fn block_on<F: Future>(&self, f: F) -> F::Output {
match self {
Runtime::Tokio(rt) => rt.block_on(f),
// Due to limitation of tokio_uring API, the runtime object is created on-demand.
// TODO: expose tokio-uring Runtime object.
#[cfg(target_os = "linux")]
Runtime::Uring => tokio_uring::start(f),
Runtime::Uring(rt) => rt.lock().unwrap().block_on(f),
}
}
}
Expand Down

0 comments on commit ec8d63d

Please sign in to comment.