Skip to content

Commit

Permalink
YQL: Limit thread number for local calculations (ydb-platform#9950)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxkovalev authored Oct 2, 2024
1 parent d765169 commit 785c0c4
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 0 deletions.
1 change: 1 addition & 0 deletions ydb/library/yql/providers/yt/common/yql_yt_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ struct TYtSettings {
NCommon::TConfSetting<ui32, false> InferSchemaTableCountThreshold;
NCommon::TConfSetting<NSize::TSize, false> DefaultCalcMemoryLimit;
NCommon::TConfSetting<ui32, false> ParallelOperationsLimit;
NCommon::TConfSetting<ui32, false> LocalCalcLimit;
NCommon::TConfSetting<EQueryCacheMode, false> QueryCacheMode;
NCommon::TConfSetting<bool, false> QueryCacheIgnoreTableRevision;
NCommon::TConfSetting<TString, false> QueryCacheSalt;
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4309,6 +4309,8 @@ class TYtNativeGateway : public IYtGateway {

if (transform.CanExecuteInternally() && !testRun) {
const auto nativeTypeCompat = execCtx->Options_.Config()->NativeYtTypeCompatibility.Get(execCtx->Cluster_).GetOrElse(NTCF_LEGACY);
execCtx->Session_->InitLocalCalcSemaphore(execCtx->Options_.Config());
TGuard<TFastSemaphore> guard(*execCtx->Session_->LocalCalcSemaphore_);
ExecSafeFill(outYPaths, root, execCtx->GetOutSpec(!useSkiff, nativeTypeCompat), execCtx, entry, builder, alloc, tmpFiles->TmpDir.GetPath() + '/');
return MakeFuture();
}
Expand Down Expand Up @@ -4906,6 +4908,8 @@ class TYtNativeGateway : public IYtGateway {
}

if (transform.CanExecuteInternally()) {
execCtx->Session_->InitLocalCalcSemaphore(execCtx->Options_.Config());
TGuard<TFastSemaphore> guard(*execCtx->Session_->LocalCalcSemaphore_);
TExploringNodeVisitor explorer;
auto localGraph = builder.BuildLocalGraph(GetGatewayNodeFactory(codecCtx.Get(), nullptr, execCtx->UserFiles_, pathPrefix),
execCtx->Options_.UdfValidateMode(), NUdf::EValidatePolicy::Exception,
Expand Down
10 changes: 10 additions & 0 deletions ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ TSession::TSession(IYtGateway::TOpenSessionOptions&& options, size_t numThreads)
, TimeProvider_(std::move(options.TimeProvider()))
, DeterministicMode_(GetEnv("YQL_DETERMINISTIC_MODE"))
, OperationSemaphore(nullptr)
, LocalCalcSemaphore_(nullptr)
, TxCache_(UserName_)
, SessionId_(options.SessionId_)
{
Expand Down Expand Up @@ -71,6 +72,15 @@ void TSession::EnsureInitializedSemaphore(const TYtSettings::TConstPtr& settings
}
}

void TSession::InitLocalCalcSemaphore(const TYtSettings::TConstPtr& settings) {
with_lock(Mutex_) {
if(!LocalCalcSemaphore_) {
const size_t localCalcLimit = settings->LocalCalcLimit.Get().GetOrElse(1U);
LocalCalcSemaphore_ = MakeHolder<TFastSemaphore>(localCalcLimit);
}
}
}

} // NNative

} // NYql
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/yt/gateway/native/yql_yt_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct TSession: public TThrRefBase {
NYT::TNode CreateTableAttrs() const;

void EnsureInitializedSemaphore(const TYtSettings::TConstPtr& settings);
void InitLocalCalcSemaphore(const TYtSettings::TConstPtr& settings);

const TString UserName_;
const TOperationProgressWriter ProgressWriter_;
Expand All @@ -50,6 +51,7 @@ struct TSession: public TThrRefBase {
TOperationTracker::TPtr OpTracker_;
NThreading::TAsyncSemaphore::TPtr OperationSemaphore;
TMutex Mutex_;
THolder<TFastSemaphore> LocalCalcSemaphore_;

TTransactionCache TxCache_;
TString SessionId_;
Expand Down

0 comments on commit 785c0c4

Please sign in to comment.