Microbatch parallelism #10958

Merged on Nov 21, 2024 (44 commits)
Changes from 41 commits
Commits
2314f5e
first pass: microbatch parallelism: results + build command not working
MichelleArk Oct 30, 2024
34fdcf3
merge batch-level results + log final result
MichelleArk Oct 31, 2024
330ca88
Merge branch 'main' into microbatch-parallelism
MichelleArk Nov 1, 2024
8cbd2c6
unit test for MicrobatchModelRunner: remove result builder test as it…
MichelleArk Nov 8, 2024
b3bc0b4
move relation_exists to MicrobatchModelRunner
MichelleArk Nov 8, 2024
b3317c5
move batches to MicrobatchModelRunner
MichelleArk Nov 8, 2024
c658be8
avoid adding microbatch results to run_results twice
MichelleArk Nov 8, 2024
9e56650
propagate previously successful batches to final result on retry
MichelleArk Nov 8, 2024
92b9aa2
simplify MicrobatchModelRunner state
MichelleArk Nov 8, 2024
3d09292
add tracking
MichelleArk Nov 8, 2024
801aede
order batch_results prior to returning
MichelleArk Nov 8, 2024
f6c3860
Merge branch 'main' into microbatch-parallelism
MichelleArk Nov 11, 2024
8b9e669
refactor: remove batch_callback in favor of batch_results.append
MichelleArk Nov 11, 2024
71bd3d7
refactor checkpoint: unify sequential and concurrent while loop in mi…
MichelleArk Nov 11, 2024
69c088d
Merge branch 'main' into microbatch-parallelism
MichelleArk Nov 14, 2024
109fbd4
refactor into RunTask.handle_microbatch_model
MichelleArk Nov 15, 2024
62f93ab
Use RunTask.handle_microbatch_model in BuildTask
MichelleArk Nov 15, 2024
69df6a2
Run batches in parallel dependent on if `this` is in jinja
QMalcolm Nov 15, 2024
2c37abc
Ensure KeyboardInterrupts/SystemExits get handled in call_runner
QMalcolm Nov 15, 2024
f1cea30
Switch to integration test for checking that KeyboardInterrupts stop …
QMalcolm Nov 18, 2024
edaeb4c
Add optional `parallel_batches` to node config
QMalcolm Nov 18, 2024
d21fdda
Regenerate v12 manifest.json to include `parallel_batches` config
QMalcolm Nov 18, 2024
05cfdb5
Validate `parallel_batches` config value in the context of a microbat…
QMalcolm Nov 18, 2024
b2f2c88
Use combination of relation existence, `parallel_batches`, usage of `…
QMalcolm Nov 18, 2024
4047b1c
Move _should_run_in_parallel onto MicrobatchModelRunner
QMalcolm Nov 18, 2024
281a8d8
Add unit test for all _should_run_in_parallel cases
QMalcolm Nov 18, 2024
c716a2c
Add integration test to ensure microbatch batches can be run in paral…
QMalcolm Nov 18, 2024
c3d2917
Add changelog
QMalcolm Nov 18, 2024
422cd8b
Rename `parallel_batches` to `concurrent_batches`
QMalcolm Nov 18, 2024
7cd4599
fix unit tests: ensure _has_this is not available in flat_graph
MichelleArk Nov 19, 2024
5166379
fix function artifacts and list tests
MichelleArk Nov 19, 2024
59b9889
only run in parallel if adapter supports MicrobatchConcurrency capabi…
MichelleArk Nov 19, 2024
1afad96
Merge branch 'main' into microbatch-parallelism
MichelleArk Nov 20, 2024
8bf1504
dbt-adapters dev-requirements @microbatch-concurrency-capability
MichelleArk Nov 20, 2024
743715f
Update test of `_should_run_in_parallel` to consider adapter `microba…
QMalcolm Nov 20, 2024
9779a41
Revert "dbt-adapters dev-requirements @microbatch-concurrency-capabil…
QMalcolm Nov 20, 2024
1ef88b3
Remove threading information from batch info logs
QMalcolm Nov 20, 2024
f127b7d
Add `MicrobatchExecutionDebug` event type
QMalcolm Nov 20, 2024
7b3e54a
Log whether a batch will be run in parallel/concurrently or not
QMalcolm Nov 20, 2024
3981e7b
Move `_build_run_microbatch_model_result` to `MicrobatchModelRunner`
QMalcolm Nov 21, 2024
12db4ec
empty commit to retrigger full suite of github actions
QMalcolm Nov 21, 2024
daa713e
fix core_types_pb2.py to version 28.3
MichelleArk Nov 21, 2024
bb5e267
Bump dbt-adapters minimum to 1.10.1
QMalcolm Nov 21, 2024
b78f251
safer .name access
MichelleArk Nov 21, 2024
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241118-160038.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Allow microbatch batches to run in parallel
+time: 2024-11-18T16:00:38.895449-06:00
+custom:
+  Author: QMalcolm MichelleArk
+  Issue: 10853 10855
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/config.py
@@ -126,6 +126,7 @@ class NodeConfig(NodeAndTestConfig):
         metadata=MergeBehavior.Update.meta(),
     )
     event_time: Any = None
+    concurrent_batches: Any = None

     def __post_init__(self):
         # we validate that node_color has a suitable value to prevent dbt-docs from crashing
3 changes: 3 additions & 0 deletions core/dbt/artifacts/schemas/batch_results.py
@@ -19,3 +19,6 @@
             successful=self.successful + other.successful,
             failed=self.failed + other.failed,
         )
+
+    def __len__(self):
+        return len(self.successful) + len(self.failed)
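To make the effect of the added `__len__` concrete, here is a minimal, self-contained sketch. `BatchResultsSketch` and the `Batch` tuple type are hypothetical stand-ins; only the `__add__` body and `__len__` mirror what this hunk shows, and the real class may differ in everything else.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

# Hypothetical batch identifier; the real element type is not shown in this diff.
Batch = Tuple[str, str]


@dataclass
class BatchResultsSketch:
    successful: List[Batch] = field(default_factory=list)
    failed: List[Batch] = field(default_factory=list)

    def __add__(self, other: "BatchResultsSketch") -> "BatchResultsSketch":
        # Mirrors the context lines of the hunk: concatenate both lists.
        return BatchResultsSketch(
            successful=self.successful + other.successful,
            failed=self.failed + other.failed,
        )

    def __len__(self) -> int:
        # Mirrors the added method: total number of batches recorded.
        return len(self.successful) + len(self.failed)


first = BatchResultsSketch(successful=[("2024-11-01", "2024-11-02")])
second = BatchResultsSketch(failed=[("2024-11-02", "2024-11-03")])
merged = first + second
```

One side effect worth keeping in mind: once a class defines `__len__`, an instance with no batches is falsy, so `if batch_results:` and `if batch_results is not None:` are no longer equivalent.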
12 changes: 12 additions & 0 deletions core/dbt/clients/jinja_static.py
@@ -17,6 +17,18 @@
 _TESTING_MACRO_CACHE: Dict[str, Any] = {}


+def statically_extract_has_name_this(source: str) -> bool:
+    """Checks whether the raw jinja has any references to `this`"""
+    env = get_environment(None, capture_macros=True)
+    parsed = env.parse(source)
+    names = tuple(parsed.find_all(jinja2.nodes.Name))
+
+    for name in names:
+        if name.name == "this":
+            return True
+    return False
+
+
 def statically_extract_macro_calls(
     source: str, ctx: Dict[str, Any], db_wrapper: Optional["ParseDatabaseWrapper"] = None
 ) -> List[str]:
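The same AST walk can be tried outside dbt. The sketch below assumes a plain `jinja2.Environment()` in place of dbt's `get_environment(None, capture_macros=True)`, and `has_name_this` is a hypothetical name. Parsing does not render the template, so undefined names such as `ref` or `is_incremental` are not a problem.

```python
import jinja2
import jinja2.nodes


def has_name_this(source: str) -> bool:
    """Return True if the template's AST contains a Name node called `this`."""
    # Assumption: a default environment is enough for parsing; dbt builds its own.
    env = jinja2.Environment()
    parsed = env.parse(source)
    return any(node.name == "this" for node in parsed.find_all(jinja2.nodes.Name))


uses_this = has_name_this("select * from {{ this }} where id > 10")
uses_attr = has_name_this("select '{{ this.schema }}' as schema_name")
only_ref = has_name_this("select * from {{ ref('upstream') }}")
plain_text = has_name_this("select 'this' as label")
```

Because the check looks at parsed `Name` nodes rather than raw text, the word "this" in plain SQL does not count, while `this.schema` does (the attribute access wraps a `Name` node).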
14 changes: 14 additions & 0 deletions core/dbt/contracts/graph/nodes.py
@@ -61,6 +61,7 @@
 from dbt.artifacts.resources import TimeSpine
 from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
 from dbt.artifacts.schemas.batch_results import BatchResults
+from dbt.clients.jinja_static import statically_extract_has_name_this
 from dbt.contracts.graph.model_config import UnitTestNodeConfig
 from dbt.contracts.graph.node_args import ModelNodeArgs
 from dbt.contracts.graph.unparsed import (
@@ -444,6 +445,13 @@
 @dataclass
 class ModelNode(ModelResource, CompiledNode):
     batch_info: Optional[BatchResults] = None
+    _has_this: Optional[bool] = None
+
+    def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
+        dct = super().__post_serialize__(dct, context)
+        if "_has_this" in dct:
+            del dct["_has_this"]
+        return dct

     @classmethod
     def resource_class(cls) -> Type[ModelResource]:
@@ -520,6 +528,12 @@

         return constraints

+    @property
+    def has_this(self) -> bool:
+        if self._has_this is None:
+            self._has_this = statically_extract_has_name_this(self.raw_code)
+        return self._has_this
+
     def infer_primary_key(self, data_tests: List["GenericTestNode"]) -> List[str]:
         """
         Infers the columns that can be used as primary key of a model in the following order:
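The two `ModelNode` additions work as a pair: `has_this` computes its value once and caches it in `_has_this`, and `__post_serialize__` keeps that cache out of the serialized node. A rough stdlib-only sketch of the pattern follows. `ModelNodeSketch`, `to_dict`, and the substring check are all hypothetical simplifications; the real property calls `statically_extract_has_name_this`.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional


@dataclass
class ModelNodeSketch:
    raw_code: str
    _has_this: Optional[bool] = None  # cache; None means "not computed yet"

    @property
    def has_this(self) -> bool:
        if self._has_this is None:
            # Placeholder check (assumption): the real code parses the Jinja AST.
            self._has_this = "this" in self.raw_code
        return self._has_this

    def to_dict(self) -> Dict[str, Any]:
        dct = asdict(self)
        # Same idea as __post_serialize__ in the hunk: drop the private cache.
        dct.pop("_has_this", None)
        return dct


node = ModelNodeSketch(raw_code="select * from {{ this }}")
before = node._has_this      # still None, nothing computed yet
computed = node.has_this     # first access fills the cache
serialized = node.to_dict()  # cache field is not part of the output
```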
10 changes: 10 additions & 0 deletions core/dbt/events/core_types.proto
@@ -1680,6 +1680,16 @@ message SnapshotTimestampWarningMsg {
     SnapshotTimestampWarning data = 2;
 }

+// Q044
+message MicrobatchExecutionDebug {
+    string msg = 1;
+}
+
+message MicrobatchExecutionDebugMsg {
+    CoreEventInfo info = 1;
+    MicrobatchExecutionDebug data = 2;
+}
+
 // W - Node testing

 // Skipped W001
358 changes: 176 additions & 182 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions core/dbt/events/types.py
@@ -1702,6 +1702,14 @@ def message(self) -> str:
         )


+class MicrobatchExecutionDebug(DebugLevel):
+    def code(self) -> str:
+        return "Q044"
+
+    def message(self) -> str:
+        return self.msg
+
+
 # =======================================================
 # W - Node testing
 # =======================================================
7 changes: 7 additions & 0 deletions core/dbt/parser/manifest.py
@@ -1461,6 +1461,13 @@
                             f"Microbatch model '{node.name}' must provide the optional 'lookback' config as type int, but got: {type(lookback)})."
                         )

+                    # optional config: concurrent_batches (bool)
+                    concurrent_batches = node.config.concurrent_batches
+                    if not isinstance(concurrent_batches, bool) and concurrent_batches is not None:
+                        raise dbt.exceptions.ParsingError(
+                            f"Microbatch model '{node.name}' optional 'concurrent_batches' config must be of type `bool` if specified, but got: {type(concurrent_batches)})."
+                        )
+
                     # Validate upstream node event_time (if configured)
                     has_input_with_event_time_config = False
                     for input_unique_id in node.depends_on.nodes:
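The check accepts exactly three values: `True`, `False`, and `None` (unset). A small sketch of the same condition in isolation, with `ParsingErrorSketch` and `validate_concurrent_batches` as hypothetical stand-ins for `dbt.exceptions.ParsingError` and the inline check:

```python
from typing import Any


class ParsingErrorSketch(Exception):
    """Hypothetical stand-in for dbt.exceptions.ParsingError."""


def validate_concurrent_batches(model_name: str, value: Any) -> None:
    # Same condition as the hunk: anything that is neither a bool nor None fails.
    if not isinstance(value, bool) and value is not None:
        raise ParsingErrorSketch(
            f"Microbatch model '{model_name}' optional 'concurrent_batches' config "
            f"must be of type `bool` if specified, but got: {type(value)}."
        )


def is_rejected(value: Any) -> bool:
    try:
        validate_concurrent_batches("my_model", value)
    except ParsingErrorSketch:
        return True
    return False
```

Note that `isinstance(1, bool)` is `False` in Python (`bool` subclasses `int`, not the other way around), so integers and strings such as `"true"` are rejected rather than coerced.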
24 changes: 18 additions & 6 deletions core/dbt/task/build.py
@@ -11,6 +11,7 @@
 from dbt.graph import Graph, GraphQueue, ResourceTypeSelector
 from dbt.node_types import NodeType
 from dbt.task.base import BaseRunner, resource_types_from_args
+from dbt.task.run import MicrobatchModelRunner
 from dbt_common.events.functions import fire_event

 from .run import ModelRunner as run_model_runner
@@ -141,13 +142,13 @@

     def handle_model_with_unit_tests_node(self, node, pool, callback):
         self._raise_set_error()
-        args = [node]
+        args = [node, pool]
         if self.config.args.single_threaded:
             callback(self.call_model_and_unit_tests_runner(*args))
         else:
             pool.apply_async(self.call_model_and_unit_tests_runner, args=args, callback=callback)

-    def call_model_and_unit_tests_runner(self, node) -> RunResult:
+    def call_model_and_unit_tests_runner(self, node, pool) -> RunResult:
         assert self.manifest
         for unit_test_unique_id in self.model_to_unit_test_map[node.unique_id]:
             unit_test_node = self.manifest.unit_tests[unit_test_unique_id]
@@ -166,6 +167,10 @@
         if runner.node.unique_id in self._skipped_children:
             cause = self._skipped_children.pop(runner.node.unique_id)
             runner.do_skip(cause=cause)
+
+        if isinstance(runner, MicrobatchModelRunner):
+            return self.handle_microbatch_model(runner, pool)
+
         return self.call_runner(runner)

     # handle non-model-plus-unit-tests nodes
@@ -177,11 +182,12 @@
         if runner.node.unique_id in self._skipped_children:
             cause = self._skipped_children.pop(runner.node.unique_id)
             runner.do_skip(cause=cause)
-        args = [runner]
-        if self.config.args.single_threaded:
-            callback(self.call_runner(*args))
+
+        if isinstance(runner, MicrobatchModelRunner):
+            callback(self.handle_microbatch_model(runner, pool))
         else:
-            pool.apply_async(self.call_runner, args=args, callback=callback)
+            args = [runner]
+            self._submit(pool, args, callback)

     # Make a map of model unique_ids to selected unit test unique_ids,
     # for processing before the model.
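The dispatch in this hunk can be sketched with the standard library. Everything here is hypothetical apart from the branching itself. In particular, `submit` is an assumed reading of `_submit`, modelled on the removed `single_threaded` / `apply_async` lines, and `handle_microbatch_model` is reduced to a direct call even though it receives the pool (presumably so it can schedule batches itself, which this diff does not show).

```python
from multiprocessing.pool import ThreadPool
from typing import Callable, List


class Runner:
    def run(self) -> str:
        return "ran as a single unit"


class MicrobatchRunner(Runner):
    def run(self) -> str:
        return "ran batch by batch"


def call_runner(runner: Runner) -> str:
    return runner.run()


def handle_microbatch_model(runner: MicrobatchRunner, pool: ThreadPool) -> str:
    # Hypothetical: the real method may hand individual batches to the pool.
    return runner.run()


def submit(pool: ThreadPool, args: list, callback: Callable, single_threaded: bool = False) -> None:
    # Assumed behaviour of `_submit`, based on the lines this hunk removes.
    if single_threaded:
        callback(call_runner(*args))
    else:
        pool.apply_async(call_runner, args=args, callback=callback)


def handle_node(runner: Runner, pool: ThreadPool, callback: Callable) -> None:
    if isinstance(runner, MicrobatchRunner):
        # Microbatch models are handled on the calling thread, with pool access.
        callback(handle_microbatch_model(runner, pool))
    else:
        submit(pool, [runner], callback)


results: List[str] = []
pool = ThreadPool(2)
handle_node(MicrobatchRunner(), pool, results.append)
handle_node(Runner(), pool, results.append)
pool.close()
pool.join()  # wait for the async task and its callback to finish
```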
@@ -210,6 +216,12 @@
         )

     def get_runner_type(self, node) -> Optional[Type[BaseRunner]]:
+        if (
+            node.resource_type == NodeType.Model
+            and super().get_runner_type(node) == MicrobatchModelRunner
+        ):
+            return MicrobatchModelRunner
+
         return self.RUNNER_MAP.get(node.resource_type)

     # Special build compile_manifest method to pass add_test_edges to the compiler
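The override above defers to the parent task only to find out whether a model is a microbatch model, and otherwise keeps using the build task's own `RUNNER_MAP`. A minimal sketch of that shape, assuming a hypothetical `is_microbatch` flag on the node (how the real parent class decides is not part of this diff) and hypothetical class names throughout:

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional, Type


class NodeType(Enum):
    Model = "model"
    Seed = "seed"


class BaseRunner: ...
class ModelRunner(BaseRunner): ...
class MicrobatchModelRunner(ModelRunner): ...
class SeedRunner(BaseRunner): ...


@dataclass
class NodeSketch:
    resource_type: NodeType
    is_microbatch: bool = False  # hypothetical flag, for illustration only


class RunTaskSketch:
    def get_runner_type(self, node: NodeSketch) -> Optional[Type[BaseRunner]]:
        # Assumption: the parent task knows how to recognise microbatch models.
        if node.resource_type == NodeType.Model and node.is_microbatch:
            return MicrobatchModelRunner
        return ModelRunner


class BuildTaskSketch(RunTaskSketch):
    RUNNER_MAP: Dict[NodeType, Type[BaseRunner]] = {
        NodeType.Model: ModelRunner,
        NodeType.Seed: SeedRunner,
    }

    def get_runner_type(self, node: NodeSketch) -> Optional[Type[BaseRunner]]:
        # Same shape as the hunk: ask the parent only about microbatch models.
        if (
            node.resource_type == NodeType.Model
            and super().get_runner_type(node) == MicrobatchModelRunner
        ):
            return MicrobatchModelRunner
        return self.RUNNER_MAP.get(node.resource_type)


task = BuildTaskSketch()
microbatch_type = task.get_runner_type(NodeSketch(NodeType.Model, is_microbatch=True))
model_type = task.get_runner_type(NodeSketch(NodeType.Model))
seed_type = task.get_runner_type(NodeSketch(NodeType.Seed))
```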