Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Template operator params #69

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions airflow_dbt/dbt_command_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Python versions older than 3.8 have the TypedDict in a different namespace.
# In case we find ourselves in that situation, we use the `older` import
try:
from typing import TypedDict
except ImportError:
from typing_extensions import TypedDict


class DbtGlobalParamsConfig(TypedDict, total=False):
"""
Holds the structure of a dictionary containing dbt config. Provides the
types and names for each one, and also helps shortening the constructor
since we can nest it and reuse it
"""
record_timing_info: bool
debug: bool
log_format: str # either 'text', 'json' or 'default'
write_json: bool
warn_error: bool
partial_parse: bool
use_experimental_parser: bool
use_colors: bool
verbose: bool
no_use_colors: bool


class DbtCommandParamsConfig(TypedDict, total=False):
"""
Holds the structure of a dictionary containing dbt config. Provides the
types and names for each one, and also helps shortening the constructor
since we can nest it and reuse it
"""
profiles_dir: str
project_dir: str
target: str
vars: dict
models: str
exclude: str

# run specific
full_refresh: bool
profile: str

# docs specific
no_compile: bool

# debug specific
config_dir: str

# ls specific
resource_type: str # models, snapshots, seeds, tests, and sources.
select: str
models: str
exclude: str
selector: str
output: str
output_keys: str

# rpc specific
host: str
port: int

# run specific
fail_fast: bool

# run-operation specific
args: dict

# test specific
data: bool
schema: bool
211 changes: 102 additions & 109 deletions airflow_dbt/hooks/dbt_hook.py
Original file line number Diff line number Diff line change
@@ -1,139 +1,131 @@
from __future__ import print_function

import json
import os
import signal
import subprocess
import json
from typing import Any, Dict, List, Union

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.base import BaseHook

from airflow_dbt.dbt_command_params import DbtCommandParamsConfig


def render_config(config: Dict[str, Union[str, bool]]) -> List[str]:
"""Renders a dictionary of options into a list of cli strings"""
dbt_command_config_annotations = DbtCommandParamsConfig.__annotations__
command_params = []
for key, value in config.items():
if key not in dbt_command_config_annotations:
raise ValueError(f"{key} is not a valid key")
if value is not None:
param_value_type = type(value)
# check that the value has the correct type from dbt_command_config_annotations
if param_value_type != dbt_command_config_annotations[key]:
raise TypeError(f"{key} has to be of type {dbt_command_config_annotations[key]}")
# if the param is not bool it must have a non-null value
flag_prefix = ''
if param_value_type is bool and not value:
flag_prefix = 'no-'
cli_param_from_kwarg = "--" + flag_prefix + key.replace("_", "-")
command_params.append(cli_param_from_kwarg)
if param_value_type is str:
command_params.append(value)
elif param_value_type is int:
command_params.append(str(value))
elif param_value_type is dict:
command_params.append(json.dumps(value))
return command_params


def generate_dbt_cli_command(
dbt_bin: str,
command: str,
global_config: Dict[str, Union[str, bool]],
command_config: Dict[str, Union[str, bool]],
) -> List[str]:
"""
Creates a CLI string from the keys in the dictionary. If the key is none
it is ignored. If the key is of type boolean the name of the key is added.
If the key is of type string it adds the the key prefixed with tow dashes.
If the key is of type integer it adds the the key prefixed with three
dashes.
dbt_bin and command are mandatory.
Boolean flags must always be positive.
Available params are:
:param command_config: Specific params for the commands
:type command_config: dict
:param global_config: Params that apply to the `dbt` program regardless of
the command it is running
:type global_config: dict
:param command: The dbt sub-command to run
:type command: str
:param dbt_bin: Path to the dbt binary, defaults to `dbt` assumes it is
available in the PATH.
:type dbt_bin: str
:param command: The dbt sub command to run, for example for `dbt run`
the base_command will be `run`. If any other flag not contemplated
must be included it can also be added to this string
:type command: str
"""
if not dbt_bin:
raise ValueError("dbt_bin is mandatory")
if not command:
raise ValueError("command mandatory")
base_params = render_config(global_config)
command_params = render_config(command_config)
# commands like 'dbt docs generate' need the command to be split in two
command_pieces = command.split(" ")
return [dbt_bin, *base_params, *command_pieces, *command_params]


