From 029a9d053c61ee564e47db7fd24a2557da9a161f Mon Sep 17 00:00:00 2001 From: Richard Liu <39319471+richardsliu@users.noreply.github.com> Date: Sat, 22 Sep 2018 12:37:56 -0700 Subject: [PATCH] Recreate k8s client on auth failure (#215) * Recreate k8s client on auth failure * Recreate client on each call * Fix syntax * Remove debug msg * Try to fix unit tests --- py/kubeflow/testing/argo_client.py | 28 +++++++++++++++---------- py/kubeflow/testing/run_e2e_workflow.py | 4 +--- py/kubeflow/testing/util.py | 2 ++ py/kubeflow/tests/argo_client_test.py | 15 +++++++------ 4 files changed, 27 insertions(+), 22 deletions(-) diff --git a/py/kubeflow/testing/argo_client.py b/py/kubeflow/testing/argo_client.py index 51d8bf6f63d..fd84a9f1746 100644 --- a/py/kubeflow/testing/argo_client.py +++ b/py/kubeflow/testing/argo_client.py @@ -30,9 +30,12 @@ def log_status(workflow): def handle_retriable_exception(exception): - if isinstance(exception, rest.ApiException) and exception.status == 401: - # See https://github.com/kubeflow/testing/issues/207. - # If we get an unauthorized response, just reload the kubeconfig and retry. + if (isinstance(exception, rest.ApiException) and + (exception.status == 401 or exception.status == 403)): + # Due to https://github.com/kubernetes-client/python-base/issues/59, + # we need to reload the kube config (which refreshes the GCP token). + # TODO(richardsliu): Remove this workaround when the k8s client issue + # is resolved. util.load_kube_config() return True return not isinstance(exception, util.TimeoutError) @@ -49,27 +52,31 @@ def handle_retriable_exception(exception): @retry(wait_exponential_multiplier=1000, wait_exponential_max=10000, stop_max_delay=5*60*1000, retry_on_exception=handle_retriable_exception) -def get_namespaced_custom_object_with_retries(client, namespace, name): +def get_namespaced_custom_object_with_retries(namespace, name): """Call get_namespaced_customer_object API with retries. Args: - client: K8s api client. namespace: namespace for the workflow. name: name of the workflow. """ + # Due to https://github.com/kubernetes-client/python-base/issues/59, + # we need to recreate the API client since it may contain stale auth + # tokens. + # TODO(richardsliu): Remove this workaround when the k8s client issue + # is resolved. + client = k8s_client.ApiClient() crd_api = k8s_client.CustomObjectsApi(client) return crd_api.get_namespaced_custom_object( GROUP, VERSION, namespace, PLURAL, name) -def wait_for_workflows(client, namespace, names, +def wait_for_workflows(namespace, names, timeout=datetime.timedelta(minutes=30), polling_interval=datetime.timedelta(seconds=30), status_callback=None): """Wait for multiple workflows to finish. Args: - client: K8s api client. namespace: namespace for the workflow. names: Names of the workflows to wait for. timeout: How long to wait for the workflow. @@ -88,7 +95,7 @@ def wait_for_workflows(client, namespace, names, all_results = [] for n in names: - results = get_namespaced_custom_object_with_retries(client, namespace, n) + results = get_namespaced_custom_object_with_retries(namespace, n) all_results.append(results) if status_callback: status_callback(results) @@ -111,14 +118,13 @@ def wait_for_workflows(client, namespace, names, return [] -def wait_for_workflow(client, namespace, name, +def wait_for_workflow(namespace, name, timeout=datetime.timedelta(minutes=30), polling_interval=datetime.timedelta(seconds=30), status_callback=None): """Wait for the specified workflow to finish. Args: - client: K8s api client. namespace: namespace for the workflow. name: Name of the workflow timeout: How long to wait for the workflow. @@ -130,6 +136,6 @@ def wait_for_workflow(client, namespace, name, Raises: TimeoutError: If timeout waiting for the job to finish. """ - results = wait_for_workflows(client, namespace, [name], + results = wait_for_workflows(namespace, [name], timeout, polling_interval, status_callback) return results[0] diff --git a/py/kubeflow/testing/run_e2e_workflow.py b/py/kubeflow/testing/run_e2e_workflow.py index ebda6dddacc..a45ca294d99 100644 --- a/py/kubeflow/testing/run_e2e_workflow.py +++ b/py/kubeflow/testing/run_e2e_workflow.py @@ -42,7 +42,6 @@ import datetime import fnmatch import logging -from kubernetes import client as k8s_client import os import tempfile from kubeflow.testing import argo_client @@ -132,7 +131,6 @@ def run(args, file_handler): # pylint: disable=too-many-statements,too-many-bran util.configure_kubectl(args.project, args.zone, args.cluster) util.load_kube_config() - api_client = k8s_client.ApiClient() workflow_names = [] ui_urls = {} @@ -237,7 +235,7 @@ def run(args, file_handler): # pylint: disable=too-many-statements,too-many-bran success = True workflow_phase = {} try: - results = argo_client.wait_for_workflows(api_client, get_namespace(args), + results = argo_client.wait_for_workflows(get_namespace(args), workflow_names, timeout=datetime.timedelta(minutes=60), status_callback=argo_client.log_status) diff --git a/py/kubeflow/testing/util.py b/py/kubeflow/testing/util.py index db6fd2258c1..b69722444d5 100755 --- a/py/kubeflow/testing/util.py +++ b/py/kubeflow/testing/util.py @@ -497,6 +497,8 @@ def _save_kube_config(config_map): kubernetes_configuration.Configuration.set_default(config) else: loader.load_and_set(client_configuration) # pylint: disable=too-many-function-args + # Dump the loaded config. + run(["kubectl", "config", "view"]) def maybe_activate_service_account(): diff --git a/py/kubeflow/tests/argo_client_test.py b/py/kubeflow/tests/argo_client_test.py index 2dd7280eb65..63a0a35ecb1 100644 --- a/py/kubeflow/tests/argo_client_test.py +++ b/py/kubeflow/tests/argo_client_test.py @@ -3,7 +3,6 @@ import unittest from kubeflow.testing import argo_client -from kubernetes import client as k8s_client import mock import os import yaml @@ -13,14 +12,14 @@ def setUp(self): self.test_dir = os.path.join(os.path.dirname(__file__), "test-data") def test_wait_for_workflow(self): - api_client = mock.MagicMock(spec=k8s_client.ApiClient) + with mock.patch("kubeflow.testing.argo_client.k8s_client.ApiClient") as mock_client: + with open(os.path.join(self.test_dir, "successful_workflow.yaml")) as hf: + response = yaml.load(hf) - with open(os.path.join(self.test_dir, "successful_workflow.yaml")) as hf: - response = yaml.load(hf) - - api_client.call_api.return_value = response - result = argo_client.wait_for_workflow(api_client, "some-namespace", "some-set") - self.assertIsNotNone(result) + client = mock_client.return_value + client.call_api.return_value = response + result = argo_client.wait_for_workflow("some-namespace", "some-set") + self.assertIsNotNone(result) if __name__ == "__main__": unittest.main()