diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 84c17fa204db..1146b60fbd85 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -81,7 +81,8 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner Sleep(executionOptions.LoopDelay); } - if (executionOptions.GetExecutionCase(id) != TExecutionOptions::EExecutionCase::AsyncQuery) { + const auto executionCase = executionOptions.GetExecutionCase(id); + if (executionCase != TExecutionOptions::EExecutionCase::AsyncQuery) { Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script"; if (numberQueries > 1) { Cout << " " << id; @@ -92,7 +93,7 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner Cout << "..." << colors.Default() << Endl; } - switch (executionOptions.GetExecutionCase(id)) { + switch (executionCase) { case TExecutionOptions::EExecutionCase::GenericScript: if (!runner.ExecuteScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed"; @@ -381,8 +382,8 @@ class TMain : public TMainClassArgs { }); options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)") .RequiredArgument("uint") - .DefaultValue(RunnerOptions.InFlightLimit) - .StoreResult(&RunnerOptions.InFlightLimit); + .DefaultValue(RunnerOptions.YdbSettings.InFlightLimit) + .StoreResult(&RunnerOptions.YdbSettings.InFlightLimit); TChoices scriptAction({ {"execute", NKikimrKqp::QUERY_ACTION_EXECUTE}, diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index 3ead3c724998..f615765250da 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -1,5 +1,7 @@ #include "actors.h" +#include + #include #include @@ -10,26 +12,25 @@ namespace { class TRunScriptActorMock : public NActors::TActorBootstrapped { public: - TRunScriptActorMock(THolder request, - NThreading::TPromise promise, ui64 resultRowsLimit, ui64 resultSizeLimit, - TProgressCallback progressCallback) - : Request_(std::move(request)) + TRunScriptActorMock(TQueryRequest request, NThreading::TPromise promise, TProgressCallback progressCallback) + : TargetNode(request.TargetNode) + , Request_(std::move(request.Event)) , Promise_(promise) , ResultRowsLimit_(std::numeric_limits::max()) , ResultSizeLimit_(std::numeric_limits::max()) , ProgressCallback_(progressCallback) { - if (resultRowsLimit) { - ResultRowsLimit_ = resultRowsLimit; + if (request.ResultRowsLimit) { + ResultRowsLimit_ = request.ResultRowsLimit; } - if (resultSizeLimit) { - ResultSizeLimit_ = resultSizeLimit; + if (request.ResultSizeLimit) { + ResultSizeLimit_ = request.ResultSizeLimit; } } void Bootstrap() { NActors::ActorIdToProto(SelfId(), Request_->Record.MutableRequestActorId()); - Send(NKikimr::NKqp::MakeKqpProxyID(SelfId().NodeId()), std::move(Request_)); + Send(NKikimr::NKqp::MakeKqpProxyID(TargetNode), std::move(Request_)); Become(&TRunScriptActorMock::StateFunc); } @@ -88,7 +89,8 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped Request_; + ui32 TargetNode = 0; + std::unique_ptr Request_; NThreading::TPromise Promise_; ui64 ResultRowsLimit_; ui64 ResultSizeLimit_; @@ -97,25 +99,104 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped ResultSetSizes_; }; -class TResourcesWaiterActor : public NActors::TActorBootstrapped { - struct TEvPrivate { - enum EEv : ui32 { - EvResourcesInfo = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), +class TAsyncQueryRunnerActor : public NActors::TActor { + using TBase = NActors::TActor; + +public: + TAsyncQueryRunnerActor(ui64 inFlightLimit) + : TBase(&TAsyncQueryRunnerActor::StateFunc) + , InFlightLimit_(inFlightLimit) + {} + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvStartAsyncQuery, Handle); + hFunc(TEvPrivate::TEvAsyncQueryFinished, Handle); + hFunc(TEvPrivate::TEvFinalizeAsyncQueryRunner, Handle); + ) + + void Handle(TEvPrivate::TEvStartAsyncQuery::TPtr& ev) { + DelayedRequests_.emplace(std::move(ev)); + StartDelayedRequests(); + } + + void Handle(TEvPrivate::TEvAsyncQueryFinished::TPtr& ev) { + const ui64 requestId = ev->Get()->RequestId; + RunningRequests_.erase(requestId); + + const auto& response = ev->Get()->Result.Response->Get()->Record.GetRef(); + const auto status = response.GetYdbStatus(); + + if (status == Ydb::StatusIds::SUCCESS) { + Completed_++; + Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl; + } else { + Failed_++; + NYql::TIssues issues; + NYql::IssuesFromMessage(response.GetResponse().GetQueryIssues(), issues); + Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << status << ". " << CoutColors_.Yellow() << GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << issues.ToString() << CoutColors_.Default(); + } + + StartDelayedRequests(); + TryFinalize(); + } + + void Handle(TEvPrivate::TEvFinalizeAsyncQueryRunner::TPtr& ev) { + FinalizePromise_ = ev->Get()->FinalizePromise; + if (!TryFinalize()) { + Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for " << DelayedRequests_.size() + RunningRequests_.size() << " async queries..." << CoutColors_.Default() << Endl; + } + } - EvEnd - }; +private: + void StartDelayedRequests() { + while (!DelayedRequests_.empty() && (!InFlightLimit_ || RunningRequests_.size() < InFlightLimit_)) { + auto request = std::move(DelayedRequests_.front()); + DelayedRequests_.pop(); + + auto promise = NThreading::NewPromise(); + Register(CreateRunScriptActorMock(std::move(request->Get()->Request), promise, nullptr)); + RunningRequests_[RequestId_] = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture& f) { + Send(SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue()))); + }); + + MaxInFlight_ = std::max(MaxInFlight_, RunningRequests_.size()); + Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n"; + + RequestId_++; + request->Get()->StartPromise.SetValue(); + } + } - static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + bool TryFinalize() { + if (!FinalizePromise_ || !RunningRequests_.empty()) { + return false; + } - struct TEvResourcesInfo : public NActors::TEventLocal { - explicit TEvResourcesInfo(i32 nodeCount) - : NodeCount(nodeCount) - {} + FinalizePromise_->SetValue(); + PassAway(); + return true; + } - const i32 NodeCount; - }; - }; + TString GetInfoString() const { + return TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_; + } + +private: + const ui64 InFlightLimit_; + const TInstant StartTime_ = TInstant::Now(); + const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout); + + std::optional> FinalizePromise_; + std::queue DelayedRequests_; + std::unordered_map> RunningRequests_; + + ui64 RequestId_ = 1; + ui64 MaxInFlight_ = 0; + ui64 Completed_ = 0; + ui64 Failed_ = 0; +}; +class TResourcesWaiterActor : public NActors::TActorBootstrapped { static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10); public: @@ -183,10 +264,12 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped request, - NThreading::TPromise promise, ui64 resultRowsLimit, ui64 resultSizeLimit, - TProgressCallback progressCallback) { - return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, progressCallback); +NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise promise, TProgressCallback progressCallback) { + return new TRunScriptActorMock(std::move(request), promise, progressCallback); +} + +NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit) { + return new TAsyncQueryRunnerActor(inFlightLimit); } NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount) { diff --git a/ydb/tests/tools/kqprun/src/actors.h b/ydb/tests/tools/kqprun/src/actors.h index f0a8ef3d5b2d..e438df4f1f6b 100644 --- a/ydb/tests/tools/kqprun/src/actors.h +++ b/ydb/tests/tools/kqprun/src/actors.h @@ -9,11 +9,68 @@ struct TQueryResponse { std::vector ResultSets; }; +struct TQueryRequest { + std::unique_ptr Event; + ui32 TargetNode; + ui64 ResultRowsLimit; + ui64 ResultSizeLimit; +}; + +struct TEvPrivate { + enum EEv : ui32 { + EvStartAsyncQuery = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvAsyncQueryFinished, + EvFinalizeAsyncQueryRunner, + + EvResourcesInfo, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + struct TEvStartAsyncQuery : public NActors::TEventLocal { + TEvStartAsyncQuery(TQueryRequest request, NThreading::TPromise startPromise) + : Request(std::move(request)) + , StartPromise(startPromise) + {} + + TQueryRequest Request; + NThreading::TPromise StartPromise; + }; + + struct TEvAsyncQueryFinished : public NActors::TEventLocal { + TEvAsyncQueryFinished(ui64 requestId, TQueryResponse result) + : RequestId(requestId) + , Result(result) + {} + + const ui64 RequestId; + const TQueryResponse Result; + }; + + struct TEvFinalizeAsyncQueryRunner : public NActors::TEventLocal { + explicit TEvFinalizeAsyncQueryRunner(NThreading::TPromise finalizePromise) + : FinalizePromise(finalizePromise) + {} + + NThreading::TPromise FinalizePromise; + }; + + struct TEvResourcesInfo : public NActors::TEventLocal { + explicit TEvResourcesInfo(i32 nodeCount) + : NodeCount(nodeCount) + {} + + const i32 NodeCount; + }; +}; + using TProgressCallback = std::function; -NActors::IActor* CreateRunScriptActorMock(THolder request, - NThreading::TPromise promise, ui64 resultRowsLimit, ui64 resultSizeLimit, - TProgressCallback progressCallback); +NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise promise, TProgressCallback progressCallback); + +NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit); NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount); diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index 8aab9649936d..31abec30971b 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -28,6 +28,8 @@ struct TYdbSetupSettings { NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory; TIntrusivePtr YtGateway; NKikimrConfig::TAppConfig AppConfig; + + ui64 InFlightLimit = 0; }; @@ -55,8 +57,6 @@ struct TRunnerOptions { NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default; ETraceOptType TraceOptType = ETraceOptType::Disabled; - ui64 InFlightLimit = 0; - TYdbSetupSettings YdbSettings; }; diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index abcc69bed91a..527f1d22d084 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -88,38 +88,11 @@ void PrintStatistics(const TString& fullStat, const THashMap& flat //// TKqpRunner::TImpl class TKqpRunner::TImpl { - struct TAsyncState { - ui64 OnStartRequest() { - InFlight++; - MaxInFlight = std::max(MaxInFlight, InFlight); - return RequestId++; - } - - void OnRequestFinished(bool success) { - InFlight--; - if (success) { - Completed++; - } else { - Failed++; - } - } - - TString GetInfoString() const { - return TStringBuilder() << "completed: " << Completed << ", failed: " << Failed << ", in flight: " << InFlight << ", max in flight: " << MaxInFlight << ", spend time: " << TInstant::Now() - Start; - } - - const TInstant Start = TInstant::Now(); - ui64 RequestId = 1; - ui64 MaxInFlight = 0; - ui64 InFlight = 0; - ui64 Completed = 0; - ui64 Failed = 0; - }; - public: enum class EQueryType { ScriptQuery, - YqlScriptQuery + YqlScriptQuery, + AsyncQuery }; explicit TImpl(const TRunnerOptions& options) @@ -173,6 +146,10 @@ class TKqpRunner::TImpl { case EQueryType::YqlScriptQuery: status = YdbSetup_.YqlScriptRequest(query, action, traceId, meta, ResultSets_); break; + + case EQueryType::AsyncQuery: + YdbSetup_.QueryRequestAsync(query, action, traceId); + return true; } TYdbSetup::StopTraceOpt(); @@ -193,43 +170,8 @@ class TKqpRunner::TImpl { return true; } - void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) { - TGuard lock(Mutex_); - - if (Options_.InFlightLimit && AsyncState_.InFlight >= Options_.InFlightLimit) { - AwaitInFlight_.WaitI(Mutex_); - } - ui64 requestId = AsyncState_.OnStartRequest(); - Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " started. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << "\n"; - - RunningQueries_[requestId] = YdbSetup_.QueryRequestAsync(query, action, traceId, nullptr).Subscribe([this, requestId](const NThreading::TFuture& f) { - TGuard lock(Mutex_); - - auto response = f.GetValue().Response; - AsyncState_.OnRequestFinished(response.IsSuccess()); - if (response.IsSuccess()) { - Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << Endl; - } else { - Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << response.Status << ". " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << response.Issues.ToString() << CoutColors_.Default(); - } - - if (AsyncState_.InFlight < Options_.InFlightLimit) { - AwaitInFlight_.Signal(); - } - if (!AsyncState_.InFlight) { - AwaitFinish_.Signal(); - } - RunningQueries_.erase(requestId); - }).IgnoreResult(); - } - - void WaitAsyncQueries() { - TGuard lock(Mutex_); - - if (AsyncState_.InFlight) { - Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for async queries..." << CoutColors_.Default() << Endl; - AwaitFinish_.WaitI(Mutex_); - } + void WaitAsyncQueries() const { + YdbSetup_.WaitAsyncQueries(); } bool FetchScriptResults() { @@ -444,12 +386,6 @@ class TKqpRunner::TImpl { TString ExecutionOperation_; TExecutionMeta ExecutionMeta_; std::vector ResultSets_; - - TMutex Mutex_; - TCondVar AwaitInFlight_; - TCondVar AwaitFinish_; - TAsyncState AsyncState_; - std::unordered_map> RunningQueries_; }; @@ -471,15 +407,15 @@ bool TKqpRunner::ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction act return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::ScriptQuery); } -void TKqpRunner::ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { - Impl_->ExecuteQueryAsync(query, action, traceId); -} - bool TKqpRunner::ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::YqlScriptQuery); } -void TKqpRunner::WaitAsyncQueries() { +void TKqpRunner::ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::AsyncQuery); +} + +void TKqpRunner::WaitAsyncQueries() const { Impl_->WaitAsyncQueries(); } diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.h b/ydb/tests/tools/kqprun/src/kqp_runner.h index bcff9cc8d01a..3687a7cbda06 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.h +++ b/ydb/tests/tools/kqprun/src/kqp_runner.h @@ -16,11 +16,11 @@ class TKqpRunner { bool ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; - void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; - bool ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; - void WaitAsyncQueries(); + void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + + void WaitAsyncQueries() const; bool FetchScriptResults(); diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index cd0f7261b3e8..010533ba990d 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -60,20 +60,6 @@ class TStaticSecuredCredentialsFactory : public NYql::ISecuredServiceAccountCred TString YqlToken_; }; -TRequestResult GetQueryResult(TQueryResponse response, TQueryMeta& meta, std::vector& resultSets) { - resultSets = std::move(response.ResultSets); - - auto queryOperationResponse = response.Response->Get()->Record.GetRef(); - const auto& responseRecord = queryOperationResponse.GetResponse(); - - meta.Ast = responseRecord.GetQueryAst(); - if (const auto& plan = responseRecord.GetQueryPlan()) { - meta.Plan = plan; - } - - return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues()); -} - } // anonymous namespace @@ -232,20 +218,12 @@ class TYdbSetup::TImpl { return RunKqpProxyRequest(std::move(event)); } - NThreading::TFuture QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const { - auto event = MakeHolder(); - FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, action, traceId, event->Record); - - if (auto progressStatsPeriodMs = Settings_.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) { - event->SetProgressStatsPeriod(TDuration::MilliSeconds(progressStatsPeriodMs)); - } - + TQueryResponse QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const { + auto request = GetQueryRequest(query, action, traceId); auto promise = NThreading::NewPromise(); - auto rowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit(); - auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(); - GetRuntime()->Register(CreateRunScriptActorMock(std::move(event), promise, rowsLimit, sizeLimit, progressCallback), RandomNumber(Settings_.NodeCount)); + GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback)); - return promise.GetFuture(); + return promise.GetFuture().GetValueSync(); } NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { @@ -282,6 +260,29 @@ class TYdbSetup::TImpl { return RunKqpProxyRequest(std::move(event)); } + void QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) { + if (!AsyncQueryRunnerActorId) { + AsyncQueryRunnerActorId = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.InFlightLimit)); + } + + auto request = GetQueryRequest(query, action, traceId); + auto startPromise = NThreading::NewPromise(); + GetRuntime()->Send(*AsyncQueryRunnerActorId, GetRuntime()->AllocateEdgeActor(), new TEvPrivate::TEvStartAsyncQuery(std::move(request), startPromise)); + + return startPromise.GetFuture().GetValueSync(); + } + + void WaitAsyncQueries() const { + if (!AsyncQueryRunnerActorId) { + return; + } + + auto finalizePromise = NThreading::NewPromise(); + GetRuntime()->Send(*AsyncQueryRunnerActorId, GetRuntime()->AllocateEdgeActor(), new TEvPrivate::TEvFinalizeAsyncQueryRunner(finalizePromise)); + + return finalizePromise.GetFuture().GetValueSync(); + } + void StartTraceOpt() const { if (!Settings_.TraceOptEnabled) { ythrow yexception() << "Trace opt was disabled"; @@ -332,6 +333,22 @@ class TYdbSetup::TImpl { } } + TQueryRequest GetQueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + auto event = std::make_unique(); + FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, action, traceId, event->Record); + + if (auto progressStatsPeriodMs = Settings_.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) { + event->SetProgressStatsPeriod(TDuration::MilliSeconds(progressStatsPeriodMs)); + } + + return { + .Event = std::move(event), + .TargetNode = GetRuntime()->GetNodeId(RandomNumber(Settings_.NodeCount)), + .ResultRowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit(), + .ResultSizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit() + }; + } + private: TYdbSetupSettings Settings_; NColorizer::TColors CoutColors_; @@ -339,6 +356,8 @@ class TYdbSetup::TImpl { THolder Server_; THolder Client_; TPortManager PortManager_; + + std::optional AsyncQueryRunnerActorId; }; @@ -392,16 +411,19 @@ TRequestResult TYdbSetup::ScriptRequest(const TString& script, NKikimrKqp::EQuer } TRequestResult TYdbSetup::QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const { - auto response = Impl_->QueryRequestAsync(query, action, traceId, progressCallback).GetValueSync(); - return GetQueryResult(std::move(response), meta, resultSets); -} + resultSets.clear(); + + TQueryResponse queryResponse = Impl_->QueryRequest(query, action, traceId, progressCallback); + const auto& queryOperationResponse = queryResponse.Response->Get()->Record.GetRef(); + const auto& responseRecord = queryOperationResponse.GetResponse(); -NThreading::TFuture TYdbSetup::QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const { - return Impl_->QueryRequestAsync(query, action, traceId, progressCallback).Apply([](const NThreading::TFuture& f) { - TQueryResult result; - result.Response = GetQueryResult(f.GetValue(), result.Meta, result.ResultSets); - return result; - }); + resultSets = std::move(queryResponse.ResultSets); + meta.Ast = responseRecord.GetQueryAst(); + if (const auto& plan = responseRecord.GetQueryPlan()) { + meta.Plan = plan; + } + + return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues()); } TRequestResult TYdbSetup::YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const { @@ -457,6 +479,14 @@ TRequestResult TYdbSetup::ForgetScriptExecutionOperationRequest(const TString& o return TRequestResult(forgetScriptExecutionOperationResponse->Get()->Status, forgetScriptExecutionOperationResponse->Get()->Issues); } +void TYdbSetup::QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + Impl_->QueryRequestAsync(query, action, traceId); +} + +void TYdbSetup::WaitAsyncQueries() const { + Impl_->WaitAsyncQueries(); +} + void TYdbSetup::StartTraceOpt() const { Impl_->StartTraceOpt(); } diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.h b/ydb/tests/tools/kqprun/src/ydb_setup.h index 4d707f28b050..aa4181da23b5 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.h +++ b/ydb/tests/tools/kqprun/src/ydb_setup.h @@ -64,8 +64,6 @@ class TYdbSetup { TRequestResult QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const; - NThreading::TFuture QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const; - TRequestResult YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const; TRequestResult GetScriptExecutionOperationRequest(const TString& operation, TExecutionMeta& meta) const; @@ -74,6 +72,10 @@ class TYdbSetup { TRequestResult ForgetScriptExecutionOperationRequest(const TString& operation) const; + void QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + + void WaitAsyncQueries() const; + void StartTraceOpt() const; static void StopTraceOpt();