Draft of example integration with Lhotse #102
Conversation
Thank you so much for providing early feedback on this project. This is extremely useful.
A couple of comments below.
```python
def __iter__(self):
    self.datapipe = iter(self.datapipe)
```
Please do not remove the reference to the prior/source datapipe here. Since we want DataLoaderV2 to support graph-mode execution of the data pipeline (parallel execution, automatic sharding, etc.), we need to keep a reference graph of every DataPipe instance.
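A minimal sketch of the change being suggested here (the attribute names are taken from the snippet above; this is an illustration, not the exact patch): iterate over the source DataPipe without overwriting the reference to it.

```python
def __iter__(self):
    # Keep self.datapipe pointing at the source DataPipe so that graph traversal
    # (and a second iteration) still works; only create a local iterator here.
    for item in self.datapipe:
        yield item
```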
This would also prevent you from doing a second iteration.
Thanks. DataLoaderV2 sounds interesting -- is there a description somewhere that I could read to learn more?
We haven't started writing the related docs yet, as the API and features keep changing. I can give some code pointers to the things I mentioned:
- Graph traversal: https://github.com/pytorch/pytorch/blob/master/torch/utils/data/graph.py (automatic sharding uses this graph to dynamically shard DataPipe instances)
- DataLoaderV2: https://github.com/pytorch/pytorch/blob/master/torch/utils/data/dataloader_experimental.py
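For illustration, a quick sketch of what inspecting that graph might look like, assuming the `traverse` helper from the `graph.py` file linked above (the DataPipe name is hypothetical):

```python
from torch.utils.data.graph import traverse

# Returns a nested dict describing which DataPipes feed into which;
# this is the structure automatic sharding walks to decide where to shard.
graph = traverse(my_datapipe)
print(graph)
```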
```python
while True:
    yield self._collect_batch()
```
We should reset the iterator to None, because a generator instance is not serializable (picklable).
IIUC, this is going to iterate forever. Is this intentional?
Simply speaking:

```python
while True:
    yield self._collect_batch()
```

could become:

```python
def __iter__(self):
    self.datapipe_iter = iter(self.datapipe)
    ...
    self.datapipe_iter = None
```
> IIUC, this is going to iterate forever. Is this intentional?

This should stop once `self._collect_batch()` raises `StopIteration`. Maybe the sampling code in `DurationBatcher` can be made simpler -- as you can see, there is a lot of branching. I'll give it more thought at a later point.
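To make that stopping condition explicit, and to fold in the reviewer's suggestion about resetting the iterator, here is one hedged sketch (attribute names assumed, not the actual PR code). Note that under PEP 479 a `StopIteration` escaping a generator surfaces as `RuntimeError`, so catching it inside `__iter__` is safer than letting it propagate:

```python
def __iter__(self):
    self.datapipe_iter = iter(self.datapipe)
    while True:
        try:
            batch = self._collect_batch()
        except StopIteration:
            # _collect_batch() signals exhaustion; break cleanly instead of letting
            # StopIteration escape the generator (PEP 479 would turn it into RuntimeError).
            break
        yield batch
    # Drop the iterator state so the DataPipe stays picklable after iteration.
    self.datapipe_iter = None
```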
```python
class IODataPipe(dp.IterDataPipe):
    def __init__(self, datapipe):
        self.datapipe = datapipe

    def __iter__(self):
        for cut_idx, batch_idx, batch_size, cut in self.datapipe:
            yield cut_idx, batch_idx, batch_size, cut.load_audio(), cut
```
Could you use `Mapper` to replace this DataPipe? You should be able to use the functional API as:

```python
source_datapipe  # referring to self.datapipe
source_datapipe.map(fn=lambda x: x.load_audio(), input_col=3, output_col=-1)
```

The result is going to be `cut_idx, batch_idx, batch_size, cut, cut.load_audio()`.
This looks interesting. Will the lambda work well with pickling / multiprocessing though?
Also: is it possible to work with dicts rather than tuples when using the functional API? E.g.:

```python
yield {
    'cut_idx': cut_idx,
    'batch_idx': batch_idx,
    ...
}
```
> Will the lambda work well with pickling / multiprocessing though?

We want to support `lambda` as it's a common feature for functional programming. So, as long as `dill` is installed, multiprocessing with lambdas will work.

> Also: is it possible to work with dicts rather than tuples when using the functional API?

Yes. It's supposed to work if you give `input_col='cut_idx'`.
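Putting those two answers together, a hedged sketch of what a dict-based version of the earlier `map()` call might look like (the element layout and key names are assumptions for illustration, not the PR's actual data format):

```python
# Assumes each element is a dict like:
#   {'cut_idx': ..., 'batch_idx': ..., 'batch_size': ..., 'cut': ...}
datapipe = source_datapipe.map(
    fn=lambda cut: cut.load_audio(),  # lambdas pickle for multiprocessing once dill is installed
    input_col='cut',                  # read the Cut object from this key
    output_col='audio',               # store the loaded samples under a new key
)
```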
```python
            yield cut_idx, batch_idx, batch_size, cut.load_audio(), cut


class RecombineBatchAfterIO(dp.IterDataPipe):
```
This is an interesting use case. It's better to have a buffer size, IMO. Otherwise, it could potentially blow up memory.
How do you envision it? Let's say the buffer size is 500 items but, for some reason, I sampled a batch that has 600 items. Wouldn't that hang indefinitely?

I might not understand the new data-loading machinery yet -- my assumption was that there is some mechanism to make it prepare a `prefetch_factor` number of batches, and if the consumer stops fetching them (e.g., because it's training), then the whole pipeline would "stop" and wait. If it doesn't stop, I can see how the memory might blow up, though.
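One possible middle ground, sketched below using the element layout from `IODataPipe` above (all names here are hypothetical, not part of the PR): cap the number of buffered items and fail fast when the cap is exceeded, rather than hanging or growing memory without bound.

```python
class BoundedRecombine(dp.IterDataPipe):
    """Regroup (cut_idx, batch_idx, batch_size, audio, cut) tuples into complete
    batches, holding at most `buffer_size` items in memory at any time."""

    def __init__(self, datapipe, buffer_size=500):
        self.datapipe = datapipe
        self.buffer_size = buffer_size

    def __iter__(self):
        buffers = {}   # batch_idx -> items collected so far
        n_buffered = 0
        for cut_idx, batch_idx, batch_size, audio, cut in self.datapipe:
            buffers.setdefault(batch_idx, []).append((cut_idx, audio, cut))
            n_buffered += 1
            if n_buffered > self.buffer_size:
                # Too many items outstanding; raise instead of blocking forever
                # or letting memory grow unbounded.
                raise RuntimeError(
                    f"Exceeded buffer_size={self.buffer_size} while assembling batches"
                )
            if len(buffers[batch_idx]) == batch_size:
                batch = buffers.pop(batch_idx)
                n_buffered -= batch_size
                batch.sort(key=lambda item: item[0])  # restore within-batch order by cut_idx
                yield batch
```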
```python
    num_jobs, classifier_fn=partial(classifier_fn, num_jobs=num_jobs)
)
for i in range(len(datapipes)):
    datapipes[i] = IODataPipe(datapipes[i])
```
Thank you so much. This is a great example showing how to distribute an expensive operation across multiprocessing workers.
```python
if not cuts_path.exists():
    print("Downloading Mini LibriSpeech.")
    download_librispeech("./workspace", dataset_parts="mini_librispeech")
```
This could potentially be replaced by `on_disk_cache` (WIP, currently being refactored), which handles the download and hash check.
Sounds interesting, do you have a pointer to some code/doc?
```python
process, req_queue, res_queue = SpawnProcessForDataPipeline(ctx, datapipes[i])
process.start()
datapipes[i] = QueueWrapper(
    IterDataPipeQueueProtocolClient(req_queue, res_queue)
)
```
I admit the `QueueWrapper` and `IterDataPipeQueueProtocolClient` took me a while to get right. I feel like I'm unnecessarily "hacking" the library and using a too low-level API not intended for users. Is there a cleaner way to achieve the same outcome?
As discussed with @ejguan, I'm uploading my example code for the integration of Lhotse and DataPipes.

Some context: Lhotse is a library for working with audio/speech data. It has a common representation (manifest) format called `Cut` and `CutSet`, and recipes for generating these manifests for 30+ popular speech corpora. Lhotse provides Dataset and Sampler classes that simplify working with speech data.

The reason I'm interested in DataPipes is that I find the "old" Dataset/Sampler setup lacks some flexibility that is helpful in speech/audio. This PR shows how to:

- ... (`DurationBatcher`: code copied from one of Lhotse's samplers),

What is missing is collation, transforms, etc. -- these should be simple to add later.
I'd like to get your feedback on this usage of DataPipes. I enjoyed writing this example and I think Lhotse could gain a lot by adopting this type of data pipeline.
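For readers unfamiliar with Lhotse, a minimal hedged sketch of the kind of pipeline this PR is about -- loading a `CutSet` manifest and feeding it into a DataPipe chain. The file path and the exact wrapping are illustrative assumptions, not code from this PR:

```python
from torch.utils.data.datapipes.iter import IterableWrapper
from lhotse import CutSet

# Load a Lhotse manifest; a CutSet is a lazy collection of Cut objects,
# each describing a segment of a recording plus its supervisions.
cuts = CutSet.from_file("./workspace/cuts_mini_librispeech.jsonl.gz")  # hypothetical path

# Wrap the cuts in a DataPipe and lazily attach the decoded audio to each cut.
pipe = IterableWrapper(cuts).map(fn=lambda cut: (cut, cut.load_audio()))

for cut, audio in pipe:
    print(cut.id, audio.shape)
    break
```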