diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml
index 0ebeed09535a..122ca07f0afe 100644
--- a/.github/workflows/cicd-main.yml
+++ b/.github/workflows/cicd-main.yml
@@ -4871,6 +4871,35 @@ jobs:
         rm -rf /tmp/nemo2_ckpt
         rm -rf /tmp/nemo2_ptq_engine
 
+  L2_NeMo_2_Export_In_Framework:
+    needs: [cicd-test-container-setup]
+    uses: ./.github/workflows/_test_template.yml
+    if: contains(fromJSON(needs.cicd-test-container-setup.outputs.test_to_run), 'L2_NeMo_2_Export_In_Framework') || needs.cicd-test-container-setup.outputs.all == 'true'
+    with:
+      RUNNER: self-hosted-azure
+      SCRIPT: |
+        python tests/collections/llm/test_hf_import.py \
+          --hf_model /home/TestData/nlp/megatron_llama/llama-ci-hf \
+          --output_path /tmp/nemo2_ckpt
+
+        python tests/setup/data/create_sample_lambada.py \
+          --output_file /tmp/lambada.json
+
+        python tests/export/nemo_export.py \
+          --model_name test \
+          --model_type llama \
+          --checkpoint_dir /tmp/nemo2_ckpt \
+          --min_tps 1 \
+          --in_framework True \
+          --test_deployment True \
+          --run_accuracy True \
+          --test_data_path /tmp/lambada.json \
+          --accuracy_threshold 0.0 \
+          --debug
+
+      AFTER_SCRIPT: |
+        rm -rf /tmp/nemo2_ckpt /tmp/lambada.json
+
   L2_NeMo_2_LLAVA_NEXT_MOCK_TRAINING:
     needs: [cicd-test-container-setup]
     uses: ./.github/workflows/_test_template.yml
@@ -5068,6 +5097,7 @@ jobs:
       - L2_Megatron_GPT_Reranker
       - L2_NeMo_2_NeMo_Mcore_Mixtral_bitexact
       - L2_NeMo_2_PTQ_Llama2_FP8
+      - L2_NeMo_2_Export_In_Framework
      - L2_NeMo_2_jit_callback
      - L2_NeMo_2_LLAVA_NEXT_MOCK_TRAINING
      - L2_HF_Transformer_SFT_FSDP2_2gpu
diff --git a/nemo/deploy/nlp/megatronllm_deployable.py b/nemo/deploy/nlp/megatronllm_deployable.py
index 703ad0742a17..4bd9c2e938a5 100644
--- a/nemo/deploy/nlp/megatronllm_deployable.py
+++ b/nemo/deploy/nlp/megatronllm_deployable.py
@@ -15,14 +15,15 @@
 import logging
 from enum import IntEnum, auto
 from pathlib import Path
-from typing import List
+from typing import List, Optional
 
 import numpy as np
 import torch
 import torch.distributed
 import wrapt
+from lightning.pytorch.trainer.trainer import Trainer
 from megatron.core.inference.common_inference_params import CommonInferenceParams
-from pytorch_lightning.trainer.trainer import Trainer
+from megatron.core.inference.inference_request import InferenceRequest
 
 import nemo.lightning as nl
 from nemo.collections.llm import inference
@@ -94,7 +95,7 @@ def GetNumpyDtype(pyvalue):
 
 
 class ServerSync(IntEnum):
-    """Enum for synchronization messages using torch.distributed"""
+    """Enum for synchronization messages using torch.distributed."""
 
     WAIT = auto()
     SIGNAL = auto()
@@ -104,17 +105,35 @@ def to_long_tensor(self):
 
 
 class MegatronLLMDeploy:
+    """
+    A factory class for creating deployable instances of Megatron LLM models.
+    This class provides a method to get the appropriate deployable instance
+    based on the version of the NeMo checkpoint model used.
+    """
 
     @staticmethod
     def get_deployable(
-        nemo_checkpoint_filepath: str = None,
+        nemo_checkpoint_filepath: str,
         num_devices: int = 1,
         num_nodes: int = 1,
         tensor_model_parallel_size: int = 1,
         pipeline_model_parallel_size: int = 1,
         context_parallel_size: int = 1,
     ):
