worker.py
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
"""Generic worker for a distributed charm deployment."""
import logging
import re
import socket
import subprocess
import urllib.request
from enum import Enum
from functools import partial
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, TypedDict, Union
from urllib.error import HTTPError
import ops
import tenacity
import yaml
from ops import MaintenanceStatus, StatusBase
from ops.model import ActiveStatus, BlockedStatus, WaitingStatus
from ops.pebble import Check, Layer, PathError, Plan, ProtocolError
from cosl import JujuTopology
from cosl.coordinated_workers.interface import ClusterRequirer, TLSData
from cosl.helpers import check_libs_installed
check_libs_installed(
"charms.loki_k8s.v1.loki_push_api",
"charms.observability_libs.v0.kubernetes_compute_resources_patch",
)
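# The consuming charm is expected to have vendored these libraries, for example
# (assumed workflow, not enforced by this module):
#   charmcraft fetch-lib charms.loki_k8s.v1.loki_push_api
#   charmcraft fetch-lib charms.observability_libs.v0.kubernetes_compute_resources_patch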
from charms.loki_k8s.v1.loki_push_api import _PebbleLogClient # type: ignore
from charms.observability_libs.v0.kubernetes_compute_resources_patch import (
KubernetesComputeResourcesPatch,
adjust_resource_requirements,
)
from lightkube.models.core_v1 import ResourceRequirements
BASE_DIR = "/worker"
CONFIG_FILE = "/etc/worker/config.yaml"
CERT_FILE = "/etc/worker/server.cert"
S3_TLS_CA_CHAIN_FILE = "/etc/worker/s3_ca.crt"
KEY_FILE = "/etc/worker/private.key"
CLIENT_CA_FILE = "/etc/worker/ca.cert"
ROOT_CA_CERT = "/usr/local/share/ca-certificates/ca.crt"
ROOT_CA_CERT_PATH = Path(ROOT_CA_CERT)
logger = logging.getLogger(__name__)
def _validate_container_name(
container_name: Optional[str],
resources_requests: Optional[Callable[["Worker"], Dict[str, str]]],
):
"""Raise `ValueError` if `resources_requests` is not None and `container_name` is None."""
if resources_requests is not None and container_name is None:
raise ValueError(
"Cannot have a None value for container_name while resources_requests is provided."
)
_EndpointMapping = TypedDict("_EndpointMapping", {"cluster": str}, total=True)
"""Mapping of the relation endpoint names that the charms uses, as defined in metadata.yaml."""
_ResourceLimitOptionsMapping = TypedDict(
"_ResourceLimitOptionsMapping",
{
"cpu_limit": str,
"memory_limit": str,
},
)
"""Mapping of the resources limit option names that the charms use, as defined in config.yaml."""
class WorkerError(Exception):
"""Base class for exceptions raised by this module."""
class ServiceEndpointStatus(Enum):
"""Status of the worker service managed by pebble."""
starting = "starting"
up = "up"
down = "down"
class Worker(ops.Object):
"""Charming worker."""
# configuration for the service start retry logic in .restart().
# this will determine how long we wait for pebble to try to start the worker process
SERVICE_START_RETRY_STOP = tenacity.stop_after_delay(60 * 15)
SERVICE_START_RETRY_WAIT = tenacity.wait_fixed(60)
SERVICE_START_RETRY_IF = tenacity.retry_if_exception_type(ops.pebble.ChangeError)
# configuration for the service status retry logic after a restart has occurred.
# this will determine how long we wait for the worker process to report "ready" after it
# has been successfully restarted
SERVICE_STATUS_UP_RETRY_STOP = tenacity.stop_after_delay(60 * 15)
SERVICE_STATUS_UP_RETRY_WAIT = tenacity.wait_fixed(10)
SERVICE_STATUS_UP_RETRY_IF = tenacity.retry_if_not_result(bool)
_endpoints: _EndpointMapping = {
"cluster": "cluster",
}
def __init__(
self,
charm: ops.CharmBase,
name: str,
pebble_layer: Callable[["Worker"], Layer],
endpoints: _EndpointMapping,
readiness_check_endpoint: Optional[Union[str, Callable[["Worker"], str]]] = None,
resources_limit_options: Optional[_ResourceLimitOptionsMapping] = None,
resources_requests: Optional[Callable[["Worker"], Dict[str, str]]] = None,
container_name: Optional[str] = None,
):
"""Constructor for a Worker object.
Args:
charm: The worker charm object.
name: The name of the workload container.
pebble_layer: The pebble layer of the workload.
endpoints: Endpoint names for coordinator relations, as defined in metadata.yaml.
readiness_check_endpoint: URL to probe with a pebble check to determine
whether the worker node is ready. Passing None will effectively disable it.
resources_limit_options: A dictionary containing resources limit option names. The dictionary should include
"cpu_limit" and "memory_limit" keys with values as option names, as defined in the config.yaml.
                If no dictionary is provided, the default option names "cpu_limit" and "memory_limit" will be used.
resources_requests: A function generating the resources "requests" portion to apply when patching a container using
KubernetesComputeResourcesPatch. The "limits" portion of the patch gets populated by setting
their respective config options in config.yaml.
container_name: The container for which to apply the resources requests & limits.
Required if `resources_requests` is provided.
Raises:
ValueError:
If `resources_requests` is not None and `container_name` is None, a ValueError is raised.
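
        Example:
            A minimal sketch of wiring a Worker into a charm. The workload name,
            command and readiness URL below are illustrative assumptions, not part
            of this module:

            >>> class MyWorkerCharm(ops.CharmBase):
            ...     def __init__(self, framework):
            ...         super().__init__(framework)
            ...         self.worker = Worker(
            ...             charm=self,
            ...             name="mimir",  # must match the workload container name
            ...             pebble_layer=lambda worker: Layer({
            ...                 "services": {
            ...                     "mimir": {
            ...                         "override": "replace",
            ...                         "command": "/bin/mimir --config.file=/etc/worker/config.yaml",
            ...                         "startup": "enabled",
            ...                     }
            ...                 }
            ...             }),
            ...             endpoints={"cluster": "cluster"},
            ...             readiness_check_endpoint="http://localhost:8080/ready",
            ...         )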
"""
super().__init__(charm, key="worker")
self._charm = charm
self._name = name
self._pebble_layer = partial(
pebble_layer, self
) # do not call this directly. use self.pebble_layer instead
self.topology = JujuTopology.from_charm(self._charm)
self._container = self._charm.unit.get_container(name)
self._endpoints = endpoints
_validate_container_name(container_name, resources_requests)
# turn str to Callable[[Worker], str]
self._readiness_check_endpoint: Optional[Callable[[Worker], str]]
if isinstance(readiness_check_endpoint, str):
self._readiness_check_endpoint = lambda _: readiness_check_endpoint
else:
self._readiness_check_endpoint = readiness_check_endpoint
self._resources_requests_getter = (
partial(resources_requests, self) if resources_requests is not None else None
)
self._container_name = container_name
self._resources_limit_options = resources_limit_options or {}
self.cluster = ClusterRequirer(
charm=self._charm,
endpoint=self._endpoints["cluster"],
)
self._log_forwarder = ManualLogForwarder(
charm=self._charm,
loki_endpoints=self.cluster.get_loki_endpoints(),
refresh_events=[
self.cluster.on.config_received,
self.cluster.on.created,
self.cluster.on.removed,
],
)
# Resources patch
self.resources_patch = (
KubernetesComputeResourcesPatch(
self._charm,
self._container_name, # type: ignore
resource_reqs_func=self._adjust_resource_requirements,
)
if self._resources_requests_getter
else None
)
# holistic update logic, aka common exit hook
self._reconcile()
# Event listeners
self.framework.observe(self._charm.on.collect_unit_status, self._on_collect_status)
self.framework.observe(self.cluster.on.removed, self._log_forwarder.disable_logging)
self.framework.observe(self._charm.on[self._name].pebble_ready, self._on_pebble_ready)
self.framework.observe(
self._charm.on[name].pebble_check_failed, self._on_pebble_check_failed
)
self.framework.observe(
self._charm.on[name].pebble_check_recovered, self._on_pebble_check_recovered
)
# Event handlers
def _on_pebble_ready(self, _: ops.PebbleReadyEvent):
self._charm.unit.set_workload_version(self.running_version() or "")
def _on_pebble_check_failed(self, event: ops.PebbleCheckFailedEvent):
if event.info.name == "ready":
logger.warning("Pebble `ready` check started to fail: " "worker node is down.")
# collect-status will detect that we're not ready and set waiting status.
    def _on_pebble_check_recovered(self, event: ops.PebbleCheckRecoveredEvent):
if event.info.name == "ready":
logger.info("Pebble `ready` check is now passing: " "worker node is up.")
# collect-status will detect that we're ready and set active status.
@property
def _worker_config(self):
"""The configuration that this worker should run with, as received from the coordinator.
Charms that wish to modify their config before it's written to disk by the Worker
should subclass the worker, override this method, and use it to manipulate the
config that's presented to the Worker.
"""
return self.cluster.get_worker_config()
@property
def pebble_layer(self) -> Optional[Layer]:
"""Attempt to fetch a pebble layer from the charm.
If the charm raises, report the exception and return None.
"""
try:
return self._pebble_layer()
except Exception:
logger.exception("exception while attempting to get pebble layer from charm")
return None
@property
def status(self) -> ServiceEndpointStatus:
"""Determine the status of the service's endpoint."""
if not self._container.can_connect():
logger.debug("Container cannot connect. Skipping status check.")
return ServiceEndpointStatus.down
if not self._running_worker_config():
logger.debug("Config file not on disk. Skipping status check.")
return ServiceEndpointStatus.down
if not (layer := self.pebble_layer):
return ServiceEndpointStatus.down
# we really don't want this code to raise errors, so we blanket catch all.
try:
services = self._container.get_services(*layer.services.keys())
running_status = {name: svc.is_running() for name, svc in services.items()}
if not all(running_status.values()):
if any(running_status.values()):
starting_services = tuple(
name for name, running in running_status.items() if not running
)
logger.info(
f"Some services which should be running are not: {starting_services}."
)
return ServiceEndpointStatus.starting
logger.info("All services are down.")
return ServiceEndpointStatus.down
except Exception:
logger.exception(
"Unexpected error while getting worker status. "
"This could mean that the worker is still starting."
)
return ServiceEndpointStatus.down
return self.check_readiness()
def check_readiness(self) -> ServiceEndpointStatus:
"""If the user has configured a readiness check endpoint, GET it and check the workload status."""
check_endpoint = self._readiness_check_endpoint
if not check_endpoint:
raise WorkerError(
"cannot check readiness without a readiness_check_endpoint configured. "
"Pass one to Worker on __init__."
)
        try:
            url = check_endpoint(self)
            with urllib.request.urlopen(url) as response:
html: bytes = response.read()
# ready response should simply be a string:
# "ready"
raw_out = html.decode("utf-8").strip()
if raw_out == "ready":
return ServiceEndpointStatus.up
# depending on the workload, we get something like:
# Some services are not Running:
# Starting: 1
# Running: 16
# (tempo)
# Ingester not ready: waiting for 15s after being ready
# (mimir)
# anything that isn't 'ready' but also is a 2xx response will be interpreted as:
# we're not ready yet, but we're working on it.
logger.debug(f"GET {check_endpoint} returned: {raw_out!r}.")
return ServiceEndpointStatus.starting
except HTTPError:
logger.debug("Error getting readiness endpoint: server not up (yet)")
except Exception:
logger.exception("Unexpected exception getting readiness endpoint")
return ServiceEndpointStatus.down
def _on_collect_status(self, e: ops.CollectStatusEvent):
# these are the basic failure modes. if any of these conditions are not met, the worker
# is still starting or not yet configured. The user needs to wait or take some action.
statuses: List[StatusBase] = []
if self.resources_patch and self.resources_patch.get_status().name != "active":
statuses.append(self.resources_patch.get_status())
if not self._container.can_connect():
statuses.append(WaitingStatus(f"Waiting for `{self._name}` container"))
if not self.model.get_relation(self._endpoints["cluster"]):
statuses.append(BlockedStatus("Missing relation to a coordinator charm"))
elif not self.cluster.relation:
statuses.append(WaitingStatus("Cluster relation not ready"))
if not self._worker_config or not self._running_worker_config():
statuses.append(WaitingStatus("Waiting for coordinator to publish a config"))
if not self.roles:
statuses.append(
BlockedStatus("Invalid or no roles assigned: please configure some valid roles")
)
# if none of the conditions above applies, the worker should in principle be either up or starting
if not statuses:
try:
status = self.status
if status == ServiceEndpointStatus.starting:
statuses.append(WaitingStatus("Starting..."))
elif status == ServiceEndpointStatus.down:
logger.error(
"The worker service appears to be down and we don't know why. "
"Please check the pebble services' status and their logs."
)
statuses.append(BlockedStatus("node down (see logs)"))
except WorkerError:
# this means that the node is not down for any obvious reason (no container,...)
# but we still can't know for sure that the node is up, because we don't have
# a readiness endpoint configured.
logger.debug(
"Unable to determine worker readiness: no endpoint given. "
"This means we're going to report active, but the node might still "
"be coming up and not ready to serve."
)
        # if there are still no statuses, we report that we're ready
if not statuses:
statuses.append(
ActiveStatus(
"(all roles) ready."
if ",".join(self.roles) == "all"
else f"{','.join(self.roles)} ready."
)
)
# report all applicable statuses to the model
for status in statuses:
e.add_status(status)
# Utility functions
@property
def roles(self) -> List[str]:
"""Return a list of the roles this worker should take on.
Expects that the charm defines a set of roles by config like:
"role-a": bool
"role-b": bool
"role-b": bool
If this is not the case, it will raise an error.
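
        For example, with config options `role-read: true` and `role-write: false`,
        this property returns `["read"]`.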
"""
config = self._charm.config
role_config_options = [option for option in config.keys() if option.startswith("role-")]
if not role_config_options:
raise WorkerError(
"The charm should define a set of `role-X` config "
"options for it to use the Worker."
)
active_roles: List[str] = [
role[5:] for role in role_config_options if config[role] is True
]
return active_roles
def _update_config(self) -> None:
"""Update the worker config and restart the workload if necessary."""
if not self._container.can_connect():
logger.debug("container cannot connect, skipping update_config.")
return
restart = any(
(
self._update_tls_certificates(),
self._update_worker_config(),
self._set_pebble_layer(),
)
)
# we restart in 2 situations:
# - we need to because our config has changed
# - some services are not running
success = True
if restart:
logger.debug("Config changed. Restarting worker services...")
success = self.restart()
# this can happen if s3 wasn't ready (server gave error) when we processed an earlier event
# causing the worker service to die on startup (exited quickly with code...)
# so we try to restart it now.
# TODO: would be nice if we could be notified of when s3 starts working, so we don't have to
# wait for an update-status and can listen to that instead.
else:
services_not_up = [
svc.name for svc in self._container.get_services().values() if not svc.is_running()
]
if services_not_up:
logger.debug(
f"Not all services are running: {services_not_up}. Restarting worker services..."
)
success = self.restart()
if not success:
# this means that we have managed to start the process without pebble errors,
# but somehow the status is still not "up" after 15m
# we are going to set blocked status, but we can also log it here
logger.warning("failed to (re)start the worker services")
def _set_pebble_layer(self) -> bool:
"""Set Pebble layer.
Returns: True if Pebble layer was added, otherwise False.
"""
if not self._container.can_connect():
return False
if not self.roles:
return False
current_plan = self._container.get_plan()
if not (layer := self.pebble_layer):
return False
self._add_readiness_check(layer)
def diff(layer: Layer, plan: Plan):
layer_dct = layer.to_dict()
plan_dct = plan.to_dict()
for key in ["checks", "services"]:
if layer_dct.get(key) != plan_dct.get(key):
return True
return False
if diff(layer, current_plan):
logger.debug("Adding new layer to pebble...")
self._container.add_layer(self._name, layer, combine=True)
return True
return False
def _add_readiness_check(self, new_layer: Layer):
"""Add readiness check to a pebble layer."""
if not self._readiness_check_endpoint:
# skip
return
new_layer.checks["ready"] = Check(
"ready",
{
"override": "replace",
# threshold gets added automatically by pebble
"threshold": 3,
"http": {"url": self._readiness_check_endpoint(self)},
},
)
def _reconcile(self):
"""Run all logic that is independent of what event we're processing."""
# There could be a race between the resource patch and pebble operations
# i.e., charm code proceeds beyond a can_connect guard, and then lightkube patches the statefulset
# and the workload is no longer available
# `resources_patch` might be `None` when no resources requests or limits are requested by the charm.
if self.resources_patch and not self.resources_patch.is_ready():
logger.debug("Resource patch not ready yet. Skipping reconciliation step.")
return
self._update_cluster_relation()
self._update_config()
def _update_cluster_relation(self) -> None:
"""Publish all the worker information to relation data."""
self.cluster.publish_unit_address(socket.getfqdn())
if self._charm.unit.is_leader() and self.roles:
logger.info(f"publishing roles: {self.roles}")
self.cluster.publish_app_roles(self.roles)
def _running_worker_config(self) -> Optional[Dict[str, Any]]:
"""Return the worker config as dict, or None if retrieval failed."""
if not self._container.can_connect():
logger.debug("Could not connect to the workload container")
return None
try:
raw_current = self._container.pull(CONFIG_FILE).read()
return yaml.safe_load(raw_current)
except (ProtocolError, PathError) as e:
logger.warning(
"Could not check the current worker configuration due to "
"a failure in retrieving the file: %s",
e,
)
return None
def _update_worker_config(self) -> bool:
"""Set worker config for the workload.
Returns: True if config has changed, otherwise False.
Raises: BlockedStatusError exception if PebbleError, ProtocolError, PathError exceptions
are raised by container.remove_path
"""
if not self._container.can_connect():
logger.warning("cannot update worker config: container cannot connect.")
return False
if len(self.roles) == 0:
logger.warning("cannot update worker config: role missing or misconfigured.")
return False
worker_config = self._worker_config
if not worker_config:
logger.warning("cannot update worker config: coordinator hasn't published one yet.")
return False
if self._running_worker_config() != worker_config:
config_as_yaml = yaml.safe_dump(worker_config)
self._container.push(CONFIG_FILE, config_as_yaml, make_dirs=True)
logger.info("Pushed new worker configuration")
return True
return False
def _sync_tls_files(self, tls_data: TLSData):
logger.debug("tls config in cluster. writing to container...")
if tls_data.privkey_secret_id:
private_key_secret = self.model.get_secret(id=tls_data.privkey_secret_id)
private_key = private_key_secret.get_content().get("private-key")
else:
private_key = None
new_contents: Optional[str]
any_changes = False
for new_contents, file in (
(tls_data.ca_cert, CLIENT_CA_FILE),
(tls_data.server_cert, CERT_FILE),
(private_key, KEY_FILE),
(tls_data.s3_tls_ca_chain, S3_TLS_CA_CHAIN_FILE),
(tls_data.ca_cert, ROOT_CA_CERT),
):
if not new_contents:
if self._container.exists(file):
any_changes = True
self._container.remove_path(file, recursive=True)
logger.debug(f"{file} deleted")
continue
logger.debug(f"{file} skipped")
continue
if self._container.exists(file):
current_contents = self._container.pull(file).read()
if current_contents == new_contents:
logger.debug(f"{file} unchanged")
continue
logger.debug(f"{file} updated")
any_changes = True
self._container.push(file, new_contents, make_dirs=True)
# Save the cacert in the charm container for charm traces
# we do it unconditionally to avoid the extra complexity.
if tls_data.ca_cert:
ROOT_CA_CERT_PATH.write_text(tls_data.ca_cert)
else:
ROOT_CA_CERT_PATH.unlink(missing_ok=True)
return any_changes
def _update_tls_certificates(self) -> bool:
"""Update the TLS certificates on disk according to their availability.
Return True if we need to restart the workload after this update.
"""
if not self._container.can_connect():
return False
tls_data = self.cluster.get_tls_data(allow_none=True)
if not tls_data:
return False
any_changes = self._sync_tls_files(tls_data)
if any_changes:
logger.debug("running update-ca-certificates")
self._container.exec(["update-ca-certificates", "--fresh"]).wait()
subprocess.run(["update-ca-certificates", "--fresh"])
return any_changes
def restart(self):
"""Restart the pebble service or start it if not already running, then wait for it to become ready.
Default timeout is 15 minutes. Configure it by setting this class attr:
>>> Worker.SERVICE_START_RETRY_STOP = tenacity.stop_after_delay(60 * 30) # 30 minutes
You can also configure SERVICE_START_RETRY_WAIT and SERVICE_START_RETRY_IF.
This method will raise an exception if it fails to start the service within a
specified timeframe. This will presumably bring the charm in error status, so
that juju will retry the last emitted hook until it finally succeeds.
The assumption is that the state we are in when this method is called is consistent.
The reason why we're failing to restart is dependent on some external factor (such as network,
the reachability of a remote API, or the readiness of an external service the workload depends on).
So letting juju retry the same hook will get us unstuck as soon as that contingency is resolved.
See https://discourse.charmhub.io/t/its-probably-ok-for-a-unit-to-go-into-error-state/13022
Raises:
ChangeError, after continuously failing to restart the service.
"""
if not self._container.exists(CONFIG_FILE):
logger.error("cannot restart worker: config file doesn't exist (yet).")
return
if not self.roles:
logger.debug("cannot restart worker: no roles have been configured.")
return
if not (layer := self.pebble_layer):
return
service_names = layer.services.keys()
try:
for attempt in tenacity.Retrying(
# this method may fail with ChangeError (exited quickly with code...)
retry=self.SERVICE_START_RETRY_IF,
# give this method some time to pass (by default 15 minutes)
stop=self.SERVICE_START_RETRY_STOP,
# wait 1 minute between tries
wait=self.SERVICE_START_RETRY_WAIT,
# if you don't succeed raise the last caught exception when you're done
reraise=True,
):
with attempt:
self._charm.unit.status = MaintenanceStatus(
f"restarting... (attempt #{attempt.retry_state.attempt_number})"
)
# restart all services that our layer is responsible for
self._container.restart(*service_names)
except ops.pebble.ChangeError:
logger.error(
"failed to (re)start worker jobs. This usually means that an external resource (such as s3) "
"that the software needs to start is not available."
)
raise
try:
for attempt in tenacity.Retrying(
# status may report .down
retry=self.SERVICE_STATUS_UP_RETRY_IF,
# give this method some time to pass (by default 15 minutes)
stop=self.SERVICE_STATUS_UP_RETRY_STOP,
# wait 10 seconds between tries
wait=self.SERVICE_STATUS_UP_RETRY_WAIT,
# if you don't succeed raise the last caught exception when you're done
reraise=True,
):
with attempt:
self._charm.unit.status = MaintenanceStatus(
f"waiting for worker process to report ready... (attempt #{attempt.retry_state.attempt_number})"
)
# set result to status; will retry unless it's up
attempt.retry_state.set_result(self.status is ServiceEndpointStatus.up)
except WorkerError:
# unable to check worker readiness: no readiness_check_endpoint configured.
# this status is already set on the unit so no need to log it
pass
        except Exception:
            logger.exception("unexpected error while attempting to determine worker status")
            return False
        return True
def running_version(self) -> Optional[str]:
"""Get the running version from the worker process."""
if not self._container.can_connect():
return None
version_output, _ = self._container.exec([f"/bin/{self._name}", "-version"]).wait_output()
# Output looks like this:
# <WORKLOAD_NAME>, version 2.4.0 (branch: HEAD, revision 32137ee...)
if result := re.search(r"[Vv]ersion:?\s*(\S+)", version_output):
return result.group(1)
return None
def charm_tracing_config(self) -> Tuple[Optional[str], Optional[str]]:
"""Get the charm tracing configuration from the coordinator.
Usage:
assuming you are using charm_tracing >= v1.9:
>>> from ops import CharmBase
>>> from lib.charms.tempo_k8s.v1.charm_tracing import trace_charm
>>> from lib.charms.tempo_k8s.v2.tracing import charm_tracing_config
>>> @trace_charm(tracing_endpoint="my_endpoint", cert_path="cert_path")
>>> class MyCharm(CharmBase):
>>> def __init__(self, ...):
>>> self.worker = Worker(...)
>>> self.my_endpoint, self.cert_path = self.worker.charm_tracing_config()
"""
receivers = self.cluster.get_tracing_receivers()
if not receivers:
return None, None
endpoint = receivers.get("otlp_http")
if not endpoint:
return None, None
is_https = endpoint.startswith("https://")
tls_data = self.cluster.get_tls_data()
server_ca_cert = tls_data.server_cert if tls_data else None
if is_https:
if server_ca_cert is None:
raise RuntimeError(
"Cannot send traces to an https endpoint without a certificate."
)
elif not ROOT_CA_CERT_PATH.exists():
# if endpoint is https and we have a tls integration BUT we don't have the
# server_cert on disk yet (this could race with _update_tls_certificates):
# put it there and proceed
ROOT_CA_CERT_PATH.parent.mkdir(parents=True, exist_ok=True)
ROOT_CA_CERT_PATH.write_text(server_ca_cert)
return endpoint, ROOT_CA_CERT
else:
return endpoint, None
def _adjust_resource_requirements(self) -> ResourceRequirements:
"""A method that gets called by `KubernetesComputeResourcesPatch` to adjust the resources requests and limits to patch."""
cpu_limit_key = self._resources_limit_options.get("cpu_limit", "cpu_limit")
memory_limit_key = self._resources_limit_options.get("memory_limit", "memory_limit")
limits = {
"cpu": self._charm.model.config.get(cpu_limit_key),
"memory": self._charm.model.config.get(memory_limit_key),
}
return adjust_resource_requirements(
limits, self._resources_requests_getter(), adhere_to_requests=True # type: ignore
)
class ManualLogForwarder(ops.Object):
"""Forward the standard outputs of all workloads to explictly-provided Loki endpoints."""
def __init__(
self,
charm: ops.CharmBase,
*,
loki_endpoints: Optional[Dict[str, str]],
refresh_events: Optional[List[ops.BoundEvent]] = None,
):
_PebbleLogClient.check_juju_version()
super().__init__(charm, "worker-log-forwarder")
self._charm = charm
self._loki_endpoints = loki_endpoints
self._topology: JujuTopology = JujuTopology.from_charm(charm)
if not refresh_events:
return
for event in refresh_events:
self.framework.observe(event, self.update_logging)
def update_logging(self, _: Optional[ops.EventBase] = None):
"""Update the log forwarding to match the active Loki endpoints."""
loki_endpoints = self._loki_endpoints
if not loki_endpoints:
logger.warning("No Loki endpoints available")
loki_endpoints = {}
for container in self._charm.unit.containers.values():
if container.can_connect():
_PebbleLogClient.disable_inactive_endpoints( # type:ignore
container=container,
active_endpoints=loki_endpoints,
topology=self._topology,
)
_PebbleLogClient.enable_endpoints( # type:ignore
container=container, active_endpoints=loki_endpoints, topology=self._topology
)
def disable_logging(self, _: Optional[ops.EventBase] = None):
"""Disable all log forwarding."""
# This is currently necessary because, after a relation broken, the charm can still see
# the Loki endpoints in the relation data.
for container in self._charm.unit.containers.values():
if container.can_connect():
_PebbleLogClient.disable_inactive_endpoints( # type:ignore
container=container, active_endpoints={}, topology=self._topology
)