diff --git a/Cargo.lock b/Cargo.lock index 6793f64277e0..998c34092acc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3182,6 +3182,7 @@ dependencies = [ "bitflags 2.5.0", "futures", "glob", + "memchr", "once_cell", "polars-arrow", "polars-core", diff --git a/crates/polars-io/src/options.rs b/crates/polars-io/src/options.rs index 606e7b46536e..61a1233234f2 100644 --- a/crates/polars-io/src/options.rs +++ b/crates/polars-io/src/options.rs @@ -17,6 +17,7 @@ pub struct RowIndex { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct HiveOptions { pub enabled: bool, + pub hive_start_idx: usize, pub schema: Option, } @@ -24,6 +25,7 @@ impl Default for HiveOptions { fn default() -> Self { Self { enabled: true, + hive_start_idx: 0, schema: None, } } diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml index e8b2ae406788..34767752c1bb 100644 --- a/crates/polars-lazy/Cargo.toml +++ b/crates/polars-lazy/Cargo.toml @@ -26,6 +26,7 @@ polars-utils = { workspace = true } ahash = { workspace = true } bitflags = { workspace = true } glob = { version = "0.3" } +memchr = { workspace = true } once_cell = { workspace = true } pyo3 = { workspace = true, optional = true } rayon = { workspace = true } diff --git a/crates/polars-lazy/src/scan/csv.rs b/crates/polars-lazy/src/scan/csv.rs index 577441e3dee7..117f2c705cc0 100644 --- a/crates/polars-lazy/src/scan/csv.rs +++ b/crates/polars-lazy/src/scan/csv.rs @@ -13,7 +13,6 @@ use crate::prelude::*; #[derive(Clone)] #[cfg(feature = "csv")] pub struct LazyCsvReader { - path: PathBuf, paths: Arc<[PathBuf]>, glob: bool, cache: bool, @@ -35,8 +34,7 @@ impl LazyCsvReader { pub fn new(path: impl AsRef) -> Self { LazyCsvReader { - path: path.as_ref().to_owned(), - paths: Arc::new([]), + paths: Arc::new([path.as_ref().to_path_buf()]), glob: true, cache: true, read_options: Default::default(), @@ -218,15 +216,13 @@ impl LazyCsvReader { where F: Fn(Schema) -> PolarsResult, { - let mut file = if let Some(mut paths) = self.iter_paths()? { - let path = match paths.next() { - Some(globresult) => globresult?, - None => polars_bail!(ComputeError: "globbing pattern did not match any files"), - }; - polars_utils::open_file(path) - } else { - polars_utils::open_file(&self.path) - }?; + let paths = self.expand_paths()?.0; + let Some(path) = paths.first() else { + polars_bail!(ComputeError: "no paths specified for this reader"); + }; + + let mut file = polars_utils::open_file(path)?; + let reader_bytes = get_reader_bytes(&mut file).expect("could not mmap file"); let skip_rows = self.read_options.skip_rows; let parse_options = self.read_options.get_parse_options(); @@ -264,25 +260,9 @@ impl LazyCsvReader { impl LazyFileListReader for LazyCsvReader { /// Get the final [LazyFrame]. - fn finish(mut self) -> PolarsResult { - if !self.glob { - return self.finish_no_glob(); - } - if let Some(paths) = self.iter_paths()? { - let paths = paths - .into_iter() - .collect::>>()?; - self.paths = paths; - } - self.finish_no_glob() - } - - fn finish_no_glob(self) -> PolarsResult { - let paths = if self.paths.is_empty() { - Arc::new([self.path]) - } else { - self.paths - }; + fn finish(self) -> PolarsResult { + // `expand_paths` respects globs + let paths = self.expand_paths()?.0; let mut lf: LazyFrame = DslBuilder::scan_csv(paths, self.read_options, self.cache, self.cloud_options)? @@ -292,23 +272,18 @@ impl LazyFileListReader for LazyCsvReader { Ok(lf) } - fn glob(&self) -> bool { - self.glob + fn finish_no_glob(self) -> PolarsResult { + unreachable!(); } - fn path(&self) -> &Path { - &self.path + fn glob(&self) -> bool { + self.glob } fn paths(&self) -> &[PathBuf] { &self.paths } - fn with_path(mut self, path: PathBuf) -> Self { - self.path = path; - self - } - fn with_paths(mut self, paths: Arc<[PathBuf]>) -> Self { self.paths = paths; self diff --git a/crates/polars-lazy/src/scan/file_list_reader.rs b/crates/polars-lazy/src/scan/file_list_reader.rs index 70971c424f12..e31771e78334 100644 --- a/crates/polars-lazy/src/scan/file_list_reader.rs +++ b/crates/polars-lazy/src/scan/file_list_reader.rs @@ -1,4 +1,5 @@ -use std::path::{Path, PathBuf}; +use std::collections::VecDeque; +use std::path::PathBuf; use polars_core::error::to_compute_err; use polars_core::prelude::*; @@ -11,23 +12,127 @@ use crate::prelude::*; pub type PathIterator = Box>>; -// cloud_options is used only with async feature -#[allow(unused_variables)] -fn polars_glob(pattern: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult { - if is_cloud_url(pattern) { +/// Recursively traverses directories and expands globs if `glob` is `true`. +/// Returns the expanded paths and the index at which to start parsing hive +/// partitions from the path. +fn expand_paths( + paths: &[PathBuf], + #[allow(unused_variables)] cloud_options: Option<&CloudOptions>, + glob: bool, +) -> PolarsResult<(Arc<[PathBuf]>, usize)> { + let Some(first_path) = paths.first() else { + return Ok((vec![].into(), 0)); + }; + + let is_cloud = is_cloud_url(first_path); + let mut expand_start_idx = usize::MAX; + let mut out_paths = vec![]; + + if is_cloud { #[cfg(feature = "async")] { - let paths = polars_io::async_glob(pattern, cloud_options)?; - Ok(Box::new(paths.into_iter().map(|a| Ok(PathBuf::from(&a))))) + use polars_io::cloud::{CloudLocation, PolarsObjectStore}; + + fn is_file_cloud( + path: &str, + cloud_options: Option<&CloudOptions>, + ) -> PolarsResult { + polars_io::pl_async::get_runtime().block_on_potential_spawn(async { + let (CloudLocation { prefix, .. }, store) = + polars_io::cloud::build_object_store(path, cloud_options).await?; + let store = PolarsObjectStore::new(store); + PolarsResult::Ok(store.head(&prefix.into()).await.is_ok()) + }) + } + + for path in paths { + let glob_start_idx = + memchr::memchr3(b'*', b'?', b'[', path.to_str().unwrap().as_bytes()); + + let (path, glob_start_idx) = if let Some(glob_start_idx) = glob_start_idx { + (path.clone(), glob_start_idx) + } else if !path.ends_with("/") + && is_file_cloud(path.to_str().unwrap(), cloud_options)? + { + out_paths.push(path.clone()); + continue; + } else if !glob { + polars_bail!(ComputeError: "not implemented: did not find cloud file at path = {} and `glob` was set to false", path.to_str().unwrap()); + } else { + (path.join("**/*"), path.to_str().unwrap().len()) + }; + + expand_start_idx = std::cmp::min(expand_start_idx, glob_start_idx); + + out_paths.extend( + polars_io::async_glob(path.to_str().unwrap(), cloud_options)? + .into_iter() + .map(PathBuf::from), + ); + } } #[cfg(not(feature = "async"))] panic!("Feature `async` must be enabled to use globbing patterns with cloud urls.") } else { - let paths = glob::glob(pattern) - .map_err(|_| polars_err!(ComputeError: "invalid glob pattern given"))?; - let paths = paths.map(|v| v.map_err(to_compute_err)); - Ok(Box::new(paths)) + let mut stack = VecDeque::new(); + + for path in paths { + stack.clear(); + + if path.is_dir() { + let i = path.to_str().unwrap().len(); + + expand_start_idx = std::cmp::min(expand_start_idx, i); + + stack.push_back(path.clone()); + + while let Some(dir) = stack.pop_front() { + let mut paths = std::fs::read_dir(dir) + .map_err(PolarsError::from)? + .map(|x| x.map(|x| x.path())) + .collect::>>() + .map_err(PolarsError::from)?; + paths.sort_unstable(); + + for path in paths { + if path.is_dir() { + stack.push_back(path); + } else { + out_paths.push(path); + } + } + } + + continue; + } + + let i = memchr::memchr3(b'*', b'?', b'[', path.to_str().unwrap().as_bytes()); + + if glob && i.is_some() { + let i = i.unwrap(); + expand_start_idx = std::cmp::min(expand_start_idx, i); + + let Ok(paths) = glob::glob(path.to_str().unwrap()) else { + polars_bail!(ComputeError: "invalid glob pattern given") + }; + + for path in paths { + out_paths.push(path.map_err(to_compute_err)?); + } + } else { + out_paths.push(path.clone()); + } + } } + + // Todo: + // This maintains existing behavior - will remove very soon. + expand_start_idx = 0; + + Ok(( + out_paths.into_iter().collect::>(), + expand_start_idx, + )) } /// Reads [LazyFrame] from a filesystem or a cloud storage. @@ -40,43 +145,42 @@ pub trait LazyFileListReader: Clone { if !self.glob() { return self.finish_no_glob(); } - if let Some(paths) = self.iter_paths()? { - let lfs = paths - .map(|r| { - let path = r?; - self.clone() - // Each individual reader should not apply a row limit. - .with_n_rows(None) - // Each individual reader should not apply a row index. - .with_row_index(None) - .with_path(path.clone()) - .with_rechunk(false) - .finish_no_glob() - .map_err(|e| { - polars_err!( - ComputeError: "error while reading {}: {}", path.display(), e - ) - }) - }) - .collect::>>()?; - - polars_ensure!( - !lfs.is_empty(), - ComputeError: "no matching files found in {}", self.path().display() - ); - - let mut lf = self.concat_impl(lfs)?; - if let Some(n_rows) = self.n_rows() { - lf = lf.slice(0, n_rows as IdxSize) - }; - if let Some(rc) = self.row_index() { - lf = lf.with_row_index(&rc.name, Some(rc.offset)) - }; - - Ok(lf) - } else { - self.finish_no_glob() - } + + let paths = self.expand_paths()?.0; + + let lfs = paths + .iter() + .map(|path| { + self.clone() + // Each individual reader should not apply a row limit. + .with_n_rows(None) + // Each individual reader should not apply a row index. + .with_row_index(None) + .with_paths(Arc::new([path.clone()])) + .with_rechunk(false) + .finish_no_glob() + .map_err(|e| { + polars_err!( + ComputeError: "error while reading {}: {}", path.display(), e + ) + }) + }) + .collect::>>()?; + + polars_ensure!( + !lfs.is_empty(), + ComputeError: "no matching files found in {:?}", self.paths().iter().map(|x| x.to_str().unwrap()).collect::>() + ); + + let mut lf = self.concat_impl(lfs)?; + if let Some(n_rows) = self.n_rows() { + lf = lf.slice(0, n_rows as IdxSize) + }; + if let Some(rc) = self.row_index() { + lf = lf.with_row_index(&rc.name, Some(rc.offset)) + }; + + Ok(lf) } /// Recommended concatenation of [LazyFrame]s from many input files. @@ -104,19 +208,9 @@ pub trait LazyFileListReader: Clone { true } - /// Path of the scanned file. - /// It can be potentially a glob pattern. - fn path(&self) -> &Path; - fn paths(&self) -> &[PathBuf]; - /// Set path of the scanned file. - /// Support glob patterns. - #[must_use] - fn with_path(self, path: PathBuf) -> Self; - /// Set paths of the scanned files. - /// Doesn't glob patterns. #[must_use] fn with_paths(self, paths: Arc<[PathBuf]>) -> Self; @@ -145,23 +239,9 @@ pub trait LazyFileListReader: Clone { None } - /// Get list of files referenced by this reader. - /// - /// Returns [None] if path is not a glob pattern. - fn iter_paths(&self) -> PolarsResult> { - let paths = self.paths(); - if paths.is_empty() { - let path_str = self.path().to_string_lossy(); - if path_str.contains('*') || path_str.contains('?') || path_str.contains('[') { - polars_glob(&path_str, self.cloud_options()).map(Some) - } else { - Ok(None) - } - } else { - polars_ensure!(self.path().to_string_lossy() == "", InvalidOperation: "expected only a single path argument"); - // Lint is incorrect as we need static lifetime. - #[allow(clippy::unnecessary_to_owned)] - Ok(Some(Box::new(paths.to_vec().into_iter().map(Ok)))) - } + /// Returns a list of paths after resolving globs and directories, as well as + /// the string index at which to start parsing hive partitions. + fn expand_paths(&self) -> PolarsResult<(Arc<[PathBuf]>, usize)> { + expand_paths(self.paths(), self.cloud_options(), self.glob()) } } diff --git a/crates/polars-lazy/src/scan/ipc.rs b/crates/polars-lazy/src/scan/ipc.rs index 5672802a61f2..a08764aeeda4 100644 --- a/crates/polars-lazy/src/scan/ipc.rs +++ b/crates/polars-lazy/src/scan/ipc.rs @@ -33,40 +33,23 @@ impl Default for ScanArgsIpc { #[derive(Clone)] struct LazyIpcReader { args: ScanArgsIpc, - path: PathBuf, paths: Arc<[PathBuf]>, } impl LazyIpcReader { - fn new(path: PathBuf, args: ScanArgsIpc) -> Self { + fn new(args: ScanArgsIpc) -> Self { Self { args, - path, paths: Arc::new([]), } } } impl LazyFileListReader for LazyIpcReader { - fn finish(mut self) -> PolarsResult { - if let Some(paths) = self.iter_paths()? { - let paths = paths - .into_iter() - .collect::>>()?; - self.paths = paths; - } - self.finish_no_glob() - } - - fn finish_no_glob(self) -> PolarsResult { + fn finish(self) -> PolarsResult { + let paths = self.expand_paths()?.0; let args = self.args; - let paths = if self.paths.is_empty() { - Arc::new([self.path]) as Arc<[PathBuf]> - } else { - self.paths - }; - let options = IpcScanOptions { memory_map: args.memory_map, }; @@ -87,19 +70,14 @@ impl LazyFileListReader for LazyIpcReader { Ok(lf) } - fn path(&self) -> &Path { - self.path.as_path() + fn finish_no_glob(self) -> PolarsResult { + unreachable!() } fn paths(&self) -> &[PathBuf] { &self.paths } - fn with_path(mut self, path: PathBuf) -> Self { - self.path = path; - self - } - fn with_paths(mut self, paths: Arc<[PathBuf]>) -> Self { self.paths = paths; self @@ -136,12 +114,12 @@ impl LazyFileListReader for LazyIpcReader { impl LazyFrame { /// Create a LazyFrame directly from a ipc scan. pub fn scan_ipc(path: impl AsRef, args: ScanArgsIpc) -> PolarsResult { - LazyIpcReader::new(path.as_ref().to_owned(), args).finish() + LazyIpcReader::new(args) + .with_paths(Arc::new([path.as_ref().to_path_buf()])) + .finish() } pub fn scan_ipc_files(paths: Arc<[PathBuf]>, args: ScanArgsIpc) -> PolarsResult { - LazyIpcReader::new(PathBuf::new(), args) - .with_paths(paths) - .finish() + LazyIpcReader::new(args).with_paths(paths).finish() } } diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs index 824698e3a317..89022901e9b1 100644 --- a/crates/polars-lazy/src/scan/ndjson.rs +++ b/crates/polars-lazy/src/scan/ndjson.rs @@ -11,8 +11,7 @@ use crate::prelude::LazyFrame; #[derive(Clone)] pub struct LazyJsonLineReader { - pub(crate) path: PathBuf, - paths: Arc<[PathBuf]>, + pub(crate) paths: Arc<[PathBuf]>, pub(crate) batch_size: Option, pub(crate) low_memory: bool, pub(crate) rechunk: bool, @@ -30,8 +29,7 @@ impl LazyJsonLineReader { pub fn new(path: impl AsRef) -> Self { LazyJsonLineReader { - path: path.as_ref().to_path_buf(), - paths: Arc::new([]), + paths: Arc::new([path.as_ref().to_path_buf()]), batch_size: None, low_memory: false, rechunk: false, @@ -116,27 +114,19 @@ impl LazyFileListReader for LazyJsonLineReader { let scan_type = FileScan::NDJson { options }; Ok(LazyFrame::from(DslPlan::Scan { - paths: Arc::from([self.path.clone()]), + paths: self.paths, file_info: None, + hive_parts: None, predicate: None, file_options, scan_type, })) } - fn path(&self) -> &Path { - &self.path - } - fn paths(&self) -> &[PathBuf] { &self.paths } - fn with_path(mut self, path: PathBuf) -> Self { - self.path = path; - self - } - fn with_paths(mut self, paths: Arc<[PathBuf]>) -> Self { self.paths = paths; self diff --git a/crates/polars-lazy/src/scan/parquet.rs b/crates/polars-lazy/src/scan/parquet.rs index 4e57b25d42bf..c4274c595fc4 100644 --- a/crates/polars-lazy/src/scan/parquet.rs +++ b/crates/polars-lazy/src/scan/parquet.rs @@ -42,15 +42,13 @@ impl Default for ScanArgsParquet { #[derive(Clone)] struct LazyParquetReader { args: ScanArgsParquet, - path: PathBuf, paths: Arc<[PathBuf]>, } impl LazyParquetReader { - fn new(path: PathBuf, args: ScanArgsParquet) -> Self { + fn new(args: ScanArgsParquet) -> Self { Self { args, - path, paths: Arc::new([]), } } @@ -59,26 +57,11 @@ impl LazyParquetReader { impl LazyFileListReader for LazyParquetReader { /// Get the final [LazyFrame]. fn finish(mut self) -> PolarsResult { - if !self.args.glob { - return self.finish_no_glob(); - } - if let Some(paths) = self.iter_paths()? { - let paths = paths - .into_iter() - .collect::>>()?; - self.paths = paths; - } - self.finish_no_glob() - } + let (paths, hive_start_idx) = self.expand_paths()?; + self.args.hive_options.hive_start_idx = hive_start_idx; - fn finish_no_glob(self) -> PolarsResult { let row_index = self.args.row_index; - let paths = if self.paths.is_empty() { - Arc::new([self.path]) as Arc<[PathBuf]> - } else { - self.paths - }; let mut lf: LazyFrame = DslBuilder::scan_parquet( paths, self.args.n_rows, @@ -103,17 +86,16 @@ impl LazyFileListReader for LazyParquetReader { Ok(lf) } - fn path(&self) -> &Path { - self.path.as_path() + fn glob(&self) -> bool { + self.args.glob } - fn paths(&self) -> &[PathBuf] { - &self.paths + fn finish_no_glob(self) -> PolarsResult { + unreachable!(); } - fn with_path(mut self, path: PathBuf) -> Self { - self.path = path; - self + fn paths(&self) -> &[PathBuf] { + &self.paths } fn with_paths(mut self, paths: Arc<[PathBuf]>) -> Self { @@ -156,13 +138,13 @@ impl LazyFileListReader for LazyParquetReader { impl LazyFrame { /// Create a LazyFrame directly from a parquet scan. pub fn scan_parquet(path: impl AsRef, args: ScanArgsParquet) -> PolarsResult { - LazyParquetReader::new(path.as_ref().to_owned(), args).finish() + LazyParquetReader::new(args) + .with_paths(Arc::new([path.as_ref().to_path_buf()])) + .finish() } /// Create a LazyFrame directly from a parquet scan. pub fn scan_parquet_files(paths: Arc<[PathBuf]>, args: ScanArgsParquet) -> PolarsResult { - LazyParquetReader::new(PathBuf::new(), args) - .with_paths(paths) - .finish() + LazyParquetReader::new(args).with_paths(paths).finish() } } diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index 53d1a8ce285c..69a4e2348fcb 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -1,5 +1,6 @@ use std::path::PathBuf; +use hive::HivePartitions; use polars_core::config; #[cfg(feature = "cloud")] use polars_core::config::{get_file_prefetch_size, verbose}; @@ -15,6 +16,7 @@ use super::*; pub struct ParquetExec { paths: Arc<[PathBuf]>, file_info: FileInfo, + hive_parts: Option>>, predicate: Option>, options: ParquetOptions, #[allow(dead_code)] @@ -25,9 +27,11 @@ pub struct ParquetExec { } impl ParquetExec { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( paths: Arc<[PathBuf]>, file_info: FileInfo, + hive_parts: Option>>, predicate: Option>, options: ParquetOptions, cloud_options: Option, @@ -37,6 +41,7 @@ impl ParquetExec { ParquetExec { paths, file_info, + hive_parts, predicate, options, cloud_options, @@ -57,12 +62,14 @@ impl ParquetExec { let mut remaining_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX); let mut base_row_index = self.file_options.row_index.take(); - // Limit no. of files at a time to prevent open file limits. - for paths in self - .paths - .chunks(std::cmp::min(POOL.current_num_threads(), 128)) - { + let step = std::cmp::min(POOL.current_num_threads(), 128); + + for i in (0..self.paths.len()).step_by(step) { + let end = std::cmp::min(i.saturating_add(step), self.paths.len()); + let paths = &self.paths[i..end]; + let hive_parts = self.hive_parts.as_ref().map(|x| &x[i..end]); + if remaining_rows_to_read == 0 && !result.is_empty() { return Ok(result); } @@ -70,16 +77,10 @@ impl ParquetExec { // First initialize the readers, predicates and metadata. // This will be used to determine the slices. That way we can actually read all the // files in parallel even if we add row index columns or slices. - let readers_and_metadata = paths - .iter() - .map(|path| { - let mut file_info = self.file_info.clone(); - file_info.update_hive_partitions(path)?; - - let hive_partitions = file_info - .hive_parts - .as_ref() - .map(|hive| hive.materialize_partition_columns()); + let readers_and_metadata = (0..paths.len()) + .map(|i| { + let path = &paths[i]; + let hive_partitions = hive_parts.map(|x| x[i].materialize_partition_columns()); let file = std::fs::File::open(path)?; let (projection, predicate) = prepare_scan_args( @@ -251,16 +252,16 @@ impl ParquetExec { eprintln!("reading of {}/{} file...", processed, self.paths.len()); } - let iter = readers_and_metadata - .into_iter() - .zip(rows_statistics.iter()) - .zip(paths.as_ref().iter()) - .map( - |( - ((num_rows_this_file, reader), (remaining_rows_to_read, cumulative_read)), - path, - )| async move { - let mut file_info = file_info.clone(); + let iter = readers_and_metadata.into_iter().enumerate().map( + |(i, (num_rows_this_file, reader))| { + let (remaining_rows_to_read, cumulative_read) = &rows_statistics[i]; + let hive_partitions = self + .hive_parts + .as_ref() + .map(|x| x[i].materialize_partition_columns()); + + async move { + let file_info = file_info.clone(); let remaining_rows_to_read = *remaining_rows_to_read; let remaining_rows_to_read = if num_rows_this_file < remaining_rows_to_read { @@ -273,13 +274,6 @@ impl ParquetExec { offset: rc.offset + *cumulative_read as IdxSize, }); - file_info.update_hive_partitions(path)?; - - let hive_partitions = file_info - .hive_parts - .as_ref() - .map(|hive| hive.materialize_partition_columns()); - let (projection, predicate) = prepare_scan_args( predicate.clone(), &mut file_options.with_columns.clone(), @@ -299,8 +293,9 @@ impl ParquetExec { .finish() .await .map(Some) - }, - ); + } + }, + ); let dfs = futures::future::try_join_all(iter).await?; let n_read = dfs @@ -333,10 +328,10 @@ impl ParquetExec { Some(p) => is_cloud_url(p.as_path()), None => { let hive_partitions = self - .file_info .hive_parts .as_ref() - .map(|hive| hive.materialize_partition_columns()); + .filter(|x| !x.is_empty()) + .map(|x| x.first().unwrap().materialize_partition_columns()); let (projection, _) = prepare_scan_args( None, &mut self.file_options.with_columns, diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index c546980c0aab..ac2f075a0493 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -233,6 +233,7 @@ fn create_physical_plan_impl( Scan { paths, file_info, + hive_parts, output_schema, scan_type, predicate, @@ -283,6 +284,7 @@ fn create_physical_plan_impl( } => Ok(Box::new(executors::ParquetExec::new( paths, file_info, + hive_parts, predicate, options, cloud_options, diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index 7b511714c34a..c4222d07c206 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -19,6 +19,7 @@ use polars_io::prelude::ParquetAsyncReader; use polars_io::utils::{check_projected_arrow_schema, is_cloud_url}; use polars_io::SerReader; use polars_plan::plans::FileInfo; +use polars_plan::prelude::hive::HivePartitions; use polars_plan::prelude::FileScanOptions; use polars_utils::iter::EnumerateIdxTrait; use polars_utils::IdxSize; @@ -39,6 +40,7 @@ pub struct ParquetSource { cloud_options: Option, metadata: Option, file_info: FileInfo, + hive_parts: Option>>, verbose: bool, run_async: bool, prefetch_size: usize, @@ -78,12 +80,10 @@ impl ParquetSource { let file_options = self.file_options.clone(); let schema = self.file_info.schema.clone(); - let mut file_info = self.file_info.clone(); - file_info.update_hive_partitions(path)?; - let hive_partitions = file_info + let hive_partitions = self .hive_parts .as_ref() - .map(|hive| hive.materialize_partition_columns()); + .map(|x| x[index].materialize_partition_columns()); let projection = materialize_projection( file_options.with_columns.as_deref(), @@ -192,6 +192,7 @@ impl ParquetSource { metadata: Option, file_options: FileScanOptions, file_info: FileInfo, + hive_parts: Option>>, verbose: bool, predicate: Option>, ) -> PolarsResult { @@ -216,6 +217,7 @@ impl ParquetSource { cloud_options, metadata, file_info, + hive_parts, verbose, run_async, prefetch_size, diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index 5d6e65774241..b5f32b185311 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -76,6 +76,7 @@ where Scan { paths, file_info, + hive_parts, file_options, predicate, output_schema, @@ -144,6 +145,7 @@ where metadata, file_options, file_info, + hive_parts, verbose, predicate, )?; diff --git a/crates/polars-plan/src/plans/builder_dsl.rs b/crates/polars-plan/src/plans/builder_dsl.rs index 2f52997a7ede..f180e1217cd1 100644 --- a/crates/polars-plan/src/plans/builder_dsl.rs +++ b/crates/polars-plan/src/plans/builder_dsl.rs @@ -56,6 +56,7 @@ impl DslBuilder { Ok(DslPlan::Scan { paths: Arc::new([]), file_info: Some(file_info), + hive_parts: None, predicate: None, file_options, scan_type: FileScan::Anonymous { @@ -97,6 +98,7 @@ impl DslBuilder { Ok(DslPlan::Scan { paths, file_info: None, + hive_parts: None, file_options: options, predicate: None, scan_type: FileScan::Parquet { @@ -127,6 +129,7 @@ impl DslBuilder { Ok(DslPlan::Scan { paths, file_info: None, + hive_parts: None, file_options: FileScanOptions { with_columns: None, cache, @@ -179,6 +182,7 @@ impl DslBuilder { Ok(DslPlan::Scan { paths, file_info: None, + hive_parts: None, file_options: options, predicate: None, scan_type: FileScan::Csv { diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index 04851cc39d1f..8a95e0a7640f 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -1,4 +1,5 @@ use expr_expansion::{is_regex_projection, rewrite_projections}; +use hive::hive_partitions_from_paths; use super::stack_opt::ConversionOptimizer; use super::*; @@ -81,6 +82,7 @@ pub fn to_alp_impl( let v = match lp { DslPlan::Scan { file_info, + hive_parts, paths, predicate, mut scan_type, @@ -132,6 +134,22 @@ pub fn to_alp_impl( } }; + let hive_parts = if hive_parts.is_some() { + hive_parts + } else if file_options.hive_options.enabled { + hive_partitions_from_paths( + paths.as_ref(), + file_options.hive_options.hive_start_idx, + file_options.hive_options.schema.clone(), + )? + } else { + None + }; + + if let Some(ref hive_parts) = hive_parts { + file_info.update_schema_with_hive_schema(hive_parts[0].schema().clone())?; + } + if let Some(row_index) = &file_options.row_index { let schema = Arc::make_mut(&mut file_info.schema); *schema = schema @@ -140,8 +158,9 @@ pub fn to_alp_impl( } IR::Scan { - file_info, paths, + file_info, + hive_parts, output_schema: None, predicate: predicate.map(|expr| to_expr_ir(expr, expr_arena)), scan_type, diff --git a/crates/polars-plan/src/plans/conversion/mod.rs b/crates/polars-plan/src/plans/conversion/mod.rs index 29a165111046..d0d6a41e9fb9 100644 --- a/crates/polars-plan/src/plans/conversion/mod.rs +++ b/crates/polars-plan/src/plans/conversion/mod.rs @@ -45,6 +45,7 @@ impl IR { IR::Scan { paths, file_info, + hive_parts, predicate, scan_type, output_schema: _, @@ -52,6 +53,7 @@ impl IR { } => DslPlan::Scan { paths, file_info: Some(file_info), + hive_parts, predicate: predicate.map(|e| e.to_expr(expr_arena)), scan_type, file_options: options, diff --git a/crates/polars-plan/src/plans/conversion/scans.rs b/crates/polars-plan/src/plans/conversion/scans.rs index dce4876fbbb7..f7ccf7f608a1 100644 --- a/crates/polars-plan/src/plans/conversion/scans.rs +++ b/crates/polars-plan/src/plans/conversion/scans.rs @@ -77,16 +77,12 @@ pub(super) fn parquet_file_info( ) }; - let mut file_info = FileInfo::new( + let file_info = FileInfo::new( schema, Some(Either::Left(reader_schema)), (num_rows, num_rows.unwrap_or(0)), ); - if file_options.hive_options.enabled { - file_info.init_hive_partitions(path.as_path(), file_options.hive_options.schema.clone())? - } - Ok((file_info, metadata)) } diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs index bc20a0973d7d..9e2f896020d3 100644 --- a/crates/polars-plan/src/plans/hive.rs +++ b/crates/polars-plan/src/plans/hive.rs @@ -1,4 +1,4 @@ -use std::path::Path; +use std::path::{Path, PathBuf}; use percent_encoding::percent_decode_str; use polars_core::prelude::*; @@ -92,6 +92,46 @@ impl HivePartitions { } } +/// # Safety +/// `hive_start_idx <= [min path length]` +pub fn hive_partitions_from_paths( + paths: &[PathBuf], + hive_start_idx: usize, + schema: Option, +) -> PolarsResult>>> { + let Some(path) = paths.first() else { + return Ok(None); + }; + + let Some(hive_parts) = HivePartitions::try_from_path( + &PathBuf::from(&path.to_str().unwrap()[hive_start_idx..]), + schema.clone(), + )? + else { + return Ok(None); + }; + + let mut results = Vec::with_capacity(paths.len()); + results.push(Arc::new(hive_parts)); + + for path in &paths[1..] { + let Some(hive_parts) = HivePartitions::try_from_path( + &PathBuf::from(&path.to_str().unwrap()[hive_start_idx..]), + schema.clone(), + )? + else { + polars_bail!( + ComputeError: "expected Hive partitioned path, got {}\n\n\ + This error occurs if some paths are Hive partitioned and some paths are not.", + path.display() + ) + }; + results.push(Arc::new(hive_parts)); + } + + Ok(Some(results)) +} + /// Determine the path separator for identifying Hive partitions. #[cfg(target_os = "windows")] fn separator(url: &Path) -> char { diff --git a/crates/polars-plan/src/plans/ir/dot.rs b/crates/polars-plan/src/plans/ir/dot.rs index 55fc9a99afa0..f38c7d3ebfda 100644 --- a/crates/polars-plan/src/plans/ir/dot.rs +++ b/crates/polars-plan/src/plans/ir/dot.rs @@ -245,6 +245,7 @@ impl<'a> IRDotDisplay<'a> { Scan { paths, file_info, + hive_parts: _, predicate, scan_type, file_options: options, diff --git a/crates/polars-plan/src/plans/ir/inputs.rs b/crates/polars-plan/src/plans/ir/inputs.rs index 7d1bf84bb8a5..2f941bc9282d 100644 --- a/crates/polars-plan/src/plans/ir/inputs.rs +++ b/crates/polars-plan/src/plans/ir/inputs.rs @@ -104,6 +104,7 @@ impl IR { Scan { paths, file_info, + hive_parts, output_schema, predicate, file_options: options, @@ -116,6 +117,7 @@ impl IR { Scan { paths: paths.clone(), file_info: file_info.clone(), + hive_parts: hive_parts.clone(), output_schema: output_schema.clone(), file_options: options.clone(), predicate: new_predicate, diff --git a/crates/polars-plan/src/plans/ir/mod.rs b/crates/polars-plan/src/plans/ir/mod.rs index 8f7cbd9089cc..815e292e0b17 100644 --- a/crates/polars-plan/src/plans/ir/mod.rs +++ b/crates/polars-plan/src/plans/ir/mod.rs @@ -10,6 +10,7 @@ use std::path::PathBuf; pub use dot::IRDotDisplay; pub use format::{ExprIRDisplay, IRDisplay}; +use hive::HivePartitions; use polars_core::prelude::*; use polars_utils::idx_vec::UnitVec; use polars_utils::unitvec; @@ -50,6 +51,7 @@ pub enum IR { Scan { paths: Arc<[PathBuf]>, file_info: FileInfo, + hive_parts: Option>>, predicate: Option, /// schema of the projected file output_schema: Option, diff --git a/crates/polars-plan/src/plans/mod.rs b/crates/polars-plan/src/plans/mod.rs index 0086c105c4fa..ca9acc44cf53 100644 --- a/crates/polars-plan/src/plans/mod.rs +++ b/crates/polars-plan/src/plans/mod.rs @@ -3,6 +3,7 @@ use std::fmt::Debug; use std::path::PathBuf; use std::sync::Arc; +use hive::HivePartitions; use polars_core::prelude::*; use recursive::recursive; @@ -22,7 +23,7 @@ pub mod expr_ir; mod file_scan; mod format; mod functions; -pub(super) mod hive; +pub mod hive; pub(crate) mod iterator; mod lit; pub(crate) mod optimizer; @@ -80,6 +81,7 @@ pub enum DslPlan { paths: Arc<[PathBuf]>, // Option as this is mostly materialized on the IR phase. file_info: Option, + hive_parts: Option>>, predicate: Option, file_options: FileScanOptions, scan_type: FileScan, @@ -186,7 +188,7 @@ impl Clone for DslPlan { Self::PythonScan { options } => Self::PythonScan { options: options.clone() }, Self::Filter { input, predicate } => Self::Filter { input: input.clone(), predicate: predicate.clone() }, Self::Cache { input, id, cache_hits } => Self::Cache { input: input.clone(), id: id.clone(), cache_hits: cache_hits.clone() }, - Self::Scan { paths, file_info, predicate, file_options, scan_type } => Self::Scan { paths: paths.clone(), file_info: file_info.clone(), predicate: predicate.clone(), file_options: file_options.clone(), scan_type: scan_type.clone() }, + Self::Scan { paths, file_info, hive_parts, predicate, file_options, scan_type } => Self::Scan { paths: paths.clone(), file_info: file_info.clone(), hive_parts: hive_parts.clone(), predicate: predicate.clone(), file_options: file_options.clone(), scan_type: scan_type.clone() }, Self::DataFrameScan { df, schema, output_schema, filter: selection } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(), output_schema: output_schema.clone(), filter: selection.clone() }, Self::Select { expr, input, options } => Self::Select { expr: expr.clone(), input: input.clone(), options: options.clone() }, Self::GroupBy { input, keys, aggs, apply, maintain_order, options } => Self::GroupBy { input: input.clone(), keys: keys.clone(), aggs: aggs.clone(), apply: apply.clone(), maintain_order: maintain_order.clone(), options: options.clone() }, diff --git a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs index 41e7296adaec..ce2330106ac0 100644 --- a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs @@ -320,7 +320,8 @@ impl<'a> PredicatePushDown<'a> { }, Scan { mut paths, - mut file_info, + file_info, + hive_parts: mut scan_hive_parts, ref predicate, mut scan_type, file_options: options, @@ -350,23 +351,20 @@ impl<'a> PredicatePushDown<'a> { }; let predicate = predicate_at_scan(acc_predicates, predicate.clone(), expr_arena); - if let (true, Some(predicate)) = (file_info.hive_parts.is_some(), &predicate) { + if let (Some(hive_parts), Some(predicate)) = (&scan_hive_parts, &predicate) { if let Some(io_expr) = self.hive_partition_eval.unwrap()(predicate, expr_arena) { if let Some(stats_evaluator) = io_expr.as_stats_evaluator() { let mut new_paths = Vec::with_capacity(paths.len()); + let mut new_hive_parts = Vec::with_capacity(paths.len()); - for path in paths.as_ref().iter() { - file_info.update_hive_partitions(path)?; - let hive_part_stats = file_info.hive_parts.as_deref().ok_or_else(|| { - polars_err!( - ComputeError: - "cannot combine hive partitioned directories with non-hive partitioned ones" - ) - })?; + for i in 0..paths.len() { + let path = &paths[i]; + let hive_parts = &hive_parts[i]; - if stats_evaluator.should_read(hive_part_stats.get_statistics())? { + if stats_evaluator.should_read(hive_parts.get_statistics())? { new_paths.push(path.clone()); + new_hive_parts.push(hive_parts.clone()); } } @@ -391,7 +389,8 @@ impl<'a> PredicatePushDown<'a> { filter: None, }); } else { - paths = Arc::from(new_paths) + paths = Arc::from(new_paths); + scan_hive_parts = Some(new_hive_parts); } } } @@ -408,10 +407,13 @@ impl<'a> PredicatePushDown<'a> { }; do_optimization &= predicate.is_some(); + let hive_parts = scan_hive_parts; + let lp = if do_optimization { Scan { paths, file_info, + hive_parts, predicate, file_options: options, output_schema, @@ -421,6 +423,7 @@ impl<'a> PredicatePushDown<'a> { let lp = Scan { paths, file_info, + hive_parts, predicate: None, file_options: options, output_schema, diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs index 2954bd77e797..8282e171d6be 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs @@ -395,6 +395,7 @@ impl ProjectionPushDown { Scan { paths, file_info, + hive_parts, scan_type, predicate, mut file_options, @@ -433,8 +434,9 @@ impl ProjectionPushDown { // Hive partitions are created AFTER the projection, so the output // schema is incorrect. Here we ensure the columns that are projected and hive // parts are added at the proper place in the schema, which is at the end. - if let Some(parts) = file_info.hive_parts.as_deref() { - let partition_schema = parts.schema(); + if let Some(ref hive_parts) = hive_parts { + let partition_schema = hive_parts.first().unwrap().schema(); + for (name, _) in partition_schema.iter() { if let Some(dt) = schema.shift_remove(name) { schema.with_column(name.clone(), dt); @@ -448,6 +450,7 @@ impl ProjectionPushDown { let lp = Scan { paths, file_info, + hive_parts, output_schema, scan_type, predicate, diff --git a/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs b/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs index 9fbb83805bb8..c88302703d05 100644 --- a/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs +++ b/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs @@ -165,6 +165,7 @@ impl SlicePushDown { (Scan { paths, file_info, + hive_parts, output_schema, mut file_options, predicate, @@ -175,6 +176,7 @@ impl SlicePushDown { let lp = Scan { paths, file_info, + hive_parts, output_schema, scan_type: FileScan::Csv { options, cloud_options }, file_options, @@ -187,6 +189,7 @@ impl SlicePushDown { (Scan { paths, file_info, + hive_parts, output_schema, file_options: mut options, predicate, @@ -196,6 +199,7 @@ impl SlicePushDown { let lp = Scan { paths, file_info, + hive_parts, output_schema, predicate, file_options: options, diff --git a/crates/polars-plan/src/plans/schema.rs b/crates/polars-plan/src/plans/schema.rs index b5fd5bca2db2..35f4aade0ab6 100644 --- a/crates/polars-plan/src/plans/schema.rs +++ b/crates/polars-plan/src/plans/schema.rs @@ -1,5 +1,4 @@ use std::ops::Deref; -use std::path::Path; use std::sync::Mutex; use arrow::datatypes::ArrowSchemaRef; @@ -9,7 +8,6 @@ use polars_utils::format_smartstring; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use super::hive::HivePartitions; use crate::prelude::*; impl DslPlan { @@ -36,7 +34,6 @@ pub struct FileInfo { /// - known size /// - estimated size (set to unsize::max if unknown). pub row_estimation: (Option, usize), - pub hive_parts: Option>, } impl FileInfo { @@ -50,30 +47,13 @@ impl FileInfo { schema: schema.clone(), reader_schema, row_estimation, - hive_parts: None, } } - /// Set the [`HivePartitions`] information for this [`FileInfo`] from a path and an - /// optional schema. - pub fn init_hive_partitions( - &mut self, - path: &Path, - schema: Option, - ) -> PolarsResult<()> { - let hp = HivePartitions::try_from_path(path, schema)?; - if let Some(hp) = hp { - let hive_schema = hp.schema().clone(); - self.update_schema_with_hive_schema(hive_schema)?; - self.hive_parts = Some(Arc::new(hp)); - } - Ok(()) - } - /// Merge the [`Schema`] of a [`HivePartitions`] with the schema of this [`FileInfo`]. /// /// Returns an `Err` if any of the columns in either schema overlap. - fn update_schema_with_hive_schema(&mut self, hive_schema: SchemaRef) -> PolarsResult<()> { + pub fn update_schema_with_hive_schema(&mut self, hive_schema: SchemaRef) -> PolarsResult<()> { let expected_len = self.schema.len() + hive_schema.len(); let file_schema = Arc::make_mut(&mut self.schema); @@ -86,29 +66,6 @@ impl FileInfo { ); Ok(()) } - - /// Update the [`HivePartitions`] statistics for this [`FileInfo`]. - /// - /// If the Hive partitions were not yet initialized, this function has no effect. - pub fn update_hive_partitions(&mut self, path: &Path) -> PolarsResult<()> { - if let Some(current) = &mut self.hive_parts { - let schema = current.schema().clone(); - let hp = HivePartitions::try_from_path(path, Some(schema))?; - let Some(new) = hp else { - polars_bail!( - ComputeError: "expected Hive partitioned path, got {}\n\n\ - This error occurs if some paths are Hive partitioned and some paths are not.", - path.display() - ) - }; - - match Arc::get_mut(current) { - Some(hp) => *hp = new, - None => *current = Arc::new(new), - } - } - Ok(()) - } } #[cfg(feature = "streaming")] diff --git a/crates/polars-plan/src/plans/visitor/hash.rs b/crates/polars-plan/src/plans/visitor/hash.rs index 0b43800f34b9..80c251108297 100644 --- a/crates/polars-plan/src/plans/visitor/hash.rs +++ b/crates/polars-plan/src/plans/visitor/hash.rs @@ -76,6 +76,7 @@ impl Hash for HashableEqLP<'_> { IR::Scan { paths, file_info: _, + hive_parts: _, predicate, output_schema: _, scan_type, @@ -255,6 +256,7 @@ impl HashableEqLP<'_> { IR::Scan { paths: pl, file_info: _, + hive_parts: _, predicate: pred_l, output_schema: _, scan_type: stl, @@ -263,6 +265,7 @@ impl HashableEqLP<'_> { IR::Scan { paths: pr, file_info: _, + hive_parts: _, predicate: pred_r, output_schema: _, scan_type: str, diff --git a/py-polars/polars/io/csv/batched_reader.py b/py-polars/polars/io/csv/batched_reader.py index b622d4fcc739..fc80e77424c0 100644 --- a/py-polars/polars/io/csv/batched_reader.py +++ b/py-polars/polars/io/csv/batched_reader.py @@ -57,7 +57,7 @@ def __init__( truncate_ragged_lines: bool = False, decimal_comma: bool = False, ): - path = normalize_filepath(source) + path = normalize_filepath(source, check_not_directory=False) dtype_list: Sequence[tuple[str, PolarsDataType]] | None = None dtype_slice: Sequence[PolarsDataType] | None = None diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py index b5b4c661ad60..ee630a370f04 100644 --- a/py-polars/polars/io/csv/functions.py +++ b/py-polars/polars/io/csv/functions.py @@ -487,7 +487,7 @@ def _read_csv_impl( ) -> DataFrame: path: str | None if isinstance(source, (str, Path)): - path = normalize_filepath(source) + path = normalize_filepath(source, check_not_directory=False) else: path = None if isinstance(source, BytesIO): @@ -1141,9 +1141,11 @@ def with_column_names(cols: list[str]) -> list[str]: _check_arg_is_1byte("quote_char", quote_char, can_be_empty=True) if isinstance(source, (str, Path)): - source = normalize_filepath(source) + source = normalize_filepath(source, check_not_directory=False) else: - source = [normalize_filepath(source) for source in source] + source = [ + normalize_filepath(source, check_not_directory=False) for source in source + ] return _scan_csv_impl( source, diff --git a/py-polars/polars/io/ipc/functions.py b/py-polars/polars/io/ipc/functions.py index 6bd12b210733..deb9b7a8956b 100644 --- a/py-polars/polars/io/ipc/functions.py +++ b/py-polars/polars/io/ipc/functions.py @@ -137,7 +137,7 @@ def _read_ipc_impl( memory_map: bool = True, ) -> DataFrame: if isinstance(source, (str, Path)): - source = normalize_filepath(source) + source = normalize_filepath(source, check_not_directory=False) if isinstance(columns, str): columns = [columns] @@ -261,7 +261,7 @@ def _read_ipc_stream_impl( rechunk: bool = True, ) -> DataFrame: if isinstance(source, (str, Path)): - source = normalize_filepath(source) + source = normalize_filepath(source, check_not_directory=False) if isinstance(columns, str): columns = [columns] @@ -294,7 +294,7 @@ def read_ipc_schema(source: str | Path | IO[bytes] | bytes) -> dict[str, DataTyp Dictionary mapping column names to datatypes """ if isinstance(source, (str, Path)): - source = normalize_filepath(source) + source = normalize_filepath(source, check_not_directory=False) return _read_ipc_schema(source) @@ -353,11 +353,13 @@ def scan_ipc( """ if isinstance(source, (str, Path)): can_use_fsspec = True - source = normalize_filepath(source) + source = normalize_filepath(source, check_not_directory=False) sources = [] else: can_use_fsspec = False - sources = [normalize_filepath(source) for source in source] + sources = [ + normalize_filepath(source, check_not_directory=False) for source in source + ] source = None # type: ignore[assignment] # try fsspec scanner diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index 20657e8edea7..20ef371eea23 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -274,7 +274,7 @@ def read_parquet_schema(source: str | Path | IO[bytes] | bytes) -> dict[str, Dat Dictionary mapping column names to datatypes """ if isinstance(source, (str, Path)): - source = normalize_filepath(source) + source = normalize_filepath(source, check_not_directory=False) return _read_parquet_schema(source) @@ -383,9 +383,11 @@ def scan_parquet( issue_unstable_warning(msg) if isinstance(source, (str, Path)): - source = normalize_filepath(source) + source = normalize_filepath(source, check_not_directory=False) else: - source = [normalize_filepath(source) for source in source] + source = [ + normalize_filepath(source, check_not_directory=False) for source in source + ] return _scan_parquet_impl( source, diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index 4dbf4f68c605..348ee115b314 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -334,6 +334,7 @@ impl PyLazyFrame { }); let hive_options = HiveOptions { enabled: hive_partitioning, + hive_start_idx: 0, schema: hive_schema, }; diff --git a/py-polars/src/lazyframe/visitor/nodes.rs b/py-polars/src/lazyframe/visitor/nodes.rs index 103e85ecf490..5c0832a07d0f 100644 --- a/py-polars/src/lazyframe/visitor/nodes.rs +++ b/py-polars/src/lazyframe/visitor/nodes.rs @@ -294,6 +294,7 @@ pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult { IR::Scan { paths, file_info: _, + hive_parts: _, predicate, output_schema: _, scan_type, diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py index 9f8a483626f9..701ebebc1417 100644 --- a/py-polars/tests/unit/io/test_scan.py +++ b/py-polars/tests/unit/io/test_scan.py @@ -1,8 +1,9 @@ from __future__ import annotations from dataclasses import dataclass +from functools import partial from math import ceil -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Callable import pytest @@ -191,9 +192,6 @@ def data_file( def test_scan( capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool ) -> None: - if data_file.path.suffix == ".csv" and force_async: - pytest.skip(reason="async reading of .csv not yet implemented") - if force_async: _enable_force_async(monkeypatch) @@ -209,9 +207,6 @@ def test_scan( def test_scan_with_limit( capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool ) -> None: - if data_file.path.suffix == ".csv" and force_async: - pytest.skip(reason="async reading of .csv not yet implemented") - if force_async: _enable_force_async(monkeypatch) @@ -234,9 +229,6 @@ def test_scan_with_limit( def test_scan_with_filter( capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool ) -> None: - if data_file.path.suffix == ".csv" and force_async: - pytest.skip(reason="async reading of .csv not yet implemented") - if force_async: _enable_force_async(monkeypatch) @@ -263,9 +255,6 @@ def test_scan_with_filter( def test_scan_with_filter_and_limit( capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool ) -> None: - if data_file.path.suffix == ".csv" and force_async: - pytest.skip(reason="async reading of .csv not yet implemented") - if force_async: _enable_force_async(monkeypatch) @@ -293,9 +282,6 @@ def test_scan_with_filter_and_limit( def test_scan_with_limit_and_filter( capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool ) -> None: - if data_file.path.suffix == ".csv" and force_async: - pytest.skip(reason="async reading of .csv not yet implemented") - if force_async: _enable_force_async(monkeypatch) @@ -323,9 +309,6 @@ def test_scan_with_limit_and_filter( def test_scan_with_row_index_and_limit( capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool ) -> None: - if data_file.path.suffix == ".csv" and force_async: - pytest.skip(reason="async reading of .csv not yet implemented") - if force_async: _enable_force_async(monkeypatch) @@ -354,9 +337,6 @@ def test_scan_with_row_index_and_limit( def test_scan_with_row_index_and_filter( capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool ) -> None: - if data_file.path.suffix == ".csv" and force_async: - pytest.skip(reason="async reading of .csv not yet implemented") - if force_async: _enable_force_async(monkeypatch) @@ -385,9 +365,6 @@ def test_scan_with_row_index_and_filter( def test_scan_with_row_index_limit_and_filter( capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool ) -> None: - if data_file.path.suffix == ".csv" and force_async: - pytest.skip(reason="async reading of .csv not yet implemented") - if force_async: _enable_force_async(monkeypatch) @@ -443,3 +420,54 @@ def test_scan_with_row_index_filter_and_limit( schema_overrides={"index": pl.UInt32}, ), ) + + +@pytest.mark.write_disk() +@pytest.mark.parametrize( + ("scan_func", "write_func"), + [ + (pl.scan_csv, pl.DataFrame.write_csv), + (pl.scan_parquet, pl.DataFrame.write_parquet), + (pl.scan_ipc, pl.DataFrame.write_ipc), + ], +) +@pytest.mark.parametrize( + "glob", + [True, False], +) +def test_scan_directory( + tmp_path: Path, + scan_func: Callable[[Any], pl.LazyFrame], + write_func: Callable[[pl.DataFrame, Path], None], + glob: bool, +) -> None: + tmp_path.mkdir(exist_ok=True) + + dfs: list[pl.DataFrame] = [ + pl.DataFrame({"a": [0, 0, 0, 0, 0]}), + pl.DataFrame({"a": [1, 1, 1, 1, 1]}), + pl.DataFrame({"a": [2, 2, 2, 2, 2]}), + ] + + paths = [ + tmp_path / "0.bin", + tmp_path / "1.bin", + tmp_path / "dir/data.bin", + ] + + for df, path in zip(dfs, paths): + path.parent.mkdir(exist_ok=True) + write_func(df, path) + + df = pl.concat(dfs) + + scan = scan_func + + if scan_func is pl.scan_csv: + scan = partial(scan, schema=df.schema) + + if scan_func is pl.scan_parquet: + scan = partial(scan, glob=glob) + + out = scan(tmp_path).collect() + assert_frame_equal(out, df)