diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs
index ae7d0d94e56b..944f15c4a080 100644
--- a/arrow-json/src/writer/encoder.rs
+++ b/arrow-json/src/writer/encoder.rs
@@ -442,7 +442,7 @@ impl Encoder for ArrayFormatter<'_> {
 /// A newtype wrapper around [`ArrayFormatter`] that skips surrounding the value with `"`
 struct RawArrayFormatter<'a>(ArrayFormatter<'a>);
 
-impl<'a> Encoder for RawArrayFormatter<'a> {
+impl Encoder for RawArrayFormatter<'_> {
     fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
         let _ = write!(out, "{}", self.0.value(idx));
     }
diff --git a/arrow-string/src/predicate.rs b/arrow-string/src/predicate.rs
index f559088e6c96..408d9d45cc75 100644
--- a/arrow-string/src/predicate.rs
+++ b/arrow-string/src/predicate.rs
@@ -239,7 +239,7 @@ fn equals_kernel((n, h): (&u8, &u8)) -> bool {
 }
 
 fn equals_ignore_ascii_case_kernel((n, h): (&u8, &u8)) -> bool {
-    n.to_ascii_lowercase() == h.to_ascii_lowercase()
+    n.eq_ignore_ascii_case(h)
 }
 
 /// Transforms a like `pattern` to a regex compatible pattern. To achieve that, it does:
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 8ab78efe0cb7..609501a3ce78 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -81,7 +81,7 @@ lz4_flex = { version = "0.11", default-features = false, features = ["std", "fra
 zstd = { version = "0.13", default-features = false }
 serde_json = { version = "1.0", features = ["std"], default-features = false }
 arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] }
-tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "io-util", "fs"] }
 rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
 object_store = { version = "0.11.0", default-features = false, features = ["azure"] }
diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs
index e6b47856ebe8..ab88c54afc7e 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -15,17 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::ops::Range;
-use std::sync::Arc;
+use std::{ops::Range, sync::Arc};
 
 use bytes::Bytes;
-use futures::future::BoxFuture;
-use futures::{FutureExt, TryFutureExt};
-
-use object_store::{ObjectMeta, ObjectStore};
+use futures::{future::BoxFuture, FutureExt, TryFutureExt};
+use object_store::{path::Path, ObjectMeta, ObjectStore};
+use tokio::runtime::Handle;
 
 use crate::arrow::async_reader::AsyncFileReader;
-use crate::errors::Result;
+use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 
 /// Reads Parquet files in object storage using [`ObjectStore`].
@@ -59,6 +57,7 @@ pub struct ParquetObjectReader {
     metadata_size_hint: Option<usize>,
     preload_column_index: bool,
     preload_offset_index: bool,
+    runtime: Option<Handle>,
 }
 
 impl ParquetObjectReader {
@@ -72,6 +71,7 @@ impl ParquetObjectReader {
             metadata_size_hint: None,
             preload_column_index: false,
             preload_offset_index: false,
+            runtime: None,
         }
     }
 
@@ -99,27 +99,62 @@ impl ParquetObjectReader {
             ..self
         }
     }
+
+    /// Perform IO on the provided tokio runtime
+    ///
+    /// Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner
+    /// to service IO. Therefore, running IO and CPU-bound tasks, such as parquet decoding,
+    /// on the same tokio runtime can lead to degraded throughput, dropped connections and
+    /// other issues. For more information see [here].
+    ///
+    /// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
+    pub fn with_runtime(self, handle: Handle) -> Self {
+        Self {
+            runtime: Some(handle),
+            ..self
+        }
+    }
+
+    fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
+    where
+        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
+            + Send
+            + 'static,
+        O: Send + 'static,
+        E: Into<ParquetError> + Send + 'static,
+    {
+        match &self.runtime {
+            Some(handle) => {
+                let path = self.meta.location.clone();
+                let store = Arc::clone(&self.store);
+                handle
+                    .spawn(async move { f(&store, &path).await })
+                    .map_ok_or_else(
+                        |e| match e.try_into_panic() {
+                            Err(e) => Err(ParquetError::External(Box::new(e))),
+                            Ok(p) => std::panic::resume_unwind(p),
+                        },
+                        |res| res.map_err(|e| e.into()),
+                    )
+                    .boxed()
+            }
+            None => f(&self.store, &self.meta.location)
+                .map_err(|e| e.into())
+                .boxed(),
+        }
+    }
 }
 
 impl AsyncFileReader for ParquetObjectReader {
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
-        self.store
-            .get_range(&self.meta.location, range)
-            .map_err(|e| e.into())
-            .boxed()
+        self.spawn(|store, path| store.get_range(path, range))
     }
 
     fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
     where
         Self: Send,
     {
-        async move {
-            self.store
-                .get_ranges(&self.meta.location, &ranges)
-                .await
-                .map_err(|e| e.into())
-        }
-        .boxed()
+        self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
     }
 
     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
@@ -138,30 +173,42 @@
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
+    use std::{
+        convert::Infallible,
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            Arc,
+        },
+    };
 
     use futures::TryStreamExt;
 
     use arrow::util::test_util::parquet_test_data;
+    use futures::FutureExt;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
-    use object_store::ObjectStore;
+    use object_store::{ObjectMeta, ObjectStore};
 
     use crate::arrow::async_reader::ParquetObjectReader;
     use crate::arrow::ParquetRecordBatchStreamBuilder;
 
-    #[tokio::test]
-    async fn test_simple() {
+    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
         let res = parquet_test_data();
         let store = LocalFileSystem::new_with_prefix(res).unwrap();
 
-        let mut meta = store
+        let meta = store
             .head(&Path::from("alltypes_plain.parquet"))
             .await
             .unwrap();
 
-        let store = Arc::new(store) as Arc<dyn ObjectStore>;
-        let object_reader = ParquetObjectReader::new(Arc::clone(&store), meta.clone());
+        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
+    }
+
+    #[tokio::test]
+    async fn test_simple() {
+        let (meta, store) = get_meta_store().await;
+        let object_reader = ParquetObjectReader::new(store, meta);
+
         let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
             .await
             .unwrap();
@@ -169,7 +216,11 @@ mod tests {
 
         assert_eq!(batches.len(), 1);
         assert_eq!(batches[0].num_rows(), 8);
+    }
 
+    #[tokio::test]
+    async fn test_not_found() {
+        let (mut meta, store) = get_meta_store().await;
         meta.location = Path::from("I don't exist.parquet");
 
         let object_reader = ParquetObjectReader::new(store, meta);
@@ -180,10 +231,69 @@
                 let err = e.to_string();
                 assert!(
                     err.contains("not found: No such file or directory (os error 2)"),
-                    "{}",
-                    err
+                    "{err}",
                 );
             }
         }
     }
+
+    #[tokio::test]
+    // Verify the provided runtime is actually used for IO by counting thread
+    // park/unpark events on a runtime dedicated to this test
+    async fn test_runtime_is_used() {
+        let num_actions = Arc::new(AtomicUsize::new(0));
+
+        let (a1, a2) = (num_actions.clone(), num_actions.clone());
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .on_thread_park(move || {
+                a1.fetch_add(1, Ordering::Relaxed);
+            })
+            .on_thread_unpark(move || {
+                a2.fetch_add(1, Ordering::Relaxed);
+            })
+            .build()
+            .unwrap();
+
+        let (meta, store) = get_meta_store().await;
+
+        let initial_actions = num_actions.load(Ordering::Relaxed);
+
+        let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());
+
+        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
+        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
+
+        // Same assertions as in `test_simple` above
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 8);
+
+        assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);
+
+        // Runtimes have to be dropped in blocking contexts, so we need to move this one to a new
+        // blocking thread to drop it.
+        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
+    }
+
+    #[tokio::test]
+    async fn test_runtime_thread_id_different() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+
+        let (meta, store) = get_meta_store().await;
+
+        let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());
+
+        let current_id = std::thread::current().id();
+
+        let other_id = reader
+            .spawn(|_, _| async move { Ok::<_, Infallible>(std::thread::current().id()) }.boxed())
+            .await
+            .unwrap();
+
+        assert_ne!(current_id, other_id);
+
+        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
+    }
 }
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index bb4d2543c7b4..0c879af07f10 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -107,6 +107,13 @@ impl From<str::Utf8Error> for ParquetError {
     }
 }
 
+#[cfg(test)]
+impl From<std::convert::Infallible> for ParquetError {
+    fn from(value: std::convert::Infallible) -> Self {
+        match value {}
+    }
+}
+
 #[cfg(feature = "arrow")]
 impl From<ArrowError> for ParquetError {
     fn from(e: ArrowError) -> ParquetError {
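
A minimal sketch of how the new `with_runtime` API might be wired up from application code. The `LocalFileSystem` store, the `data.parquet` path, and the runtime sizing below are illustrative assumptions, not part of this change:

```rust
use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use parquet::arrow::{async_reader::ParquetObjectReader, ParquetRecordBatchStreamBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Dedicated runtime for object store IO, kept separate from the runtime
    // created by #[tokio::main], which does the CPU-bound parquet decoding
    let io_rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()?;

    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let meta = store.head(&Path::from("data.parquet")).await?;

    // get_bytes / get_byte_ranges now run on `io_rt` instead of the caller's runtime
    let reader = ParquetObjectReader::new(store, meta).with_runtime(io_rt.handle().clone());

    let batches: Vec<_> = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .build()?
        .try_collect()
        .await?;
    println!("read {} record batches", batches.len());

    // As in the tests above: a runtime must be dropped from a blocking context
    tokio::runtime::Handle::current().spawn_blocking(move || drop(io_rt));
    Ok(())
}
```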
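The `#[cfg(test)] From<Infallible> for ParquetError` impl above lets closures that can never fail, such as the thread-id probe in `test_runtime_thread_id_different`, satisfy `spawn`'s `E: Into<ParquetError>` bound. A standalone sketch of the pattern, with a hypothetical `MyError` standing in for `ParquetError`:

```rust
use std::convert::Infallible;

// Hypothetical error type standing in for ParquetError
#[derive(Debug)]
struct MyError;

impl From<Infallible> for MyError {
    fn from(value: Infallible) -> Self {
        // `Infallible` has no values, so this empty match type-checks and
        // documents that the conversion can never actually run
        match value {}
    }
}

fn main() {
    // A result whose error type proves failure is impossible...
    let never_fails: Result<u32, Infallible> = Ok(42);
    // ...can still flow through APIs that require `E: Into<MyError>`
    let widened: Result<u32, MyError> = never_fails.map_err(Into::into);
    assert_eq!(widened.unwrap(), 42);
}
```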