properly do with_columns for nested files
coastalwhite committed Dec 13, 2024
1 parent 342bb94 commit 3e1e4ae
Showing 1 changed file with 36 additions and 11 deletions.
47 changes: 36 additions & 11 deletions crates/polars-mem-engine/src/executors/hive_scan.rs
@@ -81,19 +81,40 @@ impl HiveExec {
         let include_file_paths = self.file_options.include_file_paths.take();
         let predicate = self.predicate.take();
 
+        // Create a index set of the hive columns.
+        let mut hive_column_set = PlIndexSet::default();
+        if let Some(fst_hive_part) = self.hive_parts.first() {
+            hive_column_set.extend(
+                fst_hive_part
+                    .get_statistics()
+                    .column_stats()
+                    .iter()
+                    .map(|c| c.field_name().clone()),
+            );
+        }
+
         // Look through the predicate and assess whether hive columns are being used in it.
         let mut has_live_hive_columns = false;
         if let Some(predicate) = &predicate {
             let mut live_columns = PlIndexSet::new();
             predicate.collect_live_columns(&mut live_columns);
 
-            if let Some(fst_hive_part) = self.hive_parts.first() {
-                for hive_column in fst_hive_part.get_statistics().column_stats() {
-                    has_live_hive_columns |= live_columns.contains(hive_column.field_name());
-                }
+            for hive_column in &hive_column_set {
+                has_live_hive_columns |= live_columns.contains(hive_column);
             }
         }
 
+        // Remove the hive columns for each file load.
+        let mut file_with_columns = self.file_options.with_columns.clone();
+        if let Some(with_columns) = &self.file_options.with_columns {
+            file_with_columns = Some(
+                with_columns
+                    .iter()
+                    .filter_map(|c| (!hive_column_set.contains(c)).then(|| c.clone()))
+                    .collect(),
+            );
+        }
+
         let mut row_index = self.file_options.row_index.take();
         let mut slice = self.file_options.slice.take();
 
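The hunk above gathers the hive-partition column names into an index set once and then strips them from the per-file with_columns projection, since partition columns are materialized from the directory path rather than read from the file itself. A minimal sketch of that filtering step, using std::collections::HashSet as a stand-in for PlIndexSet and hypothetical names throughout (not the Polars API):

use std::collections::HashSet;

// Drop partition columns from the projection requested from the file reader.
// `with_columns` and `hive_columns` are illustrative stand-ins.
fn filter_file_projection(
    with_columns: Option<&[String]>,
    hive_columns: &HashSet<String>,
) -> Option<Vec<String>> {
    with_columns.map(|cols| {
        cols.iter()
            .filter(|c| !hive_columns.contains(*c))
            .cloned()
            .collect()
    })
}

fn main() {
    let hive: HashSet<String> = ["year".to_string(), "month".to_string()].into();
    let projection = vec!["year".to_string(), "a".to_string(), "b".to_string()];
    // Only the physical file columns remain in the per-file projection.
    assert_eq!(
        filter_file_projection(Some(&projection), &hive),
        Some(vec!["a".to_string(), "b".to_string()])
    );
}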
@@ -130,11 +151,14 @@ impl HiveExec {
             if has_live_hive_columns {
                 let predicate = predicate.as_ref().unwrap();
                 const_columns.clear();
-                for hive_column in hive_part.get_statistics().column_stats() {
-                    const_columns.insert(
-                        hive_column.field_name().clone(),
-                        hive_column.to_min().unwrap().get(0).unwrap().into_static(),
-                    );
+                for (idx, column) in hive_column_set.iter().enumerate() {
+                    let value = hive_part.get_statistics().column_stats()[idx]
+                        .to_min()
+                        .unwrap()
+                        .get(0)
+                        .unwrap()
+                        .into_static();
+                    const_columns.insert(column.clone(), value);
                 }
                 file_predicate = predicate.replace_elementwise_const_columns(&const_columns);
 
@@ -176,6 +200,7 @@ impl HiveExec {
             }
 
             let mut file_options = self.file_options.clone();
+            file_options.with_columns = file_with_columns.clone();
             file_options.row_index = row_index.clone();
 
             // @TODO: There are cases where we can ignore reading. E.g. no row index + empty with columns + no predicate
@@ -195,7 +220,7 @@ impl HiveExec {
             let mut do_skip_file = false;
             if let Some(slice) = &slice {
                 let allow_slice_skip = slice.0 >= num_unfiltered_rows.get()? as i64;
-                if allow_slice_skip && config::verbose() {
+                if allow_slice_skip && verbose {
                     eprintln!(
                         "Slice allows skipping of '{}'",
                         source.to_include_path_name()
@@ -211,7 +236,7 @@ impl HiveExec {
                 let allow_predicate_skip = !stats_evaluator
                     .should_read(&BatchStats::default())
                     .unwrap_or(true);
-                if allow_predicate_skip && config::verbose() {
+                if allow_predicate_skip && verbose {
                     eprintln!(
                         "File statistics allows skipping of '{}'",
                         source.to_include_path_name()
