diff --git a/sdk/python/kubeflow/training/api/training_client_test.py b/sdk/python/kubeflow/training/api/training_client_test.py index f844a868c2..2e147c5e35 100644 --- a/sdk/python/kubeflow/training/api/training_client_test.py +++ b/sdk/python/kubeflow/training/api/training_client_test.py @@ -34,6 +34,12 @@ {"metadata": {"name": DUMMY_POD_NAME}}, ] SUCCESS = "success" +FAILED = "Failed" +CREATED = "Created" +RUNNING = "Running" +RESTARTING = "Restarting" +SUCCEEDED = "Succeeded" +INVALID = "invalid" def conditional_error_handler(*args, **kwargs): @@ -96,13 +102,6 @@ def get(self, timeout): return MockResponse() -def get_job_response(*args, **kwargs): - if kwargs.get("namespace") == RUNTIME: - return generate_job_with_status(create_job(), constants.JOB_CONDITION_FAILED) - else: - return generate_job_with_status(create_job()) - - def generate_container() -> V1Container: return V1Container( name="pytorch", @@ -442,11 +441,6 @@ def __init__(self, kind) -> None: ] test_data_get_job = [ - ( - "valid flow with default namespace and default timeout", - {"name": TEST_NAME}, - SUCCESS, - ), ( "valid flow with all parameters set", { @@ -468,8 +462,8 @@ def __init__(self, kind) -> None: TypeError, ), ( - "invalid flow withincorrect value", - {"name": TEST_NAME, "job_kind": "FailJob"}, + "invalid flow with incorrect value", + {"name": TEST_NAME, "job_kind": INVALID}, ValueError, ), ( @@ -486,11 +480,6 @@ def __init__(self, kind) -> None: {"name": TEST_NAME, "namespace": TIMEOUT}, TimeoutError, ), - ( - "invalid flow with runtime error", - {"name": TEST_NAME, "namespace": RUNTIME}, - RuntimeError, - ), ] @@ -547,6 +536,313 @@ def __init__(self, kind) -> None: ] +test_data_get_job_conditions = [ + ( + "valid flow with failed job condition", + {"name": TEST_NAME, "namespace": FAILED}, + generate_job_with_status( + create_job(), condition_type=constants.JOB_CONDITION_FAILED + ), + ), + ( + "valid flow with restarting job condition", + {"name": TEST_NAME, "namespace": RESTARTING}, + generate_job_with_status( + create_job(), condition_type=constants.JOB_CONDITION_RESTARTING + ), + ), + ( + "valid flow with running job condition", + {"name": TEST_NAME, "namespace": RUNNING}, + generate_job_with_status( + create_job(), condition_type=constants.JOB_CONDITION_RUNNING + ), + ), + ( + "valid flow with created job condition", + {"name": TEST_NAME, "namespace": CREATED}, + generate_job_with_status( + create_job(), condition_type=constants.JOB_CONDITION_CREATED + ), + ), + ( + "valid flow with all parameters set", + { + "name": TEST_NAME, + "namespace": TEST_NAME, + "job": create_job(), + "job_kind": constants.PYTORCHJOB_KIND, + "timeout": 120, + }, + generate_job_with_status(create_job()), + ), + ( + "invalid flow with default namespace and a Job that doesn't exist", + {"name": TEST_NAME, "job_kind": constants.TFJOB_KIND}, + RuntimeError, + ), + ( + "invalid flow incorrect parameter", + {"name": TEST_NAME, "test": "example"}, + TypeError, + ), + ( + "invalid flow with incorrect value", + {"name": TEST_NAME, "job_kind": INVALID}, + ValueError, + ), + ( + "runtime error case", + { + "name": TEST_NAME, + "namespace": "runtime", + "job_kind": constants.PYTORCHJOB_KIND, + }, + RuntimeError, + ), + ( + "invalid flow with timeout error", + {"name": TEST_NAME, "namespace": TIMEOUT}, + TimeoutError, + ), +] + + +test_data_is_job_created = [ + ( + "valid flow with all parameters set", + { + "name": TEST_NAME, + "namespace": CREATED, + "job": create_job(), + "job_kind": constants.PYTORCHJOB_KIND, + "timeout": 120, + }, + True, + ), + ( + "invalid flow with default namespace and a Job that doesn't exist", + {"name": TEST_NAME, "job_kind": constants.TFJOB_KIND}, + RuntimeError, + ), + ( + "invalid flow incorrect parameter", + {"name": TEST_NAME, "test": "example"}, + TypeError, + ), + ( + "invalid flow with incorrect value", + {"name": TEST_NAME, "job_kind": INVALID}, + ValueError, + ), + ( + "runtime error case", + { + "name": TEST_NAME, + "namespace": "runtime", + "job_kind": constants.PYTORCHJOB_KIND, + }, + RuntimeError, + ), + ( + "invalid flow with timeout error", + {"name": TEST_NAME, "namespace": TIMEOUT}, + TimeoutError, + ), +] + + +test_data_is_job_running = [ + ( + "valid flow with job that is running", + {"name": TEST_NAME, "namespace": RUNNING}, + True, + ), + ( + "valid flow with all parameters set", + { + "name": TEST_NAME, + "namespace": RUNNING, + "job": create_job(), + "job_kind": constants.PYTORCHJOB_KIND, + "timeout": 120, + }, + True, + ), + ( + "invalid flow with default namespace and a Job that doesn't exist", + {"name": TEST_NAME, "job_kind": constants.TFJOB_KIND}, + RuntimeError, + ), + ( + "invalid flow incorrect parameter", + {"name": TEST_NAME, "test": "example"}, + TypeError, + ), + ( + "invalid flow with incorrect value", + {"name": TEST_NAME, "job_kind": INVALID}, + ValueError, + ), + ( + "runtime error case", + { + "name": TEST_NAME, + "namespace": "runtime", + "job_kind": constants.PYTORCHJOB_KIND, + }, + RuntimeError, + ), + ( + "invalid flow with timeout error", + {"name": TEST_NAME, "namespace": TIMEOUT}, + TimeoutError, + ), +] + + +test_data_is_job_restarting = [ + ( + "valid flow with job that is restarting", + {"name": TEST_NAME, "namespace": RESTARTING}, + True, + ), + ( + "valid flow with all parameters set", + { + "name": TEST_NAME, + "namespace": RESTARTING, + "job": create_job(), + "job_kind": constants.PYTORCHJOB_KIND, + "timeout": 120, + }, + True, + ), + ( + "invalid flow with default namespace and a Job that doesn't exist", + {"name": TEST_NAME, "job_kind": constants.TFJOB_KIND}, + RuntimeError, + ), + ( + "invalid flow incorrect parameter", + {"name": TEST_NAME, "test": "example"}, + TypeError, + ), + ( + "invalid flow with incorrect value", + {"name": TEST_NAME, "job_kind": INVALID}, + ValueError, + ), + ( + "runtime error case", + { + "name": TEST_NAME, + "namespace": "runtime", + "job_kind": constants.PYTORCHJOB_KIND, + }, + RuntimeError, + ), + ( + "invalid flow with timeout error", + {"name": TEST_NAME, "namespace": TIMEOUT}, + TimeoutError, + ), +] + + +test_data_is_job_failed = [ + ( + "valid flow with job that is failed", + {"name": TEST_NAME, "namespace": FAILED}, + True, + ), + ( + "valid flow with all parameters set", + { + "name": TEST_NAME, + "namespace": FAILED, + "job": create_job(), + "job_kind": constants.PYTORCHJOB_KIND, + "timeout": 120, + }, + True, + ), + ( + "invalid flow with default namespace and a Job that doesn't exist", + {"name": TEST_NAME, "job_kind": constants.TFJOB_KIND}, + RuntimeError, + ), + ( + "invalid flow incorrect parameter", + {"name": TEST_NAME, "test": "example"}, + TypeError, + ), + ( + "invalid flow with incorrect value", + {"name": TEST_NAME, "job_kind": INVALID}, + ValueError, + ), + ( + "runtime error case", + { + "name": TEST_NAME, + "namespace": "runtime", + "job_kind": constants.PYTORCHJOB_KIND, + }, + RuntimeError, + ), + ( + "invalid flow with timeout error", + {"name": TEST_NAME, "namespace": TIMEOUT}, + TimeoutError, + ), +] + + +test_data_is_job_succeded = [ + ( + "valid flow with all parameters set", + { + "name": TEST_NAME, + "namespace": SUCCEEDED, + "job": create_job(), + "job_kind": constants.PYTORCHJOB_KIND, + "timeout": 120, + }, + True, + ), + ( + "invalid flow with default namespace and a Job that doesn't exist", + {"name": TEST_NAME, "job_kind": constants.TFJOB_KIND}, + RuntimeError, + ), + ( + "invalid flow incorrect parameter", + {"name": TEST_NAME, "test": "example"}, + TypeError, + ), + ( + "invalid flow with incorrect value", + {"name": TEST_NAME, "job_kind": INVALID}, + ValueError, + ), + ( + "runtime error case", + { + "name": TEST_NAME, + "namespace": "runtime", + "job_kind": constants.PYTORCHJOB_KIND, + }, + RuntimeError, + ), + ( + "invalid flow with timeout error", + {"name": TEST_NAME, "namespace": TIMEOUT}, + TimeoutError, + ), +] + + @pytest.fixture def training_client(): with patch( @@ -693,3 +989,123 @@ def test_get_job(training_client, test_name, kwargs, expected_output): assert type(e) is expected_output print("test execution complete") + + +@pytest.mark.parametrize( + "test_name,kwargs,expected_output", test_data_get_job_conditions +) +def test_get_job_conditions(training_client, test_name, kwargs, expected_output): + """ + test get_job_conditions function of training client + """ + print("Executing test: ", test_name) + + try: + training_client.get_job_conditions(**kwargs) + if kwargs.get("namespace") is TEST_NAME: + assert expected_output == generate_job_with_status(create_job()) + else: + assert expected_output == generate_job_with_status( + create_job(), condition_type=kwargs.get("namespace") + ) + except Exception as e: + assert type(e) is expected_output + + print("test execution complete") + + +@pytest.mark.parametrize("test_name,kwargs,expected_output", test_data_is_job_created) +def test_is_job_created(training_client, test_name, kwargs, expected_output): + """ + test is_job_created function of training client + """ + print("Executing test: ", test_name) + + try: + training_client.is_job_created(**kwargs) + if kwargs.get("namespace") is not (CREATED or RUNTIME or TIMEOUT): + assert expected_output is False + else: + assert expected_output is True + except Exception as e: + assert type(e) is expected_output + + print("test execution complete") + + +@pytest.mark.parametrize("test_name,kwargs,expected_output", test_data_is_job_running) +def test_is_job_running(training_client, test_name, kwargs, expected_output): + """ + test is_job_running function of training client + """ + print("Executing test: ", test_name) + + try: + training_client.is_job_running(**kwargs) + if kwargs.get("namespace") is not (RUNNING or RUNTIME or TIMEOUT): + assert expected_output is False + else: + assert expected_output is True + except Exception as e: + assert type(e) is expected_output + + print("test execution complete") + + +@pytest.mark.parametrize( + "test_name,kwargs,expected_output", test_data_is_job_restarting +) +def test_is_job_restarting(training_client, test_name, kwargs, expected_output): + """ + test is_is_job_restarting function of training client + """ + print("Executing test: ", test_name) + + try: + training_client.is_job_restarting(**kwargs) + if kwargs.get("namespace") is not (RESTARTING or RUNTIME or TIMEOUT): + assert expected_output is False + else: + assert expected_output is True + except Exception as e: + assert type(e) is expected_output + + print("test execution complete") + + +@pytest.mark.parametrize("test_name,kwargs,expected_output", test_data_is_job_failed) +def test_is_job_failed(training_client, test_name, kwargs, expected_output): + """ + test is_is_job_failed function of training client + """ + print("Executing test: ", test_name) + + try: + training_client.is_job_failed(**kwargs) + if kwargs.get("namespace") is not (FAILED or RUNTIME or TIMEOUT): + assert expected_output is False + else: + assert expected_output is True + except Exception as e: + assert type(e) is expected_output + + print("test execution complete") + + +@pytest.mark.parametrize("test_name,kwargs,expected_output", test_data_is_job_succeded) +def test_is_job_succeeded(training_client, test_name, kwargs, expected_output): + """ + test is_job_succeeded function of training client + """ + print("Executing test: ", test_name) + + try: + training_client.is_job_succeeded(**kwargs) + if kwargs.get("namespace") is not (SUCCEEDED or RUNTIME or TIMEOUT): + assert expected_output is False + else: + assert expected_output is True + except Exception as e: + assert type(e) is expected_output + + print("test execution complete")