From 925dcbc7620b65d15c4221cf4d520eb227c9e17c Mon Sep 17 00:00:00 2001 From: ashors1 Date: Tue, 9 Jul 2024 23:49:28 +0000 Subject: [PATCH] Apply isort and black reformatting Signed-off-by: ashors1 --- .../nmt_transformer_infer_megatron.py | 11 +- .../text_to_image/controlnet/controlnet.py | 100 ++++++----- .../text_to_image/dreambooth/dreambooth.py | 56 +++--- .../models/text_to_image/imagen/imagen.py | 65 ++++--- .../stable_diffusion/diffusion_engine.py | 84 ++++++--- .../stable_diffusion/ldm/ddpm.py | 169 +++++++++++------- .../clip/megatron_clip_models.py | 2 +- .../megatron_nsfw_clip_models.py | 2 +- .../speech_llm/models/modular_models.py | 2 +- .../speech_llm/models/modular_t5_models.py | 10 +- .../megatron_bert_embedding_model.py | 12 +- .../language_modeling/megatron_bert_model.py | 81 +++++---- .../language_modeling/megatron_gpt_model.py | 3 +- .../megatron_gpt_sft_model.py | 10 +- .../megatron_lm_encoder_decoder_model.py | 7 +- .../language_modeling/megatron_retro_model.py | 2 +- .../language_modeling/megatron_t0_model.py | 15 +- .../megatron_t5_prompt_learning_model.py | 68 +++---- .../megatron_t5_sft_model.py | 32 ++-- .../machine_translation/megatron_nmt_model.py | 62 ++++--- .../megatron_vit_classification_models.py | 67 +++---- nemo/utils/apex_utils.py | 22 ++- .../collections/nlp/test_rampup_batch_size.py | 5 +- 23 files changed, 512 insertions(+), 375 deletions(-) diff --git a/examples/nlp/machine_translation/nmt_transformer_infer_megatron.py b/examples/nlp/machine_translation/nmt_transformer_infer_megatron.py index c99bf5b40828..fcf1fb8d1796 100644 --- a/examples/nlp/machine_translation/nmt_transformer_infer_megatron.py +++ b/examples/nlp/machine_translation/nmt_transformer_infer_megatron.py @@ -35,6 +35,7 @@ from nemo.utils.app_state import AppState from nemo.utils.model_utils import inject_model_parallel_rank + @hydra_runner(config_path="conf", config_name="nmt_megatron_infer") def main(cfg) -> None: @@ -91,13 +92,19 @@ def main(cfg) -> None: src_text.append(line.strip()) if len(src_text) == cfg.batch_size: translations = model.translate( - text=src_text, source_lang=cfg.source_lang, target_lang=cfg.target_lang, + text=src_text, + source_lang=cfg.source_lang, + target_lang=cfg.target_lang, ) for translation in translations: tgt_f.write(translation + "\n") src_text = [] if len(src_text) > 0: - translations = model.translate(text=src_text, source_lang=cfg.source_lang, target_lang=cfg.target_lang,) + translations = model.translate( + text=src_text, + source_lang=cfg.source_lang, + target_lang=cfg.target_lang, + ) for translation in translations: tgt_f.write(translation + "\n") diff --git a/nemo/collections/multimodal/models/text_to_image/controlnet/controlnet.py b/nemo/collections/multimodal/models/text_to_image/controlnet/controlnet.py index c6dedb56087f..fc661d91ab61 100644 --- a/nemo/collections/multimodal/models/text_to_image/controlnet/controlnet.py +++ b/nemo/collections/multimodal/models/text_to_image/controlnet/controlnet.py @@ -380,7 +380,9 @@ def __init__( time_embed_dim = model_channels * 4 self.time_embed = nn.Sequential( - linear(model_channels, time_embed_dim), nn.SiLU(), linear(time_embed_dim, time_embed_dim), + linear(model_channels, time_embed_dim), + nn.SiLU(), + linear(time_embed_dim, time_embed_dim), ) self.input_blocks = nn.ModuleList( @@ -505,24 +507,26 @@ def __init__( use_checkpoint=use_checkpoint, use_scale_shift_norm=use_scale_shift_norm, ), - AttentionBlock( - ch, - use_checkpoint=use_checkpoint, - num_heads=num_heads, - 
num_head_channels=dim_head, - use_new_attention_order=use_new_attention_order, - ) - if not use_spatial_transformer - else SpatialTransformer( # always uses a self-attn - ch, - num_heads, - dim_head, - depth=transformer_depth, - context_dim=context_dim, - disable_self_attn=disable_middle_self_attn, - use_linear=use_linear_in_transformer, - use_checkpoint=use_checkpoint, - use_flash_attention=use_flash_attention, + ( + AttentionBlock( + ch, + use_checkpoint=use_checkpoint, + num_heads=num_heads, + num_head_channels=dim_head, + use_new_attention_order=use_new_attention_order, + ) + if not use_spatial_transformer + else SpatialTransformer( # always uses a self-attn + ch, + num_heads, + dim_head, + depth=transformer_depth, + context_dim=context_dim, + disable_self_attn=disable_middle_self_attn, + use_linear=use_linear_in_transformer, + use_checkpoint=use_checkpoint, + use_flash_attention=use_flash_attention, + ) ), ResBlock( ch, @@ -684,7 +688,10 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): # handle asynchronous grad reduction no_sync_func = None if not forward_only and self.with_distributed_adam: - no_sync_func = partial(self._optimizer.no_sync, greedy_grad_copy=self.megatron_amp_O2,) + no_sync_func = partial( + self._optimizer.no_sync, + greedy_grad_copy=self.megatron_amp_O2, + ) # pipeline schedules will get these from self.model.config for module in self.get_module_list(): @@ -728,12 +735,12 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): def training_step(self, dataloader_iter): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - Batch should be a list of microbatches and those microbatches should on CPU. - Microbatches are then moved to GPU during the pipeline. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + Batch should be a list of microbatches and those microbatches should on CPU. + Microbatches are then moved to GPU during the pipeline. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ # we zero grads here because we also call backward in the apex fwd/bwd functions self._optimizer.zero_grad() @@ -777,20 +784,20 @@ def training_step(self, dataloader_iter): return loss_mean def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. - We want this to do nothing since we run backward in the fwd/bwd functions from apex. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from apex. + No need to call it here. """ pass def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. 
""" pass def _append_sequence_parallel_module_grads(self, module, grads): - """ Helper method for allreduce_sequence_parallel_gradients""" + """Helper method for allreduce_sequence_parallel_gradients""" for param in module.parameters(): sequence_parallel_param = getattr(param, 'sequence_parallel', False) @@ -803,8 +810,8 @@ def _append_sequence_parallel_module_grads(self, module, grads): def get_forward_output_and_loss_func(self): def process_batch(batch): - """ Prepares the global batch for apex fwd/bwd functions. - Global batch is a list of micro batches. + """Prepares the global batch for apex fwd/bwd functions. + Global batch is a list of micro batches. """ # noise_map, condition batch[self.cfg.first_stage_key] = batch[self.cfg.first_stage_key].cuda(non_blocking=True) @@ -814,7 +821,8 @@ def process_batch(batch): # SD has more dedicated structure for encoding, so we enable autocasting here as well with torch.cuda.amp.autocast( - self.autocast_dtype in (torch.half, torch.bfloat16), dtype=self.autocast_dtype, + self.autocast_dtype in (torch.half, torch.bfloat16), + dtype=self.autocast_dtype, ): x, c = self.model.get_input(batch, self.cfg.first_stage_key) @@ -881,7 +889,7 @@ def validation_step(self, batch, batch_idx): self.log_dict(val_loss_dict, prog_bar=False, logger=True, on_step=False, on_epoch=True) def setup(self, stage=None): - """ PTL hook that is executed after DDP spawns. + """PTL hook that is executed after DDP spawns. We setup datasets here as megatron datasets require DDP to instantiate. See https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#setup for more information. Args: @@ -935,7 +943,8 @@ def build_train_valid_test_datasets(self): if self.cfg.first_stage_key.endswith("encoded"): self._train_ds, self._validation_ds = build_train_valid_precached_datasets( - model_cfg=self.cfg, consumed_samples=self.compute_consumed_samples(0), + model_cfg=self.cfg, + consumed_samples=self.compute_consumed_samples(0), ) else: self._train_ds, self._validation_ds = build_train_valid_datasets( @@ -989,20 +998,23 @@ def setup_test_data(self, cfg): f'Setting up test dataloader with len(len(self._test_ds)): {len(self._test_ds)} and consumed samples: {consumed_samples}' ) self._test_dl = torch.utils.data.DataLoader( - self._test_ds, batch_size=self._micro_batch_size, num_workers=cfg.num_workers, pin_memory=True, + self._test_ds, + batch_size=self._micro_batch_size, + num_workers=cfg.num_workers, + pin_memory=True, ) def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any: - """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device - When using pipeline parallelism, we need the global batch to remain on the CPU, - since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. + """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device + When using pipeline parallelism, we need the global batch to remain on the CPU, + since the memory overhead will be too high when using a large number of microbatches. + Microbatches are transferred from CPU to GPU inside the pipeline. """ return batch def _validate_trainer(self): - """ Certain trainer configurations can break training. - Here we try to catch them and raise an error. + """Certain trainer configurations can break training. + Here we try to catch them and raise an error. 
""" if self.trainer.accumulate_grad_batches > 1: raise ValueError( diff --git a/nemo/collections/multimodal/models/text_to_image/dreambooth/dreambooth.py b/nemo/collections/multimodal/models/text_to_image/dreambooth/dreambooth.py index c9a3e06d7f22..9db63c2abfce 100644 --- a/nemo/collections/multimodal/models/text_to_image/dreambooth/dreambooth.py +++ b/nemo/collections/multimodal/models/text_to_image/dreambooth/dreambooth.py @@ -99,7 +99,9 @@ def __init__(self, cfg, model_parallel_config): self.get_noise_scheduler(self.cfg.noise_scheduler) self.model_type = None - self.rng = torch.Generator(device=torch.cuda.current_device(),) + self.rng = torch.Generator( + device=torch.cuda.current_device(), + ) self.use_cached_latents = self.cfg.use_cached_latents @@ -246,7 +248,10 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): # handle asynchronous grad reduction no_sync_func = None if not forward_only and self.with_distributed_adam: - no_sync_func = partial(self._optimizer.no_sync, greedy_grad_copy=self.megatron_amp_O2,) + no_sync_func = partial( + self._optimizer.no_sync, + greedy_grad_copy=self.megatron_amp_O2, + ) # pipeline schedules will get these from self.model.config for module in self.get_module_list(): @@ -291,12 +296,12 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): def training_step(self, dataloader_iter): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - Batch should be a list of microbatches and those microbatches should on CPU. - Microbatches are then moved to GPU during the pipeline. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + Batch should be a list of microbatches and those microbatches should on CPU. + Microbatches are then moved to GPU during the pipeline. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ # we zero grads here because we also call backward in the apex fwd/bwd functions @@ -351,20 +356,20 @@ def validation_step(self, dataloader_iter): return loss def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. - We want this to do nothing since we run backward in the fwd/bwd functions from apex. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from apex. + No need to call it here. """ pass def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. 
""" pass def _append_sequence_parallel_module_grads(self, module, grads): - """ Helper method for allreduce_sequence_parallel_gradients""" + """Helper method for allreduce_sequence_parallel_gradients""" for param in module.parameters(): sequence_parallel_param = getattr(param, 'sequence_parallel', False) @@ -381,7 +386,8 @@ def process_batch(batch): prompts, images = batch # DB has more dedicated structure for encoding, so we enable autocasting here as well with torch.cuda.amp.autocast( - self.autocast_dtype in (torch.half, torch.bfloat16), dtype=self.autocast_dtype, + self.autocast_dtype in (torch.half, torch.bfloat16), + dtype=self.autocast_dtype, ): images = images.cuda(non_blocking=True) @@ -412,7 +418,7 @@ def fwd_output_only_func(batch, model): return fwd_output_only_func def setup(self, stage=None): - """ PTL hook that is executed after DDP spawns. + """PTL hook that is executed after DDP spawns. We setup datasets here as megatron datasets require DDP to instantiate. See https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#setup for more information. Args: @@ -472,9 +478,9 @@ def setup_training_data(self, cfg): center_crop=cfg.center_crop, load_cache_latents=self.model.use_cached_latents, cached_instance_data_root=self.cfg.data.get("cached_instance_dir", None), - cached_reg_data_root=self.cfg.data.get("cached_reg_dir", None) - if self.cfg.with_prior_preservation - else None, + cached_reg_data_root=( + self.cfg.data.get("cached_reg_dir", None) if self.cfg.with_prior_preservation else None + ), vae=self.model.vae, text_encoder=self.model.text_encoder, ) @@ -505,16 +511,16 @@ def setup_test_data(self, cfg): pass def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any: - """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device - When using pipeline parallelism, we need the global batch to remain on the CPU, - since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. + """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device + When using pipeline parallelism, we need the global batch to remain on the CPU, + since the memory overhead will be too high when using a large number of microbatches. + Microbatches are transferred from CPU to GPU inside the pipeline. """ return batch def _validate_trainer(self): - """ Certain trainer configurations can break training. - Here we try to catch them and raise an error. + """Certain trainer configurations can break training. + Here we try to catch them and raise an error. """ if self.trainer.accumulate_grad_batches > 1: raise ValueError( diff --git a/nemo/collections/multimodal/models/text_to_image/imagen/imagen.py b/nemo/collections/multimodal/models/text_to_image/imagen/imagen.py index 8c2935430a5c..9dd52543f7bc 100644 --- a/nemo/collections/multimodal/models/text_to_image/imagen/imagen.py +++ b/nemo/collections/multimodal/models/text_to_image/imagen/imagen.py @@ -218,8 +218,8 @@ def model_provider_func(self, pre_process=True, post_process=True): def get_forward_output_and_loss_func(self): def process_batch(batch): - """ Prepares the batch for megatron fwd/bwd functions. - Global batch is a list of micro batches. + """Prepares the batch for megatron fwd/bwd functions. + Global batch is a list of micro batches. 
""" # Base model and SR models have slightly different batch input: # Base model would only require images (64x64), @@ -323,7 +323,10 @@ def setup_test_data(self, cfg): f'Setting up test dataloader with len(len(self._test_ds)): {len(self._test_ds)} and consumed samples: {consumed_samples}' ) self._test_dl = torch.utils.data.DataLoader( - self._test_ds, batch_size=self._micro_batch_size, num_workers=cfg.num_workers, pin_memory=True, + self._test_ds, + batch_size=self._micro_batch_size, + num_workers=cfg.num_workers, + pin_memory=True, ) def fwd_bwd_step(self, dataloader_iter, forward_only): @@ -332,7 +335,10 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): # handle asynchronous grad reduction no_sync_func = None if not forward_only and self.with_distributed_adam: - no_sync_func = partial(self._optimizer.no_sync, greedy_grad_copy=self.megatron_amp_O2,) + no_sync_func = partial( + self._optimizer.no_sync, + greedy_grad_copy=self.megatron_amp_O2, + ) # pipeline schedules will get these from self.model.config for module in self.get_module_list(): @@ -379,12 +385,12 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): def training_step(self, dataloader_iter): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - Batch should be a list of microbatches and those microbatches should on CPU. - Microbatches are then moved to GPU during the pipeline. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + Batch should be a list of microbatches and those microbatches should on CPU. + Microbatches are then moved to GPU during the pipeline. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ # we zero grads here because we also call backward in the megatron-core fwd/bwd functions @@ -434,20 +440,20 @@ def training_step(self, dataloader_iter): return loss_mean def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. - We want this to do nothing since we run backward in the fwd/bwd functions from apex. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from apex. + No need to call it here. """ pass def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. """ pass def _append_sequence_parallel_module_grads(self, module, grads): - """ Helper method for allreduce_sequence_parallel_gradients""" + """Helper method for allreduce_sequence_parallel_gradients""" for param in module.parameters(): sequence_parallel_param = getattr(param, 'sequence_parallel', False) @@ -460,10 +466,10 @@ def _append_sequence_parallel_module_grads(self, module, grads): def validation_step(self, dataloader_iter): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. 
- The list of microbatches is then piped through the pipeline using megatron-core fwd/bwd functions. """ + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + The list of microbatches is then piped through the pipeline using megatron-core fwd/bwd functions.""" loss, val_loss_dict = self.fwd_bwd_step(dataloader_iter, True) @@ -471,7 +477,7 @@ def validation_step(self, dataloader_iter): return loss def setup(self, stage=None): - """ PTL hook that is executed after DDP spawns. + """PTL hook that is executed after DDP spawns. We setup datasets here as megatron datasets require DDP to instantiate. See https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#setup for more information. Args: @@ -520,16 +526,16 @@ def setup(self, stage=None): self.model.setup_rng() def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any: - """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device - When using pipeline parallelism, we need the global batch to remain on the CPU, - since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. + """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device + When using pipeline parallelism, we need the global batch to remain on the CPU, + since the memory overhead will be too high when using a large number of microbatches. + Microbatches are transferred from CPU to GPU inside the pipeline. """ return batch def _validate_trainer(self): - """ Certain trainer configurations can break training. - Here we try to catch them and raise an error. + """Certain trainer configurations can break training. + Here we try to catch them and raise an error. 
""" if self.trainer.accumulate_grad_batches > 1: raise ValueError( @@ -558,7 +564,10 @@ def on_load_checkpoint(self, checkpoint) -> None: inductor_enabled = self.cfg.get('inductor', False) state_dict = checkpoint['state_dict'] inductor_checkpoint = False - for k, v, in state_dict.items(): + for ( + k, + v, + ) in state_dict.items(): if '_orig_mod' in k: inductor_checkpoint = True break diff --git a/nemo/collections/multimodal/models/text_to_image/stable_diffusion/diffusion_engine.py b/nemo/collections/multimodal/models/text_to_image/stable_diffusion/diffusion_engine.py index 84414701f6e1..c6c94bb8cd4b 100644 --- a/nemo/collections/multimodal/models/text_to_image/stable_diffusion/diffusion_engine.py +++ b/nemo/collections/multimodal/models/text_to_image/stable_diffusion/diffusion_engine.py @@ -119,7 +119,9 @@ def __init__(self, cfg, model_parallel_config): self._init_first_stage(first_stage_config) self.model_type = None - self.rng = torch.Generator(device=torch.cuda.current_device(),) + self.rng = torch.Generator( + device=torch.cuda.current_device(), + ) self.use_ema = False # TODO use_ema need to switch to NeMo style if self.use_ema: @@ -185,7 +187,12 @@ def training_step(self, batch, batch_idx): self.log_dict(loss_dict, prog_bar=True, logger=True, on_step=True, on_epoch=False) self.log( - "global_step", self.global_step, prog_bar=True, logger=True, on_step=True, on_epoch=False, + "global_step", + self.global_step, + prog_bar=True, + logger=True, + on_step=True, + on_epoch=False, ) if self.scheduler_config is not None: @@ -231,7 +238,11 @@ def configure_optimizers(self): scheduler = DiffusionEngine.from_config_dict(self.scheduler_config) print("Setting up LambdaLR scheduler...") scheduler = [ - {"scheduler": LambdaLR(opt, lr_lambda=scheduler.schedule), "interval": "step", "frequency": 1,} + { + "scheduler": LambdaLR(opt, lr_lambda=scheduler.schedule), + "interval": "step", + "frequency": 1, + } ] return [opt], scheduler return opt @@ -291,7 +302,14 @@ def set_input_tensor(self, input_tensor): pass @torch.no_grad() - def log_images(self, batch: Dict, N: int = 8, sample: bool = True, ucg_keys: List[str] = None, **kwargs,) -> Dict: + def log_images( + self, + batch: Dict, + N: int = 8, + sample: bool = True, + ucg_keys: List[str] = None, + **kwargs, + ) -> Dict: conditioner_input_keys = [e.input_key for e in self.conditioner.embedders] if ucg_keys: assert all(map(lambda x: x in conditioner_input_keys, ucg_keys)), ( @@ -305,7 +323,8 @@ def log_images(self, batch: Dict, N: int = 8, sample: bool = True, ucg_keys: Lis x = self.get_input(batch) c, uc = self.conditioner.get_unconditional_conditioning( - batch, force_uc_zero_embeddings=ucg_keys if len(self.conditioner.embedders) > 0 else [], + batch, + force_uc_zero_embeddings=ucg_keys if len(self.conditioner.embedders) > 0 else [], ) sampling_kwargs = {} @@ -400,7 +419,10 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): # handle asynchronous grad reduction no_sync_func = None if not forward_only and self.with_distributed_adam: - no_sync_func = partial(self._optimizer.no_sync, greedy_grad_copy=self.megatron_amp_O2,) + no_sync_func = partial( + self._optimizer.no_sync, + greedy_grad_copy=self.megatron_amp_O2, + ) # pipeline schedules will get these from self.model.config for module in self.get_module_list(): @@ -438,12 +460,12 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): def training_step(self, dataloader_iter): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global 
batch size and model parallel size - from the dataloader to produce a list of microbatches. - Batch should be a list of microbatches and those microbatches should on CPU. - Microbatches are then moved to GPU during the pipeline. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + Batch should be a list of microbatches and those microbatches should on CPU. + Microbatches are then moved to GPU during the pipeline. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ self._optimizer.zero_grad() @@ -491,20 +513,20 @@ def training_step(self, dataloader_iter): return loss_mean def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. - We want this to do nothing since we run backward in the fwd/bwd functions from apex. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from apex. + No need to call it here. """ pass def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. """ pass def _append_sequence_parallel_module_grads(self, module, grads): - """ Helper method for allreduce_sequence_parallel_gradients""" + """Helper method for allreduce_sequence_parallel_gradients""" for param in module.parameters(): sequence_parallel_param = getattr(param, 'sequence_parallel', False) @@ -517,12 +539,13 @@ def _append_sequence_parallel_module_grads(self, module, grads): def get_forward_output_and_loss_func(self): def process_batch(batch): - """ Prepares the global batch for apex fwd/bwd functions. - Global batch is a list of micro batches. + """Prepares the global batch for apex fwd/bwd functions. + Global batch is a list of micro batches. """ # SD has more dedicated structure for encoding, so we enable autocasting here as well with torch.cuda.amp.autocast( - self.autocast_dtype in (torch.half, torch.bfloat16), dtype=self.autocast_dtype, + self.autocast_dtype in (torch.half, torch.bfloat16), + dtype=self.autocast_dtype, ): if self.model.precache_mode == 'both': x = batch[self.model.input_key].to(torch.cuda.current_device()) @@ -565,7 +588,7 @@ def validation_step(self, dataloader_iter, batch_idx): return loss def setup(self, stage=None): - """ PTL hook that is executed after DDP spawns. + """PTL hook that is executed after DDP spawns. We setup datasets here as megatron datasets require DDP to instantiate. See https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#setup for more information. 
Args: @@ -678,20 +701,23 @@ def setup_test_data(self, cfg): f'Setting up test dataloader with len(len(self._test_ds)): {len(self._test_ds)} and consumed samples: {consumed_samples}' ) self._test_dl = torch.utils.data.DataLoader( - self._test_ds, batch_size=self._micro_batch_size, num_workers=cfg.num_workers, pin_memory=True, + self._test_ds, + batch_size=self._micro_batch_size, + num_workers=cfg.num_workers, + pin_memory=True, ) def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any: - """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device - When using pipeline parallelism, we need the global batch to remain on the CPU, - since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. + """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device + When using pipeline parallelism, we need the global batch to remain on the CPU, + since the memory overhead will be too high when using a large number of microbatches. + Microbatches are transferred from CPU to GPU inside the pipeline. """ return batch def _validate_trainer(self): - """ Certain trainer configurations can break training. - Here we try to catch them and raise an error. + """Certain trainer configurations can break training. + Here we try to catch them and raise an error. """ if self.trainer.accumulate_grad_batches > 1: raise ValueError( diff --git a/nemo/collections/multimodal/models/text_to_image/stable_diffusion/ldm/ddpm.py b/nemo/collections/multimodal/models/text_to_image/stable_diffusion/ldm/ddpm.py index e644ec79e5c6..17599d4b0932 100644 --- a/nemo/collections/multimodal/models/text_to_image/stable_diffusion/ldm/ddpm.py +++ b/nemo/collections/multimodal/models/text_to_image/stable_diffusion/ldm/ddpm.py @@ -163,7 +163,9 @@ def __init__(self, cfg): cuda_graph_enabled = cfg.get("capture_cudagraph_iters", -1) >= 0 if not cuda_graph_enabled: logging.info("Use custom random generator") - self.rng = torch.Generator(device=torch.cuda.current_device(),) + self.rng = torch.Generator( + device=torch.cuda.current_device(), + ) else: logging.info("Use system random generator since CUDA graph enabled") self.rng = None @@ -222,14 +224,12 @@ def register_schedule( ) if self.parameterization == "eps": - lvlb_weights = self.betas ** 2 / ( - 2 * self.posterior_variance * to_torch(alphas) * (1 - self.alphas_cumprod) - ) + lvlb_weights = self.betas**2 / (2 * self.posterior_variance * to_torch(alphas) * (1 - self.alphas_cumprod)) elif self.parameterization == "x0": lvlb_weights = 0.5 * np.sqrt(torch.Tensor(alphas_cumprod)) / (2.0 * 1 - torch.Tensor(alphas_cumprod)) elif self.parameterization == "v": lvlb_weights = torch.ones_like( - self.betas ** 2 / (2 * self.posterior_variance * to_torch(alphas) * (1 - self.alphas_cumprod)) + self.betas**2 / (2 * self.posterior_variance * to_torch(alphas) * (1 - self.alphas_cumprod)) ) else: raise NotImplementedError("mu not supported") @@ -239,7 +239,13 @@ def register_schedule( assert not torch.isnan(self.lvlb_weights).all() def init_from_ckpt( - self, path, ignore_keys=list(), only_model=False, load_vae=True, load_unet=True, load_encoder=True, + self, + path, + ignore_keys=list(), + only_model=False, + load_vae=True, + load_unet=True, + load_encoder=True, ): pl_sd = torch.load(path, map_location="cpu") if "state_dict" in list(pl_sd.keys()): @@ -561,7 +567,11 
@@ def __init__(self, cfg, model_parallel_config): load_encoder = True if cfg.get("load_encoder", None) is None else cfg.load_encoder self.init_from_ckpt( - ckpt_path, ignore_keys, load_vae=load_vae, load_unet=load_unet, load_encoder=load_encoder, + ckpt_path, + ignore_keys, + load_vae=load_vae, + load_unet=load_unet, + load_encoder=load_encoder, ) self.restarted_from_ckpt = True @@ -569,7 +579,9 @@ def __init__(self, cfg, model_parallel_config): self.first_stage_model = self.first_stage_model.to(memory_format=torch.channels_last) self.model = self.model.to(memory_format=torch.channels_last) - def make_cond_schedule(self,): + def make_cond_schedule( + self, + ): self.cond_ids = torch.full(size=(self.num_timesteps,), fill_value=self.num_timesteps - 1, dtype=torch.long) ids = torch.round(torch.linspace(0, self.num_timesteps - 1, self.num_timesteps_cond)).long() self.cond_ids[: self.num_timesteps_cond] = ids @@ -686,7 +698,9 @@ def delta_border(self, h, w): def get_weighting(self, h, w, Ly, Lx, device): weighting = self.delta_border(h, w) weighting = torch.clip( - weighting, self.split_input_params["clip_min_weight"], self.split_input_params["clip_max_weight"], + weighting, + self.split_input_params["clip_min_weight"], + self.split_input_params["clip_max_weight"], ) weighting = weighting.view(1, h * w, 1).repeat(1, 1, Ly * Lx).to(device) @@ -1322,9 +1336,11 @@ def progressive_denoising( if cond is not None: if isinstance(cond, dict): cond = { - key: cond[key][:batch_size] - if not isinstance(cond[key], list) - else list(map(lambda x: x[:batch_size], cond[key])) + key: ( + cond[key][:batch_size] + if not isinstance(cond[key], list) + else list(map(lambda x: x[:batch_size], cond[key])) + ) for key in cond } else: @@ -1458,9 +1474,11 @@ def sample( if cond is not None: if isinstance(cond, dict): cond = { - key: cond[key][:batch_size] - if not isinstance(cond[key], list) - else list(map(lambda x: x[:batch_size], cond[key])) + key: ( + cond[key][:batch_size] + if not isinstance(cond[key], list) + else list(map(lambda x: x[:batch_size], cond[key])) + ) for key in cond } else: @@ -1731,7 +1749,10 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): # handle asynchronous grad reduction no_sync_func = None if not forward_only and self.with_distributed_adam: - no_sync_func = partial(self._optimizer.no_sync, greedy_grad_copy=self.megatron_amp_O2,) + no_sync_func = partial( + self._optimizer.no_sync, + greedy_grad_copy=self.megatron_amp_O2, + ) # pipeline schedules will get these from self.model.config for module in self.get_module_list(): @@ -1779,29 +1800,31 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): if self.loss_broadcast_src_rank is None: self.loss_broadcast_src_rank = parallel_state.get_pipeline_model_parallel_last_rank() torch.distributed.broadcast( - loss_mean, self.loss_broadcast_src_rank, group=parallel_state.get_pipeline_model_parallel_group(), + loss_mean, + self.loss_broadcast_src_rank, + group=parallel_state.get_pipeline_model_parallel_group(), ) return loss_mean, loss_dict def training_step(self, batch): """ - Notice: `training_step` used to have the following signature to support pipeline - parallelism: - - def training_step(self, dataloader_iter, batch_idx): - - However, full iteration CUDA Graph callback is not compatible with this signature - right now, due to we need to wrap the dataloader to generate static tensor outside - the CUDA Graph. This signature moves `next(dataloader)` into the CUDA Graph - capturing region, thus we disabled it. 
- - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - Batch should be a list of microbatches and those microbatches should on CPU. - Microbatches are then moved to GPU during the pipeline. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Notice: `training_step` used to have the following signature to support pipeline + parallelism: + + def training_step(self, dataloader_iter, batch_idx): + + However, full iteration CUDA Graph callback is not compatible with this signature + right now, due to we need to wrap the dataloader to generate static tensor outside + the CUDA Graph. This signature moves `next(dataloader)` into the CUDA Graph + capturing region, thus we disabled it. + + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + Batch should be a list of microbatches and those microbatches should on CPU. + Microbatches are then moved to GPU during the pipeline. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ # we zero grads here because we also call backward in the megatron-core fwd/bwd functions @@ -1875,20 +1898,20 @@ def non_cuda_graph_capturable(self): self.log("timestamp", ts, batch_size=1, rank_zero_only=True) def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. - We want this to do nothing since we run backward in the fwd/bwd functions from apex. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from apex. + No need to call it here. """ pass def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. """ pass def _append_sequence_parallel_module_grads(self, module, grads): - """ Helper method for allreduce_sequence_parallel_gradients""" + """Helper method for allreduce_sequence_parallel_gradients""" for param in module.parameters(): sequence_parallel_param = getattr(param, 'sequence_parallel', False) @@ -1901,8 +1924,8 @@ def _append_sequence_parallel_module_grads(self, module, grads): def get_forward_output_and_loss_func(self): def process_batch(batch): - """ Prepares the global batch for apex fwd/bwd functions. - Global batch is a list of micro batches. + """Prepares the global batch for apex fwd/bwd functions. + Global batch is a list of micro batches. """ # noise_map, condition batch[self.cfg.first_stage_key] = batch[self.cfg.first_stage_key].cuda(non_blocking=True) @@ -1912,7 +1935,8 @@ def process_batch(batch): # SD has more dedicated structure for encoding, so we enable autocasting here as well with torch.cuda.amp.autocast( - self.autocast_dtype in (torch.half, torch.bfloat16), dtype=self.autocast_dtype, + self.autocast_dtype in (torch.half, torch.bfloat16), + dtype=self.autocast_dtype, ): x, c = self.model.get_input(batch, self.cfg.first_stage_key) @@ -1959,7 +1983,7 @@ def validation_step(self, dataloader_iter): return loss def setup(self, stage=None): - """ PTL hook that is executed after DDP spawns. 
+ """PTL hook that is executed after DDP spawns. We setup datasets here as megatron datasets require DDP to instantiate. See https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#setup for more information. Args: @@ -2016,11 +2040,13 @@ def build_train_valid_test_datasets(self): if self.cfg.first_stage_key.endswith("encoded") or self.cfg.first_stage_key.endswith("moments"): if self.cfg.cond_stage_key.endswith("clip_encoded"): self._train_ds, self._validation_ds = build_train_valid_precached_clip_datasets( - model_cfg=self.cfg, consumed_samples=self.compute_consumed_samples(0), + model_cfg=self.cfg, + consumed_samples=self.compute_consumed_samples(0), ) else: self._train_ds, self._validation_ds = build_train_valid_precached_datasets( - model_cfg=self.cfg, consumed_samples=self.compute_consumed_samples(0), + model_cfg=self.cfg, + consumed_samples=self.compute_consumed_samples(0), ) else: self._train_ds, self._validation_ds = build_train_valid_datasets( @@ -2045,7 +2071,8 @@ def setup_training_data(self, cfg): ) if self.cfg.cond_stage_key.endswith("clip_encoded"): collate_fn = get_collate_fn( - first_stage_key=self.cfg.first_stage_key, cond_stage_key=self.cfg.cond_stage_key, + first_stage_key=self.cfg.first_stage_key, + cond_stage_key=self.cfg.cond_stage_key, ) else: collate_fn = None @@ -2082,20 +2109,23 @@ def setup_test_data(self, cfg): f'Setting up test dataloader with len(len(self._test_ds)): {len(self._test_ds)} and consumed samples: {consumed_samples}' ) self._test_dl = torch.utils.data.DataLoader( - self._test_ds, batch_size=self._micro_batch_size, num_workers=cfg.num_workers, pin_memory=True, + self._test_ds, + batch_size=self._micro_batch_size, + num_workers=cfg.num_workers, + pin_memory=True, ) def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any: - """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device - When using pipeline parallelism, we need the global batch to remain on the CPU, - since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. + """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device + When using pipeline parallelism, we need the global batch to remain on the CPU, + since the memory overhead will be too high when using a large number of microbatches. + Microbatches are transferred from CPU to GPU inside the pipeline. """ return batch def _validate_trainer(self): - """ Certain trainer configurations can break training. - Here we try to catch them and raise an error. + """Certain trainer configurations can break training. + Here we try to catch them and raise an error. """ if self.trainer.accumulate_grad_batches > 1: raise ValueError( @@ -2253,23 +2283,26 @@ def _check_and_add_adapter(self, name, module, peft_name, peft_cfg, name_key_to_ ) def load_adapters( - self, filepath: str, peft_cfgs: Optional[Union[PEFTConfig, List[PEFTConfig]]] = None, map_location: str = None, + self, + filepath: str, + peft_cfgs: Optional[Union[PEFTConfig, List[PEFTConfig]]] = None, + map_location: str = None, ): """ - Utility method that restores only the adapter module(s), and not the entire model itself. - This allows the sharing of adapters which are often just a fraction of the size of the full model, - enabling easier deliver. 
+ Utility method that restores only the adapter module(s), and not the entire model itself. + This allows the sharing of adapters which are often just a fraction of the size of the full model, + enabling easier deliver. - .. note:: + .. note:: - During restoration, assumes that the model does not currently already have one or more adapter modules. + During restoration, assumes that the model does not currently already have one or more adapter modules. - Args: - filepath: Filepath of the .ckpt or .nemo file. - peft_cfgs: One or more PEFTConfig objects that specify the PEFT method configuration. - If none, will infer from the .nemo checkpoint - map_location: Pytorch flag, where to place the adapter(s) state dict(s). - """ + Args: + filepath: Filepath of the .ckpt or .nemo file. + peft_cfgs: One or more PEFTConfig objects that specify the PEFT method configuration. + If none, will infer from the .nemo checkpoint + map_location: Pytorch flag, where to place the adapter(s) state dict(s). + """ def _modify_state_dict(state_dict): # Modify state key for Dreambooth inference @@ -2310,7 +2343,11 @@ def _modify_state_dict(state_dict): class DiffusionWrapper(pl.LightningModule, Serialization): def __init__( - self, diff_model_config, conditioning_key, inductor: bool = False, inductor_cudagraphs: bool = False, + self, + diff_model_config, + conditioning_key, + inductor: bool = False, + inductor_cudagraphs: bool = False, ): super().__init__() self.diffusion_model = DiffusionWrapper.from_config_dict(diff_model_config) diff --git a/nemo/collections/multimodal/models/vision_language_foundation/clip/megatron_clip_models.py b/nemo/collections/multimodal/models/vision_language_foundation/clip/megatron_clip_models.py index dfd50c5d2f59..d811ce94dbea 100644 --- a/nemo/collections/multimodal/models/vision_language_foundation/clip/megatron_clip_models.py +++ b/nemo/collections/multimodal/models/vision_language_foundation/clip/megatron_clip_models.py @@ -64,9 +64,9 @@ from megatron.core.distributed import DistributedDataParallel as McoreDDP from megatron.core.distributed import DistributedDataParallelConfig from megatron.core.fusions.fused_bias_dropout import get_bias_dropout_add - from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.models.gpt import GPTModel as MCoreGPTModel from megatron.core.models.vision.clip_vit_model import CLIPViTModel + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from megatron.core.transformer.attention import CrossAttention, CrossAttentionSubmodules from megatron.core.transformer.custom_layers.transformer_engine import ( diff --git a/nemo/collections/multimodal/models/vision_language_foundation/megatron_nsfw_clip_models.py b/nemo/collections/multimodal/models/vision_language_foundation/megatron_nsfw_clip_models.py index 097f34b1303c..7b127335d336 100644 --- a/nemo/collections/multimodal/models/vision_language_foundation/megatron_nsfw_clip_models.py +++ b/nemo/collections/multimodal/models/vision_language_foundation/megatron_nsfw_clip_models.py @@ -19,8 +19,8 @@ import torch import torch.nn as nn import torch.nn.functional as F -from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core import parallel_state +from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from omegaconf.dictconfig import DictConfig 
from pytorch_lightning.accelerators import CPUAccelerator diff --git a/nemo/collections/multimodal/speech_llm/models/modular_models.py b/nemo/collections/multimodal/speech_llm/models/modular_models.py index 2a57c3e6d0d8..aa21cf95bfa4 100644 --- a/nemo/collections/multimodal/speech_llm/models/modular_models.py +++ b/nemo/collections/multimodal/speech_llm/models/modular_models.py @@ -60,8 +60,8 @@ try: from megatron.core import InferenceParams, parallel_state, tensor_parallel - from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.models.gpt import GPTModel as MCoreGPTModel + from megatron.core.num_microbatches_calculator import get_num_microbatches HAVE_MEGATRON_CORE = True diff --git a/nemo/collections/multimodal/speech_llm/models/modular_t5_models.py b/nemo/collections/multimodal/speech_llm/models/modular_t5_models.py index aef6971dfe16..e9dacca17bc4 100644 --- a/nemo/collections/multimodal/speech_llm/models/modular_t5_models.py +++ b/nemo/collections/multimodal/speech_llm/models/modular_t5_models.py @@ -49,17 +49,11 @@ from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.core.classes.mixins import adapter_mixins from nemo.utils import AppState, logging, model_utils -from nemo.utils.apex_utils import ( - _reconfigure_microbatch_calculator, - get_micro_batch_size, -) +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator, get_micro_batch_size try: from megatron.core import parallel_state, tensor_parallel - from megatron.core.num_microbatches_calculator import ( - get_current_global_batch_size, - get_num_microbatches, - ) + from megatron.core.num_microbatches_calculator import get_current_global_batch_size, get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func HAVE_MEGATRON_CORE = True diff --git a/nemo/collections/nlp/models/information_retrieval/megatron_bert_embedding_model.py b/nemo/collections/nlp/models/information_retrieval/megatron_bert_embedding_model.py index 189d7869949f..485c49cbd927 100644 --- a/nemo/collections/nlp/models/information_retrieval/megatron_bert_embedding_model.py +++ b/nemo/collections/nlp/models/information_retrieval/megatron_bert_embedding_model.py @@ -427,7 +427,10 @@ def training_step(self, dataloader_iter): self.log('lr', lr, batch_size=1) self.log('global_step', self.trainer.global_step, prog_bar=True, batch_size=1) self.log( - 'consumed_samples', self._compute_consumed_samples_after_training_step(), prog_bar=True, batch_size=1, + 'consumed_samples', + self._compute_consumed_samples_after_training_step(), + prog_bar=True, + batch_size=1, ) return loss_mean[0] @@ -480,7 +483,12 @@ def loss_func(self, output_tensor): ] # List of length "num_negatives", each tensor of shape (bs, embedding_dim) hard_negs_scores = ( - torch.multiply(queries.unsqueeze(0).repeat(len(hard_negs), 1, 1), torch.stack(hard_negs),).sum(axis=-1).T + torch.multiply( + queries.unsqueeze(0).repeat(len(hard_negs), 1, 1), + torch.stack(hard_negs), + ) + .sum(axis=-1) + .T ) # shape = (bs, num_negatives); Hard negatives are not shared between queries. 
scores = torch.cat([pos_inbatch_negs_scores, hard_negs_scores], axis=1) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_bert_model.py b/nemo/collections/nlp/models/language_modeling/megatron_bert_model.py index 87574a70f7a2..701d24d5b942 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_bert_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_bert_model.py @@ -57,8 +57,8 @@ try: from megatron.core import parallel_state - from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.models.bert.bert_layer_specs import bert_layer_with_transformer_engine_spec + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from megatron.core.transformer.module import Float16Module as MCoreFloat16Module from megatron.core.transformer.transformer_config import TransformerConfig @@ -206,8 +206,8 @@ def model_provider_func(self, pre_process, post_process): return model def _validate_trainer(self): - """ Certain trainer configurations can break training. - Here we try to catch them and raise an error. + """Certain trainer configurations can break training. + Here we try to catch them and raise an error. """ if self.trainer.accumulate_grad_batches > 1: raise ValueError( @@ -300,7 +300,11 @@ def forward( model = self.model if self.mcore_bert: - output_tensor = model(input_ids, attention_mask, tokentype_ids=token_type_ids,) + output_tensor = model( + input_ids, + attention_mask, + tokentype_ids=token_type_ids, + ) else: output_tensor = model( input_ids, @@ -415,21 +419,24 @@ def training_step(self, dataloader_iter): self.log('lr', lr, batch_size=1) self.log('global_step', self.trainer.global_step, prog_bar=True, batch_size=1) self.log( - 'consumed_samples', self._compute_consumed_samples_after_training_step(), prog_bar=True, batch_size=1, + 'consumed_samples', + self._compute_consumed_samples_after_training_step(), + prog_bar=True, + batch_size=1, ) return loss_mean[0] def _make_data_iterator_list(self, data_iterator: Iterator) -> List[Iterator]: - """ Convert data iterator into form expected by Megatron - With interleaved pipeline parallelism, Megatron expects a - list of one data iterator per model chunk. Each model - chunk independently gets data from its data iterator, so - we need to interact with the data iterator multiple times - for each microbatch step. Instead of incorporating this - logic into the data loader, we cache the iterator's output - to the first model chunk and reuse it in the other model - chunks. + """Convert data iterator into form expected by Megatron + With interleaved pipeline parallelism, Megatron expects a + list of one data iterator per model chunk. Each model + chunk independently gets data from its data iterator, so + we need to interact with the data iterator multiple times + for each microbatch step. Instead of incorporating this + logic into the data loader, we cache the iterator's output + to the first model chunk and reuse it in the other model + chunks. 
""" if not isinstance(self.model, list) or len(self.model) == 1: @@ -695,9 +702,9 @@ def build_train_valid_test_datasets(self): ] if self.trainer.limit_val_batches <= 1.0 and isinstance(self.trainer.limit_val_batches, float): - train_valid_test_num_samples[ - 1 - ] = 1 # This is to make sure we only have one epoch on every validation iteration + train_valid_test_num_samples[1] = ( + 1 # This is to make sure we only have one epoch on every validation iteration + ) self._train_ds, self._validation_ds, self._test_ds = dataset_utils.build_train_valid_test_datasets( cfg=self.cfg, @@ -731,20 +738,20 @@ def build_train_valid_test_datasets(self): return self._train_ds, self._validation_ds, self._test_ds def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. - We want this to do nothing since we run backward in the fwd/bwd functions from megatron-core. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from megatron-core. + No need to call it here. """ return def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. """ return def _append_sequence_parallel_module_grads(self, module, grads): - """ Helper method for allreduce_sequence_parallel_gradients""" + """Helper method for allreduce_sequence_parallel_gradients""" for param in module.parameters(): sequence_parallel_param = getattr(param, 'sequence_parallel', False) @@ -814,12 +821,12 @@ def setup(self, stage=None): self.setup_transformer_engine_tp_groups() def setup_transformer_engine_tp_groups(self): - """ This should be called after model parallel groups have been initialized - and only needs to be called when using Transformer Engine. + """This should be called after model parallel groups have been initialized + and only needs to be called when using Transformer Engine. """ for module in self.get_bert_module_list(): """Set TP group - Copied from: https://github.com/NVIDIA/TransformerEngine/blob/main/transformer_engine/pytorch/transformer.py#L398 + Copied from: https://github.com/NVIDIA/TransformerEngine/blob/main/transformer_engine/pytorch/transformer.py#L398 """ # Deep iterate but skip self to avoid infinite recursion. for index, child in enumerate(module.modules()): @@ -841,9 +848,9 @@ def get_bert_module_list(self): return [self.model] def allreduce_sequence_parallel_gradients(self): - """ All-reduce layernorm parameters across model parallel nodes when sequence parallelism is used. - Modified from megatron-lm: - https://gitlab-master.nvidia.com/ADLR/megatron-lm/-/blob/3f91f09bb2ab32f9904b47f46f19d2fc3f518ed8/megatron/training.py#L425 + """All-reduce layernorm parameters across model parallel nodes when sequence parallelism is used. 
+ Modified from megatron-lm: + https://gitlab-master.nvidia.com/ADLR/megatron-lm/-/blob/3f91f09bb2ab32f9904b47f46f19d2fc3f518ed8/megatron/training.py#L425 """ grads = [] @@ -923,10 +930,10 @@ def setup_test_data(self, cfg): self._test_dl = self.build_pretraining_data_loader(self._test_ds, consumed_samples) def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any: - """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device - When using pipeline parallelism, we need the global batch to remain on the CPU, - since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. + """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device + When using pipeline parallelism, we need the global batch to remain on the CPU, + since the memory overhead will be too high when using a large number of microbatches. + Microbatches are transferred from CPU to GPU inside the pipeline. """ return batch @@ -1146,10 +1153,10 @@ def on_load_checkpoint(self, checkpoint) -> None: parallel_state.set_virtual_pipeline_model_parallel_rank(0) def build_transformer_config(self) -> TransformerConfig: - """ Builds the megatron core gpt transformer config for the model. - For attributes in the nemo model config that are the same - as the megatron core TransformerConfig, we will use the value from the nemo model config. - For attributes in TransformerConfig that are not in the nemo model config, we add custom logic. + """Builds the megatron core gpt transformer config for the model. + For attributes in the nemo model config that are the same + as the megatron core TransformerConfig, we will use the value from the nemo model config. + For attributes in TransformerConfig that are not in the nemo model config, we add custom logic. """ activation = self.cfg.get('activation', 'gelu') assert activation == 'gelu', "Only gelu activation is support for BERT at the moment." 
diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index ffa358a091e1..ec9c1f846d46 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -84,7 +84,6 @@ from megatron.core.dist_checkpointing.mapping import LocalNonpersitentObject, ShardedObject from megatron.core.distributed import DistributedDataParallel as McoreDDP from megatron.core.distributed import DistributedDataParallelConfig, finalize_model_grads - from megatron.core.num_microbatches_calculator import get_num_microbatches # NeMo's implementation of the get_gpt_layer_ammo_spec function is temporarily used # from megatron.core.inference.gpt.model_specs import get_gpt_layer_ammo_spec @@ -93,6 +92,7 @@ get_gpt_layer_local_spec, get_gpt_layer_with_transformer_engine_spec, ) + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from megatron.core.transformer.module import Float16Module as MCoreFloat16Module from megatron.core.transformer.transformer_config import TransformerConfig @@ -114,6 +114,7 @@ try: import transformer_engine from transformer_engine.pytorch import module as te_module + from nemo.collections.nlp.modules.common.hyena.hyena_spec import get_gpt_layer_with_te_and_hyena_spec HAVE_TE = True diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_sft_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_sft_model.py index a32c8f035088..9ab17189ca64 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_sft_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_sft_model.py @@ -37,17 +37,11 @@ from nemo.collections.nlp.parts.mixins.nlp_adapter_mixins import NLPAdapterModelMixin from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.utils import AppState, logging -from nemo.utils.apex_utils import ( - _reconfigure_microbatch_calculator, - get_micro_batch_size, -) +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator, get_micro_batch_size try: from megatron.core import parallel_state - from megatron.core.num_microbatches_calculator import ( - get_current_global_batch_size, - get_num_microbatches, - ) + from megatron.core.num_microbatches_calculator import get_current_global_batch_size, get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func HAVE_MEGATRON_CORE = True diff --git a/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py b/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py index 2949f1d5ee42..c4625787a10c 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py @@ -46,15 +46,11 @@ ) from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.utils import AppState, logging -from nemo.utils.apex_utils import ( - _reconfigure_microbatch_calculator, - get_micro_batch_size, -) +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator, get_micro_batch_size try: from megatron.core import parallel_state, tensor_parallel from megatron.core.enums import ModelType - from megatron.core.num_microbatches_caculator import get_num_microbatches from megatron.core.models.T5 
import T5Model as MCoreT5Model
     from megatron.core.models.T5.t5_spec import (
         get_t5_decoder_with_local_block_spec,
@@ -62,6 +58,7 @@
         get_t5_encoder_with_local_block_spec,
         get_t5_encoder_with_transformer_engine_block_spec,
     )
+    from megatron.core.num_microbatches_calculator import get_num_microbatches
     from megatron.core.pipeline_parallel.schedules import get_forward_backward_func
     from megatron.core.transformer.module import Float16Module as MCoreFloat16Module
     from megatron.core.transformer.transformer_config import TransformerConfig
diff --git a/nemo/collections/nlp/models/language_modeling/megatron_retro_model.py b/nemo/collections/nlp/models/language_modeling/megatron_retro_model.py
index deba1f61c576..3eb78d34b3f4 100644
--- a/nemo/collections/nlp/models/language_modeling/megatron_retro_model.py
+++ b/nemo/collections/nlp/models/language_modeling/megatron_retro_model.py
@@ -69,12 +69,12 @@

 try:
     from megatron.core import InferenceParams, parallel_state
-    from megatron.core.num_microbatches_calculator import get_num_microbatches
     from megatron.core.models.retro import RetroModel as MCoreRetroModel
     from megatron.core.models.retro.config import RetroConfig
     from megatron.core.models.retro.decoder_spec import get_retro_decoder_block_spec
     from megatron.core.models.retro.utils import get_config_path as get_retro_config_path
     from megatron.core.models.retro.utils import get_gpt_data_dir as get_retro_data_dir
+    from megatron.core.num_microbatches_calculator import get_num_microbatches
     from megatron.core.pipeline_parallel.schedules import get_forward_backward_func
     from megatron.core.transformer.module import Float16Module as MCoreFloat16Module
     from megatron.core.transformer.transformer_config import TransformerConfig
diff --git a/nemo/collections/nlp/models/language_modeling/megatron_t0_model.py b/nemo/collections/nlp/models/language_modeling/megatron_t0_model.py
index 41e6b3fc43ee..82bd84c8ada8 100644
--- a/nemo/collections/nlp/models/language_modeling/megatron_t0_model.py
+++ b/nemo/collections/nlp/models/language_modeling/megatron_t0_model.py
@@ -185,7 +185,10 @@ def build_train_valid_test_datasets(self, stage):
         logging.info(f'Length of train dataset: {len(self._train_ds)}')

     def build_data_loader(
-        self, dataset, data_cfg, consumed_samples=0,
+        self,
+        dataset,
+        data_cfg,
+        consumed_samples=0,
     ):
         """Buld dataloader given an input dataset."""
         logging.info(f'Building dataloader with consumed samples: {consumed_samples}')
@@ -215,13 +218,19 @@ def setup_training_dataloader(self):
         if hasattr(self, '_train_ds'):
             consumed_samples = self.compute_consumed_samples(0)
             self._train_dl = self.build_data_loader(
-                dataset=self._train_ds, data_cfg=self.cfg.data.train_ds, consumed_samples=consumed_samples,
+                dataset=self._train_ds,
+                data_cfg=self.cfg.data.train_ds,
+                consumed_samples=consumed_samples,
             )

     def setup_eval_dataloader(self, datasets, data_cfg):
         dataloaders = []
         for dataset in datasets:
-            eval_dl = self.build_data_loader(dataset=dataset, data_cfg=data_cfg, consumed_samples=0,)
+            eval_dl = self.build_data_loader(
+                dataset=dataset,
+                data_cfg=data_cfg,
+                consumed_samples=0,
+            )
             dataloaders.append(eval_dl)
         return dataloaders

diff --git a/nemo/collections/nlp/models/language_modeling/megatron_t5_prompt_learning_model.py b/nemo/collections/nlp/models/language_modeling/megatron_t5_prompt_learning_model.py
index 65c1a114d0a0..0773e4abe811 100644
--- a/nemo/collections/nlp/models/language_modeling/megatron_t5_prompt_learning_model.py
+++ 
b/nemo/collections/nlp/models/language_modeling/megatron_t5_prompt_learning_model.py @@ -34,10 +34,7 @@ from nemo.collections.nlp.parts.nlp_overrides import NLPSaveRestoreConnector from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.utils import AppState, logging -from nemo.utils.apex_utils import ( - _reconfigure_microbatch_calculator, - get_micro_batch_size, -) +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator, get_micro_batch_size try: from megatron.core import parallel_state @@ -57,21 +54,21 @@ class MegatronT5PromptLearningModel(MegatronBasePromptLearningModel): """ - Model class for prompt-tuning or p-tuning a pretrained Megatron T5 model. + Model class for prompt-tuning or p-tuning a pretrained Megatron T5 model. Prompt Tuning initalizes virtual prompt embeddings directly from a copy of certain token embeddings from the the pretrained T5 model's vocabulary - and directly tunes these embedding weights. The token embeddings used in - initalization are specified by the user in the config file. The model can - be prompt-tuned for multiple tasks at once. Virtual prompts are stored in a - prompt table and can be added or deleted without disrupting virtual prompts - for other tasks. + and directly tunes these embedding weights. The token embeddings used in + initalization are specified by the user in the config file. The model can + be prompt-tuned for multiple tasks at once. Virtual prompts are stored in a + prompt table and can be added or deleted without disrupting virtual prompts + for other tasks. P-tuning initializes an LSTM encoder model that generates virtual prompt embeddings for every task. Each task shares the same encoder. After p-tuning is compelete, the learned virtual prompts can be saved to the prompt table - using add_ptuned_prompts_to_prompt_table(). Thus, if a user wants to add a - new virtual prompt via p-tuning, they do not need to retrain on all previous + using add_ptuned_prompts_to_prompt_table(). Thus, if a user wants to add a + new virtual prompt via p-tuning, they do not need to retrain on all previous tasks. This gives p-tuning the same task flexiblity as prompt-tuning. """ @@ -85,7 +82,15 @@ def first_stage_of_pipeline(self): return False def forward( - self, input_ids, dec_input, enc_mask, dec_mask, position_ids, taskname_ids, labels=None, inference=False, + self, + input_ids, + dec_input, + enc_mask, + dec_mask, + position_ids, + taskname_ids, + labels=None, + inference=False, ): """ Special forward method for p-tuning/prompt-tuning pretrained @@ -166,8 +171,8 @@ def load_frozen_model(self, cfg, trainer): def fwd_bwd_step(self, dataloader_iter, batch_idx, forward_only): """ - Dataloader produces a global batch which is turned into a list of microbatches. - The list of microbatches is then piped through the pipeline using megatron-core fwd/bwd functions. + Dataloader produces a global batch which is turned into a list of microbatches. + The list of microbatches is then piped through the pipeline using megatron-core fwd/bwd functions. """ # Get seq length of batch batch = next(dataloader_iter) @@ -222,15 +227,15 @@ def loss_func(output_tensor): return fwd_output_and_loss_func def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. - We want this to do nothing since we run backward in the fwd/bwd functions from megatron-core. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from megatron-core. 
+ No need to call it here. """ return def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. """ return @@ -283,9 +288,9 @@ def get_predictions(self, input_ids, enc_mask, encoder_input, labels): enc_mask=enc_mask, num_tokens_to_generate=self.decoder_seq_length, encoder_input=encoder_input, - bos_id=self.tokenizer.pad_id - if self.cfg.data.get('decoder_starts_with_pad', False) - else self.tokenizer.bos_id, + bos_id=( + self.tokenizer.pad_id if self.cfg.data.get('decoder_starts_with_pad', False) else self.tokenizer.bos_id + ), ) # Special ids to text function to handle stripping and special tokens with sentencepiece tokenizers. preds_text = MegatronT5SFTModel.ids_to_text(predicted_token_ids, self.tokenizer) @@ -377,7 +382,8 @@ def on_validation_epoch_end(self): gather_results_dedup = list(set(itertools.chain(*gather_results))) val_metric_dict = self.validation_metric.get_score( - [i[2] for i in gather_results_dedup], [i[1] for i in gather_results_dedup], + [i[2] for i in gather_results_dedup], + [i[1] for i in gather_results_dedup], ) for metric, val in val_metric_dict.items(): @@ -437,9 +443,9 @@ def build_virtual_prompt_dataset( drop_last=drop_last, num_workers=num_workers, pin_memory=pin_memory, - persistent_workers=True - if num_workers > 0 - else False, # (@adithyare and @eharper) We need to set this to True to get around issues with spawn=True + persistent_workers=( + True if num_workers > 0 else False + ), # (@adithyare and @eharper) We need to set this to True to get around issues with spawn=True ) print('build success', len(dataloader), dataset_paths) return dataset, dataloader @@ -469,9 +475,9 @@ def predict_step(self, batch: Any, batch_idx: int, dataloader_idx: int = 0) -> A enc_mask=enc_mask, num_tokens_to_generate=self.decoder_seq_length, encoder_input=encoder_input, - bos_id=self.tokenizer.pad_id - if self.cfg.data.get('decoder_starts_with_pad', False) - else self.tokenizer.bos_id, + bos_id=( + self.tokenizer.pad_id if self.cfg.data.get('decoder_starts_with_pad', False) else self.tokenizer.bos_id + ), ) # Special ids to text function to handle stripping and special tokens with sentencepiece tokenizers. 
        preds_text = MegatronT5SFTModel.ids_to_text(predicted_token_ids, self.tokenizer)
@@ -514,7 +520,7 @@ def on_predict_epoch_end(self) -> None:

         input_prediction_pair = []
         correct = 0
-        for (input, pred, label) in gather_results_dedup:
+        for input, pred, label in gather_results_dedup:
             input_prediction_pair.append((input, pred))
             if label:
                 if pred == label:
diff --git a/nemo/collections/nlp/models/language_modeling/megatron_t5_sft_model.py b/nemo/collections/nlp/models/language_modeling/megatron_t5_sft_model.py
index 49231840a23f..1b04c66e4389 100644
--- a/nemo/collections/nlp/models/language_modeling/megatron_t5_sft_model.py
+++ b/nemo/collections/nlp/models/language_modeling/megatron_t5_sft_model.py
@@ -30,17 +30,11 @@
 from nemo.collections.nlp.parts.mixins.nlp_adapter_mixins import NLPAdapterModelMixin
 from nemo.collections.nlp.parts.utils_funcs import get_last_rank
 from nemo.utils import AppState, logging
-from nemo.utils.apex_utils import (
-    _reconfigure_microbatch_calculator,
-    get_micro_batch_size,
-)
+from nemo.utils.apex_utils import _reconfigure_microbatch_calculator, get_micro_batch_size

 try:
     from megatron.core import parallel_state
-    from megatorn.core.num_microbatches_calculator import (
-        get_current_global_batch_size,
-        get_num_microbatches,
-    )
+    from megatron.core.num_microbatches_calculator import get_current_global_batch_size, get_num_microbatches
     from megatron.core.pipeline_parallel.schedules import get_forward_backward_func

     HAVE_MEGATRON_CORE = True
@@ -53,7 +47,7 @@


 class MegatronT5SFTModel(NLPAdapterModelMixin, MegatronT5Model):
-    """ T5 Finetuning model in the same format as MegatronGPTSFTModel """
+    """T5 Finetuning model in the same format as MegatronGPTSFTModel"""

     def __init__(self, cfg: DictConfig, trainer: Trainer):
         super().__init__(cfg, trainer=trainer)
@@ -282,8 +276,8 @@ def _reconfigure_and_process_inference_batch(self, batch, ds_config):

     def fwd_bwd_step(self, dataloader_iter, forward_only):
         """
-            Dataloader produces a global batch which is turned into a list of microbatches.
-            The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions.
+        Dataloader produces a global batch which is turned into a list of microbatches.
+        The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions.
""" # If tuple, 1st element in it is the batch since dataloader_iter returns batch, batch_idx, dataloader_idx batch = next(dataloader_iter) @@ -597,7 +591,13 @@ def on_test_epoch_end(self): # return super().on_test_epoch_end() def build_data_loader( - self, dataset, global_batch_size, shuffle, num_workers, pin_memory, drop_last, + self, + dataset, + global_batch_size, + shuffle, + num_workers, + pin_memory, + drop_last, ): """Buld dataloader given an input dataset.""" @@ -644,9 +644,11 @@ def setup_eval_data(self, datasets, data_cfg): for dataset in datasets: eval_dl = self.build_data_loader( dataset, - global_batch_size=self.cfg.data.test_ds.global_batch_size - if hasattr(self.cfg.data, "test_ds") - else self.cfg.data.validation_ds.global_batch_size, + global_batch_size=( + self.cfg.data.test_ds.global_batch_size + if hasattr(self.cfg.data, "test_ds") + else self.cfg.data.validation_ds.global_batch_size + ), shuffle=data_cfg.shuffle, num_workers=data_cfg.num_workers, pin_memory=data_cfg.pin_memory, diff --git a/nemo/collections/nlp/models/machine_translation/megatron_nmt_model.py b/nemo/collections/nlp/models/machine_translation/megatron_nmt_model.py index 9e3fa61c2295..6a76f88cd229 100644 --- a/nemo/collections/nlp/models/machine_translation/megatron_nmt_model.py +++ b/nemo/collections/nlp/models/machine_translation/megatron_nmt_model.py @@ -51,10 +51,7 @@ from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.core.classes import Exportable from nemo.utils import AppState, logging, timers -from nemo.utils.apex_utils import ( - _reconfigure_microbatch_calculator, - get_micro_batch_size, -) +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator, get_micro_batch_size try: from megatron.core import parallel_state @@ -202,17 +199,21 @@ def _build_tokenizer(self): self.encoder_tokenizer, self.decoder_tokenizer = MTEncDecModel.setup_enc_dec_tokenizers( encoder_tokenizer_library=self.encoder_tokenizer_library, encoder_tokenizer_model=encoder_tokenizer_model, - encoder_bpe_dropout=self._cfg.encoder_tokenizer.get('bpe_dropout', 0.0) - if self._cfg.encoder_tokenizer.get('bpe_dropout', 0.0) is not None - else 0.0, + encoder_bpe_dropout=( + self._cfg.encoder_tokenizer.get('bpe_dropout', 0.0) + if self._cfg.encoder_tokenizer.get('bpe_dropout', 0.0) is not None + else 0.0 + ), encoder_model_name=self._cfg.encoder_tokenizer.get('type', None), encoder_r2l=self._cfg.encoder_tokenizer.get('r2l', False), decoder_tokenizer_library=self.decoder_tokenizer_library, encoder_tokenizer_vocab_file=self._cfg.encoder_tokenizer.get('vocab_file', None), decoder_tokenizer_model=decoder_tokenizer_model, - decoder_bpe_dropout=self._cfg.decoder_tokenizer.get('bpe_dropout', 0.0) - if self._cfg.decoder_tokenizer.get('bpe_dropout', 0.0) is not None - else 0.0, + decoder_bpe_dropout=( + self._cfg.decoder_tokenizer.get('bpe_dropout', 0.0) + if self._cfg.decoder_tokenizer.get('bpe_dropout', 0.0) is not None + else 0.0 + ), decoder_model_name=self._cfg.encoder_tokenizer.get('type', None), decoder_r2l=self._cfg.decoder_tokenizer.get('r2l', False), encoder_sentencepiece_legacy=self._cfg.encoder_tokenizer.get('sentencepiece_legacy', False), @@ -244,10 +245,14 @@ def _build_vocab(self): f"NMT-XLM objective requires sentencepiece tokenizer, but got decoder tokenizer library : {self.cfg.decoder_tokenizer.library}" ) MegatronT5Model.add_special_tokens_to_tokenizer( - tokenizer=self.encoder_tokenizer, tokenizer_cfg=self.cfg.encoder_tokenizer, dataset_type='ul2', + tokenizer=self.encoder_tokenizer, + 
tokenizer_cfg=self.cfg.encoder_tokenizer, + dataset_type='ul2', ) MegatronT5Model.add_special_tokens_to_tokenizer( - tokenizer=self.decoder_tokenizer, tokenizer_cfg=self.cfg.decoder_tokenizer, dataset_type='ul2', + tokenizer=self.decoder_tokenizer, + tokenizer_cfg=self.cfg.decoder_tokenizer, + dataset_type='ul2', ) # Set up pre and post processors as well. @@ -269,7 +274,10 @@ def _build_vocab(self): else: # After this call, the model will have self.source_processor and self.target_processor objects self.source_processor, self.target_processor = MTEncDecModel.setup_pre_and_post_processing_utils( - self.src_language, self.tgt_language, self.encoder_tokenizer_library, self.decoder_tokenizer_library, + self.src_language, + self.tgt_language, + self.encoder_tokenizer_library, + self.decoder_tokenizer_library, ) self.multilingual_ids = [None] @@ -281,8 +289,8 @@ def _build_vocab(self): def fwd_bwd_step(self, dataloader_iter, forward_only): """ - Dataloader produces a global batch which is turned into a list of microbatches. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Dataloader produces a global batch which is turned into a list of microbatches. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ # If tuple, 1st element in it is the batch since dataloader_iter returns batch, batch_idx, dataloader_idx batch = next(dataloader_iter) @@ -343,13 +351,19 @@ def eval_step(self, dataloader_iter): # Post-process the translations and inputs to log. preds = self.postprocess_outputs( - outputs=predicted_tokens_ids, tokenizer=self.decoder_tokenizer, processor=target_processor, + outputs=predicted_tokens_ids, + tokenizer=self.decoder_tokenizer, + processor=target_processor, ) labels = self.postprocess_outputs( - outputs=labels, tokenizer=self.decoder_tokenizer, processor=target_processor, + outputs=labels, + tokenizer=self.decoder_tokenizer, + processor=target_processor, ) encoder_inputs = self.postprocess_outputs( - outputs=tokens_enc, tokenizer=self.encoder_tokenizer, processor=source_processor, + outputs=tokens_enc, + tokenizer=self.encoder_tokenizer, + processor=source_processor, ) loss_dict = { @@ -773,12 +787,12 @@ def build_memmap_dataset_from_config(self, cfg: DictConfig): tgt_file=tgt_file, num_samples=num_samples, prepend_id=multilingual_ids[idx], - src_language=self.src_language - if not isinstance(self.src_language, ListConfig) - else self.src_language[idx], - tgt_language=self.tgt_language - if not isinstance(self.tgt_language, ListConfig) - else self.tgt_language[idx], + src_language=( + self.src_language if not isinstance(self.src_language, ListConfig) else self.src_language[idx] + ), + tgt_language=( + self.tgt_language if not isinstance(self.tgt_language, ListConfig) else self.tgt_language[idx] + ), ) datasets.append(dataset) dataset = BlendableDataset( diff --git a/nemo/collections/vision/models/megatron_vit_classification_models.py b/nemo/collections/vision/models/megatron_vit_classification_models.py index 1b6ed4ed85b1..3417b04299dc 100644 --- a/nemo/collections/vision/models/megatron_vit_classification_models.py +++ b/nemo/collections/vision/models/megatron_vit_classification_models.py @@ -41,8 +41,8 @@ from nemo.utils import logging try: - from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core import parallel_state + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import 
get_forward_backward_func HAVE_MEGATRON_CORE = True @@ -274,7 +274,10 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): grad_sync_func = None param_sync_func = None if not forward_only and self.with_distributed_adam: - no_sync_func = partial(self._optimizer.no_sync, greedy_grad_copy=self.megatron_amp_O2,) + no_sync_func = partial( + self._optimizer.no_sync, + greedy_grad_copy=self.megatron_amp_O2, + ) grad_sync_func = self.reduce_overlap_gradients param_sync_func = self.sync_overlap_parameters @@ -345,12 +348,12 @@ def initialize_ub_func(self): def training_step(self, dataloader_iter): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - Batch should be a list of microbatches and those microbatches should on CPU. - Microbatches are then moved to GPU during the pipeline. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + Batch should be a list of microbatches and those microbatches should on CPU. + Microbatches are then moved to GPU during the pipeline. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ # Initialize userbuffer communicators. if self.initialize_ub: @@ -413,20 +416,20 @@ def training_step(self, dataloader_iter): return loss_mean def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. - We want this to do nothing since we run backward in the fwd/bwd functions from apex. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from apex. + No need to call it here. """ pass def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. """ pass def _append_sequence_parallel_module_grads(self, module, grads): - """ Helper method for allreduce_sequence_parallel_gradients""" + """Helper method for allreduce_sequence_parallel_gradients""" for param in module.parameters(): sequence_parallel_param = getattr(param, 'sequence_parallel', False) @@ -438,9 +441,9 @@ def _append_sequence_parallel_module_grads(self, module, grads): grads.append(grad.data) def allreduce_sequence_parallel_gradients(self): - """ All-reduce layernorm parameters across model parallel nodes when sequence parallelism is used. - Modified from megatron-lm: - https://gitlab-master.nvidia.com/ADLR/megatron-lm/-/blob/3f91f09bb2ab32f9904b47f46f19d2fc3f518ed8/megatron/training.py#L425 + """All-reduce layernorm parameters across model parallel nodes when sequence parallelism is used. 
+ Modified from megatron-lm: + https://gitlab-master.nvidia.com/ADLR/megatron-lm/-/blob/3f91f09bb2ab32f9904b47f46f19d2fc3f518ed8/megatron/training.py#L425 """ grads = [] @@ -500,10 +503,10 @@ def fwd_output_only_func(batch, model): def validation_step(self, dataloader_iter): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - The list of microbatches is then piped through the pipeline using megatron-core fwd/bwd functions. + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + The list of microbatches is then piped through the pipeline using megatron-core fwd/bwd functions. """ mode = 'test' if self.trainer.testing else 'val' @@ -513,8 +516,10 @@ def validation_step(self, dataloader_iter): loss, accuracy = self.fwd_bwd_step(dataloader_iter, True) - self.validation_step_outputs.append((loss, accuracy)) if mode == 'val' else self.test_step_outputs.append( - (loss, accuracy) + ( + self.validation_step_outputs.append((loss, accuracy)) + if mode == 'val' + else self.test_step_outputs.append((loss, accuracy)) ) return loss, accuracy @@ -557,7 +562,9 @@ def build_train_valid_test_datasets(self): raise ValueError("limit_val_batches must be an integer or float less than or equal to 1.0.") self._train_ds, self._validation_ds = build_train_valid_datasets( - model_cfg=self.cfg, data_path=self.cfg.data.data_path, image_size=(self.cfg.img_h, self.cfg.img_w), + model_cfg=self.cfg, + data_path=self.cfg.data.data_path, + image_size=(self.cfg.img_h, self.cfg.img_w), ) self._test_ds = None @@ -697,16 +704,16 @@ def predict_step(self, batch: Any, batch_idx: int, dataloader_idx: Optional[int] raise NotImplementedError def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any: - """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device - When using pipeline parallelism, we need the global batch to remain on the CPU, - since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. + """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device + When using pipeline parallelism, we need the global batch to remain on the CPU, + since the memory overhead will be too high when using a large number of microbatches. + Microbatches are transferred from CPU to GPU inside the pipeline. """ return batch def _validate_trainer(self): - """ Certain trainer configurations can break training. - Here we try to catch them and raise an error. + """Certain trainer configurations can break training. + Here we try to catch them and raise an error. 
""" if self.trainer.accumulate_grad_batches > 1: raise ValueError( diff --git a/nemo/utils/apex_utils.py b/nemo/utils/apex_utils.py index 8b359f109e52..9b3259958f62 100644 --- a/nemo/utils/apex_utils.py +++ b/nemo/utils/apex_utils.py @@ -1,26 +1,30 @@ -from typing import Optional -import torch import warnings +from typing import Optional +import torch from megatron.core.num_microbatches_calculator import ( _GLOBAL_NUM_MICROBATCHES_CALCULATOR, build_num_microbatches_calculator, ) + def _reconfigure_microbatch_calculator( - rank: int, - rampup_batch_size: Optional[List[int]], - global_batch_size: int, - micro_batch_size: int, - data_parallel_size: int, + rank: int, + rampup_batch_size: Optional[List[int]], + global_batch_size: int, + micro_batch_size: int, + data_parallel_size: int, ) -> None: if torch.distributed.get_rank() == 0: import warnings + warnings.warn("This function is only for unittest") global _GLOBAL_NUM_MICROBATCHES_CALCULATOR _GLOBAL_NUM_MICROBATCHES_CALCULATOR = build_num_microbatches_calculator( - rank, rampup_batch_size, global_batch_size, micro_batch_size, data_parallel_size) + rank, rampup_batch_size, global_batch_size, micro_batch_size, data_parallel_size + ) + def get_micro_batch_size(): - return _GLOBAL_NUM_MICROBATCHES_CALCULATOR.micro_batch_size \ No newline at end of file + return _GLOBAL_NUM_MICROBATCHES_CALCULATOR.micro_batch_size diff --git a/tests/collections/nlp/test_rampup_batch_size.py b/tests/collections/nlp/test_rampup_batch_size.py index 58d7ecff7538..ae2ca07d20ac 100644 --- a/tests/collections/nlp/test_rampup_batch_size.py +++ b/tests/collections/nlp/test_rampup_batch_size.py @@ -16,13 +16,10 @@ import pytest import torch +from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR, get_num_microbatches from omegaconf import DictConfig from pytorch_lightning import Trainer -from megatron.core.num_microbatches_calculator import ( - _GLOBAL_NUM_MICROBATCHES_CALCULATOR, - get_num_microbatches, -) from nemo.collections.nlp.models.language_modeling.megatron_gpt_model import MegatronGPTModel from nemo.collections.nlp.parts.nlp_overrides import NLPDDPStrategy