diff --git a/dbt/adapters/duckdb/plugins/glue.py b/dbt/adapters/duckdb/plugins/glue.py index cf447dde..1735bf1b 100644 --- a/dbt/adapters/duckdb/plugins/glue.py +++ b/dbt/adapters/duckdb/plugins/glue.py @@ -1,3 +1,4 @@ +import re from typing import Any from typing import Dict from typing import List @@ -65,6 +66,16 @@ def _dbt2glue(dtype: str, ignore_null: bool = False) -> str: # pragma: no cover return "date" if data_type.lower() in ["blob", "bytea", "binary", "varbinary"]: return "binary" + if data_type.lower() in ["struct"]: + struct_fields = re.findall(r"(\w+)\s+(\w+)", dtype[dtype.find("(") + 1 : dtype.rfind(")")]) + glue_fields = [] + for field_name, field_type in struct_fields: + glue_field_type = _dbt2glue(field_type) + glue_fields.append(f"{field_name}:{glue_field_type}") + struct_schema = f"struct<{','.join(glue_fields)}>" + if dtype.strip().endswith("[]"): + return f"array<{struct_schema}>" + return struct_schema if data_type is None: if ignore_null: return "" @@ -267,6 +278,7 @@ def _get_table_def( def _get_glue_client( settings: Dict[str, Any], secrets: Optional[List[Dict[str, Any]]] ) -> "GlueClient": + client = None if secrets is not None: for secret in secrets: if isinstance(secret, Secret) and "config" == str(secret.provider).lower(): @@ -279,16 +291,17 @@ def _get_glue_client( region_name=secret_kwargs.get("region"), ) break - elif settings: - client = boto3.client( - "glue", - aws_access_key_id=settings.get("s3_access_key_id"), - aws_secret_access_key=settings.get("s3_secret_access_key"), - aws_session_token=settings.get("s3_session_token"), - region_name=settings.get("s3_region"), - ) - else: - client = boto3.client("glue") + if client is None: + if settings: + client = boto3.client( + "glue", + aws_access_key_id=settings.get("s3_access_key_id"), + aws_secret_access_key=settings.get("s3_secret_access_key"), + aws_session_token=settings.get("s3_session_token"), + region_name=settings.get("s3_region"), + ) + else: + client = boto3.client("glue") return client diff --git a/dbt/include/duckdb/macros/materializations/external.sql b/dbt/include/duckdb/macros/materializations/external.sql index 91e9b5ab..70d7dae9 100644 --- a/dbt/include/duckdb/macros/materializations/external.sql +++ b/dbt/include/duckdb/macros/materializations/external.sql @@ -2,9 +2,23 @@ {%- set location = render(config.get('location', default=external_location(this, config))) -%}) {%- set rendered_options = render_write_options(config) -%} - {%- set format = config.get('format', 'parquet') -%} + + {%- set format = config.get('format') -%} + {%- set allowed_formats = ['csv', 'parquet', 'json'] -%} + {%- if format -%} + {%- if format not in allowed_formats -%} + {{ exceptions.raise_compiler_error("Invalid format: " ~ format ~ ". Allowed formats are: " ~ allowed_formats | join(', ')) }} + {%- endif -%} + {%- else -%} + {%- set format = location.split('.')[-1].lower() if '.' in location else 'parquet' -%} + {%- set format = format if format in allowed_formats else 'parquet' -%} + {%- endif -%} + {%- set write_options = adapter.external_write_options(location, rendered_options) -%} {%- set read_location = adapter.external_read_location(location, rendered_options) -%} + {%- set parquet_read_options = config.get('parquet_read_options', {'union_by_name': False}) -%} + {%- set json_read_options = config.get('json_read_options', {'auto_detect': True}) -%} + {%- set csv_read_options = config.get('csv_read_options', {'auto_detect': True}) -%} -- set language - python or sql {%- set language = model['language'] -%} @@ -45,13 +59,51 @@ {{- create_table_as(False, temp_relation, compiled_code, language) }} {%- endcall %} - -- write an temp relation into file + -- write a temp relation into file {{ write_to_file(temp_relation, location, write_options) }} - -- create a view on top of the location + +-- create a view on top of the location {% call statement('main', language='sql') -%} + {% if format == 'json' %} + create or replace view {{ intermediate_relation }} as ( + select * from read_json('{{ read_location }}' + {%- for key, value in json_read_options.items() -%} + , {{ key }}= + {%- if value is string -%} + '{{ value }}' + {%- else -%} + {{ value }} + {%- endif -%} + {%- endfor -%} + ) + ); + {% elif format == 'parquet' %} + create or replace view {{ intermediate_relation }} as ( + select * from read_parquet('{{ read_location }}' + {%- for key, value in parquet_read_options.items() -%} + , {{ key }}= + {%- if value is string -%} + '{{ value }}' + {%- else -%} + {{ value }} + {%- endif -%} + {%- endfor -%} + ) + ); + {% elif format == 'csv' %} create or replace view {{ intermediate_relation }} as ( - select * from '{{ read_location }}' + select * from read_csv('{{ read_location }}' + {%- for key, value in csv_read_options.items() -%} + , {{ key }}= + {%- if value is string -%} + '{{ value }}' + {%- else -%} + {{ value }} + {%- endif -%} + {%- endfor -%} + ) ); + {% endif %} {%- endcall %} -- cleanup