Skip to content

Commit

Permalink
Set correct ENV for PytorchJob to support torchrun (#1840)
Browse files Browse the repository at this point in the history
* fix inconsistent pytorch WORLD_SIZE env

* add correct env node_rank, nnodes for torch

* restore elastic nnproc

* use string for nproc_per_node

* add defaults in api

* add validation for two nproc_per_node, use auto for defaulter

* add ut for defaults and validation

* fix ut

* add doc for nproc_per_node
  • Loading branch information
kuizhiqing authored Jul 12, 2023
1 parent a3a2972 commit 2f18ab7
Show file tree
Hide file tree
Showing 22 changed files with 236 additions and 27 deletions.
3 changes: 2 additions & 1 deletion docs/api/kubeflow.org_v1_generated.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Package v1 contains API Schema definitions for the kubeflow.org v1 API group
| *`rdzvId`* __string__ |
| *`rdzvConf`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-rdzvconf[$$RDZVConf$$] array__ | RDZVConf contains additional rendezvous configuration (<key1>=<value1>,<key2>=<value2>,...).
| *`standalone`* __boolean__ | Start a local standalone rendezvous backend that is represented by a C10d TCP store on port 29400. Useful when launching single-node, multi-worker job. If specified --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values are ignored.
| *`nProcPerNode`* __integer__ | Number of workers per node; supported values: [auto, cpu, gpu, int].
| *`nProcPerNode`* __integer__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead.
| *`maxRestarts`* __integer__ |
| *`metrics`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#metricspec-v2-autoscaling[$$MetricSpec$$] array__ | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created.
|===
Expand Down Expand Up @@ -396,6 +396,7 @@ PyTorchJobSpec is a desired state description of the PyTorchJob.
| *`runPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-runpolicy[$$RunPolicy$$]__ | RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active.
| *`elasticPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-elasticpolicy[$$ElasticPolicy$$]__ |
| *`pytorchReplicaSpecs`* __object (keys:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicatype[$$ReplicaType$$], values:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicaspec[$$ReplicaSpec$$])__ | A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example, { "Master": PyTorchReplicaSpec, "Worker": PyTorchReplicaSpec, }
| *`nprocPerNode`* __string__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto.
|===


Expand Down
6 changes: 5 additions & 1 deletion hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"format": "int32"
},
"nProcPerNode": {
"description": "Number of workers per node; supported values: [auto, cpu, gpu, int].",
"description": "Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead.",
"type": "integer",
"format": "int32"
},
Expand Down Expand Up @@ -491,6 +491,10 @@
"elasticPolicy": {
"$ref": "#/definitions/kubeflow.org.v1.ElasticPolicy"
},
"nprocPerNode": {
"description": "Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto.",
"type": "string"
},
"pytorchReplicaSpecs": {
"description": "A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example,\n {\n \"Master\": PyTorchReplicaSpec,\n \"Worker\": PyTorchReplicaSpec,\n }",
"type": "object",
Expand Down
8 changes: 7 additions & 1 deletion manifests/base/crds/kubeflow.org_pytorchjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,8 @@ spec:
type: integer
nProcPerNode:
description: 'Number of workers per node; supported values: [auto,
cpu, gpu, int].'
cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+
Use .spec.nprocPerNode instead.'
format: int32
type: integer
rdzvBackend:
Expand Down Expand Up @@ -585,6 +586,11 @@ spec:
set values are ignored.
type: boolean
type: object
nprocPerNode:
description: 'Number of workers per node; supported values: [auto,
cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658.
Defaults to auto.'
type: string
pytorchReplicaSpecs:
additionalProperties:
description: ReplicaSpec is a description of the replica
Expand Down
9 changes: 8 additions & 1 deletion pkg/apis/kubeflow.org/v1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions pkg/apis/kubeflow.org/v1/pytorch_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import (
"k8s.io/apimachinery/pkg/runtime"
)

// DefaultNprocPerNode is the fallback for .spec.nprocPerNode when neither it
// nor the deprecated .spec.elasticPolicy.nProcPerNode is set; "auto" lets
// torchrun choose the number of worker processes per node.
var (
	DefaultNprocPerNode = "auto"
)

// addPytorchDefaultingFuncs registers the PyTorchJob defaulting functions
// with the given scheme.
func addPytorchDefaultingFuncs(scheme *runtime.Scheme) error {
	return RegisterDefaults(scheme)
}
Expand Down Expand Up @@ -61,6 +65,14 @@ func setPytorchTypeNamesToCamelCase(pytorchJob *PyTorchJob) {
}
}

