Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set resource limits and requests for HA #62

Merged
merged 11 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "cosl"
version = "0.0.23"
version = "0.0.25"
authors = [
{ name="sed-i", email="82407168+sed-i@users.noreply.github.com" },
]
Expand All @@ -16,7 +16,8 @@ dependencies = [
"pydantic",
"tenacity",
"PyYAML",
"typing-extensions"
"typing-extensions",
"lightkube>=v0.15.4"
]
classifiers = [
"Programming Language :: Python :: 3.8",
Expand Down
74 changes: 74 additions & 0 deletions src/cosl/coordinated_workers/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,20 @@
"charms.prometheus_k8s.v0.prometheus_scrape",
"charms.loki_k8s.v1.loki_push_api",
"charms.tempo_k8s.v2.tracing",
"charms.observability_libs.v0.kubernetes_compute_resources_patch",
)

from charms.data_platform_libs.v0.s3 import S3Requirer
from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider
from charms.loki_k8s.v1.loki_push_api import LokiPushApiConsumer
from charms.observability_libs.v0.kubernetes_compute_resources_patch import (
KubernetesComputeResourcesPatch,
adjust_resource_requirements,
)
from charms.observability_libs.v1.cert_handler import VAULT_SECRET_LABEL, CertHandler
from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider
from charms.tempo_k8s.v2.tracing import TracingEndpointRequirer
from lightkube.models.core_v1 import ResourceRequirements

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -109,6 +115,17 @@ def is_coherent_with(self, cluster_roles: Iterable[str]) -> bool:
return set(self.minimal_deployment).issubset(set(cluster_roles))


def _validate_container_name(
container_name: Optional[str],
resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]],
):
"""Raise `ValueError` if `resources_requests` is not None and `container_name` is None."""
if resources_requests is not None and container_name is None:
raise ValueError(
"Cannot have a None value for container_name while resources_requests is provided."
)


