Skip to content

Commit

Permalink
Add fallback code for older AWS SDK versions
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Mar 14, 2024
1 parent 859b790 commit d51037a
Showing 1 changed file with 49 additions and 8 deletions.
57 changes: 49 additions & 8 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@
#include <aws/core/utils/xml/XmlSerializer.h>
#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/S3ClientConfiguration.h>
#include <aws/s3/S3EndpointProvider.h>
#include <aws/s3/S3Errors.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
Expand Down Expand Up @@ -101,12 +99,21 @@
#define ARROW_S3_HAS_CRT
#endif

#if ARROW_AWS_SDK_VERSION_CHECK(1, 10, 0)
#define ARROW_S3_HAS_S3CLIENT_CONFIGURATION
#endif

#ifdef ARROW_S3_HAS_CRT
#include <aws/crt/io/Bootstrap.h>
#include <aws/crt/io/EventLoopGroup.h>
#include <aws/crt/io/HostResolver.h>
#endif

#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION
#include <aws/s3/S3ClientConfiguration.h>
#include <aws/s3/S3EndpointProvider.h>
#endif

#include "arrow/util/windows_fixup.h"

#include "arrow/buffer.h"
Expand Down Expand Up @@ -913,6 +920,18 @@ Result<std::shared_ptr<S3ClientHolder>> GetClientHolder(
// -----------------------------------------------------------------------
// S3 client factory: build S3Client from S3Options

#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION

// GH-40279: standard initialization of S3Client creates a new `S3EndpointProvider`
// every time. Its construction takes 1ms, which makes instantiating every S3Client
// very costly (see upstream bug report
// at https://github.com/aws/aws-sdk-cpp/issues/2880).
// To work around this, we build and cache `S3EndpointProvider` instances
// for each distinct endpoint configuration, and reuse them whenever possible.
// Since most applications tend to use a single endpoint configuration, this
// makes the 1ms setup cost a once-per-process overhead, making it much more
// bearable - if not ideal.

struct EndpointConfigKey {
explicit EndpointConfigKey(const Aws::S3::S3ClientConfiguration& config)
: region(config.region),
Expand All @@ -938,6 +957,7 @@ struct EndpointConfigKey {
template <>
struct std::hash<arrow::fs::EndpointConfigKey> {
std::size_t operator()(const arrow::fs::EndpointConfigKey& key) const noexcept {
// A crude hash is sufficient since we expect the cache to remain very small.
auto h = std::hash<Aws::String>{};
return h(key.region) ^ h(key.endpoint_override);
}
Expand All @@ -946,15 +966,17 @@ struct std::hash<arrow::fs::EndpointConfigKey> {
namespace arrow::fs {
namespace {

// EndpointProvider configuration happens in a non-thread-safe way, even
// when the updates are idempotent. This is a problem when trying to reuse
// a single EndpointProvider from several clients.
// To work around this, this class ensures reconfiguration of an existing
// EndpointProvider is a no-op.
class InitOnceEndpointProvider : public Aws::S3::S3EndpointProviderBase {
public:
explicit InitOnceEndpointProvider(
std::shared_ptr<Aws::S3::S3EndpointProviderBase> wrapped)
: wrapped_(std::move(wrapped)) {}

// EndpointProvider configuration happens in a non-thread-safe way, even
// when the updates are idempotent. To work around this, this class ensures
// reconfiguration of an existing EndpointProvider is a no-op.
void InitBuiltInParameters(const Aws::S3::S3ClientConfiguration& config) override {}

void OverrideEndpoint(const Aws::String& endpoint) override {
Expand All @@ -980,6 +1002,9 @@ class InitOnceEndpointProvider : public Aws::S3::S3EndpointProviderBase {
std::shared_ptr<Aws::S3::S3EndpointProviderBase> wrapped_;
};

// A class that instantiates a single EndpointProvider per distinct endpoint
// configuration and initializes it in a thread-safe way. See earlier comments
// for rationale.
class EndpointProviderBuilder {
public:
std::shared_ptr<Aws::S3::S3EndpointProviderBase> Lookup(
Expand Down Expand Up @@ -1021,6 +1046,8 @@ class EndpointProviderBuilder {
std::unordered_map<EndpointConfigKey, CacheValue> cache_;
};

#endif // ARROW_S3_HAS_S3CLIENT_CONFIGURATION

class ClientBuilder {
public:
explicit ClientBuilder(S3Options options) : options_(std::move(options)) {}
Expand All @@ -1045,8 +1072,6 @@ class ClientBuilder {
static_cast<long>(ceil(options_.connect_timeout * 1000)); // NOLINT runtime/int
}

client_config_.useVirtualAddressing =
options_.endpoint_override.empty() || options_.force_virtual_addressing;
client_config_.endpointOverride = ToAwsString(options_.endpoint_override);
if (options_.scheme == "http") {
client_config_.scheme = Aws::Http::Scheme::HTTP;
Expand Down Expand Up @@ -1097,10 +1122,20 @@ class ClientBuilder {
client_config_.maxConnections = std::max(io_context->executor()->GetCapacity(), 25);
}

const bool use_virtual_addressing =
options_.endpoint_override.empty() || options_.force_virtual_addressing;

#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION
client_config_.useVirtualAddressing = use_virtual_addressing;
auto endpoint_provider = EndpointProviderBuilder::Instance()->Lookup(client_config_);
auto client = std::make_shared<S3Client>(credentials_provider_, endpoint_provider,
client_config_);

#else
auto client = std::make_shared<S3Client>(
credentials_provider_, client_config_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
use_virtual_addressing);
#endif
client->s3_retry_strategy_ = options_.retry_strategy;
return GetClientHolder(std::move(client));
}
Expand All @@ -1109,7 +1144,11 @@ class ClientBuilder {

protected:
S3Options options_;
#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION
Aws::S3::S3ClientConfiguration client_config_;
#else
Aws::Client::ClientConfiguration client_config_;
#endif
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider_;
};

Expand Down Expand Up @@ -3056,7 +3095,9 @@ struct AwsInstance {
"This could lead to a segmentation fault at exit";
}
GetClientFinalizer()->Finalize();
#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION
EndpointProviderBuilder::Instance()->Reset();
#endif
Aws::ShutdownAPI(aws_options_);
}
}
Expand Down

0 comments on commit d51037a

Please sign in to comment.