Skip to content

Commit

Permalink
Remove deprecated features from KubernetesHook (#31402)
Browse files Browse the repository at this point in the history
* Remove deprecated features from KubernetesHook
  • Loading branch information
dstandish committed May 19, 2023
1 parent ac00547 commit a1f5a54
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 196 deletions.
58 changes: 8 additions & 50 deletions airflow/providers/cncf/kubernetes/hooks/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
from __future__ import annotations

import contextlib
import json
import tempfile
import warnings
from typing import TYPE_CHECKING, Any, Generator

from asgiref.sync import sync_to_async
Expand All @@ -30,7 +28,7 @@
from urllib3.exceptions import HTTPError

from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowException, AirflowNotFoundException, AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
from airflow.models import Connection
Expand Down Expand Up @@ -101,12 +99,6 @@ def get_connection_form_widgets() -> dict[str, Any]:
"cluster_context": StringField(lazy_gettext("Cluster context"), widget=BS3TextFieldWidget()),
"disable_verify_ssl": BooleanField(lazy_gettext("Disable SSL")),
"disable_tcp_keepalive": BooleanField(lazy_gettext("Disable TCP keepalive")),
"xcom_sidecar_container_image": StringField(
lazy_gettext("XCom sidecar image"), widget=BS3TextFieldWidget()
),
"xcom_sidecar_container_resources": StringField(
lazy_gettext("XCom sidecar resources (JSON format)"), widget=BS3TextFieldWidget()
),
}

