Skip to content

Commit

Permalink
feat: 🎸 add again the metrics about cache and queue
Browse files Browse the repository at this point in the history
See #250 and #279
  • Loading branch information
severo committed May 23, 2022
1 parent 1fc3fa7 commit 34e6444
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 32 deletions.
37 changes: 16 additions & 21 deletions libs/libcache/src/libcache/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ def to_split_full_name(self) -> SplitFullName:
objects = QuerySetManager["DbSplit"]()


# TypeVar constrained to the two cache document classes, so shared helpers
# can accept a QuerySet over either collection while staying type-checked.
AnyDb = TypeVar("AnyDb", DbDataset, DbSplit)  # Must be DbDataset or DbSplit


class _BaseErrorItem(TypedDict):
status_code: int
exception: str
Expand Down Expand Up @@ -465,39 +468,31 @@ def get_dataset_names_with_status(status: str) -> List[str]:
return [d.dataset_name for d in DbDataset.objects(status=status).only("dataset_name")]


def get_datasets_count_with_status(status: Status) -> int:
    """Return how many dataset cache entries currently have *status*."""
    # TODO: take the splits statuses into account?
    matching_datasets = DbDataset.objects(status=status)
    return matching_datasets.count()


def get_splits_count_with_status(status: Status) -> int:
    """Return how many split cache entries currently have *status*."""
    # NOTE(review): the original TODO here ("take the splits statuses into
    # account?") appears copy-pasted from the dataset counterpart — this
    # function already counts splits directly.
    return DbSplit.objects(status=status).count()


# Per-status tally of cache entries; every key is always present, even
# when its count is zero.
CountByStatus = TypedDict(
    "CountByStatus",
    {"empty": int, "error": int, "stalled": int, "valid": int},
)


def get_datasets_count_by_status() -> CountByStatus:
def get_entries_count_by_status(entries: QuerySet[AnyDb]) -> CountByStatus:
    """Count the cache entries in *entries*, grouped by status.

    Uses a single aggregation (``item_frequencies``) instead of one count
    query per status.

    Args:
        entries: a QuerySet over DbDataset or DbSplit documents.

    Returns:
        A CountByStatus dict; all four statuses are present, defaulting to 0.
    """
    frequencies: Dict[str, int] = entries.item_frequencies("status", normalize=False)  # type: ignore
    # ensure that all the statuses are present, even if equal to zero
    return {
        "empty": frequencies.get(Status.EMPTY.value, 0),
        "error": frequencies.get(Status.ERROR.value, 0),
        "stalled": frequencies.get(Status.STALLED.value, 0),
        "valid": frequencies.get(Status.VALID.value, 0),
    }


def get_datasets_count_by_status() -> CountByStatus:
    """Return the number of dataset cache entries for each status."""
    # TODO: take the splits statuses into account?
    dataset_entries = DbDataset.objects
    return get_entries_count_by_status(dataset_entries)


def get_splits_count_by_status() -> CountByStatus:
    """Return the number of split cache entries for each status.

    Delegates to get_entries_count_by_status so datasets and splits share
    one aggregation code path (the old per-status count helpers are gone).
    """
    return get_entries_count_by_status(DbSplit.objects)


class DatasetCacheReport(TypedDict):
Expand Down
27 changes: 27 additions & 0 deletions libs/libcache/tests/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
clean_database,
connect_to_cache,
delete_dataset_cache,
get_datasets_count_by_status,
get_rows_response,
get_splits_count_by_status,
get_valid_or_stalled_dataset_names,
upsert_dataset,
upsert_split,
Expand Down Expand Up @@ -146,3 +148,28 @@ def test_valid() -> None:
)

assert get_valid_or_stalled_dataset_names() == ["test_dataset", "test_dataset2"]


def test_count_by_status() -> None:
    """The cache reports per-status counts for datasets and for splits."""
    zero = {"empty": 0, "error": 0, "stalled": 0, "valid": 0}

    # empty cache: every dataset status count is zero
    assert get_datasets_count_by_status() == zero

    split_ref = {"dataset_name": "test_dataset", "config_name": "test_config", "split_name": "test_split"}
    upsert_dataset("test_dataset", [split_ref])

    # the dataset becomes valid; its single split is still empty
    assert get_datasets_count_by_status() == {**zero, "valid": 1}
    assert get_splits_count_by_status() == {**zero, "empty": 1}

    upsert_split(
        "test_dataset",
        "test_config",
        "test_split",
        {
            "split_name": "test_split",
            "rows_response": {"rows": [], "columns": []},
            "num_bytes": None,
            "num_examples": None,
        },
    )

    # once its rows response is cached, the split becomes valid
    assert get_splits_count_by_status() == {**zero, "valid": 1}
