
Draft of example integration with Lhotse #102

Open · wants to merge 1 commit into main
Conversation

@pzelasko commented Nov 8, 2021

As discussed with @ejguan, I'm uploading my example code for integration of Lhotse and DataPipes.

Some context: Lhotse is a library for working with audio/speech data. It has common representation (manifest) formats called Cut and CutSet, and recipes for generating these manifests for 30+ popular speech corpora. Lhotse provides Dataset and Sampler classes that simplify working with speech data.

The reason I'm interested in DataPipes is that I find the "old" Dataset/Sampler setup lacks some flexibility that is helpful in speech/audio. This PR shows how to:

  • take Lhotse manifests (lightweight metadata objects),
  • construct a metadata-level batch with a dynamic size (DurationBatcher; code copied from one of Lhotse's samplers),
  • perform I/O for each batch element in parallel,
  • recombine the batch manifests and tensors into a single batch again.

What is still missing: collation, transforms, etc. -- these should be simple to add later.

I'd like to get your feedback on this usage of DataPipes. I enjoyed writing this example and I think Lhotse could gain a lot by adopting this type of data pipeline.

@facebook-github-bot (Contributor) commented:

Hi @pzelasko!

Thank you for your pull request and welcome to our community.

Action Required

In order to merge any pull request (code, docs, etc.), we require contributors to sign our Contributor License Agreement, and we don't seem to have one on file for you.

Process

In order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (e.g., your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA.

Once the CLA is signed, our tooling will perform checks and validations. Afterwards, the pull request will be tagged with CLA signed. The tagging process may take up to 1 hour after signing. Please give it that time before contacting us about it.

If you have received this in error or have any questions, please contact us at cla@fb.com. Thanks!

@ejguan (Contributor) left a comment:

Thank you so much for providing early feedback on this project. This is extremely useful.
A couple of comments below.

```python
    def __iter__(self):
        self.datapipe = iter(self.datapipe)
```
@ejguan (Contributor):

Please do not remove the reference to the prior/source DataPipe here. Since we want DataLoaderV2 to support graph-mode execution of the data pipeline (parallel execution, automatic sharding, etc.), we need to keep the reference graph of each DataPipe instance.
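
For illustration, a minimal sketch of that pattern (the class here is a hypothetical passthrough, not code from this PR): keep the attribute pointing at the source pipe and iterate with a local iterator instead.

```python
from torch.utils.data import IterDataPipe

class PassthroughDataPipe(IterDataPipe):
    def __init__(self, datapipe):
        self.datapipe = datapipe  # reference to the source DataPipe is preserved

    def __iter__(self):
        # Create a fresh, local iterator instead of overwriting self.datapipe,
        # so the DataPipe graph stays traversable and re-iteration keeps working.
        for item in iter(self.datapipe):
            yield item
```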

@ejguan (Contributor):

This would also prevent you from running a second iteration.

@pzelasko (Author):

Thanks. DataLoaderV2 sounds interesting -- is there a description somewhere that I could read to learn more?

@ejguan (Contributor):

We haven't started writing the related docs yet, as the API and features keep changing. I can give some code pointers to the things I mentioned.

Comment on lines +76 to +77:

```python
        while True:
            yield self._collect_batch()
```
@ejguan (Contributor):

We should reset the iterator to None, because a generator instance is not serializable (picklable).
IIUC, this is going to iterate forever. Is this intentional?

@ejguan (Contributor):

Simply speaking:

Suggested change:

```diff
- while True:
-     yield self._collect_batch()
+ def __iter__(self):
+     self.datapipe_iter = iter(self.datapipe)
+     ...
+     self.datapipe_iter = None
```
@pzelasko (Author):

> IIUC, this is going to iterate forever. Is this intentional?

This should stop once `self._collect_batch()` raises StopIteration. Maybe the sampling code in DurationBatcher can be made simpler -- as you can see, there is a lot of branching. I'll give it more thought at a later point.
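
For illustration, a sketch combining both points above (assuming `_collect_batch` pulls from the iterator and signals exhaustion with StopIteration; note that under PEP 479, Python 3.7+, a StopIteration escaping a generator body becomes a RuntimeError, so it must be caught explicitly):

```python
from torch.utils.data import IterDataPipe

class DurationBatcher(IterDataPipe):
    def __init__(self, datapipe):
        self.datapipe = datapipe
        self._it = None

    def _collect_batch(self):
        # Placeholder for the real dynamic-batching logic; raises StopIteration
        # when the source iterator is exhausted.
        return next(self._it)

    def __iter__(self):
        self._it = iter(self.datapipe)
        try:
            while True:
                yield self._collect_batch()
        except StopIteration:
            pass  # source exhausted; end the epoch cleanly (PEP 479-safe)
        finally:
            self._it = None  # drop the generator so the DataPipe stays picklable
```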

Comment on lines +137 to +143:

```python
class IODataPipe(dp.IterDataPipe):
    def __init__(self, datapipe):
        self.datapipe = datapipe

    def __iter__(self):
        for cut_idx, batch_idx, batch_size, cut in self.datapipe:
            yield cut_idx, batch_idx, batch_size, cut.load_audio(), cut
```
@ejguan (Contributor):

Could you use Mapper to replace this DataPipe? You should be able to use the functional API as:

```python
source_datapipe  # referring to self.datapipe
source_datapipe.map(fn=lambda x: x.load_audio(), input_col=3, output_col=-1)
```

The result is going to be `(cut_idx, batch_idx, batch_size, cut, cut.load_audio())`.
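
For illustration, a self-contained version of that suggestion (FakeCut is a hypothetical stand-in for a Lhotse Cut):

```python
from torch.utils.data.datapipes.iter import IterableWrapper

class FakeCut:
    def load_audio(self):
        return [0.0, 0.1, 0.2]  # stand-in for real audio samples

pipe = IterableWrapper([(0, 0, 1, FakeCut())])
# input_col=3 applies fn to the tuple element at index 3 (the cut);
# output_col=-1 appends the result at the end of the tuple.
pipe = pipe.map(fn=lambda cut: cut.load_audio(), input_col=3, output_col=-1)
for cut_idx, batch_idx, batch_size, cut, audio in pipe:
    print(audio)  # [0.0, 0.1, 0.2]
```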

@pzelasko (Author):

This looks interesting. Will the lambda work well with pickling / multiprocessing though?

@pzelasko (Author):

Also: is it possible to work with dicts rather than tuples when using the functional API? E.g.:

```python
yield {
    'cut_idx': cut_idx,
    'batch_idx': batch_idx,
    ...
}
```

@ejguan (Contributor):

> Will the lambda work well with pickling / multiprocessing though?

We want to support lambdas, as they are a common feature of functional programming. So, as long as dill is installed, multiprocessing with lambdas will work.

> Also: is it possible to work with dicts rather than tuples when using the functional API?

Yes. It's supposed to work if you pass `input_col='cut_idx'`.
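
For illustration, a small sketch of the key-based form (the field names here are assumed):

```python
from torch.utils.data.datapipes.iter import IterableWrapper

pipe = IterableWrapper([{'cut_idx': 0, 'batch_idx': 0, 'batch_size': 2}])
# With dict elements, input_col can name a key; fn's result replaces that field.
pipe = pipe.map(fn=lambda v: v + 100, input_col='cut_idx')
print(list(pipe))  # [{'cut_idx': 100, 'batch_idx': 0, 'batch_size': 2}]
```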

```python
        yield cut_idx, batch_idx, batch_size, cut.load_audio(), cut


class RecombineBatchAfterIO(dp.IterDataPipe):
```
@ejguan (Contributor):

This is an interesting use case. It's better to have a buffer size, IMO; otherwise it can potentially blow up memory.

@pzelasko (Author):

How do you envision it? Let's say the buffer size is 500 items, but for some reason I sampled a batch that has 600 items. Wouldn't that hang indefinitely?

I might not understand the new data-loading machinery yet -- my assumption was that there is some mechanism that makes it prepare a prefetch_factor number of batches, and if the consumer stops fetching them (e.g., because it's training), the whole pipeline would "stop" and wait. If it doesn't stop, I can see how the memory might blow up.
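
For illustration, one way a bounded recombiner could avoid hanging (a sketch only; the policy of raising on overflow instead of blocking is an assumption, not part of this PR):

```python
from collections import defaultdict
from torch.utils.data import IterDataPipe

class BoundedRecombiner(IterDataPipe):
    def __init__(self, datapipe, buffer_size=500):
        self.datapipe = datapipe
        self.buffer_size = buffer_size

    def __iter__(self):
        buffers = defaultdict(list)  # batch_idx -> collected elements
        buffered = 0
        for cut_idx, batch_idx, batch_size, audio, cut in self.datapipe:
            buffers[batch_idx].append((cut_idx, audio, cut))
            buffered += 1
            if buffered > self.buffer_size:
                # Fail loudly instead of blocking forever when pending items
                # can never fit into the buffer (e.g., a 600-item batch with
                # a 500-item buffer).
                raise RuntimeError(
                    f"More than buffer_size={self.buffer_size} items pending "
                    "across incomplete batches; increase buffer_size."
                )
            if len(buffers[batch_idx]) == batch_size:
                # Restore within-batch order by cut index before yielding.
                batch = sorted(buffers.pop(batch_idx), key=lambda t: t[0])
                buffered -= batch_size
                yield batch
```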

```python
    num_jobs, classifier_fn=partial(classifier_fn, num_jobs=num_jobs)
)
for i in range(len(datapipes)):
    datapipes[i] = IODataPipe(datapipes[i])
```
@ejguan (Contributor):

Thank you so much. This is a great example of using this functionality to distribute an expensive operation across multiprocessing workers.
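
For reference, a small runnable sketch of that demux call (the round-robin classifier_fn here is hypothetical):

```python
from functools import partial
from torch.utils.data.datapipes.iter import IterableWrapper

def classifier_fn(item, num_jobs):
    return item[0] % num_jobs  # route by cut index, round-robin

num_jobs = 2
source = IterableWrapper([(i, 0, 4, f"cut-{i}") for i in range(4)])
datapipes = source.demux(num_jobs, partial(classifier_fn, num_jobs=num_jobs))
print([list(p) for p in datapipes])
# [[(0, 0, 4, 'cut-0'), (2, 0, 4, 'cut-2')],
#  [(1, 0, 4, 'cut-1'), (3, 0, 4, 'cut-3')]]
```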


```python
if not cuts_path.exists():
    print("Downloading Mini LibriSpeech.")
    download_librispeech("./workspace", dataset_parts="mini_librispeech")
```
@ejguan (Contributor):

This could potentially be replaced by on_disk_cache [WIP, being refactored], which supports download and hash checking.
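
For reference, the pattern that later stabilized in torchdata looks roughly like this (URL and paths are illustrative; the API was still WIP at the time of this thread):

```python
from torchdata.datapipes.iter import IterableWrapper, HttpReader

url = "https://www.openslr.org/resources/31/train-clean-5.tar.gz"  # illustrative
cache_dp = IterableWrapper([url]).on_disk_cache(
    filepath_fn=lambda u: "./workspace/" + u.split("/")[-1],
    # hash checking can be added via hash_dict= / hash_type=
)
cache_dp = HttpReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True)
for path in cache_dp:
    print(path)  # downloads on the first run, reuses the cached file afterwards
```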

@pzelasko (Author):

Sounds interesting, do you have a pointer to some code/doc?

```python
process, req_queue, res_queue = SpawnProcessForDataPipeline(ctx, datapipes[i])
process.start()
datapipes[i] = QueueWrapper(
    IterDataPipeQueueProtocolClient(req_queue, res_queue)
```
@pzelasko (Author):

I admit the QueueWrapper and IterDataPipeQueueProtocolClient took me a while to get right. I feel like I'm unnecessarily "hacking" the library and using a low-level API not intended for end users. Is there a cleaner way to achieve the same outcome?

