Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recognize when loaded_at_field is explicitly set to null #10065

Merged
merged 9 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240429-114610.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Support overriding source level loaded_at_field with a null table level definition
time: 2024-04-29T11:46:10.100373-05:00
custom:
Author: emmyoop
Issue: "9320"
15 changes: 15 additions & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@
class UnparsedSourceTableDefinition(HasColumnTests, HasColumnAndTestProps):
config: Dict[str, Any] = field(default_factory=dict)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
Expand All @@ -293,10 +294,22 @@
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
tables: List[UnparsedSourceTableDefinition] = field(default_factory=list)
tags: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)

@classmethod
def validate(cls, data):
super(UnparsedSourceDefinition, cls).validate(data)

if data.get("loaded_at_field", None) == "":
raise ValidationError("loaded_at_field cannot be an empty string.")

Check warning on line 307 in core/dbt/contracts/graph/unparsed.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/contracts/graph/unparsed.py#L307

Added line #L307 was not covered by tests
if "tables" in data:
for table in data["tables"]:
if table.get("loaded_at_field", None) == "":
raise ValidationError("loaded_at_field cannot be an empty string.")

@property
def yaml_key(self) -> "str":
return "sources"
Expand All @@ -316,6 +329,7 @@
data_type: Optional[str] = None
docs: Optional[Docs] = None
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
Expand Down Expand Up @@ -358,6 +372,7 @@
quoting: Optional[Quoting] = None
freshness: Optional[Optional[FreshnessThreshold]] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
tables: Optional[List[SourceTablePatch]] = None
tags: Optional[List[str]] = None

Expand Down
17 changes: 15 additions & 2 deletions core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,24 @@
# ===============================================================================


def yaml_from_file(source_file: SchemaSourceFile) -> Dict[str, Any]:
def yaml_from_file(source_file: SchemaSourceFile) -> Optional[Dict[str, Any]]:
"""If loading the yaml fails, raise an exception."""
try:
# source_file.contents can sometimes be None
return load_yaml_text(source_file.contents or "", source_file.path)
contents = load_yaml_text(source_file.contents or "", source_file.path)

if contents is None:
return contents

# When loaded_loaded_at_field is defined as None or null, it shows up in
# the dict but when it is not defined, it does not show up in the dict
# We need to capture this to be able to override source level settings later.
for source in contents.get("sources", []):
for table in source.get("tables", []):
if "loaded_at_field" in table:
table["loaded_at_field_present"] = True

return contents
except DbtValidationError as e:
raise YamlLoadError(
project_name=source_file.project_name, path=source_file.path.relative_path, exc=e
Expand Down
9 changes: 8 additions & 1 deletion core/dbt/parser/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
unique_id = target.unique_id
description = table.description or ""
source_description = source.description or ""
loaded_at_field = table.loaded_at_field or source.loaded_at_field

# We need to be able to tell the difference between explicitly setting the loaded_at_field to None/null
# and when it's simply not set. This allows a user to override the source level loaded_at_field so that
# specific table can default to metadata-based freshness.
if table.loaded_at_field_present or table.loaded_at_field is not None:
loaded_at_field = table.loaded_at_field
else:
loaded_at_field = source.loaded_at_field # may be None, that's okay

freshness = merge_freshness(source.freshness, table.freshness)
quoting = source.quoting.merged(table.quoting)
Expand Down
136 changes: 136 additions & 0 deletions tests/functional/sources/test_source_loaded_at_field.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import pytest
from dbt.tests.util import run_dbt, get_manifest, write_file
from dbt.exceptions import YamlParseDictError


loaded_at_field_null_schema_yml = """
sources:
- name: test_source
freshness:
warn_after:
count: 1
period: day
error_after:
count: 4
period: day
loaded_at_field: updated_at
tables:
- name: table1
loaded_at_field: null
"""

loaded_at_field_blank_schema_yml = """
sources:
- name: test_source
freshness:
warn_after:
count: 1
period: day
error_after:
count: 4
period: day
loaded_at_field: updated_at
tables:
- name: table1
loaded_at_field: null
"""

loaded_at_field_missing_schema_yml = """
sources:
- name: test_source
freshness:
warn_after:
count: 1
period: day
error_after:
count: 4
period: day
loaded_at_field: updated_at
tables:
- name: table1
"""

loaded_at_field_defined_schema_yml = """
sources:
- name: test_source
freshness:
warn_after:
count: 1
period: day
error_after:
count: 4
period: day
loaded_at_field: updated_at
tables:
- name: table1
loaded_at_field: updated_at_another_place
"""

loaded_at_field_empty_string_schema_yml = """
sources:
- name: test_source
freshness:
warn_after:
count: 1
period: day
error_after:
count: 4
period: day
loaded_at_field: updated_at
tables:
- name: table1
loaded_at_field: ""
"""


class TestParsingLoadedAtField:
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": loaded_at_field_null_schema_yml}

def test_loaded_at_field(self, project):
# test setting loaded_at_field to null explicitly at table level
run_dbt(["parse"])
manifest = get_manifest(project.project_root)

assert "source.test.test_source.table1" in manifest.sources
assert manifest.sources.get("source.test.test_source.table1").loaded_at_field is None

# test setting loaded_at_field at source level, do not set at table level
# end up with source level loaded_at_field
write_file(
loaded_at_field_missing_schema_yml, project.project_root, "models", "schema.yml"
)
run_dbt(["parse"])
manifest = get_manifest(project.project_root)
assert "source.test.test_source.table1" in manifest.sources
assert (
manifest.sources.get("source.test.test_source.table1").loaded_at_field == "updated_at"
)

# test setting loaded_at_field to nothing, should override Source value for None
write_file(loaded_at_field_blank_schema_yml, project.project_root, "models", "schema.yml")
run_dbt(["parse"])
manifest = get_manifest(project.project_root)

assert "source.test.test_source.table1" in manifest.sources
assert manifest.sources.get("source.test.test_source.table1").loaded_at_field is None

# test setting loaded_at_field at table level to a value - it should override source level
write_file(
loaded_at_field_defined_schema_yml, project.project_root, "models", "schema.yml"
)
run_dbt(["parse"])
manifest = get_manifest(project.project_root)
assert "source.test.test_source.table1" in manifest.sources
assert (
manifest.sources.get("source.test.test_source.table1").loaded_at_field
== "updated_at_another_place"
)

# test setting loaded_at_field at table level to an empty string - should error
write_file(
loaded_at_field_empty_string_schema_yml, project.project_root, "models", "schema.yml"
)
with pytest.raises(YamlParseDictError):
run_dbt(["parse"])