14 changes: 8 additions & 6 deletions libs/libqueue/src/libqueue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import types
from datetime import datetime
from typing import Generic, List, Optional, Tuple, Type, TypedDict, TypeVar
from typing import Dict, Generic, List, Optional, Tuple, Type, TypedDict, TypeVar

from mongoengine import Document, DoesNotExist, connect
from mongoengine.fields import DateTimeField, EnumField, StringField
Expand Down Expand Up @@ -304,12 +304,14 @@ def cancel_started_split_jobs() -> None:


def get_jobs_count_by_status(jobs: QuerySet[AnyJob]) -> CountByStatus:
    """Count the jobs in *jobs*, grouped by status.

    Uses a single aggregation (``item_frequencies``) instead of one count
    query per status.

    Args:
        jobs: a QuerySet over dataset or split job documents.

    Returns:
        A CountByStatus dict; all five statuses are present, defaulting to 0.
    """
    frequencies: Dict[str, int] = jobs.item_frequencies("status", normalize=False)  # type: ignore
    # ensure that all the statuses are present, even if equal to zero
    return {
        "waiting": frequencies.get(Status.WAITING.value, 0),
        "started": frequencies.get(Status.STARTED.value, 0),
        "success": frequencies.get(Status.SUCCESS.value, 0),
        "error": frequencies.get(Status.ERROR.value, 0),
        "cancelled": frequencies.get(Status.CANCELLED.value, 0),
    }


Expand Down
28 changes: 28 additions & 0 deletions libs/libqueue/tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
connect_to_queue,
finish_dataset_job,
get_dataset_job,
get_dataset_jobs_count_by_status,
get_split_job,
get_split_jobs_count_by_status,
is_dataset_in_queue,
is_split_in_queue,
)
Expand Down Expand Up @@ -87,3 +89,29 @@ def test_is_split_in_queue() -> None:
add_split_job(dataset_name, config_name, split_name)
assert is_split_in_queue(dataset_name, config_name, split_name) is True
assert is_split_in_queue(dataset_name_2, config_name, split_name) is False


def test_count_by_status() -> None:
    """The queue reports per-status counts for dataset and split jobs."""
    zero = {"waiting": 0, "started": 0, "success": 0, "error": 0, "cancelled": 0}

    # empty queue: every dataset-job status count is zero
    assert get_dataset_jobs_count_by_status() == zero

    add_dataset_job("test_dataset")
    assert get_dataset_jobs_count_by_status() == {**zero, "waiting": 1}

    # split jobs are counted independently of dataset jobs
    assert get_split_jobs_count_by_status() == zero

    add_split_job("test_dataset", "test_config", "test_split")
    assert get_split_jobs_count_by_status() == {**zero, "waiting": 1}
3 changes: 1 addition & 2 deletions services/api/src/api/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def updateMetrics(self):
self.metrics["cache_entries_total"].labels(cache="splits", status=status).set(total)

def endpoint(self, request: Request) -> Response:
    """Serve the Prometheus metrics in text exposition format.

    The cache/queue gauges are refreshed lazily, at scrape time, so the
    reported values are current as of this request.
    """
    self.updateMetrics()

    return Response(generate_latest(self.getRegistry()), headers={"Content-Type": CONTENT_TYPE_LATEST})
5 changes: 2 additions & 3 deletions services/api/tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,5 @@ def test_metrics(client: TestClient) -> None:
assert name in metrics
assert metrics[name] > 0
name = "process_start_time_seconds"
# Disabled for now
# assert 'queue_jobs_total{queue="datasets",status="waiting"}' in metrics
# assert 'cache_entries_total{cache="datasets",status="empty"}' in metrics
assert 'queue_jobs_total{queue="datasets",status="waiting"}' in metrics
assert 'cache_entries_total{cache="datasets",status="empty"}' in metrics

0 comments on commit 34e6444

Please sign in to comment.