From c6e1d9e85360dfa15de7612a27e8aaeb3e29fd74 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Mon, 15 Jul 2024 20:57:28 +0200 Subject: [PATCH] perf: Cache path resolving of `scan` functions (#17616) Co-authored-by: ritchie --- crates/polars-lazy/src/scan/csv.rs | 3 +- crates/polars-lazy/src/scan/ndjson.rs | 8 +- crates/polars-plan/src/plans/builder_dsl.rs | 21 ++++-- .../src/plans/conversion/dsl_to_ir.rs | 73 +++++++++++++------ .../polars-plan/src/plans/conversion/mod.rs | 4 +- crates/polars-plan/src/plans/mod.rs | 4 +- py-polars/tests/unit/io/test_lazy_csv.py | 2 +- 7 files changed, 78 insertions(+), 37 deletions(-) diff --git a/crates/polars-lazy/src/scan/csv.rs b/crates/polars-lazy/src/scan/csv.rs index 415a6981c603..bec13f5e8253 100644 --- a/crates/polars-lazy/src/scan/csv.rs +++ b/crates/polars-lazy/src/scan/csv.rs @@ -219,7 +219,8 @@ impl LazyCsvReader { where F: Fn(Schema) -> PolarsResult, { - // TODO: This should be done when converting to the IR + // TODO: Path expansion should happen when converting to the IR + // https://github.com/pola-rs/polars/issues/17634 let paths = expand_paths(self.paths(), self.glob(), self.cloud_options())?; let Some(path) = paths.first() else { diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs index f42b0ec48111..37209ed5710c 100644 --- a/crates/polars-lazy/src/scan/ndjson.rs +++ b/crates/polars-lazy/src/scan/ndjson.rs @@ -1,6 +1,6 @@ use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; -use std::sync::RwLock; +use std::sync::{Arc, Mutex, RwLock}; use polars_core::prelude::*; use polars_io::RowIndex; @@ -97,7 +97,7 @@ impl LazyFileListReader for LazyJsonLineReader { return self.finish_no_glob(); } - let paths = self.paths; + let paths = Arc::new(Mutex::new((self.paths, false))); let file_options = FileScanOptions { n_rows: self.n_rows, @@ -133,6 +133,8 @@ impl LazyFileListReader for LazyJsonLineReader { } fn finish_no_glob(self) -> PolarsResult { + let paths = 
Arc::new(Mutex::new((self.paths, false))); + let file_options = FileScanOptions { n_rows: self.n_rows, with_columns: None, @@ -157,7 +159,7 @@ impl LazyFileListReader for LazyJsonLineReader { let scan_type = FileScan::NDJson { options }; Ok(LazyFrame::from(DslPlan::Scan { - paths: self.paths, + paths, file_info: Arc::new(RwLock::new(None)), hive_parts: None, predicate: None, diff --git a/crates/polars-plan/src/plans/builder_dsl.rs b/crates/polars-plan/src/plans/builder_dsl.rs index c180842daff5..940fb0dabe27 100644 --- a/crates/polars-plan/src/plans/builder_dsl.rs +++ b/crates/polars-plan/src/plans/builder_dsl.rs @@ -1,4 +1,6 @@ -use std::sync::RwLock; +#[cfg(any(feature = "csv", feature = "ipc", feature = "parquet"))] +use std::path::PathBuf; +use std::sync::{Arc, Mutex, RwLock}; use polars_core::prelude::*; #[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))] @@ -58,7 +60,7 @@ impl DslBuilder { }; Ok(DslPlan::Scan { - paths: Arc::new([]), + paths: Arc::new(Mutex::new((Arc::new([]), true))), file_info: Arc::new(RwLock::new(Some(file_info))), hive_parts: None, predicate: None, @@ -90,7 +92,7 @@ impl DslBuilder { glob: bool, include_file_paths: Option>, ) -> PolarsResult { - let paths = paths.into(); + let paths = init_paths(paths); let options = FileScanOptions { with_columns: None, @@ -135,7 +137,7 @@ impl DslBuilder { hive_options: HiveOptions, include_file_paths: Option>, ) -> PolarsResult { - let paths = paths.into(); + let paths = init_paths(paths); Ok(DslPlan::Scan { paths, @@ -172,7 +174,7 @@ impl DslBuilder { glob: bool, include_file_paths: Option>, ) -> PolarsResult { - let paths = paths.into(); + let paths = init_paths(paths); // This gets partially moved by FileScanOptions let read_options_clone = read_options.clone(); @@ -460,3 +462,12 @@ impl DslBuilder { .into() } } + +/// Initialize paths as non-expanded. +#[cfg(any(feature = "csv", feature = "ipc", feature = "parquet"))] +fn init_paths

<P>(paths: P) -> Arc<Mutex<(Arc<[PathBuf]>, bool)>> +where + P: Into<Arc<[PathBuf]>>, +{ + Arc::new(Mutex::new((paths.into(), false))) +} diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index a51291e2d6d7..63399bdc5974 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -1,4 +1,3 @@ -#[cfg(any(feature = "ipc", feature = "parquet"))] use std::path::PathBuf; use arrow::datatypes::ArrowSchemaRef; @@ -99,23 +98,7 @@ pub fn to_alp_impl( mut file_options, mut scan_type, } => { - let paths = match &scan_type { - #[cfg(feature = "parquet")] - FileScan::Parquet { - ref cloud_options, .. - } => expand_paths_with_hive_update(paths, &mut file_options, cloud_options)?, - #[cfg(feature = "ipc")] - FileScan::Ipc { - ref cloud_options, .. - } => expand_paths_with_hive_update(paths, &mut file_options, cloud_options)?, - #[cfg(feature = "csv")] - FileScan::Csv { - ref cloud_options, .. - } => expand_paths(&paths, file_options.glob, cloud_options.as_ref())?, - #[cfg(feature = "json")] - FileScan::NDJson { .. } => expand_paths(&paths, file_options.glob, None)?, - FileScan::Anonymous { .. } => paths, - }; + let paths = expand_scan_paths(paths, &mut scan_type, &mut file_options)?; let file_info_read = file_info.read().unwrap(); @@ -179,7 +162,7 @@ pub fn to_alp_impl( let hive_parts = if hive_parts.is_some() { hive_parts - } else if file_options.hive_options.enabled.unwrap() + } else if file_options.hive_options.enabled.unwrap_or(false) && resolved_file_info.reader_schema.is_some() { #[allow(unused_assignments)] @@ -737,21 +720,65 @@ pub fn to_alp_impl( Ok(lp_arena.add(v)) } +/// Expand scan paths if they were not already expanded. 
+#[allow(unused_variables)] +fn expand_scan_paths( + paths: Arc<Mutex<(Arc<[PathBuf]>, bool)>>, + scan_type: &mut FileScan, + file_options: &mut FileScanOptions, +) -> PolarsResult<Arc<[PathBuf]>> { + #[allow(unused_mut)] + let mut lock = paths.lock().unwrap(); + + // Return if paths are already expanded + if lock.1 { + return Ok(lock.0.clone()); + } + + { + let paths_expanded = match scan_type { + #[cfg(feature = "parquet")] + FileScan::Parquet { + ref cloud_options, .. + } => expand_scan_paths_with_hive_update(&lock.0, file_options, cloud_options)?, + #[cfg(feature = "ipc")] + FileScan::Ipc { + ref cloud_options, .. + } => expand_scan_paths_with_hive_update(&lock.0, file_options, cloud_options)?, + #[cfg(feature = "csv")] + FileScan::Csv { + ref cloud_options, .. + } => expand_paths(&lock.0, file_options.glob, cloud_options.as_ref())?, + #[cfg(feature = "json")] + FileScan::NDJson { .. } => expand_paths(&lock.0, file_options.glob, None)?, + FileScan::Anonymous { .. } => unreachable!(), // Invariant: Anonymous scans are already expanded. + }; + + #[allow(unreachable_code)] + { + *lock = (paths_expanded, true); + + Ok(lock.0.clone()) + } + } +} + +/// Expand scan paths and update the Hive partition information of `file_options`. 
#[cfg(any(feature = "ipc", feature = "parquet"))] -fn expand_paths_with_hive_update( - paths: Arc<[PathBuf]>, +fn expand_scan_paths_with_hive_update( + paths: &[PathBuf], file_options: &mut FileScanOptions, cloud_options: &Option<CloudOptions>, ) -> PolarsResult<Arc<[PathBuf]>> { let hive_enabled = file_options.hive_options.enabled; let (expanded_paths, hive_start_idx) = expand_paths_hive( - &paths, + paths, file_options.glob, cloud_options.as_ref(), hive_enabled.unwrap_or(false), )?; let inferred_hive_enabled = hive_enabled - .unwrap_or_else(|| expanded_from_single_directory(paths.as_ref(), expanded_paths.as_ref())); + .unwrap_or_else(|| expanded_from_single_directory(paths, expanded_paths.as_ref())); file_options.hive_options.enabled = Some(inferred_hive_enabled); file_options.hive_options.hive_start_idx = hive_start_idx; diff --git a/crates/polars-plan/src/plans/conversion/mod.rs b/crates/polars-plan/src/plans/conversion/mod.rs index 50c28984b188..275109cbc613 100644 --- a/crates/polars-plan/src/plans/conversion/mod.rs +++ b/crates/polars-plan/src/plans/conversion/mod.rs @@ -8,7 +8,7 @@ mod scans; mod stack_opt; use std::borrow::Cow; -use std::sync::RwLock; +use std::sync::{Arc, Mutex, RwLock}; pub use dsl_to_ir::*; pub use expr_to_ir::*; @@ -52,7 +52,7 @@ impl IR { output_schema: _, file_options: options, } => DslPlan::Scan { - paths, + paths: Arc::new(Mutex::new((paths, true))), file_info: Arc::new(RwLock::new(Some(file_info))), hive_parts, predicate: predicate.map(|e| e.to_expr(expr_arena)), diff --git a/crates/polars-plan/src/plans/mod.rs b/crates/polars-plan/src/plans/mod.rs index 1b55b6c3ba81..4f0ec8cafafb 100644 --- a/crates/polars-plan/src/plans/mod.rs +++ b/crates/polars-plan/src/plans/mod.rs @@ -1,7 +1,7 @@ use std::fmt; use std::fmt::Debug; use std::path::PathBuf; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use hive::HivePartitions; use polars_core::prelude::*; @@ -78,7 +78,7 @@ pub enum DslPlan { cache_hits: u32, }, Scan { - paths: Arc<[PathBuf]>, + 
paths: Arc<Mutex<(Arc<[PathBuf]>, bool)>>, // Option as this is mostly materialized on the IR phase. // During conversion we update the value in the DSL as well // This is to cater to use cases where parts of a `LazyFrame` diff --git a/py-polars/tests/unit/io/test_lazy_csv.py b/py-polars/tests/unit/io/test_lazy_csv.py index 140abe5da9e6..5672c4b1b7c4 100644 --- a/py-polars/tests/unit/io/test_lazy_csv.py +++ b/py-polars/tests/unit/io/test_lazy_csv.py @@ -425,7 +425,7 @@ def test_file_list_comment_skip_rows_16327(tmp_path: Path, streaming: bool) -> N assert_frame_equal(out, expect) -@pytest.mark.xfail(reason="Bug: https://github.com/pola-rs/polars/issues/17444") +@pytest.mark.xfail(reason="Bug: https://github.com/pola-rs/polars/issues/17634") def test_scan_csv_with_column_names_nonexistent_file() -> None: path_str = "my-nonexistent-data.csv" path = Path(path_str)