-
Notifications
You must be signed in to change notification settings - Fork 189
/
Copy pathadapters.sql
332 lines (264 loc) · 11.3 KB
/
adapters.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
{#
  get_column_comment_sql(column_name, column_dict)

  Renders `<quoted column_name> COMMENT $$<description>$$` for one column.
  The description is looked up in column_dict under the upper-cased,
  lower-cased and verbatim column name, in that order; the first key found
  wins. If no (truthy) key matches, an empty comment ($$$$) is rendered.
  Every `$` in the description is rewritten to `[$]` so it cannot terminate
  the $$-quoted string early.
#}
{% macro get_column_comment_sql(column_name, column_dict) -%}
  {%- set lookup = namespace(key=None) -%}
  {%- for candidate in [column_name | upper, column_name | lower, column_name] -%}
    {%- if lookup.key is none and candidate in column_dict -%}
      {%- set lookup.key = candidate -%}
    {%- endif -%}
  {%- endfor -%}
  {%- if not lookup.key -%}
    {{ adapter.quote(column_name) }} COMMENT $$$$
  {%- else -%}
    {{ adapter.quote(column_name) }} COMMENT $${{ column_dict[lookup.key]['description'] | replace('$', '[$]') }}$$
  {%- endif -%}
{% endmacro %}
{#
  get_persist_docs_column_list(model_columns, query_columns)

  Renders a parenthesised list with one get_column_comment_sql() entry per
  name in query_columns (descriptions looked up in model_columns); entries
  are separated by ", ".
#}
{% macro get_persist_docs_column_list(model_columns, query_columns) %}
(
{% for query_column in query_columns %}
{{ get_column_comment_sql(query_column, model_columns) }}{{ "" if loop.last else ", " }}
{% endfor %}
)
{% endmacro %}
{#
  snowflake__get_columns_in_relation(relation)

  Runs `describe table` on relation and returns a list of api.Column objects
  built from each result row's "name" and "type". Raises a compiler error
  when the describe result has 10000 or more rows.
#}
{% macro snowflake__get_columns_in_relation(relation) -%}
  {%- set describe_sql -%}
    describe table {{ relation.render() }}
  {%- endset -%}
  {%- set described = run_query(describe_sql) -%}
  {%- set column_limit = 10000 -%}
  {%- if described | length >= column_limit -%}
{% set msg %}
Too many columns in relation {{ relation.render() }}! dbt can only get
information about relations with fewer than {{ column_limit }} columns.
{% endset %}
    {%- do exceptions.raise_compiler_error(msg) -%}
  {%- endif -%}
  {%- set columns = [] -%}
  {%- for described_row in described -%}
    {%- do columns.append(api.Column.from_description(described_row['name'], described_row['type'])) -%}
  {%- endfor -%}
  {%- do return(columns) -%}
{% endmacro %}
{#
  snowflake__show_object_metadata(relation)

  Returns the result table of
  `show objects in <database.schema> starts with '<identifier>' limit 1`.
  NOTE(review): STARTS WITH is a prefix filter, so if no object is named
  exactly relation.identifier this can return a different object whose name
  merely begins with it. Callers may need to confirm the returned name.
#}
{% macro snowflake__show_object_metadata(relation) %}
  {%- set show_sql -%}
    show objects in {{ relation.include(identifier=False) }} starts with '{{ relation.identifier }}' limit 1
  {%- endset -%}
  {%- do return(run_query(show_sql)) -%}
{% endmacro %}
{#
  snowflake__list_schemas(database)

  Returns the result of `show terse schemas in database <database>`, which is
  itself capped at 10000 rows. Reaching the cap means the list may be
  truncated, so a compiler error is raised in that case.
  10k limit from here: https://docs.snowflake.net/manuals/sql-reference/sql/show-schemas.html#usage-notes
#}
{% macro snowflake__list_schemas(database) -%}
  {%- set schema_limit = 10000 -%}
  {%- set show_sql -%}
    show terse schemas in database {{ database }}
    limit {{ schema_limit }}
  {%- endset -%}
  {%- set schemas = run_query(show_sql) -%}
  {%- if schemas | length >= schema_limit -%}
{% set msg %}
Too many schemas in database {{ database }}! dbt can only get
information about databases with fewer than {{ schema_limit }} schemas.
{% endset %}
    {%- do exceptions.raise_compiler_error(msg) -%}
  {%- endif -%}
  {%- do return(schemas) -%}
{% endmacro %}
{#
  snowflake__get_paginated_relations_array(max_iter, max_results_per_iter,
                                           max_total_results, schema_relation, watermark)

  Fetches the SHOW OBJECTS pages that follow a first page the caller has
  already run. Returns them as a list of result tables; the caller's first
  page is NOT included.

  - max_iter: maximum number of additional page requests.
  - max_results_per_iter: LIMIT used for every page.
  - max_total_results: used only in the error message.
  - schema_relation: a string, or a relation rendered with
    include(identifier=False).
  - watermark: a namespace whose table_name is the cursor for
    `show objects ... from '<table_name>'`. It is mutated in place: after
    each page it is set to the last value of column index 1 of that page
    (presumably the SHOW output's "name" column; verify against Snowflake docs).

  Stops early when a page is empty or shorter than max_results_per_iter.
  Raises a compiler error when the max_iter-th request still returns rows.
#}
{% macro snowflake__get_paginated_relations_array(max_iter, max_results_per_iter, max_total_results, schema_relation, watermark) %}
{# Pages collected so far, in fetch order. #}
{% set paginated_relations = [] %}
{% for _ in range(0, max_iter) %}
{% if schema_relation is string %}
{%- set paginated_sql -%}
show objects in {{ schema_relation }} limit {{ max_results_per_iter }} from '{{ watermark.table_name }}'
{%- endset -%}
{% else %}
{%- set paginated_sql -%}
show objects in {{ schema_relation.include(identifier=False) }} limit {{ max_results_per_iter }} from '{{ watermark.table_name }}'
{%- endset -%}
{% endif -%}
{%- set paginated_result = run_query(paginated_sql) %}
{%- set paginated_n = (paginated_result | length) -%}
{#
terminating condition: an empty page means the previous page ended exactly
at the last object, so there is nothing more to fetch.
#}
{%- if paginated_n == 0 -%}
{%- break -%}
{%- endif -%}
{#
terminating condition: at some point the user needs to be reasonable about how
many objects their schemas contain. The caller already fetched one page before
pagination started, so a non-empty page on iteration loop.index == max_iter
means the configured limit has been exceeded.
#}
{%- if loop.index == max_iter -%}
{%- set msg -%}
dbt is currently configured to list a maximum of {{ max_total_results }} objects per schema.
{{ schema_relation }} exceeds this limit. If this is expected, you may configure this limit
by setting list_relations_per_page and list_relations_page_limit in your project flags.
It is recommended to start by increasing list_relations_page_limit to something more than the default of 10.
{%- endset -%}
{% do exceptions.raise_compiler_error(msg) %}
{%- endif -%}
{%- do paginated_relations.append(paginated_result) -%}
{# Advance the cursor: the next page resumes after the last name on this page. #}
{% set watermark.table_name = paginated_result.columns[1].values()[-1] %}
{#
terminating condition: a short page (paginated_n < max_results_per_iter) means we reached the end
#}
{%- if paginated_n < max_results_per_iter -%}
{%- break -%}
{%- endif -%}
{%- endfor -%}
{{ return(paginated_relations) }}
{% endmacro %}
{#
  snowflake__list_relations_without_caching(schema_relation, max_iter=10, max_results_per_iter=10000)

  Lists the objects in a schema with SHOW OBJECTS and returns them as one
  merged result table.

  - schema_relation: a string, or a relation rendered with
    include(identifier=False).
  - max_iter / max_results_per_iter: defaults for the page limit and page
    size. The project flags list_relations_page_limit and
    list_relations_per_page override them.

  When the first page is full (>= max_results_per_iter rows), the following
  pages are fetched by snowflake__get_paginated_relations_array and merged in.

  When the enable_iceberg_materializations behavior flag is on, the first page
  is left-joined to the INFORMATION_SCHEMA TABLES view to add is_iceberg.
  For a relation, that view is qualified with the relation's own database via
  schema_relation.information_schema(). An unqualified INFORMATION_SCHEMA
  resolves against the session's current database, so objects in any other
  database would never match and would get a NULL is_iceberg. A plain-string
  schema_relation has no separate database part here, so it uses the
  unqualified view.

  NOTE(review): pages fetched by snowflake__get_paginated_relations_array run
  a plain SHOW OBJECTS without this join, so rows beyond the first page
  presumably carry no is_iceberg value after the merge. Confirm whether
  callers depend on it.
#}
{% macro snowflake__list_relations_without_caching(schema_relation, max_iter=10, max_results_per_iter=10000) %}
  {%- set max_results_per_iter = adapter.config.flags.get('list_relations_per_page', max_results_per_iter) -%}
  {%- set max_iter = adapter.config.flags.get('list_relations_page_limit', max_iter) -%}
  {%- set max_total_results = max_results_per_iter * max_iter -%}
  {%- if schema_relation is string -%}
    {%- set tables_view = 'INFORMATION_SCHEMA.tables' -%}
  {%- else -%}
    {#- Qualify with the relation's database rather than the session's. -#}
    {%- set tables_view = schema_relation.information_schema() ~ '.tables' -%}
  {%- endif -%}
  {%- set sql -%}
    {% if schema_relation is string %}
      show objects in {{ schema_relation }} limit {{ max_results_per_iter }};
    {% else %}
      show objects in {{ schema_relation.include(identifier=False) }} limit {{ max_results_per_iter }};
    {% endif -%}
    {# -- Gated for performance reasons: projects that do not use Iceberg
       -- should not pay the latency of the extra INFORMATION_SCHEMA join. #}
    {% if adapter.behavior.enable_iceberg_materializations.no_warn %}
      select all_objects.*, is_iceberg
      from table(result_scan(last_query_id(-1))) all_objects
      left join {{ tables_view }} as all_tables
        on all_tables.table_name = all_objects."name"
        and all_tables.table_schema = all_objects."schema_name"
        and all_tables.table_catalog = all_objects."database_name"
    {% endif -%}
  {%- endset -%}
  {%- set result = run_query(sql) -%}
  {%- set paginated = namespace(result=[]) -%}
  {%- if (result | length) >= max_results_per_iter -%}
    {#- A full first page means more objects may follow. The last value of
        column index 1 (the "name" column) is the cursor for the next page.
        It is read only here, so an empty schema never indexes [-1] into an
        empty column. -#}
    {%- set watermark = namespace(table_name=result.columns[1].values()[-1]) -%}
    {%- set paginated.result = snowflake__get_paginated_relations_array(
          max_iter,
          max_results_per_iter,
          max_total_results,
          schema_relation,
          watermark
    ) -%}
  {%- endif -%}
  {%- set all_results_array = [result] + paginated.result -%}
  {%- set result = result.merge(all_results_array) -%}
  {%- do return(result) -%}
{% endmacro %}
{#
  snowflake__check_schema_exists(information_schema, schema)

  Runs, as the 'check_schema_exists' statement, a count of rows in
  <information_schema>.schemata whose catalog and schema names match
  information_schema.database and schema case-insensitively. Returns the
  statement's result table.
#}
{% macro snowflake__check_schema_exists(information_schema, schema) -%}
  {%- call statement('check_schema_exists', fetch_result=True) -%}
    select count(*)
    from {{ information_schema }}.schemata
    where upper(catalog_name) = upper('{{ information_schema.database }}')
      and upper(schema_name) = upper('{{ schema }}')
  {%- endcall -%}
  {%- set check_result = load_result('check_schema_exists') -%}
  {{ return(check_result.table) }}
{%- endmacro %}
{#
  snowflake__alter_column_type(relation, column_name, new_column_type)

  Runs, as the 'alter_column_type' statement,
  `alter <ddl prefix> table <relation> alter <quoted column> set data type <type>;`.
  The column name is quoted with adapter.quote().
#}
{% macro snowflake__alter_column_type(relation, column_name, new_column_type) -%}
  {%- set ddl_prefix = relation.get_ddl_prefix_for_alter() -%}
  {%- set quoted_column = adapter.quote(column_name) -%}
  {% call statement('alter_column_type') %}
    alter {{ ddl_prefix }} table {{ relation.render() }} alter {{ quoted_column }} set data type {{ new_column_type }};
  {% endcall %}
{% endmacro %}
{#
  snowflake__alter_relation_comment(relation, relation_comment)

  Renders `comment on <type> <relation> IS $$<comment>$$;`. The object type
  is 'dynamic table' for dynamic tables and relation.type otherwise. Every
  `$` in the comment becomes `[$]` so it cannot close the $$-quoted string.
#}
{% macro snowflake__alter_relation_comment(relation, relation_comment) -%}
  {%- set object_kind = 'dynamic table' if relation.is_dynamic_table else relation.type -%}
comment on {{ object_kind }} {{ relation.render() }} IS $${{ relation_comment | replace('$', '[$]') }}$$;
{% endmacro %}
{#
  snowflake__alter_column_comment(relation, column_dict)

  Renders one `alter <prefix> <type> <relation> alter <col> COMMENT $$...$$, ...;`
  statement covering every column currently in relation, as reported by
  adapter.get_columns_in_relation. Descriptions come from column_dict via
  get_column_comment_sql; columns without a matching entry get an empty
  comment ($$$$). Dynamic tables are addressed with the `table` keyword.
  The loop walks existing_columns directly, so every existing column is
  emitted and no membership filter is needed.
#}
{% macro snowflake__alter_column_comment(relation, column_dict) -%}
  {% set existing_columns = adapter.get_columns_in_relation(relation) | map(attribute="name") | list %}
  {% if relation.is_dynamic_table -%}
    {% set relation_type = "table" %}
  {% else -%}
    {% set relation_type = relation.type %}
  {% endif %}
  alter {{ relation.get_ddl_prefix_for_alter() }} {{ relation_type }} {{ relation.render() }} alter
  {% for column_name in existing_columns %}
    {{ get_column_comment_sql(column_name, column_dict) }} {{- ',' if not loop.last else ';' }}
  {% endfor %}
{% endmacro %}
{#
  get_current_query_tag()

  Returns the "value" field of the first row returned by
  `show parameters like 'query_tag' in session`.
#}
{% macro get_current_query_tag() -%}
  {%- set session_params = run_query("show parameters like 'query_tag' in session") -%}
  {%- set first_row = session_params.rows[0] -%}
  {{ return(first_row['value']) }}
{% endmacro %}
{#
  set_query_tag()

  Dispatches to the adapter-specific set_query_tag implementation (searched
  in the 'dbt' namespace) and returns its result.
#}
{% macro set_query_tag() -%}
  {%- set implementation = adapter.dispatch('set_query_tag', 'dbt') -%}
  {%- do return(implementation()) -%}
{% endmacro %}
{#
  snowflake__set_query_tag()

  If the model config has a truthy query_tag, this macro:
    - records the session's current query_tag,
    - logs the change,
    - sets the session query_tag to the configured value,
    - returns the previous value so it can be restored later.
  Returns none when no query_tag is configured.

  The value is placed inside a single-quoted Snowflake string literal, where
  backslash is an escape character. Backslashes are therefore doubled and
  single quotes backslash-escaped first. A tag such as `it's` or `C:\dir` is
  then stored verbatim instead of producing invalid SQL or a mangled tag.
#}
{% macro snowflake__set_query_tag() -%}
  {% set new_query_tag = config.get('query_tag') %}
  {% if new_query_tag %}
    {% set original_query_tag = get_current_query_tag() %}
    {{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
    {#- Escape backslashes first so the ones added for quotes are not doubled. -#}
    {% set escaped_query_tag = new_query_tag | replace("\\", "\\\\") | replace("'", "\\'") %}
    {% do run_query("alter session set query_tag = '{}'".format(escaped_query_tag)) %}
    {{ return(original_query_tag)}}
  {% endif %}
  {{ return(none)}}
{% endmacro %}
{#
  unset_query_tag(original_query_tag)

  Dispatches to the adapter-specific unset_query_tag implementation
  (searched in the 'dbt' namespace), passing original_query_tag through,
  and returns its result.
#}
{% macro unset_query_tag(original_query_tag) -%}
  {%- set implementation = adapter.dispatch('unset_query_tag', 'dbt') -%}
  {%- do return(implementation(original_query_tag)) -%}
{% endmacro %}
{#
  snowflake__unset_query_tag(original_query_tag)

  Acts only when the model config has a truthy query_tag. If
  original_query_tag is truthy, the session query_tag is set back to it;
  otherwise the session parameter is unset.

  original_query_tag is escaped for a single-quoted Snowflake string literal
  (backslashes doubled, single quotes backslash-escaped). A previous tag
  containing either character is therefore restored verbatim instead of
  breaking the ALTER SESSION statement.
#}
{% macro snowflake__unset_query_tag(original_query_tag) -%}
  {% set new_query_tag = config.get('query_tag') %}
  {% if new_query_tag %}
    {% if original_query_tag %}
      {{ log("Resetting query_tag to '" ~ original_query_tag ~ "'.") }}
      {#- Escape backslashes first so the ones added for quotes are not doubled. -#}
      {% set escaped_query_tag = original_query_tag | replace("\\", "\\\\") | replace("'", "\\'") %}
      {% do run_query("alter session set query_tag = '{}'".format(escaped_query_tag)) %}
    {% else %}
      {{ log("No original query_tag, unsetting parameter.") }}
      {% do run_query("alter session unset query_tag") %}
    {% endif %}
  {% endif %}
{% endmacro %}
{#
  snowflake__alter_relation_add_remove_columns(relation, add_columns, remove_columns)

  Runs up to two statements against relation:
    - one `add column` listing every entry of add_columns as
      `<name> <data_type>`;
    - then one `drop column` listing the name of every entry of
      remove_columns.
  Each statement is skipped when its list is empty or falsy. Dynamic tables
  are addressed as `dynamic table`, other relations by relation.type.
  Column names are rendered unquoted.
#}
{% macro snowflake__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
  {%- set relation_type = "dynamic table" if relation.is_dynamic_table else relation.type -%}
  {%- if add_columns -%}
    {%- set add_sql -%}
      alter {{ relation.get_ddl_prefix_for_alter() }} {{ relation_type }} {{ relation.render() }} add column
      {% for new_column in add_columns -%}
        {{ new_column.name }} {{ new_column.data_type }}{{ ',' if not loop.last }}
      {% endfor %}
    {%- endset -%}
    {%- do run_query(add_sql) -%}
  {%- endif -%}
  {%- if remove_columns -%}
    {%- set drop_sql -%}
      alter {{ relation.get_ddl_prefix_for_alter() }} {{ relation_type }} {{ relation.render() }} drop column
      {{ remove_columns | map(attribute='name') | join(', ') }}
    {%- endset -%}
    {%- do run_query(drop_sql) -%}
  {%- endif -%}
{% endmacro %}
{#
  snowflake_dml_explicit_transaction(dml)

  Use this macro to wrap all INSERT, MERGE, UPDATE, DELETE, and TRUNCATE
  statements before passing them into run_query(), or calling in the 'main'
  statement of a materialization.

  Returns the text "begin;\n<dml>;\ncommit;": the dml wrapped in an explicit
  transaction, with a `;` appended after it.
#}
{% macro snowflake_dml_explicit_transaction(dml) %}
  {%- do return("begin;\n" ~ dml ~ ";\ncommit;") -%}
{% endmacro %}
{#
  snowflake__truncate_relation(relation)

  Runs `truncate table <relation>` as the 'truncate_relation' statement,
  wrapped in an explicit begin/commit by snowflake_dml_explicit_transaction.
#}
{% macro snowflake__truncate_relation(relation) -%}
  {%- set truncate_dml = "truncate table " ~ relation.render() -%}
  {% call statement('truncate_relation') -%}
    {{ snowflake_dml_explicit_transaction(truncate_dml) }}
  {%- endcall %}
{% endmacro %}