Implement pruning on partition columns (#1179)
# Description
Exposes partition columns in DataFusion's `PruningStatistics`, which reduces the number of files scanned when the table is queried.
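To illustrate what this enables, here is a minimal sketch of querying a partitioned Delta table through DataFusion. It assumes the crate is built with the `datafusion` feature; the `./data/events` path and the `date` partition column are hypothetical and only serve the example.

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use deltalake::open_table;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical table partitioned by `date`.
    let table = open_table("./data/events").await?;

    // DeltaTable implements DataFusion's TableProvider (with the
    // `datafusion` feature), so it can be registered directly.
    let ctx = SessionContext::new();
    ctx.register_table("events", Arc::new(table))?;

    // With partition columns exposed through PruningStatistics, files whose
    // `date` partition value cannot satisfy this predicate are skipped
    // before any Parquet data is read.
    let df = ctx
        .sql("SELECT count(*) FROM events WHERE date = '2023-03-10'")
        .await?;
    df.show().await?;
    Ok(())
}
```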

This also resolves another partition issue involving `null` partitions. Previously, `ScalarValue::Null` was used, which caused an error when the actual data type was obtained from the physical Parquet files.
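For illustration only, a trimmed-down sketch of the idea behind the fix (mirroring the `get_null_of_arrow_type` helper in the diff below): a missing partition value is mapped to a typed null that matches the column's Arrow data type, instead of the untyped `ScalarValue::Null`.

```rust
use datafusion::arrow::datatypes::DataType;
use datafusion::scalar::ScalarValue;

/// Sketch: produce a *typed* null for a missing partition value so it
/// agrees with the column's Arrow data type.
fn typed_null(data_type: &DataType) -> Option<ScalarValue> {
    match data_type {
        DataType::Utf8 => Some(ScalarValue::Utf8(None)),
        DataType::Int64 => Some(ScalarValue::Int64(None)),
        DataType::Date32 => Some(ScalarValue::Date32(None)),
        // ...the real helper covers the remaining supported types.
        _ => None,
    }
}

fn main() {
    // A null `date` partition now yields Date32(NULL) rather than Null,
    // so it no longer clashes with the type read from the Parquet footer.
    assert_eq!(
        typed_null(&DataType::Date32),
        Some(ScalarValue::Date32(None))
    );
}
```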

# Related Issue(s)
- closes #1175
Blajda authored Mar 10, 2023
1 parent 3d0ec02 commit 89742b2
Showing 4 changed files with 486 additions and 91 deletions.
9 changes: 2 additions & 7 deletions rust/src/builder.rs
@@ -37,22 +37,17 @@ impl From<BuilderError> for DeltaTableError {
}

/// possible version specifications for loading a delta table
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum DeltaVersion {
/// load the newest version
#[default]
Newest,
/// specify the version to load
Version(DeltaDataTypeVersion),
/// specify the timestamp in UTC
Timestamp(DateTime<Utc>),
}

impl Default for DeltaVersion {
fn default() -> Self {
DeltaVersion::Newest
}
}

/// Configuration options for delta table
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
170 changes: 127 additions & 43 deletions rust/src/delta_datafusion.rs
@@ -57,9 +57,9 @@ use object_store::{path::Path, ObjectMeta};
use url::Url;

use crate::builder::ensure_table_uri;
use crate::Invariant;
use crate::{action, open_table, open_table_with_storage_options};
use crate::{action, open_table, open_table_with_storage_options, SchemaDataType};
use crate::{schema, DeltaTableBuilder};
use crate::{DeltaResult, Invariant};
use crate::{DeltaTable, DeltaTableError};

impl From<DeltaTableError> for DataFusionError {
@@ -236,49 +236,59 @@ impl DeltaTable {
}
}

fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option<ArrayRef> {
let field = table
.get_schema()
.ok()
.map(|s| s.get_field_with_name(&column.name).ok())??;

// See issue 1214. Binary type does not support natural order which is required for Datafusion to prune
if let SchemaDataType::primitive(t) = &field.get_type() {
if t == "binary" {
return None;
}
}

let data_type = field.get_type().try_into().ok()?;
let partition_columns = &table.get_metadata().ok()?.partition_columns;

let values = table.get_state().files().iter().map(|add| {
if partition_columns.contains(&column.name) {
let value = add.partition_values.get(&column.name).unwrap();
let value = match value {
Some(v) => serde_json::Value::String(v.to_string()),
None => serde_json::Value::Null,
};
to_correct_scalar_value(&value, &data_type).unwrap_or(ScalarValue::Null)
} else if let Ok(Some(statistics)) = add.get_stats() {
let values = if get_max {
statistics.max_values
} else {
statistics.min_values
};

values
.get(&column.name)
.and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type))
.unwrap_or(ScalarValue::Null)
} else {
ScalarValue::Null
}
});
ScalarValue::iter_to_array(values).ok()
}

impl PruningStatistics for DeltaTable {
/// return the minimum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let field = self
.get_schema()
.ok()
.map(|s| s.get_field_with_name(&column.name).ok())??;
let data_type = field.get_type().try_into().ok()?;
let values = self.get_state().files().iter().map(|add| {
if let Ok(Some(statistics)) = add.get_stats() {
statistics
.min_values
.get(&column.name)
.and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type))
.unwrap_or(ScalarValue::Null)
} else {
ScalarValue::Null
}
});
ScalarValue::iter_to_array(values).ok()
get_prune_stats(self, column, false)
}

/// return the maximum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows.
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
let field = self
.get_schema()
.ok()
.map(|s| s.get_field_with_name(&column.name).ok())??;
let data_type = field.get_type().try_into().ok()?;
let values = self.get_state().files().iter().map(|add| {
if let Ok(Some(statistics)) = add.get_stats() {
statistics
.max_values
.get(&column.name)
.and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type))
.unwrap_or(ScalarValue::Null)
} else {
ScalarValue::Null
}
});
ScalarValue::iter_to_array(values).ok()
get_prune_stats(self, column, true)
}

/// return the number of containers (e.g. row groups) being
@@ -292,13 +302,29 @@ impl PruningStatistics for DeltaTable {
///
/// Note: the returned array must contain `num_containers()` rows.
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let partition_columns = &self.get_metadata().ok()?.partition_columns;

let values = self.get_state().files().iter().map(|add| {
if let Ok(Some(statistics)) = add.get_stats() {
statistics
.null_count
.get(&column.name)
.map(|f| ScalarValue::UInt64(f.as_value().map(|val| val as u64)))
.unwrap_or(ScalarValue::UInt64(None))
if partition_columns.contains(&column.name) {
let value = add.partition_values.get(&column.name).unwrap();
match value {
Some(_) => ScalarValue::UInt64(Some(0)),
None => ScalarValue::UInt64(Some(statistics.num_records as u64)),
}
} else {
statistics
.null_count
.get(&column.name)
.map(|f| ScalarValue::UInt64(f.as_value().map(|val| val as u64)))
.unwrap_or(ScalarValue::UInt64(None))
}
} else if partition_columns.contains(&column.name) {
let value = add.partition_values.get(&column.name).unwrap();
match value {
Some(_) => ScalarValue::UInt64(Some(0)),
None => ScalarValue::UInt64(None),
}
} else {
ScalarValue::UInt64(None)
}
@@ -510,6 +536,64 @@ impl ExecutionPlan for DeltaScan {
}
}

fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
match t {
ArrowDataType::Null => Ok(ScalarValue::Null),
ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
ArrowDataType::FixedSizeBinary(size) => {
Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
}
ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
None,
precision.to_owned(),
scale.to_owned(),
)),
ArrowDataType::Timestamp(unit, tz) => {
let tz = tz.to_owned();
Ok(match unit {
TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
})
}
//Unsupported types...
ArrowDataType::Float16
| ArrowDataType::Decimal256(_, _)
| ArrowDataType::Union(_, _, _)
| ArrowDataType::Dictionary(_, _)
| ArrowDataType::LargeList(_)
| ArrowDataType::Struct(_)
| ArrowDataType::List(_)
| ArrowDataType::FixedSizeList(_, _)
| ArrowDataType::Time32(_)
| ArrowDataType::Time64(_)
| ArrowDataType::Duration(_)
| ArrowDataType::Interval(_)
| ArrowDataType::RunEndEncoded(_, _)
| ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
"Unsupported data type for Delta Lake {}",
t
))),
}
}

fn partitioned_file_from_action(action: &action::Add, schema: &ArrowSchema) -> PartitionedFile {
let partition_values = schema
.fields()
@@ -521,7 +605,7 @@ fn partitioned_file_from_action(action: &action::Add, schema: &ArrowSchema) -> P
f.data_type(),
)
.unwrap_or(ScalarValue::Null),
None => ScalarValue::Null,
None => get_null_of_arrow_type(f.data_type()).unwrap_or(ScalarValue::Null),
})
})
.collect::<Vec<_>>();
@@ -570,7 +654,7 @@ fn to_correct_scalar_value(
match stat_val {
serde_json::Value::Array(_) => None,
serde_json::Value::Object(_) => None,
serde_json::Value::Null => None,
serde_json::Value::Null => get_null_of_arrow_type(field_dt).ok(),
serde_json::Value::String(string_val) => match field_dt {
ArrowDataType::Timestamp(_, _) => {
let time_nanos = ScalarValue::try_from_string(
1 change: 0 additions & 1 deletion rust/src/writer/record_batch.rs
@@ -382,7 +382,6 @@ pub(crate) fn divide_by_partition_values(
// get row indices for current partition
let idx: UInt32Array = (range.start..range.end)
.map(|i| Some(indices.value(i)))
.into_iter()
.collect();

let partition_key_iter = sorted_partition_columns.iter().map(|c| {