Test coordinator cleanup (#61)
* Catch SecretNotFoundError when privkey shouldn't be here (#48)

* Catch SecretNotFoundError when privkey shouldn't be here

* Bring back if self.tls_available check from pre-refactor times

* lint

* Bump the version that was forgotten in #48 (#50)

* Add issues integration action (#52)

* Add issues integration action

* Order imports differently as lint started to complain

* Update databag model dump return value (#53)

* dump returns databag

* tests

* root ca cert patch

* fixed static checks

* fix

* removed conftest

* added health check logic to worker (#55)

* added health check logic to worker

* adapted status check to be less tempo-specific

* use regular paths for codespell too

* rerun black

* vbump

* added worker error on timeout if restart fails (#56)

* added worker error on timeout if restart fails

* maintenance status throughout retries

* fixed static

* only restart own services

* added tls support for worker checks (#59)

* added tls support for worker checks

* lint

* static fix

* Test coordinator (#60)

* Added the following:
1) Coordinator pytest fixtures
2) Coordinator unit tests
3) Refactoring of roles_config in coordinator.py
  * ClusterRolesConfig was switched to a dataclass with __post_init__ and is_coherent_with methods (a usage sketch follows below)
4) Created test_roles_config.py tests

* Chore: Fix leftover comments and minor code changes

* Updates
1) Fmt
2) Merged 'main:coordinator.py' into test-coordinator-cleanup to fix Secrets error

* Added docstrings to ClusterRolesConfig

---------

Co-authored-by: Mateusz Kulewicz <mateusz.kulewicz@canonical.com>
Co-authored-by: PietroPasotti <starfire.daemon@gmail.com>
3 people authored Aug 26, 2024
1 parent a27cf0f commit de543cd
Showing 4 changed files with 315 additions and 85 deletions.
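The headline change, visible in coordinator.py below, is that ClusterRolesConfig, previously a Protocol checked by a module-level validate_roles_config() helper, is now a dataclass that validates itself in __post_init__ (raising ClusterRolesConfigError on an incoherent spec) and exposes an is_coherent_with() method. A minimal usage sketch; the role names and unit counts are illustrative, not taken from this repository:

# Editorial sketch, not part of the commit. Role names are made up.
from cosl.coordinated_workers.coordinator import ClusterRolesConfig

ROLES = ClusterRolesConfig(
    roles={"all", "querier", "ingester", "distributor"},
    meta_roles={"all": {"querier", "ingester", "distributor"}},
    minimal_deployment={"querier", "ingester", "distributor"},
    recommended_deployment={"querier": 1, "ingester": 3, "distributor": 1},
)

# An incoherent spec (e.g. a minimal_deployment role not declared in `roles`)
# now raises ClusterRolesConfigError at construction time instead of tripping an assert.

# is_coherent_with() replaces the superset check that used to live in Coordinator.is_coherent:
assert ROLES.is_coherent_with({"all", "querier", "ingester", "distributor"})
assert not ROLES.is_coherent_with({"querier"})  # minimal deployment not satisfied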
143 changes: 104 additions & 39 deletions src/cosl/coordinated_workers/coordinator.py
@@ -10,16 +10,31 @@
import re
import shutil
import socket
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Protocol, Set, TypedDict
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Mapping,
Optional,
Set,
TypedDict,
)
from urllib.parse import urlparse

import ops
import yaml

import cosl
from cosl.coordinated_workers.interface import ClusterProvider
from cosl.coordinated_workers.nginx import Nginx, NginxMappingOverrides, NginxPrometheusExporter
from cosl.coordinated_workers.nginx import (
Nginx,
NginxMappingOverrides,
NginxPrometheusExporter,
)
from cosl.helpers import check_libs_installed

check_libs_installed(
@@ -52,23 +67,47 @@ class S3NotFoundError(Exception):
"""Raised when the s3 integration is not present or not ready."""


class ClusterRolesConfig(Protocol):
class ClusterRolesConfigError(Exception):
"""Raised when the ClusterRolesConfig instance is not properly configured."""


@dataclass
class ClusterRolesConfig:
"""Worker roles and deployment requirements."""

roles: Iterable[str]
"""The union of enabled roles for the application."""
meta_roles: Mapping[str, Iterable[str]]
"""Meta roles are composed of non-meta roles (default: all)."""
minimal_deployment: Iterable[str]
"""The minimal set of roles that need to be allocated for the deployment to be considered consistent."""
recommended_deployment: Dict[str, int]
"""The set of roles that need to be allocated for the deployment to be considered robust according to the official recommendations/guidelines.."""

def __post_init__(self):
"""Ensure the various role specifications are consistent with one another."""
are_meta_keys_valid = set(self.meta_roles.keys()).issubset(self.roles)
are_meta_values_valid = all(
set(meta_value).issubset(self.roles)
for meta_value in self.meta_roles.values()
)
is_minimal_valid = set(self.minimal_deployment).issubset(self.roles)
is_recommended_valid = set(self.recommended_deployment).issubset(self.roles)
if not all(
[
are_meta_keys_valid,
are_meta_values_valid,
is_minimal_valid,
is_recommended_valid,
]
):
raise ClusterRolesConfigError(
"Invalid ClusterRolesConfig: The configuration is not coherent."
)


def validate_roles_config(roles_config: ClusterRolesConfig) -> None:
"""Assert that all the used roles have been defined."""
roles = set(roles_config.roles)
assert set(roles_config.meta_roles.keys()).issubset(roles)
for role_set in roles_config.meta_roles.values():
assert set(role_set).issubset(roles)
assert set(roles_config.minimal_deployment).issubset(roles)
assert set(roles_config.recommended_deployment.keys()).issubset(roles)
def is_coherent_with(self, cluster_roles: Iterable[str]) -> bool:
"""Returns True if the provided roles satisfy the minimal deployment spec; False otherwise."""
return set(self.minimal_deployment).issubset(set(cluster_roles))


_EndpointMapping = TypedDict(
@@ -105,8 +144,12 @@ def __init__(
nginx_config: Callable[["Coordinator"], str],
workers_config: Callable[["Coordinator"], str],
nginx_options: Optional[NginxMappingOverrides] = None,
is_coherent: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
is_recommended: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
is_coherent: Optional[
Callable[[ClusterProvider, ClusterRolesConfig], bool]
] = None,
is_recommended: Optional[
Callable[[ClusterProvider, ClusterRolesConfig], bool]
] = None,
tracing_receivers: Optional[Callable[[], Optional[Dict[str, str]]]] = None,
):
"""Constructor for a Coordinator object.
@@ -134,7 +177,6 @@ def __init__(

self._endpoints = endpoints

validate_roles_config(roles_config)
self.roles_config = roles_config

self.cluster = ClusterProvider(
@@ -154,7 +196,9 @@ def __init__(
options=nginx_options,
)
self._workers_config_getter = partial(workers_config, self)
self.nginx_exporter = NginxPrometheusExporter(self._charm, options=nginx_options)
self.nginx_exporter = NginxPrometheusExporter(
self._charm, options=nginx_options
)

self.cert_handler = CertHandler(
self._charm,
@@ -164,13 +208,17 @@ def __init__(
sans=[self.hostname],
)

self.s3_requirer = S3Requirer(self._charm, self._endpoints["s3"], s3_bucket_name)
self.s3_requirer = S3Requirer(
self._charm, self._endpoints["s3"], s3_bucket_name
)

self._grafana_dashboards = GrafanaDashboardProvider(
self._charm, relation_name=self._endpoints["grafana-dashboards"]
)

self._logging = LokiPushApiConsumer(self._charm, relation_name=self._endpoints["logging"])
self._logging = LokiPushApiConsumer(
self._charm, relation_name=self._endpoints["logging"]
)

# Provide ability for this to be scraped by Prometheus using prometheus_scrape
refresh_events = [self._charm.on.update_status, self.cluster.on.changed]
@@ -188,11 +236,15 @@ def __init__(
)

self.tracing = TracingEndpointRequirer(
self._charm, relation_name=self._endpoints["tracing"], protocols=["otlp_http"]
self._charm,
relation_name=self._endpoints["tracing"],
protocols=["otlp_http"],
)

# We always listen to collect-status
self.framework.observe(self._charm.on.collect_unit_status, self._on_collect_unit_status)
self.framework.observe(
self._charm.on.collect_unit_status, self._on_collect_unit_status
)

# If the cluster isn't ready, refuse to handle any other event as we can't possibly know what to do
if not self.cluster.has_workers:
@@ -223,7 +275,9 @@ def __init__(
self.framework.observe(self._charm.on.config_changed, self._on_config_changed)

# nginx
self.framework.observe(self._charm.on.nginx_pebble_ready, self._on_nginx_pebble_ready)
self.framework.observe(
self._charm.on.nginx_pebble_ready, self._on_nginx_pebble_ready
)
self.framework.observe(
self._charm.on.nginx_prometheus_exporter_pebble_ready,
self._on_nginx_prometheus_exporter_pebble_ready,
@@ -233,22 +287,28 @@ def __init__(
self.framework.observe(
self.s3_requirer.on.credentials_changed, self._on_s3_credentials_changed
)
self.framework.observe(self.s3_requirer.on.credentials_gone, self._on_s3_credentials_gone)
self.framework.observe(
self.s3_requirer.on.credentials_gone, self._on_s3_credentials_gone
)

# tracing
# self.framework.observe(self._charm.on.peers_relation_created, self._on_peers_relation_created)
# self.framework.observe(self._charm.on.peers_relation_changed, self._on_peers_relation_changed)

# logging
self.framework.observe(
self._logging.on.loki_push_api_endpoint_joined, self._on_loki_relation_changed
self._logging.on.loki_push_api_endpoint_joined,
self._on_loki_relation_changed,
)
self.framework.observe(
self._logging.on.loki_push_api_endpoint_departed, self._on_loki_relation_changed
self._logging.on.loki_push_api_endpoint_departed,
self._on_loki_relation_changed,
)

# tls
self.framework.observe(self.cert_handler.on.cert_changed, self._on_cert_handler_changed)
self.framework.observe(
self.cert_handler.on.cert_changed, self._on_cert_handler_changed
)

# cluster
self.framework.observe(self.cluster.on.changed, self._on_cluster_changed)
@@ -263,15 +323,7 @@ def is_coherent(self) -> bool:
if manual_coherency_checker := self._is_coherent:
return manual_coherency_checker(self.cluster, self.roles_config)

rc = self.roles_config
minimal_deployment = set(rc.minimal_deployment)
cluster = self.cluster
roles = cluster.gather_roles()

# Whether the roles list makes up a coherent mimir deployment.
is_coherent = set(roles.keys()).issuperset(minimal_deployment)

return is_coherent
return self.roles_config.is_coherent_with(self.cluster.gather_roles().keys())

@property
def missing_roles(self) -> Set[str]:
@@ -423,7 +475,10 @@ def _workers_scrape_jobs(self) -> List[Dict[str, Any]]:
"relabel_configs": [
{"target_label": "juju_charm", "replacement": worker["charm_name"]},
{"target_label": "juju_unit", "replacement": worker["unit"]},
{"target_label": "juju_application", "replacement": worker["application"]},
{
"target_label": "juju_application",
"replacement": worker["application"],
},
{"target_label": "juju_model", "replacement": self.model.name},
{"target_label": "juju_model_uuid", "replacement": self.model.uuid},
],
@@ -508,7 +563,9 @@ def _on_collect_unit_status(self, e: ops.CollectStatusEvent):
# todo add [nginx.workload] statuses

if not self.cluster.has_workers:
e.add_status(ops.BlockedStatus("[consistency] Missing any worker relation."))
e.add_status(
ops.BlockedStatus("[consistency] Missing any worker relation.")
)
if not self.is_coherent:
e.add_status(ops.BlockedStatus("[consistency] Cluster inconsistent."))
if not self.s3_ready:
@@ -543,7 +600,9 @@ def loki_endpoints_by_unit(self) -> Dict[str, str]:
}
"""
endpoints: Dict[str, str] = {}
relations: List[ops.Relation] = self.model.relations.get(self._endpoints["logging"], [])
relations: List[ops.Relation] = self.model.relations.get(
self._endpoints["logging"], []
)

for relation in relations:
for unit in relation.units:
@@ -576,9 +635,15 @@ def update_cluster(self):
# all arguments below are optional:
ca_cert=self.cert_handler.ca_cert,
server_cert=self.cert_handler.server_cert,
privkey_secret_id=self.cluster.grant_privkey(VAULT_SECRET_LABEL),
# FIXME tls_available check is due to fetching secret from vault. We should be generating a new secret.
# see https://github.com/canonical/cos-lib/issues/49 for full context
privkey_secret_id=(
self.cluster.grant_privkey(VAULT_SECRET_LABEL) if self.tls_available else None
),
tracing_receivers=(
self._tracing_receivers_getter() if self._tracing_receivers_getter else None
self._tracing_receivers_getter()
if self._tracing_receivers_getter
else None
),
)

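The Coordinator constructor above still accepts optional is_coherent and is_recommended hooks of type Callable[[ClusterProvider, ClusterRolesConfig], bool]; when supplied, they take precedence over the built-in checks. A hypothetical override, using only the gather_roles() call already relied on by the is_coherent property (the policy itself is illustrative):

# Editorial sketch, not part of the commit.
from cosl.coordinated_workers.coordinator import ClusterRolesConfig
from cosl.coordinated_workers.interface import ClusterProvider


def strict_is_coherent(cluster: ClusterProvider, roles_config: ClusterRolesConfig) -> bool:
    # Coherent only if the minimal deployment is met and every recommended role is present at least once.
    deployed = set(cluster.gather_roles().keys())
    return roles_config.is_coherent_with(deployed) and set(
        roles_config.recommended_deployment
    ).issubset(deployed)

# Wired in by the charm as: Coordinator(..., is_coherent=strict_is_coherent)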
17 changes: 8 additions & 9 deletions src/cosl/coordinated_workers/interface.py
@@ -94,28 +94,27 @@ def load(cls, databag: _RawDatabag):
log.debug(msg, exc_info=True)
raise DataValidationError(msg) from e

def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> Dict[str, str]:
def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _RawDatabag:
"""Write the contents of this model to Juju databag.
:param databag: the databag to write the data to.
:param clear: ensure the databag is cleared before writing it.
"""
if clear and databag:
databag.clear()
_databag: _RawDatabag = {} if databag is None else databag

if clear:
_databag.clear()

if databag is None:
databag = {}
if nest_under := self.model_config.get("_NEST_UNDER"):
databag[nest_under] = self.model_dump_json( # type: ignore
_databag[nest_under] = self.model_dump_json( # type: ignore
by_alias=True,
# skip keys whose values are default
exclude_defaults=True,
)

dct = self.model_dump(mode="json", by_alias=True, exclude_defaults=True) # type: ignore
raw = {k: json.dumps(v) for k, v in dct.items()}
databag.update(raw)
return raw
_databag.update({k: json.dumps(v) for k, v in dct.items()})
return _databag


# =============
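The dump() change above makes the method return the databag it wrote to (typed _RawDatabag) instead of only the freshly serialized key/value dict, and tidies the clear handling. A sketch of the resulting calling pattern; the subclass, its field, and the base-class name DatabagModel are assumptions, since the enclosing class is not visible in this hunk:

# Editorial sketch, not part of the commit. Base-class name and model fields are assumed.
from cosl.coordinated_workers.interface import DatabagModel


class WorkerAppData(DatabagModel):
    address: str


databag: dict = {}  # stand-in for relation.data[<app>]
returned = WorkerAppData(address="worker-0.example").dump(databag)

# Assuming no _NEST_UNDER nesting is configured on the model:
assert returned is databag             # dump() now hands back the databag it populated
assert set(returned) == {"address"}    # each value is JSON-serialized into the bag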
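The remaining two changed files, presumably the new coordinator test fixtures/tests and the test_roles_config.py tests mentioned in the commit message, are not rendered in this view. Purely to illustrate what the new __post_init__ validation makes testable (this is not the actual test code), a pytest-style check built on the API shown above:

# Editorial sketch, not part of the commit and not the actual tests.
import pytest

from cosl.coordinated_workers.coordinator import (
    ClusterRolesConfig,
    ClusterRolesConfigError,
)


def test_minimal_deployment_must_use_declared_roles():
    # A minimal_deployment role that is not declared in `roles` must fail __post_init__.
    with pytest.raises(ClusterRolesConfigError):
        ClusterRolesConfig(
            roles={"reader"},
            meta_roles={},
            minimal_deployment={"writer"},
            recommended_deployment={},
        )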
