Skip to content

Commit

Permalink
Reenable metrics (#298)
Browse files Browse the repository at this point in the history
* feat: 🎸 re-add the metrics about the cache and the queue

See #250 and #279

* feat: 🎸 re-add the Starlette request metrics
  • Loading branch information
severo authored May 23, 2022
1 parent 1fc3fa7 commit f68c206
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 39 deletions.
37 changes: 16 additions & 21 deletions libs/libcache/src/libcache/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ def to_split_full_name(self) -> SplitFullName:
objects = QuerySetManager["DbSplit"]()


AnyDb = TypeVar("AnyDb", DbDataset, DbSplit)  # constrained TypeVar: a cache document type, either DbDataset or DbSplit


class _BaseErrorItem(TypedDict):
status_code: int
exception: str
Expand Down Expand Up @@ -465,39 +468,31 @@ def get_dataset_names_with_status(status: str) -> List[str]:
return [d.dataset_name for d in DbDataset.objects(status=status).only("dataset_name")]


def get_datasets_count_with_status(status: Status) -> int:
    """Return how many cached dataset entries currently have the given status."""
    # TODO: take the splits statuses into account?
    matching_datasets = DbDataset.objects(status=status)
    return matching_datasets.count()


def get_splits_count_with_status(status: Status) -> int:
    """Return how many cached split entries currently have the given status."""
    # NOTE: the original TODO comment here ("take the splits statuses into
    # account?") was copy-pasted from the dataset-level counterpart and does
    # not apply: this function already counts by split status.
    return DbSplit.objects(status=status).count()


# Per-status counts of cache entries; all four status buckets are always present.
CountByStatus = TypedDict(
    "CountByStatus",
    {"empty": int, "error": int, "stalled": int, "valid": int},
)


def get_entries_count_by_status(entries: QuerySet[AnyDb]) -> CountByStatus:
    """Count the entries of a cache queryset, grouped by status.

    Uses a single aggregation (item_frequencies) instead of one count query
    per status.

    Args:
        entries: a queryset over DbDataset or DbSplit documents.

    Returns:
        a CountByStatus dict with one integer per status.
    """
    # NOTE(review): the scraped diff had interleaved old and new lines here
    # (a stray bodyless `def get_datasets_count_by_status():` header and the
    # removed per-status `.count()` dict entries duplicating the new keys);
    # this is the resolved, post-commit version.
    frequencies: Dict[str, int] = entries.item_frequencies("status", normalize=False)  # type: ignore
    # ensure that all the statuses are present, even if equal to zero
    return {
        "empty": frequencies.get(Status.EMPTY.value, 0),
        "error": frequencies.get(Status.ERROR.value, 0),
        "stalled": frequencies.get(Status.STALLED.value, 0),
        "valid": frequencies.get(Status.VALID.value, 0),
    }


def get_datasets_count_by_status() -> CountByStatus:
    """Count the cached dataset entries, grouped by status."""
    # TODO: take the splits statuses into account?
    dataset_entries = DbDataset.objects
    return get_entries_count_by_status(dataset_entries)


def get_splits_count_by_status() -> CountByStatus:
    """Count the cached split entries, grouped by status.

    Fix: the scraped diff left the removed dict-literal body (which called the
    deleted get_splits_count_with_status helper) above the new return, so the
    old body executed and the new line was unreachable. Only the new,
    aggregation-based implementation is kept.
    """
    return get_entries_count_by_status(DbSplit.objects)


class DatasetCacheReport(TypedDict):
Expand Down
27 changes: 27 additions & 0 deletions libs/libcache/tests/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
clean_database,
connect_to_cache,
delete_dataset_cache,
get_datasets_count_by_status,
get_rows_response,
get_splits_count_by_status,
get_valid_or_stalled_dataset_names,
upsert_dataset,
upsert_split,
Expand Down Expand Up @@ -146,3 +148,28 @@ def test_valid() -> None:
)

assert get_valid_or_stalled_dataset_names() == ["test_dataset", "test_dataset2"]


def test_count_by_status() -> None:
    """Cache status counters follow dataset/split upserts."""
    all_zero = {"empty": 0, "error": 0, "stalled": 0, "valid": 0}

    # empty cache: every bucket is zero
    assert get_datasets_count_by_status() == all_zero

    split_ref = {"dataset_name": "test_dataset", "config_name": "test_config", "split_name": "test_split"}
    upsert_dataset("test_dataset", [split_ref])

    # the dataset is now valid; its split is declared but its response is not stored yet
    assert get_datasets_count_by_status() == {**all_zero, "valid": 1}
    assert get_splits_count_by_status() == {**all_zero, "empty": 1}

    upsert_split(
        "test_dataset",
        "test_config",
        "test_split",
        {
            "split_name": "test_split",
            "rows_response": {"rows": [], "columns": []},
            "num_bytes": None,
            "num_examples": None,
        },
    )

    # the split response is stored: the split moves to valid
    assert get_splits_count_by_status() == {**all_zero, "valid": 1}
14 changes: 8 additions & 6 deletions libs/libqueue/src/libqueue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import types
from datetime import datetime
from typing import Generic, List, Optional, Tuple, Type, TypedDict, TypeVar
from typing import Dict, Generic, List, Optional, Tuple, Type, TypedDict, TypeVar

