-
Notifications
You must be signed in to change notification settings - Fork 48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow passing db context via op_kwargs #106
Conversation
{ | ||
"conn_id": "postgres_conn", | ||
"database": "pagila", | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to test the opt_kwargs
, don't we need the kwargs
to be:
{
"op_kwargs": {
"conn_id": "postgres_conn",
"database": "pagila",
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @dimberman , thank you very much!
Some questions:
- Does this help in any sense regarding overriding
AIRFLOW__ASTRO__SQL_SCHEMA
? - Do we need to implement this approach in any other operators/tasks, so they can all benefit from
op_kwargs
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dimberman What is the overall benefit of using op_kwargs
as opposed to using default_args
?
@ashb perhaps you can help on this one but I wasn't able to get default_args to work on these tasks. I can't tell if it's because they are taskflow functions or because they have default values. |
So what is the interface/example that you would like to work? |
@ashb the interface mentioned in the first comment, namely: @aql.transform
def test_astro():
return "SELECT * FROM actor"
with dag:
actor_table = test_astro(database="pagile", conn_id="my_postgres_conn") The idea here is that a user can decide the database and conn_id at runtime without needing to pass in a table. |
For queries where users don't want to pass a table, object, this feature will allow users to define context at runtime using op_kwargs. example: ```python @aql.transform def test_astro(): return "SELECT * FROM actor" with dag: actor_table = test_astro(database="pagile", conn_id="my_postgres_conn") ```
86f6e46
to
df32d35
Compare
Do you want it to be at the call time, because the other option is something like this: @aql.transform(database="pagile", conn_id="my_postgres_conn")
def test_astro():
return "SELECT * FROM actor"
with dag:
actor_table = test_astro() Right now Operator/Task arguments are passed to the decorator, and the only thing that can be passed to the decorated function are actual func parameters. We could look at changing that (not sure if we'd need a change in Airflow or if we can just do that in our custom decorator) but there might be confusion about which parameter goes where? |
If we did want your first example to work you'd have to overide def __call__(self, *args, **kwargs) -> XComArg:
operator_kwargs = {**self.kwargs]
operator_kwargs.update((k, v) for (k,v) in kwargs.items() if k not in self.function_arg_names())
function_kwargs = {k: v for (k,v) in kwargs.items() if k in self.function_arg_names() }
op = self.operator_class(
python_callable=self.function,
op_args=args,
op_kwargs=function_kwargs,
multiple_outputs=self.multiple_outputs,
**self.kwargs,
**operator_kwargs
)
if self.function.__doc__:
op.doc_md = self.function.__doc__
return XComArg(op) I think that might work, but I'm not sure how predictable it might be. Take this example: @aql.transform
def test_astro(table):
return "SELECT * FROM {{ table }}"
with dag:
actor_table = test_astro(table=some_table, database="pagile", conn_id="my_postgres_conn" Is it clear to users what args go where, and what the valid options are? |
* Allow passing db context via op_kwargs For queries where users don't want to pass a table, object, this feature will allow users to define context at runtime using op_kwargs. example: ```python @aql.transform def test_astro(): return "SELECT * FROM actor" with dag: actor_table = test_astro(database="pagile", conn_id="my_postgres_conn") ``` * simplify * fix test * fix test
For queries where users don't want to pass a table, object, this feature
will allow users to define context at runtime using op_kwargs.
example: