Skip to content

Commit

Permalink
OpenTelemetry tracing, with jaeger in development environment.
Browse files Browse the repository at this point in the history
For developers - includes an instance of Jaeger in the docker-compose development environment, available at port 16686.

For production - supports OTLP/http exported traces to a specified endpoint in the configuration files.

Please see docs/tracing.md for more details.

Signed-off-by: Mike Kingsbury <mike.kingsbury@redhat.com>
  • Loading branch information
mike-kingsbury committed Nov 1, 2023
1 parent 750d47d commit ef63f09
Show file tree
Hide file tree
Showing 25 changed files with 1,384 additions and 170 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Documents that outgrew this README can be found in the `docs/` drectory.
* [pip.md](./docs/pip.md) is a guide for using pip with Cachito
* [using_requests_locally.md](./docs/using_requests_locally.md) explains how to use Cachito
requests to run builds on your PC
* [tracing.md](./docs/tracing.md) documents Cachito's support for OpenTelemetry tracing

## Coding Standards

Expand Down Expand Up @@ -548,6 +549,8 @@ Custom configuration for the Celery workers are listed below:
* `cachito_subprocess_timeout` - a number (in seconds) to set a timeout for commands executed by
the `subprocess` module. Default is 3600 seconds. A timeout is always required, and there is no
way provided by Cachito to disable it. Set a larger number to give the subprocess execution more time.
* `cachito_otlp_exporter_endpoint` - A valid URL with a port number as necessary to a OTLP/http-compatible
endpoint to receive OpenTelemetry trace data.

To configure the workers to use a Kerberos keytab for authentication, set the `KRB5_CLIENT_KTNAME`
environment variable to the path of the keytab. Additional Kerberos configuration can be made in
Expand All @@ -574,6 +577,8 @@ Custom configuration for the API:
* `CACHITO_WORKER_USERNAMES` - the list of usernames that are allowed to use the `/requests/<id>`
PATCH endpoint.
* `LOGIN_DISABLED` - disables authentication requirements.
* `CACHITO_OTLP_EXPORTER_ENDPOINT` - A valid URL with a port number as necessary to a OTLP/http-compatible
endpoint to receive OpenTelemetry trace data.

Additionally, to configure the communication with the Cachito Celery workers, create a Python file
at `/etc/cachito/celery.py`, and set the
Expand Down
23 changes: 21 additions & 2 deletions cachito/web/api_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from celery import chain
from flask import stream_with_context
from flask_login import current_user, login_required
from opentelemetry import trace
from sqlalchemy import and_, func
from sqlalchemy.orm import joinedload, load_only
from werkzeug.exceptions import BadRequest, Forbidden, Gone, InternalServerError, NotFound
Expand Down Expand Up @@ -43,6 +44,8 @@

api_v1 = flask.Blueprint("api_v1", __name__)

tracer = trace.get_tracer(__name__)


class RequestsArgs(pydantic.BaseModel):
"""Query parameters for /request endpoint."""
Expand Down Expand Up @@ -153,6 +156,7 @@ def get_requests():
return flask.jsonify(response)


@tracer.start_as_current_span("get_request")
def get_request(request_id):
"""
Retrieve details for the given request.
Expand Down Expand Up @@ -231,6 +235,7 @@ def get_request_environment_variables(request_id):
return flask.jsonify(env_vars_json)


@tracer.start_as_current_span("download_archive")
def download_archive(request_id):
"""
Download archive of source code.
Expand Down Expand Up @@ -311,6 +316,7 @@ def list_packages_and_dependencies(request_id):


