Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use dbt to generate the full SQL and support different materializations for BQ for ExecutionMode.AIRFLOW_ASYNC #1474

Open
wants to merge 38 commits into
base: main
Choose a base branch
from

Conversation

pankajkoti
Copy link
Contributor

@pankajkoti pankajkoti commented Jan 21, 2025

Overview

This PR enhances ExecutionMode.AIRFLOW_ASYNC by modifying how queries are generated and executed in Cosmos. Instead of relying on custom SQL headers and that only previously supported for --full-refresh mode, we now allow dbt to generate the full SQL dynamically, supporting different materializations in BigQuery. This enables better compatibility with various dbt features while ensuring flexibility in how models are run.

Key Changes

  1. Mocked BigQuery Adapter Execution:
    • Introduced _mock_bigquery_adapter() to override BigQueryConnectionManager.execute, ensuring SQL is only written to the target directory and skipping execution in the warehouse.
    • The generated SQL is then submitted using Airflow’s BigQueryInsertJobOperator in deferrable mode.
  2. Refactoring AbstractDbtBaseOperator:
    • Previously, AbstractDbtBaseOperator inherited BaseOperator, causing conflicts when used with BigQueryInsertJobOperator with ourEXECUTIONMODE.AIRFLOW_ASYNC classes and the interface built in Add structure to support multiple db for async operator execution #1483
    • Refactored to AbstractDbtBase (no longer inheriting BaseOperator), requiring explicit BaseOperator initialization in all derived operators.
    • Updated the below existing operators to consider this refactoring needing derived classes to initialise BaseOperator:
      • DbtAzureContainerInstanceBaseOperator
      • DbtDockerBaseOperator
      • DbtGcpCloudRunJobBaseOperator
      • DbtKubernetesBaseOperator
  3. Changes to dbt Compilation Workflow
    • Removed _add_dbt_compile_task, which previously pre-generated SQL and uploaded it to remote storage and subsequent task downloaded this compiled SQL for their execution.
    • Instead, dbt run is now directly invoked in each task using the mocked adapter to generate the full SQL.
    • A future issue will assess whether we should reintroduce a compile task using the mocked adapter for SQL generation and upload, reducing redundant dbt calls in each task.

Issue updates

The PR fixes the following issues:

  1. closes: [bug] Fix ExecutionMode.AIRFLOW_ASYNC query #1260
    • Previously, we only supported --full-refresh dbt run with static SQL headers (e.g., CREATE/DROP TABLE).
    • Now, we support dynamic SQL headers based on materializations, including CREATE OR REPLACE TABLE, CREATE OR REPLACE VIEW, etc.
  2. closes: [async] Evaluate possibility of supporting macros when using ExecutionMode.AIRFLOW_ASYNC #1271
    • dbt macros are evaluated at runtime during dbt run invocation using mocked adapter, and this PR lays the groundwork for supporting them in async execution mode.
  3. closes: [async] Support running models without --full-refresh when using ExecutionMode.AIRFLOW_ASYNC #1265
    • Now, large datasets can avoid full drops and recreations, enabling incremental model updates.
  4. closes: [async] Support different materializations for BQ #1261
    • Previously, only tables (--full-refresh) were supported; this PR implements logic for handling different materializations that dbt supports like table, view, incremental, ephemeral, and materialized views.
  5. closes: [async] Evaluate the possiblity of using dbt itself to create the full SQL command #1266
    • Instead of relying on dbt compile (which only outputs SELECT statements), we now let dbt generate complete SQL queries, including SQL headers/DDL statements for the queries corresponding to the resource nodes and state of tables/views in the backend warehouse
  6. closes: [async] Emit datasets when using ExecutionMode.AIRFLOW_ASYNC #1264
    • We support emitting datasets for EXECUTIONMODE.AIRFLOW_ASYNC too with this PR

Example DAG showing EXECUTIONMODE.AIRFLOW_ASYNC deferring tasks and the dynamic query submitted in the logs

Screenshot 2025-02-04 at 1 02 42 PM

Next Steps & Considerations:

Copy link

netlify bot commented Jan 21, 2025

Deploy Preview for sunny-pastelito-5ecb04 canceled.

Name Link
🔨 Latest commit df9ff3f
🔍 Latest deploy log https://app.netlify.com/sites/sunny-pastelito-5ecb04/deploys/67a0f7538a533200088282f5

@pankajkoti pankajkoti changed the title Monkeypatch BiqQuery adapter for retriveing SQL for async execution Monkeypatch BiqQuery adapter to retrive SQL for async execution Jan 21, 2025
cosmos/operators/local.py Outdated Show resolved Hide resolved
cosmos/operators/local.py Outdated Show resolved Hide resolved
Copy link

