Add blend dataset in NeVA (NVIDIA#10000)
* initial commit on adding blend dataset for neva

Signed-off-by: Vivian Chen <xuanzic@example.com>

* sequence packing support try 1

Signed-off-by: Vivian Chen <xuanzic@example.com>

* clean up

Signed-off-by: Vivian Chen <xuanzic@example.com>

* remove unused

Signed-off-by: Vivian Chen <xuanzic@nvidia.com>

* Apply isort and black reformatting

Signed-off-by: xuanzic <xuanzic@users.noreply.github.com>

* fix config

Signed-off-by: Vivian Chen <xuanzic@nvidia.com>

* fix based on reviews

Signed-off-by: Vivian Chen <xuanzic@nvidia.com>

* Apply isort and black reformatting

Signed-off-by: xuanzic <xuanzic@users.noreply.github.com>

* fix for neva config

Signed-off-by: Vivian Chen <xuanzic@nvidia.com>

* modify nemo config for neva tutorial

Signed-off-by: Vivian Chen <xuanzic@nvidia.com>

* address comments

Signed-off-by: Vivian Chen <xuanzic@nvidia.com>

* Apply isort and black reformatting

Signed-off-by: xuanzic <xuanzic@users.noreply.github.com>

---------

Signed-off-by: Vivian Chen <xuanzic@example.com>
Signed-off-by: Vivian Chen <xuanzic@nvidia.com>
Signed-off-by: xuanzic <xuanzic@users.noreply.github.com>
Co-authored-by: Vivian Chen <xuanzic@example.com>
Co-authored-by: xuanzic <xuanzic@users.noreply.github.com>
Co-authored-by: Yu Yao <54727607+yaoyu-33@users.noreply.github.com>
4 people authored Aug 20, 2024
1 parent 60442c2 commit 8d2e43a
Showing 7 changed files with 160 additions and 14 deletions.
4 changes: 1 addition & 3 deletions docs/source/multimodal/mllm/sequence_packing.rst
@@ -103,15 +103,13 @@ To train with packed sequences, modify four items in the SFT/PEFT config file.

.. code-block:: bash
++model.data.data_prefix=/lustre/fsw/coreai_dlalgo_genai/datasets/LLaVA-Instruct-150K/packed_seq_12288_336_v1/packed_seq_dataset
++model.data.crop_size=[224,224]
++model.data.packed_sequence=True
2. Use the new dataset file instead of the original JSONL file and ensure the crop sizes are correctly specified since images are now cached:

.. code-block:: bash
++model.data.data_prefix=/path/to/datasets/LLaVA-Instruct-150K/packed_seq_12288_336_v1/packed_seq_dataset
++model.data.data_path=/path/to/datasets/LLaVA-Instruct-150K/packed_seq_12288_336_v1/packed_seq_dataset
++model.data.crop_size=[336,336]
4. Adjust batch sizes:
20 changes: 18 additions & 2 deletions examples/multimodal/multimodal_llm/neva/conf/neva_config.yaml
@@ -38,7 +38,7 @@ exp_manager:
save_top_k: 10
mode: min
always_save_nemo: False # saves nemo file during validation, not implemented for model parallel
save_nemo_on_train_end: False # not recommended when training large models on clusters with short time limits
save_nemo_on_train_end: True # not recommended when training large models on clusters with short time limits
filename: 'megatron_clip--{val_loss:.2f}-{step}-{consumed_samples}'
model_parallel_size: ${multiply:${model.tensor_model_parallel_size}, ${model.pipeline_model_parallel_size}}
ema:
@@ -60,6 +60,7 @@ model:

tensor_model_parallel_size: 1 # intra-layer model parallelism
pipeline_model_parallel_size: 1 # inter-layer model parallelism
context_parallel_size: 1 # context (sequence-dimension) model parallelism
virtual_pipeline_model_parallel_size: null # interleaved pipeline

restore_from_path: null # used in fine-tuning
@@ -185,7 +186,22 @@ model:
packed_sequence: False
num_workers: 8
dataloader_type: cyclic
data_path:
data_path:
# This configuration can either be a single string pointing to a data path, or a list of data paths for data blending.
# When using a blendable dataset, be aware of the following:
# - The sampling of data across datasets depends on both the relative sizes of the datasets and the concat_sampling_probabilities.
# - For example, if there are two datasets with lengths of 100 and 10, and the sampling probabilities are set to 0.5 for each,
# then 55 samples would be taken from the dataset of length 100 and 55 from the dataset of length 10 (with repetition).
# - This means not all data might be seen in one epoch, and smaller datasets may need to be repeated to match the number of samples.
# Please adjust your concat_sampling_probabilities accordingly to ensure balanced and effective training.

# - /path/to/json
# - /path/to/json
global_batch_size: ${model.global_batch_size}
micro_batch_size: ${model.micro_batch_size}
concat_sampling_probabilities: null
# - 0.5
# - 0.5
lazy_preprocess: True
is_multimodal: True
media_type: image # currently supported: image
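The comment block in the config above describes how blending splits a fixed sample budget across datasets. A minimal Python sketch of that arithmetic, assuming a total budget of 110 blended samples (the lengths, probabilities, and total here are illustrative values, not taken from the commit):

# Mirrors the "length 100 / length 10 with 0.5 / 0.5" example in the config comment above.
dataset_lengths = [100, 10]
concat_sampling_probabilities = [0.5, 0.5]
num_train_samples = 110  # assumed budget, e.g. max_steps * global_batch_size

# Each dataset contributes roughly probability * total samples; a smaller dataset repeats examples.
per_dataset = [int(round(p * num_train_samples)) for p in concat_sampling_probabilities]
print(per_dataset)                           # [55, 55]
print(per_dataset[1] > dataset_lengths[1])   # True: the 10-sample dataset is revisited with repetition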
1 change: 1 addition & 0 deletions examples/multimodal/multimodal_llm/neva/sequence_packing/preprocess_dataset.py
100644 → 100755
@@ -271,6 +271,7 @@ def main():
logging.info(f"Output directory: {output_dir}")

prefix_path = f"{output_dir}/packed_seq_dataset"
os.makedirs(prefix_path, exist_ok=True)
# Original Datasets to Sequence Lengths Files
builders = {}
for item_dict in tqdm(train_dl, desc="Building indexed datasets"):
9 changes: 5 additions & 4 deletions nemo/collections/multimodal/data/neva/neva_dataset.py
@@ -1004,6 +1004,8 @@ def __len__(self):
return len(self.list_data_dict)

def __getitem__(self, i) -> Dict[str, torch.Tensor]:
if isinstance(i, np.integer):
i = int(i)
sources = self.list_data_dict[i]
if isinstance(i, int):
sources = [sources]
@@ -1190,7 +1192,6 @@ class NevaDataset(LazySupervisedDataset):
"""Dataset for supervised fine-tuning."""

def __init__(self, data_path: str, tokenizer, multimodal_cfg: dict, data_cfg: dict):

if data_path.endswith(".json"):
super(NevaDataset, self).__init__(data_path, tokenizer, multimodal_cfg, data_cfg)

@@ -1313,18 +1314,18 @@ def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:
return batch


def make_supervised_data_module(tokenizer, image_processor, model_cfg) -> Dict:
def make_supervised_data_module(tokenizer, image_processor, model_cfg, each_file_from_path=None) -> Dict:
"""Make dataset and collator for supervised fine-tuning."""
data_cfg = model_cfg.data
mm_cfg = model_cfg.mm_cfg
add_extra_token = 1
if getattr(model_cfg, 'no_seqlen_plus_one_input_tokens', False):
add_extra_token = 0
crop_size = mm_cfg.vision_encoder.get("crop_size", (224, 224))

data_path = each_file_from_path if each_file_from_path is not None else data_cfg.data_path
train_dataset = NevaDataset(
tokenizer=tokenizer,
data_path=data_cfg.data_path,
data_path=data_path,
multimodal_cfg=dict(
is_multimodal=data_cfg.is_multimodal,
sep_image_conv_front=data_cfg.sep_image_conv_front,
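A hedged usage sketch of the new each_file_from_path argument: when data_path is a list, the blending path builds one dataset per file by overriding data_cfg.data_path for that call. tokenizer, image_processor, and model_cfg are assumed to already exist, as elsewhere in the NeVA setup code; only the keyword argument is new in this commit.

from nemo.collections.multimodal.data.neva.neva_dataset import make_supervised_data_module

train_datasets, valid_datasets = [], []
for each_file_from_path in model_cfg.data.data_path:  # assumed to be a list of JSON paths
    ds_dict = make_supervised_data_module(
        tokenizer=tokenizer,
        image_processor=image_processor,
        model_cfg=model_cfg,
        each_file_from_path=each_file_from_path,  # used in place of data_cfg.data_path when not None
    )
    train_datasets.append(ds_dict["train_dataset"])
    valid_datasets.append(ds_dict["eval_dataset"])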
127 changes: 124 additions & 3 deletions nemo/collections/multimodal/models/multimodal_llm/neva/neva_model.py
@@ -21,7 +21,7 @@
import torch
import torch.nn.functional as F
from einops import rearrange, reduce, repeat
from omegaconf.dictconfig import DictConfig
from omegaconf import DictConfig, ListConfig
from pkg_resources import packaging
from pytorch_lightning.trainer.trainer import Trainer
from transformers import CLIPVisionModel, SiglipVisionModel
@@ -38,6 +38,10 @@
MegatronCLIPModel,
)
from nemo.collections.multimodal.parts.utils import create_image_processor, load_nemo_model_weights
from nemo.collections.nlp.data.language_modeling.megatron.base_dataset_utils import (
get_datasets_weights_and_num_samples,
)
from nemo.collections.nlp.data.language_modeling.megatron.blendable_dataset import BlendableDataset
from nemo.collections.nlp.data.language_modeling.megatron.data_samplers import MegatronPretrainingSampler
from nemo.collections.nlp.models.language_modeling.megatron.gpt_model import GPTModel
from nemo.collections.nlp.models.language_modeling.megatron_gpt_model import MegatronGPTModel, get_specs
@@ -1242,15 +1246,132 @@ def setup(self, stage=None):
if self.cfg.get('transformer_engine', False):
self.setup_transformer_engine_tp_groups()

def build_train_valid_test_datasets_blend(self):
logging.info('Building Blending Neva datasets.')

train_datasets = []
valid_datasets = []

data_cfg = self.cfg.data
is_packed_sequence = data_cfg.get("packed_sequence", False)

if is_packed_sequence:
assert self.cfg.micro_batch_size == 1, "Micro batch size must be 1 if using packed sequence"

# Check if concat_sampling_probabilities is properly set
if data_cfg.get('concat_sampling_probabilities') is None or not isinstance(
data_cfg.concat_sampling_probabilities, ListConfig
):
raise ValueError(
"concat_sampling_probabilities must be a ListConfig with the same number of entries as data_path."
)

if len(data_cfg.concat_sampling_probabilities) != len(data_cfg.data_path):
raise ValueError(
f"concat_sampling_probabilities must be of the same size as number of files from data path. "
f"Provided size {len(data_cfg.concat_sampling_probabilities)}, number of datasets {len(data_cfg.data_path)}"
)

for each_file_from_path in data_cfg.data_path:
if is_packed_sequence:
train_dataset = NevaPackedSeqDatatset(
each_file_from_path, self.cfg.mm_cfg.vision_encoder.get("crop_size")
)
valid_dataset = NevaPackedSeqDatatset(
each_file_from_path, self.cfg.mm_cfg.vision_encoder.get("crop_size")
)
else:
ds_dict = make_supervised_data_module(
tokenizer=self.tokenizer,
image_processor=(
self.model.module.image_processor
if hasattr(self.model, "module")
else self.model.image_processor
),
model_cfg=self.cfg,
each_file_from_path=each_file_from_path,
)
train_dataset = ds_dict["train_dataset"]
valid_dataset = ds_dict["eval_dataset"]

train_datasets.append(train_dataset)
valid_datasets.append(valid_dataset)

# Create BlendableDataset for training
if self.trainer.max_steps is None or self.trainer.max_steps <= 0:
raise ValueError(f'Trainer max_steps must be set to a positive integer. Found {self.trainer.max_steps}')

num_train_samples = self.trainer.max_steps * data_cfg.global_batch_size
_, _, num_train_samples_per_dataset = get_datasets_weights_and_num_samples(
data_prefix=[
weight for pair in zip(data_cfg.concat_sampling_probabilities, data_cfg.data_path) for weight in pair
],
num_samples=[num_train_samples],
)
num_train_samples_after_blend = sum([x[0] for x in num_train_samples_per_dataset])

logging.info(f"Number of train datasets: {len(train_datasets)}")
logging.info(f"Lengths of train datasets: {[len(ds) for ds in train_datasets]}")
logging.info(f"Number of train datasets after blending: {num_train_samples_after_blend}")

if is_packed_sequence:
num_train_samples_after_blend = sum([len(ds) for ds in train_datasets])

self._train_ds = BlendableDataset(
datasets=train_datasets, weights=data_cfg.concat_sampling_probabilities, size=num_train_samples_after_blend
)

self._validation_ds = BlendableDataset(
datasets=valid_datasets, weights=data_cfg.concat_sampling_probabilities, size=num_train_samples_after_blend
)

logging.info(f'Length of train dataset: {len(self._train_ds)}')
logging.info(f'Length of validation dataset: {len(self._validation_ds)}')

return self._train_ds, self._validation_ds

def build_train_valid_test_datasets(self):
logging.info('Building Neva datasets.')

if isinstance(self.cfg.data.data_path, (list, ListConfig)):
if len(self.cfg.data.data_path) > 1:
# Only consider data blending if there are multiple dataset paths
if self.cfg.data.get('concat_sampling_probabilities') is None:
logging.warning("No sampling probabilities provided. Defaulting to uniform sampling.")
self.cfg.data.concat_sampling_probabilities = [1 / len(self.cfg.data.data_path)] * len(
self.cfg.data.data_path
)
else:
# Normalize the sampling probabilities if they don't sum to 1
total = sum(self.cfg.data.concat_sampling_probabilities)
if total != 1:
logging.warning(f"Concat_sampling_probabilities sum to {total}. Normalizing to sum to 1.")
self.cfg.data.concat_sampling_probabilities = [
prob / total for prob in self.cfg.data.concat_sampling_probabilities
]
return self.build_train_valid_test_datasets_blend()
elif len(self.cfg.data.data_path) == 1:
if self.cfg.data.concat_sampling_probabilities is not None:
logging.warning(
"Using sampling probabilities with a single dataset has no effect. Defaulting to None and not using blend dataset."
)
self.cfg.data.concat_sampling_probabilities = None
self.cfg.data.data_path = self.cfg.data.data_path[0]
else:
raise ValueError("data_path must contain at least one valid path.")
elif isinstance(self.cfg.data.data_path, str):
pass
else:
raise TypeError("data_path must be a list of paths or a single string")

if self.cfg.data.get("packed_sequence", False):
assert self.cfg.micro_batch_size == 1, "Micro batch size must be 1 if using packed sequence"

self._train_ds = NevaPackedSeqDatatset(
self.cfg.data.data_prefix, self.cfg.mm_cfg.vision_encoder.get("crop_size")
self.cfg.data.data_path, self.cfg.mm_cfg.vision_encoder.get("crop_size")
)
self._validation_ds = NevaPackedSeqDatatset(
self.cfg.data.data_prefix, self.cfg.mm_cfg.vision_encoder.get("crop_size")
self.cfg.data.data_path, self.cfg.mm_cfg.vision_encoder.get("crop_size")
)
else:
ds_dict = make_supervised_data_module(
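Two details of the blending setup above are easy to misread, so here is a small illustrative sketch (all values invented): probabilities that do not sum to 1 are renormalized in build_train_valid_test_datasets, and the zip expression in build_train_valid_test_datasets_blend interleaves probabilities and paths into the [weight, path, weight, path, ...] layout expected by get_datasets_weights_and_num_samples.

concat_sampling_probabilities = [0.3, 0.9]    # hypothetical user input; does not sum to 1
data_path = ["/data/a.json", "/data/b.json"]  # hypothetical dataset paths

# Normalization, as in build_train_valid_test_datasets above.
total = sum(concat_sampling_probabilities)
if total != 1:
    concat_sampling_probabilities = [prob / total for prob in concat_sampling_probabilities]
print(concat_sampling_probabilities)          # [0.25, 0.75]

# Interleaving, as in build_train_valid_test_datasets_blend above.
data_prefix = [item for pair in zip(concat_sampling_probabilities, data_path) for item in pair]
print(data_prefix)                            # [0.25, '/data/a.json', 0.75, '/data/b.json']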
11 changes: 9 additions & 2 deletions nemo/collections/nlp/data/language_modeling/megatron/blendable_dataset.py
@@ -25,7 +25,6 @@

class BlendableDataset(torch.utils.data.Dataset):
def __init__(self, datasets, weights, size):

self.datasets = datasets
num_datasets = len(datasets)
assert num_datasets == len(weights)
@@ -43,6 +42,7 @@ def __init__(self, datasets, weights, size):
assert num_datasets < 255
self.dataset_index = np.zeros(self.size, dtype=np.uint8)
self.dataset_sample_index = np.zeros(self.size, dtype=np.int64)

app_state = AppState()
try:
if app_state.local_rank == 0:
@@ -74,6 +74,13 @@ def __len__(self):
def __getitem__(self, idx):
dataset_idx = self.dataset_index[idx]
sample_idx = self.dataset_sample_index[idx]
dataset_size = len(self.datasets[dataset_idx])
# Ensure the sample index doesn't exceed the dataset size
if sample_idx >= dataset_size:
logging.warning(f"Index {sample_idx} out of bounds for dataset {dataset_idx}. Reusing existing examples.")
sample_idx = sample_idx % dataset_size
logging.warning(f"Reusing index {sample_idx} for dataset {dataset_idx}.")

return self.datasets[dataset_idx][sample_idx]

def create_data_mmap(self):
@@ -85,7 +92,7 @@ class MemoryEfficientBlendableDataset(torch.utils.data.Dataset):
"""
A BlendableDataset implementation that uses less memory than the original implementation.
Indices are computed algorithmically instead of storing them in memory.
To test call: MemoryEfficientBlendableDataset.test_index_blending()
"""

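A toy illustration of the lookup and the new wrap-around guard in __getitem__ above: dataset_index says which source dataset a blended sample comes from, dataset_sample_index says which row, and a row index past the end of a source is reused modulo its length. All values below are invented for illustration.

import numpy as np

datasets = [list(range(100)), list(range(10))]        # two toy sources, lengths 100 and 10
dataset_index = np.array([0, 1, 1], dtype=np.uint8)   # assumed blend assignment per global sample
dataset_sample_index = np.array([42, 3, 17], dtype=np.int64)

for idx in range(len(dataset_index)):
    dataset_idx = dataset_index[idx]
    sample_idx = int(dataset_sample_index[idx])
    dataset_size = len(datasets[dataset_idx])
    if sample_idx >= dataset_size:                    # 17 is out of bounds for the 10-row source
        sample_idx = sample_idx % dataset_size        # 17 -> 7: reuse an existing example
    print(datasets[dataset_idx][sample_idx])          # prints 42, 3, 7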
2 changes: 2 additions & 0 deletions tutorials/multimodal/NeVA Tutorial.ipynb
@@ -186,6 +186,7 @@
" model.mm_cfg.vision_encoder.from_hf=True \\\n",
" model.optim.name=\"fused_adam\" \\\n",
" exp_manager.create_checkpoint_callback=True \\\n",
" exp_manager.checkpoint_callback_params.save_nemo_on_train_end=True \\\n",
" exp_manager.create_wandb_logger=False"
]
},
@@ -255,6 +256,7 @@
" model.mm_cfg.vision_encoder.from_pretrained='openai/clip-vit-large-patch14' \\\n",
" model.mm_cfg.vision_encoder.from_hf=True \\\n",
" exp_manager.create_checkpoint_callback=True \\\n",
" exp_manager.checkpoint_callback_params.save_nemo_on_train_end=True \\\n",
" exp_manager.name=\"nemo_neva_finetune\" \\\n",
" model.optim.name=\"fused_adam\""
]
