Skip to content

Commit

Permalink
fix: don't re-encode paths (#1613)
Browse files Browse the repository at this point in the history
# Description

In the delta log, paths are percent-encoded. We decode them here:


https://github.com/delta-io/delta-rs/blob/787c13a63efa9ada96d303c10c093424215aaa80/rust/src/action/mod.rs#L435-L437

That part is fine, but we have then been re-encoding them with `Path::from`.
This PR switches to `Path::parse` wherever possible. Rather than
propagating parse errors, we simply fall back to `Path::from` for now. Read more
here:
https://docs.rs/object_store/0.7.0/object_store/path/struct.Path.html#encode

# Related Issue(s)

* closes #1533
* closes #1446 
* closes #1079
* closes #1393


# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
wjones127 authored Sep 11, 2023
1 parent 78c4aab commit 41efefd
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 21 deletions.
31 changes: 21 additions & 10 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ pub struct DeltaFileSystemHandler {
pub(crate) config: FsConfig,
}

impl DeltaFileSystemHandler {
    /// Builds an object-store `Path` from a raw path string.
    ///
    /// `Path::parse` is tried first because, unlike `Path::from`, it does not
    /// percent-encode its input, so a string that is already encoded is kept
    /// as-is. If parsing is rejected, the error is discarded and the string is
    /// passed to `Path::from` instead.
    fn parse_path(path: &str) -> Path {
        Path::parse(path).unwrap_or_else(|_| Path::from(path))
    }
}

