Equivalent for Airflow templating via Parameters? #3217
-
A very common use case in Airflow / Airflow-like system is the use of templating. One typical use case: let's say I have a
I would like to parameterize this query on the source table, so that I can run my flow for new source tables that show up in this dataset every month / every year, as the BigQuery Reddit comments dataset grows (this is just an example, but is typical of any event log dataset). I tried defining a
This, of course, doesn't work. It just resolves to:
So I know I am doing something wrong. In Airflow I could handle this behavior via a parameter. So far the most relevant part of the documentation seems to be the X-Files episode tutorial, but I can't quite understand how to resolve a parameter inside an argument to a task like |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 2 replies
-
I should mention I've also tried using:
where
|
Beta Was this translation helpful? Give feedback.
-
So, I think I answered my own question: having a "factory" function
Calling |
Beta Was this translation helpful? Give feedback.
-
Hi @bogdanstate - remember that all Prefect tasks represent deferred computation. So when you format your query using a Parameter object (which has not run and produced a value yet), it's expected that you get a weird result. To properly achieve your goal, you need to place all this logic within the Flow so that it executes using runtime information: source_table = Parameter("source_table")
bigquery = BigQueryTask([...]) # all init kwargs but NOT the query kwarg as it is not known yet
@task
def construct_query(source_table):
return """
SELECT [columns]
FROM `fh-bigquery.reddit_comments.{source_table}`
""".format(source_table=source_table)
with Flow("BigQuery-template") as flow:
query = construct_query(source_table)
bigquery(query=query) # pass the runtime resolved value
flow.run(source_table="A")
flow.run(source_table="B") There are also some string formatting tasks that help you with some of this boilerplate. |
Beta Was this translation helpful? Give feedback.
Hi @bogdanstate - remember that all Prefect tasks represent deferred computation. So when you format your query using a Parameter object (which has not run and produced a value yet), it's expected that you get a weird result. To properly achieve your goal, you need to place all this logic within the Flow so that it executes using runtime information: