Skip to content

Commit

Permalink
Add adapter telemetry to snowplow event. (#10859)
Browse files Browse the repository at this point in the history
* Add adapter telemetry to snowplow event.

* Temporary dev branch switch.

* Set tracking for overrideable adapter method.

* Do safer adapter ref.

* Improve comment.

* Code review comments.

* Don't call the asdict on a dict.

* Bump ci to pull in fix from base adapter.

* Add unit tests for coverage.

* Update field name from base adapter/schema change.

* remove breakpoint.
  • Loading branch information
VersusFacit authored Oct 28, 2024
1 parent 316ecfc commit 4d4b05e
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 5 deletions.
14 changes: 12 additions & 2 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
import time
from copy import deepcopy
from dataclasses import asdict
from datetime import datetime
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type

Expand Down Expand Up @@ -100,7 +101,14 @@ def get_execution_status(sql: str, adapter: BaseAdapter) -> Tuple[RunStatus, str
return status, message


def track_model_run(index, num_nodes, run_model_result):
def _get_adapter_info(adapter, run_model_result) -> Dict[str, Any]:
"""Each adapter returns a dataclass with a flexible dictionary for
adapter-specific fields. Only the non-'model_adapter_details' fields
are guaranteed cross adapter."""
return asdict(adapter.get_adapter_run_info(run_model_result.node.config)) if adapter else {}


def track_model_run(index, num_nodes, run_model_result, adapter=None):
if tracking.active_user is None:
raise DbtInternalError("cannot track model run with no active user")
invocation_id = get_invocation_id()
Expand All @@ -116,6 +124,7 @@ def track_model_run(index, num_nodes, run_model_result):
contract_enforced = False
versioned = False
incremental_strategy = None

tracking.track_model_run(
{
"invocation_id": invocation_id,
Expand All @@ -135,6 +144,7 @@ def track_model_run(index, num_nodes, run_model_result):
"contract_enforced": contract_enforced,
"access": access,
"versioned": versioned,
"adapter_info": _get_adapter_info(adapter, run_model_result),
}
)

Expand Down Expand Up @@ -283,7 +293,7 @@ def before_execute(self) -> None:
self.print_start_line()

def after_execute(self, result) -> None:
track_model_run(self.node_index, self.num_nodes, result)
track_model_run(self.node_index, self.num_nodes, result, adapter=self.adapter)
self.print_result_line(result)

def _build_run_model_result(self, model, context, elapsed_time: float = 0.0):
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
RESOURCE_COUNTS = "iglu:com.dbt/resource_counts/jsonschema/1-0-1"
RPC_REQUEST_SPEC = "iglu:com.dbt/rpc_request/jsonschema/1-0-1"
RUNNABLE_TIMING = "iglu:com.dbt/runnable/jsonschema/1-0-0"
RUN_MODEL_SPEC = "iglu:com.dbt/run_model/jsonschema/1-0-4"
RUN_MODEL_SPEC = "iglu:com.dbt/run_model/jsonschema/1-1-0"
PLUGIN_GET_NODES = "iglu:com.dbt/plugin_get_nodes/jsonschema/1-0-0"

SNOWPLOW_TRACKER_VERSION = Version(snowplow_version)
Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
git+https://github.com/dbt-labs/dbt-adapters.git@main
git+https://github.com/dbt-labs/dbt-adapters.git@ADAP-301/add-adapter-telemetry
git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git@main
git+https://github.com/dbt-labs/dbt-postgres.git@main
Expand Down
19 changes: 18 additions & 1 deletion tests/unit/task/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from argparse import Namespace
from dataclasses import dataclass
from datetime import datetime, timedelta
from importlib import import_module
from typing import Optional
from unittest.mock import MagicMock, patch

Expand All @@ -18,7 +19,7 @@
from dbt.contracts.graph.nodes import ModelNode
from dbt.events.types import LogModelResult
from dbt.flags import get_flags, set_from_args
from dbt.task.run import ModelRunner, RunTask
from dbt.task.run import ModelRunner, RunTask, _get_adapter_info
from dbt.tests.util import safe_set_invocation_context
from dbt_common.events.base_types import EventLevel
from dbt_common.events.event_manager_client import add_callback_to_manager
Expand Down Expand Up @@ -68,6 +69,22 @@ def test_run_task_preserve_edges():
mock_node_selector.get_graph_queue.assert_called_with(mock_spec, True)


def test_tracking_fails_safely_for_missing_adapter():
assert {} == _get_adapter_info(None, {})


def test_adapter_info_tracking():
mock_run_result = MagicMock()
mock_run_result.node = MagicMock()
mock_run_result.node.config = {}
assert _get_adapter_info(PostgresAdapter, mock_run_result) == {
"model_adapter_details": {},
"adapter_name": PostgresAdapter.__name__.split("Adapter")[0].lower(),
"adapter_version": import_module("dbt.adapters.postgres.__version__").version,
"base_adapter_version": import_module("dbt.adapters.__about__").version,
}


class TestModelRunner:
@pytest.fixture
def log_model_result_catcher(self) -> EventCatcher:
Expand Down

0 comments on commit 4d4b05e

Please sign in to comment.