Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow passing db context via op_kwargs #106

Merged
merged 4 commits into from
Feb 16, 2022
Merged

Conversation

dimberman
Copy link
Collaborator

For queries where users don't want to pass a table, object, this feature
will allow users to define context at runtime using op_kwargs.

example:

@aql.transform
def test_astro():
    return "SELECT * FROM actor"

with dag:
    actor_table = test_astro(database="pagile", conn_id="my_postgres_conn")

Comment on lines +210 to +213
{
"conn_id": "postgres_conn",
"database": "pagila",
},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to test the opt_kwargs, don't we need the kwargs to be:

   { 
      "op_kwargs": {
         "conn_id": "postgres_conn",
         "database": "pagila",
      }
   }

Copy link
Collaborator

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @dimberman , thank you very much!

Some questions:

  1. Does this help in any sense regarding overriding AIRFLOW__ASTRO__SQL_SCHEMA?
  2. Do we need to implement this approach in any other operators/tasks, so they can all benefit from op_kwargs?

Copy link
Collaborator

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dimberman What is the overall benefit of using op_kwargs as opposed to using default_args?

@dimberman
Copy link
Collaborator Author

@dimberman What is the overall benefit of using op_kwargs as opposed to using default_args?

@ashb perhaps you can help on this one but I wasn't able to get default_args to work on these tasks. I can't tell if it's because they are taskflow functions or because they have default values.

@ashb
Copy link
Collaborator

ashb commented Feb 16, 2022

So what is the interface/example that you would like to work?

@dimberman
Copy link
Collaborator Author

@ashb the interface mentioned in the first comment, namely:

@aql.transform
def test_astro():
    return "SELECT * FROM actor"

with dag:
    actor_table = test_astro(database="pagile", conn_id="my_postgres_conn")

The idea here is that a user can decide the database and conn_id at runtime without needing to pass in a table.

For queries where users don't want to pass a table, object, this feature
will allow users to define context at runtime using op_kwargs.

example:

```python
@aql.transform
def test_astro():
    return "SELECT * FROM actor"

with dag:
    actor_table = test_astro(database="pagile", conn_id="my_postgres_conn")
```
@dimberman dimberman force-pushed the allow-passing-op-kwargs branch from 86f6e46 to df32d35 Compare February 16, 2022 18:40
@dimberman dimberman merged commit 472165a into main Feb 16, 2022
@dimberman dimberman deleted the allow-passing-op-kwargs branch February 16, 2022 19:32
@ashb
Copy link
Collaborator

ashb commented Feb 18, 2022

Do you want it to be at the call time, because the other option is something like this:

@aql.transform(database="pagile", conn_id="my_postgres_conn")
def test_astro():
    return "SELECT * FROM actor"

with dag:
    actor_table = test_astro()

Right now Operator/Task arguments are passed to the decorator, and the only thing that can be passed to the decorated function are actual func parameters.

We could look at changing that (not sure if we'd need a change in Airflow or if we can just do that in our custom decorator) but there might be confusion about which parameter goes where?

@ashb
Copy link
Collaborator

ashb commented Feb 18, 2022

If we did want your first example to work you'd have to overide __call__ on the decorator to be something like this:

     def __call__(self, *args, **kwargs) -> XComArg:
        operator_kwargs = {**self.kwargs]
        operator_kwargs.update((k, v) for (k,v) in kwargs.items() if k not in self.function_arg_names())
        function_kwargs = {k: v for (k,v) in kwargs.items() if k in self.function_arg_names() }
        op = self.operator_class(
            python_callable=self.function,
            op_args=args,
            op_kwargs=function_kwargs,
            multiple_outputs=self.multiple_outputs,
            **self.kwargs,
            **operator_kwargs
        )
        if self.function.__doc__:
            op.doc_md = self.function.__doc__
        return XComArg(op)

I think that might work, but I'm not sure how predictable it might be. Take this example:

@aql.transform
def test_astro(table):
    return "SELECT * FROM {{ table }}"

with dag:
    actor_table = test_astro(table=some_table, database="pagile", conn_id="my_postgres_conn"

Is it clear to users what args go where, and what the valid options are?

utkarsharma2 pushed a commit that referenced this pull request Mar 30, 2022
* Allow passing db context via op_kwargs

For queries where users don't want to pass a table, object, this feature
will allow users to define context at runtime using op_kwargs.

example:

```python
@aql.transform
def test_astro():
    return "SELECT * FROM actor"

with dag:
    actor_table = test_astro(database="pagile", conn_id="my_postgres_conn")
```

* simplify

* fix test

* fix test
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants