Skip to content

Commit

Permalink
Merge pull request #44 from dbt-labs/feature/databricks--insert-by-pe…
Browse files Browse the repository at this point in the history
…riod

Feature/databricks  insert by period
  • Loading branch information
matt-winkler authored Apr 30, 2024
2 parents c845674 + 6c9880d commit 29cea5f
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 9 deletions.
4 changes: 4 additions & 0 deletions insert_by_period/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ require-dbt-version: [">=1.3.0", "<2.0.0"]
# This setting configures which "profile" dbt uses for this project.
profile: 'insert_by_period'

dispatch:
- macro_namespace: dbt_utils
search_order: ['spark_utils', 'dbt_utils']

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
Expand Down
9 changes: 9 additions & 0 deletions insert_by_period/integration_tests/ci/sample.profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,12 @@ integration_tests:
warehouse: "{{ env_var('SNOWFLAKE_TEST_WAREHOUSE') }}"
schema: dbt_utils_integration_tests_snowflake
threads: 10

databricks:
type: databricks
catalog: "{{ env_var('DATABRICKS_TEST_CATALOG') }}"
host: "{{ env_var('DATABRICKS_TEST_HOST') }}"
http_path: "{{ env_var('DATABRICKS_TEST_HTTP_PATH') }}"
token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}"
schema: dbt_utils_integration_tests_databricks
threads: 4
10 changes: 8 additions & 2 deletions insert_by_period/integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@
name: 'insert_by_period_integration_tests'
version: '1.0'

profile: 'integration_tests'

# require-dbt-version: inherit this from dbt-utils

config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'integration_tests'

dispatch:
- macro_namespace: dbt_utils
search_order: ['spark_utils', 'dbt_utils', 'insert_by_period_integration_tests']


model-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
Expand Down
4 changes: 3 additions & 1 deletion insert_by_period/integration_tests/packages.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@
packages:
- local: ../
- package: dbt-labs/dbt_utils
version: [">0.9.0", "<2.0.0"]
version: [">0.9.0", "<2.0.0"]
- package: dbt-labs/spark_utils
version: 0.3.0
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@
{% do return (api.Relation.create(identifier=tmp_identifier,
schema=None, type=type)) %}
{%- endmacro %}

{% macro databricks__create_relation_for_insert_by_period(tmp_identifier, schema, type) -%}
{% do return (api.Relation.create(identifier=tmp_identifier,
schema=None, type=type)) %}
{%- endmacro %}
25 changes: 25 additions & 0 deletions insert_by_period/macros/get_rows_inserted.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{% macro get_rows_inserted(result) -%}
{{ return(adapter.dispatch('get_rows_inserted', 'insert_by_period')(result)) }}
{% endmacro %}

{% macro default__get_rows_inserted(result) %}

{% if 'response' in result.keys() %} {# added in v0.19.0 #}
{% set rows_inserted = result['response']['rows_affected'] %}
{% else %} {# older versions #}
{% set rows_inserted = result['status'].split(" ")[2] | int %}
{% endif %}

{{return(rows_inserted)}}

{% endmacro %}

{% macro databricks__get_rows_inserted(result) %}

{% if 'data' in result.keys() %}
{% set rows_inserted = result['data'][0][0] | int %}
{% endif %}

{{return(rows_inserted)}}

{% endmacro %}
9 changes: 3 additions & 6 deletions insert_by_period/macros/insert_by_period_materialization.sql
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,13 @@
);
{%- endcall %}
{% set result = load_result('main-' ~ i) %}
{% if 'response' in result.keys() %} {# added in v0.19.0 #}
{% set rows_inserted = result['response']['rows_affected'] %}
{% else %} {# older versions #}
{% set rows_inserted = result['status'].split(" ")[2] | int %}
{% endif %}

{% set rows_inserted = insert_by_period.get_rows_inserted(result) %}

{%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%}
{%- if loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %} {% endif -%}

{%- set msg = "Ran for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) ~ "; " ~ rows_inserted ~ " records inserted" -%}
{%- set msg = "Ran for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) ~ "; " ~ rows_inserted ~ " record(s) inserted" -%}
{{ print(msg) }}

{%- endfor %}
Expand Down

0 comments on commit 29cea5f

Please sign in to comment.