Skip to content

Commit

Permalink
feat!: integrate with kernel schema types
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Oct 24, 2023
1 parent f99158b commit de4d980
Show file tree
Hide file tree
Showing 40 changed files with 1,255 additions and 822 deletions.
4 changes: 4 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ tokio = { workspace = true, features = [
# other deps (these should be organized and pulled into workspace.dependencies as necessary)
cfg-if = "1"
errno = "0.3"
either = "1.8"
fix-hidden-lifetime-bug = "0.2"
hyper = { version = "0.14", optional = true }
itertools = "0.11"
lazy_static = "1"
Expand All @@ -80,8 +82,10 @@ once_cell = "1.16.0"
parking_lot = "0.12"
parquet2 = { version = "0.17", optional = true }
percent-encoding = "2"
roaring = "0.10.1"
tracing = { version = "0.1", optional = true }
rand = "0.8"
z85 = "3.0.5"

# hdfs
datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = [
Expand Down
58 changes: 24 additions & 34 deletions rust/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,14 +338,13 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {

#[cfg(test)]
mod test {
use std::collections::HashMap;

use arrow_schema::DataType;
use arrow_schema::DataType as ArrowDataType;
use datafusion::prelude::SessionContext;
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::{col, decode, lit, substring, Cast, Expr, ExprSchemable};

use crate::{DeltaOps, DeltaTable, Schema, SchemaDataType, SchemaField};
use crate::kernel::{DataType, PrimitiveType, StructField, StructType};
use crate::{DeltaOps, DeltaTable};

use super::fmt_expr_to_sql;

Expand All @@ -366,66 +365,57 @@ mod test {
}

async fn setup_table() -> DeltaTable {
let schema = Schema::new(vec![
SchemaField::new(
let schema = StructType::new(vec![
StructField::new(
"id".to_string(),
SchemaDataType::primitive("string".to_string()),
DataType::Primitive(PrimitiveType::String),
true,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"value".to_string(),
SchemaDataType::primitive("integer".to_string()),
DataType::Primitive(PrimitiveType::Integer),
true,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"value2".to_string(),
SchemaDataType::primitive("integer".to_string()),
DataType::Primitive(PrimitiveType::Integer),
true,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"modified".to_string(),
SchemaDataType::primitive("string".to_string()),
DataType::Primitive(PrimitiveType::String),
true,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"active".to_string(),
SchemaDataType::primitive("boolean".to_string()),
DataType::Primitive(PrimitiveType::Boolean),
true,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"money".to_string(),
SchemaDataType::primitive("decimal(12,2)".to_string()),
DataType::Primitive(PrimitiveType::Decimal(12, 2)),
true,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"_date".to_string(),
SchemaDataType::primitive("date".to_string()),
DataType::Primitive(PrimitiveType::Date),
true,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"_timestamp".to_string(),
SchemaDataType::primitive("timestamp".to_string()),
DataType::Primitive(PrimitiveType::Timestamp),
true,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"_binary".to_string(),
SchemaDataType::primitive("binary".to_string()),
DataType::Primitive(PrimitiveType::Binary),
true,
HashMap::new(),
),
]);

let table = DeltaOps::new_in_memory()
.create()
.with_columns(schema.get_fields().clone())
.with_columns(schema.fields().clone())
.await
.unwrap();
assert_eq!(table.version(), 0);
Expand All @@ -441,7 +431,7 @@ mod test {
simple!(
Expr::Cast(Cast {
expr: Box::new(lit(1_i64)),
data_type: DataType::Int32
data_type: ArrowDataType::Int32
}),
"arrow_cast(1, 'Int32')".to_string()
),
Expand Down
38 changes: 18 additions & 20 deletions rust/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@ use serde::{Deserialize, Serialize};
use url::Url;

use crate::errors::{DeltaResult, DeltaTableError};
use crate::protocol::{self, Add};
use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType};
use crate::protocol::{self};
use crate::storage::ObjectStoreRef;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
use crate::{open_table, open_table_with_storage_options, DeltaTable, Invariant, SchemaDataType};
use crate::{open_table, open_table_with_storage_options, DeltaTable};

const PATH_COLUMN: &str = "__delta_rs_path";

Expand Down Expand Up @@ -121,7 +122,7 @@ impl DeltaTableState {
min_value: None,
distinct_count: None
};
self.schema().unwrap().get_fields().len()
self.schema().unwrap().fields().len()
]),
is_exact: true,
},
Expand All @@ -139,13 +140,13 @@ impl DeltaTableState {
column_statistics: acc.column_statistics.map(|col_stats| {
self.schema()
.unwrap()
.get_fields()
.fields()
.iter()
.zip(col_stats)
.map(|(field, stats)| {
let null_count = new_stats
.null_count
.get(field.get_name())
.get(field.name())
.and_then(|x| {
let null_count_acc = stats.null_count?;
let null_count = x.as_value()? as usize;
Expand All @@ -155,7 +156,7 @@ impl DeltaTableState {

let max_value = new_stats
.max_values
.get(field.get_name())
.get(field.name())
.and_then(|x| {
let old_stats = stats.clone();
let max_value = to_scalar_value(x.as_value()?);
Expand All @@ -179,7 +180,7 @@ impl DeltaTableState {

let min_value = new_stats
.min_values
.get(field.get_name())
.get(field.name())
.and_then(|x| {
let old_stats = stats.clone();
let min_value = to_scalar_value(x.as_value()?);
Expand Down Expand Up @@ -222,15 +223,15 @@ impl DeltaTableState {
num_rows: stats.num_rows,
total_byte_size: stats.total_byte_size,
column_statistics: stats.column_statistics.map(|col_stats| {
let fields = self.schema().unwrap().get_fields();
let fields = self.schema().unwrap().fields();
col_stats
.iter()
.zip(fields)
.map(|(col_states, field)| {
let dt = self
.arrow_schema()
.unwrap()
.field_with_name(field.get_name())
.field_with_name(field.name())
.unwrap()
.data_type()
.clone();
Expand Down Expand Up @@ -258,16 +259,14 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option
let field = table
.get_schema()
.ok()
.map(|s| s.get_field_with_name(&column.name).ok())??;
.map(|s| s.field_with_name(&column.name).ok())??;

// See issue 1214. Binary type does not support natural order which is required for Datafusion to prune
if let SchemaDataType::primitive(t) = &field.get_type() {
if t == "binary" {
return None;
}
if let DeltaDataType::Primitive(PrimitiveType::Binary) = &field.data_type() {
return None;
}

let data_type = field.get_type().try_into().ok()?;
let data_type = field.data_type().try_into().ok()?;
let partition_columns = &table.get_metadata().ok()?.partition_columns;

let values = table.get_state().files().iter().map(|add| {
Expand Down Expand Up @@ -921,10 +920,7 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarVal
}
}

pub(crate) fn partitioned_file_from_action(
action: &protocol::Add,
schema: &ArrowSchema,
) -> PartitionedFile {
pub(crate) fn partitioned_file_from_action(action: &Add, schema: &ArrowSchema) -> PartitionedFile {
let partition_values = schema
.fields()
.iter()
Expand Down Expand Up @@ -1780,7 +1776,7 @@ mod tests {
let mut partition_values = std::collections::HashMap::new();
partition_values.insert("month".to_string(), Some("1".to_string()));
partition_values.insert("year".to_string(), Some("2015".to_string()));
let action = protocol::Add {
let action = Add {
path: "year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string(),
size: 10644,
partition_values,
Expand All @@ -1791,6 +1787,8 @@ mod tests {
deletion_vector: None,
stats_parsed: None,
tags: None,
base_row_id: None,
default_row_commit_version: None,
};
let schema = ArrowSchema::new(vec![
Field::new("year", ArrowDataType::Int64, true),
Expand Down
6 changes: 6 additions & 0 deletions rust/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ pub enum DeltaTableError {
/// Source error
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},

#[error("Kernel: {source}")]
Kernel {
#[from]
source: crate::kernel::Error,
},
}

impl From<object_store::path::Error> for DeltaTableError {
Expand Down
6 changes: 3 additions & 3 deletions rust/src/kernel/actions/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow_schema::{
SchemaRef as ArrowSchemaRef, TimeUnit,
};

use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, StructField, StructType};
use super::super::schema::{ArrayType, DataType, MapType, PrimitiveType, StructField, StructType};

impl TryFrom<&StructType> for ArrowSchema {
type Error = ArrowError;
Expand All @@ -14,7 +14,7 @@ impl TryFrom<&StructType> for ArrowSchema {
let fields = s
.fields()
.iter()
.map(|f| <ArrowField as TryFrom<&StructField>>::try_from(*f))
.map(|f| <ArrowField as TryFrom<&StructField>>::try_from(f))
.collect::<Result<Vec<ArrowField>, ArrowError>>()?;

Ok(ArrowSchema::new(fields))
Expand Down Expand Up @@ -129,7 +129,7 @@ impl TryFrom<&DataType> for ArrowDataType {
DataType::Struct(s) => Ok(ArrowDataType::Struct(
s.fields()
.iter()
.map(|f| <ArrowField as TryFrom<&StructField>>::try_from(*f))
.map(|f| <ArrowField as TryFrom<&StructField>>::try_from(f))
.collect::<Result<Vec<ArrowField>, ArrowError>>()?
.into(),
)),
Expand Down
37 changes: 33 additions & 4 deletions rust/src/kernel/actions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
//! Actions are the fundamental unit of work in Delta Lake. Each action performs a single atomic
//! operation on the state of a Delta table. Actions are stored in the `_delta_log` directory of a
//! Delta table in JSON format. The log is a time series of actions that represent all the changes
//! made to a table.

use std::collections::HashMap;
use std::str::FromStr;

use arrow_array::{
BooleanArray, Int32Array, Int64Array, ListArray, MapArray, RecordBatch, StringArray,
Expand All @@ -7,17 +13,20 @@ use arrow_array::{
use either::Either;
use fix_hidden_lifetime_bug::fix_hidden_lifetime_bug;
use itertools::izip;
use serde::{Deserialize, Serialize};

use super::super::{DeltaResult, Error};
use super::{error::Error, DeltaResult};

pub(crate) mod arrow;
pub(crate) mod schemas;
mod serde_path;
pub(crate) mod types;

pub use schemas::get_log_schema;
pub use types::*;

#[derive(Debug)]
/// The type of action that was performed on the table
pub enum ActionType {
/// modify the data in a table by adding individual logical files
Add,
Expand All @@ -35,18 +44,36 @@ pub enum ActionType {
Remove,
/// The Row ID high-water mark tracks the largest ID that has been assigned to a row in the table.
RowIdHighWaterMark,
/// Transactional information
Txn,
}

#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[allow(missing_docs)]
pub enum Action {
Metadata(Metadata),
Protocol(Protocol),
Add(Add),
Remove(Remove),
Cdc(AddCDCFile),
Txn(Txn),
CommitInfo(CommitInfo),
DomainMetadata(DomainMetadata),
}

impl Action {
/// Create a commit info from a map
pub fn commit_info(info: HashMap<String, serde_json::Value>) -> Self {
Self::CommitInfo(CommitInfo {
info,
..Default::default()
})
}
}

#[fix_hidden_lifetime_bug]
#[allow(dead_code)]
pub(crate) fn parse_actions<'a>(
batch: &RecordBatch,
types: impl IntoIterator<Item = &'a ActionType>,
Expand Down Expand Up @@ -326,10 +353,12 @@ fn parse_actions_add(arr: &StructArray) -> DeltaResult<Box<dyn Iterator<Item = A
data_change,
partition_values: partition_values.unwrap_or_default(),
stats: stat.map(|v| v.to_string()),
tags: tags.unwrap_or_default(),
tags: tags,
base_row_id,
default_row_commit_version,
deletion_vector,
stats_parsed: None,
partition_values_parsed: None,
})
} else {
None
Expand Down Expand Up @@ -488,7 +517,7 @@ fn parse_dv(
maybe_cardinality,
) {
Some(DeletionVectorDescriptor {
storage_type: storage_type.into(),
storage_type: StorageType::from_str(storage_type).unwrap(),
path_or_inline_dv: path_or_inline_dv.into(),
size_in_bytes,
cardinality,
Expand Down
Loading

0 comments on commit de4d980

Please sign in to comment.