Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync layer norm #271

Draft
wants to merge 38 commits into
base: thomas/test_different_layer_norm
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
07ccb3d
Better
thomasw21 Mar 24, 2022
391ed48
Force synchronize the layer norms parameters across all TP
thomasw21 Mar 24, 2022
98d0e7c
import mpu
stas00 Mar 25, 2022
279a77e
use the bf16 branch for testing
stas00 Mar 25, 2022
87a9dba
`torch.testing.assert_equal` didn't make it (#273)
stas00 Mar 25, 2022
dbb5914
Merge remote-tracking branch 'origin/main' into thomas/fix_layer_norm
stas00 Mar 25, 2022
70f91f8
bf16 comms requite pt-1.11
stas00 Mar 25, 2022
835a3e5
already part of the function
stas00 Mar 25, 2022
37795a9
reproduce the crashing on resume
stas00 Mar 25, 2022
3ec65f7
run just the test we want for now
stas00 Mar 25, 2022
8271d41
all_reduce is an in_place operation
thomasw21 Mar 25, 2022
b418b47
Make a test that TP reshaping works
thomasw21 Mar 25, 2022
4b7207b
Woops
thomasw21 Mar 25, 2022
3bc5824
Woops
thomasw21 Mar 25, 2022
05c99db
Woops
thomasw21 Mar 25, 2022
55e10c6
Woops
thomasw21 Mar 25, 2022
2ab8a3a
Woops
thomasw21 Mar 25, 2022
d357839
Woops
thomasw21 Mar 25, 2022
5fb231c
Woops
thomasw21 Mar 25, 2022
cc7ff45
Woops
thomasw21 Mar 25, 2022
7cdb1be
Woops
thomasw21 Mar 25, 2022
4574ec9
Fix load issue
thomasw21 Mar 25, 2022
04e89d1
Woops
thomasw21 Mar 25, 2022
e943100
Fix checkpoint path
thomasw21 Mar 25, 2022
09cead3
Test that force sync will allow TP changes
thomasw21 Mar 25, 2022
77abee6
Nit
thomasw21 Mar 25, 2022
64a62c8
Now that we have a force sync mechanism, let's try to reproduce
thomasw21 Mar 29, 2022
0b7afcc
Compare model_states_rank
thomasw21 Mar 29, 2022
ce01733
test
thomasw21 Mar 29, 2022
89ab0b7
Row column bias should be synchronized as well
thomasw21 Mar 29, 2022
42997b2
New list of matching embeddings
thomasw21 Mar 29, 2022
e0ef168
Figure out why state differs
thomasw21 Mar 29, 2022
1fc4fe8
Test for final weight
thomasw21 Mar 29, 2022
7ebbed1
Test that torch_rng_state
thomasw21 Mar 29, 2022
2c49216
Fix non matching torch_rng_state for tp_rank=0
thomasw21 Mar 30, 2022
007ecb4
Update test
thomasw21 Mar 31, 2022
c3844b5
I'm surprised one can apply inplace operation here
thomasw21 Mar 31, 2022
189f054
Test out the loss from the fp32 weights and optimizer states
thomasw21 Apr 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ jobs:
pip install pytest-timeout

- name: Run tests
run: pytest --timeout=600 tests
# run: pytest --timeout=600 tests
# run just the test we want for now
run: pytest --timeout=600 tests/test_training.py::MegDSTestTraining::test_layer_norm_consistent_0_bf16

stop-runner:
name: Stop self-hosted EC2 runner
Expand Down
1 change: 1 addition & 0 deletions megatron/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ def _add_distributed_args(parser):
group.add_argument('--use-cpu-initialization', action='store_true',
default=None, help='If set, affine parallel weights '
'initialization uses CPU' )
group.add_argument('--force-sync-layer-norm-parameters', action="store_true")
return parser


Expand Down
63 changes: 63 additions & 0 deletions megatron/checkpointing.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,69 @@ def save_checkpoint(iteration, model, optimizer, lr_scheduler):
# Trim off the filename and mp_rank_* directory.
for _ in range(3):
checkpoint_name = os.path.dirname(checkpoint_name)

# Debug
layer_norms_params_end_with = [
"word_embeddings.norm.weight", "word_embeddings.norm.bias",
"input_layernorm.weight", "input_layernorm.bias",
"post_attention_layernorm.weight", "post_attention_layernorm.bias",
"self_attention.dense.bias", "mlp.dense_4h_to_h.bias",
]
for n,p in model[0].named_parameters():
# Here is how you can access fp32 version of the bf16 param and fp32 optim states
#
# Note that there is an all_reduce called on all dp ranks when `get_full_hp_param` is called -
# so it's not free
#
# a. fp32 param
for end in layer_norms_params_end_with:
if n.endswith(end):
fp32_param = p.get_full_hp_param()

fp32_params_acculumator = [
torch.zeros_like(fp32_param)
for _ in range(mpu.get_tensor_model_parallel_world_size())
]
torch.distributed.gather(
fp32_param,
fp32_params_acculumator,
dst=0,
group=mpu.get_tensor_model_parallel_group()
)
if mpu.get_tensor_model_parallel_rank() == 0:
square = torch.tensor([
[
torch.max(torch.abs(c1 - c2))
for c2 in fp32_params_acculumator
] for c1 in fp32_params_acculumator
])
print(f"Parameter name = {n}")
print(square)

# b. fp32 optim states
for key in ['exp_avg', 'exp_avg_sq']:
full_optim_state = p.get_full_hp_param(optim_state_key=key)

