From e3e063e0db92aebd50bc1165406a5da6a3b6e860 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Fri, 16 Aug 2024 13:58:07 +0200 Subject: [PATCH 1/4] added worker error on timeout if restart fails --- src/cosl/coordinated_workers/worker.py | 58 ++++++-- tests/test_coordinated_workers/test_worker.py | 127 +++++++++++++++++- 2 files changed, 173 insertions(+), 12 deletions(-) diff --git a/src/cosl/coordinated_workers/worker.py b/src/cosl/coordinated_workers/worker.py index 99b3ec2..7adabcd 100644 --- a/src/cosl/coordinated_workers/worker.py +++ b/src/cosl/coordinated_workers/worker.py @@ -11,6 +11,7 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, TypedDict import ops +import tenacity import yaml from ops.model import ActiveStatus, BlockedStatus, WaitingStatus from ops.pebble import Layer, PathError, ProtocolError @@ -61,7 +62,7 @@ def __init__( Args: charm: The worker charm object. - name: The name of the workload container and service. + name: The name of the workload container. pebble_layer: The pebble layer of the workload. endpoints: Endpoint names for coordinator relations, as defined in metadata.yaml. """ @@ -175,14 +176,24 @@ def roles(self) -> List[str]: def _update_config(self) -> None: """Update the worker config and restart the workload if necessary.""" restart = any( - [ + ( self._update_tls_certificates(), self._update_worker_config(), self._set_pebble_layer(), - ] + ) ) if restart: + logger.debug("Config changed. Restarting worker services...") + self.restart() + + # this can happen if s3 wasn't ready (server gave error) when we processed an earlier event + # causing the worker service to die on startup (exited quickly with code...) + # so we try to restart it now. + # TODO: would be nice if we could be notified of when s3 starts working, so we don't have to + # wait for an update-status and can listen to that instead. + elif not all(svc.is_running() for svc in self._container.get_services().values()): + logger.debug("Some services are not running. Starting them now...") self.restart() def _set_pebble_layer(self) -> bool: @@ -297,8 +308,35 @@ def _update_tls_certificates(self) -> bool: return True + SERVICE_START_RETRY_TIMEOUT = 60 * 15 + + @tenacity.retry( + # this method may fail with ChangeError (exited quickly with code...) + retry=tenacity.retry_if_exception_type(ops.pebble.ChangeError), + # give this method some time to pass (by default 15 minutes) + stop=tenacity.stop_after_delay(SERVICE_START_RETRY_TIMEOUT), + # wait 1 minute between tries + wait=tenacity.wait_fixed(60), + # if you don't succeed raise the last caught exception when you're done + reraise=True, + ) def restart(self): - """Restart the pebble service or start if not already running.""" + """Restart the pebble service or start it if not already running. + + Default timeout is 15 minutes. Configure it by setting this class attr: + >>> Worker.SERVICE_START_RETRY_TIMEOUT = 60*30 # 30 minutes + + This method will raise an exception if it fails to start the service within a + specified timeframe. This will presumably bring the charm in error status, so + that juju will retry the last emitted hook until it finally succeeds. + + The assumption is that the state we are in when this method is called is consistent. + The reason why we're failing to restart is dependent on some external factor (such as network, + the reachability of a remote API, or the readiness of an external service the workload depends on). 
+ So letting juju retry the same hook will get us unstuck as soon as that contingency is resolved. + + See https://discourse.charmhub.io/t/its-probably-ok-for-a-unit-to-go-into-error-state/13022 + """ if not self._container.exists(CONFIG_FILE): logger.error("cannot restart worker: config file doesn't exist (yet).") return @@ -308,10 +346,14 @@ def restart(self): return try: - self._container.restart(self._name) - except ops.pebble.ChangeError as e: - logger.error(f"failed to (re)start worker job: {e}", exc_info=True) - return + # restart all services that our layer is responsible for + self._container.restart(*self._container.get_services().keys()) + except ops.pebble.ChangeError: + logger.error( + "failed to (re)start worker jobs. This usually means that an external resource (such as s3) " + "that the software needs to start is not available." + ) + raise def running_version(self) -> Optional[str]: """Get the running version from the worker process.""" diff --git a/tests/test_coordinated_workers/test_worker.py b/tests/test_coordinated_workers/test_worker.py index 9c3b35b..7947b50 100644 --- a/tests/test_coordinated_workers/test_worker.py +++ b/tests/test_coordinated_workers/test_worker.py @@ -1,17 +1,28 @@ +from unittest.mock import MagicMock, patch + import ops import pytest +import tenacity from ops import Framework -from ops.pebble import Layer -from scenario import Container, Context, State +from ops.pebble import Layer, ServiceStatus +from scenario import Container, Context, Mount, State from scenario.runtime import UncaughtCharmError -from cosl.coordinated_workers.worker import Worker +from cosl.coordinated_workers.worker import CONFIG_FILE, Worker + + +@pytest.fixture(autouse=True) +def patch_running_version(): + with patch("cosl.coordinated_workers.worker.Worker.running_version", new=lambda _: "42.42"): + yield class MyCharm(ops.CharmBase): + layer = Layer("") + def __init__(self, framework: Framework): super().__init__(framework) - self.worker = Worker(self, "foo", lambda _: Layer(""), {"cluster": "cluster"}) + self.worker = Worker(self, "foo", lambda _: self.layer, {"cluster": "cluster"}) def test_no_roles_error(): @@ -81,3 +92,111 @@ def test_roles_from_config(roles_active, roles_inactive, expected): ) as mgr: # THEN the Worker.roles method correctly returns the list of only those that are set to true assert set(mgr.charm.worker.roles) == set(expected) + + +def test_worker_restarts_if_some_service_not_up(tmp_path): + # GIVEN a worker with some services + MyCharm.layer = Layer( + { + "services": { + "foo": { + "summary": "foos all the things", + "description": "bar", + "startup": "enabled", + "command": "ls -la", + }, + "bar": { + "summary": "bars the foos", + "description": "bar", + "startup": "enabled", + "command": "exit 1", + }, + "baz": { + "summary": "bazzes all of the bars", + "description": "bar", + "startup": "enabled", + "command": "echo hi", + }, + } + } + ) + ctx = Context( + MyCharm, + meta={ + "name": "foo", + "requires": {"cluster": {"interface": "cluster"}}, + "containers": {"foo": {"type": "oci-image"}}, + }, + config={"options": {"role-all": {"type": "boolean", "default": True}}}, + ) + # WHEN the charm receives any event and there are no changes to the config or the layer, + # but some of the services are down + cfg = tmp_path / "cfg.yaml" + cfg.write_text("some: yaml") + container = Container( + "foo", + can_connect=True, + mounts={"local": Mount(CONFIG_FILE, cfg)}, + service_status={ + "foo": ServiceStatus.INACTIVE, + "bar": ServiceStatus.ACTIVE, + "baz": 
ServiceStatus.INACTIVE, + }, + ) + state_out = ctx.run(container.pebble_ready_event, State(containers=[container])) + + # THEN the charm restarts all the services that are down + container_out = state_out.get_container("foo") + service_statuses = container_out.service_status.values() + assert all(svc is ServiceStatus.ACTIVE for svc in service_statuses), [ + stat.value for stat in service_statuses + ] + + +def test_worker_raises_if_service_restart_fails_for_too_long(tmp_path): + # GIVEN a worker with some services + MyCharm.layer = Layer( + { + "services": { + "foo": { + "summary": "foos all the things", + "description": "bar", + "startup": "enabled", + "command": "ls -la", + }, + } + } + ) + ctx = Context( + MyCharm, + meta={ + "name": "foo", + "requires": {"cluster": {"interface": "cluster"}}, + "containers": {"foo": {"type": "oci-image"}}, + }, + config={"options": {"role-all": {"type": "boolean", "default": True}}}, + ) + cfg = tmp_path / "cfg.yaml" + cfg.write_text("some: yaml") + container = Container( + "foo", + can_connect=True, + mounts={"local": Mount(CONFIG_FILE, cfg)}, + service_status={ + "foo": ServiceStatus.INACTIVE, + }, + ) + + # WHEN service restart fails + def raise_change_error(*args): + raise ops.pebble.ChangeError("something", MagicMock()) + + with patch("ops.model.Container.restart", new=raise_change_error): + # THEN the charm errors out + with pytest.raises(Exception): + # technically an ops.pebble.ChangeError but the context manager doesn't catch it for some reason + + with ctx.manager(container.pebble_ready_event, State(containers=[container])) as mgr: + # so we don't have to wait for minutes: + mgr.charm.worker.restart.retry.wait = tenacity.wait_none() + mgr.charm.worker.restart.retry.stop = tenacity.stop_after_delay(2) From 7b33a9f96e91c7721117a4c04c78bae4b5cad0a8 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Fri, 16 Aug 2024 14:15:56 +0200 Subject: [PATCH 2/4] maintenance status throughout retries --- src/cosl/coordinated_workers/worker.py | 40 +++++++++++-------- tests/test_coordinated_workers/test_worker.py | 4 +- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/cosl/coordinated_workers/worker.py b/src/cosl/coordinated_workers/worker.py index 7adabcd..ee094a8 100644 --- a/src/cosl/coordinated_workers/worker.py +++ b/src/cosl/coordinated_workers/worker.py @@ -13,6 +13,7 @@ import ops import tenacity import yaml +from ops import MaintenanceStatus from ops.model import ActiveStatus, BlockedStatus, WaitingStatus from ops.pebble import Layer, PathError, ProtocolError @@ -308,23 +309,16 @@ def _update_tls_certificates(self) -> bool: return True - SERVICE_START_RETRY_TIMEOUT = 60 * 15 - - @tenacity.retry( - # this method may fail with ChangeError (exited quickly with code...) - retry=tenacity.retry_if_exception_type(ops.pebble.ChangeError), - # give this method some time to pass (by default 15 minutes) - stop=tenacity.stop_after_delay(SERVICE_START_RETRY_TIMEOUT), - # wait 1 minute between tries - wait=tenacity.wait_fixed(60), - # if you don't succeed raise the last caught exception when you're done - reraise=True, - ) + SERVICE_START_RETRY_STOP = tenacity.stop_after_delay(60 * 15) + SERVICE_START_RETRY_WAIT = tenacity.wait_fixed(60) + SERVICE_START_RETRY_IF = tenacity.retry_if_exception_type(ops.pebble.ChangeError) + def restart(self): """Restart the pebble service or start it if not already running. Default timeout is 15 minutes. 
Configure it by setting this class attr: - >>> Worker.SERVICE_START_RETRY_TIMEOUT = 60*30 # 30 minutes + >>> Worker.SERVICE_START_RETRY_STOP = tenacity.stop_after_delay(60 * 30) # 30 minutes + You can also configure SERVICE_START_RETRY_WAIT and SERVICE_START_RETRY_IF. This method will raise an exception if it fails to start the service within a specified timeframe. This will presumably bring the charm in error status, so @@ -340,14 +334,28 @@ def restart(self): if not self._container.exists(CONFIG_FILE): logger.error("cannot restart worker: config file doesn't exist (yet).") return - if not self.roles: logger.debug("cannot restart worker: no roles have been configured.") return try: - # restart all services that our layer is responsible for - self._container.restart(*self._container.get_services().keys()) + for attempt in tenacity.Retrying( + # this method may fail with ChangeError (exited quickly with code...) + retry=self.SERVICE_START_RETRY_IF, + # give this method some time to pass (by default 15 minutes) + stop=self.SERVICE_START_RETRY_STOP, + # wait 1 minute between tries + wait=self.SERVICE_START_RETRY_WAIT, + # if you don't succeed raise the last caught exception when you're done + reraise=True, + ): + with attempt: + self._charm.unit.status = MaintenanceStatus( + f"restarting... (attempt #{attempt.retry_state.attempt_number})" + ) + # restart all services that our layer is responsible for + self._container.restart(*self._container.get_services().keys()) + except ops.pebble.ChangeError: logger.error( "failed to (re)start worker jobs. This usually means that an external resource (such as s3) " diff --git a/tests/test_coordinated_workers/test_worker.py b/tests/test_coordinated_workers/test_worker.py index 7947b50..9806f2b 100644 --- a/tests/test_coordinated_workers/test_worker.py +++ b/tests/test_coordinated_workers/test_worker.py @@ -198,5 +198,5 @@ def raise_change_error(*args): with ctx.manager(container.pebble_ready_event, State(containers=[container])) as mgr: # so we don't have to wait for minutes: - mgr.charm.worker.restart.retry.wait = tenacity.wait_none() - mgr.charm.worker.restart.retry.stop = tenacity.stop_after_delay(2) + mgr.charm.worker.SERVICE_START_RETRY_WAIT = tenacity.wait_none() + mgr.charm.worker.SERVICE_START_RETRY_STOP = tenacity.stop_after_delay(2) From 9c27c78b0d8115e36ebaeff117c527f813f049b4 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Fri, 16 Aug 2024 14:22:13 +0200 Subject: [PATCH 3/4] fixed static --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index d4c2682..9df0fe2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ requires-python = ">=3.8" dependencies = [ "ops", "pydantic", + "tenacity", "PyYAML", "typing-extensions" ] From b225eba842e91f739acb67bb3ac21148e13f03b2 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Fri, 16 Aug 2024 15:22:00 +0200 Subject: [PATCH 4/4] only restart own services --- src/cosl/coordinated_workers/worker.py | 2 +- tests/test_coordinated_workers/test_worker.py | 74 +++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/src/cosl/coordinated_workers/worker.py b/src/cosl/coordinated_workers/worker.py index ee094a8..5717ef3 100644 --- a/src/cosl/coordinated_workers/worker.py +++ b/src/cosl/coordinated_workers/worker.py @@ -354,7 +354,7 @@ def restart(self): f"restarting... 
(attempt #{attempt.retry_state.attempt_number})"
                     )
                     # restart all services that our layer is responsible for
-                    self._container.restart(*self._container.get_services().keys())
+                    self._container.restart(*self._pebble_layer().services.keys())
         except ops.pebble.ChangeError:
             logger.error(
                 "failed to (re)start worker jobs. This usually means that an external resource (such as s3) "
diff --git a/tests/test_coordinated_workers/test_worker.py b/tests/test_coordinated_workers/test_worker.py
index 9806f2b..de56f11 100644
--- a/tests/test_coordinated_workers/test_worker.py
+++ b/tests/test_coordinated_workers/test_worker.py
@@ -103,6 +103,7 @@ def test_worker_restarts_if_some_service_not_up(tmp_path):
                     "summary": "foos all the things",
                     "description": "bar",
                     "startup": "enabled",
+                    "override": "merge",
                     "command": "ls -la",
                 },
                 "bar": {
@@ -153,6 +154,79 @@ def test_worker_restarts_if_some_service_not_up(tmp_path):
     ]
 
 
+def test_worker_does_not_restart_external_services(tmp_path):
+    # GIVEN a worker with some services and a layer with some other services
+    MyCharm.layer = Layer(
+        {
+            "services": {
+                "foo": {
+                    "summary": "foos all the things",
+                    "override": "merge",
+                    "description": "bar",
+                    "startup": "enabled",
+                    "command": "ls -la",
+                }
+            }
+        }
+    )
+    other_layer = Layer(
+        {
+            "services": {
+                "bar": {
+                    "summary": "bars the foos",
+                    "description": "bar",
+                    "startup": "enabled",
+                    "command": "exit 1",
+                },
+                "baz": {
+                    "summary": "bazzes all of the bars",
+                    "description": "bar",
+                    "startup": "enabled",
+                    "command": "echo hi",
+                },
+            }
+        }
+    )
+
+    ctx = Context(
+        MyCharm,
+        meta={
+            "name": "foo",
+            "requires": {"cluster": {"interface": "cluster"}},
+            "containers": {"foo": {"type": "oci-image"}},
+        },
+        config={"options": {"role-all": {"type": "boolean", "default": True}}},
+    )
+    # WHEN the charm receives any event and there are no changes to the config or the layer,
+    # but some of the services are down
+    cfg = tmp_path / "cfg.yaml"
+    cfg.write_text("some: yaml")
+    container = Container(
+        "foo",
+        can_connect=True,
+        mounts={"local": Mount(CONFIG_FILE, cfg)},
+        layers={"foo": MyCharm.layer, "bar": other_layer},
+        service_status={
+            # the service from layer foo is inactive
+            "foo": ServiceStatus.INACTIVE,
+            # layer bar has one active and one inactive service
+            "bar": ServiceStatus.ACTIVE,
+            "baz": ServiceStatus.INACTIVE,
+        },
+    )
+    state_out = ctx.run(container.pebble_ready_event, State(containers=[container]))
+
+    # THEN the charm restarts only the services owned by its own layer and leaves the external ones alone
+    container_out = state_out.get_container("foo")
+    assert container_out.service_status == {
+        # the layer foo service is now active
+        "foo": ServiceStatus.ACTIVE,
+        # the layer bar services are unchanged
+        "bar": ServiceStatus.ACTIVE,
+        "baz": ServiceStatus.INACTIVE,
+    }
+
+
 def test_worker_raises_if_service_restart_fails_for_too_long(tmp_path):
     # GIVEN a worker with some services
     MyCharm.layer = Layer(
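
Note (not part of the patches): the sketch below illustrates how a downstream charm embedding this Worker might tune the retry behaviour introduced in PATCH 2/4. It is a minimal, hypothetical example: the MyWorkerCharm class, the "worker" container name, and the layer contents are placeholders; the only pieces taken from the diffs above are the Worker constructor signature and the SERVICE_START_RETRY_STOP / SERVICE_START_RETRY_WAIT / SERVICE_START_RETRY_IF class attributes.

import ops
import tenacity
from ops.pebble import Layer

from cosl.coordinated_workers.worker import Worker

# Give up after 30 minutes instead of the default 15, retrying every 30 seconds.
# (Assumption: these module-level overrides run before any hook calls Worker.restart().)
Worker.SERVICE_START_RETRY_STOP = tenacity.stop_after_delay(60 * 30)
Worker.SERVICE_START_RETRY_WAIT = tenacity.wait_fixed(30)


class MyWorkerCharm(ops.CharmBase):
    """Hypothetical charm wiring the Worker to a 'worker' container and a 'cluster' endpoint."""

    def __init__(self, framework: ops.Framework):
        super().__init__(framework)
        self.worker = Worker(
            self,
            "worker",  # name of the workload container
            # pebble layer for the workload; the Worker restarts only the services this layer defines
            lambda _: Layer(
                {
                    "services": {
                        "worker": {
                            "override": "replace",
                            "startup": "enabled",
                            "command": "/usr/bin/worker --config /etc/worker/config.yaml",
                        }
                    }
                }
            ),
            {"cluster": "cluster"},  # endpoint name as declared in metadata.yaml
        )

Because the knobs are plain class attributes rather than decorator arguments (as in PATCH 1/4), tests can override them cheaply, e.g. Worker.SERVICE_START_RETRY_WAIT = tenacity.wait_none(), which is exactly what the updated test module does.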