diff --git a/charmcraft.yaml b/charmcraft.yaml
index b2a69ac..74e2483 100644
--- a/charmcraft.yaml
+++ b/charmcraft.yaml
@@ -129,10 +129,10 @@ parts:
 config:
   options:
-    retention_period_hours:
+    retention-period:
       description: |
         Maximum trace retention period, in hours. This will be used to configure
         the compactor to clean up trace data after this time.
-        Defaults to 720 hours, which is equivalent to 30 days.
+        Defaults to 720 hours, which is equivalent to 30 days. Per-stream retention limits are currently not supported.
       type: int
      default: 720
     always_enable_zipkin:
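Note: the renamed `retention-period` option ultimately drives the compactor's global retention. Below is a minimal sketch (not part of this changeset) of how an hours value could be rendered into Tempo's compactor configuration; the helper name is hypothetical, and `block_retention` is Tempo's global, not per-stream, retention setting, which matches the caveat in the option description.

```python
def compactor_config(retention_period_hours: int) -> dict:
    """Render the charm's retention-period option (hours) into a Tempo compactor config stanza."""
    return {
        "compactor": {
            "compaction": {
                # Tempo expects a Go-style duration string; it applies to all tenants/streams.
                "block_retention": f"{retention_period_hours}h",
            }
        }
    }


# With the default of 720 hours (30 days):
assert compactor_config(720)["compactor"]["compaction"]["block_retention"] == "720h"
```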
") + + +_remove_stale_otel_sdk_packages() + import functools import inspect import logging import os -import shutil from contextlib import contextmanager from contextvars import Context, ContextVar, copy_context -from importlib.metadata import distributions from pathlib import Path from typing import ( Any, @@ -219,7 +269,7 @@ def my_tracing_endpoint(self) -> Optional[str]: # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 13 +LIBPATCH = 15 PYDEPS = ["opentelemetry-exporter-otlp-proto-http==1.21.0"] @@ -229,7 +279,6 @@ def my_tracing_endpoint(self) -> Optional[str]: # set this to 0 if you are debugging/developing this library source dev_logger.setLevel(logging.CRITICAL) - _CharmType = Type[CharmBase] # the type CharmBase and any subclass thereof _C = TypeVar("_C", bound=_CharmType) _T = TypeVar("_T", bound=type) @@ -281,9 +330,22 @@ def _get_tracer() -> Optional[Tracer]: try: return tracer.get() except LookupError: + # fallback: this course-corrects for a user error where charm_tracing symbols are imported + # from different paths (typically charms.tempo_k8s... and lib.charms.tempo_k8s...) try: ctx: Context = copy_context() if context_tracer := _get_tracer_from_context(ctx): + logger.warning( + "Tracer not found in `tracer` context var. " + "Verify that you're importing all `charm_tracing` symbols from the same module path. \n" + "For example, DO" + ": `from charms.lib...charm_tracing import foo, bar`. \n" + "DONT: \n" + " \t - `from charms.lib...charm_tracing import foo` \n" + " \t - `from lib...charm_tracing import bar` \n" + "For more info: https://python-notes.curiousefficiency.org/en/latest/python" + "_concepts/import_traps.html#the-double-import-trap" + ) return context_tracer.get() else: return None @@ -361,30 +423,6 @@ def _get_server_cert( return server_cert -def _remove_stale_otel_sdk_packages(): - """Hack to remove stale opentelemetry sdk packages from the charm's python venv. - - See https://github.com/canonical/grafana-agent-operator/issues/146 and - https://bugs.launchpad.net/juju/+bug/2058335 for more context. This patch can be removed after - this juju issue is resolved and sufficient time has passed to expect most users of this library - have migrated to the patched version of juju. - - This only does something if executed on an upgrade-charm event. - """ - if os.getenv("JUJU_DISPATCH_PATH") == "hooks/upgrade-charm": - logger.debug("Executing _remove_stale_otel_sdk_packages patch on charm upgrade") - # Find any opentelemetry_sdk distributions - otel_sdk_distributions = list(distributions(name="opentelemetry_sdk")) - # If there is more than 1, inspect each and if it has 0 entrypoints, infer that it is stale - if len(otel_sdk_distributions) > 1: - for distribution in otel_sdk_distributions: - if len(distribution.entry_points) == 0: - # Distribution appears to be empty. Remove it - path = distribution._path # type: ignore - logger.debug(f"Removing empty opentelemetry_sdk distribution at: {path}") - shutil.rmtree(path) - - def _setup_root_span_initializer( charm_type: _CharmType, tracing_endpoint_attr: str, @@ -420,7 +458,6 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): # apply hacky patch to remove stale opentelemetry sdk packages on upgrade-charm. # it could be trouble if someone ever decides to implement their own tracer parallel to # ours and before the charm has inited. We assume they won't. 
diff --git a/lib/charms/traefik_k8s/v2/ingress.py b/lib/charms/traefik_k8s/v2/ingress.py
index 407cfb5..bb7ac5e 100644
--- a/lib/charms/traefik_k8s/v2/ingress.py
+++ b/lib/charms/traefik_k8s/v2/ingress.py
@@ -56,13 +56,14 @@ def _on_ingress_revoked(self, event: IngressPerAppRevokedEvent):
 import socket
 import typing
 from dataclasses import dataclass
+from functools import partial
 from typing import Any, Callable, Dict, List, MutableMapping, Optional, Sequence, Tuple, Union
 
 import pydantic
 from ops.charm import CharmBase, RelationBrokenEvent, RelationEvent
 from ops.framework import EventSource, Object, ObjectEvents, StoredState
 from ops.model import ModelError, Relation, Unit
-from pydantic import AnyHttpUrl, BaseModel, Field, validator
+from pydantic import AnyHttpUrl, BaseModel, Field
 
 # The unique Charmhub library identifier, never change it
 LIBID = "e6de2a5cd5b34422a204668f3b8f90d2"
@@ -72,7 +73,7 @@ def _on_ingress_revoked(self, event: IngressPerAppRevokedEvent):
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 13
+LIBPATCH = 14
 
 PYDEPS = ["pydantic"]
 
@@ -84,6 +85,9 @@ def _on_ingress_revoked(self, event: IngressPerAppRevokedEvent):
 
 PYDANTIC_IS_V1 = int(pydantic.version.VERSION.split(".")[0]) < 2
 if PYDANTIC_IS_V1:
+    from pydantic import validator
+
+    input_validator = partial(validator, pre=True)
 
     class DatabagModel(BaseModel):  # type: ignore
         """Base databag model."""
@@ -143,7 +147,9 @@ def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True):
         return databag
 
 else:
-    from pydantic import ConfigDict
+    from pydantic import ConfigDict, field_validator
+
+    input_validator = partial(field_validator, mode="before")
 
     class DatabagModel(BaseModel):
         """Base databag model."""
@@ -171,7 +177,7 @@ def load(cls, databag: MutableMapping):
                 k: json.loads(v)
                 for k, v in databag.items()
                 # Don't attempt to parse model-external values
-                if k in {(f.alias or n) for n, f in cls.__fields__.items()}  # type: ignore
+                if k in {(f.alias or n) for n, f in cls.model_fields.items()}  # type: ignore
             }
         except json.JSONDecodeError as e:
             msg = f"invalid databag contents: expecting json. {databag}"
@@ -252,14 +258,14 @@ class IngressRequirerAppData(DatabagModel):
         default="http", description="What scheme to use in the generated ingress url"
     )
 
-    @validator("scheme", pre=True)
+    @input_validator("scheme")
     def validate_scheme(cls, scheme):  # noqa: N805  # pydantic wants 'cls' as first arg
         """Validate scheme arg."""
         if scheme not in {"http", "https", "h2c"}:
             raise ValueError("invalid scheme: should be one of `http|https|h2c`")
         return scheme
 
-    @validator("port", pre=True)
+    @input_validator("port")
     def validate_port(cls, port):  # noqa: N805  # pydantic wants 'cls' as first arg
         """Validate port."""
         assert isinstance(port, int), type(port)
@@ -277,13 +283,13 @@ class IngressRequirerUnitData(DatabagModel):
         "IP can only be None if the IP information can't be retrieved from juju.",
     )
 
-    @validator("host", pre=True)
+    @input_validator("host")
     def validate_host(cls, host):  # noqa: N805  # pydantic wants 'cls' as first arg
         """Validate host."""
         assert isinstance(host, str), type(host)
         return host
 
-    @validator("ip", pre=True)
+    @input_validator("ip")
     def validate_ip(cls, ip):  # noqa: N805  # pydantic wants 'cls' as first arg
         """Validate ip."""
         if ip is None:
@@ -462,7 +468,10 @@ def _handle_relation(self, event):
             event.relation,
             data.app.name,
             data.app.model,
-            [unit.dict() for unit in data.units],
+            [
+                unit.dict() if PYDANTIC_IS_V1 else unit.model_dump(mode="json")
+                for unit in data.units
+            ],
             data.app.strip_prefix or False,
             data.app.redirect_https or False,
         )
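For reference, the `input_validator` shim introduced above gives pydantic v1 (`validator(..., pre=True)`) and v2 (`field_validator(..., mode="before")`) a single decorator surface. A self-contained sketch of the same pattern outside the library follows; the model and field are made up purely for illustration.

```python
from functools import partial

import pydantic
from pydantic import BaseModel

PYDANTIC_IS_V1 = int(pydantic.version.VERSION.split(".")[0]) < 2

if PYDANTIC_IS_V1:
    from pydantic import validator

    input_validator = partial(validator, pre=True)
else:
    from pydantic import field_validator

    input_validator = partial(field_validator, mode="before")


class ExampleData(BaseModel):
    port: int

    @input_validator("port")
    def _coerce_port(cls, value):  # noqa: N805  # pydantic passes the class as first arg
        # Runs before type coercion under both pydantic v1 (pre=True) and v2 (mode="before").
        return int(value)


print(ExampleData(port="8080").port)  # -> 8080 under either pydantic major
```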
{databag}" @@ -252,14 +258,14 @@ class IngressRequirerAppData(DatabagModel): default="http", description="What scheme to use in the generated ingress url" ) - @validator("scheme", pre=True) + @input_validator("scheme") def validate_scheme(cls, scheme): # noqa: N805 # pydantic wants 'cls' as first arg """Validate scheme arg.""" if scheme not in {"http", "https", "h2c"}: raise ValueError("invalid scheme: should be one of `http|https|h2c`") return scheme - @validator("port", pre=True) + @input_validator("port") def validate_port(cls, port): # noqa: N805 # pydantic wants 'cls' as first arg """Validate port.""" assert isinstance(port, int), type(port) @@ -277,13 +283,13 @@ class IngressRequirerUnitData(DatabagModel): "IP can only be None if the IP information can't be retrieved from juju.", ) - @validator("host", pre=True) + @input_validator("host") def validate_host(cls, host): # noqa: N805 # pydantic wants 'cls' as first arg """Validate host.""" assert isinstance(host, str), type(host) return host - @validator("ip", pre=True) + @input_validator("ip") def validate_ip(cls, ip): # noqa: N805 # pydantic wants 'cls' as first arg """Validate ip.""" if ip is None: @@ -462,7 +468,10 @@ def _handle_relation(self, event): event.relation, data.app.name, data.app.model, - [unit.dict() for unit in data.units], + [ + unit.dict() if PYDANTIC_IS_V1 else unit.model_dump(mode="json") + for unit in data.units + ], data.app.strip_prefix or False, data.app.redirect_https or False, ) diff --git a/pyproject.toml b/pyproject.toml index 8eb7b54..681ad68 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,12 @@ extend-ignore = [ ] ignore = ["E501", "D107"] extend-exclude = ["__pycache__", "*.egg_info", "*integration/tester*"] -per-file-ignores = {"tests/*" = ["D100","D101","D102","D103","D104"]} + +[tool.ruff.lint.per-file-ignores] +"tests/*" = ["D100","D101","D102","D103","D104"] +# Remove charm_tracing.py E402 when _remove_stale_otel_sdk_packages() is removed +# from the library +"lib/charms/tempo_k8s/v1/charm_tracing.py" = ["E402"] [lint.mccabe] max-complexity = 10 diff --git a/requirements.txt b/requirements.txt index 19114ee..a2b81cf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,8 +3,8 @@ importlib-metadata~=6.0.0 ops crossplane jsonschema==4.17.0 -lightkube==0.15.4 -lightkube-models==1.24.1.4 +lightkube>=0.15.4 +lightkube-models>=1.24.1.4 tenacity==8.2.3 # crossplane is a package from nginxinc to interact with the Nginx config crossplane diff --git a/src/charm.py b/src/charm.py index 0136d6c..44960f8 100755 --- a/src/charm.py +++ b/src/charm.py @@ -17,7 +17,6 @@ from charms.tempo_k8s.v1.charm_tracing import trace_charm from charms.tempo_k8s.v2.tracing import ( ReceiverProtocol, - RequestEvent, TracingEndpointProvider, TransportProtocolType, receiver_protocol_to_transport_protocol, @@ -26,7 +25,7 @@ from cosl.coordinated_workers.coordinator import ClusterRolesConfig, Coordinator from cosl.coordinated_workers.nginx import CA_CERT_PATH, CERT_PATH, KEY_PATH from ops import CollectStatusEvent -from ops.charm import CharmBase, RelationEvent +from ops.charm import CharmBase from ops.main import main from nginx_config import NginxConfig @@ -48,13 +47,13 @@ def __init__(self, *args): super().__init__(*args) self.ingress = TraefikRouteRequirer(self, self.model.get_relation("ingress"), "ingress") # type: ignore - # set alert_rules_path="", as we don't want to populate alert rules into the relation databag - # we only need `self._remote_write.endpoints` - self._remote_write = 
diff --git a/src/charm.py b/src/charm.py
index 0136d6c..44960f8 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -17,7 +17,6 @@
 from charms.tempo_k8s.v1.charm_tracing import trace_charm
 from charms.tempo_k8s.v2.tracing import (
     ReceiverProtocol,
-    RequestEvent,
     TracingEndpointProvider,
     TransportProtocolType,
     receiver_protocol_to_transport_protocol,
@@ -26,7 +25,7 @@
 from cosl.coordinated_workers.coordinator import ClusterRolesConfig, Coordinator
 from cosl.coordinated_workers.nginx import CA_CERT_PATH, CERT_PATH, KEY_PATH
 from ops import CollectStatusEvent
-from ops.charm import CharmBase, RelationEvent
+from ops.charm import CharmBase
 from ops.main import main
 
 from nginx_config import NginxConfig
@@ -48,13 +47,13 @@ def __init__(self, *args):
         super().__init__(*args)
         self.ingress = TraefikRouteRequirer(self, self.model.get_relation("ingress"), "ingress")  # type: ignore
-        # set alert_rules_path="", as we don't want to populate alert rules into the relation databag
-        # we only need `self._remote_write.endpoints`
-        self._remote_write = PrometheusRemoteWriteConsumer(self, alert_rules_path="")
         self.tempo = Tempo(
             requested_receivers=self._requested_receivers,
-            retention_period_hours=self.trace_retention_period_hours,
+            retention_period_hours=self._trace_retention_period_hours,
         )
+        # set alert_rules_path="", as we don't want to populate alert rules into the relation databag
+        # we only need `self._remote_write.endpoints`
+        self._remote_write = PrometheusRemoteWriteConsumer(self, alert_rules_path="")
 
         # set the open ports for this unit
         self.unit.set_ports(*self.tempo.all_ports.values())
 
@@ -99,34 +98,20 @@ def __init__(self, *args):
 
         # refuse to handle any other event as we can't possibly know what to do.
         if not self.coordinator.can_handle_events:
-            # logging will be handled by `self.coordinator` for each of the above circumstances.
+            # logging is handled by the Coordinator object
             return
 
-        # lifecycle
-        self.framework.observe(self.on.leader_elected, self._on_leader_elected)
-        self.framework.observe(self.on.list_receivers_action, self._on_list_receivers_action)
-
-        # ingress
-        ingress = self.on["ingress"]
-        self.framework.observe(ingress.relation_created, self._on_ingress_relation_created)
-        self.framework.observe(ingress.relation_joined, self._on_ingress_relation_joined)
-        self.framework.observe(self.ingress.on.ready, self._on_ingress_ready)
+        # do this regardless of what event we are processing
+        self._reconcile()
 
-        # tracing
-        self.framework.observe(self.tracing.on.request, self._on_tracing_request)
-        self.framework.observe(self.tracing.on.broken, self._on_tracing_broken)
+        # actions
+        self.framework.observe(self.on.list_receivers_action, self._on_list_receivers_action)
 
         # tls
         self.framework.observe(
             self.coordinator.cert_handler.on.cert_changed, self._on_cert_handler_changed
         )
 
-        # remote-write
-        self.framework.observe(
-            self._remote_write.on.endpoints_changed,  # pyright: ignore
-            self._on_remote_write_changed,
-        )
-
         ######################
         # UTILITY PROPERTIES #
         ######################
@@ -195,57 +180,18 @@ def enabled_receivers(self) -> Set[str]:
     ##################
     # EVENT HANDLERS #
     ##################
-    def _on_tracing_broken(self, _):
-        """Update tracing relations' databags once one relation is removed."""
-        self._update_tracing_relations()
-
-    def _on_cert_handler_changed(self, e: ops.RelationChangedEvent):
-
-        # tls readiness change means config change.
-        # sync scheme change with traefik and related consumers
-        self._configure_ingress()
+    def _on_cert_handler_changed(self, _: ops.RelationChangedEvent):
 
         # sync the server CA cert with the charm container.
         # technically, because of charm tracing, this will be called first thing on each event
         self._update_server_ca_cert()
 
-        # update relations to reflect the new certificate
-        self._update_tracing_relations()
-
-    def _on_tracing_request(self, e: RequestEvent):
-        """Handle a remote requesting a tracing endpoint."""
-        logger.debug(f"received tracing request from {e.relation.app}: {e.requested_receivers}")
-        self._update_tracing_relations()
-
-    def _on_ingress_relation_created(self, _: RelationEvent):
-        self._configure_ingress()
-
-    def _on_ingress_relation_joined(self, _: RelationEvent):
-        self._configure_ingress()
-
-    def _on_leader_elected(self, _: ops.LeaderElectedEvent):
-        # as traefik_route goes through app data, we need to take lead of traefik_route if our leader dies.
-        self._configure_ingress()
-
-    def _on_ingress_ready(self, _event):
-        # whenever there's a change in ingress, we need to update all tracing relations
-        self._update_tracing_relations()
-
-    def _on_ingress_revoked(self, _event):
-        # whenever there's a change in ingress, we need to update all tracing relations
-        self._update_tracing_relations()
-
     def _on_list_receivers_action(self, event: ops.ActionEvent):
         res = {}
         for receiver in self._requested_receivers():
             res[receiver.replace("_", "-")] = self.get_receiver_url(receiver)
         event.set_results(res)
 
-    def _on_remote_write_changed(self, _event):
-        """Event handler for the remote write changed event."""
-        # notify the cluster
-        self.coordinator.update_cluster()
-
     def _on_collect_status(self, e: CollectStatusEvent):
         # add Tempo coordinator-specific statuses
         if (
@@ -261,8 +207,8 @@ def _on_collect_status(self, e: CollectStatusEvent):
 
     ###################
     # UTILITY METHODS #
     ###################
-    def _configure_ingress(self) -> None:
-        """Make sure the traefik route and tracing relation data are up-to-date."""
+    def _update_ingress_relation(self) -> None:
+        """Make sure the traefik route is up-to-date."""
         if not self.unit.is_leader():
             return
 
@@ -270,11 +216,6 @@ def _configure_ingress(self) -> None:
             self.ingress.submit_to_traefik(
                 self._ingress_config, static=self._static_ingress_config
             )
-            if self.ingress.external_host:
-                self._update_tracing_relations()
-
-        # notify the cluster
-        self.coordinator.update_cluster()
 
     def _update_tracing_relations(self) -> None:
         tracing_relations = self.model.relations["tracing"]
@@ -291,8 +232,6 @@ def _update_tracing_relations(self) -> None:
             [(p, self.get_receiver_url(p)) for p in requested_receivers]
         )
 
-        self.coordinator.update_cluster()
-
     def _requested_receivers(self) -> Tuple[ReceiverProtocol, ...]:
         """List what receivers we should activate, based on the active tracing relations and config-enabled extra receivers."""
         # we start with the sum of the requested endpoints from the requirers
@@ -305,14 +244,14 @@ def _requested_receivers(self) -> Tuple[ReceiverProtocol, ...]:
         return tuple(requested_receivers)
 
     @property
-    def trace_retention_period_hours(self) -> int:
+    def _trace_retention_period_hours(self) -> int:
         """Trace retention period for the compactor."""
-        # if unset, default to 30 days
-        trace_retention_period_hours = cast(int, self.config.get("retention_period_hours", 720))
-        return trace_retention_period_hours
+        # if unset, defaults to 30 days
+        return cast(int, self.config["retention-period"])
 
     def server_ca_cert(self) -> str:
         """For charm tracing."""
+        # Fixme: we do this once too many times if we're handling cert_handler.changed
         self._update_server_ca_cert()
         return self.tempo.tls_ca_path
 
@@ -438,6 +377,15 @@ def remote_write_endpoints(self):
         """Return remote-write endpoints."""
         return self._remote_write.endpoints
 
+    def _reconcile(self):
+        # This method contains unconditional update logic, i.e. logic that should be executed
+        # regardless of the event we are processing.
+        # The reason is: if we missed these events while the coordinator could not process them
+        # (inconsistent status), we would have to 'remember' to run this logic once ready again, which is hard and error-prone.
+        self._update_ingress_relation()
+        self._update_tracing_relations()
+        self.coordinator.update_cluster()
+
 
 if __name__ == "__main__":  # pragma: nocover
     main(TempoCoordinatorCharm)
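The charm now routes all unconditional data-syncing through `_reconcile()` rather than per-event observers. A stripped-down sketch of that holistic pattern follows; the class and helper names are illustrative placeholders, not the real implementations in src/charm.py, and the action is assumed to be declared in charmcraft.yaml.

```python
import ops


class ReconcilingCharm(ops.CharmBase):
    """Recompute all outputs on every event instead of reacting to specific event types."""

    def __init__(self, *args):
        super().__init__(*args)
        # Actions still need explicit observers; everything else is recomputed
        # unconditionally, so a skipped or deferred event cannot leave stale data behind.
        self.framework.observe(self.on.list_receivers_action, self._on_list_receivers_action)
        self._reconcile()

    def _reconcile(self):
        # Idempotent: safe (and intended) to run on every dispatch.
        self._update_ingress_relation()
        self._update_tracing_relations()
        self._update_cluster()

    # Placeholders standing in for the real logic:
    def _update_ingress_relation(self) -> None: ...

    def _update_tracing_relations(self) -> None: ...

    def _update_cluster(self) -> None: ...

    def _on_list_receivers_action(self, event: ops.ActionEvent) -> None: ...
```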
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py
index 151a6d2..97f60a5 100644
--- a/tests/integration/helpers.py
+++ b/tests/integration/helpers.py
@@ -299,13 +299,13 @@ async def deploy_cluster(ops_test: OpsTest, tempo_app=APP_NAME):
     await ops_test.model.integrate(tempo_app + ":tempo-cluster", WORKER_NAME + ":tempo-cluster")
 
     await deploy_and_configure_minio(ops_test)
-
-    await ops_test.model.wait_for_idle(
-        apps=[tempo_app, WORKER_NAME, S3_INTEGRATOR],
-        status="active",
-        timeout=1000,
-        idle_period=30,
-    )
+    async with ops_test.fast_forward():
+        await ops_test.model.wait_for_idle(
+            apps=[tempo_app, WORKER_NAME, S3_INTEGRATOR],
+            status="active",
+            timeout=1000,
+            idle_period=30,
+        )
 
 
 def get_traces(tempo_host: str, service_name="tracegen-otlp_http", tls=True):
diff --git a/tests/integration/test_ingressed_tls.py b/tests/integration/test_ingressed_tls.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py
index 2125063..fe107bc 100644
--- a/tests/integration/test_integration.py
+++ b/tests/integration/test_integration.py
@@ -79,11 +79,12 @@ async def test_relate(ops_test: OpsTest):
     # then relation should appear
     await ops_test.model.add_relation(APP_NAME + ":tracing", TESTER_APP_NAME + ":tracing")
     await ops_test.model.add_relation(APP_NAME + ":tracing", TESTER_GRPC_APP_NAME + ":tracing")
-    await ops_test.model.wait_for_idle(
-        apps=[APP_NAME, WORKER_NAME, TESTER_APP_NAME, TESTER_GRPC_APP_NAME],
-        status="active",
-        timeout=1000,
-    )
+    async with ops_test.fast_forward():
+        await ops_test.model.wait_for_idle(
+            apps=[APP_NAME, WORKER_NAME, TESTER_APP_NAME, TESTER_GRPC_APP_NAME],
+            status="active",
+            timeout=1000,
+        )
 
 
 async def test_verify_traces_http(ops_test: OpsTest):
diff --git a/tests/integration/test_self_tracing_remote.py b/tests/integration/test_self_tracing_remote.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/scenario/test_tempo_clustered.py b/tests/scenario/test_tempo_clustered.py
index 62a3fa2..ee6b1df 100644
--- a/tests/scenario/test_tempo_clustered.py
+++ b/tests/scenario/test_tempo_clustered.py
@@ -55,7 +55,7 @@ def all_worker_with_initial_config(all_worker: Relation, coordinator_with_initia
 
 @pytest.fixture
 def certs_relation():
-    return scenario.Relation("certificates")
+    return scenario.Relation("certificates", remote_app_data={})
 
 
 MOCK_SERVER_CERT = "SERVER_CERT-foo"