Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix corrupted reads for hive parts from cloud and projection pushdown failure on hive parts #17152

Merged
merged 3 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions crates/polars-io/src/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn apply_predicate(
/// - Null count
/// - Minimum value
/// - Maximum value
#[derive(Debug)]
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ColumnStats {
field: Field,
Expand Down Expand Up @@ -91,6 +91,10 @@ impl ColumnStats {
}
}

pub fn field_name(&self) -> &SmartString {
self.field.name()
}

/// Returns the [`DataType`] of the column.
pub fn dtype(&self) -> &DataType {
self.field.data_type()
Expand Down Expand Up @@ -195,7 +199,7 @@ fn use_min_max(dtype: &DataType) -> bool {

/// A collection of column stats with a known schema.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BatchStats {
schema: SchemaRef,
stats: Vec<ColumnStats>,
Expand Down Expand Up @@ -238,4 +242,12 @@ impl BatchStats {
pub fn num_rows(&self) -> Option<usize> {
self.num_rows
}

pub fn with_schema(&mut self, schema: SchemaRef) {
self.schema = schema;
}

pub fn take_indices(&mut self, indices: &[usize]) {
self.stats = indices.iter().map(|&i| self.stats[i].clone()).collect();
}
}
46 changes: 11 additions & 35 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use polars_core::config::{get_file_prefetch_size, verbose};
use polars_core::utils::accumulate_dataframes_vertical;
use polars_io::cloud::CloudOptions;
use polars_io::parquet::metadata::FileMetaDataRef;
use polars_io::parquet::read::materialize_empty_df;
use polars_io::utils::is_cloud_url;
use polars_io::RowIndex;

Expand All @@ -16,7 +15,7 @@ use super::*;
pub struct ParquetExec {
paths: Arc<[PathBuf]>,
file_info: FileInfo,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
hive_parts: Option<Arc<[HivePartitions]>>,
predicate: Option<Arc<dyn PhysicalExpr>>,
options: ParquetOptions,
#[allow(dead_code)]
Expand All @@ -31,7 +30,7 @@ impl ParquetExec {
pub(crate) fn new(
paths: Arc<[PathBuf]>,
file_info: FileInfo,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
hive_parts: Option<Arc<[HivePartitions]>>,
predicate: Option<Arc<dyn PhysicalExpr>>,
options: ParquetOptions,
cloud_options: Option<CloudOptions>,
Expand Down Expand Up @@ -186,7 +185,12 @@ impl ParquetExec {
let mut remaining_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX);
let mut base_row_index = self.file_options.row_index.take();
let mut processed = 0;
for (batch_idx, paths) in self.paths.chunks(batch_size).enumerate() {

for batch_start in (0..self.paths.len()).step_by(batch_size) {
let end = std::cmp::min(batch_start.saturating_add(batch_size), self.paths.len());
let paths = &self.paths[batch_start..end];
let hive_parts = self.hive_parts.as_ref().map(|x| &x[batch_start..end]);

if remaining_rows_to_read == 0 && !result.is_empty() {
return Ok(result);
}
Expand All @@ -201,7 +205,7 @@ impl ParquetExec {

// First initialize the readers and get the metadata concurrently.
let iter = paths.iter().enumerate().map(|(i, path)| async move {
let first_file = batch_idx == 0 && i == 0;
let first_file = batch_start == 0 && i == 0;
// use the cached one as this saves a cloud call
let (metadata, schema) = if first_file {
(first_metadata.clone(), Some((*first_schema).clone()))
Expand Down Expand Up @@ -255,8 +259,7 @@ impl ParquetExec {
let iter = readers_and_metadata.into_iter().enumerate().map(
|(i, (num_rows_this_file, reader))| {
let (remaining_rows_to_read, cumulative_read) = &rows_statistics[i];
let hive_partitions = self
.hive_parts
let hive_partitions = hive_parts
ritchie46 marked this conversation as resolved.
Show resolved Hide resolved
.as_ref()
.map(|x| x[i].materialize_partition_columns());

Expand Down Expand Up @@ -324,34 +327,7 @@ impl ParquetExec {
.and_then(|_| self.predicate.take())
.map(phys_expr_to_io_expr);

let is_cloud = match self.paths.first() {
Some(p) => is_cloud_url(p.as_path()),
None => {
let hive_partitions = self
.hive_parts
.as_ref()
.filter(|x| !x.is_empty())
.map(|x| x.first().unwrap().materialize_partition_columns());
let (projection, _) = prepare_scan_args(
None,
&mut self.file_options.with_columns,
&mut self.file_info.schema,
self.file_options.row_index.is_some(),
hive_partitions.as_deref(),
);
return Ok(materialize_empty_df(
projection.as_deref(),
self.file_info
.reader_schema
.as_ref()
.unwrap()
.as_ref()
.unwrap_left(),
hive_partitions.as_deref(),
self.file_options.row_index.as_ref(),
));
},
};
let is_cloud = is_cloud_url(self.paths.first().unwrap());
Copy link
Collaborator Author

@nameexhaustion nameexhaustion Jun 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed the optimizer to replace a scan with DataFrameScan if the predicate filtered out all files (see below)

let force_async = config::force_async();

let out = if is_cloud || force_async {
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sources/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct ParquetSource {
cloud_options: Option<CloudOptions>,
metadata: Option<FileMetaDataRef>,
file_info: FileInfo,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
hive_parts: Option<Arc<[HivePartitions]>>,
verbose: bool,
run_async: bool,
prefetch_size: usize,
Expand Down Expand Up @@ -192,7 +192,7 @@ impl ParquetSource {
metadata: Option<FileMetaDataRef>,
file_options: FileScanOptions,
file_info: FileInfo,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
hive_parts: Option<Arc<[HivePartitions]>>,
verbose: bool,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
) -> PolarsResult<Self> {
Expand Down
34 changes: 30 additions & 4 deletions crates/polars-plan/src/plans/hive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,40 @@ use polars_io::utils::{BOOLEAN_RE, FLOAT_RE, INTEGER_RE};
use serde::{Deserialize, Serialize};

#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct HivePartitions {
/// Single value Series that can be used to run the predicate against.
/// They are to be broadcasted if the predicates don't filter them out.
stats: BatchStats,
}

impl HivePartitions {
pub fn get_projection_schema_and_indices<T: AsRef<str>>(
&self,
names: &[T],
) -> (SchemaRef, Vec<usize>) {
let names = names.iter().map(T::as_ref).collect::<PlHashSet<&str>>();
let mut out_schema = Schema::with_capacity(self.stats.schema().len());
let mut out_indices = Vec::with_capacity(self.stats.column_stats().len());

for (i, cs) in self.stats.column_stats().iter().enumerate() {
let name = cs.field_name();
if names.contains(name.as_str()) {
out_indices.push(i);
out_schema
.insert_at_index(out_schema.len(), name.clone(), cs.dtype().clone())
.unwrap();
}
}

(out_schema.into(), out_indices)
}

pub fn apply_projection(&mut self, new_schema: SchemaRef, column_indices: &[usize]) {
self.stats.with_schema(new_schema);
self.stats.take_indices(column_indices);
}

pub fn get_statistics(&self) -> &BatchStats {
&self.stats
}
Expand All @@ -40,7 +66,7 @@ pub fn hive_partitions_from_paths(
paths: &[PathBuf],
hive_start_idx: usize,
schema: Option<SchemaRef>,
) -> PolarsResult<Option<Vec<Arc<HivePartitions>>>> {
) -> PolarsResult<Option<Arc<[HivePartitions]>>> {
let Some(path) = paths.first() else {
return Ok(None);
};
Expand Down Expand Up @@ -131,10 +157,10 @@ pub fn hive_partitions_from_paths(

let stats = BatchStats::new(hive_schema.clone(), column_stats, None);

hive_partitions.push(Arc::new(HivePartitions { stats }));
hive_partitions.push(HivePartitions { stats });
}

Ok(Some(hive_partitions))
Ok(Some(Arc::from(hive_partitions)))
}

/// Determine the path separator for identifying Hive partitions.
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/ir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub enum IR {
Scan {
paths: Arc<[PathBuf]>,
file_info: FileInfo,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
hive_parts: Option<Arc<[HivePartitions]>>,
predicate: Option<ExprIR>,
/// schema of the projected file
output_schema: Option<SchemaRef>,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub enum DslPlan {
paths: Arc<[PathBuf]>,
// Option as this is mostly materialized on the IR phase.
file_info: Option<FileInfo>,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
hive_parts: Option<Arc<[HivePartitions]>>,
predicate: Option<Expr>,
file_options: FileScanOptions,
scan_type: FileScan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ impl<'a> PredicatePushDown<'a> {
}
scan_type.remove_metadata();
}
if paths.is_empty() {
if new_paths.is_empty() {
Copy link
Collaborator Author

@nameexhaustion nameexhaustion Jun 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discovered there was other code in place to ensure the invariant that a reader always has at least one path here, it would replace the scan node with a DataFrameScan but it wasn't being hit because it was incorrectly checking the old paths instead of new_paths. This changes the fix at #12575

let schema = output_schema.as_ref().unwrap_or(&file_info.schema);
let df = DataFrame::from(schema.as_ref());

Expand All @@ -390,7 +390,7 @@ impl<'a> PredicatePushDown<'a> {
});
} else {
paths = Arc::from(new_paths);
scan_hive_parts = Some(new_hive_parts);
scan_hive_parts = Some(Arc::from(new_hive_parts));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ impl ProjectionPushDown {
Scan {
paths,
file_info,
hive_parts,
mut hive_parts,
scan_type,
predicate,
mut file_options,
Expand All @@ -422,9 +422,7 @@ impl ProjectionPushDown {
file_options.row_index.as_ref(),
);

output_schema = if file_options.with_columns.is_none() {
None
} else {
output_schema = if let Some(ref with_columns) = file_options.with_columns {
let mut schema = update_scan_schema(
&acc_projections,
expr_arena,
Expand All @@ -434,7 +432,28 @@ impl ProjectionPushDown {
// Hive partitions are created AFTER the projection, so the output
// schema is incorrect. Here we ensure the columns that are projected and hive
// parts are added at the proper place in the schema, which is at the end.
if let Some(ref hive_parts) = hive_parts {
hive_parts = if let Some(hive_parts) = hive_parts {
let (new_schema, projected_indices) = hive_parts[0]
.get_projection_schema_and_indices(with_columns.as_ref());

Some(
hive_parts
.iter()
.cloned()
.map(|mut hp| {
hp.apply_projection(
new_schema.clone(),
projected_indices.as_ref(),
);
hp
})
.collect::<Arc<[_]>>(),
)
} else {
hive_parts
};

if let Some(ref mut hive_parts) = hive_parts {
let partition_schema = hive_parts.first().unwrap().schema();

for (name, _) in partition_schema.iter() {
Expand All @@ -444,6 +463,8 @@ impl ProjectionPushDown {
}
}
Some(Arc::new(schema))
} else {
None
};
}

Expand Down
Loading
Loading