From cbd4ae7bb32164816278fc0e313effc14b5d2161 Mon Sep 17 00:00:00 2001 From: Wei-Cheng Lai Date: Thu, 3 Oct 2024 11:29:03 -0400 Subject: [PATCH] [SDK] test: add unit test for list_jobs method of the training_client (#2267) Signed-off-by: wei-chenglai --- .../training/api/training_client_test.py | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/sdk/python/kubeflow/training/api/training_client_test.py b/sdk/python/kubeflow/training/api/training_client_test.py index 2e147c5e35..ea8c495032 100644 --- a/sdk/python/kubeflow/training/api/training_client_test.py +++ b/sdk/python/kubeflow/training/api/training_client_test.py @@ -70,6 +70,30 @@ def get_namespaced_custom_object_response(*args, **kwargs): return mock_thread +def list_namespaced_custom_object_response(*args, **kwargs): + if args[2] == TIMEOUT: + raise multiprocessing.TimeoutError() + elif args[2] == RUNTIME: + raise RuntimeError() + elif args[2] == "empty-namespace": + mock_response = {"items": []} + elif args[2] == "multi-jobs": + mock_response = { + "items": [ + serialize_k8s_object(generate_job_with_status(create_job())), + serialize_k8s_object(generate_job_with_status(create_job())), + ] + } + else: + mock_response = { + "items": [serialize_k8s_object(generate_job_with_status(create_job()))] + } + + mock_thread = Mock() + mock_thread.get.return_value = mock_response + return mock_thread + + def list_namespaced_pod_response(*args, **kwargs): class MockResponse: def get(self, timeout): @@ -482,6 +506,74 @@ def __init__(self, kind) -> None: ), ] +test_data_list_jobs = [ + ( + "valid flow with default namespace and default timeout", + {}, + SUCCESS, + [generate_job_with_status(create_job())], + ), + ( + "valid flow with all parameters set", + { + "namespace": TEST_NAME, + "job_kind": constants.PYTORCHJOB_KIND, + "timeout": 120, + }, + SUCCESS, + [generate_job_with_status(create_job())], + ), + ( + "valid flow with empty job list", + { + "namespace": "empty-namespace", + }, + SUCCESS, + [], + ), + ( + "valid flow with multiple jobs", + { + "namespace": "multi-jobs", + }, + SUCCESS, + [ + generate_job_with_status(create_job()), + generate_job_with_status(create_job()), + ], + ), + ( + "invalid flow with default namespace and a Job that doesn't exist", + {"job_kind": constants.TFJOB_KIND}, + RuntimeError, + None, + ), + ( + "invalid flow with incorrect parameter", + {"test": "example"}, + TypeError, + None, + ), + ( + "invalid flow with incorrect job_kind value", + {"job_kind": "FailJob"}, + ValueError, + None, + ), + ( + "runtime error case", + {"namespace": RUNTIME}, + RuntimeError, + None, + ), + ( + "timeout error case", + {"namespace": TIMEOUT}, + TimeoutError, + None, + ), +] + test_data_delete_job = [ ( @@ -854,6 +946,9 @@ def training_client(): get_namespaced_custom_object=Mock( side_effect=get_namespaced_custom_object_response ), + list_namespaced_custom_object=Mock( + side_effect=list_namespaced_custom_object_response + ), ), ), patch( "kubernetes.client.CoreV1Api", @@ -1109,3 +1204,26 @@ def test_is_job_succeeded(training_client, test_name, kwargs, expected_output): assert type(e) is expected_output print("test execution complete") + + +@pytest.mark.parametrize( + "test_name,kwargs,expected_status,expected_response", test_data_list_jobs +) +def test_list_jobs( + training_client, test_name, kwargs, expected_status, expected_response +): + """ + test list_jobs function of training client + """ + print("Executing test: ", test_name) + try: + resp = training_client.list_jobs(**kwargs) + assert expected_status == SUCCESS + assert isinstance(resp, list) + assert len(resp) == len(expected_response) + for actual_job, expected_job in zip(resp, expected_response): + assert actual_job.to_dict() == expected_job.to_dict() + except Exception as e: + assert type(e) is expected_status + + print("test execution complete")