class DbtCliHook(BaseHook):
"""
Simple wrapper around the dbt CLI.

:param env: If set, passes the env variables to the subprocess handler
:param env: Environment variables to pass to the dbt process
:type env: dict
:param profiles_dir: If set, passed as the `--profiles-dir` argument to the `dbt` command
:type profiles_dir: str
:param target: If set, passed as the `--target` argument to the `dbt` command
:type dir: str
:param dir: The directory to run the CLI in
:type vars: str
:param vars: If set, passed as the `--vars` argument to the `dbt` command
:type vars: dict
:param full_refresh: If `True`, will fully-refresh incremental models.
:type full_refresh: bool
:param models: If set, passed as the `--models` argument to the `dbt` command
:type models: str
:param warn_error: If `True`, treat warnings as errors.
:type warn_error: bool
:param exclude: If set, passed as the `--exclude` argument to the `dbt` command
:type exclude: str
:param select: If set, passed as the `--select` argument to the `dbt` command
:type select: str
:param selector: If set, passed as the `--selector` argument to the `dbt` command
:type selector: str
:param dbt_bin: The `dbt` CLI. Defaults to `dbt`, so assumes it's on your `PATH`
:param dbt_bin: Path to the dbt binary
:type dbt_bin: str
:param output_encoding: Output encoding of bash command. Defaults to utf-8
:param global_flags: Global flags to pass to the dbt process
:type global_flags: dict
:param command_flags: Command flags to pass to the dbt process
:type command_flags: dict
:param command: The dbt command to run
:type command: str
:param output_encoding: The encoding of the output
:type output_encoding: str
:param verbose: The operator will log verbosely to the Airflow logs
:type verbose: bool
"""

def __init__(self,
env=None,
profiles_dir=None,
target=None,
dir='.',
vars=None,
full_refresh=False,
data=False,
schema=False,
models=None,
exclude=None,
select=None,
selector=None,
dbt_bin='dbt',
output_encoding='utf-8',
verbose=True,
warn_error=False):
def __init__(
self,
env: dict = None,
output_encoding: str = 'utf-8',
):
super().__init__()
self.env = env or {}
self.profiles_dir = profiles_dir
self.dir = dir
self.target = target
self.vars = vars
self.full_refresh = full_refresh
self.data = data
self.schema = schema
self.models = models
self.exclude = exclude
self.select = select
self.selector = selector
self.dbt_bin = dbt_bin
self.verbose = verbose
self.warn_error = warn_error
self.output_encoding = output_encoding
self.sp = None # declare the terminal to be user later on

def _dump_vars(self):
# The dbt `vars` parameter is defined using YAML. Unfortunately the standard YAML library
# for Python isn't very good and I couldn't find an easy way to have it formatted
# correctly. However, as YAML is a super-set of JSON, this works just fine.
return json.dumps(self.vars)
def get_conn(self) -> Any:
"""Implements the get_conn method of the BaseHook class"""
pass

def run_cli(self, *command):
def run_cli(self, dbt_cmd: List[str]):
"""
Run the dbt cli
Run the rendered dbt command

:param command: The dbt command to run
:type command: str
:param dbt_cmd: The dbt command to run
:type dbt_cmd: list
"""

dbt_cmd = [self.dbt_bin, *command]

if self.profiles_dir is not None:
dbt_cmd.extend(['--profiles-dir', self.profiles_dir])

if self.target is not None:
dbt_cmd.extend(['--target', self.target])

if self.vars is not None:
dbt_cmd.extend(['--vars', self._dump_vars()])

if self.data:
dbt_cmd.extend(['--data'])

if self.schema:
dbt_cmd.extend(['--schema'])

if self.models is not None:
dbt_cmd.extend(['--models', self.models])

if self.exclude is not None:
dbt_cmd.extend(['--exclude', self.exclude])

if self.select is not None:
dbt_cmd.extend(['--select', self.select])

if self.selector is not None:
dbt_cmd.extend(['--selector', self.selector])

if self.full_refresh:
dbt_cmd.extend(['--full-refresh'])

if self.warn_error:
dbt_cmd.insert(1, '--warn-error')

if self.verbose:
self.log.info(" ".join(dbt_cmd))

sp = subprocess.Popen(
dbt_cmd,
args=dbt_cmd,
env=self.env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=self.dir,
close_fds=True)
close_fds=True
)
self.sp = sp
self.log.info("Output:")
line = ''
Expand All @@ -150,5 +142,6 @@ def run_cli(self, *command):
raise AirflowException("dbt command failed")

def on_kill(self):
"""Called when the task is killed by Airflow. This will kill the dbt process and wait for it to exit"""
self.log.info('Sending SIGTERM signal to dbt command')
os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
Loading