diff --git a/docs/source/multimodal/mllm/sequence_packing.rst b/docs/source/multimodal/mllm/sequence_packing.rst
index b061ee1d89c6..c5587e3f7173 100644
--- a/docs/source/multimodal/mllm/sequence_packing.rst
+++ b/docs/source/multimodal/mllm/sequence_packing.rst
@@ -103,15 +103,13 @@ To train with packed sequences, modify four items in the SFT/PEFT config file.

 .. code-block:: bash

-  ++model.data.data_prefix=/lustre/fsw/coreai_dlalgo_genai/datasets/LLaVA-Instruct-150K/packed_seq_12288_336_v1/packed_seq_dataset
-  ++model.data.crop_size=[224,224]
   ++model.data.packed_sequence=True

 2. Use the new dataset file instead of the original JSONL file and ensure the crop sizes are correctly specified since images are now cached:

 .. code-block:: bash

-  ++model.data.data_prefix=/path/to/datasets/LLaVA-Instruct-150K/packed_seq_12288_336_v1/packed_seq_dataset
+  ++model.data.data_path=/path/to/datasets/LLaVA-Instruct-150K/packed_seq_12288_336_v1/packed_seq_dataset
   ++model.data.crop_size=[336,336]

 4. Adjust batch sizes:
diff --git a/examples/multimodal/multimodal_llm/neva/conf/neva_config.yaml b/examples/multimodal/multimodal_llm/neva/conf/neva_config.yaml
index 9315b0fa3712..89e61a8b917c 100644
--- a/examples/multimodal/multimodal_llm/neva/conf/neva_config.yaml
+++ b/examples/multimodal/multimodal_llm/neva/conf/neva_config.yaml
@@ -38,7 +38,7 @@ exp_manager:
     save_top_k: 10
     mode: min
     always_save_nemo: False # saves nemo file during validation, not implemented for model parallel
-    save_nemo_on_train_end: False # not recommended when training large models on clusters with short time limits
+    save_nemo_on_train_end: True # not recommended when training large models on clusters with short time limits
     filename: 'megatron_clip--{val_loss:.2f}-{step}-{consumed_samples}'
     model_parallel_size: ${multiply:${model.tensor_model_parallel_size}, ${model.pipeline_model_parallel_size}}
   ema:
@@ -60,6 +60,7 @@ model:

   tensor_model_parallel_size: 1 # intra-layer model parallelism
   pipeline_model_parallel_size: 1 # inter-layer model parallelism
+  context_parallel_size: 1 # kqv model parallelism
   virtual_pipeline_model_parallel_size: null # interleaved pipeline

   restore_from_path: null # used in fine-tuning
@@ -185,7 +186,22 @@ model:
     packed_sequence: False
     num_workers: 8
     dataloader_type: cyclic
-    data_path:
+    data_path:
+    # This configuration can either be a single string pointing to a data path, or a list of data paths for data blending.
+    # When using a blendable dataset, be aware of the following:
+    #   - The sampling of data across datasets depends on both the relative sizes of the datasets and the concat_sampling_probabilities.
+    #   - For example, if there are two datasets with lengths of 100 and 10, and the sampling probabilities are set to 0.5 for each,
+    #     then 55 samples would be taken from the dataset of length 100 and 55 from the dataset of length 10 (with repetition).
+    #   - This means not all data might be seen in one epoch, and smaller datasets may need to be repeated to match the number of samples.
+    #     Please adjust your concat_sampling_probabilities accordingly to ensure balanced and effective training.
+
+    #   - /path/to/json
+    #   - /path/to/json
+    global_batch_size: ${model.global_batch_size}
+    micro_batch_size: ${model.micro_batch_size}
+    concat_sampling_probabilities: null
+    #   - 0.5
+    #   - 0.5
     lazy_preprocess: True
     is_multimodal: True
     media_type: image # currently supported: image
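The comments added to data_path above describe how blend weights interact with dataset sizes. As a rough standalone illustration of that arithmetic (a minimal Python sketch; the lengths and weights are just the hypothetical values from the comment, nothing is read from a real config):

    # Two datasets of length 100 and 10, blended with probabilities 0.5 / 0.5.
    lengths = [100, 10]
    weights = [0.5, 0.5]
    total_samples = sum(lengths)                        # 110 samples drawn per pass over the blend
    per_dataset = [round(total_samples * w) for w in weights]
    print(per_dataset)                                  # [55, 55] -> the 10-sample dataset repeats ~5.5 times

The smaller dataset is oversampled rather than the larger one being seen in full, which is why the comment suggests tuning concat_sampling_probabilities.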
diff --git a/examples/multimodal/multimodal_llm/neva/sequence_packing/preprocess_dataset.py b/examples/multimodal/multimodal_llm/neva/sequence_packing/preprocess_dataset.py
old mode 100644
new mode 100755
index 60f882fa9821..b670d171fd1d
--- a/examples/multimodal/multimodal_llm/neva/sequence_packing/preprocess_dataset.py
+++ b/examples/multimodal/multimodal_llm/neva/sequence_packing/preprocess_dataset.py
@@ -271,6 +271,7 @@ def main():
     logging.info(f"Output directory: {output_dir}")

     prefix_path = f"{output_dir}/packed_seq_dataset"
+    os.makedirs(prefix_path, exist_ok=True)
     # Original Datasets to Sequence Lengths Files
     builders = {}
     for item_dict in tqdm(train_dl, desc="Building indexed datasets"):
diff --git a/nemo/collections/multimodal/data/neva/neva_dataset.py b/nemo/collections/multimodal/data/neva/neva_dataset.py
index 17cb6e6cf644..8102d179757e 100644
--- a/nemo/collections/multimodal/data/neva/neva_dataset.py
+++ b/nemo/collections/multimodal/data/neva/neva_dataset.py
@@ -1004,6 +1004,8 @@ def __len__(self):
         return len(self.list_data_dict)

     def __getitem__(self, i) -> Dict[str, torch.Tensor]:
+        if isinstance(i, np.integer):
+            i = int(i)
         sources = self.list_data_dict[i]
         if isinstance(i, int):
             sources = [sources]
@@ -1190,7 +1192,6 @@ class NevaDataset(LazySupervisedDataset):
     """Dataset for supervised fine-tuning."""

     def __init__(self, data_path: str, tokenizer, multimodal_cfg: dict, data_cfg: dict):
-
         if data_path.endswith(".json"):
             super(NevaDataset, self).__init__(data_path, tokenizer, multimodal_cfg, data_cfg)

@@ -1313,7 +1314,7 @@ def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:

         return batch


-def make_supervised_data_module(tokenizer, image_processor, model_cfg) -> Dict:
+def make_supervised_data_module(tokenizer, image_processor, model_cfg, each_file_from_path=None) -> Dict:
     """Make dataset and collator for supervised fine-tuning."""
     data_cfg = model_cfg.data
     mm_cfg = model_cfg.mm_cfg
@@ -1321,10 +1322,10 @@ def make_supervised_data_module(tokenizer, image_processor, model_cfg) -> Dict:
     if getattr(model_cfg, 'no_seqlen_plus_one_input_tokens', False):
         add_extra_token = 0
     crop_size = mm_cfg.vision_encoder.get("crop_size", (224, 224))
-
+    data_path = each_file_from_path if each_file_from_path is not None else data_cfg.data_path
     train_dataset = NevaDataset(
         tokenizer=tokenizer,
-        data_path=data_cfg.data_path,
+        data_path=data_path,
         multimodal_cfg=dict(
             is_multimodal=data_cfg.is_multimodal,
             sep_image_conv_front=data_cfg.sep_image_conv_front,
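The np.integer cast added to LazySupervisedDataset.__getitem__ above matters because indices handed out by numpy arrays (for example the dataset_sample_index array a BlendableDataset keeps) are numpy scalars, which fail the later isinstance(i, int) check. A small self-contained sketch of that behaviour:

    import numpy as np

    i = np.int64(3)                   # what a numpy index array returns
    print(isinstance(i, int))         # False: np.int64 is not a Python int in Python 3
    print(isinstance(i, np.integer))  # True
    i = int(i)
    print(isinstance(i, int))         # True: the single-sample branch now triggers as intended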
diff --git a/nemo/collections/multimodal/models/multimodal_llm/neva/neva_model.py b/nemo/collections/multimodal/models/multimodal_llm/neva/neva_model.py
index 40b1b4ed9a02..6218332c2bde 100644
--- a/nemo/collections/multimodal/models/multimodal_llm/neva/neva_model.py
+++ b/nemo/collections/multimodal/models/multimodal_llm/neva/neva_model.py
@@ -21,7 +21,7 @@
 import torch
 import torch.nn.functional as F
 from einops import rearrange, reduce, repeat
-from omegaconf.dictconfig import DictConfig
+from omegaconf import DictConfig, ListConfig
 from pkg_resources import packaging
 from pytorch_lightning.trainer.trainer import Trainer
 from transformers import CLIPVisionModel, SiglipVisionModel
@@ -38,6 +38,10 @@
     MegatronCLIPModel,
 )
 from nemo.collections.multimodal.parts.utils import create_image_processor, load_nemo_model_weights
+from nemo.collections.nlp.data.language_modeling.megatron.base_dataset_utils import (
+    get_datasets_weights_and_num_samples,
+)
+from nemo.collections.nlp.data.language_modeling.megatron.blendable_dataset import BlendableDataset
 from nemo.collections.nlp.data.language_modeling.megatron.data_samplers import MegatronPretrainingSampler
 from nemo.collections.nlp.models.language_modeling.megatron.gpt_model import GPTModel
 from nemo.collections.nlp.models.language_modeling.megatron_gpt_model import MegatronGPTModel, get_specs
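The build_train_valid_test_datasets_blend hunk below flattens the per-dataset weights and paths into the interleaved list that get_datasets_weights_and_num_samples expects. A minimal sketch of that comprehension, with hypothetical paths and weights:

    # Hypothetical stand-ins for data_cfg.concat_sampling_probabilities and data_cfg.data_path.
    concat_sampling_probabilities = [0.7, 0.3]
    data_path = ["/data/a.json", "/data/b.json"]

    # Interleave into the flat [weight, path, weight, path, ...] form.
    data_prefix = [item for pair in zip(concat_sampling_probabilities, data_path) for item in pair]
    print(data_prefix)  # [0.7, '/data/a.json', 0.3, '/data/b.json']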
@@ -1242,15 +1246,132 @@ def setup(self, stage=None):
         if self.cfg.get('transformer_engine', False):
             self.setup_transformer_engine_tp_groups()

+    def build_train_valid_test_datasets_blend(self):
+        logging.info('Building blended Neva datasets.')
+
+        train_datasets = []
+        valid_datasets = []
+
+        data_cfg = self.cfg.data
+        is_packed_sequence = data_cfg.get("packed_sequence", False)
+
+        if is_packed_sequence:
+            assert self.cfg.micro_batch_size == 1, "Micro batch size must be 1 if using packed sequence"
+
+        # Check if concat_sampling_probabilities is properly set
+        if data_cfg.get('concat_sampling_probabilities') is None or not isinstance(
+            data_cfg.concat_sampling_probabilities, ListConfig
+        ):
+            raise ValueError(
+                "concat_sampling_probabilities must be a ListConfig with the same number of entries as data_path."
+            )
+
+        if len(data_cfg.concat_sampling_probabilities) != len(data_cfg.data_path):
+            raise ValueError(
+                f"concat_sampling_probabilities must be of the same size as number of files from data path. "
+                f"Provided size {len(data_cfg.concat_sampling_probabilities)}, number of datasets {len(data_cfg.data_path)}"
+            )
+
+        for each_file_from_path in data_cfg.data_path:
+            if is_packed_sequence:
+                train_dataset = NevaPackedSeqDatatset(
+                    each_file_from_path, self.cfg.mm_cfg.vision_encoder.get("crop_size")
+                )
+                valid_dataset = NevaPackedSeqDatatset(
+                    each_file_from_path, self.cfg.mm_cfg.vision_encoder.get("crop_size")
+                )
+            else:
+                ds_dict = make_supervised_data_module(
+                    tokenizer=self.tokenizer,
+                    image_processor=(
+                        self.model.module.image_processor
+                        if hasattr(self.model, "module")
+                        else self.model.image_processor
+                    ),
+                    model_cfg=self.cfg,
+                    each_file_from_path=each_file_from_path,
+                )
+                train_dataset = ds_dict["train_dataset"]
+                valid_dataset = ds_dict["eval_dataset"]
+
+            train_datasets.append(train_dataset)
+            valid_datasets.append(valid_dataset)
+
+        # Create BlendableDataset for training
+        if self.trainer.max_steps is None or self.trainer.max_steps <= 0:
+            raise ValueError(f'Trainer max_steps must be set to a positive integer. Found {self.trainer.max_steps}')
+
+        num_train_samples = self.trainer.max_steps * data_cfg.global_batch_size
+        _, _, num_train_samples_per_dataset = get_datasets_weights_and_num_samples(
+            data_prefix=[
+                weight for pair in zip(data_cfg.concat_sampling_probabilities, data_cfg.data_path) for weight in pair
+            ],
+            num_samples=[num_train_samples],
+        )
+        num_train_samples_after_blend = sum([x[0] for x in num_train_samples_per_dataset])
+
+        logging.info(f"Number of train datasets: {len(train_datasets)}")
+        logging.info(f"Lengths of train datasets: {[len(ds) for ds in train_datasets]}")
+        logging.info(f"Number of train samples after blending: {num_train_samples_after_blend}")
+
+        if is_packed_sequence:
+            num_train_samples_after_blend = sum([len(ds) for ds in train_datasets])
+
+        self._train_ds = BlendableDataset(
+            datasets=train_datasets, weights=data_cfg.concat_sampling_probabilities, size=num_train_samples_after_blend
+        )
+
+        self._validation_ds = BlendableDataset(
+            datasets=valid_datasets, weights=data_cfg.concat_sampling_probabilities, size=num_train_samples_after_blend
+        )
+
+        logging.info(f'Length of train dataset: {len(self._train_ds)}')
+        logging.info(f'Length of validation dataset: {len(self._validation_ds)}')
+
+        return self._train_ds, self._validation_ds
+
     def build_train_valid_test_datasets(self):
         logging.info('Building Neva datasets.')
+
+        if isinstance(self.cfg.data.data_path, (list, ListConfig)):
+            if len(self.cfg.data.data_path) > 1:
+                # Only consider data blending if there are multiple dataset paths
+                if self.cfg.data.get('concat_sampling_probabilities') is None:
+                    logging.warning("No sampling probabilities provided. Defaulting to uniform sampling.")
+                    self.cfg.data.concat_sampling_probabilities = [1 / len(self.cfg.data.data_path)] * len(
+                        self.cfg.data.data_path
+                    )
+                else:
+                    # Normalize the sampling probabilities if they don't sum to 1
+                    total = sum(self.cfg.data.concat_sampling_probabilities)
+                    if total != 1:
+                        logging.warning(f"Concat_sampling_probabilities sum to {total}. Normalizing to sum to 1.")
+                        self.cfg.data.concat_sampling_probabilities = [
+                            prob / total for prob in self.cfg.data.concat_sampling_probabilities
+                        ]
+                return self.build_train_valid_test_datasets_blend()
+            elif len(self.cfg.data.data_path) == 1:
+                if self.cfg.data.concat_sampling_probabilities is not None:
+                    logging.warning(
+                        "Using sampling probabilities with a single dataset has no effect. Defaulting to None and not using blend dataset."
+                    )
+                    self.cfg.data.concat_sampling_probabilities = None
+                self.cfg.data.data_path = self.cfg.data.data_path[0]
+            else:
+                raise ValueError("data_path must contain at least one valid path.")
+        elif isinstance(self.cfg.data.data_path, str):
+            pass
+        else:
+            raise TypeError("data_path must be a list of paths or a single string")
+
         if self.cfg.data.get("packed_sequence", False):
             assert self.cfg.micro_batch_size == 1, "Micro batch size must be 1 if using packed sequence"
+
             self._train_ds = NevaPackedSeqDatatset(
-                self.cfg.data.data_prefix, self.cfg.mm_cfg.vision_encoder.get("crop_size")
+                self.cfg.data.data_path, self.cfg.mm_cfg.vision_encoder.get("crop_size")
             )
             self._validation_ds = NevaPackedSeqDatatset(
-                self.cfg.data.data_prefix, self.cfg.mm_cfg.vision_encoder.get("crop_size")
+                self.cfg.data.data_path, self.cfg.mm_cfg.vision_encoder.get("crop_size")
             )
         else:
             ds_dict = make_supervised_data_module(
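When multiple data paths are given but the probabilities do not sum to 1, build_train_valid_test_datasets above renormalizes them before delegating to the blend path. A standalone sketch of that fallback with made-up values:

    probs = [0.5, 0.2, 0.2]              # hypothetical concat_sampling_probabilities
    total = sum(probs)                   # 0.9, does not sum to 1
    if total != 1:
        probs = [p / total for p in probs]
    print(probs)                         # [0.5555..., 0.2222..., 0.2222...]
    print(sum(probs))                    # ~1.0, up to floating-point rounding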
diff --git a/nemo/collections/nlp/data/language_modeling/megatron/blendable_dataset.py b/nemo/collections/nlp/data/language_modeling/megatron/blendable_dataset.py
index ae2b5fff6be1..39b64ae89865 100644
--- a/nemo/collections/nlp/data/language_modeling/megatron/blendable_dataset.py
+++ b/nemo/collections/nlp/data/language_modeling/megatron/blendable_dataset.py
@@ -25,7 +25,6 @@ class BlendableDataset(torch.utils.data.Dataset):
     def __init__(self, datasets, weights, size):
-
         self.datasets = datasets
         num_datasets = len(datasets)
         assert num_datasets == len(weights)
@@ -43,6 +42,7 @@ def __init__(self, datasets, weights, size):
         assert num_datasets < 255
         self.dataset_index = np.zeros(self.size, dtype=np.uint8)
         self.dataset_sample_index = np.zeros(self.size, dtype=np.int64)
+
         app_state = AppState()
         try:
             if app_state.local_rank == 0:
@@ -74,6 +74,13 @@ def __len__(self):
     def __getitem__(self, idx):
         dataset_idx = self.dataset_index[idx]
         sample_idx = self.dataset_sample_index[idx]
+        dataset_size = len(self.datasets[dataset_idx])
+        # Ensure the sample index doesn't exceed the dataset size
+        if sample_idx >= dataset_size:
+            logging.warning(f"Index {sample_idx} out of bounds for dataset {dataset_idx}. Reusing existing examples.")
+            sample_idx = sample_idx % dataset_size
+            logging.warning(f"Reusing index {sample_idx} for dataset {dataset_idx}.")
+
         return self.datasets[dataset_idx][sample_idx]

     def create_data_mmap(self):
@@ -85,7 +92,7 @@ class MemoryEfficientBlendableDataset(torch.utils.data.Dataset):
     """
     A BlendableDataset implementation that uses less memory than the original implementation.
     Indices are computed algorithmically instead of storing them in memory.
-    
+
     To test call: MemoryEfficientBlendableDataset.test_index_blending()
     """
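The bounds check added to BlendableDataset.__getitem__ above wraps an out-of-range sample index back into the dataset with a modulo instead of raising IndexError, so short datasets get reused. A sketch of the wrap-around with made-up sizes:

    dataset_size = 10       # hypothetical length of one blended dataset
    sample_idx = 23         # index requested by the blend sampler
    if sample_idx >= dataset_size:
        sample_idx = sample_idx % dataset_size
    print(sample_idx)       # 3 -> the dataset's samples repeat rather than erroring out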
diff --git a/tutorials/multimodal/NeVA Tutorial.ipynb b/tutorials/multimodal/NeVA Tutorial.ipynb
index 921452ac08c0..4914ccd6fcb1 100644
--- a/tutorials/multimodal/NeVA Tutorial.ipynb
+++ b/tutorials/multimodal/NeVA Tutorial.ipynb
@@ -186,6 +186,7 @@
    "    model.mm_cfg.vision_encoder.from_hf=True \\\n",
    "    model.optim.name=\"fused_adam\" \\\n",
    "    exp_manager.create_checkpoint_callback=True \\\n",
+   "    exp_manager.checkpoint_callback_params.save_nemo_on_train_end=True \\\n",
    "    exp_manager.create_wandb_logger=False"
   ]
  },
@@ -255,6 +256,7 @@
    "    model.mm_cfg.vision_encoder.from_pretrained='openai/clip-vit-large-patch14' \\\n",
    "    model.mm_cfg.vision_encoder.from_hf=True \\\n",
    "    exp_manager.create_checkpoint_callback=True \\\n",
+   "    exp_manager.checkpoint_callback_params.save_nemo_on_train_end=True \\\n",
    "    exp_manager.name=\"nemo_neva_finetune\" \\\n",
    "    model.optim.name=\"fused_adam\""
   ]
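Putting the config pieces together, here is a hypothetical blended data section built with OmegaConf (the JSON paths are placeholders; field names follow the neva_config.yaml changes above):

    from omegaconf import ListConfig, OmegaConf

    data_cfg = OmegaConf.create(
        {
            "data_path": ["/path/to/first.json", "/path/to/second.json"],
            "concat_sampling_probabilities": [0.5, 0.5],
            "packed_sequence": False,
        }
    )
    # A list-valued data_path becomes a ListConfig, which is what routes
    # build_train_valid_test_datasets into the blending code path.
    print(isinstance(data_cfg.data_path, ListConfig))  # True
    print(OmegaConf.to_yaml(data_cfg))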