Skip to content

Commit

Permalink
YQ-3439 added retries for CURLE_COULDNT_RESOLVE_HOST (ydb-platform#6732)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Jul 16, 2024
1 parent fe5e16f commit e4a6ceb
Show file tree
Hide file tree
Showing 15 changed files with 55 additions and 21 deletions.
3 changes: 2 additions & 1 deletion ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ struct TObjectStorageExternalSource : public IExternalSource {
}

auto httpGateway = NYql::IHTTPGateway::Make();
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, NYql::NS3Lister::TListingRequest{
auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
.Url = meta->DataSourceLocation,
.AuthInfo = authInfo,
.Pattern = meta->TableLocation,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1518,6 +1518,7 @@ class TKqpHost : public IKqpHost {
state->Configuration->AllowAtomicUploadCommit = queryType == EKikimrQueryType::Script;
state->Configuration->Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx);
state->Gateway = FederatedQuerySetup->HttpGateway;
state->GatewayRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
state->ExecutorPoolId = AppData()->UserPoolId;

auto dataSource = NYql::CreateS3DataSource(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ std::unordered_set<CURLcode> FqRetriedCurlCodes() {
CURLE_SEND_ERROR,
CURLE_RECV_ERROR,
CURLE_NO_CONNECTION_AVAILABLE,
CURLE_GOT_NOTHING
CURLE_GOT_NOTHING,
CURLE_COULDNT_RESOLVE_HOST
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
FileQueueBatchSizeLimit,
FileQueueBatchObjectCountLimit,
Gateway,
RetryPolicy,
Url,
AuthInfo,
Pattern,
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
FileQueueBatchSizeLimit,
FileQueueBatchObjectCountLimit,
Gateway,
RetryPolicy,
Url,
AuthInfo,
Pattern,
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ NActors::IActor* CreateS3FileQueueActor(
ui64 batchSizeLimit,
ui64 batchObjectCountLimit,
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
TString pattern,
Expand Down
6 changes: 6 additions & 0 deletions ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
ui64 batchSizeLimit,
ui64 batchObjectCountLimit,
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
TString pattern,
Expand All @@ -186,6 +187,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
, BatchSizeLimit(batchSizeLimit)
, BatchObjectCountLimit(batchObjectCountLimit)
, Gateway(std::move(gateway))
, RetryPolicy(std::move(retryPolicy))
, Url(std::move(url))
, AuthInfo(std::move(authInfo))
, Pattern(std::move(pattern))
Expand Down Expand Up @@ -488,6 +490,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
CurrentDirectoryPathIndex = object.GetPathIndex();
MaybeLister = NS3Lister::MakeS3Lister(
Gateway,
RetryPolicy,
NS3Lister::TListingRequest{
Url,
AuthInfo,
Expand Down Expand Up @@ -611,6 +614,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
THashSet<NActors::TActorId> UpdatedConsumers;

const IHTTPGateway::TPtr Gateway;
const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
const TString Url;
const TS3Credentials::TAuthInfo AuthInfo;
const TString Pattern;
Expand All @@ -632,6 +636,7 @@ NActors::IActor* CreateS3FileQueueActor(
ui64 batchSizeLimit,
ui64 batchObjectCountLimit,
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
TString pattern,
Expand All @@ -648,6 +653,7 @@ NActors::IActor* CreateS3FileQueueActor(
batchSizeLimit,
batchObjectCountLimit,
gateway,
retryPolicy,
url,
authInfo,
pattern,
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ NActors::IActor* CreateS3FileQueueActor(
ui64 batchSizeLimit,
ui64 batchObjectCountLimit,
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
TString pattern,
Expand Down
13 changes: 8 additions & 5 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ class TS3Lister : public IS3Lister {

TS3Lister(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
size_t maxFilesPerQuery,
Expand All @@ -265,7 +266,7 @@ class TS3Lister : public IS3Lister {
NewPromise<TMaybe<TListingContext>>(),
std::make_shared<TListEntries>(),
IHTTPGateway::TWeakPtr(httpGateway),
GetHTTPDefaultRetryPolicy(),
retryPolicy,
CreateGuidAsString(),
std::move(request),
delimiter,
Expand Down Expand Up @@ -391,7 +392,7 @@ class TS3Lister : public IS3Lister {
NewPromise<TMaybe<TListingContext>>(),
std::make_shared<TListEntries>(),
ctx.GatewayWeak,
GetHTTPDefaultRetryPolicy(),
ctx.RetryPolicy,
CreateGuidAsString(),
ctx.ListingRequest,
ctx.Delimiter,
Expand Down Expand Up @@ -457,15 +458,16 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {

TFuture<NS3Lister::IS3Lister::TPtr> Make(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const NS3Lister::TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles) override {
auto acquired = Semaphore->AcquireAsync();
return acquired.Apply(
[ctx = SharedCtx, httpGateway, listingRequest, delimiter, allowLocalFiles](const auto& f) {
[ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles](const auto& f) {
return std::shared_ptr<NS3Lister::IS3Lister>(new TListerLockReleaseWrapper{
NS3Lister::MakeS3Lister(
httpGateway, listingRequest, delimiter, allowLocalFiles, ctx),
httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, ctx),
std::make_unique<TAsyncSemaphore::TAutoRelease>(
f.GetValue()->MakeAutoRelease())});
});
Expand Down Expand Up @@ -507,13 +509,14 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {

IS3Lister::TPtr MakeS3Lister(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles,
TSharedListingContextPtr sharedCtx) {
if (listingRequest.Url.substr(0, 7) != "file://") {
return std::make_shared<TS3Lister>(
httpGateway, listingRequest, delimiter, 1000, std::move(sharedCtx));
httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx));
}

if (!allowLocalFiles) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class IS3Lister : public TIterator<NThreading::TFuture<TListResult>> {

IS3Lister::TPtr MakeS3Lister(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles,
Expand All @@ -176,6 +177,7 @@ class IS3ListerFactory {

virtual NThreading::TFuture<NS3Lister::IS3Lister::TPtr> Make(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const NS3Lister::TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
fileQueueBatchSizeLimit,
fileQueueBatchObjectCountLimit,
State_->Gateway,
State_->GatewayRetryPolicy,
connect.Url,
GetAuthInfo(State_->CredentialsFactory, State_->Configuration->Tokens.at(cluster)),
pathPattern,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase {
State_->Configuration->RegexpCacheSize))
, ListingStrategy_(MakeS3ListingStrategy(
State_->Gateway,
State_->GatewayRetryPolicy,
ListerFactory_,
State_->Configuration->MinDesiredDirectoriesOfFilesPerQuery,
State_->Configuration->MaxInflightListsPerQuery,
Expand Down
39 changes: 25 additions & 14 deletions ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,15 @@ class TFlatFileS3ListingStrategy : public TCollectingS3ListingStrategy {
TFlatFileS3ListingStrategy(
const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[allowLocalFiles, httpGateway, listerFactory](
[allowLocalFiles, httpGateway, retryPolicy, listerFactory](
const TListingRequest& listingRequest,
TS3ListingOptions options) {
Y_UNUSED(options);
return listerFactory->Make(
httpGateway, listingRequest, Nothing(), allowLocalFiles);
httpGateway, retryPolicy, listingRequest, Nothing(), allowLocalFiles);
},
"TFlatFileS3ListingStrategy") { }
};
Expand All @@ -151,14 +152,15 @@ class TDirectoryS3ListingStrategy : public TCollectingS3ListingStrategy {
TDirectoryS3ListingStrategy(
const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[allowLocalFiles, httpGateway, listerFactory](
[allowLocalFiles, httpGateway, retryPolicy, listerFactory](
const TListingRequest& listingRequest,
TS3ListingOptions options) {
Y_UNUSED(options);
return listerFactory->Make(
httpGateway, listingRequest, "/", allowLocalFiles);
httpGateway, retryPolicy, listingRequest, "/", allowLocalFiles);
},
"TDirectoryS3ListingStrategy") { }
};
Expand Down Expand Up @@ -402,9 +404,10 @@ class TPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy
TPartitionedDatasetS3ListingStrategy(
const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[listerFactory, httpGateway, allowLocalFiles](
[listerFactory, httpGateway, retryPolicy, allowLocalFiles](
const TListingRequest& listingRequest,
TS3ListingOptions options) {
auto ptr = std::shared_ptr<IS3Lister>(
Expand All @@ -413,7 +416,7 @@ class TPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy
listingRequest.Prefix,
options,
TDirectoryS3ListingStrategy{
listerFactory, httpGateway, allowLocalFiles}});
listerFactory, httpGateway, retryPolicy, allowLocalFiles}});
return MakeFuture(std::move(ptr));
},
"TPartitionedDatasetS3ListingStrategy") { }
Expand Down Expand Up @@ -557,10 +560,11 @@ class TUnPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrate
TUnPartitionedDatasetS3ListingStrategy(
const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
size_t minParallelism,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[listerFactory, httpGateway, minParallelism, allowLocalFiles](
[listerFactory, httpGateway, retryPolicy, minParallelism, allowLocalFiles](
const TListingRequest& listingRequest,
TS3ListingOptions options) {
auto ptr = std::shared_ptr<IS3Lister>(
Expand All @@ -579,7 +583,7 @@ class TUnPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrate
: listingRequest.Pattern.substr(
0, NS3::GetFirstWildcardPos(listingRequest.Pattern))},
TDirectoryS3ListingStrategy{
listerFactory, httpGateway, allowLocalFiles},
listerFactory, httpGateway, retryPolicy, allowLocalFiles},
minParallelism,
options.MaxResultSet});
return MakeFuture(std::move(ptr));
Expand Down Expand Up @@ -893,11 +897,12 @@ class TConcurrentUnPartitionedDatasetS3ListingStrategy :
TConcurrentUnPartitionedDatasetS3ListingStrategy(
const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
size_t minParallelism,
size_t maxParallelOps,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[listerFactory, httpGateway, minParallelism, allowLocalFiles, maxParallelOps](
[listerFactory, httpGateway, retryPolicy, minParallelism, allowLocalFiles, maxParallelOps](
const TListingRequest& listingRequest,
TS3ListingOptions options) {
auto ptr = std::shared_ptr<IS3Lister>(
Expand Down Expand Up @@ -929,7 +934,7 @@ class TConcurrentUnPartitionedDatasetS3ListingStrategy :
: listingRequest.Pattern.substr(
0, NS3::GetFirstWildcardPos(listingRequest.Pattern))},
TDirectoryS3ListingStrategy{
listerFactory, httpGateway, allowLocalFiles},
listerFactory, httpGateway, retryPolicy, allowLocalFiles},
options.MaxResultSet,
maxParallelOps});
return MakeFuture(std::move(ptr));
Expand All @@ -943,10 +948,11 @@ class TConcurrentPartitionedDatasetS3ListingStrategy :
TConcurrentPartitionedDatasetS3ListingStrategy(
const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
size_t maxParallelOps,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[listerFactory, httpGateway, allowLocalFiles, maxParallelOps](
[listerFactory, httpGateway, retryPolicy, allowLocalFiles, maxParallelOps](
const TListingRequest& listingRequest,
TS3ListingOptions options) {
auto ptr = std::shared_ptr<IS3Lister>(
Expand Down Expand Up @@ -974,12 +980,12 @@ class TConcurrentPartitionedDatasetS3ListingStrategy :
: listingRequest.Pattern.substr(
0, NS3::GetFirstWildcardPos(listingRequest.Pattern))},
TDirectoryS3ListingStrategy{
listerFactory, httpGateway, allowLocalFiles},
listerFactory, httpGateway, retryPolicy, allowLocalFiles},
options.MaxResultSet,
maxParallelOps});
return MakeFuture(std::move(ptr));
},
"TConcurrentUnPartitionedDatasetS3ListingStrategy") { }
"TConcurrentPartitionedDatasetS3ListingStrategy") { }
};


Expand Down Expand Up @@ -1024,6 +1030,7 @@ class TLoggingS3ListingStrategy : public IS3ListingStrategy {

IS3ListingStrategy::TPtr MakeS3ListingStrategy(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const IS3ListerFactory::TPtr& listerFactory,
ui64 minDesiredDirectoriesOfFilesPerQuery,
size_t maxParallelOps,
Expand All @@ -1032,7 +1039,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy(
std::make_shared<TCompositeS3ListingStrategy>(
std::vector<std::shared_ptr<IS3ListingStrategy>>{
std::make_shared<TFlatFileS3ListingStrategy>(
listerFactory, httpGateway, allowLocalFiles),
listerFactory, httpGateway, retryPolicy, allowLocalFiles),
std::make_shared<TConditionalS3ListingStrategy>(
std::initializer_list<TConditionalS3ListingStrategy::TPair>{
{[](const TS3ListingOptions& options) {
Expand All @@ -1042,6 +1049,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy(
std::make_shared<TPartitionedDatasetS3ListingStrategy>(
listerFactory,
httpGateway,
retryPolicy,
allowLocalFiles)},
{[](const TS3ListingOptions& options) {
return options.IsPartitionedDataset &&
Expand All @@ -1050,6 +1058,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy(
std::make_shared<TConcurrentPartitionedDatasetS3ListingStrategy>(
listerFactory,
httpGateway,
retryPolicy,
maxParallelOps,
allowLocalFiles)},
{[](const TS3ListingOptions& options) {
Expand All @@ -1059,6 +1068,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy(
std::make_shared<TUnPartitionedDatasetS3ListingStrategy>(
listerFactory,
httpGateway,
retryPolicy,
minDesiredDirectoriesOfFilesPerQuery,
allowLocalFiles)},
{[](const TS3ListingOptions& options) {
Expand All @@ -1068,6 +1078,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy(
std::make_shared<TConcurrentUnPartitionedDatasetS3ListingStrategy>(
listerFactory,
httpGateway,
retryPolicy,
minDesiredDirectoriesOfFilesPerQuery,
maxParallelOps,
allowLocalFiles)}})}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class IS3ListingStrategy {

IS3ListingStrategy::TPtr MakeS3ListingStrategy(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
ui64 minDesiredDirectoriesOfFilesPerQuery,
size_t maxParallelOps,
Expand Down
Loading

0 comments on commit e4a6ceb

Please sign in to comment.