From 34e64442bca668dfba8f6b07341715cb70204d69 Mon Sep 17 00:00:00 2001 From: Test User Date: Mon, 23 May 2022 15:41:44 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20=F0=9F=8E=B8=20add=20again=20the=20metr?= =?UTF-8?q?ics=20about=20cache=20and=20queue?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See #250 and #279 --- libs/libcache/src/libcache/cache.py | 37 +++++++++++++---------------- libs/libcache/tests/test_cache.py | 27 +++++++++++++++++++++ libs/libqueue/src/libqueue/queue.py | 14 ++++++----- libs/libqueue/tests/test_queue.py | 28 ++++++++++++++++++++++ services/api/src/api/prometheus.py | 3 +-- services/api/tests/test_app.py | 5 ++-- 6 files changed, 82 insertions(+), 32 deletions(-) diff --git a/libs/libcache/src/libcache/cache.py b/libs/libcache/src/libcache/cache.py index f2905ca42a..49089c4267 100644 --- a/libs/libcache/src/libcache/cache.py +++ b/libs/libcache/src/libcache/cache.py @@ -120,6 +120,9 @@ def to_split_full_name(self) -> SplitFullName: objects = QuerySetManager["DbSplit"]() +AnyDb = TypeVar("AnyDb", DbDataset, DbSplit) # Must be DbDataset or DbSplit + + class _BaseErrorItem(TypedDict): status_code: int exception: str @@ -465,16 +468,6 @@ def get_dataset_names_with_status(status: str) -> List[str]: return [d.dataset_name for d in DbDataset.objects(status=status).only("dataset_name")] -def get_datasets_count_with_status(status: Status) -> int: - # TODO: take the splits statuses into account? - return DbDataset.objects(status=status).count() - - -def get_splits_count_with_status(status: Status) -> int: - # TODO: take the splits statuses into account? - return DbSplit.objects(status=status).count() - - class CountByStatus(TypedDict): empty: int error: int @@ -482,22 +475,24 @@ class CountByStatus(TypedDict): valid: int -def get_datasets_count_by_status() -> CountByStatus: +def get_entries_count_by_status(entries: QuerySet[AnyDb]) -> CountByStatus: + frequencies: Dict[str, int] = entries.item_frequencies("status", normalize=False) # type: ignore + # ensure that all the statuses are present, even if equal to zero return { - "empty": get_datasets_count_with_status(Status.EMPTY), - "error": get_datasets_count_with_status(Status.ERROR), - "stalled": get_datasets_count_with_status(Status.STALLED), - "valid": get_datasets_count_with_status(Status.VALID), + "empty": frequencies.get(Status.EMPTY.value, 0), + "error": frequencies.get(Status.ERROR.value, 0), + "stalled": frequencies.get(Status.STALLED.value, 0), + "valid": frequencies.get(Status.VALID.value, 0), } +def get_datasets_count_by_status() -> CountByStatus: + # TODO: take the splits statuses into account? + return get_entries_count_by_status(DbDataset.objects) + + def get_splits_count_by_status() -> CountByStatus: - return { - "empty": get_splits_count_with_status(Status.EMPTY), - "error": get_splits_count_with_status(Status.ERROR), - "stalled": get_splits_count_with_status(Status.STALLED), - "valid": get_splits_count_with_status(Status.VALID), - } + return get_entries_count_by_status(DbSplit.objects) class DatasetCacheReport(TypedDict): diff --git a/libs/libcache/tests/test_cache.py b/libs/libcache/tests/test_cache.py index 2fc0fbb6b2..5cd673f393 100644 --- a/libs/libcache/tests/test_cache.py +++ b/libs/libcache/tests/test_cache.py @@ -10,7 +10,9 @@ clean_database, connect_to_cache, delete_dataset_cache, + get_datasets_count_by_status, get_rows_response, + get_splits_count_by_status, get_valid_or_stalled_dataset_names, upsert_dataset, upsert_split, @@ -146,3 +148,28 @@ def test_valid() -> None: ) assert get_valid_or_stalled_dataset_names() == ["test_dataset", "test_dataset2"] + + +def test_count_by_status() -> None: + assert get_datasets_count_by_status() == {"empty": 0, "error": 0, "stalled": 0, "valid": 0} + + upsert_dataset( + "test_dataset", [{"dataset_name": "test_dataset", "config_name": "test_config", "split_name": "test_split"}] + ) + + assert get_datasets_count_by_status() == {"empty": 0, "error": 0, "stalled": 0, "valid": 1} + assert get_splits_count_by_status() == {"empty": 1, "error": 0, "stalled": 0, "valid": 0} + + upsert_split( + "test_dataset", + "test_config", + "test_split", + { + "split_name": "test_split", + "rows_response": {"rows": [], "columns": []}, + "num_bytes": None, + "num_examples": None, + }, + ) + + assert get_splits_count_by_status() == {"empty": 0, "error": 0, "stalled": 0, "valid": 1} diff --git a/libs/libqueue/src/libqueue/queue.py b/libs/libqueue/src/libqueue/queue.py index 67e1edb7a4..e3dc6832f6 100644 --- a/libs/libqueue/src/libqueue/queue.py +++ b/libs/libqueue/src/libqueue/queue.py @@ -2,7 +2,7 @@ import logging import types from datetime import datetime -from typing import Generic, List, Optional, Tuple, Type, TypedDict, TypeVar +from typing import Dict, Generic, List, Optional, Tuple, Type, TypedDict, TypeVar from mongoengine import Document, DoesNotExist, connect from mongoengine.fields import DateTimeField, EnumField, StringField @@ -304,12 +304,14 @@ def cancel_started_split_jobs() -> None: def get_jobs_count_by_status(jobs: QuerySet[AnyJob]) -> CountByStatus: + frequencies: Dict[str, int] = jobs.item_frequencies("status", normalize=False) # type: ignore + # ensure that all the statuses are present, even if equal to zero return { - "waiting": get_jobs_with_status(jobs, Status.WAITING).count(), - "started": get_jobs_with_status(jobs, Status.STARTED).count(), - "success": get_jobs_with_status(jobs, Status.SUCCESS).count(), - "error": get_jobs_with_status(jobs, Status.ERROR).count(), - "cancelled": get_jobs_with_status(jobs, Status.CANCELLED).count(), + "waiting": frequencies.get(Status.WAITING.value, 0), + "started": frequencies.get(Status.STARTED.value, 0), + "success": frequencies.get(Status.SUCCESS.value, 0), + "error": frequencies.get(Status.ERROR.value, 0), + "cancelled": frequencies.get(Status.CANCELLED.value, 0), } diff --git a/libs/libqueue/tests/test_queue.py b/libs/libqueue/tests/test_queue.py index 837076407d..01278c1054 100644 --- a/libs/libqueue/tests/test_queue.py +++ b/libs/libqueue/tests/test_queue.py @@ -8,7 +8,9 @@ connect_to_queue, finish_dataset_job, get_dataset_job, + get_dataset_jobs_count_by_status, get_split_job, + get_split_jobs_count_by_status, is_dataset_in_queue, is_split_in_queue, ) @@ -87,3 +89,29 @@ def test_is_split_in_queue() -> None: add_split_job(dataset_name, config_name, split_name) assert is_split_in_queue(dataset_name, config_name, split_name) is True assert is_split_in_queue(dataset_name_2, config_name, split_name) is False + + +def test_count_by_status() -> None: + assert get_dataset_jobs_count_by_status() == { + "waiting": 0, + "started": 0, + "success": 0, + "error": 0, + "cancelled": 0, + } + + add_dataset_job("test_dataset") + + assert get_dataset_jobs_count_by_status() == {"waiting": 1, "started": 0, "success": 0, "error": 0, "cancelled": 0} + + assert get_split_jobs_count_by_status() == { + "waiting": 0, + "started": 0, + "success": 0, + "error": 0, + "cancelled": 0, + } + + add_split_job("test_dataset", "test_config", "test_split") + + assert get_split_jobs_count_by_status() == {"waiting": 1, "started": 0, "success": 0, "error": 0, "cancelled": 0} diff --git a/services/api/src/api/prometheus.py b/services/api/src/api/prometheus.py index a890c751cc..e51f6f1f0c 100644 --- a/services/api/src/api/prometheus.py +++ b/services/api/src/api/prometheus.py @@ -55,7 +55,6 @@ def updateMetrics(self): self.metrics["cache_entries_total"].labels(cache="splits", status=status).set(total) def endpoint(self, request: Request) -> Response: - # disabled for now to fix https://github.com/huggingface/datasets-server/issues/279 - # self.updateMetrics() + self.updateMetrics() return Response(generate_latest(self.getRegistry()), headers={"Content-Type": CONTENT_TYPE_LATEST}) diff --git a/services/api/tests/test_app.py b/services/api/tests/test_app.py index 73ad64accd..ab3e5aeb6a 100644 --- a/services/api/tests/test_app.py +++ b/services/api/tests/test_app.py @@ -343,6 +343,5 @@ def test_metrics(client: TestClient) -> None: assert name in metrics assert metrics[name] > 0 name = "process_start_time_seconds" - # Disabled for now - # assert 'queue_jobs_total{queue="datasets",status="waiting"}' in metrics - # assert 'cache_entries_total{cache="datasets",status="empty"}' in metrics + assert 'queue_jobs_total{queue="datasets",status="waiting"}' in metrics + assert 'cache_entries_total{cache="datasets",status="empty"}' in metrics