@staticmethod
Expand Down Expand Up @@ -303,7 +295,7 @@ def create_custom_object(
response = api.create_namespaced_custom_object(
group=group,
version=version,
namespace=namespace or self.get_namespace(),
namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
plural=plural,
body=body_dict,
)
Expand All @@ -327,7 +319,7 @@ def get_custom_object(
response = api.get_namespaced_custom_object(
group=group,
version=version,
namespace=namespace or self.get_namespace(),
namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
plural=plural,
name=name,
)
Expand All @@ -349,52 +341,18 @@ def delete_custom_object(
return api.delete_namespaced_custom_object(
group=group,
version=version,
namespace=namespace or self.get_namespace(),
namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
plural=plural,
name=name,
**kwargs,
)

def get_namespace(self) -> str | None:
"""
Returns the namespace defined in the connection or 'default'.
TODO: in provider version 6.0, return None when namespace not defined in connection
"""
namespace = self._get_namespace()
if self.conn_id and not namespace:
warnings.warn(
"Airflow connection defined but namespace is not set; returning 'default'. In "
"cncf.kubernetes provider version 6.0 we will return None when namespace is "
"not defined in the connection so that it's clear whether user intends 'default' or "
"whether namespace is unset (which is required in order to apply precedence logic in "
"KubernetesPodOperator).",
AirflowProviderDeprecationWarning,
)
return "default"
return namespace

def _get_namespace(self) -> str | None:
"""
Returns the namespace that defined in the connection
TODO: in provider version 6.0, get rid of this method and make it the behavior of get_namespace.
"""
"""Returns the namespace that defined in the connection"""
if self.conn_id:
return self._get_field("namespace")
return None

def get_xcom_sidecar_container_image(self):
"""Returns the xcom sidecar image that defined in the connection"""
return self._get_field("xcom_sidecar_container_image")

def get_xcom_sidecar_container_resources(self):
"""Returns the xcom sidecar resources that defined in the connection"""
field = self._get_field("xcom_sidecar_container_resources")
if not field:
return None
return json.loads(field)

def get_pod_log_stream(
self,
pod_name: str,
Expand All @@ -415,7 +373,7 @@ def get_pod_log_stream(
self.core_v1_client.read_namespaced_pod_log,
name=pod_name,
container=container,
namespace=namespace or self._get_namespace() or self.DEFAULT_NAMESPACE,
namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
),
)

Expand All @@ -436,7 +394,7 @@ def get_pod_logs(
name=pod_name,
container=container,
_preload_content=False,
namespace=namespace or self._get_namespace() or self.DEFAULT_NAMESPACE,
namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
)

def get_pod(self, name: str, namespace: str) -> V1Pod:
Expand All @@ -460,7 +418,7 @@ def get_namespaced_pod_list(
:param watch: Watch for changes to the described resources and return them as a stream
"""
return self.core_v1_client.list_namespaced_pod(
namespace=namespace or self._get_namespace() or self.DEFAULT_NAMESPACE,
namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
watch=watch,
label_selector=label_selector,
_preload_content=False,
Expand Down
36 changes: 3 additions & 33 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@

import json
import logging
import os
import re
import secrets
import string
import warnings
from collections.abc import Container
from contextlib import AbstractContextManager
from typing import TYPE_CHECKING, Any, Sequence
Expand All @@ -34,7 +32,7 @@
from urllib3.exceptions import HTTPError

from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.kubernetes import pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.kubernetes.secret import Secret
Expand Down Expand Up @@ -473,14 +471,6 @@ def hook(self) -> PodOperatorHookProtocol:
)
return hook

def get_hook(self):
warnings.warn(
"get_hook is deprecated. Please use hook instead.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
return self.hook

@cached_property
def client(self) -> CoreV1Api:
return self.hook.core_v1_client
Expand Down Expand Up @@ -589,20 +579,6 @@ def execute_async(self, context: Context):
)
self.invoke_defer_method()

def convert_config_file_to_dict(self):
"""Converts passed config_file to dict format."""
warnings.warn(
"This method is deprecated and will be removed in a future version.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
config_file = self.config_file if self.config_file else os.environ.get(KUBE_CONFIG_ENV_VAR)
if config_file:
with open(config_file) as f:
self._config_dict = yaml.safe_load(f)
else:
self._config_dict = None

def invoke_defer_method(self):
"""Method to easily redefine triggers which are being used in child classes."""
trigger_start_time = utcnow()
Expand Down Expand Up @@ -851,9 +827,7 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod:
pod.metadata.name = _add_pod_suffix(pod_name=pod.metadata.name)

if not pod.metadata.namespace:
# todo: replace with call to `hook.get_namespace` in 6.0, when it doesn't default to `default`.
# if namespace not actually defined in hook, we want to check k8s if in cluster
hook_namespace = self.hook._get_namespace()
hook_namespace = self.hook.get_namespace()
pod_namespace = self.namespace or hook_namespace or self._incluster_namespace or "default"
pod.metadata.namespace = pod_namespace

Expand All @@ -862,11 +836,7 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod:
pod = secret.attach_to_pod(pod)
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to task %s", self.task_id)
pod = xcom_sidecar.add_xcom_sidecar(
pod,
sidecar_container_image=self.hook.get_xcom_sidecar_container_image(),
sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(),
)
pod = xcom_sidecar.add_xcom_sidecar(pod)

labels = self._get_ti_pod_labels(context)
self.log.info("Building pod %s with labels: %s", pod.metadata.name, labels)
Expand Down
14 changes: 2 additions & 12 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,8 @@ def is_in_cluster(self) -> bool:
def get_pod(self, name: str, namespace: str) -> V1Pod:
"""Read pod object from kubernetes API."""

def _get_namespace(self) -> str | None:
"""
Returns the namespace that defined in the connection
TODO: in provider version 6.0, get rid of this method and make it the behavior of get_namespace.
"""

def get_xcom_sidecar_container_image(self) -> str | None:
"""Returns the xcom sidecar image that defined in the connection"""

def get_xcom_sidecar_container_resources(self) -> str | None:
"""Returns the xcom sidecar resources that defined in the connection"""
def get_namespace(self) -> str | None:
"""Returns the namespace that defined in the connection"""


def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None:
Expand Down
2 changes: 0 additions & 2 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,8 +896,6 @@ def test_pod_template_file(
# todo: This isn't really a system test
await_xcom_sidecar_container_start_mock.return_value = None
hook_mock.return_value.is_in_cluster = False
hook_mock.return_value.get_xcom_sidecar_container_image.return_value = None
hook_mock.return_value.get_xcom_sidecar_container_resources.return_value = None
hook_mock.return_value.get_connection.return_value = Connection(conn_id="kubernetes_default")
extract_xcom_mock.return_value = "{}"
path = sys.path[0] + "/tests/kubernetes/pod.yaml"
Expand Down
12 changes: 1 addition & 11 deletions tests/providers/cncf/kubernetes/decorators/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager"
HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook"

XCOM_IMAGE = "XCOM_IMAGE"


@pytest.fixture(autouse=True)
def mock_create_pod() -> mock.Mock:
Expand Down Expand Up @@ -127,12 +125,6 @@ def f(arg1, arg2, kwarg1=None, kwarg2=None):
dr = dag_maker.create_dagrun()
(ti,) = dr.task_instances

mock_hook.return_value.get_xcom_sidecar_container_image.return_value = XCOM_IMAGE
mock_hook.return_value.get_xcom_sidecar_container_resources.return_value = {
"requests": {"cpu": "1m", "memory": "10Mi"},
"limits": {"cpu": "1m", "memory": "50Mi"},
}

dag.get_task("my_task_id").execute(context=ti.get_template_context(session=session))

mock_hook.assert_called_once_with(
Expand All @@ -142,8 +134,6 @@ def f(arg1, arg2, kwarg1=None, kwarg2=None):
config_file="/tmp/fake_file",
)
assert mock_create_pod.call_count == 1
assert mock_hook.return_value.get_xcom_sidecar_container_image.call_count == 1
assert mock_hook.return_value.get_xcom_sidecar_container_resources.call_count == 1

containers = mock_create_pod.call_args[1]["pod"].spec.containers

Expand All @@ -162,7 +152,7 @@ def f(arg1, arg2, kwarg1=None, kwarg2=None):
assert decoded_input == {"args": ("arg1", "arg2"), "kwargs": {"kwarg1": "kwarg1"}}

# Second container is xcom image
assert containers[1].image == XCOM_IMAGE
assert containers[1].image == "alpine"
assert containers[1].volume_mounts[0].mount_path == "/airflow/xcom"


Expand Down
48 changes: 1 addition & 47 deletions tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,6 @@ def setup_class(cls) -> None:
("disable_verify_ssl_empty", {"disable_verify_ssl": ""}),
("disable_tcp_keepalive", {"disable_tcp_keepalive": True}),
("disable_tcp_keepalive_empty", {"disable_tcp_keepalive": ""}),
("sidecar_container_image", {"xcom_sidecar_container_image": "private.repo.com/alpine:3.16"}),
("sidecar_container_image_empty", {"xcom_sidecar_container_image": ""}),
(
"sidecar_container_resources",
{
"xcom_sidecar_container_resources": json.dumps(
{
"requests": {"cpu": "1m", "memory": "10Mi"},
"limits": {"cpu": "1m", "memory": "50Mi"},
}
),
},
),
("sidecar_container_resources_empty", {"xcom_sidecar_container_resources": ""}),
]:
db.merge_conn(Connection(conn_type="kubernetes", conn_id=conn_id, extra=json.dumps(extra)))

Expand Down Expand Up @@ -348,7 +334,7 @@ def test_default_kube_config_connection(self, mock_kube_config_merger, mock_kube
(
pytest.param(None, None, id="no-conn-id"),
pytest.param("with_namespace", "mock_namespace", id="conn-with-namespace"),
pytest.param("default_kube_config", "default", id="conn-without-namespace"),
pytest.param("default_kube_config", None, id="conn-without-namespace"),
),
)
def test_get_namespace(self, conn_id, expected):
Expand All @@ -361,38 +347,6 @@ def test_get_namespace(self, conn_id, expected):
"and rename _get_namespace to get_namespace."
)

@pytest.mark.parametrize(
"conn_id, expected",
(
pytest.param("sidecar_container_image", "private.repo.com/alpine:3.16", id="sidecar-with-image"),
pytest.param("sidecar_container_image_empty", None, id="sidecar-without-image"),
),
)
def test_get_xcom_sidecar_container_image(self, conn_id, expected):
hook = KubernetesHook(conn_id=conn_id)
assert hook.get_xcom_sidecar_container_image() == expected

@pytest.mark.parametrize(
"conn_id, expected",
(
pytest.param(
"sidecar_container_resources",
{
"requests": {"cpu": "1m", "memory": "10Mi"},
"limits": {
"cpu": "1m",
"memory": "50Mi",
},
},
id="sidecar-with-resources",
),
pytest.param("sidecar_container_resources_empty", None, id="sidecar-without-resources"),
),
)
def test_get_xcom_sidecar_container_resources(self, conn_id, expected):
hook = KubernetesHook(conn_id=conn_id)
assert hook.get_xcom_sidecar_container_resources() == expected

@patch("kubernetes.config.kube_config.KubeConfigLoader")
@patch("kubernetes.config.kube_config.KubeConfigMerger")
def test_client_types(self, mock_kube_config_merger, mock_kube_config_loader):
Expand Down
Loading

0 comments on commit a1f5a54

Please sign in to comment.