diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc
index bd0e353e4a03a..2c3d81ca24c51 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -24,6 +24,7 @@
 #include "arrow/buffer.h"
 #include "arrow/filesystem/path_util.h"
 #include "arrow/filesystem/util_internal.h"
+#include "arrow/io/util_internal.h"
 #include "arrow/result.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/formatting.h"
@@ -43,7 +44,8 @@ AzureOptions::AzureOptions() {}
 bool AzureOptions::Equals(const AzureOptions& other) const {
   return (account_dfs_url == other.account_dfs_url &&
           account_blob_url == other.account_blob_url &&
-          credentials_kind == other.credentials_kind);
+          credentials_kind == other.credentials_kind &&
+          default_metadata == other.default_metadata);
 }
 
 Status AzureOptions::ConfigureAccountKeyCredentials(const std::string& account_name,
@@ -461,6 +463,225 @@ class ObjectInputFile final : public io::RandomAccessFile {
   int64_t content_length_ = kNoSize;
   std::shared_ptr<const KeyValueMetadata> metadata_;
 };
+
+Status CreateEmptyBlockBlob(
+    std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client) {
+  try {
+    block_blob_client->UploadFrom(nullptr, 0);
+  } catch (const Azure::Storage::StorageException& exception) {
+    return internal::ExceptionToStatus(
+        "UploadFrom failed for '" + block_blob_client->GetUrl() +
+            "' with an unexpected Azure error. There is no existing blob at this "
+            "location or the existing blob must be replaced so ObjectAppendStream must "
+            "create a new empty block blob.",
+        exception);
+  }
+  return Status::OK();
+}
+
+Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
+    std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client) {
+  try {
+    return block_blob_client->GetBlockList().Value;
+  } catch (Azure::Storage::StorageException& exception) {
+    return internal::ExceptionToStatus(
+        "GetBlockList failed for '" + block_blob_client->GetUrl() +
+            "' with an unexpected Azure error. Cannot write to a file without first "
+            "fetching the existing block list.",
+        exception);
+  }
+}
+
+Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
+    const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
+  Azure::Storage::Metadata azure_metadata;
+  for (auto key_value : arrow_metadata->sorted_pairs()) {
+    azure_metadata[key_value.first] = key_value.second;
+  }
+  return azure_metadata;
+}
+
+Status CommitBlockList(
+    std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+    const std::vector<std::string>& block_ids, const Azure::Storage::Metadata& metadata) {
+  Azure::Storage::Blobs::CommitBlockListOptions options;
+  options.Metadata = metadata;
+  try {
+    // CommitBlockList puts all block_ids in the latest element. That means in the case of
+    // overlapping block_ids the newly staged block ids will always replace the
+    // previously committed blocks.
+    // https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
+    block_blob_client->CommitBlockList(block_ids, options);
+  } catch (const Azure::Storage::StorageException& exception) {
+    return internal::ExceptionToStatus(
+        "CommitBlockList failed for '" + block_blob_client->GetUrl() +
+            "' with an unexpected Azure error. Committing is required to flush an "
+            "output/append stream.",
+        exception);
+  }
+  return Status::OK();
+}
+
+class ObjectAppendStream final : public io::OutputStream {
+ public:
+  ObjectAppendStream(
+      std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+      const io::IOContext& io_context, const AzureLocation& location,
+      const std::shared_ptr<const KeyValueMetadata>& metadata,
+      const AzureOptions& options, int64_t size = kNoSize)
+      : block_blob_client_(std::move(block_blob_client)),
+        io_context_(io_context),
+        location_(location),
+        content_length_(size) {
+    if (metadata && metadata->size() != 0) {
+      metadata_ = ArrowMetadataToAzureMetadata(metadata);
+    } else if (options.default_metadata && options.default_metadata->size() != 0) {
+      metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
+    }
+  }
+
+  ~ObjectAppendStream() override {
+    // For compliance with the rest of the IO stack, Close rather than Abort,
+    // even though it may be more expensive.
+    io::internal::CloseFromDestructor(this);
+  }
+
+  Status Init() {
+    if (content_length_ != kNoSize) {
+      DCHECK_GE(content_length_, 0);
+      pos_ = content_length_;
+    } else {
+      try {
+        auto properties = block_blob_client_->GetProperties();
+        content_length_ = properties.Value.BlobSize;
+        pos_ = content_length_;
+      } catch (const Azure::Storage::StorageException& exception) {
+        if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
+          RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_));
+        } else {
+          return internal::ExceptionToStatus(
+              "GetProperties failed for '" + block_blob_client_->GetUrl() +
+                  "' with an unexpected Azure error. Cannot initialise an "
+                  "ObjectAppendStream without knowing whether a file already exists at "
+                  "this path, and if it exists, its size.",
+              exception);
+        }
+        content_length_ = 0;
+      }
+    }
+    if (content_length_ > 0) {
+      ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
+      for (auto block : block_list.CommittedBlocks) {
+        block_ids_.push_back(block.Name);
+      }
+    }
+    return Status::OK();
+  }
+
+  Status Abort() override {
+    if (closed_) {
+      return Status::OK();
+    }
+    block_blob_client_ = nullptr;
+    closed_ = true;
+    return Status::OK();
+  }
+
+  Status Close() override {
+    if (closed_) {
+      return Status::OK();
+    }
+    RETURN_NOT_OK(Flush());
+    block_blob_client_ = nullptr;
+    closed_ = true;
+    return Status::OK();
+  }
+
+  bool closed() const override { return closed_; }
+
+  Status CheckClosed(const char* action) const {
+    if (closed_) {
+      return Status::Invalid("Cannot ", action, " on closed stream.");
+    }
+    return Status::OK();
+  }
+
+  Result<int64_t> Tell() const override {
+    RETURN_NOT_OK(CheckClosed("tell"));
+    return pos_;
+  }
+
+  Status Write(const std::shared_ptr<Buffer>& buffer) override {
+    return DoAppend(buffer->data(), buffer->size(), buffer);
+  }
+
+  Status Write(const void* data, int64_t nbytes) override {
+    return DoAppend(data, nbytes);
+  }
+
+  Status Flush() override {
+    RETURN_NOT_OK(CheckClosed("flush"));
+    return CommitBlockList(block_blob_client_, block_ids_, metadata_);
+  }
+
+ private:
+  Status DoAppend(const void* data, int64_t nbytes,
+                  std::shared_ptr<Buffer> owned_buffer = nullptr) {
+    RETURN_NOT_OK(CheckClosed("append"));
+    auto append_data = reinterpret_cast<const uint8_t*>(data);
+    Azure::Core::IO::MemoryBodyStream block_content(append_data, nbytes);
+    if (block_content.Length() == 0) {
+      return Status::OK();
+    }
+
+    const auto n_block_ids = block_ids_.size();
+
+    // New block ID must always be distinct from the existing block IDs. Otherwise we
+    // will accidentally replace the content of existing blocks, causing corruption.
+    // We will use monotonically increasing integers.
+    auto new_block_id = std::to_string(n_block_ids);
+
+    // Pad to 5 digits, because Azure allows a maximum of 50,000 blocks.
+    const size_t target_number_of_digits = 5;
+    const auto required_padding_digits =
+        target_number_of_digits - std::min(target_number_of_digits, new_block_id.size());
+    new_block_id.insert(0, required_padding_digits, '0');
+    // There is a small risk when appending to a blob created by another client that
+    // `new_block_id` may overlap with an existing block id. Adding the `-arrow`
+    // suffix significantly reduces the risk, but does not 100% eliminate it. For example
+    // if the blob was previously created with one block, with id `00001-arrow` then the
+    // next block we append will conflict with that, and cause corruption.
+    new_block_id += "-arrow";
+    new_block_id = Azure::Core::Convert::Base64Encode(
+        std::vector<uint8_t>(new_block_id.begin(), new_block_id.end()));
+
+    try {
+      block_blob_client_->StageBlock(new_block_id, block_content);
+    } catch (const Azure::Storage::StorageException& exception) {
+      return internal::ExceptionToStatus(
+          "StageBlock failed for '" + block_blob_client_->GetUrl() + "' new_block_id: '" +
+              new_block_id +
+              "' with an unexpected Azure error. Staging new blocks is fundamental to "
+              "streaming writes to blob storage.",
+          exception);
+    }
+    block_ids_.push_back(new_block_id);
+    pos_ += nbytes;
+    content_length_ += nbytes;
+    return Status::OK();
+  }
+
+  std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client_;
+  const io::IOContext io_context_;
+  const AzureLocation location_;
+
+  bool closed_ = false;
+  int64_t pos_ = 0;
+  int64_t content_length_ = kNoSize;
+  std::vector<std::string> block_ids_;
+  Azure::Storage::Metadata metadata_;
+};
+
 }  // namespace
 
 // -----------------------------------------------------------------------