@login_required
@tracer.start_as_current_span("create_request")
def create_request():
"""
Submit a request to resolve and cache the given source code and its dependencies.
Expand Down Expand Up @@ -342,12 +348,25 @@ def create_request():
cachito_metrics["gauge_state"].labels(state="total").inc()
cachito_metrics["gauge_state"].labels(state=request.state.state_name).inc()

ctx = trace.get_current_span().get_span_context()
# Format the trace_id to a conventional 32 digit hexadecimal number that can be used
# by jaeger or other endpoints for tracing.
trace_id = "{trace:032x}".format(trace=ctx.trace_id)

current_span = trace.get_current_span()
current_span.set_attribute("cachito.request.id", request.id)

if current_user.is_authenticated:
flask.current_app.logger.info(
"The user %s submitted request %d", current_user.username, request.id
"The user %s submitted request %d; trace_id: %s",
current_user.username,
request.id,
trace_id,
)
else:
flask.current_app.logger.info("An anonymous user submitted request %d", request.id)
flask.current_app.logger.info(
"An anonymous user submitted request %d; trace_id: %s", request.id, trace_id
)

# Chain tasks
error_callback = tasks.failed_request_callback.s(request.id)
Expand Down
65 changes: 64 additions & 1 deletion cachito/web/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@
from flask.logging import default_handler
from flask_login import LoginManager
from flask_migrate import Migrate
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.exceptions import InternalServerError, default_exceptions

Expand Down Expand Up @@ -133,10 +144,62 @@ def create_app(config_obj=None):
app.register_error_handler(pydantic.ValidationError, validation_error)

init_metrics(app)

_instrument_app(app)
return app


def _instrument_app(app):
"""
Instrument the Flask app.
Sets up the OpenTelemetry tracing exporter, configures the endpoint
to send trace data to.
"""
# Some of the following has already been executed due to the manner in which
# the tasks config is included....

service_name = "cachito-api"
resource = Resource(attributes={SERVICE_NAME: service_name})
provider = TracerProvider(resource=resource)

# Used for local development environment aka docker-compose up.
if "CACHITO_JAEGER_EXPORTER_ENDPOINT" in app.config.keys():
app.logger.info("Configuring Jaeger Exporter")
jaeger_exporter = JaegerExporter(
agent_host_name=app.config["CACHITO_JAEGER_EXPORTER_ENDPOINT"],
agent_port=int(app.config["CACHITO_JAEGER_EXPORTER_PORT"]),
)
processor = BatchSpanProcessor(jaeger_exporter)
# test/stage/prod environments....
elif "CACHITO_OTLP_EXPORTER_ENDPOINT" in app.config.keys():
app.logger.info(
"Configuring OTLP Exporter: " + str(app.config["CACHITO_OTLP_EXPORTER_ENDPOINT"])
)
otlp_exporter = OTLPSpanExporter(endpoint=app.config["CACHITO_OTLP_EXPORTER_ENDPOINT"])
processor = BatchSpanProcessor(otlp_exporter)
# Undefined; send data to the console.
else:
app.logger.info("Configuring ConsoleSpanExporter")
processor = BatchSpanProcessor(ConsoleSpanExporter())

# Toggle between sending to jaeger and displaying span info on console
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

FlaskInstrumentor().instrument_app(
app, excluded_urls="/static/*,/favicon.ico,/metrics,/healthcheck"
)
RequestsInstrumentor().instrument()
CeleryInstrumentor().instrument()
SQLAlchemyInstrumentor().instrument(
enable_commenter=True,
commenter_options={
"db_driver": True,
},
)
Psycopg2Instrumentor().instrument(enable_commenter=True, commenter_options={})


def create_cli_app():
"""
Create a Flask application instance and validate the configuration for the Flask CLI.
Expand Down
2 changes: 2 additions & 0 deletions cachito/web/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class DevelopmentConfig(Config):
SQLALCHEMY_DATABASE_URI = "postgresql+psycopg2://cachito:cachito@db:5432/cachito"
SQLALCHEMY_TRACK_MODIFICATIONS = True
LOGIN_DISABLED = True
CACHITO_JAEGER_EXPORTER_ENDPOINT = "jaeger"
CACHITO_JAEGER_EXPORTER_PORT = 6831


