Skip to content

Commit

Permalink
Include triggerer health status in Airflow /health endpoint (#31529)
Browse files Browse the repository at this point in the history
PR apache/airflow#27755 introduced sending triggerer
health status in the `/api/v1/health` endpoint and also updated relevant
docs but we've the primary `/health` too which is missing this information.
The PR addresses this missing status report for triggerer health in the
`/health` endpoint. It also attempts to deduplicate the code between those
endpoints so that in future we need to make necessary changes in only one
place and at the same time ensure that change made in one endpoint is not
missed for the other endpoint serving the same purpose and thus ensuring
consistency in the responses.

fixes: #31522
GitOrigin-RevId: f048aba47e079e0c81417170a5ac582ed00595c4
  • Loading branch information
pankajkoti authored and Cloud Composer Team committed May 15, 2024
1 parent 462eef7 commit 1ba9899
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 66 deletions.
70 changes: 70 additions & 0 deletions airflow/api/common/airflow_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Any

from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.jobs.triggerer_job_runner import TriggererJobRunner

HEALTHY = "healthy"
UNHEALTHY = "unhealthy"


def get_airflow_health() -> dict[str, Any]:
"""Get the health for Airflow metadatabase, scheduler and triggerer."""
metadatabase_status = HEALTHY
latest_scheduler_heartbeat = None
latest_triggerer_heartbeat = None
scheduler_status = UNHEALTHY
triggerer_status: str | None = UNHEALTHY

try:
latest_scheduler_job = SchedulerJobRunner.most_recent_job()

if latest_scheduler_job:
latest_scheduler_heartbeat = latest_scheduler_job.latest_heartbeat.isoformat()
if latest_scheduler_job.is_alive():
scheduler_status = HEALTHY
except Exception:
metadatabase_status = UNHEALTHY

try:
latest_triggerer_job = TriggererJobRunner.most_recent_job()

if latest_triggerer_job:
latest_triggerer_heartbeat = latest_triggerer_job.latest_heartbeat.isoformat()
if latest_triggerer_job.is_alive():
triggerer_status = HEALTHY
else:
triggerer_status = None
except Exception:
metadatabase_status = UNHEALTHY

airflow_health_status = {
"metadatabase": {"status": metadatabase_status},
"scheduler": {
"status": scheduler_status,
"latest_scheduler_heartbeat": latest_scheduler_heartbeat,
},
"triggerer": {
"status": triggerer_status,
"latest_triggerer_heartbeat": latest_triggerer_heartbeat,
},
}

return airflow_health_status
49 changes: 4 additions & 45 deletions airflow/api_connexion/endpoints/health_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,12 @@
# under the License.
from __future__ import annotations

from airflow.api.common.airflow_health import get_airflow_health
from airflow.api_connexion.schemas.health_schema import health_schema
from airflow.api_connexion.types import APIResponse
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.jobs.triggerer_job_runner import TriggererJobRunner

HEALTHY = "healthy"
UNHEALTHY = "unhealthy"


def get_health() -> APIResponse:
"""Return the health of the airflow scheduler and metadatabase."""
metadatabase_status = HEALTHY
latest_scheduler_heartbeat = None
latest_triggerer_heartbeat = None
scheduler_status = UNHEALTHY
triggerer_status: str | None = UNHEALTHY
try:
scheduler_job = SchedulerJobRunner.most_recent_job()

if scheduler_job:
latest_scheduler_heartbeat = scheduler_job.latest_heartbeat.isoformat()
if scheduler_job.is_alive():
scheduler_status = HEALTHY
except Exception:
metadatabase_status = UNHEALTHY
try:
triggerer_job = TriggererJobRunner.most_recent_job()

if triggerer_job:
latest_triggerer_heartbeat = triggerer_job.latest_heartbeat.isoformat()
if triggerer_job.is_alive():
triggerer_status = HEALTHY
else:
triggerer_status = None
except Exception:
metadatabase_status = UNHEALTHY

payload = {
"metadatabase": {"status": metadatabase_status},
"scheduler": {
"status": scheduler_status,
"latest_scheduler_heartbeat": latest_scheduler_heartbeat,
},
"triggerer": {
"status": triggerer_status,
"latest_triggerer_heartbeat": latest_triggerer_heartbeat,
},
}

return health_schema.dump(payload)
"""Return the health of the airflow scheduler, metadatabase and triggerer."""
airflow_health_status = get_airflow_health()
return health_schema.dump(airflow_health_status)
25 changes: 4 additions & 21 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@

import airflow
from airflow import models, plugins_manager, settings
from airflow.api.common.airflow_health import get_airflow_health
from airflow.api.common.mark_tasks import (
set_dag_run_state_to_failed,
set_dag_run_state_to_queued,
Expand Down Expand Up @@ -650,29 +651,11 @@ class Airflow(AirflowBaseView):
def health(self):
"""
An endpoint helping check the health status of the Airflow instance,
including metadatabase and scheduler.
including metadatabase, scheduler and triggerer.
"""
payload = {"metadatabase": {"status": "unhealthy"}}
airflow_health_status = get_airflow_health()

latest_scheduler_heartbeat = None
scheduler_status = "unhealthy"
payload["metadatabase"] = {"status": "healthy"}
try:
scheduler_job = SchedulerJobRunner.most_recent_job()

if scheduler_job:
latest_scheduler_heartbeat = scheduler_job.latest_heartbeat.isoformat()
if scheduler_job.is_alive():
scheduler_status = "healthy"
except Exception:
payload["metadatabase"]["status"] = "unhealthy"

payload["scheduler"] = {
"status": scheduler_status,
"latest_scheduler_heartbeat": latest_scheduler_heartbeat,
}

return flask.json.jsonify(payload)
return flask.json.jsonify(airflow_health_status)

@expose("/home")
@auth.has_access(
Expand Down
16 changes: 16 additions & 0 deletions tests/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
16 changes: 16 additions & 0 deletions tests/api/common/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
100 changes: 100 additions & 0 deletions tests/api/common/test_airflow_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime
from unittest.mock import MagicMock

from airflow.api.common.airflow_health import (
HEALTHY,
UNHEALTHY,
SchedulerJobRunner,
TriggererJobRunner,
get_airflow_health,
)


def test_get_airflow_health_only_metadatabase_healthy():
SchedulerJobRunner.most_recent_job = MagicMock(return_value=None)
TriggererJobRunner.most_recent_job = MagicMock(return_value=None)

health_status = get_airflow_health()

expected_status = {
"metadatabase": {"status": HEALTHY},
"scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
"triggerer": {"status": None, "latest_triggerer_heartbeat": None},
}

assert health_status == expected_status


def test_get_airflow_health_metadatabase_unhealthy():
SchedulerJobRunner.most_recent_job = MagicMock(side_effect=Exception)
TriggererJobRunner.most_recent_job = MagicMock(side_effect=Exception)

health_status = get_airflow_health()

expected_status = {
"metadatabase": {"status": UNHEALTHY},
"scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
"triggerer": {"status": UNHEALTHY, "latest_triggerer_heartbeat": None},
}

assert health_status == expected_status


def test_get_airflow_health_scheduler_healthy_no_triggerer():
latest_scheduler_job_mock = MagicMock()
latest_scheduler_job_mock.latest_heartbeat = datetime.now()
latest_scheduler_job_mock.is_alive = MagicMock(return_value=True)
SchedulerJobRunner.most_recent_job = MagicMock(return_value=latest_scheduler_job_mock)
TriggererJobRunner.most_recent_job = MagicMock(return_value=None)

health_status = get_airflow_health()

expected_status = {
"metadatabase": {"status": HEALTHY},
"scheduler": {
"status": HEALTHY,
"latest_scheduler_heartbeat": latest_scheduler_job_mock.latest_heartbeat.isoformat(),
},
"triggerer": {"status": None, "latest_triggerer_heartbeat": None},
}

assert health_status == expected_status


def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record():
latest_triggerer_job_mock = MagicMock()
latest_triggerer_job_mock.latest_heartbeat = datetime.now()
latest_triggerer_job_mock.is_alive = MagicMock(return_value=True)
SchedulerJobRunner.most_recent_job = MagicMock(return_value=None)
TriggererJobRunner.most_recent_job = MagicMock(return_value=latest_triggerer_job_mock)

health_status = get_airflow_health()

expected_status = {
"metadatabase": {"status": HEALTHY},
"scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
"triggerer": {
"status": HEALTHY,
"latest_triggerer_heartbeat": latest_triggerer_job_mock.latest_heartbeat.isoformat(),
},
}

assert health_status == expected_status

0 comments on commit 1ba9899

Please sign in to comment.