@@ -724,6 +945,30 @@ class AzureFileSystem::Impl {
 
     return Status::OK();
   }
+
+  Result<std::shared_ptr<ObjectAppendStream>> OpenAppendStream(
+      const AzureLocation& location,
+      const std::shared_ptr<const KeyValueMetadata>& metadata, const bool truncate,
+      AzureFileSystem* fs) {
+    RETURN_NOT_OK(ValidateFileLocation(location));
+    ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path));
+
+    auto block_blob_client = std::make_shared<Azure::Storage::Blobs::BlockBlobClient>(
+        blob_service_client_->GetBlobContainerClient(location.container)
+            .GetBlockBlobClient(location.path));
+
+    std::shared_ptr<ObjectAppendStream> stream;
+    if (truncate) {
+      RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client));
+      stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
+                                                    location, metadata, options_, 0);
+    } else {
+      stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
+                                                    location, metadata, options_);
+    }
+    RETURN_NOT_OK(stream->Init());
+    return stream;
+  }
 };
 
 const AzureOptions& AzureFileSystem::options() const { return impl_->options(); }
@@ -805,12 +1050,14 @@ Result<std::shared_ptr<io::RandomAccessFile>> AzureFileSystem::OpenInputFile(
 
 Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenOutputStream(
     const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
-  return Status::NotImplemented("The Azure FileSystem is not fully implemented");
+  ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
+  return impl_->OpenAppendStream(location, metadata, true, this);
 }
 
 Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenAppendStream(
-    const std::string&, const std::shared_ptr<const KeyValueMetadata>&) {
-  return Status::NotImplemented("The Azure FileSystem is not fully implemented");
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
+  return impl_->OpenAppendStream(location, metadata, false, this);
 }
 
 Result<std::shared_ptr<AzureFileSystem>> AzureFileSystem::Make(
diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h
index 1f7047ff94c56..9f980ee8baae0 100644
--- a/cpp/src/arrow/filesystem/azurefs.h
+++ b/cpp/src/arrow/filesystem/azurefs.h
@@ -77,6 +77,11 @@ struct ARROW_EXPORT AzureOptions {
   std::shared_ptr<Azure::Identity::ClientSecretCredential>
       service_principle_credentials_provider;
 
+  /// \brief Default metadata for OpenOutputStream.
+  ///
+  /// This will be ignored if non-empty metadata is passed to OpenOutputStream.
+  std::shared_ptr<const KeyValueMetadata> default_metadata;
+
   AzureOptions();
 
   Status ConfigureAccountKeyCredentials(const std::string& account_name,
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc
index ecf0a19f684eb..e9b9a6f34b88c 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -232,13 +232,11 @@ class AzureFileSystemTest : public ::testing::Test {
 
   void UploadLines(const std::vector<std::string>& lines, const char* path_to_file,
                    int total_size) {
-    // TODO(GH-38333): Switch to using Azure filesystem to write once its implemented.
-    auto blob_client =
-        blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
-            .GetBlockBlobClient(path_to_file);
-    std::string all_lines = std::accumulate(lines.begin(), lines.end(), std::string(""));
-    blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(all_lines.data()),
-                           total_size);
+    const auto path = PreexistingContainerPath() + path_to_file;
+    ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {}));
+    const auto all_lines = std::accumulate(lines.begin(), lines.end(), std::string(""));
+    ASSERT_OK(output->Write(all_lines));
+    ASSERT_OK(output->Close());
   }
 
   void RunGetFileInfoObjectWithNestedStructureTest();
@@ -347,21 +345,26 @@ void AzureFileSystemTest::RunGetFileInfoObjectWithNestedStructureTest() {
   // Adds detailed tests to handle cases of different edge cases
   // with directory naming conventions (e.g. with and without slashes).
   constexpr auto kObjectName = "test-object-dir/some_other_dir/another_dir/foo";
-  // TODO(GH-38333): Switch to using Azure filesystem to write once its implemented.
-  blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
-      .GetBlockBlobClient(kObjectName)
-      .UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum), strlen(kLoremIpsum));
+  ASSERT_OK_AND_ASSIGN(
+      auto output,
+      fs_->OpenOutputStream(PreexistingContainerPath() + kObjectName, /*metadata=*/{}));
+  const std::string_view data(kLoremIpsum);
+  ASSERT_OK(output->Write(data));
+  ASSERT_OK(output->Close());
 
   // 0 is immediately after "/" lexicographically, ensure that this doesn't
   // cause unexpected issues.
-  // TODO(GH-38333): Switch to using Azure filesystem to write once its implemented.
-  blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
-      .GetBlockBlobClient("test-object-dir/some_other_dir0")
-      .UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum), strlen(kLoremIpsum));
-
-  blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
-      .GetBlockBlobClient(std::string(kObjectName) + "0")
-      .UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum), strlen(kLoremIpsum));
+  ASSERT_OK_AND_ASSIGN(output,
+                       fs_->OpenOutputStream(
+                           PreexistingContainerPath() + "test-object-dir/some_other_dir0",
+                           /*metadata=*/{}));
+  ASSERT_OK(output->Write(data));
+  ASSERT_OK(output->Close());
+  ASSERT_OK_AND_ASSIGN(
+      output, fs_->OpenOutputStream(PreexistingContainerPath() + kObjectName + "0",
+                                    /*metadata=*/{}));
+  ASSERT_OK(output->Write(data));
+  ASSERT_OK(output->Close());
 
   AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName, FileType::File);
   AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName + "/",
