From b3c39d1759527aa06212a6da1302a6207b8bd517 Mon Sep 17 00:00:00 2001 From: Sergey Uzhakov Date: Tue, 10 Sep 2024 18:54:32 +0300 Subject: [PATCH] listing fix (#7643) (#8280) --- ydb/core/external_sources/object_storage.cpp | 3 +- ydb/core/fq/libs/actors/run_actor.cpp | 2 +- ydb/core/kqp/host/kqp_host.cpp | 1 + .../s3/actors/yql_s3_source_queue.cpp | 3 +- .../yql/providers/s3/object_listers/ya.make | 1 + .../s3/object_listers/yql_s3_list.cpp | 58 +++++++++++++------ .../providers/s3/object_listers/yql_s3_list.h | 5 +- .../s3/provider/yql_s3_io_discovery.cpp | 3 +- .../s3/provider/yql_s3_listing_strategy.cpp | 8 ++- .../providers/s3/provider/yql_s3_provider.cpp | 5 +- .../providers/s3/provider/yql_s3_provider.h | 3 +- ydb/library/yql/tools/dqrun/dqrun.cpp | 2 +- ydb/tests/tools/fq_runner/kikimr_runner.py | 2 +- 13 files changed, 66 insertions(+), 30 deletions(-) diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index 5b7e2584dedc..1b1487ddd078 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -290,7 +290,6 @@ struct TObjectStorageExternalSource : public IExternalSource { }; virtual NThreading::TFuture> LoadDynamicMetadata(std::shared_ptr meta) override { - Y_UNUSED(ActorSystem); auto format = meta->Attributes.FindPtr("format"); if (!format || !meta->Attributes.contains("withinfer")) { return NThreading::MakeFuture(std::move(meta)); @@ -333,7 +332,7 @@ struct TObjectStorageExternalSource : public IExternalSource { .Url = meta->DataSourceLocation, .Credentials = credentials, .Pattern = effectiveFilePattern, - }, Nothing(), false); + }, Nothing(), false, ActorSystem); auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture& listResFut) { auto& listRes = listResFut.GetValue(); if (std::holds_alternative(listRes)) { diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index f52beadd6735..18e731e87d88 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -1940,7 +1940,7 @@ class TRunActor : public NActors::TActorBootstrapped { { dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory, - Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig + Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles(), NActors::TActivationContext::ActorSystem())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig } { diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index fb6d02c83a80..367043080836 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1699,6 +1699,7 @@ class TKqpHost : public IKqpHost { state->Gateway = FederatedQuerySetup->HttpGateway; state->GatewayRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()}); state->ExecutorPoolId = AppData()->UserPoolId; + state->ActorSystem = ActorSystem; auto dataSource = NYql::CreateS3DataSource(state); auto dataSink = NYql::CreateS3DataSink(state); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp index 060afbb4aea5..0651ffebce52 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp @@ -500,7 +500,8 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped PatternType, object.GetPath()}, Nothing(), - false); + false, + NActors::TActivationContext::ActorSystem()); Fetch(); return true; } diff --git a/ydb/library/yql/providers/s3/object_listers/ya.make b/ydb/library/yql/providers/s3/object_listers/ya.make index 2d284a9b4d9a..62849a03a7de 100644 --- a/ydb/library/yql/providers/s3/object_listers/ya.make +++ b/ydb/library/yql/providers/s3/object_listers/ya.make @@ -14,6 +14,7 @@ PEERDIR( ydb/library/yql/providers/common/http_gateway ydb/library/yql/providers/s3/credentials ydb/library/yql/utils + ydb/library/yql/utils/actor_log ydb/library/yql/utils/threading ) diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp index f62c12ea1d4c..9d915d0a4529 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -25,7 +26,7 @@ namespace NYql::NS3Lister { IOutputStream& operator<<(IOutputStream& stream, const TListingRequest& request) { - return stream << "TListingRequest{.url=" << request.Url + return stream << "[TS3Lister] TListingRequest{.url=" << request.Url << ",.Prefix=" << request.Prefix << ",.Pattern=" << request.Pattern << ",.PatternType=" << request.PatternType @@ -51,7 +52,7 @@ std::pair MakeFilterRegexp(const TString& regex, const size_t numGroups = re->NumberOfCapturingGroups(); YQL_CLOG(DEBUG, ProviderS3) - << "Got regex: '" << regex << "' with " << numGroups << " capture groups "; + << "[TS3Lister] Got regex: '" << regex << "' with " << numGroups << " capture groups "; auto groups = std::make_shared>(numGroups); auto reArgs = std::make_shared>(numGroups); @@ -101,7 +102,7 @@ std::pair MakeFilterWildcard(const TString& patt } const auto regex = NS3::RegexFromWildcards(pattern); - YQL_CLOG(DEBUG, ProviderS3) << "Got prefix: '" << regexPatternPrefix << "', regex: '" + YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister] Got prefix: '" << regexPatternPrefix << "', regex: '" << regex << "' from original pattern '" << pattern << "'"; return MakeFilterRegexp(regex, sharedCtx); @@ -238,6 +239,8 @@ class TS3Lister : public IS3Lister { const TMaybe Delimiter; const TMaybe ContinuationToken; const ui64 MaxKeys; + const std::pair CurrentLogContextPath; + const NActors::TActorSystem* ActorSystem; }; TS3Lister( @@ -246,7 +249,8 @@ class TS3Lister : public IS3Lister { const TListingRequest& listingRequest, const TMaybe& delimiter, size_t maxFilesPerQuery, - TSharedListingContextPtr sharedCtx) + TSharedListingContextPtr sharedCtx, + NActors::TActorSystem* actorSystem) : MaxFilesPerQuery(maxFilesPerQuery) { Y_ENSURE( listingRequest.Url.substr(0, 7) != "file://", @@ -270,7 +274,9 @@ class TS3Lister : public IS3Lister { std::move(request), delimiter, Nothing(), - MaxFilesPerQuery}; + MaxFilesPerQuery, + NLog::CurrentLogContextPath(), + actorSystem}; YQL_CLOG(TRACE, ProviderS3) << "[TS3Lister] Got URL: '" << ctx.ListingRequest.Url @@ -335,9 +341,19 @@ class TS3Lister : public IS3Lister { /*data=*/"", retryPolicy); } + static IHTTPGateway::TOnResult CallbackFactoryMethod(TListingContext&& listingContext) { return [c = std::move(listingContext)](IHTTPGateway::TResult&& result) { - OnDiscovery(c, std::move(result)); + if (c.ActorSystem) { + NDq::TYqlLogScope logScope(c.ActorSystem, NKikimrServices::KQP_YQL, c.CurrentLogContextPath.first, c.CurrentLogContextPath.second); + OnDiscovery(c, std::move(result)); + } else { + /* + If the subsystem doesn't use the actor system + then there is a need to use an own YqlLoggerScope on the top level + */ + OnDiscovery(c, std::move(result)); + } }; } @@ -351,7 +367,7 @@ class TS3Lister : public IS3Lister { const NXml::TDocument xml(xmlString, NXml::TDocument::String); auto parsedResponse = ParseListObjectV2Response(xml, ctx.RequestId); YQL_CLOG(DEBUG, ProviderS3) - << "Listing of " << ctx.ListingRequest.Url + << "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix << ": have " << ctx.Output->Size() << " entries, got another " << parsedResponse.KeyCount << " entries, request id: [" << ctx.RequestId << "]"; @@ -380,7 +396,7 @@ class TS3Lister : public IS3Lister { } if (parsedResponse.IsTruncated && !earlyStop) { - YQL_CLOG(DEBUG, ProviderS3) << "Listing of " << ctx.ListingRequest.Url + YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix << ": got truncated flag, will continue"; @@ -397,7 +413,9 @@ class TS3Lister : public IS3Lister { ctx.ListingRequest, ctx.Delimiter, parsedResponse.ContinuationToken, - parsedResponse.MaxKeys}; + parsedResponse.MaxKeys, + ctx.CurrentLogContextPath, + ctx.ActorSystem}; ctx.NextRequestPromise.SetValue(TMaybe(newCtx)); } else { @@ -409,14 +427,14 @@ class TS3Lister : public IS3Lister { TStringBuilder{} << "request id: [" << ctx.RequestId << "]", std::move(result.Issues)); YQL_CLOG(INFO, ProviderS3) - << "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix + << "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix << ": got error from http gateway: " << issues.ToString(true); ctx.Promise.SetValue(TListError{EListError::GENERAL, std::move(issues)}); ctx.NextRequestPromise.SetValue(Nothing()); } } catch (const std::exception& ex) { YQL_CLOG(INFO, ProviderS3) - << "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix + << "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix << " : got exception: " << ex.what(); ctx.Promise.SetException(std::current_exception()); ctx.NextRequestPromise.SetValue(Nothing()); @@ -452,9 +470,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory { using TPtr = std::shared_ptr; explicit TS3ParallelLimitedListerFactory( - size_t maxParallelOps, TSharedListingContextPtr sharedCtx) + size_t maxParallelOps, TSharedListingContextPtr sharedCtx, NActors::TActorSystem* actorSystem) : SharedCtx(std::move(sharedCtx)) - , Semaphore(TAsyncSemaphore::Make(std::max(1, maxParallelOps))) { } + , Semaphore(TAsyncSemaphore::Make(std::max(1, maxParallelOps))) + , ActorSystem(actorSystem) { } TFuture Make( const IHTTPGateway::TPtr& httpGateway, @@ -464,10 +483,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory { bool allowLocalFiles) override { auto acquired = Semaphore->AcquireAsync(); return acquired.Apply( - [ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles](const auto& f) { + [ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem = ActorSystem](const auto& f) { return std::shared_ptr(new TListerLockReleaseWrapper{ NS3Lister::MakeS3Lister( - httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, ctx), + httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem, ctx), std::make_unique( f.GetValue()->MakeAutoRelease())}); }); @@ -503,6 +522,7 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory { private: TSharedListingContextPtr SharedCtx; const TAsyncSemaphore::TPtr Semaphore; + NActors::TActorSystem* ActorSystem; }; } // namespace @@ -513,10 +533,11 @@ IS3Lister::TPtr MakeS3Lister( const TListingRequest& listingRequest, const TMaybe& delimiter, bool allowLocalFiles, + NActors::TActorSystem* actorSystem, TSharedListingContextPtr sharedCtx) { if (listingRequest.Url.substr(0, 7) != "file://") { return std::make_shared( - httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx)); + httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx), actorSystem); } if (!allowLocalFiles) { @@ -530,13 +551,14 @@ IS3ListerFactory::TPtr MakeS3ListerFactory( size_t maxParallelOps, size_t callbackThreadCount, size_t callbackPerThreadQueueSize, - size_t regexpCacheSize) { + size_t regexpCacheSize, + NActors::TActorSystem* actorSystem) { std::shared_ptr sharedCtx = nullptr; if (callbackThreadCount != 0 || regexpCacheSize != 0) { sharedCtx = std::make_shared( callbackThreadCount, callbackPerThreadQueueSize, regexpCacheSize); } - return std::make_shared(maxParallelOps, sharedCtx); + return std::make_shared(maxParallelOps, sharedCtx, actorSystem); } } // namespace NYql::NS3Lister diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h index 93fafae19057..3419ec3fd462 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -169,6 +170,7 @@ IS3Lister::TPtr MakeS3Lister( const TListingRequest& listingRequest, const TMaybe& delimiter, bool allowLocalFiles, + NActors::TActorSystem* actorSystem, TSharedListingContextPtr sharedCtx = nullptr); class IS3ListerFactory { @@ -189,7 +191,8 @@ IS3ListerFactory::TPtr MakeS3ListerFactory( size_t maxParallelOps, size_t callbackThreadCount, size_t callbackPerThreadQueueSize, - size_t regexpCacheSize); + size_t regexpCacheSize, + NActors::TActorSystem* actorSystem); } // namespace NS3Lister } // namespace NYql diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 436a116ecdf1..e78d47a00f2b 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -83,7 +83,8 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase { State_->Configuration->MaxInflightListsPerQuery, State_->Configuration->ListingCallbackThreadCount, State_->Configuration->ListingCallbackPerThreadQueueSize, - State_->Configuration->RegexpCacheSize)) + State_->Configuration->RegexpCacheSize, + State_->ActorSystem)) , ListingStrategy_(MakeS3ListingStrategy( State_->Gateway, State_->GatewayRetryPolicy, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp index 843ba0bcf434..a7d52c408905 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp @@ -491,8 +491,14 @@ class TBFSDirectoryResolverIterator : public IS3Lister { }); return NextDirectoryListeningChunk; } + + static TString ParseBasePath(const TString& path) { + TString basePath = TString{TStringBuf{path}.RBefore('/')}; + return basePath == path && !basePath.EndsWith('/') ? TString{} : basePath; + } + void PerformEarlyStop(TListEntries& result, const TString& sourcePrefix) { - result.Directories.push_back({.Path = sourcePrefix}); + result.Directories.push_back({.Path = ParseBasePath(sourcePrefix)}); for (auto& directoryPrefix : DirectoryPrefixQueue) { result.Directories.push_back({.Path = directoryPrefix}); } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp index 85707a21f16a..c283c53e8cab 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp @@ -4,8 +4,8 @@ namespace NYql { -TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles) { - return [gateway, credentialsFactory, allowLocalFiles] ( +TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles, NActors::TActorSystem* actorSystem) { + return [gateway, credentialsFactory, allowLocalFiles, actorSystem] ( const TString& userName, const TString& sessionId, const TGatewaysConfig* gatewaysConfig, @@ -31,6 +31,7 @@ TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway state->Types = typeCtx.Get(); state->FunctionRegistry = functionRegistry; state->CredentialsFactory = credentialsFactory; + state->ActorSystem = actorSystem; if (gatewaysConfig) { state->Configuration->Init(gatewaysConfig->GetS3(), typeCtx); } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h index c72f03c357a5..9fc24e090a3b 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -35,7 +36,7 @@ struct TS3State : public TThrRefBase std::list> PrimaryKeys; }; -TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false); +TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false, NActors::TActorSystem* actorSystem = nullptr); TIntrusivePtr CreateS3DataSource(TS3State::TPtr state); TIntrusivePtr CreateS3DataSink(TS3State::TPtr state); diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp index d9d5f081a647..6d0f621f6e6c 100644 --- a/ydb/library/yql/tools/dqrun/dqrun.cpp +++ b/ydb/library/yql/tools/dqrun/dqrun.cpp @@ -909,7 +909,7 @@ int RunMain(int argc, const char* argv[]) if (!httpGateway) { httpGateway = IHTTPGateway::Make(gatewaysConfig.HasHttpGateway() ? &gatewaysConfig.GetHttpGateway() : nullptr); } - dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true)); + dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true, actorSystemManager->GetActorSystem())); } if (gatewaysConfig.HasPq()) { diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py index f1ed9a18d6d9..3161591f2c78 100644 --- a/ydb/tests/tools/fq_runner/kikimr_runner.py +++ b/ydb/tests/tools/fq_runner/kikimr_runner.py @@ -484,7 +484,7 @@ def fill_config(self, control_plane): self.config_generator.yaml_config['grpc_config']['skip_scheme_check'] = True self.config_generator.yaml_config['grpc_config']['services'] = ["local_discovery", "yq", "yq_private"] # yq services - fq_config['control_plane_storage']['task_lease_ttl'] = "10s" + fq_config['control_plane_storage']['task_lease_ttl'] = "20s" self.fill_storage_config(fq_config['control_plane_storage']['storage'], "DbPoolStorage_" + self.uuid) else: self.config_generator.yaml_config.pop('grpc_config', None)