From 2f18ab7af09559b494e51210299a7eceba31f599 Mon Sep 17 00:00:00 2001 From: Chitsing KUI Date: Thu, 13 Jul 2023 01:11:07 +0800 Subject: [PATCH] Set correct ENV for PytorchJob to support torchrun (#1840) * fix pytorch WORLD_SIZE env inconsistent * add correct env node_rank, nnodes for torch * restore elastic nnproc * use string for nproc_per_node * add defaults in api * add validation for two nproc_per_node, use auto for defaulter * add ut for defaults and validation * fix ut * add doc for nproc_per_node --- docs/api/kubeflow.org_v1_generated.asciidoc | 3 +- hack/python-sdk/swagger.json | 6 ++- .../base/crds/kubeflow.org_pytorchjobs.yaml | 8 +++- pkg/apis/kubeflow.org/v1/openapi_generated.go | 9 +++- pkg/apis/kubeflow.org/v1/pytorch_defaults.go | 15 ++++++ .../kubeflow.org/v1/pytorch_defaults_test.go | 38 +++++++++++++++ pkg/apis/kubeflow.org/v1/pytorch_types.go | 11 +++++ .../kubeflow.org/v1/pytorch_validation.go | 10 ++++ .../v1/pytorch_validation_test.go | 29 +++++++++++ .../kubeflow.org/v1/zz_generated.deepcopy.go | 5 ++ pkg/controller.v1/pytorch/elastic.go | 12 ++--- pkg/controller.v1/pytorch/elastic_test.go | 9 +--- pkg/controller.v1/pytorch/envvar.go | 48 +++++++++++++++++-- pkg/controller.v1/pytorch/master.go | 11 +++++ .../pytorch/pytorchjob_controller_test.go | 8 +++- sdk/python/docs/KubeflowOrgV1ElasticPolicy.md | 2 +- .../docs/KubeflowOrgV1PyTorchJobSpec.md | 1 + .../models/kubeflow_org_v1_elastic_policy.py | 4 +- .../kubeflow_org_v1_py_torch_job_spec.py | 30 +++++++++++- .../test/test_kubeflow_org_v1_py_torch_job.py | 1 + .../test_kubeflow_org_v1_py_torch_job_list.py | 2 + .../test_kubeflow_org_v1_py_torch_job_spec.py | 1 + 22 files changed, 236 insertions(+), 27 deletions(-) diff --git a/docs/api/kubeflow.org_v1_generated.asciidoc b/docs/api/kubeflow.org_v1_generated.asciidoc index 6c4964f425..0ad1208aa5 100644 --- a/docs/api/kubeflow.org_v1_generated.asciidoc +++ b/docs/api/kubeflow.org_v1_generated.asciidoc @@ -53,7 +53,7 @@ Package v1 contains API Schema definitions for the kubeflow.org v1 API group | *`rdzvId`* __string__ | | *`rdzvConf`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-rdzvconf[$$RDZVConf$$] array__ | RDZVConf contains additional rendezvous configuration (=,=,...). | *`standalone`* __boolean__ | Start a local standalone rendezvous backend that is represented by a C10d TCP store on port 29400. Useful when launching single-node, multi-worker job. If specified --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values are ignored. -| *`nProcPerNode`* __integer__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. +| *`nProcPerNode`* __integer__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead. | *`maxRestarts`* __integer__ | | *`metrics`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#metricspec-v2-autoscaling[$$MetricSpec$$] array__ | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created. |=== @@ -396,6 +396,7 @@ PyTorchJobSpec is a desired state description of the PyTorchJob. | *`runPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-runpolicy[$$RunPolicy$$]__ | RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active. | *`elasticPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-elasticpolicy[$$ElasticPolicy$$]__ | | *`pytorchReplicaSpecs`* __object (keys:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicatype[$$ReplicaType$$], values:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicaspec[$$ReplicaSpec$$])__ | A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example, { "Master": PyTorchReplicaSpec, "Worker": PyTorchReplicaSpec, } +| *`nprocPerNode`* __string__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto. |=== diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index 4fff444db9..ec48eddd03 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -33,7 +33,7 @@ "format": "int32" }, "nProcPerNode": { - "description": "Number of workers per node; supported values: [auto, cpu, gpu, int].", + "description": "Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead.", "type": "integer", "format": "int32" }, @@ -491,6 +491,10 @@ "elasticPolicy": { "$ref": "#/definitions/kubeflow.org.v1.ElasticPolicy" }, + "nprocPerNode": { + "description": "Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto.", + "type": "string" + }, "pytorchReplicaSpecs": { "description": "A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example,\n {\n \"Master\": PyTorchReplicaSpec,\n \"Worker\": PyTorchReplicaSpec,\n }", "type": "object", diff --git a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml index afe8ab040a..6855393617 100644 --- a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml +++ b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml @@ -554,7 +554,8 @@ spec: type: integer nProcPerNode: description: 'Number of workers per node; supported values: [auto, - cpu, gpu, int].' + cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ + Use .spec.nprocPerNode instead.' format: int32 type: integer rdzvBackend: @@ -585,6 +586,11 @@ spec: set values are ignored. type: boolean type: object + nprocPerNode: + description: 'Number of workers per node; supported values: [auto, + cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. + Defaults to auto.' + type: string pytorchReplicaSpecs: additionalProperties: description: ReplicaSpec is a description of the replica diff --git a/pkg/apis/kubeflow.org/v1/openapi_generated.go b/pkg/apis/kubeflow.org/v1/openapi_generated.go index ea93510040..9944bf0fde 100644 --- a/pkg/apis/kubeflow.org/v1/openapi_generated.go +++ b/pkg/apis/kubeflow.org/v1/openapi_generated.go @@ -126,7 +126,7 @@ func schema_pkg_apis_kubefloworg_v1_ElasticPolicy(ref common.ReferenceCallback) }, "nProcPerNode": { SchemaProps: spec.SchemaProps{ - Description: "Number of workers per node; supported values: [auto, cpu, gpu, int].", + Description: "Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead.", Type: []string{"integer"}, Format: "int32", }, @@ -909,6 +909,13 @@ func schema_pkg_apis_kubefloworg_v1_PyTorchJobSpec(ref common.ReferenceCallback) }, }, }, + "nprocPerNode": { + SchemaProps: spec.SchemaProps{ + Description: "Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto.", + Type: []string{"string"}, + Format: "", + }, + }, }, Required: []string{"runPolicy", "pytorchReplicaSpecs"}, }, diff --git a/pkg/apis/kubeflow.org/v1/pytorch_defaults.go b/pkg/apis/kubeflow.org/v1/pytorch_defaults.go index e1bb569342..a71625fd4c 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_defaults.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_defaults.go @@ -19,6 +19,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) +var ( + DefaultNprocPerNode = "auto" +) + func addPytorchDefaultingFuncs(scheme *runtime.Scheme) error { return RegisterDefaults(scheme) } @@ -61,6 +65,14 @@ func setPytorchTypeNamesToCamelCase(pytorchJob *PyTorchJob) { } } +func setDefaultNprocPerNode(job *PyTorchJob) { + if (job.Spec.ElasticPolicy != nil && job.Spec.ElasticPolicy.NProcPerNode == nil) || (job.Spec.ElasticPolicy == nil) { + if job.Spec.NprocPerNode == nil { + job.Spec.NprocPerNode = &DefaultNprocPerNode + } + } +} + // SetDefaults_PyTorchJob sets any unspecified values to defaults. func SetDefaults_PyTorchJob(job *PyTorchJob) { // Set default cleanpod policy to None. @@ -78,4 +90,7 @@ func SetDefaults_PyTorchJob(job *PyTorchJob) { } // Set default elastic policy. setElasticPolicy(job) + + // Set default nproc_per_node. + setDefaultNprocPerNode(job) } diff --git a/pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go b/pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go index 4a9ef9f895..a489b6eef5 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go @@ -152,3 +152,41 @@ func TestSetElasticPolicy(t *testing.T) { }) } } + +func TestSetDefaultNprocPerNode(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + t.Run("test default nproc per node", func(t *testing.T) { + job := &PyTorchJob{ + Spec: PyTorchJobSpec{ + ElasticPolicy: &ElasticPolicy{ + NProcPerNode: nil, + }, + PyTorchReplicaSpecs: map[ReplicaType]*ReplicaSpec{ + PyTorchJobReplicaTypeWorker: { + Replicas: pointer.Int32(1), + }, + }, + }, + } + + setDefaultNprocPerNode(job) + gomega.Expect(job.Spec.NprocPerNode). + To(gomega.Equal(&DefaultNprocPerNode)) + }) + t.Run("test default nproc per node", func(t *testing.T) { + job := &PyTorchJob{ + Spec: PyTorchJobSpec{ + ElasticPolicy: nil, + PyTorchReplicaSpecs: map[ReplicaType]*ReplicaSpec{ + PyTorchJobReplicaTypeWorker: { + Replicas: pointer.Int32(1), + }, + }, + }, + } + + setDefaultNprocPerNode(job) + gomega.Expect(job.Spec.NprocPerNode). + To(gomega.Equal(&DefaultNprocPerNode)) + }) +} diff --git a/pkg/apis/kubeflow.org/v1/pytorch_types.go b/pkg/apis/kubeflow.org/v1/pytorch_types.go index b932b4469a..8e5b4030b0 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_types.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_types.go @@ -67,6 +67,10 @@ type PyTorchJob struct { Status JobStatus `json:"status,omitempty"` } +// For PyTorch launch/run related spec declaration, please see the following doc for more detail: +// https://pytorch.org/docs/stable/elastic/run.html +// Or run command `torchrun --help` for a brief description. + // PyTorchJobSpec is a desired state description of the PyTorchJob. type PyTorchJobSpec struct { // RunPolicy encapsulates various runtime policies of the distributed training @@ -84,6 +88,11 @@ type PyTorchJobSpec struct { // "Worker": PyTorchReplicaSpec, // } PyTorchReplicaSpecs map[ReplicaType]*ReplicaSpec `json:"pytorchReplicaSpecs"` + + // Number of workers per node; supported values: [auto, cpu, gpu, int]. + // For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. + // Defaults to auto. + NprocPerNode *string `json:"nprocPerNode,omitempty"` } type ElasticPolicy struct { @@ -107,6 +116,8 @@ type ElasticPolicy struct { // are ignored. Standalone *bool `json:"standalone,omitempty"` // Number of workers per node; supported values: [auto, cpu, gpu, int]. + // Deprecated: This API is deprecated in v1.7+ + // Use .spec.nprocPerNode instead. NProcPerNode *int32 `json:"nProcPerNode,omitempty"` MaxRestarts *int32 `json:"maxRestarts,omitempty"` diff --git a/pkg/apis/kubeflow.org/v1/pytorch_validation.go b/pkg/apis/kubeflow.org/v1/pytorch_validation.go index 4a15293d62..19932d6834 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_validation.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_validation.go @@ -27,6 +27,16 @@ func ValidateV1PyTorchJob(pytorchJob *PyTorchJob) error { if err := validatePyTorchReplicaSpecs(pytorchJob.Spec.PyTorchReplicaSpecs); err != nil { return err } + if err := validateNprocPerNode(pytorchJob); err != nil { + return err + } + return nil +} + +func validateNprocPerNode(pytorchJob *PyTorchJob) error { + if pytorchJob.Spec.NprocPerNode != nil && pytorchJob.Spec.ElasticPolicy != nil && pytorchJob.Spec.ElasticPolicy.NProcPerNode != nil { + return fmt.Errorf(".spec.elasticPolicy.nProcPerNode is deprecated, use .spec.nprocPerNode instead") + } return nil } diff --git a/pkg/apis/kubeflow.org/v1/pytorch_validation_test.go b/pkg/apis/kubeflow.org/v1/pytorch_validation_test.go index 98eb71a761..0b46da3742 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_validation_test.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_validation_test.go @@ -180,6 +180,35 @@ func TestValidateV1PyTorchJob(t *testing.T) { }, wantErr: true, }, + "Spec.NprocPerNode and Spec.ElasticPolicy.NProcPerNode are set": { + pytorchJob: &PyTorchJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: PyTorchJobSpec{ + NprocPerNode: pointer.String("1"), + ElasticPolicy: &ElasticPolicy{ + NProcPerNode: pointer.Int32(1), + }, + PyTorchReplicaSpecs: map[ReplicaType]*ReplicaSpec{ + PyTorchJobReplicaTypeMaster: { + Replicas: pointer.Int32(2), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "pytorch", + Image: "gcr.io/kubeflow-ci/pytorch-dist-mnist_test:1.0", + }, + }, + }, + }, + }, + }, + }, + }, + wantErr: true, + }, } for name, tc := range testCases { diff --git a/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go b/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go index 57c01e70bf..106b79a47f 100644 --- a/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go @@ -585,6 +585,11 @@ func (in *PyTorchJobSpec) DeepCopyInto(out *PyTorchJobSpec) { (*out)[key] = outVal } } + if in.NprocPerNode != nil { + in, out := &in.NprocPerNode, &out.NprocPerNode + *out = new(string) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PyTorchJobSpec. diff --git a/pkg/controller.v1/pytorch/elastic.go b/pkg/controller.v1/pytorch/elastic.go index 52ba459a52..f1c9be94ae 100644 --- a/pkg/controller.v1/pytorch/elastic.go +++ b/pkg/controller.v1/pytorch/elastic.go @@ -47,12 +47,10 @@ const ( // EnvStartMethod is the environment variable name for the multiprocessing start method to use when creating workers, which could be fork, spawn and forkserver. EnvStartMethod = "PET_START_METHOD" - // Worker/node size related arguments. + // EnvNNodes is the common environment variable name from envvar // EnvNProcPerNode is the environment variable name for the number of processes per node. EnvNProcPerNode = "PET_NPROC_PER_NODE" - // EnvNNodes is the environment variable name for the number of nodes. - EnvNNodes = "PET_NNODES" ) var ( @@ -89,7 +87,7 @@ func (e ElasticEnvVarGenerator) Generate( // Generate RDZV_BACKEND. envVars = append(envVars, e.generateEnvBackend(elasticPolicy)) // Generate NNODES. - if envVar, err := e.generateEnvNNodes(job); err != nil { + if envVar, err := e.generateEnvNnodes(job); err != nil { return nil, err } else { envVars = append(envVars, *envVar) @@ -126,7 +124,7 @@ func (e ElasticEnvVarGenerator) Generate( return envVars, nil } -func (e ElasticEnvVarGenerator) generateEnvNNodes(job *kubeflowv1.PyTorchJob) (*corev1.EnvVar, error) { +func (e ElasticEnvVarGenerator) generateEnvNnodes(job *kubeflowv1.PyTorchJob) (*corev1.EnvVar, error) { // Return worker.replicas if there is no max and min replicas specified. if job.Spec.ElasticPolicy.MinReplicas == nil && job.Spec.ElasticPolicy.MaxReplicas == nil { @@ -134,7 +132,7 @@ func (e ElasticEnvVarGenerator) generateEnvNNodes(job *kubeflowv1.PyTorchJob) (* return nil, fmt.Errorf("cannot find the worker spec") } return &corev1.EnvVar{ - Name: EnvNNodes, + Name: EnvNnodes, Value: strconv.Itoa( int(*job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker]. Replicas)), @@ -142,7 +140,7 @@ func (e ElasticEnvVarGenerator) generateEnvNNodes(job *kubeflowv1.PyTorchJob) (* } return &corev1.EnvVar{ - Name: EnvNNodes, + Name: EnvNnodes, Value: fmt.Sprintf("%d:%d", *job.Spec.ElasticPolicy.MinReplicas, *job.Spec.ElasticPolicy.MaxReplicas), }, nil diff --git a/pkg/controller.v1/pytorch/elastic_test.go b/pkg/controller.v1/pytorch/elastic_test.go index fdec37f869..9cf9dabdee 100644 --- a/pkg/controller.v1/pytorch/elastic_test.go +++ b/pkg/controller.v1/pytorch/elastic_test.go @@ -72,8 +72,7 @@ func TestElasticGenerate(t *testing.T) { Value: "rdzv-conf-value-1", }, }, - NProcPerNode: pointer.Int32(1), - MaxRestarts: pointer.Int32(3), + MaxRestarts: pointer.Int32(3), }, PyTorchReplicaSpecs: map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec{ kubeflowv1.PyTorchJobReplicaTypeWorker: { @@ -88,10 +87,6 @@ func TestElasticGenerate(t *testing.T) { Name: EnvMaxRestarts, Value: "3", }, - { - Name: EnvNProcPerNode, - Value: "1", - }, { Name: EnvRDZVBackend, Value: "c10d", @@ -109,7 +104,7 @@ func TestElasticGenerate(t *testing.T) { Value: "rdzv-conf-name=rdzv-conf-value,rdzv-conf-name-1=rdzv-conf-value-1", }, { - Name: EnvNNodes, + Name: EnvNnodes, Value: "1:3", }, }, diff --git a/pkg/controller.v1/pytorch/envvar.go b/pkg/controller.v1/pytorch/envvar.go index d3ff5880a7..9ee5593bf2 100644 --- a/pkg/controller.v1/pytorch/envvar.go +++ b/pkg/controller.v1/pytorch/envvar.go @@ -24,6 +24,17 @@ import ( kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" ) +const ( + // Worker/node size related arguments. + + // EnvNprocPerNode is the environment variable name for the number of processes per node. + EnvNprocPerNode = "PET_NPROC_PER_NODE" + // EnvNnodes is the environment variable name for the number of nodes. + EnvNnodes = "PET_NNODES" + // EnvNodeRank is the environment variable name for the rank of nodes. + EnvNodeRank = "PET_NODE_RANK" +) + // EnvVarGenerator is the environment variable generator interface. type EnvVarGenerator interface { Generate(job *kubeflowv1.PyTorchJob) ([]corev1.EnvVar, error) @@ -48,6 +59,10 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, Value: "0", }) + totalReplicas := getTotalReplicas(pytorchjob) + nprocPerNode := getNprocPerNodeInt(pytorchjob) + worldSize := int(totalReplicas) * nprocPerNode + // If the master is not null, then we need to set the MASTER_ADDR and RANK. if pytorchjob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster] != nil { envVars, err := GetMasterEnvVarGenerator().Generate(pytorchjob) @@ -67,19 +82,27 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, rank = rank + 1 } - totalReplicas := getTotalReplicas(pytorchjob) - podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: "WORLD_SIZE", - Value: strconv.Itoa(int(totalReplicas)), + Value: strconv.Itoa(worldSize), }) podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: "RANK", Value: strconv.Itoa(rank), }) + podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ + Name: EnvNprocPerNode, + Value: *pytorchjob.Spec.NprocPerNode, + }) + podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ + Name: EnvNodeRank, + Value: strconv.Itoa(rank), + }) } // Set the elastic environment variables if the elasticPolicy is not null. + // nnodes is set in range format in elastic mode, e.g. nnodes=1:4 + // otherwise, nnodes is set by int, e.g. nnodes=2 if pytorchjob.Spec.ElasticPolicy != nil { envVars, err := GetElasticEnvVarGenerator().Generate(pytorchjob) if err != nil { @@ -88,12 +111,31 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, // Set elastic related environment variables. podTemplateSpec.Spec.Containers[i].Env = append( podTemplateSpec.Spec.Containers[i].Env, envVars...) + } else { + podTemplateSpec.Spec.Containers[i].Env = append( + podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ + Name: EnvNnodes, + Value: strconv.Itoa(int(totalReplicas)), + }) } } return nil } +// getNprocPerNodeInt return the int value of NprocPerNode, return 1 if not int +// When nproc_per_node set to auto, it means the number of process will be determinated +// in the user process phase, in this case, world size env will not be used. +func getNprocPerNodeInt(job *kubeflowv1.PyTorchJob) int { + if job.Spec.NprocPerNode == nil { + return 1 + } + if np, err := strconv.Atoi(*job.Spec.NprocPerNode); err == nil { + return np + } + return 1 +} + func getTotalReplicas(job *kubeflowv1.PyTorchJob) int32 { jobReplicas := int32(0) for _, r := range job.Spec.PyTorchReplicaSpecs { diff --git a/pkg/controller.v1/pytorch/master.go b/pkg/controller.v1/pytorch/master.go index ce116db88e..064c7054b9 100644 --- a/pkg/controller.v1/pytorch/master.go +++ b/pkg/controller.v1/pytorch/master.go @@ -14,6 +14,9 @@ var ( onceMaster sync.Once EnvMasterPort = "MASTER_PORT" EnvMasterAddr = "MASTER_ADDR" + + PETMasterPort = "PET_MASTER_PORT" + PETMasterAddr = "PET_MASTER_ADDR" ) // MasterEnvVarGenerator is the environment variable generator for Master related arguments. @@ -42,10 +45,18 @@ func (e MasterEnvVarGenerator) Generate( Name: EnvMasterPort, Value: strconv.Itoa(int(masterPort)), }) + envVars = append(envVars, corev1.EnvVar{ + Name: PETMasterPort, + Value: strconv.Itoa(int(masterPort)), + }) envVars = append(envVars, corev1.EnvVar{ Name: EnvMasterAddr, Value: masterAddr, }) + envVars = append(envVars, corev1.EnvVar{ + Name: PETMasterAddr, + Value: masterAddr, + }) } return envVars, nil } diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go index 81b2a29556..1d2134b8e7 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go @@ -86,6 +86,7 @@ var _ = Describe("PyTorchJob controller", func() { }, }, } + job.Spec.NprocPerNode = nil Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) @@ -116,13 +117,16 @@ var _ = Describe("PyTorchJob controller", func() { Name: kubeflowv1.PytorchJobDefaultPortName, ContainerPort: expectedPort, Protocol: corev1.ProtocolTCP})) - // Check MASTER_PORT and MASTER_ADDR env variable + // Check env variable Expect(masterPod.Spec.Containers[0].Env).To(ContainElements(corev1.EnvVar{ Name: EnvMasterPort, Value: fmt.Sprintf("%d", masterSvc.Spec.Ports[0].Port), }, corev1.EnvVar{ Name: EnvMasterAddr, Value: masterSvc.Name, + }, corev1.EnvVar{ + Name: EnvNprocPerNode, + Value: kubeflowv1.DefaultNprocPerNode, })) // Check service port. Expect(masterSvc.Spec.Ports[0].Port).To(Equal(expectedPort)) @@ -244,7 +248,7 @@ var _ = Describe("PyTorchJob controller", func() { Name: EnvRDZVBackend, Value: string(backendC10D), }, corev1.EnvVar{ - Name: EnvNNodes, + Name: EnvNnodes, Value: fmt.Sprintf("%d:%d", *minReplicas, *maxReplicas), }, corev1.EnvVar{ Name: EnvRDZVEndpoint, diff --git a/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md b/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md index 3a7a0589ee..c39927a013 100644 --- a/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md +++ b/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md @@ -7,7 +7,7 @@ Name | Type | Description | Notes **max_restarts** | **int** | | [optional] **metrics** | [**list[K8sIoApiAutoscalingV2MetricSpec]**](K8sIoApiAutoscalingV2MetricSpec.md) | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created. | [optional] **min_replicas** | **int** | minReplicas is the lower limit for the number of replicas to which the training job can scale down. It defaults to null. | [optional] -**n_proc_per_node** | **int** | Number of workers per node; supported values: [auto, cpu, gpu, int]. | [optional] +**n_proc_per_node** | **int** | Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead. | [optional] **rdzv_backend** | **str** | | [optional] **rdzv_conf** | [**list[KubeflowOrgV1RDZVConf]**](KubeflowOrgV1RDZVConf.md) | RDZVConf contains additional rendezvous configuration (<key1>=<value1>,<key2>=<value2>,...). | [optional] **rdzv_host** | **str** | | [optional] diff --git a/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md b/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md index 7647a472d2..6e24755a14 100644 --- a/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md +++ b/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md @@ -5,6 +5,7 @@ PyTorchJobSpec is a desired state description of the PyTorchJob. Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **elastic_policy** | [**KubeflowOrgV1ElasticPolicy**](KubeflowOrgV1ElasticPolicy.md) | | [optional] +**nproc_per_node** | **str** | Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto. | [optional] **pytorch_replica_specs** | [**dict(str, KubeflowOrgV1ReplicaSpec)**](KubeflowOrgV1ReplicaSpec.md) | A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example, { \"Master\": PyTorchReplicaSpec, \"Worker\": PyTorchReplicaSpec, } | **run_policy** | [**KubeflowOrgV1RunPolicy**](KubeflowOrgV1RunPolicy.md) | | diff --git a/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py b/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py index c6e3d9f6bd..311bd6d288 100644 --- a/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py +++ b/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py @@ -196,7 +196,7 @@ def min_replicas(self, min_replicas): def n_proc_per_node(self): """Gets the n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 - Number of workers per node; supported values: [auto, cpu, gpu, int]. # noqa: E501 + Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead. # noqa: E501 :return: The n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 :rtype: int @@ -207,7 +207,7 @@ def n_proc_per_node(self): def n_proc_per_node(self, n_proc_per_node): """Sets the n_proc_per_node of this KubeflowOrgV1ElasticPolicy. - Number of workers per node; supported values: [auto, cpu, gpu, int]. # noqa: E501 + Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead. # noqa: E501 :param n_proc_per_node: The n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 :type: int diff --git a/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py b/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py index 34a915d257..663c27376d 100644 --- a/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py +++ b/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py @@ -34,29 +34,34 @@ class KubeflowOrgV1PyTorchJobSpec(object): """ openapi_types = { 'elastic_policy': 'KubeflowOrgV1ElasticPolicy', + 'nproc_per_node': 'str', 'pytorch_replica_specs': 'dict(str, KubeflowOrgV1ReplicaSpec)', 'run_policy': 'KubeflowOrgV1RunPolicy' } attribute_map = { 'elastic_policy': 'elasticPolicy', + 'nproc_per_node': 'nprocPerNode', 'pytorch_replica_specs': 'pytorchReplicaSpecs', 'run_policy': 'runPolicy' } - def __init__(self, elastic_policy=None, pytorch_replica_specs=None, run_policy=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, elastic_policy=None, nproc_per_node=None, pytorch_replica_specs=None, run_policy=None, local_vars_configuration=None): # noqa: E501 """KubeflowOrgV1PyTorchJobSpec - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() self.local_vars_configuration = local_vars_configuration self._elastic_policy = None + self._nproc_per_node = None self._pytorch_replica_specs = None self._run_policy = None self.discriminator = None if elastic_policy is not None: self.elastic_policy = elastic_policy + if nproc_per_node is not None: + self.nproc_per_node = nproc_per_node self.pytorch_replica_specs = pytorch_replica_specs self.run_policy = run_policy @@ -81,6 +86,29 @@ def elastic_policy(self, elastic_policy): self._elastic_policy = elastic_policy + @property + def nproc_per_node(self): + """Gets the nproc_per_node of this KubeflowOrgV1PyTorchJobSpec. # noqa: E501 + + Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto. # noqa: E501 + + :return: The nproc_per_node of this KubeflowOrgV1PyTorchJobSpec. # noqa: E501 + :rtype: str + """ + return self._nproc_per_node + + @nproc_per_node.setter + def nproc_per_node(self, nproc_per_node): + """Sets the nproc_per_node of this KubeflowOrgV1PyTorchJobSpec. + + Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto. # noqa: E501 + + :param nproc_per_node: The nproc_per_node of this KubeflowOrgV1PyTorchJobSpec. # noqa: E501 + :type: str + """ + + self._nproc_per_node = nproc_per_node + @property def pytorch_replica_specs(self): """Gets the pytorch_replica_specs of this KubeflowOrgV1PyTorchJobSpec. # noqa: E501 diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py index 39758b1777..f31bdec8cf 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py @@ -58,6 +58,7 @@ def make_instance(self, include_optional): rdzv_id = '0', rdzv_port = 56, standalone = True, ), + nproc_per_node = '0', pytorch_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( replicas = 56, diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py index af3cffec1a..ab041186dc 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py @@ -61,6 +61,7 @@ def make_instance(self, include_optional): rdzv_id = '0', rdzv_port = 56, standalone = True, ), + nproc_per_node = '0', pytorch_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( replicas = 56, @@ -131,6 +132,7 @@ def make_instance(self, include_optional): rdzv_id = '0', rdzv_port = 56, standalone = True, ), + nproc_per_node = '0', pytorch_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( replicas = 56, diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py index 749bd7b959..366188de7b 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py @@ -54,6 +54,7 @@ def make_instance(self, include_optional): rdzv_id = '0', rdzv_port = 56, standalone = True, ), + nproc_per_node = '0', pytorch_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( replicas = 56,