Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace AsyncSeek trait by AsyncSeekForward for Reader to address #12880 #14194

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions crates/bevy_asset/src/io/file/file_asset.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
use crate::io::{
get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, PathStream,
Reader, Writer,
get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, AsyncSeekForward,
PathStream, Reader, Writer,
};
use async_fs::{read_dir, File};
use futures_io::AsyncSeek;
use futures_lite::StreamExt;

use core::{pin::Pin, task, task::Poll};
use std::path::Path;

use super::{FileAssetReader, FileAssetWriter};

impl AsyncSeekForward for File {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
offset: u64,
) -> Poll<futures_io::Result<u64>> {
let offset: Result<i64, _> = offset.try_into();

if let Ok(offset) = offset {
Pin::new(&mut self).poll_seek(cx, futures_io::SeekFrom::Current(offset))
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
}
}
}

impl Reader for File {}

impl AssetReader for FileAssetReader {
Expand Down
16 changes: 9 additions & 7 deletions crates/bevy_asset/src/io/file/sync_file_asset.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
use futures_io::{AsyncRead, AsyncWrite};
use futures_lite::Stream;

use crate::io::{
get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, PathStream,
Reader, Writer,
get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, AsyncSeekForward,
PathStream, Reader, Writer,
};

use core::{pin::Pin, task::Poll};
Expand All @@ -29,14 +29,16 @@ impl AsyncRead for FileReader {
}
}

