Skip to content

Commit

Permalink
move to seperate function
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite committed Dec 8, 2024
1 parent 341493d commit bd71732
Showing 1 changed file with 154 additions and 159 deletions.
313 changes: 154 additions & 159 deletions crates/polars-mem-engine/src/executors/hive_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,159 @@ impl HiveExec {
scan_type,
}
}

pub fn read(&mut self) -> PolarsResult<DataFrame> {
let include_file_paths = self.file_options.include_file_paths.take();
let predicate = self.predicate.take();
let stats_evaluator = predicate.as_ref().and_then(|p| p.as_stats_evaluator());
let mut row_index = self.file_options.row_index.take();
let mut slice = self.file_options.slice.take();

assert_eq!(self.sources.len(), self.hive_parts.len());

assert!(!self.file_options.allow_missing_columns, "NYI");
assert!(slice.is_none_or(|s| s.0 >= 0), "NYI");

#[cfg(feature = "parquet")]
{
let FileScan::Parquet {
options,
cloud_options,
metadata,
} = self.scan_type.clone()
else {
todo!()
};

let stats_evaluator = stats_evaluator.filter(|_| options.use_statistics);

let mut dfs = Vec::with_capacity(self.sources.len());

for (source, hive_part) in self.sources.iter().zip(self.hive_parts.iter()) {
if slice.is_some_and(|s| s.1 == 0) {
break;
}

let part_source = match source {
ScanSourceRef::Path(path) => ScanSources::Paths([path.to_path_buf()].into()),
ScanSourceRef::File(_) | ScanSourceRef::Buffer(_) => {
ScanSources::Buffers([source.to_memslice()?].into())
},
};

let mut file_options = self.file_options.clone();
file_options.row_index = row_index.clone();

// @TODO: There are cases where we can ignore reading. E.g. no row index + empty with columns + no predicate
let mut exec = ParquetExec::new(
part_source,
self.file_info.clone(),
None,
None, // @TODO: add predicate with hive columns replaced
options.clone(),
cloud_options.clone(),
file_options,
metadata.clone(),
);

let mut num_unfiltered_rows = LazyTryCell::new(|| exec.num_unfiltered_rows());

let mut do_skip_file = false;
if let Some(slice) = &slice {
do_skip_file |= slice.0 >= num_unfiltered_rows.get()? as i64;
}
if let Some(stats_evaluator) = stats_evaluator {
do_skip_file |= !stats_evaluator
.should_read(hive_part.get_statistics())
.unwrap_or(true);
}

// Update the row_index to the proper offset.
if let Some(row_index) = row_index.as_mut() {
row_index.offset += num_unfiltered_rows.get()?;
}

if do_skip_file {
// Update the row_index to the proper offset.
if let Some(row_index) = row_index.as_mut() {
row_index.offset += num_unfiltered_rows.get()?;
}
// Update the slice offset.
if let Some(slice) = slice.as_mut() {
slice.0 = slice.0.saturating_sub(num_unfiltered_rows.get()? as i64);
}

continue;
}

// Read the DataFrame and needed metadata.
// @TODO: these should be merged into one call
let num_unfiltered_rows = num_unfiltered_rows.get()?;
let mut df = exec.read()?;

// Update the row_index to the proper offset.
if let Some(row_index) = row_index.as_mut() {
row_index.offset += num_unfiltered_rows;
}
// Update the slice.
if let Some(slice) = slice.as_mut() {
slice.0 = slice.0.saturating_sub(num_unfiltered_rows as i64);
slice.1 = slice.1.saturating_sub(num_unfiltered_rows as usize);
}

if let Some(with_columns) = &self.file_options.with_columns {
df = match &row_index {
None => df.select(with_columns.iter().cloned())?,
Some(ri) => df.select(
std::iter::once(ri.name.clone()).chain(with_columns.iter().cloned()),
)?,
}
}

// Materialize the hive columns and add them basic in.
let hive_df: DataFrame = hive_part
.get_statistics()
.column_stats()
.iter()
.map(|hive_col| {
ScalarColumn::from_single_value_series(
hive_col
.to_min()
.unwrap()
.clone()
.with_name(hive_col.field_name().clone()),
df.height(),
)
.into_column()
})
.collect();
let mut df = hive_df.hstack(df.get_columns())?;

if let Some(include_file_paths) = &include_file_paths {
df.with_column(ScalarColumn::new(
include_file_paths.clone(),
PlSmallStr::from_str(source.to_include_path_name()).into(),
df.height(),
))?;
}

dfs.push(df);
}

let out = if cfg!(debug_assertions) {
accumulate_dataframes_vertical(dfs)?
} else {
accumulate_dataframes_vertical_unchecked(dfs)
};

Ok(out)
}

#[cfg(not(feature = "parquet"))]
{
todo!()
}
}
}

