patch resources
michaeldmitry committed Aug 26, 2024
1 parent afcfbec commit b9b1e90
Showing 5 changed files with 306 additions and 8 deletions.
31 changes: 31 additions & 0 deletions src/cosl/coordinated_workers/coordinator.py
@@ -30,11 +30,16 @@
"charms.prometheus_k8s.v0.prometheus_scrape",
"charms.loki_k8s.v1.loki_push_api",
"charms.tempo_k8s.v2.tracing",
"charms.observability_libs.v0.kubernetes_compute_resources_patch",
)

from charms.data_platform_libs.v0.s3 import S3Requirer
from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider
from charms.loki_k8s.v1.loki_push_api import LokiPushApiConsumer
from charms.observability_libs.v0.kubernetes_compute_resources_patch import (
KubernetesComputeResourcesPatch,
adjust_resource_requirements,
)
from charms.observability_libs.v1.cert_handler import VAULT_SECRET_LABEL, CertHandler
from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider
from charms.tempo_k8s.v2.tracing import TracingEndpointRequirer
@@ -108,6 +113,8 @@ def __init__(
is_coherent: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
is_recommended: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
tracing_receivers: Optional[Callable[[], Optional[Dict[str, str]]]] = None,
resources_requests: Optional[Dict[str, str]] = None,
container_name: Optional[str] = None,
):
"""Constructor for a Coordinator object.
@@ -125,6 +132,8 @@
is_coherent: Custom coherency checker for a minimal deployment.
is_recommended: Custom coherency checker for a recommended deployment.
tracing_receivers: Endpoints to which the workload (and the worker charm) can push traces.
resources_requests: The resources request dictionary to apply when patching a container using KubernetesComputeResourcesPatch.
container_name: The container for which to apply the resources requests & limits.
"""
super().__init__(charm, key="coordinator")
self._charm = charm
@@ -147,6 +156,8 @@ def __init__(
self._is_coherent = is_coherent
self._is_recommended = is_recommended
self._tracing_receivers_getter = tracing_receivers
self._resources_request = resources_requests
self._container_name = container_name

self.nginx = Nginx(
self._charm,
@@ -191,6 +202,14 @@ def __init__(
self._charm, relation_name=self._endpoints["tracing"], protocols=["otlp_http"]
)

# Resources patch
if self._resources_request and self._container_name:
self.resources_patch = KubernetesComputeResourcesPatch(
self._charm,
self._container_name,
resource_reqs_func=self._resource_reqs_from_config,
)

# We always listen to collect-status
self.framework.observe(self._charm.on.collect_unit_status, self._on_collect_unit_status)

@@ -519,6 +538,9 @@ def _on_collect_unit_status(self, e: ops.CollectStatusEvent):
else:
e.add_status(ops.ActiveStatus())

if hasattr(self, "resources_patch") and self.resources_patch:
e.add_status(self.resources_patch.get_status())

###################
# UTILITY METHODS #
###################
@@ -627,3 +649,12 @@ def _render_alert_rules(self):
os.makedirs(CONSOLIDATED_ALERT_RULES_PATH, exist_ok=True)
self._render_workers_alert_rules()
self._consolidate_nginx_alert_rules()

def _resource_reqs_from_config(self):
limits = {
"cpu": self._charm.model.config.get("cpu"),
"memory": self._charm.model.config.get("memory"),
}
return adjust_resource_requirements(
limits, self._resources_request, adhere_to_requests=True
)
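For context, here is a standalone sketch of the call that _resource_reqs_from_config makes: limits come from the charm's cpu/memory config options, while the baseline requests are the resources_requests dictionary passed to the constructor. The concrete values below are assumptions for illustration only, and the merge behaviour is whatever the observability-libs helper defines.

# Minimal sketch, not part of this commit. Assumes the library has been fetched
# into the charm (charmcraft fetch-lib charms.observability_libs.v0.kubernetes_compute_resources_patch).
from charms.observability_libs.v0.kubernetes_compute_resources_patch import (
    adjust_resource_requirements,
)

limits = {"cpu": "2", "memory": "2Gi"}        # would normally come from charm config
requests = {"cpu": "50m", "memory": "100Mi"}  # the resources_requests argument

# Same call shape as _resource_reqs_from_config above; the result is the
# resource-requirements object that KubernetesComputeResourcesPatch applies.
resource_reqs = adjust_resource_requirements(limits, requests, adhere_to_requests=True)
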
33 changes: 32 additions & 1 deletion src/cosl/coordinated_workers/worker.py
@@ -25,9 +25,14 @@

check_libs_installed(
"charms.loki_k8s.v1.loki_push_api",
"charms.observability_libs.v0.kubernetes_compute_resources_patch",
)

from charms.loki_k8s.v1.loki_push_api import _PebbleLogClient # type: ignore
from charms.observability_libs.v0.kubernetes_compute_resources_patch import (
KubernetesComputeResourcesPatch,
adjust_resource_requirements,
)

BASE_DIR = "/worker"
CONFIG_FILE = "/etc/worker/config.yaml"
@@ -69,6 +74,8 @@ def __init__(
pebble_layer: Callable[["Worker"], Layer],
endpoints: _EndpointMapping,
readiness_check_endpoint: Optional[Union[str, Callable[["Worker"], str]]] = None,
resources_requests: Optional[Dict[str, str]] = None,
container_name: Optional[str] = None,
):
"""Constructor for a Worker object.
@@ -79,6 +86,8 @@ def __init__(
endpoints: Endpoint names for coordinator relations, as defined in metadata.yaml.
readiness_check_endpoint: URL to probe with a pebble check to determine
whether the worker node is ready. Passing None will effectively disable it.
resources_requests: The resources request dictionary to apply when patching a container using KubernetesComputeResourcesPatch.
container_name: The container for which to apply the resources requests & limits.
"""
super().__init__(charm, key="worker")
self._charm = charm
@@ -94,6 +103,8 @@ def __init__(
self._readiness_check_endpoint = lambda _: readiness_check_endpoint
else:
self._readiness_check_endpoint = readiness_check_endpoint
self._resources_request = resources_requests
self._container_name = container_name

self.cluster = ClusterRequirer(
charm=self._charm,
@@ -110,6 +121,15 @@ def __init__(
],
)

# Resources patch
if self._resources_request and self._container_name:
# By default, apply resource limits to the workload container
self.resources_patch = KubernetesComputeResourcesPatch(
self._charm,
self._container_name,
resource_reqs_func=self._resource_reqs_from_config,
)

# Event listeners
self.framework.observe(self._charm.on.config_changed, self._on_config_changed)
self.framework.observe(self._charm.on.upgrade_charm, self._on_upgrade_charm)
@@ -256,6 +276,9 @@ def _on_collect_status(self, e: ops.CollectStatusEvent):
)
)

if hasattr(self, "resources_patch") and self.resources_patch:
e.add_status(self.resources_patch.get_status())

# Utility functions
@property
def roles(self) -> List[str]:
@@ -330,7 +353,6 @@ def diff(layer: Layer, plan: Plan):
logger.debug("Adding new layer to pebble...")
self._container.add_layer(self._name, new_layer, combine=True)
return True

return False

def _add_readiness_check(self, new_layer: Layer):
@@ -543,6 +565,15 @@ def charm_tracing_config(self) -> Tuple[Optional[str], Optional[str]]:
else:
return endpoint, None

def _resource_reqs_from_config(self):
limits = {
"cpu": self._charm.model.config.get("cpu"),
"memory": self._charm.model.config.get("memory"),
}
return adjust_resource_requirements(
limits, self._resources_request, adhere_to_requests=True
)


class ManualLogForwarder(ops.Object):
"""Forward the standard outputs of all workloads to explictly-provided Loki endpoints."""
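For orientation, a hypothetical worker charm opting into the new resource patch. Only resources_requests and container_name come from this commit; the charm class, workload name, pebble layer, endpoint mapping, and any Worker constructor arguments not visible in the hunk above are assumptions for illustration.

import ops
from ops.pebble import Layer

from cosl.coordinated_workers.worker import Worker


class MyWorkerCharm(ops.CharmBase):
    def __init__(self, framework: ops.Framework):
        super().__init__(framework)
        self.worker = Worker(
            charm=self,
            name="worker",  # assumed workload name (argument not shown in the hunk above)
            pebble_layer=lambda worker: Layer({"services": {}}),  # placeholder layer
            endpoints={"cluster": "cluster"},
            # New in this commit: baseline requests patched onto the named container;
            # limits are read from the charm's "cpu"/"memory" config options.
            resources_requests={"cpu": "50m", "memory": "100Mi"},
            container_name="worker",
        )
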
188 changes: 188 additions & 0 deletions tests/test_coordinated_workers/test_coordinator_status.py
@@ -0,0 +1,188 @@
from unittest.mock import MagicMock, PropertyMock, patch

import httpx
import pytest
import tenacity
from lightkube import ApiError
from ops import ActiveStatus, BlockedStatus, CharmBase, Framework, WaitingStatus
from scenario import Container, Context, Relation, State

from cosl.coordinated_workers.coordinator import Coordinator
from cosl.coordinated_workers.interface import ClusterProviderAppData, ClusterRequirerAppData


class MyRoles:
roles = {"role"}
meta_roles = {}
minimal_deployment = {"role": 1}
recommended_deployment = {"role": 2}


class MyCoordCharm(CharmBase):

def __init__(self, framework: Framework):
super().__init__(framework)
with patch(
"cosl.coordinated_workers.coordinator.KubernetesComputeResourcesPatch._namespace",
"test-namespace",
):
self.coordinator = Coordinator(
charm=self,
roles_config=MyRoles(),
s3_bucket_name="coordinator",
external_url="localhost:3200",
worker_metrics_port="8080",
endpoints={
"cluster": "cluster",
"s3": "s3",
"certificates": "certificates",
"grafana-dashboards": "grafana-dashboard",
"logging": "logging",
"metrics": "metrics-endpoint",
"tracing": "self-tracing",
},
nginx_config=lambda _: "nginx config",
workers_config=lambda _: "worker config",
resources_requests={"cpu": "50m", "memory": "100Mi"},
container_name="charm",
)


@pytest.fixture
def ctx():
return Context(
MyCoordCharm,
meta={
"name": "lilith",
"requires": {
"s3": {"interface": "s3"},
"logging": {"interface": "loki_push_api"},
"certificates": {"interface": "tls-certificates"},
"self-tracing": {"interface": "tracing"},
},
"provides": {
"cluster": {"interface": "cluster"},
"grafana-dashboard": {"interface": "grafana_dashboard"},
"metrics-endpoint": {"interface": "prometheus_scrape"},
},
"containers": {
"nginx": {"type": "oci-image"},
"nginx-prometheus-exporter": {"type": "oci-image"},
},
},
)


@pytest.fixture()
def s3():
return Relation(
"s3",
remote_app_data={
"access-key": "key",
"bucket": "tempo",
"endpoint": "http://1.2.3.4:9000",
"secret-key": "soverysecret",
},
local_unit_data={"bucket": "tempo"},
)


@pytest.fixture()
def worker():
app_data = {}
ClusterProviderAppData(worker_config="some: yaml").dump(app_data)
remote_app_data = {}
ClusterRequirerAppData(role="role").dump(remote_app_data)
return Relation("cluster", local_app_data=app_data, remote_app_data=remote_app_data)


@pytest.fixture()
def base_state(s3, worker):

return State(
leader=True,
containers=[Container("nginx"), Container("nginx-prometheus-exporter")],
relations=[worker, s3],
)


@patch(
"charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.apply",
MagicMock(return_value=None),
)
def test_status_check_no_workers(ctx, base_state, s3, caplog):
# GIVEN the container can connect
state = base_state.with_can_connect("nginx", True)
state = state.replace(relations=[s3])

# WHEN we run any event
state_out = ctx.run("config_changed", state)

# THEN the charm sets blocked
assert state_out.unit_status == BlockedStatus("[consistency] Missing any worker relation.")


@patch(
"charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.apply",
MagicMock(return_value=None),
)
def test_status_check_no_s3(ctx, base_state, worker, caplog):
# GIVEN the container can connect
state = base_state.with_can_connect("nginx", True)
state = state.replace(relations=[worker])

# WHEN we run any event
state_out = ctx.run("config_changed", state)

# THEN the charm sets blocked
assert state_out.unit_status == BlockedStatus("[consistency] Missing S3 integration.")


@patch(
"charms.observability_libs.v0.kubernetes_compute_resources_patch.KubernetesComputeResourcesPatch.get_status",
MagicMock(return_value=(BlockedStatus(""))),
)
def test_status_check_k8s_patch_failed(ctx, base_state, caplog):
# GIVEN the container can connect
state = base_state.with_can_connect("nginx", True)
state = base_state.with_can_connect("nginx-prometheus-exporter", True)

# WHEN we run any event
state_out = ctx.run("update_status", state)

assert state_out.unit_status == BlockedStatus("")


@patch("charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher")
@patch(
"cosl.coordinated_workers.coordinator.KubernetesComputeResourcesPatch.PATCH_RETRY_STOP",
PropertyMock(return_value=tenacity.wait_fixed(1)),
)
def test_status_check_k8s_patch_success_after_retries(
resource_patcher_mock, ctx, base_state, caplog
):
# GIVEN the container can connect
state = base_state.with_can_connect("nginx", True)
state = base_state.with_can_connect("nginx-prometheus-exporter", True)

# Retry on that error
response = httpx.Response(
status_code=404, content='{"status": {"code": 404, "message": "Not Found"},"code":"404"}'
)
# Success on 2nd try
resource_patcher_mock.return_value.apply.side_effect = [ApiError(response=response), None]

# on collect-unit-status, the request patches are not yet reflected
with patch(
"cosl.coordinated_workers.coordinator.KubernetesComputeResourcesPatch.get_status",
MagicMock(return_value=WaitingStatus("waiting")),
):
state_intermediate = ctx.run("config_changed", state)
assert state_intermediate.unit_status == WaitingStatus("waiting")

with patch(
"cosl.coordinated_workers.coordinator.KubernetesComputeResourcesPatch.get_status",
MagicMock(return_value=ActiveStatus("")),
):
state_out = ctx.run("update_status", state_intermediate)
assert state_out.unit_status == ActiveStatus("[coordinator] Degraded.")