Skip to content

Commit

Permalink
Merge pull request #25 from JasperHG90/chore/optional-catalog
Browse files Browse the repository at this point in the history
Feat: allow configuration with environment variables and config file
  • Loading branch information
JasperHG90 authored Nov 19, 2024
2 parents b284435 + a414d6d commit ae54873
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions src/dagster_pyiceberg/io_manager/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ class _IcebergCatalogProperties(TypedDict):
class _IcebergTableIOManagerResourceConfig(TypedDict):

name: str
config: _IcebergCatalogProperties
config: Optional[_IcebergCatalogProperties]
schema_: Optional[str]
db_io_manager: DbIoManagerImplementation
partition_spec_update_mode: PartitionSpecUpdateMode
schema_update_mode: SchemaUpdateMode

Expand Down Expand Up @@ -82,9 +84,14 @@ def connect(context, table_slice: TableSlice) -> Iterator[Catalog]:
resource_config = cast(
_IcebergTableIOManagerResourceConfig, context.resource_config
)
yield load_catalog(
name=resource_config["name"], **resource_config["config"]["properties"]
)
# Config passed as env variables or using config file.
# See: https://py.iceberg.apache.org/configuration/
if resource_config["config"] is None:
yield load_catalog(name=resource_config["name"])
else:
yield load_catalog(
name=resource_config["name"], **resource_config["config"]["properties"]
)


class IcebergIOManager(ConfigurableIOManagerFactory):
Expand Down Expand Up @@ -146,8 +153,10 @@ def my_table_a(my_table: pd.DataFrame):
"""

name: str = Field(description="The name of the iceberg catalog.")
config: IcebergCatalogConfig = Field(
description="Additional configuration properties for the iceberg catalog.",
config: Optional[IcebergCatalogConfig] = Field(
description="Additional configuration properties for the iceberg catalog. See <https://py.iceberg.apache.org/configuration/>"
" for passing these as environment variables or using a configuration file.",
default=None,
)
schema_: Optional[str] = Field(
default=None,
Expand Down Expand Up @@ -177,7 +186,8 @@ def default_load_type() -> Optional[Type]:
return None

def create_io_manager(self, context) -> DbIOManager:
self.config.model_dump()
if self.config is not None:
self.config.model_dump()
IoManagerImplementation = (
DbIOManager
if self.db_io_manager == DbIoManagerImplementation.default
Expand Down

0 comments on commit ae54873

Please sign in to comment.