Skip to content

Commit

Permalink
Holistic worker logic (#70)
Browse files Browse the repository at this point in the history
* common-exit-hook for Worker

* test fix

* vbump

* tests

* lint

* rename

* rename tests
  • Loading branch information
PietroPasotti authored Sep 6, 2024
1 parent 539f535 commit 839fe92
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 49 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "cosl"
version = "0.0.29"
version = "0.0.30"
authors = [
{ name="sed-i", email="82407168+sed-i@users.noreply.github.com" },
]
Expand Down
40 changes: 11 additions & 29 deletions src/cosl/coordinated_workers/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,11 @@ def __init__(
if self._resources_requests_getter
else None
)
# holistic update logic, aka common exit hook
self._reconcile()

# Event listeners
self.framework.observe(self._charm.on.config_changed, self._on_config_changed)
self.framework.observe(self._charm.on.upgrade_charm, self._on_upgrade_charm)
self.framework.observe(self._charm.on.collect_unit_status, self._on_collect_status)

self.framework.observe(self.cluster.on.config_received, self._on_worker_config_received)
self.framework.observe(self.cluster.on.created, self._on_cluster_created)
self.framework.observe(self.cluster.on.removed, self._log_forwarder.disable_logging)

self.framework.observe(self._charm.on[self._name].pebble_ready, self._on_pebble_ready)
Expand All @@ -192,7 +189,6 @@ def __init__(
# Event handlers
def _on_pebble_ready(self, _: ops.PebbleReadyEvent):
    """When the workload container comes up, record the running workload version.

    The `or ""` guards against running_version() returning a falsy value
    (e.g. the workload binary is not queryable yet).
    """
    self._charm.unit.set_workload_version(self.running_version() or "")
    # NOTE(review): diff artifact — the call below is the line this commit
    # removes from the handler (config updates move to the _reconcile() path).
    self._update_config()

def _on_pebble_check_failed(self, event: ops.PebbleCheckFailedEvent):
if event.info.name == "ready":
Expand All @@ -204,26 +200,6 @@ def _on_pebble_check_recovered(self, event: ops.PebbleCheckFailedEvent):
logger.info("Pebble `ready` check is now passing: " "worker node is up.")
# collect-status will detect that we're ready and set active status.

def _on_worker_config_received(self, _: ops.EventBase):
    """Re-apply the worker config when new config arrives from the coordinator.

    NOTE(review): this handler is deleted by this commit — its work is
    subsumed by the holistic _reconcile() call in __init__.
    """
    self._update_config()

def _on_upgrade_charm(self, _: ops.UpgradeCharmEvent):
    """On charm upgrade, republish this worker's data to the cluster relation.

    NOTE(review): this handler is deleted by this commit — its work is
    subsumed by the holistic _reconcile() call in __init__.
    """
    self._update_cluster_relation()

def _on_cluster_created(self, _: ops.EventBase):
    """When the cluster relation is created, publish our data, then apply config.

    NOTE(review): this handler is deleted by this commit — _reconcile() now
    performs exactly this pair of calls unconditionally.
    """
    self._update_cluster_relation()
    self._update_config()

def _on_cluster_changed(self, _: ops.EventBase):
    """Re-apply the worker config when cluster relation data changes.

    NOTE(review): this handler is deleted by this commit in favour of
    the holistic _reconcile() path.
    """
    self._update_config()

def _on_config_changed(self, _: ops.ConfigChangedEvent):
    """React to charm config changes.

    NOTE(review): this handler is deleted by this commit; note it only
    pushed worker config when one was already available, whereas the new
    _reconcile() path delegates that check elsewhere.
    """
    # If the user has changed roles, publish them to relation data
    self._update_cluster_relation()
    # If there is a config, start the worker
    if self._worker_config:
        self._update_worker_config()

@property
def _worker_config(self):
"""The configuration that this worker should run with, as received from the coordinator.
Expand Down Expand Up @@ -358,6 +334,10 @@ def roles(self) -> List[str]:

def _update_config(self) -> None:
"""Update the worker config and restart the workload if necessary."""
if not self._container.can_connect():
logger.debug("container cannot connect, skipping update_config.")
return

restart = any(
(
self._update_tls_certificates(),
Expand Down Expand Up @@ -417,16 +397,18 @@ def _add_readiness_check(self, new_layer: Layer):
"ready", {"override": "replace", "http": {"url": self._readiness_check_endpoint(self)}}
)

def _reconcile(self):
    """Run all unconditional logic (the "common exit hook" added by this commit).

    Invoked from __init__ on every charm execution instead of per-event
    handlers: first publish this worker's data to the cluster relation,
    then apply the worker config (which restarts the workload if needed —
    see _update_config). Order matters: relation data is published before
    the config update runs.
    """
    self._update_cluster_relation()
    self._update_config()

def _update_cluster_relation(self) -> None:
    """Publish all the worker information to relation data."""
    # Every unit publishes its own FQDN as its address.
    self.cluster.publish_unit_address(socket.getfqdn())
    # App-level data (roles) is leader-only, and only published when
    # at least one role is configured.
    if self._charm.unit.is_leader() and self.roles:
        logger.info(f"publishing roles: {self.roles}")
        self.cluster.publish_app_roles(self.roles)

    # NOTE(review): diff artifact — the two lines below are the pre-commit
    # tail of this method; the commit moves the config update out of here
    # and into _reconcile(), which calls _update_config() right after this.
    if self._worker_config:
        self._update_config()

def _running_worker_config(self) -> Optional[Dict[str, Any]]:
"""Return the worker config as dict, or None if retrieval failed."""
if not self._container.can_connect():
Expand Down
31 changes: 22 additions & 9 deletions tests/test_coordinated_workers/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
from contextlib import ExitStack
from pathlib import Path
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -266,19 +267,31 @@ def test_worker_raises_if_service_restart_fails_for_too_long(tmp_path):
},
)

# WHEN service restart fails
def raise_change_error(*args):
raise ops.pebble.ChangeError("something", MagicMock())

with patch("ops.model.Container.restart", new=raise_change_error):
# THEN the charm errors out
with pytest.raises(Exception):
# technically an ops.pebble.ChangeError but the context manager doesn't catch it for some reason
with ExitStack() as stack:
# WHEN service restart fails
stack.enter_context(patch("ops.model.Container.restart", new=raise_change_error))

with ctx.manager(container.pebble_ready_event, State(containers=[container])) as mgr:
# so we don't have to wait for minutes:
mgr.charm.worker.SERVICE_START_RETRY_WAIT = tenacity.wait_none()
mgr.charm.worker.SERVICE_START_RETRY_STOP = tenacity.stop_after_delay(2)
# so we don't have to wait for minutes:
stack.enter_context(
patch(
"cosl.coordinated_workers.worker.Worker.SERVICE_START_RETRY_WAIT",
new=tenacity.wait_none(),
)
)
stack.enter_context(
patch(
"cosl.coordinated_workers.worker.Worker.SERVICE_START_RETRY_STOP",
new=tenacity.stop_after_delay(2),
)
)

# THEN the charm errors out
# technically an ops.pebble.ChangeError but the context manager doesn't catch it for some reason
stack.enter_context(pytest.raises(Exception))
ctx.run(container.pebble_ready_event, State(containers=[container]))


@pytest.mark.parametrize(
Expand Down
23 changes: 13 additions & 10 deletions tests/test_coordinated_workers/test_worker_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@ def test_status_check_no_pebble(ctx, base_state, caplog):

@k8s_patch()
def test_status_check_no_config(ctx, base_state, caplog):
# GIVEN there is no config file on disk
state = base_state.with_can_connect("workload", True)

# GIVEN there is no config file on disk
# WHEN we run any event
state_out = ctx.run("update_status", state)
with patch(
"cosl.coordinated_workers.worker.Worker._running_worker_config", new=lambda _: None
):
state_out = ctx.run("update_status", state)

# THEN the charm sets blocked
assert state_out.unit_status == BlockedStatus("node down (see logs)")
Expand Down Expand Up @@ -178,7 +180,7 @@ def __init__(self, framework: Framework):
self.worker = Worker(
self,
"workload",
lambda _: Layer(""),
lambda _: Layer({"services": {"foo": {"command": "foo"}}}),
{"cluster": "cluster"},
)

Expand Down Expand Up @@ -211,12 +213,13 @@ def test_access_status_no_endpoint_raises():
# GIVEN the caller doesn't pass an endpoint to Worker
caller = MagicMock()
with patch("cosl.juju_topology.JujuTopology.from_charm"):
worker = Worker(
caller,
"workload",
lambda _: Layer(""),
{"cluster": "cluster"},
)
with patch("cosl.coordinated_workers.worker.Worker._reconcile"):
worker = Worker(
caller,
"workload",
lambda _: Layer({"services": {"foo": {"command": "foo"}}}),
{"cluster": "cluster"},
)

# THEN calling .status raises
with pytest.raises(WorkerError):
Expand Down

0 comments on commit 839fe92

Please sign in to comment.