From 2cb2c96a18735f66e0fbf70d7472fc19c8b14586 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 21 Nov 2024 11:21:51 -0500 Subject: [PATCH 1/3] make microbatch models skippable --- core/dbt/task/run.py | 9 ++++--- .../functional/microbatch/test_microbatch.py | 24 +++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 1b4be463e78..0b991fc9f29 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -2,7 +2,7 @@ import threading import time from copy import deepcopy -from dataclasses import asdict, field +from dataclasses import asdict from datetime import datetime from multiprocessing.pool import ThreadPool from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type @@ -335,7 +335,7 @@ def execute(self, model, manifest): class MicrobatchModelRunner(ModelRunner): batch_idx: Optional[int] = None - batches: Dict[int, BatchType] = field(default_factory=dict) + batches: Dict[int, BatchType] = {} relation_exists: bool = False def set_batch_idx(self, batch_idx: int) -> None: @@ -704,8 +704,11 @@ def handle_microbatch_model( runner: MicrobatchModelRunner, pool: ThreadPool, ) -> RunResult: - # Initial run computes batch metadata + # Initial run computes batch metadata, unless model is skipped result = self.call_runner(runner) + if result.status == RunStatus.Skipped: + return result + batch_results: List[RunResult] = [] # Execute batches serially until a relation exists, at which point future batches are run in parallel diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index c9b9db2ff4e..ddac8959a5a 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -33,6 +33,12 @@ select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time """ +input_model_invalid_sql = """ +{{ config(materialized='table', event_time='event_time') }} + +select invalid as event_time +""" + input_model_without_event_time_sql = """ {{ config(materialized='table') }} @@ -835,6 +841,24 @@ def test_microbatch( assert len(catch_aw.caught_events) == 1 +class TestMicrobatchModelSkipped(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_invalid_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + def test_microbatch_model_skipped(self, project) -> None: + run_dbt(["run"], expect_pass=False) + + run_results = get_artifact(project.project_root, "target", "run_results.json") + + microbatch_result = run_results["results"][1] + assert microbatch_result["status"] == "skipped" + assert microbatch_result["batch_results"] is None + + class TestMicrobatchCanRunParallelOrSequential(BaseMicrobatchTest): @pytest.fixture def batch_exc_catcher(self) -> EventCatcher: From 255eaf2bc04bbb43d28b112e2e11dcad4221eafe Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 21 Nov 2024 11:26:43 -0500 Subject: [PATCH 2/3] changelog entry --- .changes/unreleased/Fixes-20241121-112638.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Fixes-20241121-112638.yaml diff --git a/.changes/unreleased/Fixes-20241121-112638.yaml b/.changes/unreleased/Fixes-20241121-112638.yaml new file mode 100644 index 00000000000..15a23ae8995 --- /dev/null +++ b/.changes/unreleased/Fixes-20241121-112638.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Make microbatch models skippable +time: 2024-11-21T11:26:38.192345-05:00 +custom: + Author: michelleark + Issue: "11021" From 052d699ff31651f833a434a352f8247662cc4ac7 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 21 Nov 2024 11:47:53 -0500 Subject: [PATCH 3/3] Implement MicrobatchModelRunner.__init__ --- core/dbt/task/run.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 0b991fc9f29..0c2888bb325 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -334,9 +334,12 @@ def execute(self, model, manifest): class MicrobatchModelRunner(ModelRunner): - batch_idx: Optional[int] = None - batches: Dict[int, BatchType] = {} - relation_exists: bool = False + def __init__(self, config, adapter, node, node_index: int, num_nodes: int): + super().__init__(config, adapter, node, node_index, num_nodes) + + self.batch_idx: Optional[int] = None + self.batches: Dict[int, BatchType] = {} + self.relation_exists: bool = False def set_batch_idx(self, batch_idx: int) -> None: self.batch_idx = batch_idx