impl Executor for HiveExec {
Expand All @@ -90,164 +243,6 @@ impl Executor for HiveExec {
Cow::Borrowed("")
};

state.record(
|| {
let include_file_paths = self.file_options.include_file_paths.take();
let predicate = self.predicate.take();
let stats_evaluator = predicate.as_ref().and_then(|p| p.as_stats_evaluator());
let mut row_index = self.file_options.row_index.take();
let mut slice = self.file_options.slice.take();

assert_eq!(self.sources.len(), self.hive_parts.len());

assert!(!self.file_options.allow_missing_columns, "NYI");
assert!(slice.is_none_or(|s| s.0 >= 0), "NYI");

#[cfg(feature = "parquet")]
{
let FileScan::Parquet {
options,
cloud_options,
metadata,
} = self.scan_type.clone()
else {
todo!()
};

let stats_evaluator = stats_evaluator.filter(|_| options.use_statistics);

let mut dfs = Vec::with_capacity(self.sources.len());

for (source, hive_part) in self.sources.iter().zip(self.hive_parts.iter()) {
if slice.is_some_and(|s| s.1 == 0) {
break;
}

let part_source = match source {
ScanSourceRef::Path(path) => {
ScanSources::Paths([path.to_path_buf()].into())
},
ScanSourceRef::File(_) | ScanSourceRef::Buffer(_) => {
ScanSources::Buffers([source.to_memslice()?].into())
},
};

let mut file_options = self.file_options.clone();
file_options.row_index = row_index.clone();

// @TODO: There are cases where we can ignore reading. E.g. no row index + empty with columns + no predicate
let mut exec = ParquetExec::new(
part_source,
self.file_info.clone(),
None,
None, // @TODO: add predicate with hive columns replaced
options.clone(),
cloud_options.clone(),
file_options,
metadata.clone(),
);

let mut num_unfiltered_rows =
LazyTryCell::new(|| exec.num_unfiltered_rows());

let mut do_skip_file = false;
if let Some(slice) = &slice {
do_skip_file |= slice.0 >= num_unfiltered_rows.get()? as i64;
}
if let Some(stats_evaluator) = stats_evaluator {
do_skip_file |= !stats_evaluator
.should_read(hive_part.get_statistics())
.unwrap_or(true);
}

// Update the row_index to the proper offset.
if let Some(row_index) = row_index.as_mut() {
row_index.offset += num_unfiltered_rows.get()?;
}

if do_skip_file {
// Update the row_index to the proper offset.
if let Some(row_index) = row_index.as_mut() {
row_index.offset += num_unfiltered_rows.get()?;
}
// Update the slice offset.
if let Some(slice) = slice.as_mut() {
slice.0 = slice.0.saturating_sub(num_unfiltered_rows.get()? as i64);
}

continue;
}

// Read the DataFrame and needed metadata.
// @TODO: these should be merged into one call
let num_unfiltered_rows = num_unfiltered_rows.get()?;
let mut df = exec.read()?;

// Update the row_index to the proper offset.
if let Some(row_index) = row_index.as_mut() {
row_index.offset += num_unfiltered_rows;
}
// Update the slice.
if let Some(slice) = slice.as_mut() {
slice.0 = slice.0.saturating_sub(num_unfiltered_rows as i64);
slice.1 = slice.1.saturating_sub(num_unfiltered_rows as usize);
}

if let Some(with_columns) = &self.file_options.with_columns {
df = match &row_index {
None => df.select(with_columns.iter().cloned())?,
Some(ri) => df.select(
std::iter::once(ri.name.clone())
.chain(with_columns.iter().cloned()),
)?,
}
}

// Materialize the hive columns and add them basic in.
let hive_df: DataFrame = hive_part
.get_statistics()
.column_stats()
.iter()
.map(|hive_col| {
ScalarColumn::from_single_value_series(
hive_col
.to_min()
.unwrap()
.clone()
.with_name(hive_col.field_name().clone()),
df.height(),
)
.into_column()
})
.collect();
let mut df = hive_df.hstack(df.get_columns())?;

if let Some(include_file_paths) = &include_file_paths {
df.with_column(ScalarColumn::new(
include_file_paths.clone(),
PlSmallStr::from_str(source.to_include_path_name()).into(),
df.height(),
))?;
}

dfs.push(df);
}

let out = if cfg!(debug_assertions) {
accumulate_dataframes_vertical(dfs)?
} else {
accumulate_dataframes_vertical_unchecked(dfs)
};

Ok(out)
}

#[cfg(not(feature = "parquet"))]
{
todo!()
}
},
profile_name,
)
state.record(|| self.read(), profile_name)
}
}

0 comments on commit bd71732

Please sign in to comment.