impl AsyncSeek for FileReader {
fn poll_seek(
impl AsyncSeekForward for FileReader {
fn poll_seek_forward(
self: Pin<&mut Self>,
_cx: &mut core::task::Context<'_>,
pos: std::io::SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
let this = self.get_mut();
let seek = this.0.seek(pos);
let current = this.0.stream_position()?;
let seek = this.0.seek(std::io::SeekFrom::Start(current + offset));

Poll::Ready(seek)
}
}
Expand Down
44 changes: 13 additions & 31 deletions crates/bevy_asset/src/io/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ use crate::io::{AssetReader, AssetReaderError, PathStream, Reader};
use alloc::sync::Arc;
use bevy_utils::HashMap;
use core::{pin::Pin, task::Poll};
use futures_io::{AsyncRead, AsyncSeek};
use futures_io::AsyncRead;
use futures_lite::{ready, Stream};
use parking_lot::RwLock;
use std::{
io::SeekFrom,
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};

use super::AsyncSeekForward;

#[derive(Default, Debug)]
struct DirInternal {
Expand Down Expand Up @@ -247,37 +246,20 @@ impl AsyncRead for DataReader {
}
}

impl AsyncSeek for DataReader {
fn poll_seek(
impl AsyncSeekForward for DataReader {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
_cx: &mut core::task::Context<'_>,
pos: SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
let result = match pos {
SeekFrom::Start(offset) => offset.try_into(),
SeekFrom::End(offset) => self
.data
.value()
.len()
.try_into()
.map(|len: i64| len - offset),
SeekFrom::Current(offset) => self
.bytes_read
.try_into()
.map(|bytes_read: i64| bytes_read + offset),
};
let result = self
.bytes_read
.try_into()
.map(|bytes_read: u64| bytes_read + offset);

if let Ok(new_pos) = result {
if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;

Poll::Ready(Ok(new_pos as _))
}
self.bytes_read = new_pos as _;
Poll::Ready(Ok(new_pos as _))
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
Expand Down
108 changes: 60 additions & 48 deletions crates/bevy_asset/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,9 @@ use core::{
pin::Pin,
task::{Context, Poll},
};
use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
use futures_io::{AsyncRead, AsyncWrite};
use futures_lite::{ready, Stream};
use std::{
io::SeekFrom,
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};
use thiserror::Error;

/// Errors that occur while loading assets.
Expand Down Expand Up @@ -83,13 +80,51 @@ pub const STACK_FUTURE_SIZE: usize = 10 * size_of::<&()>();

pub use stackfuture::StackFuture;

/// Asynchronously advances the cursor position by a specified number of bytes.
///
/// This trait is a simplified version of the [`futures_io::AsyncSeek`] trait, providing
/// support exclusively for the [`futures_io::SeekFrom::Current`] variant. It allows for relative
/// seeking from the current cursor position.
pub trait AsyncSeekForward {
/// Attempts to asynchronously seek forward by a specified number of bytes from the current cursor position.
///
/// Seeking beyond the end of the stream is allowed and the behavior for this case is defined by the implementation.
/// The new position, relative to the beginning of the stream, should be returned upon successful completion
/// of the seek operation.
///
/// If the seek operation completes successfully,
/// the new position relative to the beginning of the stream should be returned.
///
/// # Implementation
///
/// Implementations of this trait should handle [`Poll::Pending`] correctly, converting
/// [`std::io::ErrorKind::WouldBlock`] errors into [`Poll::Pending`] to indicate that the operation is not
/// yet complete and should be retried, and either internally retry or convert
/// [`std::io::ErrorKind::Interrupted`] into another error kind.
fn poll_seek_forward(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
offset: u64,
) -> Poll<futures_io::Result<u64>>;
}

impl<T: ?Sized + AsyncSeekForward + Unpin> AsyncSeekForward for Box<T> {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
offset: u64,
) -> Poll<futures_io::Result<u64>> {
Pin::new(&mut **self).poll_seek_forward(cx, offset)
}
}

/// A type returned from [`AssetReader::read`], which is used to read the contents of a file
/// (or virtual file) corresponding to an asset.
///
/// This is essentially a trait alias for types implementing [`AsyncRead`] and [`AsyncSeek`].
/// This is essentially a trait alias for types implementing [`AsyncRead`] and [`AsyncSeekForward`].
/// The only reason a blanket implementation is not provided for applicable types is to allow
/// implementors to override the provided implementation of [`Reader::read_to_end`].
pub trait Reader: AsyncRead + AsyncSeek + Unpin + Send + Sync {
pub trait Reader: AsyncRead + AsyncSeekForward + Unpin + Send + Sync {
/// Reads the entire contents of this reader and appends them to a vec.
///
/// # Note for implementors
Expand Down Expand Up @@ -559,32 +594,20 @@ impl AsyncRead for VecReader {
}
}

impl AsyncSeek for VecReader {
fn poll_seek(
impl AsyncSeekForward for VecReader {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
pos: SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
let result = match pos {
SeekFrom::Start(offset) => offset.try_into(),
SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset),
SeekFrom::Current(offset) => self
.bytes_read
.try_into()
.map(|bytes_read: i64| bytes_read + offset),
};
let result = self
.bytes_read
.try_into()
.map(|bytes_read: u64| bytes_read + offset);

if let Ok(new_pos) = result {
if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;

Poll::Ready(Ok(new_pos as _))
}
self.bytes_read = new_pos as _;
Poll::Ready(Ok(new_pos as _))
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
Expand Down Expand Up @@ -644,32 +667,21 @@ impl<'a> AsyncRead for SliceReader<'a> {
}
}

impl<'a> AsyncSeek for SliceReader<'a> {
fn poll_seek(
impl<'a> AsyncSeekForward for SliceReader<'a> {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
pos: SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
let result = match pos {
SeekFrom::Start(offset) => offset.try_into(),
SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset),
SeekFrom::Current(offset) => self
.bytes_read
.try_into()
.map(|bytes_read: i64| bytes_read + offset),
};
let result = self
.bytes_read
.try_into()
.map(|bytes_read: u64| bytes_read + offset);

if let Ok(new_pos) = result {
if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;
self.bytes_read = new_pos as _;

Poll::Ready(Ok(new_pos as _))
}
Poll::Ready(Ok(new_pos as _))
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
Expand Down
16 changes: 8 additions & 8 deletions crates/bevy_asset/src/io/processor_gated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use alloc::sync::Arc;
use async_lock::RwLockReadGuardArc;
use bevy_utils::tracing::trace;
use core::{pin::Pin, task::Poll};
use futures_io::{AsyncRead, AsyncSeek};
use std::{io::SeekFrom, path::Path};
use futures_io::AsyncRead;
use std::path::Path;

use super::ErasedAssetReader;
use super::{AsyncSeekForward, ErasedAssetReader};

/// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a
/// given path until that path has been processed by [`AssetProcessor`].
///
/// [`AssetProcessor`]: crate::processor::AssetProcessor
/// [`AssetProcessor`]: crate::processor::AssetProcessor
pub struct ProcessorGatedReader {
reader: Box<dyn ErasedAssetReader>,
source: AssetSourceId<'static>,
Expand Down Expand Up @@ -142,13 +142,13 @@ impl AsyncRead for TransactionLockedReader<'_> {
}
}

impl AsyncSeek for TransactionLockedReader<'_> {
fn poll_seek(
impl AsyncSeekForward for TransactionLockedReader<'_> {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
pos: SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
Pin::new(&mut self.reader).poll_seek(cx, pos)
Pin::new(&mut self.reader).poll_seek_forward(cx, offset)
}
}

Expand Down