Airflow API for Updating TaskInstance State to skipped is broken #40575

Closed
2 tasks done
abhishekbhakat opened this issue Jul 3, 2024 · 2 comments · Fixed by #40578
Labels
area:API Airflow's REST/HTTP API area:core good first issue kind:bug This is a clearly a bug

Comments

@abhishekbhakat
Contributor

abhishekbhakat commented Jul 3, 2024

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

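There are two API endpoints for updating a TaskInstance's state:

  1. "Set a state of task instances" in the DAG section (POST /api/v1/dags/{dag_id}/updateTaskInstancesState).
  2. "Updates the state of a task instance" in the TaskInstance section (PATCH /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}).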

Both endpoints call the set_state() function in airflow/api/common/mark_tasks.py.

That function still contains older code that calls _iter_subdag_run_ids:

sub_dag_run_ids = list(
    _iter_subdag_run_ids(dag, session, DagRunState(state), task_ids, commit, confirmed_infos),
)

This passes the requested state straight to DagRunState(state), but skipped is not a valid DagRun state, so the enum constructor raises a ValueError.
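The failure can be reproduced without Airflow. The sketch below uses simplified stand-in enums that mirror the values of DagRunState and a small subset of TaskInstanceState (assumption: the real classes in airflow.utils.state have more members and extra behaviour). Casting "skipped" to the DagRun enum raises the same ValueError that appears in both error logs.

```python
from enum import Enum


class DagRunState(str, Enum):
    """Simplified stand-in for airflow.utils.state.DagRunState."""

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


class TaskInstanceState(str, Enum):
    """Simplified stand-in with a subset of airflow.utils.state.TaskInstanceState."""

    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


def try_dag_run_state(value: str) -> str:
    """Return the DagRunState value, or the ValueError message if the cast fails."""
    try:
        return DagRunState(value).value
    except ValueError as exc:
        return str(exc)


# "success" exists in both enums, so the cast works.
print(try_dag_run_state("success"))  # prints: success
# "skipped" is only a task instance state, so the cast fails.
print(try_dag_run_state(TaskInstanceState.SKIPPED.value))  # prints: 'skipped' is not a valid DagRunState
```

Any state that exists only for task instances would hit the same error, not just skipped.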

Error log for the first endpoint:

[2024-07-03T07:44:56.205+0000] {app.py:1744} ERROR - Exception on /api/v1/dags/parallel_tasks_dag/updateTaskInstancesState [POST]
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
         ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/connexion/decorators/decorator.py", line 68, in wrapper
    response = function(request)
               ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/connexion/decorators/uri_parsing.py", line 149, in wrapper
    response = function(request)
               ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/connexion/decorators/validation.py", line 196, in wrapper
    response = function(request)
               ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/connexion/decorators/validation.py", line 399, in wrapper
    return function(request)
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/connexion/decorators/response.py", line 112, in wrapper
    response = function(request)
               ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/connexion/decorators/parameter.py", line 120, in wrapper
    return function(**kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/api_connexion/security.py", line 171, in decorated
    return _requires_access(
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/api_connexion/security.py", line 98, in _requires_access
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/www/decorators.py", line 159, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/api_connexion/endpoints/task_instance_endpoint.py", line 549, in post_set_task_instances_state
    tis = dag.set_task_instance_state(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 2087, in set_task_instance_state
    altered = set_state(
              ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/api/common/mark_tasks.py", line 148, in set_state
    _iter_subdag_run_ids(dag, session, DagRunState(state), task_ids, commit, confirmed_infos),
                                       ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/enum.py", line 714, in __call__
    return cls.__new__(cls, value)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/enum.py", line 1137, in __new__
    raise ve_exc
ValueError: 'skipped' is not a valid DagRunState

Error log for the second endpoint:

2024-07-03 13:39:41 [2024-07-03T08:09:41.242+0000] {app.py:1744} ERROR - Exception on /api/v1/dags/parallel_tasks_dag/dagRuns/manual__2024-07-03T07:54:52.290465+00:00/taskInstances/task_a [PATCH]
2024-07-03 13:39:41 Traceback (most recent call last):
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 2529, in wsgi_app
2024-07-03 13:39:41     response = self.full_dispatch_request()
2024-07-03 13:39:41                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1825, in full_dispatch_request
2024-07-03 13:39:41     rv = self.handle_user_exception(e)
2024-07-03 13:39:41          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1823, in full_dispatch_request
2024-07-03 13:39:41     rv = self.dispatch_request()
2024-07-03 13:39:41          ^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/flask/app.py", line 1799, in dispatch_request
2024-07-03 13:39:41     return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/connexion/decorators/decorator.py", line 68, in wrapper
2024-07-03 13:39:41     response = function(request)
2024-07-03 13:39:41                ^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/connexion/decorators/uri_parsing.py", line 149, in wrapper
2024-07-03 13:39:41     response = function(request)
2024-07-03 13:39:41                ^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/connexion/decorators/validation.py", line 196, in wrapper
2024-07-03 13:39:41     response = function(request)
2024-07-03 13:39:41                ^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/connexion/decorators/validation.py", line 399, in wrapper
2024-07-03 13:39:41     return function(request)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/connexion/decorators/response.py", line 112, in wrapper
2024-07-03 13:39:41     response = function(request)
2024-07-03 13:39:41                ^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/connexion/decorators/parameter.py", line 120, in wrapper
2024-07-03 13:39:41     return function(**kwargs)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/api_connexion/security.py", line 171, in decorated
2024-07-03 13:39:41     return _requires_access(
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/api_connexion/security.py", line 98, in _requires_access
2024-07-03 13:39:41     return func(*args, **kwargs)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/www/decorators.py", line 159, in wrapper
2024-07-03 13:39:41     return f(*args, **kwargs)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 79, in wrapper
2024-07-03 13:39:41     return func(*args, session=session, **kwargs)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/api_connexion/endpoints/task_instance_endpoint.py", line 600, in patch_task_instance
2024-07-03 13:39:41     ti = dag.set_task_instance_state(
2024-07-03 13:39:41          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
2024-07-03 13:39:41     return func(*args, **kwargs)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 2087, in set_task_instance_state
2024-07-03 13:39:41     altered = set_state(
2024-07-03 13:39:41               ^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
2024-07-03 13:39:41     return func(*args, **kwargs)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/site-packages/airflow/api/common/mark_tasks.py", line 148, in set_state
2024-07-03 13:39:41     _iter_subdag_run_ids(dag, session, DagRunState(state), task_ids, commit, confirmed_infos),
2024-07-03 13:39:41                                        ^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/enum.py", line 714, in __call__
2024-07-03 13:39:41     return cls.__new__(cls, value)
2024-07-03 13:39:41            ^^^^^^^^^^^^^^^^^^^^^^^
2024-07-03 13:39:41   File "/usr/local/lib/python3.11/enum.py", line 1137, in __new__
2024-07-03 13:39:41     raise ve_exc
2024-07-03 13:39:41 ValueError: 'skipped' is not a valid DagRunState

What you think should happen instead?

The request should not fail. It should update the task state to skipped, since the API reference documents skipped as a valid input for these endpoints.

How to reproduce

Call either endpoint with the target state set to skipped for a task instance of a historical DagRun.
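A minimal reproduction sketch using only the standard library. It builds, but does not send, the two requests. The DAG ID, run ID and task ID are taken from the logs above; the base URL is an assumed local webserver, authentication is omitted, and the JSON field names (task_id, dag_run_id, new_state, dry_run) are assumptions based on the stable REST API schema that should be checked against the API reference.

```python
import json
import urllib.request
from urllib.parse import quote

# Assumed local webserver; adjust to your deployment (auth not included).
BASE_URL = "http://localhost:8080/api/v1"
DAG_ID = "parallel_tasks_dag"
DAG_RUN_ID = "manual__2024-07-03T07:54:52.290465+00:00"
TASK_ID = "task_a"


def json_request(url: str, method: str, payload: dict) -> urllib.request.Request:
    """Build a JSON request object without sending it."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )


# Endpoint 1: POST .../updateTaskInstancesState (DAG section).
bulk_request = json_request(
    f"{BASE_URL}/dags/{DAG_ID}/updateTaskInstancesState",
    "POST",
    {
        "task_id": TASK_ID,
        "dag_run_id": DAG_RUN_ID,
        "new_state": "skipped",
        "dry_run": False,
    },
)

# Endpoint 2: PATCH .../taskInstances/{task_id} (TaskInstance section).
# The run ID contains ":" and "+", so percent-encode it in the path.
single_request = json_request(
    f"{BASE_URL}/dags/{DAG_ID}/dagRuns/{quote(DAG_RUN_ID, safe='')}/taskInstances/{TASK_ID}",
    "PATCH",
    {"new_state": "skipped", "dry_run": False},
)

# Sending either one, e.g. urllib.request.urlopen(bulk_request), against an
# affected build is expected to fail with the ValueError shown in the logs.
```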

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

NA

Deployment

Official Apache Airflow Helm Chart

Deployment details

NA

Anything else?

NA

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@abhishekbhakat abhishekbhakat added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Jul 3, 2024
@dosubot dosubot bot added the area:API Airflow's REST/HTTP API label Jul 3, 2024
@potiuk potiuk added good first issue and removed needs-triage label for new issues that we didn't triage yet labels Jul 3, 2024
@potiuk
Member

potiuk commented Jul 3, 2024

Thanks. It does indeed seem that we are creating a DagRunState enum there rather than a TaskInstanceState, and DagRunState has no skipped option. I've marked it as a good first issue; feel free to take a stab at it if you want.

@abhishekbhakat
Contributor Author

abhishekbhakat commented Jul 3, 2024

For now, I have raised a PR that returns an empty list of subdag run IDs when the state is TaskInstanceState.SKIPPED, and verified that the APIs now work in the usual scenarios. I'm not sure yet about subdags.
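The workaround described in the comment above can be sketched as a guard in front of the enum cast. This is a hypothetical, simplified helper, not the code from the linked PR: collect_subdag_run_ids and fake_iter are illustrative names, the iter_subdag_run_ids callable stands in for _iter_subdag_run_ids, and the enums are minimal stand-ins for those in airflow.utils.state.

```python
from enum import Enum
from typing import Callable, Iterable, List


class DagRunState(str, Enum):
    """Simplified stand-in for airflow.utils.state.DagRunState."""

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


class TaskInstanceState(str, Enum):
    """Simplified stand-in with a subset of airflow.utils.state.TaskInstanceState."""

    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


def collect_subdag_run_ids(
    state: TaskInstanceState,
    iter_subdag_run_ids: Callable[[DagRunState], Iterable[str]],
) -> List[str]:
    """Hypothetical guard: skip subdag handling for the skipped state."""
    if state == TaskInstanceState.SKIPPED:
        # DagRunState has no "skipped" member, so there is nothing to cast to.
        return []
    return list(iter_subdag_run_ids(DagRunState(state.value)))


def fake_iter(dag_run_state: DagRunState) -> Iterable[str]:
    """Hypothetical stand-in for _iter_subdag_run_ids that echoes its input."""
    yield f"subdag_run_for_{dag_run_state.value}"


print(collect_subdag_run_ids(TaskInstanceState.SKIPPED, fake_iter))  # []
print(collect_subdag_run_ids(TaskInstanceState.SUCCESS, fake_iter))  # ['subdag_run_for_success']
```

The guard only special-cases skipped, as the comment describes. A more general variant would check whether state.value is one of the DagRunState values, so that any other task-only state also bypasses the cast.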

2 participants