From cc8541c05fa5df4bf2c5f0e776c8368198af070c Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 12 Sep 2024 18:16:04 -0400 Subject: [PATCH] Microbatch: event_time ref + source filtering (#10594) --- .../unreleased/Features-20240911-121029.yaml | 6 + core/dbt/artifacts/resources/types.py | 7 + core/dbt/artifacts/resources/v1/config.py | 3 + .../resources/v1/source_definition.py | 1 + core/dbt/cli/main.py | 4 + core/dbt/cli/params.py | 16 + core/dbt/context/providers.py | 120 ++++++- core/setup.py | 2 +- schemas/dbt/manifest/v12.json | 168 +++++++++ .../functional/artifacts/expected_manifest.py | 11 + tests/functional/list/test_list.py | 28 ++ .../functional/microbatch/test_microbatch.py | 332 ++++++++++++++++++ tests/unit/context/test_providers.py | 206 ++++++++++- tests/unit/contracts/graph/test_nodes.py | 1 + .../unit/contracts/graph/test_nodes_parsed.py | 13 + 15 files changed, 911 insertions(+), 7 deletions(-) create mode 100644 .changes/unreleased/Features-20240911-121029.yaml create mode 100644 tests/functional/microbatch/test_microbatch.py diff --git a/.changes/unreleased/Features-20240911-121029.yaml b/.changes/unreleased/Features-20240911-121029.yaml new file mode 100644 index 00000000000..365faf7fadd --- /dev/null +++ b/.changes/unreleased/Features-20240911-121029.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add basic functionality for creating microbatch incremental models +time: 2024-09-11T12:10:29.822189-05:00 +custom: + Author: MichelleArk QMalcolm + Issue: 9490 10635 10637 10638 10636 10662 10639 diff --git a/core/dbt/artifacts/resources/types.py b/core/dbt/artifacts/resources/types.py index c0ab5341e4c..bac25bd2e0e 100644 --- a/core/dbt/artifacts/resources/types.py +++ b/core/dbt/artifacts/resources/types.py @@ -68,3 +68,10 @@ class TimePeriod(StrEnum): def plural(self) -> str: return str(self) + "s" + + +class BatchSize(StrEnum): + hour = "hour" + day = "day" + month = "month" + year = "year" diff --git a/core/dbt/artifacts/resources/v1/config.py b/core/dbt/artifacts/resources/v1/config.py index fc9be8f7c70..e6cd26ec823 100644 --- a/core/dbt/artifacts/resources/v1/config.py +++ b/core/dbt/artifacts/resources/v1/config.py @@ -80,6 +80,8 @@ class NodeConfig(NodeAndTestConfig): # 'mergebehavior' dictionary materialized: str = "view" incremental_strategy: Optional[str] = None + batch_size: Any = None + lookback: Any = 0 persist_docs: Dict[str, Any] = field(default_factory=dict) post_hook: List[Hook] = field( default_factory=list, @@ -122,6 +124,7 @@ class NodeConfig(NodeAndTestConfig): default_factory=ContractConfig, metadata=MergeBehavior.Update.meta(), ) + event_time: Any = None def __post_init__(self): # we validate that node_color has a suitable value to prevent dbt-docs from crashing diff --git a/core/dbt/artifacts/resources/v1/source_definition.py b/core/dbt/artifacts/resources/v1/source_definition.py index ac0fcfca1b2..6c1c3679a00 100644 --- a/core/dbt/artifacts/resources/v1/source_definition.py +++ b/core/dbt/artifacts/resources/v1/source_definition.py @@ -19,6 +19,7 @@ @dataclass class SourceConfig(BaseConfig): enabled: bool = True + event_time: Any = None @dataclass diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index 9bb6e04c69e..e042888ef4b 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -165,6 +165,8 @@ def cli(ctx, **kwargs): @click.pass_context @global_flags @p.empty +@p.event_time_start +@p.event_time_end @p.exclude @p.export_saved_queries @p.full_refresh @@ -537,6 +539,8 @@ def parse(ctx, **kwargs): @p.profiles_dir @p.project_dir @p.empty +@p.event_time_start +@p.event_time_end @p.select @p.selector @p.target_path diff --git a/core/dbt/cli/params.py b/core/dbt/cli/params.py index eb8e5594f76..425009d76ee 100644 --- a/core/dbt/cli/params.py +++ b/core/dbt/cli/params.py @@ -91,6 +91,22 @@ is_flag=True, ) +event_time_end = click.option( + "--event-time-end", + envvar="DBT_EVENT_TIME_END", + help="If specified, the end datetime dbt uses to filter microbatch model inputs (exclusive).", + type=click.DateTime(), + default=None, +) + +event_time_start = click.option( + "--event-time-start", + envvar="DBT_EVENT_TIME_START", + help="If specified, the start datetime dbt uses to filter microbatch model inputs (inclusive).", + type=click.DateTime(), + default=None, +) + exclude = click.option( "--exclude", envvar=None, diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index d9010a9f4f9..05fecd8423d 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -1,6 +1,7 @@ import abc import os from copy import deepcopy +from datetime import datetime, timedelta from typing import ( TYPE_CHECKING, Any, @@ -16,10 +17,12 @@ Union, ) +import pytz from typing_extensions import Protocol from dbt import selected_resources from dbt.adapters.base.column import Column +from dbt.adapters.base.relation import EventTimeFilter from dbt.adapters.contracts.connection import AdapterResponse from dbt.adapters.exceptions import MissingConfigError from dbt.adapters.factory import ( @@ -27,7 +30,8 @@ get_adapter_package_names, get_adapter_type_names, ) -from dbt.artifacts.resources import NodeVersion, RefArgs +from dbt.artifacts.resources import NodeConfig, NodeVersion, RefArgs, SourceConfig +from dbt.artifacts.resources.types import BatchSize from dbt.clients.jinja import ( MacroGenerator, MacroStack, @@ -230,6 +234,95 @@ def Relation(self): def resolve_limit(self) -> Optional[int]: return 0 if getattr(self.config.args, "EMPTY", False) else None + def _build_end_time(self) -> Optional[datetime]: + return datetime.now(tz=pytz.utc) + + def _build_start_time( + self, checkpoint: Optional[datetime], is_incremental: bool + ) -> Optional[datetime]: + if not is_incremental or checkpoint is None: + return None + + assert isinstance(self.model.config, NodeConfig) + batch_size = self.model.config.batch_size + if batch_size is None: + raise DbtRuntimeError(f"The model `{self.model.name}` requires a `batch_size`") + + lookback = self.model.config.lookback + if batch_size == BatchSize.hour: + start = datetime( + checkpoint.year, + checkpoint.month, + checkpoint.day, + checkpoint.hour, + 0, + 0, + 0, + pytz.utc, + ) - timedelta(hours=lookback) + elif batch_size == BatchSize.day: + start = datetime( + checkpoint.year, checkpoint.month, checkpoint.day, 0, 0, 0, 0, pytz.utc + ) - timedelta(days=lookback) + elif batch_size == BatchSize.month: + start = datetime(checkpoint.year, checkpoint.month, 1, 0, 0, 0, 0, pytz.utc) + for _ in range(lookback): + start = start - timedelta(days=1) + start = datetime(start.year, start.month, 1, 0, 0, 0, 0, pytz.utc) + elif batch_size == BatchSize.year: + start = datetime(checkpoint.year - lookback, 1, 1, 0, 0, 0, 0, pytz.utc) + else: + raise DbtInternalError( + f"Batch size `{batch_size}` is not handled during batch calculation" + ) + + return start + + def _is_incremental(self) -> bool: + # TODO: Remove. This is a temporary method. We're working with adapters on + # a strategy to ensure we can access the `is_incremental` logic without drift + relation_info = self.Relation.create_from(self.config, self.model) + relation = self.db_wrapper.get_relation( + relation_info.database, relation_info.schema, relation_info.name + ) + return ( + relation is not None + and relation.type == "table" + and self.model.config.materialized == "incremental" + and not ( + getattr(self.config.args, "FULL_REFRESH", False) or self.model.config.full_refresh + ) + ) + + def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]: + event_time_filter = None + if ( + os.environ.get("DBT_EXPERIMENTAL_MICROBATCH") + and (isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig)) + and target.config.event_time + and self.model.config.materialized == "incremental" + and self.model.config.incremental_strategy == "microbatch" + ): + is_incremental = self._is_incremental() + end: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_END", None) + end = end.replace(tzinfo=pytz.UTC) if end else self._build_end_time() + + start: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_START", None) + start = ( + start.replace(tzinfo=pytz.UTC) + if start + else self._build_start_time(checkpoint=end, is_incremental=is_incremental) + ) + + if start is not None or end is not None: + event_time_filter = EventTimeFilter( + field_name=target.config.event_time, + start=start, + end=end, + ) + + return event_time_filter + @abc.abstractmethod def __call__(self, *args: str) -> Union[str, RelationProxy, MetricReference]: pass @@ -545,7 +638,11 @@ def resolve( def create_relation(self, target_model: ManifestNode) -> RelationProxy: if target_model.is_ephemeral_model: self.model.set_cte(target_model.unique_id, None) - return self.Relation.create_ephemeral_from(target_model, limit=self.resolve_limit) + return self.Relation.create_ephemeral_from( + target_model, + limit=self.resolve_limit, + event_time_filter=self.resolve_event_time_filter(target_model), + ) elif ( hasattr(target_model, "defer_relation") and target_model.defer_relation @@ -563,10 +660,18 @@ def create_relation(self, target_model: ManifestNode) -> RelationProxy: ) ): return self.Relation.create_from( - self.config, target_model.defer_relation, limit=self.resolve_limit + self.config, + target_model.defer_relation, + limit=self.resolve_limit, + event_time_filter=self.resolve_event_time_filter(target_model), ) else: - return self.Relation.create_from(self.config, target_model, limit=self.resolve_limit) + return self.Relation.create_from( + self.config, + target_model, + limit=self.resolve_limit, + event_time_filter=self.resolve_event_time_filter(target_model), + ) def validate( self, @@ -633,7 +738,12 @@ def resolve(self, source_name: str, table_name: str): target_kind="source", disabled=(isinstance(target_source, Disabled)), ) - return self.Relation.create_from(self.config, target_source, limit=self.resolve_limit) + return self.Relation.create_from( + self.config, + target_source, + limit=self.resolve_limit, + event_time_filter=self.resolve_event_time_filter(target_source), + ) class RuntimeUnitTestSourceResolver(BaseSourceResolver): diff --git a/core/setup.py b/core/setup.py index 904b23afeb6..c254062486b 100644 --- a/core/setup.py +++ b/core/setup.py @@ -72,7 +72,7 @@ "dbt-semantic-interfaces>=0.7.0,<0.8", # Minor versions for these are expected to be backwards-compatible "dbt-common>=1.6.0,<2.0", - "dbt-adapters>=1.3.0,<2.0", + "dbt-adapters>=1.6.0,<2.0", # ---- # Expect compatibility with all new versions of these packages, so lower bounds only. "packaging>20.9", diff --git a/schemas/dbt/manifest/v12.json b/schemas/dbt/manifest/v12.json index dfa5744ce70..3ff25374066 100644 --- a/schemas/dbt/manifest/v12.json +++ b/schemas/dbt/manifest/v12.json @@ -259,6 +259,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -434,6 +440,9 @@ }, "additionalProperties": false }, + "event_time": { + "default": null + }, "delimiter": { "type": "string", "default": "," @@ -909,6 +918,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -1083,6 +1098,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -1283,6 +1301,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -1457,6 +1481,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -2704,6 +2731,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -2878,6 +2911,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -3488,6 +3524,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -3663,6 +3705,9 @@ }, "additionalProperties": false }, + "event_time": { + "default": null + }, "access": { "enum": [ "private", @@ -4399,6 +4444,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -4573,6 +4624,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -4800,6 +4854,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -4974,6 +5034,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -6282,6 +6345,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -6451,6 +6520,9 @@ }, "additionalProperties": false }, + "event_time": { + "default": null + }, "strategy": { "anyOf": [ { @@ -7117,6 +7189,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -7291,6 +7369,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -7873,6 +7954,9 @@ "enabled": { "type": "boolean", "default": true + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -9874,6 +9958,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -10049,6 +10139,9 @@ }, "additionalProperties": false }, + "event_time": { + "default": null + }, "delimiter": { "type": "string", "default": "," @@ -10524,6 +10617,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -10698,6 +10797,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -10898,6 +11000,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -11072,6 +11180,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -12319,6 +12430,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -12493,6 +12610,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -13103,6 +13223,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -13278,6 +13404,9 @@ }, "additionalProperties": false }, + "event_time": { + "default": null + }, "access": { "enum": [ "private", @@ -14014,6 +14143,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -14188,6 +14323,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -14415,6 +14553,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -14589,6 +14733,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -15897,6 +16044,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -16066,6 +16219,9 @@ }, "additionalProperties": false }, + "event_time": { + "default": null + }, "strategy": { "anyOf": [ { @@ -16732,6 +16888,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -16906,6 +17068,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -17479,6 +17644,9 @@ "enabled": { "type": "boolean", "default": true + }, + "event_time": { + "default": null } }, "additionalProperties": true diff --git a/tests/functional/artifacts/expected_manifest.py b/tests/functional/artifacts/expected_manifest.py index 7098a4f4fea..a50efab9434 100644 --- a/tests/functional/artifacts/expected_manifest.py +++ b/tests/functional/artifacts/expected_manifest.py @@ -39,6 +39,9 @@ def get_rendered_model_config(**updates): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, + "lookback": 0, + "batch_size": None, } result.update(updates) return result @@ -74,6 +77,9 @@ def get_rendered_seed_config(**updates): "incremental_strategy": None, "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, + "event_time": None, + "lookback": 0, + "batch_size": None, } result.update(updates) return result @@ -114,6 +120,9 @@ def get_rendered_snapshot_config(**updates): "incremental_strategy": None, "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, + "event_time": None, + "lookback": 0, + "batch_size": None, } result.update(updates) return result @@ -752,6 +761,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False): }, "config": { "enabled": True, + "event_time": None, }, "quoting": { "database": None, @@ -1254,6 +1264,7 @@ def expected_references_manifest(project): }, "config": { "enabled": True, + "event_time": None, }, "quoting": { "database": False, diff --git a/tests/functional/list/test_list.py b/tests/functional/list/test_list.py index 145b4e58cb9..de20970038a 100644 --- a/tests/functional/list/test_list.py +++ b/tests/functional/list/test_list.py @@ -79,6 +79,9 @@ def expect_snapshot_output(self, happy_path_project): # noqa: F811 "incremental_strategy": None, "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, + "event_time": None, + "lookback": 0, + "batch_size": None, }, "unique_id": "snapshot.test.my_snapshot", "original_file_path": normalize("snapshots/snapshot.sql"), @@ -121,6 +124,9 @@ def expect_analyses_output(self): "incremental_strategy": None, "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, + "event_time": None, + "lookback": 0, + "batch_size": None, }, "unique_id": "analysis.test.a", "original_file_path": normalize("analyses/a.sql"), @@ -182,6 +188,9 @@ def expect_model_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, + "lookback": 0, + "batch_size": None, }, "original_file_path": normalize("models/ephemeral.sql"), "unique_id": "model.test.ephemeral", @@ -220,6 +229,9 @@ def expect_model_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, + "lookback": 0, + "batch_size": None, }, "original_file_path": normalize("models/incremental.sql"), "unique_id": "model.test.incremental", @@ -258,6 +270,9 @@ def expect_model_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, + "lookback": 0, + "batch_size": None, }, "original_file_path": normalize("models/sub/inner.sql"), "unique_id": "model.test.inner", @@ -296,6 +311,9 @@ def expect_model_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, + "lookback": 0, + "batch_size": None, }, "original_file_path": normalize("models/metricflow_time_spine.sql"), "unique_id": "model.test.metricflow_time_spine", @@ -334,6 +352,9 @@ def expect_model_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, + "lookback": 0, + "batch_size": None, }, "original_file_path": normalize("models/metricflow_time_spine_second.sql"), "unique_id": "model.test.metricflow_time_spine_second", @@ -372,6 +393,9 @@ def expect_model_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, + "lookback": 0, + "batch_size": None, }, "original_file_path": normalize("models/outer.sql"), "unique_id": "model.test.outer", @@ -440,6 +464,7 @@ def expect_source_output(self): "json": { "config": { "enabled": True, + "event_time": None, }, "unique_id": "source.test.my_source.my_table", "original_file_path": normalize("models/schema.yml"), @@ -490,6 +515,9 @@ def expect_seed_output(self): "incremental_strategy": None, "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, + "event_time": None, + "lookback": 0, + "batch_size": None, }, "depends_on": {"macros": []}, "unique_id": "seed.test.seed", diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py new file mode 100644 index 00000000000..8bdb5abab2c --- /dev/null +++ b/tests/functional/microbatch/test_microbatch.py @@ -0,0 +1,332 @@ +import os +from unittest import mock + +import pytest +from freezegun import freeze_time + +from dbt.tests.util import relation_from_name, run_dbt, write_file + +input_model_sql = """ +{{ config(materialized='table', event_time='event_time') }} + +select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time +union all +select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time +union all +select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time +""" + +input_model_without_event_time_sql = """ +{{ config(materialized='table') }} + +select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time +union all +select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time +union all +select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time +""" + +microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +select * from {{ ref('input_model') }} +""" + +microbatch_model_ref_render_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +select * from {{ ref('input_model').render() }} +""" + +seed_csv = """id,event_time +1,'2020-01-01 00:00:00-0' +2,'2020-01-02 00:00:00-0' +3,'2020-01-03 00:00:00-0' +""" + +seeds_yaml = """ +seeds: + - name: raw_source + config: + column_types: + event_time: TIMESTAMP +""" + +sources_yaml = """ +sources: + - name: seed_sources + schema: "{{ target.schema }}" + tables: + - name: raw_source + config: + event_time: event_time +""" + +microbatch_model_calling_source_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +select * from {{ source('seed_sources', 'raw_source') }} +""" + + +class TestMicrobatchCLI: + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + def assert_row_count(self, project, relation_name: str, expected_row_count: int): + relation = relation_from_name(project.adapter, relation_name) + result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + + if result[0] != expected_row_count: + # running show for debugging + run_dbt(["show", "--inline", f"select * from {relation}"]) + + assert result[0] == expected_row_count + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # run without --event-time-start or --event-time-end - 3 expected rows in output + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # build model >= 2020-01-02 + run_dbt(["run", "--event-time-start", "2020-01-02", "--full-refresh"]) + self.assert_row_count(project, "microbatch_model", 2) + + # build model < 2020-01-03 + run_dbt(["run", "--event-time-end", "2020-01-03", "--full-refresh"]) + self.assert_row_count(project, "microbatch_model", 2) + + # build model between 2020-01-02 >= event_time < 2020-01-03 + run_dbt( + [ + "run", + "--event-time-start", + "2020-01-02", + "--event-time-end", + "2020-01-03", + "--full-refresh", + ] + ) + self.assert_row_count(project, "microbatch_model", 1) + + +class TestMicroBatchBoundsDefault: + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + def assert_row_count(self, project, relation_name: str, expected_row_count: int): + relation = relation_from_name(project.adapter, relation_name) + result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + + if result[0] != expected_row_count: + # running show for debugging + run_dbt(["show", "--inline", f"select * from {relation}"]) + + assert result[0] == expected_row_count + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # initial run -- backfills all data + with freeze_time("2020-01-03 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # our partition grain is "day" so running the same day without new data should produce the same results + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # add next two days of data + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.run_sql( + f"insert into {test_schema_relation}.input_model(id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')" + ) + self.assert_row_count(project, "input_model", 5) + + # re-run without changing current time => no insert + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 3) + + # re-run by advancing time by one day changing current time => insert 1 row + with freeze_time("2020-01-04 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 4) + + # re-run by advancing time by one more day changing current time => insert 1 more row + with freeze_time("2020-01-05 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 5) + + +class TestMicrobatchWithSource: + @pytest.fixture(scope="class") + def seeds(self): + return { + "raw_source.csv": seed_csv, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "microbatch_model.sql": microbatch_model_calling_source_sql, + "sources.yml": sources_yaml, + "seeds.yml": seeds_yaml, + } + + def assert_row_count(self, project, relation_name: str, expected_row_count: int): + relation = relation_from_name(project.adapter, relation_name) + result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + + if result[0] != expected_row_count: + # running show for debugging + run_dbt(["show", "--inline", f"select * from {relation}"]) + + assert result[0] == expected_row_count + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # ensure seed is created for source + run_dbt(["seed"]) + + # initial run -- backfills all data + with freeze_time("2020-01-03 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # our partition grain is "day" so running the same day without new data should produce the same results + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # add next two days of data + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.run_sql( + f"insert into {test_schema_relation}.raw_source(id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')" + ) + self.assert_row_count(project, "raw_source", 5) + + # re-run without changing current time => no insert + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 3) + + # re-run by advancing time by one day changing current time => insert 1 row + with freeze_time("2020-01-04 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 4) + + # re-run by advancing time by one more day changing current time => insert 1 more row + with freeze_time("2020-01-05 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 5) + + +class TestMicrobatchWithInputWithoutEventTime: + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_without_event_time_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + def assert_row_count(self, project, relation_name: str, expected_row_count: int): + relation = relation_from_name(project.adapter, relation_name) + result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + + if result[0] != expected_row_count: + # running show for debugging + run_dbt(["show", "--inline", f"select * from {relation}"]) + + assert result[0] == expected_row_count + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # initial run -- backfills all data + with freeze_time("2020-01-03 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # our partition grain is "day" so running the same day without new data should produce the same results + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # add next two days of data + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.run_sql( + f"insert into {test_schema_relation}.input_model(id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')" + ) + self.assert_row_count(project, "input_model", 5) + + # re-run without changing current time => INSERT BECAUSE INPUT MODEL ISN'T BEING FILTERED + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 5) + + +class TestMicrobatchUsingRefRenderSkipsFilter: + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + def assert_row_count(self, project, relation_name: str, expected_row_count: int): + relation = relation_from_name(project.adapter, relation_name) + result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + + if result[0] != expected_row_count: + # running show for debugging + run_dbt(["show", "--inline", f"select * from {relation}"]) + + assert result[0] == expected_row_count + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # initial run -- backfills all data + with freeze_time("2020-01-03 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # our partition grain is "day" so running the same day without new data should produce the same results + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # add next two days of data + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.run_sql( + f"insert into {test_schema_relation}.input_model(id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')" + ) + self.assert_row_count(project, "input_model", 5) + + # re-run without changing current time => no insert + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 3) + + # Update microbatch model to call .render() on ref('input_model') + write_file( + microbatch_model_ref_render_sql, project.project_root, "models", "microbatch_model.sql" + ) + + # re-run without changing current time => INSERT because .render() skips filtering + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 5) diff --git a/tests/unit/context/test_providers.py b/tests/unit/context/test_providers.py index 224675143e4..97f04eba817 100644 --- a/tests/unit/context/test_providers.py +++ b/tests/unit/context/test_providers.py @@ -1,11 +1,19 @@ +import os +from datetime import datetime +from typing import Optional from unittest import mock import pytest +import pytz +from freezegun import freeze_time +from pytest_mock import MockerFixture from dbt.adapters.base import BaseRelation -from dbt.artifacts.resources import Quoting +from dbt.artifacts.resources import NodeConfig, Quoting +from dbt.artifacts.resources.types import BatchSize from dbt.context.providers import ( BaseResolver, + EventTimeFilter, RuntimeRefResolver, RuntimeSourceResolver, ) @@ -34,6 +42,202 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): assert resolver.resolve_limit == expected_resolve_limit + @pytest.mark.parametrize( + "dbt_experimental_microbatch,materialized,incremental_strategy,expect_filter", + [ + (True, "incremental", "microbatch", True), + (False, "incremental", "microbatch", False), + (True, "table", "microbatch", False), + (True, "incremental", "merge", False), + ], + ) + def test_resolve_event_time_filter_gating( + self, + mocker: MockerFixture, + resolver: ResolverSubclass, + dbt_experimental_microbatch: bool, + materialized: str, + incremental_strategy: str, + expect_filter: bool, + ) -> None: + if dbt_experimental_microbatch: + mocker.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + + mocker.patch("dbt.context.providers.BaseResolver._is_incremental").return_value = True + + # Target mocking + target = mock.Mock() + target.config = mock.MagicMock(NodeConfig) + target.config.event_time = "created_at" + + # Resolver mocking + resolver.config.args.EVENT_TIME_END = None + resolver.config.args.EVENT_TIME_START = None + resolver.model.config = mock.MagicMock(NodeConfig) + resolver.model.config.materialized = materialized + resolver.model.config.incremental_strategy = incremental_strategy + resolver.model.config.batch_size = BatchSize.day + resolver.model.config.lookback = 0 + + # Try to get an EventTimeFilter + event_time_filter = resolver.resolve_event_time_filter(target=target) + + if expect_filter: + assert isinstance(event_time_filter, EventTimeFilter) + else: + assert event_time_filter is None + + @freeze_time("2024-09-05 08:56:00") + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + @pytest.mark.parametrize( + "event_time_end,event_time_start,expect_filter", + [ + (None, None, True), + (datetime(2024, 9, 5), None, True), + (None, datetime(2024, 9, 4), True), + (datetime(2024, 9, 5), datetime(2024, 9, 4), True), + ], + ) + def test_event_time_filtering_is_incremental_false( + self, + mocker: MockerFixture, + resolver: ResolverSubclass, + event_time_end: datetime, + event_time_start: datetime, + expect_filter: bool, + ) -> None: + mocker.patch("dbt.context.providers.BaseResolver._is_incremental").return_value = False + + # Target mocking + target = mock.Mock() + target.config = mock.MagicMock(NodeConfig) + target.config.event_time = "created_at" + + # Resolver mocking + resolver.config.args.EVENT_TIME_END = event_time_end + resolver.config.args.EVENT_TIME_START = event_time_start + resolver.model.config = mock.MagicMock(NodeConfig) + resolver.model.config.materialized = "incremental" + resolver.model.config.incremental_strategy = "microbatch" + resolver.model.config.batch_size = BatchSize.day + resolver.model.config.lookback = 0 + + # Try to get an EventTimeFilter + event_time_filter = resolver.resolve_event_time_filter(target=target) + + if expect_filter: + assert isinstance(event_time_filter, EventTimeFilter) + else: + assert event_time_filter is None + + @freeze_time("2024-09-05 08:56:00") + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + @pytest.mark.parametrize( + "event_time_end,event_time_start,batch_size,lookback,expected_end,expected_start", + [ + ( + None, + None, + BatchSize.day, + 0, + datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC), + datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 8, 1, 8, 11), + None, + BatchSize.day, + 0, + datetime(2024, 8, 1, 8, 11, 0, 0, pytz.UTC), + datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), + ), + ( + None, + datetime(2024, 8, 1), + BatchSize.day, + 0, + datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC), + datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 9, 1), + datetime(2024, 8, 1), + BatchSize.day, + 0, + datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), + datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 9, 1, 0, 49), + None, + BatchSize.hour, + 1, + datetime(2024, 9, 1, 0, 49, 0, 0, pytz.UTC), + datetime(2024, 8, 31, 23, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 9, 1, 13, 31), + None, + BatchSize.day, + 1, + datetime(2024, 9, 1, 13, 31, 0, 0, pytz.UTC), + datetime(2024, 8, 31, 0, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 1, 23, 12, 30), + None, + BatchSize.month, + 1, + datetime(2024, 1, 23, 12, 30, 0, 0, pytz.UTC), + datetime(2023, 12, 1, 0, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 1, 23, 12, 30), + None, + BatchSize.year, + 1, + datetime(2024, 1, 23, 12, 30, 0, 0, pytz.UTC), + datetime(2023, 1, 1, 0, 0, 0, 0, pytz.UTC), + ), + ], + ) + def test_resolve_event_time_filter_batch_calculation( + self, + mocker: MockerFixture, + resolver: ResolverSubclass, + event_time_end: Optional[datetime], + event_time_start: Optional[datetime], + batch_size: BatchSize, + lookback: int, + expected_end: datetime, + expected_start: datetime, + ) -> None: + event_time = "created_at" + + mocker.patch("dbt.context.providers.BaseResolver._is_incremental").return_value = True + + # Target mocking + target = mock.Mock() + target.config = mock.MagicMock(NodeConfig) + target.config.event_time = event_time + + # Resolver mocking + resolver.model.config = mock.MagicMock(NodeConfig) + resolver.model.config.materialized = "incremental" + resolver.model.config.incremental_strategy = "microbatch" + resolver.model.config.batch_size = batch_size + resolver.model.config.lookback = lookback + resolver.config.args.EVENT_TIME_END = event_time_end + resolver.config.args.EVENT_TIME_START = event_time_start + + # Get EventTimeFilter + event_time_filter = resolver.resolve_event_time_filter(target=target) + + assert event_time_filter is not None + assert event_time_filter.field_name == event_time + assert event_time_filter.end == expected_end + assert event_time_filter.start == expected_start + class TestRuntimeRefResolver: @pytest.fixture diff --git a/tests/unit/contracts/graph/test_nodes.py b/tests/unit/contracts/graph/test_nodes.py index a498b99dcbc..a67ca1f5efc 100644 --- a/tests/unit/contracts/graph/test_nodes.py +++ b/tests/unit/contracts/graph/test_nodes.py @@ -183,6 +183,7 @@ def basic_compiled_dict(): "contract": {"enforced": False, "alias_types": True}, "docs": {"show": True}, "access": "protected", + "lookback": 0, }, "docs": {"show": True}, "columns": {}, diff --git a/tests/unit/contracts/graph/test_nodes_parsed.py b/tests/unit/contracts/graph/test_nodes_parsed.py index ebbe2443771..7655b7aa444 100644 --- a/tests/unit/contracts/graph/test_nodes_parsed.py +++ b/tests/unit/contracts/graph/test_nodes_parsed.py @@ -100,6 +100,7 @@ def populated_node_config_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "lookback": 0, } @@ -187,6 +188,7 @@ def base_parsed_model_dict(): "contract": {"enforced": False, "alias_types": True}, "packages": [], "access": "protected", + "lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -297,6 +299,7 @@ def complex_parsed_model_dict(): "contract": {"enforced": False, "alias_types": True}, "packages": [], "access": "protected", + "lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -520,6 +523,7 @@ def basic_parsed_seed_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], + "lookback": 0, }, "docs": {"show": True}, "columns": {}, @@ -611,6 +615,7 @@ def complex_parsed_seed_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], + "lookback": 0, }, "docs": {"show": True}, "columns": { @@ -818,6 +823,7 @@ def base_parsed_hook_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], + "lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -899,6 +905,7 @@ def complex_parsed_hook_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], + "lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -1253,6 +1260,7 @@ def basic_timestamp_snapshot_config_dict(): "packages": [], "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, + "lookback": 0, } @@ -1291,6 +1299,7 @@ def complex_timestamp_snapshot_config_dict(): "packages": [], "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, + "lookback": 0, } @@ -1357,6 +1366,7 @@ def basic_check_snapshot_config_dict(): "packages": [], "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, + "lookback": 0, } @@ -1395,6 +1405,7 @@ def complex_set_snapshot_config_dict(): "packages": [], "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, + "lookback": 0, } @@ -1519,6 +1530,7 @@ def basic_timestamp_snapshot_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], + "lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -1621,6 +1633,7 @@ def basic_check_snapshot_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], + "lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True},