Commit 5d821c4

model contracts + constraints with nested fields (#738) (#772)

(cherry picked from commit b1e950b)

MichelleArk authored Jun 14, 2023
1 parent 05f2b0f commit 5d821c4

Showing 7 changed files with 556 additions and 15 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230601-141255.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Support model contracts + constraints on nested columns
time: 2023-06-01T14:12:55.433346-04:00
custom:
Author: MichelleArk
Issue: "673"
133 changes: 132 additions & 1 deletion dbt/adapters/bigquery/column.py
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Optional, List, TypeVar, Iterable, Type, Any
from typing import Optional, List, TypeVar, Iterable, Type, Any, Dict, Union

from dbt.adapters.base.column import Column

@@ -126,3 +126,134 @@ def column_to_bq_schema(self) -> SchemaField:
kwargs = {"fields": fields}

return SchemaField(self.name, self.dtype, self.mode, **kwargs) # type: ignore[arg-type]


def get_nested_column_data_types(
columns: Dict[str, Dict[str, Any]],
constraints: Optional[Dict[str, str]] = None,
) -> Dict[str, Dict[str, str]]:
"""
columns:
        * Dictionary where keys are flat column names and values are dictionaries of column attributes
* column names with "." indicate a nested column within a STRUCT type
* e.g. {"a": {"name": "a", "data_type": "string", ...}}
constraints:
* Dictionary where keys are flat column names and values are rendered constraints for the column
        * If provided, the rendered constraint is included in the returned "data_type" values.
returns:
* Dictionary where keys are root column names and values are corresponding nested data_type values.
* Fields other than "name" and "data_type" are __not__ preserved in the return value for nested columns.
* Fields other than "name" and "data_type" are preserved in the return value for flat columns.
Example:
columns: {
"a": {"name": "a", "data_type": "string", "description": ...},
"b.nested": {"name": "b.nested", "data_type": "string"},
"b.nested2": {"name": "b.nested2", "data_type": "string"}
}
returns: {
"a": {"name": "a", "data_type": "string"},
"b": {"name": "b": "data_type": "struct<nested string, nested2 string>}
}
"""
constraints = constraints or {}

nested_column_data_types: Dict[str, Union[str, Dict]] = {}
for column in columns.values():
_update_nested_column_data_types(
column["name"],
column["data_type"],
constraints.get(column["name"]),
nested_column_data_types,
)

formatted_nested_column_data_types: Dict[str, Dict[str, str]] = {}
for column_name, unformatted_column_type in nested_column_data_types.items():
formatted_nested_column_data_types[column_name] = {
"name": column_name,
"data_type": _format_nested_data_type(unformatted_column_type),
}

# add column configs back to flat columns
for column_name in formatted_nested_column_data_types:
if column_name in columns:
formatted_nested_column_data_types[column_name].update(
{
k: v
for k, v in columns[column_name].items()
if k not in formatted_nested_column_data_types[column_name]
}
)

return formatted_nested_column_data_types


def _update_nested_column_data_types(
column_name: str,
column_data_type: str,
column_rendered_constraint: Optional[str],
nested_column_data_types: Dict[str, Union[str, Dict]],
) -> None:
"""
Recursively update nested_column_data_types given a column_name, column_data_type, and optional column_rendered_constraint.
Examples:
>>> nested_column_data_types = {}
>>> BigQueryAdapter._update_nested_column_data_types("a", "string", "not_null", nested_column_data_types)
>>> nested_column_data_types
{"a": "string not null"}
>>> BigQueryAdapter._update_nested_column_data_types("b.c", "string", "not_null", nested_column_data_types)
>>> nested_column_data_types
{"a": "string not null", "b": {"c": "string not null"}}
>>> BigQueryAdapter._update_nested_column_data_types("b.d", "string", None, nested_column_data_types)
>>> nested_column_data_types
{"a": "string not null", "b": {"c": "string not null", "d": "string"}}
"""
column_name_parts = column_name.split(".")
root_column_name = column_name_parts[0]

if len(column_name_parts) == 1:
# Base case: column is not nested - store its data_type concatenated with constraint if provided.
nested_column_data_types[root_column_name] = (
column_data_type
if column_rendered_constraint is None
else f"{column_data_type} {column_rendered_constraint}"
)
else:
# Initialize nested dictionary
if root_column_name not in nested_column_data_types:
nested_column_data_types[root_column_name] = {}

        # Recursively process the remainder of the column name
remaining_column_name = ".".join(column_name_parts[1:])
remaining_column_data_types = nested_column_data_types[root_column_name]
assert isinstance(remaining_column_data_types, dict) # keeping mypy happy
_update_nested_column_data_types(
remaining_column_name,
column_data_type,
column_rendered_constraint,
remaining_column_data_types,
)


def _format_nested_data_type(unformatted_nested_data_type: Union[str, Dict[str, Any]]) -> str:
"""
Recursively format a (STRUCT) data type given an arbitrarily nested data type structure.
Examples:
>>> BigQueryAdapter._format_nested_data_type("string")
'string'
>>> BigQueryAdapter._format_nested_data_type({'c': 'string not_null', 'd': 'string'})
'struct<c string not_null, d string>'
>>> BigQueryAdapter._format_nested_data_type({'c': 'string not_null', 'd': {'e': 'string'}})
'struct<c string not_null, d struct<e string>>'
"""
if isinstance(unformatted_nested_data_type, str):
return unformatted_nested_data_type
else:
formatted_nested_types = [
f"{column_name} {_format_nested_data_type(column_type)}"
for column_name, column_type in unformatted_nested_data_type.items()
]
return f"""struct<{", ".join(formatted_nested_types)}>"""
40 changes: 39 additions & 1 deletion dbt/adapters/bigquery/impl.py
@@ -27,6 +27,7 @@

from dbt.adapters.cache import _make_ref_key_dict

from dbt.adapters.bigquery.column import get_nested_column_data_types
from dbt.adapters.bigquery.relation import BigQueryRelation
from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset
from dbt.adapters.bigquery import BigQueryColumn
@@ -269,6 +270,15 @@ def check_schema_exists(self, database: str, schema: str) -> bool:
return False
return True

@available.parse(lambda *a, **k: {})
@classmethod
def nest_column_data_types(
cls,
columns: Dict[str, Dict[str, Any]],
constraints: Optional[Dict[str, str]] = None,
) -> Dict[str, Dict[str, str]]:
return get_nested_column_data_types(columns, constraints)

def get_columns_in_relation(self, relation: BigQueryRelation) -> List[BigQueryColumn]:
try:
table = self.connections.get_bq_table(
@@ -503,7 +513,10 @@ def get_column_schema_from_query(self, sql: str) -> List[BigQueryColumn]:
"""
_, iterator = self.connections.raw_execute(sql)
columns = [self.Column.create_from_field(field) for field in iterator.schema]
return columns
flattened_columns = []
for column in columns:
flattened_columns += column.flatten()
return flattened_columns

@available.parse(lambda *a, **k: False)
def get_columns_in_select_sql(self, select_sql: str) -> List[BigQueryColumn]:
@@ -944,6 +957,31 @@ def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
"serverless": ServerlessDataProcHelper,
}

@available
@classmethod
def render_raw_columns_constraints(cls, raw_columns: Dict[str, Dict[str, Any]]) -> List:
rendered_constraints: Dict[str, str] = {}
for raw_column in raw_columns.values():
for con in raw_column.get("constraints", None):
constraint = cls._parse_column_constraint(con)
rendered_constraint = cls.process_parsed_constraint(
constraint, cls.render_column_constraint
)

if rendered_constraint:
column_name = raw_column["name"]
if column_name not in rendered_constraints:
rendered_constraints[column_name] = rendered_constraint
else:
rendered_constraints[column_name] += f" {rendered_constraint}"

nested_columns = cls.nest_column_data_types(raw_columns, rendered_constraints)
rendered_column_constraints = [
f"{cls.quote(column['name']) if column.get('quote') else column['name']} {column['data_type']}"
for column in nested_columns.values()
]
return rendered_column_constraints

@classmethod
def render_column_constraint(cls, constraint: ColumnLevelConstraint) -> Optional[str]:
c = super().render_column_constraint(constraint)
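
Illustrative sketch of the nesting step inside render_raw_columns_constraints (not part of the diff; the "not null" string stands in for a constraint already rendered by render_column_constraint). It shows the column-spec strings the method builds for contract DDL:

from dbt.adapters.bigquery.column import get_nested_column_data_types

raw_columns = {
    "id.nested_column": {"name": "id.nested_column", "data_type": "string"},
    "id.nested_column2": {"name": "id.nested_column2", "data_type": "string"},
}
rendered_constraints = {"id.nested_column": "not null"}  # assumed rendering of a not_null constraint

nested = get_nested_column_data_types(raw_columns, rendered_constraints)
print([f"{col['name']} {col['data_type']}" for col in nested.values()])
# ['id struct<nested_column string not null, nested_column2 string>']

Aggregating rendered constraints per flat column name before nesting is what lets a not_null on a nested field end up inside the STRUCT definition.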
22 changes: 22 additions & 0 deletions dbt/include/bigquery/macros/utils/get_columns_spec_ddl.sql
@@ -3,3 +3,25 @@
{% set formatted = column.column.lower() ~ " " ~ data_type %}
{{ return({'name': column.name, 'data_type': data_type, 'formatted': formatted}) }}
{%- endmacro -%}

{% macro bigquery__get_empty_schema_sql(columns) %}
{%- set columns = adapter.nest_column_data_types(columns) -%}
{{ return(dbt.default__get_empty_schema_sql(columns)) }}
{% endmacro %}

{% macro bigquery__get_select_subquery(sql) %}
select {{ adapter.dispatch('get_column_names')() }}
from (
{{ sql }}
) as model_subq
{%- endmacro %}

{% macro bigquery__get_column_names() %}
{#- loop through nested user_provided_columns to get column names -#}
{%- set user_provided_columns = adapter.nest_column_data_types(model['columns']) -%}
{%- for i in user_provided_columns %}
{%- set col = user_provided_columns[i] -%}
{%- set col_name = adapter.quote(col['name']) if col.get('quote') else col['name'] -%}
{{ col_name }}{{ ", " if not loop.last }}
{%- endfor -%}
{% endmacro %}
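
Sketch of what bigquery__get_column_names ends up looping over (not part of the diff; hypothetical column names). nest_column_data_types collapses the dotted yaml names to their root columns, so the insert subquery selects only the top-level, possibly STRUCT, columns from the model SQL:

from dbt.adapters.bigquery.column import get_nested_column_data_types

yaml_columns = {
    "a": {"name": "a", "data_type": "string"},
    "b.nested": {"name": "b.nested", "data_type": "string"},
    "b.nested2": {"name": "b.nested2", "data_type": "string"},
}
# The macro iterates these root names to build "select a, b from (<model sql>) as model_subq".
print(list(get_nested_column_data_types(yaml_columns)))  # ['a', 'b']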
118 changes: 118 additions & 0 deletions tests/functional/adapter/constraints/fixtures.py
@@ -0,0 +1,118 @@
my_model_struct_wrong_data_type_sql = """
{{ config(materialized = "table") }}
select
STRUCT(1 AS struct_column_being_tested, "test" AS another_struct_column) as a
"""

my_model_struct_correct_data_type_sql = """
{{ config(materialized = "table")}}
select
STRUCT("test" AS struct_column_being_tested, "test" AS b) as a
"""

model_struct_data_type_schema_yml = """
version: 2
models:
- name: contract_struct_wrong
config:
contract:
enforced: true
columns:
- name: a.struct_column_being_tested
data_type: string
- name: a.b
data_type: string
- name: contract_struct_correct
config:
contract:
enforced: true
columns:
- name: a.struct_column_being_tested
data_type: string
- name: a.b
data_type: string
"""

my_model_double_struct_wrong_data_type_sql = """
{{ config(materialized = "table") }}
select
STRUCT(
STRUCT(1 AS struct_column_being_tested, "test" AS c) as b,
"test" as d
) as a
"""

my_model_double_struct_correct_data_type_sql = """
{{ config(materialized = "table") }}
select
STRUCT(
STRUCT("test" AS struct_column_being_tested, "test" AS c) as b,
"test" as d
) as a
"""

model_double_struct_data_type_schema_yml = """
version: 2
models:
- name: contract_struct_wrong
config:
contract:
enforced: true
columns:
- name: a.b.struct_column_being_tested
data_type: string
- name: a.b.c
data_type: string
- name: a.d
data_type: string
- name: contract_struct_correct
config:
contract:
enforced: true
columns:
- name: a.b.struct_column_being_tested
data_type: string
- name: a.b.c
data_type: string
- name: a.d
data_type: string
"""


my_model_struct_sql = """
{{
config(
materialized = "table"
)
}}
select STRUCT("test" as nested_column, "test" as nested_column2) as id
"""


model_struct_schema_yml = """
version: 2
models:
- name: my_model
config:
contract:
enforced: true
columns:
- name: id.nested_column
quote: true
data_type: string
description: hello
constraints:
- type: not_null
- type: unique
- name: id.nested_column2
data_type: string
constraints:
- type: unique
"""