Skip to content

Commit

Permalink
use same base for monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Jan 7, 2022
1 parent d8e284e commit 91ccbe0
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 137 deletions.
37 changes: 31 additions & 6 deletions packages/service-library/src/servicelib/aiohttp/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import asyncio
import logging
import time
from typing import List, Optional
from typing import Awaitable, Callable, List, Optional

import prometheus_client
from aiohttp import web
Expand Down Expand Up @@ -70,7 +70,14 @@ async def metrics_handler(request: web.Request):
return response


def middleware_factory(app_name: str, excluded_paths: Optional[List[str]] = None):
def middleware_factory(
app_name: str,
excluded_paths: Optional[List[str]] = None,
enter_middleware_cb: Optional[Callable[[web.Request], Awaitable[None]]] = None,
exit_middleware_cb: Optional[
Callable[[web.Request, web.StreamResponse], Awaitable[None]]
] = None,
):
if not excluded_paths:
excluded_paths = []

Expand All @@ -91,6 +98,8 @@ async def middleware_handler(request: web.Request, handler: Handler):

try:
log.debug("ENTERING monitoring middleware for %s", f"{request=}")
if enter_middleware_cb:
await enter_middleware_cb(request)
request[kSTART_TIME] = time.time()

in_flight_gauge = request.app[kINFLIGHTREQUESTS]
Expand Down Expand Up @@ -142,6 +151,9 @@ async def middleware_handler(request: web.Request, handler: Handler):
resp.status,
).inc()

if exit_middleware_cb:
await exit_middleware_cb(request, resp)

if log_exception:
log.error(
'Unexpected server error "%s" from access: %s "%s %s" done '
Expand All @@ -159,12 +171,19 @@ async def middleware_handler(request: web.Request, handler: Handler):
return resp

# adds identifier
middleware_handler.__middleware_name__ = __name__ # SEE check_outermost_middleware
middleware_handler.__middleware_name__ = f"{__name__}.monitor_{app_name}"

return middleware_handler


def setup_monitoring(app: web.Application, app_name: str):
def setup_monitoring(
app: web.Application,
app_name: str,
enter_middleware_cb: Optional[Callable[[web.Request], Awaitable[None]]] = None,
exit_middleware_cb: Optional[
Callable[[web.Request, web.StreamResponse], Awaitable[None]]
] = None,
):
# app-scope registry
app[kCOLLECTOR_REGISTRY] = reg = CollectorRegistry(auto_describe=False)
app[kPROCESS_COLLECTOR] = ProcessCollector(registry=reg)
Expand Down Expand Up @@ -193,7 +212,6 @@ def setup_monitoring(app: web.Application, app_name: str):
registry=reg,
)

log.error("Currently registered metrics: %s", f"{reg.get_target_info()}")
# WARNING: ensure ERROR middleware is over this one
#
# non-API request/response (e.g /metrics, /x/* ...)
Expand All @@ -210,7 +228,14 @@ def setup_monitoring(app: web.Application, app_name: str):
#

# ensures it is the first layer, but cannot guarantee the order in which setup is applied
app.middlewares.insert(0, middleware_factory(app_name))
app.middlewares.insert(
0,
middleware_factory(
app_name,
enter_middleware_cb=enter_middleware_cb,
exit_middleware_cb=exit_middleware_cb,
),
)

app.router.add_get("/metrics", metrics_handler)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@
"""
import logging
import time
from asyncio.exceptions import CancelledError

import prometheus_client
from aiohttp import web
from prometheus_client import CONTENT_TYPE_LATEST, Counter
from prometheus_client.registry import CollectorRegistry
from servicelib.aiohttp.monitor_services import add_instrumentation
from servicelib.aiohttp.typing_extension import Handler, Middleware
from servicelib.aiohttp import monitor_services
from servicelib.aiohttp.monitoring import get_collector_registry
from servicelib.aiohttp.monitoring import setup_monitoring as service_lib_setup

from .diagnostics_core import DelayWindowProbe, is_sensing_enabled, kLATENCY_PROBE

Expand Down Expand Up @@ -43,140 +40,31 @@
kCOLLECTOR_REGISTRY = f"{__name__}.collector_registry"


def get_collector_registry(app: web.Application) -> CollectorRegistry:
    """Return the collector registry stored in ``app`` under ``kCOLLECTOR_REGISTRY``.

    Raises whatever ``app[...]`` raises when the key has not been set.
    """
    registry = app[kCOLLECTOR_REGISTRY]
    return registry
async def enter_middleware_cb(request: web.Request):
    """Stamp ``request`` with the current ``time.time()`` under ``kSTART_TIME``."""
    started_at = time.time()
    request[kSTART_TIME] = started_at


async def metrics_handler(request: web.Request):
    """Handle a metrics request by serializing the app's collector registry.

    The registry is rendered with ``prometheus_client.generate_latest`` and
    returned as the response body with content type ``CONTENT_TYPE_LATEST``.
    """
    app_registry = get_collector_registry(request.app)

    # NOTE: Cannot use ProcessPoolExecutor because registry is not picklable,
    # hence the loop's default executor (None) is used to keep the
    # serialization off the event loop
    loop = request.loop
    payload = await loop.run_in_executor(
        None, prometheus_client.generate_latest, app_registry
    )

    response = web.Response(body=payload)
    response.content_type = CONTENT_TYPE_LATEST
    return response


def middleware_factory(app_name: str) -> Middleware:
    """Create the monitoring middleware labelled with ``app_name``.

    Requests to ``/socket.io/`` are passed to the handler untouched. For any
    other request the middleware stores the start time under ``kSTART_TIME``,
    awaits the handler and turns exceptions raised by it into a response:

    - ``web.HTTPServerError``: used as the response and logged
    - any other ``web.HTTPException``: used as the response, not logged
    - any other ``Exception``: replaced by a 500 response and logged
    - ``CancelledError``: logged and re-raised

    Before leaving, it feeds the latency probe (only when sensing is enabled),
    increments the ``kREQUEST_COUNT`` counter and logs the failure, if any.

    The returned handler carries a ``__middleware_name__`` attribute built
    from this module's name and ``app_name``.
    """

    @web.middleware
    async def _middleware_handler(request: web.Request, handler: Handler):
        if request.rel_url.path == "/socket.io/":
            return await handler(request)

        log_exception = None
        # fallback response, replaced as soon as the handler returns or raises
        resp: web.StreamResponse = web.HTTPInternalServerError(
            reason="Unexpected exception"
        )

        try:
            request[kSTART_TIME] = time.time()

            resp = await handler(request)

            assert isinstance(  # nosec
                resp, web.StreamResponse
            ), "Forgot envelope middleware?"

        except web.HTTPServerError as exc:
            # Transforms exception into response object and log exception
            resp = exc
            log_exception = exc

        except web.HTTPException as exc:
            # Transforms non-HTTPServerError exceptions into response object
            resp = exc
            log_exception = None

        except Exception as exc:  # pylint: disable=broad-except
            # Transforms unhandled exceptions into responses with status 500
            # NOTE: Prevents issue #1025
            resp = web.HTTPInternalServerError(reason=str(exc))
            resp.__cause__ = exc
            log_exception = exc

        except CancelledError as exc:
            # Mostly for logging
            resp = web.HTTPInternalServerError(reason=str(exc))
            log_exception = exc
            raise

        finally:
            resp_time_secs: float = time.time() - request[kSTART_TIME]

            # Explicit None check instead of truthiness: the stored exception
            # can itself be the response object (see ``resp = exc`` above) and
            # its truth value is not guaranteed to be True, which would
            # silently drop both the label and the error log.
            has_failure = log_exception is not None

            exc_name = log_exception.__class__.__name__ if has_failure else ""

            # Probes request latency
            # NOTE: sockets connection is long
            # FIXME: tmp by hand, add filters directly in probe
            if not str(request.path).startswith("/socket.io") and is_sensing_enabled(
                request.app
            ):
                request.app[kLATENCY_PROBE].observe(resp_time_secs)

            # prometheus probes
            request.app[kREQUEST_COUNT].labels(
                app_name, request.method, request.path, resp.status, exc_name
            ).inc()

            if has_failure:
                log.error(
                    'Unexpected server error "%s" from access: %s "%s %s" done in %3.2f secs. Responding with status %s',
                    type(log_exception),
                    request.remote,
                    request.method,
                    request.path,
                    resp_time_secs,
                    resp.status,
                    exc_info=log_exception,
                    stack_info=True,
                )

        return resp

    # adds identifier
    _middleware_handler.__middleware_name__ = f"{__name__}.monitor_{app_name}"

    return _middleware_handler
async def exit_middleware_cb(request: web.Request, response: web.StreamResponse):
    """Report the elapsed request time to the application's latency probe.

    The elapsed time is measured from the value stored in the request under
    ``kSTART_TIME``. Nothing is reported for paths starting with
    ``/socket.io`` or when sensing is not enabled for the application.
    ``response`` is not used by this callback.
    """
    elapsed_secs: float = time.time() - request[kSTART_TIME]

    if str(request.path).startswith("/socket.io"):
        return
    if not is_sensing_enabled(request.app):
        return

    request.app[kLATENCY_PROBE].observe(elapsed_secs)


def setup_monitoring(app: web.Application):
    """Set up monitoring for the webserver application and return ``True``.

    Delegates to the service-library setup (``service_lib_setup``), passing
    this module's ``enter_middleware_cb`` / ``exit_middleware_cb`` so the
    latency probe is fed by the shared middleware, then adds the services
    instrumentation and creates the latency probe.

    NOTE: registry creation, middleware insertion and the ``/metrics`` route
    are left to ``service_lib_setup``; repeating them here would register
    them twice.
    """
    service_lib_setup(
        app,
        "simcore_service_webserver",
        enter_middleware_cb=enter_middleware_cb,
        exit_middleware_cb=exit_middleware_cb,
    )

    monitor_services.add_instrumentation(
        app, get_collector_registry(app), "simcore_service_webserver"
    )

    # on-the fly stats
    app[kLATENCY_PROBE] = DelayWindowProbe()

    return True

0 comments on commit 91ccbe0

Please sign in to comment.