Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

separate dataloader generator from sampler generator #789

Merged

Conversation

pacman100
Copy link
Contributor

@pacman100 pacman100 commented Oct 25, 2022

What does this PR do?

  1. Fixes [Feature Request] Split sampler.generator and loader.generator to leave loader's generator unsynchronized by default. #786

Example script for testing the updated code:

import torch
from torch.utils.data import default_collate,default_convert, Dataset, DataLoader, BatchSampler, RandomSampler, SequentialSampler
import numpy
import accelerate
from accelerate import Accelerator
from accelerate.utils import set_seed
from accelerate.utils.random import synchronize_rng_state
from accelerate.utils.dataclasses import DistributedType, RNGType
import os
import sys
import platform

class MyDataset(Dataset):
    def __len__(self):
        return 22
    
    def __getitem__(self, index):
#         print("MyDataset __getitem__", index)

        squeeze = False

        if isinstance(index, int):
            index = [index]
            squeeze = True
        elif isinstance(index, slice):
            index = list(range(*index.indices(self.size)))
        else:
            index = list(index)

        batch = [{"index": i, "label": i % 2, "random_augmentation": torch.rand(1).item()} for i in index]
#         print(batch)

        if squeeze:
            batch = batch[0]

        return batch
    

if __name__ == "__main__":
    
    num_workers = 4
    dataset = MyDataset()
    accelerator = Accelerator()
    torch.manual_seed(accelerator.process_index)
    
    accelerator.print("Starting conventional Dataloader with shuffle=False. Eval mode in general")
    loader = DataLoader(dataset, shuffle=False, batch_size=4, num_workers=num_workers)
    loader = accelerator.prepare(loader)
    all_examples = []
    for i, batch in enumerate(loader):
        print(f"{accelerator.process_index} | batch #{i} = {batch}")
        index, label = accelerator.gather_for_metrics((batch["index"], batch["label"]))
        all_examples.extend(index.detach().cpu().numpy().tolist())
        accelerator.print(f"{accelerator.process_index} | gathered batch #{i} | index = {index}, label = {label}")
    accelerator.print(f"{sorted(all_examples)=}")
    accelerator.print("Ending conventional Dataloader with shuffle=False. Eval mode in general")
    accelerator.print()
    accelerator.print()
    
    accelerator.print("Starting conventional Dataloader with shuffle=True")
    loader = DataLoader(dataset, shuffle=True, batch_size=4, num_workers=num_workers)
    loader = accelerator.prepare(loader)
    all_examples = []
    for i, batch in enumerate(loader):
        print(f"{accelerator.process_index} | batch #{i} = {batch}")
        index, label = accelerator.gather_for_metrics((batch["index"], batch["label"]))
        all_examples.extend(index.detach().cpu().numpy().tolist())
        accelerator.print(f"{accelerator.process_index} | gathered batch #{i} | index = {index}, label = {label}")
    accelerator.print(f"{sorted(all_examples)=}")
    accelerator.print("Ending conventional Dataloader with shuffle=True")
    accelerator.print()
    accelerator.print()
    
    accelerator.print("Starting Dataloader with batch_sampler=BatchSampler()")
    sampler = BatchSampler(RandomSampler(dataset), batch_size=4, drop_last=False)
    loader = DataLoader(dataset, batch_sampler=sampler, num_workers=num_workers)
    loader = accelerator.prepare(loader)
    all_examples = []
    for i, batch in enumerate(loader):
        print(f"{accelerator.process_index} | batch #{i} = {batch}")
        index, label = accelerator.gather_for_metrics((batch["index"], batch["label"]))
        all_examples.extend(index.detach().cpu().numpy().tolist())
        accelerator.print(f"{accelerator.process_index} | gathered batch #{i} | index = {index}, label = {label}")
    accelerator.print(f"{sorted(all_examples)=}")
    accelerator.print("Ending Dataloader with batch_sampler=BatchSampler()")
    accelerator.print()
    accelerator.print()
    
    accelerator.print("Starting Dataloader with sampler=BatchSampler()")
    sampler = BatchSampler(RandomSampler(dataset), batch_size=4, drop_last=False)
    loader = DataLoader(dataset, sampler=sampler, batch_size=None, collate_fn=default_collate, num_workers=num_workers)
    loader = accelerator.prepare(loader)
    all_examples = []
    for i, batch in enumerate(loader):
        print(f"{accelerator.process_index} | batch #{i} = {batch}")
        index, label = accelerator.gather_for_metrics((batch["index"], batch["label"]))
        all_examples.extend(index.detach().cpu().numpy().tolist())
        accelerator.print(f"{accelerator.process_index} | gathered batch #{i} | index = {index}, label = {label}")
    accelerator.print(f"{sorted(all_examples)=}")
    accelerator.print("Ending Dataloader with sampler=BatchSampler()")

