PoC on ExecutionMode.LOCAL_AIRFLOW_ASYNC
#1134
Comments
One of the advantages of this feature would also be to give additional information to end-users that Airflow currently exposes, but dbt does not. For example, by using dbt, the BigQuery job ID is not printed in the logs, but if we changed Cosmos to use the Airflow BigQuery operator to run the BQ SQL transformations, the logs include the full job ID. We could also aim to expose this information in the Airflow UI. This was brought up in a discussion with a customer.
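To illustrate the kind of visibility Airflow's own operator gives, here is a minimal sketch (not Cosmos code; the task id, connection id, and SQL below are placeholders) of submitting a pre-compiled statement through `BigQueryInsertJobOperator`, which logs the BigQuery job ID and pushes it to XCom:

```python
# Minimal sketch, not Cosmos's implementation: submit a pre-compiled SQL statement
# through Airflow's BigQuery operator so the BigQuery job ID is logged and
# available via XCom. Task id, connection id, and SQL are placeholders.
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

run_compiled_model = BigQueryInsertJobOperator(
    task_id="run_compiled_model",
    gcp_conn_id="google_cloud_default",  # assumes an existing GCP connection
    configuration={
        "query": {
            "query": "select * from raw.customers",  # placeholder for dbt-compiled SQL
            "useLegacySql": False,
        }
    },
)
```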
@pankajkoti @pankajastro will be working closely together on discussing possibilities, trying them out, and deciding on a path to implement this. They will probably create sub-tickets as needed as this work advances.
Another comment: during the PoC, I don't think we need to worry about handling incremental models, where the code compiles differently depending on whether the model already exists. This could be a feature limitation.
If we want to leverage the interest from Astronomer Cosmos customers, we should do this for BigQuery as a starting point and make sure it works with `dbt ls`. Steps discussed during brainstorming:
1. We need to confirm in which files we can see the compiled SQL (see the sketch after this list).
2. How do we get hold of the compiled SQL?
   a) Run
   b) Check the effort of resolving the compiled SQL from the Manifest file or another pre-existing dbt file that is created as part of
   c) We don't mess with generating the compiled files. We expect users to give a path to the compiled files the same way they give the path to the manifest. Then, we fetch the compiled SQLs.
   d) Run dbt compile as a setup task at the beginning of the DAG.
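To make item 1 concrete, here is a minimal sketch of where `dbt compile` leaves the per-model compiled SQL on disk, assuming a local dbt project named `jaffle_shop` on which `dbt compile` has already been run (the project name and path are placeholders):

```python
# Minimal sketch, assuming `dbt compile` has already been run for the project:
# dbt writes one compiled SQL file per model under target/compiled/.
from pathlib import Path

project_dir = Path("/usr/local/airflow/dbt/jaffle_shop")  # placeholder path
for sql_file in sorted((project_dir / "target" / "compiled").rglob("*.sql")):
    # e.g. target/compiled/jaffle_shop/models/customers.sql
    print(sql_file.relative_to(project_dir))
```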
We agreed not to do 2(a) because it would reintroduce the performance issues we faced in the past.
To evaluate approach 2(b) mentioned in the comment above, I explored where we could get the compiled SQLs from. I looked into the
Eventually, we can look forward to also supporting approach 2(c), but since that would mean expecting users to supply the path to the compiled SQLs, I'm inclined towards approach 2(d) mentioned in the comment above, so that Cosmos takes care of generating the compiled SQLs for the user, which is an easier starting point for them.
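For context, this is a plain-Airflow sketch of what approach 2(d) amounts to, before Cosmos wraps it in its own operator (the DAG id, schedule, and dbt project path are placeholders, not Cosmos's implementation):

```python
# Minimal sketch of approach 2(d): run `dbt compile` as a setup task at the
# beginning of the DAG; downstream tasks would then pick up the compiled SQL.
from pendulum import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="dbt_compile_setup_poc", start_date=datetime(2024, 1, 1), schedule=None):
    dbt_compile = BashOperator(
        task_id="dbt_compile",
        bash_command="dbt compile --project-dir /usr/local/airflow/dbt/jaffle_shop",
    )
    # In the real feature, the compiled files would next be uploaded to remote
    # storage so the transformation tasks can fetch and submit them asynchronously.
```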
This PR is the groundwork for the implementation of `ExecutionMode.AIRFLOW_ASYNC` (#1120), which - once all other epic tasks are completed - will enable asynchronous execution of dbt resources using Apache Airflow's deferrable operators. As part of this work, this PR introduces a new option to the enum `ExecutionMode`: `AIRFLOW_ASYNC`. When this execution mode is used, Cosmos now creates a setup task that pre-compiles the dbt project SQL and makes it available to the remaining dbt tasks. This PR, however, does not yet leverage Airflow's deferrable operators. If users use `ExecutionMode.AIRFLOW_ASYNC`, they will actually be running `ExecutionMode.LOCAL` operators with this change. PR #1230 has a first experimental version of using deferrable operators for task execution.

## Setup task as the groundwork for a new Execution Mode: `ExecutionMode.AIRFLOW_ASYNC`:
- Adds a new operator, `DbtCompileAirflowAsyncOperator`, as a root task (analogous to a setup task) in the DAG. It runs the dbt compile command and uploads the compiled SQL files to a remote storage location, so that subsequent tasks can fetch these compiled SQL files from remote storage and run them asynchronously using Airflow's deferrable operators.

## Airflow Configurations:
- `remote_target_path`: Introduces a configurable path to store dbt-generated files remotely, supporting any storage scheme that works with Airflow's Object Store (e.g., S3, GCS, Azure Blob).
- `remote_target_path_conn_id`: Allows specifying a custom connection ID for the remote target path, defaulting to the scheme's associated Airflow connection if not set.

## Example DAG for CI Testing:
Introduces an example DAG (`simple_dag_async.py`) demonstrating how to use the new execution mode (as mentioned earlier, with this PR alone the execution still runs like `ExecutionMode.LOCAL` operators). This DAG is integrated into the CI pipeline to run integration tests and aims at verifying the functionality of `ExecutionMode.AIRFLOW_ASYNC` as the implementation gets added, starting with the experimental implementation in #1230.

## Unit & Integration Tests:
- Adds comprehensive unit and integration tests to ensure correct behavior.
- Tests include validation for successful uploads, error handling for misconfigured remote paths, and scenarios where `remote_target_path` is not set.

## Documentation:
- Adds detailed documentation explaining how to configure and use `ExecutionMode.AIRFLOW_ASYNC`.

## Scope & Limitations of the feature being introduced:
1. This feature is meant to be released as Experimental and is also marked so in the documentation.
2. Currently, it has been scoped for only dbt models to be executed asynchronously (being worked on in PR #1230), while other resource types will be run synchronously.
3. `BigQuery` will be the only supported target database for this execution mode (being worked on in PR #1230).

Thus, this PR enhances Cosmos by providing the groundwork for more efficient execution of long-running dbt resources.

## Additional Notes:
- This feature is planned to be introduced in Cosmos v1.7.0.

related: #1134
Description co-authored by @pankajkoti @pankajastro
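Based on the description above, a hedged sketch of how a user would be expected to opt in to the new mode. The project path, profile details, and the exact environment variable names for `remote_target_path` are assumptions for illustration, not confirmed by the PR text:

```python
# Hedged sketch of opting in to ExecutionMode.AIRFLOW_ASYNC; paths, profile
# details, and the config/env-var names below are assumptions, not confirmed here.
#
#   export AIRFLOW__COSMOS__REMOTE_TARGET_PATH="gs://my-bucket/dbt/target"      # assumed env var name
#   export AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID="gcp_default"            # assumed env var name
from pendulum import datetime

from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig
from cosmos.constants import ExecutionMode

simple_dag_async = DbtDag(
    dag_id="simple_dag_async",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    project_config=ProjectConfig("/usr/local/airflow/dbt/jaffle_shop"),  # placeholder project path
    profile_config=ProfileConfig(
        profile_name="default",
        target_name="dev",
        profiles_yml_filepath="/usr/local/airflow/dbt/jaffle_shop/profiles.yml",  # placeholder profile
    ),
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.AIRFLOW_ASYNC),
)
```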
Context
As of Cosmos 1.5, we are using dbt core to run transformations in the database. In other words, the Airflow worker node is being blocked during the execution of the SQL transformation. If the transformation takes 6h, we have an Airflow worker node just waiting.
One way to overcome this is to use Airflow deferrable operators, as opposed to using dbt-core commands to execute the transformation. This approach is not new, and it was discussed at Airflow Summit 2023 by Monzo. We (@tatiana) had a meeting with Monzo in London afterward to discuss the details.
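To make the idea concrete, here is a minimal sketch of the deferrable pattern using Airflow's BigQuery operator (the task id and SQL are placeholders): with `deferrable=True`, the task submits the job and releases the worker slot while the triggerer waits for the job to finish.

```python
# Minimal sketch of the deferrable pattern this issue aims for: instead of a
# worker blocking for hours while dbt runs the SQL, the operator submits the
# BigQuery job and defers, freeing the worker until the triggerer sees it finish.
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

long_transformation = BigQueryInsertJobOperator(
    task_id="long_transformation",  # placeholder task
    configuration={
        "query": {
            "query": "select ...",  # placeholder for a long-running compiled model
            "useLegacySql": False,
        }
    },
    deferrable=True,  # release the worker slot while the job runs in BigQuery
)
```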
The overall idea is:
For a first end-to-end, we'll only support:
- `ExecutionMode.LOCAL`

This will allow us to measure the impact of using deferrable operators, and then we can extend to:
Acceptance criteria
- `ExecutionMode.LOCAL_AIRFLOW_ASYNC` (think if we can find a better name)
- `compiled_sql` template field in Cosmos. If this is being populated after running the task, then we can simplify the first E2E to use `LoadMode.MANIFEST`, and assume the compiled SQL is available for Cosmos.