Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: minimum scheduler age before task-creation-check #612

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions charts/airflow/docs/faq/monitoring/scheduler-liveness-probe.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ for each airflow scheduler which regularly queries the Airflow Metadata Database
A scheduler is "healthy" if it has had a "heartbeat" in the last `AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD` seconds.
Each scheduler will perform a "heartbeat" every `AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC` seconds by updating the `latest_heartbeat` of its `SchedulerJob` in the Airflow Metadata `jobs` table.

> 🟥 __Warning__ 🟥
>
> A scheduler can have a "heartbeat" but be deadlocked such that it's unable to schedule new tasks,
> we provide the [`scheduler.livenessProbe.taskCreationCheck.*`](#scheduler-task-creation-check) values to automatically restart the scheduler in these cases.
>
> - https://github.com/apache/airflow/issues/7935 - patched in airflow `2.0.2`
> - https://github.com/apache/airflow/issues/15938 - patched in airflow `2.1.1`

By default, the chart runs a liveness probe every __30 seconds__ (`periodSeconds`), and will restart a scheduler if __5 probe failures__ (`failureThreshold`) occur in a row.
This means a scheduler must be unhealthy for at least `30 x 5 = 150` seconds before Kubernetes will automatically restart a scheduler Pod.

Expand Down Expand Up @@ -49,6 +41,14 @@ scheduler:
failureThreshold: 5
```

> 🟥 __Warning__ 🟥
>
> A scheduler can have a "heartbeat" but be deadlocked such that it's unable to schedule new tasks,
> the ["task creation check"](#scheduler-task-creation-check) should detect these situations and force a scheduler restart.
>
> - https://github.com/apache/airflow/issues/7935 - patched in airflow `2.0.2`
> - https://github.com/apache/airflow/issues/15938 - patched in airflow `2.1.1`

## Scheduler "Task Creation Check"

The liveness probe can additionally check if the Scheduler is creating new [tasks](https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html) as an indication of its health.
Expand All @@ -73,6 +73,11 @@ scheduler:
## WARNING: must be AT LEAST equal to your shortest DAG schedule_interval
## WARNING: DummyOperator tasks will NOT be seen by this probe
thresholdSeconds: 300

## minimum number of seconds the scheduler must have run before the task creation check begins
## WARNING: must be long enough for the scheduler to boot and create a task
##
schedulerAgeBeforeCheck: 180
```

> 🟦 __Tip__ 🟦
Expand Down
1 change: 1 addition & 0 deletions charts/airflow/examples/google-gke/custom-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ scheduler:
taskCreationCheck:
enabled: false
thresholdSeconds: 300
schedulerAgeBeforeCheck: 180

###################################
## COMPONENT | Airflow Webserver
Expand Down
1 change: 1 addition & 0 deletions charts/airflow/examples/minikube/custom-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ scheduler:
taskCreationCheck:
enabled: false
thresholdSeconds: 300
schedulerAgeBeforeCheck: 180

###################################
## COMPONENT | Airflow Webserver
Expand Down
1 change: 1 addition & 0 deletions charts/airflow/sample-values-CeleryExecutor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ scheduler:
taskCreationCheck:
enabled: false
thresholdSeconds: 300
schedulerAgeBeforeCheck: 180

###################################
## COMPONENT | Airflow Webserver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ scheduler:
taskCreationCheck:
enabled: false
thresholdSeconds: 300
schedulerAgeBeforeCheck: 180

###################################
## COMPONENT | Airflow Webserver
Expand Down
1 change: 1 addition & 0 deletions charts/airflow/sample-values-KubernetesExecutor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ scheduler:
taskCreationCheck:
enabled: false
thresholdSeconds: 300
schedulerAgeBeforeCheck: 180

###################################
## COMPONENT | Airflow Webserver
Expand Down
45 changes: 28 additions & 17 deletions charts/airflow/templates/scheduler/scheduler-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ spec:
{{- end }}

with create_session() as session:
########################
# heartbeat check
########################
# ensure the SchedulerJob with most recent heartbeat for this `hostname` is alive
hostname = get_hostname()
scheduler_job = session \
Expand All @@ -161,29 +164,37 @@ spec:
pass
else:
sys.exit(f"The SchedulerJob (id={scheduler_job.id}) for hostname '{hostname}' is not alive")

{{- if .Values.scheduler.livenessProbe.taskCreationCheck.enabled }}
{{- $min_scheduler_age := .Values.scheduler.livenessProbe.taskCreationCheck.schedulerAgeBeforeCheck }}
{{- if not (or (typeIs "float64" $min_scheduler_age) (typeIs "int64" $min_scheduler_age)) }}
{{- /* the type of a number could be float64 or int64 depending on how it was set (values.yaml, or --set) */ -}}
{{ required (printf "`scheduler.livenessProbe.taskCreationCheck.schedulerAgeBeforeCheck` must be int-type, but got %s!" (typeOf $min_scheduler_age)) nil }}
{{- end }}
{{- $task_job_threshold := .Values.scheduler.livenessProbe.taskCreationCheck.thresholdSeconds }}
{{- if not (or (typeIs "float64" $task_job_threshold) (typeIs "int64" $task_job_threshold)) }}
{{- /* the type of a number could be float64 or int64 depending on how it was set (values.yaml, or --set) */ -}}
{{ required (printf "`scheduler.livenessProbe.taskCreationCheck.thresholdSeconds` must be int-type, but got %s!" (typeOf $task_job_threshold)) nil }}
{{- end }}

# ensure the most recent LocalTaskJob had a start_date in the last `task_job_threshold` seconds
task_job_threshold = {{ $task_job_threshold }}
task_job = session \
.query(LocalTaskJob) \
.order_by(LocalTaskJob.id.desc()) \
.limit(1) \
.first()
if task_job is not None:
if (timezone.utcnow() - task_job.start_date).total_seconds() < task_job_threshold:
pass
else:
sys.exit(
f"The most recent LocalTaskJob (id={task_job.id}, dag_id={task_job.dag_id}) "
f"started over {task_job_threshold} seconds ago"
)
########################
# task creation check
########################
min_scheduler_age = {{ $min_scheduler_age }}
if (timezone.utcnow() - scheduler_job.start_date).total_seconds() > min_scheduler_age:
# ensure the most recent LocalTaskJob had a start_date in the last `task_job_threshold` seconds
task_job_threshold = {{ $task_job_threshold }}
task_job = session \
.query(LocalTaskJob) \
.order_by(LocalTaskJob.id.desc()) \
.limit(1) \
.first()
if task_job is not None:
if (timezone.utcnow() - task_job.start_date).total_seconds() < task_job_threshold:
pass
else:
sys.exit(
f"The most recent LocalTaskJob (id={task_job.id}, dag_id={task_job.dag_id}) "
f"started over {task_job_threshold} seconds ago"
)
{{- end }}
{{- end }}
{{- if or ($volumeMounts) (include "airflow.executor.kubernetes_like" .) }}
Expand Down
5 changes: 5 additions & 0 deletions charts/airflow/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,11 @@ scheduler:
##
thresholdSeconds: 300

## minimum number of seconds the scheduler must have run before the task creation check begins
## - [WARNING] must be long enough for the scheduler to boot and create a task
##
schedulerAgeBeforeCheck: 180

## extra pip packages to install in the scheduler Pods
##
## ____ EXAMPLE _______________
Expand Down