Skip to content

Commit

Permalink
Streaming list results (#35)
Browse files Browse the repository at this point in the history
* Streaming list results

* Collect remaining object metas

* Fused list stream

* Also fuse BytesStream

* fix tests

* Improve list docs
  • Loading branch information
kylebarron authored Oct 23, 2024
1 parent 469ff28 commit 05c1f5d
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 119 deletions.
45 changes: 22 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ thiserror = "1"
tokio = "1.40"
url = "2"

[patch.crates-io]
object_store = { git = "https://github.com/kylebarron/arrow-rs", branch = "kyle/list-returns-static-stream" }

[profile.release]
lto = true
codegen-units = 1
2 changes: 1 addition & 1 deletion docs/api/list.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# List

::: object_store_rs.list
::: object_store_rs.list_async
::: object_store_rs.list_with_delimiter
::: object_store_rs.list_with_delimiter_async
::: object_store_rs.ObjectMeta
::: object_store_rs.ListResult
::: object_store_rs.ListStream
56 changes: 34 additions & 22 deletions object-store-rs/python/object_store_rs/_list.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -37,51 +37,63 @@ class ListResult(TypedDict):
objects: List[ObjectMeta]
"""Object metadata for the listing"""

class ListStream:
"""
A stream of [ObjectMeta][object_store_rs.ObjectMeta] that can be polled in a sync or
async fashion.
"""
def __aiter__(self) -> ListStream:
"""Return `Self` as an async iterator."""

def __iter__(self) -> ListStream:
"""Return `Self` as an async iterator."""

async def collect_async(self) -> List[ObjectMeta]:
"""Collect all remaining ObjectMeta objects in the stream."""

def collect(self) -> List[ObjectMeta]:
"""Collect all remaining ObjectMeta objects in the stream."""

async def __anext__(self) -> List[ObjectMeta]:
"""Return the next chunk of ObjectMeta in the stream."""

def __next__(self) -> List[ObjectMeta]:
"""Return the next chunk of ObjectMeta in the stream."""

def list(
store: ObjectStore,
prefix: str | None = None,
*,
offset: str | None = None,
max_items: int | None = 2000,
) -> List[ObjectMeta]:
chunk_size: int = 50,
) -> ListStream:
"""
List all the objects with the given prefix.
Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of
`foo/bar/x` but not of `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x`
will be included.
Note: the order of returned [`ObjectMeta`][object_store_rs.ObjectMeta] is not
guaranteed
!!! note
The order of returned [`ObjectMeta`][object_store_rs.ObjectMeta] is not
guaranteed
!!! note
In the future, we'd like to have `list` return an async iterable, just like
`get`, so that we can stream the result of `list`, but we need [some
changes](https://github.com/apache/arrow-rs/issues/6587) in the upstream
object-store repo first.
There is no async version of this method, because `list` is not async under the
hood, rather it only instantiates a stream, which can be polled in synchronous
or asynchronous fashion. See [`ListStream`][object_store_rs.ListStream].
Args:
store: The ObjectStore instance to use.
prefix: The prefix within ObjectStore to use for listing. Defaults to None.
Keyword Args:
offset: If provided, list all the objects with the given prefix and a location greater than `offset`. Defaults to `None`.
max_items: The maximum number of items to return. Defaults to 2000.
chunk_size: The number of items to collect per chunk in the returned
(async) iterator.
Returns:
A list of `ObjectMeta`.
"""

async def list_async(
store: ObjectStore,
prefix: str | None = None,
*,
offset: str | None = None,
max_items: int | None = 2000,
) -> List[ObjectMeta]:
"""Call `list` asynchronously.
Refer to the documentation for [list][object_store_rs.list].
A ListStream, which you can iterate through to access list results.
"""

def list_with_delimiter(store: ObjectStore, prefix: str | None = None) -> ListResult:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ from ._get import get_ranges_async as get_ranges_async
from ._head import head as head
from ._head import head_async as head_async
from ._list import ListResult as ListResult
from ._list import ListStream as ListStream
from ._list import ObjectMeta as ObjectMeta
from ._list import list as list
from ._list import list_async as list_async
from ._list import list_with_delimiter as list_with_delimiter
from ._list import list_with_delimiter_async as list_with_delimiter_async
from ._put import PutResult as PutResult
Expand Down
10 changes: 6 additions & 4 deletions object-store-rs/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use futures::stream::{BoxStream, Fuse};
use futures::StreamExt;
use object_store::{GetOptions, GetResult, ObjectStore};
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError};
Expand Down Expand Up @@ -108,23 +108,25 @@ impl PyGetResult {
}
}

// Note: we fuse the underlying stream so that we can get `None` multiple times.
// See the note on PyListStream for more background.
#[pyclass(name = "BytesStream")]
pub struct PyBytesStream {
stream: Arc<Mutex<BoxStream<'static, object_store::Result<Bytes>>>>,
stream: Arc<Mutex<Fuse<BoxStream<'static, object_store::Result<Bytes>>>>>,
min_chunk_size: usize,
}

impl PyBytesStream {
fn new(stream: BoxStream<'static, object_store::Result<Bytes>>, min_chunk_size: usize) -> Self {
Self {
stream: Arc::new(Mutex::new(stream)),
stream: Arc::new(Mutex::new(stream.fuse())),
min_chunk_size,
}
}
}

async fn next_stream(
stream: Arc<Mutex<BoxStream<'static, object_store::Result<Bytes>>>>,
stream: Arc<Mutex<Fuse<BoxStream<'static, object_store::Result<Bytes>>>>>,
min_chunk_size: usize,
sync: bool,
) -> PyResult<PyBytesWrapper> {
Expand Down
1 change: 0 additions & 1 deletion object-store-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ fn _object_store_rs(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(get::get))?;
m.add_wrapped(wrap_pyfunction!(head::head_async))?;
m.add_wrapped(wrap_pyfunction!(head::head))?;
m.add_wrapped(wrap_pyfunction!(list::list_async))?;
m.add_wrapped(wrap_pyfunction!(list::list_with_delimiter_async))?;
m.add_wrapped(wrap_pyfunction!(list::list_with_delimiter))?;
m.add_wrapped(wrap_pyfunction!(list::list))?;
Expand Down
Loading

0 comments on commit 05c1f5d

Please sign in to comment.