diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index fd6d11cf99ec1..f765db0e55ded 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -56,8 +56,6 @@ #include #include #include -#include -#include #include #include #include @@ -101,12 +99,21 @@ #define ARROW_S3_HAS_CRT #endif +#if ARROW_AWS_SDK_VERSION_CHECK(1, 10, 0) +#define ARROW_S3_HAS_S3CLIENT_CONFIGURATION +#endif + #ifdef ARROW_S3_HAS_CRT #include #include #include #endif +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION +#include +#include +#endif + #include "arrow/util/windows_fixup.h" #include "arrow/buffer.h" @@ -913,6 +920,18 @@ Result> GetClientHolder( // ----------------------------------------------------------------------- // S3 client factory: build S3Client from S3Options +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION + +// GH-40279: standard initialization of S3Client creates a new `S3EndpointProvider` +// every time. Its construction takes 1ms, which makes instantiating every S3Client +// very costly (see upstream bug report +// at https://github.com/aws/aws-sdk-cpp/issues/2880). +// To work around this, we build and cache `S3EndpointProvider` instances +// for each distinct endpoint configuration, and reuse them whenever possible. +// Since most applications tend to use a single endpoint configuration, this +// makes the 1ms setup cost a once-per-process overhead, making it much more +// bearable - if not ideal. + struct EndpointConfigKey { explicit EndpointConfigKey(const Aws::S3::S3ClientConfiguration& config) : region(config.region), @@ -938,6 +957,7 @@ struct EndpointConfigKey { template <> struct std::hash { std::size_t operator()(const arrow::fs::EndpointConfigKey& key) const noexcept { + // A crude hash is sufficient since we expect the cache to remain very small. auto h = std::hash{}; return h(key.region) ^ h(key.endpoint_override); } @@ -946,15 +966,17 @@ struct std::hash { namespace arrow::fs { namespace { +// EndpointProvider configuration happens in a non-thread-safe way, even +// when the updates are idempotent. This is a problem when trying to reuse +// a single EndpointProvider from several clients. +// To work around this, this class ensures reconfiguration of an existing +// EndpointProvider is a no-op. class InitOnceEndpointProvider : public Aws::S3::S3EndpointProviderBase { public: explicit InitOnceEndpointProvider( std::shared_ptr wrapped) : wrapped_(std::move(wrapped)) {} - // EndpointProvider configuration happens in a non-thread-safe way, even - // when the updates are idempotent. To work around this, this class ensures - // reconfiguration of an existing EndpointProvider is a no-op. void InitBuiltInParameters(const Aws::S3::S3ClientConfiguration& config) override {} void OverrideEndpoint(const Aws::String& endpoint) override { @@ -980,6 +1002,9 @@ class InitOnceEndpointProvider : public Aws::S3::S3EndpointProviderBase { std::shared_ptr wrapped_; }; +// A class that instantiates a single EndpointProvider per distinct endpoint +// configuration and initializes it in a thread-safe way. See earlier comments +// for rationale. class EndpointProviderBuilder { public: std::shared_ptr Lookup( @@ -1021,6 +1046,8 @@ class EndpointProviderBuilder { std::unordered_map cache_; }; +#endif // ARROW_S3_HAS_S3CLIENT_CONFIGURATION + class ClientBuilder { public: explicit ClientBuilder(S3Options options) : options_(std::move(options)) {} @@ -1045,8 +1072,6 @@ class ClientBuilder { static_cast(ceil(options_.connect_timeout * 1000)); // NOLINT runtime/int } - client_config_.useVirtualAddressing = - options_.endpoint_override.empty() || options_.force_virtual_addressing; client_config_.endpointOverride = ToAwsString(options_.endpoint_override); if (options_.scheme == "http") { client_config_.scheme = Aws::Http::Scheme::HTTP; @@ -1097,10 +1122,20 @@ class ClientBuilder { client_config_.maxConnections = std::max(io_context->executor()->GetCapacity(), 25); } + const bool use_virtual_addressing = + options_.endpoint_override.empty() || options_.force_virtual_addressing; + +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION + client_config_.useVirtualAddressing = use_virtual_addressing; auto endpoint_provider = EndpointProviderBuilder::Instance()->Lookup(client_config_); auto client = std::make_shared(credentials_provider_, endpoint_provider, client_config_); - +#else + auto client = std::make_shared( + credentials_provider_, client_config_, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + use_virtual_addressing); +#endif client->s3_retry_strategy_ = options_.retry_strategy; return GetClientHolder(std::move(client)); } @@ -1109,7 +1144,11 @@ class ClientBuilder { protected: S3Options options_; +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION Aws::S3::S3ClientConfiguration client_config_; +#else + Aws::Client::ClientConfiguration client_config_; +#endif std::shared_ptr credentials_provider_; }; @@ -3056,7 +3095,9 @@ struct AwsInstance { "This could lead to a segmentation fault at exit"; } GetClientFinalizer()->Finalize(); +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION EndpointProviderBuilder::Instance()->Reset(); +#endif Aws::ShutdownAPI(aws_options_); } }