PoC on ExecutionMode.LOCAL_AIRFLOW_ASYNC #1134

Closed · 3 tasks
tatiana opened this issue Aug 1, 2024 · 6 comments · Fixed by #1230
Assignees
Labels
area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc area:performance Related to performance, like memory usage, CPU usage, speed, etc dbt:compile Primarily related to dbt compile command or functionality execution:local Related to Local execution environment profile:databricks Related to Databricks ProfileConfig
Milestone

Comments

@tatiana
Collaborator

tatiana commented Aug 1, 2024

Description co-authored by @pankajkoti @pankajastro

Context

As of Cosmos 1.5, we use dbt Core to run transformations in the database. In other words, the Airflow worker node is blocked during the execution of the SQL transformation: if the transformation takes 6h, an Airflow worker node is just waiting.

One way to overcome this is to use Airflow deferrable operators instead of dbt Core commands to execute the transformation. This approach is not new; Monzo presented it at Airflow Summit 2023, and we (@tatiana) met with Monzo in London afterwards to discuss the details.

The overall idea is:

  • Pre-compile the dbt project, so we have the pre-compiled SQL
  • Execute the pre-compiled SQL using native Airflow operators, enabling the deferrable mode

For a first end-to-end, we'll only support:

  • SQL models
  • ExecutionMode.LOCAL
  • Databricks

This will allow us to measure the impact of using deferrable operators, and then we can extend to:

  • Python models
  • Other execution modes
  • Other SQL backends

Acceptance criteria

  • PoC to try to implement this feature end-to-end:
    • Introduce ExecutionMode.LOCAL_AIRFLOW_ASYNC (consider whether we can find a better name) — a hypothetical configuration sketch follows this list
    • Check how we are populating the compiled_sql template field in Cosmos. If it is populated after running the task, then we can simplify the first E2E to use LoadMode.MANIFEST and assume the compiled SQL is available to Cosmos.
    • If the dbt profile is not related to Databricks, raise an error
    • If it is related to Databricks, then attempt to use the DatabricksSubmitRunOperator to run the previously compiled SQL.
  • Log follow-up tasks to make this PoC production-ready and to expand the scope of the first end-to-end (support Python models, support other backends, e.g. Snowflake or BQ)
  • Identify limitations of this approach (e.g. some macros may not work, and how this will play along with Airflow inlets/outlets)
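A hypothetical sketch of how a user might select this execution mode, assuming the enum member ends up being named ExecutionMode.LOCAL_AIRFLOW_ASYNC as proposed above (it does not exist in Cosmos 1.5) and that the existing DatabricksTokenProfileMapping is used for the profile; connection IDs, paths and schema are placeholders:

```python
from datetime import datetime

from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig
from cosmos.profiles import DatabricksTokenProfileMapping

example_dag = DbtDag(
    dag_id="jaffle_shop_local_airflow_async",
    start_date=datetime(2024, 8, 1),
    schedule_interval=None,
    project_config=ProjectConfig("/usr/local/airflow/dbt/jaffle_shop"),
    profile_config=ProfileConfig(
        profile_name="default",
        target_name="dev",
        # Only Databricks profiles would be accepted in the first end-to-end version.
        profile_mapping=DatabricksTokenProfileMapping(
            conn_id="databricks_default",
            profile_args={"schema": "analytics"},
        ),
    ),
    execution_config=ExecutionConfig(
        # Proposed name from this issue; not available in Cosmos 1.5.
        execution_mode=ExecutionMode.LOCAL_AIRFLOW_ASYNC,
    ),
)
```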
@tatiana tatiana added this to the Cosmos 1.7.0 milestone Aug 1, 2024
@tatiana tatiana added the area:performance Related to performance, like memory usage, CPU usage, speed, etc label Aug 1, 2024
@dosubot dosubot bot added area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc dbt:compile Primarily related to dbt compile command or functionality execution:local Related to Local execution environment profile:databricks Related to Databricks ProfileConfig labels Aug 1, 2024
@tatiana
Collaborator Author

tatiana commented Aug 14, 2024

Another advantage of this feature would be to give end users additional information that Airflow exposes but dbt does not.

For example, when running through dbt, the BigQuery job ID is not printed in the logs; if we changed Cosmos to use the Airflow BigQuery operator to run the BQ SQL transformations, the logs would include the full job ID. We could also aim to expose this information in the Airflow UI.

This was brought up in a discussion with a customer:
https://astronomer.slack.com/archives/C074NP4A8G1/p1722774197657379
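For illustration, a minimal sketch of running a compiled SQL statement through Airflow's native BigQuery operator, which logs the BigQuery job ID and can be deferred; the connection ID, SQL and location below are placeholders:

```python
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

run_compiled_model = BigQueryInsertJobOperator(
    task_id="run_compiled_model",
    gcp_conn_id="google_cloud_default",
    configuration={
        "query": {
            # Placeholder: in Cosmos' case this would be the pre-compiled dbt model SQL.
            "query": "SELECT * FROM `my-project.analytics.stg_customers`",
            "useLegacySql": False,
        }
    },
    location="US",
    # Hand the wait over to the triggerer instead of blocking a worker slot.
    deferrable=True,
)
```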

@tatiana
Collaborator Author

tatiana commented Aug 27, 2024

@pankajkoti @pankajastro will be working closely together on discussing possibilities, trying them out, and deciding on a path to implement this. They will probably create sub-tickets as needed as this work advances.

@tatiana
Collaborator Author

tatiana commented Aug 27, 2024

Another comment: during the PoC, I don't think we need to worry about handling incremental models, whose code compiles differently depending on whether the model already exists. This could be a feature limitation.

@tatiana
Collaborator Author

tatiana commented Sep 18, 2024

If we want to leverage the interest from Astronomer Cosmos customers, we should start with BQ and make sure it works with LoadMode.DBT_LS.

Steps discussed during brainstorming

  1. Change profile mapping & change this DAG to use a new execution mode ExecutionMode.LOCAL_AIRFLOW_ASYNC
    https://github.com/astronomer/astronomer-cosmos/blob/main/dev/dags/basic_cosmos_dag.py

  2. Obtain the compiled files & make them available to all dbt operators created by Cosmos

We need to confirm in which files we can see the compiled SQL.

a) Run dbt compile in the Airflow scheduler. Introduce LoadMode.DBT_COMPILE. It would work very similarly to LoadMode.DBT_LS (from the perspective of running a subprocess with compile) and LoadMode.DBT_MANIFEST (from the perspective that we'd parse the manifest file we create and then run the same logic as for LoadMode.DBT_MANIFEST). In this case, we'd need to upload the created artefacts to an Object Store and make sure the paths are available to the workers. This would have the same performance issues as LoadMode.DBT_LS before Cosmos 1.5.

b) Check the effort of resolving the compiled SQL from the Manifest file or another pre-existing dbt file that is created as part of LoadMode.DBT_LS.

c) We don't get involved in generating the compiled files. We expect users to give a path to the compiled files, the same way they give the path to the manifest. Then we fetch the compiled SQLs.

d) Run dbt compile as a setup task at the beginning of the DAG, e.g. ProjectConfig(pre_compile_sql=True, remote_root_path, remote_target_conn): the first Cosmos task could run dbt compile and store the output in a remote store. We'd have to share this location with the other tasks; this can be done upfront based on the user configuration. If we decide to manage the path, we'd use something like <dbt-project-path> / <target> / <compiled> / * or <dag> / <task group> / <target> / <compiled>. A sketch of this option follows below.
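A minimal sketch of option (d) under stated assumptions: the @task helper, the remote root path, the connection ID and the directory layout are illustrative and not Cosmos' API; uploading relies on Airflow's Object Storage (Airflow 2.8+):

```python
import subprocess
from pathlib import Path

from airflow.decorators import task
from airflow.io.path import ObjectStoragePath


@task
def precompile_dbt_project(project_dir: str, remote_root: str, conn_id: str) -> str:
    """Run `dbt compile` and mirror target/compiled into a remote object store."""
    subprocess.run(["dbt", "compile", "--project-dir", project_dir], check=True)

    compiled_dir = Path(project_dir) / "target" / "compiled"
    remote_base = ObjectStoragePath(remote_root, conn_id=conn_id)
    for sql_file in compiled_dir.rglob("*.sql"):
        # Keep the compiled-directory layout so downstream tasks can resolve each model.
        remote_file = remote_base / sql_file.relative_to(compiled_dir).as_posix()
        remote_file.write_bytes(sql_file.read_bytes())
    return str(remote_base)
```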

@tatiana
Collaborator Author

tatiana commented Sep 18, 2024

We agreed not to do 2(a) because it will revert to performance issues we faced in the past.

@pankajkoti
Contributor

To evaluate approach 2(b) mentioned in the comment above, I explored where we could get the compiled SQLs from. I looked into the target directory of the dbt project and the files available in it. It contains the following files, which I explored for finding compiled SQLs:

  1. graph.gpickle -> The purpose of the graph.gpickle file is to store the structure of the project’s dependency graph (DAG), including relationships between models, tests, seeds, and other nodes. It does not contain the compiled SQLs.
  2. partial_parse.msgpack -> The partial_parse.msgpack file is a binary-encoded file that dbt uses to store the results of previous parsing operations. It allows dbt to skip parsing unchanged files on subsequent runs by loading the cached metadata from this file. This one does not contain compiled SQLs either.
  3. manifest.json -> The manifest.json does not always contain compiled SQLs. I ran the dbt compile command and observed that compiled_sql is still not populated for the model nodes. The dbt docs suggest that a few node properties, such as compiled_sql, only appear for executed nodes. That would mean we would have to run the dbt project at least once, hence this does not seem to be a reliable way for us to start, IMO.
  4. Upon running the dbt compile command, a compiled directory is created in the target directory of the dbt project. This nested folder does contain the compiled SQLs (see the small sketch after this list).
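A small sketch of how the compiled SQL could be collected from that nested directory after `dbt compile`; the project path below is a placeholder:

```python
from pathlib import Path

compiled_dir = Path("/usr/local/airflow/dbt/jaffle_shop/target/compiled")

# Map a relative model path (e.g. "jaffle_shop/models/staging/stg_customers") to its compiled SQL.
compiled_sql = {
    str(path.relative_to(compiled_dir).with_suffix("")): path.read_text()
    for path in compiled_dir.rglob("*.sql")
}
```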

Eventually, we could also support approach 2(c), but since that would mean expecting users to supply the path to the compiled SQLs, I'm inclined towards approach 2(d) mentioned in the comment above, so that Cosmos takes care of generating the compiled SQLs for the user, which is an easier start for them.

@tatiana tatiana modified the milestones: Cosmos 1.7.0, Triage Sep 20, 2024
@pankajkoti pankajkoti modified the milestones: Triage, Cosmos 1.7.0 Sep 20, 2024
tatiana pushed a commit that referenced this issue Sep 30, 2024
This PR is the groundwork for the implementation of
`ExecutionMode.AIRFLOW_ASYNC`
(#1120), which -
once all other epic tasks are completed - will enable asynchronous
execution of dbt resources using Apache Airflow’s deferrable operators.
As part of this work, this PR introduces a new option to the enum
`ExecutionMode` : `AIRFLOW_ASYNC`. When this execution mode is used,
Cosmos now creates a setup task that will pre-compile the dbt project
SQL and make it available to the remaining dbt tasks. This PR, however,
does not yet leverage Airflow's deferrable operators. If users use
`ExecutionMode.AIRFLOW_ASYNC` they will actually be running
`ExecutionMode.LOCAL` operators with this change. The PR (#1230) has a
first experimental version of using deferrable operators for task
execution.

## Setup task as the ground work for a new Execution Mode:
`ExecutionMode.AIRFLOW_ASYNC`:
- Adds a new operator, `DbtCompileAirflowAsyncOperator`, as a root task
(analogous to a setup task) in the DAG. It runs the dbt compile command
and uploads the compiled SQL files to a remote storage location, so that
subsequent tasks can fetch these compiled SQL files from the remote
storage and run them asynchronously using Airflow's deferrable
operators.

## Airflow Configurations:
- `remote_target_path`: Introduces a configurable path to store
dbt-generated files remotely, supporting any storage scheme that works
with Airflow’s Object Store (e.g., S3, GCS, Azure Blob).
- `remote_target_path_conn_id`: Allows specifying a custom connection ID
for the remote target path, defaulting to the scheme’s associated
Airflow connection if not set.
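A minimal sketch of setting these two options via environment variables, assuming they live under the `[cosmos]` section of the Airflow configuration (so Airflow's usual `AIRFLOW__<SECTION>__<KEY>` mapping applies); the bucket and connection ID are placeholders:

```python
import os

# Remote location where Cosmos stores dbt-generated files (any Airflow Object Store scheme).
os.environ["AIRFLOW__COSMOS__REMOTE_TARGET_PATH"] = "s3://my-bucket/cosmos/target"
# Optional: custom connection ID; defaults to the scheme's associated Airflow connection.
os.environ["AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID"] = "aws_default"
```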

## Example DAG for CI Testing:
Introduces an example DAG (`simple_dag_async.py`) demonstrating how to
use the new execution mode (as mentioned earlier, with this PR alone it
would still run `ExecutionMode.LOCAL` operators).
This DAG is integrated into the CI pipeline to run integration tests and
aims to verify the functionality of `ExecutionMode.AIRFLOW_ASYNC` as the
implementation gets added, starting with the experimental implementation
in #1230.
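A minimal sketch of selecting the new mode with the names introduced by this PR (other `DbtDag` arguments are omitted here; see the earlier Databricks-oriented sketch for a fuller, hypothetical example):

```python
from cosmos import ExecutionConfig, ExecutionMode

# With this PR alone, tasks still execute like ExecutionMode.LOCAL under the hood.
execution_config = ExecutionConfig(execution_mode=ExecutionMode.AIRFLOW_ASYNC)
```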

## Unit & Integration Tests:
- Adds comprehensive unit and integration tests to ensure correct
behavior.
- Tests include validation for successful uploads, error handling for
misconfigured remote paths, and scenarios where `remote_target_path` is
not set.

## Documentation:
- Adds detailed documentation explaining how to configure and set the
`ExecutionMode.AIRFLOW_ASYNC`.

## Scope & Limitations of the feature being introduced:
1. This feature is meant to be released as Experimental and is also
marked so in the documentation.
2. Currently, it has been scoped so that only dbt models are executed
asynchronously (being worked upon in PR #1230), while other resource
types would be run synchronously.
3. `BigQuery` will be the only supported target database for this
execution mode (being worked upon in PR #1230).

Thus, this PR enhances Cosmos by providing the groundwork for more
efficient execution of long-running dbt resources.

## Additional Notes:
- This feature is planned to be introduced in Cosmos v1.7.0.

related: #1134
@tatiana tatiana closed this as completed in 111d430 Oct 3, 2024