diff --git a/ydb/core/fq/libs/gateway/empty_gateway.cpp b/ydb/core/fq/libs/gateway/empty_gateway.cpp index e0aff058bb86..c5642acf575d 100644 --- a/ydb/core/fq/libs/gateway/empty_gateway.cpp +++ b/ydb/core/fq/libs/gateway/empty_gateway.cpp @@ -35,11 +35,13 @@ class TEmptyGateway : public NYql::IDqGateway { const NYql::TDqSettings::TPtr& settings, const TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) override + bool discard, + ui64 executionTimeout) override { Y_UNUSED(progressWriter); Y_UNUSED(modulesMapping); // TODO: support. Y_UNUSED(discard); + Y_UNUSED(executionTimeout); NProto::TGraphParams params; THashMap stagePrograms; diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index 411f68af5c3e..08687c84a4a9 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -44,7 +44,8 @@ class TDqExecuter: public TRichActor, NYql::TCounters { const TDqConfiguration::TPtr& settings, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TInstant requestStartTime, - bool createTaskSuspended) + bool createTaskSuspended, + ui64 executionTimeout) : TRichActor(&TDqExecuter::Handler) , GwmActorId(gwmActorId) , PrinterId(printerId) @@ -54,6 +55,7 @@ class TDqExecuter: public TRichActor, NYql::TCounters { , Counters(counters) // root, component=dq , LongWorkersAllocationCounter(Counters->GetSubgroup("component", "ServiceProxyActor")->GetCounter("LongWorkersAllocation")) , ExecutionTimeoutCounter(Counters->GetSubgroup("component", "ServiceProxyActor")->GetCounter("ExecutionTimeout", /*derivative=*/ true)) + , Timeout(TDuration::MilliSeconds(executionTimeout)) , WorkersAllocationFailTimeout(TDuration::MilliSeconds(Settings->_LongWorkersAllocationFailTimeout.Get().GetOrElse(TDqSettings::TDefault::LongWorkersAllocationFailTimeout))) , WorkersAllocationWarnTimeout(TDuration::MilliSeconds(Settings->_LongWorkersAllocationWarnTimeout.Get().GetOrElse(TDqSettings::TDefault::LongWorkersAllocationWarnTimeout))) , RequestStartTime(requestStartTime) @@ -239,10 +241,6 @@ class TDqExecuter: public TRichActor, NYql::TCounters { allocateRequest.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession)); - Timeout = tasks.size() == 1 - ? TDuration::MilliSeconds(Settings->_LiteralTimeout.Get().GetOrElse(TDqSettings::TDefault::LiteralTimeout)) - : TDuration::MilliSeconds(Settings->_TableTimeout.Get().GetOrElse(TDqSettings::TDefault::TableTimeout)); - YQL_CLOG(DEBUG, ProviderDq) << "Dq timeouts are set to: " << ToString(Timeout) << " (global), " << ToString(WorkersAllocationFailTimeout) << " (workers allocation fail), " @@ -523,9 +521,10 @@ NActors::IActor* MakeDqExecuter( const TDqConfiguration::TPtr& settings, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TInstant requestStartTime, - bool createTaskSuspended + bool createTaskSuspended, + ui64 executionTimeout ) { - return new TLogWrapReceive(new TDqExecuter(gwmActorId, printerId, traceId, username, settings, counters, requestStartTime, createTaskSuspended), traceId); + return new TLogWrapReceive(new TDqExecuter(gwmActorId, printerId, traceId, username, settings, counters, requestStartTime, createTaskSuspended, executionTimeout), traceId); } } // namespace NDq diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.h b/ydb/library/yql/providers/dq/actors/executer_actor.h index 0a453d513396..88dc1022fdb7 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.h +++ b/ydb/library/yql/providers/dq/actors/executer_actor.h @@ -15,7 +15,8 @@ NActors::IActor* MakeDqExecuter( const TDqConfiguration::TPtr& settings, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TInstant requestStartTime = TInstant::Now(), - bool createTaskSuspended = false + bool createTaskSuspended = false, + ui64 executionTimeout = 0 ); } // namespace NDq diff --git a/ydb/library/yql/providers/dq/api/protos/service.proto b/ydb/library/yql/providers/dq/api/protos/service.proto index 194e2201276c..d569fba231b8 100644 --- a/ydb/library/yql/providers/dq/api/protos/service.proto +++ b/ydb/library/yql/providers/dq/api/protos/service.proto @@ -88,6 +88,7 @@ message ExecuteGraphRequest { string RateLimiterResource = 15; map CommonTaskParams = 16; // to be merged into each task TTaskMeta.TaskParams NYql.NDqProto.EDqStatsMode StatsMode = 17; + uint64 ExecutionTimeout = 18; // in milliseconds } message ExecuteGraphResponse { diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index ac5552080c78..78b5a2f22576 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -126,6 +126,7 @@ class TDqGatewayLocalImpl: public std::enable_shared_from_this ModulesMapping; bool Discard; NThreading::TPromise Result; + ui64 ExecutionTimeout; }; public: @@ -148,13 +149,13 @@ class TDqGatewayLocalImpl: public std::enable_shared_from_this& secureParams, const THashMap& graphParams, const TDqSettings::TPtr& settings, const IDqGateway::TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) + bool discard, ui64 executionTimeout) { NThreading::TFuture result; { TGuard lock(Mutex); - Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise()}); + Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise(), executionTimeout}); result = Queue.back().Result; } @@ -177,7 +178,7 @@ class TDqGatewayLocalImpl: public std::enable_shared_from_thisExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard) + Gateway->ExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard, request.ExecutionTimeout) .Apply([promise=request.Result, weak](const NThreading::TFuture& result) mutable { try { promise.SetValue(result.GetValue()); @@ -228,10 +229,10 @@ class TDqGatewayLocal : public IDqGateway { const THashMap& secureParams, const THashMap& graphParams, const TDqSettings::TPtr& settings, const TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) override + bool discard, ui64 executionTimeout) override { return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, - settings, progressWriter, modulesMapping, discard); + settings, progressWriter, modulesMapping, discard, executionTimeout); } void Stop() override { diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 73fff17b955f..3328ad1c3f63 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -866,6 +866,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters } TInstant startTime = TInstant::Now(); + ui64 executionTimeout = State->Settings->_LiteralTimeout.Get().GetOrElse(TDqSettings::TDefault::LiteralTimeout); try { auto result = TMaybeNode(input).Cast(); @@ -879,7 +880,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters auto precomputes = FindIndependentPrecomputes(result.Input().Ptr()); if (!precomputes.empty()) { - auto status = HandlePrecomputes(precomputes, ctx, resSettings); + auto status = HandlePrecomputes(precomputes, ctx, resSettings, executionTimeout); if (status.Level != TStatus::Ok) { if (status == TStatus::Async) { return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture& completedFuture) { @@ -1008,7 +1009,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters graphParams["Evaluation"] = ToString(!ctx.Step.IsDone(TExprStep::ExprEval)); future = State->ExecutePlan( State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams, - settings, progressWriter, ModulesMapping, fillSettings.Discard); + settings, progressWriter, ModulesMapping, fillSettings.Discard, executionTimeout); } } @@ -1255,6 +1256,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters YQL_CLOG(TRACE, ProviderDq) << "HandlePull " << NCommon::ExprToPrettyString(ctx, *input); TInstant startTime = TInstant::Now(); + ui64 executionTimeout = State->Settings->_TableTimeout.Get().GetOrElse(TDqSettings::TDefault::TableTimeout); auto pull = TPull(input); THashMap pullSettings; @@ -1273,7 +1275,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters auto precomputes = FindIndependentPrecomputes(pull.Input().Ptr()); if (!precomputes.empty()) { - auto status = HandlePrecomputes(precomputes, ctx, pullSettings); + auto status = HandlePrecomputes(precomputes, ctx, pullSettings, executionTimeout); if (status.Level != TStatus::Ok) { if (status == TStatus::Async) { return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture& completedFuture) { @@ -1454,7 +1456,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds); auto future = State->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams, - settings, progressWriter, ModulesMapping, fillSettings.Discard); + settings, progressWriter, ModulesMapping, fillSettings.Discard, executionTimeout); future.Subscribe([publicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture& completedFuture) { YQL_ENSURE(!completedFuture.HasException()); @@ -1800,7 +1802,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters }); } - IGraphTransformer::TStatus HandlePrecomputes(const TNodeOnNodeOwnedMap& precomputes, TExprContext& ctx, const THashMap& providerParams) { + IGraphTransformer::TStatus HandlePrecomputes(const TNodeOnNodeOwnedMap& precomputes, TExprContext& ctx, const THashMap& providerParams, ui64 executionTimeout) { IDataProvider::TFillSettings fillSettings; fillSettings.AllResultsBytesLimit.Clear(); @@ -1968,7 +1970,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds); auto future = State->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), {}, secureParams, graphParams, - settings, progressWriter, ModulesMapping, false); + settings, progressWriter, ModulesMapping, false, executionTimeout); executionPlanner.Destroy(); diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp index 8ad08f347fe2..cc81a0dbc7f8 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp @@ -241,7 +241,7 @@ class TDqGatewaySession: public std::enable_shared_from_this const THashMap& secureParams, const THashMap& graphParams, const TDqSettings::TPtr& settings, const TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) + bool discard, ui64 executionTimeout) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(SessionId); @@ -257,6 +257,7 @@ class TDqGatewaySession: public std::enable_shared_from_this YQL_ENSURE(!file.GetObjectId().empty()); } } + queryPB.SetExecutionTimeout(executionTimeout); queryPB.SetSession(SessionId); queryPB.SetResultType(plan.ResultType); queryPB.SetSourceId(plan.SourceID.NodeId()-1); @@ -521,7 +522,7 @@ class TDqGatewayImpl: public std::enable_shared_from_this { const THashMap& secureParams, const THashMap& graphParams, const TDqSettings::TPtr& settings, const TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) + bool discard, ui64 executionTimeout) { std::shared_ptr session; with_lock(Mutex) { @@ -534,7 +535,7 @@ class TDqGatewayImpl: public std::enable_shared_from_this { YQL_CLOG(ERROR, ProviderDq) << "Session was closed: " << sessionId; return MakeFuture(NCommon::ResultFromException(yexception() << "Session was closed")); } - return session->ExecutePlan(std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard) + return session->ExecutePlan(std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout) .Apply([](const TFuture& f) { try { f.TryRethrow(); @@ -586,9 +587,9 @@ class TDqGateway: public IDqGateway { const THashMap& secureParams, const THashMap& graphParams, const TDqSettings::TPtr& settings, const TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) override + bool discard, ui64 executionTimeout) override { - return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard); + return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout); } TString GetVanillaJobPath() override { diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h index 42d07c21f096..798169bf7adc 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h @@ -75,7 +75,7 @@ class IDqGateway : public TThrRefBase { const THashMap& secureParams, const THashMap& graphParams, const TDqSettings::TPtr& settings, const TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) = 0; + bool discard, ui64 executionTimeout) = 0; virtual TString GetVanillaJobPath() { return ""; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_state.h b/ydb/library/yql/providers/dq/provider/yql_dq_state.h index 5a369805efc5..70f9d3a8b42f 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_state.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_state.h @@ -82,16 +82,16 @@ struct TDqState: public TThrRefBase { const THashMap& secureParams, const THashMap& graphParams, const TDqSettings::TPtr& settings, const IDqGateway::TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) { + bool discard, ui64 executionTimeout) { with_lock(Mutex_) { if (!OperationSemaphore) { const auto parallelOperationsLimit = Settings->ParallelOperationsLimit.Get().GetOrElse(TDqSettings::TDefault::ParallelOperationsLimit); OperationSemaphore = NThreading::TAsyncSemaphore::Make(parallelOperationsLimit); } } - return OperationSemaphore->AcquireAsync().Apply([this_=TIntrusivePtr(this), sessionId, plan=std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard](const auto& f) mutable { + return OperationSemaphore->AcquireAsync().Apply([this_=TIntrusivePtr(this), sessionId, plan=std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout](const auto& f) mutable { auto lock = f.GetValue()->MakeAutoRelease(); - return this_->DqGateway->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard).Apply([unlock = lock.DeferRelease()](const auto& f) { + return this_->DqGateway->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout).Apply([unlock = lock.DeferRelease()](const auto& f) { unlock(NThreading::MakeFuture()); return f; }); diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index b95fcc344c40..db43c9900a05 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -276,6 +276,7 @@ namespace NYql::NDqs { : TServiceProxyActor(ctx, counters, traceId, username) , GraphExecutionEventsActorId(graphExecutionEventsActorId) { + ExecutionTimeout = Request->GetExecutionTimeout(); } void DoRetry() override { @@ -378,7 +379,7 @@ namespace NYql::NDqs { YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; MergeTaskMetas(params); - auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime)); + auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime, false, ExecutionTimeout)); TVector columns; columns.reserve(Request->GetColumns().size()); @@ -427,6 +428,7 @@ namespace NYql::NDqs { } NActors::TActorId GraphExecutionEventsActorId; + ui64 ExecutionTimeout; }; TString GetVersionString() {