Update backfill job, and setup a cronjob in prod #1077

Merged (9 commits) on Apr 28, 2023
2 changes: 1 addition & 1 deletion chart/Chart.yaml
@@ -18,7 +18,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 1.11.0
version: 1.12.0

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
3 changes: 3 additions & 0 deletions chart/env/dev.yaml
@@ -109,6 +109,9 @@ cacheMaintenance:
cpu: 1

# --- cron jobs ---
backfill:
  enabled: false

metricsCollector:
  action: "collect-metrics"
  schedule: "*/5 * * * *"
14 changes: 14 additions & 0 deletions chart/env/prod.yaml
@@ -123,6 +123,20 @@ cacheMaintenance:
memory: "8Gi"

# --- cron jobs ---
backfill:
  enabled: true
  action: "backfill"
  schedule: "0 * * * *"
  # every hour
  nodeSelector: {}
  resources:
    requests:
      cpu: 1
    limits:
      cpu: 1
      memory: "512Mi"
  tolerations: []

metricsCollector:
  action: "collect-metrics"
  schedule: "*/2 * * * *"
5 changes: 5 additions & 0 deletions chart/templates/_helpers.tpl
@@ -83,6 +83,11 @@ app.kubernetes.io/component: "{{ include "name" . }}-cache-maintenance"
app.kubernetes.io/component: "{{ include "name" . }}-metrics-collector"
{{- end -}}

{{- define "labels.backfill" -}}
{{ include "hf.labels.commons" . }}
app.kubernetes.io/component: "{{ include "name" . }}-backfill"
{{- end -}}

{{- define "labels.admin" -}}
{{ include "hf.labels.commons" . }}
app.kubernetes.io/component: "{{ include "name" . }}-admin"
19 changes: 19 additions & 0 deletions chart/templates/cron-jobs/backfill/_container.tpl
@@ -0,0 +1,19 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 The HuggingFace Authors.

{{- define "containerBackfill" -}}
- name: "{{ include "name" . }}-backfill"
image: {{ include "jobs.cacheMaintenance.image" . }}
imagePullPolicy: {{ .Values.images.pullPolicy }}
securityContext:
allowPrivilegeEscalation: false
resources: {{ toYaml .Values.backfill.resources | nindent 4 }}
env:
{{ include "envLog" . | nindent 2 }}
{{ include "envCache" . | nindent 2 }}
{{ include "envQueue" . | nindent 2 }}
{{ include "envCommon" . | nindent 2 }}
{{ include "envMetrics" . | nindent 2 }}
- name: CACHE_MAINTENANCE_ACTION
value: {{ .Values.backfill.action | quote }}
{{- end -}}
24 changes: 24 additions & 0 deletions chart/templates/cron-jobs/backfill/job.yaml
@@ -0,0 +1,24 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 The HuggingFace Authors.

{{- if and .Values.images.jobs.cacheMaintenance .Values.backfill.enabled }}
apiVersion: batch/v1
kind: CronJob
metadata:
  labels: {{ include "labels.backfill" . | nindent 4 }}
  name: "{{ include "name" . }}-job-backfill"
  namespace: {{ .Release.Namespace }}
spec:
  schedule: {{ .Values.backfill.schedule | quote }}
  jobTemplate:
    spec:
      ttlSecondsAfterFinished: 3600
      template:
        spec:
          restartPolicy: OnFailure
          {{- include "image.imagePullSecrets" . | nindent 6 }}
          nodeSelector: {{ toYaml .Values.backfill.nodeSelector | nindent 12 }}
          tolerations: {{ toYaml .Values.backfill.tolerations | nindent 12 }}
          containers: {{ include "containerBackfill" . | nindent 12 }}
          securityContext: {{ include "securityContext" . | nindent 12 }}
{{- end}}
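
Once this chart is deployed with `backfill.enabled: true` (as in `chart/env/prod.yaml` above), the resulting CronJob can be inspected from the cluster. A minimal sketch, assuming the rendered name prefix and the namespace are both `datasets-server` (assumptions, not values fixed by this PR):

```shell
# sketch: list the backfill CronJob and the Jobs it has spawned
# "datasets-server" is an assumed name prefix / namespace
kubectl get cronjob datasets-server-job-backfill -n datasets-server
kubectl get jobs -n datasets-server -l app.kubernetes.io/component=datasets-server-backfill
```
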
14 changes: 14 additions & 0 deletions chart/values.yaml
@@ -232,6 +232,20 @@ cacheMaintenance:
tolerations: []

# --- cron jobs ---
backfill:
  enabled: false
  action: "backfill"
  schedule: "0 */3 * * *"
  # every 3 hours
  nodeSelector: {}
  resources:
    requests:
      cpu: 0
    limits:
      cpu: 0
  tolerations: []

metricsCollector:
  action: "collect-metrics"
  schedule: "*/5 * * * *"
29 changes: 29 additions & 0 deletions jobs/cache_maintenance/README.md
@@ -0,0 +1,29 @@
# Datasets server maintenance job

> Job to run maintenance actions on the datasets-server

Available actions:

- `backfill`: backfill the cache (i.e. create jobs to add the missing entries or update the outdated entries)
- `collect-metrics`: compute and store the cache and queue metrics
- `skip`: do nothing

## Configuration

The script can be configured using environment variables. They are grouped by scope.

### Actions

Set environment variables to configure the job (`CACHE_MAINTENANCE_` prefix):

- `CACHE_MAINTENANCE_ACTION`: the action to launch, among `backfill`, `collect-metrics`, `skip`. Defaults to `skip`.

### Common

See [../../libs/libcommon/README.md](../../libs/libcommon/README.md) for more information about the common configuration.

## Launch

```shell
make run
```
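
For example, to run the backfill action added in this PR, select it through the environment variable described above. A minimal sketch, assuming the common cache, queue, and Hub variables from the libcommon README are already exported:

```shell
# sketch: run the maintenance job once with the backfill action
CACHE_MAINTENANCE_ACTION=backfill make run
```
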
49 changes: 30 additions & 19 deletions jobs/cache_maintenance/src/cache_maintenance/backfill.py
@@ -4,33 +4,44 @@
import logging
from typing import Optional

import libcommon
from libcommon.operations import update_dataset
from libcommon.processing_graph import ProcessingStep
from libcommon.queue import Priority
from libcommon.dataset import get_supported_dataset_infos
from libcommon.processing_graph import ProcessingGraph
from libcommon.state import DatasetState


def backfill_cache(
init_processing_steps: list[ProcessingStep],
processing_graph: ProcessingGraph,
hf_endpoint: str,
hf_token: Optional[str] = None,
) -> None:
logging.info("backfill init processing steps for supported datasets")
supported_datasets = libcommon.dataset.get_supported_datasets(hf_endpoint=hf_endpoint, hf_token=hf_token)
logging.info(f"about to backfill {len(supported_datasets)} datasets")
logging.info("backfill supported datasets")
supported_dataset_infos = get_supported_dataset_infos(hf_endpoint=hf_endpoint, hf_token=hf_token)
logging.info(f"analyzing {len(supported_dataset_infos)} supported datasets")
analyzed_datasets = 0
backfilled_datasets = 0
created_jobs = 0
log_batch = 100
for dataset in libcommon.dataset.get_supported_datasets(hf_endpoint=hf_endpoint, hf_token=hf_token):
update_dataset(
dataset=dataset,
init_processing_steps=init_processing_steps,
hf_endpoint=hf_endpoint,
hf_token=hf_token,
force=False,
priority=Priority.LOW,
do_check_support=False,

def log() -> None:
logging.info(
f" {analyzed_datasets} analyzed datasets: {backfilled_datasets} backfilled datasets"
f" ({100 * backfilled_datasets / analyzed_datasets:.2f}%), with {created_jobs} created jobs."
)
backfilled_datasets += 1

for dataset_info in supported_dataset_infos:
analyzed_datasets += 1

dataset = dataset_info.id
if not dataset:
# should not occur
continue
dataset_state = DatasetState(dataset=dataset, processing_graph=processing_graph, revision=dataset_info.sha)
if dataset_state.should_be_backfilled:
backfilled_datasets += 1
created_jobs += dataset_state.backfill()

if backfilled_datasets % log_batch == 0:
logging.info(f"{backfilled_datasets} datasets have been backfilled")
log()

log()
logging.info("backfill completed")
14 changes: 4 additions & 10 deletions jobs/cache_maintenance/src/cache_maintenance/main.py
@@ -16,13 +16,12 @@
from cache_maintenance.backfill import backfill_cache
from cache_maintenance.config import JobConfig
from cache_maintenance.metrics import collect_metrics
from cache_maintenance.upgrade import upgrade_cache


def run_job() -> None:
job_config = JobConfig.from_env()
action = job_config.action
supported_actions = ["backfill", "upgrade", "collect-metrics"]
supported_actions = ["backfill", "collect-metrics", "skip"]
# In the future we will support other kind of actions
if not action:
logging.warning("No action mode was selected, skipping tasks.")
@@ -55,21 +54,16 @@ def run_job() -> None:
return

processing_graph = ProcessingGraph(job_config.graph.specification)
init_processing_steps = processing_graph.get_first_steps()
processing_steps = list(processing_graph.steps.values())
start_time = datetime.now()

if action == "backfill":
backfill_cache(
init_processing_steps=init_processing_steps,
processing_graph=processing_graph,
hf_endpoint=job_config.common.hf_endpoint,
hf_token=job_config.common.hf_token,
)
if action == "upgrade":
upgrade_cache(processing_steps)

if action == "collect-metrics":
collect_metrics(processing_steps)
elif action == "collect-metrics":
collect_metrics(processing_graph=processing_graph)

end_time = datetime.now()
logging.info(f"Duration: {end_time - start_time}")
6 changes: 3 additions & 3 deletions jobs/cache_maintenance/src/cache_maintenance/metrics.py
@@ -4,15 +4,15 @@
import logging

from libcommon.metrics import CacheTotalMetric, JobTotalMetric
from libcommon.processing_graph import ProcessingStep
from libcommon.processing_graph import ProcessingGraph
from libcommon.queue import Queue
from libcommon.simple_cache import get_responses_count_by_kind_status_and_error_code


def collect_metrics(processing_steps: list[ProcessingStep]) -> None:
def collect_metrics(processing_graph: ProcessingGraph) -> None:
logging.info("collecting jobs metrics")
queue = Queue()
for processing_step in processing_steps:
for processing_step in processing_graph.steps.values():
for status, total in queue.get_jobs_count_by_status(job_type=processing_step.job_type).items():
JobTotalMetric.objects(queue=processing_step.job_type, status=status).upsert_one(total=total)

43 changes: 0 additions & 43 deletions jobs/cache_maintenance/src/cache_maintenance/upgrade.py

This file was deleted.

32 changes: 0 additions & 32 deletions jobs/cache_maintenance/tests/test_backfill_cache.py

This file was deleted.

34 changes: 15 additions & 19 deletions jobs/cache_maintenance/tests/test_collect_metrics.py
@@ -4,7 +4,7 @@
from http import HTTPStatus

from libcommon.metrics import CacheTotalMetric, JobTotalMetric
from libcommon.processing_graph import ProcessingStep
from libcommon.processing_graph import ProcessingGraph
from libcommon.queue import Queue
from libcommon.simple_cache import upsert_response

@@ -17,28 +17,24 @@ def test_collect_metrics() -> None:
split = None
content = {"some": "content"}

test_type = "test_type"
step_name = "test_type"
processing_graph = ProcessingGraph(
processing_graph_specification={step_name: {"input_type": "dataset", "job_runner_version": 1}}
)
queue = Queue()
queue.upsert_job(job_type=test_type, dataset="dataset", config="config", split="split")

queue.upsert_job(
job_type=processing_graph.get_step(step_name).job_type, dataset="dataset", config="config", split="split"
)
upsert_response(
kind=test_type, dataset=dataset, config=config, split=split, content=content, http_status=HTTPStatus.OK
kind=processing_graph.get_step(step_name).cache_kind,
dataset=dataset,
config=config,
split=split,
content=content,
http_status=HTTPStatus.OK,
)

processing_steps = [
ProcessingStep(
name=test_type,
input_type="dataset",
requires=[],
required_by_dataset_viewer=False,
ancestors=[],
children=[],
parents=[],
job_runner_version=1,
)
]

collect_metrics(processing_steps=processing_steps)
collect_metrics(processing_graph=processing_graph)

cache_metrics = CacheTotalMetric.objects()
assert cache_metrics