Skip to content

Commit

Permalink
Merge branch 'main' into rwkv-tp
Browse files Browse the repository at this point in the history
  • Loading branch information
Quentin-Anthony committed Dec 19, 2024
2 parents daac503 + f7a5a6f commit bf478ce
Show file tree
Hide file tree
Showing 22 changed files with 1,888 additions and 190 deletions.
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ GPT-NeoX leverages many of the same features and technologies as the popular Meg
* Easy connections with the open source ecosystem, including Hugging Face's [tokenizers](https://github.com/huggingface/tokenizers) and [transformers](https://github.com/huggingface/transformers/) libraries, monitor experiments via [WandB](https://wandb.ai/site)/[Comet](https://www.comet.com/site/)/TensorBoard, and evaluation via our [Language Model Evaluation Harness](https://github.com/EleutherAI/lm-evaluation-harness).

## News
**[10/9/2024]** We now support [Transformer Engine](https://github.com/NVIDIA/TransformerEngine) integration

**[9/9/2024]** We now support preference learning via [DPO](https://arxiv.org/abs/2305.18290), [KTO](https://arxiv.org/abs/2402.01306), and reward modeling

**[9/9/2024]** We now support integration with [Comet ML](https://www.comet.com/site/), a machine learning monitoring platform
Expand Down Expand Up @@ -60,6 +62,7 @@ Prior to 3/9/2023, GPT-NeoX relied on [DeeperSpeed](https://github.com/EleutherA
* [Environment and Dependencies](#environment-and-dependencies)
+ [Host Setup](#host-setup)
+ [Flash Attention](#flash-attention)
+ [Transformer Engine](#transformer-engine)
+ [Multi-Node Launching](#multi-node-launching)
+ [Containerized Setup](#containerized-setup)
* [Usage](#usage)
Expand Down Expand Up @@ -130,7 +133,20 @@ This will automatically adapts building process over different GPU vendors (AMD,

### Flash Attention

To use [Flash-Attention](https://github.com/HazyResearch/flash-attention), install the additional dependencies in `./requirements/requirements-flashattention.txt` and set the attention type in your configuration accordingly (see [configs](./configs/)). This can provide significant speed-ups over regular attention on certain GPU architectures, including Ampere GPUs (such as A100s); see the repository for more details.
To use [Flash-Attention](https://github.com/HazyResearch/flash-attention), install the additional dependencies in `./requirements/requirements-flashattention.txt` or use a PyTorch NGC container with it pre-installed (note that functionality is not guaranteed using versions different from our requirements file). Then set the attention type in your configuration accordingly (see [configs](./configs/)). This can provide significant speed-ups over regular attention on certain GPU architectures, including Ampere GPUs (such as A100s); see the repository for more details.

### Transformer Engine

To use [Transformer Engine (TE)](https://github.com/NVIDIA/TransformerEngine), install the additional dependencies in `./requirements/requirements-transformer-engine.txt` or use a PyTorch NGC container with it pre-installed (note that functionality is not guaranteed using versions different from our requirements file). See [this config](https://github.com/EleutherAI/gpt-neox/configs/1-3B-transformer-engine.yml) for an example of using TE on a 1.3B model. This can provide significant speed-ups over regular attention on certain GPU architectures, including Ampere and Hopper GPUs; see the repository for more details.


TE provides very efficient kernels for both A100 and H100 GPUs. We've run some sample ablations on A100:



and H100:




### Multi-Node Launching
Expand Down
105 changes: 105 additions & 0 deletions configs/1-3B-transformer-engine.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# GPT-2 pretraining setup
{
# parallelism settings ( you will want to change these based on your cluster setup, ideally scheduling pipeline stages
# across the node boundaries )
"pipe_parallel_size": 1,
"model_parallel_size": 1,

# model settings
"num_layers": 24,
"hidden_size": 2048,
"num_attention_heads": 16,
"seq_length": 2048,
"max_position_embeddings": 2048,
"norm": "layernorm",
"pos_emb": "rotary",
"no_weight_tying": true,
"gpt_j_residual": false,
"output_layer_parallelism": "column",

# Transformer Engine settings
"te_columnparallel": false,
"te_rowparallel": false,
"te_layernorm_mlp": true,
"te_mha": true,
"te_fp8_format": "hybrid",
"te_fp8_wgrad": true,
"te_fp8_amax_history_len": 1,
"te_fp8_amax_compute_algo": "most_recent",
"te_fp8_margin": 0,
"te_fp8_mha": false,

# these should provide some speedup but takes a while to build, set to true if desired
"scaled_upper_triang_masked_softmax_fusion": false,
"bias_gelu_fusion": false,
"rope_fusion": false,
"layernorm_fusion": false,

# init methods
"init_method": "small_init",
"output_layer_init_method": "wang_init",

# optimizer settings
"optimizer": {
"type": "Adam",
"params": {
"lr": 0.0002,
"betas": [0.9, 0.95],
"eps": 1.0e-8,
}
},
"min_lr": 0.00002,

# for all zero_optimization options, see https://www.deepspeed.ai/docs/config-json/#zero-optimizations-for-fp16-training
"zero_optimization": {
"stage": 1,
"allgather_partitions": True,
"allgather_bucket_size": 500000000,
"overlap_comm": True,
"reduce_scatter": True,
"reduce_bucket_size": 500000000,
"contiguous_gradients": True,
},

# batch / data settings
"train_micro_batch_size_per_gpu": 4,
"data_impl": "mmap",

# activation checkpointing
"checkpoint_activations": true,
"checkpoint_num_layers": 1,
"partition_activations": true,
"synchronize_each_layer": true,

# regularization
"gradient_clipping": 1.0,
"weight_decay": 0.1,
"hidden_dropout": 0,
"attention_dropout": 0,

# precision settings
"fp16": {
"fp16": true,
"enabled": true,
"loss_scale": 0,
"loss_scale_window": 1000,
"hysteresis": 2,
"min_loss_scale": 1
},

# misc. training settings
"train_iters": 320000,
"lr_decay_iters": 320000,
"distributed_backend": "nccl",
"lr_decay_style": "cosine",
"warmup": 0.01,
"checkpoint_factor": 10000,
"eval_interval": 1000,
"eval_iters": 10,

# logging
"log_interval": 100,
"steps_per_print": 10,
"keep_last_n_checkpoints": 4,
"wall_clock_breakdown": true,
}
1 change: 1 addition & 0 deletions configs/eleutherai_cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"tensorboard_dir": "/mnt/ssd-1/tensorboard",
"log_dir": "/mnt/ssd-1/logs",
"wandb_team": "eleutherai",
#"wandb_run_name": "experiment"
"wandb_project": "neox",
"wandb_group": "example"
}
52 changes: 51 additions & 1 deletion megatron/data/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from megatron.data.blendable_dataset import BlendableDataset
from megatron.data.gpt2_dataset import GPT2Dataset
from megatron.data.pairwise_dataset import PairwiseDataset
from megatron.data.online_dataset import OnlineDataset
from megatron.data.samplers import DistributedBatchSampler


Expand Down Expand Up @@ -532,7 +533,56 @@ def build_train_valid_test_data_loaders(neox_args):
pipe_load = True

# Data loader only on rank 0 of each model parallel group.
if mpu.get_model_parallel_rank() == 0 and pipe_load:
if (
pipe_load
and (neox_args.dataset_impl == "online")
and (mpu.get_model_parallel_rank() == 0)
):
# Can skip most of the work...
train_iters = neox_args.train_iters
eval_iters = (train_iters // neox_args.eval_interval + 1) * neox_args.eval_iters
test_iters = neox_args.eval_iters
# Build datasets...
print(
f"train_iters: {train_iters}, eval_iters: {eval_iters}, test_iters: {test_iters}"
)
train_datasets = OnlineDataset(
leave_one_out=neox_args.reinforce_leave_one_out,
data_split="train",
num_samples=train_iters * neox_args.train_batch_size,
seq_length=neox_args.seq_length,
dataserver_ips=neox_args.online_dataserver_ips,
dataserver_ports=neox_args.online_dataserver_ports,
)
valid_datasets = OnlineDataset(
leave_one_out=neox_args.reinforce_leave_one_out,
data_split="valid",
num_samples=eval_iters * neox_args.train_batch_size,
seq_length=neox_args.seq_length,
dataserver_ips=neox_args.online_dataserver_ips,
dataserver_ports=neox_args.online_dataserver_ports,
)
test_datasets = OnlineDataset(
leave_one_out=neox_args.reinforce_leave_one_out,
data_split="test",
num_samples=test_iters * neox_args.train_batch_size,
seq_length=neox_args.seq_length,
dataserver_ips=neox_args.online_dataserver_ips,
dataserver_ports=neox_args.online_dataserver_ports,
)
# print length of datasets
# Build dataloders.
train_dataloader = make_data_loader(train_datasets, neox_args=neox_args)
valid_dataloader = make_data_loader(valid_datasets, neox_args=neox_args)
test_dataloader = make_data_loader(test_datasets, neox_args=neox_args)

# Flags to know if we need to do training/validation/testing.
do_train = train_dataloader is not None and neox_args.train_iters > 0
do_valid = valid_dataloader is not None and neox_args.eval_iters > 0
do_test = test_dataloader is not None and neox_args.eval_iters > 0
# Need to broadcast num_tokens and num_type_tokens.
flags = torch.cuda.LongTensor([int(do_train), int(do_valid), int(do_test)])
elif mpu.get_model_parallel_rank() == 0 and pipe_load:
# Number of train/valid/test samples.
if neox_args.train_iters is not None:
train_iters = neox_args.train_iters
Expand Down
128 changes: 128 additions & 0 deletions megatron/data/online_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright (c) 2024, EleutherAI
# This file is based on code by the authors denoted below and has been modified from its original version.
#
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Online dataset."""
from typing import Union, List

import numpy as np
import torch
import torch.utils.data
import socket
import pickle
from megatron.mpu.initialize import get_data_parallel_rank


class OnlineDataset(torch.utils.data.Dataset):
def __init__(
self,
num_samples,
seq_length,
leave_one_out=False,
data_split="train",
dataserver_ips: Union[str, List[str]] = "localhost",
dataserver_ports: Union[int, List[int]] = 10000,
):
self.num_samples = num_samples
self.global_rank = get_data_parallel_rank()
self.leave_one_out = leave_one_out
self.reward_buffer = []
self.online_batching_data = []
self.data_split = data_split
self.seq_length = seq_length
self.dataserver_ips = dataserver_ips
self.dataserver_ports = dataserver_ports

def __len__(self):
# dummy value since it's decided by the Online Trainer
return self.num_samples

def update_online_batches(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if isinstance(self.dataserver_ips, str):
ipaddr = self.dataserver_ips
else:
ipaddr = self.dataserver_ips[self.global_rank]
if isinstance(self.dataserver_ports, int):
# simply add over the global rank
port = self.dataserver_ports
else:
# in case we want to use different ports for different ranks, e.g. per machine sampling
port = self.dataserver_ports[self.global_rank]
print(f"Connecting to {ipaddr}:{port}")
s.connect((ipaddr, port))
s.send(self.data_split.encode())
data = b""
while True:
chunk = s.recv(4096)
if not chunk:
break
data += chunk
batch_data = pickle.loads(data)
s.close()
print(f"Received {len(batch_data)} samples from the server.")
for data in batch_data:
if self.leave_one_out:
rewards = list()
for i in range(len(data["rewards"])):
rewards.append(
data["rewards"][i]
- np.mean(
[
data["rewards"][j]
for j in range(len(data["rewards"]))
if j != i
]
)
)
data["raw_rewards"] = data["rewards"]
data["rewards"] = rewards
else:
moving_average = 0
if len(self.reward_buffer) > 0:
moving_average = np.mean(self.reward_buffer)
self.reward_buffer.append(np.mean(data["rewards"]))
if len(self.reward_buffer) > 100:
self.reward_buffer.pop(0)
# For metrics...
data["raw_rewards"] = data["rewards"]
data["rewards"] = [r - moving_average for r in data["rewards"]]
for i in range(len(data["completions"])):
self.online_batching_data.append(
[
data["prefix"],
data["completions"][i],
data["rewards"][i],
data["raw_rewards"][i],
]
)

def __getitem__(self, idx):
if len(self.online_batching_data) == 0:
self.update_online_batches()
batch = self.online_batching_data.pop(0)
text = batch[0] + batch[1]
label = [-100 for _ in batch[0]] + batch[1]
# +1 because of causal masking
if len(text) <= self.seq_length:
text = text + [0] * ((self.seq_length + 1) - len(text))
label = label + [-100] * ((self.seq_length + 1) - len(label))
return {
"text": np.array(text, dtype=np.int64),
"label": np.array(label, dtype=np.int64),
"reward": np.array([batch[2]], dtype=np.float32),
"raw_reward": np.array([batch[3]], dtype=np.float32),
}
Loading

0 comments on commit bf478ce

Please sign in to comment.