Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-41862: [C++][S3] Fix potential deadlock when closing output stream #41876

Merged
merged 3 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,10 @@ class ObjectOutputStream final : public io::OutputStream {
io::internal::CloseFromDestructor(this);
}

std::shared_ptr<ObjectOutputStream> Self() {
return std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
}

Status Init() {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());

Expand Down Expand Up @@ -1724,9 +1728,9 @@ class ObjectOutputStream final : public io::OutputStream {

RETURN_NOT_OK(EnsureReadyToFlushFromClose());

auto self = std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
// Wait for in-progress uploads to finish (if async writes are enabled)
return FlushAsync().Then([self]() { return self->FinishPartUploadAfterFlush(); });
return FlushAsync().Then(
[self = Self()]() { return self->FinishPartUploadAfterFlush(); });
pitrou marked this conversation as resolved.
Show resolved Hide resolved
}

bool closed() const override { return closed_; }
Expand Down Expand Up @@ -1892,7 +1896,15 @@ class ObjectOutputStream final : public io::OutputStream {
}
// Notify completion
if (--state->parts_in_progress == 0) {
state->pending_parts_completed.MarkFinished(state->status);
// GH-41862: avoid potential deadlock if the Future's callback is called
// with the mutex taken.
auto fut = state->pending_parts_completed;
lock.unlock();
// State could be mutated concurrently if another thread writes to the
// stream, but in this case the Flush() call is only advisory anyway.
// Besides, it's not generally sound to write to an OutputStream from
// several threads at once.
fut.MarkFinished(state->status);
}
}

Expand Down
31 changes: 29 additions & 2 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -614,9 +614,26 @@ class TestS3FS : public S3TestMixin {
// after CloseAsync or synchronously after stream.reset() since we're just
// checking that `closeAsyncFut` keeps the stream alive until completion
// rather than segfaulting on a dangling stream
auto closeAsyncFut = stream->CloseAsync();
auto close_fut = stream->CloseAsync();
stream.reset();
ASSERT_OK(closeAsyncFut.MoveResult());
ASSERT_OK(close_fut.MoveResult());
AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
}

void TestOpenOutputStreamCloseAsyncFutureDeadlock() {
// This is inspired by GH-41862, though it fails to reproduce the actual
// issue reported there (actual preconditions might be required).
// Here we lose our reference to an output stream from its CloseAsync callback.
// This should not deadlock.
std::shared_ptr<io::OutputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile"));
ASSERT_OK(stream->Write("new data"));
auto close_fut = stream->CloseAsync();
close_fut.AddCallback([stream = std::move(stream)](Status st) mutable {
// Trigger stream destruction from callback
stream.reset();
});
ASSERT_OK(close_fut.MoveResult());
AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
}

Expand Down Expand Up @@ -1254,6 +1271,16 @@ TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorSyncWrite) {
TestOpenOutputStreamCloseAsyncDestructor();
}

TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockBackgroundWrites) {
TestOpenOutputStreamCloseAsyncFutureDeadlock();
}

TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockSyncWrite) {
options_.background_writes = false;
MakeFileSystem();
TestOpenOutputStreamCloseAsyncFutureDeadlock();
}

TEST_F(TestS3FS, OpenOutputStreamMetadata) {
std::shared_ptr<io::OutputStream> stream;

Expand Down
Loading