diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 30962870e8e6..20509952d03d 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -299,11 +299,11 @@ namespace Tests { } } - void TServer::EnableGRpc(const NYdbGrpc::TServerOptions& options) { + void TServer::EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId) { GRpcServer.reset(new NYdbGrpc::TGRpcServer(options)); auto grpcService = new NGRpcProxy::TGRpcService(); - auto system(Runtime->GetAnyNodeActorSystem()); + auto system(Runtime->GetActorSystem(grpcServiceNodeId)); Cerr << "TServer::EnableGrpc on GrpcPort " << options.Port << ", node " << system->NodeId << Endl; @@ -311,21 +311,23 @@ namespace Tests { TVector grpcRequestProxies; grpcRequestProxies.reserve(proxyCount); - auto& appData = Runtime->GetAppData(); + auto& appData = Runtime->GetAppData(grpcServiceNodeId); NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator(appData.TimeProvider, appData.RandomProvider); for (size_t i = 0; i < proxyCount; ++i) { auto grpcRequestProxy = NGRpcService::CreateGRpcRequestProxy(*Settings->AppConfig, tracingConfigurator.GetControl()); - auto grpcRequestProxyId = system->Register(grpcRequestProxy, TMailboxType::ReadAsFilled); + auto grpcRequestProxyId = system->Register(grpcRequestProxy, TMailboxType::ReadAsFilled, appData.UserPoolId); system->RegisterLocalService(NGRpcService::CreateGRpcRequestProxyId(), grpcRequestProxyId); grpcRequestProxies.push_back(grpcRequestProxyId); } system->Register( - NConsole::CreateJaegerTracingConfigurator(std::move(tracingConfigurator), Settings->AppConfig->GetTracingConfig()) + NConsole::CreateJaegerTracingConfigurator(std::move(tracingConfigurator), Settings->AppConfig->GetTracingConfig()), + TMailboxType::ReadAsFilled, + appData.UserPoolId ); - auto grpcMon = system->Register(NGRpcService::CreateGrpcMonService(), TMailboxType::ReadAsFilled); + auto grpcMon = system->Register(NGRpcService::CreateGrpcMonService(), TMailboxType::ReadAsFilled, appData.UserPoolId); system->RegisterLocalService(NGRpcService::GrpcMonServiceId(), grpcMon); GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>(); @@ -406,11 +408,12 @@ namespace Tests { GRpcServer->Start(); } - void TServer::EnableGRpc(ui16 port) { + void TServer::EnableGRpc(ui16 port, ui32 grpcServiceNodeId) { EnableGRpc(NYdbGrpc::TServerOptions() .SetHost("localhost") .SetPort(port) - .SetLogger(NYdbGrpc::CreateActorSystemLogger(*Runtime->GetAnyNodeActorSystem(), NKikimrServices::GRPC_SERVER)) + .SetLogger(NYdbGrpc::CreateActorSystemLogger(*Runtime->GetActorSystem(grpcServiceNodeId), NKikimrServices::GRPC_SERVER)), + grpcServiceNodeId ); } @@ -748,7 +751,7 @@ namespace Tests { NKikimr::NConfig::TConfigsDispatcherInitInfo { .InitialConfig = initial, }); - auto aid = Runtime->Register(dispatcher, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); + auto aid = Runtime->Register(dispatcher, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } if (Settings->IsEnableMetadataProvider()) { @@ -765,7 +768,7 @@ namespace Tests { } if (Settings->IsEnableExternalIndex()) { auto* actor = NCSIndex::CreateService(NCSIndex::TConfig()); - const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); + const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } { @@ -791,7 +794,7 @@ namespace Tests { Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); auto sysViewService = NSysView::CreateSysViewServiceForTests(); - TActorId sysViewServiceId = Runtime->Register(sysViewService.Release(), nodeIdx); + TActorId sysViewServiceId = Runtime->Register(sysViewService.Release(), nodeIdx, appData.UserPoolId); Runtime->RegisterService(NSysView::MakeSysViewServiceID(Runtime->GetNodeId(nodeIdx)), sysViewServiceId, nodeIdx); auto tenantPublisher = CreateTenantNodeEnumerationPublisher(); @@ -809,11 +812,13 @@ namespace Tests { } void TServer::SetupProxies(ui32 nodeIdx) { + const ui32 userPoolId = Runtime->GetAppData(nodeIdx).UserPoolId; + Runtime->SetTxAllocatorTabletIds({ChangeStateStorage(TxAllocator, Settings->Domain)}); { if (Settings->AuthConfig.HasLdapAuthentication()) { IActor* ldapAuthProvider = NKikimr::CreateLdapAuthProvider(Settings->AuthConfig.GetLdapAuthentication()); - TActorId ldapAuthProviderId = Runtime->Register(ldapAuthProvider, nodeIdx); + TActorId ldapAuthProviderId = Runtime->Register(ldapAuthProvider, nodeIdx, userPoolId); Runtime->RegisterService(MakeLdapAuthProviderID(), ldapAuthProviderId, nodeIdx); } TTicketParserSettings ticketParserSettings { @@ -825,19 +830,19 @@ namespace Tests { } }; IActor* ticketParser = Settings->CreateTicketParser(ticketParserSettings); - TActorId ticketParserId = Runtime->Register(ticketParser, nodeIdx); + TActorId ticketParserId = Runtime->Register(ticketParser, nodeIdx, userPoolId); Runtime->RegisterService(MakeTicketParserID(), ticketParserId, nodeIdx); } { IActor* healthCheck = NHealthCheck::CreateHealthCheckService(); - TActorId healthCheckId = Runtime->Register(healthCheck, nodeIdx); + TActorId healthCheckId = Runtime->Register(healthCheck, nodeIdx, userPoolId); Runtime->RegisterService(NHealthCheck::MakeHealthCheckID(), healthCheckId, nodeIdx); } { const auto& appData = Runtime->GetAppData(nodeIdx); IActor* metadataCache = CreateDatabaseMetadataCache(appData.TenantName, appData.Counters).release(); - TActorId metadataCacheId = Runtime->Register(metadataCache, nodeIdx); + TActorId metadataCacheId = Runtime->Register(metadataCache, nodeIdx, userPoolId); Runtime->RegisterService(MakeDatabaseMetadataCacheId(Runtime->GetNodeId(nodeIdx)), metadataCacheId, nodeIdx); } { @@ -845,7 +850,7 @@ namespace Tests { IActor* kqpRmService = NKqp::CreateKqpResourceManagerActor( Settings->AppConfig->GetTableServiceConfig().GetResourceManager(), nullptr, {}, kqpProxySharedResources, Runtime->GetNodeId(nodeIdx)); - TActorId kqpRmServiceId = Runtime->Register(kqpRmService, nodeIdx); + TActorId kqpRmServiceId = Runtime->Register(kqpRmService, nodeIdx, userPoolId); Runtime->RegisterService(NKqp::MakeKqpRmServiceID(Runtime->GetNodeId(nodeIdx)), kqpRmServiceId, nodeIdx); if (!KqpLoggerScope) { @@ -868,14 +873,14 @@ namespace Tests { auto httpProxyActorId = NFq::MakeYqlAnalyticsHttpProxyId(); Runtime->RegisterService( httpProxyActorId, - Runtime->Register(NHttp::CreateHttpProxy(), nodeIdx), + Runtime->Register(NHttp::CreateHttpProxy(), nodeIdx, userPoolId), nodeIdx ); auto databaseResolverActorId = NFq::MakeDatabaseResolverActorId(); Runtime->RegisterService( databaseResolverActorId, - Runtime->Register(NFq::CreateDatabaseResolver(httpProxyActorId, Settings->CredentialsFactory), nodeIdx), + Runtime->Register(NFq::CreateDatabaseResolver(httpProxyActorId, Settings->CredentialsFactory), nodeIdx, userPoolId), nodeIdx ); @@ -909,13 +914,13 @@ namespace Tests { TVector(Settings->KqpSettings), nullptr, std::move(kqpProxySharedResources), federatedQuerySetupFactory, Settings->S3ActorsFactory); - TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx); + TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx, userPoolId); Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx); IActor* scriptFinalizeService = NKqp::CreateKqpFinalizeScriptService( Settings->AppConfig->GetQueryServiceConfig(), federatedQuerySetupFactory, Settings->S3ActorsFactory ); - TActorId scriptFinalizeServiceId = Runtime->Register(scriptFinalizeService, nodeIdx); + TActorId scriptFinalizeServiceId = Runtime->Register(scriptFinalizeService, nodeIdx, userPoolId); Runtime->RegisterService(NKqp::MakeKqpFinalizeScriptServiceId(Runtime->GetNodeId(nodeIdx)), scriptFinalizeServiceId, nodeIdx); } @@ -927,19 +932,19 @@ namespace Tests { { IActor* compileService = CreateMiniKQLCompileService(100000); - TActorId compileServiceId = Runtime->Register(compileService, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId, TMailboxType::Revolving, 0); + TActorId compileServiceId = Runtime->Register(compileService, nodeIdx, userPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(MakeMiniKQLCompileServiceID(), compileServiceId, nodeIdx); } { IActor* longTxService = NLongTxService::CreateLongTxService(); - TActorId longTxServiceId = Runtime->Register(longTxService, nodeIdx); + TActorId longTxServiceId = Runtime->Register(longTxService, nodeIdx, userPoolId); Runtime->RegisterService(NLongTxService::MakeLongTxServiceID(Runtime->GetNodeId(nodeIdx)), longTxServiceId, nodeIdx); } { IActor* sequenceProxy = NSequenceProxy::CreateSequenceProxy(); - TActorId sequenceProxyId = Runtime->Register(sequenceProxy, nodeIdx); + TActorId sequenceProxyId = Runtime->Register(sequenceProxy, nodeIdx, userPoolId); Runtime->RegisterService(NSequenceProxy::MakeSequenceProxyServiceID(), sequenceProxyId, nodeIdx); } @@ -952,7 +957,7 @@ namespace Tests { } { IActor* icNodeCache = NIcNodeCache::CreateICNodesInfoCacheService(Runtime->GetDynamicCounters()); - TActorId icCacheId = Runtime->Register(icNodeCache, nodeIdx); + TActorId icCacheId = Runtime->Register(icNodeCache, nodeIdx, userPoolId); Runtime->RegisterService(NIcNodeCache::CreateICNodesInfoCacheServiceId(), icCacheId, nodeIdx); } { @@ -965,12 +970,12 @@ namespace Tests { { IActor* pqClusterTracker = NPQ::NClusterTracker::CreateClusterTracker(); - TActorId pqClusterTrackerId = Runtime->Register(pqClusterTracker, nodeIdx); + TActorId pqClusterTrackerId = Runtime->Register(pqClusterTracker, nodeIdx, userPoolId); Runtime->RegisterService(NPQ::NClusterTracker::MakeClusterTrackerID(), pqClusterTrackerId, nodeIdx); } { IActor* pqReadCacheService = NPQ::CreatePQDReadCacheService(Runtime->GetDynamicCounters()); - TActorId readCacheId = Runtime->Register(pqReadCacheService, nodeIdx); + TActorId readCacheId = Runtime->Register(pqReadCacheService, nodeIdx, userPoolId); Runtime->RegisterService(NPQ::MakePQDReadCacheServiceActorId(), readCacheId, nodeIdx); } @@ -980,7 +985,7 @@ namespace Tests { new ::NMonitoring::TDynamicCounters(), TDuration::Seconds(1) ); - TActorId pqMetaCacheId = Runtime->Register(pqMetaCache, nodeIdx); + TActorId pqMetaCacheId = Runtime->Register(pqMetaCache, nodeIdx, userPoolId); Runtime->RegisterService(NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), pqMetaCacheId, nodeIdx); } } @@ -991,7 +996,7 @@ namespace Tests { try { fileBackend = MakeHolder(Settings->MeteringFilePath); auto meteringActor = NMetering::CreateMeteringWriter(std::move(fileBackend)); - TActorId meteringId = Runtime->Register(meteringActor.Release(), nodeIdx); + TActorId meteringId = Runtime->Register(meteringActor.Release(), nodeIdx, Runtime->GetAppData(nodeIdx).IOPoolId); Runtime->RegisterService(NMetering::MakeMeteringServiceID(), meteringId, nodeIdx); } catch (const TFileError &ex) { @@ -1003,19 +1008,19 @@ namespace Tests { { IActor* kesusService = NKesus::CreateKesusProxyService(); - TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx); + TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx, userPoolId); Runtime->RegisterService(NKesus::MakeKesusProxyServiceId(), kesusServiceId, nodeIdx); } { IActor* netClassifier = NNetClassifier::CreateNetClassifier(); - TActorId netClassifierId = Runtime->Register(netClassifier, nodeIdx); + TActorId netClassifierId = Runtime->Register(netClassifier, nodeIdx, userPoolId); Runtime->RegisterService(NNetClassifier::MakeNetClassifierID(), netClassifierId, nodeIdx); } { IActor* actor = CreatePollerActor(); - TActorId actorId = Runtime->Register(actor, nodeIdx); + TActorId actorId = Runtime->Register(actor, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId); Runtime->RegisterService(MakePollerActorId(), actorId, nodeIdx); } @@ -1027,11 +1032,11 @@ namespace Tests { } IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig->GetKafkaProxyConfig()); - TActorId actorId = Runtime->Register(actor, nodeIdx); + TActorId actorId = Runtime->Register(actor, nodeIdx, userPoolId); Runtime->RegisterService(TActorId{}, actorId, nodeIdx); IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{Runtime->GetAppData().Counters->GetSubgroup("counters", "kafka_proxy")}); - TActorId metricsActorId = Runtime->Register(metricsActor, nodeIdx); + TActorId metricsActorId = Runtime->Register(metricsActor, nodeIdx, userPoolId); Runtime->RegisterService(NKafka::MakeKafkaMetricsServiceID(), metricsActorId, nodeIdx); } @@ -1158,7 +1163,7 @@ namespace Tests { IActor* viewer = CreateViewer(*Settings->KikimrRunConfig); SetupPQVirtualHandlers(dynamic_cast(viewer)); SetupDBVirtualHandlers(dynamic_cast(viewer)); - TActorId viewerId = Runtime->Register(viewer, nodeIdx); + TActorId viewerId = Runtime->Register(viewer, nodeIdx, Runtime->GetAppData(nodeIdx).BatchPoolId); Runtime->RegisterService(MakeViewerID(nodeIdx), viewerId, nodeIdx); } } diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 133f0fe77d39..7fdbd1174840 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -288,8 +288,8 @@ namespace Tests { TServer& operator =(TServer&& server) = default; virtual ~TServer(); - void EnableGRpc(const NYdbGrpc::TServerOptions& options); - void EnableGRpc(ui16 port); + void EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId = 0); + void EnableGRpc(ui16 port, ui32 grpcServiceNodeId = 0); void SetupRootStoragePools(const TActorId sender) const; void SetupDefaultProfiles(); diff --git a/ydb/tests/tools/kqprun/.gitignore b/ydb/tests/tools/kqprun/.gitignore index 807aadd42e70..a4578942a676 100644 --- a/ydb/tests/tools/kqprun/.gitignore +++ b/ydb/tests/tools/kqprun/.gitignore @@ -6,3 +6,5 @@ udfs *.sql *.bin *.txt +*.svg +*.old diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index 85225bbe88f1..b6146fbd6e00 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -1,3 +1,53 @@ +ActorSystemConfig { + Executor { + Type: BASIC + Threads: 1 + SpinThreshold: 10 + Name: "System" + } + Executor { + Type: BASIC + Threads: 6 + SpinThreshold: 1 + Name: "User" + } + Executor { + Type: BASIC + Threads: 1 + SpinThreshold: 1 + Name: "Batch" + } + Executor { + Type: IO + Threads: 1 + Name: "IO" + } + Executor { + Type: BASIC + Threads: 2 + SpinThreshold: 10 + Name: "IC" + TimePerMailboxMicroSecs: 100 + } + Scheduler { + Resolution: 64 + SpinThreshold: 0 + ProgressThreshold: 10000 + } + SysExecutor: 0 + UserExecutor: 1 + IoExecutor: 3 + BatchExecutor: 2 + ServiceExecutor { + ServiceName: "Interconnect" + ExecutorId: 4 + } +} + +ColumnShardConfig { + DisabledOnSchemeShard: false +} + FeatureFlags { EnableExternalDataSources: true EnableScriptExecutionOperations: true diff --git a/ydb/tests/tools/kqprun/flame_graph.sh b/ydb/tests/tools/kqprun/flame_graph.sh new file mode 100755 index 000000000000..ead8de30683a --- /dev/null +++ b/ydb/tests/tools/kqprun/flame_graph.sh @@ -0,0 +1,11 @@ +#!/usr/bin/env bash + +# For svg graph download https://github.com/brendangregg/FlameGraph +# and run `FlameGraph/stackcollapse-perf.pl profdata.txt | FlameGraph/flamegraph.pl > profdata.svg` + +pid=$(pgrep -u $USER kqprun) + +echo "Target process id: ${pid}" + +sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $pid -v -o profdata -- sleep 30 +sudo perf script -i profdata > profdata.txt diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 9f97c8af1fac..c9cc63bee296 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -36,27 +36,202 @@ struct TExecutionOptions { bool ForgetExecution = false; std::vector ExecutionCases; - NKikimrKqp::EQueryAction ScriptQueryAction = NKikimrKqp::QUERY_ACTION_EXECUTE; + std::vector ScriptQueryActions; + std::vector Databases; + std::vector TraceIds; + std::vector PoolIds; + std::vector UserSIDs; + ui64 ResultsRowsLimit = 0; const TString TraceId = "kqprun_" + CreateGuidAsString(); bool HasResults() const { - if (ScriptQueries.empty() || ScriptQueryAction != NKikimrKqp::QUERY_ACTION_EXECUTE) { - return false; - } - - for (EExecutionCase executionCase : ExecutionCases) { - if (executionCase != EExecutionCase::AsyncQuery) { + for (size_t i = 0; i < ScriptQueries.size(); ++i) { + if (GetScriptQueryAction(i) != NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE) { + continue; + } + if (GetExecutionCase(i) != EExecutionCase::AsyncQuery) { return true; } } return false; } + bool HasExecutionCase(EExecutionCase executionCase) const { + if (ExecutionCases.empty()) { + return executionCase == EExecutionCase::GenericScript; + } + return std::find(ExecutionCases.begin(), ExecutionCases.end(), executionCase) != ExecutionCases.end(); + } + EExecutionCase GetExecutionCase(size_t index) const { Y_ABORT_UNLESS(!ExecutionCases.empty()); return ExecutionCases[std::min(index, ExecutionCases.size() - 1)]; } + + NKikimrKqp::EQueryAction GetScriptQueryAction(size_t index) const { + return GetValue(index, ScriptQueryActions, NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE); + } + + NKqpRun::TRequestOptions GetSchemeQueryOptions() const { + TString sql = SchemeQuery; + if (UseTemplates) { + ReplaceYqlTokenTemplate(sql); + } + + return { + .Query = sql, + .Action = NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE, + .TraceId = DefaultTraceId, + .PoolId = "", + .UserSID = BUILTIN_ACL_ROOT, + .Database = "" + }; + } + + NKqpRun::TRequestOptions GetScriptQueryOptions(size_t index, size_t queryId, TInstant startTime) const { + Y_ABORT_UNLESS(index < ScriptQueries.size()); + + TString sql = ScriptQueries[index]; + if (UseTemplates) { + ReplaceYqlTokenTemplate(sql); + SubstGlobal(sql, "${QUERY_ID}", ToString(queryId)); + } + + return { + .Query = sql, + .Action = GetScriptQueryAction(index), + .TraceId = TStringBuilder() << GetValue(index, TraceIds, DefaultTraceId) << "-" << startTime.ToString(), + .PoolId = GetValue(index, PoolIds, TString()), + .UserSID = GetValue(index, UserSIDs, TString(BUILTIN_ACL_ROOT)), + .Database = GetValue(index, Databases, TString()) + }; + } + + void Validate(const NKqpRun::TRunnerOptions& runnerOptions) const { + if (!SchemeQuery && ScriptQueries.empty() && !runnerOptions.YdbSettings.MonitoringEnabled && !runnerOptions.YdbSettings.GrpcEnabled) { + ythrow yexception() << "Nothing to execute and is not running as daemon"; + } + + ValidateOptionsSizes(); + ValidateSchemeQueryOptions(runnerOptions); + ValidateScriptExecutionOptions(runnerOptions); + ValidateAsyncOptions(runnerOptions.YdbSettings.AsyncQueriesSettings); + ValidateTraceOpt(runnerOptions.TraceOptType); + } + +private: + void ValidateOptionsSizes() const { + const auto checker = [numberQueries = ScriptQueries.size()](size_t checkSize, const TString& optionName) { + if (checkSize > numberQueries) { + ythrow yexception() << "Too many " << optionName << ". Specified " << checkSize << ", when number of queries is " << numberQueries; + } + }; + + checker(ExecutionCases.size(), "execution cases"); + checker(ScriptQueryActions.size(), "script query actions"); + checker(Databases.size(), "databases"); + checker(TraceIds.size(), "trace ids"); + checker(PoolIds.size(), "pool ids"); + checker(UserSIDs.size(), "user SIDs"); + } + + void ValidateSchemeQueryOptions(const NKqpRun::TRunnerOptions& runnerOptions) const { + if (SchemeQuery) { + return; + } + if (runnerOptions.SchemeQueryAstOutput) { + ythrow yexception() << "Scheme query AST output can not be used without scheme query"; + } + } + + void ValidateScriptExecutionOptions(const NKqpRun::TRunnerOptions& runnerOptions) const { + // Script specific + if (HasExecutionCase(EExecutionCase::GenericScript)) { + return; + } + if (ForgetExecution) { + ythrow yexception() << "Forget execution can not be used without generic script queries"; + } + if (runnerOptions.ScriptCancelAfter) { + ythrow yexception() << "Cancel after can not be used without generic script queries"; + } + + // Script/Query specific + if (HasExecutionCase(EExecutionCase::GenericQuery)) { + return; + } + if (ResultsRowsLimit) { + ythrow yexception() << "Result rows limit can not be used without script queries"; + } + if (runnerOptions.InProgressStatisticsOutputFile) { + ythrow yexception() << "Script statistics can not be used without script queries"; + } + + // Common specific + if (HasExecutionCase(EExecutionCase::YqlScript)) { + return; + } + if (runnerOptions.ScriptQueryAstOutput) { + ythrow yexception() << "Script query AST output can not be used without script/yql queries"; + } + if (runnerOptions.ScriptQueryPlanOutput) { + ythrow yexception() << "Script query plan output can not be used without script/yql queries"; + } + } + + void ValidateAsyncOptions(const NKqpRun::TAsyncQueriesSettings& asyncQueriesSettings) const { + if (asyncQueriesSettings.InFlightLimit && !HasExecutionCase(EExecutionCase::AsyncQuery)) { + ythrow yexception() << "In flight limit can not be used without async queries"; + } + + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + if (LoopCount && asyncQueriesSettings.InFlightLimit && asyncQueriesSettings.InFlightLimit > ScriptQueries.size() * LoopCount) { + Cout << colors.Red() << "Warning: inflight limit is " << asyncQueriesSettings.InFlightLimit << ", that is larger than max possible number of queries " << ScriptQueries.size() * LoopCount << colors.Default() << Endl; + } + } + + void ValidateTraceOpt(NKqpRun::TRunnerOptions::ETraceOptType traceOptType) const { + switch (traceOptType) { + case NKqpRun::TRunnerOptions::ETraceOptType::Scheme: { + if (!SchemeQuery) { + ythrow yexception() << "Trace opt type scheme cannot be used without scheme query"; + } + break; + } + case NKqpRun::TRunnerOptions::ETraceOptType::Script: { + if (ScriptQueries.empty()) { + ythrow yexception() << "Trace opt type script cannot be used without script queries"; + } + } + case NKqpRun::TRunnerOptions::ETraceOptType::All: { + if (!SchemeQuery && ScriptQueries.empty()) { + ythrow yexception() << "Trace opt type all cannot be used without any queries"; + } + } + case NKqpRun::TRunnerOptions::ETraceOptType::Disabled: { + break; + } + } + } + +private: + template + static TValue GetValue(size_t index, const std::vector& values, TValue defaultValue) { + if (values.empty()) { + return defaultValue; + } + return values[std::min(index, values.size() - 1)]; + } + + static void ReplaceYqlTokenTemplate(TString& sql) { + const TString variableName = TStringBuilder() << "${" << NKqpRun::YQL_TOKEN_VARIABLE << "}"; + if (const TString& yqlToken = GetEnv(NKqpRun::YQL_TOKEN_VARIABLE)) { + SubstGlobal(sql, variableName, yqlToken); + } else if (sql.Contains(variableName)) { + ythrow yexception() << "Failed to replace ${YQL_TOKEN} template, please specify YQL_TOKEN environment variable\n"; + } + } }; @@ -211,7 +386,6 @@ class TMain : public TMainClassArgs { TVector UdfsPaths; TString UdfsDirectory; bool ExcludeLinkedUdfs = false; - ui64 ResultsRowsLimit = 1000; bool EmulateYt = false; static TString LoadFile(const TString& file) { @@ -348,8 +522,8 @@ class TMain : public TMainClassArgs { .StoreMappedResultT(&RunnerOptions.ResultOutput, &GetDefaultOutput); options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results") .RequiredArgument("uint") - .DefaultValue(ResultsRowsLimit) - .StoreResult(&ResultsRowsLimit); + .DefaultValue(0) + .StoreResult(&ExecutionOptions.ResultsRowsLimit); TChoices resultFormat({ {"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson}, {"full-json", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson}, @@ -374,11 +548,16 @@ class TMain : public TMainClassArgs { .StoreMappedResultT(&RunnerOptions.ScriptQueryPlanOutput, &GetDefaultOutput); options.AddLongOption("script-statistics", "File with script inprogress statistics") .RequiredArgument("file") - .StoreResult(&RunnerOptions.InProgressStatisticsOutputFile); - TChoices planFormat({ - {"pretty", NYdb::NConsoleClient::EOutputFormat::Pretty}, - {"table", NYdb::NConsoleClient::EOutputFormat::PrettyTable}, - {"json", NYdb::NConsoleClient::EOutputFormat::JsonUnicode}, + .StoreMappedResultT(&RunnerOptions.InProgressStatisticsOutputFile, [](const TString& file) { + if (file == "-") { + ythrow yexception() << "Script in progress statistics cannot be printed to stdout, please specify file name"; + } + return file; + }); + TChoices planFormat({ + {"pretty", NYdb::NConsoleClient::EDataFormat::Pretty}, + {"table", NYdb::NConsoleClient::EDataFormat::PrettyTable}, + {"json", NYdb::NConsoleClient::EDataFormat::JsonUnicode}, }); options.AddLongOption('P', "plan-format", "Script query plan format") .RequiredArgument("plan-format") @@ -386,6 +565,15 @@ class TMain : public TMainClassArgs { .Choices(planFormat.GetChoices()) .StoreMappedResultT(&RunnerOptions.PlanOutputFormat, planFormat); + options.AddLongOption("script-timeline-file", "File with script query timline in svg format") + .RequiredArgument("file") + .StoreMappedResultT(&RunnerOptions.ScriptQueryTimelineFile, [](const TString& file) { + if (file == "-") { + ythrow yexception() << "Script timline cannot be printed to stdout, please specify file name"; + } + return file; + }); + // Pipeline settings TChoices executionCase({ @@ -396,7 +584,6 @@ class TMain : public TMainClassArgs { }); options.AddLongOption('C', "execution-case", "Type of query for -p argument") .RequiredArgument("query-type") - .DefaultValue("script") .Choices(executionCase.GetChoices()) .Handler1([this, executionCase](const NLastGetopt::TOptsParser* option) { TString choice(option->CurValOrDef()); @@ -413,10 +600,17 @@ class TMain : public TMainClassArgs { }); options.AddLongOption('A', "script-action", "Script query execute action") .RequiredArgument("script-action") - .DefaultValue("execute") .Choices(scriptAction.GetChoices()) .StoreMappedResultT(&ExecutionOptions.ScriptQueryAction, scriptAction); + options.AddLongOption("timeout", "Reauests timeout in milliseconds") + .RequiredArgument("uint") + .StoreMappedResultT(&RunnerOptions.YdbSettings.RequestsTimeout, &TDuration::MilliSeconds); + + options.AddLongOption("cancel-after", "Cancel script execution operation after specified delay in milliseconds") + .RequiredArgument("uint") + .StoreMappedResultT(&RunnerOptions.ScriptCancelAfter, &TDuration::MilliSeconds); + options.AddLongOption('F', "forget", "Forget script execution operation after fetching results") .NoArgument() .SetFlag(&ExecutionOptions.ForgetExecution); @@ -427,7 +621,7 @@ class TMain : public TMainClassArgs { .StoreResult(&ExecutionOptions.LoopCount); options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps") .RequiredArgument("uint") - .DefaultValue(1000) + .DefaultValue(0) .StoreMappedResultT(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds); options.AddLongOption("pool", "Workload manager pool in which queries will be executed") @@ -459,6 +653,38 @@ class TMain : public TMainClassArgs { .NoArgument() .SetFlag(&EmulateYt); + options.AddLongOption("domain", "Test cluster domain name") + .RequiredArgument("name") + .DefaultValue(RunnerOptions.YdbSettings.DomainName) + .StoreResult(&RunnerOptions.YdbSettings.DomainName); + + options.AddLongOption("dedicated", "Dedicated tenant path, relative inside domain") + .RequiredArgument("path") + .InsertTo(&RunnerOptions.YdbSettings.DedicatedTenants); + + options.AddLongOption("shared", "Shared tenant path, relative inside domain") + .RequiredArgument("path") + .InsertTo(&RunnerOptions.YdbSettings.SharedTenants); + + options.AddLongOption("serverless", "Serverless tenant path, relative inside domain (use string serverless-name@shared-name to specify shared database)") + .RequiredArgument("path") + .InsertTo(&RunnerOptions.YdbSettings.ServerlessTenants); + + options.AddLongOption("storage-size", "Domain storage size in gigabytes") + .RequiredArgument("uint") + .DefaultValue(32) + .StoreMappedResultT(&RunnerOptions.YdbSettings.DiskSize, [](ui32 diskSize) { + return static_cast(diskSize) << 30; + }); + + options.AddLongOption("real-pdisks", "Use real PDisks instead of in memory PDisks (also disable disk mock)") + .NoArgument() + .SetFlag(&RunnerOptions.YdbSettings.UseRealPDisks); + + options.AddLongOption("disable-disk-mock", "Disable disk mock on single node cluster") + .NoArgument() + .SetFlag(&RunnerOptions.YdbSettings.DisableDiskMock); + TChoices> backtrace({ {"heavy", &NKikimr::EnableYDBBacktraceFormat}, {"light", []() { SetFormatBackTraceFn(FormatBackTrace); }} @@ -474,13 +700,17 @@ class TMain : public TMainClassArgs { } int DoRun(NLastGetopt::TOptsParseResult&&) override { - if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled) { - ythrow yexception() << "Nothing to execute"; + ExecutionOptions.Validate(RunnerOptions); + + if (RunnerOptions.YdbSettings.DisableDiskMock && RunnerOptions.YdbSettings.NodeCount + RunnerOptions.YdbSettings.SharedTenants.size() + RunnerOptions.YdbSettings.DedicatedTenants.size() > 1) { + ythrow yexception() << "Disable disk mock cannot be used for multi node clusters"; } RunnerOptions.YdbSettings.YqlToken = YqlToken; RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get(); - RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ResultsRowsLimit); + if (ExecutionOptions.ResultsRowsLimit) { + RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit); + } if (EmulateYt) { const auto& fileStorageConfig = RunnerOptions.YdbSettings.AppConfig.GetQueryServiceConfig().GetFileStorage(); diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index a16c1010f9f5..2fb620381f6b 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -18,6 +18,11 @@ struct TYdbSetupSettings { TString DomainName = "Root"; TString DefaultPoolId; TDuration InitializationTimeout = TDuration::Seconds(10); + TDuration RequestsTimeout; + + bool DisableDiskMock = false; + bool UseRealPDisks = false; + ui64 DiskSize = 32_GB; bool MonitoringEnabled = false; ui16 MonitoringPortOffset = 0; @@ -52,6 +57,7 @@ struct TRunnerOptions { IOutputStream* SchemeQueryAstOutput = nullptr; IOutputStream* ScriptQueryAstOutput = nullptr; IOutputStream* ScriptQueryPlanOutput = nullptr; + TString ScriptQueryTimelineFile; TString InProgressStatisticsOutputFile; EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson; diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 399025dcee25..b5c05c702d56 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -9,6 +9,7 @@ #include #include +#include namespace NKqpRun { @@ -152,8 +153,12 @@ class TKqpRunner::TImpl { TYdbSetup::StopTraceOpt(); - PrintScriptAst(meta.Ast); + if (!meta.Plan) { + meta.Plan = ExecutionMeta_.Plan; + } + PrintScriptAst(meta.Ast); + PrintScriptProgress(meta.Plan); PrintScriptPlan(meta.Plan); if (!status.IsSuccess()) { @@ -245,7 +250,7 @@ class TKqpRunner::TImpl { } PrintScriptAst(ExecutionMeta_.Ast); - + PrintScriptProgress(ExecutionMeta_.Plan); PrintScriptPlan(ExecutionMeta_.Plan); if (!status.IsSuccess() || ExecutionMeta_.ExecutionStatus != NYdb::NQuery::EExecStatus::Completed) { @@ -335,6 +340,15 @@ class TKqpRunner::TImpl { outputStream << "\nPlan visualization:" << Endl; PrintPlan(convertedPlan, &outputStream); + outputStream.Finish(); + } + if (Options_.ScriptQueryTimelineFile) { + TFileOutput outputStream(Options_.ScriptQueryTimelineFile); + + TPlanVisualizer planVisualizer; + planVisualizer.LoadPlans(plan); + outputStream.Write(planVisualizer.PrintSvg()); + outputStream.Finish(); } } diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index d394732d4246..95835459eb84 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -112,8 +113,20 @@ class TYdbSetup::TImpl { serverSettings.SetFrFactory(functionRegistryFactory); } - NKikimr::Tests::TServerSettings GetServerSettings() { - ui32 msgBusPort = PortManager_.GetPort(); + void SetStorageSettings(NKikimr::Tests::TServerSettings& serverSettings) const { + const NKikimr::NFake::TStorage storage = { + .UseDisk = Settings_.UseRealPDisks, + .SectorSize = NKikimr::TTestStorageFactory::SECTOR_SIZE, + .ChunkSize = Settings_.UseRealPDisks ? NKikimr::TTestStorageFactory::CHUNK_SIZE : NKikimr::TTestStorageFactory::MEM_CHUNK_SIZE, + .DiskSize = Settings_.DiskSize + }; + + serverSettings.SetEnableMockOnSingleNode(!Settings_.DisableDiskMock && !Settings_.UseRealPDisks); + serverSettings.SetCustomDiskParams(storage); + } + + NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) { + const ui32 msgBusPort = PortManager_.GetPort(); NKikimr::Tests::TServerSettings serverSettings(msgBusPort, Settings_.AppConfig.GetAuthConfig(), Settings_.AppConfig.GetPQConfig()); serverSettings.SetNodeCount(Settings_.NodeCount); @@ -137,6 +150,7 @@ class TYdbSetup::TImpl { SetLoggerSettings(serverSettings); SetFunctionRegistry(serverSettings); + SetStorageSettings(serverSettings); if (Settings_.MonitoringEnabled) { serverSettings.InitKikimrRunConfig(); @@ -215,7 +229,7 @@ class TYdbSetup::TImpl { NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr ScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) const { auto event = MakeHolder(); - FillScriptRequest(script, action, traceId, event->Record); + FillQueryRequest(script, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, event->Record); return RunKqpProxyRequest(std::move(event)); } @@ -223,7 +237,7 @@ class TYdbSetup::TImpl { TQueryResponse QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const { auto request = GetQueryRequest(query, action, traceId); auto promise = NThreading::NewPromise(); - GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback)); + GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback), request.TargetNode - GetRuntime()->GetFirstNodeId(), GetRuntime()->GetAppData().UserPoolId); return promise.GetFuture().GetValueSync(); } @@ -324,17 +338,11 @@ class TYdbSetup::TImpl { request->SetType(type); request->SetAction(action); request->SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL); - request->SetDatabase(Settings_.DomainName); - request->SetPoolId(Settings_.DefaultPoolId); - } - - void FillScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, NKikimrKqp::TEvQueryRequest& event) const { - FillQueryRequest(script, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, action, traceId, event); + request->SetDatabase(GetDatabasePath(query.Database)); + request->SetPoolId(query.PoolId); - auto request = event.MutableRequest(); - if (action == NKikimrKqp::QUERY_ACTION_EXECUTE) { - request->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - request->MutableTxControl()->set_commit_tx(true); + if (Settings_.RequestsTimeout) { + request->SetTimeoutMs(Settings_.RequestsTimeout.MilliSeconds()); } }