Skip to content

Commit

Permalink
listing fix (ydb-platform#7643)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored and stanislav-shchetinin committed Aug 30, 2024
1 parent 7b765a8 commit 96d12aa
Show file tree
Hide file tree
Showing 14 changed files with 139 additions and 29 deletions.
3 changes: 1 addition & 2 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ struct TObjectStorageExternalSource : public IExternalSource {
};

virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
Y_UNUSED(ActorSystem);
auto format = meta->Attributes.FindPtr("format");
if (!format || !meta->Attributes.contains("withinfer")) {
return NThreading::MakeFuture(std::move(meta));
Expand Down Expand Up @@ -335,7 +334,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
.Url = meta->DataSourceLocation,
.Credentials = credentials,
.Pattern = effectiveFilePattern,
}, Nothing(), AllowLocalFiles);
}, Nothing(), AllowLocalFiles, ActorSystem);
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
auto& listRes = listResFut.GetValue();
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1971,7 +1971,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {

{
dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory,
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles(), NActors::TActivationContext::ActorSystem())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
}

{
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,7 @@ class TKqpHost : public IKqpHost {
state->Gateway = FederatedQuerySetup->HttpGateway;
state->GatewayRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
state->ExecutorPoolId = AppData()->UserPoolId;
state->ActorSystem = ActorSystem;

auto dataSource = NYql::CreateS3DataSource(state);
auto dataSink = NYql::CreateS3DataSink(state);
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
PatternType,
object.GetPath()},
Nothing(),
AllowLocalFiles);
AllowLocalFiles,
NActors::TActivationContext::ActorSystem());
Fetch();
return true;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/s3/object_listers/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ PEERDIR(
ydb/library/yql/providers/common/http_gateway
ydb/library/yql/providers/s3/credentials
ydb/library/yql/utils
ydb/library/yql/utils/actor_log
ydb/library/yql/utils/threading
)

