diff --git a/eval/mmlu/__init__.py b/eval/mmlu/__init__.py index 836daac..fa62a73 100644 --- a/eval/mmlu/__init__.py +++ b/eval/mmlu/__init__.py @@ -1,5 +1,4 @@ from .components import run_mmlu_op, load_mmlu_results_op -#from . import faked +# from . import faked __all__ = ["run_mmlu_op", "load_mmlu_results_op"] - diff --git a/eval/mmlu/components.py b/eval/mmlu/components.py index cb46dad..23c5c89 100644 --- a/eval/mmlu/components.py +++ b/eval/mmlu/components.py @@ -6,6 +6,7 @@ EVAL_IMAGE = "quay.io/sallyom/instructlab-ocp:eval" + @component(base_image=EVAL_IMAGE) def run_mmlu_op( mmlu_output: Output[Artifact], @@ -16,21 +17,27 @@ def run_mmlu_op( batch_size: int, device: str, models_list: List[str], -) -> NamedTuple('outputs', best_model=str, best_score=float): +) -> NamedTuple("outputs", best_model=str, best_score=float): import json import os import torch from instructlab.eval.mmlu import MMLUEvaluator, MMLU_TASKS - mmlu_tasks = mmlu_tasks_list.split(',') if mmlu_tasks_list else MMLU_TASKS + mmlu_tasks = mmlu_tasks_list.split(",") if mmlu_tasks_list else MMLU_TASKS # Device setup and debug gpu_available = torch.cuda.is_available() - gpu_name = torch.cuda.get_device_name(torch.cuda.current_device()) if gpu_available else "No GPU available" + gpu_name = ( + torch.cuda.get_device_name(torch.cuda.current_device()) + if gpu_available + else "No GPU available" + ) print(f"GPU Available: {gpu_available}, Using: {gpu_name}") - effective_device = device if device is not None else ("cuda" if gpu_available else "cpu") + effective_device = ( + device if device is not None else ("cuda" if gpu_available else "cpu") + ) print(f"Running on device: {effective_device}") scores = {} @@ -53,33 +60,39 @@ def run_mmlu_op( mmlu_score, individual_scores = evaluator.run() average_score = round(mmlu_score, 2) - print(f"Model {model_name} is stored at: {model_path} with AVERAGE_SCORE: {average_score}") + print( + f"Model {model_name} is stored at: {model_path} with AVERAGE_SCORE: {average_score}" + ) mmlu_data = { "report_title": "KNOWLEDGE EVALUATION REPORT", "model": model_name, "average_score": average_score, "number_of_tasks": len(individual_scores), - "individual_scores": [{task: round(score['score'], 2)} for task, score in individual_scores.items()] + "individual_scores": [ + {task: round(score["score"], 2)} + for task, score in individual_scores.items() + ], } all_mmlu_data.append(mmlu_data) scores[model_path] = average_score - with open(mmlu_output.path, 'w') as f: + with open(mmlu_output.path, "w") as f: json.dump(all_mmlu_data, f, indent=4) - outputs = NamedTuple('outputs', best_model=str, best_score=float) + outputs = NamedTuple("outputs", best_model=str, best_score=float) best_model = max(scores, key=scores.get) best_score = scores[best_model] return outputs(best_model=best_model, best_score=best_score) + @component(base_image=PYTHON_IMAGE) def load_mmlu_results_op(mmlu_output: Input[Artifact]) -> list: import json mmlu_score_list = [] - with open(mmlu_output.path, 'r') as f: - mmlu_score_list = json.load(f) + with open(mmlu_output.path, "r") as f: + mmlu_score_list = json.load(f) print("MMLU Evaluation Data:") for mmlu_score in mmlu_score_list: diff --git a/eval/mt_bench/__init__.py b/eval/mt_bench/__init__.py index 753bb2a..d637889 100644 --- a/eval/mt_bench/__init__.py +++ b/eval/mt_bench/__init__.py @@ -1,5 +1,4 @@ from .components import run_mt_bench_op, load_mt_bench_results_op -#from . import faked +# from . 
import faked __all__ = ["run_mt_bench_op", "load_mt_bench_results_op"] - diff --git a/eval/mt_bench/components.py b/eval/mt_bench/components.py index d8b2fb1..5971888 100644 --- a/eval/mt_bench/components.py +++ b/eval/mt_bench/components.py @@ -6,6 +6,7 @@ EVAL_IMAGE = "quay.io/sallyom/instructlab-ocp:eval" + @component(base_image=EVAL_IMAGE, packages_to_install=["vllm"]) def run_mt_bench_op( models_path_prefix: str, @@ -17,10 +18,10 @@ def run_mt_bench_op( # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36 max_workers: str = "auto", device: str = None, -) -> NamedTuple('outputs', best_model=str, best_score=float): - - - def launch_vllm_server_background(model_path: str, gpu_count: int, retries: int = 60, delay: int = 5): +) -> NamedTuple("outputs", best_model=str, best_score=float): + def launch_vllm_server_background( + model_path: str, gpu_count: int, retries: int = 60, delay: int = 5 + ): import subprocess import sys import time @@ -29,15 +30,20 @@ def launch_vllm_server_background(model_path: str, gpu_count: int, retries: int if gpu_count > 0: command = [ sys.executable, - "-m", "vllm.entrypoints.openai.api_server", - "--model", model_path, - "--tensor-parallel-size", str(gpu_count), + "-m", + "vllm.entrypoints.openai.api_server", + "--model", + model_path, + "--tensor-parallel-size", + str(gpu_count), ] else: command = [ sys.executable, - "-m", "vllm.entrypoints.openai.api_server", - "--model", model_path, + "-m", + "vllm.entrypoints.openai.api_server", + "--model", + model_path, ] subprocess.Popen(args=command) @@ -54,10 +60,14 @@ def launch_vllm_server_background(model_path: str, gpu_count: int, retries: int except requests.ConnectionError: pass - print(f"Server not available yet, retrying in {delay} seconds (Attempt {attempt + 1}/{retries})...") + print( + f"Server not available yet, retrying in {delay} seconds (Attempt {attempt + 1}/{retries})..." + ) time.sleep(delay) - raise RuntimeError(f"Failed to start vLLM server at {server_url} after {retries} retries.") + raise RuntimeError( + f"Failed to start vLLM server at {server_url} after {retries} retries." + ) # This seems like excessive effort to stop the vllm process, but merely saving & killing the pid doesn't work # Also, the base image does not include `pkill` cmd, so can't pkill -f vllm.entrypoints.openai.api_server either @@ -67,21 +77,30 @@ def stop_vllm_server_by_name(): for process in psutil.process_iter(attrs=["pid", "name", "cmdline"]): cmdline = process.info.get("cmdline") if cmdline and "vllm.entrypoints.openai.api_server" in cmdline: - print(f"Found vLLM server process with PID: {process.info['pid']}, terminating...") + print( + f"Found vLLM server process with PID: {process.info['pid']}, terminating..." 
+ ) try: process.terminate() # Try graceful termination process.wait(timeout=5) # Wait a bit for it to terminate if process.is_running(): - print(f"Forcefully killing vLLM server process with PID: {process.info['pid']}") + print( + f"Forcefully killing vLLM server process with PID: {process.info['pid']}" + ) process.kill() # Force kill if it's still running - print(f"Successfully stopped vLLM server with PID: {process.info['pid']}") + print( + f"Successfully stopped vLLM server with PID: {process.info['pid']}" + ) except psutil.NoSuchProcess: print(f"Process with PID {process.info['pid']} no longer exists.") except psutil.AccessDenied: - print(f"Access denied when trying to terminate process with PID {process.info['pid']}.") + print( + f"Access denied when trying to terminate process with PID {process.info['pid']}." + ) except Exception as e: - print(f"Failed to terminate process with PID {process.info['pid']}. Error: {e}") - + print( + f"Failed to terminate process with PID {process.info['pid']}. Error: {e}" + ) import json import torch @@ -93,7 +112,11 @@ def stop_vllm_server_by_name(): candidate_server_url = "http://localhost:8000/v1" gpu_available = torch.cuda.is_available() - gpu_name = torch.cuda.get_device_name(torch.cuda.current_device()) if gpu_available else "No GPU available" + gpu_name = ( + torch.cuda.get_device_name(torch.cuda.current_device()) + if gpu_available + else "No GPU available" + ) gpu_count = torch.cuda.device_count() if gpu_available else 0 print(f"GPU Available: {gpu_available}, {gpu_name}") @@ -108,12 +131,12 @@ def stop_vllm_server_by_name(): # TODO: Using evaluator results in connection errors, need to determine why. # For now, using mt_bench_answers.generate_answers & mt_bench_judgment.generate_judgment - #evaluator = MTBenchEvaluator( + # evaluator = MTBenchEvaluator( # model_name=candidate_model_name, # judge_model_name=judge_model_name, # max_workers=max_workers, # merge_system_user_message=merge_system_user_message - #) + # ) judge_api_key = os.getenv("JUDGE_API_KEY", "") judge_model_name = os.getenv("JUDGE_NAME") @@ -125,7 +148,7 @@ def stop_vllm_server_by_name(): for model_name in models_list: print(f"Serving candidate model: {model_name}") model_path = f"{models_path_prefix}/{model_name}" - + # Launch the vLLM server and wait until it is ready launch_vllm_server_background(model_path, gpu_count) @@ -135,18 +158,20 @@ def stop_vllm_server_by_name(): model_name=model_path, model_api_base=candidate_server_url, output_dir="/tmp/eval_output", - max_workers=max_workers + max_workers=max_workers, ) print("Judging answers...") - overall_score, qa_pairs, turn_scores, error_rate = mt_bench_judgment.generate_judgment( - model_name=model_path, - judge_model_name=judge_model_name, - model_api_base=judge_endpoint, - api_key=judge_api_key, - output_dir="/tmp/eval_output", - max_workers=max_workers, - merge_system_user_message=merge_system_user_message + overall_score, qa_pairs, turn_scores, error_rate = ( + mt_bench_judgment.generate_judgment( + model_name=model_path, + judge_model_name=judge_model_name, + model_api_base=judge_endpoint, + api_key=judge_api_key, + output_dir="/tmp/eval_output", + max_workers=max_workers, + merge_system_user_message=merge_system_user_message, + ) ) stop_vllm_server_by_name() @@ -164,21 +189,22 @@ def stop_vllm_server_by_name(): all_mt_bench_data.append(mt_bench_data) scores[model_path] = overall_score - with open(mt_bench_output.path, 'w') as f: + with open(mt_bench_output.path, "w") as f: json.dump(all_mt_bench_data, f, indent=4) - 
outputs = NamedTuple('outputs', best_model=str, best_score=float) + outputs = NamedTuple("outputs", best_model=str, best_score=float) best_model = max(scores, key=scores.get) best_score = scores[best_model] return outputs(best_model=best_model, best_score=best_score) + @component(base_image=PYTHON_IMAGE) def load_mt_bench_results_op(mt_bench_output: Input[Artifact]) -> list: import json mt_bench_score_list = [] - with open(mt_bench_output.path, 'r') as f: - mt_bench_score_list = json.load(f) + with open(mt_bench_output.path, "r") as f: + mt_bench_score_list = json.load(f) print("MT_Bench Evaluation Data:") for mt_bench_score in mt_bench_score_list: diff --git a/kubernetes_yaml/model_downloader/container_file/download_hf_model.py b/kubernetes_yaml/model_downloader/container_file/download_hf_model.py index 237ffaa..bd0d012 100644 --- a/kubernetes_yaml/model_downloader/container_file/download_hf_model.py +++ b/kubernetes_yaml/model_downloader/container_file/download_hf_model.py @@ -14,7 +14,14 @@ # Set up logging logger = logging.getLogger(__name__) -def upload_and_save_model_to_s3(model_name: str, local_model_path: Union[str, Path], s3_model_path: str, verbose: bool = False, replace_if_exists: bool = False) -> str: + +def upload_and_save_model_to_s3( + model_name: str, + local_model_path: Union[str, Path], + s3_model_path: str, + verbose: bool = False, + replace_if_exists: bool = False, +) -> str: """ Download a Hugging Face model, upload it to S3, and log the details. @@ -29,33 +36,35 @@ def upload_and_save_model_to_s3(model_name: str, local_model_path: Union[str, Pa str: S3 path where the model is stored. """ # Convert s3_model_path to lowercase - s3_model_path = os.getenv('MODEL_PATH') + s3_model_path = os.getenv("MODEL_PATH") s3_model_path = s3_model_path.lower() # S3 Configuration - s3_endpoint = os.getenv('S3_ENDPOINT') - aws_access_key = os.getenv('AWS_ACCESS_KEY') - aws_secret_key = os.getenv('AWS_SECRET_KEY') + s3_endpoint = os.getenv("S3_ENDPOINT") + aws_access_key = os.getenv("AWS_ACCESS_KEY") + aws_secret_key = os.getenv("AWS_SECRET_KEY") bucket_name = os.getenv("AWS_BUCKET_NAME") s3_folder = os.getenv("S3_FOLDER") s3_prefix = f"{s3_folder}/{s3_model_path}" # Create an S3 client s3 = boto3.client( - 's3', + "s3", endpoint_url=s3_endpoint, aws_access_key_id=aws_access_key, - aws_secret_access_key=aws_secret_key + aws_secret_access_key=aws_secret_key, ) # Handle Hugging Face model download - hf_token = os.getenv('HF_TOKEN') + hf_token = os.getenv("HF_TOKEN") if hf_token: hf_token = hf_token.strip() login(token=hf_token, add_to_git_credential=True) logger.info("Successfully logged in to Hugging Face.") else: - raise EnvironmentError("HF_TOKEN is not defined. Please set the Hugging Face token as an environment variable.") + raise EnvironmentError( + "HF_TOKEN is not defined. Please set the Hugging Face token as an environment variable." 
+ ) converted_model_path = os.path.join(local_model_path, model_name.replace("/", "-")) @@ -65,12 +74,16 @@ def upload_and_save_model_to_s3(model_name: str, local_model_path: Union[str, Pa if Path(converted_model_path).exists(): shutil.rmtree(converted_model_path) snapshot_download(repo_id=model_name, local_dir=converted_model_path) - logger.info(f"Model '{model_name}' downloaded and saved to {converted_model_path}") + logger.info( + f"Model '{model_name}' downloaded and saved to {converted_model_path}" + ) # Upload files to S3 with a progress bar total_files = sum(len(files) for _, _, files in os.walk(converted_model_path)) num_files = 0 - with tqdm(total=total_files, desc="Uploading files to S3", disable=not verbose) as pbar: + with tqdm( + total=total_files, desc="Uploading files to S3", disable=not verbose + ) as pbar: for root, _, files in os.walk(converted_model_path): for filename in files: file_path = os.path.join(root, filename) @@ -82,7 +95,9 @@ def upload_and_save_model_to_s3(model_name: str, local_model_path: Union[str, Pa pbar.update(1) if num_files == 0: - raise ValueError(f"No files were uploaded. Check access to {converted_model_path}.") + raise ValueError( + f"No files were uploaded. Check access to {converted_model_path}." + ) # Log connection details s3_path = f"s3://{bucket_name}/{s3_prefix}" @@ -100,11 +115,12 @@ def upload_and_save_model_to_s3(model_name: str, local_model_path: Union[str, Pa return s3_path + # Example usage upload_and_save_model_to_s3( - model_name = os.getenv('MODEL'), + model_name=os.getenv("MODEL"), local_model_path="./models", - s3_model_path=os.getenv('MODEL_PATH'), + s3_model_path=os.getenv("MODEL_PATH"), verbose=True, - replace_if_exists=False + replace_if_exists=False, ) diff --git a/pipeline.py b/pipeline.py index c3b057b..2708821 100644 --- a/pipeline.py +++ b/pipeline.py @@ -35,7 +35,7 @@ def pipeline_wrapper(mock: List[Literal[MOCKED_STAGES]]): kubectl_wait_for_op, huggingface_importer_op, pvc_to_artifact_op, - pvc_to_model_op + pvc_to_model_op, ) from utils import artifact_to_pvc_op else: @@ -46,7 +46,7 @@ def pipeline_wrapper(mock: List[Literal[MOCKED_STAGES]]): artifact_to_pvc_op, huggingface_importer_op, pvc_to_artifact_op, - pvc_to_model_op + pvc_to_model_op, ) # Imports for MMLU, MT_BENCH stage @@ -76,10 +76,11 @@ def pipeline( merge_system_user_message: bool = False, device: str = None, ): - # SDG stage git_clone_task = git_clone_op( - repo_branch=repo_branch, repo_pr=repo_pr if repo_pr and repo_pr > 0 else None, repo_url=repo_url + repo_branch=repo_branch, + repo_pr=repo_pr if repo_pr and repo_pr > 0 else None, + repo_url=repo_url, ) sdg_task = sdg_op( @@ -93,7 +94,6 @@ def pipeline( ) use_secret_as_env(sdg_task, K8S_NAME, {"api_key": "api_key"}) - # Training stage # We need to pass storage_class_name as "" to use the default StorageClass, if left empty, KFP uses "standard" StorageClass. 
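A note on the StorageClass comment above: this is a minimal, hedged sketch of what passing storage_class_name="" to the kfp-kubernetes CreatePVC component looks like; the PVC name suffix, size, and access mode here are illustrative and not taken from this pipeline.

from kfp import dsl
from kfp.kubernetes import CreatePVC, DeletePVC

@dsl.pipeline
def pvc_storageclass_sketch():
    # Passing "" explicitly selects the cluster's default StorageClass;
    # per the comment above, leaving the argument unset makes KFP fall
    # back to its "standard" StorageClass default.
    pvc = CreatePVC(
        pvc_name_suffix="-example-cache",  # illustrative
        access_modes=["ReadWriteMany"],    # illustrative
        size="50Gi",                       # illustrative
        storage_class_name="",
    )
    DeletePVC(pvc_name=pvc.output)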
@@ -116,10 +116,9 @@ def pipeline( task=model_to_pvc_task, pvc_name=model_pvc_task.output, mount_path="/model" ) - #Data processing + # Data processing data_processing_task = data_processing_op( - sdg = sdg_task.outputs["sdg"], - model = model_to_artifact.outputs["model"] + sdg=sdg_task.outputs["sdg"], model=model_to_artifact.outputs["model"] ) sdg_input_pvc_task = CreatePVC( @@ -151,7 +150,7 @@ def pipeline( name_suffix=sdg_input_pvc_task.output, output_pvc_name=output_pvc_task.output, path_to_model="/input_model/model", - phase_name="first" + phase_name="first", ) pytorchjob_manifest_task.set_caching_options(False) @@ -179,12 +178,14 @@ def pipeline( models_list_task.after(kubectl_wait_task) mount_pvc( - task=models_list_task, pvc_name=output_pvc_task.output, mount_path="/output/model" + task=models_list_task, + pvc_name=output_pvc_task.output, + mount_path="/output/model", ) run_mmlu_task = run_mmlu_op( models_list=models_list_task.output, - models_path_prefix = "/output/model/hf_format", + models_path_prefix="/output/model/hf_format", mmlu_tasks_list=mmlu_tasks_list, model_dtype=model_dtype, few_shots=few_shots, @@ -199,10 +200,10 @@ def pipeline( ) load_mmlu_results_task = load_mmlu_results_op( - mmlu_output=run_mmlu_task.outputs['mmlu_output'], + mmlu_output=run_mmlu_task.outputs["mmlu_output"], ) - run_mmlu_task.set_accelerator_type('nvidia.com/gpu') + run_mmlu_task.set_accelerator_type("nvidia.com/gpu") run_mmlu_task.set_accelerator_limit(1) # Run training on MMLU best-model @@ -210,17 +211,15 @@ def pipeline( # For now, running mt_bench on same output models as training phase 1 # TODO: Another training phase, using the best-model from MMLU as base - #### Train 2 - - + #### Train 2 pytorchjob_manifest_2_task = pytorchjob_manifest_op( model_pvc_name=model_pvc_task.output, input_pvc_name=sdg_input_pvc_task.output, name_suffix=sdg_input_pvc_task.output, output_pvc_name=output_pvc_task.output, - path_to_model= run_mmlu_task.outputs["best_model"], - phase_name="second" + path_to_model=run_mmlu_task.outputs["best_model"], + phase_name="second", ) pytorchjob_manifest_2_task.set_caching_options(False) @@ -239,8 +238,7 @@ def pipeline( kubectl_wait_2_task.after(kubectl_apply_2_task) kubectl_wait_2_task.set_caching_options(False) - - ### + ### models_list_2_task = list_models_in_directory_op( models_folder="/output/model/model/hf_format", @@ -250,72 +248,78 @@ def pipeline( models_list_2_task.after(kubectl_wait_2_task) mount_pvc( - task=models_list_2_task, pvc_name=output_pvc_task.output, mount_path="/output/model" + task=models_list_2_task, + pvc_name=output_pvc_task.output, + mount_path="/output/model", ) - ### + ### run_mt_bench_task = run_mt_bench_op( # TODO: make a second models_list_task from the 2nd phase of training models_list=models_list_2_task.output, - models_path_prefix = "/output/model/hf_format", - max_workers = max_workers, - merge_system_user_message = merge_system_user_message, - device = device, + models_path_prefix="/output/model/hf_format", + max_workers=max_workers, + merge_system_user_message=merge_system_user_message, + device=device, ) mount_pvc( - task=run_mt_bench_task, pvc_name=output_pvc_task.output, mount_path="/output" + task=run_mt_bench_task, + pvc_name=output_pvc_task.output, + mount_path="/output", ) # For now run on same models from same training run as MMLU run_mt_bench_task.after(models_list_2_task) - run_mt_bench_task.set_accelerator_type('nvidia.com/gpu') + run_mt_bench_task.set_accelerator_type("nvidia.com/gpu") 
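For orientation: the use_config_map_as_env / use_secret_as_env calls that follow only inject environment variables into the task's container. A small sketch of the consumption side inside the evaluation component, using the variable names that appear elsewhere in this diff:

import os

# Injected from the judge ConfigMap via use_config_map_as_env
judge_endpoint = os.getenv("JUDGE_ENDPOINT")
judge_model_name = os.getenv("JUDGE_NAME")

# Injected from the judge Secret via use_secret_as_env
judge_api_key = os.getenv("JUDGE_API_KEY", "")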
run_mt_bench_task.set_accelerator_limit(1) run_mt_bench_task.set_caching_options(False) - use_config_map_as_env( - run_mt_bench_task, JUDGE_CONFIG_MAP, dict(endpoint="JUDGE_ENDPOINT", model="JUDGE_NAME") + run_mt_bench_task, + JUDGE_CONFIG_MAP, + dict(endpoint="JUDGE_ENDPOINT", model="JUDGE_NAME"), ) use_secret_as_env(run_mt_bench_task, JUDGE_SECRET, {"api_key": "JUDGE_API_KEY"}) - # Technically `output_model_task` and `output_data_task` can happen before evaluation, # however the PVC can only be mounted once, so, setting these to _after_ so the eval proceeds. output_model_task = pvc_to_artifact_op( pvc_path="/output/data", - ) - #output_model_task.after(kubectl_wait_task) + ) + # output_model_task.after(kubectl_wait_task) output_model_task.after(run_mt_bench_task) output_model_task.set_caching_options(False) mount_pvc( - task=output_model_task, pvc_name=output_pvc_task.output, mount_path="/output/data" + task=output_model_task, + pvc_name=output_pvc_task.output, + mount_path="/output/data", ) output_data_task = pvc_to_model_op( pvc_path="/output/model", - ) - #output_data_task.after(kubectl_wait_task) + ) + # output_data_task.after(kubectl_wait_task) output_data_task.after(run_mt_bench_task) mount_pvc( - task=output_data_task, pvc_name=output_pvc_task.output, mount_path="/output/model" + task=output_data_task, + pvc_name=output_pvc_task.output, + mount_path="/output/model", ) - + output_pvc_delete_task = DeletePVC(pvc_name=output_pvc_task.output) output_pvc_delete_task.after(output_data_task) - sdg_pvc_delete_task = DeletePVC(pvc_name=sdg_input_pvc_task.output) sdg_pvc_delete_task.after(output_data_task) model_pvc_delete_task = DeletePVC(pvc_name=model_pvc_task.output) model_pvc_delete_task.after(output_data_task) - return return pipeline @@ -330,7 +334,6 @@ def pipeline( default=[], ) def cli(mock): - p = pipeline_wrapper(mock) with click.progressbar(length=1, label="Generating pipeline") as bar: diff --git a/pipeline.yaml b/pipeline.yaml index 9f202f6..a4f50a0 100644 --- a/pipeline.yaml +++ b/pipeline.yaml @@ -566,36 +566,37 @@ deploymentSpec: - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef data_processing_op(\n sdg: dsl.Input[dsl.Dataset],\n processed_data:\ \ dsl.Output[dsl.Dataset],\n model: dsl.Input[dsl.Artifact],\n max_seq_len:\ - \ Optional[int] = 4096,\n max_batch_len: Optional[int] = 20000\n):\n\ + \ Optional[int] = 4096,\n max_batch_len: Optional[int] = 20000,\n):\n\ \ import instructlab.training.data_process as dp\n import os\n \ \ from instructlab.training import (\n TrainingArgs,\n DataProcessArgs,\n\ - \ )\n # define training-specific arguments\n training_args\ - \ = TrainingArgs(\n # define data-specific arguments\n model_path\ - \ = model.path,\n data_path = f\"{sdg.path}/*_train_msgs*.jsonl\"\ - ,\n data_output_dir = processed_data.path,\n\n # define model-trianing\ - \ parameters\n max_seq_len = max_seq_len,\n max_batch_len\ - \ = max_batch_len,\n\n # XXX(shanand): We don't need the following\ - \ arguments \n # for data processing. 
Added them for now to avoid\n\ - \ # Pydantic validation errors for TrainingArgs\n ckpt_output_dir\ - \ = \"data/saved_checkpoints\",\n num_epochs = 2,\n effective_batch_size\ - \ = 3840,\n save_samples = 0,\n learning_rate = 2e-6,\n \ - \ warmup_steps = 800,\n is_padding_free = True,\n )\n \ - \ def data_processing(train_args: TrainingArgs) -> None:\n # early\ - \ validation logic here\n if train_args.max_batch_len < train_args.max_seq_len:\n\ - \ raise ValueError(\n f\"the `max_batch_len` cannot\ + \ )\n\n # define training-specific arguments\n training_args =\ + \ TrainingArgs(\n # define data-specific arguments\n model_path=model.path,\n\ + \ data_path=f\"{sdg.path}/*_train_msgs*.jsonl\",\n data_output_dir=processed_data.path,\n\ + \ # define model-trianing parameters\n max_seq_len=max_seq_len,\n\ + \ max_batch_len=max_batch_len,\n # XXX(shanand): We don't\ + \ need the following arguments\n # for data processing. Added them\ + \ for now to avoid\n # Pydantic validation errors for TrainingArgs\n\ + \ ckpt_output_dir=\"data/saved_checkpoints\",\n num_epochs=2,\n\ + \ effective_batch_size=3840,\n save_samples=0,\n learning_rate=2e-6,\n\ + \ warmup_steps=800,\n is_padding_free=True,\n )\n\n \ + \ def data_processing(train_args: TrainingArgs) -> None:\n # early\ + \ validation logic here\n if train_args.max_batch_len < train_args.max_seq_len:\n\ + \ raise ValueError(\n f\"the `max_batch_len` cannot\ \ be less than `max_seq_len`: {train_args.max_batch_len=} < {train_args.max_seq_len=}\"\ - \n )\n\n # process the training data\n if not os.path.exists(train_args.data_output_dir):\n\ - \ os.makedirs(train_args.data_output_dir, exist_ok=True)\n \ - \ dp.main(\n DataProcessArgs(\n # XXX(osilkin): make\ - \ a decision here, either:\n # 1. the CLI is fully responsible\ - \ for managing where the data is written\n # 2. we never\ - \ cache it and simply write it to a tmp file every time.\n \ - \ #\n # An important reason for why #1 would be preferable\ - \ is in the case of OpenShift/SELinux\n # where the user has\ - \ a defined place for new temporary data to be written.\n data_output_path=train_args.data_output_dir,\n\ - \ model_path=train_args.model_path,\n data_path=train_args.data_path,\n\ - \ max_seq_len=train_args.max_seq_len,\n chat_tmpl_path=train_args.chat_tmpl_path,\n\ - \ )\n )\n data_processing(train_args=training_args)\n\n" + \n )\n\n # process the training data\n if not\ + \ os.path.exists(train_args.data_output_dir):\n os.makedirs(train_args.data_output_dir,\ + \ exist_ok=True)\n dp.main(\n DataProcessArgs(\n \ + \ # XXX(osilkin): make a decision here, either:\n \ + \ # 1. the CLI is fully responsible for managing where the data is\ + \ written\n # 2. 
we never cache it and simply write it\ + \ to a tmp file every time.\n #\n # An important\ + \ reason for why #1 would be preferable is in the case of OpenShift/SELinux\n\ + \ # where the user has a defined place for new temporary\ + \ data to be written.\n data_output_path=train_args.data_output_dir,\n\ + \ model_path=train_args.model_path,\n data_path=train_args.data_path,\n\ + \ max_seq_len=train_args.max_seq_len,\n chat_tmpl_path=train_args.chat_tmpl_path,\n\ + \ )\n )\n\n data_processing(train_args=training_args)\n\ + \n" image: registry.access.redhat.com/ubi9/python-311:latest exec-deletepvc: container: @@ -613,7 +614,8 @@ deploymentSpec: && cd {{$.outputs.artifacts[''taxonomy''].path}} && if [ -n "{{$.inputs.parameters[''repo_branch'']}}" ]; then git fetch origin {{$.inputs.parameters[''repo_branch'']}} && git checkout {{$.inputs.parameters[''repo_branch'']}}; elif [ -n "{{$.inputs.parameters[''repo_pr'']}}" - && {{$.inputs.parameters[''repo_pr'']}} -gt 0 ]; then git fetch origin pull/{{$.inputs.parameters[''repo_pr'']}}/head:{{$.inputs.parameters[''repo_pr'']}} + ] && [ {{$.inputs.parameters[''repo_pr'']}} -gt 0 ]; then git fetch origin + pull/{{$.inputs.parameters[''repo_pr'']}}/head:{{$.inputs.parameters[''repo_pr'']}} && git checkout {{$.inputs.parameters[''repo_pr'']}}; fi ' command: - /bin/sh @@ -711,7 +713,7 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef list_models_in_directory_op(models_folder: str) -> List:\n \ - \ import os\n models = os.listdir(models_folder)\n return models\n\ + \ import os\n\n models = os.listdir(models_folder)\n return models\n\ \n" image: python:3.8 exec-list-models-in-directory-op-2: @@ -741,7 +743,7 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef list_models_in_directory_op(models_folder: str) -> List:\n \ - \ import os\n models = os.listdir(models_folder)\n return models\n\ + \ import os\n\n models = os.listdir(models_folder)\n return models\n\ \n" image: python:3.8 exec-load-mmlu-results-op: @@ -772,7 +774,7 @@ deploymentSpec: - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef load_mmlu_results_op(mmlu_output: Input[Artifact]) -> list:\n\ \ import json\n\n mmlu_score_list = []\n with open(mmlu_output.path,\ - \ 'r') as f:\n mmlu_score_list = json.load(f)\n\n print(\"MMLU\ + \ \"r\") as f:\n mmlu_score_list = json.load(f)\n\n print(\"MMLU\ \ Evaluation Data:\")\n for mmlu_score in mmlu_score_list:\n print(json.dumps(mmlu_score,\ \ indent=4))\n\n return mmlu_score_list\n\n" image: registry.access.redhat.com/ubi9/python-311:latest @@ -820,21 +822,21 @@ deploymentSpec: - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef pytorchjob_manifest_op(\n model_pvc_name: str,\n input_pvc_name:\ \ str,\n output_pvc_name: str,\n name_suffix: str,\n path_to_model:\ - \ str,\n phase_name: str\n) -> NamedTuple(\"outputs\", manifest=str,\ + \ str,\n phase_name: str,\n) -> NamedTuple(\"outputs\", manifest=str,\ \ name=str):\n import inspect\n\n Outputs = NamedTuple(\"outputs\"\ , manifest=str, name=str)\n name = f\"train-{phase_name}-{name_suffix.rstrip('-sdg')}\"\ - \n\n image = 'quay.io/shanand/test-train:0.0.4'\n nprocPerNode = 3\n\ - \ nnodes = 2\n\n manifest = inspect.cleandoc(\n f\"\"\"\n \ - \ apiVersion: kubeflow.org/v1\n kind: PyTorchJob\n metadata:\n\ - \ name: {name}\n spec:\n nprocPerNode: \\\\\"{nprocPerNode}\\\ - \\\"\n pytorchReplicaSpecs:\n 
Master:\n \ - \ replicas: 1\n restartPolicy: OnFailure\n template:\n\ - \ metadata:\n annotations:\n \ - \ sidecar.istio.io/inject: 'false'\n spec:\n \ - \ containers:\n - args:\n \ - \ - |\n mkdir -p /output/model;\n \ - \ mkdir -p /output/data;\n \ - \ python3.11 -u run_main_ds.py --model_path {path_to_model} --ckpt_output_dir\ + \n\n image = \"quay.io/shanand/test-train:0.0.4\"\n nprocPerNode =\ + \ 3\n nnodes = 2\n\n manifest = inspect.cleandoc(\n f\"\"\"\ + \n apiVersion: kubeflow.org/v1\n kind: PyTorchJob\n \ + \ metadata:\n name: {name}\n spec:\n nprocPerNode:\ + \ \\\\\"{nprocPerNode}\\\\\"\n pytorchReplicaSpecs:\n \ + \ Master:\n replicas: 1\n restartPolicy: OnFailure\n\ + \ template:\n metadata:\n annotations:\n\ + \ sidecar.istio.io/inject: 'false'\n spec:\n\ + \ containers:\n - args:\n \ + \ - |\n mkdir -p /output/model;\n\ + \ mkdir -p /output/data;\n \ + \ python3.11 -u run_main_ds.py --model_path {path_to_model} --ckpt_output_dir\ \ /output/model --data_output_dir /input_data/processed_data\n \ \ command:\n - /bin/bash\n \ \ - '-c'\n - '--'\n \ @@ -917,21 +919,21 @@ deploymentSpec: - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef pytorchjob_manifest_op(\n model_pvc_name: str,\n input_pvc_name:\ \ str,\n output_pvc_name: str,\n name_suffix: str,\n path_to_model:\ - \ str,\n phase_name: str\n) -> NamedTuple(\"outputs\", manifest=str,\ + \ str,\n phase_name: str,\n) -> NamedTuple(\"outputs\", manifest=str,\ \ name=str):\n import inspect\n\n Outputs = NamedTuple(\"outputs\"\ , manifest=str, name=str)\n name = f\"train-{phase_name}-{name_suffix.rstrip('-sdg')}\"\ - \n\n image = 'quay.io/shanand/test-train:0.0.4'\n nprocPerNode = 3\n\ - \ nnodes = 2\n\n manifest = inspect.cleandoc(\n f\"\"\"\n \ - \ apiVersion: kubeflow.org/v1\n kind: PyTorchJob\n metadata:\n\ - \ name: {name}\n spec:\n nprocPerNode: \\\\\"{nprocPerNode}\\\ - \\\"\n pytorchReplicaSpecs:\n Master:\n \ - \ replicas: 1\n restartPolicy: OnFailure\n template:\n\ - \ metadata:\n annotations:\n \ - \ sidecar.istio.io/inject: 'false'\n spec:\n \ - \ containers:\n - args:\n \ - \ - |\n mkdir -p /output/model;\n \ - \ mkdir -p /output/data;\n \ - \ python3.11 -u run_main_ds.py --model_path {path_to_model} --ckpt_output_dir\ + \n\n image = \"quay.io/shanand/test-train:0.0.4\"\n nprocPerNode =\ + \ 3\n nnodes = 2\n\n manifest = inspect.cleandoc(\n f\"\"\"\ + \n apiVersion: kubeflow.org/v1\n kind: PyTorchJob\n \ + \ metadata:\n name: {name}\n spec:\n nprocPerNode:\ + \ \\\\\"{nprocPerNode}\\\\\"\n pytorchReplicaSpecs:\n \ + \ Master:\n replicas: 1\n restartPolicy: OnFailure\n\ + \ template:\n metadata:\n annotations:\n\ + \ sidecar.istio.io/inject: 'false'\n spec:\n\ + \ containers:\n - args:\n \ + \ - |\n mkdir -p /output/model;\n\ + \ mkdir -p /output/data;\n \ + \ python3.11 -u run_main_ds.py --model_path {path_to_model} --ckpt_output_dir\ \ /output/model --data_output_dir /input_data/processed_data\n \ \ command:\n - /bin/bash\n \ \ - '-c'\n - '--'\n \ @@ -1015,34 +1017,37 @@ deploymentSpec: \ *\n\ndef run_mmlu_op(\n mmlu_output: Output[Artifact],\n models_path_prefix:\ \ str,\n mmlu_tasks_list: str,\n model_dtype: str,\n few_shots:\ \ int,\n batch_size: int,\n device: str,\n models_list: List[str],\n\ - ) -> NamedTuple('outputs', best_model=str, best_score=float):\n import\ + ) -> NamedTuple(\"outputs\", best_model=str, best_score=float):\n import\ \ json\n import os\n import torch\n from instructlab.eval.mmlu\ - \ import MMLUEvaluator, MMLU_TASKS\n\n mmlu_tasks = 
mmlu_tasks_list.split(',')\ - \ if mmlu_tasks_list else MMLU_TASKS\n\n # Device setup and debug\n \ - \ gpu_available = torch.cuda.is_available()\n gpu_name = torch.cuda.get_device_name(torch.cuda.current_device())\ - \ if gpu_available else \"No GPU available\"\n\n print(f\"GPU Available:\ - \ {gpu_available}, Using: {gpu_name}\")\n\n effective_device = device\ - \ if device is not None else (\"cuda\" if gpu_available else \"cpu\")\n\ - \ print(f\"Running on device: {effective_device}\")\n\n scores = {}\n\ - \ all_mmlu_data = []\n\n for model_name in models_list:\n model_path\ - \ = f\"{models_path_prefix}/{model_name}\"\n # Debug\n print(f\"\ - Model {model_name} is stored at: {model_path}\")\n\n # Evaluation\n\ - \ evaluator = MMLUEvaluator(\n model_path=model_path,\n\ - \ tasks=mmlu_tasks,\n model_dtype=model_dtype,\n \ - \ few_shots=few_shots,\n batch_size=batch_size,\n \ - \ device=effective_device,\n )\n\n mmlu_score, individual_scores\ - \ = evaluator.run()\n average_score = round(mmlu_score, 2)\n \ - \ print(f\"Model {model_name} is stored at: {model_path} with AVERAGE_SCORE:\ - \ {average_score}\")\n\n mmlu_data = {\n \"report_title\"\ - : \"KNOWLEDGE EVALUATION REPORT\",\n \"model\": model_name,\n\ - \ \"average_score\": average_score,\n \"number_of_tasks\"\ - : len(individual_scores),\n \"individual_scores\": [{task: round(score['score'],\ - \ 2)} for task, score in individual_scores.items()]\n }\n\n \ - \ all_mmlu_data.append(mmlu_data)\n scores[model_path] = average_score\n\ - \n with open(mmlu_output.path, 'w') as f:\n json.dump(all_mmlu_data,\ - \ f, indent=4)\n outputs = NamedTuple('outputs', best_model=str, best_score=float)\n\ - \ best_model = max(scores, key=scores.get)\n best_score = scores[best_model]\n\ - \ return outputs(best_model=best_model, best_score=best_score)\n\n" + \ import MMLUEvaluator, MMLU_TASKS\n\n mmlu_tasks = mmlu_tasks_list.split(\"\ + ,\") if mmlu_tasks_list else MMLU_TASKS\n\n # Device setup and debug\n\ + \ gpu_available = torch.cuda.is_available()\n gpu_name = (\n \ + \ torch.cuda.get_device_name(torch.cuda.current_device())\n if\ + \ gpu_available\n else \"No GPU available\"\n )\n\n print(f\"\ + GPU Available: {gpu_available}, Using: {gpu_name}\")\n\n effective_device\ + \ = (\n device if device is not None else (\"cuda\" if gpu_available\ + \ else \"cpu\")\n )\n print(f\"Running on device: {effective_device}\"\ + )\n\n scores = {}\n all_mmlu_data = []\n\n for model_name in models_list:\n\ + \ model_path = f\"{models_path_prefix}/{model_name}\"\n #\ + \ Debug\n print(f\"Model {model_name} is stored at: {model_path}\"\ + )\n\n # Evaluation\n evaluator = MMLUEvaluator(\n \ + \ model_path=model_path,\n tasks=mmlu_tasks,\n \ + \ model_dtype=model_dtype,\n few_shots=few_shots,\n \ + \ batch_size=batch_size,\n device=effective_device,\n \ + \ )\n\n mmlu_score, individual_scores = evaluator.run()\n \ + \ average_score = round(mmlu_score, 2)\n print(\n f\"\ + Model {model_name} is stored at: {model_path} with AVERAGE_SCORE: {average_score}\"\ + \n )\n\n mmlu_data = {\n \"report_title\": \"KNOWLEDGE\ + \ EVALUATION REPORT\",\n \"model\": model_name,\n \ + \ \"average_score\": average_score,\n \"number_of_tasks\": len(individual_scores),\n\ + \ \"individual_scores\": [\n {task: round(score[\"\ + score\"], 2)}\n for task, score in individual_scores.items()\n\ + \ ],\n }\n\n all_mmlu_data.append(mmlu_data)\n\ + \ scores[model_path] = average_score\n\n with open(mmlu_output.path,\ + \ \"w\") as f:\n json.dump(all_mmlu_data, f, indent=4)\n outputs\ + \ = 
NamedTuple(\"outputs\", best_model=str, best_score=float)\n best_model\ + \ = max(scores, key=scores.get)\n best_score = scores[best_model]\n \ + \ return outputs(best_model=best_model, best_score=best_score)\n\n" image: quay.io/sallyom/instructlab-ocp:eval resources: accelerator: @@ -1080,64 +1085,71 @@ deploymentSpec: \ bool,\n # generate_answers,judgment uses a magic word for its mt_bench\ \ evaluator - `auto`\n # with `auto`, number of gpus allocated for serving\ \ is calculated based on environment\n # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n\ - \ max_workers: str = \"auto\",\n device: str = None,\n) -> NamedTuple('outputs',\ - \ best_model=str, best_score=float):\n\n\n def launch_vllm_server_background(model_path:\ - \ str, gpu_count: int, retries: int = 60, delay: int = 5):\n import\ - \ subprocess\n import sys\n import time\n import requests\n\ - \n if gpu_count > 0:\n command = [\n sys.executable,\n\ - \ \"-m\", \"vllm.entrypoints.openai.api_server\",\n \ - \ \"--model\", model_path,\n \"--tensor-parallel-size\"\ - , str(gpu_count),\n ]\n else:\n command = [\n\ - \ sys.executable,\n \"-m\", \"vllm.entrypoints.openai.api_server\"\ - ,\n \"--model\", model_path,\n ]\n\n subprocess.Popen(args=command)\n\ - \n server_url = \"http://localhost:8000/v1\"\n print(f\"Waiting\ - \ for vLLM server to start at {server_url}...\")\n\n for attempt\ - \ in range(retries):\n try:\n response = requests.get(f\"\ - {server_url}/models\")\n if response.status_code == 200:\n\ - \ print(f\"vLLM server is up and running at {server_url}.\"\ - )\n return\n except requests.ConnectionError:\n\ - \ pass\n\n print(f\"Server not available yet,\ - \ retrying in {delay} seconds (Attempt {attempt + 1}/{retries})...\")\n\ - \ time.sleep(delay)\n\n raise RuntimeError(f\"Failed to\ - \ start vLLM server at {server_url} after {retries} retries.\")\n\n #\ - \ This seems like excessive effort to stop the vllm process, but merely\ - \ saving & killing the pid doesn't work\n # Also, the base image does\ - \ not include `pkill` cmd, so can't pkill -f vllm.entrypoints.openai.api_server\ - \ either\n def stop_vllm_server_by_name():\n import psutil\n\n\ - \ for process in psutil.process_iter(attrs=[\"pid\", \"name\", \"\ - cmdline\"]):\n cmdline = process.info.get(\"cmdline\")\n \ - \ if cmdline and \"vllm.entrypoints.openai.api_server\" in cmdline:\n\ - \ print(f\"Found vLLM server process with PID: {process.info['pid']},\ - \ terminating...\")\n try:\n process.terminate()\ + \ max_workers: str = \"auto\",\n device: str = None,\n) -> NamedTuple(\"\ + outputs\", best_model=str, best_score=float):\n def launch_vllm_server_background(\n\ + \ model_path: str, gpu_count: int, retries: int = 60, delay: int\ + \ = 5\n ):\n import subprocess\n import sys\n import\ + \ time\n import requests\n\n if gpu_count > 0:\n \ + \ command = [\n sys.executable,\n \"-m\"\ + ,\n \"vllm.entrypoints.openai.api_server\",\n \ + \ \"--model\",\n model_path,\n \"--tensor-parallel-size\"\ + ,\n str(gpu_count),\n ]\n else:\n \ + \ command = [\n sys.executable,\n \"\ + -m\",\n \"vllm.entrypoints.openai.api_server\",\n \ + \ \"--model\",\n model_path,\n ]\n\n \ + \ subprocess.Popen(args=command)\n\n server_url = \"http://localhost:8000/v1\"\ + \n print(f\"Waiting for vLLM server to start at {server_url}...\"\ + )\n\n for attempt in range(retries):\n try:\n \ + \ response = requests.get(f\"{server_url}/models\")\n \ + \ if response.status_code == 200:\n print(f\"vLLM\ + \ server is up and running at 
{server_url}.\")\n return\n\ + \ except requests.ConnectionError:\n pass\n\n\ + \ print(\n f\"Server not available yet, retrying\ + \ in {delay} seconds (Attempt {attempt + 1}/{retries})...\"\n \ + \ )\n time.sleep(delay)\n\n raise RuntimeError(\n \ + \ f\"Failed to start vLLM server at {server_url} after {retries}\ + \ retries.\"\n )\n\n # This seems like excessive effort to stop\ + \ the vllm process, but merely saving & killing the pid doesn't work\n \ + \ # Also, the base image does not include `pkill` cmd, so can't pkill\ + \ -f vllm.entrypoints.openai.api_server either\n def stop_vllm_server_by_name():\n\ + \ import psutil\n\n for process in psutil.process_iter(attrs=[\"\ + pid\", \"name\", \"cmdline\"]):\n cmdline = process.info.get(\"\ + cmdline\")\n if cmdline and \"vllm.entrypoints.openai.api_server\"\ + \ in cmdline:\n print(\n f\"Found vLLM\ + \ server process with PID: {process.info['pid']}, terminating...\"\n \ + \ )\n try:\n process.terminate()\ \ # Try graceful termination\n process.wait(timeout=5)\ \ # Wait a bit for it to terminate\n if process.is_running():\n\ - \ print(f\"Forcefully killing vLLM server process\ - \ with PID: {process.info['pid']}\")\n process.kill()\ - \ # Force kill if it's still running\n print(f\"Successfully\ - \ stopped vLLM server with PID: {process.info['pid']}\")\n \ - \ except psutil.NoSuchProcess:\n print(f\"Process with\ - \ PID {process.info['pid']} no longer exists.\")\n except\ - \ psutil.AccessDenied:\n print(f\"Access denied when\ - \ trying to terminate process with PID {process.info['pid']}.\")\n \ - \ except Exception as e:\n print(f\"Failed\ - \ to terminate process with PID {process.info['pid']}. Error: {e}\")\n\n\ - \n import json\n import torch\n import os\n\n from instructlab.eval\ - \ import mt_bench_answers, mt_bench_judgment\n\n os.environ[\"PYTORCH_CUDA_ALLOC_CONF\"\ - ] = \"expandable_segments:True\"\n candidate_server_url = \"http://localhost:8000/v1\"\ - \n\n gpu_available = torch.cuda.is_available()\n gpu_name = torch.cuda.get_device_name(torch.cuda.current_device())\ - \ if gpu_available else \"No GPU available\"\n gpu_count = torch.cuda.device_count()\ - \ if gpu_available else 0\n\n print(f\"GPU Available: {gpu_available},\ - \ {gpu_name}\")\n\n # See note above about magic word \"auto\"\n if\ - \ max_workers == \"auto\":\n try:\n usable_cpu_count =\ - \ len(os.sched_getaffinity(0)) // 2\n except AttributeError:\n \ - \ usable_cpu_count = multiprocessing.cpu_count() // 2\n \ - \ max_workers = usable_cpu_count\n\n # TODO: Using evaluator results\ - \ in connection errors, need to determine why.\n # For now, using\ - \ mt_bench_answers.generate_answers & mt_bench_judgment.generate_judgment\n\ - \ #evaluator = MTBenchEvaluator(\n # model_name=candidate_model_name,\n\ + \ print(\n f\"Forcefully\ + \ killing vLLM server process with PID: {process.info['pid']}\"\n \ + \ )\n process.kill() # Force kill\ + \ if it's still running\n print(\n \ + \ f\"Successfully stopped vLLM server with PID: {process.info['pid']}\"\ + \n )\n except psutil.NoSuchProcess:\n\ + \ print(f\"Process with PID {process.info['pid']} no\ + \ longer exists.\")\n except psutil.AccessDenied:\n \ + \ print(\n f\"Access denied when trying\ + \ to terminate process with PID {process.info['pid']}.\"\n \ + \ )\n except Exception as e:\n print(\n\ + \ f\"Failed to terminate process with PID {process.info['pid']}.\ + \ Error: {e}\"\n )\n\n import json\n import torch\n\ + \ import os\n\n from instructlab.eval import mt_bench_answers, mt_bench_judgment\n\ + \n 
os.environ[\"PYTORCH_CUDA_ALLOC_CONF\"] = \"expandable_segments:True\"\ + \n candidate_server_url = \"http://localhost:8000/v1\"\n\n gpu_available\ + \ = torch.cuda.is_available()\n gpu_name = (\n torch.cuda.get_device_name(torch.cuda.current_device())\n\ + \ if gpu_available\n else \"No GPU available\"\n )\n \ + \ gpu_count = torch.cuda.device_count() if gpu_available else 0\n\n \ + \ print(f\"GPU Available: {gpu_available}, {gpu_name}\")\n\n # See note\ + \ above about magic word \"auto\"\n if max_workers == \"auto\":\n \ + \ try:\n usable_cpu_count = len(os.sched_getaffinity(0))\ + \ // 2\n except AttributeError:\n usable_cpu_count = multiprocessing.cpu_count()\ + \ // 2\n max_workers = usable_cpu_count\n\n # TODO: Using evaluator\ + \ results in connection errors, need to determine why.\n # For\ + \ now, using mt_bench_answers.generate_answers & mt_bench_judgment.generate_judgment\n\ + \ # evaluator = MTBenchEvaluator(\n # model_name=candidate_model_name,\n\ \ # judge_model_name=judge_model_name,\n # max_workers=max_workers,\n\ - \ # merge_system_user_message=merge_system_user_message\n #)\n\n\ - \ judge_api_key = os.getenv(\"JUDGE_API_KEY\", \"\")\n judge_model_name\ + \ # merge_system_user_message=merge_system_user_message\n # )\n\ + \n judge_api_key = os.getenv(\"JUDGE_API_KEY\", \"\")\n judge_model_name\ \ = os.getenv(\"JUDGE_NAME\")\n judge_endpoint = os.getenv(\"JUDGE_ENDPOINT\"\ )\n\n scores = {}\n all_mt_bench_data = []\n\n for model_name in\ \ models_list:\n print(f\"Serving candidate model: {model_name}\"\ @@ -1146,24 +1158,24 @@ deploymentSpec: \ gpu_count)\n\n # model ID is the model_path value in vLLM\n \ \ print(\"Generating answers...\")\n mt_bench_answers.generate_answers(\n\ \ model_name=model_path,\n model_api_base=candidate_server_url,\n\ - \ output_dir=\"/tmp/eval_output\",\n max_workers=max_workers\n\ - \ )\n\n print(\"Judging answers...\")\n overall_score,\ - \ qa_pairs, turn_scores, error_rate = mt_bench_judgment.generate_judgment(\n\ - \ model_name=model_path,\n judge_model_name=judge_model_name,\n\ - \ model_api_base=judge_endpoint,\n api_key=judge_api_key,\n\ \ output_dir=\"/tmp/eval_output\",\n max_workers=max_workers,\n\ - \ merge_system_user_message=merge_system_user_message\n \ - \ )\n\n stop_vllm_server_by_name()\n\n mt_bench_data = {\n\ - \ \"report_title\": \"SKILLS EVALUATION REPORT\",\n \ - \ \"model\": model_path,\n \"judge_model\": judge_model_name,\n\ - \ \"overall_score\": overall_score,\n \"turn_scores\"\ - : turn_scores,\n \"qa_scores\": qa_pairs,\n \"error_rate\"\ - : error_rate,\n }\n\n all_mt_bench_data.append(mt_bench_data)\n\ + \ )\n\n print(\"Judging answers...\")\n overall_score,\ + \ qa_pairs, turn_scores, error_rate = (\n mt_bench_judgment.generate_judgment(\n\ + \ model_name=model_path,\n judge_model_name=judge_model_name,\n\ + \ model_api_base=judge_endpoint,\n api_key=judge_api_key,\n\ + \ output_dir=\"/tmp/eval_output\",\n max_workers=max_workers,\n\ + \ merge_system_user_message=merge_system_user_message,\n\ + \ )\n )\n\n stop_vllm_server_by_name()\n\n \ + \ mt_bench_data = {\n \"report_title\": \"SKILLS EVALUATION\ + \ REPORT\",\n \"model\": model_path,\n \"judge_model\"\ + : judge_model_name,\n \"overall_score\": overall_score,\n \ + \ \"turn_scores\": turn_scores,\n \"qa_scores\": qa_pairs,\n\ + \ \"error_rate\": error_rate,\n }\n\n all_mt_bench_data.append(mt_bench_data)\n\ \ scores[model_path] = overall_score\n\n with open(mt_bench_output.path,\ - \ 'w') as f:\n json.dump(all_mt_bench_data, f, indent=4)\n\n outputs\ - \ = 
NamedTuple('outputs', best_model=str, best_score=float)\n best_model\ - \ = max(scores, key=scores.get)\n best_score = scores[best_model]\n \ - \ return outputs(best_model=best_model, best_score=best_score)\n\n" + \ \"w\") as f:\n json.dump(all_mt_bench_data, f, indent=4)\n\n \ + \ outputs = NamedTuple(\"outputs\", best_model=str, best_score=float)\n\ + \ best_model = max(scores, key=scores.get)\n best_score = scores[best_model]\n\ + \ return outputs(best_model=best_model, best_score=best_score)\n\n" image: quay.io/sallyom/instructlab-ocp:eval resources: accelerator: diff --git a/sdg/components.py b/sdg/components.py index 6e77f4b..c5203fd 100644 --- a/sdg/components.py +++ b/sdg/components.py @@ -5,6 +5,7 @@ IMAGE = "quay.io/tcoufal/ilab-sdg:latest" + @dsl.container_component def git_clone_op( taxonomy: dsl.Output[dsl.Dataset], diff --git a/sdg/faked/components.py b/sdg/faked/components.py index 3319361..a2db2dd 100644 --- a/sdg/faked/components.py +++ b/sdg/faked/components.py @@ -4,6 +4,7 @@ from kfp import dsl from utils.consts import PYTHON_IMAGE + @dsl.component(base_image=PYTHON_IMAGE) def git_clone_op( taxonomy: dsl.Output[dsl.Dataset], @@ -14,7 +15,12 @@ def git_clone_op( return -@dsl.component(base_image=PYTHON_IMAGE, packages_to_install=["git+https://github.com/redhat-et/ilab-on-ocp.git#subdirectory=sdg/faked/fixtures"]) +@dsl.component( + base_image=PYTHON_IMAGE, + packages_to_install=[ + "git+https://github.com/redhat-et/ilab-on-ocp.git#subdirectory=sdg/faked/fixtures" + ], +) def sdg_op( num_instructions_to_generate: int, taxonomy: dsl.Input[dsl.Dataset], diff --git a/training/components.py b/training/components.py index 7dcc8dd..ea691a3 100644 --- a/training/components.py +++ b/training/components.py @@ -6,70 +6,77 @@ from utils.consts import PYTHON_IMAGE from typing import Optional -@dsl.component(base_image=PYTHON_IMAGE, - packages_to_install=["instructlab-training@git+https://github.com/instructlab/training.git"]) + +@dsl.component( + base_image=PYTHON_IMAGE, + packages_to_install=[ + "instructlab-training@git+https://github.com/instructlab/training.git" + ], +) def data_processing_op( sdg: dsl.Input[dsl.Dataset], processed_data: dsl.Output[dsl.Dataset], model: dsl.Input[dsl.Artifact], max_seq_len: Optional[int] = 4096, - max_batch_len: Optional[int] = 20000 + max_batch_len: Optional[int] = 20000, ): import instructlab.training.data_process as dp import os from instructlab.training import ( TrainingArgs, DataProcessArgs, - ) - # define training-specific arguments + ) + + # define training-specific arguments training_args = TrainingArgs( # define data-specific arguments - model_path = model.path, - data_path = f"{sdg.path}/*_train_msgs*.jsonl", - data_output_dir = processed_data.path, - + model_path=model.path, + data_path=f"{sdg.path}/*_train_msgs*.jsonl", + data_output_dir=processed_data.path, # define model-trianing parameters - max_seq_len = max_seq_len, - max_batch_len = max_batch_len, - - # XXX(shanand): We don't need the following arguments - # for data processing. Added them for now to avoid - # Pydantic validation errors for TrainingArgs - ckpt_output_dir = "data/saved_checkpoints", - num_epochs = 2, - effective_batch_size = 3840, - save_samples = 0, - learning_rate = 2e-6, - warmup_steps = 800, - is_padding_free = True, + max_seq_len=max_seq_len, + max_batch_len=max_batch_len, + # XXX(shanand): We don't need the following arguments + # for data processing. 
Added them for now to avoid + # Pydantic validation errors for TrainingArgs + ckpt_output_dir="data/saved_checkpoints", + num_epochs=2, + effective_batch_size=3840, + save_samples=0, + learning_rate=2e-6, + warmup_steps=800, + is_padding_free=True, ) + def data_processing(train_args: TrainingArgs) -> None: - # early validation logic here - if train_args.max_batch_len < train_args.max_seq_len: - raise ValueError( - f"the `max_batch_len` cannot be less than `max_seq_len`: {train_args.max_batch_len=} < {train_args.max_seq_len=}" - ) - - # process the training data - if not os.path.exists(train_args.data_output_dir): - os.makedirs(train_args.data_output_dir, exist_ok=True) - dp.main( - DataProcessArgs( - # XXX(osilkin): make a decision here, either: - # 1. the CLI is fully responsible for managing where the data is written - # 2. we never cache it and simply write it to a tmp file every time. - # - # An important reason for why #1 would be preferable is in the case of OpenShift/SELinux - # where the user has a defined place for new temporary data to be written. - data_output_path=train_args.data_output_dir, - model_path=train_args.model_path, - data_path=train_args.data_path, - max_seq_len=train_args.max_seq_len, - chat_tmpl_path=train_args.chat_tmpl_path, - ) - ) + # early validation logic here + if train_args.max_batch_len < train_args.max_seq_len: + raise ValueError( + f"the `max_batch_len` cannot be less than `max_seq_len`: {train_args.max_batch_len=} < {train_args.max_seq_len=}" + ) + + # process the training data + if not os.path.exists(train_args.data_output_dir): + os.makedirs(train_args.data_output_dir, exist_ok=True) + dp.main( + DataProcessArgs( + # XXX(osilkin): make a decision here, either: + # 1. the CLI is fully responsible for managing where the data is written + # 2. we never cache it and simply write it to a tmp file every time. + # + # An important reason for why #1 would be preferable is in the case of OpenShift/SELinux + # where the user has a defined place for new temporary data to be written. 
+ data_output_path=train_args.data_output_dir, + model_path=train_args.model_path, + data_path=train_args.data_path, + max_seq_len=train_args.max_seq_len, + chat_tmpl_path=train_args.chat_tmpl_path, + ) + ) + data_processing(train_args=training_args) + @dsl.component(base_image=PYTHON_IMAGE) def pytorchjob_manifest_op( model_pvc_name: str, @@ -77,17 +84,17 @@ def pytorchjob_manifest_op( output_pvc_name: str, name_suffix: str, path_to_model: str, - phase_name: str + phase_name: str, ) -> NamedTuple("outputs", manifest=str, name=str): import inspect Outputs = NamedTuple("outputs", manifest=str, name=str) name = f"train-{phase_name}-{name_suffix.rstrip('-sdg')}" - image = 'quay.io/shanand/test-train:0.0.4' + image = "quay.io/shanand/test-train:0.0.4" nprocPerNode = 3 nnodes = 2 - + manifest = inspect.cleandoc( f""" apiVersion: kubeflow.org/v1 diff --git a/training/run_main_ds.py b/training/run_main_ds.py index e5b94a2..4ae3958 100644 --- a/training/run_main_ds.py +++ b/training/run_main_ds.py @@ -2,12 +2,7 @@ import os import subprocess -from instructlab.training import ( - TorchrunArgs, - TrainingArgs, - DeepSpeedOptions, - config -) +from instructlab.training import TorchrunArgs, TrainingArgs, DeepSpeedOptions, config from instructlab.training.utils import StreamablePopen @@ -126,8 +121,10 @@ def run_main_ds(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: # Training Args parser.add_argument("--cpu_offload", default=True) parser.add_argument("--model_path", default="ibm-granite/granite-7b-base") - parser.add_argument("--data_path", default="training/sample-data/train_all_pruned_SDG.jsonl") - parser.add_argument("--ckpt_output_dir",default="data/saved_checkpoints") + parser.add_argument( + "--data_path", default="training/sample-data/train_all_pruned_SDG.jsonl" + ) + parser.add_argument("--ckpt_output_dir", default="data/saved_checkpoints") parser.add_argument("--data_output_dir", default="data/outputs") parser.add_argument("--max_seq_len", default=4096) parser.add_argument("--max_batch_len", default=20000) @@ -145,39 +142,40 @@ def run_main_ds(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: parser.add_argument("--nproc_per_node", default=os.getenv("NPROC_PER_NODE", "1")) parser.add_argument("--node_rank", default=os.getenv("RANK", "0")) parser.add_argument("--rdzv_id", default=123) - parser.add_argument("--rdzv_endpoint", default=f"{os.getenv('MASTER_ADDR', '127.0.0.1')}:{os.getenv('MASTER_PORT','12345')}") + parser.add_argument( + "--rdzv_endpoint", + default=f"{os.getenv('MASTER_ADDR', '127.0.0.1')}:{os.getenv('MASTER_PORT','12345')}", + ) args = parser.parse_args() # define training-specific arguments training_args = TrainingArgs( - deepspeed_options = DeepSpeedOptions(cpu_offload_optimizer=args.cpu_offload), + deepspeed_options=DeepSpeedOptions(cpu_offload_optimizer=args.cpu_offload), # define data-specific arguments - model_path = args.model_path, - data_path = args.data_path, - ckpt_output_dir = args.ckpt_output_dir, - data_output_dir = args.data_output_dir, - + model_path=args.model_path, + data_path=args.data_path, + ckpt_output_dir=args.ckpt_output_dir, + data_output_dir=args.data_output_dir, # define model-trianing parameters - max_seq_len = args.max_seq_len, - max_batch_len = args.max_batch_len, - num_epochs = args.num_epochs, - effective_batch_size = args.effective_batch_size, - save_samples = args.save_samples, - learning_rate = args.learning_rate, - warmup_steps = args.warmup_steps, - is_padding_free = args.is_padding_free, # set this to true when 
using Granite-based models - random_seed = args.random_seed, - checkpoint_at_epoch = args.checkpoint_at_epoch + max_seq_len=args.max_seq_len, + max_batch_len=args.max_batch_len, + num_epochs=args.num_epochs, + effective_batch_size=args.effective_batch_size, + save_samples=args.save_samples, + learning_rate=args.learning_rate, + warmup_steps=args.warmup_steps, + is_padding_free=args.is_padding_free, # set this to true when using Granite-based models + random_seed=args.random_seed, + checkpoint_at_epoch=args.checkpoint_at_epoch, ) torchrun_args = TorchrunArgs( - nnodes = args.nnodes, # number of machines - nproc_per_node = args.nproc_per_node, # num GPUs per machine - node_rank = args.node_rank, # node rank for this machine - rdzv_id = args.rdzv_id, - rdzv_endpoint = args.rdzv_endpoint, + nnodes=args.nnodes, # number of machines + nproc_per_node=args.nproc_per_node, # num GPUs per machine + node_rank=args.node_rank, # node rank for this machine + rdzv_id=args.rdzv_id, + rdzv_endpoint=args.rdzv_endpoint, ) run_main_ds(torch_args=torchrun_args, train_args=training_args) - diff --git a/utils/components.py b/utils/components.py index 6e92e28..64bbef0 100644 --- a/utils/components.py +++ b/utils/components.py @@ -57,12 +57,15 @@ def pvc_to_model_op(model: dsl.Output[dsl.Model], pvc_path: str): [f"cp -r {pvc_path} {model.path}"], ) + @dsl.component def list_models_in_directory_op(models_folder: str) -> List: import os + models = os.listdir(models_folder) return models + @dsl.component(base_image=PYTHON_IMAGE, packages_to_install=["huggingface_hub"]) def huggingface_importer_op(model: dsl.Output[dsl.Model], repo_name: str): from huggingface_hub import snapshot_download diff --git a/utils/faked/components.py b/utils/faked/components.py index cb9ff1a..5615dab 100644 --- a/utils/faked/components.py +++ b/utils/faked/components.py @@ -13,14 +13,17 @@ def kubectl_apply_op(manifest: str): def kubectl_wait_for_op(condition: str, kind: str, name: str): return + @dsl.component(base_image=PYTHON_IMAGE) def huggingface_importer_op(model: dsl.Output[dsl.Model], repo_name: str): return + @dsl.component(base_image=PYTHON_IMAGE) def pvc_to_artifact_op(model: dsl.Output[dsl.Artifact], pvc_path: str): return + @dsl.component(base_image=PYTHON_IMAGE) def pvc_to_model_op(model: dsl.Output[dsl.Model], pvc_path: str): return
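The cli() hunk shown earlier ends at the click.progressbar context line, so the compile step itself is not visible in this diff. For completeness, a hedged sketch of what compiling the wrapped pipeline to pipeline.yaml typically looks like with the KFP SDK; this is not necessarily the exact body of cli().

from kfp import compiler
from pipeline import pipeline_wrapper  # defined in pipeline.py above

def compile_pipeline_sketch(mock=()):
    # pipeline_wrapper(mock) returns the @dsl.pipeline-decorated function
    p = pipeline_wrapper(list(mock))
    compiler.Compiler().compile(pipeline_func=p, package_path="pipeline.yaml")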