From 486665e8653ea5731a87c0c2830bcfd1b29221a3 Mon Sep 17 00:00:00 2001 From: Ludwig DUBOS Date: Sat, 6 Jul 2024 21:47:15 +0200 Subject: [PATCH] update assets `Reader` to use `AsyncSeekForward` rather than `AsyncSeek` --- crates/bevy_asset/src/io/file/file_asset.rs | 27 ++++- .../bevy_asset/src/io/file/sync_file_asset.rs | 16 +-- crates/bevy_asset/src/io/memory.rs | 40 ++----- crates/bevy_asset/src/io/mod.rs | 108 ++++++++++-------- crates/bevy_asset/src/io/processor_gated.rs | 15 ++- 5 files changed, 114 insertions(+), 92 deletions(-) diff --git a/crates/bevy_asset/src/io/file/file_asset.rs b/crates/bevy_asset/src/io/file/file_asset.rs index 56a566c219127c..97ec37b85a3914 100644 --- a/crates/bevy_asset/src/io/file/file_asset.rs +++ b/crates/bevy_asset/src/io/file/file_asset.rs @@ -1,14 +1,35 @@ use crate::io::{ - get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, PathStream, - Reader, Writer, + get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, AsyncSeekForward, + PathStream, Reader, Writer, }; use async_fs::{read_dir, File}; +use futures_io::AsyncSeek; use futures_lite::StreamExt; -use std::path::Path; +use std::task; +use std::{path::Path, pin::Pin, task::Poll}; use super::{FileAssetReader, FileAssetWriter}; +impl AsyncSeekForward for File { + fn poll_seek_forward( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + offset: u64, + ) -> Poll> { + let offset: Result = offset.try_into(); + + if let Ok(offset) = offset { + Pin::new(&mut self).poll_seek(cx, futures_io::SeekFrom::Current(offset)) + } else { + Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "seek position is out of range", + ))) + } + } +} + impl Reader for File {} impl AssetReader for FileAssetReader { diff --git a/crates/bevy_asset/src/io/file/sync_file_asset.rs b/crates/bevy_asset/src/io/file/sync_file_asset.rs index 2ac547e9b7136f..7ed5ff84d712e3 100644 --- a/crates/bevy_asset/src/io/file/sync_file_asset.rs +++ b/crates/bevy_asset/src/io/file/sync_file_asset.rs @@ -1,9 +1,9 @@ -use futures_io::{AsyncRead, AsyncSeek, AsyncWrite}; +use futures_io::{AsyncRead, AsyncWrite}; use futures_lite::Stream; use crate::io::{ - get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, PathStream, - Reader, Writer, + get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, AsyncSeekForward, + PathStream, Reader, Writer, }; use std::{ @@ -30,14 +30,16 @@ impl AsyncRead for FileReader { } } -impl AsyncSeek for FileReader { - fn poll_seek( +impl AsyncSeekForward for FileReader { + fn poll_seek_forward( self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>, - pos: std::io::SeekFrom, + offset: u64, ) -> Poll> { let this = self.get_mut(); - let seek = this.0.seek(pos); + let current = this.0.stream_position()?; + let seek = this.0.seek(std::io::SeekFrom::Start(current + offset)); + Poll::Ready(seek) } } diff --git a/crates/bevy_asset/src/io/memory.rs b/crates/bevy_asset/src/io/memory.rs index a3707e9b139c45..3be8aa42c4fb48 100644 --- a/crates/bevy_asset/src/io/memory.rs +++ b/crates/bevy_asset/src/io/memory.rs @@ -1,9 +1,8 @@ use crate::io::{AssetReader, AssetReaderError, PathStream, Reader}; use bevy_utils::HashMap; -use futures_io::{AsyncRead, AsyncSeek}; +use futures_io::AsyncRead; use futures_lite::{ready, Stream}; use parking_lot::RwLock; -use std::io::SeekFrom; use std::{ path::{Path, PathBuf}, pin::Pin, @@ -11,6 +10,8 @@ use std::{ task::Poll, }; +use super::AsyncSeekForward; + #[derive(Default, Debug)] struct DirInternal { assets: HashMap, Data>, @@ -248,37 +249,20 @@ impl AsyncRead for DataReader { } } -impl AsyncSeek for DataReader { - fn poll_seek( +impl AsyncSeekForward for DataReader { + fn poll_seek_forward( mut self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>, - pos: SeekFrom, + offset: u64, ) -> Poll> { - let result = match pos { - SeekFrom::Start(offset) => offset.try_into(), - SeekFrom::End(offset) => self - .data - .value() - .len() - .try_into() - .map(|len: i64| len - offset), - SeekFrom::Current(offset) => self - .bytes_read - .try_into() - .map(|bytes_read: i64| bytes_read + offset), - }; + let result = self + .bytes_read + .try_into() + .map(|bytes_read: u64| bytes_read + offset); if let Ok(new_pos) = result { - if new_pos < 0 { - Poll::Ready(Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "seek position is out of range", - ))) - } else { - self.bytes_read = new_pos as _; - - Poll::Ready(Ok(new_pos as _)) - } + self.bytes_read = new_pos as _; + Poll::Ready(Ok(new_pos as _)) } else { Poll::Ready(Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, diff --git a/crates/bevy_asset/src/io/mod.rs b/crates/bevy_asset/src/io/mod.rs index 925192e6dec424..610f5e318ea63e 100644 --- a/crates/bevy_asset/src/io/mod.rs +++ b/crates/bevy_asset/src/io/mod.rs @@ -22,14 +22,15 @@ pub use futures_lite::AsyncWriteExt; pub use source::*; use bevy_utils::{BoxedFuture, ConditionalSendFuture}; -use futures_io::{AsyncRead, AsyncSeek, AsyncWrite}; +use futures_io::{AsyncRead, AsyncWrite}; use futures_lite::{ready, Stream}; +use std::task::Context; use std::{ - io::SeekFrom, + mem::size_of, path::{Path, PathBuf}, pin::Pin, sync::Arc, - task::{Context, Poll}, + task::Poll, }; use thiserror::Error; @@ -80,13 +81,51 @@ pub const STACK_FUTURE_SIZE: usize = 10 * size_of::<&()>(); pub use stackfuture::StackFuture; +/// Asynchronously advances the cursor position by a specified number of bytes. +/// +/// This trait is a simplified version of the [`futures_io::AsyncSeek`] trait, providing +/// support exclusively for the [`futures_io::SeekFrom::Current`] variant. It allows for relative +/// seeking from the current cursor position. +pub trait AsyncSeekForward { + /// Attempts to asynchronously seek forward by a specified number of bytes from the current cursor position. + /// + /// Seeking beyond the end of the stream is allowed and the behavior for this case is defined by the implementation. + /// The new position, relative to the beginning of the stream, should be returned upon successful completion + /// of the seek operation. + /// + /// If the seek operation completes successfully, + /// the new position relative to the beginning of the stream should be returned. + /// + /// # Implementation + /// + /// Implementations of this trait should handle [`Poll::Pending`] correctly, converting + /// [`std::io::ErrorKind::WouldBlock`] errors into [`Poll::Pending`] to indicate that the operation is not + /// yet complete and should be retried, and either internally retry or convert + /// [`std::io::ErrorKind::Interrupted`] into another error kind. + fn poll_seek_forward( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + offset: u64, + ) -> Poll>; +} + +impl AsyncSeekForward for Box { + fn poll_seek_forward( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + offset: u64, + ) -> Poll> { + Pin::new(&mut **self).poll_seek_forward(cx, offset) + } +} + /// A type returned from [`AssetReader::read`], which is used to read the contents of a file /// (or virtual file) corresponding to an asset. /// -/// This is essentially a trait alias for types implementing [`AsyncRead`] and [`AsyncSeek`]. +/// This is essentially a trait alias for types implementing [`AsyncRead`] and [`AsyncSeekForward`]. /// The only reason a blanket implementation is not provided for applicable types is to allow /// implementors to override the provided implementation of [`Reader::read_to_end`]. -pub trait Reader: AsyncRead + AsyncSeek + Unpin + Send + Sync { +pub trait Reader: AsyncRead + AsyncSeekForward + Unpin + Send + Sync { /// Reads the entire contents of this reader and appends them to a vec. /// /// # Note for implementors @@ -538,32 +577,20 @@ impl AsyncRead for VecReader { } } -impl AsyncSeek for VecReader { - fn poll_seek( +impl AsyncSeekForward for VecReader { + fn poll_seek_forward( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, - pos: SeekFrom, + offset: u64, ) -> Poll> { - let result = match pos { - SeekFrom::Start(offset) => offset.try_into(), - SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset), - SeekFrom::Current(offset) => self - .bytes_read - .try_into() - .map(|bytes_read: i64| bytes_read + offset), - }; + let result = self + .bytes_read + .try_into() + .map(|bytes_read: u64| bytes_read + offset); if let Ok(new_pos) = result { - if new_pos < 0 { - Poll::Ready(Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "seek position is out of range", - ))) - } else { - self.bytes_read = new_pos as _; - - Poll::Ready(Ok(new_pos as _)) - } + self.bytes_read = new_pos as _; + Poll::Ready(Ok(new_pos as _)) } else { Poll::Ready(Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, @@ -623,32 +650,21 @@ impl<'a> AsyncRead for SliceReader<'a> { } } -impl<'a> AsyncSeek for SliceReader<'a> { - fn poll_seek( +impl<'a> AsyncSeekForward for SliceReader<'a> { + fn poll_seek_forward( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, - pos: SeekFrom, + offset: u64, ) -> Poll> { - let result = match pos { - SeekFrom::Start(offset) => offset.try_into(), - SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset), - SeekFrom::Current(offset) => self - .bytes_read - .try_into() - .map(|bytes_read: i64| bytes_read + offset), - }; + let result = self + .bytes_read + .try_into() + .map(|bytes_read: u64| bytes_read + offset); if let Ok(new_pos) = result { - if new_pos < 0 { - Poll::Ready(Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "seek position is out of range", - ))) - } else { - self.bytes_read = new_pos as _; + self.bytes_read = new_pos as _; - Poll::Ready(Ok(new_pos as _)) - } + Poll::Ready(Ok(new_pos as _)) } else { Poll::Ready(Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, diff --git a/crates/bevy_asset/src/io/processor_gated.rs b/crates/bevy_asset/src/io/processor_gated.rs index a77c7877970a8a..bf6a694b6eebd3 100644 --- a/crates/bevy_asset/src/io/processor_gated.rs +++ b/crates/bevy_asset/src/io/processor_gated.rs @@ -5,17 +5,16 @@ use crate::{ }; use async_lock::RwLockReadGuardArc; use bevy_utils::tracing::trace; -use futures_io::{AsyncRead, AsyncSeek}; -use std::io::SeekFrom; +use futures_io::AsyncRead; use std::task::Poll; use std::{path::Path, pin::Pin, sync::Arc}; -use super::ErasedAssetReader; +use super::{AsyncSeekForward, ErasedAssetReader}; /// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a /// given path until that path has been processed by [`AssetProcessor`]. /// -/// [`AssetProcessor`]: crate::processor::AssetProcessor +/// [`AssetProcessor`]: crate::processor::AssetProcessor pub struct ProcessorGatedReader { reader: Box, source: AssetSourceId<'static>, @@ -142,13 +141,13 @@ impl AsyncRead for TransactionLockedReader<'_> { } } -impl AsyncSeek for TransactionLockedReader<'_> { - fn poll_seek( +impl AsyncSeekForward for TransactionLockedReader<'_> { + fn poll_seek_forward( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - pos: SeekFrom, + offset: u64, ) -> Poll> { - Pin::new(&mut self.reader).poll_seek(cx, pos) + Pin::new(&mut self.reader).poll_seek_forward(cx, offset) } }