Skip to content

Commit

Permalink
Migrate health info to fastapi (apache#42938)
Browse files Browse the repository at this point in the history
* Add health info to public fastapi

* Add test and fix TaskInstanceState name
  • Loading branch information
bbovenzi authored Oct 15, 2024
1 parent 330cdf0 commit f708fc9
Show file tree
Hide file tree
Showing 14 changed files with 561 additions and 10 deletions.
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/health_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

from airflow.api.common.airflow_health import get_airflow_health
from airflow.api_connexion.schemas.health_schema import health_schema
from airflow.utils.api_migration import mark_fastapi_migration_done

if TYPE_CHECKING:
from airflow.api_connexion.types import APIResponse


@mark_fastapi_migration_done
def get_health() -> APIResponse:
"""Return the health of the airflow scheduler, metadatabase and triggerer."""
airflow_health_status = get_airflow_health()
Expand Down
103 changes: 100 additions & 3 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -944,8 +944,33 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/monitor/health:
get:
tags:
- Monitor
summary: Get Health
operationId: get_health
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/HealthInfoSchema'
components:
schemas:
BaseInfoSchema:
properties:
status:
anyOf:
- type: string
- type: 'null'
title: Status
type: object
required:
- status
title: BaseInfoSchema
description: Base status field for metadatabase and scheduler.
ConnectionCollectionResponse:
properties:
connections:
Expand Down Expand Up @@ -1553,6 +1578,24 @@ components:
- dataset_triggered
title: DAGRunTypes
description: DAG Run Types for responses.
DagProcessorInfoSchema:
properties:
status:
anyOf:
- type: string
- type: 'null'
title: Status
latest_dag_processor_heartbeat:
anyOf:
- type: string
- type: 'null'
title: Latest Dag Processor Heartbeat
type: object
required:
- status
- latest_dag_processor_heartbeat
title: DagProcessorInfoSchema
description: Schema for DagProcessor info.
DagRunState:
type: string
enum:
Expand Down Expand Up @@ -1627,22 +1670,58 @@ components:
title: Detail
type: object
title: HTTPValidationError
HealthInfoSchema:
properties:
metadatabase:
$ref: '#/components/schemas/BaseInfoSchema'
scheduler:
$ref: '#/components/schemas/SchedulerInfoSchema'
triggerer:
$ref: '#/components/schemas/TriggererInfoSchema'
dag_processor:
$ref: '#/components/schemas/DagProcessorInfoSchema'
type: object
required:
- metadatabase
- scheduler
- triggerer
- dag_processor
title: HealthInfoSchema
description: Schema for the Health endpoint.
HistoricalMetricDataResponse:
properties:
dag_run_types:
$ref: '#/components/schemas/DAGRunTypes'
dag_run_states:
$ref: '#/components/schemas/DAGRunStates'
task_instance_states:
$ref: '#/components/schemas/TaskInstantState'
$ref: '#/components/schemas/TaskInstanceState'
type: object
required:
- dag_run_types
- dag_run_states
- task_instance_states
title: HistoricalMetricDataResponse
description: Historical Metric Data serializer for responses.
TaskInstantState:
SchedulerInfoSchema:
properties:
status:
anyOf:
- type: string
- type: 'null'
title: Status
latest_scheduler_heartbeat:
anyOf:
- type: string
- type: 'null'
title: Latest Scheduler Heartbeat
type: object
required:
- status
- latest_scheduler_heartbeat
title: SchedulerInfoSchema
description: Schema for Scheduler info.
TaskInstanceState:
properties:
no_status:
type: integer
Expand Down Expand Up @@ -1698,8 +1777,26 @@ components:
- upstream_failed
- skipped
- deferred
title: TaskInstantState
title: TaskInstanceState
description: TaskInstance serializer for responses.
TriggererInfoSchema:
properties:
status:
anyOf:
- type: string
- type: 'null'
title: Status
latest_triggerer_heartbeat:
anyOf:
- type: string
- type: 'null'
title: Latest Triggerer Heartbeat
type: object
required:
- status
- latest_triggerer_heartbeat
title: TriggererInfoSchema
description: Schema for Triggerer info.
ValidationError:
properties:
loc:
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/serializers/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class DAGRunStates(BaseModel):
failed: int


class TaskInstantState(BaseModel):
class TaskInstanceState(BaseModel):
"""TaskInstance serializer for responses."""

no_status: int
Expand All @@ -60,4 +60,4 @@ class HistoricalMetricDataResponse(BaseModel):

dag_run_types: DAGRunTypes
dag_run_states: DAGRunStates
task_instance_states: TaskInstantState
task_instance_states: TaskInstanceState
52 changes: 52 additions & 0 deletions airflow/api_fastapi/serializers/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from pydantic import BaseModel


class BaseInfoSchema(BaseModel):
"""Base status field for metadatabase and scheduler."""

status: str | None


class SchedulerInfoSchema(BaseInfoSchema):
"""Schema for Scheduler info."""

latest_scheduler_heartbeat: str | None


class TriggererInfoSchema(BaseInfoSchema):
"""Schema for Triggerer info."""

latest_triggerer_heartbeat: str | None


class DagProcessorInfoSchema(BaseInfoSchema):
"""Schema for DagProcessor info."""

latest_dag_processor_heartbeat: str | None


class HealthInfoSchema(BaseModel):
"""Schema for the Health endpoint."""

metadatabase: BaseInfoSchema
scheduler: SchedulerInfoSchema
triggerer: TriggererInfoSchema
dag_processor: DagProcessorInfoSchema
2 changes: 2 additions & 0 deletions airflow/api_fastapi/views/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from airflow.api_fastapi.views.public.connections import connections_router
from airflow.api_fastapi.views.public.dag_run import dag_run_router
from airflow.api_fastapi.views.public.dags import dags_router
from airflow.api_fastapi.views.public.monitor import monitor_router
from airflow.api_fastapi.views.public.variables import variables_router
from airflow.api_fastapi.views.router import AirflowRouter

Expand All @@ -30,3 +31,4 @@
public_router.include_router(connections_router)
public_router.include_router(variables_router)
public_router.include_router(dag_run_router)
public_router.include_router(monitor_router)
30 changes: 30 additions & 0 deletions airflow/api_fastapi/views/public/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from airflow.api.common.airflow_health import get_airflow_health
from airflow.api_fastapi.serializers.monitor import HealthInfoSchema
from airflow.api_fastapi.views.router import AirflowRouter

monitor_router = AirflowRouter(tags=["Monitor"], prefix="/monitor")


@monitor_router.get("/health")
async def get_health() -> HealthInfoSchema:
airflow_health_status = get_airflow_health()
return HealthInfoSchema.model_validate(airflow_health_status)
13 changes: 13 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
DagRunService,
DagService,
DashboardService,
MonitorService,
VariableService,
} from "../requests/services.gen";
import { DagRunState } from "../requests/types.gen";
Expand Down Expand Up @@ -209,6 +210,18 @@ export const UseDagRunServiceGetDagRunKeyFn = (
},
queryKey?: Array<unknown>,
) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])];
export type MonitorServiceGetHealthDefaultResponse = Awaited<
ReturnType<typeof MonitorService.getHealth>
>;
export type MonitorServiceGetHealthQueryResult<
TData = MonitorServiceGetHealthDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useMonitorServiceGetHealthKey = "MonitorServiceGetHealth";
export const UseMonitorServiceGetHealthKeyFn = (queryKey?: Array<unknown>) => [
useMonitorServiceGetHealthKey,
...(queryKey ?? []),
];
export type VariableServicePostVariableMutationResult = Awaited<
ReturnType<typeof VariableService.postVariable>
>;
Expand Down
11 changes: 11 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
DagRunService,
DagService,
DashboardService,
MonitorService,
VariableService,
} from "../requests/services.gen";
import { DagRunState } from "../requests/types.gen";
Expand Down Expand Up @@ -259,3 +260,13 @@ export const prefetchUseDagRunServiceGetDagRun = (
queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }),
});
/**
* Get Health
* @returns HealthInfoSchema Successful Response
* @throws ApiError
*/
export const prefetchUseMonitorServiceGetHealth = (queryClient: QueryClient) =>
queryClient.prefetchQuery({
queryKey: Common.UseMonitorServiceGetHealthKeyFn(),
queryFn: () => MonitorService.getHealth(),
});
19 changes: 19 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
DagRunService,
DagService,
DashboardService,
MonitorService,
VariableService,
} from "../requests/services.gen";
import { DAGPatchBody, DagRunState, VariableBody } from "../requests/types.gen";
Expand Down Expand Up @@ -331,6 +332,24 @@ export const useDagRunServiceGetDagRun = <
queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
...options,
});
/**
* Get Health
* @returns HealthInfoSchema Successful Response
* @throws ApiError
*/
export const useMonitorServiceGetHealth = <
TData = Common.MonitorServiceGetHealthDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseMonitorServiceGetHealthKeyFn(queryKey),
queryFn: () => MonitorService.getHealth() as TData,
...options,
});
/**
* Post Variable
* Create a variable.
Expand Down
19 changes: 19 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
DagRunService,
DagService,
DashboardService,
MonitorService,
VariableService,
} from "../requests/services.gen";
import { DagRunState } from "../requests/types.gen";
Expand Down Expand Up @@ -326,3 +327,21 @@ export const useDagRunServiceGetDagRunSuspense = <
queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
...options,
});
/**
* Get Health
* @returns HealthInfoSchema Successful Response
* @throws ApiError
*/
export const useMonitorServiceGetHealthSuspense = <
TData = Common.MonitorServiceGetHealthDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseMonitorServiceGetHealthKeyFn(queryKey),
queryFn: () => MonitorService.getHealth() as TData,
...options,
});
Loading

0 comments on commit f708fc9

Please sign in to comment.