added worker error on timeout if restart fails #56

Merged 5 commits on Aug 19, 2024
1 change: 1 addition & 0 deletions pyproject.toml
@@ -14,6 +14,7 @@ requires-python = ">=3.8"
dependencies = [
"ops",
"pydantic",
"tenacity",
"PyYAML",
"typing-extensions"
]
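The new tenacity dependency backs the retry loop added to Worker.restart in the next file. As a rough standalone sketch of the pattern used there (the start_worker_service function is a placeholder for illustration, not something from this PR):

import tenacity


def start_worker_service() -> None:
    """Placeholder for an operation that can fail transiently, e.g. a Pebble service restart."""


# Retry only on a specific exception type, wait a fixed interval between attempts,
# stop after a total delay, and re-raise the last exception if we run out of time.
for attempt in tenacity.Retrying(
    retry=tenacity.retry_if_exception_type(RuntimeError),
    stop=tenacity.stop_after_delay(60 * 15),
    wait=tenacity.wait_fixed(60),
    reraise=True,
):
    with attempt:
        print(f"attempt #{attempt.retry_state.attempt_number}")
        start_worker_service()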
68 changes: 59 additions & 9 deletions src/cosl/coordinated_workers/worker.py
@@ -13,7 +13,9 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, TypedDict

import ops
import tenacity
import yaml
from ops import MaintenanceStatus
from ops.model import ActiveStatus, BlockedStatus, WaitingStatus
from ops.pebble import Check, Layer, PathError, Plan, ProtocolError

@@ -72,7 +74,7 @@ def __init__(

Args:
charm: The worker charm object.
name: The name of the workload container and service.
name: The name of the workload container.
pebble_layer: The pebble layer of the workload.
endpoints: Endpoint names for coordinator relations, as defined in metadata.yaml.
readiness_check_endpoint: URL to probe with a pebble check to determine
@@ -283,14 +285,24 @@ def roles(self) -> List[str]:
def _update_config(self) -> None:
"""Update the worker config and restart the workload if necessary."""
restart = any(
[
(
self._update_tls_certificates(),
self._update_worker_config(),
self._set_pebble_layer(),
]
)
)

if restart:
logger.debug("Config changed. Restarting worker services...")
self.restart()

# this can happen if s3 wasn't ready (server gave an error) when we processed an earlier event,
# causing the worker service to die on startup (exited quickly with code...),
# so we try to restart it now.
# TODO: it would be nice to be notified when s3 starts working, so we don't have to
# wait for an update-status and can listen for that instead.
elif not all(svc.is_running() for svc in self._container.get_services().values()):
logger.debug("Some services are not running. Starting them now...")
self.restart()

def _set_pebble_layer(self) -> bool:
@@ -422,21 +434,59 @@ def _update_tls_certificates(self) -> bool:

return True

SERVICE_START_RETRY_STOP = tenacity.stop_after_delay(60 * 15)
SERVICE_START_RETRY_WAIT = tenacity.wait_fixed(60)
SERVICE_START_RETRY_IF = tenacity.retry_if_exception_type(ops.pebble.ChangeError)

def restart(self):
"""Restart the pebble service or start if not already running."""
"""Restart the pebble service or start it if not already running.

Default timeout is 15 minutes. Configure it by setting this class attr:
>>> Worker.SERVICE_START_RETRY_STOP = tenacity.stop_after_delay(60 * 30) # 30 minutes
You can also configure SERVICE_START_RETRY_WAIT and SERVICE_START_RETRY_IF.

This method will raise an exception if it fails to start the service within a
specified timeframe. This will presumably put the charm into error status, so
that juju will retry the last emitted hook until it finally succeeds.

The assumption is that the state we are in when this method is called is consistent;
we are failing to restart only because of some external factor (such as the network,
the reachability of a remote API, or the readiness of an external service the workload depends on).
So letting juju retry the same hook will get us unstuck as soon as that contingency is resolved.

See https://discourse.charmhub.io/t/its-probably-ok-for-a-unit-to-go-into-error-state/13022
"""
if not self._container.exists(CONFIG_FILE):
logger.error("cannot restart worker: config file doesn't exist (yet).")
return

if not self.roles:
logger.debug("cannot restart worker: no roles have been configured.")
return

try:
self._container.restart(self._name)
except ops.pebble.ChangeError as e:
logger.error(f"failed to (re)start worker job: {e}", exc_info=True)
return
for attempt in tenacity.Retrying(
# this method may fail with ChangeError (exited quickly with code...)
retry=self.SERVICE_START_RETRY_IF,
# give the workload some time to come up (by default, up to 15 minutes)
stop=self.SERVICE_START_RETRY_STOP,
# wait 1 minute between attempts
wait=self.SERVICE_START_RETRY_WAIT,
# if we never succeed, re-raise the last caught exception
reraise=True,
):
with attempt:
self._charm.unit.status = MaintenanceStatus(
f"restarting... (attempt #{attempt.retry_state.attempt_number})"
)
# restart all services that our layer is responsible for
self._container.restart(*self._pebble_layer().services.keys())

except ops.pebble.ChangeError:
logger.error(
"failed to (re)start worker jobs. This usually means that an external resource (such as s3) "
"that the software needs to start is not available."
)
raise

def running_version(self) -> Optional[str]:
"""Get the running version from the worker process."""
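For reference, this is roughly how a charm consuming this Worker could tune the new retry knobs documented in the restart docstring above. A hedged sketch only: the charm class and the specific values are illustrative, while the constructor shape mirrors the MyCharm test fixture further down.

import ops
import tenacity
from ops.pebble import Layer

from cosl.coordinated_workers.worker import Worker

# Illustrative overrides of the class attributes introduced by this PR:
# retry for up to 30 minutes, waiting 30 seconds between attempts.
Worker.SERVICE_START_RETRY_STOP = tenacity.stop_after_delay(60 * 30)
Worker.SERVICE_START_RETRY_WAIT = tenacity.wait_fixed(30)


class ExampleWorkerCharm(ops.CharmBase):
    def __init__(self, framework: ops.Framework):
        super().__init__(framework)
        # Same constructor shape as the MyCharm fixture in the tests below:
        # (charm, container/service name, pebble layer callable, endpoint mapping).
        self.worker = Worker(self, "foo", lambda _: Layer(""), {"cluster": "cluster"})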
201 changes: 197 additions & 4 deletions tests/test_coordinated_workers/test_worker.py
@@ -1,17 +1,28 @@
from unittest.mock import MagicMock, patch

import ops
import pytest
import tenacity
from ops import Framework
from ops.pebble import Layer
from scenario import Container, Context, State
from ops.pebble import Layer, ServiceStatus
from scenario import Container, Context, Mount, State
from scenario.runtime import UncaughtCharmError

from cosl.coordinated_workers.worker import Worker
from cosl.coordinated_workers.worker import CONFIG_FILE, Worker


@pytest.fixture(autouse=True)
def patch_running_version():
with patch("cosl.coordinated_workers.worker.Worker.running_version", new=lambda _: "42.42"):
yield


class MyCharm(ops.CharmBase):
layer = Layer("")

def __init__(self, framework: Framework):
super().__init__(framework)
self.worker = Worker(self, "foo", lambda _: Layer(""), {"cluster": "cluster"})
self.worker = Worker(self, "foo", lambda _: self.layer, {"cluster": "cluster"})


def test_no_roles_error():
@@ -81,3 +92,185 @@ def test_roles_from_config(roles_active, roles_inactive, expected):
) as mgr:
# THEN the Worker.roles method correctly returns the list of only those that are set to true
assert set(mgr.charm.worker.roles) == set(expected)


def test_worker_restarts_if_some_service_not_up(tmp_path):
# GIVEN a worker with some services
MyCharm.layer = Layer(
{
"services": {
"foo": {
"summary": "foos all the things",
"description": "bar",
"startup": "enabled",
"override": "merge",
"command": "ls -la",
},
"bar": {
"summary": "bars the foos",
"description": "bar",
"startup": "enabled",
"command": "exit 1",
},
"baz": {
"summary": "bazzes all of the bars",
"description": "bar",
"startup": "enabled",
"command": "echo hi",
},
}
}
)
ctx = Context(
MyCharm,
meta={
"name": "foo",
"requires": {"cluster": {"interface": "cluster"}},
"containers": {"foo": {"type": "oci-image"}},
},
config={"options": {"role-all": {"type": "boolean", "default": True}}},
)
# WHEN the charm receives any event and there are no changes to the config or the layer,
# but some of the services are down
cfg = tmp_path / "cfg.yaml"
cfg.write_text("some: yaml")
container = Container(
"foo",
can_connect=True,
mounts={"local": Mount(CONFIG_FILE, cfg)},
service_status={
"foo": ServiceStatus.INACTIVE,
"bar": ServiceStatus.ACTIVE,
"baz": ServiceStatus.INACTIVE,
},
)
state_out = ctx.run(container.pebble_ready_event, State(containers=[container]))

# THEN the charm restarts all the services that are down
container_out = state_out.get_container("foo")
service_statuses = container_out.service_status.values()
assert all(svc is ServiceStatus.ACTIVE for svc in service_statuses), [
stat.value for stat in service_statuses
]


def test_worker_does_not_restart_external_services(tmp_path):
# GIVEN a worker with some services and a layer with some other services
MyCharm.layer = Layer(
{
"services": {
"foo": {
"summary": "foos all the things",
"override": "merge",
"description": "bar",
"startup": "enabled",
"command": "ls -la",
}
}
}
)
other_layer = Layer(
{
"services": {
"bar": {
"summary": "bars the foos",
"description": "bar",
"startup": "enabled",
"command": "exit 1",
},
"baz": {
"summary": "bazzes all of the bars",
"description": "bar",
"startup": "enabled",
"command": "echo hi",
},
}
}
)

ctx = Context(
MyCharm,
meta={
"name": "foo",
"requires": {"cluster": {"interface": "cluster"}},
"containers": {"foo": {"type": "oci-image"}},
},
config={"options": {"role-all": {"type": "boolean", "default": True}}},
)
# WHEN the charm receives any event and there are no changes to the config or the layer,
# but some services (both from its own layer and from the other one) are down
cfg = tmp_path / "cfg.yaml"
cfg.write_text("some: yaml")
container = Container(
"foo",
can_connect=True,
mounts={"local": Mount(CONFIG_FILE, cfg)},
layers={"foo": MyCharm.layer, "bar": other_layer},
service_status={
# layer foo has some inactive
"foo": ServiceStatus.INACTIVE,
# layer bar has some inactive
"bar": ServiceStatus.ACTIVE,
"baz": ServiceStatus.INACTIVE,
},
)
state_out = ctx.run(container.pebble_ready_event, State(containers=[container]))

# THEN the charm restarts only the down services that its own layer defines
container_out = state_out.get_container("foo")
assert container_out.service_status == {
# layer foo service is now active
"foo": ServiceStatus.ACTIVE,
# layer bar services are unchanged
"bar": ServiceStatus.ACTIVE,
"baz": ServiceStatus.INACTIVE,
}


def test_worker_raises_if_service_restart_fails_for_too_long(tmp_path):
# GIVEN a worker with some services
MyCharm.layer = Layer(
{
"services": {
"foo": {
"summary": "foos all the things",
"description": "bar",
"startup": "enabled",
"command": "ls -la",
},
}
}
)
ctx = Context(
MyCharm,
meta={
"name": "foo",
"requires": {"cluster": {"interface": "cluster"}},
"containers": {"foo": {"type": "oci-image"}},
},
config={"options": {"role-all": {"type": "boolean", "default": True}}},
)
cfg = tmp_path / "cfg.yaml"
cfg.write_text("some: yaml")
container = Container(
"foo",
can_connect=True,
mounts={"local": Mount(CONFIG_FILE, cfg)},
service_status={
"foo": ServiceStatus.INACTIVE,
},
)

# WHEN service restart fails
def raise_change_error(*args):
raise ops.pebble.ChangeError("something", MagicMock())

with patch("ops.model.Container.restart", new=raise_change_error):
# THEN the charm errors out
with pytest.raises(Exception):
# technically an ops.pebble.ChangeError, but for some reason the context manager
# doesn't let us catch it as that specific type, so we assert on the broader Exception
with ctx.manager(container.pebble_ready_event, State(containers=[container])) as mgr:
# so we don't have to wait for minutes:
mgr.charm.worker.SERVICE_START_RETRY_WAIT = tenacity.wait_none()
mgr.charm.worker.SERVICE_START_RETRY_STOP = tenacity.stop_after_delay(2)