cloudflare-workers-and-pages bot commented Jan 21, 2025

Deploying astronomer-cosmos with  Cloudflare Pages  Cloudflare Pages

Latest commit: df9ff3f
Status: ✅  Deploy successful!
Preview URL: https://46218928.astronomer-cosmos.pages.dev
Branch Preview URL: https://monkeypatch-bq-adapter.astronomer-cosmos.pages.dev

View logs

cosmos/operators/local.py Outdated Show resolved Hide resolved
Copy link
Collaborator

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pankajkoti I'm very excited that we now have a more reliable way of calculating the full dbt SQL query. This approach fixes #1260 and solves many of the async tickets we have open.

Monkey-patching always carries a risk, but it is worth it at this stage.

It would be great if - either as part of this PR - or as a priority follow-up PR, we have an efficient way of testing that the monkey patching works in multiple versions of dbt, including the latest releases, and that the transformation is not being executed when we run the dbt command. I believe this must be done before we release this feature in 1.9.0

I've logged two follow-up tickets that are relevant:

It would be great if these could be accomplished before 1.9.0 release, but I'm also happy with us sticking to approach if time does not allow further analysis / implementation.

@pankajkoti
Copy link
Contributor Author

cc: @joppevos for visibility on the ongoing work

Copy link

codecov bot commented Feb 2, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 97.31%. Comparing base (be44d7c) to head (df9ff3f).
Report is 1 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1474      +/-   ##
==========================================
+ Coverage   97.13%   97.31%   +0.17%     
==========================================
  Files          77       79       +2     
  Lines        4505     4615     +110     
==========================================
+ Hits         4376     4491     +115     
+ Misses        129      124       -5     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@pankajkoti pankajkoti changed the title Monkeypatch BiqQuery adapter to retrive SQL for async execution Using dbt to generate the full SQL and support different materializations for BQ for ExecutionMode.AIRFLOW_ASYNC Feb 3, 2025
@pankajkoti pankajkoti changed the title Using dbt to generate the full SQL and support different materializations for BQ for ExecutionMode.AIRFLOW_ASYNC Use dbt to generate the full SQL and support different materializations for BQ for ExecutionMode.AIRFLOW_ASYNC Feb 3, 2025
@pankajkoti pankajkoti marked this pull request as ready for review February 4, 2025 07:29
@dosubot dosubot bot added size:XXL This PR changes 1000+ lines, ignoring generated files. area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc dbt:run Primarily related to dbt run command or functionality profile:bigquery Related to BigQuery ProfileConfig labels Feb 4, 2025
@pankajkoti pankajkoti requested a review from tatiana February 4, 2025 07:33
assert not isinstance(base_operator, BaseOperator)


def test_abstract_dbt_base_init_no_super():
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test so that we do not reintroduce back the conflicting super.init() call in AbstractDbtBase

Copy link
Contributor

@pankajastro pankajastro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we change this to private module some options

  • _adapters
  • _adapters/mock

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we can, also will wait for @tatiana's feedback on this one.

from dbt_common.clients.agate_helper import empty_table

def execute( # type: ignore[no-untyped-def]
self, sql, auto_begin=False, fetch=None, limit: Optional[int] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self, sql, auto_begin=False, fetch=None, limit: Optional[int] = None
self, sql, auto_begin=False, fetch=None, limit: int | None = None

Copy link
Contributor Author

@pankajkoti pankajkoti Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied the signature from the dbt_adapter as is defined there since we wish to mock that function. I think it's good to have the signature as is there to avoid any type mismatch errors. WDYT?

self,
context: Context,
cmd_flags: list[str] | None = None,
run_as_async: bool = False,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The run_as_async may sound misleading since we do not run query in this case, right?
shall we rename to mock_query_execution or something else?

Copy link
Contributor Author

@pankajkoti pankajkoti Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do actually run the query using async operator as part of run_command method that this method calls https://github.com/astronomer/astronomer-cosmos/pull/1474/files#diff-cdc7f3cedeca298d5c3bdd8d9be8fa15791b915fbe1b2810e9c86c271eb86446R537

There's this block at the end

if run_as_async and async_context:
    self._handle_async_execution(tmp_project_dir, context, async_context)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc dbt:run Primarily related to dbt run command or functionality priority:high High priority issues are blocking or critical issues without a workaround and large impact profile:bigquery Related to BigQuery ProfileConfig size:XXL This PR changes 1000+ lines, ignoring generated files.
Projects
None yet
3 participants