From a533ecce182c3ddce63e591bcd6d252669663be1 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 25 May 2023 02:49:38 +0530 Subject: [PATCH 1/4] Return triggerer health status in Airflow /health endpoint PR https://github.com/apache/airflow/pull/27755 introduced sending triggerer health status in the `/api/v1/health` endpoint and also updated relevant docs but we've the primary `/health` too which is missing this information. The PR addresses this missing status report for triggerer health in the `/health` endpoint. It also attempts to deduplicate the code between those endpoints so that in future we need to make necessary changes in only one place and at the same time ensure that change made in one endpoint is not missed for the other endpoint serving the same purpose and thus ensuring consistency in the responses. fixes: #31522 --- .../endpoints/health_endpoint.py | 49 ++----------- airflow/utils/airflow_health.py | 69 +++++++++++++++++++ airflow/www/views.py | 25 ++----- 3 files changed, 77 insertions(+), 66 deletions(-) create mode 100644 airflow/utils/airflow_health.py diff --git a/airflow/api_connexion/endpoints/health_endpoint.py b/airflow/api_connexion/endpoints/health_endpoint.py index 3389343646780..f1b686a889b7c 100644 --- a/airflow/api_connexion/endpoints/health_endpoint.py +++ b/airflow/api_connexion/endpoints/health_endpoint.py @@ -18,51 +18,10 @@ from airflow.api_connexion.schemas.health_schema import health_schema from airflow.api_connexion.types import APIResponse -from airflow.jobs.scheduler_job_runner import SchedulerJobRunner -from airflow.jobs.triggerer_job_runner import TriggererJobRunner - -HEALTHY = "healthy" -UNHEALTHY = "unhealthy" +from airflow.utils.airflow_health import get_airflow_health def get_health() -> APIResponse: - """Return the health of the airflow scheduler and metadatabase.""" - metadatabase_status = HEALTHY - latest_scheduler_heartbeat = None - latest_triggerer_heartbeat = None - scheduler_status = UNHEALTHY - triggerer_status: str | None = UNHEALTHY - try: - scheduler_job = SchedulerJobRunner.most_recent_job() - - if scheduler_job: - latest_scheduler_heartbeat = scheduler_job.latest_heartbeat.isoformat() - if scheduler_job.is_alive(): - scheduler_status = HEALTHY - except Exception: - metadatabase_status = UNHEALTHY - try: - triggerer_job = TriggererJobRunner.most_recent_job() - - if triggerer_job: - latest_triggerer_heartbeat = triggerer_job.latest_heartbeat.isoformat() - if triggerer_job.is_alive(): - triggerer_status = HEALTHY - else: - triggerer_status = None - except Exception: - metadatabase_status = UNHEALTHY - - payload = { - "metadatabase": {"status": metadatabase_status}, - "scheduler": { - "status": scheduler_status, - "latest_scheduler_heartbeat": latest_scheduler_heartbeat, - }, - "triggerer": { - "status": triggerer_status, - "latest_triggerer_heartbeat": latest_triggerer_heartbeat, - }, - } - - return health_schema.dump(payload) + """Return the health of the airflow scheduler, metadatabase and triggerer.""" + airflow_health_status = get_airflow_health() + return health_schema.dump(airflow_health_status) diff --git a/airflow/utils/airflow_health.py b/airflow/utils/airflow_health.py new file mode 100644 index 0000000000000..6a8dc1c257b92 --- /dev/null +++ b/airflow/utils/airflow_health.py @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from airflow.jobs.scheduler_job_runner import SchedulerJobRunner +from airflow.jobs.triggerer_job_runner import TriggererJobRunner + +HEALTHY = "healthy" +UNHEALTHY = "unhealthy" + + +def get_airflow_health(): + """Get the health for Airflow metadatabase, scheduler and triggerer.""" + metadatabase_status = HEALTHY + latest_scheduler_heartbeat = None + latest_triggerer_heartbeat = None + scheduler_status = UNHEALTHY + triggerer_status: str | None = UNHEALTHY + + try: + latest_scheduler_job = SchedulerJobRunner.most_recent_job() + + if latest_scheduler_job: + latest_scheduler_heartbeat = latest_scheduler_job.latest_heartbeat.isoformat() + if latest_scheduler_job.is_alive(): + scheduler_status = HEALTHY + except Exception: + metadatabase_status = UNHEALTHY + + try: + latest_triggerer_job = TriggererJobRunner.most_recent_job() + + if latest_triggerer_job: + latest_triggerer_heartbeat = latest_triggerer_job.latest_heartbeat.isoformat() + if latest_triggerer_job.is_alive(): + triggerer_status = HEALTHY + else: + triggerer_status = None + except Exception: + metadatabase_status = UNHEALTHY + + airflow_health_status = { + "metadatabase": {"status": metadatabase_status}, + "scheduler": { + "status": scheduler_status, + "latest_scheduler_heartbeat": latest_scheduler_heartbeat, + }, + "triggerer": { + "status": triggerer_status, + "latest_triggerer_heartbeat": latest_triggerer_heartbeat, + }, + } + + return airflow_health_status diff --git a/airflow/www/views.py b/airflow/www/views.py index 54e9053638d45..d7d36fac54b5c 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -109,6 +109,7 @@ from airflow.timetables.base import DataInterval, TimeRestriction from airflow.utils import json as utils_json, timezone, yaml from airflow.utils.airflow_flask_app import get_airflow_app +from airflow.utils.airflow_health import get_airflow_health from airflow.utils.dag_edges import dag_edges from airflow.utils.dates import infer_time_unit, scale_time_units from airflow.utils.docs import get_doc_url_for_provider, get_docs_url @@ -650,29 +651,11 @@ class Airflow(AirflowBaseView): def health(self): """ An endpoint helping check the health status of the Airflow instance, - including metadatabase and scheduler. + including metadatabase, scheduler and triggerer. """ - payload = {"metadatabase": {"status": "unhealthy"}} + airflow_health_status = get_airflow_health() - latest_scheduler_heartbeat = None - scheduler_status = "unhealthy" - payload["metadatabase"] = {"status": "healthy"} - try: - scheduler_job = SchedulerJobRunner.most_recent_job() - - if scheduler_job: - latest_scheduler_heartbeat = scheduler_job.latest_heartbeat.isoformat() - if scheduler_job.is_alive(): - scheduler_status = "healthy" - except Exception: - payload["metadatabase"]["status"] = "unhealthy" - - payload["scheduler"] = { - "status": scheduler_status, - "latest_scheduler_heartbeat": latest_scheduler_heartbeat, - } - - return flask.json.jsonify(payload) + return flask.json.jsonify(airflow_health_status) @expose("/home") @auth.has_access( From a3731e99d2e04e032be912930d6a3994667c969b Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 26 May 2023 02:15:51 +0530 Subject: [PATCH 2/4] Move airflow_health.py to api/common and add tests --- .../{utils => api/common}/airflow_health.py | 0 .../endpoints/health_endpoint.py | 2 +- airflow/www/views.py | 2 +- tests/api/__init__.py | 16 +++ tests/api/common/__init__.py | 16 +++ tests/api/common/test_airflow_health.py | 100 ++++++++++++++++++ 6 files changed, 134 insertions(+), 2 deletions(-) rename airflow/{utils => api/common}/airflow_health.py (100%) create mode 100644 tests/api/__init__.py create mode 100644 tests/api/common/__init__.py create mode 100644 tests/api/common/test_airflow_health.py diff --git a/airflow/utils/airflow_health.py b/airflow/api/common/airflow_health.py similarity index 100% rename from airflow/utils/airflow_health.py rename to airflow/api/common/airflow_health.py diff --git a/airflow/api_connexion/endpoints/health_endpoint.py b/airflow/api_connexion/endpoints/health_endpoint.py index f1b686a889b7c..6b3e53926e90a 100644 --- a/airflow/api_connexion/endpoints/health_endpoint.py +++ b/airflow/api_connexion/endpoints/health_endpoint.py @@ -16,9 +16,9 @@ # under the License. from __future__ import annotations +from airflow.api.common.airflow_health import get_airflow_health from airflow.api_connexion.schemas.health_schema import health_schema from airflow.api_connexion.types import APIResponse -from airflow.utils.airflow_health import get_airflow_health def get_health() -> APIResponse: diff --git a/airflow/www/views.py b/airflow/www/views.py index d7d36fac54b5c..a43c5db7c2dde 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -77,6 +77,7 @@ import airflow from airflow import models, plugins_manager, settings +from airflow.api.common.airflow_health import get_airflow_health from airflow.api.common.mark_tasks import ( set_dag_run_state_to_failed, set_dag_run_state_to_queued, @@ -109,7 +110,6 @@ from airflow.timetables.base import DataInterval, TimeRestriction from airflow.utils import json as utils_json, timezone, yaml from airflow.utils.airflow_flask_app import get_airflow_app -from airflow.utils.airflow_health import get_airflow_health from airflow.utils.dag_edges import dag_edges from airflow.utils.dates import infer_time_unit, scale_time_units from airflow.utils.docs import get_doc_url_for_provider, get_docs_url diff --git a/tests/api/__init__.py b/tests/api/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/api/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/api/common/__init__.py b/tests/api/common/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/api/common/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/api/common/test_airflow_health.py b/tests/api/common/test_airflow_health.py new file mode 100644 index 0000000000000..1f778f7a25076 --- /dev/null +++ b/tests/api/common/test_airflow_health.py @@ -0,0 +1,100 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime +from unittest.mock import MagicMock + +from airflow.api.common.airflow_health import ( + HEALTHY, + UNHEALTHY, + SchedulerJobRunner, + TriggererJobRunner, + get_airflow_health, +) + + +def test_get_airflow_health_only_metadatabase_healthy(): + SchedulerJobRunner.most_recent_job = MagicMock(return_value=None) + TriggererJobRunner.most_recent_job = MagicMock(return_value=None) + + health_status = get_airflow_health() + + expected_status = { + "metadatabase": {"status": HEALTHY}, + "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None}, + "triggerer": {"status": None, "latest_triggerer_heartbeat": None}, + } + + assert health_status == expected_status + + +def test_get_airflow_health_metadatabase_unhealthy(): + SchedulerJobRunner.most_recent_job = MagicMock(side_effect=Exception) + TriggererJobRunner.most_recent_job = MagicMock(side_effect=Exception) + + health_status = get_airflow_health() + + expected_status = { + "metadatabase": {"status": UNHEALTHY}, + "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None}, + "triggerer": {"status": UNHEALTHY, "latest_triggerer_heartbeat": None}, + } + + assert health_status == expected_status + + +def test_get_airflow_health_scheduler_healthy_no_triggerer(): + latest_scheduler_job_mock = MagicMock() + latest_scheduler_job_mock.latest_heartbeat = datetime.now() + latest_scheduler_job_mock.is_alive = MagicMock(return_value=True) + SchedulerJobRunner.most_recent_job = MagicMock(return_value=latest_scheduler_job_mock) + TriggererJobRunner.most_recent_job = MagicMock(return_value=None) + + health_status = get_airflow_health() + + expected_status = { + "metadatabase": {"status": HEALTHY}, + "scheduler": { + "status": HEALTHY, + "latest_scheduler_heartbeat": latest_scheduler_job_mock.latest_heartbeat.isoformat(), + }, + "triggerer": {"status": None, "latest_triggerer_heartbeat": None}, + } + + assert health_status == expected_status + + +def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record(): + latest_triggerer_job_mock = MagicMock() + latest_triggerer_job_mock.latest_heartbeat = datetime.now() + latest_triggerer_job_mock.is_alive = MagicMock(return_value=True) + SchedulerJobRunner.most_recent_job = MagicMock(return_value=None) + TriggererJobRunner.most_recent_job = MagicMock(return_value=latest_triggerer_job_mock) + + health_status = get_airflow_health() + + expected_status = { + "metadatabase": {"status": HEALTHY}, + "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None}, + "triggerer": { + "status": HEALTHY, + "latest_triggerer_heartbeat": latest_triggerer_job_mock.latest_heartbeat.isoformat(), + }, + } + + assert health_status == expected_status From 7b63d3638e68f1e0232950c1b2d5f7fae95f2084 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 26 May 2023 23:43:58 +0530 Subject: [PATCH 3/4] Add return type annotation --- airflow/api/common/airflow_health.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/api/common/airflow_health.py b/airflow/api/common/airflow_health.py index 6a8dc1c257b92..2297cdd5424d3 100644 --- a/airflow/api/common/airflow_health.py +++ b/airflow/api/common/airflow_health.py @@ -14,7 +14,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - from __future__ import annotations from airflow.jobs.scheduler_job_runner import SchedulerJobRunner @@ -24,7 +23,7 @@ UNHEALTHY = "unhealthy" -def get_airflow_health(): +def get_airflow_health() -> dict[str, dict[str, str | None]]: """Get the health for Airflow metadatabase, scheduler and triggerer.""" metadatabase_status = HEALTHY latest_scheduler_heartbeat = None From 22994d7cbbe3b0fc02852eacd2223f9b0dd84b84 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Sat, 27 May 2023 00:05:14 +0530 Subject: [PATCH 4/4] Use typing.Any instead of being descriptive as mypy complains --- airflow/api/common/airflow_health.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow/api/common/airflow_health.py b/airflow/api/common/airflow_health.py index 2297cdd5424d3..3174add64842d 100644 --- a/airflow/api/common/airflow_health.py +++ b/airflow/api/common/airflow_health.py @@ -16,6 +16,8 @@ # under the License. from __future__ import annotations +from typing import Any + from airflow.jobs.scheduler_job_runner import SchedulerJobRunner from airflow.jobs.triggerer_job_runner import TriggererJobRunner @@ -23,7 +25,7 @@ UNHEALTHY = "unhealthy" -def get_airflow_health() -> dict[str, dict[str, str | None]]: +def get_airflow_health() -> dict[str, Any]: """Get the health for Airflow metadatabase, scheduler and triggerer.""" metadatabase_status = HEALTHY latest_scheduler_heartbeat = None