[Python] pyarrow.fs.copy_files hangs indefinitely #15233

Closed
lukehsiao opened this issue Jan 7, 2023 · 10 comments

Comments

@lukehsiao

lukehsiao commented Jan 7, 2023

Describe the bug, including details regarding any error messages, version, and platform.

We are working on a Python package that calls pyarrow's copy_files function to copy a local directory to S3. We have noticed that this seems to hang indefinitely for a directory, even though it works for individual files.

A simple reproducer seems to be:

from ray.air._internal.remote_storage import upload_to_uri
dir = "/some/directory/local"
uri = "s3://some/s3/bucket"
upload_to_uri(dir, uri)

Where ray just wraps pyarrow.fs.copy_files: https://github.com/ray-project/ray/blob/d7b2b49a962bf33dae7a50376f159ab15d80800f/python/ray/air/_internal/remote_storage.py#L195

This results in the following flamegraph.

[Image: flamegraph, profile-idle]

And an strace of that process looks like

...
stat("/some/directory/local/result.json", {st_mode=S_IFREG|0664, st_size=2345, ...}) = 0
getpid()                                = 1639901
futex(0x5580a3b563e4, FUTEX_WAKE_PRIVATE, 1) = 1
getpid()                                = 1639901
futex(0x5580a3b563e4, FUTEX_WAKE_PRIVATE, 1) = 1
getpid()                                = 1639901
futex(0x5580a3b563e4, FUTEX_WAKE_PRIVATE, 1) = 1
getpid()                                = 1639901
futex(0x5580a3b563e4, FUTEX_WAKE_PRIVATE, 1) = 1
getpid()                                = 1639901
futex(0x5580a3b563e4, FUTEX_WAKE_PRIVATE, 1) = 1
getpid()                                = 1639901
futex(0x5580a3b563e4, FUTEX_WAKE_PRIVATE, 1) = 1
getpid()                                = 1639901
futex(0x5580a3b563e4, FUTEX_WAKE_PRIVATE, 1) = 1
getpid()                                = 1639901
futex(0x5580a3b563e4, FUTEX_WAKE_PRIVATE, 1) = 1
getpid()                                = 1639901
getpid()                                = 1639901
getpid()                                = 1639901
futex(0x5580a3d374e8, FUTEX_WAIT_PRIVATE, 0, NULL) = ? ERESTARTSYS (To be restarted if SA_RESTART is set)
--- SIGTSTP {si_signo=SIGTSTP, si_code=SI_KERNEL} ---
--- stopped by SIGTSTP ---
+++ killed by SIGKILL +++

This is using

.venv ❯ pip show pyarrow
Name: pyarrow
Version: 8.0.0

On Python 3.10.9, on a machine running Ubuntu 20.04.5 LTS.

Given the flamegraph, it seemed to be an issue with pyarrow, not ray, which is why I opened the ticket here.

Component(s)

Python

@lukehsiao changed the title from "pyarrow copy_files hangs indefinitely" to "pyarrow.fs.copy_files hangs indefinitely" on Jan 7, 2023
@lukehsiao changed the title from "pyarrow.fs.copy_files hangs indefinitely" to "[Python] pyarrow.fs.copy_files hangs indefinitely" on Jan 7, 2023
@EpsilonPrime
Contributor

I'm taking a look at this. I have one debugging question for you in the meantime however. When I look at _upload_to_uri_with_exclude it appears that it is a recursive copy. Is there a possibility that you've included the current directory "." or parent directory ".." and that's why the operation never finishes? One way of determining this would be to insert a log/print at https://github.com/ray-project/ray/blob/d7b2b49a962bf33dae7a50376f159ab15d80800f/python/ray/air/_internal/remote_storage.py#L238 to output the paths that are going to be copied from.

@lukehsiao
Author

lukehsiao commented Jan 10, 2023

I'm taking a look at this. I have one debugging question for you in the meantime however. When I look at _upload_to_uri_with_exclude it appears that it is a recursive copy. Is there a possibility that you've included the current directory "." or parent directory ".." and that's why the operation never finishes?

In our case, exclude is None, so that code path to _upload_to_uri_with_exclude is actually never hit (and you'll notice it doesn't show in the flamegraph). So, it's essentially just the call to pyarrow.fs.copy_files here:

https://github.com/ray-project/ray/blob/d7b2b49a962bf33dae7a50376f159ab15d80800f/python/ray/air/_internal/remote_storage.py#L209

@krfricke
Contributor

krfricke commented Feb 7, 2023

I think this issue is a duplicate of #32372. I've added more details in that issue, but in a nutshell, pyarrow.fs.copy_files hangs for S3 buckets with use_threads=True once the number of files uploaded reaches the number of CPU cores available:

# On an 8-core MacBook
mkdir -p /tmp/pa-s3
cd /tmp/pa-s3 
for i in {1..7}; do touch $i.txt; done
# This works
python -c "import pyarrow.fs; pyarrow.fs.copy_files('/tmp/pa-s3', 's3://bucket/folder')"
for i in {1..8}; do touch $i.txt; done  
# This hangs forever
python -c "import pyarrow.fs; pyarrow.fs.copy_files('/tmp/pa-s3', 's3://bucket/folder')"

I could reproduce this for pyarrow 6 to 11.

It also only seems to come up for S3, not for GS.
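
For anyone who prefers to drive the reproduction from Python rather than the shell, here is a minimal sketch of the same steps. `make_test_dir` and `copy_dir` are hypothetical helper names, and the bucket URI is the placeholder from the snippet above. `use_threads` is an existing keyword argument of `pyarrow.fs.copy_files`; whether `use_threads=False` avoids the hang is an assumption based on the description above and has not been verified here.

```python
import os
import tempfile


def make_test_dir(n_files):
    """Create a temporary directory holding n_files empty .txt files."""
    path = tempfile.mkdtemp(prefix="pa-s3-")
    for i in range(1, n_files + 1):
        # Equivalent of `touch $i.txt` in the shell snippet above.
        open(os.path.join(path, f"{i}.txt"), "w").close()
    return path


def copy_dir(src, dest_uri, use_threads=True):
    """Copy src to dest_uri with pyarrow (needs pyarrow and S3 credentials)."""
    # Imported lazily so that make_test_dir works without pyarrow installed.
    import pyarrow.fs

    pyarrow.fs.copy_files(src, dest_uri, use_threads=use_threads)
```

With these helpers, `copy_dir(make_test_dir(8), "s3://bucket/folder")` corresponds to the hanging case on the 8-core machine, and passing `use_threads=False` is the variant worth trying as a stopgap.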

@westonpace
Member

This sounds very similar to nested parallelism deadlocks we have had in the past.

Outermost call: fork-join on a bunch of items (in this case it looks like we are doing fork-join on files)
Inner task: fork-join on something else (e.g. in parquet it would be parquet column decoding)

If the inner-task is blocking on a "join" then it is wasting a thread pool thread. If enough of these thread pool threads get wasted then all thread pool threads are blocked waiting for other thread pool tasks and no thread is free to actually do the tasks.

The solution we adopted was to migrate to an async model so that "join" step becomes "return a future" instead of "block until done". This yields roughly the following rules:

  • The user thread (the python top-level thread) should block on a top-level future
  • CPU threads should never block (outside of minor blocking on mutex guards to sequence a tiny critical section)
  • I/O threads should only block on OS calls. They should never block waiting for other tasks.

It seems like the copy_files/s3 combination is violating one of the above rules. There is an OptionalParallelFor in CopyFiles which blocks but I think that is called from the user thread and so that is ok. @EpsilonPrime if you can reproduce I would grab a thread dump from gdb and check and see what the thread tasks are blocking on. The fix will probably be to move copy files over to using async APIs (internally).

@EpsilonPrime
Contributor

I have written a reproduction test case that detects the thread contention issue (and is ready to check in once the fix is ready). When copying a file (filesystem.cc:613), CopyStream runs as expected and the stream is then passed to the close routine to complete. That delegates to CloseAsync, which handles uploading parts by calling UploadPart. UploadPart then adds its work to the thread pool, which overloads the executor. For an 8-thread pool with 8 tasks (each small enough to fit in a single part), this ends up requiring 16 busy threads in a size-8 executor.

The easy solution is to limit the number of tasks submitted to the pool (merely leaving one extra thread appears to be enough for the pool to empty, although this needs verification). The second option is to modify the close routine to take over the work on the existing thread (that is, not be asynchronous). This would require reworking at least 5 functions and might require even more work for the case where there are multiple parts per file (which we do not have a test for yet).
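
As a rough illustration of the "easy solution", the sketch below caps the number of outer tasks in flight at one less than the pool size. It is a toy model in Python, not the Arrow implementation: `run_capped`, `outer`, and `inner` are hypothetical names, and `ThreadPoolExecutor` stands in for the Arrow executor. It assumes `pool_size` is at least 2.

```python
import concurrent.futures as cf
import threading


def run_capped(pool_size, n_outer, timeout=2.0):
    """Nested fork-join with at most pool_size - 1 outer tasks in flight."""
    slots = threading.BoundedSemaphore(pool_size - 1)

    def inner(i):
        return i * 2

    def outer(i):
        # Still a blocking join on a pool thread, but the cap guarantees
        # that one thread is always left free to run inner tasks.
        return pool.submit(inner, i).result(timeout=timeout)

    with cf.ThreadPoolExecutor(max_workers=pool_size) as pool:
        futs = []
        for i in range(n_outer):
            slots.acquire()  # wait until an outer slot is free
            fut = pool.submit(outer, i)
            fut.add_done_callback(lambda _f: slots.release())
            futs.append(fut)
        return [f.result() for f in futs]
```

The trade-off is that the outer tasks still spend their time blocked, so the cap avoids the deadlock without recovering the wasted threads.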

@Tom-Newton
Contributor

Tom-Newton commented Nov 26, 2024

We just discovered this problem and I'm quite keen to get it fixed. In our case we are using the AzureFileSystem, but it implements background_writes in a very similar way to the S3FileSystem, so it's not a surprise that it suffers from the same problem.

I think the problem is exactly what @westonpace was describing but to be precise about how it manifests in this case:

  1. With a large number of files and use_threads=true, CopyFiles submits a large number of copy operations to the IO thread pool, enough to occupy all the IO threads. https://github.com/apache/arrow/blob/main/cpp/src/arrow/filesystem/filesystem.cc#L648-L650
  2. With background_writes=true each Write https://github.com/apache/arrow/tree/main/cpp/src/arrow/filesystem/util_internal.cc#L56 to Azure or S3 actually just writes to an in-memory buffer and submits a task on the IO thread pool to do the actual upload.
  3. After all the write calls, OutputStream.Close() is called, which calls Flush(). If it were not for the explicit Close(), I think the destructor would also call Flush(). Flush() blocks, waiting for all the tasks submitted by calls to Write to complete their actual uploads. Unfortunately, this is a deadlock because there are no free threads in the IO thread pool to actually do any of the uploads.

If we compare to @westonpace 's rules:

This yields roughly the following rules:

  • The user thread (the python top-level thread) should block on a top-level future
  • CPU threads should never block (outside of minor blocking on mutex guards to sequence a tiny critical section)
  • I/O threads should only block on OS calls. They should never block waiting for other tasks.

This third rule is violated because the IO threads spawned by copy are waiting on background write IO threads to complete.

A few vague ideas on solutions, but none of them seem particularly attractive:

  1. OutputStream destructor does not synchronously Flush. Instead it returns a future for the Flush.
    1. I think this is what @westonpace described: 'migrate to an async model so that "join" step becomes "return a future" instead of "block until done"'
    2. I fear this could require changes to the Filesystem interface particularly around the destructor calling Flush. Maybe there are other options though.
  2. Try setting priorities when submitting tasks.
    1. Copy files spawns tasks with lower priority so they can’t block threads that actually do the copies from running.
    2. If it can preempt lower priority, then this seems like a perfect solution. If it cannot preempt there will still be rare race conditions when all the IO threads can get filled with copy operations and cause the deadlock.
  3. Use a different thread pool e.g. the cpu thread pool for copy files
    1. Sort of makes sense to call it CPU on uploads with background_writes=true because it's basically just writing to an in-memory buffer, which is probably more CPU than IO.
    2. Using the same thread pool at both levels is good for controlling concurrency.
    3. For downloads it's definitely wrong to use the CPU pool because it will run the blocking IO directly in the CPU pool.
  4. Limit how many tasks copy files can run in parallel, to be slightly less than the io thread pool’s capacity.
    1. There will still be lots of wasted threads.
  5. Disable background writes if there are more files than IO threads.
    1. If there is a really huge file, among lots of small ones the large file will be uploaded synchronously, which will be bad for performance.

@Tom-Newton
Contributor

Tom-Newton commented Nov 27, 2024

I had a quick try at implementing idea 1: return futures and join in the main thread. This appears to be a lot easier than I feared.
fix_copy_files_deadlock.patch.txt seems to work fine so far. I'll probably try to make a PR for this if it passes further testing.
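
To make "return futures and join in the main thread" concrete, here is a toy Python model of the idea. It is not the patch itself: `run_async_join`, `outer`, and `inner` are hypothetical names, and `ThreadPoolExecutor` stands in for the IO thread pool.

```python
import concurrent.futures as cf


def run_async_join(pool_size, n_outer):
    """Outer tasks hand back the inner future; only the caller blocks."""

    def inner(i):
        return i * 2

    def outer(i):
        # "Return a future" instead of "block until done".
        return pool.submit(inner, i)

    with cf.ThreadPoolExecutor(max_workers=pool_size) as pool:
        outer_futs = [pool.submit(outer, i) for i in range(n_outer)]
        # The join happens here, on the calling (user) thread.
        inner_futs = [f.result() for f in outer_futs]
        return [f.result() for f in inner_futs]
```

Because no pool thread ever waits on another pool task, this completes even when there are far more outer tasks than threads, for example `run_async_join(2, 8)`.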

@Tom-Newton
Contributor

Tom-Newton commented Nov 29, 2024

I found a couple more related problems and I've ended up with Tom-Newton#13. This includes 3 changes:

  1. Use CloseAsync() inside copy_one_file so that it returns futures, instead of blocking while it waits for other tasks to complete. This avoids the deadlock.
  2. Fix a segfault in FileInterface::CloseAsync() by using shared_from_this() so that it doesn't get destructed prematurely. shared_from_this() is already used in the CloseAsync() implementation on several filesystems. After change 1 this is important when downloading to the local filesystem using CopyFiles.
  3. Spawn copy_one_file less urgently than default, so that background_writes are done with higher priority. Otherwise copy_one_file will keep buffering more data in memory without giving the background_writes any chance to upload the data and drop it from memory. Therefore, without this, large copies would cause OOMs.
    1. The current Arrow thread pool implementation makes provision for spawning with a set priority but the priority numbers are just ignored.
    2. I made a small change to use a std::priority_queue to implement the priority.
    3. There is a property of the current implementation that tasks will be scheduled in the order of calls to Spawn. There are no tests to assert this behaviour, and I don't know if it is depended on by anything but currently my std::priority_queue version removes this property.
    4. I think this has got to be the most contentious part of the change, with the broadest possible impact if I introduce a bug. I would appreciate some input on whether this seems like a reasonable change.
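
On the ordering concern in 3.3: a common way to keep spawn order among tasks of equal priority is to pair each priority with a monotonically increasing sequence number. The Python sketch below shows that technique in isolation. It is not taken from the PR; `PriorityTaskQueue` is a hypothetical name, and it assumes that a lower priority number means more urgent.

```python
import heapq
import itertools


class PriorityTaskQueue:
    """Pops the lowest priority number first; ties pop in insertion order."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()

    def push(self, task, priority=0):
        # Assumption: lower number = more urgent. The sequence number breaks
        # ties, so equal priorities stay FIFO and tasks are never compared.
        heapq.heappush(self._heap, (priority, next(self._seq), task))

    def pop(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)
```

Pushing two copy tasks at priority 10 interleaved with two upload tasks at priority 0 pops both uploads first, each group in the order it was pushed.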

I've been testing copying between Azure and local filesystems using a directory of about 50,000 files of varying sizes totalling 160GiB. With all the described changes both upload and download work reliably, with reasonable memory usage. I'm also relatively happy with the performance - bandwidth utilization fluctuates more than I would like, but at times it saturates the network interface on my Azure VM.

@Tom-Newton
Contributor

Tom-Newton commented Dec 3, 2024

I opened a draft PR #44897 but ideally, I would like some high level thoughts from maintainers on implementing priority queuing before I mark it ready for review.

pitrou added a commit that referenced this issue Dec 16, 2024
…ackground_writes (#44897)

### Rationale for this change
Currently it deadlocks when trying to upload more files than we have IO threads. 

### What changes are included in this PR?
1. Use CloseAsync() inside copy_one_file so that it returns futures, instead of blocking while it waits for other tasks to complete. This avoids the deadlock.
1. Fix a segfault in FileInterface::CloseAsync() by using shared_from_this() so that it doesn't get destructed prematurely. shared_from_this() is already used in the CloseAsync() implementation on several filesystems. After change 1 this is important when downloading to the local filesystem using CopyFiles.
1. Spawn `copy_one_file` less urgently than default, so that `background_writes` are done with higher priority. Otherwise `copy_one_file` will keep buffering more data in memory without giving the `background_writes` any chance to upload the data and drop it from memory. Therefore, without this, large copies would cause OOMs.
   1. The current Arrow thread pool implementation makes provision for spawning with a set priority but the priority numbers are just ignored.
   1. I made a small change to use a `std::priority_queue` to implement the priority.
   1. There is a property of the current implementation that tasks will be scheduled in the order of calls to Spawn. There are also a few test cases in `arrow-acero-hash-aggregate-test` that fail if this property is not maintained. I'm not sure what is the correct option but for now I have ensured that tasks of the same priority are run in the order they are spawned.
   1. I think this has got to be the most contentious part of the change, with the broadest possible impact if I introduce a bug. I would appreciate some input on whether this seems like a reasonable change and I'm very happy to move it to a separate PR and/or discuss further. 

### Are these changes tested?
Added some extra automated tests. I also ran some large scale tests copying between Azure and local with a directory of about 50,000 files of varying sizes totalling 160GiB.

### Are there any user-facing changes?
1. `CopyFiles` should now work reliably
2. `ThreadPool` now runs tasks in priority order

* GitHub Issue: #15233

Lead-authored-by: Thomas Newton <thomas.w.newton@gmail.com>
Co-authored-by: Antoine Pitrou <pitrou@free.fr>
Signed-off-by: Antoine Pitrou <antoine@python.org>
@pitrou pitrou added this to the 19.0.0 milestone Dec 16, 2024
@pitrou
Member

pitrou commented Dec 16, 2024

Issue resolved by pull request 44897
#44897
