Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test coordinator #60

Merged
merged 8 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions src/cosl/coordinated_workers/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
import re
import shutil
import socket
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Protocol, Set, TypedDict
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Set, TypedDict
from urllib.parse import urlparse

import ops
Expand Down Expand Up @@ -52,23 +53,37 @@ class S3NotFoundError(Exception):
"""Raised when the s3 integration is not present or not ready."""


class ClusterRolesConfig(Protocol):
class ClusterRolesConfigError(Exception):
"""Raised when the ClusterRolesConfig instance is not properly configured."""


@dataclass
class ClusterRolesConfig:
"""Worker roles and deployment requirements."""

roles: Iterable[str]
meta_roles: Mapping[str, Iterable[str]]
minimal_deployment: Iterable[str]
recommended_deployment: Dict[str, int]
MichaelThamm marked this conversation as resolved.
Show resolved Hide resolved

def __post_init__(self):
"""Ensure the roles_config makes up a coherent worker deployment."""
MichaelThamm marked this conversation as resolved.
Show resolved Hide resolved
are_meta_keys_valid = set(self.meta_roles.keys()).issubset(self.roles)
are_meta_values_valid = all(
set(meta_value).issubset(self.roles) for meta_value in self.meta_roles.values()
)
is_minimal_valid = set(self.minimal_deployment).issubset(self.roles)
is_recommended_valid = set(self.recommended_deployment).issubset(self.roles)
if not all(
[are_meta_keys_valid, are_meta_values_valid, is_minimal_valid, is_recommended_valid]
):
raise ClusterRolesConfigError(
"Invalid ClusterRolesConfig: The configuration is not coherent."
)

def validate_roles_config(roles_config: ClusterRolesConfig) -> None:
"""Assert that all the used roles have been defined."""
roles = set(roles_config.roles)
assert set(roles_config.meta_roles.keys()).issubset(roles)
for role_set in roles_config.meta_roles.values():
assert set(role_set).issubset(roles)
assert set(roles_config.minimal_deployment).issubset(roles)
assert set(roles_config.recommended_deployment.keys()).issubset(roles)
def is_coherent_with(self, cluster_roles: Iterable[str]) -> bool:
"""Validate the ClusterRolesConfig is coherent with the provided cluster roles."""
MichaelThamm marked this conversation as resolved.
Show resolved Hide resolved
return set(self.minimal_deployment).issubset(set(cluster_roles))