-
+        """
+        Returns the appropriate deployable instance for the given NeMo checkpoint.
+
+        Args:
+            nemo_checkpoint_filepath (str): Path to the .nemo checkpoint file.
+            num_devices (int): Number of devices to use for deployment.
+            num_nodes (int): Number of nodes to use for deployment.
+            tensor_model_parallel_size (int): Size of the tensor model parallelism.
+            pipeline_model_parallel_size (int): Size of the pipeline model parallelism.
+            context_parallel_size (int): Size of the context parallelism.
+
+        Returns:
+            ITritonDeployable: An instance of a deployable class compatible with Triton inference server.
+        """
         if nemo_checkpoint_version(nemo_checkpoint_filepath) == NEMO2:
             return MegatronLLMDeployableNemo2(
                 nemo_checkpoint_filepath=nemo_checkpoint_filepath,
@@ -178,6 +197,39 @@ def __init__(
             inference_batch_times_seqlen_threshold=inference_batch_times_seqlen_threshold,
         )
 
+    def generate(
+        self,
+        prompts: List[str],
+        max_batch_size: int = 4,
+        inference_params: Optional[CommonInferenceParams] = None,
+        random_seed: Optional[int] = None,
+    ) -> List[InferenceRequest]:
+        """
+        Generates text based on the provided input prompts.
+
+        Args:
+            prompts (List[str]): A list of input strings.
+            max_batch_size (int): The maximum batch size used for inference.
+            inference_params (Optional[CommonInferenceParams]): Parameters for controlling the inference process.
+            random_seed (Optional[int]): A random seed for reproducibility.
+
+        Returns:
+            List[InferenceRequest]: A list containing the generated results.
+        """
+        # TODO: This function doesn't account for parallelism settings currently
+
+        inference_params = inference_params or CommonInferenceParams()
+
+        results = inference.generate(
+            model=self.inference_wrapped_model,
+            tokenizer=self.mcore_tokenizer,
+            prompts=prompts,
+            max_batch_size=max_batch_size,
+            random_seed=random_seed,
+            inference_params=inference_params,
+        )
+        return list(results)
+
     @property
     def get_triton_input(self):
         inputs = (
@@ -222,14 +274,7 @@ def triton_infer_fn(self, **inputs: np.ndarray):
             return_log_probs=log_probs,
         )
 
-        results = inference.generate(
-            model=self.inference_wrapped_model,
-            tokenizer=self.mcore_tokenizer,
-            prompts=prompts,
-            max_batch_size=max_batch_size,
-            random_seed=random_seed,
-            inference_params=inference_params,
-        )
+        results = self.generate(prompts, max_batch_size, inference_params, random_seed)
 
         output_texts = [r.generated_text if text_only else r for r in results]
         output_infer = {"sentences": cast_output(output_texts, np.bytes_)}
@@ -263,11 +308,14 @@ def __init__(
             raise IMPORT_ERROR
         if nemo_checkpoint_filepath is None and existing_model is None:
             raise ValueError(
-                "MegatronLLMDeployable requires either a .nemo checkpoint filepath or an existing MegatronGPTModel, but both provided were None"
+                "MegatronLLMDeployable requires either a .nemo checkpoint filepath "
+                "or an existing MegatronGPTModel, but both provided were None."
             )
         if num_devices > 1:
             LOGGER.warning(
-                "Creating a MegatronLLMDeployable with num_devices>1 will assume running with a PyTorch Lightning DDP-variant strategy, which will run the main script once per device. Make sure any user code is compatible with multiple executions!"
+                "Creating a MegatronLLMDeployable with num_devices > 1 will assume running with "
+                "a PyTorch Lightning DDP-variant strategy, which will run the main script once per device. "
+                "Make sure any user code is compatible with multiple executions!"
            )
 
         # if both existing_model and nemo_checkpoint_filepath are provided, existing_model will take precedence
@@ -292,14 +340,16 @@ def _load_from_nemo_checkpoint(self, nemo_checkpoint_filepath: str, num_devices:
         # transformer_engine should always be true according to EricH, but GPT-2B model will fail if it is enabled
         if not custom_config.transformer_engine:
             LOGGER.warning(
-                "MegatronLLMDeployable expects model config transformer_engine=True, but this model has it =False. "
-                "Overriding it to =True, but this may break certain checkpoints converted on older Nemo versions. "
+                "MegatronLLMDeployable expects model config transformer_engine=True, but this model uses False. "
+                "Overriding it to True, but this may break certain checkpoints converted on older Nemo versions. "
                 "If your model breaks, please try re-converting the checkpoint on the current Nemo version."
             )
             custom_config.transformer_engine = True
-        # using multi-gpu for tensor parallelism directly for now, could do pipeline parallel instead or a combination
+        # using multi-gpu for tensor parallelism directly for now,
+        # could do pipeline parallel instead or a combination
         custom_config.tensor_model_parallel_size = num_devices
-        # had to override these to make Nemotron3-22B work, see sample_sequence_batch() in text_generation_utils.py
+        # had to override these to make Nemotron3-22B work,
+        # see sample_sequence_batch() in text_generation_utils.py
         custom_config.activations_checkpoint_granularity = None
         custom_config.activations_checkpoint_method = None
         # Models trained with TE < 1.10 and loaded with TE >= 1.10 require
@@ -398,7 +448,8 @@ def generate(self, inputs: List[str], length_params: LengthParam, sampling_param
         distributed_rank = torch.distributed.get_rank()
         if distributed_rank != 0:
             raise ValueError(
-                f"Triton inference function should not be called on a thread with torch.distributed rank != 0, but this thread is rank {distributed_rank}"
+                "Triton inference function should not be called on a thread with "
+                f"torch.distributed rank != 0, but this thread is rank {distributed_rank}."
             )
         signal_value = ServerSync.SIGNAL.to_long_tensor()
         torch.distributed.broadcast(signal_value, 0)
diff --git a/nemo/deploy/nlp/query_llm.py b/nemo/deploy/nlp/query_llm.py
index e91754297aa7..8b65a278ff41 100644
--- a/nemo/deploy/nlp/query_llm.py
+++ b/nemo/deploy/nlp/query_llm.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 import time
-from abc import ABC, abstractmethod
+from abc import ABC
 
 import numpy as np
 
@@ -141,7 +141,7 @@ def query_llm(
                 "object": "text_completion",
                 "created": int(time.time()),
                 "model": self.model_name,
-                "choices": [{"text": str(sentences)}],
+                "choices": [{"text": sentences}],
             }
             if log_probs_output is not None:
                 openai_response["log_probs"] = log_probs_output
@@ -297,7 +297,7 @@ def query_llm(
                 "object": "text_completion",
                 "created": int(time.time()),
                 "model": self.model_name,
-                "choices": [{"text": str(sentences)}],
+                "choices": [{"text": sentences}],
             }
             if output_generation_logits:
                 openai_response["choices"][0]["generation_logits"] = result_dict["generation_logits"]
diff --git a/requirements/requirements_deploy.txt b/requirements/requirements_deploy.txt
new file mode 100644
index 000000000000..5380398c278b
--- /dev/null
+++ b/requirements/requirements_deploy.txt
@@ -0,0 +1,6 @@
+fastapi
+nvidia-pytriton
+pydantic-settings
+tensorstore==0.1.45
+uvicorn
+zarr
diff --git a/requirements/requirements_infer.txt b/requirements/requirements_infer.txt
index 5380398c278b..5f428d91fc56 100644
--- a/requirements/requirements_infer.txt
+++ b/requirements/requirements_infer.txt
@@ -1,3 +1,5 @@
+# This is a copy of requirements_deploy.txt for a seamless rename 'infer' -> 'deploy'.
+# TODO: Remove this file once it is not used in container build anywhere.
 fastapi
 nvidia-pytriton
 pydantic-settings
diff --git a/setup.py b/setup.py
index 32e6ab414dd9..b14b96085186 100644
--- a/setup.py
+++ b/setup.py
@@ -77,6 +77,7 @@ def req_file(filename, folder="requirements"):
     'slu': req_file("requirements_slu.txt"),
     'multimodal': req_file("requirements_multimodal.txt"),
     'audio': req_file("requirements_audio.txt"),
+    'deploy': req_file("requirements_deploy.txt"),
 }
 
 
@@ -257,7 +258,7 @@ def finalize_options(self):
     extras_require=extras_require,
     # Add in any packaged data.
     include_package_data=True,
-    exclude=['tools', 'tests', 'nemo.deploy', 'nemo.export'],
+    exclude=['tools', 'tests'],
     package_data={'': ['*.tsv', '*.txt', '*.far', '*.fst', '*.cpp', 'Makefile']},
     zip_safe=False,
     # PyPI package information.
diff --git a/tests/export/nemo_export.py b/tests/export/nemo_export.py
index 674416861e43..c5f1124d5e84 100644
--- a/tests/export/nemo_export.py
+++ b/tests/export/nemo_export.py
@@ -40,12 +40,17 @@
 
 in_framework_supported = True
 try:
+    from megatron.core.inference.common_inference_params import CommonInferenceParams
     from nemo.deploy.nlp import NemoQueryLLMPyTorch
-    from nemo.deploy.nlp.megatronllm_deployable import MegatronLLMDeployable
+    from nemo.deploy.nlp.megatronllm_deployable import (
+        MegatronLLMDeploy,
+        MegatronLLMDeployable,
+        MegatronLLMDeployableNemo2,
+    )
 except Exception as e:
     LOGGER.warning(
-        "Cannot import MegatronLLMDeployable or NemoQueryLLMPyTorch,"
-        f" in-framework inference will not be available. {type(e).__name__}: {e}"
+        "Cannot import MegatronLLMDeploy* classes, or NemoQueryLLMPyTorch, or CommonInferenceParams, "
+        f"in-framework inference will not be available. Reason: {type(e).__name__}: {e}"
     )
     in_framework_supported = False
 
@@ -124,6 +129,18 @@ def get_accuracy_with_lambada(model, nq, task_ids, lora_uids, test_data_path):
             )
             # MegatronLLMDeployable returns prompt + generated output, so need to slice off prompt
             model_output = model_output["sentences"][0][len(prompt) :].strip().lower()
+        elif in_framework_supported and isinstance(model, MegatronLLMDeployableNemo2):
+            model_output = model.generate(
+                prompts=[prompt],
+                inference_params=CommonInferenceParams(
+                    temperature=0.1,
+                    top_k=1,
+                    top_p=0,
+                    num_tokens_to_generate=1,
+                    return_log_probs=False,
+                ),
+            )
+            model_output = model_output[0].generated_text  # Index [0] as a single prompt is used
         else:
             model_output = model.forward(
                 input_texts=[prompt],
@@ -158,8 +175,17 @@ def get_accuracy_with_lambada(model, nq, task_ids, lora_uids, test_data_path):
                 top_p=0,
                 temperature=0.1,
             )
-            # MegatronLLMDeployable returns prompt + generated output, so need to slice off prompt
-            deployed_output = deployed_output["sentences"][0][0][len(prompt) :].decode().strip().lower()
+            # MegatronLLMDeployable for NeMo 1.0 returns prompt + generated output, so need to slice off prompt.
+            # On the other hand, MegatronLLMDeployableNeMo2 in the case of NeMo 2.0 returns only generated text.
+            # TODO: Unify this somewhere else
+            if isinstance(model, MegatronLLMDeployableNemo2):
+                prefix_len = 0
+            else:
+                prefix_len = len(prompt)
+
+            # Accessing [0][0] of "text" is to get a raw string entry from a NumPy array
+            # for a single prompt (batch size = 1) and stripping prefix if needed:
+            deployed_output = deployed_output["choices"][0]["text"][0][0][prefix_len:].strip().lower()
         else:
             deployed_output = nq.query_llm(
                 prompts=[prompt],
@@ -574,7 +600,7 @@ def run_in_framework_inference(
 
     print("Path: {0} and model: {1} will be tested".format(checkpoint_path, model_name))
 
-    deployed_model = MegatronLLMDeployable(checkpoint_path, num_gpus)
+    deployed_model = MegatronLLMDeploy.get_deployable(checkpoint_path, num_gpus)
 
     nm = DeployPyTriton(
         model=deployed_model,
@@ -588,10 +614,12 @@ def run_in_framework_inference(
         output_deployed = nq.query_llm(
             prompts=prompts, top_k=top_k, top_p=top_p, temperature=temperature, max_length=max_output_len
         )
-        output_deployed = output_deployed["sentences"]
-        # MegatronLLMDeployable will return the prompt + generated output, so cut off the prompt
-        for i, output in enumerate(output_deployed):
-            output_deployed[i, :] = output[0][len(prompts[i]) :]
+        output_deployed = output_deployed["choices"][0]["text"]
+        # MegatronLLMDeployable will return the prompt + generated output, so cut off the prompt.
+        # On the other hand, MegatronLLMDeployableNeMo2 returns only generated text.
+        if isinstance(deployed_model, MegatronLLMDeployable):
+            for i, output in enumerate(output_deployed):
+                output_deployed[i, :] = output[0][len(prompts[i]) :]
 
         # Unwrap the generator if needed
         output_deployed = list(output_deployed)
@@ -717,6 +745,11 @@ def get_args():
         type=str,
         default="False",
     )
+    parser.add_argument(
+        "--accuracy_threshold",
+        type=float,
+        default=0.5,
+    )
     parser.add_argument("--streaming", default=False, action="store_true")
     parser.add_argument(
         "--test_cpp_runtime",
@@ -980,8 +1013,8 @@ def optional_bool_to_pass_fail(b: Optional[bool]):
         print(f"Deployed Model Accuracy: {accuracy_result.deployed_accuracy:.4f}")
         print(f"Deployed Relaxed Model Accuracy: {accuracy_result.deployed_accuracy_relaxed:.4f}")
         print(f"Evaluation Time [s]: {accuracy_result.evaluation_time:.2f}")
-        if (deployed_tests_only and accuracy_result.deployed_accuracy_relaxed < 0.5) or (
-            not deployed_tests_only and accuracy_result.accuracy_relaxed < 0.5
+        if (deployed_tests_only and accuracy_result.deployed_accuracy_relaxed < args.accuracy_threshold) or (
+            not deployed_tests_only and accuracy_result.accuracy_relaxed < args.accuracy_threshold
         ):
             accuracy_test_result = "FAIL"
 
@@ -995,7 +1028,7 @@ def optional_bool_to_pass_fail(b: Optional[bool]):
         raise Exception("Functional test failed")
 
     if accuracy_test_result == "FAIL":
-        raise Exception("Model accuracy is below 0.5")
+        raise Exception(f"Model accuracy is below {args.accuracy_threshold}")
 
 
 if __name__ == '__main__':
diff --git a/tests/setup/data/create_sample_jsonl.py b/tests/setup/data/create_sample_jsonl.py
index 00f789548f81..567abb202563 100644
--- a/tests/setup/data/create_sample_jsonl.py
+++ b/tests/setup/data/create_sample_jsonl.py
@@ -42,7 +42,7 @@ def create_sample_jsonl(output_file: str, overwrite: bool = False):
         "Korzystając z okazji chciałbym pozdrowić całą moją rodzinę i przyjaciół",
     ]
     print(f"Writing {len(texts)} line(s) to {output_file}...")
-    os.makedirs(os.path.dirname(output_file), exist_ok=True)
+    os.makedirs(os.path.dirname(os.path.abspath(output_file)), exist_ok=True)
     with open(output_file, mode="w", encoding="utf-8") as f:
         for text in texts:
             json.dump({"text": text}, f)
@@ -52,7 +52,7 @@ def create_sample_jsonl(output_file: str, overwrite: bool = False):
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser("Create sample JSONL file.")
-    parser.add_argument("--output_file", help="Output file name")
+    parser.add_argument("--output_file", required=True, help="Output file name")
     parser.add_argument("--overwrite", action="store_true", help="Overwrite file if it exists")
     args = parser.parse_args()
-    create_sample_jsonl(args.output_file)
+    create_sample_jsonl(args.output_file, args.overwrite)
diff --git a/tests/setup/data/create_sample_lambada.py b/tests/setup/data/create_sample_lambada.py
new file mode 100644
index 000000000000..d12017a25cc4
--- /dev/null
+++ b/tests/setup/data/create_sample_lambada.py
@@ -0,0 +1,66 @@
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import json
+import os
+
+"""
+Create a dataset with five Lambada test examples for functional testing. Each line
+contains a dictionary with a "text_before_last_word" and "last_word" keys.
+"""
+
+
+def create_sample_lambada(output_file: str, overwrite: bool = False):
+    """Create JSON file with a few Lambada examples."""
+    if os.path.isfile(output_file) and not overwrite:
+        print(f"File {output_file} exists and overwrite flag is not set so exiting.")
+        return
+
+    texts = [
+        {
+            "text_before_last_word": "In my palm is a clear stone , and inside it is a small ivory statuette . A guardian angel .\n\n\" Figured if you re going to be out at night getting hit by cars , you might as well have some backup .\"\n\n I look at him , feeling stunned . Like this is some sort of sign . But as I stare at Harlin , his mouth curved in a confident grin , I don t care about",
+            "last_word": "signs",
+        },
+        {
+            "text_before_last_word": "Give me a minute to change and I ll meet you at the docks .\" She d forced those words through her teeth .\n\n\" No need to change . We won t be that long .\"\n\n Shane gripped her arm and started leading her to the dock .\n\n\" I can make it there on my own ,",
+            "last_word": "Shane",
+        },
+        {
+            "text_before_last_word": "\" Only one source I know of that would be likely to cough up enough money to finance a phony sleep research facility and pay people big bucks to solve crimes in their dreams ,\" Farrell concluded dryly .\n\n\" What can I say ?\" Ellis unfolded his arms and widened his hands . \" Your tax dollars at work .\"\n\n Before Farrell could respond , Leila s voice rose from inside the house .\n\n\" No insurance ?\" she wailed . \" What do you mean you don t have any",
+            "last_word": "insurance",
+        },
+        {
+            "text_before_last_word": "Helen s heart broke a little in the face of Miss Mabel s selfless courage . She thought that because she was old , her life was of less value than the others . For all Helen knew , Miss Mabel had a lot more years to live than she did .\n\n\" Not going to happen ,\" replied",
+            "last_word": "Helen",
+        },
+        {
+            "text_before_last_word": "Preston had been the last person to wear those chains , and I knew what I d see and feel if they were slipped onto my skin the Reaper s unending hatred of me . I d felt enough of that emotion already in the amphitheater . I didn t want to feel anymore .\n\n\" Don t put those on me ,\" I whispered . \" Please .\"\n\n Sergei looked at me , surprised by my low , raspy please , but he put down the",
+            "last_word": "chains",
+        },
+    ]
+
+    print(f"Writing {len(texts)} line(s) to {output_file}...")
+    os.makedirs(os.path.dirname(os.path.abspath(output_file)), exist_ok=True)
+    with open(output_file, mode="w", encoding="utf-8") as f:
+        json.dump(texts, f)
+    print("OK.")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser("Create a sample from Lambada test dataset.")
+    parser.add_argument("--output_file", required=True, help="Output file name")
+    parser.add_argument("--overwrite", action="store_true", help="Overwrite file if it exists")
+    args = parser.parse_args()
+    create_sample_lambada(args.output_file, args.overwrite)