From af21e9917e6bf3de1c0f7c183a4b02ed21e79fba Mon Sep 17 00:00:00 2001
From: Eric Harper
Date: Mon, 14 Aug 2023 15:37:44 -0600
Subject: [PATCH] Use GPTModel from mcore (#7093)

* start adding gpt from megatron core path
  Signed-off-by: ericharper
* set model parallel config
  Signed-off-by: ericharper
* use model parallel config object
  Signed-off-by: ericharper
* update args
  Signed-off-by: ericharper
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* set vp size to none if it is 1
  Signed-off-by: ericharper
* set vp size to none if it is 1
  Signed-off-by: ericharper
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* add TransformerConfig
  Signed-off-by: ericharper
* start updating to TransformerConfig
  Signed-off-by: ericharper
* add todo
  Signed-off-by: ericharper
* revert to model parallel config
  Signed-off-by: ericharper
* add hidden_size to model_parallel_config
  Signed-off-by: ericharper
* remove imports
  Signed-off-by: ericharper
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* remove import
  Signed-off-by: ericharper
* small clean up
  Signed-off-by: ericharper
* update hidden size in peft base model, add mcore commit to jenkins
  Signed-off-by: ericharper
* update module args
  Signed-off-by: ericharper
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* add config obj to flash attention tests
  Signed-off-by: ericharper
* remove args
  Signed-off-by: ericharper
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* remove sequence parallel arg
  Signed-off-by: ericharper
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* update args
  Signed-off-by: ericharper
* add config to self
  Signed-off-by: ericharper
* update args
  Signed-off-by: ericharper
* update args
  Signed-off-by: ericharper
* update args
  Signed-off-by: ericharper
* add config to test
  Signed-off-by: ericharper
* get hidden_size from config
  Signed-off-by: ericharper
* add try except
  Signed-off-by: ericharper
* use default
  Signed-off-by: ericharper
* update config with hidden size
  Signed-off-by: ericharper
* remove arg
  Signed-off-by: ericharper
* comment out jenkins test
  Signed-off-by: ericharper
* revert import
  Signed-off-by: ericharper
* remove optimizer_idx
  Signed-off-by: eharper
* prefetch num microbatches
  Signed-off-by: eharper
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* start adding gpt from megatron core path
  Signed-off-by: ericharper
* set model parallel config
  Signed-off-by: ericharper
* use model parallel config object
  Signed-off-by: ericharper
* update args
  Signed-off-by: ericharper
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* start updating to TransformerConfig
  Signed-off-by: ericharper
* revert to model parallel config
  Signed-off-by: ericharper
* add hidden_size to model_parallel_config
  Signed-off-by: ericharper
* remove imports
  Signed-off-by: ericharper
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* update module args
  Signed-off-by: ericharper
* add config to self
  Signed-off-by: ericharper
* build transformer config
  Signed-off-by: ericharper
* add model to provider func
  Signed-off-by: ericharper
* update forward and float16 wrapper
  Signed-off-by: ericharper
* instantiate model parallel config after init model parallel
  Signed-off-by: ericharper
* set virtual rank
  Signed-off-by: ericharper
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* Add GQA config to megatron gpt model (#7096)
  * Add GQA config in gpt config file
    Signed-off-by: jasonwan
  * Verify mcore is enabled when using GQA
    Signed-off-by: jasonwan
  ---------
  Signed-off-by: jasonwan
* revert
  Signed-off-by: ericharper
* remove import
  Signed-off-by: eharper
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* update for dist adam
  Signed-off-by: eharper
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* use get_gpt_module_list
  Signed-off-by: eharper
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* update megatron core commit
  Signed-off-by: eharper
* revert change
  Signed-off-by: eharper
* remove import
  Signed-off-by: eharper
* remove import
  Signed-off-by: eharper
* remove import
  Signed-off-by: eharper

---------

Signed-off-by: ericharper
Signed-off-by: eharper
Signed-off-by: jasonwan
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Jason Wang
---
 Jenkinsfile                                        |   4 +-
 .../conf/megatron_gpt_config.yaml                  |   4 +
 .../language_modeling/megatron_base_model.py       |  39 +-
 .../language_modeling/megatron_gpt_model.py        | 439 ++++++++++++------
 .../megatron_gpt_prompt_learning_model.py          |   1 +
 nemo/collections/nlp/parts/utils_funcs.py          |  51 +-
 6 files changed, 373 insertions(+), 165 deletions(-)

diff --git a/Jenkinsfile b/Jenkinsfile
index bb1f4955168af..727813f3bf32e 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -59,10 +59,10 @@ pipeline {
     stage('Megatron Core installation') {
       steps {
-        // commit points to core_transformer merge
+        // commit has api fix for TE
         sh 'git clone https://github.com/NVIDIA/Megatron-LM.git && \
             cd Megatron-LM && \
-            git checkout 3316e811cc5335ee24c2d203416d864edcf2f7a8 && \
+            git checkout 0609f27fe8376f17ab65c001d3d8f35cd8175950 && \
             pip install -e .'
       }
     }

diff --git a/examples/nlp/language_modeling/conf/megatron_gpt_config.yaml b/examples/nlp/language_modeling/conf/megatron_gpt_config.yaml
index 303e47d8088ea..3747e85fc6225 100755
--- a/examples/nlp/language_modeling/conf/megatron_gpt_config.yaml
+++ b/examples/nlp/language_modeling/conf/megatron_gpt_config.yaml
@@ -44,6 +44,9 @@ exp_manager:
   model_parallel_size: ${multiply:${model.tensor_model_parallel_size}, ${model.pipeline_model_parallel_size}}

 model:
+  # use GPTModel from megatron.core
+  mcore_gpt: False
+
   # specify micro_batch_size, global_batch_size, and model parallelism
   # gradient accumulation will be done automatically based on data_parallel_size
   micro_batch_size: 4 # limited by GPU memory
@@ -87,6 +90,7 @@ model:
   overlap_p2p_comm: False # Overlap p2p communication with computes. This argument is valid only when `virtual_pipeline_model_parallel_size` is larger than 1
   batch_p2p_comm: True # Batch consecutive inter-peer send/recv operations. This argument is valid only when `virtual_pipeline_model_parallel_size` is larger than 1
   seq_len_interpolation_factor: null # RoPE Interpolation factor for sequence length. This is used to build long-context models with RoPE ex: https://arxiv.org/abs/2306.15595.
+  num_query_groups: null # Number of query groups for group query attention. If None, normal attention is used.

   tokenizer:
     library: 'megatron'
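The two new keys above gate this PR's new code path: `mcore_gpt` selects the megatron.core `GPTModel`, and `num_query_groups` requests group-query attention, which the assertion added later in `model_provider_func` only allows on the mcore path. Below is a minimal, hypothetical OmegaConf sketch of how the pair interacts; the inline dict is an invented stand-in for the full YAML above, not the real config file.

from omegaconf import OmegaConf

# Invented slice of the model config for illustration only.
cfg = OmegaConf.create({"model": {"mcore_gpt": False, "num_query_groups": None, "num_attention_heads": 32}})

# Turn on the megatron.core GPTModel path and group-query attention together.
cfg.model.mcore_gpt = True
cfg.model.num_query_groups = 8  # typically a divisor of num_attention_heads

# Mirrors the check added in model_provider_func: GQA without mcore_gpt is rejected.
assert not (cfg.model.num_query_groups is not None and not cfg.model.mcore_gpt)
print(OmegaConf.to_yaml(cfg))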
diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py
index b053b56c52f57..976c4166d8a25 100644
--- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py
+++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py
@@ -131,7 +131,7 @@ def __init__(self, cfg: DictConfig, trainer: Trainer, no_lm_init=True):

         if vp_size is not None:
             if vp_size == 1:
-                self.cfg.virtual_pipeline_model_parallel_size = None
+                vp_size = None
             else:
                 assert (
                     self.cfg.num_layers // self.cfg.pipeline_model_parallel_size
@@ -141,14 +141,14 @@ def __init__(self, cfg: DictConfig, trainer: Trainer, no_lm_init=True):
             world_size=init_world_size,
             global_rank=init_global_rank,
             local_rank=init_local_rank,
-            tensor_model_parallel_size=self.cfg.get('tensor_model_parallel_size', 1),
-            pipeline_model_parallel_size=self.cfg.get('pipeline_model_parallel_size', 1),
-            virtual_pipeline_model_parallel_size=self.cfg.get('virtual_pipeline_model_parallel_size', None),
-            pipeline_model_parallel_split_rank=self.cfg.get('pipeline_model_parallel_split_rank', 0),
-            micro_batch_size=self.cfg.get('micro_batch_size'),
-            global_batch_size=self.cfg.get('global_batch_size'),
-            rampup_batch_size=self.cfg.get('rampup_batch_size'),
-            use_fp8=self.cfg.get('fp8', False),
+            tensor_model_parallel_size=cfg.get('tensor_model_parallel_size', 1),
+            pipeline_model_parallel_size=cfg.get('pipeline_model_parallel_size', 1),
+            virtual_pipeline_model_parallel_size=vp_size,
+            pipeline_model_parallel_split_rank=cfg.get('pipeline_model_parallel_split_rank', 0),
+            micro_batch_size=cfg.get('micro_batch_size'),
+            global_batch_size=cfg.get('global_batch_size'),
+            rampup_batch_size=cfg.get('rampup_batch_size'),
+            use_fp8=cfg.get('fp8', False),
             init_mpi_proc_group=cfg.get('ub_tp_comm_overlap', False),
             seed=self.cfg.get('seed', 1234),
             apex_transformer_log_level=self.cfg.get('apex_transformer_log_level', 30),
@@ -157,6 +157,9 @@ def __init__(self, cfg: DictConfig, trainer: Trainer, no_lm_init=True):
         # This must be called after initialize model parallel since it needs to know the data parallel size
         self._validate_and_override_config()

+        # set the megatron core model parallel config
+        self.model_parallel_config: ModelParallelConfig = self.build_model_parallel_config()
+
         self.grad_clip_pl_default = False  # use pytorch default for gradient clipping.
Default False if hasattr(self._cfg, "tokenizer") or ( @@ -643,8 +646,13 @@ def _get_total_params_across_model_parallel_groups_gpt_bert(self, model): and parallel_state.is_pipeline_last_stage(ignore_virtual=True) and self.cfg.get('share_embeddings_and_output_weights', True) ): + word_embeddings_weight = ( + model[-1].module.shared_embedding_or_output_weight() + if getattr(self, 'mcore_gpt', False) + else model[-1].word_embeddings_weight() + ) # substract the embedding weights on the last virtual stage - num_word_embedding_parameters = sum([p.nelement() for p in model[-1].word_embeddings_weight()]) + num_word_embedding_parameters = sum([p.nelement() for p in word_embeddings_weight]) num_parameters_on_device -= num_word_embedding_parameters else: num_parameters_on_device = sum([p.nelement() for p in model.parameters()]) @@ -653,8 +661,13 @@ def _get_total_params_across_model_parallel_groups_gpt_bert(self, model): and parallel_state.is_pipeline_last_stage(ignore_virtual=True) and self.cfg.get('share_embeddings_and_output_weights', True) ): + word_embeddings_weight = ( + model.module.shared_embedding_or_output_weight() + if getattr(self, 'mcore_gpt', False) + else model.word_embeddings_weight() + ) # substract the embedding weights on the last stage - num_word_embedding_parameters = sum([p.nelement() for p in model.word_embeddings_weight()]) + num_word_embedding_parameters = sum([p.nelement() for p in word_embeddings_weight]) num_parameters_on_device -= num_word_embedding_parameters # to be summed across data parallel group @@ -714,7 +727,7 @@ def _get_total_params_across_model_parallel_groups_enc_dec(self, model): torch.distributed.all_reduce(total_num_parameters, group=parallel_state.get_model_parallel_group()) return num_parameters_on_device, total_num_parameters - def build_model_parallel_config(self): + def build_model_parallel_config(self) -> ModelParallelConfig: """ For attributes in the nemo model config that are the same as the megatron core ModelParallelConfig we will use the value from the nemo config. For attributes in ModelParallelConfig that are not in the nemo model config, we add custom logic. 
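The docstring above summarizes the approach: copy every NeMo config key whose name matches a `ModelParallelConfig` field, and handle the remaining fields with custom logic. The following self-contained sketch illustrates that field-matching idea only; the toy dataclass and dict are invented for illustration and are not the real megatron.core or NeMo objects.

from dataclasses import dataclass, fields

@dataclass
class ToyParallelConfig:  # invented stand-in for megatron.core's ModelParallelConfig
    tensor_model_parallel_size: int = 1
    pipeline_model_parallel_size: int = 1
    sequence_parallel: bool = False

# Invented stand-in for the NeMo model cfg; keys with no matching field are simply ignored.
nemo_cfg = {"tensor_model_parallel_size": 2, "sequence_parallel": True, "hidden_size": 4096}

# Take every dataclass field that also appears in the NeMo config.
matched = {f.name: nemo_cfg[f.name] for f in fields(ToyParallelConfig) if f.name in nemo_cfg}
config = ToyParallelConfig(**matched)
print(config)  # ToyParallelConfig(tensor_model_parallel_size=2, pipeline_model_parallel_size=1, sequence_parallel=True)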
@@ -742,7 +755,7 @@ def build_model_parallel_config(self): "fp16": False, # NeMo does not currently support fp16 training with megatron amp O2 "bf16": precision == 'bf16' and megatron_amp_O2, "params_dtype": params_dtype, - "timers": None, # NeMo dues not currently support megatron core timers + "timers": None, # NeMo does not currently support megatron core timers "async_tensor_model_parallel_allreduce": self.cfg.get('tensor_model_parallel_world_size', 1) > 1 and not self.cfg.get('sequence_parallel', False), "pipeline_dtype": pipeline_dtype, diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index e3af8c1c651d5..bd766f0ef6a6c 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -15,10 +15,12 @@ import itertools import queue import warnings +from dataclasses import fields from functools import partial from typing import Any, Dict, Iterator, List, Optional, Union import torch +from omegaconf import OmegaConf from omegaconf.dictconfig import DictConfig from pytorch_lightning.accelerators import CPUAccelerator from pytorch_lightning.trainer.trainer import Trainer @@ -51,7 +53,7 @@ SamplingParam, TextGeneration, ) -from nemo.collections.nlp.parts.utils_funcs import get_last_rank +from nemo.collections.nlp.parts.utils_funcs import activation_to_func, get_last_rank from nemo.core.classes import Exportable from nemo.core.classes.common import PretrainedModelInfo from nemo.core.neural_types import ChannelType, NeuralType @@ -69,8 +71,11 @@ try: from megatron.core import parallel_state + from megatron.core.models.gpt import GPTModel as MCoreGPTModel from megatron.core.pipeline_parallel.schedules import get_forward_backward_func + from megatron.core.transformer.module import Float16Module as MCoreFloat16Module from megatron.core.transformer.transformer_config import TransformerConfig + from megatron.core.utils import init_method_normal, scaled_init_method_normal # TODO @tmoon: Use once available in Megatron-LM # from megatron.core.pipeline_parallel.schedules import DataIteratorList @@ -205,8 +210,13 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): self._validate_trainer() + # build the transformer config + self.transformer_config = self.build_transformer_config() + self.megatron_amp_o2 = cfg.get('megatron_amp_O2', False) + self.mcore_gpt = cfg.get('mcore_gpt', False) + self.rampup_batch_size = self.cfg.get('rampup_batch_size', None) if self.rampup_batch_size: self.prev_consumed_samples = 0 @@ -245,27 +255,7 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): else: self.model.cuda(torch.cuda.current_device()) - # Model wrapper to convert both model and inputs to half precision - if isinstance(self.model, list): - converted_model = [] - for module in self.model: - converted_model.append( - Float16Module( - config=self.model_parallel_config, - module=module, - precision=cfg.precision, - share_token_embeddings=self.cfg.get('share_embeddings_and_output_weights', True), - ) - ) - - self.model = converted_model - else: - self.model = Float16Module( - config=self.model_parallel_config, - module=self.model, - precision=cfg.precision, - share_token_embeddings=self.cfg.get('share_embeddings_and_output_weights', True), - ) + self._wrap_model_for_O2() if self.trainer.precision in ['bf16', 'bf16-mixed']: self.autocast_dtype = torch.bfloat16 @@ -298,8 +288,11 @@ def __init__(self, cfg: DictConfig, 
trainer: Trainer): def get_gpt_module_list(self): if isinstance(self.model, list): - return [model.module if isinstance(model, Float16Module) else model for model in self.model] - elif isinstance(self.model, Float16Module): + return [ + model.module if isinstance(model, (Float16Module, MCoreFloat16Module)) else model + for model in self.model + ] + elif isinstance(self.model, (Float16Module, MCoreFloat16Module)): return [self.model.module] else: return [self.model] @@ -312,67 +305,84 @@ def get_inference_config(self): def model_provider_func(self, pre_process, post_process): """Model depends on pipeline paralellism.""" - model = GPTModel( - config=self.model_parallel_config, - vocab_size=self.cfg.get('override_vocab_size', self.padded_vocab_size), - hidden_size=self.cfg.hidden_size, - max_position_embeddings=self.cfg.max_position_embeddings, - num_layers=self.cfg.num_layers, - num_attention_heads=self.cfg.num_attention_heads, - apply_query_key_layer_scaling=self.cfg.get('apply_query_key_layer_scaling', True), - kv_channels=self.cfg.get('kv_channels', None), - ffn_hidden_size=self.cfg.ffn_hidden_size, - num_tokentypes=0, - parallel_output=True, - pre_process=pre_process, - post_process=post_process, - init_method_std=self.cfg.get('init_method_std', 0.02), - use_scaled_init_method=self.cfg.get('use_scaled_init_method', True), - fp16_lm_cross_entropy=self.cfg.get('fp16_lm_cross_entropy', False), - megatron_amp_O2=self.cfg.get('megatron_amp_O2', False), - hidden_dropout=self.cfg.get('hidden_dropout', 0.1), - attention_dropout=self.cfg.get('attention_dropout', 0.1), - ffn_dropout=self.cfg.get('ffn_dropout', 0.0), - precision=self.cfg.get('precision', 16), - fp32_residual_connection=self.cfg.get('fp32_residual_connection', False), - activations_checkpoint_granularity=self.cfg.get('activations_checkpoint_granularity', None), - activations_checkpoint_method=self.cfg.get('activations_checkpoint_method', None), - activations_checkpoint_num_layers=self.cfg.get('activations_checkpoint_num_layers', 1), - activations_checkpoint_layers_per_pipeline=self.cfg.get( - 'activations_checkpoint_layers_per_pipeline', None - ), - normalization=self.cfg.get('normalization', 'layernorm'), - layernorm_epsilon=self.cfg.get('layernorm_epsilon', 1e-5), - onnx_safe=self.cfg.get('onnx_safe', False), - bias=self.cfg.get('bias', True), - bias_activation_fusion=self.cfg.get('bias_activation_fusion', True), - bias_dropout_add_fusion=self.cfg.get('bias_dropout_add_fusion', True), - activation=self.cfg.get('activation', 'gelu'), - headscale=self.cfg.get('headscale', False), - transformer_block_type=self.cfg.get('transformer_block_type', 'pre_ln'), - openai_gelu=self.cfg.get('openai_gelu', False), - normalize_attention_scores=self.cfg.get('normalize_attention_scores', True), - position_embedding_type=self.cfg.get('position_embedding_type', 'learned_absolute'), - rotary_percentage=self.cfg.get('rotary_percentage', 1.0), - share_embeddings_and_output_weights=self.cfg.get('share_embeddings_and_output_weights', True), - attention_type=self.cfg.get('attention_type', 'multihead'), - masked_softmax_fusion=self.cfg.get('masked_softmax_fusion', True), - persist_layer_norm=self.cfg.get('persist_layer_norm', False), - transformer_engine=self.cfg.get('transformer_engine', False), - fp8=self.cfg.get('fp8', False), - fp8_e4m3=self.cfg.get('fp8_e4m3', False), - fp8_hybrid=self.cfg.get('fp8_hybrid', False), - fp8_margin=self.cfg.get('fp8_margin', 0), - fp8_interval=self.cfg.get('fp8_interval', 1), - 
fp8_amax_history_len=self.cfg.get('fp8_amax_history_len', 1), - fp8_amax_compute_algo=self.cfg.get('fp8_amax_compute_algo', 'most_recent'), - reduce_amax=self.cfg.get('reduce_amax', True), - use_emha=self.cfg.get('use_emha', False), - use_flash_attention=self.cfg.get('use_flash_attention', False), - megatron_legacy=self.cfg.get('megatron_legacy', False), - seq_len_interpolation_factor=self.cfg.get('seq_len_interpolation_factor', None), - ) - + if self.mcore_gpt: + model = MCoreGPTModel( + config=self.transformer_config, + vocab_size=self.cfg.get('override_vocab_size', self.padded_vocab_size), + max_sequence_length=self.cfg.get('encoder_seq_length', 512), + pre_process=pre_process, + post_process=post_process, + parallel_output=True, + share_embeddings_and_output_weights=self.cfg.get('share_embeddings_and_output_weights', True), + position_embedding_type=self.cfg.get('position_embedding_type', 'learned_absolute'), + rotary_percent=self.cfg.get('rotary_percentage', 1.0), + ) + else: + assert ( + self.cfg.get('num_query_groups', None) is None + ), "Group Query Attention is only supported in Megatron Core. Set 'mcore_gpt' to use GQA." + + model = GPTModel( + config=self.model_parallel_config, + vocab_size=self.cfg.get('override_vocab_size', self.padded_vocab_size), + hidden_size=self.cfg.hidden_size, + max_position_embeddings=self.cfg.max_position_embeddings, + num_layers=self.cfg.num_layers, + num_attention_heads=self.cfg.num_attention_heads, + apply_query_key_layer_scaling=self.cfg.get('apply_query_key_layer_scaling', True), + kv_channels=self.cfg.get('kv_channels', None), + ffn_hidden_size=self.cfg.ffn_hidden_size, + num_tokentypes=0, + parallel_output=True, + pre_process=pre_process, + post_process=post_process, + init_method_std=self.cfg.get('init_method_std', 0.02), + use_scaled_init_method=self.cfg.get('use_scaled_init_method', True), + fp16_lm_cross_entropy=self.cfg.get('fp16_lm_cross_entropy', False), + megatron_amp_O2=self.cfg.get('megatron_amp_O2', False), + hidden_dropout=self.cfg.get('hidden_dropout', 0.1), + attention_dropout=self.cfg.get('attention_dropout', 0.1), + ffn_dropout=self.cfg.get('ffn_dropout', 0.0), + precision=self.cfg.get('precision', 16), + fp32_residual_connection=self.cfg.get('fp32_residual_connection', False), + activations_checkpoint_granularity=self.cfg.get('activations_checkpoint_granularity', None), + activations_checkpoint_method=self.cfg.get('activations_checkpoint_method', None), + activations_checkpoint_num_layers=self.cfg.get('activations_checkpoint_num_layers', 1), + activations_checkpoint_layers_per_pipeline=self.cfg.get( + 'activations_checkpoint_layers_per_pipeline', None + ), + normalization=self.cfg.get('normalization', 'layernorm'), + layernorm_epsilon=self.cfg.get('layernorm_epsilon', 1e-5), + onnx_safe=self.cfg.get('onnx_safe', False), + bias=self.cfg.get('bias', True), + bias_activation_fusion=self.cfg.get('bias_activation_fusion', True), + bias_dropout_add_fusion=self.cfg.get('bias_dropout_add_fusion', True), + activation=self.cfg.get('activation', 'gelu'), + headscale=self.cfg.get('headscale', False), + transformer_block_type=self.cfg.get('transformer_block_type', 'pre_ln'), + openai_gelu=self.cfg.get('openai_gelu', False), + normalize_attention_scores=self.cfg.get('normalize_attention_scores', True), + position_embedding_type=self.cfg.get('position_embedding_type', 'learned_absolute'), + rotary_percentage=self.cfg.get('rotary_percentage', 1.0), + share_embeddings_and_output_weights=self.cfg.get('share_embeddings_and_output_weights', True), 
+ attention_type=self.cfg.get('attention_type', 'multihead'), + masked_softmax_fusion=self.cfg.get('masked_softmax_fusion', True), + persist_layer_norm=self.cfg.get('persist_layer_norm', False), + transformer_engine=self.cfg.get('transformer_engine', False), + fp8=self.cfg.get('fp8', False), + fp8_e4m3=self.cfg.get('fp8_e4m3', False), + fp8_hybrid=self.cfg.get('fp8_hybrid', False), + fp8_margin=self.cfg.get('fp8_margin', 0), + fp8_interval=self.cfg.get('fp8_interval', 1), + fp8_amax_history_len=self.cfg.get('fp8_amax_history_len', 1), + fp8_amax_compute_algo=self.cfg.get('fp8_amax_compute_algo', 'most_recent'), + reduce_amax=self.cfg.get('reduce_amax', True), + use_emha=self.cfg.get('use_emha', False), + ub_tp_comm_overlap=self.cfg.get('ub_tp_comm_overlap', False), + use_flash_attention=self.cfg.get('use_flash_attention', False), + megatron_legacy=self.cfg.get('megatron_legacy', False), + seq_len_interpolation_factor=self.cfg.get('seq_len_interpolation_factor', None), + ) return model def setup_optimizer_param_groups(self): @@ -393,22 +403,31 @@ def configure_optimizers(self): # Disable overlapped grad sync for embedding grad when # pipeline parallelism is enabled if parallel_state.get_pipeline_model_parallel_world_size() > 1: + modules = self.get_gpt_module_list() if parallel_state.is_pipeline_first_stage(ignore_virtual=True): - if isinstance(self.model, list): - module = self.model[0] # only the first virtual rank has the embeddings + if len(modules) > 1: + module = modules[0] # only the first virtual rank has the embeddings else: - module = self.model - if module.share_token_embeddings: - param = module.word_embeddings_weight() + module = modules[0] + if self.cfg.get('share_embeddings_and_output_weights', True): + param = ( + module.shared_embedding_or_output_weight() + if self.mcore_gpt + else module.word_embeddings_weight() + ) param._disable_greedy_grad_copy = not self.megatron_amp_o2 param._disable_overlap_grad_sync = True if parallel_state.is_pipeline_last_stage(ignore_virtual=True): - if isinstance(self.model, list): - module = self.model[-1] # only the last virtual rank has the embeddings + if len(modules) > 1: + module = modules[-1] # only the last virtual rank has the embeddings else: - module = self.model - if module.share_token_embeddings: - param = module.word_embeddings_weight() + module = modules[0] + if self.cfg.get('share_embeddings_and_output_weights', True): + param = ( + module.shared_embedding_or_output_weight() + if self.mcore_gpt + else module.word_embeddings_weight() + ) param._disable_greedy_grad_copy = not self.megatron_amp_o2 param._disable_overlap_grad_sync = True @@ -426,10 +445,11 @@ def configure_optimizers(self): if self.cfg.get('virtual_pipeline_model_parallel_size', None) is not None: # Initialize a bucket for each virtual pipeline stage for module in self.model: - if isinstance(module, Float16Module): + if isinstance(module, (Float16Module, MCoreFloat16Module)): module = module.module stage_bucket = [] - for layer in module.language_model.encoder.layers: + layers = module.decoder.layers if self.mcore_gpt else module.language_model.encoder.layers + for layer in layers: stage_bucket.extend( p for p in layer.parameters() if not getattr(p, '_disable_overlap_grad_sync', False) ) @@ -438,9 +458,10 @@ def configure_optimizers(self): # Initialize a bucket for each Transformer layer modules = self.model if isinstance(self.model, list) else [self.model] for module in modules: - if isinstance(module, Float16Module): + if isinstance(module, (Float16Module, 
MCoreFloat16Module)): module = module.module - for layer in module.language_model.encoder.layers: + layers = module.decoder.layers if self.mcore_gpt else module.language_model.encoder.layers + for layer in layers: buckets.append( [p for p in layer.parameters() if not getattr(p, '_disable_overlap_grad_sync', False)] ) @@ -569,9 +590,10 @@ def training_step(self, dataloader_iter, batch_idx): # manually interact with the parameter. modules = self.model if isinstance(self.model, list) else [self.model] for module in modules: - if isinstance(module, Float16Module): + if isinstance(module, (Float16Module, MCoreFloat16Module)): module = module.module - module = module.language_model + if not self.mcore_gpt: + module = module.language_model if hasattr(module, 'embedding'): for param in module.embedding.parameters(): param.data_ptr() @@ -699,18 +721,18 @@ def allreduce_first_last_embeddings(self): parallel_state.is_pipeline_first_stage(ignore_virtual=True) or parallel_state.is_pipeline_last_stage(ignore_virtual=True) ): + module_list = self.get_gpt_module_list() if parallel_state.is_pipeline_first_stage(ignore_virtual=True): - if isinstance(self.model, list): - module = self.model[0] # only the first virtual rank has the embeddings - else: - module = self.model - if parallel_state.is_pipeline_last_stage(ignore_virtual=True): - if isinstance(self.model, list): - module = self.model[-1] # only the last virtual rank has the embeddings - else: - module = self.model - if module.share_token_embeddings: - word_embeddings_weight = module.word_embeddings_weight() + module = module_list[0] # only the first virtual rank has the embeddings + elif parallel_state.is_pipeline_last_stage(ignore_virtual=True): + module = module_list[-1] # only the last virtual rank has the embeddings + share_embeddings = ( + module.share_embeddings_and_output_weights if self.mcore_gpt else module.share_token_embeddings + ) + if share_embeddings: + word_embeddings_weight = ( + module.shared_embedding_or_output_weight() if self.mcore_gpt else module.word_embeddings_weight() + ) # (@adithyare) adapter training now extends MegatronGPTModel so we have to add this check here to ensure we do not perform all_reduce when grad is None. # grad can be None when performing PeFT training. 
if word_embeddings_weight.requires_grad: @@ -801,14 +823,17 @@ def fwd_output_and_loss_func(dataloader_iter, model, checkpoint_activations_all_ if self.get_attention_mask_from_fusion: required_keys.remove('attention_mask') batch = {key: val.cuda(non_blocking=True) if key in required_keys else None for key, val in batch.items()} + # Model forward pass - output_tensor = model( - batch['tokens'], - batch['position_ids'], - batch['attention_mask'], - batch['labels'], - checkpoint_activations_all_layers=checkpoint_activations_all_layers, - ) + forward_args = { + 'input_ids': batch['tokens'], + 'position_ids': batch['position_ids'], + 'attention_mask': batch['attention_mask'], + 'labels': batch['labels'], + } + if not self.mcore_gpt: + forward_args['checkpoint_activations_all_layers'] = checkpoint_activations_all_layers + output_tensor = model(**forward_args) def loss_func(output_tensor): # Loss for a micro-batch (ub) @@ -1067,19 +1092,21 @@ def setup(self, stage=None): self.setup_test_data(self.cfg.data) if stage == 'fit': - # when using pipeline model parallel the final stage need to initialize word embeddings if parallel_state.get_pipeline_model_parallel_world_size() > 1: - if isinstance(self.model, list): - for i, module in enumerate(self.model): - parallel_state.set_virtual_pipeline_model_parallel_rank(i) - if self.cfg.get('share_embeddings_and_output_weights', True): - module.sync_initial_word_embeddings() - parallel_state.set_virtual_pipeline_model_parallel_rank(0) - else: - if self.cfg.get('share_embeddings_and_output_weights', True): - self.model.sync_initial_word_embeddings() + if self.cfg.get('share_embeddings_and_output_weights', True): + for index, module in enumerate(self.get_gpt_module_list()): + if parallel_state.get_virtual_pipeline_model_parallel_world_size() is not None: + parallel_state.set_virtual_pipeline_model_parallel_rank(index) + sync_embeddings = ( + module.initialize_last_stage_with_word_embeddings + if self.mcore_gpt + else module.sync_initial_word_embeddings + ) + sync_embeddings() + if parallel_state.get_virtual_pipeline_model_parallel_world_size() is not None: + parallel_state.set_virtual_pipeline_model_parallel_rank(0) - if self.cfg.get('transformer_engine', False): + if self.cfg.get('transformer_engine', False) or self.cfg.get('mcore_gpt', False): self.setup_transformer_engine_tp_groups() def setup_training_data(self, cfg): @@ -1208,29 +1235,22 @@ def list_available_models(cls) -> Optional[PretrainedModelInfo]: ) return result - def _set_tp_groups(self, module): - """ Helper method to set tp groups for transformer engine""" - - if self.cfg.get('transformer_engine', False): - logging.info(f'Setting up transformer engine modules for tensor parallelism.') - if self.cfg.get('megatron_amp_O2', 'False'): - # when using O2 additional module key is added that casts the weights - for layer in module.module.language_model.encoder.layers: - layer.set_tensor_parallel_group(parallel_state.get_tensor_model_parallel_group()) - - else: - for layer in module.language_model.encoder.layers: - layer.set_tensor_parallel_group(parallel_state.get_tensor_model_parallel_group()) - def setup_transformer_engine_tp_groups(self): """ This should be called after model parallel groups have been initialized and only needs to be called when using Transformer Engine. 
""" - if isinstance(self.model, list): - for module in self.model: - self._set_tp_groups(module) - else: - self._set_tp_groups(self.model) + + for module in self.get_gpt_module_list(): + """Set TP group + Copied from: https://github.com/NVIDIA/TransformerEngine/blob/main/transformer_engine/pytorch/transformer.py#L398 + """ + # Deep iterate but skip self to avoid infinite recursion. + for index, child in enumerate(module.modules()): + if index == 0: + continue + if hasattr(child, "set_tensor_parallel_group"): + tp_group = parallel_state.get_tensor_model_parallel_group() + child.set_tensor_parallel_group(tp_group) def on_save_checkpoint(self, checkpoint) -> None: """LightningModule hook: @@ -1284,6 +1304,7 @@ def _reset_activation_checkpointing_args(self): # Reset model parameters. for module in self.get_gpt_module_list(): + # TODO: @eharper how to update this for mcore module.language_model.encoder.activations_checkpoint_granularity = None module.language_model.encoder.activations_checkpoint_method = None module.language_model.encoder.activations_checkpoint_num_layers = None @@ -1295,6 +1316,7 @@ def _restore_activation_checkpointing_args(self): _reset_activation_checkpointing_args. """ # Restore config values. + # TODO: @eharper how to update this for mcore self.cfg.activations_checkpoint_granularity = self.last_activations_checkpoint_granularity self.cfg.activations_checkpoint_method = self.last_activations_checkpoint_method self.cfg.activations_checkpoint_num_layers = self.last_activations_checkpoint_num_layers @@ -1343,3 +1365,122 @@ def _restore_sequence_parallelism_args(self): for mod in module.modules(): if hasattr(mod, "sequence_parallel"): mod.sequence_parallel = self.last_sequence_parallel + + def build_transformer_config(self) -> TransformerConfig: + """ Builds the megatron core gpt transformer config for the model. + For attributes in the nemo model config that are the same + as the megatron core TransformerConfig, we will use the value from the nemo model config. + For attributes in TransformerConfig that are not in the nemo model config, we add custom logic. 
+ """ + + # create a dictionary copy of the model config + cfg = OmegaConf.to_container(self.cfg, resolve=True) + + # create a dict to store the transformer config arguments + transformer_config_dict = {} + + # get model parallel configs from the base class + model_parallel_config = self.build_model_parallel_config() + + add_bias_linear = self.cfg.get('bias', True) + + activation = self.cfg.get('activation', 'gelu') + # TODO: need to check which activation functions are supported in mcore + activation_func = activation_to_func(activation) + + init_method_std = self.cfg.get('init_method_std', 0.02) + # default used in mcore + init_method = init_method_normal(init_method_std) + + output_layer_init_method = init_method + num_layers = self.cfg.get('num_layers', 1) + use_scaled_init_method = self.cfg.get('use_scaled_init_method', True) + if use_scaled_init_method: + output_layer_init_method = scaled_init_method_normal(init_method_std, num_layers=num_layers) + + attention_softmax_in_fp32 = False # not currently used in NeMo unless apply_query_key_layer_scaling is True + apply_query_key_layer_scaling = self.cfg.get('apply_query_key_layer_scaling', False) + if apply_query_key_layer_scaling: + attention_softmax_in_fp32 = True + + bias_activation_fusion = self.cfg.get('bias_activation_fusion', True) + bias_gelu_fusion = True if bias_activation_fusion else False + + bias_dropout_fusion = self.cfg.get('bias_dropout_add_fusion', True) + + # TODO: need to check if recompute APIs are matching up properly + recompute_granularity = self.cfg.get('activations_checkpoint_granularity', None) + recompute_method = self.cfg.get('activations_checkpoint_method', None) + recompute_num_layers = self.cfg.get('activations_checkpoint_num_layers', None) + + # any configs that are not in the nemo model config will be added here + config_mapping = { + 'apply_residual_connection_post_layernorm': False, # we don't use this in NeMo + 'layernorm_zero_centered_gamma': False, # not currently used in NeMo + 'add_bias_linear': add_bias_linear, + 'gated_linear_unit': False, # TODO: is this used in NeMo? + 'activation_func': activation_func, + 'init_method': init_method, + 'output_layer_init_method': output_layer_init_method, + 'attention_softmax_in_fp32': attention_softmax_in_fp32, + 'bias_gelu_fusion': bias_gelu_fusion, + 'bias_dropout_fusion': bias_dropout_fusion, + 'recompute_granularity': recompute_granularity, + 'recompute_method': recompute_method, + 'recompute_num_layers': recompute_num_layers, + 'distribute_saved_activations': False, # not currently used in NeMo + } + + # populate the transformer config dict + for field in fields(TransformerConfig): + # model config has priority + if field.name in cfg: + transformer_config_dict[field.name] = cfg[field.name] + # then model parallel config + elif field in fields(model_parallel_config): + transformer_config_dict[field.name] = getattr(model_parallel_config, field.name) + # then config mapping + elif field.name in config_mapping: + transformer_config_dict[field.name] = config_mapping[field.name] + else: + logging.warning( + f"The model: {self} does not have field.name: {field.name} in its cfg. " + f"Add this key to cfg or config_mapping to make to make it configurable." + ) + + transformer_config = TransformerConfig(**transformer_config_dict) + + return transformer_config + + def _wrap_model_for_O2(self): + """ Wraps self.model in a float16 wrapper if the model is using megatron amp O2. + Args: + model: The model to wrap. Can be a list of modules or a single module. 
+            Returns:
+                The wrapped model. Returns a list of wrapped modules or a single wrapped module.
+        """
+        Float16Wrapper = MCoreFloat16Module if self.mcore_gpt else Float16Module
+
+        nemo_args = {
+            'config': self.model_parallel_config,
+            'precision': self.cfg.precision,
+            'share_token_embeddings': self.cfg.get('share_embeddings_and_output_weights', True),
+        }
+        mcore_args = {
+            'config': self.transformer_config,
+        }
+
+        args = mcore_args if self.mcore_gpt else nemo_args
+
+        # Model wrapper to convert both model and inputs to half precision
+        if isinstance(self.model, list):
+            converted_model = []
+            for module in self.model:
+                args['module'] = module
+                converted_model.append(Float16Wrapper(**args))
+            self.model = converted_model
+        else:
+            args['module'] = self.model
+            self.model = Float16Wrapper(**args)
+
+        args.pop('module')
diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_prompt_learning_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_prompt_learning_model.py
index 5edc56bf5d0a2..c455660cb21ff 100644
--- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_prompt_learning_model.py
+++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_prompt_learning_model.py
@@ -104,6 +104,7 @@ def init_model(self, cfg: DictConfig, trainer: Trainer):
             save_restore_connector=save_restore_connector,
         )

+        # set hidden size in the model parallel config for pipeline parallel schedules
         setattr(self.config, 'hidden_size', frozen_model_cfg.hidden_size)

         # Need to overwrite some params in frozen model's config before restoring
diff --git a/nemo/collections/nlp/parts/utils_funcs.py b/nemo/collections/nlp/parts/utils_funcs.py
index ad94e92d94cfc..b60b0c1528c7f 100644
--- a/nemo/collections/nlp/parts/utils_funcs.py
+++ b/nemo/collections/nlp/parts/utils_funcs.py
@@ -16,14 +16,18 @@

 import os
 import time
-from typing import Dict, List, Optional, Union
+from typing import Callable, Dict, List, Optional, Union

 import numpy as np
 import torch
+import torch.nn.functional as F
 from matplotlib import pyplot as plt
 from sklearn.metrics import classification_report, confusion_matrix
 from torch import Tensor

+from nemo.collections.nlp.modules.common.megatron.utils import erf_gelu
+from nemo.collections.nlp.modules.common.megatron.utils import openai_gelu as openai_gelu_func
+from nemo.collections.nlp.modules.common.megatron.utils import squared_relu
 from nemo.utils import logging
@@ -151,3 +155,48 @@ def is_last_rank():

 def get_last_rank():
     return torch.distributed.get_world_size() - 1
+
+
+def activation_to_func(activation: str, openai_gelu: bool = False, onnx_safe: bool = False) -> Callable:
+    """ Converts an activation function represented as a string to a function.
+
+    Args:
+        activation (str): string representation of an activation function, typically gotten from the model config.
+        openai_gelu (bool): whether to use the OpenAI GELU implementation. Used with HF compatibility.
+        onnx_safe (bool): whether to use the ONNX-compatible implementation of GELU.
+
+    Returns:
+        Callable: the activation function.
+    """
+
+    supported_activations = [
+        'gelu',
+        'geglu',
+        'reglu',
+        'swiglu',
+        'squared-relu',
+        'fast-geglu',
+        'fast-swiglu',
+        'fast-reglu',
+    ]
+
+    if activation not in supported_activations:
+        raise ValueError(f"Unsupported activation {activation}. Supported activations: {supported_activations} ")
+
+    # Give openai_gelu precedence over other activations if set, for HF compatibility.
+    # Normally this is off and shouldn't affect regular model training.
+    if openai_gelu:
+        activation_func = openai_gelu_func
+    elif activation in ["gelu", "geglu", "fast-geglu"]:
+        activation_func = F.gelu
+    elif onnx_safe:
+        activation_func = erf_gelu
+    elif activation in ["reglu", "fast-reglu"]:
+        activation_func = F.relu
+    elif activation in ["swiglu", "fast-swiglu"]:
+        # SiLU or sigmoid linear unit is the same as swish with beta = 1 (which is what https://arxiv.org/pdf/2002.05202.pdf uses.)
+        activation_func = F.silu
+    elif activation == 'squared-relu':
+        activation_func = squared_relu
+
+    return activation_func
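A short usage sketch for the helper added above. It assumes a NeMo installation that already contains this patch; the tensor values are arbitrary.

import torch

from nemo.collections.nlp.parts.utils_funcs import activation_to_func

act = activation_to_func("fast-swiglu")      # resolves to torch.nn.functional.silu
print(act(torch.tensor([-1.0, 0.0, 1.0])))   # tensor([-0.2689, 0.0000, 0.7311])

try:
    activation_to_func("tanh")               # not in supported_activations
except ValueError as err:
    print(err)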