Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added basic AWS Glue struct datatype support and JSON/CSV/PARQUET read_options #464

33 changes: 23 additions & 10 deletions dbt/adapters/duckdb/plugins/glue.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from typing import Any
from typing import Dict
from typing import List
Expand Down Expand Up @@ -65,6 +66,16 @@ def _dbt2glue(dtype: str, ignore_null: bool = False) -> str: # pragma: no cover
return "date"
if data_type.lower() in ["blob", "bytea", "binary", "varbinary"]:
return "binary"
if data_type.lower() in ["struct"]:
struct_fields = re.findall(r"(\w+)\s+(\w+)", dtype[dtype.find("(") + 1 : dtype.rfind(")")])
glue_fields = []
for field_name, field_type in struct_fields:
glue_field_type = _dbt2glue(field_type)
glue_fields.append(f"{field_name}:{glue_field_type}")
struct_schema = f"struct<{','.join(glue_fields)}>"
if dtype.strip().endswith("[]"):
return f"array<{struct_schema}>"
return struct_schema
if data_type is None:
if ignore_null:
return ""
Expand Down Expand Up @@ -267,6 +278,7 @@ def _get_table_def(
def _get_glue_client(
settings: Dict[str, Any], secrets: Optional[List[Dict[str, Any]]]
) -> "GlueClient":
client = None
if secrets is not None:
for secret in secrets:
if isinstance(secret, Secret) and "config" == str(secret.provider).lower():
Expand All @@ -279,16 +291,17 @@ def _get_glue_client(
region_name=secret_kwargs.get("region"),
)
break
elif settings:
client = boto3.client(
"glue",
aws_access_key_id=settings.get("s3_access_key_id"),
aws_secret_access_key=settings.get("s3_secret_access_key"),
aws_session_token=settings.get("s3_session_token"),
region_name=settings.get("s3_region"),
)
else:
client = boto3.client("glue")
if client is None:
if settings:
client = boto3.client(
"glue",
aws_access_key_id=settings.get("s3_access_key_id"),
aws_secret_access_key=settings.get("s3_secret_access_key"),
aws_session_token=settings.get("s3_session_token"),
region_name=settings.get("s3_region"),
)
else:
client = boto3.client("glue")
return client


Expand Down
65 changes: 60 additions & 5 deletions dbt/include/duckdb/macros/materializations/external.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,22 @@

{%- set location = render(config.get('location', default=external_location(this, config))) -%})
{%- set rendered_options = render_write_options(config) -%}
{%- set format = config.get('format', 'parquet') -%}

{%- set format = config.get('format') -%}
{%- set allowed_formats = ['csv', 'parquet', 'json'] -%}

{%- if format -%}
{%- set format = format if format in allowed_formats else 'parquet' -%}
firewall413 marked this conversation as resolved.
Show resolved Hide resolved
{%- else -%}
{%- set format = location.split('.')[-1] if '.' in location else 'parquet' -%}
firewall413 marked this conversation as resolved.
Show resolved Hide resolved
{%- set format = format if format in allowed_formats else 'parquet' -%}
{%- endif -%}

{%- set write_options = adapter.external_write_options(location, rendered_options) -%}
{%- set read_location = adapter.external_read_location(location, rendered_options) -%}
{%- set parquet_read_options = config.get('parquet_read_options', {'union_by_name': False}) -%}
{%- set json_read_options = config.get('json_read_options', {'auto_detect': True}) -%}
{%- set csv_read_options = config.get('csv_read_options', {'auto_detect': True}) -%}

-- set language - python or sql
{%- set language = model['language'] -%}
Expand Down Expand Up @@ -45,13 +58,55 @@
{{- create_table_as(False, temp_relation, compiled_code, language) }}
{%- endcall %}

-- write an temp relation into file
{{ write_to_file(temp_relation, location, write_options) }}
-- create a view on top of the location
-- write a temp relation into file
firewall413 marked this conversation as resolved.
Show resolved Hide resolved
{{ write_to_file(temp_relation, location, write_options) }}

-- create a view on top of the location
{% call statement('main', language='sql') -%}
{% if format == 'json' %}
create or replace view {{ intermediate_relation }} as (
select * from read_json('{{ read_location }}'
{%- for key, value in json_read_options.items() -%}
, {{ key }}=
{%- if value is string -%}
'{{ value }}'
{%- else -%}
{{ value }}
{%- endif -%}
{%- endfor -%}
)
);
{% elif format == 'parquet' %}
create or replace view {{ intermediate_relation }} as (
select * from read_parquet('{{ read_location }}'
{%- for key, value in parquet_read_options.items() -%}
, {{ key }}=
{%- if value is string -%}
'{{ value }}'
{%- else -%}
{{ value }}
{%- endif -%}
{%- endfor -%}
)
);
{% elif format == 'csv' %}
create or replace view {{ intermediate_relation }} as (
select * from '{{ read_location }}'
select * from read_csv('{{ read_location }}'
{%- for key, value in csv_read_options.items() -%}
, {{ key }}=
{%- if value is string -%}
'{{ value }}'
{%- else -%}
{{ value }}
{%- endif -%}
{%- endfor -%}
)
);
{% else %}
create or replace view {{ intermediate_relation }} as (
firewall413 marked this conversation as resolved.
Show resolved Hide resolved
select * from '{{ read_location }}'
);
{% endif %}
{%- endcall %}

-- cleanup
Expand Down
Loading