#[pymethods]
impl DeltaFileSystemHandler {
#[new]
Expand Down Expand Up @@ -57,8 +68,8 @@ impl DeltaFileSystemHandler {
}

fn copy_file(&self, src: String, dest: String) -> PyResult<()> {
let from_path = Path::from(src);
let to_path = Path::from(dest);
let from_path = Self::parse_path(&src);
let to_path = Self::parse_path(&dest);
self.rt
.block_on(self.inner.copy(&from_path, &to_path))
.map_err(PythonError::from)?;
Expand All @@ -71,15 +82,15 @@ impl DeltaFileSystemHandler {
}

fn delete_dir(&self, path: String) -> PyResult<()> {
let path = Path::from(path);
let path = Self::parse_path(&path);
self.rt
.block_on(delete_dir(self.inner.as_ref(), &path))
.map_err(PythonError::from)?;
Ok(())
}

fn delete_file(&self, path: String) -> PyResult<()> {
let path = Path::from(path);
let path = Self::parse_path(&path);
self.rt
.block_on(self.inner.delete(&path))
.map_err(PythonError::from)?;
Expand All @@ -100,7 +111,7 @@ impl DeltaFileSystemHandler {

let mut infos = Vec::new();
for file_path in paths {
let path = Path::from(file_path);
let path = Self::parse_path(&file_path);
let listed = py.allow_threads(|| {
self.rt
.block_on(self.inner.list_with_delimiter(Some(&path)))
Expand Down Expand Up @@ -160,7 +171,7 @@ impl DeltaFileSystemHandler {
fs.call_method("FileInfo", (loc, type_), Some(kwargs.into_py_dict(py)))
};

let path = Path::from(base_dir);
let path = Self::parse_path(&base_dir);
let list_result = match self
.rt
.block_on(walk_tree(self.inner.clone(), &path, recursive))
Expand Down Expand Up @@ -216,8 +227,8 @@ impl DeltaFileSystemHandler {
}

fn move_file(&self, src: String, dest: String) -> PyResult<()> {
let from_path = Path::from(src);
let to_path = Path::from(dest);
let from_path = Self::parse_path(&src);
let to_path = Self::parse_path(&dest);
// TODO check the if not exists semantics
self.rt
.block_on(self.inner.rename(&from_path, &to_path))
Expand All @@ -226,7 +237,7 @@ impl DeltaFileSystemHandler {
}

fn open_input_file(&self, path: String) -> PyResult<ObjectInputFile> {
let path = Path::from(path);
let path = Self::parse_path(&path);
let file = self
.rt
.block_on(ObjectInputFile::try_new(
Expand All @@ -244,7 +255,7 @@ impl DeltaFileSystemHandler {
path: String,
#[allow(unused)] metadata: Option<HashMap<String, String>>,
) -> PyResult<ObjectOutputStream> {
let path = Path::from(path);
let path = Self::parse_path(&path);
let file = self
.rt
.block_on(ObjectOutputStream::try_new(
Expand Down
15 changes: 11 additions & 4 deletions python/tests/data_acceptance/test_reader.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import os
from pathlib import Path
from typing import Any, Dict, NamedTuple, Optional

Expand Down Expand Up @@ -44,9 +45,7 @@ class ReadCase(NamedTuple):

failing_cases = {
"multi_partitioned_2": "Waiting for PyArrow 11.0.0 for decimal cast support (#1078)",
"nested_types": "Waiting for PyArrow 11.0.0 so we can ignore internal field names in equality",
"multi_partitioned": "Escaped characters in data file paths aren't yet handled (#1079)",
"no_stats": "We don't yet support files without stats (#582)",
"multi_partitioned": "Test case handles binary poorly",
}


Expand All @@ -66,6 +65,10 @@ def test_dat(case: ReadCase):
# Load table
dt = DeltaTable(str(delta_root), version=version)

# Verify table paths can be found
for path in dt.file_uris():
assert os.path.exists(path)

# Compare protocol versions
assert dt.protocol().min_reader_version == version_metadata["min_reader_version"]
assert dt.protocol().min_writer_version == version_metadata["min_writer_version"]
Expand All @@ -85,7 +88,11 @@ def test_dat(case: ReadCase):

def assert_tables_equal(first: pa.Table, second: pa.Table) -> None:
assert first.schema == second.schema
sort_keys = [(col, "ascending") for col in first.column_names]
sort_keys = [
(col, "ascending")
for i, col in enumerate(first.column_names)
if not pa.types.is_nested(first.schema.field(i).type)
]
first_sorted = first.sort_by(sort_keys)
second_sorted = second.sort_by(sort_keys)
assert first_sorted == second_sorted
22 changes: 22 additions & 0 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,28 @@ def test_read_table_with_stats():
assert data.num_rows == 0


def test_read_special_partition():
    """Read a table whose partition values contain a slash and a space.

    The data file paths carry the partition values in percent-encoded form
    (``A%2FA`` and ``B%20B``), while partition filters and the materialised
    ``x`` column use the plain values (``A/A`` and ``B B``).
    """
    dt = DeltaTable("../rust/tests/data/delta-0.8.0-special-partition")

    slash_file = (
        r"x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
    )
    space_file = (
        r"x=B%20B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet"
    )

    # Without a filter, both files are listed with their encoding preserved.
    assert set(dt.files()) == {slash_file, space_file}

    # Filtering uses the plain partition value; an unknown value matches nothing.
    expected_by_value = [
        ("A/A", [slash_file]),
        ("B B", [space_file]),
        ("c", []),
    ]
    for value, expected in expected_by_value:
        assert dt.files([("x", "=", value)]) == expected

    # The partition column read back from the table holds the plain values.
    partition_values = dt.to_pyarrow_table()["x"].to_pylist()
    assert set(partition_values) == {"A/A", "B B"}


def test_read_partitioned_table_metadata():
table_path = "../rust/tests/data/delta-0.8.0-partitioned"
dt = DeltaTable(table_path)
Expand Down
8 changes: 7 additions & 1 deletion rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,13 @@ impl DeltaTable {
) -> Result<Vec<Path>, DeltaTableError> {
Ok(self
.get_active_add_actions_by_partitions(filters)?
.map(|add| Path::from(add.path.as_ref()))
.map(|add| {
// Try to preserve percent encoding if possible
match Path::parse(&add.path) {
Ok(path) => path,
Err(_) => Path::from(add.path.as_ref()),
}
})
.collect())
}

Expand Down
13 changes: 8 additions & 5 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,12 +414,14 @@ mod tests {
assert_eq!(
table.get_files(),
vec![
Path::from(
Path::parse(
"x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
),
Path::from(
)
.unwrap(),
Path::parse(
"x=B%20B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet"
)
.unwrap()
]
);

Expand All @@ -429,9 +431,10 @@ mod tests {
}];
assert_eq!(
table.get_files_by_partitions(&filters).unwrap(),
vec![Path::from(
vec![Path::parse(
"x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
)]
)
.unwrap()]
);
}

Expand Down
5 changes: 4 additions & 1 deletion rust/src/table_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ impl DeltaTableState {
/// Returns an iterator of file names present in the loaded state
#[inline]
pub fn file_paths_iter(&self) -> impl Iterator<Item = Path> + '_ {
self.files.iter().map(|add| Path::from(add.path.as_ref()))
self.files.iter().map(|add| match Path::parse(&add.path) {
Ok(path) => path,
Err(_) => Path::from(add.path.as_ref()),
})
}

/// HashMap containing the last txn version stored for every app id writing txn
Expand Down

0 comments on commit 41efefd

Please sign in to comment.