// setDefaultNprocPerNode defaults .spec.nprocPerNode to DefaultNprocPerNode
// ("auto") unless the user already set it, or set the deprecated
// .spec.elasticPolicy.nProcPerNode field instead.
func setDefaultNprocPerNode(job *PyTorchJob) {
	if job.Spec.NprocPerNode != nil {
		return
	}
	// Respect the deprecated elastic-policy field when the user set it.
	if ep := job.Spec.ElasticPolicy; ep != nil && ep.NProcPerNode != nil {
		return
	}
	job.Spec.NprocPerNode = &DefaultNprocPerNode
}

// SetDefaults_PyTorchJob sets any unspecified values to defaults.
func SetDefaults_PyTorchJob(job *PyTorchJob) {
// Set default cleanpod policy to None.
Expand All @@ -78,4 +90,7 @@ func SetDefaults_PyTorchJob(job *PyTorchJob) {
}
// Set default elastic policy.
setElasticPolicy(job)

// Set default nproc_per_node.
setDefaultNprocPerNode(job)
}
38 changes: 38 additions & 0 deletions pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,41 @@ func TestSetElasticPolicy(t *testing.T) {
})
}
}

// TestSetDefaultNprocPerNode verifies that .spec.nprocPerNode is defaulted to
// DefaultNprocPerNode both when an elastic policy exists without
// nProcPerNode set and when no elastic policy exists at all.
func TestSetDefaultNprocPerNode(t *testing.T) {
	gomega.RegisterFailHandler(ginkgo.Fail)
	// Previously both subtests shared one name, making failures ambiguous;
	// each case now states the ElasticPolicy shape it covers.
	t.Run("elastic policy set without nProcPerNode", func(t *testing.T) {
		job := &PyTorchJob{
			Spec: PyTorchJobSpec{
				ElasticPolicy: &ElasticPolicy{
					NProcPerNode: nil,
				},
				PyTorchReplicaSpecs: map[ReplicaType]*ReplicaSpec{
					PyTorchJobReplicaTypeWorker: {
						Replicas: pointer.Int32(1),
					},
				},
			},
		}

		setDefaultNprocPerNode(job)
		gomega.Expect(job.Spec.NprocPerNode).
			To(gomega.Equal(&DefaultNprocPerNode))
	})
	t.Run("elastic policy not set", func(t *testing.T) {
		job := &PyTorchJob{
			Spec: PyTorchJobSpec{
				ElasticPolicy: nil,
				PyTorchReplicaSpecs: map[ReplicaType]*ReplicaSpec{
					PyTorchJobReplicaTypeWorker: {
						Replicas: pointer.Int32(1),
					},
				},
			},
		}

		setDefaultNprocPerNode(job)
		gomega.Expect(job.Spec.NprocPerNode).
			To(gomega.Equal(&DefaultNprocPerNode))
	})
}
11 changes: 11 additions & 0 deletions pkg/apis/kubeflow.org/v1/pytorch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ type PyTorchJob struct {
Status JobStatus `json:"status,omitempty"`
}

// For PyTorch launch/run related spec declaration, please see the following doc for more detail:
// https://pytorch.org/docs/stable/elastic/run.html
// Or run command `torchrun --help` for a brief description.

// PyTorchJobSpec is a desired state description of the PyTorchJob.
type PyTorchJobSpec struct {
// RunPolicy encapsulates various runtime policies of the distributed training
Expand All @@ -84,6 +88,11 @@ type PyTorchJobSpec struct {
// "Worker": PyTorchReplicaSpec,
// }
PyTorchReplicaSpecs map[ReplicaType]*ReplicaSpec `json:"pytorchReplicaSpecs"`

// Number of workers per node; supported values: [auto, cpu, gpu, int].
// For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658.
// Defaults to auto.
NprocPerNode *string `json:"nprocPerNode,omitempty"`
}

type ElasticPolicy struct {
Expand All @@ -107,6 +116,8 @@ type ElasticPolicy struct {
// are ignored.
Standalone *bool `json:"standalone,omitempty"`
// Number of workers per node; supported values: [auto, cpu, gpu, int].
// Deprecated: This API is deprecated in v1.7+
// Use .spec.nprocPerNode instead.
NProcPerNode *int32 `json:"nProcPerNode,omitempty"`

MaxRestarts *int32 `json:"maxRestarts,omitempty"`
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/kubeflow.org/v1/pytorch_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ func ValidateV1PyTorchJob(pytorchJob *PyTorchJob) error {
if err := validatePyTorchReplicaSpecs(pytorchJob.Spec.PyTorchReplicaSpecs); err != nil {
return err
}
if err := validateNprocPerNode(pytorchJob); err != nil {
return err
}
return nil
}

// validateNprocPerNode rejects jobs that set both .spec.nprocPerNode and the
// deprecated .spec.elasticPolicy.nProcPerNode, since the two would conflict.
func validateNprocPerNode(pytorchJob *PyTorchJob) error {
	spec := &pytorchJob.Spec
	if spec.NprocPerNode == nil {
		return nil
	}
	if ep := spec.ElasticPolicy; ep != nil && ep.NProcPerNode != nil {
		return fmt.Errorf(".spec.elasticPolicy.nProcPerNode is deprecated, use .spec.nprocPerNode instead")
	}
	return nil
}

Expand Down
29 changes: 29 additions & 0 deletions pkg/apis/kubeflow.org/v1/pytorch_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,35 @@ func TestValidateV1PyTorchJob(t *testing.T) {
},
wantErr: true,
},
"Spec.NprocPerNode and Spec.ElasticPolicy.NProcPerNode are set": {
pytorchJob: &PyTorchJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: PyTorchJobSpec{
NprocPerNode: pointer.String("1"),
ElasticPolicy: &ElasticPolicy{
NProcPerNode: pointer.Int32(1),
},
PyTorchReplicaSpecs: map[ReplicaType]*ReplicaSpec{
PyTorchJobReplicaTypeMaster: {
Replicas: pointer.Int32(2),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "pytorch",
Image: "gcr.io/kubeflow-ci/pytorch-dist-mnist_test:1.0",
},
},
},
},
},
},
},
},
wantErr: true,
},
}

