Apply "Initial refactoring of incremental materialization" #148

Merged 6 commits on Aug 25, 2022
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
## dbt-databricks 1.3.0 (Release TBD)

### Under the hood
- Apply "Initial refactoring of incremental materialization" ([#148](https://github.com/databricks/dbt-databricks/pull/148))
- dbt-databricks now dispatches incremental strategy macros via `adapter.get_incremental_strategy_macro` instead of the `dbt_spark_get_incremental_sql` macro. Overriding `dbt_spark_get_incremental_sql` no longer has any effect.
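The bullet above replaces a single overridable macro with per-strategy macro resolution. A minimal Python sketch of name-based dispatch of this kind (the `context` dict is a stand-in for dbt's macro context, and the fallback behavior is an assumption, not dbt-core's exact implementation):

```python
def get_incremental_strategy_macro(model_context: dict, strategy: str):
    """Resolve the macro implementing an incremental strategy by name.

    Sketch only: dbt resolves Jinja macros named
    get_incremental_<strategy>_sql from the model context.
    """
    if strategy is None:
        # Assumption: an unset strategy falls back to a "default" macro.
        strategy = "default"
    macro_name = f"get_incremental_{strategy}_sql"
    if macro_name not in model_context:
        raise ValueError(f"no macro found for incremental strategy: {strategy!r}")
    return model_context[macro_name]

# Stand-in context mapping macro names to callables.
context = {
    "get_incremental_merge_sql": lambda args: f"merge into {args['target_relation']} ...",
}
macro = get_incremental_strategy_macro(context, "merge")
sql = macro({"target_relation": "main.events"})  # placeholder relation name
```

Under this scheme, customizing a strategy means supplying a macro with the expected `get_incremental_<strategy>_sql` name rather than overriding one monolithic dispatcher.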

## dbt-databricks 1.2.1 (August 24, 2022)

### Features
3 changes: 3 additions & 0 deletions dbt/adapters/databricks/impl.py
@@ -211,6 +211,9 @@ def _get_columns_for_catalog(self, relation: DatabricksRelation) -> Iterable[Dic
as_dict["column_type"] = as_dict.pop("dtype")
yield as_dict

def valid_incremental_strategies(self) -> List[str]:
return ["append", "merge", "insert_overwrite"]

@contextmanager
def _catalog(self, catalog: Optional[str]) -> Iterator[None]:
"""
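The `valid_incremental_strategies` method added above advertises which strategies this adapter supports, so that a misconfigured model can be rejected. A toy Python sketch of such a check (the class and the `validate_incremental_strategy` method name are illustrative stand-ins, not dbt-core's actual API):

```python
from typing import List


class AdapterSketch:
    """Illustrative stand-in; not the real DatabricksAdapter class."""

    def valid_incremental_strategies(self) -> List[str]:
        # Same list as the method added in the diff above.
        return ["append", "merge", "insert_overwrite"]

    def validate_incremental_strategy(self, strategy: str) -> None:
        # Sketch of validating a model's configured strategy against
        # the advertised list; raises on anything unsupported.
        valid = self.valid_incremental_strategies()
        if strategy not in valid:
            raise ValueError(
                f"invalid incremental strategy {strategy!r}; expected one of {valid}"
            )


adapter = AdapterSketch()
adapter.validate_incremental_strategy("merge")  # passes silently
```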
@@ -6,7 +6,7 @@
{%- set grant_config = config.get('grants') -%}

{%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}
{%- set incremental_strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}

{%- set unique_key = config.get('unique_key', none) -%}
{%- set partition_by = config.get('partition_by', none) -%}
@@ -17,9 +17,8 @@

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{% if strategy == 'insert_overwrite' and partition_by %}
{% if incremental_strategy == 'insert_overwrite' and partition_by %}
{% call statement() %}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
{% endcall %}
@@ -37,9 +36,12 @@
{% endif %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %}
{% set temp_relation = make_temp_relation(this) %}
{% do run_query(create_table_as(True, temp_relation, sql)) %}
{% do process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key }) %}
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}
{% endif %}

{%- call statement('main') -%}
@@ -0,0 +1,15 @@
{% macro databricks__get_incremental_default_sql(arg_dict) %}
{{ return(get_incremental_merge_sql(arg_dict)) }}
{% endmacro %}

{% macro databricks__get_incremental_append_sql(arg_dict) %}
{% do return(get_insert_into_sql(arg_dict["temp_relation"], arg_dict["target_relation"])) %}
{% endmacro %}

{% macro databricks__get_incremental_merge_sql(arg_dict) %}
{% do return(get_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], dest_columns=none, predicates=none)) %}
{% endmacro %}

{% macro databricks__get_incremental_insert_overwrite_sql(arg_dict) %}
{% do return(get_insert_overwrite_sql(arg_dict["temp_relation"], arg_dict["target_relation"])) %}
{% endmacro %}
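Each macro above receives a single `arg_dict` built by the materialization. Its shape, and how a strategy macro consumes it, can be sketched in Python (relation names are placeholders; only the `append` case is shown):

```python
from typing import Optional, TypedDict


class IncrementalArgs(TypedDict):
    # Keys mirror the strategy_arg_dict built in the materialization diff.
    target_relation: str
    temp_relation: str
    unique_key: Optional[str]


def get_incremental_append_sql(arg_dict: IncrementalArgs) -> str:
    # Mirrors databricks__get_incremental_append_sql: append rows from
    # the temp relation into the target via INSERT INTO.
    return (
        f"insert into table {arg_dict['target_relation']} "
        f"select * from {arg_dict['temp_relation']}"
    )


arg_dict: IncrementalArgs = {
    "target_relation": "main.events",         # placeholder
    "temp_relation": "main.events__dbt_tmp",  # placeholder
    "unique_key": None,                       # append ignores unique_key
}
build_sql = get_incremental_append_sql(arg_dict)
```

Passing one dictionary rather than positional arguments lets new keys be added for future strategies without breaking existing strategy macros.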