Skip to content

Commit

Permalink
fix: Fix corrupted reads for hive parts from cloud and projection pus…
Browse files Browse the repository at this point in the history
…hdown failure on hive parts (#17152)
  • Loading branch information
nameexhaustion authored Jun 24, 2024
1 parent 4ca5aa6 commit 514728b
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 65 deletions.
16 changes: 14 additions & 2 deletions crates/polars-io/src/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn apply_predicate(
/// - Null count
/// - Minimum value
/// - Maximum value
#[derive(Debug)]
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ColumnStats {
field: Field,
Expand Down Expand Up @@ -91,6 +91,10 @@ impl ColumnStats {
}
}

/// Returns the name of the field that these column statistics describe.
pub fn field_name(&self) -> &SmartString {
self.field.name()
}

/// Returns the [`DataType`] of the column.
pub fn dtype(&self) -> &DataType {
self.field.data_type()
Expand Down Expand Up @@ -195,7 +199,7 @@ fn use_min_max(dtype: &DataType) -> bool {

/// A collection of column stats with a known schema.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BatchStats {
schema: SchemaRef,
stats: Vec<ColumnStats>,
Expand Down Expand Up @@ -238,4 +242,12 @@ impl BatchStats {
pub fn num_rows(&self) -> Option<usize> {
self.num_rows
}

/// Replaces the schema stored in this batch with `schema`.
///
/// Only the schema is swapped; the column statistics are not modified here, so
/// keeping the two consistent is left to the caller.
pub fn with_schema(&mut self, schema: SchemaRef) {
self.schema = schema;
}

/// Replaces the column statistics with the entries selected by `indices`.
///
/// The new statistics follow the order of `indices`; an index that appears
/// more than once yields a cloned entry for each occurrence.
///
/// # Panics
///
/// Panics if any index is out of bounds for the current column statistics.
pub fn take_indices(&mut self, indices: &[usize]) {
    let mut selected = Vec::with_capacity(indices.len());
    for &idx in indices {
        selected.push(self.stats[idx].clone());
    }
    self.stats = selected;
}
}
46 changes: 11 additions & 35 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use polars_core::config::{get_file_prefetch_size, verbose};
use polars_core::utils::accumulate_dataframes_vertical;
use polars_io::cloud::CloudOptions;
use polars_io::parquet::metadata::FileMetaDataRef;
use polars_io::parquet::read::materialize_empty_df;
use polars_io::utils::is_cloud_url;
use polars_io::RowIndex;

Expand All @@ -16,7 +15,7 @@ use super::*;
pub struct ParquetExec {
paths: Arc<[PathBuf]>,
file_info: FileInfo,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
hive_parts: Option<Arc<[HivePartitions]>>,
predicate: Option<Arc<dyn PhysicalExpr>>,
options: ParquetOptions,
#[allow(dead_code)]
Expand All @@ -31,7 +30,7 @@ impl ParquetExec {
pub(crate) fn new(
paths: Arc<[PathBuf]>,
file_info: FileInfo,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
hive_parts: Option<Arc<[HivePartitions]>>,
predicate: Option<Arc<dyn PhysicalExpr>>,
options: ParquetOptions,
cloud_options: Option<CloudOptions>,
Expand Down Expand Up @@ -186,7 +185,12 @@ impl ParquetExec {
let mut remaining_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX);
let mut base_row_index = self.file_options.row_index.take();
let mut processed = 0;
for (batch_idx, paths) in self.paths.chunks(batch_size).enumerate() {

for batch_start in (0..self.paths.len()).step_by(batch_size) {
let end = std::cmp::min(batch_start.saturating_add(batch_size), self.paths.len());
let paths = &self.paths[batch_start..end];
let hive_parts = self.hive_parts.as_ref().map(|x| &x[batch_start..end]);

if remaining_rows_to_read == 0 && !result.is_empty() {
return Ok(result);
}
Expand All @@ -201,7 +205,7 @@ impl ParquetExec {

// First initialize the readers and get the metadata concurrently.
let iter = paths.iter().enumerate().map(|(i, path)| async move {
let first_file = batch_idx == 0 && i == 0;
let first_file = batch_start == 0 && i == 0;
// use the cached one as this saves a cloud call
let (metadata, schema) = if first_file {
(first_metadata.clone(), Some((*first_schema).clone()))
Expand Down Expand Up @@ -255,8 +259,7 @@ impl ParquetExec {
let iter = readers_and_metadata.into_iter().enumerate().map(
|(i, (num_rows_this_file, reader))| {
let (remaining_rows_to_read, cumulative_read) = &rows_statistics[i];
let hive_partitions = self
.hive_parts
let hive_partitions = hive_parts
.as_ref()
.map(|x| x[i].materialize_partition_columns());

Expand Down Expand Up @@ -324,34 +327,7 @@ impl ParquetExec {
.and_then(|_| self.predicate.take())
.map(phys_expr_to_io_expr);

let is_cloud = match self.paths.first() {
Some(p) => is_cloud_url(p.as_path()),
None => {
let hive_partitions = self
.hive_parts
.as_ref()
.filter(|x| !x.is_empty())
.map(|x| x.first().unwrap().materialize_partition_columns());
let (projection, _) = prepare_scan_args(
None,
&mut self.file_options.with_columns,
&mut self.file_info.schema,
self.file_options.row_index.is_some(),
hive_partitions.as_deref(),
);
return Ok(materialize_empty_df(
projection.as_deref(),
self.file_info
.reader_schema
.as_ref()
.unwrap()
.as_ref()
.unwrap_left(),
hive_partitions.as_deref(),
self.file_options.row_index.as_ref(),
));
},
};
let is_cloud = is_cloud_url(self.paths.first().unwrap());
let force_async = config::force_async();

let out = if is_cloud || force_async {
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sources/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct ParquetSource {
cloud_options: Option<CloudOptions>,
metadata: Option<FileMetaDataRef>,
file_info: FileInfo,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
hive_parts: Option<Arc<[HivePartitions]>>,
verbose: bool,
run_async: bool,
prefetch_size: usize,
Expand Down Expand Up @@ -192,7 +192,7 @@ impl ParquetSource {
metadata: Option<FileMetaDataRef>,
file_options: FileScanOptions,
file_info: FileInfo,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
hive_parts: Option<Arc<[HivePartitions]>>,
verbose: bool,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
) -> PolarsResult<Self> {
Expand Down
34 changes: 30 additions & 4 deletions crates/polars-plan/src/plans/hive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,40 @@ use polars_io::utils::{BOOLEAN_RE, FLOAT_RE, INTEGER_RE};
use serde::{Deserialize, Serialize};

#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Debug)]
#[derive(Debug, Clone)]
/// Hive partition information, held as a [`BatchStats`].
pub struct HivePartitions {
/// Single-value Series that can be used to run the predicate against.
/// They are to be broadcast if the predicates don't filter them out.
stats: BatchStats,
}

impl HivePartitions {
/// Computes the projection of these partition columns onto `names`.
///
/// Returns the schema containing only the partition columns whose name occurs
/// in `names`, together with the positions of those columns in the current
/// column statistics. Both outputs keep the existing column order; the order
/// of `names` is irrelevant.
pub fn get_projection_schema_and_indices<T: AsRef<str>>(
    &self,
    names: &[T],
) -> (SchemaRef, Vec<usize>) {
    let wanted: PlHashSet<&str> = names.iter().map(T::as_ref).collect();
    let column_stats = self.stats.column_stats();

    let mut projected_schema = Schema::with_capacity(self.stats.schema().len());
    let mut projected_indices = Vec::with_capacity(column_stats.len());

    let selected = column_stats
        .iter()
        .enumerate()
        .filter(|(_, cs)| wanted.contains(cs.field_name().as_str()));

    for (idx, cs) in selected {
        projected_indices.push(idx);
        // Always append at the end so the existing column order is preserved.
        let append_at = projected_schema.len();
        projected_schema
            .insert_at_index(append_at, cs.field_name().clone(), cs.dtype().clone())
            .unwrap();
    }

    (projected_schema.into(), projected_indices)
}

/// Restricts these partitions to a subset of their columns.
///
/// Sets `new_schema` as the schema of the underlying statistics, then keeps only
/// the column statistics found at `column_indices`. The two arguments are not
/// cross-checked here; the caller is expected to pass a matching pair.
pub fn apply_projection(&mut self, new_schema: SchemaRef, column_indices: &[usize]) {
self.stats.with_schema(new_schema);
self.stats.take_indices(column_indices);
}

/// Returns a reference to the [`BatchStats`] held by these partitions.
pub fn get_statistics(&self) -> &BatchStats {
&self.stats
}
Expand All @@ -40,7 +66,7 @@ pub fn hive_partitions_from_paths(
paths: &[PathBuf],
hive_start_idx: usize,
schema: Option<SchemaRef>,
) -> PolarsResult<Option<Vec<Arc<HivePartitions>>>> {
) -> PolarsResult<Option<Arc<[HivePartitions]>>> {
let Some(path) = paths.first() else {
return Ok(None);
};
Expand Down Expand Up @@ -131,10 +157,10 @@ pub fn hive_partitions_from_paths(

let stats = BatchStats::new(hive_schema.clone(), column_stats, None);

hive_partitions.push(Arc::new(HivePartitions { stats }));
hive_partitions.push(HivePartitions { stats });
}

Ok(Some(hive_partitions))
Ok(Some(Arc::from(hive_partitions)))
}

/// Determine the path separator for identifying Hive partitions.
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/ir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub enum IR {
Scan {
paths: Arc<[PathBuf]>,
file_info: FileInfo,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
hive_parts: Option<Arc<[HivePartitions]>>,
predicate: Option<ExprIR>,
/// schema of the projected file
output_schema: Option<SchemaRef>,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub enum DslPlan {
paths: Arc<[PathBuf]>,
// Option as this is mostly materialized on the IR phase.
file_info: Option<FileInfo>,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
hive_parts: Option<Arc<[HivePartitions]>>,
predicate: Option<Expr>,
file_options: FileScanOptions,
scan_type: FileScan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ impl<'a> PredicatePushDown<'a> {
}
scan_type.remove_metadata();
}
if paths.is_empty() {
if new_paths.is_empty() {
let schema = output_schema.as_ref().unwrap_or(&file_info.schema);
let df = DataFrame::from(schema.as_ref());

Expand All @@ -390,7 +390,7 @@ impl<'a> PredicatePushDown<'a> {
});
} else {
paths = Arc::from(new_paths);
scan_hive_parts = Some(new_hive_parts);
scan_hive_parts = Some(Arc::from(new_hive_parts));
}
}
}
Expand Down
32 changes: 27 additions & 5 deletions crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ impl ProjectionPushDown {
Scan {
paths,
file_info,
hive_parts,
mut hive_parts,
scan_type,
predicate,
mut file_options,
Expand All @@ -422,19 +422,39 @@ impl ProjectionPushDown {
file_options.row_index.as_ref(),
);

output_schema = if file_options.with_columns.is_none() {
None
} else {
output_schema = if let Some(ref with_columns) = file_options.with_columns {
let mut schema = update_scan_schema(
&acc_projections,
expr_arena,
&file_info.schema,
scan_type.sort_projection(&file_options),
)?;

hive_parts = if let Some(hive_parts) = hive_parts {
let (new_schema, projected_indices) = hive_parts[0]
.get_projection_schema_and_indices(with_columns.as_ref());

Some(
hive_parts
.iter()
.cloned()
.map(|mut hp| {
hp.apply_projection(
new_schema.clone(),
projected_indices.as_ref(),
);
hp
})
.collect::<Arc<[_]>>(),
)
} else {
hive_parts
};

// Hive partitions are created AFTER the projection, so the output
// schema is incorrect. Here we ensure the columns that are projected and hive
// parts are added at the proper place in the schema, which is at the end.
if let Some(ref hive_parts) = hive_parts {
if let Some(ref mut hive_parts) = hive_parts {
let partition_schema = hive_parts.first().unwrap().schema();

for (name, _) in partition_schema.iter() {
Expand All @@ -444,6 +464,8 @@ impl ProjectionPushDown {
}
}
Some(Arc::new(schema))
} else {
None
};
}

Expand Down
Loading

0 comments on commit 514728b

Please sign in to comment.