for name, tc := range testCases {
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions pkg/controller.v1/pytorch/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,10 @@ const (
// EnvStartMethod is the environment variable name for the multiprocessing start method to use when creating workers, which could be fork, spawn and forkserver.
EnvStartMethod = "PET_START_METHOD"

// Worker/node size related arguments.
// EnvNNodes is the common environment variable name from envvar

// EnvNProcPerNode is the environment variable name for the number of processes per node.
EnvNProcPerNode = "PET_NPROC_PER_NODE"
// EnvNNodes is the environment variable name for the number of nodes.
EnvNNodes = "PET_NNODES"
)

var (
Expand Down Expand Up @@ -89,7 +87,7 @@ func (e ElasticEnvVarGenerator) Generate(
// Generate RDZV_BACKEND.
envVars = append(envVars, e.generateEnvBackend(elasticPolicy))
// Generate NNODES.
if envVar, err := e.generateEnvNNodes(job); err != nil {
if envVar, err := e.generateEnvNnodes(job); err != nil {
return nil, err
} else {
envVars = append(envVars, *envVar)
Expand Down Expand Up @@ -126,23 +124,23 @@ func (e ElasticEnvVarGenerator) Generate(
return envVars, nil
}

func (e ElasticEnvVarGenerator) generateEnvNNodes(job *kubeflowv1.PyTorchJob) (*corev1.EnvVar, error) {
func (e ElasticEnvVarGenerator) generateEnvNnodes(job *kubeflowv1.PyTorchJob) (*corev1.EnvVar, error) {
// Return worker.replicas if there is no max and min replicas specified.
if job.Spec.ElasticPolicy.MinReplicas == nil &&
job.Spec.ElasticPolicy.MaxReplicas == nil {
if job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker] == nil {
return nil, fmt.Errorf("cannot find the worker spec")
}
return &corev1.EnvVar{
Name: EnvNNodes,
Name: EnvNnodes,
Value: strconv.Itoa(
int(*job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].
Replicas)),
}, nil
}

return &corev1.EnvVar{
Name: EnvNNodes,
Name: EnvNnodes,
Value: fmt.Sprintf("%d:%d",
*job.Spec.ElasticPolicy.MinReplicas, *job.Spec.ElasticPolicy.MaxReplicas),
}, nil
Expand Down
9 changes: 2 additions & 7 deletions pkg/controller.v1/pytorch/elastic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ func TestElasticGenerate(t *testing.T) {
Value: "rdzv-conf-value-1",
},
},
NProcPerNode: pointer.Int32(1),
MaxRestarts: pointer.Int32(3),
MaxRestarts: pointer.Int32(3),
},
PyTorchReplicaSpecs: map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec{
kubeflowv1.PyTorchJobReplicaTypeWorker: {
Expand All @@ -88,10 +87,6 @@ func TestElasticGenerate(t *testing.T) {
Name: EnvMaxRestarts,
Value: "3",
},
{
Name: EnvNProcPerNode,
Value: "1",
},
{
Name: EnvRDZVBackend,
Value: "c10d",
Expand All @@ -109,7 +104,7 @@ func TestElasticGenerate(t *testing.T) {
Value: "rdzv-conf-name=rdzv-conf-value,rdzv-conf-name-1=rdzv-conf-value-1",
},
{
Name: EnvNNodes,
Name: EnvNnodes,
Value: "1:3",
},
},
Expand Down
Loading

0 comments on commit 2f18ab7

Please sign in to comment.