_EndpointMapping = TypedDict(
Expand Down Expand Up @@ -134,7 +149,6 @@ def __init__(

self._endpoints = endpoints

validate_roles_config(roles_config)
self.roles_config = roles_config

self.cluster = ClusterProvider(
Expand Down Expand Up @@ -263,15 +277,7 @@ def is_coherent(self) -> bool:
if manual_coherency_checker := self._is_coherent:
return manual_coherency_checker(self.cluster, self.roles_config)

rc = self.roles_config
minimal_deployment = set(rc.minimal_deployment)
cluster = self.cluster
roles = cluster.gather_roles()

# Whether the roles list makes up a coherent mimir deployment.
is_coherent = set(roles.keys()).issuperset(minimal_deployment)

return is_coherent
return self.roles_config.is_coherent_with(self.cluster.gather_roles().keys())

@property
def missing_roles(self) -> Set[str]:
Expand Down
6 changes: 2 additions & 4 deletions src/cosl/coordinated_workers/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,7 @@ def gather_roles(self) -> Dict[str, int]:
remote_app_databag = relation.data[relation.app]
try:
worker_role: str = ClusterRequirerAppData.load(remote_app_databag).role
except DataValidationError as e:
log.error(f"invalid databag contents: {e}")
MichaelThamm marked this conversation as resolved.
Show resolved Hide resolved
except DataValidationError:
continue

# the number of units with each role is the number of remote units
Expand All @@ -373,8 +372,7 @@ def gather_topology(self) -> List[Dict[str, str]]:
try:
worker_data = ClusterRequirerUnitData.load(relation.data[worker_unit])
unit_address = worker_data.address
except DataValidationError as e:
log.info(f"invalid databag contents: {e}")
except DataValidationError:
continue
worker_topology = {
"address": unit_address,
Expand Down
4 changes: 3 additions & 1 deletion tests/test_coordinated_workers/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@
@pytest.fixture(autouse=True)
def root_ca_cert(tmp_path: Path) -> Generator[Path, None, None]:
# Prevent the charm's _update_tls_certificates method to try and write our local filesystem
with patch("src.cosl.coordinated_workers.worker.ROOT_CA_CERT_LOCAL", new=tmp_path / "rootcacert"):
with patch(
"src.cosl.coordinated_workers.worker.ROOT_CA_CERT_LOCAL", new=tmp_path / "rootcacert"
):
MichaelThamm marked this conversation as resolved.
Show resolved Hide resolved
yield tmp_path / "rootcacert"
176 changes: 176 additions & 0 deletions tests/test_coordinated_workers/test_coordinator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import ops
import pytest
from ops import Framework
from scenario import Container, Context, Relation, State

from src.cosl.coordinated_workers.coordinator import (
ClusterRolesConfig,
Coordinator,
S3NotFoundError,
)
from src.cosl.coordinated_workers.interface import ClusterRequirerAppData


@pytest.fixture
def coordinator_state():
    """Return a scenario State wired with every relation and container the coordinator expects."""
    # Plain requirer endpoints, one relation each.
    relations = [
        Relation(endpoint="my-certificates", interface="certificates"),
        Relation(endpoint="my-logging", interface="loki_push_api"),
        Relation(endpoint="my-tracing", interface="tracing"),
    ]

    # The s3 relation carries ready-to-use credentials in the remote app databag.
    relations.append(
        Relation(
            "my-s3",
            interface="s3",
            remote_app_data={
                "endpoint": "s3",
                "bucket": "foo-bucket",
                "access-key": "my-access-key",
                "secret-key": "my-secret-key",
            },
        )
    )

    # Three workers, each advertising a distinct role over the cluster interface.
    for index, role in enumerate(("read", "write", "backend")):
        relations.append(
            Relation(
                "my-cluster",
                remote_app_name=f"worker{index}",
                remote_app_data=ClusterRequirerAppData(role=role).dump(),
            )
        )

    # Provider-side endpoints.
    relations.append(Relation(endpoint="my-dashboards", interface="grafana_dashboard"))
    relations.append(Relation(endpoint="my-metrics", interface="prometheus_scrape"))

    return State(
        containers=[
            Container("nginx", can_connect=True),
            Container("nginx-prometheus-exporter", can_connect=True),
        ],
        relations=relations,
    )


@pytest.fixture()
def coordinator_charm(request):
    """Build a minimal coordinator charm class for scenario-based tests."""

    class MyCoordinator(ops.CharmBase):
        # Charm metadata mirrored by the relations/containers in coordinator_state.
        META = {
            "name": "foo-app",
            "requires": {
                "my-certificates": {"interface": "certificates"},
                "my-cluster": {"interface": "cluster"},
                "my-logging": {"interface": "loki_push_api"},
                "my-tracing": {"interface": "tracing"},
                "my-s3": {"interface": "s3"},
            },
            "provides": {
                "my-dashboards": {"interface": "grafana_dashboard"},
                "my-metrics": {"interface": "prometheus_scrape"},
            },
            "containers": {
                "nginx": {"type": "oci-image"},
                "nginx-prometheus-exporter": {"type": "oci-image"},
            },
        }

        def __init__(self, framework: Framework):
            super().__init__(framework)
            # Role set borrowed from loki-coordinator-k8s-operator.
            roles = ClusterRolesConfig(
                roles={"all", "read", "write", "backend"},
                meta_roles={"all": {"all", "read", "write", "backend"}},
                minimal_deployment={"read", "write", "backend"},
                recommended_deployment={"read": 3, "write": 3, "backend": 3},
            )
            # Instantiated directly rather than via a context manager because the
            # Coordinator is "ops aware" and hooks itself into the framework.
            self.coordinator = Coordinator(
                charm=self,
                roles_config=roles,
                s3_bucket_name="foo-bucket",
                external_url="https://foo.example.com",
                worker_metrics_port=123,
                endpoints={
                    "certificates": "my-certificates",
                    "cluster": "my-cluster",
                    "grafana-dashboards": "my-dashboards",
                    "logging": "my-logging",
                    "metrics": "my-metrics",
                    "tracing": "my-tracing",
                    "s3": "my-s3",
                },
                nginx_config=lambda coordinator: f"nginx configuration for {coordinator.name}",
                workers_config=lambda coordinator: f"workers configuration for {coordinator.name}",
                # Optional hooks deliberately left at their defaults:
                # nginx_options, is_coherent, is_recommended, tracing_receivers.
            )

    return MyCoordinator


def test_worker_roles_subset_of_minimal_deployment(
    coordinator_state: State, coordinator_charm: ops.CharmBase
):
    """The deployment is incoherent when the worker roles don't cover the minimal deployment."""

    # GIVEN a coordinator_charm
    ctx = Context(coordinator_charm, meta=coordinator_charm.META)

    # AND a coordinator_state whose worker relations lack the "backend" role
    # (worker2 is the only worker advertising it)
    missing_backend_worker_relation = [
        relation
        for relation in coordinator_state.relations
        if relation.remote_app_name != "worker2"
    ]

    # WHEN we process any event
    with ctx.manager(
        "update-status",
        state=coordinator_state.replace(relations=missing_backend_worker_relation),
    ) as mgr:
        charm = mgr.charm

        # THEN the deployment is NOT coherent
        assert not charm.coordinator.is_coherent


def test_without_s3_integration_raises_error(
    coordinator_state: State, coordinator_charm: ops.CharmBase
):
    """Reading _s3_config without an s3 relation raises S3NotFoundError."""

    # GIVEN a coordinator charm whose state carries no s3 relation
    ctx = Context(coordinator_charm, meta=coordinator_charm.META)
    relations_without_s3 = [
        relation for relation in coordinator_state.relations if relation.endpoint != "my-s3"
    ]

    # WHEN we process any event
    with ctx.manager(
        "update-status",
        state=coordinator_state.replace(relations=relations_without_s3),
    ) as mgr:
        # THEN accessing the _s3_config property raises S3NotFoundError
        with pytest.raises(S3NotFoundError):
            _ = mgr.charm.coordinator._s3_config
63 changes: 63 additions & 0 deletions tests/test_coordinated_workers/test_roles_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import pytest

from src.cosl.coordinated_workers.coordinator import ClusterRolesConfig, ClusterRolesConfigError


def test_meta_role_keys_not_in_roles():
    """Meta-role keys that are not declared roles are rejected at construction time."""

    # GIVEN an invalid_role_config whose meta_roles key is not among the declared roles
    # WHEN ClusterRolesConfig is instantiated
    # THEN the __post_init__ method raises a ClusterRolesConfigError
    with pytest.raises(ClusterRolesConfigError):
        ClusterRolesConfig(
            roles={"read"},
            meta_roles={"I AM NOT A SUBSET OF ROLES": {"read"}},
            minimal_deployment={"read"},
            recommended_deployment={"read": 3},
        )


def test_meta_role_values_not_in_roles():
    """Meta-role values that are not declared roles are rejected at construction time."""

    # GIVEN a config whose meta_roles maps onto an undeclared role
    # WHEN/THEN instantiation fails in __post_init__ with ClusterRolesConfigError
    bad_config = dict(
        roles={"read"},
        meta_roles={"read": {"I AM NOT A SUBSET OF ROLES"}},
        minimal_deployment={"read"},
        recommended_deployment={"read": 3},
    )
    with pytest.raises(ClusterRolesConfigError):
        ClusterRolesConfig(**bad_config)


def test_minimal_deployment_roles_not_in_roles():
    """Minimal-deployment roles that are not declared roles are rejected at construction time."""

    # GIVEN a config whose minimal_deployment names an undeclared role
    # WHEN/THEN instantiation fails in __post_init__ with ClusterRolesConfigError
    bad_config = dict(
        roles={"read"},
        meta_roles={"read": {"read"}},
        minimal_deployment={"I AM NOT A SUBSET OF ROLES"},
        recommended_deployment={"read": 3},
    )
    with pytest.raises(ClusterRolesConfigError):
        ClusterRolesConfig(**bad_config)


def test_recommended_deployment_roles_not_in_roles():
    """Recommended-deployment roles that are not declared roles are rejected at construction time."""

    # GIVEN a config whose recommended_deployment keys name an undeclared role
    # WHEN/THEN instantiation fails in __post_init__ with ClusterRolesConfigError
    bad_config = dict(
        roles={"read"},
        meta_roles={"read": {"read"}},
        minimal_deployment={"read"},
        recommended_deployment={"I AM NOT A SUBSET OF ROLES": 3},
    )
    with pytest.raises(ClusterRolesConfigError):
        ClusterRolesConfig(**bad_config)
Loading