class TestingConfig(DevelopmentConfig):
Expand Down
2 changes: 0 additions & 2 deletions cachito/web/content_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ def process_gomod(self, package, dependency, type="icm"):
:param type: icm or sbom component
"""
if dependency.type == "gomod":

parent_module_name = package.name
relpath_from_parent_module_to_dep = None

Expand Down Expand Up @@ -339,7 +338,6 @@ def to_json(self):
self._gitsubmodule_data = {}

for package in self.packages:

if package.type == "go-package":
purl = to_top_level_purl(package, self.request, subpath=package.path)
self._gopkg_data.setdefault(
Expand Down
30 changes: 16 additions & 14 deletions cachito/workers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
from tarfile import ExtractError, TarFile
from typing import Iterator

from opentelemetry import trace

from cachito.errors import SubprocessCallError
from cachito.workers.config import get_worker_config
from cachito.workers.errors import CachitoCalledProcessError

log = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)


def run_cmd(cmd, params, exc_msg=None):
Expand All @@ -29,22 +32,22 @@ def run_cmd(cmd, params, exc_msg=None):
params.setdefault("capture_output", True)
params.setdefault("universal_newlines", True)
params.setdefault("encoding", "utf-8")
with tracer.start_as_current_span("running cmd " + " ".join(cmd)):
conf = get_worker_config()
params.setdefault("timeout", conf.cachito_subprocess_timeout)

conf = get_worker_config()
params.setdefault("timeout", conf.cachito_subprocess_timeout)

try:
response = subprocess.run(cmd, **params) # nosec
except subprocess.TimeoutExpired as e:
raise SubprocessCallError(str(e))
try:
response = subprocess.run(cmd, **params) # nosec
except subprocess.TimeoutExpired as e:
raise SubprocessCallError(str(e))

if response.returncode != 0:
log.error('The command "%s" failed with: %s', " ".join(cmd), response.stderr)
raise CachitoCalledProcessError(
exc_msg or "An unexpected error occurred", response.returncode
)
if response.returncode != 0:
log.error('The command "%s" failed with: %s', " ".join(cmd), response.stderr)
raise CachitoCalledProcessError(
exc_msg or "An unexpected error occurred", response.returncode
)

return response.stdout
return response.stdout


def load_json_stream(s: str) -> Iterator:
Expand Down Expand Up @@ -79,7 +82,6 @@ def safe_extract(tar: TarFile, path: str = ".", *, numeric_owner: bool = False):
"""
abs_path = Path(path).resolve()
for member in tar.getmembers():

member_path = Path(path).joinpath(member.name)
abs_member_path = member_path.resolve()

Expand Down
21 changes: 18 additions & 3 deletions cachito/workers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@

import celery
import kombu
from opentelemetry.instrumentation.requests import RequestsInstrumentor

from cachito.errors import ConfigError

ARCHIVES_VOLUME = os.path.join(tempfile.gettempdir(), "cachito-archives")

RequestsInstrumentor().instrument()


app = celery.Celery()


Expand Down Expand Up @@ -68,6 +72,9 @@ class Config(object):
cachito_task_log_format = (
"[%(asctime)s #%(request_id)s %(name)s %(levelname)s %(module)s.%(funcName)s] %(message)s"
)
cachito_jaeger_exporter_endpoint: Optional[str] = ""
cachito_jaeger_exporter_port: Optional[int]
cachito_otlp_exporter_endpoint: Optional[str] = ""
include = [
"cachito.workers.tasks.general",
"cachito.workers.tasks.gomod",
Expand Down Expand Up @@ -140,6 +147,8 @@ class DevelopmentConfig(Config):
}
cachito_request_file_logs_dir: Optional[str] = "/var/log/cachito/requests"
cachito_sources_dir = os.path.join(ARCHIVES_VOLUME, "sources")
cachito_jaeger_exporter_endpoint = "jaeger"
cachito_jaeger_exporter_port = 6831


