Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-1481] [Feature] Expose async arg in run_query() #315

Closed
3 tasks done
jaysobel opened this issue Nov 9, 2022 · 8 comments
Closed
3 tasks done

[CT-1481] [Feature] Expose async arg in run_query() #315

jaysobel opened this issue Nov 9, 2022 · 8 comments
Labels
Stale type:enhancement New feature or request

Comments

@jaysobel
Copy link

jaysobel commented Nov 9, 2022

Is this your first time submitting a feature request?

  • I have read the expectations for open source contributors
  • I have searched the existing issues, and I could not find an existing issue for this feature
  • I am requesting a straightforward extension of existing dbt-snowflake functionality, rather than a Big Idea better suited to a discussion

Describe the feature

In dbt Snowflake projects with ~1,000 models, cloning schemas takes 10s of minutes, and typically comes up once per-user per-day to refresh development environments, or even more frequently to set up CI testing schemas, as in the documentation for the dbt Cloud CI Job.

Typically the create schema ci_temp.x as clone prod.x command is run as a dbt macro using the run_query() function, which (presumably) uses the Snowflake connector for Python's cursor.execute() function. However this blog post from select.dev demonstrates that in the context of cloning schemas (or many tables at a time) the execute_async() variant of this function (a convenience function setting async = true on execute()) can be 60x faster than the basic execute() of the same command.

Can the run_query() macro, and underlying Jinja interface with Snowflake's Python cursor, expose this optional parameter?

Describe alternatives you've considered

Waiting for Snowflake to improve the performance of clone schema. Why does a "zero copy clone" take so long?

In the context of clones for CI test runs the whole schema isn't needed. The specific models required to run state:modified+1 is given by the selector -s @state:modified --exclude state:modfied+1 (assuming the subsequent run has --full-refresh). However selectors cannot be passed to the run-operation command (per #5005). If cloning the whole schema can take 20 seconds, this level of optimization would be moot anyway.

Who will this benefit?

Enterprise clients of dbt Cloud using the dbt Cloud CI Job will save 10 minutes off the top of CI runs (which can also only run one at a time, and often queue up).

AE teams blocked from automatically refreshing clones everyday by the runtime of 10 minutes * headcount.

Are you interested in contributing this feature?

No response

Anything else?

The article by Niall Woodward.
https://select.dev/posts/snowflake-clones

Snowflake Python docs "Performing an Asynchronous Query"
https://docs.snowflake.com/en/user-guide/python-connector-example.html#performing-an-asynchronous-query

A sad log of dbt Cloud CI job executions taking 10+ minutes.
image

Typical 10 minutes spent on cloning
image

@jaysobel jaysobel added type:enhancement New feature or request triage:product labels Nov 9, 2022
@github-actions github-actions bot changed the title [Feature] Expose async arg in run_query() [CT-1481] [Feature] Expose async arg in run_query() Nov 9, 2022
@dbeatty10
Copy link
Contributor

@jaysobel I buy into your vision of speeding up the zero-copy clone process for large projects!

Reminds me of a famous song by the Proclaimers:

When I start up, well, I know I'm gonna be
I'm gonna be the tool who starts up just for you
When I clone it, yeah, I know I'm gonna be
I'm gonna be the tool who clones it just for you

But I would clone five hundred models
And I would clone five hundred more
Just to be the tool who cloned a thousand models
Served up right at your door

I think implementing dbt-labs/dbt-core#5005 would be a better bet than exposing execute_async.

I saw your comment here -- would dbt-labs/dbt-core#5005 be just as good an outcome for you?

Rationale

  • dbt-labs/dbt-core#5005 would:
    • presumably enable both slim CI as well as additional use-cases
    • allow targeting just the right nodes rather than hitting all of them
  • whereas for execute_async:
    • the requisite code would be pretty low-level (see here and here)
    • unless other use-cases emerge, we'd only be enabling async = true on execute() because of slow zero-copy clones on Snowflake
      • we try not to implement functionality like this unless it applies across more than one data platform

@jaysobel
Copy link
Author

@dbeatty10 I don't see 5005 as a substitute since it would just enable cloning fewer models in CI runs, as opposed to cloning faster, in all contexts.

@jtcohen6
Copy link
Contributor

Had a chance to chat more about this with @dbeatty10 & @Fleid. Super interesting stuff.

Let's stay concrete to the two use cases called out in the issue:

  • once per-user per-day to refresh development environments
  • even more frequently to set up CI testing schemas

These are completely legit! We should seek to make these as fast + painless as possible.


That said, if I'm being candid, I live in fear of code like:

