From b1882edcd90e7109aadf4b579d630c4152d5ba85 Mon Sep 17 00:00:00 2001 From: Shreyanand Date: Fri, 6 Sep 2024 14:47:30 -0400 Subject: [PATCH 1/2] Add kfp pipeline for running a pytorch job Signed-off-by: Shreyanand --- training/component.yaml | 40 +++++++ training/pipeline.py | 148 +++++++++++++++++++++++ training/pipeline.yaml | 252 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 440 insertions(+) create mode 100644 training/component.yaml create mode 100644 training/pipeline.py create mode 100644 training/pipeline.yaml diff --git a/training/component.yaml b/training/component.yaml new file mode 100644 index 0000000..66bb8dd --- /dev/null +++ b/training/component.yaml @@ -0,0 +1,40 @@ +description: Kubeflow PyTorchJob launcher +inputs: +- {name: name, type: String, description: 'PyTorchJob name.'} +- {name: namespace, type: String, default: kubeflow, description: 'PyTorchJob namespace (likely your current namespace).'} +- {name: version, type: String, default: v1, description: 'PyTorchJob version.'} +- {name: master_spec, type: JsonObject, default: '{}', description: 'PyTorchJob Master replicaSpecs.'} +- {name: worker_spec, type: JsonObject, default: '{}', description: 'PyTorchJob Worker replicaSpecs.'} +- {name: job_timeout_minutes, type: Integer, default: 1440, description: 'Time in minutes to wait for the job to complete.'} +- {name: delete_after_done, type: Boolean, default: 'True' , description: 'Whether to delete the job after it is finished.'} +- {name: clean_pod_policy, type: String, default: Running, description: 'Defines the policy for cleaning up pods after the PyTorchJob completes.'} +- {name: active_deadline_seconds, type: Integer, optional: true, description: 'Specifies the duration (in seconds) since startTime during which the job can remain active before it is terminated. Must be a positive integer. This setting applies only to pods where restartPolicy is OnFailure or Always.'} +- {name: backoff_limit, type: Integer, optional: true, description: 'Number of retries before marking this job as failed.'} +- {name: ttl_seconds_after_finished, type: Integer, optional: true, description: 'Defines the TTL for cleaning up finished PyTorchJobs.'} +implementation: + container: + image: cascribner/kubeflow-pytorchjob-launcher:v1 + command: [python, /ml/launch_pytorchjob.py] + args: + - --name + - {inputValue: name} + - --namespace + - {inputValue: namespace} + - --version + - {inputValue: version} + - --masterSpec + - {inputValue: master_spec} + - --workerSpec + - {inputValue: worker_spec} + - --jobTimeoutMinutes + - {inputValue: job_timeout_minutes} + - --deleteAfterDone + - {inputValue: delete_after_done} + - --cleanPodPolicy + - {inputValue: clean_pod_policy} + - --activeDeadlineSeconds + - {inputValue: active_deadline_seconds} + - --backoffLimit + - {inputValue: backoff_limit} + - --ttlSecondsAfterFinished + - {inputValue: ttl_seconds_after_finished} \ No newline at end of file diff --git a/training/pipeline.py b/training/pipeline.py new file mode 100644 index 0000000..588bd0d --- /dev/null +++ b/training/pipeline.py @@ -0,0 +1,148 @@ +from typing import NamedTuple +import kfp.dsl as dsl +from kfp import components + +@dsl.component(base_image="python:slim") +def create_worker_spec(worker_num: int = 0) -> NamedTuple( + "CreatWorkerSpec", [("worker_spec", dict)]): + """ + Creates pytorch-job worker spec + """ + from collections import namedtuple + worker = {} + if worker_num > 0: + worker = { + "replicas": worker_num, + "restartPolicy": "OnFailure", + "template": { + "metadata": { + "annotations": { + "sidecar.istio.io/inject": "false" + } + }, + "spec": { + "containers": [ + { "command": [ + '/bin/bash', + '-c', + '--' + ], + "args": [ + "python3.11 -u run.py" + ], + "image": "quay.io/michaelclifford/test-train:0.0.11", + "name": "pytorch", + "resources": { + "requests": { + "memory": "8Gi", + "cpu": "2000m", + # Uncomment for GPU + "nvidia.com/gpu": 1, + }, + "limits": { + "memory": "8Gi", + "cpu": "2000m", + # Uncomment for GPU + "nvidia.com/gpu": 1, + }, + }, + } + ] + }, + }, + } + + worker_spec_output = namedtuple( + "MyWorkerOutput", ["worker_spec"] + ) + return worker_spec_output(worker) + +@dsl.pipeline( + name="launch-kubeflow-pytorchjob", + description="An example to launch pytorch.", +) +def ilab_train( + namespace: str = "mcliffor", + worker_replicas: int = 1, + ttl_seconds_after_finished: int = -1, + job_timeout_minutes: int = 600, + delete_after_done: bool = False): + + pytorchjob_launcher_op = components.load_component_from_file("component.yaml") + + master = { + "replicas": 1, + "restartPolicy": "OnFailure", + "template": { + "metadata": { + "annotations": { + # See https://github.com/kubeflow/website/issues/2011 + "sidecar.istio.io/inject": "false" + } + }, + "spec": { + "containers": [ + { + # To override default command + "command": [ + '/bin/bash', + '-c', + '--' + ], + "args": [ + "python3.11 -u run.py" + ], + # Or, create your own image from + # https://github.com/kubeflow/pytorch-operator/tree/master/examples/mnist + "image": "quay.io/michaelclifford/test-train:0.0.11", + "name": "pytorch", + "resources": { + "requests": { + "memory": "8Gi", + "cpu": "2000m", + # Uncomment for GPU + "nvidia.com/gpu": 1, + }, + "limits": { + "memory": "8Gi", + "cpu": "2000m", + # Uncomment for GPU + "nvidia.com/gpu": 1, + }, + }, + } + ], + # If imagePullSecrets required + # "imagePullSecrets": [ + # {"name": "image-pull-secret"}, + # ], + }, + }, + } + + worker_spec_create = create_worker_spec(worker_num=worker_replicas) + + # Launch and monitor the job with the launcher + pytorchjob_launcher_op( + name="pytorch-job", + namespace=namespace, + master_spec=master, + worker_spec = worker_spec_create.outputs["worker_spec"], + ttl_seconds_after_finished=ttl_seconds_after_finished, + job_timeout_minutes=job_timeout_minutes, + delete_after_done=delete_after_done, + active_deadline_seconds=100, + backoff_limit=1 + ) + + +if __name__ == "__main__": + import kfp.compiler as compiler + + pipeline_file = "pipeline.yaml" + print( + f"Compiling pipeline as {pipeline_file}" + ) + compiler.Compiler().compile( + ilab_train, pipeline_file + ) \ No newline at end of file diff --git a/training/pipeline.yaml b/training/pipeline.yaml new file mode 100644 index 0000000..c7282b7 --- /dev/null +++ b/training/pipeline.yaml @@ -0,0 +1,252 @@ +# PIPELINE DEFINITION +# Name: launch-kubeflow-pytorchjob +# Description: An example to launch pytorch. +# Inputs: +# delete_after_done: bool [Default: False] +# job_timeout_minutes: int [Default: 600.0] +# namespace: str [Default: 'mcliffor'] +# ttl_seconds_after_finished: int [Default: -1.0] +# worker_replicas: int [Default: 1.0] +components: + comp-create-worker-spec: + executorLabel: exec-create-worker-spec + inputDefinitions: + parameters: + worker_num: + defaultValue: 0.0 + isOptional: true + parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + worker_spec: + parameterType: STRUCT + comp-name: + executorLabel: exec-name + inputDefinitions: + parameters: + active_deadline_seconds: + isOptional: true + parameterType: NUMBER_INTEGER + backoff_limit: + isOptional: true + parameterType: NUMBER_INTEGER + clean_pod_policy: + defaultValue: Running + isOptional: true + parameterType: STRING + delete_after_done: + defaultValue: true + isOptional: true + parameterType: BOOLEAN + job_timeout_minutes: + defaultValue: 1440.0 + isOptional: true + parameterType: NUMBER_INTEGER + master_spec: + defaultValue: {} + isOptional: true + parameterType: STRUCT + name: + parameterType: STRING + namespace: + defaultValue: kubeflow + isOptional: true + parameterType: STRING + ttl_seconds_after_finished: + isOptional: true + parameterType: NUMBER_INTEGER + version: + defaultValue: v1 + isOptional: true + parameterType: STRING + worker_spec: + defaultValue: {} + isOptional: true + parameterType: STRUCT +deploymentSpec: + executors: + exec-create-worker-spec: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - create_worker_spec + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef create_worker_spec(worker_num: int = 0) -> NamedTuple(\n \"\ + CreatWorkerSpec\", [(\"worker_spec\", dict)]):\n \"\"\"\n Creates\ + \ pytorch-job worker spec\n \"\"\"\n from collections import namedtuple\n\ + \ worker = {}\n if worker_num > 0:\n worker = {\n \ + \ \"replicas\": worker_num,\n \"restartPolicy\": \"OnFailure\"\ + ,\n \"template\": {\n \"metadata\": {\n \ + \ \"annotations\": {\n \"sidecar.istio.io/inject\"\ + : \"false\"\n }\n },\n \ + \ \"spec\": {\n \"containers\": [\n \ + \ { \"command\": [\n '/bin/bash',\n \ + \ '-c',\n '--'\n \ + \ ],\n \"args\": [\n\ + \ \"python3.11 -u run.py\"\n \ + \ ],\n \"image\": \"quay.io/michaelclifford/test-train:0.0.11\"\ + ,\n \"name\": \"pytorch\",\n \ + \ \"resources\": {\n \"requests\"\ + : {\n \"memory\": \"8Gi\",\n \ + \ \"cpu\": \"2000m\",\n \ + \ # Uncomment for GPU\n \ + \ \"nvidia.com/gpu\": 1,\n },\n \ + \ \"limits\": {\n \ + \ \"memory\": \"8Gi\",\n \"cpu\"\ + : \"2000m\",\n # Uncomment for GPU\n\ + \ \"nvidia.com/gpu\": 1,\n \ + \ },\n },\n \ + \ }\n ]\n },\n },\n\ + \ }\n\n worker_spec_output = namedtuple(\n \"MyWorkerOutput\"\ + , [\"worker_spec\"]\n )\n return worker_spec_output(worker)\n\n" + image: python:slim + exec-name: + container: + args: + - --name + - '{{$.inputs.parameters[''name'']}}' + - --namespace + - '{{$.inputs.parameters[''namespace'']}}' + - --version + - '{{$.inputs.parameters[''version'']}}' + - --masterSpec + - '{{$.inputs.parameters[''master_spec'']}}' + - --workerSpec + - '{{$.inputs.parameters[''worker_spec'']}}' + - --jobTimeoutMinutes + - '{{$.inputs.parameters[''job_timeout_minutes'']}}' + - --deleteAfterDone + - '{{$.inputs.parameters[''delete_after_done'']}}' + - --cleanPodPolicy + - '{{$.inputs.parameters[''clean_pod_policy'']}}' + - --activeDeadlineSeconds + - '{{$.inputs.parameters[''active_deadline_seconds'']}}' + - --backoffLimit + - '{{$.inputs.parameters[''backoff_limit'']}}' + - --ttlSecondsAfterFinished + - '{{$.inputs.parameters[''ttl_seconds_after_finished'']}}' + command: + - python + - /ml/launch_pytorchjob.py + image: cascribner/kubeflow-pytorchjob-launcher:v1 +pipelineInfo: + description: An example to launch pytorch. + name: launch-kubeflow-pytorchjob +root: + dag: + tasks: + create-worker-spec: + cachingOptions: + enableCache: true + componentRef: + name: comp-create-worker-spec + inputs: + parameters: + worker_num: + componentInputParameter: worker_replicas + taskInfo: + name: create-worker-spec + name: + cachingOptions: + enableCache: true + componentRef: + name: comp-name + dependentTasks: + - create-worker-spec + inputs: + parameters: + active_deadline_seconds: + runtimeValue: + constant: 100.0 + backoff_limit: + runtimeValue: + constant: 1.0 + delete_after_done: + componentInputParameter: delete_after_done + job_timeout_minutes: + componentInputParameter: job_timeout_minutes + master_spec: + runtimeValue: + constant: + replicas: 1.0 + restartPolicy: OnFailure + template: + metadata: + annotations: + sidecar.istio.io/inject: 'false' + spec: + containers: + - args: + - python3.11 -u run.py + command: + - /bin/bash + - -c + - -- + image: quay.io/michaelclifford/test-train:0.0.11 + name: pytorch + resources: + limits: + cpu: 2000m + memory: 8Gi + nvidia.com/gpu: 1.0 + requests: + cpu: 2000m + memory: 8Gi + nvidia.com/gpu: 1.0 + name: + runtimeValue: + constant: pytorch-job + namespace: + componentInputParameter: namespace + ttl_seconds_after_finished: + componentInputParameter: ttl_seconds_after_finished + worker_spec: + taskOutputParameter: + outputParameterKey: worker_spec + producerTask: create-worker-spec + taskInfo: + name: name + inputDefinitions: + parameters: + delete_after_done: + defaultValue: false + isOptional: true + parameterType: BOOLEAN + job_timeout_minutes: + defaultValue: 600.0 + isOptional: true + parameterType: NUMBER_INTEGER + namespace: + defaultValue: mcliffor + isOptional: true + parameterType: STRING + ttl_seconds_after_finished: + defaultValue: -1.0 + isOptional: true + parameterType: NUMBER_INTEGER + worker_replicas: + defaultValue: 1.0 + isOptional: true + parameterType: NUMBER_INTEGER +schemaVersion: 2.1.0 +sdkVersion: kfp-2.8.0 From fd8be21d74ed80f77c376d7a16437c487e9eaf12 Mon Sep 17 00:00:00 2001 From: Shreyanand Date: Mon, 9 Sep 2024 14:26:22 -0400 Subject: [PATCH 2/2] Remove worker spec container component Signed-off-by: Shreyanand --- training/pipeline.py | 121 +++++++++++++++++------------------------ training/pipeline.yaml | 112 ++++++++++---------------------------- 2 files changed, 79 insertions(+), 154 deletions(-) diff --git a/training/pipeline.py b/training/pipeline.py index 588bd0d..7df4e6a 100644 --- a/training/pipeline.py +++ b/training/pipeline.py @@ -1,66 +1,10 @@ -from typing import NamedTuple import kfp.dsl as dsl from kfp import components +import kfp.compiler as compiler -@dsl.component(base_image="python:slim") -def create_worker_spec(worker_num: int = 0) -> NamedTuple( - "CreatWorkerSpec", [("worker_spec", dict)]): - """ - Creates pytorch-job worker spec - """ - from collections import namedtuple - worker = {} - if worker_num > 0: - worker = { - "replicas": worker_num, - "restartPolicy": "OnFailure", - "template": { - "metadata": { - "annotations": { - "sidecar.istio.io/inject": "false" - } - }, - "spec": { - "containers": [ - { "command": [ - '/bin/bash', - '-c', - '--' - ], - "args": [ - "python3.11 -u run.py" - ], - "image": "quay.io/michaelclifford/test-train:0.0.11", - "name": "pytorch", - "resources": { - "requests": { - "memory": "8Gi", - "cpu": "2000m", - # Uncomment for GPU - "nvidia.com/gpu": 1, - }, - "limits": { - "memory": "8Gi", - "cpu": "2000m", - # Uncomment for GPU - "nvidia.com/gpu": 1, - }, - }, - } - ] - }, - }, - } - - worker_spec_output = namedtuple( - "MyWorkerOutput", ["worker_spec"] - ) - return worker_spec_output(worker) -@dsl.pipeline( - name="launch-kubeflow-pytorchjob", - description="An example to launch pytorch.", -) +@dsl.pipeline(name="launch-kubeflow-pytorchjob", + description="An example to launch pytorch.") def ilab_train( namespace: str = "mcliffor", worker_replicas: int = 1, @@ -120,29 +64,64 @@ def ilab_train( }, } - worker_spec_create = create_worker_spec(worker_num=worker_replicas) + worker = {} + if worker_replicas > 0: + worker = { + "replicas": worker_replicas, + "restartPolicy": "OnFailure", + "template": { + "metadata": { + "annotations": { + "sidecar.istio.io/inject": "false" + } + }, + "spec": { + "containers": [ + { "command": [ + '/bin/bash', + '-c', + '--' + ], + "args": [ + "python3.11 -u run.py" + ], + "image": "quay.io/michaelclifford/test-train:0.0.11", + "name": "pytorch", + "resources": { + "requests": { + "memory": "8Gi", + "cpu": "2000m", + # Uncomment for GPU + "nvidia.com/gpu": 1, + }, + "limits": { + "memory": "8Gi", + "cpu": "2000m", + # Uncomment for GPU + "nvidia.com/gpu": 1, + }, + }, + } + ] + }, + }, + } # Launch and monitor the job with the launcher pytorchjob_launcher_op( name="pytorch-job", namespace=namespace, master_spec=master, - worker_spec = worker_spec_create.outputs["worker_spec"], + worker_spec = worker, ttl_seconds_after_finished=ttl_seconds_after_finished, job_timeout_minutes=job_timeout_minutes, delete_after_done=delete_after_done, active_deadline_seconds=100, - backoff_limit=1 - ) + backoff_limit=1) if __name__ == "__main__": - import kfp.compiler as compiler - pipeline_file = "pipeline.yaml" - print( - f"Compiling pipeline as {pipeline_file}" - ) - compiler.Compiler().compile( - ilab_train, pipeline_file - ) \ No newline at end of file + print(f"Compiling pipeline as {pipeline_file}") + compiler.Compiler().compile(ilab_train, + pipeline_file) \ No newline at end of file diff --git a/training/pipeline.yaml b/training/pipeline.yaml index c7282b7..0e964d3 100644 --- a/training/pipeline.yaml +++ b/training/pipeline.yaml @@ -8,18 +8,6 @@ # ttl_seconds_after_finished: int [Default: -1.0] # worker_replicas: int [Default: 1.0] components: - comp-create-worker-spec: - executorLabel: exec-create-worker-spec - inputDefinitions: - parameters: - worker_num: - defaultValue: 0.0 - isOptional: true - parameterType: NUMBER_INTEGER - outputDefinitions: - parameters: - worker_spec: - parameterType: STRUCT comp-name: executorLabel: exec-name inputDefinitions: @@ -65,61 +53,6 @@ components: parameterType: STRUCT deploymentSpec: executors: - exec-create-worker-spec: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - create_worker_spec - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ - $0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef create_worker_spec(worker_num: int = 0) -> NamedTuple(\n \"\ - CreatWorkerSpec\", [(\"worker_spec\", dict)]):\n \"\"\"\n Creates\ - \ pytorch-job worker spec\n \"\"\"\n from collections import namedtuple\n\ - \ worker = {}\n if worker_num > 0:\n worker = {\n \ - \ \"replicas\": worker_num,\n \"restartPolicy\": \"OnFailure\"\ - ,\n \"template\": {\n \"metadata\": {\n \ - \ \"annotations\": {\n \"sidecar.istio.io/inject\"\ - : \"false\"\n }\n },\n \ - \ \"spec\": {\n \"containers\": [\n \ - \ { \"command\": [\n '/bin/bash',\n \ - \ '-c',\n '--'\n \ - \ ],\n \"args\": [\n\ - \ \"python3.11 -u run.py\"\n \ - \ ],\n \"image\": \"quay.io/michaelclifford/test-train:0.0.11\"\ - ,\n \"name\": \"pytorch\",\n \ - \ \"resources\": {\n \"requests\"\ - : {\n \"memory\": \"8Gi\",\n \ - \ \"cpu\": \"2000m\",\n \ - \ # Uncomment for GPU\n \ - \ \"nvidia.com/gpu\": 1,\n },\n \ - \ \"limits\": {\n \ - \ \"memory\": \"8Gi\",\n \"cpu\"\ - : \"2000m\",\n # Uncomment for GPU\n\ - \ \"nvidia.com/gpu\": 1,\n \ - \ },\n },\n \ - \ }\n ]\n },\n },\n\ - \ }\n\n worker_spec_output = namedtuple(\n \"MyWorkerOutput\"\ - , [\"worker_spec\"]\n )\n return worker_spec_output(worker)\n\n" - image: python:slim exec-name: container: args: @@ -155,24 +88,11 @@ pipelineInfo: root: dag: tasks: - create-worker-spec: - cachingOptions: - enableCache: true - componentRef: - name: comp-create-worker-spec - inputs: - parameters: - worker_num: - componentInputParameter: worker_replicas - taskInfo: - name: create-worker-spec name: cachingOptions: enableCache: true componentRef: name: comp-name - dependentTasks: - - create-worker-spec inputs: parameters: active_deadline_seconds: @@ -218,12 +138,38 @@ root: constant: pytorch-job namespace: componentInputParameter: namespace + pipelinechannel--worker_replicas: + componentInputParameter: worker_replicas ttl_seconds_after_finished: componentInputParameter: ttl_seconds_after_finished worker_spec: - taskOutputParameter: - outputParameterKey: worker_spec - producerTask: create-worker-spec + runtimeValue: + constant: + replicas: '{{$.inputs.parameters[''pipelinechannel--worker_replicas'']}}' + restartPolicy: OnFailure + template: + metadata: + annotations: + sidecar.istio.io/inject: 'false' + spec: + containers: + - args: + - python3.11 -u run.py + command: + - /bin/bash + - -c + - -- + image: quay.io/michaelclifford/test-train:0.0.11 + name: pytorch + resources: + limits: + cpu: 2000m + memory: 8Gi + nvidia.com/gpu: 1.0 + requests: + cpu: 2000m + memory: 8Gi + nvidia.com/gpu: 1.0 taskInfo: name: name inputDefinitions: