Commit 9cb8c9b

code review changes

deepanker13 committed Dec 15, 2023
1 parent c59b0b0 commit 9cb8c9b
Showing 5 changed files with 100 additions and 42 deletions.
4 changes: 2 additions & 2 deletions sdk/python/kubeflow/storage_init_container/s3.py
@@ -3,7 +3,7 @@
 import json
 import boto3
 from urllib.parse import urlparse
-
+import os
 
 @dataclass
 class S3DatasetParams:
@@ -50,6 +50,6 @@ def download_dataset(self):
 
         # Download the file
         s3_client.download_file(
-            self.config.bucket_name, self.config.file_key, self.config.download_dir
+            self.config.bucket_name, self.config.file_key, os.path.join(self.config.download_dir, self.config.file_key)
         )
         print(f"File downloaded to: {self.config.download_dir}")
30 changes: 20 additions & 10 deletions sdk/python/kubeflow/storage_init_container/storage.py
@@ -3,21 +3,21 @@
 from s3 import S3
 
 
-def model_factory(model_provider, model_provider_args):
+def model_factory(model_provider, model_provider_parameters):
     match model_provider:
         case "hf":
             hf = HuggingFace()
-            hf.load_config(model_provider_args)
+            hf.load_config(model_provider_parameters)
             hf.download_model_and_tokenizer()
         case _:
             return "This is the default case"
 
 
-def dataset_factory(dataset_provider, dataset_provider_args):
+def dataset_factory(dataset_provider, dataset_provider_parameters):
     match dataset_provider:
         case "s3":
             s3 = S3()
-            s3.load_config(dataset_provider_args)
+            s3.load_config(dataset_provider_parameters)
             s3.download_dataset()
         case _:
             return "This is the default case"
@@ -27,16 +27,26 @@ def dataset_factory(dataset_provider, dataset_provider_args):
     parser = argparse.ArgumentParser(
         description="script for downloading model and datasets to PVC."
     )
-    parser.add_argument("model_provider", type=str, help="name of model provider")
     parser.add_argument(
-        "model_provider_args", type=str, help="model provider serialised arguments"
+        "--model_provider", type=str, help="name of model provider", required=False
     )
+    parser.add_argument(
+        "--model_provider_parameters",
+        type=str,
+        help="model provider serialised arguments",
+        required=False,
+    )
 
-    parser.add_argument("dataset_provider", type=str, help="name of dataset provider")
     parser.add_argument(
-        "dataset_provider_args", type=str, help="dataset provider serialised arguments"
+        "--dataset_provider", type=str, help="name of dataset provider", required=False
    )
+    parser.add_argument(
+        "--dataset_provider_parameters",
+        type=str,
+        help="dataset provider serialised arguments",
+        required=False,
+    )
     args = parser.parse_args()
 
-    model_factory(args.model_provider, args.model_provider_args)
-    dataset_factory(args.dataset_provider, args.dataset_provider_args)
+    model_factory(args.model_provider, args.model_provider_parameters)
+    dataset_factory(args.dataset_provider, args.dataset_provider_parameters)
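Note: with both providers now optional flags rather than positionals, here is a hedged sketch of driving the parser above programmatically (the JSON field names are illustrative assumptions, not taken from this commit):

    import json

    argv = [
        "--model_provider", "hf",
        "--model_provider_parameters", json.dumps({"model_uri": "hf://gpt2"}),
        "--dataset_provider", "s3",
        "--dataset_provider_parameters", json.dumps({"bucket_name": "my-bucket", "file_key": "train.csv"}),
    ]
    args = parser.parse_args(argv)  # parser as defined above
    model_factory(args.model_provider, args.model_provider_parameters)
    dataset_factory(args.dataset_provider, args.dataset_provider_parameters)

If a flag is omitted, None reaches the corresponding factory and falls through to its default case.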
66 changes: 42 additions & 24 deletions sdk/python/kubeflow/training/api/training_client.py
@@ -95,10 +95,10 @@ def train(
         namespace: str = None,
         num_workers: int = 1,
         num_procs_per_worker: int = 1,
-        pvc: Dict[Literal["name", "claimName"], str] = None,
-        model_params: HuggingFaceModelParams = None,
-        dataset_params: S3DatasetParams = None,
-        parameters: HuggingFaceTrainParams = None,
+        storage_config: Dict[Literal["size", "storage_class"], str] = None,
+        model_provider_parameters: HuggingFaceModelParams = None,
+        dataset_provider_parameters: S3DatasetParams = None,
+        train_parameters: HuggingFaceTrainParams = None,
         resources_per_worker: Dict[Literal["gpu", "cpu", "memory"], any] = None,
     ):
         """
@@ -107,21 +107,41 @@ def train(
         if (
             not name
             or not namespace
-            or not pvc
-            or not model_params
-            or not dataset_params
-            or not parameters
+            or not storage_config
+            or not model_provider_parameters
+            or not dataset_provider_parameters
+            or not train_parameters
             or not resources_per_worker
         ):
             raise ValueError("One of the required parameters is None")
 
-        if num_procs_per_worker > resources_per_worker["gpu"]:
+        try:
+            self.core_api.create_namespace(
+                body=utils.get_namespace_spec(namespace=namespace)
+            )
+        except Exception as e:
+            print(e)
+
+        PVC_NAME = "train-job-pvc"
+        self.core_api.create_namespaced_persistent_volume_claim(
+            namespace=namespace,
+            body=utils.get_pvc_spec(
+                pvc_name=PVC_NAME,
+                namespace=namespace,
+                storage_size=storage_config["size"],
+                storage_class=storage_config["storage_class"],
+            ),
+        )
+
+        if (
+            resources_per_worker["gpu"] is None and num_procs_per_worker != 0
+        ) or num_procs_per_worker > resources_per_worker["gpu"]:
             raise ValueError("Insufficient gpu resources allocated to the container.")
 
-        if isinstance(model_params, HuggingFaceModelParams):
+        if isinstance(model_provider_parameters, HuggingFaceModelParams):
             mp = "hf"
 
-        if isinstance(dataset_params, S3DatasetParams):
+        if isinstance(dataset_provider_parameters, S3DatasetParams):
             dp = "s3"
 
         # create init container spec
@@ -133,17 +153,15 @@
             args=[
                 "--model_provider",
                 mp,
-                "--model_provider_args",
-                json.dumps(model_params.__dict__),
+                "--model_provider_parameters",
+                json.dumps(model_provider_parameters.__dict__),
                 "--dataset_provider",
                 dp,
-                "--dataset_provider_args",
-                json.dumps(dataset_params.__dict__),
+                "--dataset_provider_parameters",
+                json.dumps(dataset_provider_parameters.__dict__),
             ],
             volume_mounts=[
-                models.V1VolumeMount(
-                    name="model_dataset_store", mount_path="/workspace"
-                )
+                models.V1VolumeMount(name="train_job_pv", mount_path="/workspace")
             ],
         )
 
@@ -153,9 +171,9 @@
             image=constants.JOB_PARAMETERS[constants.PYTORCHJOB_KIND][
                 "train_container_image"
             ],
-            args=["--parameters", json.dumps(parameters.__dict__)],
+            args=["--train_parameters", json.dumps(train_parameters.__dict__)],
             volume_mounts=[
-                models.V1VolumeMount(name=pvc["name"], mount_path="/workspace")
+                models.V1VolumeMount(name="train_job_pv", mount_path="/workspace")
             ],
             resources=models.V1ResourceRequirements(
                 limits={
@@ -172,9 +190,9 @@
             containers_spec=[container_spec],
             volumes_spec=[
                 models.V1Volume(
-                    name=pvc["name"],
+                    name="train_job_pv",
                     persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
-                        claim_name=pvc["claimName"]
+                        claim_name=PVC_NAME
                    ),
                 )
             ],
@@ -186,9 +204,9 @@
             containers_spec=[init_container_spec, container_spec],
             volumes_spec=[
                 models.V1Volume(
-                    name=pvc["name"],
+                    name="train_job_pv",
                     persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
-                        claim_name=pvc["claimName"]
+                        claim_name=PVC_NAME
                     ),
                 )
             ],
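Note: a hedged sketch of calling the reworked train() signature (the import path and dataclass construction are assumptions; the commit shows only the signature). The PVC is now created internally under the fixed name "train-job-pvc", so callers pass a storage_config instead of a pre-created pvc dict:

    from kubeflow.training import TrainingClient  # import path assumed

    # model_params, dataset_params, train_params: instances of
    # HuggingFaceModelParams, S3DatasetParams and HuggingFaceTrainParams
    # (their fields are not shown in this commit).
    client = TrainingClient()
    client.train(
        name="llm-train-job",
        namespace="kubeflow",
        num_workers=2,
        num_procs_per_worker=1,
        storage_config={"size": "10Gi", "storage_class": None},
        model_provider_parameters=model_params,
        dataset_provider_parameters=dataset_params,
        train_parameters=train_params,
        resources_per_worker={"gpu": 1, "cpu": 4, "memory": "16Gi"},
    )

Since storage_config["storage_class"] is read unconditionally, the key must be present even when its value is None.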
12 changes: 6 additions & 6 deletions sdk/python/kubeflow/training/constants/constants.py
@@ -80,9 +80,9 @@
 PYTORCHJOB_CONTAINER = "pytorch"
 PYTORCHJOB_REPLICA_TYPES = (REPLICA_TYPE_MASTER.lower(), REPLICA_TYPE_WORKER.lower())
 PYTORCHJOB_BASE_IMAGE = "docker.io/pytorch/pytorch:1.12.1-cuda11.3-cudnn8-runtime"
-PYTORCHJOB_STORAGE_CONTAINER = "pytorch-storage"
-PYTORCHJOB_STORAGE_CONTAINER_IMAGE = "docker image path"
-PYTORCHJOB_TRAIN_CONTAINER_IMAGE = "docker image path"
+STORAGE_CONTAINER = "pytorch-storage"
+STORAGE_CONTAINER_IMAGE = "docker image path"
+TRAINER_TRANSFORMER_IMAGE = "docker image path"
 
 # MXJob constants
 MXJOB_KIND = "MXJob"
@@ -130,9 +130,9 @@
         "plural": PYTORCHJOB_PLURAL,
         "container": PYTORCHJOB_CONTAINER,
         "base_image": PYTORCHJOB_BASE_IMAGE,
-        "init_container": PYTORCHJOB_STORAGE_CONTAINER,
-        "init_container_image": PYTORCHJOB_STORAGE_CONTAINER_IMAGE,
-        "train_container_image": PYTORCHJOB_TRAIN_CONTAINER_IMAGE,
+        "init_container": STORAGE_CONTAINER,
+        "init_container_image": STORAGE_CONTAINER_IMAGE,
+        "train_container_image": TRAINER_TRANSFORMER_IMAGE,
     },
     MXJOB_KIND: {
         "model": models.KubeflowOrgV1MXJob,
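Note: callers resolve these values only through the JOB_PARAMETERS table, so the renames stay invisible to them; training_client.py above does exactly this:

    image = constants.JOB_PARAMETERS[constants.PYTORCHJOB_KIND]["train_container_image"]
    # -> TRAINER_TRANSFORMER_IMAGE (still the "docker image path" placeholder here)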
30 changes: 30 additions & 0 deletions sdk/python/kubeflow/training/utils/utils.py
@@ -343,3 +343,33 @@ def get_pytorchjob_template(
     )
 
     return pytorchjob
+
+
+def get_pvc_spec(
+    pvc_name: str, namespace: str, storage_size: str, storage_class: str = None
+):
+    if pvc_name is None or namespace is None or storage_size is None:
+        raise ValueError("One of the arguments is None")
+
+    pvc_spec = models.V1PersistentVolumeClaim(
+        api_version="v1",
+        kind="PersistentVolumeClaim",
+        metadata={"name": pvc_name, "namespace": namespace},
+        spec=models.V1PersistentVolumeClaimSpec(
+            access_modes=["ReadWriteOnce", "ReadOnlyMany"],
+            resources=models.V1ResourceRequirements(requests={"storage": storage_size}),
+        ),
+    )
+
+    if storage_class is not None:
+        pvc_spec.spec.storage_class_name = storage_class
+
+    return pvc_spec
+
+
+def get_namespace_spec(namespace):
+    namespace_spec = models.V1Namespace(
+        api_version="v1", kind="Namespace", metadata=models.V1ObjectMeta(name=namespace)
+    )
+
+    return namespace_spec
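Note: a minimal usage sketch of the two new helpers, mirroring the calls added in training_client.py above (client construction assumed):

    from kubernetes import client, config

    config.load_kube_config()
    core_api = client.CoreV1Api()

    # create_namespace raises a 409 ApiException if the namespace already
    # exists; train() currently just prints the error and continues.
    core_api.create_namespace(body=get_namespace_spec("kubeflow"))
    core_api.create_namespaced_persistent_volume_claim(
        namespace="kubeflow",
        body=get_pvc_spec(
            pvc_name="train-job-pvc",
            namespace="kubeflow",
            storage_size="10Gi",
            storage_class="standard",  # optional; omit to use the cluster default
        ),
    )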
