Add test for ParquetObjectReader::with_runtime and fix clippy complaints
itsjunetime committed Oct 21, 2024
1 parent 62cb6be commit 74ffb19
Showing 5 changed files with 78 additions and 30 deletions.
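For orientation, the API under test, `ParquetObjectReader::with_runtime` (added in the parent commit), lets the reader run its object-store I/O on a dedicated tokio runtime. A minimal usage sketch, assuming a local directory containing `alltypes_plain.parquet`; the path, runtime configuration, and variable names are illustrative:

use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use parquet::arrow::{async_reader::ParquetObjectReader, ParquetRecordBatchStreamBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Any ObjectStore implementation works; a local filesystem keeps the sketch self-contained.
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new_with_prefix("testdata")?);
    let meta = store.head(&Path::from("alltypes_plain.parquet")).await?;

    // The reader will spawn its I/O onto this dedicated runtime instead of the caller's.
    let io_rt = tokio::runtime::Runtime::new()?;
    let reader = ParquetObjectReader::new(store, meta).with_runtime(io_rt.handle().clone());

    let batches: Vec<_> = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .build()?
        .try_collect()
        .await?;
    println!("read {} record batches", batches.len());

    // Runtimes must be dropped in a blocking context (see the new test below).
    tokio::runtime::Handle::current().spawn_blocking(move || drop(io_rt));
    Ok(())
}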
.cargo/config.toml (4 additions, 0 deletions)

@@ -0,0 +1,4 @@
+# The tokio_unstable flag is needed for the parquet::arrow::async_reader::test_runtime_is_used
+# test, as it relies on checking the number of tasks that have run on a runtime
+[test]
+rustflags = ["--cfg", "tokio_unstable"]
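For readers unfamiliar with custom cfg flags, a short sketch of the mechanics (the cfg name matches the flag above; the function is illustrative): `--cfg tokio_unstable` toggles conditional compilation, whether supplied through the rustflags entry above or through an equivalent `RUSTFLAGS="--cfg tokio_unstable"` environment variable.

// Compiled only when rustc is invoked with `--cfg tokio_unstable`.
#[cfg(tokio_unstable)]
fn runtime_metrics_available() -> bool {
    true
}

// Fallback used in all other builds.
#[cfg(not(tokio_unstable))]
fn runtime_metrics_available() -> bool {
    false
}

fn main() {
    println!("tokio_unstable enabled: {}", runtime_metrics_available());
}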
arrow-json/src/writer/encoder.rs (1 addition, 1 deletion)

@@ -442,7 +442,7 @@ impl Encoder for ArrayFormatter<'_> {
 /// A newtype wrapper around [`ArrayFormatter`] that skips surrounding the value with `"`
 struct RawArrayFormatter<'a>(ArrayFormatter<'a>);

-impl<'a> Encoder for RawArrayFormatter<'a> {
+impl Encoder for RawArrayFormatter<'_> {
     fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
         let _ = write!(out, "{}", self.0.value(idx));
     }
arrow-string/src/predicate.rs (1 addition, 1 deletion)

@@ -239,7 +239,7 @@ fn equals_kernel((n, h): (&u8, &u8)) -> bool {
 }

 fn equals_ignore_ascii_case_kernel((n, h): (&u8, &u8)) -> bool {
-    n.to_ascii_lowercase() == h.to_ascii_lowercase()
+    n.eq_ignore_ascii_case(h)
 }

 /// Transforms a like `pattern` to a regex compatible pattern. To achieve that, it does:
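The replacement is behavior-preserving: per the standard library docs, `eq_ignore_ascii_case` is equivalent to comparing the two `to_ascii_lowercase` results. A standalone check confirming the old and new spellings agree on every byte pair:

fn main() {
    // Exhaustively compare the removed expression with its replacement.
    for n in 0u8..=255 {
        for h in 0u8..=255 {
            assert_eq!(
                n.to_ascii_lowercase() == h.to_ascii_lowercase(),
                n.eq_ignore_ascii_case(&h)
            );
        }
    }
    println!("old and new comparisons agree on all 65536 byte pairs");
}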
parquet/Cargo.toml (4 additions, 0 deletions)

@@ -28,6 +28,10 @@ readme = "README.md"
 edition = { workspace = true }
 rust-version = "1.70.0"

+# we need this for the arrow::async_reader::test_runtime_is_used test
+[lints.rust]
+unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tokio_unstable)"] }
+
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 ahash = { version = "0.8", default-features = false, features = ["compile-time-rng"] }
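This `[lints.rust]` entry is needed because rustc 1.80+ checks cfg names against a known list and warns on unrecognized ones; `check-cfg` registers `tokio_unstable` as expected. A hypothetical minimal snippet that would otherwise trigger the warning:

// Without `check-cfg = ["cfg(tokio_unstable)"]`, rustc 1.80+ reports:
//   warning: unexpected `cfg` condition name: `tokio_unstable`
#[cfg(tokio_unstable)]
mod runtime_metric_tests {}

fn main() {}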
parquet/src/arrow/async_reader/store.rs (68 additions, 28 deletions)

@@ -15,19 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.

-use std::ops::Range;
-use std::sync::Arc;
+use std::{ops::Range, sync::Arc};

 use bytes::Bytes;
-use futures::future::BoxFuture;
-use futures::{FutureExt, TryFutureExt};
-use object_store::path::Path;
-use object_store::{ObjectMeta, ObjectStore};
+use futures::{future::BoxFuture, FutureExt, TryFutureExt};
+use object_store::{path::Path, ObjectMeta, ObjectStore};
 use tokio::runtime::Handle;

 use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ParquetMetaDataReader, ParquetMetaData};
+use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

 /// Reads Parquet files in object storage using [`ObjectStore`].
 ///
@@ -118,41 +115,46 @@ impl ParquetObjectReader {
         }
     }

-    fn spawn<F, O>(&self, f: F) -> BoxFuture<'_, Result<O>>
+    fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
     where
-        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O>>
+        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
             + Send
             + 'static,
         O: Send + 'static,
+        E: Into<ParquetError> + Send + 'static,
     {
         match &self.runtime {
             Some(handle) => {
                 let path = self.meta.location.clone();
                 let store = Arc::clone(&self.store);
-                let fut = handle.spawn(async move { f(&store, &path).await });
-                fut.unwrap_or_else(|e| match e.try_into_panic() {
-                    Ok(p) => std::panic::resume_unwind(p),
-                    Err(e) => Err(ParquetError::External(Box::new(e))),
-                })
-                .boxed()
+                handle
+                    .spawn(async move { f(&store, &path).await })
+                    .map_ok_or_else(
+                        |e| match e.try_into_panic() {
+                            Err(e) => Err(ParquetError::External(Box::new(e))),
+                            Ok(p) => std::panic::resume_unwind(p),
+                        },
+                        |res| res.map_err(|e| e.into()),
+                    )
+                    .boxed()
             }
-            None => f(&self.store, &self.meta.location),
+            None => f(&self.store, &self.meta.location)
+                .map_err(|e| e.into())
+                .boxed(),
         }
     }
 }

 impl AsyncFileReader for ParquetObjectReader {
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
-        self.spawn(|store, path| store.get_range(path, range).map_err(|e| e.into()).boxed())
+        self.spawn(|store, path| store.get_range(path, range))
     }

     fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
     where
         Self: Send,
     {
-        self.spawn(move |store, path| {
-            async move { store.get_ranges(path, &ranges).await.map_err(|e| e.into()) }.boxed()
-        })
+        self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
     }

     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
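A note on the `map_ok_or_else` error arm in the new `spawn`: `tokio::task::JoinError::try_into_panic` separates a task that panicked (whose panic payload should be re-raised on the caller's thread) from one that was cancelled (an ordinary error). A standalone sketch of the same pattern, with illustrative names:

use tokio::task::JoinError;

// Propagate panics from a joined task; surface cancellations as plain errors.
fn flatten_join<T>(joined: Result<T, JoinError>) -> Result<T, String> {
    match joined {
        Ok(value) => Ok(value),
        Err(join_err) => match join_err.try_into_panic() {
            // The task panicked: re-raise the panic here.
            Ok(payload) => std::panic::resume_unwind(payload),
            // The task was cancelled: report an error instead.
            Err(cancelled) => Err(format!("task failed: {cancelled}")),
        },
    }
}

#[tokio::main]
async fn main() {
    let joined = tokio::spawn(async { 21 * 2 }).await;
    assert_eq!(flatten_join(joined), Ok(42));
}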
@@ -178,31 +180,40 @@ mod tests {
     use arrow::util::test_util::parquet_test_data;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
-    use object_store::ObjectStore;
+    use object_store::{ObjectMeta, ObjectStore};

     use crate::arrow::async_reader::ParquetObjectReader;
     use crate::arrow::ParquetRecordBatchStreamBuilder;

-    #[tokio::test]
-    async fn test_simple() {
+    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
         let res = parquet_test_data();
         let store = LocalFileSystem::new_with_prefix(res).unwrap();

-        let mut meta = store
+        let meta = store
             .head(&Path::from("alltypes_plain.parquet"))
             .await
             .unwrap();

-        let store = Arc::new(store) as Arc<dyn ObjectStore>;
-        let object_reader = ParquetObjectReader::new(Arc::clone(&store), meta.clone());
+        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
+    }
+
+    #[tokio::test]
+    async fn test_simple() {
+        let (meta, store) = get_meta_store().await;
+        let object_reader = ParquetObjectReader::new(store, meta);

         let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
             .await
             .unwrap();
         let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

         assert_eq!(batches.len(), 1);
         assert_eq!(batches[0].num_rows(), 8);
     }

+    #[tokio::test]
+    async fn test_not_found() {
+        let (mut meta, store) = get_meta_store().await;
+        meta.location = Path::from("I don't exist.parquet");
+
         let object_reader = ParquetObjectReader::new(store, meta);
@@ -213,10 +224,39 @@ mod tests {
                 let err = e.to_string();
                 assert!(
                     err.contains("not found: No such file or directory (os error 2)"),
-                    "{}",
-                    err
+                    "{err}",
                 );
             }
         }
     }
+
+    #[tokio::test]
+    // We need to mark this with the `target_has_atomic` cfg because the spawned_tasks_count()
+    // method is only available under it
+    #[cfg(all(target_has_atomic = "64", tokio_unstable))]
+    async fn test_runtime_is_used() {
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .build()
+            .unwrap();
+
+        let (meta, store) = get_meta_store().await;
+
+        let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());
+
+        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
+        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
+
+        // Just copied these assert_eqs from `test_simple` above
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 8);
+
+        // According to the tokio documentation for the `spawned_tasks_count` method, this number
+        // starts at 0 when the runtime is created, so this check should actually verify what we
+        // want.
+        assert!(rt.metrics().spawned_tasks_count() > 0);
+
+        // Runtimes have to be dropped in blocking contexts, so we need to move this one to a new
+        // blocking thread to drop it.
+        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
+    }
 }