Output logs (num_workers=4 and pay attention to the random_augmentation data which isn't same across GPUs, hence solving the issue mentioned):

accelerate launch --multi_gpu --num_processes 2 temp/dataset_issue_2.py 
The following values were not passed to `accelerate launch` and had defaults used instead:
	`--num_machines` was set to a value of `1`
	`--mixed_precision` was set to a value of `'no'`
To avoid this warning pass in values for each of the problematic parameters or run `accelerate config`.
Starting conventional Dataloader with shuffle=False. Eval mode in general
1 | batch #0 = {'index': tensor([4, 5, 6, 7], device='cuda:1'), 'label': tensor([0, 1, 0, 1], device='cuda:1'), 'random_augmentation': tensor([0.5113, 0.2310, 0.6590, 0.7075], device='cuda:1', dtype=torch.float64)}
0 | batch #0 = {'index': tensor([0, 1, 2, 3], device='cuda:0'), 'label': tensor([0, 1, 0, 1], device='cuda:0'), 'random_augmentation': tensor([0.7821, 0.0536, 0.9888, 0.1949], device='cuda:0', dtype=torch.float64)}
0 | gathered batch #0 | index = tensor([0, 1, 2, 3, 4, 5, 6, 7], device='cuda:0'), label = tensor([0, 1, 0, 1, 0, 1, 0, 1], device='cuda:0')
1 | batch #1 = {'index': tensor([12, 13, 14, 15], device='cuda:1'), 'label': tensor([0, 1, 0, 1], device='cuda:1'), 'random_augmentation': tensor([0.3023, 0.8598, 0.0777, 0.6678], device='cuda:1', dtype=torch.float64)}
0 | batch #1 = {'index': tensor([ 8,  9, 10, 11], device='cuda:0'), 'label': tensor([0, 1, 0, 1], device='cuda:0'), 'random_augmentation': tensor([0.6938, 0.2980, 0.1669, 0.2847], device='cuda:0', dtype=torch.float64)}
0 | gathered batch #1 | index = tensor([ 8,  9, 10, 11, 12, 13, 14, 15], device='cuda:0'), label = tensor([0, 1, 0, 1, 0, 1, 0, 1], device='cuda:0')
0 | batch #2 = {'index': tensor([16, 17, 18, 19], device='cuda:0'), 'label': tensor([0, 1, 0, 1], device='cuda:0'), 'random_augmentation': tensor([0.6540, 0.2994, 0.2798, 0.5160], device='cuda:0', dtype=torch.float64)}
1 | batch #2 = {'index': tensor([20, 21,  0,  1], device='cuda:1'), 'label': tensor([0, 1, 0, 1], device='cuda:1'), 'random_augmentation': tensor([0.4550, 0.6276, 0.2961, 0.1046], device='cuda:1', dtype=torch.float64)}
0 | gathered batch #2 | index = tensor([16, 17, 18, 19, 20, 21], device='cuda:0'), label = tensor([0, 1, 0, 1, 0, 1], device='cuda:0')
sorted(all_examples)=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
Ending conventional Dataloader with shuffle=False. Eval mode in general


Starting conventional Dataloader with shuffle=True
1 | batch #0 = {'index': tensor([13,  4, 21, 11], device='cuda:1'), 'label': tensor([1, 0, 1, 1], device='cuda:1'), 'random_augmentation': tensor([0.6874, 0.2763, 0.0351, 0.2464], device='cuda:1', dtype=torch.float64)}
0 | batch #0 = {'index': tensor([10, 16, 19, 15], device='cuda:0'), 'label': tensor([0, 0, 1, 1], device='cuda:0'), 'random_augmentation': tensor([0.7514, 0.5174, 0.8544, 0.2775], device='cuda:0', dtype=torch.float64)}
0 | gathered batch #0 | index = tensor([10, 16, 19, 15, 13,  4, 21, 11], device='cuda:0'), label = tensor([0, 0, 1, 1, 1, 0, 1, 1], device='cuda:0')
0 | batch #1 = {'index': tensor([ 0, 14,  2,  9], device='cuda:0'), 'label': tensor([0, 0, 0, 1], device='cuda:0'), 'random_augmentation': tensor([0.1679, 0.6141, 0.1338, 0.5165], device='cuda:0', dtype=torch.float64)}
1 | batch #1 = {'index': tensor([ 1, 20,  6,  8], device='cuda:1'), 'label': tensor([1, 0, 0, 0], device='cuda:1'), 'random_augmentation': tensor([0.1724, 0.2674, 0.2993, 0.4871], device='cuda:1', dtype=torch.float64)}
0 | gathered batch #1 | index = tensor([ 0, 14,  2,  9,  1, 20,  6,  8], device='cuda:0'), label = tensor([0, 0, 0, 1, 1, 0, 0, 0], device='cuda:0')
1 | batch #2 = {'index': tensor([ 5, 18, 10, 16], device='cuda:1'), 'label': tensor([1, 0, 0, 0], device='cuda:1'), 'random_augmentation': tensor([0.6530, 0.2832, 0.4021, 0.6125], device='cuda:1', dtype=torch.float64)}
0 | batch #2 = {'index': tensor([ 3, 12,  7, 17], device='cuda:0'), 'label': tensor([1, 0, 1, 1], device='cuda:0'), 'random_augmentation': tensor([0.3018, 0.2774, 0.3048, 0.7736], device='cuda:0', dtype=torch.float64)}
0 | gathered batch #2 | index = tensor([ 3, 12,  7, 17,  5, 18], device='cuda:0'), label = tensor([1, 0, 1, 1, 1, 0], device='cuda:0')
sorted(all_examples)=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
Ending conventional Dataloader with shuffle=True


Starting Dataloader with batch_sampler=BatchSampler()
1 | batch #0 = {'index': tensor([ 8,  6,  9, 13], device='cuda:1'), 'label': tensor([0, 0, 1, 1], device='cuda:1'), 'random_augmentation': tensor([0.5405, 0.7326, 0.7492, 0.5146], device='cuda:1', dtype=torch.float64)}
0 | batch #0 = {'index': tensor([10,  0, 19, 15], device='cuda:0'), 'label': tensor([0, 0, 1, 1], device='cuda:0'), 'random_augmentation': tensor([0.1316, 0.9379, 0.6814, 0.5280], device='cuda:0', dtype=torch.float64)}
0 | gathered batch #0 | index = tensor([10,  0, 19, 15,  8,  6,  9, 13], device='cuda:0'), label = tensor([0, 0, 1, 1, 0, 0, 1, 1], device='cuda:0')
0 | batch #1 = {'index': tensor([21, 14,  1, 20], device='cuda:0'), 'label': tensor([1, 0, 1, 0], device='cuda:0'), 'random_augmentation': tensor([0.1859, 0.3687, 0.4874, 0.2116], device='cuda:0', dtype=torch.float64)}
1 | batch #1 = {'index': tensor([16, 18,  3,  4], device='cuda:1'), 'label': tensor([0, 0, 1, 0], device='cuda:1'), 'random_augmentation': tensor([0.3968, 0.0916, 0.2149, 0.6062], device='cuda:1', dtype=torch.float64)}
0 | gathered batch #1 | index = tensor([21, 14,  1, 20, 16, 18,  3,  4], device='cuda:0'), label = tensor([1, 0, 1, 0, 0, 0, 1, 0], device='cuda:0')
0 | batch #2 = {'index': tensor([ 7, 17, 11,  5], device='cuda:0'), 'label': tensor([1, 1, 1, 1], device='cuda:0'), 'random_augmentation': tensor([0.4991, 0.8987, 0.1741, 0.5618], device='cuda:0', dtype=torch.float64)}
1 | batch #2 = {'index': tensor([12,  2, 10,  0], device='cuda:1'), 'label': tensor([0, 0, 0, 0], device='cuda:1'), 'random_augmentation': tensor([0.5458, 0.5847, 0.9402, 0.8402], device='cuda:1', dtype=torch.float64)}
0 | gathered batch #2 | index = tensor([ 7, 17, 11,  5, 12,  2], device='cuda:0'), label = tensor([1, 1, 1, 1, 0, 0], device='cuda:0')
sorted(all_examples)=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
Ending Dataloader with batch_sampler=BatchSampler()


Starting Dataloader with sampler=BatchSampler()
1 | batch #0 = {'index': tensor([ 9, 17,  8,  0], device='cuda:1'), 'label': tensor([1, 1, 0, 0], device='cuda:1'), 'random_augmentation': tensor([0.4054, 0.3933, 0.0712, 0.4824], device='cuda:1', dtype=torch.float64)}
0 | batch #0 = {'index': tensor([15,  7, 21, 20], device='cuda:0'), 'label': tensor([1, 1, 1, 0], device='cuda:0'), 'random_augmentation': tensor([0.4614, 0.6065, 0.3889, 0.5783], device='cuda:0', dtype=torch.float64)}
0 | gathered batch #0 | index = tensor([15,  7, 21, 20,  9, 17,  8,  0], device='cuda:0'), label = tensor([1, 1, 1, 0, 1, 1, 0, 0], device='cuda:0')
0 | batch #1 = {'index': tensor([ 5,  6,  1, 13], device='cuda:0'), 'label': tensor([1, 0, 1, 1], device='cuda:0'), 'random_augmentation': tensor([0.2467, 0.7869, 0.0868, 0.5347], device='cuda:0', dtype=torch.float64)}
1 | batch #1 = {'index': tensor([ 3,  4, 16,  2], device='cuda:1'), 'label': tensor([1, 0, 0, 0], device='cuda:1'), 'random_augmentation': tensor([0.6884, 0.2470, 0.5488, 0.0427], device='cuda:1', dtype=torch.float64)}
0 | gathered batch #1 | index = tensor([ 5,  6,  1, 13,  3,  4, 16,  2], device='cuda:0'), label = tensor([1, 0, 1, 1, 1, 0, 0, 0], device='cuda:0')
0 | batch #2 = {'index': tensor([18, 19, 12, 14], device='cuda:0'), 'label': tensor([0, 1, 0, 0], device='cuda:0'), 'random_augmentation': tensor([0.2793, 0.3084, 0.4822, 0.6987], device='cuda:0', dtype=torch.float64)}
1 | batch #2 = {'index': tensor([10, 11, 15,  7], device='cuda:1'), 'label': tensor([0, 1, 1, 1], device='cuda:1'), 'random_augmentation': tensor([0.3580, 0.6254, 0.6006, 0.0303], device='cuda:1', dtype=torch.float64)}
0 | gathered batch #2 | index = tensor([18, 19, 12, 14, 10, 11], device='cuda:0'), label = tensor([0, 1, 0, 0, 0, 1], device='cuda:0')
sorted(all_examples)=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
Ending Dataloader with sampler=BatchSampler()

@pacman100 pacman100 requested a review from sgugger October 25, 2022 12:37
@pacman100 pacman100 changed the title separate dataloader and sampler generator separate dataloader generator from sampler generator Oct 25, 2022
@HuggingFaceDocBuilderDev
Copy link

HuggingFaceDocBuilderDev commented Oct 25, 2022

The documentation is not available anymore as the PR was closed or merged.

Copy link
Collaborator

@sgugger sgugger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for diving into this! Just have one main comment but it's looking great already! Very clean fix!

src/accelerate/data_loader.py Show resolved Hide resolved
@YouJiacheng
Copy link
Contributor

YouJiacheng commented Oct 25, 2022

This PR currently will break

generator = getattr(dataloader, "generator", None)

generator = dataloader.dataset.generator

generator = sampler.generator

(Of course
if rng_types is not None and generator is None and "generator" in rng_types:
should be modified)


This code duplication (for finding the sampler) also impairs readability instead of improving readability.

batch_sampler = self.sampler if isinstance(self.sampler, BatchSampler) else self.batch_sampler
sampler = (
    batch_sampler.batch_sampler.sampler
    if hasattr(batch_sampler, "batch_sampler")
    else batch_sampler.sampler
)
if hasattr(sampler, "generator"):
    generator = sampler.generator
    synchronize_rng_states(self.rng_types, generator)

I believe what needs to be synchronized is IterableDataset's generator(according to IterableDatasetShard implementation) OR sampler's generator. We can store the synchronized generator in a private attribute.

@sgugger
Copy link
Collaborator

sgugger commented Oct 25, 2022

@YouJiacheng I am not seeing any breaks in all the lines you mention, which are completely orthogonal to the change suggested. As for the code duplication, let us worry about readability as maintainers :-) We cannot store the generator as a private attribute as it is fetched in two different functions.

