-
Notifications
You must be signed in to change notification settings - Fork 186
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use dbt to generate the full SQL and support different materializations for BQ for ExecutionMode.AIRFLOW_ASYNC #1474
base: main
Are you sure you want to change the base?
Conversation
✅ Deploy Preview for sunny-pastelito-5ecb04 canceled.
|
Deploying astronomer-cosmos with Cloudflare Pages
|
9590f54
to
e521051
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pankajkoti I'm very excited that we now have a more reliable way of calculating the full dbt SQL query. This approach fixes #1260 and solves many of the async tickets we have open.
Monkey-patching always carries a risk, but it is worth it at this stage.
It would be great if - either as part of this PR - or as a priority follow-up PR, we have an efficient way of testing that the monkey patching works in multiple versions of dbt, including the latest releases, and that the transformation is not being executed when we run the dbt command. I believe this must be done before we release this feature in 1.9.0
I've logged two follow-up tickets that are relevant:
- one is to consider the re-introduction of the compile task, if that means we can avoid having dbt installed in all dbt worker nodes, and executing the command in most Cosmos tasks: Re-evaluate adding compile task when using
ExecutionMode.AIRFLOW_ASYNC
#1477 - the other is to support
TestBehavior.BUILD
SupportTestBehavior.BUILD
when usingExecutionMode.AIRFLOW_ASYNC
#1476
It would be great if these could be accomplished before 1.9.0 release, but I'm also happy with us sticking to approach if time does not allow further analysis / implementation.
cc: @joppevos for visibility on the ongoing work |
8ef4ac5
to
bd85529
Compare
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #1474 +/- ##
==========================================
+ Coverage 97.13% 97.31% +0.17%
==========================================
Files 77 79 +2
Lines 4505 4615 +110
==========================================
+ Hits 4376 4491 +115
+ Misses 129 124 -5 ☔ View full report in Codecov by Sentry. |
bac9c59
to
8751ff7
Compare
…tion comments on baseoperator init
assert not isinstance(base_operator, BaseOperator) | ||
|
||
|
||
def test_abstract_dbt_base_init_no_super(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a test so that we do not reintroduce back the conflicting super.init()
call in AbstractDbtBase
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we change this to private module some options
_adapters
_adapters/mock
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, we can, also will wait for @tatiana's feedback on this one.
from dbt_common.clients.agate_helper import empty_table | ||
|
||
def execute( # type: ignore[no-untyped-def] | ||
self, sql, auto_begin=False, fetch=None, limit: Optional[int] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self, sql, auto_begin=False, fetch=None, limit: Optional[int] = None | |
self, sql, auto_begin=False, fetch=None, limit: int | None = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I copied the signature from the dbt_adapter as is defined there since we wish to mock that function. I think it's good to have the signature as is there to avoid any type mismatch errors. WDYT?
self, | ||
context: Context, | ||
cmd_flags: list[str] | None = None, | ||
run_as_async: bool = False, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The run_as_async
may sound misleading since we do not run query in this case, right?
shall we rename to mock_query_execution
or something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do actually run the query using async operator as part of run_command
method that this method calls https://github.com/astronomer/astronomer-cosmos/pull/1474/files#diff-cdc7f3cedeca298d5c3bdd8d9be8fa15791b915fbe1b2810e9c86c271eb86446R537
There's this block at the end
if run_as_async and async_context:
self._handle_async_execution(tmp_project_dir, context, async_context)
Overview
This PR enhances
ExecutionMode.AIRFLOW_ASYNC
by modifying how queries are generated and executed in Cosmos. Instead of relying on custom SQL headers and that only previously supported for--full-refresh
mode, we now allowdbt
to generate the full SQL dynamically, supporting different materializations in BigQuery. This enables better compatibility with various dbt features while ensuring flexibility in how models are run.Key Changes
_mock_bigquery_adapter()
to overrideBigQueryConnectionManager.execute
, ensuring SQL is only written to thetarget
directory and skipping execution in the warehouse.AbstractDbtBaseOperator
:AbstractDbtBaseOperator
inheritedBaseOperator
, causing conflicts when used withBigQueryInsertJobOperator
with ourEXECUTIONMODE.AIRFLOW_ASYNC
classes and the interface built in Add structure to support multiple db for async operator execution #1483AbstractDbtBase
(no longer inheritingBaseOperator
), requiring explicitBaseOperator
initialization in all derived operators.BaseOperator
:DbtAzureContainerInstanceBaseOperator
DbtDockerBaseOperator
DbtGcpCloudRunJobBaseOperator
DbtKubernetesBaseOperator
_add_dbt_compile_task
, which previously pre-generated SQL and uploaded it to remote storage and subsequent task downloaded this compiled SQL for their execution.dbt run
is now directly invoked in each task using the mocked adapter to generate the full SQL.Issue updates
The PR fixes the following issues:
ExecutionMode.AIRFLOW_ASYNC
query #1260ExecutionMode.AIRFLOW_ASYNC
#1271--full-refresh
when usingExecutionMode.AIRFLOW_ASYNC
#1265ExecutionMode.AIRFLOW_ASYNC
#1264EXECUTIONMODE.AIRFLOW_ASYNC
too with this PRExample DAG showing
EXECUTIONMODE.AIRFLOW_ASYNC
deferring tasks and the dynamic query submitted in the logsNext Steps & Considerations:
ExecutionMode.AIRFLOW_ASYNC
by seeking feedback from users by testing alpha https://github.com/astronomer/astronomer-cosmos/releases/tag/astronomer-cosmos-v1.9.0a5 created with changes from this PR.ExecutionMode.AIRFLOW_ASYNC
#1477, Compare the efficiency of generating SQL dynamically vs. pre-compiling and uploading SQL via a separate task.