From 839fe927aac30368154fac46bea57223fe60a988 Mon Sep 17 00:00:00 2001
From: PietroPasotti
Date: Fri, 6 Sep 2024 09:52:31 +0200
Subject: [PATCH] Holistic worker logic (#70)

* common-exit-hook for Worker
* test fix
* vbump
* tests
* lint
* rename
* rename tests
---
 pyproject.toml                                 |  2 +-
 src/cosl/coordinated_workers/worker.py         | 40 +++++--------------
 tests/test_coordinated_workers/test_worker.py  | 31 +++++++++-----
 .../test_worker_status.py                      | 23 ++++++-----
 4 files changed, 47 insertions(+), 49 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 0c66d86..e643e89 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
 
 [project]
 name = "cosl"
-version = "0.0.29"
+version = "0.0.30"
 authors = [
   { name="sed-i", email="82407168+sed-i@users.noreply.github.com" },
 ]
diff --git a/src/cosl/coordinated_workers/worker.py b/src/cosl/coordinated_workers/worker.py
index cd2df15..9cfdc89 100644
--- a/src/cosl/coordinated_workers/worker.py
+++ b/src/cosl/coordinated_workers/worker.py
@@ -171,14 +171,11 @@ def __init__(
             if self._resources_requests_getter
             else None
         )
+        # holistic update logic, aka common exit hook
+        self._reconcile()
 
         # Event listeners
-        self.framework.observe(self._charm.on.config_changed, self._on_config_changed)
-        self.framework.observe(self._charm.on.upgrade_charm, self._on_upgrade_charm)
         self.framework.observe(self._charm.on.collect_unit_status, self._on_collect_status)
-
-        self.framework.observe(self.cluster.on.config_received, self._on_worker_config_received)
-        self.framework.observe(self.cluster.on.created, self._on_cluster_created)
         self.framework.observe(self.cluster.on.removed, self._log_forwarder.disable_logging)
 
         self.framework.observe(self._charm.on[self._name].pebble_ready, self._on_pebble_ready)
@@ -192,7 +189,6 @@ def __init__(
     # Event handlers
     def _on_pebble_ready(self, _: ops.PebbleReadyEvent):
         self._charm.unit.set_workload_version(self.running_version() or "")
-        self._update_config()
 
     def _on_pebble_check_failed(self, event: ops.PebbleCheckFailedEvent):
         if event.info.name == "ready":
@@ -204,26 +200,6 @@ def _on_pebble_check_recovered(self, event: ops.PebbleCheckFailedEvent):
         logger.info("Pebble `ready` check is now passing: " "worker node is up.")
         # collect-status will detect that we're ready and set active status.
 
-    def _on_worker_config_received(self, _: ops.EventBase):
-        self._update_config()
-
-    def _on_upgrade_charm(self, _: ops.UpgradeCharmEvent):
-        self._update_cluster_relation()
-
-    def _on_cluster_created(self, _: ops.EventBase):
-        self._update_cluster_relation()
-        self._update_config()
-
-    def _on_cluster_changed(self, _: ops.EventBase):
-        self._update_config()
-
-    def _on_config_changed(self, _: ops.ConfigChangedEvent):
-        # If the user has changed roles, publish them to relation data
-        self._update_cluster_relation()
-        # If there is a config, start the worker
-        if self._worker_config:
-            self._update_worker_config()
-
     @property
     def _worker_config(self):
         """The configuration that this worker should run with, as received from the coordinator.
@@ -358,6 +334,10 @@ def roles(self) -> List[str]:
 
     def _update_config(self) -> None:
         """Update the worker config and restart the workload if necessary."""
+        if not self._container.can_connect():
+            logger.debug("container cannot connect, skipping update_config.")
+            return
+
         restart = any(
             (
                 self._update_tls_certificates(),
@@ -417,6 +397,11 @@ def _add_readiness_check(self, new_layer: Layer):
             "ready", {"override": "replace", "http": {"url": self._readiness_check_endpoint(self)}}
         )
 
+    def _reconcile(self):
+        """Run all unconditional logic."""
+        self._update_cluster_relation()
+        self._update_config()
+
     def _update_cluster_relation(self) -> None:
         """Publish all the worker information to relation data."""
         self.cluster.publish_unit_address(socket.getfqdn())
@@ -424,9 +409,6 @@ def _update_cluster_relation(self) -> None:
         logger.info(f"publishing roles: {self.roles}")
         self.cluster.publish_app_roles(self.roles)
 
-        if self._worker_config:
-            self._update_config()
-
     def _running_worker_config(self) -> Optional[Dict[str, Any]]:
         """Return the worker config as dict, or None if retrieval failed."""
         if not self._container.can_connect():
diff --git a/tests/test_coordinated_workers/test_worker.py b/tests/test_coordinated_workers/test_worker.py
index 49d188a..5b44a84 100644
--- a/tests/test_coordinated_workers/test_worker.py
+++ b/tests/test_coordinated_workers/test_worker.py
@@ -1,4 +1,5 @@
 import json
+from contextlib import ExitStack
 from pathlib import Path
 from unittest.mock import MagicMock, patch
 
@@ -266,19 +267,31 @@ def test_worker_raises_if_service_restart_fails_for_too_long(tmp_path):
         },
     )
 
-    # WHEN service restart fails
     def raise_change_error(*args):
         raise ops.pebble.ChangeError("something", MagicMock())
 
-    with patch("ops.model.Container.restart", new=raise_change_error):
-        # THEN the charm errors out
-        with pytest.raises(Exception):
-            # technically an ops.pebble.ChangeError but the context manager doesn't catch it for some reason
+    with ExitStack() as stack:
+        # WHEN service restart fails
+        stack.enter_context(patch("ops.model.Container.restart", new=raise_change_error))
 
-            with ctx.manager(container.pebble_ready_event, State(containers=[container])) as mgr:
-                # so we don't have to wait for minutes:
-                mgr.charm.worker.SERVICE_START_RETRY_WAIT = tenacity.wait_none()
-                mgr.charm.worker.SERVICE_START_RETRY_STOP = tenacity.stop_after_delay(2)
+        # so we don't have to wait for minutes:
+        stack.enter_context(
+            patch(
+                "cosl.coordinated_workers.worker.Worker.SERVICE_START_RETRY_WAIT",
+                new=tenacity.wait_none(),
+            )
+        )
+        stack.enter_context(
+            patch(
+                "cosl.coordinated_workers.worker.Worker.SERVICE_START_RETRY_STOP",
+                new=tenacity.stop_after_delay(2),
+            )
+        )
+
+        # THEN the charm errors out
+        # technically an ops.pebble.ChangeError but the context manager doesn't catch it for some reason
+        stack.enter_context(pytest.raises(Exception))
+        ctx.run(container.pebble_ready_event, State(containers=[container]))
 
 
 @pytest.mark.parametrize(
diff --git a/tests/test_coordinated_workers/test_worker_status.py b/tests/test_coordinated_workers/test_worker_status.py
index 0eff5f9..e0c66c9 100644
--- a/tests/test_coordinated_workers/test_worker_status.py
+++ b/tests/test_coordinated_workers/test_worker_status.py
@@ -130,11 +130,13 @@ def test_status_check_no_pebble(ctx, base_state, caplog):
 
 @k8s_patch()
 def test_status_check_no_config(ctx, base_state, caplog):
-    # GIVEN there is no config file on disk
     state = base_state.with_can_connect("workload", True)
-
+    # GIVEN there is no config file on disk
     # WHEN we run any event
-    state_out = ctx.run("update_status", state)
+    with patch(
+        "cosl.coordinated_workers.worker.Worker._running_worker_config", new=lambda _: None
+    ):
+        state_out = ctx.run("update_status", state)
 
     # THEN the charm sets blocked
     assert state_out.unit_status == BlockedStatus("node down (see logs)")
@@ -178,7 +180,7 @@ def __init__(self, framework: Framework):
         self.worker = Worker(
             self,
             "workload",
-            lambda _: Layer(""),
+            lambda _: Layer({"services": {"foo": {"command": "foo"}}}),
             {"cluster": "cluster"},
         )
 
@@ -211,12 +213,13 @@ def test_access_status_no_endpoint_raises():
     # GIVEN the caller doesn't pass an endpoint to Worker
     caller = MagicMock()
    with patch("cosl.juju_topology.JujuTopology.from_charm"):
-        worker = Worker(
-            caller,
-            "workload",
-            lambda _: Layer(""),
-            {"cluster": "cluster"},
-        )
+        with patch("cosl.coordinated_workers.worker.Worker._reconcile"):
+            worker = Worker(
+                caller,
+                "workload",
+                lambda _: Layer({"services": {"foo": {"command": "foo"}}}),
+                {"cluster": "cluster"},
+            )
 
     # THEN calling .status raises
     with pytest.raises(WorkerError):
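
The gist of the change, for reviewers unfamiliar with the pattern: ops re-instantiates the charm, and every object it composes (including this Worker), from scratch on each Juju event, so calling `_reconcile()` from `__init__` amounts to running the same holistic update at the start of every event. That is why the per-event observers (`config_changed`, `upgrade_charm`, `cluster.created`, `cluster.config_received`) can be dropped. Below is a minimal, framework-free sketch of the idea; the `HolisticWorker` class and its method bodies are illustrative only, not the cosl implementation.

    from typing import Optional


    class HolisticWorker:
        """Illustrative stand-in for cosl's Worker (not the real class)."""

        def __init__(self, container_ready: bool, worker_config: Optional[str]):
            self._container_ready = container_ready
            self._worker_config = worker_config
            # holistic update logic, aka common exit hook: runs once per event,
            # because Juju rebuilds this object from scratch on every event
            self._reconcile()

        def _reconcile(self) -> None:
            """Run all unconditional logic; each step guards its own preconditions."""
            self._update_cluster_relation()
            self._update_config()

        def _update_cluster_relation(self) -> None:
            # always safe: publishing address/roles needs no workload container
            print("publishing unit address and roles to relation data")

        def _update_config(self) -> None:
            # mirrors the new can_connect() guard added by this patch
            if not self._container_ready:
                print("container cannot connect, skipping update_config.")
                return
            print(f"writing config, maybe restarting: {self._worker_config}")


    # every "event" takes the same code path; the guards decide what happens:
    HolisticWorker(container_ready=False, worker_config=None)  # skips config step
    HolisticWorker(container_ready=True, worker_config="cfg")  # full reconcile

The trade-off is that every step must be idempotent and safe to run on any event, which is why `_update_config()` now opens with the `can_connect()` guard instead of assuming it is only invoked from container-ready handlers.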