@YouJiacheng
Copy link
Contributor

By "break", I means that these lines will become useless. And for generator = dataloader.dataset.generator, it is intended to synchronize the generator of IterableDataset. So after proposed change, IterableDataset's generator won't be synchronized.

@sgugger
Copy link
Collaborator

sgugger commented Oct 25, 2022

Ah!, I get what you mean, thanks for clarifying!

We should indeed make the difference between the sampler_generator (for batch samplers) or dataset.generator (in iterable dataset) and the Dataloader generator. The latter should be passed along again when creating the final DataLoader (or use the default if it's none) while the formers should be the one synchronized.

The check on rng_types should probably be moved to the init of BatchSamplerShard/IterableDatasetShard.

@YouJiacheng
Copy link
Contributor

It's hard to explain my proposal using text. So I open a parallel PR to show the code.(It is somewhat "pseudocode"/proof of concept, since I didn't test it).

@sgugger
Copy link
Collaborator

sgugger commented Oct 25, 2022

@YouJiacheng Thanks for taking the time to draft a PR to show your points, it's much clearer this way! I think we need to merge the two PRs somehow as they both contain important things the other has not.

Co-Authored-By: YouJiacheng <1503679330@qq.com>
Co-Authored-By: Sylvain Gugger <35901082+sgugger@users.noreply.github.com>
src/accelerate/data_loader.py Outdated Show resolved Hide resolved
src/accelerate/data_loader.py Show resolved Hide resolved
src/accelerate/data_loader.py Show resolved Hide resolved
@pacman100 pacman100 merged commit a552540 into huggingface:main Oct 25, 2022
@pacman100 pacman100 deleted the smangrul/dataloader-generator-fixes branch November 1, 2022 07:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants