diff --git a/faststream/asgi/app.py b/faststream/asgi/app.py index 36685f23fe..a031021ad0 100644 --- a/faststream/asgi/app.py +++ b/faststream/asgi/app.py @@ -153,12 +153,14 @@ async def run( port = int(run_extra_options.pop("port", 8000)) # type: ignore[arg-type] workers = int(run_extra_options.pop("workers", 1)) # type: ignore[arg-type] host = str(run_extra_options.pop("host", "localhost")) + fd = int(run_extra_options.pop("fd", -1)) # type: ignore[arg-type] config = uvicorn.Config( self, host=host, port=port, log_level=log_level, workers=workers, + fd=fd if fd != -1 else None, **run_extra_options, ) server = uvicorn.Server(config) diff --git a/faststream/prometheus/container.py b/faststream/prometheus/container.py index 6b5f813f63..6ac764866e 100644 --- a/faststream/prometheus/container.py +++ b/faststream/prometheus/container.py @@ -1,6 +1,10 @@ -from typing import Optional, Sequence +from typing import TYPE_CHECKING, Optional, Sequence, Union, cast -from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram +from prometheus_client import Counter, Gauge, Histogram + +if TYPE_CHECKING: + from prometheus_client import CollectorRegistry + from prometheus_client.registry import Collector class MetricsContainer: @@ -43,58 +47,110 @@ def __init__( self._registry = registry self._metrics_prefix = metrics_prefix - self.received_messages_total = Counter( + self.received_messages_total = cast( + Counter, + self._get_registered_metric(f"{metrics_prefix}_received_messages_total"), + ) or Counter( name=f"{metrics_prefix}_received_messages_total", documentation="Count of received messages by broker and handler", labelnames=["app_name", "broker", "handler"], registry=registry, ) - self.received_messages_size_bytes = Histogram( + + self.received_messages_size_bytes = cast( + Histogram, + self._get_registered_metric( + f"{metrics_prefix}_received_messages_size_bytes" + ), + ) or Histogram( name=f"{metrics_prefix}_received_messages_size_bytes", documentation="Histogram of received messages size in bytes by broker and handler", labelnames=["app_name", "broker", "handler"], registry=registry, buckets=received_messages_size_buckets or self.DEFAULT_SIZE_BUCKETS, ) - self.received_messages_in_process = Gauge( + + self.received_messages_in_process = cast( + Gauge, + self._get_registered_metric( + f"{metrics_prefix}_received_messages_in_process" + ), + ) or Gauge( name=f"{metrics_prefix}_received_messages_in_process", documentation="Gauge of received messages in process by broker and handler", labelnames=["app_name", "broker", "handler"], registry=registry, ) - self.received_processed_messages_total = Counter( + + self.received_processed_messages_total = cast( + Counter, + self._get_registered_metric( + f"{metrics_prefix}_received_processed_messages_total" + ), + ) or Counter( name=f"{metrics_prefix}_received_processed_messages_total", documentation="Count of received processed messages by broker, handler and status", labelnames=["app_name", "broker", "handler", "status"], registry=registry, ) - self.received_processed_messages_duration_seconds = Histogram( + + self.received_processed_messages_duration_seconds = cast( + Histogram, + self._get_registered_metric( + f"{metrics_prefix}_received_processed_messages_duration_seconds" + ), + ) or Histogram( name=f"{metrics_prefix}_received_processed_messages_duration_seconds", documentation="Histogram of received processed messages duration in seconds by broker and handler", labelnames=["app_name", "broker", "handler"], registry=registry, ) - self.received_processed_messages_exceptions_total = Counter( + + self.received_processed_messages_exceptions_total = cast( + Counter, + self._get_registered_metric( + f"{metrics_prefix}_received_processed_messages_exceptions_total" + ), + ) or Counter( name=f"{metrics_prefix}_received_processed_messages_exceptions_total", documentation="Count of received processed messages exceptions by broker, handler and exception_type", labelnames=["app_name", "broker", "handler", "exception_type"], registry=registry, ) - self.published_messages_total = Counter( + + self.published_messages_total = cast( + Counter, + self._get_registered_metric(f"{metrics_prefix}_published_messages_total"), + ) or Counter( name=f"{metrics_prefix}_published_messages_total", documentation="Count of published messages by destination and status", labelnames=["app_name", "broker", "destination", "status"], registry=registry, ) - self.published_messages_duration_seconds = Histogram( + + self.published_messages_duration_seconds = cast( + Histogram, + self._get_registered_metric( + f"{metrics_prefix}_published_messages_duration_seconds" + ), + ) or Histogram( name=f"{metrics_prefix}_published_messages_duration_seconds", documentation="Histogram of published messages duration in seconds by broker and destination", labelnames=["app_name", "broker", "destination"], registry=registry, ) - self.published_messages_exceptions_total = Counter( + + self.published_messages_exceptions_total = cast( + Counter, + self._get_registered_metric( + f"{metrics_prefix}_published_messages_exceptions_total" + ), + ) or Counter( name=f"{metrics_prefix}_published_messages_exceptions_total", documentation="Count of published messages exceptions by broker, destination and exception_type", labelnames=["app_name", "broker", "destination", "exception_type"], registry=registry, ) + + def _get_registered_metric(self, metric_name: str) -> Union["Collector", None]: + return self._registry._names_to_collectors.get(metric_name) diff --git a/tests/prometheus/basic.py b/tests/prometheus/basic.py index f2d9a5d6cf..1d749af1ee 100644 --- a/tests/prometheus/basic.py +++ b/tests/prometheus/basic.py @@ -11,6 +11,7 @@ from faststream.prometheus.middleware import ( PROCESSING_STATUS_BY_ACK_STATUS, PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP, + BasePrometheusMiddleware, ) from faststream.prometheus.types import ProcessingStatus from tests.brokers.base.basic import BaseTestcaseConfig @@ -21,7 +22,7 @@ class LocalPrometheusTestcase(BaseTestcaseConfig): def get_broker(self, apply_types=False, **kwargs): raise NotImplementedError - def get_middleware(self, **kwargs): + def get_middleware(self, **kwargs) -> BasePrometheusMiddleware: raise NotImplementedError @staticmethod @@ -202,3 +203,18 @@ def assert_publish_metrics(self, metrics_manager: Any): status="success", ), ] + + async def test_one_registry_for_some_middlewares( + self, event: asyncio.Event, queue: str + ) -> None: + registry = CollectorRegistry() + + middleware_1 = self.get_middleware(registry=registry) + middleware_2 = self.get_middleware(registry=registry) + self.get_broker(middlewares=(middleware_1,)) + self.get_broker(middlewares=(middleware_2,)) + + assert ( + middleware_1._metrics_container.received_messages_total + is middleware_2._metrics_container.received_messages_total + )