Downloader is not producing full set of expected outputs #159

Closed

PranshuBansalDev opened this issue Mar 29, 2022 · 33 comments

Comments
@PranshuBansalDev

PranshuBansalDev commented Mar 29, 2022

Heya, I was trying to download the LAION400M dataset and noticed that I am not getting the full set of data for some reason.

Any tips on debugging further?

TL;DR - I was expecting ~12M files to be downloaded, but the successes recorded in the *_stats.json files indicate only ~2M files were actually downloaded.

For example - I recently tried to download this dataset in a distributed manner on EMR:

https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/dataset/part-00000-5b54c5d5-bbcf-484d-a2ce-0d6f73df1a36-c000.snappy.parquet

I applied some light NSFW filtering on it to produce a new parquet:

# rest of the script is redacted, but there is some code before this to normalize the NSFW column to make filtering more convenient
sampled_df = df[df["NSFW"] == "unlikely"]
sampled_df.reset_index(inplace=True)
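
For reference, a self-contained sketch of what that filtering step could look like end to end (the file names, the normalization, and the use of pandas here are illustrative assumptions, not the redacted script):

import pandas as pd

# Read one shard of the LAION400M metadata.
df = pd.read_parquet("part-00000-5b54c5d5-bbcf-484d-a2ce-0d6f73df1a36-c000.snappy.parquet")

# Normalize the NSFW column so the string comparison below is reliable.
df["NSFW"] = df["NSFW"].astype(str).str.strip().str.lower()

# Keep only rows marked "unlikely" to be NSFW and write the filtered parquet.
sampled_df = df[df["NSFW"] == "unlikely"].reset_index(drop=True)
sampled_df.to_parquet("part00000.parquet", index=False)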

Verified its row count is ~12M samples:

import glob
import json
from pyarrow.parquet import ParquetDataset

files = glob.glob("*.parquet")

d = {}

for file in files:
    d[file] = 0
    dataset = ParquetDataset(file)
    for piece in dataset.pieces:
        d[file] += piece.get_metadata().num_rows

print(json.dumps(d, indent=2, sort_keys=True))
{
  "part00000.parquet": 12026281
}

Ran the download, then copied the stats JSON files down from the output S3 bucket:

aws s3 cp \
    s3://path/to/s3/download/ . \
    --exclude "*" \
    --include "*.json" \
    --recursive

Ran this script to get the total count of images downloaded:

import json
import glob

files = glob.glob("/path/to/json/files/*.json")

count = {}
successes = {}

for file in files:
    with open(file) as f:
        j = json.load(f)
        count[file] = j["count"]
        successes[file] = j["successes"]

rate = 100 * sum(successes.values()) / sum(count.values())
print(f"Success rate: {rate}. From {sum(successes.values())} / {sum(count.values())}")

which gave me the following output:

Success rate: 56.15816066896948. From 1508566 / 2686281

The high error rate here is not a major concern; I was running with a low worker node count for experimentation, so we hit a lot of DNS issues (I'll use Knot Resolver later).
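
For reference, a breakdown like the one below can be produced by summing the per-shard error counters. A minimal sketch, assuming each *_stats.json exposes a status_dict field mapping error message to count (check the key name against your img2dataset version):

import glob
import json
from collections import Counter

# Sum the per-shard error counters across all stats files.
totals = Counter()
for path in glob.glob("/path/to/json/files/*_stats.json"):
    with open(path) as f:
        stats = json.load(f)
    totals.update(stats.get("status_dict", {}))

# Print in ascending order of frequency, like the breakdown below.
for status, n in sorted(totals.items(), key=lambda kv: kv[1]):
    print(f"{status:<70} {n}")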

unknown url type: '21nicrmo2'                                                      1.0
<urlopen error [errno 22] invalid argument>                                        1.0
encoding with 'idna' codec failed (unicodeerror: label empty or too long)          1.0
http/1.1 401.2 unauthorized\r\n                                                    4.0
<urlopen error no host given>                                                      5.0
<urlopen error unknown url type: "https>                                          11.0
incomplete read                                                                   14.0
<urlopen error [errno 101] network is unreachable>                                38.0
<urlopen error [errno 104] connection reset by peer>                              75.0
[errno 104] connection reset by peer                                              92.0
opencv                                                                           354.0
<urlopen error [errno 113] no route to host>                                     448.0
remote end closed connection without response                                    472.0
<urlopen error [errno 111] connection refused>                                  1144.0
encoding issue                                                                  2341.0
timed out                                                                       2850.0
<urlopen error timed out>                                                       4394.0
the read operation timed out                                                    4617.0
image decoding error                                                            5563.0
ssl                                                                             6174.0
http error                                                                     62670.0
<urlopen error [errno -2] name or service not known>                         1086446.0
success                                                                      1508566.0

I also noticed that only 270 JSON files were produced, but given that each shard should contain 10,000 images, I expected ~1,200 JSON files. Not sure where this discrepancy is coming from.

> ls
00000_stats.json  00051_stats.json  01017_stats.json  01066_stats.json  01112_stats.json  01157_stats.json
00001_stats.json  00052_stats.json  01018_stats.json  01067_stats.json  01113_stats.json  01159_stats.json
...
> ls -l | wc -l 
270
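
To see exactly which shard indices never produced a stats file, a quick sketch (assuming zero-based shard numbering and ~10,000 samples per shard, so roughly 1,203 expected shards for this parquet):

import glob
import os

# Expected number of shards for ~12M rows at 10,000 samples per shard.
expected_shards = 12026281 // 10000 + 1

# Shard indices that actually produced a *_stats.json file.
produced = {
    int(os.path.basename(p).split("_")[0])
    for p in glob.glob("/path/to/json/files/*_stats.json")
}

missing = sorted(set(range(expected_shards)) - produced)
print(f"{len(missing)} shards missing, e.g. {missing[:10]}")
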
@PranshuBansalDev
Author

[screenshot]

I'm even seeing ~1200 jobs being created, but somehow only 270 outputs

@rom1504
Owner

rom1504 commented Mar 29, 2022

Do you have enough RAM in the workers?

@PranshuBansalDev
Author

PranshuBansalDev commented Mar 29, 2022

I believe they are all succeeding

[screenshot]

This is the config I'm using


        --master yarn\
        --deploy-mode client\
        --conf "spark.executor.cores=5"\
        --conf "spark.executor.memory=9g"\
        --conf "spark.executor.memoryOverhead=1g"\
        --conf "spark.driver.memory=9g"\
        --conf "spark.driver.cores=5"\
        --conf "spark.default.parallelism=50"\
        --conf "spark.network.timeout=1000000s"\

This is my worker node

[screenshot]

@rom1504
Owner

rom1504 commented Mar 29, 2022

Ok, do you have any way to check the memory usage during the job?
It's likely that memory is the problem.

Can you also check the executor logs to see if you get this log: https://github.com/rom1504/img2dataset/blob/main/img2dataset/downloader.py#L93 ?

@PranshuBansalDev
Author

I can see from CloudWatch that memory looks fine:

[screenshot]

Oh interesting, I looked through some of the executors that were running shockingly fast (<1s) and found a bunch of these errors:

botocore.exceptions.NoCredentialsError: Unable to locate credentials
shard 381 failed with error Unable to locate credentials

Wonder if that's related; I'll dig into it.

Any reason this is just a print statement rather than a thrown exception that the tool can retry on?

@rom1504
Owner

rom1504 commented Mar 29, 2022

Yeah, the problem is that Spark doesn't have a "try, then give up at some point" feature, so if there was an exception there instead of a print, your whole job would likely have failed after the tasks had retried a few times.

I think something that could be improved here is adding a loop in that piece of code to retry a few times instead of just failing.

@rom1504
Owner

rom1504 commented Mar 29, 2022

Your credentials error is likely the problem.

There are two ways to solve it.
One is to find the root cause and fix the credentials problem.
The other is to implement the retry I'm mentioning above; assuming this is a temporary problem, the second try should work.

@PranshuBansalDev
Author

Is there any concept of surfacing these logs without having to go into the individual executors? Or somehow tracking these failures in stats.json?

@PranshuBansalDev
Author

Your credentials error is likely the problem.

There are two ways to solve it: one is to find the root cause and fix the credentials problem; the other is to implement the retry I'm mentioning above. Assuming this is a temporary problem, the second try should work.

Yeah, I noticed the error eventually goes away. I'm wondering if it's some kind of Spark job spin-up delay and whether I should just add a 5 minute wait to the calling script or something.
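
If it really is spin-up lag, one option (a rough sketch, not tested here) is to block in the calling script until credentials actually resolve, instead of sleeping for a fixed 5 minutes:

import time

import boto3
from botocore.exceptions import ClientError, NoCredentialsError

def wait_for_credentials(timeout_s=600, poll_s=15):
    """Poll STS until the instance-role credentials are usable, or time out."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        try:
            # Re-create the client on each attempt so freshly available
            # instance-profile credentials are picked up.
            boto3.client("sts").get_caller_identity()
            return True
        except (NoCredentialsError, ClientError):
            time.sleep(poll_s)
    return False

if not wait_for_credentials():
    raise SystemExit("credentials never became available")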

@rom1504
Owner

rom1504 commented Mar 29, 2022

There are several options to surface them, but I'm not sure I can think of something clean; feel free to try things.
With Spark it's fairly usual to look at executor logs.

@PranshuBansalDev
Author

PranshuBansalDev commented Mar 30, 2022

Edit - I spoke too soon. I reduced subjob_size way down to 5; let's see if that works.

@PranshuBansalDev
Author

Could this #137 be related?

@rom1504
Owner

rom1504 commented Mar 30, 2022 via email

@rom1504
Owner

rom1504 commented Mar 30, 2022

Reducing subjob_size to 5 will drastically decrease the performance of the tool; at that point it will be slower than running on one node.

@rom1504
Owner

rom1504 commented Mar 30, 2022

By retrying at the shard level, I mean adding a for loop at https://github.com/rom1504/img2dataset/blob/main/img2dataset/downloader.py#L88.

@PranshuBansalDev
Author

PranshuBansalDev commented Mar 30, 2022

Yeah, that's what I did: I ran retries at the shard level (i.e. I edited the code so that it 1. retries on shard failure and 2. eventually throws after 3 retries). This did not resolve the issue; the error I'm getting is not retry-able. Do you know the minimum parallelization you'd recommend before it starts to be more efficient to run on a single node? Reducing parallelism seems to be the only fix.

@PranshuBansalDev
Author

Yeah, I just ran a test over a single parquet, and running at low parallelism fixed it (but yes, now the tool isn't performant).

I'll spend today looking for better params I can use.

SSL errors also (expectedly) went way down:

>>> df.sum()
success                                                                      11279122.0
http error                                                                     451597.0
ssl                                                                             48412.0
image decoding error                                                            57479.0
<urlopen error [errno -2] name or service not known>                            82297.0
remote end closed connection without response                                    3438.0
the read operation timed out                                                    27808.0
timed out                                                                       16421.0
<urlopen error [errno 111] connection refused>                                   8824.0
<urlopen error timed out>                                                       32609.0
opencv                                                                           2876.0
<urlopen error [errno 113] no route to host>                                     3014.0
<urlopen error [errno 104] connection reset by peer>                              352.0
encoding issue                                                                  10642.0
[errno 104] connection reset by peer                                              614.0
<urlopen error [errno 101] network is unreachable>                                542.0
incomplete read                                                                    53.0
<urlopen error [errno 22] invalid argument>                                         5.0
http/1.1 401.2 unauthorized\r\n                                                    98.0
<urlopen error unknown url type: "https>                                           35.0
<urlopen error no host given>                                                      33.0
encoding with 'idna' codec failed (unicodeerror: label empty or too long)           3.0
got more than 100 headers                                                           3.0
unknown url type: '21nicrmo2'                                                       4.0

@rom1504
Owner

rom1504 commented Mar 30, 2022

The error I'm getting is not retry-able

What do you mean exactly? Does it fail again if you retry (after some wait time)?

Do you know the minimum parallelization you'd recommend before it starts to be more efficient to run on single node? Reducing parallelism seems to be the only fix

I do not advise reducing the parallelism for this permission problem. The whole point of this tool is to be as parallel as possible.

My advice at this point is either to try to implement retrying properly (can you share your implementation?) or to fix the root permission issue.

In my case I chose not to use AWS EMR and instead to use EC2 instances directly, with a Spark standalone cluster on them, because AWS EMR was not working well. You can see how in the distributed guide in the docs.

Btw, running this distributed only makes sense if your dataset is larger than a few billion samples.

For laion400m, a single node is enough to run in 3 days.

@PranshuBansalDev
Author

PranshuBansalDev commented Mar 30, 2022

I tried something super basic like this:

    def __call__(
        self, row,
    ):
        # Retry the whole shard a few times before giving up.
        for retry in range(self.retries + 1):
            try:
                self.download_shard(row)
                print(f"shard {row[0]} completed successfully!")
                return
            except Exception as err:  # pylint: disable=broad-except
                traceback.print_exc()
                print(f"shard {row[0]} failed with error {err}. Retry {retry}/{self.retries}")
                time.sleep(10)
        return (False, 0, 0, 0, 0, 0, None)

Is there a better way to attempt retries?

For a single node - do you have recommended configs for the LAION400M download (EC2 instance type, multiprocessing params, etc.)?

@rom1504
Owner

rom1504 commented Mar 30, 2022

Your loop seems OK. So it prints the failure 10 times in a row and never succeeds?

If that's the case I'm afraid the only thing to do is really to fix the S3 auth. Maybe your AWS configs are not quite right?

For a single node you can use these commands: https://github.com/rom1504/img2dataset/blob/main/dataset_examples/laion400m.md
In terms of hardware, 16 cores is enough; if you have more it will be faster. For example, a c6i instance on AWS EC2.
It's really important to set up Knot Resolver, though.
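
For reference, the single-node path through the Python API looks roughly like this (a sketch only; the exact parameters are in the laion400m guide linked above and should be checked against your img2dataset version):

from img2dataset import download

download(
    url_list="/path/to/filtered/parquets/",  # folder containing the filtered parquet files
    input_format="parquet",
    url_col="URL",
    caption_col="TEXT",
    output_format="webdataset",
    output_folder="/path/to/output/",
    processes_count=16,  # roughly one per core
    thread_count=64,     # downloads are I/O bound, so oversubscribe threads
    image_size=256,
)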

@PranshuBansalDev
Author

I don't think it's an AWS config thing, because eventually the download does start working, and it also works at very low parallelism.

I'll also give Knot Resolver a go. Thank you so much for all the support you've been providing!

@PranshuBansalDev
Author

OK, I just decided to opt for a slower download, as it's currently blocking some other work; not sure what the issue is. If you want to keep this open for resolution, I'll update the thread when I get around to digging into the root cause.

@PranshuBansalDev
Author

Huge thanks for all the ideas and support though!

@rom1504
Owner

rom1504 commented Mar 31, 2022

Let's keep it open for now.
I'll think about what can be done to make this better and clearer when I have time.

@PranshuBansalDev
Author

PranshuBansalDev commented Mar 31, 2022

So the set of things I'm going to investigate in parallel, while the real download goes on in the background, is:

  1. set up a Knot Resolver (this isn't super trivial, as my corporate security may ban me from using the ones you have access to)
  2. try to see if pre-mounting S3 onto my file system (using s3fs or similar) will help resolve the startup thrashing (my hunch is that's what the issue is)

While I definitely believe using EC2 directly may help resolve the issues, I want to try my best to see if we can get EMR working so that we can leverage auto-scaling and remove the parallel-ssh steps for future use (doing that for 100 machines if there's a 100B dataset doesn't sound fun).

@PranshuBansalDev
Author

PranshuBansalDev commented Apr 6, 2022

OK, I figured it out eventually. I don't have a smoking gun in the data, but the fix below resolved the issue.

Basically my Spark defaults had spark.dynamicAllocation.enabled set to true; when I disabled dynamic allocation the issue went away.

Observations: I noticed that executors would keep dying and coming back up. After a bit of digging I saw that the executors getting shut down were handling ~40-50% of their tasks in less than 50ms.

Digging: I realized that it takes a bit of time between executors spinning up and tasks actually being allocated to them, and the Spark logs contained a bunch of "killing executor due to timeout" messages.

Solution:
When I looked up this message I did some reading about dynamic allocation, and I had two thoughts:

  1. Disable it and see if it works
  2. Go back and increase the idle timeout to something like 10-15 minutes

I tried 1 and it worked, then I ran out of time to go back and try 2 (a sketch of what 2 might look like is below).

Hopefully this helps, but your tool isn't the issue here
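
For anyone hitting the same thing, option 2 would amount to something like the following submit-time settings instead of disabling dynamic allocation outright (the 900s value is an illustrative guess, not a tested number):

        --conf "spark.dynamicAllocation.enabled=true"\
        --conf "spark.dynamicAllocation.executorIdleTimeout=900s"\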

@rom1504
Owner

rom1504 commented Apr 6, 2022

Interesting, thanks for the info!
I'm still interested in increasing the reliability here; I will be handling it in #140, trying various ways to implement shard retrying and making it less likely that the success rate ends up low / making things more likely to work by default.

Btw, just to give some idea of speeds, I talked with someone who used a bunch of freely provided TPU VMs and was able to download laion2B-multi in 7 hours at around 100k samples/s (using img2dataset).

@rom1504
Owner

rom1504 commented Apr 6, 2022

Did you manage to download what you wanted in a reasonable time?

@PranshuBansalDev
Author

Yup, we used 10 c4.8xlarge instances and got it done in 30 hours for LAION400M, with a success rate of ~93% and no DNS/Knot resolvers.

I used these settings (probably overly conservative, but I followed the guidance available here):

--master yarn\
        --deploy-mode client\
        --conf "spark.dynamicAllocation.enabled=false"\
        --conf "spark.executor.cores=5"\
        --conf "spark.executor.memory=9g"\
        --conf "spark.executor.memoryOverhead=1g"\
        --conf "spark.driver.memory=9g"\
        --conf "spark.driver.cores=5"\
        --conf "spark.executor.instances=59"\
        --conf "spark.default.parallelism=1300"

@PranshuBansalDev
Author

PranshuBansalDev commented Apr 6, 2022

I also had this weird issue where the Spark job would just show "successfully completed" after the first 5-6 parquets, so I had to wrap it in a loop, which helped a bunch too:

aws s3 ls s3://path/to/parquets/ > files.txt
input="files.txt"
while IFS= read -r line; do
  spark-submit ... img2dataset ... -i "$line"
done < "$input"

@PranshuBansalDev
Author

Also - since I resolved this issue on my side and posted the steps to fix it for others going forward, I won't monitor this anymore; please do ping me at my email address if you have any further questions!

I'll let you close this out once you're ready to

@rom1504
Owner

rom1504 commented May 18, 2022

You may now rerun the job to get the missing shards; see https://github.com/rom1504/img2dataset#incremental-mode

However, I will also implement a shard retrying feature in a future PR.
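
Roughly, that means re-running with the same output folder and incremental mode, so already-completed shards are skipped; a sketch via the Python API (parameter names should be verified against the README linked above):

from img2dataset import download

# Point at the same output folder as the failed run; in incremental mode
# already-completed shards are skipped and only missing ones are retried.
download(
    url_list="/path/to/filtered/parquets/",
    input_format="parquet",
    url_col="URL",
    caption_col="TEXT",
    output_format="webdataset",
    output_folder="s3://path/to/s3/download/",
    incremental_mode="incremental",
)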

@rom1504
Owner

rom1504 commented May 18, 2022

Shard retrying has been implemented as well.

@rom1504 rom1504 closed this as completed May 18, 2022