Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3439 added retries for CURLE_COULDNT_RESOLVE_HOST #6732

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ struct TObjectStorageExternalSource : public IExternalSource {
}

auto httpGateway = NYql::IHTTPGateway::Make();
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, NYql::NS3Lister::TListingRequest{
auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
.Url = meta->DataSourceLocation,
.AuthInfo = authInfo,
.Pattern = meta->TableLocation,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,7 @@ class TKqpHost : public IKqpHost {
state->Configuration->AllowAtomicUploadCommit = queryType == EKikimrQueryType::Script;
state->Configuration->Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx);
state->Gateway = FederatedQuerySetup->HttpGateway;
state->GatewayRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
state->ExecutorPoolId = AppData()->UserPoolId;

auto dataSource = NYql::CreateS3DataSource(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ std::unordered_set<CURLcode> FqRetriedCurlCodes() {
CURLE_SEND_ERROR,
CURLE_RECV_ERROR,
CURLE_NO_CONNECTION_AVAILABLE,
CURLE_GOT_NOTHING
CURLE_GOT_NOTHING,
CURLE_COULDNT_RESOLVE_HOST
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
FileQueueBatchSizeLimit,
FileQueueBatchObjectCountLimit,
Gateway,
RetryPolicy,
Url,
AuthInfo,
Pattern,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
FileQueueBatchSizeLimit,
FileQueueBatchObjectCountLimit,
Gateway,
RetryPolicy,
Url,
AuthInfo,
Pattern,
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ NActors::IActor* CreateS3FileQueueActor(
ui64 batchSizeLimit,
ui64 batchObjectCountLimit,
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
TString pattern,
Expand Down
6 changes: 6 additions & 0 deletions ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
ui64 batchSizeLimit,
ui64 batchObjectCountLimit,
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
TString pattern,
Expand All @@ -186,6 +187,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
, BatchSizeLimit(batchSizeLimit)
, BatchObjectCountLimit(batchObjectCountLimit)
, Gateway(std::move(gateway))
, RetryPolicy(std::move(retryPolicy))
, Url(std::move(url))
, AuthInfo(std::move(authInfo))
, Pattern(std::move(pattern))
Expand Down Expand Up @@ -488,6 +490,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
CurrentDirectoryPathIndex = object.GetPathIndex();
MaybeLister = NS3Lister::MakeS3Lister(
Gateway,
RetryPolicy,
NS3Lister::TListingRequest{
Url,
AuthInfo,
Expand Down Expand Up @@ -611,6 +614,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
THashSet<NActors::TActorId> UpdatedConsumers;

const IHTTPGateway::TPtr Gateway;
const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
const TString Url;
const TS3Credentials::TAuthInfo AuthInfo;
const TString Pattern;
Expand All @@ -632,6 +636,7 @@ NActors::IActor* CreateS3FileQueueActor(
ui64 batchSizeLimit,
ui64 batchObjectCountLimit,
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
TString pattern,
Expand All @@ -648,6 +653,7 @@ NActors::IActor* CreateS3FileQueueActor(
batchSizeLimit,
batchObjectCountLimit,
gateway,
retryPolicy,
url,
authInfo,
pattern,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ NActors::IActor* CreateS3FileQueueActor(
ui64 batchSizeLimit,
ui64 batchObjectCountLimit,
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
TString pattern,
Expand Down
13 changes: 8 additions & 5 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ class TS3Lister : public IS3Lister {

TS3Lister(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
size_t maxFilesPerQuery,
Expand All @@ -265,7 +266,7 @@ class TS3Lister : public IS3Lister {
NewPromise<TMaybe<TListingContext>>(),
std::make_shared<TListEntries>(),
IHTTPGateway::TWeakPtr(httpGateway),
GetHTTPDefaultRetryPolicy(),
retryPolicy,
CreateGuidAsString(),
std::move(request),
delimiter,
Expand Down Expand Up @@ -391,7 +392,7 @@ class TS3Lister : public IS3Lister {
NewPromise<TMaybe<TListingContext>>(),
std::make_shared<TListEntries>(),
ctx.GatewayWeak,
GetHTTPDefaultRetryPolicy(),
ctx.RetryPolicy,
CreateGuidAsString(),
ctx.ListingRequest,
ctx.Delimiter,
Expand Down Expand Up @@ -457,15 +458,16 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {

TFuture<NS3Lister::IS3Lister::TPtr> Make(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const NS3Lister::TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles) override {
auto acquired = Semaphore->AcquireAsync();
return acquired.Apply(
[ctx = SharedCtx, httpGateway, listingRequest, delimiter, allowLocalFiles](const auto& f) {
[ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles](const auto& f) {
return std::shared_ptr<NS3Lister::IS3Lister>(new TListerLockReleaseWrapper{
NS3Lister::MakeS3Lister(
httpGateway, listingRequest, delimiter, allowLocalFiles, ctx),
httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, ctx),
std::make_unique<TAsyncSemaphore::TAutoRelease>(
f.GetValue()->MakeAutoRelease())});
});
Expand Down Expand Up @@ -507,13 +509,14 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {

IS3Lister::TPtr MakeS3Lister(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles,
TSharedListingContextPtr sharedCtx) {
if (listingRequest.Url.substr(0, 7) != "file://") {
return std::make_shared<TS3Lister>(
httpGateway, listingRequest, delimiter, 1000, std::move(sharedCtx));
httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx));
}

if (!allowLocalFiles) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class IS3Lister : public TIterator<NThreading::TFuture<TListResult>> {

IS3Lister::TPtr MakeS3Lister(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles,
Expand All @@ -176,6 +177,7 @@ class IS3ListerFactory {

virtual NThreading::TFuture<NS3Lister::IS3Lister::TPtr> Make(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const NS3Lister::TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
fileQueueBatchSizeLimit,
fileQueueBatchObjectCountLimit,
State_->Gateway,
State_->GatewayRetryPolicy,
connect.Url,
GetAuthInfo(State_->CredentialsFactory, State_->Configuration->Tokens.at(cluster)),
pathPattern,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase {
State_->Configuration->RegexpCacheSize))
, ListingStrategy_(MakeS3ListingStrategy(
State_->Gateway,
State_->GatewayRetryPolicy,
ListerFactory_,
State_->Configuration->MinDesiredDirectoriesOfFilesPerQuery,
State_->Configuration->MaxInflightListsPerQuery,
Expand Down
39 changes: 25 additions & 14 deletions ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,15 @@ class TFlatFileS3ListingStrategy : public TCollectingS3ListingStrategy {
TFlatFileS3ListingStrategy(
const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[allowLocalFiles, httpGateway, listerFactory](
[allowLocalFiles, httpGateway, retryPolicy, listerFactory](
const TListingRequest& listingRequest,
TS3ListingOptions options) {
Y_UNUSED(options);
return listerFactory->Make(
httpGateway, listingRequest, Nothing(), allowLocalFiles);
httpGateway, retryPolicy, listingRequest, Nothing(), allowLocalFiles);
},
"TFlatFileS3ListingStrategy") { }
};
Expand All @@ -151,14 +152,15 @@ class TDirectoryS3ListingStrategy : public TCollectingS3ListingStrategy {
TDirectoryS3ListingStrategy(
const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[allowLocalFiles, httpGateway, listerFactory](
[allowLocalFiles, httpGateway, retryPolicy, listerFactory](
const TListingRequest& listingRequest,
TS3ListingOptions options) {
Y_UNUSED(options);
return listerFactory->Make(
httpGateway, listingRequest, "/", allowLocalFiles);
httpGateway, retryPolicy, listingRequest, "/", allowLocalFiles);
},
"TDirectoryS3ListingStrategy") { }
};
Expand Down Expand Up @@ -402,9 +404,10 @@ class TPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy
TPartitionedDatasetS3ListingStrategy(
const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[listerFactory, httpGateway, allowLocalFiles](
[listerFactory, httpGateway, retryPolicy, allowLocalFiles](
const TListingRequest& listingRequest,
TS3ListingOptions options) {
auto ptr = std::shared_ptr<IS3Lister>(
Expand All @@ -413,7 +416,7 @@ class TPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy
listingRequest.Prefix,
options,
TDirectoryS3ListingStrategy{
listerFactory, httpGateway, allowLocalFiles}});
listerFactory, httpGateway, retryPolicy, allowLocalFiles}});
return MakeFuture(std::move(ptr));
},
"TPartitionedDatasetS3ListingStrategy") { }
Expand Down Expand Up @@ -557,10 +560,11 @@ class TUnPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrate
TUnPartitionedDatasetS3ListingStrategy(
const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
size_t minParallelism,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[listerFactory, httpGateway, minParallelism, allowLocalFiles](
[listerFactory, httpGateway, retryPolicy, minParallelism, allowLocalFiles](
const TListingRequest& listingRequest,
TS3ListingOptions options) {
auto ptr = std::shared_ptr<IS3Lister>(
Expand All @@ -579,7 +583,7 @@ class TUnPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrate
: listingRequest.Pattern.substr(
0, NS3::GetFirstWildcardPos(listingRequest.Pattern))},
TDirectoryS3ListingStrategy{
listerFactory, httpGateway, allowLocalFiles},
listerFactory, httpGateway, retryPolicy, allowLocalFiles},
minParallelism,
options.MaxResultSet});
return MakeFuture(std::move(ptr));
Expand Down Expand Up @@ -893,11 +897,12 @@ class TConcurrentUnPartitionedDatasetS3ListingStrategy :
TConcurrentUnPartitionedDatasetS3ListingStrategy(
const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
size_t minParallelism,
size_t maxParallelOps,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[listerFactory, httpGateway, minParallelism, allowLocalFiles, maxParallelOps](
[listerFactory, httpGateway, retryPolicy, minParallelism, allowLocalFiles, maxParallelOps](
const TListingRequest& listingRequest,
TS3ListingOptions options) {
auto ptr = std::shared_ptr<IS3Lister>(
Expand Down Expand Up @@ -929,7 +934,7 @@ class TConcurrentUnPartitionedDatasetS3ListingStrategy :
: listingRequest.Pattern.substr(
0, NS3::GetFirstWildcardPos(listingRequest.Pattern))},
TDirectoryS3ListingStrategy{
listerFactory, httpGateway, allowLocalFiles},
listerFactory, httpGateway, retryPolicy, allowLocalFiles},
options.MaxResultSet,
maxParallelOps});
return MakeFuture(std::move(ptr));
Expand All @@ -943,10 +948,11 @@ class TConcurrentPartitionedDatasetS3ListingStrategy :
TConcurrentPartitionedDatasetS3ListingStrategy(
const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
size_t maxParallelOps,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[listerFactory, httpGateway, allowLocalFiles, maxParallelOps](
[listerFactory, httpGateway, retryPolicy, allowLocalFiles, maxParallelOps](
const TListingRequest& listingRequest,
TS3ListingOptions options) {
auto ptr = std::shared_ptr<IS3Lister>(
Expand Down Expand Up @@ -974,12 +980,12 @@ class TConcurrentPartitionedDatasetS3ListingStrategy :
: listingRequest.Pattern.substr(
0, NS3::GetFirstWildcardPos(listingRequest.Pattern))},
TDirectoryS3ListingStrategy{
listerFactory, httpGateway, allowLocalFiles},
listerFactory, httpGateway, retryPolicy, allowLocalFiles},
options.MaxResultSet,
maxParallelOps});
return MakeFuture(std::move(ptr));
},
"TConcurrentUnPartitionedDatasetS3ListingStrategy") { }
"TConcurrentPartitionedDatasetS3ListingStrategy") { }
};


Expand Down Expand Up @@ -1024,6 +1030,7 @@ class TLoggingS3ListingStrategy : public IS3ListingStrategy {

IS3ListingStrategy::TPtr MakeS3ListingStrategy(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const IS3ListerFactory::TPtr& listerFactory,
ui64 minDesiredDirectoriesOfFilesPerQuery,
size_t maxParallelOps,
Expand All @@ -1032,7 +1039,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy(
std::make_shared<TCompositeS3ListingStrategy>(
std::vector<std::shared_ptr<IS3ListingStrategy>>{
std::make_shared<TFlatFileS3ListingStrategy>(
listerFactory, httpGateway, allowLocalFiles),
listerFactory, httpGateway, retryPolicy, allowLocalFiles),
std::make_shared<TConditionalS3ListingStrategy>(
std::initializer_list<TConditionalS3ListingStrategy::TPair>{
{[](const TS3ListingOptions& options) {
Expand All @@ -1042,6 +1049,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy(
std::make_shared<TPartitionedDatasetS3ListingStrategy>(
listerFactory,
httpGateway,
retryPolicy,
allowLocalFiles)},
{[](const TS3ListingOptions& options) {
return options.IsPartitionedDataset &&
Expand All @@ -1050,6 +1058,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy(
std::make_shared<TConcurrentPartitionedDatasetS3ListingStrategy>(
listerFactory,
httpGateway,
retryPolicy,
maxParallelOps,
allowLocalFiles)},
{[](const TS3ListingOptions& options) {
Expand All @@ -1059,6 +1068,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy(
std::make_shared<TUnPartitionedDatasetS3ListingStrategy>(
listerFactory,
httpGateway,
retryPolicy,
minDesiredDirectoriesOfFilesPerQuery,
allowLocalFiles)},
{[](const TS3ListingOptions& options) {
Expand All @@ -1068,6 +1078,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy(
std::make_shared<TConcurrentUnPartitionedDatasetS3ListingStrategy>(
listerFactory,
httpGateway,
retryPolicy,
minDesiredDirectoriesOfFilesPerQuery,
maxParallelOps,
allowLocalFiles)}})}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class IS3ListingStrategy {

IS3ListingStrategy::TPtr MakeS3ListingStrategy(
const IHTTPGateway::TPtr& httpGateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
ui64 minDesiredDirectoriesOfFilesPerQuery,
size_t maxParallelOps,
Expand Down
Loading
Loading