full_optim_state_acculumator = [
torch.zeros_like(fp32_param)
for _ in range(mpu.get_tensor_model_parallel_world_size())
]
torch.distributed.gather(
full_optim_state,
full_optim_state_acculumator,
dst=0,
group=mpu.get_tensor_model_parallel_group()
)
if mpu.get_tensor_model_parallel_rank() == 0:
square = torch.tensor([
[
torch.max(torch.abs(c1 - c2))
for c2 in full_optim_state_acculumator
] for c1 in full_optim_state_acculumator
])
print(f"Optimizer state: parameter name = {n}, key = {key}")
print(square)

model[0].save_checkpoint(checkpoint_name, client_state=state_dict)

# Wait so everyone is done (necessary)
Expand Down
1 change: 1 addition & 0 deletions megatron/data/data_samplers.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def build_pretraining_data_loader(dataset, consumed_samples):
return torch.utils.data.DataLoader(dataset,
batch_sampler=batch_sampler,
num_workers=args.num_workers,
generator=torch.Generator().manual_seed(args.seed),
pin_memory=True)

class MegatronPretrainingSampler:
Expand Down
29 changes: 15 additions & 14 deletions megatron/model/fused_layer_norm.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

import numbers
import torch
from megatron import mpu
from megatron import mpu, get_args
from torch.nn.parameter import Parameter
from torch.nn import init
import importlib
from megatron import mpu

global fused_mix_prec_layer_norm_cuda
fused_mix_prec_layer_norm_cuda = None
Expand Down Expand Up @@ -63,6 +64,7 @@ class MixedFusedLayerNorm(torch.nn.Module):

def __init__(self, normalized_shape, eps=1e-5):
super(MixedFusedLayerNorm, self).__init__()
args = get_args()

global fused_mix_prec_layer_norm_cuda
fused_mix_prec_layer_norm_cuda = importlib.import_module(
Expand All @@ -75,6 +77,7 @@ def __init__(self, normalized_shape, eps=1e-5):
self.weight = Parameter(torch.Tensor(*normalized_shape))
self.bias = Parameter(torch.Tensor(*normalized_shape))
self.reset_parameters()
self.force_sync_layer_norm_parameters = args.force_sync_layer_norm_parameters


def reset_parameters(self):
Expand All @@ -84,19 +87,17 @@ def reset_parameters(self):


def forward(self, input):
weights = [torch.empty_like(self.weight) for tp in range(mpu.get_tensor_model_parallel_world_size())]
torch.distributed.all_gather(weights, self.weight, group=mpu.get_tensor_model_parallel_group())
biases = [torch.empty_like(self.bias) for tp in range(mpu.get_tensor_model_parallel_world_size())]
torch.distributed.all_gather(biases, self.bias, group=mpu.get_tensor_model_parallel_group())
if any(torch.any(weight != self.weight) for weight in weights):
if mpu.get_tensor_model_parallel_rank() == 0:
print("Weight sync failed")
print(weights)
if any(torch.any(bias != self.bias) for bias in biases):
if mpu.get_tensor_model_parallel_rank() == 0:
print("Bias sync failed")
print(biases)
if self.force_sync_layer_norm_parameters:
tp_world_size = mpu.get_tensor_model_parallel_world_size()
# TODO: hack in order to synchronize all layer norms despite them being unsynched
weight = torch.clone(self.weight)
bias = torch.clone(self.bias)
weight = mpu.reduce_from_tensor_model_parallel_region(weight) / tp_world_size
bias = mpu.reduce_from_tensor_model_parallel_region(bias) / tp_world_size
else:
weight = self.weight
bias = self.bias

return FusedLayerNormAffineFunction.apply(
input, self.weight, self.bias, self.normalized_shape,self.eps)
input, weight, bias, self.normalized_shape,self.eps)

2 changes: 1 addition & 1 deletion megatron/mpu/layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def forward(self, input_):
self.sparse)
# Mask the output embedding.
if self.tensor_model_parallel_size > 1:
output_parallel[input_mask, :] = 0.0
output_parallel = output_parallel.masked_fill(input_mask[..., None], 0.0)
# Reduce across all the model parallel GPUs.
output = reduce_from_tensor_model_parallel_region(output_parallel)

Expand Down
8 changes: 4 additions & 4 deletions megatron/testing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,9 @@ def get_gpu_count():
return 0

def torch_assert_equal(actual, expected, **kwargs):
# assert_equal was added around pt-1.9, it does better checks - e.g will check dimensions match
if hasattr(torch.testing, "assert_equal"):
return torch.testing.assert_equal(actual, expected, **kwargs)
# assert_close was added around pt-1.9, it does better checks - e.g will check dimensions match
if hasattr(torch.testing, "assert_close"):
return torch.testing.assert_close(actual, expected, rtol=0.0, atol=0.0, **kwargs)
else:
return torch.allclose(actual, expected, rtol=0.0, atol=0.0)

Expand Down Expand Up @@ -886,4 +886,4 @@ def flatten_arguments(args):
Example: {"arg1": "value1", "arg2": "value2"} -> ["IGNORED", "arg1", "value1", "arg2", "value2"]
"""
return ["IGNORED"] + [item for key_value in args.items() for item in key_value if item != ""]
return ["IGNORED"] + [item for key_value in args.items() for item in key_value if item != ""]
5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ pybind11
regex
six
tensorboard
torch>=1.7
torch>=1.11
transformers
DeepSpeed @ git+https://github.com/microsoft/DeepSpeed.git
# for now using this branch for bf16 work
DeepSpeed @ git+https://github.com/microsoft/DeepSpeed.git@olruwase/bf16-updates
# versions from HF transformers
black==21.4b0
isort>=5.5.4
Loading