{% macro clone_all_my_tables() %}
  {% for query in queries %}
    {% do run_query(query, async=true) %}  -- "will I ever get a callback?" - dbt, or jerco during high school theatre auditions
  {% endfor %}
  {% set wait_sql %}
  select system$wait(30); -- hopefully long enough for everything to complete...?
  {% end wait_sql %}
  {% do run_query(wait_sql, async=false) %}
{% endmacro %}
  • What if a query fails?
  • What if a query takes longer than 30 seconds to run?
  • Would we actually want people to do async polling from within the Jinja context? Or to register a callback, somehow?

Snowflake calls out some "best practices," which feel (to me) pretty nontrivial:

  • Ensure that you know which queries are dependent upon other queries before you run any queries in parallel. Some queries are interdependent and order sensitive, and therefore not suitable for parallelizing. For example, obviously an INSERT statement should not start until after the corresponding CREATE TABLE statement has finished.
  • Ensure that you do not run too many queries for the memory that you have available. Running multiple queries in parallel typically consumes more memory, especially if more than one set of results is stored in memory at the same time.
  • When polling, handle the rare cases where a query does not succeed.
  • Ensure that transaction control statements (BEGIN, COMMIT, and ROLLBACK) do not execute in parallel with other statements.

I worry that we'd be setting ourselves on the precipice of some pretty gnarly request-handling, all of which would need to handled with Jinja code...

tl;dr What if we used threads instead?

If you're doing parallelism in dbt today, it's because you're executing a DAG, and some resources within that DAG can be built / operated on independently from each other — insofar as the dependency order allows. This holds true for dbt source freshness and dbt test, where all of the selected resources can be built in parallel, because there are no interdependencies between them. In those cases, dbt will always be using exactly as many --threads as you've configured.

Async queries, parallelism... there be dragons in these waters. We want to be thoughtful about how we expose these capabilities to end users, rather than throwing them right into the deep.

Async calls != DAG: these things are mutually exclusive. "Houston, we have a leak in the abstraction."

IMO the best answer here looks:

Let's imagine a CI environment, and a form of dbt run --select state:modified --defer --state ..., where dbt identifies that 1 model out of 500 has changed, and 9 other models are downstream of the changed model. Currently, dbt will select only those 10 models to run, and implicitly rewrite the ref() within those models to select from production versions of the unchanged upstream models (since they won't exist in your CI schema). Instead, what if the first step were for dbt to "run" those 490 unchanged models — by cloning each and every one of them, in an independent thread, with parallelism up to your configured number of --threads?

To be clear, this would require a change within dbt-core. But I think it is the right way to solve for this use case, in keeping with the existing constructs and (good) constraints that we have in place for dbt today.

@jaysobel
Copy link
Author

where dbt identifies that 1 model out of 500 has changed, and 9 other models are downstream of the changed model. Currently, dbt will select only those 10 models to run, and implicitly rewrite the ref() within those models to select from production versions of the unchanged upstream models (since they won't exist in your CI schema).

Just noting that I'm not on the same page here. May just be our implementation, or out-of-date docs in the dbt Cloud CI Job but pic implies full 500 models are cloned as step 1. And we do ours this way so we can do full-refresh, then incremental, then tests as separate steps.
image

I think what I'm hearing is dbt clone -s @state:modified? Which sounds 🔥. Would be faster on account of threads, and on account of the more restrictive selection (can even --exclude state:modified if first step is a full refresh of state:modified. Hopefully that would realize about the same gains as the async one... Not sure what the path forward would be if it was only 50% faster, vs 95%.

@jtcohen6
Copy link
Contributor

I think what I'm hearing is dbt clone -s @state:modified? Which sounds 🔥.

Yes, this is what I'm thinking! Sorry for not being clearer in the leadup; this doesn't yet exist today.

You could picture it as something like:

$ dbt clone -s -exclude state:modified+ --threads 10
$ dbt build -s state:modified+

Except, what if dbt did the first step for you implicitly? And what you actually run is a more configurable version of "deferral," where rewriting refs for unbuilt upstream models is one option, and cloning is another:

$ dbt build -s state:modified+ --unbuilt-upstream=clone

Hopefully that would realize about the same gains as the async one... Not sure what the path forward would be if it was only 50% faster, vs 95%.

Fair - we'd want to do some testing / benchmarking to confirm.

@jaysobel
Copy link
Author

Both of these seem like great options.

The --unbuilt-upstream sounds a little more novel, since I think dbt throws a SQL exception when an upstream doesn't exist. But maybe you're suggesting that under the hood it's doing a clone for you, and not actually DAG'ing in the clones as-needed?

@epapineau
Copy link

Wow this would be such a win for dbt on Snowflake. Cloning our dev schemas takes upwards of 12 minutes right now.

It's a real flow breaker - @aescay

@github-actions
Copy link
Contributor

github-actions bot commented Jul 4, 2023

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Stale type:enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

4 participants