Expand Down
54 changes: 37 additions & 17 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
#include <ydb/library/yql/providers/s3/common/util.h>
#include <ydb/library/yql/utils/actor_log/log.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/utils/url_builder.h>
#include <ydb/library/yql/utils/yql_panic.h>
Expand All @@ -25,7 +26,7 @@
namespace NYql::NS3Lister {

IOutputStream& operator<<(IOutputStream& stream, const TListingRequest& request) {
return stream << "TListingRequest{.url=" << request.Url
return stream << "[TS3Lister] TListingRequest{.url=" << request.Url
<< ",.Prefix=" << request.Prefix
<< ",.Pattern=" << request.Pattern
<< ",.PatternType=" << request.PatternType
Expand All @@ -51,7 +52,7 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex,

const size_t numGroups = re->NumberOfCapturingGroups();
YQL_CLOG(DEBUG, ProviderS3)
<< "Got regex: '" << regex << "' with " << numGroups << " capture groups ";
<< "[TS3Lister] Got regex: '" << regex << "' with " << numGroups << " capture groups ";

auto groups = std::make_shared<std::vector<std::string>>(numGroups);
auto reArgs = std::make_shared<std::vector<re2::RE2::Arg>>(numGroups);
Expand Down Expand Up @@ -101,7 +102,7 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterWildcard(const TString& patt
}

const auto regex = NS3::RegexFromWildcards(pattern);
YQL_CLOG(DEBUG, ProviderS3) << "Got prefix: '" << regexPatternPrefix << "', regex: '"
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister] Got prefix: '" << regexPatternPrefix << "', regex: '"
<< regex << "' from original pattern '" << pattern << "'";

return MakeFilterRegexp(regex, sharedCtx);
Expand Down Expand Up @@ -238,6 +239,8 @@ class TS3Lister : public IS3Lister {
const TMaybe<TString> Delimiter;
const TMaybe<TString> ContinuationToken;
const ui64 MaxKeys;
const std::pair<TString, TString> CurrentLogContextPath;
const NActors::TActorSystem* ActorSystem;
};

TS3Lister(
Expand All @@ -246,7 +249,8 @@ class TS3Lister : public IS3Lister {
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
size_t maxFilesPerQuery,
TSharedListingContextPtr sharedCtx)
TSharedListingContextPtr sharedCtx,
NActors::TActorSystem* actorSystem)
: MaxFilesPerQuery(maxFilesPerQuery) {
Y_ENSURE(
listingRequest.Url.substr(0, 7) != "file://",
Expand All @@ -270,7 +274,9 @@ class TS3Lister : public IS3Lister {
std::move(request),
delimiter,
Nothing(),
MaxFilesPerQuery};
MaxFilesPerQuery,
NLog::CurrentLogContextPath(),
actorSystem};

YQL_CLOG(TRACE, ProviderS3)
<< "[TS3Lister] Got URL: '" << ctx.ListingRequest.Url
Expand Down Expand Up @@ -335,9 +341,19 @@ class TS3Lister : public IS3Lister {
/*data=*/"",
retryPolicy);
}

static IHTTPGateway::TOnResult CallbackFactoryMethod(TListingContext&& listingContext) {
return [c = std::move(listingContext)](IHTTPGateway::TResult&& result) {
OnDiscovery(c, std::move(result));
if (c.ActorSystem) {
NDq::TYqlLogScope logScope(c.ActorSystem, NKikimrServices::KQP_YQL, c.CurrentLogContextPath.first, c.CurrentLogContextPath.second);
OnDiscovery(c, std::move(result));
} else {
/*
If the subsystem doesn't use the actor system
then there is a need to use an own YqlLoggerScope on the top level
*/
OnDiscovery(c, std::move(result));
}
};
}

Expand All @@ -351,7 +367,7 @@ class TS3Lister : public IS3Lister {
const NXml::TDocument xml(xmlString, NXml::TDocument::String);
auto parsedResponse = ParseListObjectV2Response(xml, ctx.RequestId);
YQL_CLOG(DEBUG, ProviderS3)
<< "Listing of " << ctx.ListingRequest.Url
<< "[TS3Lister] Listing of " << ctx.ListingRequest.Url
<< ctx.ListingRequest.Prefix << ": have " << ctx.Output->Size()
<< " entries, got another " << parsedResponse.KeyCount
<< " entries, request id: [" << ctx.RequestId << "]";
Expand Down Expand Up @@ -380,7 +396,7 @@ class TS3Lister : public IS3Lister {
}

if (parsedResponse.IsTruncated && !earlyStop) {
YQL_CLOG(DEBUG, ProviderS3) << "Listing of " << ctx.ListingRequest.Url
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister] Listing of " << ctx.ListingRequest.Url
<< ctx.ListingRequest.Prefix
<< ": got truncated flag, will continue";

Expand Down Expand Up @@ -409,14 +425,14 @@ class TS3Lister : public IS3Lister {
TStringBuilder{} << "request id: [" << ctx.RequestId << "]",
std::move(result.Issues));
YQL_CLOG(INFO, ProviderS3)
<< "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
<< "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
<< ": got error from http gateway: " << issues.ToString(true);
ctx.Promise.SetValue(TListError{EListError::GENERAL, std::move(issues)});
ctx.NextRequestPromise.SetValue(Nothing());
}
} catch (const std::exception& ex) {
YQL_CLOG(INFO, ProviderS3)
<< "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
<< "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
<< " : got exception: " << ex.what();
ctx.Promise.SetException(std::current_exception());
ctx.NextRequestPromise.SetValue(Nothing());
Expand Down Expand Up @@ -452,9 +468,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
using TPtr = std::shared_ptr<TS3ParallelLimitedListerFactory>;

explicit TS3ParallelLimitedListerFactory(
size_t maxParallelOps, TSharedListingContextPtr sharedCtx)
size_t maxParallelOps, TSharedListingContextPtr sharedCtx, NActors::TActorSystem* actorSystem)
: SharedCtx(std::move(sharedCtx))
, Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) { }
, Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps)))
, ActorSystem(actorSystem) { }

TFuture<NS3Lister::IS3Lister::TPtr> Make(
const IHTTPGateway::TPtr& httpGateway,
Expand All @@ -464,10 +481,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
bool allowLocalFiles) override {
auto acquired = Semaphore->AcquireAsync();
return acquired.Apply(
[ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles](const auto& f) {
[ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem = ActorSystem](const auto& f) {
return std::shared_ptr<NS3Lister::IS3Lister>(new TListerLockReleaseWrapper{
NS3Lister::MakeS3Lister(
httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, ctx),
httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem, ctx),
std::make_unique<TAsyncSemaphore::TAutoRelease>(
f.GetValue()->MakeAutoRelease())});
});
Expand Down Expand Up @@ -503,6 +520,7 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
private:
TSharedListingContextPtr SharedCtx;
const TAsyncSemaphore::TPtr Semaphore;
NActors::TActorSystem* ActorSystem;
};

} // namespace
Expand All @@ -513,10 +531,11 @@ IS3Lister::TPtr MakeS3Lister(
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles,
NActors::TActorSystem* actorSystem,
TSharedListingContextPtr sharedCtx) {
if (listingRequest.Url.substr(0, 7) != "file://") {
return std::make_shared<TS3Lister>(
httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx));
httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx), actorSystem);
}

