diff --git a/libs/libcache/src/libcache/cache.py b/libs/libcache/src/libcache/cache.py
index f2905ca42a..49089c4267 100644
--- a/libs/libcache/src/libcache/cache.py
+++ b/libs/libcache/src/libcache/cache.py
@@ -120,6 +120,9 @@ def to_split_full_name(self) -> SplitFullName:
     objects = QuerySetManager["DbSplit"]()
 
 
+AnyDb = TypeVar("AnyDb", DbDataset, DbSplit)  # Must be DbDataset or DbSplit
+
+
 class _BaseErrorItem(TypedDict):
     status_code: int
     exception: str
@@ -465,16 +468,6 @@ def get_dataset_names_with_status(status: str) -> List[str]:
     return [d.dataset_name for d in DbDataset.objects(status=status).only("dataset_name")]
 
 
-def get_datasets_count_with_status(status: Status) -> int:
-    # TODO: take the splits statuses into account?
-    return DbDataset.objects(status=status).count()
-
-
-def get_splits_count_with_status(status: Status) -> int:
-    # TODO: take the splits statuses into account?
-    return DbSplit.objects(status=status).count()
-
-
 class CountByStatus(TypedDict):
     empty: int
     error: int
@@ -482,22 +475,24 @@ class CountByStatus(TypedDict):
     valid: int
 
 
-def get_datasets_count_by_status() -> CountByStatus:
+def get_entries_count_by_status(entries: QuerySet[AnyDb]) -> CountByStatus:
+    frequencies: Dict[str, int] = entries.item_frequencies("status", normalize=False)  # type: ignore
+    # ensure that all the statuses are present, even if equal to zero
     return {
-        "empty": get_datasets_count_with_status(Status.EMPTY),
-        "error": get_datasets_count_with_status(Status.ERROR),
-        "stalled": get_datasets_count_with_status(Status.STALLED),
-        "valid": get_datasets_count_with_status(Status.VALID),
+        "empty": frequencies.get(Status.EMPTY.value, 0),
+        "error": frequencies.get(Status.ERROR.value, 0),
+        "stalled": frequencies.get(Status.STALLED.value, 0),
+        "valid": frequencies.get(Status.VALID.value, 0),
     }
 
 
+def get_datasets_count_by_status() -> CountByStatus:
+    # TODO: take the splits statuses into account?
+    return get_entries_count_by_status(DbDataset.objects)
+
+
 def get_splits_count_by_status() -> CountByStatus:
-    return {
-        "empty": get_splits_count_with_status(Status.EMPTY),
-        "error": get_splits_count_with_status(Status.ERROR),
-        "stalled": get_splits_count_with_status(Status.STALLED),
-        "valid": get_splits_count_with_status(Status.VALID),
-    }
+    return get_entries_count_by_status(DbSplit.objects)
 
 
 class DatasetCacheReport(TypedDict):
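The core of the `libcache` change is mongoengine's `QuerySet.item_frequencies()`, which returns the count of every distinct value of a field in a single query, instead of issuing one `.count()` round trip per status. The same refactor is applied to `get_jobs_count_by_status` in `libqueue` below. A minimal, self-contained sketch of the technique, with a hypothetical `Entry` model and database name standing in for the project's, and assuming a MongoDB server on localhost:

```python
from enum import Enum
from typing import Dict

from mongoengine import Document, connect
from mongoengine.fields import EnumField, StringField


class Status(Enum):
    EMPTY = "empty"
    VALID = "valid"


class Entry(Document):
    # Hypothetical stand-in for DbDataset / DbSplit
    name = StringField(required=True)
    status = EnumField(Status, default=Status.EMPTY)


connect("example_db")  # assumes a local MongoDB instance
Entry.drop_collection()  # start clean so the counts below are deterministic
Entry(name="a").save()
Entry(name="b", status=Status.VALID).save()

# One aggregation instead of one .count() query per status; statuses with
# no documents are simply absent from the returned dict, hence the
# .get(..., 0) fallbacks in the diff above.
frequencies: Dict[str, int] = Entry.objects.item_frequencies("status", normalize=False)
assert frequencies.get(Status.EMPTY.value, 0) == 1
assert frequencies.get(Status.VALID.value, 0) == 1
```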
diff --git a/libs/libcache/tests/test_cache.py b/libs/libcache/tests/test_cache.py
index 2fc0fbb6b2..5cd673f393 100644
--- a/libs/libcache/tests/test_cache.py
+++ b/libs/libcache/tests/test_cache.py
@@ -10,7 +10,9 @@
     clean_database,
     connect_to_cache,
     delete_dataset_cache,
+    get_datasets_count_by_status,
     get_rows_response,
+    get_splits_count_by_status,
     get_valid_or_stalled_dataset_names,
     upsert_dataset,
     upsert_split,
@@ -146,3 +148,28 @@ def test_valid() -> None:
     )
 
     assert get_valid_or_stalled_dataset_names() == ["test_dataset", "test_dataset2"]
+
+
+def test_count_by_status() -> None:
+    assert get_datasets_count_by_status() == {"empty": 0, "error": 0, "stalled": 0, "valid": 0}
+
+    upsert_dataset(
+        "test_dataset", [{"dataset_name": "test_dataset", "config_name": "test_config", "split_name": "test_split"}]
+    )
+
+    assert get_datasets_count_by_status() == {"empty": 0, "error": 0, "stalled": 0, "valid": 1}
+    assert get_splits_count_by_status() == {"empty": 1, "error": 0, "stalled": 0, "valid": 0}
+
+    upsert_split(
+        "test_dataset",
+        "test_config",
+        "test_split",
+        {
+            "split_name": "test_split",
+            "rows_response": {"rows": [], "columns": []},
+            "num_bytes": None,
+            "num_examples": None,
+        },
+    )
+
+    assert get_splits_count_by_status() == {"empty": 0, "error": 0, "stalled": 0, "valid": 1}
diff --git a/libs/libqueue/src/libqueue/queue.py b/libs/libqueue/src/libqueue/queue.py
index 67e1edb7a4..e3dc6832f6 100644
--- a/libs/libqueue/src/libqueue/queue.py
+++ b/libs/libqueue/src/libqueue/queue.py
@@ -2,7 +2,7 @@
 import logging
 import types
 from datetime import datetime
-from typing import Generic, List, Optional, Tuple, Type, TypedDict, TypeVar
+from typing import Dict, Generic, List, Optional, Tuple, Type, TypedDict, TypeVar
 
 from mongoengine import Document, DoesNotExist, connect
 from mongoengine.fields import DateTimeField, EnumField, StringField
@@ -304,12 +304,14 @@ def cancel_started_split_jobs() -> None:
 
 
 def get_jobs_count_by_status(jobs: QuerySet[AnyJob]) -> CountByStatus:
+    frequencies: Dict[str, int] = jobs.item_frequencies("status", normalize=False)  # type: ignore
+    # ensure that all the statuses are present, even if equal to zero
     return {
-        "waiting": get_jobs_with_status(jobs, Status.WAITING).count(),
-        "started": get_jobs_with_status(jobs, Status.STARTED).count(),
-        "success": get_jobs_with_status(jobs, Status.SUCCESS).count(),
-        "error": get_jobs_with_status(jobs, Status.ERROR).count(),
-        "cancelled": get_jobs_with_status(jobs, Status.CANCELLED).count(),
+        "waiting": frequencies.get(Status.WAITING.value, 0),
+        "started": frequencies.get(Status.STARTED.value, 0),
+        "success": frequencies.get(Status.SUCCESS.value, 0),
+        "error": frequencies.get(Status.ERROR.value, 0),
+        "cancelled": frequencies.get(Status.CANCELLED.value, 0),
     }
 
 
diff --git a/libs/libqueue/tests/test_queue.py b/libs/libqueue/tests/test_queue.py
index 837076407d..01278c1054 100644
--- a/libs/libqueue/tests/test_queue.py
+++ b/libs/libqueue/tests/test_queue.py
@@ -8,7 +8,9 @@
     connect_to_queue,
     finish_dataset_job,
     get_dataset_job,
+    get_dataset_jobs_count_by_status,
     get_split_job,
+    get_split_jobs_count_by_status,
     is_dataset_in_queue,
     is_split_in_queue,
 )
@@ -87,3 +89,29 @@ def test_is_split_in_queue() -> None:
     add_split_job(dataset_name, config_name, split_name)
     assert is_split_in_queue(dataset_name, config_name, split_name) is True
     assert is_split_in_queue(dataset_name_2, config_name, split_name) is False
+
+
+def test_count_by_status() -> None:
+    assert get_dataset_jobs_count_by_status() == {
+        "waiting": 0,
+        "started": 0,
+        "success": 0,
+        "error": 0,
+        "cancelled": 0,
+    }
+
+    add_dataset_job("test_dataset")
+
+    assert get_dataset_jobs_count_by_status() == {"waiting": 1, "started": 0, "success": 0, "error": 0, "cancelled": 0}
+
+    assert get_split_jobs_count_by_status() == {
+        "waiting": 0,
+        "started": 0,
+        "success": 0,
+        "error": 0,
+        "cancelled": 0,
+    }
+
+    add_split_job("test_dataset", "test_config", "test_split")
+
+    assert get_split_jobs_count_by_status() == {"waiting": 1, "started": 0, "success": 0, "error": 0, "cancelled": 0}
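Both count helpers are shared between two concrete `Document` classes through a value-constrained `TypeVar` (`AnyDb` in `libcache`, `AnyJob` in `libqueue`). A plain-Python sketch of the pattern, with hypothetical classes standing in for the mongoengine models:

```python
from typing import Dict, List, TypeVar


class DbDataset:
    def __init__(self, status: str) -> None:
        self.status = status


class DbSplit:
    def __init__(self, status: str) -> None:
        self.status = status


# Constrained TypeVar: AnyDb may only ever be bound to DbDataset or
# DbSplit, so a single helper serves both collections and mypy still
# rejects any other argument type.
AnyDb = TypeVar("AnyDb", DbDataset, DbSplit)


def count_by_status(entries: List[AnyDb]) -> Dict[str, int]:
    # Plain-Python stand-in for QuerySet.item_frequencies("status")
    frequencies: Dict[str, int] = {}
    for entry in entries:
        frequencies[entry.status] = frequencies.get(entry.status, 0) + 1
    return frequencies


print(count_by_status([DbDataset("valid"), DbDataset("empty")]))  # {'valid': 1, 'empty': 1}
print(count_by_status([DbSplit("valid")]))  # {'valid': 1}
# count_by_status(["a", "b"]) would be flagged by mypy: str is not one of the constraints
```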
diff --git a/services/api/poetry.lock b/services/api/poetry.lock
index 5e20e5ca9f..6133a99b62 100644
--- a/services/api/poetry.lock
+++ b/services/api/poetry.lock
@@ -732,11 +732,11 @@ tests = ["pytest (>=5.4.1)", "pytest-cov (>=2.8.1)", "sphinx (>=3.0.3)", "pytest
 
 [[package]]
 name = "prometheus-client"
-version = "0.14.1"
+version = "0.12.0"
 description = "Python client for the Prometheus monitoring system."
 category = "main"
 optional = false
-python-versions = ">=3.6"
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
 
 [package.extras]
 twisted = ["twisted"]
@@ -1067,6 +1067,18 @@ anyio = ">=3.0.0,<4"
 
 [package.extras]
 full = ["itsdangerous", "jinja2", "python-multipart", "pyyaml", "requests", "graphene"]
 
+[[package]]
+name = "starlette-prometheus"
+version = "0.9.0"
+description = "Prometheus integration for Starlette"
+category = "main"
+optional = false
+python-versions = ">=3.7,<4.0"
+
+[package.dependencies]
+prometheus_client = ">=0.12,<0.13"
+starlette = ">=0.12.2"
+
 [[package]]
 name = "stevedore"
 version = "3.5.0"
@@ -1189,7 +1201,7 @@ watchmedo = ["PyYAML (>=3.10)"]
 [metadata]
 lock-version = "1.1"
 python-versions = "3.9.6"
-content-hash = "220f504588ab6b978e29e6e613926e8f5b67d01df5109248827987b4e5f21510"
+content-hash = "4e0781c9059d07cfda1a4419eee819ad7a24ef7b84f0d998077b7c0619dc18f6"
 
 [metadata.files]
 anyio = [
@@ -1625,8 +1637,8 @@ portalocker = [
     {file = "portalocker-2.4.0.tar.gz", hash = "sha256:a648ad761b8ea27370cb5915350122cd807b820d2193ed5c9cc28f163df637f4"},
 ]
 prometheus-client = [
-    {file = "prometheus_client-0.14.1-py3-none-any.whl", hash = "sha256:522fded625282822a89e2773452f42df14b5a8e84a86433e3f8a189c1d54dc01"},
-    {file = "prometheus_client-0.14.1.tar.gz", hash = "sha256:5459c427624961076277fdc6dc50540e2bacb98eebde99886e59ec55ed92093a"},
+    {file = "prometheus_client-0.12.0-py2.py3-none-any.whl", hash = "sha256:317453ebabff0a1b02df7f708efbab21e3489e7072b61cb6957230dd004a0af0"},
+    {file = "prometheus_client-0.12.0.tar.gz", hash = "sha256:1b12ba48cee33b9b0b9de64a1047cbd3c5f2d0ab6ebcead7ddda613a750ec3c5"},
 ]
 py = [
     {file = "py-1.11.0-py2.py3-none-any.whl", hash = "sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378"},
@@ -1995,6 +2007,10 @@ starlette = [
     {file = "starlette-0.16.0-py3-none-any.whl", hash = "sha256:38eb24bf705a2c317e15868e384c1b8a12ca396e5a3c3a003db7e667c43f939f"},
     {file = "starlette-0.16.0.tar.gz", hash = "sha256:e1904b5d0007aee24bdd3c43994be9b3b729f4f58e740200de1d623f8c3a8870"},
 ]
+starlette-prometheus = [
+    {file = "starlette-prometheus-0.9.0.tar.gz", hash = "sha256:a52fb0f1df52b44a7a677a792759337ef0ce0d59ddf3e684a7d6459a93a90e99"},
+    {file = "starlette_prometheus-0.9.0-py3-none-any.whl", hash = "sha256:b4702e4ec67dce508d28551db0e45f12f58411afdb5d1078c92ff74331915381"},
+]
 stevedore = [
     {file = "stevedore-3.5.0-py3-none-any.whl", hash = "sha256:a547de73308fd7e90075bb4d301405bebf705292fa90a90fc3bcf9133f58616c"},
     {file = "stevedore-3.5.0.tar.gz", hash = "sha256:f40253887d8712eaa2bb0ea3830374416736dc8ec0e22f5a65092c1174c44335"},
"sha256:a52fb0f1df52b44a7a677a792759337ef0ce0d59ddf3e684a7d6459a93a90e99"}, + {file = "starlette_prometheus-0.9.0-py3-none-any.whl", hash = "sha256:b4702e4ec67dce508d28551db0e45f12f58411afdb5d1078c92ff74331915381"}, +] stevedore = [ {file = "stevedore-3.5.0-py3-none-any.whl", hash = "sha256:a547de73308fd7e90075bb4d301405bebf705292fa90a90fc3bcf9133f58616c"}, {file = "stevedore-3.5.0.tar.gz", hash = "sha256:f40253887d8712eaa2bb0ea3830374416736dc8ec0e22f5a65092c1174c44335"}, diff --git a/services/api/pyproject.toml b/services/api/pyproject.toml index f24d6b1a30..943a2181b5 100644 --- a/services/api/pyproject.toml +++ b/services/api/pyproject.toml @@ -9,10 +9,10 @@ huggingface-hub = "^0.5.1" libcache = { path = "../../libs/libcache", develop = true } libqueue = { path = "../../libs/libqueue", develop = true } libutils = { path = "../../libs/libutils", develop = true } -prometheus-client = "^0.14.1" python = "3.9.6" python-dotenv = "^0.20.0" starlette = "^0.16.0" +starlette-prometheus = "^0.9.0" uvicorn = "^0.14.0" watchdog = { extras = ["watchmedo"], version = "^2.1.3" } diff --git a/services/api/src/api/app.py b/services/api/src/api/app.py index e21523d45d..c8d330d359 100644 --- a/services/api/src/api/app.py +++ b/services/api/src/api/app.py @@ -8,6 +8,7 @@ from starlette.middleware.gzip import GZipMiddleware from starlette.routing import Mount, Route from starlette.staticfiles import StaticFiles +from starlette_prometheus import PrometheusMiddleware from api.config import ( APP_HOSTNAME, @@ -44,7 +45,7 @@ def create_app() -> Starlette: show_assets_dir(ASSETS_DIRECTORY) prometheus = Prometheus() - middleware = [Middleware(GZipMiddleware)] + middleware = [Middleware(GZipMiddleware), Middleware(PrometheusMiddleware, filter_unhandled_paths=True)] routes = [ Mount("/assets", app=StaticFiles(directory=init_assets_dir(ASSETS_DIRECTORY), check_dir=True), name="assets"), Route("/cache-reports", endpoint=cache_reports_endpoint), diff --git a/services/api/src/api/prometheus.py b/services/api/src/api/prometheus.py index a890c751cc..e51f6f1f0c 100644 --- a/services/api/src/api/prometheus.py +++ b/services/api/src/api/prometheus.py @@ -55,7 +55,6 @@ def updateMetrics(self): self.metrics["cache_entries_total"].labels(cache="splits", status=status).set(total) def endpoint(self, request: Request) -> Response: - # disabled for now to fix https://github.com/huggingface/datasets-server/issues/279 - # self.updateMetrics() + self.updateMetrics() return Response(generate_latest(self.getRegistry()), headers={"Content-Type": CONTENT_TYPE_LATEST}) diff --git a/services/api/tests/test_app.py b/services/api/tests/test_app.py index 73ad64accd..11149e4dd2 100644 --- a/services/api/tests/test_app.py +++ b/services/api/tests/test_app.py @@ -343,6 +343,6 @@ def test_metrics(client: TestClient) -> None: assert name in metrics assert metrics[name] > 0 name = "process_start_time_seconds" - # Disabled for now - # assert 'queue_jobs_total{queue="datasets",status="waiting"}' in metrics - # assert 'cache_entries_total{cache="datasets",status="empty"}' in metrics + assert 'queue_jobs_total{queue="datasets",status="waiting"}' in metrics + assert 'cache_entries_total{cache="datasets",status="empty"}' in metrics + assert 'starlette_requests_total{method="GET",path_template="/metrics"}' in metrics