Implement pruning on partition columns #1179

Merged · 11 commits · Mar 10, 2023
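This PR extends `DeltaTable`'s `PruningStatistics` implementation so DataFusion can prune files on partition columns, not just on columns with Parquet file statistics: for a partition column, the partition value recorded in each `Add` action serves as that file's minimum, maximum, and null count. A hedged sketch of the kind of query that benefits (the table path, table name, and `date` partition column are hypothetical; assumes the crate's `datafusion` feature):

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical Delta table partitioned by `date`.
    let table = deltalake::open_table("./data/events").await?;

    let ctx = SessionContext::new();
    // DeltaTable implements DataFusion's TableProvider, so it can be
    // registered and queried directly.
    ctx.register_table("events", Arc::new(table))?;

    // With this change, files whose `date` partition value cannot satisfy
    // the predicate are skipped before any Parquet data is read.
    let df = ctx
        .sql("SELECT count(*) FROM events WHERE date = '2023-03-01'")
        .await?;
    df.show().await?;
    Ok(())
}
```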
9 changes: 2 additions & 7 deletions rust/src/builder.rs
@@ -37,22 +37,17 @@ impl From<BuilderError> for DeltaTableError {
 }
 
 /// possible version specifications for loading a delta table
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
 pub enum DeltaVersion {
     /// load the newest version
+    #[default]
     Newest,
     /// specify the version to load
     Version(DeltaDataTypeVersion),
     /// specify the timestamp in UTC
     Timestamp(DateTime<Utc>),
 }
 
-impl Default for DeltaVersion {
-    fn default() -> Self {
-        DeltaVersion::Newest
-    }
-}
-
 /// Configuration options for delta table
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
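Side note on the `builder.rs` change above: it replaces a hand-written `Default` impl with the derived form, using the `#[default]` variant attribute stabilized in Rust 1.62. A self-contained sketch of the pattern (with `i64` standing in for `DeltaDataTypeVersion`):

```rust
#[derive(Debug, PartialEq, Eq, Default)]
pub enum DeltaVersion {
    /// Load the newest version; marked as the derived default.
    #[default]
    Newest,
    /// Load a specific version.
    Version(i64),
}

fn main() {
    // The derive expands to an `impl Default` returning the `#[default]` variant.
    assert_eq!(DeltaVersion::default(), DeltaVersion::Newest);
}
```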
170 changes: 127 additions & 43 deletions rust/src/delta_datafusion.rs
@@ -57,9 +57,9 @@ use object_store::{path::Path, ObjectMeta};
 use url::Url;
 
 use crate::builder::ensure_table_uri;
-use crate::Invariant;
-use crate::{action, open_table, open_table_with_storage_options};
+use crate::{action, open_table, open_table_with_storage_options, SchemaDataType};
 use crate::{schema, DeltaTableBuilder};
+use crate::{DeltaResult, Invariant};
 use crate::{DeltaTable, DeltaTableError};
 
 impl From<DeltaTableError> for DataFusionError {
@@ -236,49 +236,59 @@ impl DeltaTable {
     }
 }
 
+fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option<ArrayRef> {
+    let field = table
+        .get_schema()
+        .ok()
+        .map(|s| s.get_field_with_name(&column.name).ok())??;
+
+    // See issue 1214. Binary type does not support natural order which is required for Datafusion to prune
+    if let SchemaDataType::primitive(t) = &field.get_type() {
+        if t == "binary" {
+            return None;
+        }
+    }
+
+    let data_type = field.get_type().try_into().ok()?;
+    let partition_columns = &table.get_metadata().ok()?.partition_columns;
+
+    let values = table.get_state().files().iter().map(|add| {
+        if partition_columns.contains(&column.name) {
+            let value = add.partition_values.get(&column.name).unwrap();
+            let value = match value {
+                Some(v) => serde_json::Value::String(v.to_string()),
+                None => serde_json::Value::Null,
+            };
+            to_correct_scalar_value(&value, &data_type).unwrap_or(ScalarValue::Null)
+        } else if let Ok(Some(statistics)) = add.get_stats() {
+            let values = if get_max {
+                statistics.max_values
+            } else {
+                statistics.min_values
+            };
+
+            values
+                .get(&column.name)
+                .and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type))
+                .unwrap_or(ScalarValue::Null)
+        } else {
+            ScalarValue::Null
+        }
+    });
+    ScalarValue::iter_to_array(values).ok()
+}
+
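Note on `get_prune_stats` above: for a partition column every row in a file shares one value, so that value is both the file's minimum and maximum. The helper re-wraps the stored `Option<String>` partition value in a `serde_json::Value` so the existing `to_correct_scalar_value` conversion can be reused for both partition values and JSON-encoded statistics. A minimal sketch of that wrapping step (helper name is illustrative):

```rust
// Partition values arrive from the Delta log as Option<String>; re-encoding
// them as JSON lets one conversion routine serve partition values and
// JSON-encoded file statistics alike.
fn partition_value_to_json(value: &Option<String>) -> serde_json::Value {
    match value {
        Some(v) => serde_json::Value::String(v.clone()),
        None => serde_json::Value::Null,
    }
}

fn main() {
    assert_eq!(
        partition_value_to_json(&Some("2023-03-01".into())),
        serde_json::Value::String("2023-03-01".into())
    );
    assert_eq!(partition_value_to_json(&None), serde_json::Value::Null);
}
```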
 impl PruningStatistics for DeltaTable {
     /// return the minimum values for the named column, if known.
     /// Note: the returned array must contain `num_containers()` rows
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
-        let field = self
-            .get_schema()
-            .ok()
-            .map(|s| s.get_field_with_name(&column.name).ok())??;
-        let data_type = field.get_type().try_into().ok()?;
-        let values = self.get_state().files().iter().map(|add| {
-            if let Ok(Some(statistics)) = add.get_stats() {
-                statistics
-                    .min_values
-                    .get(&column.name)
-                    .and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type))
-                    .unwrap_or(ScalarValue::Null)
-            } else {
-                ScalarValue::Null
-            }
-        });
-        ScalarValue::iter_to_array(values).ok()
+        get_prune_stats(self, column, false)
     }
 
     /// return the maximum values for the named column, if known.
     /// Note: the returned array must contain `num_containers()` rows.
     fn max_values(&self, column: &Column) -> Option<ArrayRef> {
-        let field = self
-            .get_schema()
-            .ok()
-            .map(|s| s.get_field_with_name(&column.name).ok())??;
-        let data_type = field.get_type().try_into().ok()?;
-        let values = self.get_state().files().iter().map(|add| {
-            if let Ok(Some(statistics)) = add.get_stats() {
-                statistics
-                    .max_values
-                    .get(&column.name)
-                    .and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type))
-                    .unwrap_or(ScalarValue::Null)
-            } else {
-                ScalarValue::Null
-            }
-        });
-        ScalarValue::iter_to_array(values).ok()
+        get_prune_stats(self, column, true)
     }
 
     /// return the number of containers (e.g. row groups) being
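`min_values` and `max_values` now both delegate to the shared helper. As a reminder of why DataFusion asks for these arrays: a file can only match a predicate like `x = 5` if 5 falls inside the file's `[min, max]` range, so whole files are skipped without reading them. A self-contained sketch of the idea (not DataFusion's actual pruning code):

```rust
/// Per-file min/max statistics for one column, as a pruning pass sees them.
struct FileStats {
    min: Option<i64>,
    max: Option<i64>,
}

/// Keep a file only if `x = target` could match some row in it. Files with
/// unknown statistics must be kept: pruning has to be conservative.
fn may_contain(stats: &FileStats, target: i64) -> bool {
    match (stats.min, stats.max) {
        (Some(min), Some(max)) => min <= target && target <= max,
        _ => true,
    }
}

fn main() {
    let files = [
        FileStats { min: Some(1), max: Some(4) }, // pruned
        FileStats { min: Some(5), max: Some(9) }, // kept
        FileStats { min: None, max: None },       // kept: unknown stats
    ];
    let keep: Vec<bool> = files.iter().map(|f| may_contain(f, 5)).collect();
    assert_eq!(keep, vec![false, true, true]);
}
```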
@@ -292,13 +302,29 @@ impl PruningStatistics for DeltaTable {
     ///
     /// Note: the returned array must contain `num_containers()` rows.
     fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        let partition_columns = &self.get_metadata().ok()?.partition_columns;
+
         let values = self.get_state().files().iter().map(|add| {
             if let Ok(Some(statistics)) = add.get_stats() {
-                statistics
-                    .null_count
-                    .get(&column.name)
-                    .map(|f| ScalarValue::UInt64(f.as_value().map(|val| val as u64)))
-                    .unwrap_or(ScalarValue::UInt64(None))
+                if partition_columns.contains(&column.name) {
+                    let value = add.partition_values.get(&column.name).unwrap();
+                    match value {
+                        Some(_) => ScalarValue::UInt64(Some(0)),
+                        None => ScalarValue::UInt64(Some(statistics.num_records as u64)),
+                    }
+                } else {
+                    statistics
+                        .null_count
+                        .get(&column.name)
+                        .map(|f| ScalarValue::UInt64(f.as_value().map(|val| val as u64)))
+                        .unwrap_or(ScalarValue::UInt64(None))
+                }
+            } else if partition_columns.contains(&column.name) {
+                let value = add.partition_values.get(&column.name).unwrap();
+                match value {
+                    Some(_) => ScalarValue::UInt64(Some(0)),
+                    None => ScalarValue::UInt64(None),
+                }
             } else {
                 ScalarValue::UInt64(None)
             }
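The null count of a partition column is fully determined by the file's partition value, as the new branches above encode: a present value means zero nulls, a null value means all of the file's `num_records` rows are null, and without statistics the row count (and thus the null count) is unknown. A compact restatement of that rule:

```rust
/// Null count for a partition column in one file, derived from the file's
/// partition value and its row count (known only when stats are present).
fn partition_null_count(value: &Option<String>, num_records: Option<u64>) -> Option<u64> {
    match value {
        Some(_) => Some(0),  // a concrete partition value: no row is null
        None => num_records, // a null partition value: every row is null
    }
}

fn main() {
    assert_eq!(partition_null_count(&Some("a".into()), Some(100)), Some(0));
    assert_eq!(partition_null_count(&None, Some(100)), Some(100));
    assert_eq!(partition_null_count(&None, None), None); // row count unknown
}
```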
@@ -510,6 +536,64 @@ impl ExecutionPlan for DeltaScan {
     }
 }
 
+fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
+    match t {
+        ArrowDataType::Null => Ok(ScalarValue::Null),
+        ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
+        ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
+        ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
+        ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
+        ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
+        ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
+        ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
+        ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
+        ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
+        ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
+        ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
+        ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
+        ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
+        ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
+        ArrowDataType::FixedSizeBinary(size) => {
+            Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
+        }
+        ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
+        ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
+        ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
+        ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
+            None,
+            precision.to_owned(),
+            scale.to_owned(),
+        )),
+        ArrowDataType::Timestamp(unit, tz) => {
+            let tz = tz.to_owned();
+            Ok(match unit {
+                TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
+                TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
+                TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
+                TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
+            })
+        }
+        // Unsupported types...
+        ArrowDataType::Float16
+        | ArrowDataType::Decimal256(_, _)
+        | ArrowDataType::Union(_, _, _)
+        | ArrowDataType::Dictionary(_, _)
+        | ArrowDataType::LargeList(_)
+        | ArrowDataType::Struct(_)
+        | ArrowDataType::List(_)
+        | ArrowDataType::FixedSizeList(_, _)
+        | ArrowDataType::Time32(_)
+        | ArrowDataType::Time64(_)
+        | ArrowDataType::Duration(_)
+        | ArrowDataType::Interval(_)
+        | ArrowDataType::RunEndEncoded(_, _)
+        | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
+            "Unsupported data type for Delta Lake {}",
+            t
+        ))),
+    }
+}
+
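`get_null_of_arrow_type` exists because `ScalarValue::Null` is untyped: when the per-file scalars are assembled into one Arrow array, typed nulls such as `Utf8(None)` preserve the column's data type, while untyped nulls collapse it to the `Null` type and defeat pruning. A hedged illustration (DataFusion method names as of the version used here; `get_datatype` in particular may differ across releases):

```rust
use datafusion::arrow::datatypes::DataType;
use datafusion::scalar::ScalarValue;

fn main() {
    // A typed null mixes cleanly with real values of the same type,
    // yielding a Utf8 array with one valid slot and one null slot.
    let typed = vec![
        ScalarValue::Utf8(Some("2023-03-01".to_string())),
        ScalarValue::Utf8(None),
    ];
    let array = ScalarValue::iter_to_array(typed).unwrap();
    assert_eq!(array.data_type(), &DataType::Utf8);
    assert_eq!(array.null_count(), 1);

    // The untyped ScalarValue::Null carries no type information, which is
    // why the statistics code above prefers typed nulls.
    assert_eq!(ScalarValue::Null.get_datatype(), DataType::Null);
}
```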
 fn partitioned_file_from_action(action: &action::Add, schema: &ArrowSchema) -> PartitionedFile {
     let partition_values = schema
         .fields()
@@ -521,7 +605,7 @@ fn partitioned_file_from_action(action: &action::Add, schema: &ArrowSchema) -> PartitionedFile {
                     f.data_type(),
                 )
                 .unwrap_or(ScalarValue::Null),
-                None => ScalarValue::Null,
+                None => get_null_of_arrow_type(f.data_type()).unwrap_or(ScalarValue::Null),
             })
         })
         .collect::<Vec<_>>();
@@ -570,7 +654,7 @@ fn to_correct_scalar_value(
     match stat_val {
         serde_json::Value::Array(_) => None,
         serde_json::Value::Object(_) => None,
-        serde_json::Value::Null => None,
+        serde_json::Value::Null => get_null_of_arrow_type(field_dt).ok(),
         serde_json::Value::String(string_val) => match field_dt {
             ArrowDataType::Timestamp(_, _) => {
                 let time_nanos = ScalarValue::try_from_string(
1 change: 0 additions & 1 deletion rust/src/writer/record_batch.rs
@@ -382,7 +382,6 @@ pub(crate) fn divide_by_partition_values(
         // get row indices for current partition
         let idx: UInt32Array = (range.start..range.end)
             .map(|i| Some(indices.value(i)))
-            .into_iter()
             .collect();
 
         let partition_key_iter = sorted_partition_columns.iter().map(|c| {