from mongoengine import Document, DoesNotExist, connect
from mongoengine.fields import DateTimeField, EnumField, StringField
Expand Down Expand Up @@ -304,12 +304,14 @@ def cancel_started_split_jobs() -> None:


def get_jobs_count_by_status(jobs: QuerySet[AnyJob]) -> CountByStatus:
    """Count the jobs of a queue queryset, grouped by status.

    Uses a single aggregation (item_frequencies) instead of one count query
    per status.

    Args:
        jobs: a queryset over dataset or split job documents.

    Returns:
        a CountByStatus dict with one integer per job status.

    Fix: the scraped diff had the removed per-status
    `get_jobs_with_status(...).count()` entries interleaved with the new
    `frequencies.get(...)` entries, yielding duplicate dict keys and dead
    per-status DB queries; only the new entries are kept.
    """
    frequencies: Dict[str, int] = jobs.item_frequencies("status", normalize=False)  # type: ignore
    # ensure that all the statuses are present, even if equal to zero
    return {
        "waiting": frequencies.get(Status.WAITING.value, 0),
        "started": frequencies.get(Status.STARTED.value, 0),
        "success": frequencies.get(Status.SUCCESS.value, 0),
        "error": frequencies.get(Status.ERROR.value, 0),
        "cancelled": frequencies.get(Status.CANCELLED.value, 0),
    }


Expand Down
28 changes: 28 additions & 0 deletions libs/libqueue/tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
connect_to_queue,
finish_dataset_job,
get_dataset_job,
get_dataset_jobs_count_by_status,
get_split_job,
get_split_jobs_count_by_status,
is_dataset_in_queue,
is_split_in_queue,
)
Expand Down Expand Up @@ -87,3 +89,29 @@ def test_is_split_in_queue() -> None:
add_split_job(dataset_name, config_name, split_name)
assert is_split_in_queue(dataset_name, config_name, split_name) is True
assert is_split_in_queue(dataset_name_2, config_name, split_name) is False


def test_count_by_status() -> None:
    """Queue status counters follow job additions."""
    no_jobs = {"waiting": 0, "started": 0, "success": 0, "error": 0, "cancelled": 0}

    # empty dataset queue: every bucket is zero
    assert get_dataset_jobs_count_by_status() == no_jobs

    add_dataset_job("test_dataset")

    assert get_dataset_jobs_count_by_status() == {**no_jobs, "waiting": 1}

    # the split queue is still untouched
    assert get_split_jobs_count_by_status() == no_jobs

    add_split_job("test_dataset", "test_config", "test_split")

    assert get_split_jobs_count_by_status() == {**no_jobs, "waiting": 1}
26 changes: 21 additions & 5 deletions services/api/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion services/api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ huggingface-hub = "^0.5.1"
libcache = { path = "../../libs/libcache", develop = true }
libqueue = { path = "../../libs/libqueue", develop = true }
libutils = { path = "../../libs/libutils", develop = true }
prometheus-client = "^0.14.1"
python = "3.9.6"
python-dotenv = "^0.20.0"
starlette = "^0.16.0"
starlette-prometheus = "^0.9.0"
uvicorn = "^0.14.0"
watchdog = { extras = ["watchmedo"], version = "^2.1.3" }

Expand Down
3 changes: 2 additions & 1 deletion services/api/src/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from starlette.middleware.gzip import GZipMiddleware
from starlette.routing import Mount, Route
from starlette.staticfiles import StaticFiles
from starlette_prometheus import PrometheusMiddleware

from api.config import (
APP_HOSTNAME,
Expand Down Expand Up @@ -44,7 +45,7 @@ def create_app() -> Starlette:
show_assets_dir(ASSETS_DIRECTORY)
prometheus = Prometheus()

middleware = [Middleware(GZipMiddleware)]
middleware = [Middleware(GZipMiddleware), Middleware(PrometheusMiddleware, filter_unhandled_paths=True)]
routes = [
Mount("/assets", app=StaticFiles(directory=init_assets_dir(ASSETS_DIRECTORY), check_dir=True), name="assets"),
Route("/cache-reports", endpoint=cache_reports_endpoint),
Expand Down
3 changes: 1 addition & 2 deletions services/api/src/api/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def updateMetrics(self):
self.metrics["cache_entries_total"].labels(cache="splits", status=status).set(total)

def endpoint(self, request: Request) -> Response:
    """Serve the Prometheus scrape endpoint.

    Refreshes the cache/queue gauges on every scrape before rendering the
    registry in the Prometheus text exposition format.
    """
    # The call below was previously commented out (issue #279) and has been
    # re-enabled; the stale commented-out copy of the call is removed.
    self.updateMetrics()

    return Response(generate_latest(self.getRegistry()), headers={"Content-Type": CONTENT_TYPE_LATEST})
6 changes: 3 additions & 3 deletions services/api/tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,6 @@ def test_metrics(client: TestClient) -> None:
assert name in metrics
assert metrics[name] > 0
name = "process_start_time_seconds"
# Disabled for now
# assert 'queue_jobs_total{queue="datasets",status="waiting"}' in metrics
# assert 'cache_entries_total{cache="datasets",status="empty"}' in metrics
assert 'queue_jobs_total{queue="datasets",status="waiting"}' in metrics
assert 'cache_entries_total{cache="datasets",status="empty"}' in metrics
assert 'starlette_requests_total{method="GET",path_template="/metrics"}' in metrics

0 comments on commit f68c206

Please sign in to comment.