Add kfp pipeline for running a pytorch job #14

Draft · wants to merge 2 commits into base: main
Changes shown from 1 commit.
40 changes: 40 additions & 0 deletions training/component.yaml
@@ -0,0 +1,40 @@
description: Kubeflow PyTorchJob launcher
inputs:
- {name: name, type: String, description: 'PyTorchJob name.'}
- {name: namespace, type: String, default: kubeflow, description: 'PyTorchJob namespace (likely your current namespace).'}
- {name: version, type: String, default: v1, description: 'PyTorchJob version.'}
- {name: master_spec, type: JsonObject, default: '{}', description: 'PyTorchJob Master replicaSpecs.'}
- {name: worker_spec, type: JsonObject, default: '{}', description: 'PyTorchJob Worker replicaSpecs.'}
- {name: job_timeout_minutes, type: Integer, default: 1440, description: 'Time in minutes to wait for the job to complete.'}
- {name: delete_after_done, type: Boolean, default: 'True', description: 'Whether to delete the job after it is finished.'}
- {name: clean_pod_policy, type: String, default: Running, description: 'Defines the policy for cleaning up pods after the PyTorchJob completes.'}
- {name: active_deadline_seconds, type: Integer, optional: true, description: 'Specifies the duration (in seconds) since startTime during which the job can remain active before it is terminated. Must be a positive integer. This setting applies only to pods where restartPolicy is OnFailure or Always.'}
- {name: backoff_limit, type: Integer, optional: true, description: 'Number of retries before marking this job as failed.'}
- {name: ttl_seconds_after_finished, type: Integer, optional: true, description: 'Defines the TTL for cleaning up finished PyTorchJobs.'}
implementation:
container:
image: cascribner/kubeflow-pytorchjob-launcher:v1
command: [python, /ml/launch_pytorchjob.py]
args:
- --name
- {inputValue: name}
- --namespace
- {inputValue: namespace}
- --version
- {inputValue: version}
- --masterSpec
- {inputValue: master_spec}
- --workerSpec
- {inputValue: worker_spec}
- --jobTimeoutMinutes
- {inputValue: job_timeout_minutes}
- --deleteAfterDone
- {inputValue: delete_after_done}
- --cleanPodPolicy
- {inputValue: clean_pod_policy}
- --activeDeadlineSeconds
- {inputValue: active_deadline_seconds}
- --backoffLimit
- {inputValue: backoff_limit}
- --ttlSecondsAfterFinished
- {inputValue: ttl_seconds_after_finished}
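For reference, the `inputValue` placeholders above expand into plain CLI flags passed to `/ml/launch_pytorchjob.py`. A rough illustration of that mapping (this is not the launcher's actual source; flag names and defaults are taken from the YAML above):

```python
# Rough sketch of how the component's args list maps inputs to CLI flags.
def launcher_cli(name, namespace="kubeflow", version="v1",
                 master_spec="{}", worker_spec="{}",
                 job_timeout_minutes=1440, delete_after_done="True",
                 clean_pod_policy="Running"):
    return [
        "python", "/ml/launch_pytorchjob.py",
        "--name", name,
        "--namespace", namespace,
        "--version", version,
        "--masterSpec", master_spec,
        "--workerSpec", worker_spec,
        "--jobTimeoutMinutes", str(job_timeout_minutes),
        "--deleteAfterDone", delete_after_done,
        "--cleanPodPolicy", clean_pod_policy,
    ]
```

Optional inputs such as `active_deadline_seconds` are only appended when a value is supplied, which is why they are marked `optional: true` in the spec.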
148 changes: 148 additions & 0 deletions training/pipeline.py
@@ -0,0 +1,148 @@
from typing import NamedTuple
import kfp.dsl as dsl
from kfp import components

@dsl.component(base_image="python:slim")
def create_worker_spec(worker_num: int = 0) -> NamedTuple(
        "CreatWorkerSpec", [("worker_spec", dict)]):
    """
    Creates pytorch-job worker spec
    """
    from collections import namedtuple

    worker = {}
    if worker_num > 0:
Review comment (Member):
The whole thing can be rewritten as:

    if worker_num <= 0:
        return {}

    return {}

or even better, not as a component at all. After all, it is a single if statement plus setting a single value in a dict; this doesn't have to be a component. Remember that for each component we create, we start a container, which only slows the workflow, especially for simple data formatting.
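The reviewer's alternative can be sketched as a plain helper called directly inside the pipeline function, avoiding the extra container (the helper name `worker_spec_for` is hypothetical):

```python
def worker_spec_for(worker_num: int) -> dict:
    # Early return keeps the happy path flat, as suggested above.
    if worker_num <= 0:
        return {}
    return {
        "replicas": worker_num,
        "restartPolicy": "OnFailure",
        # ...remaining template/container fields as in the code below...
    }

# Inside ilab_train this would replace the create_worker_spec task:
# worker_spec = worker_spec_for(worker_replicas)
```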

        worker = {
            "replicas": worker_num,
            "restartPolicy": "OnFailure",
            "template": {
                "metadata": {
                    "annotations": {
                        "sidecar.istio.io/inject": "false"
                    }
                },
                "spec": {
                    "containers": [
                        {
                            "command": [
                                '/bin/bash',
                                '-c',
                                '--'
                            ],
                            "args": [
                                "python3.11 -u run.py"
                            ],
                            "image": "quay.io/michaelclifford/test-train:0.0.11",
                            "name": "pytorch",
                            "resources": {
                                "requests": {
                                    "memory": "8Gi",
                                    "cpu": "2000m",
                                    # Remove if no GPU is available
                                    "nvidia.com/gpu": 1,
                                },
                                "limits": {
                                    "memory": "8Gi",
                                    "cpu": "2000m",
                                    # Remove if no GPU is available
                                    "nvidia.com/gpu": 1,
                                },
                            },
                        }
                    ]
                },
            },
        }

    worker_spec_output = namedtuple(
        "MyWorkerOutput", ["worker_spec"]
    )
Review comment (Member):
Why do you want to output a named tuple here? This is useful only if you output multiple params. I don't think it's needed here at all.
https://www.kubeflow.org/docs/components/pipelines/user-guides/data-handling/parameters/#output-parameters

    return worker_spec_output(worker)
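Following the comment above, a single-output sketch (assumed shape, not part of the PR) drops the namedtuple wrapper entirely; downstream code would then read `task.output` instead of `task.outputs["worker_spec"]`:

```python
def create_worker_spec_plain(worker_num: int = 0) -> dict:
    # One return value: no namedtuple wrapper needed.
    worker = {}
    if worker_num > 0:
        worker = {"replicas": worker_num, "restartPolicy": "OnFailure"}
    return worker
```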

@dsl.pipeline(
    name="launch-kubeflow-pytorchjob",
    description="An example to launch pytorch.",
)
def ilab_train(
    namespace: str = "mcliffor",
    worker_replicas: int = 1,
    ttl_seconds_after_finished: int = -1,
    job_timeout_minutes: int = 600,
    delete_after_done: bool = False):

    pytorchjob_launcher_op = components.load_component_from_file("component.yaml")

    master = {
        "replicas": 1,
        "restartPolicy": "OnFailure",
        "template": {
            "metadata": {
                "annotations": {
                    # See https://github.com/kubeflow/website/issues/2011
                    "sidecar.istio.io/inject": "false"
                }
            },
            "spec": {
                "containers": [
                    {
                        # To override the image's default command
                        "command": [
                            '/bin/bash',
                            '-c',
                            '--'
                        ],
                        "args": [
                            "python3.11 -u run.py"
                        ],
                        # Or, create your own image from
                        # https://github.com/kubeflow/pytorch-operator/tree/master/examples/mnist
                        "image": "quay.io/michaelclifford/test-train:0.0.11",
                        "name": "pytorch",
                        "resources": {
                            "requests": {
                                "memory": "8Gi",
                                "cpu": "2000m",
                                # Remove if no GPU is available
                                "nvidia.com/gpu": 1,
                            },
                            "limits": {
                                "memory": "8Gi",
                                "cpu": "2000m",
                                # Remove if no GPU is available
                                "nvidia.com/gpu": 1,
                            },
Review comment (Member), on lines +41 to +55:
Should we parametrize this?
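One way the parametrization the reviewer asks about could look, as a sketch with hypothetical helper and parameter names:

```python
def make_resources(memory: str = "8Gi", cpu: str = "2000m", gpus: int = 1) -> dict:
    # Build requests/limits from one set of values; omit the GPU key
    # entirely when gpus == 0 so CPU-only clusters can schedule the pod.
    res = {"memory": memory, "cpu": cpu}
    if gpus > 0:
        res["nvidia.com/gpu"] = gpus
    return {"requests": dict(res), "limits": dict(res)}
```

These could then be exposed as pipeline parameters (e.g. `memory_per_replica`, `gpus_per_replica`) and passed into both the master and worker specs.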

                        },
                    }
                ],
                # If imagePullSecrets required
                # "imagePullSecrets": [
                #     {"name": "image-pull-secret"},
                # ],
            },
        },
    }

    worker_spec_create = create_worker_spec(worker_num=worker_replicas)

    # Launch and monitor the job with the launcher
    pytorchjob_launcher_op(
        name="pytorch-job",
        namespace=namespace,
        master_spec=master,
        worker_spec=worker_spec_create.outputs["worker_spec"],
        ttl_seconds_after_finished=ttl_seconds_after_finished,
        job_timeout_minutes=job_timeout_minutes,
        delete_after_done=delete_after_done,
        active_deadline_seconds=100,
        backoff_limit=1,
    )


if __name__ == "__main__":
    import kfp.compiler as compiler
Review comment (Member):
I don't think you need to nest the import here. This is against https://pylint.readthedocs.io/en/latest/user_guide/messages/convention/import-outside-toplevel.html


    pipeline_file = "pipeline.yaml"
    print(
        f"Compiling pipeline as {pipeline_file}"
    )
    compiler.Compiler().compile(
        ilab_train, pipeline_file
    )
Review comment (Member):
This is somewhat weirdly formatted. I think the simple:

Suggested change:
-    pipeline_file = "pipeline.yaml"
-    print(
-        f"Compiling pipeline as {pipeline_file}"
-    )
-    compiler.Compiler().compile(
-        ilab_train, pipeline_file
-    )
+    pipeline_file = "pipeline.yaml"
+    print(f"Compiling pipeline as {pipeline_file}")
+    compiler.Compiler().compile(ilab_train, pipeline_file)

would be fully PEP 8 compliant.