diff --git a/rust/src/checkpoints.rs b/rust/src/checkpoints.rs index d52c75dbce..2aa0f5b73c 100644 --- a/rust/src/checkpoints.rs +++ b/rust/src/checkpoints.rs @@ -3,9 +3,9 @@ use arrow::datatypes::Schema as ArrowSchema; // NOTE: Temporarily allowing these deprecated imports pending the completion of: // +use arrow::error::ArrowError; #[allow(deprecated)] use arrow::json::reader::{Decoder, DecoderOptions}; -use arrow::error::ArrowError; use chrono::{DateTime, Datelike, Duration, Utc}; use futures::StreamExt; use lazy_static::lazy_static; diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs index a56b79ac57..7f64173a17 100644 --- a/rust/src/delta_arrow.rs +++ b/rust/src/delta_arrow.rs @@ -9,6 +9,7 @@ use arrow::error::ArrowError; use lazy_static::lazy_static; use regex::Regex; use std::convert::TryFrom; +use std::sync::Arc; impl TryFrom<&schema::Schema> for ArrowSchema { type Error = ArrowError; @@ -64,14 +65,17 @@ impl TryFrom<&schema::SchemaTypeMap> for ArrowField { fn try_from(a: &schema::SchemaTypeMap) -> Result { Ok(ArrowField::new( "entries", - ArrowDataType::Struct(vec![ - ArrowField::new("key", ArrowDataType::try_from(a.get_key_type())?, false), - ArrowField::new( - "value", - ArrowDataType::try_from(a.get_value_type())?, - a.get_value_contains_null(), - ), - ]), + ArrowDataType::Struct( + vec![ + ArrowField::new("key", ArrowDataType::try_from(a.get_key_type())?, false), + ArrowField::new( + "value", + ArrowDataType::try_from(a.get_value_type())?, + a.get_value_contains_null(), + ), + ] + .into(), + ), false, // always non-null )) } @@ -131,34 +135,38 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType { s.get_fields() .iter() .map(>::try_from) - .collect::, ArrowError>>()?, + .collect::, ArrowError>>()? + .into(), )), schema::SchemaDataType::array(a) => { - Ok(ArrowDataType::List(Box::new(>::try_from( a )?))) } schema::SchemaDataType::map(m) => Ok(ArrowDataType::Map( - Box::new(ArrowField::new( + Arc::new(ArrowField::new( "entries", - ArrowDataType::Struct(vec![ - ArrowField::new( - "keys", - >::try_from( - m.get_key_type(), - )?, - false, - ), - ArrowField::new( - "values", - >::try_from( - m.get_value_type(), - )?, - m.get_value_contains_null(), - ), - ]), + ArrowDataType::Struct( + vec![ + ArrowField::new( + "keys", + >::try_from( + m.get_key_type(), + )?, + false, + ), + ArrowField::new( + "values", + >::try_from( + m.get_value_type(), + )?, + m.get_value_contains_null(), + ), + ] + .into(), + ), true, )), false, @@ -175,7 +183,7 @@ impl TryFrom<&ArrowSchema> for schema::Schema { .iter() .map(|field| field.try_into()) .collect(); - Ok(schema::Schema::new(new_fields?)) + Ok(schema::Schema::new(new_fields?.into())) } } @@ -306,75 +314,101 @@ pub(crate) fn delta_log_schema_for_table( static ref SCHEMA_FIELDS: Vec = vec![ ArrowField::new( "metaData", - ArrowDataType::Struct(vec![ - ArrowField::new("id", ArrowDataType::Utf8, true), - ArrowField::new("name", ArrowDataType::Utf8, true), - ArrowField::new("description", ArrowDataType::Utf8, true), - ArrowField::new("schemaString", ArrowDataType::Utf8, true), - ArrowField::new("createdTime", ArrowDataType::Int64, true), - ArrowField::new( - "partitionColumns", - ArrowDataType::List(Box::new(ArrowField::new( - "element", - ArrowDataType::Utf8, + ArrowDataType::Struct( + vec![ + ArrowField::new("id", ArrowDataType::Utf8, true), + ArrowField::new("name", ArrowDataType::Utf8, true), + ArrowField::new("description", ArrowDataType::Utf8, true), + ArrowField::new("schemaString", ArrowDataType::Utf8, true), + ArrowField::new("createdTime", ArrowDataType::Int64, true), + ArrowField::new( + "partitionColumns", + ArrowDataType::List(Arc::new(ArrowField::new( + "element", + ArrowDataType::Utf8, + true + ))), true - ))), - true - ), - ArrowField::new( - "configuration", - ArrowDataType::Map( - Box::new(ArrowField::new( - "key_value", - ArrowDataType::Struct(vec![ - ArrowField::new("key", ArrowDataType::Utf8, false), - ArrowField::new("value", ArrowDataType::Utf8, true), - ]), - false - )), - false ), - true - ), - ArrowField::new( - "format", - ArrowDataType::Struct(vec![ - ArrowField::new("provider", ArrowDataType::Utf8, true), - ArrowField::new( - "options", - ArrowDataType::Map( - Box::new(ArrowField::new( - "key_value", - ArrowDataType::Struct(vec![ + ArrowField::new( + "configuration", + ArrowDataType::Map( + Arc::new(ArrowField::new( + "key_value", + ArrowDataType::Struct( + vec![ ArrowField::new("key", ArrowDataType::Utf8, false), ArrowField::new("value", ArrowDataType::Utf8, true), - ]), - false - )), + ] + .into() + ), false - ), + )), false - ) - ]), - true - ), - ]), + ), + true + ), + ArrowField::new( + "format", + ArrowDataType::Struct( + vec![ + ArrowField::new("provider", ArrowDataType::Utf8, true), + ArrowField::new( + "options", + ArrowDataType::Map( + Arc::new(ArrowField::new( + "key_value", + ArrowDataType::Struct( + vec![ + ArrowField::new( + "key", + ArrowDataType::Utf8, + false + ), + ArrowField::new( + "value", + ArrowDataType::Utf8, + true + ), + ] + .into() + ), + false + )), + false + ), + false + ) + ] + .into() + ), + true + ), + ] + .into() + ), true ), ArrowField::new( "protocol", - ArrowDataType::Struct(vec![ - ArrowField::new("minReaderVersion", ArrowDataType::Int32, true), - ArrowField::new("minWriterVersion", ArrowDataType::Int32, true), - ]), + ArrowDataType::Struct( + vec![ + ArrowField::new("minReaderVersion", ArrowDataType::Int32, true), + ArrowField::new("minWriterVersion", ArrowDataType::Int32, true), + ] + .into() + ), true ), ArrowField::new( "txn", - ArrowDataType::Struct(vec![ - ArrowField::new("appId", ArrowDataType::Utf8, true), - ArrowField::new("version", ArrowDataType::Int64, true), - ]), + ArrowDataType::Struct( + vec![ + ArrowField::new("appId", ArrowDataType::Utf8, true), + ArrowField::new("version", ArrowDataType::Int64, true), + ] + .into() + ), true ), ]; @@ -387,12 +421,15 @@ pub(crate) fn delta_log_schema_for_table( ArrowField::new( "partitionValues", ArrowDataType::Map( - Box::new(ArrowField::new( + Arc::new(ArrowField::new( "key_value", - ArrowDataType::Struct(vec![ - ArrowField::new("key", ArrowDataType::Utf8, false), - ArrowField::new("value", ArrowDataType::Utf8, true), - ]), + ArrowDataType::Struct( + vec![ + ArrowField::new("key", ArrowDataType::Utf8, false), + ArrowField::new("value", ArrowDataType::Utf8, true), + ] + .into() + ), false )), false @@ -402,12 +439,15 @@ pub(crate) fn delta_log_schema_for_table( ArrowField::new( "tags", ArrowDataType::Map( - Box::new(ArrowField::new( + Arc::new(ArrowField::new( "key_value", - ArrowDataType::Struct(vec![ - ArrowField::new("key", ArrowDataType::Utf8, false), - ArrowField::new("value", ArrowDataType::Utf8, true), - ]), + ArrowDataType::Struct( + vec![ + ArrowField::new("key", ArrowDataType::Utf8, false), + ArrowField::new("value", ArrowDataType::Utf8, true), + ] + .into() + ), false )), false @@ -426,12 +466,15 @@ pub(crate) fn delta_log_schema_for_table( ArrowField::new( "partitionValues", ArrowDataType::Map( - Box::new(ArrowField::new( + Arc::new(ArrowField::new( "key_value", - ArrowDataType::Struct(vec![ - ArrowField::new("key", ArrowDataType::Utf8, false), - ArrowField::new("value", ArrowDataType::Utf8, true), - ]), + ArrowDataType::Struct( + vec![ + ArrowField::new("key", ArrowDataType::Utf8, false), + ArrowField::new("value", ArrowDataType::Utf8, true), + ] + .into() + ), false )), false @@ -441,12 +484,15 @@ pub(crate) fn delta_log_schema_for_table( ArrowField::new( "tags", ArrowDataType::Map( - Box::new(ArrowField::new( + Arc::new(ArrowField::new( "key_value", - ArrowDataType::Struct(vec![ - ArrowField::new("key", ArrowDataType::Utf8, false), - ArrowField::new("value", ArrowDataType::Utf8, true), - ]), + ArrowDataType::Struct( + vec![ + ArrowField::new("key", ArrowDataType::Utf8, false), + ArrowField::new("value", ArrowDataType::Utf8, true), + ] + .into() + ), false )), false @@ -470,31 +516,36 @@ pub(crate) fn delta_log_schema_for_table( .iter() .for_each(|f| max_min_schema_for_fields(&mut max_min_vec, f)); - stats_parsed_fields.extend( - ["minValues", "maxValues"].into_iter().map(|name| { - ArrowField::new(name, ArrowDataType::Struct(max_min_vec.clone()), true) - }), - ); + stats_parsed_fields.extend(["minValues", "maxValues"].into_iter().map(|name| { + ArrowField::new( + name, + ArrowDataType::Struct(max_min_vec.clone().into()), + true, + ) + })); let mut null_count_vec = Vec::new(); non_partition_fields .iter() .for_each(|f| null_count_schema_for_fields(&mut null_count_vec, f)); - let null_count_struct = - ArrowField::new("nullCount", ArrowDataType::Struct(null_count_vec), true); + let null_count_struct = ArrowField::new( + "nullCount", + ArrowDataType::Struct(null_count_vec.into()), + true, + ); stats_parsed_fields.push(null_count_struct); } let mut add_fields = ADD_FIELDS.clone(); add_fields.push(ArrowField::new( "stats_parsed", - ArrowDataType::Struct(stats_parsed_fields), + ArrowDataType::Struct(stats_parsed_fields.into()), true, )); if !partition_fields.is_empty() { add_fields.push(ArrowField::new( "partitionValues_parsed", - ArrowDataType::Struct(partition_fields), + ArrowDataType::Struct(partition_fields.into()), true, )); } @@ -509,12 +560,12 @@ pub(crate) fn delta_log_schema_for_table( let mut schema_fields = SCHEMA_FIELDS.clone(); schema_fields.push(ArrowField::new( "add", - ArrowDataType::Struct(add_fields), + ArrowDataType::Struct(add_fields.into()), true, )); schema_fields.push(ArrowField::new( "remove", - ArrowDataType::Struct(remove_fields), + ArrowDataType::Struct(remove_fields.into()), true, )); @@ -534,7 +585,7 @@ fn max_min_schema_for_fields(dest: &mut Vec, f: &ArrowField) { dest.push(ArrowField::new( f.name(), - ArrowDataType::Struct(child_dest), + ArrowDataType::Struct(child_dest.into()), true, )); } @@ -558,7 +609,7 @@ fn null_count_schema_for_fields(dest: &mut Vec, f: &ArrowField) { dest.push(ArrowField::new( f.name(), - ArrowDataType::Struct(child_dest), + ArrowDataType::Struct(child_dest.into()), true, )); } diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index b8544d4e97..e7f0a7e233 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -337,10 +337,7 @@ impl PruningStatistics for DeltaTable { fn register_store(table: &DeltaTable, env: Arc) { let object_store_url = table.storage.object_store_url(); let url: &Url = object_store_url.as_ref(); - env.register_object_store( - &url, - table.object_store(), - ); + env.register_object_store(&url, table.object_store()); } #[async_trait] diff --git a/rust/src/operations/load.rs b/rust/src/operations/load.rs index 2a3e0f31f2..f6a6654047 100644 --- a/rust/src/operations/load.rs +++ b/rust/src/operations/load.rs @@ -83,9 +83,7 @@ impl std::future::IntoFuture for LoadBuilder { table.load().await?; let ctx = SessionContext::new(); - ctx.state() - .runtime_env() - .register_object_store(&url, store); + ctx.state().runtime_env().register_object_store(&url, store); let scan_plan = table.scan(&ctx.state(), None, &[], None).await?; let plan = CoalescePartitionsExec::new(scan_plan); let task_ctx = Arc::new(TaskContext::from(&ctx.state()));