if (!allowLocalFiles) {
Expand All @@ -530,13 +549,14 @@ IS3ListerFactory::TPtr MakeS3ListerFactory(
size_t maxParallelOps,
size_t callbackThreadCount,
size_t callbackPerThreadQueueSize,
size_t regexpCacheSize) {
size_t regexpCacheSize,
NActors::TActorSystem* actorSystem) {
std::shared_ptr<TSharedListingContext> sharedCtx = nullptr;
if (callbackThreadCount != 0 || regexpCacheSize != 0) {
sharedCtx = std::make_shared<TSharedListingContext>(
callbackThreadCount, callbackPerThreadQueueSize, regexpCacheSize);
}
return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps, sharedCtx);
return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps, sharedCtx, actorSystem);
}

} // namespace NYql::NS3Lister
5 changes: 4 additions & 1 deletion ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <library/cpp/cache/cache.h>
#include <library/cpp/threading/future/future.h>
#include <util/thread/pool.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/s3/credentials/credentials.h>

Expand Down Expand Up @@ -169,6 +170,7 @@ IS3Lister::TPtr MakeS3Lister(
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles,
NActors::TActorSystem* actorSystem,
TSharedListingContextPtr sharedCtx = nullptr);

class IS3ListerFactory {
Expand All @@ -189,7 +191,8 @@ IS3ListerFactory::TPtr MakeS3ListerFactory(
size_t maxParallelOps,
size_t callbackThreadCount,
size_t callbackPerThreadQueueSize,
size_t regexpCacheSize);
size_t regexpCacheSize,
NActors::TActorSystem* actorSystem);

} // namespace NS3Lister
} // namespace NYql
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase {
State_->Configuration->MaxInflightListsPerQuery,
State_->Configuration->ListingCallbackThreadCount,
State_->Configuration->ListingCallbackPerThreadQueueSize,
State_->Configuration->RegexpCacheSize))
State_->Configuration->RegexpCacheSize,
State_->ActorSystem))
, ListingStrategy_(MakeS3ListingStrategy(
State_->Gateway,
State_->GatewayRetryPolicy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,14 @@ class TBFSDirectoryResolverIterator : public IS3Lister {
});
return NextDirectoryListeningChunk;
}

static TString ParseBasePath(const TString& path) {
TString basePath = TString{TStringBuf{path}.RBefore('/')};
return basePath == path && !basePath.EndsWith('/') ? TString{} : basePath;
}

void PerformEarlyStop(TListEntries& result, const TString& sourcePrefix) {
result.Directories.push_back({.Path = sourcePrefix});
result.Directories.push_back({.Path = ParseBasePath(sourcePrefix)});
for (auto& directoryPrefix : DirectoryPrefixQueue) {
result.Directories.push_back({.Path = directoryPrefix});
}
Expand Down
5 changes: 3 additions & 2 deletions ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

namespace NYql {

TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles) {
return [gateway, credentialsFactory, allowLocalFiles] (
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles, NActors::TActorSystem* actorSystem) {
return [gateway, credentialsFactory, allowLocalFiles, actorSystem] (
const TString& userName,
const TString& sessionId,
const TGatewaysConfig* gatewaysConfig,
Expand All @@ -31,6 +31,7 @@ TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway
state->Types = typeCtx.Get();
state->FunctionRegistry = functionRegistry;
state->CredentialsFactory = credentialsFactory;
state->ActorSystem = actorSystem;
if (gatewaysConfig) {
state->Configuration->Init(gatewaysConfig->GetS3(), typeCtx);
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/s3/provider/yql_s3_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ struct TS3State : public TThrRefBase
IHTTPGateway::TRetryPolicy::TPtr GatewayRetryPolicy = GetHTTPDefaultRetryPolicy();
ui32 ExecutorPoolId = 0;
std::list<TVector<TString>> PrimaryKeys;
NActors::TActorSystem* ActorSystem = nullptr;
};

TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false);
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false, NActors::TActorSystem* actorSystem = nullptr);

TIntrusivePtr<IDataProvider> CreateS3DataSource(TS3State::TPtr state);
TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/tools/dqrun/dqrun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ int RunMain(int argc, const char* argv[])
if (!httpGateway) {
httpGateway = IHTTPGateway::Make(gatewaysConfig.HasHttpGateway() ? &gatewaysConfig.GetHttpGateway() : nullptr);
}
dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true));
dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true, actorSystemManager->GetActorSystem()));
}

if (gatewaysConfig.HasPq()) {
Expand Down
Loading

0 comments on commit 96d12aa

Please sign in to comment.