feat: 🎸 use the new DatasetState.backfill() method to backfill
Also: remove the "upgrade" action, since it is handled by backfill. Also:
remove the test for the backfill action, because it is not clear how it
would work (maybe turn it into an e2e test)
severo committed Apr 21, 2023
1 parent 8aab8cd commit f6118d3
Showing 10 changed files with 137 additions and 184 deletions.
29 changes: 29 additions & 0 deletions jobs/cache_maintenance/README.md
@@ -0,0 +1,29 @@
# Datasets server maintenance job

> Job to run maintenance actions on the datasets-server

Available actions:

- `backfill`: backfill the cache (i.e. create jobs to add the missing entries or update the outdated entries)
- `collect-metrics`: compute and store the cache and queue metrics
- `skip`: do nothing

## Configuration

The script can be configured using environment variables. They are grouped by scope.

### Actions

Set environment variables to configure the job (`CACHE_MAINTENANCE_` prefix):

- `CACHE_MAINTENANCE_ACTION`: the action to launch, among `backfill`, `collect-metrics`, `skip`. Defaults to `skip` (see the sketch after the Launch section).

### Common

See [../../libs/libcommon/README.md](../../libs/libcommon/README.md) for more information about the common configuration.

## Launch

```shell
make run
```
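For context, here is a minimal sketch of how `CACHE_MAINTENANCE_ACTION` drives the job. This is illustration only: the real job parses its configuration through `JobConfig.from_env()` and dispatches in `main.py` (shown below); the action list mirrors `supported_actions` there.

```python
import os

# Action names as accepted by run_job() in main.py.
SUPPORTED_ACTIONS = ["backfill", "collect-metrics", "skip"]

# The job is configured through the environment; "skip" is the documented default.
action = os.environ.get("CACHE_MAINTENANCE_ACTION", "skip")
if action not in SUPPORTED_ACTIONS:
    raise ValueError(f"unsupported action {action!r}, expected one of {SUPPORTED_ACTIONS}")
print(f"selected action: {action}")
```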
50 changes: 31 additions & 19 deletions jobs/cache_maintenance/src/cache_maintenance/backfill.py
@@ -4,33 +4,45 @@
import logging
from typing import Optional

import libcommon
from libcommon.operations import update_dataset
from libcommon.processing_graph import ProcessingStep
from libcommon.queue import Priority
from libcommon.dataset import get_supported_dataset_infos
from libcommon.processing_graph import ProcessingGraph
from libcommon.state import DatasetState


def backfill_cache(
init_processing_steps: list[ProcessingStep],
processing_graph: ProcessingGraph,
hf_endpoint: str,
hf_token: Optional[str] = None,
) -> None:
logging.info("backfill init processing steps for supported datasets")
supported_datasets = libcommon.dataset.get_supported_datasets(hf_endpoint=hf_endpoint, hf_token=hf_token)
logging.info(f"about to backfill {len(supported_datasets)} datasets")
logging.info("backfill supported datasets")
supported_dataset_infos = get_supported_dataset_infos(hf_endpoint=hf_endpoint, hf_token=hf_token)
logging.info(f"analyzing {len(supported_dataset_infos)} supported datasets")
analyzed_datasets = 0
backfilled_datasets = 0
created_jobs = 0
log_batch = 100
for dataset in libcommon.dataset.get_supported_datasets(hf_endpoint=hf_endpoint, hf_token=hf_token):
update_dataset(
dataset=dataset,
init_processing_steps=init_processing_steps,
hf_endpoint=hf_endpoint,
hf_token=hf_token,
force=False,
priority=Priority.LOW,
do_check_support=False,

def log() -> None:
logging.info(
f" {analyzed_datasets} analyzed datasets: {backfilled_datasets} backfilled datasets"
f" ({100 * backfilled_datasets / analyzed_datasets:.2f}%), with {created_jobs} created jobs."
)
backfilled_datasets += 1

for dataset_info in supported_dataset_infos:
analyzed_datasets += 1

dataset = dataset_info.id
if not dataset:
# should not occur
continue
dataset_state = DatasetState(dataset=dataset, processing_graph=processing_graph, revision=dataset_info.sha)
if dataset_state.should_be_backfilled:
backfilled_datasets += 1
created_jobs += dataset_state.backfill()

analyzed_datasets += 1
if backfilled_datasets % log_batch == 0:
logging.info(f"{backfilled_datasets} datasets have been backfilled")
log()

log()
logging.info("backfill completed")
14 changes: 4 additions & 10 deletions jobs/cache_maintenance/src/cache_maintenance/main.py
@@ -16,13 +16,12 @@
from cache_maintenance.backfill import backfill_cache
from cache_maintenance.config import JobConfig
from cache_maintenance.metrics import collect_metrics
from cache_maintenance.upgrade import upgrade_cache


def run_job() -> None:
job_config = JobConfig.from_env()
action = job_config.action
supported_actions = ["backfill", "upgrade", "collect-metrics"]
supported_actions = ["backfill", "collect-metrics", "skip"]
# In the future we will support other kind of actions
if not action:
logging.warning("No action mode was selected, skipping tasks.")
@@ -55,21 +54,16 @@ def run_job() -> None:
return

processing_graph = ProcessingGraph(job_config.graph.specification)
init_processing_steps = processing_graph.get_first_steps()
processing_steps = list(processing_graph.steps.values())
start_time = datetime.now()

if action == "backfill":
backfill_cache(
init_processing_steps=init_processing_steps,
processing_graph=processing_graph,
hf_endpoint=job_config.common.hf_endpoint,
hf_token=job_config.common.hf_token,
)
if action == "upgrade":
upgrade_cache(processing_steps)

if action == "collect-metrics":
collect_metrics(processing_steps)
elif action == "collect-metrics":
collect_metrics(processing_graph=processing_graph)

end_time = datetime.now()
logging.info(f"Duration: {end_time - start_time}")
6 changes: 3 additions & 3 deletions jobs/cache_maintenance/src/cache_maintenance/metrics.py
@@ -4,15 +4,15 @@
import logging

from libcommon.metrics import CacheTotalMetric, JobTotalMetric
from libcommon.processing_graph import ProcessingStep
from libcommon.processing_graph import ProcessingGraph
from libcommon.queue import Queue
from libcommon.simple_cache import get_responses_count_by_kind_status_and_error_code


def collect_metrics(processing_steps: list[ProcessingStep]) -> None:
def collect_metrics(processing_graph: ProcessingGraph) -> None:
logging.info("collecting jobs metrics")
queue = Queue()
for processing_step in processing_steps:
for processing_step in processing_graph.steps.values():
for status, total in queue.get_jobs_count_by_status(job_type=processing_step.job_type).items():
JobTotalMetric.objects(queue=processing_step.job_type, status=status).upsert_one(total=total)

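`collect_metrics()` now takes the whole `ProcessingGraph` and derives the job types and cache kinds from its steps. A minimal invocation, mirroring the updated test further down and assuming the cache, queue, and metrics Mongo resources are connected:

```python
from libcommon.processing_graph import ProcessingGraph

from cache_maintenance.metrics import collect_metrics

# One-step graph borrowed from the updated test; the step name is arbitrary.
processing_graph = ProcessingGraph(
    processing_graph_specification={"test_type": {"input_type": "dataset", "job_runner_version": 1}}
)

collect_metrics(processing_graph=processing_graph)
```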
43 changes: 0 additions & 43 deletions jobs/cache_maintenance/src/cache_maintenance/upgrade.py

This file was deleted.

32 changes: 0 additions & 32 deletions jobs/cache_maintenance/tests/test_backfill_cache.py

This file was deleted.

34 changes: 15 additions & 19 deletions jobs/cache_maintenance/tests/test_collect_metrics.py
@@ -4,7 +4,7 @@
from http import HTTPStatus

from libcommon.metrics import CacheTotalMetric, JobTotalMetric
from libcommon.processing_graph import ProcessingStep
from libcommon.processing_graph import ProcessingGraph
from libcommon.queue import Queue
from libcommon.simple_cache import upsert_response

@@ -17,28 +17,24 @@ def test_collect_metrics() -> None:
split = None
content = {"some": "content"}

test_type = "test_type"
step_name = "test_type"
processing_graph = ProcessingGraph(
processing_graph_specification={step_name: {"input_type": "dataset", "job_runner_version": 1}}
)
queue = Queue()
queue.upsert_job(job_type=test_type, dataset="dataset", config="config", split="split")

queue.upsert_job(
job_type=processing_graph.get_step(step_name).job_type, dataset="dataset", config="config", split="split"
)
upsert_response(
kind=test_type, dataset=dataset, config=config, split=split, content=content, http_status=HTTPStatus.OK
kind=processing_graph.get_step(step_name).cache_kind,
dataset=dataset,
config=config,
split=split,
content=content,
http_status=HTTPStatus.OK,
)

processing_steps = [
ProcessingStep(
name=test_type,
input_type="dataset",
requires=[],
required_by_dataset_viewer=False,
ancestors=[],
children=[],
parents=[],
job_runner_version=1,
)
]

collect_metrics(processing_steps=processing_steps)
collect_metrics(processing_graph=processing_graph)

cache_metrics = CacheTotalMetric.objects()
assert cache_metrics
48 changes: 0 additions & 48 deletions jobs/cache_maintenance/tests/test_upgrade_cache.py

This file was deleted.

46 changes: 39 additions & 7 deletions libs/libcommon/src/libcommon/dataset.py
@@ -190,6 +190,42 @@ def ask_access(
raise err


def raise_if_not_supported(dataset_info: DatasetInfo) -> None:
"""
Raise an error if the dataset is not supported by the datasets-server.
Args:
dataset_info (`DatasetInfo`):
The dataset info.
Returns:
`None`
<Tip>
Raises the following errors:
- [`~libcommon.dataset.DisabledViewerError`]: if the dataset viewer is disabled.
- [`~libcommon.dataset.DatasetNotFoundError`]: if the dataset id does not exist, or if the dataset is private
</Tip>
"""
if not dataset_info.id or dataset_info.private:
raise DatasetNotFoundError(DOES_NOT_EXIST_OR_PRIVATE_DATASET_ERROR_MESSAGE)
if dataset_info.cardData and not dataset_info.cardData.get("viewer", True):
raise DisabledViewerError("The dataset viewer has been disabled on this dataset.")


def is_supported(dataset_info: DatasetInfo) -> bool:
"""
Check if the dataset is supported by the datasets-server.
Args:
dataset_info (`DatasetInfo`):
The dataset info.
Returns:
`bool`: True if the dataset is supported, False otherwise.
"""
try:
raise_if_not_supported(dataset_info)
except DatasetError:
return False
return True


def get_dataset_info_for_supported_datasets(
dataset: str,
hf_endpoint: str,
@@ -257,10 +293,7 @@ def get_dataset_info_for_supported_datasets(
),
cause=err,
) from err
if dataset_info.private:
raise DatasetNotFoundError(DOES_NOT_EXIST_OR_PRIVATE_DATASET_ERROR_MESSAGE)
if dataset_info.cardData and not dataset_info.cardData.get("viewer", True):
raise DisabledViewerError("The dataset viewer has been disabled on this dataset.")
raise_if_not_supported(dataset_info)
return dataset_info


@@ -346,6 +379,5 @@ def check_support(
)


def get_supported_datasets(hf_endpoint: str, hf_token: Optional[str] = None) -> list[str]:
return [d.id for d in HfApi(endpoint=hf_endpoint, token=hf_token).list_datasets() if d.id and not d.private]
# no timeout on this function. It's used only in the admin service
def get_supported_dataset_infos(hf_endpoint: str, hf_token: Optional[str] = None) -> list[DatasetInfo]:
return [d for d in HfApi(endpoint=hf_endpoint, token=hf_token).list_datasets() if is_supported(d)]