support asset partitions for load_assets_from_dbt_project #7683
We don't currently support partitioned dbt assets, but we should.

Relevant mentions in Slack:
Would love this
I second the above. I see a ton of value in partitioning incremental models, because they are partitioned by nature in some sense. Most cases I've seen use a date, for example.
@GeorgePearse @aryan-tmg - just to deepen my understanding, are you using BigQuery? The main information I see when I Google dbt partitioning is about BigQuery.
My data warehouse is currently v. budget: just Postgres, though I might migrate to Snowflake or Firebolt at some point in the future. I just don't want to have to run SQL queries over huge amounts of data for a computation that unnecessarily recreates everything from scratch. I haven't thoroughly explored what's already available for this, though.
My use case is that I have a lot of data in my dbt models. If I were to materialize them as views each day, it would greatly extend my workflows. Right now, I have my models configured as incremental and I pass a variable "run_date" whenever I do a dbt run. I'd like to be able to configure that somehow in Dagster so that I can pass a partition definition to all assets in my workflow, including dbt.
We're currently investigating adding this. Some questions for those who are interested:
Tagging people who 👍'ed this issue who I haven't yet reached out to in other contexts: @deugene @hungtd9 @xaniasd @nvinhphuc.
Hi @sryza, I will describe what we currently do. I have an ingestion job running daily.
This processes data from start_time to end_time, rather than fully refreshing all the data. However, we can still run a full refresh with dbt run --full-refresh if we want to reprocess everything. But the idea is to bring the partition in Dagster into DBT for coherence.
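To make the setup above concrete, here is a minimal sketch (not from the thread) of the kind of daily invocation being described, assuming the models read var('start_time') / var('end_time') and using dbt's standard --vars flag; the window math is illustrative:

```python
# Illustrative only: compute one day's window and pass it to dbt as vars.
import json
import subprocess
from datetime import datetime, timedelta

start_time = datetime(2022, 6, 6)          # the day being processed
end_time = start_time + timedelta(days=1)  # exclusive upper bound

dbt_vars = json.dumps({
    "start_time": start_time.isoformat(),
    "end_time": end_time.isoformat(),
})

# Incremental run over that window; `dbt run --full-refresh` would instead
# reprocess all data, as mentioned above.
subprocess.run(["dbt", "run", "--vars", dbt_vars], check=True)
```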
Hi @sryza, here's some input from my side as well.
It really depends on the model. Typically daily partitions, though they could be hourly as well. Event-like tables are typically partitioned, but there are also models where partitioning is not needed, for instance small tables with master data (think zipcode per city).
For incremental models, you can also define static partitions. You could tell dbt to figure out all missing partitions itself and update the model accordingly, but this is not a great option when each partition contains gigabytes of data. By providing a partition as a variable, I can update exactly the partition that I want.
All I can say: please avoid the confusion Airflow created with their execution_date (I think they're trying to move past that as well) :) I'm trying to use the var as the partition to be processed wherever possible. Perhaps it would be good to leave the interpretation to the individual user, as it can differ wildly. Hope this helps, happy to share more if necessary.
@xaniasd @nvinhphuc thanks for the feedback! You both mentioned sometimes partitioning by hour and sometimes by day, and I have some follow-up questions around that.
Hi @OwenKephart
The start_time and end_time variables are strings, and I CAST them to DATE or DATETIME depending on whether the partition is hourly or daily.
Hi everyone! As an initial implementation of this behavior, we're planning to add a `partition_key_to_vars_fn` argument to `load_assets_from_dbt_project`. For simple cases, where you want to parameterize all models in a dbt project by (let's say) a run date, that would look something like:

```python
from dagster import DailyPartitionsDefinition
from dagster_dbt import load_assets_from_dbt_project

my_partitions_def = DailyPartitionsDefinition(start_date="2022-02-02")

def partition_key_to_dbt_vars(partition_key):
    return {"run_date": partition_key}

dbt_assets = load_assets_from_dbt_project(
    ...,
    partitions_def=my_partitions_def,
    partition_key_to_vars_fn=partition_key_to_dbt_vars,
)
```

In this example, when Dagster is invoked to update the "2022-06-06" partition (for example), it will invoke the dbt CLI with a vars argument of `{"run_date": "2022-06-06"}`.

This does not currently provide any special support for dbt projects that have multiple partitioning schemes (although you can invoke the load_assets_... function multiple times, once for each set of dbt assets). We're definitely interested in how well this setup maps to your use cases, so feel free to leave feedback. All of these changes are currently experimental and subject to change, so if this misses the mark, definitely let us know :)
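As a follow-up illustration (a hedged sketch, not part of the original comment), the partitioned assets produced this way could be wired into a daily schedule. This assumes a recent Dagster release where Definitions, define_asset_job, and build_schedule_from_partitioned_job are available:

```python
# Hedged sketch: schedule the partitioned dbt assets to run once per day,
# each run targeting that day's partition.
from dagster import Definitions, build_schedule_from_partitioned_job, define_asset_job

dbt_job = define_asset_job(
    name="daily_dbt_job",
    partitions_def=my_partitions_def,  # from the snippet above
)

defs = Definitions(
    assets=dbt_assets,
    jobs=[dbt_job],
    schedules=[build_schedule_from_partitioned_job(dbt_job)],
)
```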
awesome, I'll give it a try. Thanks for this!
Hi @sryza
When we supply partition information to a DB/query engine, a sensible default is daily, but there are sometimes cases for making this finer, e.g. hourly partitions for more recent data to make scans cheaper. Conversely, we might want to make partitions larger: e.g. BigQuery has a default maximum quota of 4000 partitions per table (roughly eleven years of daily partitions), so if we need to cover more than that we have to use coarser partitions.
I've done it previously by partitioning BigQuery tables per date, and we've used incremental DBT models with variables representing the start/end dates passed into the is_incremental WHERE clause to do partial backfills. Today we are using Dagster/DBT/Snowflake, letting Snowflake decide how it wants to partition/shard its data files, and don't have anything better than passing timestamp bookmarks to DBT incremental models.
Depends on the data requirements, but if it's just to create a clean copy ready for analysts, we might build an incremental model that only inserts rows where the incoming set has timestamps greater than the max timestamp in the downstream table; alternatively we might filter on a WHERE NOT EXISTS looking for the existence of the primary key in the downstream table.
Ideally we'd want to be able to set start/min and stop/max bookends, for example if we need to backfill one specific week, e.g. 2022-02-01 to 2022-02-07, without having to backfill from 2022-02-01 to whatever today's timestamp is. One alternative is for us to do full loads, which is expensive when you're at or above hundreds of TB in an insert statement. Another is to not invoke DBT via Dagster and have another runner/orchestrator invoke DBT, passing in the timestamp bookends, but we really like using Dagster. It would be a massive time-saving feature for Dagster to be able to push partitions/date-ranges/id-ranges down to DBT to do partial backfills.
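A hedged sketch of how the partition_key_to_vars_fn hook from the comment above could emit this kind of start/stop bookend for a single daily partition (the start_date/end_date var names are hypothetical, not from the thread):

```python
# Illustrative only: derive one day's bookends from a daily partition key.
from datetime import datetime, timedelta

def partition_key_to_bookend_vars(partition_key: str) -> dict:
    start = datetime.strptime(partition_key, "%Y-%m-%d")
    end = start + timedelta(days=1)
    return {
        "start_date": start.date().isoformat(),  # inclusive lower bound
        "end_date": end.date().isoformat(),      # exclusive upper bound
    }

# partition_key_to_bookend_vars("2022-02-01")
# -> {"start_date": "2022-02-01", "end_date": "2022-02-02"}
```

Under this scheme, a one-week backfill becomes seven single-day runs rather than one wide date range, which is the trade-off discussed in the next comments.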
Thanks a ton for the info @its-a-gas. Would the approach that @OwenKephart added support for and mentioned here work for you?
Hi @sryza 👋 I think this approach would get us a step closer to having Dagster push the partitioned-asset concept down to the DBT layer. My interpretation is that we all agree the ideal state is to pass start and end bookmarks of arbitrary granularity... but @OwenKephart's solution would get us closer to that ideal state.
I'm going to close this because the `partition_key_to_vars_fn` support described above has been added.
Wouldn't just pushing slices of the range, instead of pushing the whole range at once, achieve the same? E.g. to backfill 2022-02-01 to 2022-02-07, Dagster just runs 7 parallel dbt jobs, each having received a day to backfill from Dagster?
That works too, but there's some fixed overhead with each job, so doing it all at once reduces that overhead.
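For illustration, a hedged sketch (not from the thread) of that per-day backfill driven from Python; it runs the days sequentially for simplicity, whereas a real Dagster backfill would launch them as parallel runs:

```python
# Illustrative only: backfill 2022-02-01..2022-02-07 as one run per day.
from datetime import date, timedelta

from dagster import materialize

for offset in range(7):
    day = (date(2022, 2, 1) + timedelta(days=offset)).isoformat()
    # Each call invokes dbt with that single day's vars via the
    # partition_key_to_vars_fn configured on the assets.
    materialize(dbt_assets, partition_key=day)
```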