_EndpointMapping = TypedDict(
"_EndpointMapping",
{
Expand All @@ -124,6 +141,15 @@ def is_coherent_with(self, cluster_roles: Iterable[str]) -> bool:
)
"""Mapping of the relation endpoint names that the charms uses, as defined in metadata.yaml."""

_ResourceLimitOptionsMapping = TypedDict(
"_ResourceLimitOptionsMapping",
{
"cpu_limit": str,
"memory_limit": str,
},
)
"""Mapping of the resources limit option names that the charms use, as defined in config.yaml."""


class Coordinator(ops.Object):
"""Charming coordinator.
Expand All @@ -146,6 +172,9 @@ def __init__(
is_coherent: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
is_recommended: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
tracing_receivers: Optional[Callable[[], Optional[Dict[str, str]]]] = None,
resources_limit_options: Optional[_ResourceLimitOptionsMapping] = None,
resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]] = None,
container_name: Optional[str] = None,
):
"""Constructor for a Coordinator object.

Expand All @@ -163,6 +192,18 @@ def __init__(
is_coherent: Custom coherency checker for a minimal deployment.
is_recommended: Custom coherency checker for a recommended deployment.
tracing_receivers: Endpoints to which the workload (and the worker charm) can push traces to.
resources_limit_options: A dictionary containing resources limit option names. The dictionary should include
"cpu_limit" and "memory_limit" keys with values as option names, as defined in the config.yaml.
If no dictionary is provided, the default option names "cpu_limit" and "memory_limit" would be used.
resources_requests: A function generating the resources "requests" portion to apply when patching a container using
KubernetesComputeResourcesPatch. The "limits" portion of the patch gets populated by setting
their respective config options in config.yaml.
container_name: The container for which to apply the resources requests & limits.
michaeldmitry marked this conversation as resolved.
Show resolved Hide resolved
Required if `resources_requests` is provided.

Raises:
ValueError:
If `resources_requests` is not None and `container_name` is None, a ValueError is raised.
"""
super().__init__(charm, key="coordinator")
self._charm = charm
Expand All @@ -172,6 +213,7 @@ def __init__(

self._endpoints = endpoints

_validate_container_name(container_name, resources_requests)
self.roles_config = roles_config

self.cluster = ClusterProvider(
Expand All @@ -184,6 +226,11 @@ def __init__(
self._is_coherent = is_coherent
self._is_recommended = is_recommended
self._tracing_receivers_getter = tracing_receivers
self._resources_requests_getter = (
partial(resources_requests, self) if resources_requests is not None else None
)
self._container_name = container_name
self._resources_limit_options = resources_limit_options or {}

self.nginx = Nginx(
self._charm,
Expand Down Expand Up @@ -230,6 +277,17 @@ def __init__(
protocols=["otlp_http"],
)

# Resources patch
self.resources_patch = (
KubernetesComputeResourcesPatch(
self._charm,
self._container_name, # type: ignore
resource_reqs_func=self._adjust_resource_requirements,
)
if self._resources_requests_getter
else None
)

# We always listen to collect-status
self.framework.observe(self._charm.on.collect_unit_status, self._on_collect_unit_status)

Expand Down Expand Up @@ -555,6 +613,9 @@ def _on_collect_unit_status(self, e: ops.CollectStatusEvent):
else:
e.add_status(ops.ActiveStatus())

if self.resources_patch:
e.add_status(self.resources_patch.get_status())

###################
# UTILITY METHODS #
###################
Expand Down Expand Up @@ -663,3 +724,16 @@ def _render_alert_rules(self):
os.makedirs(CONSOLIDATED_ALERT_RULES_PATH, exist_ok=True)
self._render_workers_alert_rules()
self._consolidate_nginx_alert_rules()

def _adjust_resource_requirements(self) -> ResourceRequirements:
    """Build the resources patch from config-driven limits and the user-supplied requests.

    Called back by `KubernetesComputeResourcesPatch` to compute the requests and
    limits to apply. Limit values are read from the charm config, under the option
    names supplied at construction time via `resources_limit_options` (falling back
    to the defaults "cpu_limit" / "memory_limit").

    Returns:
        The combined ResourceRequirements to patch onto the container.
    """
    # Resolve the config option names; callers may have remapped them in config.yaml.
    cpu_limit_key = self._resources_limit_options.get("cpu_limit", "cpu_limit")
    memory_limit_key = self._resources_limit_options.get("memory_limit", "memory_limit")

    # NOTE: fixed scrape artifact — review-thread text had been injected inside this
    # dict literal; the code itself is unchanged.
    limits = {
        "cpu": self._charm.model.config.get(cpu_limit_key),
        "memory": self._charm.model.config.get(memory_limit_key),
    }
    return adjust_resource_requirements(
        limits, self._resources_requests_getter(), adhere_to_requests=True  # type: ignore
    )
78 changes: 77 additions & 1 deletion src/cosl/coordinated_workers/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@

check_libs_installed(
"charms.loki_k8s.v1.loki_push_api",
"charms.observability_libs.v0.kubernetes_compute_resources_patch",
)

from charms.loki_k8s.v1.loki_push_api import _PebbleLogClient # type: ignore
from charms.observability_libs.v0.kubernetes_compute_resources_patch import (
KubernetesComputeResourcesPatch,
adjust_resource_requirements,
)
from lightkube.models.core_v1 import ResourceRequirements

BASE_DIR = "/worker"
CONFIG_FILE = "/etc/worker/config.yaml"
Expand All @@ -37,9 +43,31 @@

logger = logging.getLogger(__name__)


def _validate_container_name(
container_name: Optional[str],
resources_requests: Optional[Callable[["Worker"], Dict[str, str]]],
):
"""Raise `ValueError` if `resources_requests` is not None and `container_name` is None."""
if resources_requests is not None and container_name is None:
raise ValueError(
"Cannot have a None value for container_name while resources_requests is provided."
)


_EndpointMapping = TypedDict("_EndpointMapping", {"cluster": str}, total=True)
"""Mapping of the relation endpoint names that the charms uses, as defined in metadata.yaml."""

_ResourceLimitOptionsMapping = TypedDict(
"_ResourceLimitOptionsMapping",
{
"cpu_limit": str,
"memory_limit": str,
},
)
"""Mapping of the resources limit option names that the charms use, as defined in config.yaml."""


ROOT_CA_CERT = Path("/usr/local/share/ca-certificates/ca.crt")


Expand Down Expand Up @@ -69,6 +97,9 @@ def __init__(
pebble_layer: Callable[["Worker"], Layer],
endpoints: _EndpointMapping,
readiness_check_endpoint: Optional[Union[str, Callable[["Worker"], str]]] = None,
resources_limit_options: Optional[_ResourceLimitOptionsMapping] = None,
resources_requests: Optional[Callable[["Worker"], Dict[str, str]]] = None,
container_name: Optional[str] = None,
):
"""Constructor for a Worker object.

Expand All @@ -79,6 +110,18 @@ def __init__(
endpoints: Endpoint names for coordinator relations, as defined in metadata.yaml.
readiness_check_endpoint: URL to probe with a pebble check to determine
whether the worker node is ready. Passing None will effectively disable it.
resources_limit_options: A dictionary containing resources limit option names. The dictionary should include
"cpu_limit" and "memory_limit" keys with values as option names, as defined in the config.yaml.
If no dictionary is provided, the default option names "cpu_limit" and "memory_limit" would be used.
resources_requests: A function generating the resources "requests" portion to apply when patching a container using
KubernetesComputeResourcesPatch. The "limits" portion of the patch gets populated by setting
their respective config options in config.yaml.
container_name: The container for which to apply the resources requests & limits.
Required if `resources_requests` is provided.

Raises:
ValueError:
If `resources_requests` is not None and `container_name` is None, a ValueError is raised.
"""
super().__init__(charm, key="worker")
self._charm = charm
Expand All @@ -88,12 +131,19 @@ def __init__(
self._container = self._charm.unit.get_container(name)

self._endpoints = endpoints
_validate_container_name(container_name, resources_requests)

# turn str to Callable[[Worker], str]
self._readiness_check_endpoint: Optional[Callable[[Worker], str]]
if isinstance(readiness_check_endpoint, str):
self._readiness_check_endpoint = lambda _: readiness_check_endpoint
else:
self._readiness_check_endpoint = readiness_check_endpoint
self._resources_requests_getter = (
partial(resources_requests, self) if resources_requests is not None else None
)
self._container_name = container_name
self._resources_limit_options = resources_limit_options or {}

self.cluster = ClusterRequirer(
charm=self._charm,
Expand All @@ -110,6 +160,17 @@ def __init__(
],
)

# Resources patch
self.resources_patch = (
KubernetesComputeResourcesPatch(
self._charm,
self._container_name, # type: ignore
resource_reqs_func=self._adjust_resource_requirements,
)
if self._resources_requests_getter
else None
)

# Event listeners
self.framework.observe(self._charm.on.config_changed, self._on_config_changed)
self.framework.observe(self._charm.on.upgrade_charm, self._on_upgrade_charm)
Expand Down Expand Up @@ -256,6 +317,9 @@ def _on_collect_status(self, e: ops.CollectStatusEvent):
)
)

if self.resources_patch:
e.add_status(self.resources_patch.get_status())

# Utility functions
@property
def roles(self) -> List[str]:
Expand Down Expand Up @@ -330,7 +394,6 @@ def diff(layer: Layer, plan: Plan):
logger.debug("Adding new layer to pebble...")
self._container.add_layer(self._name, new_layer, combine=True)
return True

return False

def _add_readiness_check(self, new_layer: Layer):
Expand Down Expand Up @@ -543,6 +606,19 @@ def charm_tracing_config(self) -> Tuple[Optional[str], Optional[str]]:
else:
return endpoint, None

def _adjust_resource_requirements(self) -> ResourceRequirements:
    """Build the resources patch from config-driven limits and the user-supplied requests.

    Called back by `KubernetesComputeResourcesPatch` to compute the requests and
    limits to apply. Limit values are read from the charm config, under the option
    names supplied at construction time via `resources_limit_options` (falling back
    to the defaults "cpu_limit" / "memory_limit").

    Returns:
        The combined ResourceRequirements to patch onto the container.
    """
    # Resolve the config option names; callers may have remapped them in config.yaml.
    cpu_limit_key = self._resources_limit_options.get("cpu_limit", "cpu_limit")
    memory_limit_key = self._resources_limit_options.get("memory_limit", "memory_limit")

    # NOTE: fixed scrape artifact — review-thread text had been injected inside this
    # dict literal and around the return; the code itself is unchanged.
    limits = {
        "cpu": self._charm.model.config.get(cpu_limit_key),
        "memory": self._charm.model.config.get(memory_limit_key),
    }
    return adjust_resource_requirements(
        limits, self._resources_requests_getter(), adhere_to_requests=True  # type: ignore
    )


class ManualLogForwarder(ops.Object):
"""Forward the standard outputs of all workloads to explictly-provided Loki endpoints."""
Expand Down
Loading
Loading