
Commit

Slight cleanup for Storage (#33212)
GitOrigin-RevId: 6b52c7a5b83f9c28ccf8730209a6a983f3d1dd42
goffrie authored and Convex, Inc. committed Jan 16, 2025
1 parent eb4e30f commit 18108d6
Showing 1 changed file with 9 additions and 8 deletions.
crates/storage/src/lib.rs: 17 changed lines (9 additions, 8 deletions)
@@ -61,7 +61,6 @@ use futures::{
     FutureExt,
     Stream,
     StreamExt,
-    TryFutureExt,
     TryStreamExt,
 };
 use futures_async_stream::try_stream;
@@ -622,20 +621,22 @@ impl StorageExt for Arc<dyn Storage> {
             let key_ = key.clone();
             let stream_fut = async move {
                 self_
-                    .get_small_range_with_retries(&key_, chunk_start..chunk_end)
-                    // Mapping everything to `io::ErrorKind::Other` feels bad, but it's what the AWS library does internally.
-                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
-                    .map_ok(|storage_get_stream| storage_get_stream.stream).await
+                    .get_small_range_with_retries(&key_, chunk_start..chunk_end)
+                    .await
+                    .map_err(|e| {
+                        // Mapping everything to `io::ErrorKind::Other` feels bad, but it's what the
+                        // AWS library does internally.
+                        std::io::Error::new(std::io::ErrorKind::Other, e)
+                    })
+                    .map(|storage_get_stream| storage_get_stream.stream)
             };
             chunk_futures.push(stream_fut);
         }
         // Convert the list of futures into a stream, where each item is the resolved
         // output of the future (i.e. a `ByteStream`)
         let byte_stream = futures::stream::iter(chunk_futures)
-            // Wrap it in `Ok` as the underlying stream is a `TryStream`, and the error types must match
-            .map(Ok)
             // Limit the concurrency of the chunk downloads
-            .try_buffered(MAX_CONCURRENT_CHUNK_DOWNLOADS)
+            .buffered(MAX_CONCURRENT_CHUNK_DOWNLOADS)
             // Flatten the `Stream<Item = io::Result<Stream<Item = io::Result<Bytes>>>>` into a single `Stream<Item = io::Result<Bytes>>`
             .try_flatten();
         Ok(Some(StorageGetStream {
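The first hunk drops the `TryFutureExt` import because the combinator-style `map_err`/`map_ok` on the not-yet-awaited future is replaced by awaiting first and then using the ordinary `Result` methods. Below is a minimal sketch of the two shapes, not the Convex code: `fetch`, `old_style`, and `new_style` are made-up names standing in for `get_small_range_with_retries` and its call site.

// Sketch only: contrasts future combinators (`TryFutureExt`) with awaiting
// first and mapping the resulting `Result`. `fetch` is a hypothetical fallible async fn.
use futures::TryFutureExt;

async fn fetch(n: u32) -> Result<String, String> {
    if n == 0 { Err("empty range".to_string()) } else { Ok(format!("chunk {n}")) }
}

// Old shape: `map_err`/`map_ok` applied to the future itself, `.await` at the end.
async fn old_style(n: u32) -> std::io::Result<String> {
    fetch(n)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
        .map_ok(|s| s.to_uppercase())
        .await
}

// New shape: `.await` first, then plain `Result::map_err` / `Result::map`;
// no extra trait import is needed.
async fn new_style(n: u32) -> std::io::Result<String> {
    fetch(n)
        .await
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
        .map(|s| s.to_uppercase())
}

fn main() -> std::io::Result<()> {
    futures::executor::block_on(async {
        // Both shapes produce the same value.
        assert_eq!(old_style(3).await?, new_style(3).await?);
        Ok(())
    })
}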

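The second hunk simplifies the buffering step. `try_buffered` operates on a `TryStream` whose `Ok` items are futures, which is why each future previously had to be wrapped in `Ok` so the error types lined up; `buffered` drives a plain stream of futures directly, and since each future resolves to an `io::Result<_>`, the buffered output is already a `TryStream`, so the existing `try_flatten` still works. A small self-contained sketch follows, again not the Convex code: `fetch_chunk` and `demo` are hypothetical stand-ins, with integers in place of `Bytes` chunks.

// Sketch only: shows why `.map(Ok).try_buffered(n)` collapses to `.buffered(n)`.
use futures::{stream, StreamExt, TryStreamExt};

// Hypothetical stand-in for `get_small_range_with_retries`: each call resolves to an
// io::Result wrapping an inner stream of io::Result items (like a chunk's ByteStream).
async fn fetch_chunk(i: u32) -> std::io::Result<impl futures::Stream<Item = std::io::Result<u32>>> {
    Ok(stream::iter(vec![Ok::<u32, std::io::Error>(i), Ok(i + 10)]))
}

async fn demo() -> std::io::Result<()> {
    // Old shape: `try_buffered` wants a `TryStream` of futures, so each future is wrapped in `Ok`.
    let old: Vec<u32> = stream::iter((0..3).map(fetch_chunk))
        .map(Ok)
        .try_buffered(2)
        .try_flatten()
        .try_collect()
        .await?;

    // New shape: `buffered` drives a plain stream of futures; the resolved `io::Result`
    // items already form a `TryStream`, so `try_flatten` still applies.
    let new: Vec<u32> = stream::iter((0..3).map(fetch_chunk))
        .buffered(2)
        .try_flatten()
        .try_collect()
        .await?;

    // Both pipelines yield the same flattened output, in order.
    assert_eq!(old, new);
    Ok(())
}

fn main() -> std::io::Result<()> {
    futures::executor::block_on(demo())
}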
