Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: unity catalog import from write_deltalake #3630

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,12 @@
from daft.io import DataCatalogTable
from daft.io._deltalake import large_dtypes_kwargs
from daft.io.object_store_options import io_config_to_storage_options
from daft.unity_catalog import UnityCatalogTable

_UNITY_AVAILABLE = True
try:
from daft.unity_catalog import UnityCatalogTable
desmondcheongzx marked this conversation as resolved.
Show resolved Hide resolved
except ImportError:
_UNITY_AVAILABLE = False

Check warning on line 882 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L881-L882

Added lines #L881 - L882 were not covered by tests

if schema_mode == "merge":
raise ValueError("Schema mode' merge' is not currently supported for write_deltalake.")
Expand All @@ -884,30 +889,35 @@

io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

if isinstance(table, (str, pathlib.Path, DataCatalogTable, UnityCatalogTable)):
# Retrieve table_uri and storage_options from various backends
table_uri: str
storage_options: dict

if isinstance(table, deltalake.DeltaTable):
table_uri = table.table_uri
storage_options = table._storage_options or {}
new_storage_options = io_config_to_storage_options(io_config, table_uri)
storage_options.update(new_storage_options or {})

Check warning on line 900 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L897-L900

Added lines #L897 - L900 were not covered by tests
else:
if isinstance(table, str):
table_uri = table
elif isinstance(table, pathlib.Path):
table_uri = str(table)
elif isinstance(table, UnityCatalogTable):
elif _UNITY_AVAILABLE and isinstance(table, UnityCatalogTable):

Check warning on line 906 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L906

Added line #L906 was not covered by tests
desmondcheongzx marked this conversation as resolved.
Show resolved Hide resolved
table_uri = table.table_uri
io_config = table.io_config
else:
elif isinstance(table, DataCatalogTable):

Check warning on line 909 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L909

Added line #L909 was not covered by tests
table_uri = table.table_uri(io_config)
else:
raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}")

Check warning on line 912 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L912

Added line #L912 was not covered by tests

if io_config is None:
raise ValueError(
"io_config was not provided to write_deltalake and could not be retrieved from the default configuration."
"io_config was not provided to write_deltalake and could not be retrieved from defaults."
)

storage_options = io_config_to_storage_options(io_config, table_uri) or {}
table = try_get_deltatable(table_uri, storage_options=storage_options)
elif isinstance(table, deltalake.DeltaTable):
table_uri = table.table_uri
storage_options = table._storage_options or {}
new_storage_options = io_config_to_storage_options(io_config, table_uri)
storage_options.update(new_storage_options or {})
else:
raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}")

# see: https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/
scheme = get_protocol_from_path(table_uri)
Expand Down
Loading