Skip to content

Commit

Permalink
fix: double url encode of partition key (#1324)
Browse files Browse the repository at this point in the history
# Description
This PR is fixing #1228.

# Related Issue(s)

-  #1228
  • Loading branch information
mrjoe7 authored May 3, 2023
1 parent e046a77 commit 6b66455
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 6 deletions.
6 changes: 3 additions & 3 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use object_store::{path::Path, ObjectMeta};
use object_store::ObjectMeta;
use url::Url;

use crate::builder::ensure_table_uri;
Expand Down Expand Up @@ -594,9 +594,8 @@ fn partitioned_file_from_action(action: &action::Add, schema: &ArrowSchema) -> P
);
PartitionedFile {
object_meta: ObjectMeta {
location: Path::from(action.path.clone()),
last_modified,
size: action.size as usize,
..action.try_into().unwrap()
},
partition_values,
range: None,
Expand Down Expand Up @@ -952,6 +951,7 @@ mod tests {
use datafusion::physical_plan::empty::EmptyExec;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf;
use object_store::path::Path;
use serde_json::json;
use std::ops::Deref;

Expand Down
2 changes: 1 addition & 1 deletion rust/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl DeltaTableState {
Ok(sql_to_rel.sql_to_expr(sql, &df_schema, &mut Default::default())?)
}

/// Get the pysical table schema.
/// Get the physical table schema.
///
/// This will construct a schema derived from the parquet schema of the latest data file,
/// and fields for partition columns from the schema defined in table meta data.
Expand Down
31 changes: 29 additions & 2 deletions rust/src/storage/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,37 @@ impl TryFrom<&Add> for ObjectMeta {
Utc,
);
Ok(Self {
// TODO this won't work for absoute paths, since Paths are always relative to store.
location: Path::from(value.path.as_str()),
// TODO this won't work for absolute paths, since Paths are always relative to store.
location: Path::parse(value.path.as_str())?,
last_modified,
size: value.size as usize,
})
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_object_meta_from_add_action() {
let add = Add {
path: "x=A%252FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
.to_string(),
size: 123,
modification_time: 123456789,
..Default::default()
};

let meta: ObjectMeta = (&add).try_into().unwrap();
assert_eq!(
meta.location,
Path::parse(
"x=A%252FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
)
.unwrap()
);
assert_eq!(meta.size, 123);
assert_eq!(meta.last_modified.timestamp_millis(), 123456789);
}
}
24 changes: 24 additions & 0 deletions rust/tests/read_delta_partitions_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,27 @@ async fn read_null_partitions_from_checkpoint() {
let table = deltalake::open_table(&table.table_uri()).await.unwrap();
assert_eq!(table.version(), 2);
}

#[cfg(feature = "datafusion")]
#[tokio::test]
async fn load_from_delta_8_0_table_with_special_partition() {
use datafusion::physical_plan::SendableRecordBatchStream;
use deltalake::{DeltaOps, DeltaTable};
use futures::{future, StreamExt};

let table = deltalake::open_table("./tests/data/delta-0.8.0-special-partition")
.await
.unwrap();

let (_, stream): (DeltaTable, SendableRecordBatchStream) = DeltaOps(table)
.load()
.with_columns(vec!["x", "y"])
.await
.unwrap();
stream
.for_each(|batch| {
assert!(batch.is_ok());
future::ready(())
})
.await;
}

0 comments on commit 6b66455

Please sign in to comment.