diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 7a321e69d30..692b515ccef 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -3,6 +3,7 @@ import threading import time from copy import deepcopy +from dataclasses import asdict from datetime import datetime from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type @@ -100,7 +101,14 @@ def get_execution_status(sql: str, adapter: BaseAdapter) -> Tuple[RunStatus, str return status, message -def track_model_run(index, num_nodes, run_model_result): +def _get_adapter_info(adapter, run_model_result) -> Dict[str, Any]: + """Each adapter returns a dataclass with a flexible dictionary for + adapter-specific fields. Only the non-'model_adapter_details' fields + are guaranteed cross adapter.""" + return asdict(adapter.get_adapter_run_info(run_model_result.node.config)) if adapter else {} + + +def track_model_run(index, num_nodes, run_model_result, adapter=None): if tracking.active_user is None: raise DbtInternalError("cannot track model run with no active user") invocation_id = get_invocation_id() @@ -116,6 +124,7 @@ def track_model_run(index, num_nodes, run_model_result): contract_enforced = False versioned = False incremental_strategy = None + tracking.track_model_run( { "invocation_id": invocation_id, @@ -135,6 +144,7 @@ def track_model_run(index, num_nodes, run_model_result): "contract_enforced": contract_enforced, "access": access, "versioned": versioned, + "adapter_info": _get_adapter_info(adapter, run_model_result), } ) @@ -283,7 +293,7 @@ def before_execute(self) -> None: self.print_start_line() def after_execute(self, result) -> None: - track_model_run(self.node_index, self.num_nodes, result) + track_model_run(self.node_index, self.num_nodes, result, adapter=self.adapter) self.print_result_line(result) def _build_run_model_result(self, model, context, elapsed_time: float = 0.0): diff --git a/core/dbt/tracking.py b/core/dbt/tracking.py index e34b80bf6cd..7fb62c4bb2d 100644 --- a/core/dbt/tracking.py +++ b/core/dbt/tracking.py @@ -50,7 +50,7 @@ RESOURCE_COUNTS = "iglu:com.dbt/resource_counts/jsonschema/1-0-1" RPC_REQUEST_SPEC = "iglu:com.dbt/rpc_request/jsonschema/1-0-1" RUNNABLE_TIMING = "iglu:com.dbt/runnable/jsonschema/1-0-0" -RUN_MODEL_SPEC = "iglu:com.dbt/run_model/jsonschema/1-0-4" +RUN_MODEL_SPEC = "iglu:com.dbt/run_model/jsonschema/1-1-0" PLUGIN_GET_NODES = "iglu:com.dbt/plugin_get_nodes/jsonschema/1-0-0" SNOWPLOW_TRACKER_VERSION = Version(snowplow_version) diff --git a/dev-requirements.txt b/dev-requirements.txt index 20605e632b8..72e6cf199a8 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,4 +1,4 @@ -git+https://github.com/dbt-labs/dbt-adapters.git@main +git+https://github.com/dbt-labs/dbt-adapters.git@ADAP-301/add-adapter-telemetry git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git@main git+https://github.com/dbt-labs/dbt-postgres.git@main diff --git a/tests/unit/task/test_run.py b/tests/unit/task/test_run.py index 70de7aa76ca..d2ed50ec1ad 100644 --- a/tests/unit/task/test_run.py +++ b/tests/unit/task/test_run.py @@ -2,6 +2,7 @@ from argparse import Namespace from dataclasses import dataclass from datetime import datetime, timedelta +from importlib import import_module from typing import Optional from unittest.mock import MagicMock, patch @@ -18,7 +19,7 @@ from dbt.contracts.graph.nodes import ModelNode from dbt.events.types import LogModelResult from dbt.flags import get_flags, set_from_args -from dbt.task.run import ModelRunner, RunTask +from dbt.task.run import ModelRunner, RunTask, _get_adapter_info from dbt.tests.util import safe_set_invocation_context from dbt_common.events.base_types import EventLevel from dbt_common.events.event_manager_client import add_callback_to_manager @@ -68,6 +69,22 @@ def test_run_task_preserve_edges(): mock_node_selector.get_graph_queue.assert_called_with(mock_spec, True) +def test_tracking_fails_safely_for_missing_adapter(): + assert {} == _get_adapter_info(None, {}) + + +def test_adapter_info_tracking(): + mock_run_result = MagicMock() + mock_run_result.node = MagicMock() + mock_run_result.node.config = {} + assert _get_adapter_info(PostgresAdapter, mock_run_result) == { + "model_adapter_details": {}, + "adapter_name": PostgresAdapter.__name__.split("Adapter")[0].lower(), + "adapter_version": import_module("dbt.adapters.postgres.__version__").version, + "base_adapter_version": import_module("dbt.adapters.__about__").version, + } + + class TestModelRunner: @pytest.fixture def log_model_result_catcher(self) -> EventCatcher: