diff --git a/backend/infrahub/cli/git_agent.py b/backend/infrahub/cli/git_agent.py index f5f36e155d..72baa08c57 100644 --- a/backend/infrahub/cli/git_agent.py +++ b/backend/infrahub/cli/git_agent.py @@ -9,7 +9,7 @@ from prometheus_client import start_http_server from rich.logging import RichHandler -from infrahub import config +from infrahub import __version__, config from infrahub.components import ComponentType from infrahub.core.initialization import initialization from infrahub.database import InfrahubDatabase, get_db @@ -21,6 +21,7 @@ from infrahub.services import InfrahubServices from infrahub.services.adapters.cache.redis import RedisCache from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus +from infrahub.trace import configure_trace app = typer.Typer() @@ -66,6 +67,15 @@ async def _start(debug: bool, port: int) -> None: client = await InfrahubClient.init(address=config.SETTINGS.main.internal_address, retry_on_failure=True, log=log) await client.branch.all() + # Initialize trace + if config.SETTINGS.trace.enable: + configure_trace( + service="infrahub-git-agent", + version=__version__, + exporter_endpoint=config.SETTINGS.trace.trace_endpoint, + exporter_protocol=config.SETTINGS.trace.exporter_protocol, + ) + # Initialize the lock initialize_lock() diff --git a/backend/infrahub/database/__init__.py b/backend/infrahub/database/__init__.py index a147df7ab7..2482240322 100644 --- a/backend/infrahub/database/__init__.py +++ b/backend/infrahub/database/__init__.py @@ -13,6 +13,7 @@ Record, ) from neo4j.exceptions import ClientError, ServiceUnavailable +from otel_extensions import get_tracer from typing_extensions import Self from infrahub import config @@ -161,19 +162,22 @@ async def close(self): async def execute_query( self, query: str, params: Optional[Dict[str, Any]] = None, name: Optional[str] = "undefined" ) -> List[Record]: - with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time(): - if self.is_transaction: - 
execution_method = await self.transaction() - else: - execution_method = await self.session() - - try: - response = await execution_method.run(query=query, parameters=params) - except ServiceUnavailable as exc: - log.error("Database Service unavailable", error=str(exc)) - raise DatabaseError(message="Unable to connect to the database") from exc - - return [item async for item in response] + with get_tracer(__name__).start_as_current_span("execute_db_query") as span: + span.set_attribute("query", query) + + with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time(): + if self.is_transaction: + execution_method = await self.transaction() + else: + execution_method = await self.session() + + try: + response = await execution_method.run(query=query, parameters=params) + except ServiceUnavailable as exc: + log.error("Database Service unavailable", error=str(exc)) + raise DatabaseError(message="Unable to connect to the database") from exc + + return [item async for item in response] def render_list_comprehension(self, items: str, item_name: str) -> str: if self.db_type == DatabaseType.MEMGRAPH: diff --git a/backend/infrahub/graphql/mutations/branch.py b/backend/infrahub/graphql/mutations/branch.py index dee5099869..9241d28ea7 100644 --- a/backend/infrahub/graphql/mutations/branch.py +++ b/backend/infrahub/graphql/mutations/branch.py @@ -5,6 +5,7 @@ from graphql import GraphQLResolveInfo from infrahub_sdk.utils import extract_fields, extract_fields_first_node +from otel_extensions import instrumented from typing_extensions import Self from infrahub import config, lock from infrahub.core import registry @@ -46,6 +47,8 @@ class Arguments: ok = Boolean() object = Field(BranchType) @classmethod + # classmethod stays outermost so that instrumented wraps the coroutine function itself + @instrumented async def mutate( cls, root: dict, info: GraphQLResolveInfo, data: BranchCreateInput, background_execution: bool = False diff --git a/backend/infrahub/message_bus/operations/__init__.py b/backend/infrahub/message_bus/operations/__init__.py index 
2dd0e4e963..902c1cdf71 100644 --- a/backend/infrahub/message_bus/operations/__init__.py +++ b/backend/infrahub/message_bus/operations/__init__.py @@ -13,6 +13,8 @@ transform, trigger, ) +from otel_extensions import get_tracer + from infrahub.message_bus.types import MessageTTL from infrahub.services import InfrahubServices from infrahub.tasks.check import set_check_status @@ -68,19 +70,22 @@ async def execute_message(routing_key: str, message_body: bytes, service: InfrahubServices): - message_data = json.loads(message_body) - message = messages.MESSAGE_MAP[routing_key](**message_data) - message.set_log_data(routing_key=routing_key) - try: - await COMMAND_MAP[routing_key](message=message, service=service) - except Exception as exc: # pylint: disable=broad-except - if message.reply_requested: - response = RPCErrorResponse(errors=[str(exc)], initial_message=message.model_dump()) - await service.reply(message=response, initiator=message) - return - if message.reached_max_retries: - service.log.exception("Message failed after maximum number of retries", error=exc) - await set_check_status(message, conclusion="failure", service=service) - return - message.increase_retry_count() - await service.send(message, delay=MessageTTL.FIVE) + with get_tracer(__name__).start_as_current_span("execute_message") as span: + span.set_attribute("routing_key", routing_key) + + message_data = json.loads(message_body) + message = messages.MESSAGE_MAP[routing_key](**message_data) + message.set_log_data(routing_key=routing_key) + try: + await COMMAND_MAP[routing_key](message=message, service=service) + except Exception as exc: # pylint: disable=broad-except + if message.reply_requested: + response = RPCErrorResponse(errors=[str(exc)], initial_message=message.model_dump()) + await service.reply(message=response, initiator=message) + return + if message.reached_max_retries: + service.log.exception("Message failed after maximum number of retries", error=exc) + await set_check_status(message, 
conclusion="failure", service=service) + return + message.increase_retry_count() + await service.send(message, delay=MessageTTL.FIVE) diff --git a/backend/infrahub/server.py b/backend/infrahub/server.py index d114b45af4..549252c289 100644 --- a/backend/infrahub/server.py +++ b/backend/infrahub/server.py @@ -13,7 +13,7 @@ from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from infrahub_sdk.timestamp import TimestampFormatError -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor, Span from pydantic import ValidationError from starlette_exporter import PrometheusMiddleware, handle_metrics @@ -32,7 +32,7 @@ from infrahub.services import InfrahubServices, services from infrahub.services.adapters.cache.redis import RedisCache from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus -from infrahub.trace import add_span_exception, configure_trace, get_traceid, get_tracer +from infrahub.trace import add_span_exception, configure_trace, get_traceid from infrahub.worker import WORKER_IDENTITY @@ -42,8 +42,8 @@ async def app_initialization(application: FastAPI) -> None: # Initialize trace if config.SETTINGS.trace.enable: configure_trace( + service="infrahub-server", version=__version__, - exporter_type=config.SETTINGS.trace.exporter_type, exporter_endpoint=config.SETTINGS.trace.trace_endpoint, exporter_protocol=config.SETTINGS.trace.exporter_protocol, ) @@ -92,8 +92,13 @@ async def lifespan(application: FastAPI): redoc_url="/api/redoc", ) -FastAPIInstrumentor().instrument_app(app, excluded_urls=".*/metrics") -tracer = get_tracer() + +def server_request_hook(span: Span, scope: dict): # pylint: disable=unused-argument + if span and span.is_recording(): + span.set_attribute("worker", WORKER_IDENTITY) + + +FastAPIInstrumentor().instrument_app(app, excluded_urls=".*/metrics", server_request_hook=server_request_hook) 
FRONTEND_DIRECTORY = os.environ.get("INFRAHUB_FRONTEND_DIRECTORY", os.path.abspath("frontend")) FRONTEND_ASSET_DIRECTORY = f"{FRONTEND_DIRECTORY}/dist/assets" @@ -115,15 +120,17 @@ async def lifespan(application: FastAPI): async def logging_middleware(request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response: clear_log_context() request_id = correlation_id.get() - with tracer.start_as_current_span("processing request " + request_id): - trace_id = get_traceid() - set_log_data(key="request_id", value=request_id) - set_log_data(key="app", value="infrahub.api") - set_log_data(key="worker", value=WORKER_IDENTITY) - if trace_id: - set_log_data(key="trace_id", value=trace_id) - response = await call_next(request) - return response + + set_log_data(key="request_id", value=request_id) + set_log_data(key="app", value="infrahub.api") + set_log_data(key="worker", value=WORKER_IDENTITY) + + trace_id = get_traceid() + if trace_id: + set_log_data(key="trace_id", value=trace_id) + + response = await call_next(request) + return response @app.middleware("http") diff --git a/backend/infrahub/services/adapters/message_bus/rabbitmq.py b/backend/infrahub/services/adapters/message_bus/rabbitmq.py index 9e59d64fa2..89763e288b 100644 --- a/backend/infrahub/services/adapters/message_bus/rabbitmq.py +++ b/backend/infrahub/services/adapters/message_bus/rabbitmq.py @@ -5,7 +5,11 @@ from typing import TYPE_CHECKING, Awaitable, Callable, List, MutableMapping, Optional, Type, TypeVar import aio_pika +import opentelemetry.instrumentation.aio_pika.span_builder from infrahub_sdk import UUIDT +from opentelemetry import context, propagate +from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor +from opentelemetry.semconv.trace import SpanAttributes from infrahub import config from infrahub.components import ComponentType @@ -24,6 +28,7 @@ AbstractQueue, AbstractRobustConnection, ) + from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder 
from infrahub.config import BrokerSettings from infrahub.services import InfrahubServices @@ -32,6 +37,29 @@ ResponseClass = TypeVar("ResponseClass") +AioPikaInstrumentor().instrument() + + +# TODO: remove this once https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1835 is resolved +def patch_spanbuilder_set_channel() -> None: + """ + The default SpanBuilder.set_channel does not work with aio_pika 9.1 and the refactored connection + attribute + """ + + def set_channel(self: SpanBuilder, channel: AbstractChannel) -> None: + if hasattr(channel, "_connection"): + url = channel._connection.url + self._attributes.update( + { + SpanAttributes.NET_PEER_NAME: url.host, + SpanAttributes.NET_PEER_PORT: url.port, + } + ) + + opentelemetry.instrumentation.aio_pika.span_builder.SpanBuilder.set_channel = set_channel # type: ignore + + async def _add_request_id(message: InfrahubMessage) -> None: log_data = get_log_data() message.meta.request_id = log_data.get("request_id", "") @@ -54,6 +82,8 @@ def __init__(self, settings: Optional[BrokerSettings] = None) -> None: self.futures: MutableMapping[str, asyncio.Future] = {} async def initialize(self, service: InfrahubServices) -> None: + patch_spanbuilder_set_channel() + self.service = service self.connection = await aio_pika.connect_robust( host=self.settings.address, @@ -193,17 +223,28 @@ async def subscribe(self) -> None: async for message in qiterator: try: async with message.process(requeue=False): + # auto instrumentation not supported yet for RPCs, do it ourselves... 
+ token = None + headers = message.headers or {} + ctx = propagate.extract(headers) + if ctx is not None: + token = context.attach(ctx) + clear_log_context() - if message.routing_key in messages.MESSAGE_MAP: - await execute_message( - routing_key=message.routing_key, message_body=message.body, service=self.service - ) - else: - self.service.log.error( - "Unhandled routing key for message", - routing_key=message.routing_key, - message=message.body, - ) + try: + if message.routing_key in messages.MESSAGE_MAP: + await execute_message( + routing_key=message.routing_key, message_body=message.body, service=self.service + ) + else: + self.service.log.error( + "Unhandled routing key for message", + routing_key=message.routing_key, + message=message.body, + ) + finally: + if token is not None: + context.detach(token) except Exception: # pylint: disable=broad-except self.service.log.exception("Processing error for message %r" % message) diff --git a/backend/infrahub/trace.py b/backend/infrahub/trace.py index 689b1f7aac..6362408f29 100644 --- a/backend/infrahub/trace.py +++ b/backend/infrahub/trace.py @@ -1,18 +1,8 @@ +from typing import Optional + from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( - OTLPSpanExporter as GRPCSpanExporter, -) -from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( - OTLPSpanExporter as HTTPSpanExporter, -) -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from opentelemetry.trace import StatusCode - - -def get_tracer(name: str = "infrahub") -> trace.Tracer: - return trace.get_tracer(name) +from otel_extensions import TelemetryOptions, init_telemetry_provider def get_current_span_with_context() -> trace.Span: @@ -54,42 +44,13 @@ def add_span_exception(exception: Exception) -> None: current_span.record_exception(exception) -def create_tracer_provider( - version: str, exporter_type: 
str, exporter_endpoint: str = None, exporter_protocol: str = None -) -> TracerProvider: - # Create a BatchSpanProcessor exporter based on the type - if exporter_type == "console": - exporter = ConsoleSpanExporter() - elif exporter_type == "otlp": - if not exporter_endpoint: - raise ValueError("Exporter type is set to otlp but endpoint is not set") - if exporter_protocol == "http/protobuf": - exporter = HTTPSpanExporter(endpoint=exporter_endpoint) - elif exporter_protocol == "grpc": - exporter = GRPCSpanExporter(endpoint=exporter_endpoint) - else: - raise ValueError("Exporter type unsupported by Infrahub") - - # Resource can be required for some backends, e.g. Jaeger - resource = Resource(attributes={"service.name": "infrahub", "service.version": version}) - span_processor = BatchSpanProcessor(exporter) - tracer_provider = TracerProvider(resource=resource) - tracer_provider.add_span_processor(span_processor) - - return tracer_provider - - def configure_trace( - version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None + service: str, version: str, exporter_endpoint: Optional[str] = None, exporter_protocol: Optional[str] = None ) -> None: - # Create a trace provider with the exporter - tracer_provider = create_tracer_provider( - version=version, - exporter_type=exporter_type, - exporter_endpoint=exporter_endpoint, - exporter_protocol=exporter_protocol, + """Initialise telemetry for ``service`` through otel_extensions using the given OTLP endpoint and protocol.""" + options = TelemetryOptions( + OTEL_SERVICE_NAME=service, + OTEL_EXPORTER_OTLP_ENDPOINT=exporter_endpoint, + OTEL_EXPORTER_OTLP_PROTOCOL=exporter_protocol, ) - tracer_provider.get_tracer(__name__) - - # Register the trace provider - trace.set_tracer_provider(tracer_provider) + init_telemetry_provider(options, **{"service.version": version}) diff --git a/development/docker-compose.override.yml.tmp b/development/docker-compose.override.yml.tmp index aacccba4a6..33e077ec11 100644 --- a/development/docker-compose.override.yml.tmp +++ b/development/docker-compose.override.yml.tmp @@ -3,7 +3,7 @@ 
version: "3.4" services: # -------------------------------------------------------------------------------- # - Prometheus to collect all metrics endpoints - # - Tempo to receive traces + # - Tempo or Jaeger to receive traces # - Grafana to visualize these metrics # - Loki to receive logs from promtail # - Promtail to parse logs from different source @@ -43,6 +43,13 @@ services: ports: - "3200:3200" + # jaeger: + # image: jaegertracing/all-in-one:1.53 + # environment: + # COLLECTOR_ZIPKIN_HOST_PORT: ":9411" + # ports: + # - "16686:16686" + prometheus: image: prom/prometheus:latest volumes: diff --git a/development/infrahub.toml b/development/infrahub.toml index 7aa6b1121f..c8aec827fc 100644 --- a/development/infrahub.toml +++ b/development/infrahub.toml @@ -30,7 +30,7 @@ enable = false insecure = "True" exporter_type = "otlp" exporter_protocol = "grpc" -exporter_endpoint = "tempo" +exporter_endpoint = "jaeger" exporter_port = 4317 diff --git a/poetry.lock b/poetry.lock index 78d53e040b..389794b5cf 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.1 and should not be changed by hand. 
[[package]] name = "aio-pika" @@ -2523,6 +2523,25 @@ opentelemetry-api = ">=1.4,<2.0" setuptools = ">=16.0" wrapt = ">=1.0.0,<2.0.0" +[[package]] +name = "opentelemetry-instrumentation-aio-pika" +version = "0.43b0" +description = "OpenTelemetry Aio-pika instrumentation" +optional = false +python-versions = ">=3.7" +files = [ + {file = "opentelemetry_instrumentation_aio_pika-0.43b0-py3-none-any.whl", hash = "sha256:1ca914033c093d3c720bd74116456df61c3a67b46f90af6b0cb9afda377ae59f"}, + {file = "opentelemetry_instrumentation_aio_pika-0.43b0.tar.gz", hash = "sha256:90847c68f8fd4ff40818e694e7f6f26d01616e617c8aa50f6de5bfba33f33e3e"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.5,<2.0" +wrapt = ">=1.0.0,<2.0.0" + +[package.extras] +instruments = ["aio-pika (>=7.2.0,<10.0.0)"] +test = ["opentelemetry-instrumentation-aio-pika[instruments]", "opentelemetry-test-utils (==0.43b0)", "pytest", "wrapt (>=1.0.0,<2.0.0)"] + [[package]] name = "opentelemetry-instrumentation-asgi" version = "0.42b0" @@ -2633,6 +2652,21 @@ files = [ [package.extras] dev = ["black", "mypy", "pytest"] +[[package]] +name = "otel-extensions" +version = "1.0.1" +description = "Python extensions for OpenTelemetry" +optional = false +python-versions = ">=3.7" +files = [ + {file = "otel-extensions-1.0.1.tar.gz", hash = "sha256:b301ae271f37d7405fed648be92619f4af936e03fb96bf306195bdcf36478f2b"}, + {file = "otel_extensions-1.0.1-py2.py3-none-any.whl", hash = "sha256:45f548bf264424c5212be429c4d80687fcf59390cc0c6de32f91829d4a2bb8bb"}, +] + +[package.dependencies] +opentelemetry-api = "*" +opentelemetry-sdk = "*" + [[package]] name = "packaging" version = "23.2" @@ -3493,7 +3527,6 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = 
"PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -4722,4 +4755,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"] [metadata] lock-version = "2.0" python-versions = "^3.8, < 3.13" -content-hash = "4c8c4ddb6522969329b15a63602e674edb7559c2b62225388c43a17db5b40571" +content-hash = "a9b9819f305f345c8ba3f646d6251e1867253e48020df0120494437b2c320aae" diff --git a/pyproject.toml b/pyproject.toml index f47363beb0..c5ac61ed60 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,8 @@ pydantic-settings = "^2.1" fastapi-storages = "~0.2" pytest = "*" lunr = "^0.7.0.post1" +otel-extensions = "1.0.1" +opentelemetry-instrumentation-aio-pika = "^0.43b0" [tool.poetry.group.server.dependencies] fastapi = "~0.108"