Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracking works with Click #5972

Merged
merged 19 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions core/dbt/cli/flags.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# TODO Move this to /core/dbt/flags.py when we're ready to break things
import os
import sys
from dataclasses import dataclass
from multiprocessing import get_context
from pprint import pformat as pf
Expand All @@ -13,7 +14,7 @@

@dataclass(frozen=True)
class Flags:
def __init__(self, ctx=None) -> None:
def __init__(self, ctx=None, invoked_subcommand=None) -> None:

if ctx is None:
ctx = get_current_context()
Expand All @@ -32,13 +33,20 @@ def assign_params(ctx):

assign_params(ctx)

# Get the invoked command flags
iknox-fa marked this conversation as resolved.
Show resolved Hide resolved
if invoked_subcommand is not None:
invoked_subcommand_ctx = invoked_subcommand.make_context(None, sys.argv[2:])
assign_params(invoked_subcommand_ctx)

# Hard coded flags
object.__setattr__(self, "WHICH", ctx.info_name)
object.__setattr__(self, "MP_CONTEXT", get_context("spawn"))

# Support console DO NOT TRACK initiave
if os.getenv("DO_NOT_TRACK", "").lower() in (1, "t", "true", "y", "yes"):
object.__setattr__(self, "ANONYMOUS_USAGE_STATS", False)
object.__setattr__(self, "SEND_ANONYMOUS_USAGE_STATS", False)
else:
object.__setattr__(self, "SEND_ANONYMOUS_USAGE_STATS", True)

def __str__(self) -> str:
return str(pf(self.__dict__))
24 changes: 20 additions & 4 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from dbt.adapters.factory import adapter_management
from dbt.cli import params as p
from dbt.cli.flags import Flags
from dbt.events.functions import setup_event_logger
iknox-fa marked this conversation as resolved.
Show resolved Hide resolved
from dbt.profiler import profiler
import dbt.tracking
iknox-fa marked this conversation as resolved.
Show resolved Hide resolved
import logging


def cli_runner():
Expand Down Expand Up @@ -52,17 +55,30 @@ def cli(ctx, **kwargs):
"""An ELT tool for managing your SQL transformations and data models.
For more documentation on these commands, visit: docs.getdbt.com
"""
incomplete_flags = Flags()
flags = Flags(invoked_subcommand=globals()[ctx.invoked_subcommand])

dbt.tracking.initialize_from_flags(flags)
iknox-fa marked this conversation as resolved.
Show resolved Hide resolved
# TODO we need to have config to get the projectID
project_id = ""
# TODO we need to get the credentials or we need to know that this will make adapter info
# not available in invocation start/end events
credentials = None
ctx.with_resource(dbt.tracking.track_run(project_id, credentials, ctx.invoked_subcommand))

# Logging
iknox-fa marked this conversation as resolved.
Show resolved Hide resolved
# N.B. Legacy logger is not supported
level_override = logging.WARN if ctx.invoked_subcommand in ("list", "ls") else None
setup_event_logger(flags.LOG_PATH or "logs", level_override)

# Profiling
if incomplete_flags.RECORD_TIMING_INFO:
ctx.with_resource(profiler(enable=True, outfile=incomplete_flags.RECORD_TIMING_INFO))
if flags.RECORD_TIMING_INFO:
ctx.with_resource(profiler(enable=True, outfile=flags.RECORD_TIMING_INFO))

# Adapter management
ctx.with_resource(adapter_management())

# Version info
if incomplete_flags.VERSION:
if flags.VERSION:
click.echo(f"`version` called\n ctx.params: {pf(ctx.params)}")
return
else:
Expand Down
25 changes: 4 additions & 21 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@
from dbt.exceptions import (
Exception as dbtException,
InternalException,
NotImplementedException,
FailedToConnectException,
)


Expand Down Expand Up @@ -178,7 +176,7 @@ def handle_and_check(args):
# Set flags from args, user config, and env vars
user_config = read_user_config(flags.PROFILES_DIR) # This is read again later
flags.set_from_args(parsed, user_config)
dbt.tracking.initialize_from_flags()
dbt.tracking.initialize_from_flags(flags)
# Set log_format from flags
parsed.cls.set_log_format()

