Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: change close semantic for tokio fs and FileSystemSyncAccessHandle #121

Merged
merged 3 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fusio/src/dynamic/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ mod tests {
async fn test_dyn_fs() {
use tempfile::tempfile;

use crate::Write;
use crate::{disk::tokio::TokioFile, Write};

let file = tokio::fs::File::from_std(tempfile().unwrap());
let file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap()));
let mut dyn_file: Box<dyn super::DynFile> = Box::new(file);
let buf = [24, 9, 24, 0];
let (result, _) = dyn_file.write_all(&buf[..]).await;
Expand Down
20 changes: 12 additions & 8 deletions fusio/src/impls/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,9 @@ impl<F: Write> Write for BufWriter<F> {
#[cfg(feature = "tokio")]
#[cfg(test)]
pub(crate) mod tests {
use tokio::io::AsyncWriteExt;

use super::BufWriter;
use crate::{buffered::BufReader, Error, IoBufMut, Read};
use crate::{buffered::BufReader, Error, IoBufMut, Read, Write};

impl<F: Read> Read for BufWriter<F> {
async fn read_exact_at<B: IoBufMut>(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) {
Expand All @@ -180,10 +179,12 @@ pub(crate) mod tests {
async fn test_buf_read() {
use tempfile::tempfile;

let mut file = tokio::fs::File::from_std(tempfile().unwrap());
file.write_all(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
.await
.unwrap();
use crate::disk::tokio::TokioFile;

let mut file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap()));
let _ = file
.write_all([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15].as_slice())
.await;

let mut reader = BufReader::new(file, 8).await.unwrap();
{
Expand Down Expand Up @@ -229,9 +230,12 @@ pub(crate) mod tests {
async fn test_buf_read_write() {
use tempfile::tempfile;

use crate::{impls::buffered::BufWriter, Read, Write};
use crate::{
impls::{buffered::BufWriter, disk::tokio::TokioFile},
Read, Write,
};

let file = tokio::fs::File::from_std(tempfile().unwrap());
let file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap()));
let mut writer = BufWriter::new(file, 4);
{
let _ = writer.write_all("Hello".as_bytes()).await;
Expand Down
71 changes: 28 additions & 43 deletions fusio/src/impls/disk/opfs/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,14 @@ use crate::{disk::opfs::promise, error::wasm_err, Error, IoBuf, IoBufMut, Read,
/// OPFS based on [FileSystemWritableFileStream](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle)
/// This file is only accessible inside dedicated Web Workers.
pub struct OPFSSyncFile {
file_handle: FileSystemFileHandle,
access_handle: Option<FileSystemSyncAccessHandle>,
}

impl OPFSSyncFile {
pub(crate) async fn new(file_handle: FileSystemFileHandle) -> Result<Self, Error> {
let js_promise = file_handle.create_sync_access_handle();
let access_handle = Some(promise::<FileSystemSyncAccessHandle>(js_promise).await?);
Ok(Self {
file_handle,
access_handle,
})
}

pub(crate) async fn access_handle(&mut self) -> &FileSystemSyncAccessHandle {
if self.access_handle.is_none() {
let js_promise = self.file_handle.create_sync_access_handle();
self.access_handle = Some(
promise::<FileSystemSyncAccessHandle>(js_promise)
.await
.unwrap(),
);
}
self.access_handle.as_ref().unwrap()
Ok(Self { access_handle })
}
}

Expand All @@ -40,9 +24,12 @@ impl Write for OPFSSyncFile {
/// No changes are written to the actual file on disk until [`OPFSFile::close`] has been called.
/// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/write)
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
debug_assert!(self.access_handle.is_some(), "file is already closed");

match self
.access_handle()
.await
.access_handle
.as_ref()
.unwrap()
.write_with_u8_array(buf.as_slice())
{
Ok(_) => (Ok(()), buf),
Expand All @@ -53,10 +40,18 @@ impl Write for OPFSSyncFile {
/// Persists any changes made to the file.
/// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/flush)
async fn flush(&mut self) -> Result<(), Error> {
self.access_handle().await.flush().map_err(wasm_err)
debug_assert!(self.access_handle.is_some(), "file is already closed");

self.access_handle
.as_ref()
.unwrap()
.flush()
.map_err(wasm_err)
}

async fn close(&mut self) -> Result<(), Error> {
debug_assert!(self.access_handle.is_some(), "file is already closed");

if let Some(access_handle) = self.access_handle.take() {
access_handle.close();
}
Expand All @@ -72,11 +67,13 @@ impl Read for OPFSSyncFile {
/// If the operation encounters an "end of file" before completely
/// filling the buffer, it returns an error of [`crate::Error`].
async fn read_exact_at<B: IoBufMut>(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) {
debug_assert!(self.access_handle.is_some(), "file is already closed");

let buf_len = buf.bytes_init() as i32;
let options = FileSystemReadWriteOptions::new();
options.set_at(pos as f64);

let access_handle = self.access_handle().await;
let access_handle = self.access_handle.as_ref().unwrap();
let size = access_handle
.get_size()
.expect("InvalidStateError: file is already closed.");
Expand All @@ -102,10 +99,12 @@ impl Read for OPFSSyncFile {
/// If an error is encountered then the `read_to_end_at` operation
/// immediately completes.
async fn read_to_end_at(&mut self, mut buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
debug_assert!(self.access_handle.is_some(), "file is already closed");

let options = FileSystemReadWriteOptions::new();
options.set_at(pos as f64);

let access_handle = self.access_handle().await;
let access_handle = self.access_handle.as_ref().unwrap();
let size = access_handle
.get_size()
.expect("InvalidStateError: file is already closed.");
Expand All @@ -129,27 +128,13 @@ impl Read for OPFSSyncFile {

/// Return the size of file in bytes.
async fn size(&self) -> Result<u64, Error> {
match self.access_handle.as_ref() {
Some(access_handle) => access_handle
.get_size()
.map(|sz| sz.round() as u64)
.map_err(wasm_err),
None => {
// FIXME: here should throw an error
let js_promise = self.file_handle.create_sync_access_handle();
let access_handle = promise::<FileSystemSyncAccessHandle>(js_promise)
.await
.unwrap();
let result = access_handle
.get_size()
.map(|sz| sz.round() as u64)
.map_err(wasm_err);

access_handle.close();

result
}
}
debug_assert!(self.access_handle.is_some(), "file is already closed");
self.access_handle
.as_ref()
.unwrap()
.get_size()
.map(|sz| sz.round() as u64)
.map_err(wasm_err)
}
}

Expand Down
7 changes: 4 additions & 3 deletions fusio/src/impls/disk/tokio/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use std::io;
use async_stream::stream;
use futures_core::Stream;
use tokio::{
fs::{create_dir_all, remove_file, File},
fs::{create_dir_all, remove_file},
task::spawn_blocking,
};

use crate::{
disk::tokio::TokioFile,
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::{path_to_local, Path},
Error,
Expand All @@ -16,7 +17,7 @@ use crate::{
pub struct TokioFs;

impl Fs for TokioFs {
type File = File;
type File = TokioFile;

fn file_system(&self) -> FileSystemTag {
FileSystemTag::Local
Expand All @@ -36,7 +37,7 @@ impl Fs for TokioFs {
file.set_len(0).await?;
}

Ok(file)
Ok(TokioFile::new(file))
}

async fn create_dir_all(path: &Path) -> Result<(), Error> {
Expand Down
49 changes: 38 additions & 11 deletions fusio/src/impls/disk/tokio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,21 @@ use tokio::{

use crate::{buf::IoBufMut, Error, IoBuf, Read, Write};

impl Write for File {
pub struct TokioFile {
file: Option<File>,
}
impl TokioFile {
pub(crate) fn new(file: File) -> Self {
Self { file: Some(file) }
}
}

impl Write for TokioFile {
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
debug_assert!(self.file.is_some(), "file is already closed");

(
AsyncWriteExt::write_all(self, unsafe {
AsyncWriteExt::write_all(self.file.as_mut().unwrap(), unsafe {
&*slice_from_raw_parts(buf.as_ptr(), buf.bytes_init())
})
.await
Expand All @@ -23,40 +34,56 @@ impl Write for File {
}

async fn flush(&mut self) -> Result<(), Error> {
AsyncWriteExt::flush(self).await.map_err(Error::from)
debug_assert!(self.file.is_some(), "file is already closed");

AsyncWriteExt::flush(self.file.as_mut().unwrap())
.await
.map_err(Error::from)
}

async fn close(&mut self) -> Result<(), Error> {
AsyncWriteExt::flush(self).await.map_err(Error::from)?;
File::shutdown(self).await?;
debug_assert!(self.file.is_some(), "file is already closed");

let file = self.file.as_mut().unwrap();
AsyncWriteExt::flush(file).await.map_err(Error::from)?;
File::shutdown(file).await?;
self.file.take();
Ok(())
}
}

impl Read for File {
impl Read for TokioFile {
async fn read_exact_at<B: IoBufMut>(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) {
debug_assert!(self.file.is_some(), "file is already closed");

let file = self.file.as_mut().unwrap();
// TODO: Use pread instead of seek + read_exact
if let Err(e) = AsyncSeekExt::seek(self, SeekFrom::Start(pos)).await {
if let Err(e) = AsyncSeekExt::seek(file, SeekFrom::Start(pos)).await {
return (Err(Error::Io(e)), buf);
}
match AsyncReadExt::read_exact(self, buf.as_slice_mut()).await {
match AsyncReadExt::read_exact(file, buf.as_slice_mut()).await {
Ok(_) => (Ok(()), buf),
Err(e) => (Err(Error::Io(e)), buf),
}
}

async fn read_to_end_at(&mut self, mut buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
debug_assert!(self.file.is_some(), "file is already closed");

let file = self.file.as_mut().unwrap();
// TODO: Use pread instead of seek + read_exact
if let Err(e) = AsyncSeekExt::seek(self, SeekFrom::Start(pos)).await {
if let Err(e) = AsyncSeekExt::seek(file, SeekFrom::Start(pos)).await {
return (Err(Error::Io(e)), buf);
}
match AsyncReadExt::read_to_end(self, &mut buf).await {
match AsyncReadExt::read_to_end(file, &mut buf).await {
Ok(_) => (Ok(()), buf),
Err(e) => (Err(Error::Io(e)), buf),
}
}

async fn size(&self) -> Result<u64, Error> {
Ok(self.metadata().await?.len())
debug_assert!(self.file.is_some(), "file is already closed");

Ok(self.file.as_ref().unwrap().metadata().await?.len())
}
}
15 changes: 11 additions & 4 deletions fusio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
//! async fn main() {
//! #[cfg(feature = "tokio")]
//! {
//! use fusio::{disk::LocalFs, DynFs};
//! use tokio::fs::File;
//!
//! let mut file = File::open("foo.txt").await.unwrap();
//! let fs = LocalFs {};
//! let mut file = fs.open(&"foo.txt".into()).await.unwrap();
//! let write_buf = "hello, world".as_bytes();
//! let mut read_buf = [0; 12];
//! let (result, _, read_buf) =
Expand Down Expand Up @@ -541,10 +543,13 @@ mod tests {
use tempfile::tempfile;
use tokio::fs::File;

use crate::disk::tokio::TokioFile;

let read = tempfile().unwrap();
let write = read.try_clone().unwrap();

write_and_read(File::from_std(write), File::from_std(read)).await;
let read_file = TokioFile::new(File::from_std(read));
let write_file = TokioFile::new(File::from_std(write));
write_and_read(write_file, read_file).await;
}

#[cfg(all(feature = "tokio", not(target_arch = "wasm32")))]
Expand Down Expand Up @@ -582,7 +587,9 @@ mod tests {
use tempfile::tempfile;
use tokio::fs::File;

let mut file = File::from_std(tempfile().unwrap());
use crate::disk::tokio::TokioFile;

let mut file = TokioFile::new(File::from_std(tempfile().unwrap()));
let (result, _) = file.write_all(&b"hello, world"[..]).await;
result.unwrap();
let (result, buf) = file.read_exact_at(vec![0u8; 5], 0).await;
Expand Down
Loading