From e0faacc8a0fb7251c860930eda9380e0bcf35d02 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 18 Oct 2024 12:49:22 -0400 Subject: [PATCH 1/2] attempt at list stream --- object-store-rs/src/list.rs | 149 +++++++++++++++++++++++++++++++----- 1 file changed, 131 insertions(+), 18 deletions(-) diff --git a/object-store-rs/src/list.rs b/object-store-rs/src/list.rs index f84e0a8..3e46b7f 100644 --- a/object-store-rs/src/list.rs +++ b/object-store-rs/src/list.rs @@ -1,12 +1,15 @@ use std::collections::HashMap; use std::sync::Arc; -use futures::TryStreamExt; +use futures::stream::BoxStream; +use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::{ListResult, ObjectMeta, ObjectStore}; +use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration}; use pyo3::prelude::*; use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult}; use pyo3_object_store::PyObjectStore; +use tokio::sync::Mutex; use crate::runtime::get_runtime; @@ -30,9 +33,9 @@ impl IntoPy for PyObjectMeta { } } -pub(crate) struct PyListResult(ListResult); +pub(crate) struct PyMaterializedListResult(ListResult); -impl IntoPy for PyListResult { +impl IntoPy for PyMaterializedListResult { fn into_py(self, py: Python<'_>) -> PyObject { let mut dict = HashMap::new(); dict.insert( @@ -57,21 +60,131 @@ impl IntoPy for PyListResult { } } +#[pyclass(name = "ListResult")] +pub(crate) struct PyListResult<'a> { + // store: Arc, + // stream: Option>>>>, + payload: BoxStream<'a, object_store::Result>, + min_chunk_size: usize, +} + +impl PyListResult { + fn new( + // store: Arc, + // prefix: Option<&Path>, + stream: BoxStream<'static, object_store::Result>, + min_chunk_size: usize, + ) -> Self { + // let stream = store.as_ref().list(prefix); + Self { + payload: stream, + min_chunk_size, + } + } +} + +#[pyclass(name = "ListStream")] +pub struct PyListStream { + stream: Arc>>>, + min_chunk_size: usize, +} + +impl PyListStream { + fn new( + stream: BoxStream<'static, object_store::Result>, + min_chunk_size: usize, + ) -> Self { + Self { + stream: Arc::new(Mutex::new(stream)), + min_chunk_size, + } + } +} + +async fn next_stream( + stream: Arc>>>, + min_chunk_size: usize, + sync: bool, +) -> PyResult<()> { + let mut stream = stream.lock().await; + let mut metas: Vec = vec![]; + loop { + match stream.next().await { + Some(Ok(meta)) => { + metas.push(meta); + if metas.len() >= min_chunk_size { + todo!() + // return Ok(PyBytesWrapper::new_multiple(buffers)); + } + } + Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()), + None => { + if metas.is_empty() { + // Depending on whether the iteration is sync or not, we raise either a + // StopIteration or a StopAsyncIteration + if sync { + return Err(PyStopIteration::new_err("stream exhausted")); + } else { + return Err(PyStopAsyncIteration::new_err("stream exhausted")); + } + } else { + todo!() + // return Ok(PyBytesWrapper::new_multiple(buffers)); + } + } + }; + } +} + +#[pymethods] +impl PyListStream { + fn __aiter__(slf: Py) -> Py { + slf + } + + fn __iter__(slf: Py) -> Py { + slf + } + + fn __anext__<'py>(&'py self, py: Python<'py>) -> PyResult> { + let stream = self.stream.clone(); + pyo3_async_runtimes::tokio::future_into_py( + py, + next_stream(stream, self.min_chunk_size, false), + ) + } + + fn __next__<'py>(&'py self, py: Python<'py>) -> PyResult<()> { + let runtime = get_runtime(py)?; + let stream = self.stream.clone(); + runtime.block_on(next_stream(stream, self.min_chunk_size, true)) + } +} + #[pyfunction] -#[pyo3(signature = (store, prefix = None))] -pub(crate) fn list( - py: Python, +#[pyo3(signature = (store, prefix = None, *, min_chunk_size = 1000))] +pub(crate) fn list<'py>( + py: Python<'py>, store: PyObjectStore, prefix: Option, -) -> PyObjectStoreResult> { - let runtime = get_runtime(py)?; - py.allow_threads(|| { - let out = runtime.block_on(list_materialize( - store.into_inner(), - prefix.map(|s| s.into()).as_ref(), - ))?; - Ok::<_, PyObjectStoreError>(out) - }) + min_chunk_size: usize, +) -> PyObjectStoreResult { + // todo!() + // // let runtime = get_runtime(py)?; + // let store = store.into_inner(); + let stream = store.as_ref().list(prefix.map(|s| s.into()).as_ref()); + // todo!() + Ok(PyListResult::new(stream, min_chunk_size)) + // Ok::<_, PyObjectStoreError>(PyListResult::new(store, stream, min_chunk_size)) + // // py.allow_threads(move || { + + // // let x = runtime.block_on(fut); + // // let out = runtime.block_on(list_materialize( + // // store.into_inner(), + // // prefix.map(|s| s.into()).as_ref(), + // // ))?; + // // Ok::<_, PyObjectStoreError>(out) + // }) } #[pyfunction] @@ -101,7 +214,7 @@ pub(crate) fn list_with_delimiter( py: Python, store: PyObjectStore, prefix: Option, -) -> PyObjectStoreResult { +) -> PyObjectStoreResult { let runtime = get_runtime(py)?; py.allow_threads(|| { let out = runtime.block_on(list_with_delimiter_materialize( @@ -130,7 +243,7 @@ pub(crate) fn list_with_delimiter_async( async fn list_with_delimiter_materialize( store: Arc, prefix: Option<&Path>, -) -> PyObjectStoreResult { +) -> PyObjectStoreResult { let list_result = store.list_with_delimiter(prefix).await?; - Ok(PyListResult(list_result)) + Ok(PyMaterializedListResult(list_result)) } From 2f02ca5653a13470c98633482401ffc785b9fa9b Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 22 Oct 2024 11:00:27 -0400 Subject: [PATCH 2/2] Attempt ouroboros --- Cargo.lock | 78 +++++++++++++++++++++++++++++++++++-- object-store-rs/Cargo.toml | 1 + object-store-rs/src/list.rs | 15 +++---- 3 files changed, 84 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3f15fc3..643dca4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "aliasable" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -343,6 +349,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "heck" version = "0.5.0" @@ -513,6 +525,15 @@ version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -639,6 +660,7 @@ dependencies = [ "http", "indexmap", "object_store", + "ouroboros", "pyo3", "pyo3-async-runtimes", "pyo3-file", @@ -661,7 +683,7 @@ dependencies = [ "futures", "humantime", "hyper", - "itertools", + "itertools 0.13.0", "md-5", "parking_lot", "percent-encoding", @@ -691,6 +713,31 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "ouroboros" +version = "0.18.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "944fa20996a25aded6b4795c6d63f10014a7a83f8be9828a11860b08c5fc4a67" +dependencies = [ + "aliasable", + "ouroboros_macro", + "static_assertions", +] + +[[package]] +name = "ouroboros_macro" +version = "0.18.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39b0deead1528fd0e5947a8546a9642a9777c25f6e1e26f34c97b204bbb465bd" +dependencies = [ + "heck 0.4.1", + "itertools 0.12.1", + "proc-macro2", + "proc-macro2-diagnostics", + "quote", + "syn", +] + [[package]] name = "parking_lot" version = "0.12.3" @@ -756,6 +803,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proc-macro2-diagnostics" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "version_check", + "yansi", +] + [[package]] name = "pyo3" version = "0.22.5" @@ -835,7 +895,7 @@ version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1e3f09eecd94618f60a455a23def79f79eba4dc561a97324bf9ac8c6df30ce" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "pyo3-build-config", "quote", @@ -1217,7 +1277,7 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "syn", @@ -1239,6 +1299,12 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "subtle" version = "2.6.1" @@ -1701,6 +1767,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "yansi" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" + [[package]] name = "zerocopy" version = "0.7.35" diff --git a/object-store-rs/Cargo.toml b/object-store-rs/Cargo.toml index 9d10f27..bcd6b62 100644 --- a/object-store-rs/Cargo.toml +++ b/object-store-rs/Cargo.toml @@ -24,6 +24,7 @@ futures = { workspace = true } http = { workspace = true } indexmap = { workspace = true } object_store = { workspace = true } +ouroboros = "0.18" pyo3 = { workspace = true, features = ["chrono", "abi3-py39"] } pyo3-async-runtimes = { workspace = true, features = ["tokio-runtime"] } pyo3-file = { workspace = true } diff --git a/object-store-rs/src/list.rs b/object-store-rs/src/list.rs index b364efc..154b137 100644 --- a/object-store-rs/src/list.rs +++ b/object-store-rs/src/list.rs @@ -5,6 +5,7 @@ use futures::StreamExt; use indexmap::IndexMap; use object_store::path::Path; use object_store::{ListResult, ObjectMeta, ObjectStore}; +use ouroboros::self_referencing; use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration}; use pyo3::prelude::*; use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult}; @@ -62,13 +63,13 @@ impl IntoPy for PyMaterializedListResult { } } -// #[pyclass(name = "ListResult")] -// pub(crate) struct PyListResult<'a> { -// // store: Arc, -// // stream: Option>>>>, -// payload: BoxStream<'a, object_store::Result>, -// min_chunk_size: usize, -// } +#[pyclass(name = "ListResult")] +#[self_referencing] +pub(crate) struct PyListResult { + store: Arc, + #[borrows(store)] + payload: BoxStream<'this, object_store::Result>, +} // impl PyListResult { // fn new(