Expand All @@ -201,22 +199,6 @@ def handle_and_check(args):
return res, success


@contextmanager
def track_run(task):
dbt.tracking.track_invocation_start(config=task.config, args=task.args)
try:
yield
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="ok")
except (NotImplementedException, FailedToConnectException) as e:
fire_event(MainEncounteredError(exc=str(e)))
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="error")
except Exception:
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="error")
raise
finally:
dbt.tracking.flush()


def run_from_args(parsed):
log_cache_events(getattr(parsed, "log_cache_events", False))

Expand All @@ -240,8 +222,9 @@ def run_from_args(parsed):
fire_event(MainTrackingUserState(user_state=dbt.tracking.active_user.state()))

results = None

with track_run(task):
project_id = None if task.config is None else task.config.hashed_name()
credentials = None if task.config is None else task.config.credentials
with dbt.tracking.track_run(project_id, credentials, parsed.which):
results = task.run()
return task, results

Expand Down
1 change: 0 additions & 1 deletion core/dbt/task/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ def track_package_install(self, package_name: str, source_type: str, version: st
elif source_type != "hub":
package_name = dbt.utils.md5(package_name)
version = dbt.utils.md5(version)

dbt.tracking.track_package_install(
self.config,
self.config.args,
Expand Down
183 changes: 95 additions & 88 deletions core/dbt/tracking.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
from typing import Optional
from contextlib import contextmanager

from dbt.clients.yaml_helper import ( # noqa:F401
yaml,
safe_load,
Loader,
Dumper,
)
from dbt.clients.yaml_helper import yaml, safe_load # noqa:F401
from dbt.events.functions import fire_event, get_invocation_id
from dbt.events.types import (
DisableTracking,
Expand All @@ -14,9 +10,13 @@
FlushEvents,
FlushEventsFailure,
TrackingInitializeFailure,
MainEncounteredError,
)
from dbt import version as dbt_version
from dbt import flags
from dbt.exceptions import (
NotImplementedException,
FailedToConnectException,
)
from snowplow_tracker import Subject, Tracker, Emitter, logger as sp_logger
from snowplow_tracker import SelfDescribingJson
from datetime import datetime
Expand Down Expand Up @@ -177,61 +177,6 @@ def get_cookie(self):
active_user: Optional[User] = None


def get_run_type(args):
return "regular"


def get_invocation_context(user, config, args):
# this adapter might not have implemented the type or unique_field properties
try:
adapter_type = config.credentials.type
except Exception:
adapter_type = None
try:
adapter_unique_id = config.credentials.hashed_unique_field()
except Exception:
adapter_unique_id = None

return {
"project_id": None if config is None else config.hashed_name(),
"user_id": user.id,
"invocation_id": get_invocation_id(),
"command": args.which,
"options": None,
"version": str(dbt_version.installed),
"run_type": get_run_type(args),
"adapter_type": adapter_type,
"adapter_unique_id": adapter_unique_id,
}


def get_invocation_start_context(user, config, args):
data = get_invocation_context(user, config, args)

start_data = {"progress": "start", "result_type": None, "result": None}

data.update(start_data)
return SelfDescribingJson(INVOCATION_SPEC, data)


def get_invocation_end_context(user, config, args, result_type):
data = get_invocation_context(user, config, args)

start_data = {"progress": "end", "result_type": result_type, "result": None}

data.update(start_data)
return SelfDescribingJson(INVOCATION_SPEC, data)


def get_invocation_invalid_context(user, config, args, result_type):
data = get_invocation_context(user, config, args)

start_data = {"progress": "invalid", "result_type": result_type, "result": None}

data.update(start_data)
return SelfDescribingJson(INVOCATION_SPEC, data)


def get_platform_context():
data = {
"platform": platform.platform(),
Expand Down Expand Up @@ -267,9 +212,11 @@ def track(user, *args, **kwargs):
fire_event(SendEventFailure())


def track_invocation_start(config=None, args=None):
def track_invocation_start(invocation_context):
data = {"progress": "start", "result_type": None, "result": None}
data.update(invocation_context)
context = [
get_invocation_start_context(active_user, config, args),
SelfDescribingJson(INVOCATION_SPEC, data),
get_platform_context(),
get_dbt_env_context(),
]
Expand Down Expand Up @@ -325,10 +272,34 @@ def track_rpc_request(options):
)


def get_base_invocation_context():
assert (
active_user is not None
), "initialize active user before calling get_base_invocation_context"
return {
"project_id": None,
"user_id": active_user.id,
"invocation_id": active_user.invocation_id,
"command": None,
"options": None,
"version": str(dbt_version.installed),
"run_type": "regular",
"adapter_type": None,
"adapter_unique_id": None,
}


def track_package_install(config, args, options):
assert active_user is not None, "Cannot track package installs when active user is None"

invocation_data = get_invocation_context(active_user, config, args)
invocation_data = get_base_invocation_context()

invocation_data.update(
{
"project_id": None if config is None else config.hashed_name(),
"command": args.which,
}
)

context = [
SelfDescribingJson(INVOCATION_SPEC, invocation_data),
Expand Down Expand Up @@ -361,10 +332,11 @@ def track_deprecation_warn(options):
)


def track_invocation_end(config=None, args=None, result_type=None):
user = active_user
def track_invocation_end(invocation_context, result_type=None):
data = {"progress": "end", "result_type": result_type, "result": None}
data.update(invocation_context)
context = [
get_invocation_end_context(user, config, args, result_type),
SelfDescribingJson(INVOCATION_SPEC, data),
get_platform_context(),
get_dbt_env_context(),
]
Expand All @@ -374,14 +346,17 @@ def track_invocation_end(config=None, args=None, result_type=None):
track(active_user, category="dbt", action="invocation", label="end", context=context)


def track_invalid_invocation(config=None, args=None, result_type=None):
def track_invalid_invocation(args=None, result_type=None):
assert active_user is not None, "Cannot track invalid invocations when active user is None"

user = active_user
invocation_context = get_invocation_invalid_context(user, config, args, result_type)

context = [invocation_context, get_platform_context(), get_dbt_env_context()]

invocation_context = get_base_invocation_context()
invocation_context.update({"command": args.which})
data = {"progress": "invalid", "result_type": result_type, "result": None}
data.update(invocation_context)
context = [
SelfDescribingJson(INVOCATION_SPEC, data),
get_platform_context(),
get_dbt_env_context(),
]
track(active_user, category="dbt", action="invocation", label="invalid", context=context)


Expand Down Expand Up @@ -446,16 +421,6 @@ def do_not_track():
active_user = User(None)


def initialize_tracking(cookie_dir):
global active_user
active_user = User(cookie_dir)
try:
active_user.initialize()
except Exception:
fire_event(TrackingInitializeFailure())
active_user = User(None)


class InvocationProcessor(logbook.Processor):
def __init__(self):
super().__init__()
Expand All @@ -470,9 +435,51 @@ def process(self, record):
)


def initialize_from_flags():
def initialize_from_flags(flags):
# Setting these used to be in UserConfig, but had to be moved here
global active_user
if flags.SEND_ANONYMOUS_USAGE_STATS:
initialize_tracking(flags.PROFILES_DIR)
active_user = User(flags.PROFILES_DIR)
try:
active_user.initialize()
except Exception:
fire_event(TrackingInitializeFailure())
active_user = User(None)
else:
do_not_track()
active_user = User(None)


@contextmanager
def track_run(project_id, credentials, run_command):
iknox-fa marked this conversation as resolved.
Show resolved Hide resolved
# this adapter might not have implemented the type or unique_field properties
try:
adapter_type = credentials.type
except Exception:
adapter_type = None
try:
adapter_unique_id = credentials.hashed_unique_field()
except Exception:
adapter_unique_id = None

invocation_context = get_base_invocation_context()

invocation_context.update(
{
"project_id": project_id,
"command": run_command,
"adapter_type": adapter_type,
"adapter_unique_id": adapter_unique_id,
}
)
track_invocation_start(invocation_context)
try:
yield
track_invocation_end(invocation_context, result_type="ok")
except (NotImplementedException, FailedToConnectException) as e:
fire_event(MainEncounteredError(exc=str(e)))
track_invocation_end(invocation_context, result_type="error")
except Exception:
track_invocation_end(invocation_context, result_type="error")
raise
finally:
flush()
Loading