From beef8aa75e091612c85ca069875fdaee983b73ca Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Fri, 30 Sep 2022 14:48:28 -0500 Subject: [PATCH 01/14] Mostly working, WIP for Chenyu --- core/dbt/cli/flags.py | 8 +++++++- core/dbt/cli/main.py | 15 +++++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/core/dbt/cli/flags.py b/core/dbt/cli/flags.py index 3593a69de84..7ed32ec17fe 100644 --- a/core/dbt/cli/flags.py +++ b/core/dbt/cli/flags.py @@ -1,5 +1,6 @@ # TODO Move this to /core/dbt/flags.py when we're ready to break things import os +import sys from dataclasses import dataclass from multiprocessing import get_context from pprint import pformat as pf @@ -13,7 +14,7 @@ @dataclass(frozen=True) class Flags: - def __init__(self, ctx=None) -> None: + def __init__(self, ctx=None, invoked_subcommand=None) -> None: if ctx is None: ctx = get_current_context() @@ -32,6 +33,11 @@ def assign_params(ctx): assign_params(ctx) + # Get the invoked command flags + if invoked_subcommand is not None: + invoked_subcommand_ctx = invoked_subcommand.make_context(None, sys.argv[2:]) + assign_params(invoked_subcommand_ctx) + # Hard coded flags object.__setattr__(self, "WHICH", ctx.info_name) object.__setattr__(self, "MP_CONTEXT", get_context("spawn")) diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index 3f3b94ea9e3..f9dc0007ea7 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -6,7 +6,9 @@ from dbt.adapters.factory import adapter_management from dbt.cli import params as p from dbt.cli.flags import Flags +from dbt.events.functions import setup_event_logger from dbt.profiler import profiler +import logging def cli_runner(): @@ -52,17 +54,22 @@ def cli(ctx, **kwargs): """An ELT tool for managing your SQL transformations and data models. For more documentation on these commands, visit: docs.getdbt.com """ - incomplete_flags = Flags() + flags = Flags(invoked_subcommand=globals()[ctx.invoked_subcommand]) + + # Logging + # N.B. 
Legacy logger is not supported + level_override = logging.WARN if ctx.invoked_subcommand in ("list", "ls") else None + setup_event_logger(flags.LOG_PATH or "logs", level_override) # Profiling - if incomplete_flags.RECORD_TIMING_INFO: - ctx.with_resource(profiler(enable=True, outfile=incomplete_flags.RECORD_TIMING_INFO)) + if flags.RECORD_TIMING_INFO: + ctx.with_resource(profiler(enable=True, outfile=flags.RECORD_TIMING_INFO)) # Adapter management ctx.with_resource(adapter_management()) # Version info - if incomplete_flags.VERSION: + if flags.VERSION: click.echo(f"`version` called\n ctx.params: {pf(ctx.params)}") return else: From e75ce55d02608e89b745ae091af078b94208823d Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Sat, 1 Oct 2022 10:30:32 -0500 Subject: [PATCH 02/14] Fixed passing command object for flag generation --- core/dbt/cli/flags.py | 9 ++++++--- core/dbt/cli/main.py | 15 ++++++++++++--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/core/dbt/cli/flags.py b/core/dbt/cli/flags.py index 7ed32ec17fe..0aeb205b59d 100644 --- a/core/dbt/cli/flags.py +++ b/core/dbt/cli/flags.py @@ -2,10 +2,11 @@ import os import sys from dataclasses import dataclass +from importlib import import_module from multiprocessing import get_context from pprint import pformat as pf -from click import get_current_context +from click import Context, get_current_context if os.name != "nt": # https://bugs.python.org/issue41567 @@ -14,7 +15,7 @@ @dataclass(frozen=True) class Flags: - def __init__(self, ctx=None, invoked_subcommand=None) -> None: + def __init__(self, ctx: Context = None) -> None: if ctx is None: ctx = get_current_context() @@ -34,7 +35,9 @@ def assign_params(ctx): assign_params(ctx) # Get the invoked command flags - if invoked_subcommand is not None: + if hasattr(ctx, "invoked_subcommand") and ctx.invoked_subcommand is not None: + invoked_subcommand = getattr(import_module("dbt.cli.main"), ctx.invoked_subcommand) + # TODO: I think sys.argv[2::] is a little fragile-- Think through edge cases! invoked_subcommand_ctx = invoked_subcommand.make_context(None, sys.argv[2:]) assign_params(invoked_subcommand_ctx) diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index f9dc0007ea7..54380e3bb9b 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -1,4 +1,5 @@ import inspect # This is temporary for RAT-ing +import logging from copy import copy from pprint import pformat as pf # This is temporary for RAT-ing @@ -6,9 +7,9 @@ from dbt.adapters.factory import adapter_management from dbt.cli import params as p from dbt.cli.flags import Flags -from dbt.events.functions import setup_event_logger +from dbt.events.functions import fire_event, setup_event_logger +from dbt.events.types import MainEncounteredError from dbt.profiler import profiler -import logging def cli_runner(): @@ -54,12 +55,20 @@ def cli(ctx, **kwargs): """An ELT tool for managing your SQL transformations and data models. For more documentation on these commands, visit: docs.getdbt.com """ - flags = Flags(invoked_subcommand=globals()[ctx.invoked_subcommand]) + flags = Flags() # Logging # N.B. Legacy logger is not supported + + # TODO: Check w Nate and Jerco-- does this do what we need it to wrt to list/ls log levels? level_override = logging.WARN if ctx.invoked_subcommand in ("list", "ls") else None setup_event_logger(flags.LOG_PATH or "logs", level_override) + # TODO: Do we need to set `event_logger.format_json = flags.LOG_FORMAT` like we used to? + # Do we even need these pre-init-hooks in basetask and list? 
+ # I need Nate/Jerco to walk me through this convoluted mess. + + # This is just a test log event, remove before merge + fire_event(MainEncounteredError(exc="bork bork bork!")) # Profiling if flags.RECORD_TIMING_INFO: From c3ea0d1e01adc70d000375fb73b9897ffe45ec4d Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Mon, 3 Oct 2022 10:12:36 -0500 Subject: [PATCH 03/14] fixed flags param parse failure in root --- core/dbt/cli/flags.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/dbt/cli/flags.py b/core/dbt/cli/flags.py index 0aeb205b59d..ffe066e366a 100644 --- a/core/dbt/cli/flags.py +++ b/core/dbt/cli/flags.py @@ -37,8 +37,9 @@ def assign_params(ctx): # Get the invoked command flags if hasattr(ctx, "invoked_subcommand") and ctx.invoked_subcommand is not None: invoked_subcommand = getattr(import_module("dbt.cli.main"), ctx.invoked_subcommand) - # TODO: I think sys.argv[2::] is a little fragile-- Think through edge cases! - invoked_subcommand_ctx = invoked_subcommand.make_context(None, sys.argv[2:]) + invoked_subcommand.allow_extra_args = True + invoked_subcommand.ignore_unknown_options = True + invoked_subcommand_ctx = invoked_subcommand.make_context(None, sys.argv) assign_params(invoked_subcommand_ctx) # Hard coded flags From f354296e29f4af4d9fd1e8a37122b4751fb4ba45 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Tue, 11 Oct 2022 14:21:26 -0500 Subject: [PATCH 04/14] WIP --- core/dbt/cli/main.py | 15 +- core/dbt/cli/params.py | 1 + core/dbt/events/functions.py | 104 +++++----- core/dbt/events/functions_old.py | 344 +++++++++++++++++++++++++++++++ 4 files changed, 399 insertions(+), 65 deletions(-) create mode 100644 core/dbt/events/functions_old.py diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index 54380e3bb9b..e347cb3ba40 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -59,16 +59,15 @@ def cli(ctx, **kwargs): # Logging # N.B. Legacy logger is not supported - - # TODO: Check w Nate and Jerco-- does this do what we need it to wrt to list/ls log levels? - level_override = logging.WARN if ctx.invoked_subcommand in ("list", "ls") else None - setup_event_logger(flags.LOG_PATH or "logs", level_override) - # TODO: Do we need to set `event_logger.format_json = flags.LOG_FORMAT` like we used to? - # Do we even need these pre-init-hooks in basetask and list? - # I need Nate/Jerco to walk me through this convoluted mess. + setup_event_logger( + flags.LOG_PATH if hasattr(flags.LOG_PATH) else "logs", + flags.LOG_FORMAT, + flags.USE_COLORS, + flags.DEBUG, + ) # This is just a test log event, remove before merge - fire_event(MainEncounteredError(exc="bork bork bork!")) + fire_event(MainEncounteredError(exc="bork bork bork!\n\n\n")) # Profiling if flags.RECORD_TIMING_INFO: diff --git a/core/dbt/cli/params.py b/core/dbt/cli/params.py index 1661e6e8c55..2eaf366a6d8 100644 --- a/core/dbt/cli/params.py +++ b/core/dbt/cli/params.py @@ -131,6 +131,7 @@ "--log-path", envvar="DBT_LOG_PATH", help="Configure the 'log-path'. Only applies this setting for the current run. Overrides the 'DBT_LOG_PATH' if it is set.", + default=Path.cwd() / "logs", type=click.Path(), ) diff --git a/core/dbt/events/functions.py b/core/dbt/events/functions.py index c98c7176117..903cb7b177e 100644 --- a/core/dbt/events/functions.py +++ b/core/dbt/events/functions.py @@ -1,62 +1,61 @@ -from colorama import Style -import dbt.events.functions as this # don't worry I hate it too. 
-from dbt.events.base_types import NoStdOut, Event, NoFile, ShowException, Cache -from dbt.events.types import T_Event, MainReportVersion, EmptyLine, EventBufferFull -import dbt.flags as flags -from dbt.constants import SECRET_ENV_PREFIX - -# TODO this will need to move eventually -from dbt.logger import make_log_dir_if_missing, GLOBAL_LOGGER -from datetime import datetime -import json import io -from io import StringIO, TextIOWrapper -import logbook +import json import logging -from logging import Logger -import sys -from logging.handlers import RotatingFileHandler import os -import uuid +import sys import threading -from typing import Any, Dict, List, Optional, Union +import uuid from collections import deque +from datetime import datetime +from io import StringIO, TextIOWrapper +from logging import Logger +from logging.handlers import RotatingFileHandler +from typing import Any, Dict, List, Optional, Union + +import dbt.flags as flags +import logbook +from colorama import Style +from dbt.constants import SECRET_ENV_PREFIX +from dbt.events.base_types import Cache, Event, NoFile, NoStdOut, ShowException +from dbt.events.types import EmptyLine, EventBufferFull, MainReportVersion, T_Event +from dbt.logger import GLOBAL_LOGGER, make_log_dir_if_missing LOG_VERSION = 2 EVENT_HISTORY = None -# create the global file logger with no configuration +# create the module-global loggers FILE_LOG = logging.getLogger("default_file") -null_handler = logging.NullHandler() -FILE_LOG.addHandler(null_handler) - -# set up logger to go to stdout with defaults -# setup_event_logger will be called once args have been parsed STDOUT_LOG = logging.getLogger("default_stdout") -STDOUT_LOG.setLevel(logging.INFO) -stdout_handler = logging.StreamHandler(sys.stdout) -stdout_handler.setLevel(logging.INFO) -STDOUT_LOG.addHandler(stdout_handler) -format_color = True -format_json = False invocation_id: Optional[str] = None -def setup_event_logger(log_path, level_override=None): +def setup_event_logger(log_path, log_format, use_colors, debug): + global FILE_LOG + global STDOUT_LOG + breakpoint() + make_log_dir_if_missing(log_path) + + null_handler = logging.NullHandler() + FILE_LOG.addHandler(null_handler) + + STDOUT_LOG.setLevel(logging.INFO) + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setLevel(logging.INFO) + STDOUT_LOG.addHandler(stdout_handler) - this.format_json = flags.LOG_FORMAT == "json" + format_json = log_format == "json" # USE_COLORS can be None if the app just started and the cli flags # havent been applied yet - this.format_color = True if flags.USE_COLORS else False + format_color = True if use_colors else False # TODO this default should live somewhere better log_dest = os.path.join(log_path, "dbt.log") - level = level_override or (logging.DEBUG if flags.DEBUG else logging.INFO) + level = logging.DEBUG if debug else logging.INFO # overwrite the STDOUT_LOG logger with the configured one - this.STDOUT_LOG = logging.getLogger("configured_std_out") - this.STDOUT_LOG.setLevel(level) + STDOUT_LOG = logging.getLogger("configured_std_out") + STDOUT_LOG.setLevel(level) FORMAT = "%(message)s" stdout_passthrough_formatter = logging.Formatter(fmt=FORMAT) @@ -65,16 +64,16 @@ def setup_event_logger(log_path, level_override=None): stdout_handler.setFormatter(stdout_passthrough_formatter) stdout_handler.setLevel(level) # clear existing stdout TextIOWrapper stream handlers - this.STDOUT_LOG.handlers = [ + STDOUT_LOG.handlers = [ h - for h in this.STDOUT_LOG.handlers + for h in STDOUT_LOG.handlers if 
not (hasattr(h, "stream") and isinstance(h.stream, TextIOWrapper)) # type: ignore ] - this.STDOUT_LOG.addHandler(stdout_handler) + STDOUT_LOG.addHandler(stdout_handler) # overwrite the FILE_LOG logger with the configured one - this.FILE_LOG = logging.getLogger("configured_file") - this.FILE_LOG.setLevel(logging.DEBUG) # always debug regardless of user input + FILE_LOG = logging.getLogger("configured_file") + FILE_LOG.setLevel(logging.DEBUG) # always debug regardless of user input file_passthrough_formatter = logging.Formatter(fmt=FORMAT) @@ -83,8 +82,8 @@ def setup_event_logger(log_path, level_override=None): ) file_handler.setFormatter(file_passthrough_formatter) file_handler.setLevel(logging.DEBUG) # always debug regardless of user input - this.FILE_LOG.handlers.clear() - this.FILE_LOG.addHandler(file_handler) + FILE_LOG.handlers.clear() + FILE_LOG.addHandler(file_handler) # used for integration tests @@ -92,15 +91,15 @@ def capture_stdout_logs() -> StringIO: capture_buf = io.StringIO() stdout_capture_handler = logging.StreamHandler(capture_buf) stdout_handler.setLevel(logging.DEBUG) - this.STDOUT_LOG.addHandler(stdout_capture_handler) + STDOUT_LOG.addHandler(stdout_capture_handler) return capture_buf # used for integration tests def stop_capture_stdout_logs() -> None: - this.STDOUT_LOG.handlers = [ + STDOUT_LOG.handlers = [ h - for h in this.STDOUT_LOG.handlers + for h in STDOUT_LOG.handlers if not (hasattr(h, "stream") and isinstance(h.stream, StringIO)) # type: ignore ] @@ -157,7 +156,7 @@ def event_to_serializable_dict( # translates an Event to a completely formatted text-based log line # type hinting everything as strings so we don't get any unintentional string conversions via str() def reset_color() -> str: - return "" if not this.format_color else Style.RESET_ALL + return "" if not format_color else Style.RESET_ALL def create_info_text_log_line(e: T_Event) -> str: @@ -200,7 +199,7 @@ def create_json_log_line(e: T_Event) -> Optional[str]: # calls create_stdout_text_log_line() or create_json_log_line() according to logger config def create_log_line(e: T_Event, file_output=False) -> Optional[str]: - if this.format_json: + if format_json: return create_json_log_line(e) # json output, both console and file elif file_output is True or flags.DEBUG: return create_debug_text_log_line(e) # default file output @@ -261,15 +260,6 @@ def fire_event(e: Event) -> None: add_to_event_history(e) - # backwards compatibility for plugins that require old logger (dbt-rpc) - if flags.ENABLE_LEGACY_LOGGER: - # using Event::message because the legacy logger didn't differentiate messages by - # destination - log_line = create_log_line(e) - if log_line: - send_to_logger(GLOBAL_LOGGER, e.level_tag(), log_line) - return # exit the function to avoid using the current logger as well - # always logs debug level regardless of user input if not isinstance(e, NoFile): log_line = create_log_line(e, file_output=True) diff --git a/core/dbt/events/functions_old.py b/core/dbt/events/functions_old.py new file mode 100644 index 00000000000..c98c7176117 --- /dev/null +++ b/core/dbt/events/functions_old.py @@ -0,0 +1,344 @@ +from colorama import Style +import dbt.events.functions as this # don't worry I hate it too. 
+from dbt.events.base_types import NoStdOut, Event, NoFile, ShowException, Cache +from dbt.events.types import T_Event, MainReportVersion, EmptyLine, EventBufferFull +import dbt.flags as flags +from dbt.constants import SECRET_ENV_PREFIX + +# TODO this will need to move eventually +from dbt.logger import make_log_dir_if_missing, GLOBAL_LOGGER +from datetime import datetime +import json +import io +from io import StringIO, TextIOWrapper +import logbook +import logging +from logging import Logger +import sys +from logging.handlers import RotatingFileHandler +import os +import uuid +import threading +from typing import Any, Dict, List, Optional, Union +from collections import deque + +LOG_VERSION = 2 +EVENT_HISTORY = None + +# create the global file logger with no configuration +FILE_LOG = logging.getLogger("default_file") +null_handler = logging.NullHandler() +FILE_LOG.addHandler(null_handler) + +# set up logger to go to stdout with defaults +# setup_event_logger will be called once args have been parsed +STDOUT_LOG = logging.getLogger("default_stdout") +STDOUT_LOG.setLevel(logging.INFO) +stdout_handler = logging.StreamHandler(sys.stdout) +stdout_handler.setLevel(logging.INFO) +STDOUT_LOG.addHandler(stdout_handler) + +format_color = True +format_json = False +invocation_id: Optional[str] = None + + +def setup_event_logger(log_path, level_override=None): + make_log_dir_if_missing(log_path) + + this.format_json = flags.LOG_FORMAT == "json" + # USE_COLORS can be None if the app just started and the cli flags + # havent been applied yet + this.format_color = True if flags.USE_COLORS else False + # TODO this default should live somewhere better + log_dest = os.path.join(log_path, "dbt.log") + level = level_override or (logging.DEBUG if flags.DEBUG else logging.INFO) + + # overwrite the STDOUT_LOG logger with the configured one + this.STDOUT_LOG = logging.getLogger("configured_std_out") + this.STDOUT_LOG.setLevel(level) + + FORMAT = "%(message)s" + stdout_passthrough_formatter = logging.Formatter(fmt=FORMAT) + + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setFormatter(stdout_passthrough_formatter) + stdout_handler.setLevel(level) + # clear existing stdout TextIOWrapper stream handlers + this.STDOUT_LOG.handlers = [ + h + for h in this.STDOUT_LOG.handlers + if not (hasattr(h, "stream") and isinstance(h.stream, TextIOWrapper)) # type: ignore + ] + this.STDOUT_LOG.addHandler(stdout_handler) + + # overwrite the FILE_LOG logger with the configured one + this.FILE_LOG = logging.getLogger("configured_file") + this.FILE_LOG.setLevel(logging.DEBUG) # always debug regardless of user input + + file_passthrough_formatter = logging.Formatter(fmt=FORMAT) + + file_handler = RotatingFileHandler( + filename=log_dest, encoding="utf8", maxBytes=10 * 1024 * 1024, backupCount=5 # 10 mb + ) + file_handler.setFormatter(file_passthrough_formatter) + file_handler.setLevel(logging.DEBUG) # always debug regardless of user input + this.FILE_LOG.handlers.clear() + this.FILE_LOG.addHandler(file_handler) + + +# used for integration tests +def capture_stdout_logs() -> StringIO: + capture_buf = io.StringIO() + stdout_capture_handler = logging.StreamHandler(capture_buf) + stdout_handler.setLevel(logging.DEBUG) + this.STDOUT_LOG.addHandler(stdout_capture_handler) + return capture_buf + + +# used for integration tests +def stop_capture_stdout_logs() -> None: + this.STDOUT_LOG.handlers = [ + h + for h in this.STDOUT_LOG.handlers + if not (hasattr(h, "stream") and isinstance(h.stream, StringIO)) # type: ignore 
+ ] + + +def env_secrets() -> List[str]: + return [v for k, v in os.environ.items() if k.startswith(SECRET_ENV_PREFIX) and v.strip()] + + +def scrub_secrets(msg: str, secrets: List[str]) -> str: + scrubbed = msg + + for secret in secrets: + scrubbed = scrubbed.replace(secret, "*****") + + return scrubbed + + +# returns a dictionary representation of the event fields. +# the message may contain secrets which must be scrubbed at the usage site. +def event_to_serializable_dict( + e: T_Event, +) -> Dict[str, Any]: + + log_line = dict() + code: str + try: + log_line = e.to_dict() + except AttributeError as exc: + event_type = type(e).__name__ + raise Exception( # TODO this may hang async threads + f"type {event_type} is not serializable. {str(exc)}" + ) + + # We get the code from the event object, so we don't need it in the data + if "code" in log_line: + del log_line["code"] + + event_dict = { + "type": "log_line", + "log_version": LOG_VERSION, + "ts": get_ts_rfc3339(), + "pid": e.get_pid(), + "msg": e.message(), + "level": e.level_tag(), + "data": log_line, + "invocation_id": e.get_invocation_id(), + "thread_name": e.get_thread_name(), + "code": e.code, + } + + return event_dict + + +# translates an Event to a completely formatted text-based log line +# type hinting everything as strings so we don't get any unintentional string conversions via str() +def reset_color() -> str: + return "" if not this.format_color else Style.RESET_ALL + + +def create_info_text_log_line(e: T_Event) -> str: + color_tag: str = reset_color() + ts: str = get_ts().strftime("%H:%M:%S") + scrubbed_msg: str = scrub_secrets(e.message(), env_secrets()) + log_line: str = f"{color_tag}{ts} {scrubbed_msg}" + return log_line + + +def create_debug_text_log_line(e: T_Event) -> str: + log_line: str = "" + # Create a separator if this is the beginning of an invocation + if type(e) == MainReportVersion: + separator = 30 * "=" + log_line = f"\n\n{separator} {get_ts()} | {get_invocation_id()} {separator}\n" + color_tag: str = reset_color() + ts: str = get_ts().strftime("%H:%M:%S.%f") + scrubbed_msg: str = scrub_secrets(e.message(), env_secrets()) + level: str = e.level_tag() if len(e.level_tag()) == 5 else f"{e.level_tag()} " + thread = "" + if threading.current_thread().name: + thread_name = threading.current_thread().name + thread_name = thread_name[:10] + thread_name = thread_name.ljust(10, " ") + thread = f" [{thread_name}]:" + log_line = log_line + f"{color_tag}{ts} [{level}]{thread} {scrubbed_msg}" + return log_line + + +# translates an Event to a completely formatted json log line +def create_json_log_line(e: T_Event) -> Optional[str]: + if type(e) == EmptyLine: + return None # will not be sent to logger + # using preformatted ts string instead of formatting it here to be extra careful about timezone + values = event_to_serializable_dict(e) + raw_log_line = json.dumps(values, sort_keys=True) + return scrub_secrets(raw_log_line, env_secrets()) + + +# calls create_stdout_text_log_line() or create_json_log_line() according to logger config +def create_log_line(e: T_Event, file_output=False) -> Optional[str]: + if this.format_json: + return create_json_log_line(e) # json output, both console and file + elif file_output is True or flags.DEBUG: + return create_debug_text_log_line(e) # default file output + else: + return create_info_text_log_line(e) # console output + + +# allows for reuse of this obnoxious if else tree. 
+# do not use for exceptions, it doesn't pass along exc_info, stack_info, or extra +def send_to_logger(l: Union[Logger, logbook.Logger], level_tag: str, log_line: str): + if not log_line: + return + if level_tag == "test": + # TODO after implmenting #3977 send to new test level + l.debug(log_line) + elif level_tag == "debug": + l.debug(log_line) + elif level_tag == "info": + l.info(log_line) + elif level_tag == "warn": + l.warning(log_line) + elif level_tag == "error": + l.error(log_line) + else: + raise AssertionError( + f"While attempting to log {log_line}, encountered the unhandled level: {level_tag}" + ) + + +def send_exc_to_logger( + l: Logger, level_tag: str, log_line: str, exc_info=True, stack_info=False, extra=False +): + if level_tag == "test": + # TODO after implmenting #3977 send to new test level + l.debug(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra) + elif level_tag == "debug": + l.debug(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra) + elif level_tag == "info": + l.info(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra) + elif level_tag == "warn": + l.warning(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra) + elif level_tag == "error": + l.error(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra) + else: + raise AssertionError( + f"While attempting to log {log_line}, encountered the unhandled level: {level_tag}" + ) + + +# top-level method for accessing the new eventing system +# this is where all the side effects happen branched by event type +# (i.e. - mutating the event history, printing to stdout, logging +# to files, etc.) +def fire_event(e: Event) -> None: + # skip logs when `--log-cache-events` is not passed + if isinstance(e, Cache) and not flags.LOG_CACHE_EVENTS: + return + + add_to_event_history(e) + + # backwards compatibility for plugins that require old logger (dbt-rpc) + if flags.ENABLE_LEGACY_LOGGER: + # using Event::message because the legacy logger didn't differentiate messages by + # destination + log_line = create_log_line(e) + if log_line: + send_to_logger(GLOBAL_LOGGER, e.level_tag(), log_line) + return # exit the function to avoid using the current logger as well + + # always logs debug level regardless of user input + if not isinstance(e, NoFile): + log_line = create_log_line(e, file_output=True) + # doesn't send exceptions to exception logger + if log_line: + send_to_logger(FILE_LOG, level_tag=e.level_tag(), log_line=log_line) + + if not isinstance(e, NoStdOut): + # explicitly checking the debug flag here so that potentially expensive-to-construct + # log messages are not constructed if debug messages are never shown. + if e.level_tag() == "debug" and not flags.DEBUG: + return # eat the message in case it was one of the expensive ones + if e.level_tag() != "error" and flags.QUIET: + return # eat all non-exception messages in quiet mode + + log_line = create_log_line(e) + if log_line: + if not isinstance(e, ShowException): + send_to_logger(STDOUT_LOG, level_tag=e.level_tag(), log_line=log_line) + else: + send_exc_to_logger( + STDOUT_LOG, + level_tag=e.level_tag(), + log_line=log_line, + exc_info=e.exc_info, + stack_info=e.stack_info, + extra=e.extra, + ) + + +def get_invocation_id() -> str: + global invocation_id + if invocation_id is None: + invocation_id = str(uuid.uuid4()) + return invocation_id + + +def set_invocation_id() -> None: + # This is primarily for setting the invocation_id for separate + # commands in the dbt servers. It shouldn't be necessary for the CLI. 
+ global invocation_id + invocation_id = str(uuid.uuid4()) + + +# exactly one time stamp per concrete event +def get_ts() -> datetime: + ts = datetime.utcnow() + return ts + + +# preformatted time stamp +def get_ts_rfc3339() -> str: + ts = get_ts() + ts_rfc3339 = ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + return ts_rfc3339 + + +def add_to_event_history(event): + if flags.EVENT_BUFFER_SIZE == 0: + return + global EVENT_HISTORY + if EVENT_HISTORY is None: + reset_event_history() + EVENT_HISTORY.append(event) + # We only set the EventBufferFull message for event buffers >= 10,000 + if flags.EVENT_BUFFER_SIZE >= 10000 and len(EVENT_HISTORY) == (flags.EVENT_BUFFER_SIZE - 1): + fire_event(EventBufferFull()) + + +def reset_event_history(): + global EVENT_HISTORY + EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE) From e41edafe577455beb66fcc5c3ee7030c7a819a4e Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Tue, 11 Oct 2022 14:22:12 -0500 Subject: [PATCH 05/14] WIP --- core/dbt/cli/main.py | 6 +++--- core/dbt/events/functions.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index e347cb3ba40..5773e4373ab 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -60,11 +60,11 @@ def cli(ctx, **kwargs): # Logging # N.B. Legacy logger is not supported setup_event_logger( - flags.LOG_PATH if hasattr(flags.LOG_PATH) else "logs", + flags.LOG_PATH if hasattr(flags.LOG_PATH) else "logs", flags.LOG_FORMAT, flags.USE_COLORS, - flags.DEBUG, - ) + flags.DEBUG, + ) # This is just a test log event, remove before merge fire_event(MainEncounteredError(exc="bork bork bork!\n\n\n")) diff --git a/core/dbt/events/functions.py b/core/dbt/events/functions.py index 903cb7b177e..3caa5691f39 100644 --- a/core/dbt/events/functions.py +++ b/core/dbt/events/functions.py @@ -34,9 +34,9 @@ def setup_event_logger(log_path, log_format, use_colors, debug): global FILE_LOG global STDOUT_LOG breakpoint() - + make_log_dir_if_missing(log_path) - + null_handler = logging.NullHandler() FILE_LOG.addHandler(null_handler) From f65a9c5df469428ad221c0d3fd065f501bc30a94 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Mon, 17 Oct 2022 13:22:57 -0500 Subject: [PATCH 06/14] logging mostly works --- core/dbt/cli/main.py | 17 +++---------- core/dbt/cli/params.py | 2 +- core/dbt/clients/system.py | 46 ++++++++++++++++++--------------- core/dbt/events/functions.py | 49 +++++++++++++++++++++--------------- 4 files changed, 59 insertions(+), 55 deletions(-) diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index 5773e4373ab..826b068c7b9 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -1,5 +1,4 @@ import inspect # This is temporary for RAT-ing -import logging from copy import copy from pprint import pformat as pf # This is temporary for RAT-ing @@ -7,8 +6,7 @@ from dbt.adapters.factory import adapter_management from dbt.cli import params as p from dbt.cli.flags import Flags -from dbt.events.functions import fire_event, setup_event_logger -from dbt.events.types import MainEncounteredError +from dbt.events.functions import setup_event_logger from dbt.profiler import profiler @@ -38,6 +36,7 @@ def cli_runner(): @p.fail_fast @p.log_cache_events @p.log_format +@p.log_path @p.macro_debugging @p.partial_parse @p.print @@ -60,15 +59,12 @@ def cli(ctx, **kwargs): # Logging # N.B. 
Legacy logger is not supported setup_event_logger( - flags.LOG_PATH if hasattr(flags.LOG_PATH) else "logs", + flags.LOG_PATH, flags.LOG_FORMAT, flags.USE_COLORS, flags.DEBUG, ) - # This is just a test log event, remove before merge - fire_event(MainEncounteredError(exc="bork bork bork!\n\n\n")) - # Profiling if flags.RECORD_TIMING_INFO: ctx.with_resource(profiler(enable=True, outfile=flags.RECORD_TIMING_INFO)) @@ -92,7 +88,6 @@ def cli(ctx, **kwargs): @p.fail_fast @p.full_refresh @p.indirect_selection -@p.log_path @p.models @p.profile @p.profiles_dir @@ -139,7 +134,6 @@ def docs(ctx, **kwargs): @p.compile_docs @p.defer @p.exclude -@p.log_path @p.models @p.profile @p.profiles_dir @@ -179,7 +173,6 @@ def docs_serve(ctx, **kwargs): @p.defer @p.exclude @p.full_refresh -@p.log_path @p.models @p.parse_only @p.profile @@ -269,7 +262,6 @@ def list(ctx, **kwargs): @cli.command("parse") @click.pass_context @p.compile_parse -@p.log_path @p.profile @p.profiles_dir @p.project_dir @@ -292,7 +284,6 @@ def parse(ctx, **kwargs): @p.exclude @p.fail_fast @p.full_refresh -@p.log_path @p.models @p.profile @p.profiles_dir @@ -330,7 +321,6 @@ def run_operation(ctx, **kwargs): @click.pass_context @p.exclude @p.full_refresh -@p.log_path @p.models @p.profile @p.profiles_dir @@ -403,7 +393,6 @@ def freshness(ctx, **kwargs): @p.exclude @p.fail_fast @p.indirect_selection -@p.log_path @p.models @p.profile @p.profiles_dir diff --git a/core/dbt/cli/params.py b/core/dbt/cli/params.py index 2eaf366a6d8..d092d7eae51 100644 --- a/core/dbt/cli/params.py +++ b/core/dbt/cli/params.py @@ -132,7 +132,7 @@ envvar="DBT_LOG_PATH", help="Configure the 'log-path'. Only applies this setting for the current run. Overrides the 'DBT_LOG_PATH' if it is set.", default=Path.cwd() / "logs", - type=click.Path(), + type=click.Path(resolve_path=True, path_type=Path), ) macro_debugging = click.option( diff --git a/core/dbt/clients/system.py b/core/dbt/clients/system.py index b1cd1b5a074..8350fe2b5c9 100644 --- a/core/dbt/clients/system.py +++ b/core/dbt/clients/system.py @@ -1,30 +1,31 @@ import errno -import functools import fnmatch +import functools import json import os import os.path import re import shutil +import stat import subprocess import sys import tarfile -import requests -import stat -from typing import Type, NoReturn, List, Optional, Dict, Any, Tuple, Callable, Union -from pathspec import PathSpec # type: ignore +from pathlib import Path, PosixPath, WindowsPath +from typing import Any, Callable, Dict, List, NoReturn, Optional, Tuple, Type, Union +import dbt.exceptions +import requests from dbt.events.functions import fire_event from dbt.events.types import ( - SystemErrorRetrievingModTime, SystemCouldNotWrite, + SystemErrorRetrievingModTime, SystemExecutingCmd, - SystemStdOutMsg, - SystemStdErrMsg, SystemReportReturnCode, + SystemStdErrMsg, + SystemStdOutMsg, ) -import dbt.exceptions from dbt.utils import _connection_exception_retry as connection_exception_retry +from pathspec import PathSpec # type: ignore if sys.platform == "win32": from ctypes import WinDLL, c_bool @@ -106,23 +107,28 @@ def load_file_contents(path: str, strip: bool = True) -> str: return to_return -def make_directory(path: str) -> None: +def make_directory(path: Union[str, Path]) -> None: """ Make a directory and any intermediate directories that don't already exist. This function handles the case where two threads try to create a directory at once. 
""" - path = convert_path(path) - if not os.path.exists(path): - # concurrent writes that try to create the same dir can fail - try: - os.makedirs(path) - except OSError as e: - if e.errno == errno.EEXIST: - pass - else: - raise e + if type(path) is str: + path = convert_path(path) + if not os.path.exists(path): + # concurrent writes that try to create the same dir can fail + try: + os.makedirs(path) + + except OSError as e: + if e.errno == errno.EEXIST: + pass + else: + raise e + elif type(path) in (PosixPath, WindowsPath): + assert type(path) is PosixPath + path.mkdir(parents=True, exist_ok=True) def make_file(path: str, contents: str = "", overwrite: bool = False) -> bool: diff --git a/core/dbt/events/functions.py b/core/dbt/events/functions.py index 3caa5691f39..eeb425e0d31 100644 --- a/core/dbt/events/functions.py +++ b/core/dbt/events/functions.py @@ -18,14 +18,14 @@ from dbt.constants import SECRET_ENV_PREFIX from dbt.events.base_types import Cache, Event, NoFile, NoStdOut, ShowException from dbt.events.types import EmptyLine, EventBufferFull, MainReportVersion, T_Event -from dbt.logger import GLOBAL_LOGGER, make_log_dir_if_missing +from dbt.logger import make_log_dir_if_missing +# create the module-globals LOG_VERSION = 2 EVENT_HISTORY = None -# create the module-global loggers FILE_LOG = logging.getLogger("default_file") -STDOUT_LOG = logging.getLogger("default_stdout") +STDOUT_LOG = logging.getLogger("default_std_out") invocation_id: Optional[str] = None @@ -33,22 +33,9 @@ def setup_event_logger(log_path, log_format, use_colors, debug): global FILE_LOG global STDOUT_LOG - breakpoint() make_log_dir_if_missing(log_path) - null_handler = logging.NullHandler() - FILE_LOG.addHandler(null_handler) - - STDOUT_LOG.setLevel(logging.INFO) - stdout_handler = logging.StreamHandler(sys.stdout) - stdout_handler.setLevel(logging.INFO) - STDOUT_LOG.addHandler(stdout_handler) - - format_json = log_format == "json" - # USE_COLORS can be None if the app just started and the cli flags - # havent been applied yet - format_color = True if use_colors else False # TODO this default should live somewhere better log_dest = os.path.join(log_path, "dbt.log") level = logging.DEBUG if debug else logging.INFO @@ -56,6 +43,8 @@ def setup_event_logger(log_path, log_format, use_colors, debug): # overwrite the STDOUT_LOG logger with the configured one STDOUT_LOG = logging.getLogger("configured_std_out") STDOUT_LOG.setLevel(level) + STDOUT_LOG.format_json = log_format == "json" + STDOUT_LOG.format_color = True if use_colors else False FORMAT = "%(message)s" stdout_passthrough_formatter = logging.Formatter(fmt=FORMAT) @@ -74,6 +63,8 @@ def setup_event_logger(log_path, log_format, use_colors, debug): # overwrite the FILE_LOG logger with the configured one FILE_LOG = logging.getLogger("configured_file") FILE_LOG.setLevel(logging.DEBUG) # always debug regardless of user input + FILE_LOG.format_json = log_format == "json" + FILE_LOG.format_color = True if use_colors else False file_passthrough_formatter = logging.Formatter(fmt=FORMAT) @@ -90,7 +81,7 @@ def setup_event_logger(log_path, log_format, use_colors, debug): def capture_stdout_logs() -> StringIO: capture_buf = io.StringIO() stdout_capture_handler = logging.StreamHandler(capture_buf) - stdout_handler.setLevel(logging.DEBUG) + stdout_capture_handler.setLevel(logging.DEBUG) STDOUT_LOG.addHandler(stdout_capture_handler) return capture_buf @@ -124,7 +115,6 @@ def event_to_serializable_dict( ) -> Dict[str, Any]: log_line = dict() - code: str try: log_line = 
e.to_dict() except AttributeError as exc: @@ -156,7 +146,7 @@ def event_to_serializable_dict( # translates an Event to a completely formatted text-based log line # type hinting everything as strings so we don't get any unintentional string conversions via str() def reset_color() -> str: - return "" if not format_color else Style.RESET_ALL + return Style.RESET_ALL if getattr(STDOUT_LOG, "format_color", False) else "" def create_info_text_log_line(e: T_Event) -> str: @@ -199,7 +189,26 @@ def create_json_log_line(e: T_Event) -> Optional[str]: # calls create_stdout_text_log_line() or create_json_log_line() according to logger config def create_log_line(e: T_Event, file_output=False) -> Optional[str]: - if format_json: + global FILE_LOG + global STDOUT_LOG + if FILE_LOG is None and STDOUT_LOG is None: + + # TODO: This is only necessary because our test framework doesn't correctly set up logging. + # This code should be moved to the test framework when we do CT-XXX (tix # needed) + null_handler = logging.NullHandler() + FILE_LOG.addHandler(null_handler) + FILE_LOG.format_json = False + FILE_LOG.format_color = False + + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setLevel(logging.INFO) + STDOUT_LOG.setLevel(logging.INFO) + STDOUT_LOG.addHandler(stdout_handler) + STDOUT_LOG.format_json = False + STDOUT_LOG.format_color = False + + logger = FILE_LOG if file_output else STDOUT_LOG + if getattr(logger, "format_json"): return create_json_log_line(e) # json output, both console and file elif file_output is True or flags.DEBUG: return create_debug_text_log_line(e) # default file output From 8ca631de7c3870f5654e9e6f5ee4eabbb08b0ac3 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Mon, 17 Oct 2022 13:43:11 -0500 Subject: [PATCH 07/14] fix tests --- core/dbt/events/functions.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/core/dbt/events/functions.py b/core/dbt/events/functions.py index eeb425e0d31..6b5ece58e06 100644 --- a/core/dbt/events/functions.py +++ b/core/dbt/events/functions.py @@ -24,8 +24,11 @@ LOG_VERSION = 2 EVENT_HISTORY = None -FILE_LOG = logging.getLogger("default_file") -STDOUT_LOG = logging.getLogger("default_std_out") +DEFAULT_FILE_LOGGER_NAME = "default_file" +FILE_LOG = logging.getLogger(DEFAULT_FILE_LOGGER_NAME) + +DEFAULT_STDOUT_LOGGER_NAME = "default_std_out" +STDOUT_LOG = logging.getLogger(DEFAULT_STDOUT_LOGGER_NAME) invocation_id: Optional[str] = None @@ -43,8 +46,8 @@ def setup_event_logger(log_path, log_format, use_colors, debug): # overwrite the STDOUT_LOG logger with the configured one STDOUT_LOG = logging.getLogger("configured_std_out") STDOUT_LOG.setLevel(level) - STDOUT_LOG.format_json = log_format == "json" - STDOUT_LOG.format_color = True if use_colors else False + setattr(STDOUT_LOG, "format_json", log_format == "json") + setattr(STDOUT_LOG, "format_color", True if use_colors else False) FORMAT = "%(message)s" stdout_passthrough_formatter = logging.Formatter(fmt=FORMAT) @@ -63,8 +66,8 @@ def setup_event_logger(log_path, log_format, use_colors, debug): # overwrite the FILE_LOG logger with the configured one FILE_LOG = logging.getLogger("configured_file") FILE_LOG.setLevel(logging.DEBUG) # always debug regardless of user input - FILE_LOG.format_json = log_format == "json" - FILE_LOG.format_color = True if use_colors else False + setattr(FILE_LOG, "format_json", log_format == "json") + setattr(FILE_LOG, "format_color", True if use_colors else False) file_passthrough_formatter = 
logging.Formatter(fmt=FORMAT) @@ -191,21 +194,22 @@ def create_json_log_line(e: T_Event) -> Optional[str]: def create_log_line(e: T_Event, file_output=False) -> Optional[str]: global FILE_LOG global STDOUT_LOG - if FILE_LOG is None and STDOUT_LOG is None: + + if FILE_LOG.name == DEFAULT_FILE_LOGGER_NAME and STDOUT_LOG.name == DEFAULT_STDOUT_LOGGER_NAME: # TODO: This is only necessary because our test framework doesn't correctly set up logging. # This code should be moved to the test framework when we do CT-XXX (tix # needed) null_handler = logging.NullHandler() FILE_LOG.addHandler(null_handler) - FILE_LOG.format_json = False - FILE_LOG.format_color = False + setattr(FILE_LOG, "format_json", False) + setattr(FILE_LOG, "format_color", False) stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setLevel(logging.INFO) STDOUT_LOG.setLevel(logging.INFO) STDOUT_LOG.addHandler(stdout_handler) - STDOUT_LOG.format_json = False - STDOUT_LOG.format_color = False + setattr(STDOUT_LOG, "format_json", False) + setattr(STDOUT_LOG, "format_color", False) logger = FILE_LOG if file_output else STDOUT_LOG if getattr(logger, "format_json"): From 0ce4188a293c8912b4236f9f7452a67649e7373c Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Mon, 17 Oct 2022 17:05:18 -0500 Subject: [PATCH 08/14] Changelog --- .changes/unreleased/Under the Hood-20221017-170500.yaml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changes/unreleased/Under the Hood-20221017-170500.yaml diff --git a/.changes/unreleased/Under the Hood-20221017-170500.yaml b/.changes/unreleased/Under the Hood-20221017-170500.yaml new file mode 100644 index 00000000000..8f5e20cd8ff --- /dev/null +++ b/.changes/unreleased/Under the Hood-20221017-170500.yaml @@ -0,0 +1,7 @@ +kind: Under the Hood +body: Click CLI supports logging +time: 2022-10-17T17:05:00.478948-05:00 +custom: + Author: iknox-fa + Issue: "5530" + PR: "6088" From 97ee04ece49aa2e5de505eb5dd54b4667fd12c19 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Mon, 17 Oct 2022 17:53:42 -0500 Subject: [PATCH 09/14] test fixes pt 2 --- core/dbt/main.py | 4 ++-- core/dbt/tests/fixtures/project.py | 2 +- test/integration/base.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/dbt/main.py b/core/dbt/main.py index 88196fd98ea..f2d3668f57a 100644 --- a/core/dbt/main.py +++ b/core/dbt/main.py @@ -230,8 +230,8 @@ def run_from_args(parsed): log_path = getattr(task.config, "log_path", None) log_manager.set_path(log_path) # if 'list' task: set stdout to WARN instead of INFO - level_override = parsed.cls.pre_init_hook(parsed) - setup_event_logger(log_path or "logs", level_override) + # level_override = parsed.cls.pre_init_hook(parsed) + setup_event_logger(log_path or "logs", "json", False, True) fire_event(MainReportVersion(version=str(dbt.version.installed), log_version=LOG_VERSION)) fire_event(MainReportArgs(args=args_to_dict(parsed))) diff --git a/core/dbt/tests/fixtures/project.py b/core/dbt/tests/fixtures/project.py index 5da885edf9b..fe97176cfb6 100644 --- a/core/dbt/tests/fixtures/project.py +++ b/core/dbt/tests/fixtures/project.py @@ -456,7 +456,7 @@ def project( # Logbook warnings are ignored so we don't have to fork logbook to support python 3.10. # This _only_ works for tests in `tests/` that use the project fixture. 
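    # (The setup_event_logger call below follows the signature introduced in
    #  PATCH 06: setup_event_logger(log_path, log_format, use_colors, debug).)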
warnings.filterwarnings("ignore", category=DeprecationWarning, module="logbook") - setup_event_logger(logs_dir) + setup_event_logger(logs_dir, "json", False, False) orig_cwd = os.getcwd() os.chdir(project_root) # Return whatever is needed later in tests but can only come from fixtures, so we can keep diff --git a/test/integration/base.py b/test/integration/base.py index b2e55159d6b..9726ff7d482 100644 --- a/test/integration/base.py +++ b/test/integration/base.py @@ -313,7 +313,7 @@ def setUp(self): os.chdir(self.initial_dir) # before we go anywhere, collect the initial path info self._logs_dir = os.path.join(self.initial_dir, 'logs', self.prefix) - setup_event_logger(self._logs_dir) + setup_event_logger(self._logs_dir, None, False, True) _really_makedirs(self._logs_dir) self.test_original_source_path = _pytest_get_test_root() self.test_root_dir = self._generate_test_root_dir() From 5d68e9b0336545f34fe9e5e58f8036008433f064 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Tue, 18 Oct 2022 09:16:29 -0500 Subject: [PATCH 10/14] removed old temp file --- core/dbt/events/functions_old.py | 344 ------------------------------- 1 file changed, 344 deletions(-) delete mode 100644 core/dbt/events/functions_old.py diff --git a/core/dbt/events/functions_old.py b/core/dbt/events/functions_old.py deleted file mode 100644 index c98c7176117..00000000000 --- a/core/dbt/events/functions_old.py +++ /dev/null @@ -1,344 +0,0 @@ -from colorama import Style -import dbt.events.functions as this # don't worry I hate it too. -from dbt.events.base_types import NoStdOut, Event, NoFile, ShowException, Cache -from dbt.events.types import T_Event, MainReportVersion, EmptyLine, EventBufferFull -import dbt.flags as flags -from dbt.constants import SECRET_ENV_PREFIX - -# TODO this will need to move eventually -from dbt.logger import make_log_dir_if_missing, GLOBAL_LOGGER -from datetime import datetime -import json -import io -from io import StringIO, TextIOWrapper -import logbook -import logging -from logging import Logger -import sys -from logging.handlers import RotatingFileHandler -import os -import uuid -import threading -from typing import Any, Dict, List, Optional, Union -from collections import deque - -LOG_VERSION = 2 -EVENT_HISTORY = None - -# create the global file logger with no configuration -FILE_LOG = logging.getLogger("default_file") -null_handler = logging.NullHandler() -FILE_LOG.addHandler(null_handler) - -# set up logger to go to stdout with defaults -# setup_event_logger will be called once args have been parsed -STDOUT_LOG = logging.getLogger("default_stdout") -STDOUT_LOG.setLevel(logging.INFO) -stdout_handler = logging.StreamHandler(sys.stdout) -stdout_handler.setLevel(logging.INFO) -STDOUT_LOG.addHandler(stdout_handler) - -format_color = True -format_json = False -invocation_id: Optional[str] = None - - -def setup_event_logger(log_path, level_override=None): - make_log_dir_if_missing(log_path) - - this.format_json = flags.LOG_FORMAT == "json" - # USE_COLORS can be None if the app just started and the cli flags - # havent been applied yet - this.format_color = True if flags.USE_COLORS else False - # TODO this default should live somewhere better - log_dest = os.path.join(log_path, "dbt.log") - level = level_override or (logging.DEBUG if flags.DEBUG else logging.INFO) - - # overwrite the STDOUT_LOG logger with the configured one - this.STDOUT_LOG = logging.getLogger("configured_std_out") - this.STDOUT_LOG.setLevel(level) - - FORMAT = "%(message)s" - stdout_passthrough_formatter = 
logging.Formatter(fmt=FORMAT) - - stdout_handler = logging.StreamHandler(sys.stdout) - stdout_handler.setFormatter(stdout_passthrough_formatter) - stdout_handler.setLevel(level) - # clear existing stdout TextIOWrapper stream handlers - this.STDOUT_LOG.handlers = [ - h - for h in this.STDOUT_LOG.handlers - if not (hasattr(h, "stream") and isinstance(h.stream, TextIOWrapper)) # type: ignore - ] - this.STDOUT_LOG.addHandler(stdout_handler) - - # overwrite the FILE_LOG logger with the configured one - this.FILE_LOG = logging.getLogger("configured_file") - this.FILE_LOG.setLevel(logging.DEBUG) # always debug regardless of user input - - file_passthrough_formatter = logging.Formatter(fmt=FORMAT) - - file_handler = RotatingFileHandler( - filename=log_dest, encoding="utf8", maxBytes=10 * 1024 * 1024, backupCount=5 # 10 mb - ) - file_handler.setFormatter(file_passthrough_formatter) - file_handler.setLevel(logging.DEBUG) # always debug regardless of user input - this.FILE_LOG.handlers.clear() - this.FILE_LOG.addHandler(file_handler) - - -# used for integration tests -def capture_stdout_logs() -> StringIO: - capture_buf = io.StringIO() - stdout_capture_handler = logging.StreamHandler(capture_buf) - stdout_handler.setLevel(logging.DEBUG) - this.STDOUT_LOG.addHandler(stdout_capture_handler) - return capture_buf - - -# used for integration tests -def stop_capture_stdout_logs() -> None: - this.STDOUT_LOG.handlers = [ - h - for h in this.STDOUT_LOG.handlers - if not (hasattr(h, "stream") and isinstance(h.stream, StringIO)) # type: ignore - ] - - -def env_secrets() -> List[str]: - return [v for k, v in os.environ.items() if k.startswith(SECRET_ENV_PREFIX) and v.strip()] - - -def scrub_secrets(msg: str, secrets: List[str]) -> str: - scrubbed = msg - - for secret in secrets: - scrubbed = scrubbed.replace(secret, "*****") - - return scrubbed - - -# returns a dictionary representation of the event fields. -# the message may contain secrets which must be scrubbed at the usage site. -def event_to_serializable_dict( - e: T_Event, -) -> Dict[str, Any]: - - log_line = dict() - code: str - try: - log_line = e.to_dict() - except AttributeError as exc: - event_type = type(e).__name__ - raise Exception( # TODO this may hang async threads - f"type {event_type} is not serializable. 
{str(exc)}" - ) - - # We get the code from the event object, so we don't need it in the data - if "code" in log_line: - del log_line["code"] - - event_dict = { - "type": "log_line", - "log_version": LOG_VERSION, - "ts": get_ts_rfc3339(), - "pid": e.get_pid(), - "msg": e.message(), - "level": e.level_tag(), - "data": log_line, - "invocation_id": e.get_invocation_id(), - "thread_name": e.get_thread_name(), - "code": e.code, - } - - return event_dict - - -# translates an Event to a completely formatted text-based log line -# type hinting everything as strings so we don't get any unintentional string conversions via str() -def reset_color() -> str: - return "" if not this.format_color else Style.RESET_ALL - - -def create_info_text_log_line(e: T_Event) -> str: - color_tag: str = reset_color() - ts: str = get_ts().strftime("%H:%M:%S") - scrubbed_msg: str = scrub_secrets(e.message(), env_secrets()) - log_line: str = f"{color_tag}{ts} {scrubbed_msg}" - return log_line - - -def create_debug_text_log_line(e: T_Event) -> str: - log_line: str = "" - # Create a separator if this is the beginning of an invocation - if type(e) == MainReportVersion: - separator = 30 * "=" - log_line = f"\n\n{separator} {get_ts()} | {get_invocation_id()} {separator}\n" - color_tag: str = reset_color() - ts: str = get_ts().strftime("%H:%M:%S.%f") - scrubbed_msg: str = scrub_secrets(e.message(), env_secrets()) - level: str = e.level_tag() if len(e.level_tag()) == 5 else f"{e.level_tag()} " - thread = "" - if threading.current_thread().name: - thread_name = threading.current_thread().name - thread_name = thread_name[:10] - thread_name = thread_name.ljust(10, " ") - thread = f" [{thread_name}]:" - log_line = log_line + f"{color_tag}{ts} [{level}]{thread} {scrubbed_msg}" - return log_line - - -# translates an Event to a completely formatted json log line -def create_json_log_line(e: T_Event) -> Optional[str]: - if type(e) == EmptyLine: - return None # will not be sent to logger - # using preformatted ts string instead of formatting it here to be extra careful about timezone - values = event_to_serializable_dict(e) - raw_log_line = json.dumps(values, sort_keys=True) - return scrub_secrets(raw_log_line, env_secrets()) - - -# calls create_stdout_text_log_line() or create_json_log_line() according to logger config -def create_log_line(e: T_Event, file_output=False) -> Optional[str]: - if this.format_json: - return create_json_log_line(e) # json output, both console and file - elif file_output is True or flags.DEBUG: - return create_debug_text_log_line(e) # default file output - else: - return create_info_text_log_line(e) # console output - - -# allows for reuse of this obnoxious if else tree. 
-# do not use for exceptions, it doesn't pass along exc_info, stack_info, or extra -def send_to_logger(l: Union[Logger, logbook.Logger], level_tag: str, log_line: str): - if not log_line: - return - if level_tag == "test": - # TODO after implmenting #3977 send to new test level - l.debug(log_line) - elif level_tag == "debug": - l.debug(log_line) - elif level_tag == "info": - l.info(log_line) - elif level_tag == "warn": - l.warning(log_line) - elif level_tag == "error": - l.error(log_line) - else: - raise AssertionError( - f"While attempting to log {log_line}, encountered the unhandled level: {level_tag}" - ) - - -def send_exc_to_logger( - l: Logger, level_tag: str, log_line: str, exc_info=True, stack_info=False, extra=False -): - if level_tag == "test": - # TODO after implmenting #3977 send to new test level - l.debug(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra) - elif level_tag == "debug": - l.debug(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra) - elif level_tag == "info": - l.info(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra) - elif level_tag == "warn": - l.warning(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra) - elif level_tag == "error": - l.error(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra) - else: - raise AssertionError( - f"While attempting to log {log_line}, encountered the unhandled level: {level_tag}" - ) - - -# top-level method for accessing the new eventing system -# this is where all the side effects happen branched by event type -# (i.e. - mutating the event history, printing to stdout, logging -# to files, etc.) -def fire_event(e: Event) -> None: - # skip logs when `--log-cache-events` is not passed - if isinstance(e, Cache) and not flags.LOG_CACHE_EVENTS: - return - - add_to_event_history(e) - - # backwards compatibility for plugins that require old logger (dbt-rpc) - if flags.ENABLE_LEGACY_LOGGER: - # using Event::message because the legacy logger didn't differentiate messages by - # destination - log_line = create_log_line(e) - if log_line: - send_to_logger(GLOBAL_LOGGER, e.level_tag(), log_line) - return # exit the function to avoid using the current logger as well - - # always logs debug level regardless of user input - if not isinstance(e, NoFile): - log_line = create_log_line(e, file_output=True) - # doesn't send exceptions to exception logger - if log_line: - send_to_logger(FILE_LOG, level_tag=e.level_tag(), log_line=log_line) - - if not isinstance(e, NoStdOut): - # explicitly checking the debug flag here so that potentially expensive-to-construct - # log messages are not constructed if debug messages are never shown. - if e.level_tag() == "debug" and not flags.DEBUG: - return # eat the message in case it was one of the expensive ones - if e.level_tag() != "error" and flags.QUIET: - return # eat all non-exception messages in quiet mode - - log_line = create_log_line(e) - if log_line: - if not isinstance(e, ShowException): - send_to_logger(STDOUT_LOG, level_tag=e.level_tag(), log_line=log_line) - else: - send_exc_to_logger( - STDOUT_LOG, - level_tag=e.level_tag(), - log_line=log_line, - exc_info=e.exc_info, - stack_info=e.stack_info, - extra=e.extra, - ) - - -def get_invocation_id() -> str: - global invocation_id - if invocation_id is None: - invocation_id = str(uuid.uuid4()) - return invocation_id - - -def set_invocation_id() -> None: - # This is primarily for setting the invocation_id for separate - # commands in the dbt servers. It shouldn't be necessary for the CLI. 
- global invocation_id - invocation_id = str(uuid.uuid4()) - - -# exactly one time stamp per concrete event -def get_ts() -> datetime: - ts = datetime.utcnow() - return ts - - -# preformatted time stamp -def get_ts_rfc3339() -> str: - ts = get_ts() - ts_rfc3339 = ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ") - return ts_rfc3339 - - -def add_to_event_history(event): - if flags.EVENT_BUFFER_SIZE == 0: - return - global EVENT_HISTORY - if EVENT_HISTORY is None: - reset_event_history() - EVENT_HISTORY.append(event) - # We only set the EventBufferFull message for event buffers >= 10,000 - if flags.EVENT_BUFFER_SIZE >= 10000 and len(EVENT_HISTORY) == (flags.EVENT_BUFFER_SIZE - 1): - fire_event(EventBufferFull()) - - -def reset_event_history(): - global EVENT_HISTORY - EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE) From 5d4cda905a326aa308d843966cbd4f8a224720f0 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Tue, 18 Oct 2022 11:23:26 -0500 Subject: [PATCH 11/14] Fixed issue where I was assuming POSIX env --- core/dbt/clients/system.py | 43 ++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/core/dbt/clients/system.py b/core/dbt/clients/system.py index 8350fe2b5c9..326c775d325 100644 --- a/core/dbt/clients/system.py +++ b/core/dbt/clients/system.py @@ -10,7 +10,7 @@ import subprocess import sys import tarfile -from pathlib import Path, PosixPath, WindowsPath +from pathlib import Path from typing import Any, Callable, Dict, List, NoReturn, Optional, Tuple, Type, Union import dbt.exceptions @@ -24,6 +24,7 @@ SystemStdErrMsg, SystemStdOutMsg, ) +from dbt.exceptions import InternalException from dbt.utils import _connection_exception_retry as connection_exception_retry from pathspec import PathSpec # type: ignore @@ -45,7 +46,7 @@ def find_matching( absolute root path (`relative_paths_to_search`), and a `file_pattern` like '*.sql', returns information about the files. For example: - > find_matching('/root/path', ['models'], '*.sql') + > find_matching('/root/path', ['models'], '*.sql')a [ { 'absolute_path': '/root/path/models/model_one.sql', 'relative_path': 'model_one.sql', @@ -107,28 +108,34 @@ def load_file_contents(path: str, strip: bool = True) -> str: return to_return -def make_directory(path: Union[str, Path]) -> None: +@functools.singledispatch +def make_directory(path=None) -> None: """ Make a directory and any intermediate directories that don't already exist. This function handles the case where two threads try to create a directory at once. 
""" + raise InternalException(f"Can not create directory from {type(path)} ") - if type(path) is str: - path = convert_path(path) - if not os.path.exists(path): - # concurrent writes that try to create the same dir can fail - try: - os.makedirs(path) - - except OSError as e: - if e.errno == errno.EEXIST: - pass - else: - raise e - elif type(path) in (PosixPath, WindowsPath): - assert type(path) is PosixPath - path.mkdir(parents=True, exist_ok=True) + +@make_directory.register +def _(path: str) -> None: + path = convert_path(path) + if not os.path.exists(path): + # concurrent writes that try to create the same dir can fail + try: + os.makedirs(path) + + except OSError as e: + if e.errno == errno.EEXIST: + pass + else: + raise e + + +@make_directory.register +def _(path: Path) -> None: + path.mkdir(parents=True, exist_ok=True) def make_file(path: str, contents: str = "", overwrite: bool = False) -> bool: From 6d9f9b22db9b4effd58d82595caaf4757283f7a4 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Tue, 18 Oct 2022 12:05:32 -0500 Subject: [PATCH 12/14] added mkdir test --- test/unit/test_system_client.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test/unit/test_system_client.py b/test/unit/test_system_client.py index 9bf239c0650..63316b26852 100644 --- a/test/unit/test_system_client.py +++ b/test/unit/test_system_client.py @@ -53,6 +53,17 @@ def test__make_file_with_overwrite(self): self.assertTrue(written) self.assertEqual(self.get_profile_text(), 'NEW_TEXT') + def test__make_dir_from_str(self): + test_dir_str = self.tmp_dir + "/test_make_from_str/sub_dir" + dbt.clients.system.make_directory(test_dir_str) + self.assertTrue(Path(test_dir_str).is_dir()) + + def test__make_dir_from_pathobj(self): + test_dir_pathobj = Path(self.tmp_dir + "/test_make_from_pathobj/sub_dir") + dbt.clients.system.make_directory(test_dir_pathobj) + self.assertTrue(test_dir_pathobj.is_dir()) + + class TestRunCmd(unittest.TestCase): """Test `run_cmd`. From 6e53083b4bbccbd540024d1df4061afd9e34781e Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Tue, 25 Oct 2022 10:55:27 -0500 Subject: [PATCH 13/14] PR feedback --- core/dbt/clients/system.py | 2 +- core/dbt/main.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/dbt/clients/system.py b/core/dbt/clients/system.py index 326c775d325..d1b1c461f50 100644 --- a/core/dbt/clients/system.py +++ b/core/dbt/clients/system.py @@ -46,7 +46,7 @@ def find_matching( absolute root path (`relative_paths_to_search`), and a `file_pattern` like '*.sql', returns information about the files. For example: - > find_matching('/root/path', ['models'], '*.sql')a + > find_matching('/root/path', ['models'], '*.sql') [ { 'absolute_path': '/root/path/models/model_one.sql', 'relative_path': 'model_one.sql', diff --git a/core/dbt/main.py b/core/dbt/main.py index f2d3668f57a..4bb596cad68 100644 --- a/core/dbt/main.py +++ b/core/dbt/main.py @@ -229,8 +229,6 @@ def run_from_args(parsed): if task.config is not None: log_path = getattr(task.config, "log_path", None) log_manager.set_path(log_path) - # if 'list' task: set stdout to WARN instead of INFO - # level_override = parsed.cls.pre_init_hook(parsed) setup_event_logger(log_path or "logs", "json", False, True) fire_event(MainReportVersion(version=str(dbt.version.installed), log_version=LOG_VERSION)) From 4c7ab600bc4bf6b81e36f14e8bfb2ab4056739fc Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Tue, 25 Oct 2022 13:41:19 -0500 Subject: [PATCH 14/14] empty commit, re-trigger CI