From 3e1e4aee626b17ce407b349b528872e791fdbc51 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Fri, 13 Dec 2024 11:09:38 +0100 Subject: [PATCH] properly do with_columns for nested files --- .../src/executors/hive_scan.rs | 47 ++++++++++++++----- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/crates/polars-mem-engine/src/executors/hive_scan.rs b/crates/polars-mem-engine/src/executors/hive_scan.rs index 8512c8c05138..362529ac15d9 100644 --- a/crates/polars-mem-engine/src/executors/hive_scan.rs +++ b/crates/polars-mem-engine/src/executors/hive_scan.rs @@ -81,19 +81,40 @@ impl HiveExec { let include_file_paths = self.file_options.include_file_paths.take(); let predicate = self.predicate.take(); + // Create a index set of the hive columns. + let mut hive_column_set = PlIndexSet::default(); + if let Some(fst_hive_part) = self.hive_parts.first() { + hive_column_set.extend( + fst_hive_part + .get_statistics() + .column_stats() + .iter() + .map(|c| c.field_name().clone()), + ); + } + // Look through the predicate and assess whether hive columns are being used in it. let mut has_live_hive_columns = false; if let Some(predicate) = &predicate { let mut live_columns = PlIndexSet::new(); predicate.collect_live_columns(&mut live_columns); - if let Some(fst_hive_part) = self.hive_parts.first() { - for hive_column in fst_hive_part.get_statistics().column_stats() { - has_live_hive_columns |= live_columns.contains(hive_column.field_name()); - } + for hive_column in &hive_column_set { + has_live_hive_columns |= live_columns.contains(hive_column); } } + // Remove the hive columns for each file load. + let mut file_with_columns = self.file_options.with_columns.clone(); + if let Some(with_columns) = &self.file_options.with_columns { + file_with_columns = Some( + with_columns + .iter() + .filter_map(|c| (!hive_column_set.contains(c)).then(|| c.clone())) + .collect(), + ); + } + let mut row_index = self.file_options.row_index.take(); let mut slice = self.file_options.slice.take(); @@ -130,11 +151,14 @@ impl HiveExec { if has_live_hive_columns { let predicate = predicate.as_ref().unwrap(); const_columns.clear(); - for hive_column in hive_part.get_statistics().column_stats() { - const_columns.insert( - hive_column.field_name().clone(), - hive_column.to_min().unwrap().get(0).unwrap().into_static(), - ); + for (idx, column) in hive_column_set.iter().enumerate() { + let value = hive_part.get_statistics().column_stats()[idx] + .to_min() + .unwrap() + .get(0) + .unwrap() + .into_static(); + const_columns.insert(column.clone(), value); } file_predicate = predicate.replace_elementwise_const_columns(&const_columns); @@ -176,6 +200,7 @@ impl HiveExec { } let mut file_options = self.file_options.clone(); + file_options.with_columns = file_with_columns.clone(); file_options.row_index = row_index.clone(); // @TODO: There are cases where we can ignore reading. E.g. no row index + empty with columns + no predicate @@ -195,7 +220,7 @@ impl HiveExec { let mut do_skip_file = false; if let Some(slice) = &slice { let allow_slice_skip = slice.0 >= num_unfiltered_rows.get()? as i64; - if allow_slice_skip && config::verbose() { + if allow_slice_skip && verbose { eprintln!( "Slice allows skipping of '{}'", source.to_include_path_name() @@ -211,7 +236,7 @@ impl HiveExec { let allow_predicate_skip = !stats_evaluator .should_read(&BatchStats::default()) .unwrap_or(true); - if allow_predicate_skip && config::verbose() { + if allow_predicate_skip && verbose { eprintln!( "File statistics allows skipping of '{}'", source.to_include_path_name()