From e0a5820eae763532034d17de3637bd2db40f5145 Mon Sep 17 00:00:00 2001 From: Jin Chi He Date: Tue, 17 Dec 2019 18:05:03 +0800 Subject: [PATCH] Add more APIs for SDK --- sdk/python/README.md | 6 +- sdk/python/docs/PyTorchJobClient.md | 139 +++++++++- .../examples/kubeflow-pytorchjob-sdk.ipynb | 260 +++++++++++++++--- .../pytorchjob/api/py_torch_job_client.py | 165 ++++++++++- .../pytorchjob/constants/constants.py | 11 +- sdk/python/setup.py | 16 +- sdk/python/test/test_e2e.py | 31 +-- 7 files changed, 539 insertions(+), 89 deletions(-) diff --git a/sdk/python/README.md b/sdk/python/README.md index 7f657b04b..4fb305a3a 100644 --- a/sdk/python/README.md +++ b/sdk/python/README.md @@ -39,7 +39,11 @@ Class | Method | Description [PyTorchJobClient](docs/PyTorchJobClient.md) | [get](docs/PyTorchJobClient.md#get) | Get the specified PyTorchJob or all PyTorchJob in the namespace | [PyTorchJobClient](docs/PyTorchJobClient.md) | [patch](docs/PyTorchJobClient.md#patch) | Patch the specified PyTorchJob| [PyTorchJobClient](docs/PyTorchJobClient.md) | [delete](docs/PyTorchJobClient.md#delete) | Delete the specified PyTorchJob | - +[PyTorchJobClient](docs/PyTorchJobClient.md) | [wait_for_job](docs/PyTorchJobClient.md#wait_for_job) | Wait for the specified job to finish | +[PyTorchJobClient](docs/PyTorchJobClient.md) | [wait_for_condition](docs/PyTorchJobClient.md#wait_for_condition) | Waits until any of the specified conditions occur | +[PyTorchJobClient](docs/PyTorchJobClient.md) | [get_job_status](docs/PyTorchJobClient.md#get_job_status) | Get the PyTorchJob status| +[PyTorchJobClient](docs/PyTorchJobClient.md) | [is_job_running](docs/PyTorchJobClient.md#is_job_running) | Check if the PyTorchJob is running | +[PyTorchJobClient](docs/PyTorchJobClient.md) | [is_job_succeeded](docs/PyTorchJobClient.md#is_job_succeeded) | Check if the PyTorchJob succeeded | ## Documentation For Models diff --git a/sdk/python/docs/PyTorchJobClient.md b/sdk/python/docs/PyTorchJobClient.md index
bfc1149d3..1d56119de 100644 --- a/sdk/python/docs/PyTorchJobClient.md +++ b/sdk/python/docs/PyTorchJobClient.md @@ -20,7 +20,11 @@ PyTorchJobClient| [create](#create) | Create PyTorchJob| PyTorchJobClient | [get](#get) | Get the specified PyTorchJob or all PyTorchJob in the namespace | PyTorchJobClient | [patch](#patch) | Patch the specified PyTorchJob| PyTorchJobClient | [delete](#delete) | Delete the specified PyTorchJob | - +PyTorchJobClient | [wait_for_job](#wait_for_job) | Wait for the specified job to finish | +PyTorchJobClient | [wait_for_condition](#wait_for_condition) | Waits until any of the specified conditions occur | +PyTorchJobClient | [get_job_status](#get_job_status) | Get the PyTorchJob status| +PyTorchJobClient | [is_job_running](#is_job_running) | Check if the PyTorchJob is running | +PyTorchJobClient | [is_job_succeeded](#is_job_succeeded) | Check if the PyTorchJob succeeded | ## create > create(pytorchjob, namespace=None) @@ -173,3 +177,136 @@ namespace | str | The pytorchjob's namespace. Defaults to current or default nam ### Return type object + +## wait_for_job +> wait_for_job(name, +> namespace=None, +> timeout_seconds=600, +> polling_interval=30, +> status_callback=None): + +Wait for the specified job to finish. + +### Example + +```python +from kubeflow.pytorchjob import PyTorchJobClient + +pytorchjob_client = PyTorchJobClient() +pytorchjob_client.wait_for_job('mnist', namespace='kubeflow') +``` + +### Parameters +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +name | str | The PyTorchJob name.| | +namespace | str | The pytorchjob's namespace. Defaults to current or default namespace. | Optional| +timeout_seconds | int | How long to wait for the job, in seconds. Defaults to 600. | Optional| +polling_interval | int | How often to poll for the status of the job, in seconds. Defaults to 30.| Optional| +status_callback | callable | Callable. If supplied this callable is invoked after we poll the job.
Callable takes a single argument which is the pytorchjob.| Optional| + +### Return type +object + + +## wait_for_condition +> wait_for_condition(name, +> expected_condition, +> namespace=None, +> timeout_seconds=600, +> polling_interval=30, +> status_callback=None): + + +Waits until any of the specified conditions occur. + +### Example + +```python +from kubeflow.pytorchjob import PyTorchJobClient + +pytorchjob_client = PyTorchJobClient() +pytorchjob_client.wait_for_condition('mnist', expected_condition=["Succeeded", "Failed"], namespace='kubeflow') +``` + +### Parameters +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +name | str | The PyTorchJob name.| | +expected_condition | list | A list of conditions. The function waits until any of the supplied conditions is reached.| | +namespace | str | The pytorchjob's namespace. Defaults to current or default namespace. | Optional| +timeout_seconds | int | How long to wait for the job, in seconds. Defaults to 600. | Optional| +polling_interval | int | How often to poll for the status of the job, in seconds. Defaults to 30.| Optional| +status_callback | callable | If supplied, this callable is invoked after each poll of the job. It takes a single argument, which is the pytorchjob.| Optional| + +### Return type +object + +## get_job_status +> get_job_status(name, namespace=None) + +Returns the PyTorchJob status, such as Running, Failed or Succeeded. + +### Example + +```python +from kubeflow.pytorchjob import PyTorchJobClient + +pytorchjob_client = PyTorchJobClient() +pytorchjob_client.get_job_status('mnist', namespace='kubeflow') +``` + +### Parameters +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +name | str | The PyTorchJob name. | | +namespace | str | The pytorchjob's namespace.
Defaults to current or default namespace.| Optional | + +### Return type +str + +## is_job_running +> is_job_running(name, namespace=None) + +Returns True if the PyTorchJob is running; False otherwise. + +### Example + +```python +from kubeflow.pytorchjob import PyTorchJobClient + +pytorchjob_client = PyTorchJobClient() +pytorchjob_client.is_job_running('mnist', namespace='kubeflow') +``` + +### Parameters +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +name | str | The PyTorchJob name.| | +namespace | str | The pytorchjob's namespace. Defaults to current or default namespace.| Optional | + +### Return type +bool + +## is_job_succeeded +> is_job_succeeded(name, namespace=None) + +Returns True if the PyTorchJob succeeded; False otherwise. + +### Example + +```python +from kubeflow.pytorchjob import PyTorchJobClient + +pytorchjob_client = PyTorchJobClient() +pytorchjob_client.is_job_succeeded('mnist', namespace='kubeflow') +``` + +### Parameters +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +name | str | The PyTorchJob name.| | +namespace | str | The pytorchjob's namespace. Defaults to current or default namespace.| Optional | + +### Return type +bool diff --git a/sdk/python/examples/kubeflow-pytorchjob-sdk.ipynb b/sdk/python/examples/kubeflow-pytorchjob-sdk.ipynb index 5764a10c6..10f71ebf3 100644 --- a/sdk/python/examples/kubeflow-pytorchjob-sdk.ipynb +++ b/sdk/python/examples/kubeflow-pytorchjob-sdk.ipynb @@ -13,12 +13,12 @@ "source": [ "This is a sample for Kubeflow PyTorchJob SDK `kubeflow-pytorchjob`.\n", "\n", - "The notebook shows how to use Kubeflow PyTorchJob SDK to create, get, update and delete PyTorchJob." + "The notebook shows how to use Kubeflow PyTorchJob SDK to create, get, wait, check and delete PyTorchJob."
] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ @@ -45,7 +45,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": {}, "outputs": [], "source": [ @@ -68,7 +68,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": {}, "outputs": [], "source": [ @@ -88,6 +88,15 @@ " )\n", ")\n", "\n", + "worker = V1ReplicaSpec(\n", + " replicas=1,\n", + " restart_policy=\"OnFailure\",\n", + " template=V1PodTemplateSpec(\n", + " spec=V1PodSpec(\n", + " containers=[container]\n", + " )\n", + " )\n", + ")\n", "\n", "pytorchjob = V1PyTorchJob(\n", " api_version=\"kubeflow.org/v1\",\n", @@ -95,9 +104,10 @@ " metadata=V1ObjectMeta(name=\"pytorch-dist-mnist-gloo\",namespace=namespace),\n", " spec=V1PyTorchJobSpec(\n", " clean_pod_policy=\"None\",\n", - " pytorch_replica_specs={\"Master\": master}\n", + " pytorch_replica_specs={\"Master\": master,\n", + " \"Worker\": worker}\n", " )\n", - ")\n" + ")" ] }, { @@ -109,9 +119,39 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "{'apiVersion': 'kubeflow.org/v1',\n", + " 'kind': 'PyTorchJob',\n", + " 'metadata': {'creationTimestamp': '2019-12-18T02:22:07Z',\n", + " 'generation': 1,\n", + " 'name': 'pytorch-dist-mnist-gloo',\n", + " 'namespace': 'default',\n", + " 'resourceVersion': '13983940',\n", + " 'selfLink': '/apis/kubeflow.org/v1/namespaces/default/pytorchjobs/pytorch-dist-mnist-gloo',\n", + " 'uid': '3055f681-213d-11ea-9e34-00000a1001ee'},\n", + " 'spec': {'cleanPodPolicy': 'None',\n", + " 'pytorchReplicaSpecs': {'Master': {'replicas': 1,\n", + " 'restartPolicy': 'OnFailure',\n", + " 'template': {'spec': {'containers': [{'args': ['--backend', 'gloo'],\n", + " 'image': 'gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0',\n", + " 'name': 'pytorch'}]}}},\n", + " 'Worker': 
{'replicas': 1,\n", + " 'restartPolicy': 'OnFailure',\n", + " 'template': {'spec': {'containers': [{'args': ['--backend', 'gloo'],\n", + " 'image': 'gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0',\n", + " 'name': 'pytorch'}]}}}}}}" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "pytorch_client = PyTorchJobClient()\n", "pytorch_client.create(pytorchjob)" @@ -126,9 +166,47 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "{'apiVersion': 'kubeflow.org/v1',\n", + " 'kind': 'PyTorchJob',\n", + " 'metadata': {'creationTimestamp': '2019-12-18T02:22:07Z',\n", + " 'generation': 1,\n", + " 'name': 'pytorch-dist-mnist-gloo',\n", + " 'namespace': 'default',\n", + " 'resourceVersion': '13983953',\n", + " 'selfLink': '/apis/kubeflow.org/v1/namespaces/default/pytorchjobs/pytorch-dist-mnist-gloo',\n", + " 'uid': '3055f681-213d-11ea-9e34-00000a1001ee'},\n", + " 'spec': {'cleanPodPolicy': 'None',\n", + " 'pytorchReplicaSpecs': {'Master': {'replicas': 1,\n", + " 'restartPolicy': 'OnFailure',\n", + " 'template': {'spec': {'containers': [{'args': ['--backend', 'gloo'],\n", + " 'image': 'gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0',\n", + " 'name': 'pytorch'}]}}},\n", + " 'Worker': {'replicas': 1,\n", + " 'restartPolicy': 'OnFailure',\n", + " 'template': {'spec': {'containers': [{'args': ['--backend', 'gloo'],\n", + " 'image': 'gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0',\n", + " 'name': 'pytorch'}]}}}}},\n", + " 'status': {'conditions': [{'lastTransitionTime': '2019-12-18T02:22:07Z',\n", + " 'lastUpdateTime': '2019-12-18T02:22:07Z',\n", + " 'message': 'PyTorchJob pytorch-dist-mnist-gloo is created.',\n", + " 'reason': 'PyTorchJobCreated',\n", + " 'status': 'True',\n", + " 'type': 'Created'}],\n", + " 'replicaStatuses': {'Master': {}, 'Worker': {}},\n", + " 'startTime': '2019-12-18T02:22:08Z'}}" + ] + }, 
+ "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "pytorch_client.get('pytorch-dist-mnist-gloo')" ] @@ -137,37 +215,121 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Update the created PyTorchJob" + "### Get the PyTorchJob status, check if the PyTorchJob has been started." ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "'Created'" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ - "worker = V1ReplicaSpec(\n", - " replicas=1,\n", - " restart_policy=\"OnFailure\",\n", - " template=V1PodTemplateSpec(\n", - " spec=V1PodSpec(\n", - " containers=[container]\n", - " )\n", - " )\n", - ")\n", - "\n", - "pytorchjob = V1PyTorchJob(\n", - " api_version=\"kubeflow.org/v1\",\n", - " kind=\"PyTorchJob\",\n", - " metadata=V1ObjectMeta(name=\"pytorch-dist-mnist-gloo\",namespace=namespace),\n", - " spec=V1PyTorchJobSpec(\n", - " clean_pod_policy=\"None\",\n", - " pytorch_replica_specs={\"Master\": master,\n", - " \"Worker\": worker}\n", - " )\n", - ")\n", - "\n", - "pytorch_client.patch('pytorch-dist-mnist-gloo', pytorchjob)" + "pytorch_client.get_job_status('pytorch-dist-mnist-gloo', namespace=namespace)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Wait for the specified PyTorchJob to finish" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'apiVersion': 'kubeflow.org/v1',\n", + " 'kind': 'PyTorchJob',\n", + " 'metadata': {'creationTimestamp': '2019-12-18T02:22:07Z',\n", + " 'generation': 1,\n", + " 'name': 'pytorch-dist-mnist-gloo',\n", + " 'namespace': 'default',\n", + " 'resourceVersion': '13985828',\n", + " 'selfLink': '/apis/kubeflow.org/v1/namespaces/default/pytorchjobs/pytorch-dist-mnist-gloo',\n", + " 'uid': 
'3055f681-213d-11ea-9e34-00000a1001ee'},\n", + " 'spec': {'cleanPodPolicy': 'None',\n", + " 'pytorchReplicaSpecs': {'Master': {'replicas': 1,\n", + " 'restartPolicy': 'OnFailure',\n", + " 'template': {'spec': {'containers': [{'args': ['--backend', 'gloo'],\n", + " 'image': 'gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0',\n", + " 'name': 'pytorch'}]}}},\n", + " 'Worker': {'replicas': 1,\n", + " 'restartPolicy': 'OnFailure',\n", + " 'template': {'spec': {'containers': [{'args': ['--backend', 'gloo'],\n", + " 'image': 'gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0',\n", + " 'name': 'pytorch'}]}}}}},\n", + " 'status': {'completionTime': '2019-12-18T02:27:50Z',\n", + " 'conditions': [{'lastTransitionTime': '2019-12-18T02:22:07Z',\n", + " 'lastUpdateTime': '2019-12-18T02:22:07Z',\n", + " 'message': 'PyTorchJob pytorch-dist-mnist-gloo is created.',\n", + " 'reason': 'PyTorchJobCreated',\n", + " 'status': 'True',\n", + " 'type': 'Created'},\n", + " {'lastTransitionTime': '2019-12-18T02:22:18Z',\n", + " 'lastUpdateTime': '2019-12-18T02:22:18Z',\n", + " 'message': 'PyTorchJob pytorch-dist-mnist-gloo is running.',\n", + " 'reason': 'PyTorchJobRunning',\n", + " 'status': 'False',\n", + " 'type': 'Running'},\n", + " {'lastTransitionTime': '2019-12-18T02:27:50Z',\n", + " 'lastUpdateTime': '2019-12-18T02:27:50Z',\n", + " 'message': 'PyTorchJob pytorch-dist-mnist-gloo is successfully completed.',\n", + " 'reason': 'PyTorchJobSucceeded',\n", + " 'status': 'True',\n", + " 'type': 'Succeeded'}],\n", + " 'replicaStatuses': {'Master': {'succeeded': 1}, 'Worker': {'succeeded': 1}},\n", + " 'startTime': '2019-12-18T02:22:08Z'}}" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pytorch_client.wait_for_job('pytorch-dist-mnist-gloo', namespace=namespace)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Check if the PyTorchJob succeeded" + ] + }, + { + "cell_type": "code", + "execution_count": 8, 
+ "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pytorch_client.is_job_succeeded('pytorch-dist-mnist-gloo', namespace=namespace)" ] }, { @@ -179,9 +341,27 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 9, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "{'kind': 'Status',\n", + " 'apiVersion': 'v1',\n", + " 'metadata': {},\n", + " 'status': 'Success',\n", + " 'details': {'name': 'pytorch-dist-mnist-gloo',\n", + " 'group': 'kubeflow.org',\n", + " 'kind': 'pytorchjobs',\n", + " 'uid': '3055f681-213d-11ea-9e34-00000a1001ee'}}" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "pytorch_client.delete('pytorch-dist-mnist-gloo')" ] @@ -196,7 +376,7 @@ ], "metadata": { "kernelspec": { - "display_name": "My Python 3", + "display_name": "Python 3", "language": "python", "name": "python3" }, @@ -210,9 +390,9 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.7.3" + "version": "3.6.9" } }, "nbformat": 4, "nbformat_minor": 2 -} \ No newline at end of file +} diff --git a/sdk/python/kubeflow/pytorchjob/api/py_torch_job_client.py b/sdk/python/kubeflow/pytorchjob/api/py_torch_job_client.py index 07f385896..699931e92 100644 --- a/sdk/python/kubeflow/pytorchjob/api/py_torch_job_client.py +++ b/sdk/python/kubeflow/pytorchjob/api/py_torch_job_client.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import multiprocessing +import time + from kubernetes import client, config from kubeflow.pytorchjob.constants import constants @@ -77,28 +80,52 @@ def get(self, name=None, namespace=None): namespace = utils.get_default_target_namespace() if name: + thread = self.api_instance.get_namespaced_custom_object( + constants.PYTORCHJOB_GROUP, + constants.PYTORCHJOB_VERSION, + namespace, + constants.PYTORCHJOB_PLURAL, + name, + async_req=True) + + pytorchjob = None try: - return self.api_instance.get_namespaced_custom_object( - constants.PYTORCHJOB_GROUP, - constants.PYTORCHJOB_VERSION, - namespace, - constants.PYTORCHJOB_PLURAL, - name) + pytorchjob = thread.get(constants.APISERVER_TIMEOUT) + except multiprocessing.TimeoutError: + raise RuntimeError("Timeout trying to get PyTorchJob.") except client.rest.ApiException as e: raise RuntimeError( "Exception when calling CustomObjectsApi->get_namespaced_custom_object:\ - %s\n" % e) + %s\n" % e) + except Exception as e: + raise RuntimeError( + "There was a problem to get PyTorchJob {0} in namespace {1}. Exception: \ + {2} ".format(name, namespace, e)) + else: + thread = self.api_instance.list_namespaced_custom_object( + constants.PYTORCHJOB_GROUP, + constants.PYTORCHJOB_VERSION, + namespace, + constants.PYTORCHJOB_PLURAL, + async_req=True) + + pytorchjob = None try: - return self.api_instance.list_namespaced_custom_object( - constants.PYTORCHJOB_GROUP, - constants.PYTORCHJOB_VERSION, - namespace, - constants.PYTORCHJOB_PLURAL) + pytorchjob = thread.get(constants.APISERVER_TIMEOUT) + except multiprocessing.TimeoutError: + raise RuntimeError("Timeout trying to get PyTorchJob.") except client.rest.ApiException as e: raise RuntimeError( - "Exception when calling CustomObjectsApi->list_namespaced_custom_object:\ + "Exception when calling CustomObjectsApi->list_namespaced_custom_object: \ %s\n" % e) + except Exception as e: + raise RuntimeError( + "There was a problem to List PyTorchJob in namespace {0}. 
\ + Exception: {1} ".format(namespace, e)) + + return pytorchjob + def patch(self, name, pytorchjob, namespace=None): """ @@ -149,3 +176,115 @@ def delete(self, name, namespace=None): raise RuntimeError( "Exception when calling CustomObjectsApi->delete_namespaced_custom_object:\ %s\n" % e) + + + def wait_for_job(self, name, + namespace=None, + timeout_seconds=600, + polling_interval=30, + status_callback=None): + """Wait for the specified job to finish. + + Args: + name: Name of the PyTorchJob. + namespace: defaults to current or default namespace. + timeout_seconds: How long to wait for the job. + polling_interval: How often to poll for the status of the job. + status_callback: (Optional): Callable. If supplied this callable is + invoked after we poll the job. Callable takes a single argument which + is the job. + """ + if namespace is None: + namespace = utils.get_default_target_namespace() + + return self.wait_for_condition( + name, + ["Succeeded", "Failed"], + namespace=namespace, + timeout_seconds=timeout_seconds, + polling_interval=polling_interval, + status_callback=status_callback) + + + def wait_for_condition(self, name, + expected_condition, + namespace=None, + timeout_seconds=600, + polling_interval=30, + status_callback=None): + """Waits until any of the specified conditions occur. + + Args: + name: Name of the job. + expected_condition: A list of conditions. Function waits until any of the + supplied conditions is reached. + namespace: defaults to current or default namespace. + timeout_seconds: How long to wait for the job. + polling_interval: How often to poll for the status of the job. + status_callback: (Optional): Callable. If supplied this callable is + invoked after we poll the job. Callable takes a single argument which + is the job. 
+ """ + + if namespace is None: + namespace = utils.get_default_target_namespace() + + for _ in range(round(timeout_seconds/polling_interval)): + + pytorchjob = None + pytorchjob = self.get(name, namespace=namespace) + + if pytorchjob: + if status_callback: + status_callback(pytorchjob) + + # If we poll the CRD quick enough status won't have been set yet. + conditions = pytorchjob.get("status", {}).get("conditions", []) + # Conditions might have a value of None in status. + conditions = conditions or [] + for c in conditions: + if c.get("type", "") in expected_condition: + return pytorchjob + + time.sleep(polling_interval) + + raise RuntimeError( + "Timeout waiting for PyTorchJob {0} in namespace {1} to enter one of the " + "conditions {2}.".format(name, namespace, expected_condition), pytorchjob) + + + def get_job_status(self, name, namespace=None): + """Returns PyTorchJob status, such as Running, Failed or Succeeded. + + Args: + name: The PyTorchJob name. + namespace: defaults to current or default namespace. + """ + if namespace is None: + namespace = utils.get_default_target_namespace() + + pytorchjob = self.get(name, namespace=namespace) + last_condition = pytorchjob.get("status", {}).get("conditions", [])[-1] + return last_condition.get("type", "") + + + def is_job_running(self, name, namespace=None): + """Returns true if the PyTorchJob running; false otherwise. + + Args: + name: The PyTorchJob name. + namespace: defaults to current or default namespace. + """ + pytorchjob_status = self.get_job_status(name, namespace=namespace) + return pytorchjob_status.lower() == "running" + + + def is_job_succeeded(self, name, namespace=None): + """Returns true if the PyTorchJob succeeded; false otherwise. + + Args: + name: The PyTorchJob name. + namespace: defaults to current or default namespace. 
+ """ + pytorchjob_status = self.get_job_status(name, namespace=namespace) + return pytorchjob_status.lower() == "succeeded" diff --git a/sdk/python/kubeflow/pytorchjob/constants/constants.py b/sdk/python/kubeflow/pytorchjob/constants/constants.py index 7efa8dc27..887b3d958 100644 --- a/sdk/python/kubeflow/pytorchjob/constants/constants.py +++ b/sdk/python/kubeflow/pytorchjob/constants/constants.py @@ -15,9 +15,12 @@ import os # PyTorchJob K8S constants -PYTORCHJOB_GROUP = "kubeflow.org" -PYTORCHJOB_KIND = "PyTorchJob" -PYTORCHJOB_PLURAL = "pytorchjobs" -PYTORCHJOB_VERSION = "v1" +PYTORCHJOB_GROUP = 'kubeflow.org' +PYTORCHJOB_KIND = 'PyTorchJob' +PYTORCHJOB_PLURAL = 'pytorchjobs' +PYTORCHJOB_VERSION = os.environ.get('PYTORCHJOB_VERSION', 'v1') PYTORCH_LOGLEVEL = os.environ.get('PYTORCHJOB_LOGLEVEL', 'INFO').upper() + +# How long to wait in seconds for requests to the ApiServer +APISERVER_TIMEOUT = 120 diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 4a7f107a1..518aa42de 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -25,15 +25,15 @@ setuptools.setup( name='kubeflow-pytorchjob', - version='0.1', + version='0.1.1', author="Kubeflow Authors", author_email='hejinchi@cn.ibm.com', - license="Apache License Version 2.0", - url="https://github.com/kubeflow/pytorch-operator/sdk/python", - description="PyTorchJob Python SDK", - long_description="PyTorchJob Python SDK", + license='Apache License Version 2.0', + url='https://github.com/kubeflow/pytorch-operator/sdk/python', + description='PyTorchJob Python SDK', + long_description='PyTorchJob Python SDK', packages=setuptools.find_packages( - include=("kubeflow*")), + include=('kubeflow*')), package_data={}, include_package_data=False, zip_safe=False, @@ -47,8 +47,8 @@ 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', - "License :: OSI Approved :: Apache Software License", - "Operating System :: OS Independent", + 'License :: OSI 
Approved :: Apache Software License', + 'Operating System :: OS Independent', 'Topic :: Scientific/Engineering', 'Topic :: Scientific/Engineering :: Artificial Intelligence', 'Topic :: Software Development', diff --git a/sdk/python/test/test_e2e.py b/sdk/python/test/test_e2e.py index 4de25cdad..8d14774bd 100644 --- a/sdk/python/test/test_e2e.py +++ b/sdk/python/test/test_e2e.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time import os from kubernetes.client import V1PodTemplateSpec @@ -29,24 +28,7 @@ from kubeflow.pytorchjob import PyTorchJobClient PYTORCH_CLIENT = PyTorchJobClient(config_file=os.getenv('KUBECONFIG', '~/.kube/config')) - -def wait_for_pytorchjob_ready(name, namespace='default', - timeout_seconds=600): - for _ in range(round(timeout_seconds/10)): - time.sleep(10) - pytorchjob = PYTORCH_CLIENT.get(name, namespace=namespace) - - last_condition = pytorchjob.get("status", {}).get("conditions", [])[-1] - last_status = last_condition.get("type", "").lower() - - if last_status == "succeeded": - return - elif last_status == "failed": - raise RuntimeError("The PyTorchJob is failed.") - else: - continue - - raise RuntimeError("Timeout to finish the PyTorchJob.") +SDK_TEST_NAMESPACE = 'default' def test_sdk_e2e(): container = V1Container( @@ -78,7 +60,7 @@ def test_sdk_e2e(): pytorchjob = V1PyTorchJob( api_version="kubeflow.org/v1", kind="PyTorchJob", - metadata=V1ObjectMeta(name="pytorchjob-mnist-ci-test", namespace='default'), + metadata=V1ObjectMeta(name="pytorchjob-mnist-ci-test", namespace=SDK_TEST_NAMESPACE), spec=V1PyTorchJobSpec( clean_pod_policy="None", pytorch_replica_specs={"Master": master, @@ -87,6 +69,11 @@ def test_sdk_e2e(): ) PYTORCH_CLIENT.create(pytorchjob) - wait_for_pytorchjob_ready("pytorchjob-mnist-ci-test") - PYTORCH_CLIENT.delete('pytorchjob-mnist-ci-test', namespace='default') + + PYTORCH_CLIENT.wait_for_job("pytorchjob-mnist-ci-test", 
namespace=SDK_TEST_NAMESPACE) + if not PYTORCH_CLIENT.is_job_succeeded("pytorchjob-mnist-ci-test", + namespace=SDK_TEST_NAMESPACE): + raise RuntimeError("The PyTorchJob is not succeeded.") + + PYTORCH_CLIENT.delete("pytorchjob-mnist-ci-test", namespace=SDK_TEST_NAMESPACE)