Skip to content

Commit

Permalink
perf: Cache path resolving of scan functions (#17616)
Browse files Browse the repository at this point in the history
Co-authored-by: ritchie <ritchie46@gmail.com>
  • Loading branch information
stinodego and ritchie46 authored Jul 15, 2024
1 parent 8bad2fd commit c6e1d9e
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 37 deletions.
3 changes: 2 additions & 1 deletion crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ impl LazyCsvReader {
where
F: Fn(Schema) -> PolarsResult<Schema>,
{
// TODO: This should be done when converting to the IR
// TODO: Path expansion should happen when converting to the IR
// https://github.com/pola-rs/polars/issues/17634
let paths = expand_paths(self.paths(), self.glob(), self.cloud_options())?;

let Some(path) = paths.first() else {
Expand Down
8 changes: 5 additions & 3 deletions crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::sync::RwLock;
use std::sync::{Arc, Mutex, RwLock};

use polars_core::prelude::*;
use polars_io::RowIndex;
Expand Down Expand Up @@ -97,7 +97,7 @@ impl LazyFileListReader for LazyJsonLineReader {
return self.finish_no_glob();
}

let paths = self.paths;
let paths = Arc::new(Mutex::new((self.paths, false)));

let file_options = FileScanOptions {
n_rows: self.n_rows,
Expand Down Expand Up @@ -133,6 +133,8 @@ impl LazyFileListReader for LazyJsonLineReader {
}

fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
let paths = Arc::new(Mutex::new((self.paths, false)));

let file_options = FileScanOptions {
n_rows: self.n_rows,
with_columns: None,
Expand All @@ -157,7 +159,7 @@ impl LazyFileListReader for LazyJsonLineReader {
let scan_type = FileScan::NDJson { options };

Ok(LazyFrame::from(DslPlan::Scan {
paths: self.paths,
paths,
file_info: Arc::new(RwLock::new(None)),
hive_parts: None,
predicate: None,
Expand Down
21 changes: 16 additions & 5 deletions crates/polars-plan/src/plans/builder_dsl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::sync::RwLock;
#[cfg(any(feature = "csv", feature = "ipc", feature = "parquet"))]
use std::path::PathBuf;
use std::sync::{Arc, Mutex, RwLock};

use polars_core::prelude::*;
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
Expand Down Expand Up @@ -58,7 +60,7 @@ impl DslBuilder {
};

Ok(DslPlan::Scan {
paths: Arc::new([]),
paths: Arc::new(Mutex::new((Arc::new([]), true))),
file_info: Arc::new(RwLock::new(Some(file_info))),
hive_parts: None,
predicate: None,
Expand Down Expand Up @@ -90,7 +92,7 @@ impl DslBuilder {
glob: bool,
include_file_paths: Option<Arc<str>>,
) -> PolarsResult<Self> {
let paths = paths.into();
let paths = init_paths(paths);

let options = FileScanOptions {
with_columns: None,
Expand Down Expand Up @@ -135,7 +137,7 @@ impl DslBuilder {
hive_options: HiveOptions,
include_file_paths: Option<Arc<str>>,
) -> PolarsResult<Self> {
let paths = paths.into();
let paths = init_paths(paths);

Ok(DslPlan::Scan {
paths,
Expand Down Expand Up @@ -172,7 +174,7 @@ impl DslBuilder {
glob: bool,
include_file_paths: Option<Arc<str>>,
) -> PolarsResult<Self> {
let paths = paths.into();
let paths = init_paths(paths);

// This gets partially moved by FileScanOptions
let read_options_clone = read_options.clone();
Expand Down Expand Up @@ -460,3 +462,12 @@ impl DslBuilder {
.into()
}
}

/// Initialize paths as non-expanded.
#[cfg(any(feature = "csv", feature = "ipc", feature = "parquet"))]
/// Wrap `paths` in the shared `(paths, expanded)` cell used by `DslPlan::Scan`.
fn init_paths<P>(paths: P) -> Arc<Mutex<(Arc<[PathBuf]>, bool)>>
where
    P: Into<Arc<[PathBuf]>>,
{
    // The boolean records whether glob/path expansion has already run. Starting
    // at `false` defers the (potentially expensive, possibly cloud-touching)
    // expansion until DSL -> IR conversion, which flips it to `true` so repeated
    // conversions of the same plan reuse the cached, expanded paths.
    Arc::new(Mutex::new((paths.into(), false)))
}
73 changes: 50 additions & 23 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#[cfg(any(feature = "ipc", feature = "parquet"))]
use std::path::PathBuf;

use arrow::datatypes::ArrowSchemaRef;
Expand Down Expand Up @@ -99,23 +98,7 @@ pub fn to_alp_impl(
mut file_options,
mut scan_type,
} => {
let paths = match &scan_type {
#[cfg(feature = "parquet")]
FileScan::Parquet {
ref cloud_options, ..
} => expand_paths_with_hive_update(paths, &mut file_options, cloud_options)?,
#[cfg(feature = "ipc")]
FileScan::Ipc {
ref cloud_options, ..
} => expand_paths_with_hive_update(paths, &mut file_options, cloud_options)?,
#[cfg(feature = "csv")]
FileScan::Csv {
ref cloud_options, ..
} => expand_paths(&paths, file_options.glob, cloud_options.as_ref())?,
#[cfg(feature = "json")]
FileScan::NDJson { .. } => expand_paths(&paths, file_options.glob, None)?,
FileScan::Anonymous { .. } => paths,
};
let paths = expand_scan_paths(paths, &mut scan_type, &mut file_options)?;

let file_info_read = file_info.read().unwrap();

Expand Down Expand Up @@ -179,7 +162,7 @@ pub fn to_alp_impl(

let hive_parts = if hive_parts.is_some() {
hive_parts
} else if file_options.hive_options.enabled.unwrap()
} else if file_options.hive_options.enabled.unwrap_or(false)
&& resolved_file_info.reader_schema.is_some()
{
#[allow(unused_assignments)]
Expand Down Expand Up @@ -737,21 +720,65 @@ pub fn to_alp_impl(
Ok(lp_arena.add(v))
}

/// Expand scan paths if they were not already expanded.
///
/// The `(paths, expanded)` pair is shared behind an `Arc<Mutex<..>>` with the
/// originating `DslPlan::Scan`, so expansion (globbing, and for some formats a
/// cloud listing) runs at most once per plan; subsequent DSL -> IR conversions
/// of the same plan return the cached result.
///
/// NOTE(review): the `Anonymous` arm is `unreachable!()` on the invariant that
/// anonymous scans are constructed with `expanded == true` — confirm all
/// construction sites uphold this.
#[allow(unused_variables)] // `scan_type`/`file_options` go unused when no scan feature is enabled.
fn expand_scan_paths(
    paths: Arc<Mutex<(Arc<[PathBuf]>, bool)>>,
    scan_type: &mut FileScan,
    file_options: &mut FileScanOptions,
) -> PolarsResult<Arc<[PathBuf]>> {
    #[allow(unused_mut)] // Only mutated below when at least one scan feature is enabled.
    let mut lock = paths.lock().unwrap();

    // Return if paths are already expanded
    if lock.1 {
        return Ok(lock.0.clone());
    }

    {
        // Dispatch on the scan format: Parquet/IPC also infer Hive-partition
        // settings during expansion and update `file_options` in place.
        let paths_expanded = match scan_type {
            #[cfg(feature = "parquet")]
            FileScan::Parquet {
                ref cloud_options, ..
            } => expand_scan_paths_with_hive_update(&lock.0, file_options, cloud_options)?,
            #[cfg(feature = "ipc")]
            FileScan::Ipc {
                ref cloud_options, ..
            } => expand_scan_paths_with_hive_update(&lock.0, file_options, cloud_options)?,
            #[cfg(feature = "csv")]
            FileScan::Csv {
                ref cloud_options, ..
            } => expand_paths(&lock.0, file_options.glob, cloud_options.as_ref())?,
            #[cfg(feature = "json")]
            FileScan::NDJson { .. } => expand_paths(&lock.0, file_options.glob, None)?,
            FileScan::Anonymous { .. } => unreachable!(), // Invariant: Anonymous scans are already expanded.
        };

        // With every scan feature disabled, all arms above are compiled out and
        // this block is statically unreachable — hence the allow.
        #[allow(unreachable_code)]
        {
            // Cache the expanded paths and mark them expanded while still
            // holding the lock, so concurrent conversions see the update.
            *lock = (paths_expanded, true);

            Ok(lock.0.clone())
        }
    }
}

/// Expand scan paths and update the Hive partition information of `file_options`.
#[cfg(any(feature = "ipc", feature = "parquet"))]
fn expand_paths_with_hive_update(
paths: Arc<[PathBuf]>,
fn expand_scan_paths_with_hive_update(
paths: &[PathBuf],
file_options: &mut FileScanOptions,
cloud_options: &Option<CloudOptions>,
) -> PolarsResult<Arc<[PathBuf]>> {
let hive_enabled = file_options.hive_options.enabled;
let (expanded_paths, hive_start_idx) = expand_paths_hive(
&paths,
paths,
file_options.glob,
cloud_options.as_ref(),
hive_enabled.unwrap_or(false),
)?;
let inferred_hive_enabled = hive_enabled
.unwrap_or_else(|| expanded_from_single_directory(paths.as_ref(), expanded_paths.as_ref()));
.unwrap_or_else(|| expanded_from_single_directory(paths, expanded_paths.as_ref()));

file_options.hive_options.enabled = Some(inferred_hive_enabled);
file_options.hive_options.hive_start_idx = hive_start_idx;
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-plan/src/plans/conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod scans;
mod stack_opt;

use std::borrow::Cow;
use std::sync::RwLock;
use std::sync::{Arc, Mutex, RwLock};

pub use dsl_to_ir::*;
pub use expr_to_ir::*;
Expand Down Expand Up @@ -52,7 +52,7 @@ impl IR {
output_schema: _,
file_options: options,
} => DslPlan::Scan {
paths,
paths: Arc::new(Mutex::new((paths, true))),
file_info: Arc::new(RwLock::new(Some(file_info))),
hive_parts,
predicate: predicate.map(|e| e.to_expr(expr_arena)),
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-plan/src/plans/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};

use hive::HivePartitions;
use polars_core::prelude::*;
Expand Down Expand Up @@ -78,7 +78,7 @@ pub enum DslPlan {
cache_hits: u32,
},
Scan {
paths: Arc<[PathBuf]>,
paths: Arc<Mutex<(Arc<[PathBuf]>, bool)>>,
// Option as this is mostly materialized on the IR phase.
// During conversion we update the value in the DSL as well
// This is to cater to use cases where parts of a `LazyFrame`
Expand Down
2 changes: 1 addition & 1 deletion py-polars/tests/unit/io/test_lazy_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ def test_file_list_comment_skip_rows_16327(tmp_path: Path, streaming: bool) -> N
assert_frame_equal(out, expect)


@pytest.mark.xfail(reason="Bug: https://github.com/pola-rs/polars/issues/17444")
@pytest.mark.xfail(reason="Bug: https://github.com/pola-rs/polars/issues/17634")
def test_scan_csv_with_column_names_nonexistent_file() -> None:
path_str = "my-nonexistent-data.csv"
path = Path(path_str)
Expand Down

0 comments on commit c6e1d9e

Please sign in to comment.