Skip to content

Commit

Permalink
Update taskinstance clear endpoint to support mapped tasks (#45349)
Browse files Browse the repository at this point in the history
* support mapped task in taskinstance clear

* update-taskinstance-clear-endpoint-to-support-mapped-tasks

* adding tests

* adding tests

* fix test

* resolving conflicts

* update typing and format of request

* add new tests + fix useClearTaskInstances.ts

* add comment
  • Loading branch information
vatsrahul1001 authored Jan 27, 2025
1 parent 236a818 commit 2b46b8f
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 15 deletions.
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/datamodels/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class ClearTaskInstancesBody(BaseModel):
only_failed: bool = True
only_running: bool = False
reset_dag_runs: bool = True
task_ids: list[str] | None = None
task_ids: list[str | tuple[str, int]] | None = None
dag_run_id: str | None = None
include_upstream: bool = False
include_downstream: bool = False
Expand Down
9 changes: 8 additions & 1 deletion airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7008,7 +7008,14 @@ components:
task_ids:
anyOf:
- items:
type: string
anyOf:
- type: string
- prefixItems:
- type: string
- type: integer
type: array
maxItems: 2
minItems: 2
type: array
- type: 'null'
title: Task Ids
Expand Down
19 changes: 18 additions & 1 deletion airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,24 @@ export const $ClearTaskInstancesBody = {
anyOf: [
{
items: {
type: "string",
anyOf: [
{
type: "string",
},
{
prefixItems: [
{
type: "string",
},
{
type: "integer",
},
],
type: "array",
maxItems: 2,
minItems: 2,
},
],
},
type: "array",
},
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ export type ClearTaskInstancesBody = {
only_failed?: boolean;
only_running?: boolean;
reset_dag_runs?: boolean;
task_ids?: Array<string> | null;
task_ids?: Array<string | [string, number]> | null;
dag_run_id?: string | null;
include_upstream?: boolean;
include_downstream?: boolean;
Expand Down
28 changes: 18 additions & 10 deletions airflow/ui/src/queries/useClearTaskInstances.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,26 @@ export const useClearTaskInstances = ({
_: TaskInstanceCollectionResponse,
variables: { dagId: string; requestBody: ClearTaskInstancesBody },
) => {
const taskInstanceKeys = (variables.requestBody.task_ids ?? [])
.map((taskId) => {
const runId = variables.requestBody.dag_run_id;
// deduplication using set as user can clear multiple map index of the same task_id.
const taskInstanceKeys = [
...new Set(
(variables.requestBody.task_ids ?? [])
.filter((taskId) => typeof taskId === "string" || Array.isArray(taskId))
.map((taskId) => {
const actualTaskId = Array.isArray(taskId) ? taskId[0] : taskId;
const runId = variables.requestBody.dag_run_id;

if (runId === null || runId === undefined) {
return undefined;
}
const params = { dagId, dagRunId: runId, taskId };
if (runId === null || runId === undefined) {
return undefined;
}

return UseTaskInstanceServiceGetTaskInstanceKeyFn(params);
})
.filter((key) => key !== undefined);
const params = { dagId, dagRunId: runId, taskId: actualTaskId };

return UseTaskInstanceServiceGetTaskInstanceKeyFn(params);
})
.filter((key) => key !== undefined),
),
];

const queryKeys = [
[useTaskInstanceServiceGetTaskInstancesKey],
Expand Down
79 changes: 78 additions & 1 deletion tests/api_fastapi/core_api/routes/public/test_task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@

pytestmark = pytest.mark.db_test


DEFAULT = datetime(2020, 1, 1)
DEFAULT_DATETIME_STR_1 = "2020-01-01T00:00:00+00:00"
DEFAULT_DATETIME_STR_2 = "2020-01-02T00:00:00+00:00"
Expand Down Expand Up @@ -1852,6 +1851,31 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
2,
id="dry_run default",
),
pytest.param(
"example_python_operator",
[
{"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED},
{
"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1),
"state": State.FAILED,
},
{
"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2),
"state": State.FAILED,
},
{
"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=3),
"state": State.FAILED,
},
],
"example_python_operator",
{
"dry_run": False,
"task_ids": [["print_the_context", 0], "sleep_for_1"],
},
2,
id="clear mapped task and unmapped tasks together",
),
],
)
def test_should_respond_200(
Expand All @@ -1878,6 +1902,59 @@ def test_should_respond_200(
assert response.status_code == 200
assert response.json()["total_entries"] == expected_ti

@pytest.mark.parametrize(
"main_dag, task_instances, request_dag, payload, expected_ti",
[
pytest.param(
"example_python_operator",
[
{"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED},
{
"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1),
"state": State.FAILED,
},
{
"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2),
"state": State.FAILED,
},
{
"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=3),
"state": State.FAILED,
},
],
"example_python_operator",
{
"dry_run": False,
"task_ids": [["print_the_context", 1, 2]],
},
2,
id="clear mapped task and unmapped tasks together",
),
],
)
def test_should_respond_422(
self,
test_client,
session,
main_dag,
task_instances,
request_dag,
payload,
expected_ti,
):
self.create_task_instances(
session,
dag_id=main_dag,
task_instances=task_instances,
update_extras=False,
)
self.dagbag.sync_to_db("dags-folder", None)
response = test_client.post(
f"/public/dags/{request_dag}/clearTaskInstances",
json=payload,
)
assert response.status_code == 422

@mock.patch("airflow.api_fastapi.core_api.routes.public.task_instances.clear_task_instances")
def test_clear_taskinstance_is_called_with_queued_dr_state(self, mock_clearti, test_client, session):
"""Test that if reset_dag_runs is True, then clear_task_instances is called with State.QUEUED"""
Expand Down

0 comments on commit 2b46b8f

Please sign in to comment.