Skip to content

Commit

Permalink
Merge branch 'airtai:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
spataphore1337 authored Nov 20, 2024
2 parents 706f681 + 4bc70b2 commit b711c6b
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 12 deletions.
2 changes: 2 additions & 0 deletions faststream/asgi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,14 @@ async def run(
port = int(run_extra_options.pop("port", 8000)) # type: ignore[arg-type]
workers = int(run_extra_options.pop("workers", 1)) # type: ignore[arg-type]
host = str(run_extra_options.pop("host", "localhost"))
fd = int(run_extra_options.pop("fd", -1)) # type: ignore[arg-type]
config = uvicorn.Config(
self,
host=host,
port=port,
log_level=log_level,
workers=workers,
fd=fd if fd != -1 else None,
**run_extra_options,
)
server = uvicorn.Server(config)
Expand Down
78 changes: 67 additions & 11 deletions faststream/prometheus/container.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from typing import Optional, Sequence
from typing import TYPE_CHECKING, Optional, Sequence, Union, cast

from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
from prometheus_client import Counter, Gauge, Histogram

if TYPE_CHECKING:
from prometheus_client import CollectorRegistry
from prometheus_client.registry import Collector


class MetricsContainer:
Expand Down Expand Up @@ -43,58 +47,110 @@ def __init__(
self._registry = registry
self._metrics_prefix = metrics_prefix

self.received_messages_total = Counter(
self.received_messages_total = cast(
Counter,
self._get_registered_metric(f"{metrics_prefix}_received_messages_total"),
) or Counter(
name=f"{metrics_prefix}_received_messages_total",
documentation="Count of received messages by broker and handler",
labelnames=["app_name", "broker", "handler"],
registry=registry,
)
self.received_messages_size_bytes = Histogram(

self.received_messages_size_bytes = cast(
Histogram,
self._get_registered_metric(
f"{metrics_prefix}_received_messages_size_bytes"
),
) or Histogram(
name=f"{metrics_prefix}_received_messages_size_bytes",
documentation="Histogram of received messages size in bytes by broker and handler",
labelnames=["app_name", "broker", "handler"],
registry=registry,
buckets=received_messages_size_buckets or self.DEFAULT_SIZE_BUCKETS,
)
self.received_messages_in_process = Gauge(

self.received_messages_in_process = cast(
Gauge,
self._get_registered_metric(
f"{metrics_prefix}_received_messages_in_process"
),
) or Gauge(
name=f"{metrics_prefix}_received_messages_in_process",
documentation="Gauge of received messages in process by broker and handler",
labelnames=["app_name", "broker", "handler"],
registry=registry,
)
self.received_processed_messages_total = Counter(

self.received_processed_messages_total = cast(
Counter,
self._get_registered_metric(
f"{metrics_prefix}_received_processed_messages_total"
),
) or Counter(
name=f"{metrics_prefix}_received_processed_messages_total",
documentation="Count of received processed messages by broker, handler and status",
labelnames=["app_name", "broker", "handler", "status"],
registry=registry,
)
self.received_processed_messages_duration_seconds = Histogram(

self.received_processed_messages_duration_seconds = cast(
Histogram,
self._get_registered_metric(
f"{metrics_prefix}_received_processed_messages_duration_seconds"
),
) or Histogram(
name=f"{metrics_prefix}_received_processed_messages_duration_seconds",
documentation="Histogram of received processed messages duration in seconds by broker and handler",
labelnames=["app_name", "broker", "handler"],
registry=registry,
)
self.received_processed_messages_exceptions_total = Counter(

self.received_processed_messages_exceptions_total = cast(
Counter,
self._get_registered_metric(
f"{metrics_prefix}_received_processed_messages_exceptions_total"
),
) or Counter(
name=f"{metrics_prefix}_received_processed_messages_exceptions_total",
documentation="Count of received processed messages exceptions by broker, handler and exception_type",
labelnames=["app_name", "broker", "handler", "exception_type"],
registry=registry,
)
self.published_messages_total = Counter(

self.published_messages_total = cast(
Counter,
self._get_registered_metric(f"{metrics_prefix}_published_messages_total"),
) or Counter(
name=f"{metrics_prefix}_published_messages_total",
documentation="Count of published messages by destination and status",
labelnames=["app_name", "broker", "destination", "status"],
registry=registry,
)
self.published_messages_duration_seconds = Histogram(

self.published_messages_duration_seconds = cast(
Histogram,
self._get_registered_metric(
f"{metrics_prefix}_published_messages_duration_seconds"
),
) or Histogram(
name=f"{metrics_prefix}_published_messages_duration_seconds",
documentation="Histogram of published messages duration in seconds by broker and destination",
labelnames=["app_name", "broker", "destination"],
registry=registry,
)
self.published_messages_exceptions_total = Counter(

self.published_messages_exceptions_total = cast(
Counter,
self._get_registered_metric(
f"{metrics_prefix}_published_messages_exceptions_total"
),
) or Counter(
name=f"{metrics_prefix}_published_messages_exceptions_total",
documentation="Count of published messages exceptions by broker, destination and exception_type",
labelnames=["app_name", "broker", "destination", "exception_type"],
registry=registry,
)

def _get_registered_metric(self, metric_name: str) -> Union["Collector", None]:
return self._registry._names_to_collectors.get(metric_name)
18 changes: 17 additions & 1 deletion tests/prometheus/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from faststream.prometheus.middleware import (
PROCESSING_STATUS_BY_ACK_STATUS,
PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP,
BasePrometheusMiddleware,
)
from faststream.prometheus.types import ProcessingStatus
from tests.brokers.base.basic import BaseTestcaseConfig
Expand All @@ -21,7 +22,7 @@ class LocalPrometheusTestcase(BaseTestcaseConfig):
def get_broker(self, apply_types=False, **kwargs):
raise NotImplementedError

def get_middleware(self, **kwargs):
def get_middleware(self, **kwargs) -> BasePrometheusMiddleware:
raise NotImplementedError

@staticmethod
Expand Down Expand Up @@ -202,3 +203,18 @@ def assert_publish_metrics(self, metrics_manager: Any):
status="success",
),
]

async def test_one_registry_for_some_middlewares(
self, event: asyncio.Event, queue: str
) -> None:
registry = CollectorRegistry()

middleware_1 = self.get_middleware(registry=registry)
middleware_2 = self.get_middleware(registry=registry)
self.get_broker(middlewares=(middleware_1,))
self.get_broker(middlewares=(middleware_2,))

assert (
middleware_1._metrics_container.received_messages_total
is middleware_2._metrics_container.received_messages_total
)

0 comments on commit b711c6b

Please sign in to comment.