Skip to content

Commit

Permalink
Paste in object append stream from apache#12914
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Newton committed Nov 19, 2023
1 parent 7df1cdd commit 89899b4
Showing 1 changed file with 190 additions and 0 deletions.
190 changes: 190 additions & 0 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,196 @@ class ObjectInputFile final : public io::RandomAccessFile {
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};

class ObjectAppendStream final : public io::OutputStream {
public:
ObjectAppendStream(
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakePathClient>& path_client,
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeFileClient>& file_client,
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient>& blob_client,
const bool is_hierarchical_namespace_enabled, const io::IOContext& io_context,
const AzurePath& path, const std::shared_ptr<const KeyValueMetadata>& metadata)
: path_client_(std::move(path_client)),
file_client_(std::move(file_client)),
blob_client_(std::move(blob_client)),
is_hierarchical_namespace_enabled_(is_hierarchical_namespace_enabled),
io_context_(io_context),
path_(path) {}

~ObjectAppendStream() override {
// For compliance with the rest of the IO stack, Close rather than Abort,
// even though it may be more expensive.
io::internal::CloseFromDestructor(this);
}

Status Init() {
closed_ = false;
if (content_length_ != kNoSize) {
DCHECK_GE(content_length_, 0);
return Status::OK();
}
try {
auto properties = path_client_->GetProperties();
if (properties.Value.IsDirectory) {
return ::arrow::fs::internal::NotAFile(path_.full_path);
}
content_length_ = properties.Value.FileSize;
pos_ = content_length_;
} catch (const Azure::Storage::StorageException& exception) {
// new file
if (is_hierarchical_namespace_enabled_) {
try {
file_client_->CreateIfNotExists();
} catch (const Azure::Storage::StorageException& exception) {
return Status::IOError(exception.RawResponse->GetReasonPhrase());
}
} else {
std::string s = "";
try {
file_client_->UploadFrom(
const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(s.data())), s.size());
} catch (const Azure::Storage::StorageException& exception) {
return Status::IOError(exception.RawResponse->GetReasonPhrase());
}
}
content_length_ = 0;
}
return Status::OK();
}

Status Abort() override {
if (closed_) {
return Status::OK();
}
path_client_ = nullptr;
file_client_ = nullptr;
blob_client_ = nullptr;
closed_ = true;
return Status::OK();
}

// OutputStream interface

Status Close() override {
if (closed_) {
return Status::OK();
}
path_client_ = nullptr;
file_client_ = nullptr;
blob_client_ = nullptr;
closed_ = true;
return Status::OK();
}

bool closed() const override { return closed_; }

Result<int64_t> Tell() const override {
if (closed_) {
return Status::Invalid("Operation on closed stream");
}
return pos_;
}

Status Write(const std::shared_ptr<Buffer>& buffer) override {
return DoAppend(buffer->data(), buffer->size(), buffer);
}

Status Write(const void* data, int64_t nbytes) override {
return DoAppend(data, nbytes);
}

Status DoAppend(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
if (closed_) {
return Status::Invalid("Operation on closed stream");
}
if (is_hierarchical_namespace_enabled_) {
try {
auto buffer_stream = std::make_unique<Azure::Core::IO::MemoryBodyStream>(
Azure::Core::IO::MemoryBodyStream(
const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(data)), nbytes));
if (buffer_stream->Length() == 0) {
return Status::OK();
}
auto result = file_client_->Append(*buffer_stream, pos_);
pos_ += nbytes;
file_client_->Flush(pos_);
} catch (const Azure::Storage::StorageException& exception) {
return Status::IOError(exception.RawResponse->GetReasonPhrase());
}
} else {
try {
auto append_data = static_cast<uint8_t*>((void*)data);
auto res = blob_client_->GetBlockList().Value;
auto size = res.CommittedBlocks.size();
std::string block_id;
{
block_id = std::to_string(size + 1);
size_t n = 8;
int precision = n - std::min(n, block_id.size());
block_id.insert(0, precision, '0');
}
block_id = Azure::Core::Convert::Base64Encode(
std::vector<uint8_t>(block_id.begin(), block_id.end()));
auto block_content = Azure::Core::IO::MemoryBodyStream(
append_data, strlen(reinterpret_cast<char*>(append_data)));
if (block_content.Length() == 0) {
return Status::OK();
}
blob_client_->StageBlock(block_id, block_content);
std::vector<std::string> block_ids;
for (auto block : res.CommittedBlocks) {
block_ids.push_back(block.Name);
}
block_ids.push_back(block_id);
blob_client_->CommitBlockList(block_ids);
pos_ += nbytes;
} catch (const Azure::Storage::StorageException& exception) {
return Status::IOError(exception.RawResponse->GetReasonPhrase());
}
}
content_length_ += nbytes;
return Status::OK();
}

Status Flush() override {
if (closed_) {
return Status::Invalid("Operation on closed stream");
}
if (is_hierarchical_namespace_enabled_) {
try {
file_client_->Flush(content_length_);
} catch (const Azure::Storage::StorageException& exception) {
return Status::IOError(exception.RawResponse->GetReasonPhrase());
}
} else {
try {
auto res = blob_client_->GetBlockList().Value;
std::vector<std::string> block_ids;
for (auto block : res.UncommittedBlocks) {
block_ids.push_back(block.Name);
}
blob_client_->CommitBlockList(block_ids);
} catch (const Azure::Storage::StorageException& exception) {
return Status::IOError(exception.RawResponse->GetReasonPhrase());
}
}
return Status::OK();
}

protected:
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakePathClient> path_client_;
std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeFileClient> file_client_;
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> blob_client_;
const bool is_hierarchical_namespace_enabled_;
const io::IOContext io_context_;
const AzurePath path_;

bool closed_ = true;
int64_t pos_ = 0;
int64_t content_length_ = kNoSize;
};

} // namespace

// -----------------------------------------------------------------------
Expand Down

0 comments on commit 89899b4

Please sign in to comment.