diff --git a/src/consts.py b/src/consts.py index 755cdf5..b575edc 100644 --- a/src/consts.py +++ b/src/consts.py @@ -9,4 +9,5 @@ CLUSTER_SECRET_LABEL = "clustersecret.io" -BLACK_LISTED_ANNOTATIONS = ["kopf.zalando.org", "kubectl.kubernetes.io"] \ No newline at end of file +BLACK_LISTED_ANNOTATIONS = ["kopf.zalando.org", "kubectl.kubernetes.io"] +BLACK_LISTED_LABELS = ["app.kubernetes.io"] diff --git a/src/kubernetes_utils.py b/src/kubernetes_utils.py index f862bb0..a0f3cd6 100644 --- a/src/kubernetes_utils.py +++ b/src/kubernetes_utils.py @@ -1,6 +1,6 @@ import logging from datetime import datetime -from typing import Optional, Dict, Any, List, Mapping +from typing import Optional, Dict, Any, List, Mapping, Tuple, Iterator import re import kopf @@ -8,7 +8,7 @@ from os_utils import get_replace_existing, get_version from consts import CREATE_BY_ANNOTATION, LAST_SYNC_ANNOTATION, VERSION_ANNOTATION, BLACK_LISTED_ANNOTATIONS, \ - CREATE_BY_AUTHOR, CLUSTER_SECRET_LABEL + BLACK_LISTED_LABELS, CREATE_BY_AUTHOR, CLUSTER_SECRET_LABEL def patch_clustersecret_status( @@ -286,27 +286,36 @@ def create_secret_metadata( Kubernetes metadata object with ClusterSecret annotations. """ - _labels = { + def filter_dict( + prefixes: List[str], + base: Dict[str, str], + source: Optional[Mapping[str, str]] = None + ) -> Iterator[Tuple[str, str]]: + """ Remove potential useless / dangerous annotations and labels""" + for item in base.items(): + yield item + if source is not None: + for item in source.items(): + key, _ = item + if not any(key.startswith(prefix) for prefix in prefixes): + yield item + + base_labels = { CLUSTER_SECRET_LABEL: 'true' } - _labels.update(labels or {}) - - _annotations = { + base_annotations = { CREATE_BY_ANNOTATION: CREATE_BY_AUTHOR, VERSION_ANNOTATION: get_version(), LAST_SYNC_ANNOTATION: datetime.now().isoformat(), } - _annotations.update(annotations or {}) - - # Remove potential useless / dangerous annotations - _annotations = {key: value for key, value in _annotations.items() if - not any(key.startswith(prefix) for prefix in BLACK_LISTED_ANNOTATIONS)} + _annotations = filter_dict(BLACK_LISTED_ANNOTATIONS, base_annotations, annotations) + _labels = filter_dict(BLACK_LISTED_LABELS, base_labels, labels) return V1ObjectMeta( name=name, namespace=namespace, - annotations=_annotations, - labels=_labels, + annotations=dict(_annotations), + labels=dict(_labels), ) diff --git a/src/tests/test_kubernetes_utils.py b/src/tests/test_kubernetes_utils.py index 0e3fcf5..5809b70 100644 --- a/src/tests/test_kubernetes_utils.py +++ b/src/tests/test_kubernetes_utils.py @@ -1,15 +1,30 @@ +import datetime import logging import unittest +from typing import Tuple, Callable, Union from unittest.mock import Mock from kubernetes.client import V1ObjectMeta -from kubernetes_utils import get_ns_list + +from consts import CREATE_BY_ANNOTATION, LAST_SYNC_ANNOTATION, VERSION_ANNOTATION, BLACK_LISTED_ANNOTATIONS, \ + BLACK_LISTED_LABELS, CREATE_BY_AUTHOR, CLUSTER_SECRET_LABEL +from kubernetes_utils import get_ns_list, create_secret_metadata +from os_utils import get_version USER_NAMESPACE_COUNT = 10 initial_namespaces = ['default', 'kube-node-lease', 'kube-public', 'kube-system'] user_namespaces = [f'example-{index}' for index in range(USER_NAMESPACE_COUNT)] +def is_iso_format(date: str) -> bool: + """check whether a date string parses correctly as ISO 8601 format""" + try: + datetime.datetime.fromisoformat(date) + return True + except (TypeError, ValueError): + return False + + class TestClusterSecret(unittest.TestCase): def test_get_ns_list(self): @@ -69,3 +84,87 @@ def test_get_ns_list(self): )), msg=case['name'], ) + + def test_create_secret_metadata(self) -> None: + + expected_base_label_key = CLUSTER_SECRET_LABEL + expected_base_label_value = 'true' + + # key, value pairs, where the value can be a string or validation function + expected_base_annotations: list[Tuple[str, Union[str, Callable[[str], bool]]]] = [ + (CREATE_BY_ANNOTATION, CREATE_BY_AUTHOR), + (VERSION_ANNOTATION, get_version()), + # Since LAST_SYNC_ANNOTATION is a date string which isn't easily validated by string equality + # have the function 'is_iso_format' validate the value of this annotation. + (LAST_SYNC_ANNOTATION, is_iso_format) + ] + + attributes_black_lists = dict( + labels=BLACK_LISTED_LABELS, + annotations=BLACK_LISTED_ANNOTATIONS, + ) + + test_cases: list[Tuple[dict[str, str], dict[str, str]]] = [ + # Annotations, Labels + ( + {}, + {} + ), + ( + {}, + {"modifiedAt": "1692462880", + "name": "prometheus-operator", + "owner": "helm", + "status": "superseded", + "version": "1"} + ), + ( + {"managed-by": "argocd.argoproj.io"}, + {"argocd.argoproj.io/secret-type": "repository"} + ), + ( + {"argocd.argoproj.io/compare-options": "ServerSideDiff=true", + "argocd.argoproj.io/sync-wave": "4"}, + {"app.kubernetes.io/instance": "cluster-secret"} + ) + ] + + for annotations, labels in test_cases: + + subject: V1ObjectMeta = create_secret_metadata( + name='test_secret', + namespace='test_namespace', + annotations=annotations, + labels=labels + ) + + self.assertIsInstance(obj=subject, cls=V1ObjectMeta, msg='returned value has correct type') + + for attribute, black_list in attributes_black_lists.items(): + attribute_object = subject.__getattribute__(attribute) + self.assertIsNotNone(obj=attribute_object, msg=f'attribute "{attribute}" is not None') + + for key in attribute_object.keys(): + self.assertIsInstance(obj=key, cls=str, msg=f'the {attribute} key is a string') + for black_listed_label_prefix in black_list: + self.assertFalse( + expr=key.startswith(black_listed_label_prefix), + msg=f'{attribute} key does not match black listed prefix' + ) + + # This tells mypy that those attributes are not None + assert subject.labels is not None + assert subject.annotations is not None + + self.assertEqual( + first=subject.labels[expected_base_label_key], + second=expected_base_label_value, + msg='expected base label is present' + ) + for key, value_expectation in expected_base_annotations: + validator = value_expectation if callable(value_expectation) else value_expectation.__eq__ + value = subject.annotations[key] + self.assertTrue( + expr=validator(value), + msg=f'expected base annotation with key {key} is present and its value {value} is as expected' + )