feat: read schema from parquet files in datafusion scans #1266

Merged · 7 commits · Apr 14, 2023
Changes from all commits

7 changes: 4 additions & 3 deletions rust/src/delta_datafusion.rs
@@ -376,9 +376,10 @@ impl TableProvider for DeltaTable {
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let schema = Arc::new(<ArrowSchema as TryFrom<&schema::Schema>>::try_from(
DeltaTable::schema(self).unwrap(),
)?);
let schema = self
.state
.physical_arrow_schema(self.object_store())
.await?;

register_store(self, session.runtime_env().clone());

114 changes: 62 additions & 52 deletions rust/src/operations/load.rs
@@ -1,71 +1,40 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::storage::DeltaObjectStore;
use crate::{builder::ensure_table_uri, DeltaResult, DeltaTable};

use datafusion::datasource::TableProvider;
use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use futures::future::BoxFuture;

use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

#[derive(Debug, Clone)]
pub struct LoadBuilder {
location: Option<String>,
/// A snapshot of the to-be-loaded table's state
snapshot: DeltaTableState,
/// Delta object store for handling data files
store: Arc<DeltaObjectStore>,
/// A sub-selection of columns to be loaded
columns: Option<Vec<String>>,
storage_options: Option<HashMap<String, String>>,
object_store: Option<Arc<DeltaObjectStore>>,
}

impl Default for LoadBuilder {
fn default() -> Self {
Self::new()
}
}

impl LoadBuilder {
/// Create a new [`LoadBuilder`]
pub fn new() -> Self {
pub fn new(store: Arc<DeltaObjectStore>, snapshot: DeltaTableState) -> Self {
Self {
location: None,
snapshot,
store,
columns: None,
storage_options: None,
object_store: None,
}
}

/// Specify the path to the location where table data is stored,
/// which could be a path on distributed storage.
pub fn with_location(mut self, location: impl Into<String>) -> Self {
self.location = Some(location.into());
self
}

/// Specify column selection to load
pub fn with_columns(mut self, columns: impl IntoIterator<Item = impl Into<String>>) -> Self {
self.columns = Some(columns.into_iter().map(|s| s.into()).collect());
self
}

/// Set options used to initialize storage backend
///
/// Options may be passed in the HashMap or set as environment variables.
///
/// [crate::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
/// [dynamodb_lock::DynamoDbLockClient] describes additional options for the AWS atomic rename client.
/// [crate::builder::azure_storage_options] describes the available options for the Azure backend.
/// [crate::builder::gcp_storage_options] describes the available options for the Google Cloud Platform backend.
pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
self.storage_options = Some(storage_options);
self
}

/// Provide a [`DeltaObjectStore`] instance, that points at table location
pub fn with_object_store(mut self, object_store: Arc<DeltaObjectStore>) -> Self {
self.object_store = Some(object_store);
self
}
}

impl std::future::IntoFuture for LoadBuilder {
@@ -76,20 +45,31 @@ impl std::future::IntoFuture for LoadBuilder {
let this = self;

Box::pin(async move {
let object_store = this.object_store.unwrap();
let url = ensure_table_uri(object_store.root_uri())?;
let store = object_store.storage_backend().clone();
let mut table = DeltaTable::new(object_store, Default::default());
table.load().await?;
let table = DeltaTable::new_with_state(this.store, this.snapshot);
let schema = table.state.arrow_schema()?;
let projection = this
.columns
.map(|cols| {
cols.iter()
.map(|col| {
schema.column_with_name(col).map(|(idx, _)| idx).ok_or(
DeltaTableError::SchemaMismatch {
msg: format!("Column '{col}' does not exist in table schema."),
},
)
})
.collect::<Result<_, _>>()
})
.transpose()?;

let ctx = SessionContext::new();
ctx.state()
.runtime_env()
.register_object_store(url.scheme(), "", store);
let scan_plan = table.scan(&ctx.state(), None, &[], None).await?;
let scan_plan = table
.scan(&ctx.state(), projection.as_ref(), &[], None)
.await?;
let plan = CoalescePartitionsExec::new(scan_plan);
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
let stream = plan.execute(0, task_ctx)?;

Ok((table, stream))
})
}
@@ -157,4 +137,34 @@ mod tests {
assert_eq!(batch.schema(), data[0].schema());
Ok(())
}

#[tokio::test]
async fn test_load_with_columns() -> TestResult {
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory().write(vec![batch.clone()]).await?;

let (_table, stream) = DeltaOps(table).load().with_columns(["id", "value"]).await?;
let data = collect_sendable_stream(stream).await?;

let expected = vec![
"+----+-------+",
"| id | value |",
"+----+-------+",
"| A | 1 |",
"| B | 2 |",
"| A | 3 |",
"| B | 4 |",
"| A | 5 |",
"| A | 6 |",
"| A | 7 |",
"| B | 8 |",
"| B | 9 |",
"| A | 10 |",
"| A | 11 |",
"+----+-------+",
];

assert_batches_sorted_eq!(&expected, &data);
Ok(())
}
}
2 changes: 1 addition & 1 deletion rust/src/operations/mod.rs
@@ -102,7 +102,7 @@ impl DeltaOps {
#[cfg(feature = "datafusion")]
#[must_use]
pub fn load(self) -> LoadBuilder {
LoadBuilder::default().with_object_store(self.0.object_store())
LoadBuilder::new(self.0.object_store(), self.0.state)
}

/// Write data to Delta table
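Since LoadBuilder now receives the object store and snapshot up front, loading goes through DeltaOps; a minimal end-to-end sketch based on the API exercised in the tests (not part of the diff; the table path and import paths are assumptions):

use deltalake::operations::collect_sendable_stream; // import path is an assumption
use deltalake::DeltaOps;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical local table; substitute any Delta table that has an `id` column.
    let table = deltalake::open_table("./tests/data/simple_table").await?;

    // The builder carries the store and snapshot internally, so the caller only
    // chooses a projection before awaiting the record batch stream.
    let (_table, stream) = DeltaOps(table).load().with_columns(["id"]).await?;
    let batches = collect_sendable_stream(stream).await?;
    println!("loaded {} record batches", batches.len());
    Ok(())
}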
39 changes: 39 additions & 0 deletions rust/src/operations/transaction/state.rs
@@ -11,6 +11,8 @@ use datafusion_common::{Column, DFSchema, Result as DFResult, TableReference};
use datafusion_expr::{AggregateUDF, Expr, ScalarUDF, TableSource};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use itertools::Either;
use object_store::ObjectStore;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
use sqlparser::tokenizer::Tokenizer;
@@ -82,6 +84,43 @@ impl DeltaTableState {

Ok(sql_to_rel.sql_to_expr(sql, &df_schema, &mut Default::default())?)
}

/// Get the physical table schema.
///
/// This will construct a schema derived from the parquet schema of the latest data file,
/// and fields for partition columns from the schema defined in the table metadata.
pub async fn physical_arrow_schema(
Collaborator:
I don't think we can guarantee this schema is consistent across all parquet files in the table; different writers may have written to the table with different physical types for timestamps. IMO this should be handled in the scan of each Parquet file. That is, we should cast the physical type to microsecond timestamps as needed.
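To make the suggestion concrete, a rough sketch of a per-batch cast as it could run inside the scan (not part of the PR; the helper name and exact arrow signatures are assumptions and may differ by arrow version):

use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Hypothetical helper: coerce every timestamp column of a freshly read batch
/// to microsecond precision so batches from differently written files line up.
fn coerce_timestamps_to_micros(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    let mut fields: Vec<Field> = Vec::with_capacity(batch.num_columns());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());

    for (idx, column) in batch.columns().iter().enumerate() {
        let field = batch.schema().field(idx).clone();
        match field.data_type().clone() {
            DataType::Timestamp(unit, tz) if unit != TimeUnit::Microsecond => {
                let target = DataType::Timestamp(TimeUnit::Microsecond, tz);
                // Cast nanosecond (or int96-derived) values down to microseconds.
                columns.push(cast(column, &target)?);
                fields.push(Field::new(field.name().as_str(), target, field.is_nullable()));
            }
            _ => {
                columns.push(column.clone());
                fields.push(field);
            }
        }
    }

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}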

Collaborator:
In PyArrow, we handle the int96 timestamp issue by passing an argument to the reader to coerce it to microsecond precision. Maybe we could implement something similar upstream?

parquet_read_options=ParquetReadOptions(coerce_int96_timestamp_unit="ms")

Collaborator (Author):
There definitely are no guarantees as to the file schema being consistent. Datafusion however needs a consistent schema. Once we get into column mappings etc, things might get even more demanding and we may have to roll our own parquet scan, or rather start putting logic into our DeltaScan.

That said, I do believe using the schema from the latest file is an improvement over the current way, which at least for me fails for more or less every databricks written table where there are timestamps involved.

Not sure about the best way forward, but I'm happy to keep that logic on a private branch somewhere until we have a more general fix.

Collaborator (Author):
Somewhat related, so I already published it as WIP - in #1267 I did some work on the write command. There my plan was to use the same schema to validate writes. But there it would be even more confusing, since we might end up in a situation where writing with the "official" schema of the table would not be permissible. But somehow it feels very strange to me to have potentially many schemas in the same table.

I guess Spark must allow at least some flexibility in what schema it expects at write time, otherwise how would we end up in this discussion at all :D

Collaborator:
Yeah we are definitely hitting the limits of DataFusion's scanner. I've created an issue upstream: apache/datafusion#5950

I'm fine with moving this forward; I mostly care that we have a more robust implementation in the future and have at least some momentum towards it.

&self,
object_store: Arc<dyn ObjectStore>,
) -> DeltaResult<ArrowSchemaRef> {
if let Some(add) = self.files().iter().max_by_key(|obj| obj.modification_time) {
let file_meta = add.try_into()?;
let file_reader = ParquetObjectReader::new(object_store, file_meta);
let file_schema = ParquetRecordBatchStreamBuilder::new(file_reader)
.await?
.build()?
.schema()
.clone();

let table_schema = Arc::new(ArrowSchema::new(
self.arrow_schema()?
.fields
.clone()
.into_iter()
.map(|field| {
file_schema
.field_with_name(field.name())
.cloned()
.unwrap_or(field)
})
.collect(),
));

Ok(table_schema)
} else {
self.arrow_schema()
}
}
}

pub struct AddContainer<'a> {
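For completeness, a minimal usage sketch of the new helper outside of scan() (not part of the diff; assumes the datafusion feature and that DeltaTable::state is reachable from the caller, as it is inside the crate):

use deltalake::{DeltaResult, DeltaTable};

// Hypothetical helper: print the file-derived (physical) schema of a loaded table.
async fn print_physical_schema(table: &DeltaTable) -> DeltaResult<()> {
    let schema = table
        .state
        .physical_arrow_schema(table.object_store())
        .await?;
    for field in schema.fields().iter() {
        println!("{}: {:?}", field.name(), field.data_type());
    }
    Ok(())
}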
35 changes: 34 additions & 1 deletion rust/tests/datafusion_test.rs
@@ -728,7 +728,40 @@ async fn test_datafusion_partitioned_types() -> Result<()> {
),
]);

assert_eq!(Arc::new(expected_schema), batches[0].schema());
assert_eq!(
Arc::new(expected_schema),
Arc::new(
batches[0]
.schema()
.as_ref()
.clone()
.with_metadata(Default::default())
)
);

Ok(())
}

#[tokio::test]
async fn test_datafusion_scan_timestamps() -> Result<()> {
let ctx = SessionContext::new();
let table = deltalake::open_table("./tests/data/table_with_edge_timestamps")
.await
.unwrap();
ctx.register_table("demo", Arc::new(table))?;

let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?;

let expected = vec![
"+-------------------------------+---------------------+------------+",
"| BIG_DATE | NORMAL_DATE | SOME_VALUE |",
"+-------------------------------+---------------------+------------+",
"| 1816-03-28T05:56:08.066277376 | 2022-02-01T00:00:00 | 2 |",
"| 1816-03-29T05:56:08.066277376 | 2022-01-01T00:00:00 | 1 |",
"+-------------------------------+---------------------+------------+",
];

assert_batches_sorted_eq!(&expected, &batches);

Ok(())
}