@@ -647,6 +650,157 @@ TEST_F(AzuriteFileSystemTest, OpenInputStreamClosed) {
   ASSERT_RAISES(Invalid, stream->Tell());
 }
 
+TEST_F(AzuriteFileSystemTest, TestWriteMetadata) {
+  options_.default_metadata = arrow::key_value_metadata({{"foo", "bar"}});
+
+  ASSERT_OK_AND_ASSIGN(auto fs_with_defaults, AzureFileSystem::Make(options_));
+  std::string path = "object_with_defaults";
+  auto location = PreexistingContainerPath() + path;
+  ASSERT_OK_AND_ASSIGN(auto output,
+                       fs_with_defaults->OpenOutputStream(location, /*metadata=*/{}));
+  const std::string_view expected(kLoremIpsum);
+  ASSERT_OK(output->Write(expected));
+  ASSERT_OK(output->Close());
+
+  // Verify the metadata has been set.
+  auto blob_metadata =
+      blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
+          .GetBlockBlobClient(path)
+          .GetProperties()
+          .Value.Metadata;
+  EXPECT_EQ(Azure::Core::CaseInsensitiveMap{std::make_pair("foo", "bar")}, blob_metadata);
+
+  // Check that explicit metadata overrides the defaults.
+  ASSERT_OK_AND_ASSIGN(
+      output, fs_with_defaults->OpenOutputStream(
+                  location, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}})));
+  ASSERT_OK(output->Write(expected));
+  ASSERT_OK(output->Close());
+  blob_metadata = blob_service_client_->GetBlobContainerClient(PreexistingContainerName())
+                      .GetBlockBlobClient(path)
+                      .GetProperties()
+                      .Value.Metadata;
+  // Defaults are overwritten and not merged.
+  EXPECT_EQ(Azure::Core::CaseInsensitiveMap{std::make_pair("bar", "foo")}, blob_metadata);
+}
+
+TEST_F(AzuriteFileSystemTest, OpenOutputStreamSmall) {
+  const auto path = PreexistingContainerPath() + "test-write-object";
+  ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {}));
+  const std::string_view expected(kLoremIpsum);
+  ASSERT_OK(output->Write(expected));
+  ASSERT_OK(output->Close());
+
+  // Verify we can read the object back.
+  ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream(path));
+
+  std::array<char, 1024> inbuf{};
+  ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data()));
+
+  EXPECT_EQ(expected, std::string_view(inbuf.data(), size));
+}
+
+TEST_F(AzuriteFileSystemTest, OpenOutputStreamLarge) {
+  const auto path = PreexistingContainerPath() + "test-write-object";
+  ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {}));
+  std::array<std::int64_t, 3> sizes{257 * 1024, 258 * 1024, 259 * 1024};
+  std::array<std::string, 3> buffers{
+      std::string(sizes[0], 'A'),
+      std::string(sizes[1], 'B'),
+      std::string(sizes[2], 'C'),
+  };
+  auto expected = std::int64_t{0};
+  for (auto i = 0; i != 3; ++i) {
+    ASSERT_OK(output->Write(buffers[i]));
+    expected += sizes[i];
+    ASSERT_EQ(expected, output->Tell());
+  }
+  ASSERT_OK(output->Close());
+
+  // Verify we can read the object back.
+  ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream(path));
+
+  std::string contents;
+  std::shared_ptr<Buffer> buffer;
+  do {
+    ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
+    ASSERT_TRUE(buffer);
+    contents.append(buffer->ToString());
+  } while (buffer->size() != 0);
+
+  EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]);
+}
+
+TEST_F(AzuriteFileSystemTest, OpenOutputStreamTruncatesExistingFile) {
+  const auto path = PreexistingContainerPath() + "test-write-object";
+  ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {}));
+  const std::string_view expected0("Existing blob content");
+  ASSERT_OK(output->Write(expected0));
+  ASSERT_OK(output->Close());
+
+  // Check that the initial content has been written - if not this test is not achieving
+  // what it's meant to.
+  ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream(path));
+
+  std::array<char, 1024> inbuf{};
+  ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data()));
+  EXPECT_EQ(expected0, std::string_view(inbuf.data(), size));
+
+  ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {}));
+  const std::string_view expected1(kLoremIpsum);
+  ASSERT_OK(output->Write(expected1));
+  ASSERT_OK(output->Close());
+
+  // Verify that the initial content has been overwritten.
+  ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path));
+  ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data()));
+  EXPECT_EQ(expected1, std::string_view(inbuf.data(), size));
+}
+
+TEST_F(AzuriteFileSystemTest, OpenAppendStreamDoesNotTruncateExistingFile) {
+  const auto path = PreexistingContainerPath() + "test-write-object";
+  ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {}));
+  const std::string_view expected0("Existing blob content");
+  ASSERT_OK(output->Write(expected0));
+  ASSERT_OK(output->Close());
+
+  // Check that the initial content has been written - if not this test is not achieving
+  // what it's meant to.
+  ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream(path));
+
+  std::array<char, 1024> inbuf{};
+  ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data()));
+  EXPECT_EQ(expected0, std::string_view(inbuf.data()));
+
+  ASSERT_OK_AND_ASSIGN(output, fs_->OpenAppendStream(path, {}));
+  const std::string_view expected1(kLoremIpsum);
+  ASSERT_OK(output->Write(expected1));
+  ASSERT_OK(output->Close());
+
+  // Verify that the initial content has not been overwritten and that the block from
+  // the other client was not committed.
+  ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path));
+  ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data()));
+  EXPECT_EQ(std::string(inbuf.data(), size),
+            std::string(expected0) + std::string(expected1));
+}
+
+TEST_F(AzuriteFileSystemTest, OpenOutputStreamClosed) {
+  const auto path = internal::ConcatAbstractPath(PreexistingContainerName(),
+                                                 "open-output-stream-closed.txt");
+  ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {}));
+  ASSERT_OK(output->Close());
+  ASSERT_RAISES(Invalid, output->Write(kLoremIpsum, std::strlen(kLoremIpsum)));
+  ASSERT_RAISES(Invalid, output->Flush());
+  ASSERT_RAISES(Invalid, output->Tell());
+}
+
+TEST_F(AzuriteFileSystemTest, OpenOutputStreamUri) {
+  const auto path = internal::ConcatAbstractPath(PreexistingContainerName(),
+                                                 "open-output-stream-uri.txt");
+  ASSERT_RAISES(Invalid, fs_->OpenInputStream("abfs://" + path));
+}
+
 TEST_F(AzuriteFileSystemTest, OpenInputFileMixedReadVsReadAt) {
   // Create a file large enough to make the random access tests non-trivial.
   auto constexpr kLineWidth = 100;