Skip to content

Commit

Permalink
Merge pull request #28 from ScalefreeCOM/ma-sat_fixes
Browse files Browse the repository at this point in the history
Fixing Multi Active Hash Calculation and loading mechanism
  • Loading branch information
tkirschke authored Jan 24, 2025
2 parents bccf44c + ef51365 commit fcd9060
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 12 deletions.
6 changes: 3 additions & 3 deletions macros/macro-1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ macroString: |-
"hash": "MD5",
"hash_datatype": "STRING",
"hash_input_case_sensitive": "TRUE",
"hash_passthrough_input_transformations": "TRUE",
"hash_passthrough_input_transformations": "FALSE",
"beginning_of_all_times": "0001-01-01T00:00:01",
"end_of_all_times": "8888-12-31T23:59:59",
"timestamp_format": "YYYY-MM-DDTHH24:MI:SS",
Expand Down Expand Up @@ -406,7 +406,7 @@ macroString: |-
{#-- hashing calculation of multi-active --#}
{#-- satellite's hash diff attribute. --#}
{%- if is_hashdiff and multi_active_key is defined and multi_active_key|length>0 -%}
{%- set multi_active_key = multi_active_key|join(", ") -%}
{%- set multi_active_key = multi_active_key|join('", "') -%}
{%- set listagg_closing = ' WITHIN GROUP (ORDER BY "{}") OVER (PARTITION BY "{}", "{}"))'.format(multi_active_key, main_hashkey_col, ldts_alias) -%}
{%- endif -%}
Expand Down Expand Up @@ -498,7 +498,7 @@ macroString: |-
{{ standardise_prefix }}
{%- for column in columns -%}
{%- set ns.all_null = ns.all_null + [null_placeholder_string] -%}
{%- if datavault4coalesce.config.hash_passthrough_input_transformations -%}
{%- if datavault4coalesce.config.hash_passthrough_input_transformations == TRUE -%}
{%- set column_str = datavault4coalesce__as_constant(get_source_transform(column)) -%}
{%- else -%}
{%- if '.' in column.name %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,24 @@

INSERT INTO {{ ref_no_link(node.location.name, node.name) }}

WITH latest_entries_in_sat AS (
WITH

source_data AS (

{% for source in sources %}

SELECT
{% for col in source.columns %}
{{ get_source_transform(col) }} AS "{{ col.name }}",
{% endfor %}

{{ source.join }}

{% endfor %}

),

latest_entries_in_sat AS (
/* get current rows from satellite */

{# /* USE THIS again when ColumnDropdownSelector is used
Expand All @@ -33,7 +50,7 @@ deduplicated_numbered_source AS (

SELECT
{% for col in source.columns %}
{{ get_source_transform(col) }} AS {{ col.name }},
{{ get_source_transform(col) }} AS "{{ col.name }}",
{% endfor %}
ROW_NUMBER() OVER(PARTITION BY "{{ hashkey_column }}" ORDER BY "{{ datavault4coalesce.config.ldts_alias }}") as rn

Expand Down Expand Up @@ -62,17 +79,21 @@ deduplicated_numbered_source AS (
{% for source in sources %}
SELECT DISTINCT
{% for col in source.columns %}
{{ col.name }}
src."{{ col.name }}"
{%- if not loop.last -%}, {% endif %}
{% endfor %}

FROM deduplicated_numbered_source
FROM source_data src
INNER JOIN deduplicated_numbered_source dedupe
ON src."{{ hashkey_column }}" = dedupe."{{ hashkey_column }}"
AND src."{{ hashdiff_column }}" = dedupe."{{ hashdiff_column }}"
AND src."{{ datavault4coalesce.config.ldts_alias }}" = dedupe."{{ datavault4coalesce.config.ldts_alias }}"
WHERE NOT EXISTS (
SELECT 1 FROM latest_entries_in_sat
WHERE
deduplicated_numbered_source.{{ hashdiff_column }} = latest_entries_in_sat."{{ hashdiff_column }}"
AND deduplicated_numbered_source.{{ hashkey_column }} = latest_entries_in_sat."{{ hashkey_column }}"
AND deduplicated_numbered_source.rn = 1
dedupe."{{ hashdiff_column }}" = latest_entries_in_sat."{{ hashdiff_column }}"
AND dedupe."{{ hashkey_column }}" = latest_entries_in_sat."{{ hashkey_column }}"
AND dedupe.rn = 1
)

{% endfor %}
Expand Down
33 changes: 31 additions & 2 deletions nodeTypes/DatavaultbyScalefreeStage-4/create.sql.j2
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,31 @@
)
{%- if node.description | length > 0 %} COMMENT = '{{ node.description }}'{% endif %}
AS

WITH

{# All columns that are not hash columns #}

regular_columns AS (

{% for source in sources %}
SELECT
{% for col in source.columns %}
{% if not col.hashDetails %}
{{ get_source_transform(col) }} AS "{{ col.name }}"
{%- if not loop.last -%}, {% endif %}
{% endif %}
{% endfor %}

{{ source.join }}

{% endfor %}

),

all_columns AS (


{% for source in sources %}
{#-------------------- Process multi-activity config --------------------#}
{#----------------------- FIXME explain variables -----------------------#}
Expand All @@ -42,12 +67,12 @@
{{ datavault4coalesce__hash(columns=col.hashDetails.columns, is_hashdiff=col.is_hd,datatype=col.dataType, algo=datavault4coalesce.config.hash, multi_active_key=multi_active_key, main_hashkey_col=main_hashkey_col.hk_col) }} AS "{{ col.name }}"
{#- Print other table columns -#}
{% else %}
{{ get_source_transform(col) }} AS "{{ col.name }}"
"{{ col.name }}"
{% endif %}
{%- if not loop.last -%}, {% endif %}
{% endfor %}

{{ source.join }}
FROM regular_columns

{% if not loop.last %}
{% if config.insertStrategy in ['UNION', 'UNION ALL'] %}
Expand Down Expand Up @@ -93,4 +118,8 @@

{% endfor %}

)

SELECT * FROM all_columns

{% endif %}

0 comments on commit fcd9060

Please sign in to comment.