From f56d8c98ce5656ff4aee93aa3861efcdb322eda3 Mon Sep 17 00:00:00 2001
From: Stephen Carman
Date: Sun, 24 Mar 2024 13:45:35 -0500
Subject: [PATCH] feat: logical Node for find files (#2194)

# Description
Some of my first workings on David's proposal in #2006. This is also meant to
push #2048 and general CDF forward by making the logical operations of delta
tables more composable than they are today; a usage sketch follows below.

# Related Issue(s)
#2006 #2048

I think, and @Blajda correct me here, we can build upon this and eventually
move towards a `DeltaPlanner`-esque enum for operations and their associated
logical plan building.

# Still to do
- [ ] Implement a different path for partition columns that doesn't require scanning the files
- [ ] Plumbing into `DeltaScan` so the delta scan can make use of this logical node
- [ ] General polish and cleanup; there are lots of unnecessary fields and awkward ways things are built
- [ ] More tests; there is currently one large integration-style end-to-end test, but it can/should be broken down
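The intended composition, roughly (a sketch distilled from the integration test
in `find_files/mod.rs` below; the id string and predicate are arbitrary, and
since the module is crate-private in this PR the example only works from inside
`deltalake-core`):

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use datafusion_expr::{col, lit, Extension, LogicalPlan};

use crate::delta_datafusion::find_files::logical::FindFilesNode;
use crate::delta_datafusion::find_files::FindFilesPlanner;

async fn example() -> crate::DeltaResult<()> {
    let table = crate::open_table("../test/tests/data/delta-0.8.0-partitioned").await?;
    // Register a query planner that knows how to lower FindFilesNode ...
    let state = SessionContext::new()
        .state()
        .with_query_planner(Arc::new(FindFilesPlanner {}));
    // ... then wrap the custom node in a LogicalPlan::Extension.
    let plan = LogicalPlan::Extension(Extension {
        node: Arc::new(FindFilesNode::new(
            "find_files".into(),
            table.snapshot()?.clone(),
            table.log_store(),
            col("year").eq(lit(2020)),
        )?),
    });
    // Planning and executing this yields one column of matching file paths.
    let physical = state.create_physical_plan(&plan).await?;
    let _stream = physical.execute(0, state.task_ctx())?;
    Ok(())
}
```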
---
 crates/benchmarks/src/bin/merge.rs                 |   1 +
 .../delta_datafusion/find_files/logical.rs         |  98 ++++++
 .../src/delta_datafusion/find_files/mod.rs         | 286 ++++++++++++++++++
 .../delta_datafusion/find_files/physical.rs        | 144 +++++++++
 crates/core/src/delta_datafusion/mod.rs            |  50 ++-
 crates/core/src/kernel/snapshot/log_data.rs        |   4 +-
 crates/core/src/table/state_arrow.rs               |   8 +-
 python/src/lib.rs                                  |   1 -
 python/src/schema.rs                               |   1 -
 9 files changed, 570 insertions(+), 23 deletions(-)
 create mode 100644 crates/core/src/delta_datafusion/find_files/logical.rs
 create mode 100644 crates/core/src/delta_datafusion/find_files/mod.rs
 create mode 100644 crates/core/src/delta_datafusion/find_files/physical.rs

diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs
index ea43171052..ddfe0bb239 100644
--- a/crates/benchmarks/src/bin/merge.rs
+++ b/crates/benchmarks/src/bin/merge.rs
@@ -199,6 +199,7 @@ async fn benchmark_merge_tpcds(
         table.snapshot()?.clone(),
         table.log_store(),
         DeltaScanConfig {
+            wrap_partition_values: true,
             file_column_name: Some("file_path".to_string()),
         },
     )
diff --git a/crates/core/src/delta_datafusion/find_files/logical.rs b/crates/core/src/delta_datafusion/find_files/logical.rs
new file mode 100644
index 0000000000..6234cbe5c2
--- /dev/null
+++ b/crates/core/src/delta_datafusion/find_files/logical.rs
@@ -0,0 +1,98 @@
+use std::collections::HashSet;
+use std::hash::{Hash, Hasher};
+
+use datafusion_common::DFSchemaRef;
+use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
+
+use crate::delta_datafusion::find_files::ONLY_FILES_DF_SCHEMA;
+use crate::logstore::LogStoreRef;
+use crate::table::state::DeltaTableState;
+
+#[derive(Debug, Clone)]
+pub struct FindFilesNode {
+    id: String,
+    predicate: Expr,
+    table_state: DeltaTableState,
+    log_store: LogStoreRef,
+    version: i64,
+}
+
+impl FindFilesNode {
+    pub fn new(
+        id: String,
+        table_state: DeltaTableState,
+        log_store: LogStoreRef,
+        predicate: Expr,
+    ) -> datafusion_common::Result<Self> {
+        let version = table_state.version();
+        Ok(Self {
+            id,
+            predicate,
+            log_store,
+            table_state,
+            version,
+        })
+    }
+
+    pub fn predicate(&self) -> Expr {
+        self.predicate.clone()
+    }
+
+    pub fn state(&self) -> DeltaTableState {
+        self.table_state.clone()
+    }
+
+    pub fn log_store(&self) -> LogStoreRef {
+        self.log_store.clone()
+    }
+}
+
+impl Eq for FindFilesNode {}
+
+impl PartialEq for FindFilesNode {
+    fn eq(&self, other: &Self) -> bool {
+        self.id == other.id
+    }
+}
+
+impl Hash for FindFilesNode {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        state.write(self.id.as_bytes());
+        state.finish();
+    }
+}
+
+impl UserDefinedLogicalNodeCore for FindFilesNode {
+    fn name(&self) -> &str {
+        "FindFiles"
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![]
+    }
+
+    fn schema(&self) -> &DFSchemaRef {
+        &ONLY_FILES_DF_SCHEMA
+    }
+
+    fn expressions(&self) -> Vec<Expr> {
+        vec![]
+    }
+
+    fn prevent_predicate_push_down_columns(&self) -> HashSet<String> {
+        HashSet::new()
+    }
+
+    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(
+            f,
+            "FindFiles[id={}, predicate=\"{}\", version={}]",
+            &self.id, self.predicate, self.version,
+        )
+    }
+
+    fn from_template(&self, _exprs: &[Expr], _inputs: &[LogicalPlan]) -> Self {
+        self.clone()
+    }
+}
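A note on the node's identity: `PartialEq` and `Hash` key solely on `id`, so two
`FindFilesNode`s with the same id compare equal even if their predicates differ,
and the node reports no inputs or expressions, making it a leaf as far as
DataFusion's optimizer is concerned. Via `fmt_for_explain`, an `EXPLAIN` of such
a plan renders along these lines (illustrative output, assuming the predicate
`col("year").eq(lit(2020))` against a table at version 4):

```
FindFiles[id=my_cool_plan, predicate="year = Int32(2020)", version=4]
```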
diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs
new file mode 100644
index 0000000000..65d113ee5b
--- /dev/null
+++ b/crates/core/src/delta_datafusion/find_files/mod.rs
@@ -0,0 +1,286 @@
+use arrow_array::cast::AsArray;
+use std::sync::Arc;
+
+use arrow_array::types::UInt16Type;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaBuilder;
+use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
+use arrow_select::concat::concat_batches;
+use async_trait::async_trait;
+use datafusion::datasource::MemTable;
+use datafusion::execution::context::{QueryPlanner, SessionState};
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::filter::FilterExec;
+use datafusion::physical_plan::limit::LocalLimitExec;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
+use datafusion::prelude::SessionContext;
+use datafusion_common::{DFSchemaRef, Result, ToDFSchema};
+use datafusion_expr::{col, Expr, LogicalPlan, UserDefinedLogicalNode};
+use datafusion_physical_expr::create_physical_expr;
+use lazy_static::lazy_static;
+
+use crate::delta_datafusion::find_files::logical::FindFilesNode;
+use crate::delta_datafusion::find_files::physical::FindFilesExec;
+use crate::delta_datafusion::{
+    df_logical_schema, register_store, DeltaScanBuilder, DeltaScanConfigBuilder, PATH_COLUMN,
+};
+use crate::logstore::LogStoreRef;
+use crate::table::state::DeltaTableState;
+use crate::DeltaTableError;
+
+pub mod logical;
+pub mod physical;
+
+lazy_static! {
+    static ref ONLY_FILES_SCHEMA: Arc<Schema> = {
+        let mut builder = SchemaBuilder::new();
+        builder.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
+        Arc::new(builder.finish())
+    };
+    static ref ONLY_FILES_DF_SCHEMA: DFSchemaRef =
+        ONLY_FILES_SCHEMA.clone().to_dfschema_ref().unwrap();
+}
+
+struct FindFilesPlannerExtension {}
+
+struct FindFilesPlanner {}
+
+#[async_trait]
+impl ExtensionPlanner for FindFilesPlannerExtension {
+    async fn plan_extension(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        node: &dyn UserDefinedLogicalNode,
+        _logical_inputs: &[&LogicalPlan],
+        _physical_inputs: &[Arc<dyn ExecutionPlan>],
+        _session_state: &SessionState,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        if let Some(find_files_node) = node.as_any().downcast_ref::<FindFilesNode>() {
+            return Ok(Some(Arc::new(FindFilesExec::new(
+                find_files_node.state(),
+                find_files_node.log_store(),
+                find_files_node.predicate(),
+            )?)));
+        }
+        Ok(None)
+    }
+}
+
+#[async_trait]
+impl QueryPlanner for FindFilesPlanner {
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
+            vec![Arc::new(FindFilesPlannerExtension {})],
+        )));
+        planner
+            .create_physical_plan(logical_plan, session_state)
+            .await
+    }
+}
+
+async fn scan_table_by_partitions(batch: RecordBatch, predicate: Expr) -> Result<RecordBatch> {
+    let mut arrays = Vec::new();
+    let mut fields = Vec::new();
+
+    let schema = batch.schema();
+
+    arrays.push(
+        batch
+            .column_by_name("path")
+            .ok_or(DeltaTableError::Generic(
+                "Column with name `path` does not exist".to_owned(),
+            ))?
+            .to_owned(),
+    );
+    fields.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
+
+    for field in schema.fields() {
+        if field.name().starts_with("partition.") {
+            let name = field.name().strip_prefix("partition.").unwrap();
+
+            arrays.push(batch.column_by_name(field.name()).unwrap().to_owned());
+            fields.push(Field::new(
+                name,
+                field.data_type().to_owned(),
+                field.is_nullable(),
+            ));
+        }
+    }
+
+    let schema = Arc::new(Schema::new(fields));
+    let batch = RecordBatch::try_new(schema, arrays)?;
+    let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
+
+    let ctx = SessionContext::new();
+    let mut df = ctx.read_table(Arc::new(mem_table))?;
+    df = df
+        .filter(predicate.to_owned())?
+        .select(vec![col(PATH_COLUMN)])?;
+    let df_schema = df.schema().clone();
+    let batches = df.collect().await?;
+    Ok(concat_batches(&SchemaRef::from(df_schema), &batches)?)
+}
+
+async fn scan_table_by_files(
+    snapshot: DeltaTableState,
+    log_store: LogStoreRef,
+    state: SessionState,
+    expression: Expr,
+) -> Result<RecordBatch> {
+    register_store(log_store.clone(), state.runtime_env().clone());
+    let scan_config = DeltaScanConfigBuilder::new()
+        .wrap_partition_values(true)
+        .with_file_column(true)
+        .build(&snapshot)?;
+
+    let logical_schema = df_logical_schema(&snapshot, &scan_config)?;
+
+    // Identify which columns we need to project
+    let mut used_columns = expression
+        .to_columns()?
+        .into_iter()
+        .map(|column| logical_schema.index_of(&column.name))
+        .collect::<Result<Vec<usize>, ArrowError>>()?;
+    // Add path column
+    used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);
+
+    let scan = DeltaScanBuilder::new(&snapshot, log_store, &state)
+        .with_filter(Some(expression.clone()))
+        .with_projection(Some(&used_columns))
+        .with_scan_config(scan_config)
+        .build()
+        .await?;
+
+    let scan = Arc::new(scan);
+    let input_schema = scan.logical_schema.as_ref().to_owned();
+    let input_dfschema = input_schema.clone().try_into()?;
+
+    let predicate_expr = create_physical_expr(
+        &Expr::IsTrue(Box::new(expression.clone())),
+        &input_dfschema,
+        state.execution_props(),
+    )?;
+
+    let filter: Arc<dyn ExecutionPlan> =
+        Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
+    let limit: Arc<dyn ExecutionPlan> = Arc::new(LocalLimitExec::new(filter, 1));
+    let field_idx = input_schema.index_of(PATH_COLUMN)?;
+    let task_ctx = Arc::new(TaskContext::from(&state));
+    let path_batches: Vec<RecordBatch> = datafusion::physical_plan::collect(limit, task_ctx)
+        .await?
+        .into_iter()
+        .map(|batch| {
+            let col = batch
+                .column(field_idx)
+                .as_dictionary::<UInt16Type>()
+                .values();
+            RecordBatch::try_from_iter(vec![(PATH_COLUMN, col.clone())]).unwrap()
+        })
+        .collect();
+
+    let result_batches = concat_batches(&ONLY_FILES_SCHEMA.clone(), &path_batches)?;
+
+    Ok(result_batches)
+}
+
+#[cfg(test)]
+pub mod tests {
+    use std::sync::Arc;
+
+    use arrow_array::RecordBatch;
+    use datafusion::prelude::{DataFrame, SessionContext};
+    use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq};
+    use datafusion_expr::{col, lit, Expr, Extension, LogicalPlan};
+
+    use crate::delta_datafusion::find_files::logical::FindFilesNode;
+    use crate::delta_datafusion::find_files::FindFilesPlanner;
+    use crate::operations::collect_sendable_stream;
+    use crate::{DeltaResult, DeltaTable, DeltaTableError};
+
+    pub async fn test_plan<'a>(
+        table: DeltaTable,
+        expr: Expr,
+    ) -> Result<Vec<RecordBatch>, DeltaTableError> {
+        let ctx = SessionContext::new();
+        let state = ctx
+            .state()
+            .with_query_planner(Arc::new(FindFilesPlanner {}));
+        let find_files_node = LogicalPlan::Extension(Extension {
+            node: Arc::new(FindFilesNode::new(
+                "my_cool_plan".into(),
+                table.snapshot()?.clone(),
+                table.log_store().clone(),
+                expr,
+            )?),
+        });
+        let df = DataFrame::new(state.clone(), find_files_node);
+
+        let p = state
+            .clone()
+            .create_physical_plan(df.logical_plan())
+            .await?;
+
+        let e = p.execute(0, state.task_ctx())?;
+        collect_sendable_stream(e).await.map_err(Into::into)
+    }
+
+    #[tokio::test]
+    pub async fn test_find_files_partitioned() -> DeltaResult<()> {
+        let table = crate::open_table("../test/tests/data/delta-0.8.0-partitioned").await?;
+        let expr: Expr = col("year").eq(lit(2020));
+        let s = test_plan(table, expr).await?;
+
+        assert_batches_eq!
{ + ["+---------------------------------------------------------------------------------------------+", + "| __delta_rs_path |", + "+---------------------------------------------------------------------------------------------+", + "| year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet |", + "| year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet |", + "| year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet |", + "+---------------------------------------------------------------------------------------------+"], + &s + } + Ok(()) + } + + #[tokio::test] + pub async fn test_find_files_unpartitioned() -> DeltaResult<()> { + let table = crate::open_table("../test/tests/data/simple_table").await?; + let expr: Expr = col("id").in_list(vec![lit(9i64), lit(7i64)], false); + let s = test_plan(table, expr).await?; + + assert_batches_sorted_eq! { + ["+---------------------------------------------------------------------+", + "| __delta_rs_path |", + "+---------------------------------------------------------------------+", + "| part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet |", + "| part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet |", + "+---------------------------------------------------------------------+"], + &s + } + Ok(()) + } + + #[tokio::test] + pub async fn test_find_files_unpartitioned2() -> DeltaResult<()> { + let table = crate::open_table("../test/tests/data/simple_table").await?; + let expr: Expr = col("id").is_not_null(); + let s = test_plan(table, expr).await?; + + assert_batches_sorted_eq! { + ["+---------------------------------------------------------------------+", + "| __delta_rs_path |", + "+---------------------------------------------------------------------+", + "| part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet |", + "| part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet |", + "| part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet |", + "+---------------------------------------------------------------------+"], + &s + } + Ok(()) + } +} diff --git a/crates/core/src/delta_datafusion/find_files/physical.rs b/crates/core/src/delta_datafusion/find_files/physical.rs new file mode 100644 index 0000000000..9b9238dd86 --- /dev/null +++ b/crates/core/src/delta_datafusion/find_files/physical.rs @@ -0,0 +1,144 @@ +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; +use datafusion::error::Result; +use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion::physical_plan::memory::MemoryStream; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; +use datafusion::prelude::SessionContext; +use datafusion_common::tree_node::TreeNode; +use datafusion_expr::Expr; +use datafusion_physical_expr::{Partitioning, PhysicalSortExpr}; +use futures::stream::BoxStream; +use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; + +use crate::delta_datafusion::find_files::{ + scan_table_by_files, scan_table_by_partitions, ONLY_FILES_SCHEMA, +}; +use crate::delta_datafusion::FindFilesExprProperties; +use crate::logstore::LogStoreRef; +use crate::table::state::DeltaTableState; + +pub struct FindFilesExec { + predicate: Expr, + state: DeltaTableState, + log_store: LogStoreRef, +} + +impl 
diff --git a/crates/core/src/delta_datafusion/find_files/physical.rs b/crates/core/src/delta_datafusion/find_files/physical.rs
new file mode 100644
index 0000000000..9b9238dd86
--- /dev/null
+++ b/crates/core/src/delta_datafusion/find_files/physical.rs
@@ -0,0 +1,144 @@
+use std::any::Any;
+use std::fmt::{Debug, Formatter};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use datafusion::error::Result;
+use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
+use datafusion::physical_plan::memory::MemoryStream;
+use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+use datafusion::prelude::SessionContext;
+use datafusion_common::tree_node::TreeNode;
+use datafusion_expr::Expr;
+use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
+use futures::stream::BoxStream;
+use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
+
+use crate::delta_datafusion::find_files::{
+    scan_table_by_files, scan_table_by_partitions, ONLY_FILES_SCHEMA,
+};
+use crate::delta_datafusion::FindFilesExprProperties;
+use crate::logstore::LogStoreRef;
+use crate::table::state::DeltaTableState;
+
+pub struct FindFilesExec {
+    predicate: Expr,
+    state: DeltaTableState,
+    log_store: LogStoreRef,
+}
+
+impl FindFilesExec {
+    pub fn new(state: DeltaTableState, log_store: LogStoreRef, predicate: Expr) -> Result<Self> {
+        Ok(Self {
+            predicate,
+            log_store,
+            state,
+        })
+    }
+}
+
+struct FindFilesStream<'a> {
+    mem_stream: BoxStream<'a, Result<RecordBatch>>,
+}
+
+impl<'a> FindFilesStream<'a> {
+    pub fn new(mem_stream: BoxStream<'a, Result<RecordBatch>>) -> Result<Self> {
+        Ok(Self { mem_stream })
+    }
+}
+
+impl<'a> RecordBatchStream for FindFilesStream<'a> {
+    fn schema(&self) -> SchemaRef {
+        ONLY_FILES_SCHEMA.clone()
+    }
+}
+
+impl<'a> Stream for FindFilesStream<'a> {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.as_mut().mem_stream.poll_next_unpin(cx)
+    }
+}
+
+impl Debug for FindFilesExec {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "FindFilesExec[predicate=\"{}\"]", self.predicate)
+    }
+}
+
+impl DisplayAs for FindFilesExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "FindFilesExec[predicate=\"{}\"]", self.predicate)
+    }
+}
+
+impl ExecutionPlan for FindFilesExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        ONLY_FILES_SCHEMA.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::RoundRobinBatch(num_cpus::get())
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let current_metadata = self.state.metadata();
+        let mut expr_properties = FindFilesExprProperties {
+            partition_only: true,
+            partition_columns: current_metadata.partition_columns.clone(),
+            result: Ok(()),
+        };
+
+        TreeNode::visit(&self.predicate, &mut expr_properties)?;
+        expr_properties.result?;
+
+        if expr_properties.partition_only {
+            let actions_table = self.state.add_actions_table(true)?;
+            let predicate = self.predicate.clone();
+            let schema = actions_table.schema();
+            let mem_stream =
+                MemoryStream::try_new(vec![actions_table.clone()], schema.clone(), None)?
+                    .and_then(move |batch| scan_table_by_partitions(batch, predicate.clone()))
+                    .boxed();
+
+            Ok(Box::pin(FindFilesStream::new(mem_stream)?))
+        } else {
+            let ctx = SessionContext::new();
+            let state = ctx.state();
+            let table_state = self.state.clone();
+            let predicate = self.predicate.clone();
+            let output_files =
+                scan_table_by_files(table_state, self.log_store.clone(), state, predicate);
+
+            let mem_stream = output_files.into_stream().boxed();
+            Ok(Box::pin(FindFilesStream::new(mem_stream)?))
+        }
+    }
+}
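To exercise the operator by hand (the planner in `find_files/mod.rs` above is
the intended entry point; this sketch just shows the raw `ExecutionPlan`
contract, reusing the test table from earlier):

```rust
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_expr::{col, lit};

use crate::delta_datafusion::find_files::physical::FindFilesExec;
use crate::operations::collect_sendable_stream;

async fn run_exec_directly() -> crate::DeltaResult<()> {
    let table = crate::open_table("../test/tests/data/delta-0.8.0-partitioned").await?;
    let exec = FindFilesExec::new(
        table.snapshot()?.clone(),
        table.log_store(),
        col("year").eq(lit(2020)),
    )?;
    // Read a single partition, as the tests above do.
    let stream = exec.execute(0, SessionContext::new().state().task_ctx())?;
    let batches = collect_sendable_stream(stream).await?;
    // Every batch carries the single path column from ONLY_FILES_SCHEMA.
    assert!(batches
        .iter()
        .all(|b| b.schema().field(0).name() == "__delta_rs_path"));
    Ok(())
}
```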
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index bcefcb55b8..057dcaca87 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -94,6 +94,8 @@ pub mod expr;
 pub mod logical;
 pub mod physical;
 
+mod find_files;
+
 impl From<DeltaTableError> for DataFusionError {
     fn from(err: DeltaTableError) -> Self {
         match err {
@@ -296,8 +298,9 @@ pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
     env.register_object_store(url, store.object_store());
 }
 
-/// The logical schema for a Deltatable is different then protocol level schema since partiton columns must appear at the end of the schema.
-/// This is to align with how partition are handled at the physical level
+/// The logical schema for a Delta table is different from the protocol level schema since partition
+/// columns must appear at the end of the schema. This is to align with how partitions are handled
+/// at the physical level
 pub(crate) fn df_logical_schema(
     snapshot: &DeltaTableState,
     scan_config: &DeltaScanConfig,
@@ -322,11 +325,7 @@ pub(crate) fn df_logical_schema(
     }
 
     if let Some(file_column_name) = &scan_config.file_column_name {
-        fields.push(Arc::new(Field::new(
-            file_column_name,
-            arrow_schema::DataType::Utf8,
-            true,
-        )));
+        fields.push(Arc::new(Field::new(file_column_name, DataType::Utf8, true)));
     }
 
     Ok(Arc::new(ArrowSchema::new(fields)))
@@ -335,13 +334,15 @@ pub(crate) fn df_logical_schema(
 #[derive(Debug, Clone, Default)]
 /// Used to specify if additional metadata columns are exposed to the user
 pub struct DeltaScanConfigBuilder {
-    /// Include the source path for each record. The name of this column is determine by `file_column_name`
+    /// Include the source path for each record. The name of this column is determined by `file_column_name`
     include_file_column: bool,
     /// Column name that contains the source path.
     ///
     /// If include_file_column is true and the name is None then it will be auto-generated
     /// Otherwise the user provided name will be used
     file_column_name: Option<String>,
+    /// Whether to wrap partition values in a dictionary encoding to potentially save space
+    wrap_partition_values: Option<bool>,
 }
 
 impl DeltaScanConfigBuilder {
@@ -365,6 +366,12 @@ impl DeltaScanConfigBuilder {
         self
     }
 
+    /// Whether to wrap partition values in a dictionary encoding
+    pub fn wrap_partition_values(mut self, wrap: bool) -> Self {
+        self.wrap_partition_values = Some(wrap);
+        self
+    }
+
     /// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing
     pub fn build(&self, snapshot: &DeltaTableState) -> DeltaResult<DeltaScanConfig> {
         let input_schema = snapshot.input_schema()?;
@@ -401,7 +408,10 @@ impl DeltaScanConfigBuilder {
             }
         }
 
-        Ok(DeltaScanConfig { file_column_name })
+        Ok(DeltaScanConfig {
+            file_column_name,
+            wrap_partition_values: self.wrap_partition_values.unwrap_or(true),
+        })
     }
 }
 
@@ -410,6 +420,8 @@ impl DeltaScanConfigBuilder {
 pub struct DeltaScanConfig {
     /// Include the source path for each record
     pub file_column_name: Option<String>,
+    /// Wrap partition values in a dictionary encoding
+    pub wrap_partition_values: bool,
 }
 
 #[derive(Debug)]
@@ -534,10 +546,12 @@ impl<'a> DeltaScanBuilder<'a> {
                 let mut part = partitioned_file_from_action(action, table_partition_cols, &schema);
 
                 if config.file_column_name.is_some() {
-                    part.partition_values
-                        .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
-                            action.path.clone(),
-                        ))));
+                    let partition_value = if config.wrap_partition_values {
+                        wrap_partition_value_in_dict(ScalarValue::Utf8(Some(action.path.clone())))
+                    } else {
+                        ScalarValue::Utf8(Some(action.path.clone()))
+                    };
+                    part.partition_values.push(partition_value);
                 }
 
                 file_groups
@@ -561,9 +575,14 @@ impl<'a> DeltaScanBuilder<'a> {
             .collect::<Result<Vec<Field>, ArrowError>>()?;
 
         if let Some(file_column_name) = &config.file_column_name {
+            let field_name_datatype = if config.wrap_partition_values {
+                wrap_partition_type_in_dict(DataType::Utf8)
+            } else {
+                DataType::Utf8
+            };
             table_partition_cols.push(Field::new(
                 file_column_name.clone(),
-                wrap_partition_type_in_dict(DataType::Utf8),
+                field_name_datatype,
                 false,
             ));
         }
@@ -1300,6 +1319,7 @@ impl TreeNodeVisitor for FindFilesExprProperties {
         }
     }
 }
 
+#[derive(Debug, Hash, Eq, PartialEq)]
 /// Representing the result of the [find_files] function.
 pub struct FindFiles {
     /// A list of `Add` objects that match the given predicate
@@ -1353,7 +1373,7 @@ fn join_batches_with_add_actions(
     Ok(files)
 }
 
-/// Determine which files contain a record that statisfies the predicate
+/// Determine which files contain a record that satisfies the predicate
 pub(crate) async fn find_files_scan<'a>(
     snapshot: &DeltaTableState,
     log_store: LogStoreRef,
diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs
index 686a4110fe..972aeb6f9a 100644
--- a/crates/core/src/kernel/snapshot/log_data.rs
+++ b/crates/core/src/kernel/snapshot/log_data.rs
@@ -120,9 +120,9 @@ impl<'a> DeletionVectorView<'a> {
     }
 }
 
-/// A view into the log data representiang a single logical file.
+/// A view into the log data representing a single logical file.
 ///
-/// This stuct holds a pointer to a specific row in the log data and provides access to the
+/// This struct holds a pointer to a specific row in the log data and provides access to the
 /// information stored in that row by tracking references to the underlying arrays.
 ///
 /// Additionally, references to some table metadata is tracked to provide higher level
diff --git a/crates/core/src/table/state_arrow.rs b/crates/core/src/table/state_arrow.rs
index 2c9ed74ce3..096234dccb 100644
--- a/crates/core/src/table/state_arrow.rs
+++ b/crates/core/src/table/state_arrow.rs
@@ -91,7 +91,7 @@ impl DeltaTableState {
                 .fields
                 .iter()
                 .map(|field| Cow::Owned(field.name().clone()))
-                .zip(partition_cols_batch.columns().iter().map(Arc::clone)),
+                .zip(partition_cols_batch.columns().iter().cloned()),
         )
     }
 
@@ -103,7 +103,7 @@ impl DeltaTableState {
                 .fields
                 .iter()
                 .map(|field| Cow::Owned(field.name().clone()))
-                .zip(stats.columns().iter().map(Arc::clone)),
+                .zip(stats.columns().iter().cloned()),
             );
         }
         if files.iter().any(|add| add.deletion_vector.is_some()) {
@@ -114,7 +114,7 @@ impl DeltaTableState {
                 .fields
                 .iter()
                 .map(|field| Cow::Owned(field.name().clone()))
-                .zip(delvs.columns().iter().map(Arc::clone)),
+                .zip(delvs.columns().iter().cloned()),
             );
         }
         if files.iter().any(|add| {
@@ -129,7 +129,7 @@ impl DeltaTableState {
                 .fields
                 .iter()
                 .map(|field| Cow::Owned(field.name().clone()))
-                .zip(tags.columns().iter().map(Arc::clone)),
+                .zip(tags.columns().iter().cloned()),
             );
         }
 
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 445ef921ea..2401b17a0e 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -6,7 +6,6 @@ mod schema;
 mod utils;
 
 use std::collections::{HashMap, HashSet};
-use std::convert::TryFrom;
 use std::future::IntoFuture;
 use std::sync::Arc;
 use std::time;
diff --git a/python/src/schema.rs b/python/src/schema.rs
index c56010f131..0d0823cbff 100644
--- a/python/src/schema.rs
+++ b/python/src/schema.rs
@@ -13,7 +13,6 @@ use deltalake::kernel::{
 use pyo3::exceptions::{PyException, PyNotImplementedError, PyTypeError, PyValueError};
 use pyo3::prelude::*;
 use pyo3::types::IntoPyDict;
-use pyo3::{PyRef, PyResult};
 use std::collections::HashMap;
 
 // PyO3 doesn't yet support converting classes with inheritance with Python
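Finally, the new `wrap_partition_values` knob introduced above can be exercised
like so (a sketch; the table path is hypothetical, and `true` remains the
default so existing scans keep their dictionary-encoded file-path column):

```rust
use crate::delta_datafusion::DeltaScanConfigBuilder;

async fn plain_utf8_file_column() -> crate::DeltaResult<()> {
    let table = crate::open_table("./path/to/table").await?; // hypothetical path
    // Opt out of dictionary encoding: the generated file-path column is then
    // emitted as plain Utf8 instead of Dictionary(UInt16, Utf8).
    let config = DeltaScanConfigBuilder::new()
        .with_file_column(true)
        .wrap_partition_values(false)
        .build(table.snapshot()?)?;
    assert!(!config.wrap_partition_values);
    Ok(())
}
```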