class TestingConfig(DevelopmentConfig):
Expand All @@ -166,6 +175,14 @@ def configure_celery(celery_app):
:param celery.Celery celery: the Celery application instance to configure
"""
config = get_config()

celery_app.config_from_object(config, force=True)
logging.getLogger("cachito.workers").setLevel(celery_app.conf.cachito_log_level)


def get_config():
"""Read in the config based on the environment."""
config = ProductionConfig
prod_config_file_path = "/etc/cachito/celery.py"
if os.getenv("CACHITO_DEV", "").lower() == "true":
Expand All @@ -190,9 +207,7 @@ def configure_celery(celery_app):
# The _user_config dictionary will contain the __builtins__ key, which we need to skip
if not key.startswith("__"):
setattr(config, key, value)

celery_app.config_from_object(config, force=True)
logging.getLogger("cachito.workers").setLevel(celery_app.conf.cachito_log_level)
return config


def validate_celery_config(conf, **kwargs):
Expand Down
8 changes: 8 additions & 0 deletions cachito/workers/pkg_managers/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import aiohttp
import aiohttp_retry
import requests
from opentelemetry import trace

from cachito.common.checksum import hash_file
from cachito.errors import InvalidChecksum, InvalidRequestData, NetworkError, UnknownHashAlgorithm
Expand All @@ -27,6 +28,7 @@
]

log = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)

ChecksumInfo = collections.namedtuple("ChecksumInfo", "algorithm hexdigest")

Expand All @@ -45,6 +47,7 @@ def _get_request_url(request_id):
return f'{config.cachito_api_url.rstrip("/")}/requests/{request_id}'


@tracer.start_as_current_span("update_request_with_config_files")
def update_request_with_config_files(request_id, config_files):
"""
Update the Cachito request with the input configuration files.
Expand Down Expand Up @@ -77,6 +80,7 @@ def update_request_with_config_files(request_id, config_files):
raise InvalidRequestData(f"Adding configuration files on request {request_id} failed")


@tracer.start_as_current_span("update_request_env_vars")
def update_request_env_vars(request_id: int, env_vars: Dict[str, Dict[str, str]]) -> None:
"""Update environment variables of a request.
Expand Down Expand Up @@ -111,6 +115,7 @@ def update_request_env_vars(request_id: int, env_vars: Dict[str, Dict[str, str]]
raise InvalidRequestData(f"Updating environment variables on request {request_id} failed")


@tracer.start_as_current_span("verify_checksum")
def verify_checksum(file_path: str, checksum_info: ChecksumInfo, chunk_size: int = 10240):
"""
Verify the checksum of the file at the given path matches the expected checksum info.
Expand Down Expand Up @@ -138,6 +143,7 @@ def verify_checksum(file_path: str, checksum_info: ChecksumInfo, chunk_size: int
raise InvalidChecksum(msg)


@tracer.start_as_current_span("download_binary_file")
def download_binary_file(url, download_path, auth=None, insecure=False, chunk_size=8192):
"""
Download a binary file (such as a TAR archive) from a URL.
Expand Down Expand Up @@ -205,6 +211,7 @@ async def async_download_binary_file(
log.debug(f"Download completed - {tarball_name}")


@tracer.start_as_current_span("download_raw_component")
def download_raw_component(raw_component_name, raw_repo_name, download_path, nexus_auth):
"""
Download raw component if present in raw repo.
Expand All @@ -222,6 +229,7 @@ def download_raw_component(raw_component_name, raw_repo_name, download_path, nex
return False


@tracer.start_as_current_span("upload_raw_package")
def upload_raw_package(repo_name, artifact_path, dest_dir, filename, is_request_repository):
"""
Upload a raw package to a Nexus repository.
Expand Down
Loading

0 comments on commit ef63f09

Please sign in to comment.