diff --git a/fusio/src/dynamic/fs.rs b/fusio/src/dynamic/fs.rs index 4d06444..5e972c0 100644 --- a/fusio/src/dynamic/fs.rs +++ b/fusio/src/dynamic/fs.rs @@ -212,9 +212,9 @@ mod tests { async fn test_dyn_fs() { use tempfile::tempfile; - use crate::Write; + use crate::{disk::tokio::TokioFile, Write}; - let file = tokio::fs::File::from_std(tempfile().unwrap()); + let file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap())); let mut dyn_file: Box = Box::new(file); let buf = [24, 9, 24, 0]; let (result, _) = dyn_file.write_all(&buf[..]).await; diff --git a/fusio/src/impls/buffered.rs b/fusio/src/impls/buffered.rs index 2328cf0..68416bc 100644 --- a/fusio/src/impls/buffered.rs +++ b/fusio/src/impls/buffered.rs @@ -155,10 +155,9 @@ impl Write for BufWriter { #[cfg(feature = "tokio")] #[cfg(test)] pub(crate) mod tests { - use tokio::io::AsyncWriteExt; use super::BufWriter; - use crate::{buffered::BufReader, Error, IoBufMut, Read}; + use crate::{buffered::BufReader, Error, IoBufMut, Read, Write}; impl Read for BufWriter { async fn read_exact_at(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) { @@ -180,10 +179,12 @@ pub(crate) mod tests { async fn test_buf_read() { use tempfile::tempfile; - let mut file = tokio::fs::File::from_std(tempfile().unwrap()); - file.write_all(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]) - .await - .unwrap(); + use crate::disk::tokio::TokioFile; + + let mut file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap())); + let _ = file + .write_all([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15].as_slice()) + .await; let mut reader = BufReader::new(file, 8).await.unwrap(); { @@ -229,9 +230,12 @@ pub(crate) mod tests { async fn test_buf_read_write() { use tempfile::tempfile; - use crate::{impls::buffered::BufWriter, Read, Write}; + use crate::{ + impls::{buffered::BufWriter, disk::tokio::TokioFile}, + Read, Write, + }; - let file = tokio::fs::File::from_std(tempfile().unwrap()); + let file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap())); let mut writer = BufWriter::new(file, 4); { let _ = writer.write_all("Hello".as_bytes()).await; diff --git a/fusio/src/impls/disk/opfs/sync/mod.rs b/fusio/src/impls/disk/opfs/sync/mod.rs index da3be87..74e8b24 100644 --- a/fusio/src/impls/disk/opfs/sync/mod.rs +++ b/fusio/src/impls/disk/opfs/sync/mod.rs @@ -7,7 +7,6 @@ use crate::{disk::opfs::promise, error::wasm_err, Error, IoBuf, IoBufMut, Read, /// OPFS based on [FileSystemWritableFileStream](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle) /// This file is only accessible inside dedicated Web Workers. pub struct OPFSSyncFile { - file_handle: FileSystemFileHandle, access_handle: Option, } @@ -15,22 +14,7 @@ impl OPFSSyncFile { pub(crate) async fn new(file_handle: FileSystemFileHandle) -> Result { let js_promise = file_handle.create_sync_access_handle(); let access_handle = Some(promise::(js_promise).await?); - Ok(Self { - file_handle, - access_handle, - }) - } - - pub(crate) async fn access_handle(&mut self) -> &FileSystemSyncAccessHandle { - if self.access_handle.is_none() { - let js_promise = self.file_handle.create_sync_access_handle(); - self.access_handle = Some( - promise::(js_promise) - .await - .unwrap(), - ); - } - self.access_handle.as_ref().unwrap() + Ok(Self { access_handle }) } } @@ -40,9 +24,12 @@ impl Write for OPFSSyncFile { /// No changes are written to the actual file on disk until [`OPFSFile::close`] has been called. /// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/write) async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { + debug_assert!(self.access_handle.is_some(), "file is already closed"); + match self - .access_handle() - .await + .access_handle + .as_ref() + .unwrap() .write_with_u8_array(buf.as_slice()) { Ok(_) => (Ok(()), buf), @@ -53,10 +40,18 @@ impl Write for OPFSSyncFile { /// Persists any changes made to the file. /// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/flush) async fn flush(&mut self) -> Result<(), Error> { - self.access_handle().await.flush().map_err(wasm_err) + debug_assert!(self.access_handle.is_some(), "file is already closed"); + + self.access_handle + .as_ref() + .unwrap() + .flush() + .map_err(wasm_err) } async fn close(&mut self) -> Result<(), Error> { + debug_assert!(self.access_handle.is_some(), "file is already closed"); + if let Some(access_handle) = self.access_handle.take() { access_handle.close(); } @@ -72,11 +67,13 @@ impl Read for OPFSSyncFile { /// If the operation encounters an "end of file" before completely /// filling the buffer, it returns an error of [`crate::Error`]. async fn read_exact_at(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) { + debug_assert!(self.access_handle.is_some(), "file is already closed"); + let buf_len = buf.bytes_init() as i32; let options = FileSystemReadWriteOptions::new(); options.set_at(pos as f64); - let access_handle = self.access_handle().await; + let access_handle = self.access_handle.as_ref().unwrap(); let size = access_handle .get_size() .expect("InvalidStateError: file is already closed."); @@ -102,10 +99,12 @@ impl Read for OPFSSyncFile { /// If an error is encountered then the `read_to_end_at` operation /// immediately completes. async fn read_to_end_at(&mut self, mut buf: Vec, pos: u64) -> (Result<(), Error>, Vec) { + debug_assert!(self.access_handle.is_some(), "file is already closed"); + let options = FileSystemReadWriteOptions::new(); options.set_at(pos as f64); - let access_handle = self.access_handle().await; + let access_handle = self.access_handle.as_ref().unwrap(); let size = access_handle .get_size() .expect("InvalidStateError: file is already closed."); @@ -129,27 +128,13 @@ impl Read for OPFSSyncFile { /// Return the size of file in bytes. async fn size(&self) -> Result { - match self.access_handle.as_ref() { - Some(access_handle) => access_handle - .get_size() - .map(|sz| sz.round() as u64) - .map_err(wasm_err), - None => { - // FIXME: here should throw an error - let js_promise = self.file_handle.create_sync_access_handle(); - let access_handle = promise::(js_promise) - .await - .unwrap(); - let result = access_handle - .get_size() - .map(|sz| sz.round() as u64) - .map_err(wasm_err); - - access_handle.close(); - - result - } - } + debug_assert!(self.access_handle.is_some(), "file is already closed"); + self.access_handle + .as_ref() + .unwrap() + .get_size() + .map(|sz| sz.round() as u64) + .map_err(wasm_err) } } diff --git a/fusio/src/impls/disk/tokio/fs.rs b/fusio/src/impls/disk/tokio/fs.rs index 2f470d4..bb7d158 100644 --- a/fusio/src/impls/disk/tokio/fs.rs +++ b/fusio/src/impls/disk/tokio/fs.rs @@ -3,11 +3,12 @@ use std::io; use async_stream::stream; use futures_core::Stream; use tokio::{ - fs::{create_dir_all, remove_file, File}, + fs::{create_dir_all, remove_file}, task::spawn_blocking, }; use crate::{ + disk::tokio::TokioFile, fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::{path_to_local, Path}, Error, @@ -16,7 +17,7 @@ use crate::{ pub struct TokioFs; impl Fs for TokioFs { - type File = File; + type File = TokioFile; fn file_system(&self) -> FileSystemTag { FileSystemTag::Local @@ -36,7 +37,7 @@ impl Fs for TokioFs { file.set_len(0).await?; } - Ok(file) + Ok(TokioFile::new(file)) } async fn create_dir_all(path: &Path) -> Result<(), Error> { diff --git a/fusio/src/impls/disk/tokio/mod.rs b/fusio/src/impls/disk/tokio/mod.rs index 9814f31..c528b95 100644 --- a/fusio/src/impls/disk/tokio/mod.rs +++ b/fusio/src/impls/disk/tokio/mod.rs @@ -10,10 +10,21 @@ use tokio::{ use crate::{buf::IoBufMut, Error, IoBuf, Read, Write}; -impl Write for File { +pub struct TokioFile { + file: Option, +} +impl TokioFile { + pub(crate) fn new(file: File) -> Self { + Self { file: Some(file) } + } +} + +impl Write for TokioFile { async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { + debug_assert!(self.file.is_some(), "file is already closed"); + ( - AsyncWriteExt::write_all(self, unsafe { + AsyncWriteExt::write_all(self.file.as_mut().unwrap(), unsafe { &*slice_from_raw_parts(buf.as_ptr(), buf.bytes_init()) }) .await @@ -23,40 +34,56 @@ impl Write for File { } async fn flush(&mut self) -> Result<(), Error> { - AsyncWriteExt::flush(self).await.map_err(Error::from) + debug_assert!(self.file.is_some(), "file is already closed"); + + AsyncWriteExt::flush(self.file.as_mut().unwrap()) + .await + .map_err(Error::from) } async fn close(&mut self) -> Result<(), Error> { - AsyncWriteExt::flush(self).await.map_err(Error::from)?; - File::shutdown(self).await?; + debug_assert!(self.file.is_some(), "file is already closed"); + + let file = self.file.as_mut().unwrap(); + AsyncWriteExt::flush(file).await.map_err(Error::from)?; + File::shutdown(file).await?; + self.file.take(); Ok(()) } } -impl Read for File { +impl Read for TokioFile { async fn read_exact_at(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) { + debug_assert!(self.file.is_some(), "file is already closed"); + + let file = self.file.as_mut().unwrap(); // TODO: Use pread instead of seek + read_exact - if let Err(e) = AsyncSeekExt::seek(self, SeekFrom::Start(pos)).await { + if let Err(e) = AsyncSeekExt::seek(file, SeekFrom::Start(pos)).await { return (Err(Error::Io(e)), buf); } - match AsyncReadExt::read_exact(self, buf.as_slice_mut()).await { + match AsyncReadExt::read_exact(file, buf.as_slice_mut()).await { Ok(_) => (Ok(()), buf), Err(e) => (Err(Error::Io(e)), buf), } } async fn read_to_end_at(&mut self, mut buf: Vec, pos: u64) -> (Result<(), Error>, Vec) { + debug_assert!(self.file.is_some(), "file is already closed"); + + let file = self.file.as_mut().unwrap(); // TODO: Use pread instead of seek + read_exact - if let Err(e) = AsyncSeekExt::seek(self, SeekFrom::Start(pos)).await { + if let Err(e) = AsyncSeekExt::seek(file, SeekFrom::Start(pos)).await { return (Err(Error::Io(e)), buf); } - match AsyncReadExt::read_to_end(self, &mut buf).await { + match AsyncReadExt::read_to_end(file, &mut buf).await { Ok(_) => (Ok(()), buf), Err(e) => (Err(Error::Io(e)), buf), } } async fn size(&self) -> Result { - Ok(self.metadata().await?.len()) + debug_assert!(self.file.is_some(), "file is already closed"); + + Ok(self.file.as_ref().unwrap().metadata().await?.len()) } } diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index a4fd45d..e42f8da 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -32,9 +32,11 @@ //! async fn main() { //! #[cfg(feature = "tokio")] //! { +//! use fusio::{disk::LocalFs, DynFs}; //! use tokio::fs::File; //! -//! let mut file = File::open("foo.txt").await.unwrap(); +//! let fs = LocalFs {}; +//! let mut file = fs.open(&"foo.txt".into()).await.unwrap(); //! let write_buf = "hello, world".as_bytes(); //! let mut read_buf = [0; 12]; //! let (result, _, read_buf) = @@ -541,10 +543,13 @@ mod tests { use tempfile::tempfile; use tokio::fs::File; + use crate::disk::tokio::TokioFile; + let read = tempfile().unwrap(); let write = read.try_clone().unwrap(); - - write_and_read(File::from_std(write), File::from_std(read)).await; + let read_file = TokioFile::new(File::from_std(read)); + let write_file = TokioFile::new(File::from_std(write)); + write_and_read(write_file, read_file).await; } #[cfg(all(feature = "tokio", not(target_arch = "wasm32")))] @@ -582,7 +587,9 @@ mod tests { use tempfile::tempfile; use tokio::fs::File; - let mut file = File::from_std(tempfile().unwrap()); + use crate::disk::tokio::TokioFile; + + let mut file = TokioFile::new(File::from_std(tempfile().unwrap())); let (result, _) = file.write_all(&b"hello, world"[..]).await; result.unwrap(); let (result, buf) = file.read_exact_at(vec![0u8; 5], 0).await;