# Examples

## Using dbt with User-Defined Adapters and Jinja Methods

To add custom methods to an existing adapter and expose them to Jinja templates, follow these steps:

**Step-1: Extend the adapter.** Create a new adapter class that inherits from the desired base adapter and add the necessary methods to this class.

```python
import tempfile
from typing import Dict

# DuckDBAdapter, the @available decorator, and Utils come from dbt-duckdb,
# dbt-core, and opendbt respectively; exact import paths may vary by version.

class DuckDBAdapterV2Custom(DuckDBAdapter):

    @available
    def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
        model_unique_id = parsed_model.get('unique_id')
        __py_code = f"""
{compiled_code}
# NOTE this is local python execution so session is None
model(dbt=dbtObj(None), session=None)
"""
        # Write the compiled model code to a temp file and run it as a subprocess.
        with tempfile.NamedTemporaryFile(suffix=f'__{model_unique_id}.py', delete=False) as fp:
            fp.write(__py_code.encode('utf-8'))
            fp.close()
        print(f"Created temp py file {fp.name}")
        Utils.runcommand(command=['python', fp.name])
```

**Step-2:** In your `dbt_project.yml` file, set the `dbt_custom_adapter` variable to the fully qualified name of your custom adapter class. This enables opendbt to recognize and activate your adapter.

```yaml
vars:
  dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom
```

**Step-3:** Execute dbt commands as usual. dbt will now load and use your custom adapter class, making the newly defined methods available in your Jinja macros; for example, the `executepython` materialization shown below calls `adapter.submit_local_python_job(...)` directly from Jinja.

```python
from opendbt import OpenDbtProject

dp = OpenDbtProject(project_dir="/dbt/project_dir", profiles_dir="/dbt/profiles_dir")
dp.run(command="run")
```

## Executing Python Models Locally with dbt

By leveraging a customized adapter and a custom materialization, dbt can be extended to execute Python code locally. This powerful capability is particularly useful for scenarios involving data ingestion from external APIs, enabling seamless integration within the dbt framework.

**Step-1:** We'll extend an existing adapter (like `DuckDBAdapter`) to add a new method, `submit_local_python_job`. This method executes the provided Python code as a subprocess.

```python
class DuckDBAdapterV2Custom(DuckDBAdapter):

    @available
    def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
        model_unique_id = parsed_model.get('unique_id')
        __py_code = f"""
{compiled_code}
# NOTE this is local python execution so session is None
model(dbt=dbtObj(None), session=None)
"""
        with tempfile.NamedTemporaryFile(suffix=f'__{model_unique_id}.py', delete=False) as fp:
            fp.write(__py_code.encode('utf-8'))
            fp.close()
        print(f"Created temp py file {fp.name}")
        Utils.runcommand(command=['python', fp.name])
```

**Step-2:** Create a new materialization named `executepython`. This materialization calls the newly added `submit_local_python_job` method from the custom adapter to execute the compiled Python code. Place it in your dbt project's `macros/` directory so dbt can discover it.

```jinja
{% materialization executepython, supported_languages=['python'] %}

  {%- set identifier = model['alias'] -%}
  {%- set language = model['language'] -%}
  {% set grant_config = config.get('grants') %}
  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
  {%- set target_relation = api.Relation.create(identifier=identifier,
                                                schema=schema,
                                                database=database,
                                                type='table') -%}

  {{ run_hooks(pre_hooks) }}

  {% call noop_statement(name='main', message='Executed Python', code=compiled_code, rows_affected=-1, res=None) %}
    {%- set res = adapter.submit_local_python_job(model, compiled_code) -%}
  {% endcall %}

  {{ run_hooks(post_hooks) }}

  {% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %}
  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
  {% do persist_docs(target_relation, model) %}

  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
```

**Step-3:** Let's create a sample Python model that will be executed locally by dbt using the `executepython` materialization.

```python
import os
import platform

from dbt import version


def print_info():
    _str = f"name:{os.name}, system:{platform.system()} release:{platform.release()}"
    _str += f"\npython version:{platform.python_version()}, dbt:{version.__version__}"
    print(_str)


def model(dbt, session):
    dbt.config(materialized="executepython")
    print("==================================================")
    print("========IM LOCALLY EXECUTED PYTHON MODEL==========")
    print("==================================================")
    print_info()
    print("==================================================")
    print("===============MAKE DBT GREAT AGAIN===============")
    print("==================================================")
    return None
```
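
Because `executepython` runs the model as a plain local Python process, the same pattern can cover the API-ingestion use case mentioned above. The snippet below is only a minimal sketch and not part of opendbt: it assumes the `requests` and `duckdb` packages are installed, and the endpoint URL, table name, and DuckDB file path are hypothetical placeholders you would replace with your own.

```python
import requests
import duckdb


def model(dbt, session):
    dbt.config(materialized="executepython")
    # Hypothetical endpoint; replace with the API you ingest from.
    records = requests.get("https://example.com/api/items", timeout=30).json()
    # session is None under executepython, so open a DuckDB connection manually.
    # The database path below is a placeholder for the file your profile uses.
    con = duckdb.connect("/dbt/project_dir/dev.duckdb")
    con.execute("CREATE TABLE IF NOT EXISTS raw_items (id INTEGER, name VARCHAR)")
    if records:
        con.executemany("INSERT INTO raw_items VALUES (?, ?)",
                        [(r["id"], r["name"]) for r in records])
    con.close()
    return None
```

Since the locally executed model receives no adapter session, it manages its own connection and writes directly to the warehouse file.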

## Orchestrating dbt Models with Airflow

**Step-1:** Let's create an Airflow DAG to orchestrate the execution of your dbt project.

```python
from pathlib import Path

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago
from opendbt.airflow import OpenDbtAirflowProject

# Minimal default_args placeholder; adjust owner, retries, etc. for your setup.
default_args = {"owner": "airflow"}

with DAG(
        dag_id='dbt_workflow',
        default_args=default_args,
        description='DAG To run dbt',
        schedule_interval=None,
        start_date=days_ago(3),
        catchup=False,
        max_active_runs=1
) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    DBTTEST_DIR = Path("/opt/dbttest")
    # Create dbt tasks from the project and wire them between the start and end nodes.
    p = OpenDbtAirflowProject(project_dir=DBTTEST_DIR, profiles_dir=DBTTEST_DIR, target='dev')
    p.load_dbt_tasks(dag=dag, start_node=start, end_node=end)
```

airflow-dbt-flow.png

Creating an Airflow DAG that selectively executes a specific subset of models from your dbt project:

```python
from opendbt.airflow import OpenDbtAirflowProject

# create dbt build tasks for models with given tag
p = OpenDbtAirflowProject(resource_type='model', project_dir="/dbt/project_dir", profiles_dir="/dbt/profiles_dir",
                          target='dev', tag="MY_TAG")
p.load_dbt_tasks(dag=dag, start_node=start, end_node=end)
```

Creating a DAG to run dbt tests:

```python
from opendbt.airflow import OpenDbtAirflowProject

# create dbt test tasks with given model tag
p = OpenDbtAirflowProject(resource_type='test', project_dir="/dbt/project_dir", profiles_dir="/dbt/profiles_dir",
                          target='dev', tag="MY_TAG")
p.load_dbt_tasks(dag=dag, start_node=start, end_node=end)
```

## Integrating dbt Documentation into Airflow

Airflow, a powerful workflow orchestration tool, can be leveraged to streamline not only dbt execution but also dbt documentation access. By integrating dbt documentation into your Airflow interface, you can centralize your data engineering resources and improve team collaboration.

Here is how:

**Step-1:** Create a Python file. Navigate to your Airflow {airflow}/plugins directory and create a new Python file with an appropriate name, such as dbt_docs_plugin.py. Add the following code to dbt_docs_plugin.py, ensuring that the specified path points to the folder where your dbt project generates its documentation (by default the target directory, populated by `dbt docs generate`).

```python
from pathlib import Path

from opendbt.airflow import dbtdocs

# create public page on airflow server to serve DBT docs
airflow_dbtdocs_page = dbtdocs.init_plugins_dbtdocs_page(Path("/opt/dbttest/target"))
```

**Step-2:** Restart Airflow to activate the plugin. Once the restart is complete, you should see a new link labeled DBT Docs in your Airflow web interface. This link provides access to your dbt documentation.

airflow-dbt-docs-link.png

**Step-3:** Click the DBT Docs link to open your dbt documentation.

airflow-dbt-docs-page.png