diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 1d2639625213..be2311df44c8 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -795,7 +795,7 @@ namespace Tests { NKikimr::NConfig::TConfigsDispatcherInitInfo { .InitialConfig = initial, }); - auto aid = Runtime->Register(dispatcher, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); + auto aid = Runtime->Register(dispatcher, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } if (Settings->IsEnableMetadataProvider()) { @@ -812,7 +812,7 @@ namespace Tests { } if (Settings->IsEnableExternalIndex()) { auto* actor = NCSIndex::CreateService(NCSIndex::TConfig()); - const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); + const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } { @@ -838,7 +838,7 @@ namespace Tests { Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); auto sysViewService = NSysView::CreateSysViewServiceForTests(); - TActorId sysViewServiceId = Runtime->Register(sysViewService.Release(), nodeIdx); + TActorId sysViewServiceId = Runtime->Register(sysViewService.Release(), nodeIdx, appData.UserPoolId); Runtime->RegisterService(NSysView::MakeSysViewServiceID(Runtime->GetNodeId(nodeIdx)), sysViewServiceId, nodeIdx); auto tenantPublisher = CreateTenantNodeEnumerationPublisher(); @@ -856,11 +856,13 @@ namespace Tests { } void TServer::SetupProxies(ui32 nodeIdx) { + const ui32 userPoolId = Runtime->GetAppData(nodeIdx).UserPoolId; + Runtime->SetTxAllocatorTabletIds({ChangeStateStorage(TxAllocator, Settings->Domain)}); { if (Settings->AuthConfig.HasLdapAuthentication()) { IActor* ldapAuthProvider = NKikimr::CreateLdapAuthProvider(Settings->AuthConfig.GetLdapAuthentication()); - TActorId ldapAuthProviderId = Runtime->Register(ldapAuthProvider, nodeIdx); + TActorId ldapAuthProviderId = Runtime->Register(ldapAuthProvider, nodeIdx, userPoolId); Runtime->RegisterService(MakeLdapAuthProviderID(), ldapAuthProviderId, nodeIdx); } TTicketParserSettings ticketParserSettings { @@ -872,19 +874,19 @@ namespace Tests { } }; IActor* ticketParser = Settings->CreateTicketParser(ticketParserSettings); - TActorId ticketParserId = Runtime->Register(ticketParser, nodeIdx); + TActorId ticketParserId = Runtime->Register(ticketParser, nodeIdx, userPoolId); Runtime->RegisterService(MakeTicketParserID(), ticketParserId, nodeIdx); } { IActor* healthCheck = NHealthCheck::CreateHealthCheckService(); - TActorId healthCheckId = Runtime->Register(healthCheck, nodeIdx); + TActorId healthCheckId = Runtime->Register(healthCheck, nodeIdx, userPoolId); Runtime->RegisterService(NHealthCheck::MakeHealthCheckID(), healthCheckId, nodeIdx); } { const auto& appData = Runtime->GetAppData(nodeIdx); IActor* metadataCache = CreateDatabaseMetadataCache(appData.TenantName, appData.Counters).release(); - TActorId metadataCacheId = Runtime->Register(metadataCache, nodeIdx); + TActorId metadataCacheId = Runtime->Register(metadataCache, nodeIdx, userPoolId); Runtime->RegisterService(MakeDatabaseMetadataCacheId(Runtime->GetNodeId(nodeIdx)), metadataCacheId, nodeIdx); } { @@ -892,7 +894,7 @@ namespace Tests { IActor* kqpRmService = NKqp::CreateKqpResourceManagerActor( Settings->AppConfig->GetTableServiceConfig().GetResourceManager(), nullptr, {}, kqpProxySharedResources, Runtime->GetNodeId(nodeIdx)); - TActorId kqpRmServiceId = Runtime->Register(kqpRmService, nodeIdx); + TActorId kqpRmServiceId = Runtime->Register(kqpRmService, nodeIdx, userPoolId); Runtime->RegisterService(NKqp::MakeKqpRmServiceID(Runtime->GetNodeId(nodeIdx)), kqpRmServiceId, nodeIdx); if (!KqpLoggerScope) { @@ -915,14 +917,14 @@ namespace Tests { auto httpProxyActorId = NFq::MakeYqlAnalyticsHttpProxyId(); Runtime->RegisterService( httpProxyActorId, - Runtime->Register(NHttp::CreateHttpProxy(), nodeIdx), + Runtime->Register(NHttp::CreateHttpProxy(), nodeIdx, userPoolId), nodeIdx ); auto databaseResolverActorId = NFq::MakeDatabaseResolverActorId(); Runtime->RegisterService( databaseResolverActorId, - Runtime->Register(NFq::CreateDatabaseResolver(httpProxyActorId, Settings->CredentialsFactory), nodeIdx), + Runtime->Register(NFq::CreateDatabaseResolver(httpProxyActorId, Settings->CredentialsFactory), nodeIdx, userPoolId), nodeIdx ); @@ -956,50 +958,50 @@ namespace Tests { TVector(Settings->KqpSettings), nullptr, std::move(kqpProxySharedResources), federatedQuerySetupFactory, Settings->S3ActorsFactory); - TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx); + TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx, userPoolId); Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx); IActor* scriptFinalizeService = NKqp::CreateKqpFinalizeScriptService( Settings->AppConfig->GetQueryServiceConfig(), federatedQuerySetupFactory, Settings->S3ActorsFactory ); - TActorId scriptFinalizeServiceId = Runtime->Register(scriptFinalizeService, nodeIdx); + TActorId scriptFinalizeServiceId = Runtime->Register(scriptFinalizeService, nodeIdx, userPoolId); Runtime->RegisterService(NKqp::MakeKqpFinalizeScriptServiceId(Runtime->GetNodeId(nodeIdx)), scriptFinalizeServiceId, nodeIdx); } { IActor* txProxy = CreateTxProxy(Runtime->GetTxAllocatorTabletIds()); - TActorId txProxyId = Runtime->Register(txProxy, nodeIdx); + TActorId txProxyId = Runtime->Register(txProxy, nodeIdx, userPoolId); Runtime->RegisterService(MakeTxProxyID(), txProxyId, nodeIdx); } { IActor* compileService = CreateMiniKQLCompileService(100000); - TActorId compileServiceId = Runtime->Register(compileService, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId, TMailboxType::Revolving, 0); + TActorId compileServiceId = Runtime->Register(compileService, nodeIdx, userPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(MakeMiniKQLCompileServiceID(), compileServiceId, nodeIdx); } { IActor* longTxService = NLongTxService::CreateLongTxService(); - TActorId longTxServiceId = Runtime->Register(longTxService, nodeIdx); + TActorId longTxServiceId = Runtime->Register(longTxService, nodeIdx, userPoolId); Runtime->RegisterService(NLongTxService::MakeLongTxServiceID(Runtime->GetNodeId(nodeIdx)), longTxServiceId, nodeIdx); } { IActor* sequenceProxy = NSequenceProxy::CreateSequenceProxy(); - TActorId sequenceProxyId = Runtime->Register(sequenceProxy, nodeIdx); + TActorId sequenceProxyId = Runtime->Register(sequenceProxy, nodeIdx, userPoolId); Runtime->RegisterService(NSequenceProxy::MakeSequenceProxyServiceID(), sequenceProxyId, nodeIdx); } if (BusServer && nodeIdx == 0) { // MsgBus and GRPC are run now only on first node { IActor* proxy = BusServer->CreateProxy(); - TActorId proxyId = Runtime->Register(proxy, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId, TMailboxType::Revolving, 0); + TActorId proxyId = Runtime->Register(proxy, nodeIdx, userPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NMsgBusProxy::CreateMsgBusProxyId(), proxyId, nodeIdx); } } { IActor* icNodeCache = NIcNodeCache::CreateICNodesInfoCacheService(Runtime->GetDynamicCounters()); - TActorId icCacheId = Runtime->Register(icNodeCache, nodeIdx); + TActorId icCacheId = Runtime->Register(icNodeCache, nodeIdx, userPoolId); Runtime->RegisterService(NIcNodeCache::CreateICNodesInfoCacheServiceId(), icCacheId, nodeIdx); } { @@ -1012,12 +1014,12 @@ namespace Tests { { IActor* pqClusterTracker = NPQ::NClusterTracker::CreateClusterTracker(); - TActorId pqClusterTrackerId = Runtime->Register(pqClusterTracker, nodeIdx); + TActorId pqClusterTrackerId = Runtime->Register(pqClusterTracker, nodeIdx, userPoolId); Runtime->RegisterService(NPQ::NClusterTracker::MakeClusterTrackerID(), pqClusterTrackerId, nodeIdx); } { IActor* pqReadCacheService = NPQ::CreatePQDReadCacheService(Runtime->GetDynamicCounters()); - TActorId readCacheId = Runtime->Register(pqReadCacheService, nodeIdx); + TActorId readCacheId = Runtime->Register(pqReadCacheService, nodeIdx, userPoolId); Runtime->RegisterService(NPQ::MakePQDReadCacheServiceActorId(), readCacheId, nodeIdx); } @@ -1027,7 +1029,7 @@ namespace Tests { new ::NMonitoring::TDynamicCounters(), TDuration::Seconds(1) ); - TActorId pqMetaCacheId = Runtime->Register(pqMetaCache, nodeIdx); + TActorId pqMetaCacheId = Runtime->Register(pqMetaCache, nodeIdx, userPoolId); Runtime->RegisterService(NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), pqMetaCacheId, nodeIdx); } } @@ -1038,7 +1040,7 @@ namespace Tests { try { fileBackend = MakeHolder(Settings->MeteringFilePath); auto meteringActor = NMetering::CreateMeteringWriter(std::move(fileBackend)); - TActorId meteringId = Runtime->Register(meteringActor.Release(), nodeIdx); + TActorId meteringId = Runtime->Register(meteringActor.Release(), nodeIdx, Runtime->GetAppData(nodeIdx).IOPoolId); Runtime->RegisterService(NMetering::MakeMeteringServiceID(), meteringId, nodeIdx); } catch (const TFileError &ex) { @@ -1050,19 +1052,19 @@ namespace Tests { { IActor* kesusService = NKesus::CreateKesusProxyService(); - TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx); + TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx, userPoolId); Runtime->RegisterService(NKesus::MakeKesusProxyServiceId(), kesusServiceId, nodeIdx); } { IActor* netClassifier = NNetClassifier::CreateNetClassifier(); - TActorId netClassifierId = Runtime->Register(netClassifier, nodeIdx); + TActorId netClassifierId = Runtime->Register(netClassifier, nodeIdx, userPoolId); Runtime->RegisterService(NNetClassifier::MakeNetClassifierID(), netClassifierId, nodeIdx); } { IActor* actor = CreatePollerActor(); - TActorId actorId = Runtime->Register(actor, nodeIdx); + TActorId actorId = Runtime->Register(actor, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId); Runtime->RegisterService(MakePollerActorId(), actorId, nodeIdx); } @@ -1074,11 +1076,11 @@ namespace Tests { } IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig->GetKafkaProxyConfig()); - TActorId actorId = Runtime->Register(actor, nodeIdx); + TActorId actorId = Runtime->Register(actor, nodeIdx, userPoolId); Runtime->RegisterService(TActorId{}, actorId, nodeIdx); IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{Runtime->GetAppData().Counters->GetSubgroup("counters", "kafka_proxy")}); - TActorId metricsActorId = Runtime->Register(metricsActor, nodeIdx); + TActorId metricsActorId = Runtime->Register(metricsActor, nodeIdx, userPoolId); Runtime->RegisterService(NKafka::MakeKafkaMetricsServiceID(), metricsActorId, nodeIdx); } @@ -1205,7 +1207,7 @@ namespace Tests { IActor* viewer = CreateViewer(*Settings->KikimrRunConfig); SetupPQVirtualHandlers(dynamic_cast(viewer)); SetupDBVirtualHandlers(dynamic_cast(viewer)); - TActorId viewerId = Runtime->Register(viewer, nodeIdx); + TActorId viewerId = Runtime->Register(viewer, nodeIdx, Runtime->GetAppData(nodeIdx).BatchPoolId); Runtime->RegisterService(MakeViewerID(nodeIdx), viewerId, nodeIdx); } } diff --git a/ydb/tests/tools/kqprun/.gitignore b/ydb/tests/tools/kqprun/.gitignore index 807aadd42e70..a4578942a676 100644 --- a/ydb/tests/tools/kqprun/.gitignore +++ b/ydb/tests/tools/kqprun/.gitignore @@ -6,3 +6,5 @@ udfs *.sql *.bin *.txt +*.svg +*.old diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index b57e03854ae4..1c9c56cebd20 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -1,3 +1,49 @@ +ActorSystemConfig { + Executor { + Type: BASIC + Threads: 1 + SpinThreshold: 10 + Name: "System" + } + Executor { + Type: BASIC + Threads: 6 + SpinThreshold: 1 + Name: "User" + } + Executor { + Type: BASIC + Threads: 1 + SpinThreshold: 1 + Name: "Batch" + } + Executor { + Type: IO + Threads: 1 + Name: "IO" + } + Executor { + Type: BASIC + Threads: 2 + SpinThreshold: 10 + Name: "IC" + TimePerMailboxMicroSecs: 100 + } + Scheduler { + Resolution: 64 + SpinThreshold: 0 + ProgressThreshold: 10000 + } + SysExecutor: 0 + UserExecutor: 1 + IoExecutor: 3 + BatchExecutor: 2 + ServiceExecutor { + ServiceName: "Interconnect" + ExecutorId: 4 + } +} + ColumnShardConfig { DisabledOnSchemeShard: false } diff --git a/ydb/tests/tools/kqprun/flame_graph.sh b/ydb/tests/tools/kqprun/flame_graph.sh new file mode 100755 index 000000000000..ead8de30683a --- /dev/null +++ b/ydb/tests/tools/kqprun/flame_graph.sh @@ -0,0 +1,11 @@ +#!/usr/bin/env bash + +# For svg graph download https://github.com/brendangregg/FlameGraph +# and run `FlameGraph/stackcollapse-perf.pl profdata.txt | FlameGraph/flamegraph.pl > profdata.svg` + +pid=$(pgrep -u $USER kqprun) + +echo "Target process id: ${pid}" + +sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $pid -v -o profdata -- sleep 30 +sudo perf script -i profdata > profdata.txt diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 3e4df13110a9..88802d2aead9 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -44,25 +44,29 @@ struct TExecutionOptions { std::vector TraceIds; std::vector PoolIds; std::vector UserSIDs; + ui64 ResultsRowsLimit = 0; const TString DefaultTraceId = "kqprun"; bool HasResults() const { - if (ScriptQueries.empty()) { - return false; - } - - for (size_t i = 0; i < ExecutionCases.size(); ++i) { + for (size_t i = 0; i < ScriptQueries.size(); ++i) { if (GetScriptQueryAction(i) != NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE) { continue; } - if (ExecutionCases[i] != EExecutionCase::AsyncQuery) { + if (GetExecutionCase(i) != EExecutionCase::AsyncQuery) { return true; } } return false; } + bool HasExecutionCase(EExecutionCase executionCase) const { + if (ExecutionCases.empty()) { + return executionCase == EExecutionCase::GenericScript; + } + return std::find(ExecutionCases.begin(), ExecutionCases.end(), executionCase) != ExecutionCases.end(); + } + EExecutionCase GetExecutionCase(size_t index) const { return GetValue(index, ExecutionCases, EExecutionCase::GenericScript); } @@ -106,6 +110,113 @@ struct TExecutionOptions { }; } + void Validate(const NKqpRun::TRunnerOptions& runnerOptions) const { + if (!SchemeQuery && ScriptQueries.empty() && !runnerOptions.YdbSettings.MonitoringEnabled && !runnerOptions.YdbSettings.GrpcEnabled) { + ythrow yexception() << "Nothing to execute and is not running as daemon"; + } + + ValidateOptionsSizes(); + ValidateSchemeQueryOptions(runnerOptions); + ValidateScriptExecutionOptions(runnerOptions); + ValidateAsyncOptions(runnerOptions.YdbSettings.AsyncQueriesSettings); + ValidateTraceOpt(runnerOptions.TraceOptType); + } + +private: + void ValidateOptionsSizes() const { + const auto checker = [numberQueries = ScriptQueries.size()](size_t checkSize, const TString& optionName) { + if (checkSize > numberQueries) { + ythrow yexception() << "Too many " << optionName << ". Specified " << checkSize << ", when number of queries is " << numberQueries; + } + }; + + checker(ExecutionCases.size(), "execution cases"); + checker(ScriptQueryActions.size(), "script query actions"); + checker(Databases.size(), "databases"); + checker(TraceIds.size(), "trace ids"); + checker(PoolIds.size(), "pool ids"); + checker(UserSIDs.size(), "user SIDs"); + } + + void ValidateSchemeQueryOptions(const NKqpRun::TRunnerOptions& runnerOptions) const { + if (SchemeQuery) { + return; + } + if (runnerOptions.SchemeQueryAstOutput) { + ythrow yexception() << "Scheme query AST output can not be used without scheme query"; + } + } + + void ValidateScriptExecutionOptions(const NKqpRun::TRunnerOptions& runnerOptions) const { + // Script specific + if (HasExecutionCase(EExecutionCase::GenericScript)) { + return; + } + if (ForgetExecution) { + ythrow yexception() << "Forget execution can not be used without generic script queries"; + } + if (runnerOptions.ScriptCancelAfter) { + ythrow yexception() << "Cancel after can not be used without generic script queries"; + } + + // Script/Query specific + if (HasExecutionCase(EExecutionCase::GenericQuery)) { + return; + } + if (ResultsRowsLimit) { + ythrow yexception() << "Result rows limit can not be used without script queries"; + } + if (runnerOptions.InProgressStatisticsOutputFile) { + ythrow yexception() << "Script statistics can not be used without script queries"; + } + + // Common specific + if (HasExecutionCase(EExecutionCase::YqlScript)) { + return; + } + if (runnerOptions.ScriptQueryAstOutput) { + ythrow yexception() << "Script query AST output can not be used without script/yql queries"; + } + if (runnerOptions.ScriptQueryPlanOutput) { + ythrow yexception() << "Script query plan output can not be used without script/yql queries"; + } + } + + void ValidateAsyncOptions(const NKqpRun::TAsyncQueriesSettings& asyncQueriesSettings) const { + if (asyncQueriesSettings.InFlightLimit && !HasExecutionCase(EExecutionCase::AsyncQuery)) { + ythrow yexception() << "In flight limit can not be used without async queries"; + } + + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + if (LoopCount && asyncQueriesSettings.InFlightLimit && asyncQueriesSettings.InFlightLimit > ScriptQueries.size() * LoopCount) { + Cout << colors.Red() << "Warning: inflight limit is " << asyncQueriesSettings.InFlightLimit << ", that is larger than max possible number of queries " << ScriptQueries.size() * LoopCount << colors.Default() << Endl; + } + } + + void ValidateTraceOpt(NKqpRun::TRunnerOptions::ETraceOptType traceOptType) const { + switch (traceOptType) { + case NKqpRun::TRunnerOptions::ETraceOptType::Scheme: { + if (!SchemeQuery) { + ythrow yexception() << "Trace opt type scheme cannot be used without scheme query"; + } + break; + } + case NKqpRun::TRunnerOptions::ETraceOptType::Script: { + if (ScriptQueries.empty()) { + ythrow yexception() << "Trace opt type script cannot be used without script queries"; + } + } + case NKqpRun::TRunnerOptions::ETraceOptType::All: { + if (!SchemeQuery && ScriptQueries.empty()) { + ythrow yexception() << "Trace opt type all cannot be used without any queries"; + } + } + case NKqpRun::TRunnerOptions::ETraceOptType::Disabled: { + break; + } + } + } + private: template static TValue GetValue(size_t index, const std::vector& values, TValue defaultValue) { @@ -278,7 +389,6 @@ class TMain : public TMainClassArgs { TVector UdfsPaths; TString UdfsDirectory; bool ExcludeLinkedUdfs = false; - ui64 ResultsRowsLimit = 1000; bool EmulateYt = false; static TString LoadFile(const TString& file) { @@ -415,8 +525,8 @@ class TMain : public TMainClassArgs { .StoreMappedResultT(&RunnerOptions.ResultOutput, &GetDefaultOutput); options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results") .RequiredArgument("uint") - .DefaultValue(ResultsRowsLimit) - .StoreResult(&ResultsRowsLimit); + .DefaultValue(0) + .StoreResult(&ExecutionOptions.ResultsRowsLimit); TChoices resultFormat({ {"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson}, {"full-json", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson}, @@ -441,7 +551,12 @@ class TMain : public TMainClassArgs { .StoreMappedResultT(&RunnerOptions.ScriptQueryPlanOutput, &GetDefaultOutput); options.AddLongOption("script-statistics", "File with script inprogress statistics") .RequiredArgument("file") - .StoreResult(&RunnerOptions.InProgressStatisticsOutputFile); + .StoreMappedResultT(&RunnerOptions.InProgressStatisticsOutputFile, [](const TString& file) { + if (file == "-") { + ythrow yexception() << "Script in progress statistics cannot be printed to stdout, please specify file name"; + } + return file; + }); TChoices planFormat({ {"pretty", NYdb::NConsoleClient::EDataFormat::Pretty}, {"table", NYdb::NConsoleClient::EDataFormat::PrettyTable}, @@ -453,6 +568,15 @@ class TMain : public TMainClassArgs { .Choices(planFormat.GetChoices()) .StoreMappedResultT(&RunnerOptions.PlanOutputFormat, planFormat); + options.AddLongOption("script-timeline-file", "File with script query timline in svg format") + .RequiredArgument("file") + .StoreMappedResultT(&RunnerOptions.ScriptQueryTimelineFile, [](const TString& file) { + if (file == "-") { + ythrow yexception() << "Script timline cannot be printed to stdout, please specify file name"; + } + return file; + }); + // Pipeline settings TChoices executionCase({ @@ -463,7 +587,6 @@ class TMain : public TMainClassArgs { }); options.AddLongOption('C', "execution-case", "Type of query for -p argument") .RequiredArgument("query-type") - .DefaultValue("script") .Choices(executionCase.GetChoices()) .Handler1([this, executionCase](const NLastGetopt::TOptsParser* option) { TString choice(option->CurValOrDef()); @@ -489,13 +612,16 @@ class TMain : public TMainClassArgs { }); options.AddLongOption('A', "script-action", "Script query execute action") .RequiredArgument("script-action") - .DefaultValue("execute") .Choices(scriptAction.GetChoices()) .Handler1([this, scriptAction](const NLastGetopt::TOptsParser* option) { TString choice(option->CurValOrDef()); ExecutionOptions.ScriptQueryActions.emplace_back(scriptAction(choice)); }); + options.AddLongOption("timeout", "Reauests timeout in milliseconds") + .RequiredArgument("uint") + .StoreMappedResultT(&RunnerOptions.YdbSettings.RequestsTimeout, &TDuration::MilliSeconds); + options.AddLongOption("cancel-after", "Cancel script execution operation after specified delay in milliseconds") .RequiredArgument("uint") .StoreMappedResultT(&RunnerOptions.ScriptCancelAfter, &TDuration::MilliSeconds); @@ -510,7 +636,7 @@ class TMain : public TMainClassArgs { .StoreResult(&ExecutionOptions.LoopCount); options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps") .RequiredArgument("uint") - .DefaultValue(1000) + .DefaultValue(0) .StoreMappedResultT(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds); options.AddLongOption('D', "database", "Database path for -p queries") @@ -576,6 +702,21 @@ class TMain : public TMainClassArgs { .RequiredArgument("path") .InsertTo(&RunnerOptions.YdbSettings.ServerlessTenants); + options.AddLongOption("storage-size", "Domain storage size in gigabytes") + .RequiredArgument("uint") + .DefaultValue(32) + .StoreMappedResultT(&RunnerOptions.YdbSettings.DiskSize, [](ui32 diskSize) { + return static_cast(diskSize) << 30; + }); + + options.AddLongOption("real-pdisks", "Use real PDisks instead of in memory PDisks (also disable disk mock)") + .NoArgument() + .SetFlag(&RunnerOptions.YdbSettings.UseRealPDisks); + + options.AddLongOption("disable-disk-mock", "Disable disk mock on single node cluster") + .NoArgument() + .SetFlag(&RunnerOptions.YdbSettings.DisableDiskMock); + TChoices> backtrace({ {"heavy", &NKikimr::EnableYDBBacktraceFormat}, {"light", []() { SetFormatBackTraceFn(FormatBackTrace); }} @@ -591,13 +732,17 @@ class TMain : public TMainClassArgs { } int DoRun(NLastGetopt::TOptsParseResult&&) override { - if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled && !RunnerOptions.YdbSettings.GrpcEnabled) { - ythrow yexception() << "Nothing to execute"; + ExecutionOptions.Validate(RunnerOptions); + + if (RunnerOptions.YdbSettings.DisableDiskMock && RunnerOptions.YdbSettings.NodeCount + RunnerOptions.YdbSettings.SharedTenants.size() + RunnerOptions.YdbSettings.DedicatedTenants.size() > 1) { + ythrow yexception() << "Disable disk mock cannot be used for multi node clusters"; } RunnerOptions.YdbSettings.YqlToken = YqlToken; RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get(); - RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ResultsRowsLimit); + if (ExecutionOptions.ResultsRowsLimit) { + RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit); + } if (EmulateYt) { const auto& fileStorageConfig = RunnerOptions.YdbSettings.AppConfig.GetQueryServiceConfig().GetFileStorage(); diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index 1488d8c32142..42f9249b2144 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -31,6 +31,11 @@ struct TYdbSetupSettings { std::unordered_set SharedTenants; std::unordered_set ServerlessTenants; TDuration InitializationTimeout = TDuration::Seconds(10); + TDuration RequestsTimeout; + + bool DisableDiskMock = false; + bool UseRealPDisks = false; + ui64 DiskSize = 32_GB; bool MonitoringEnabled = false; ui16 MonitoringPortOffset = 0; @@ -68,6 +73,7 @@ struct TRunnerOptions { IOutputStream* SchemeQueryAstOutput = nullptr; IOutputStream* ScriptQueryAstOutput = nullptr; IOutputStream* ScriptQueryPlanOutput = nullptr; + TString ScriptQueryTimelineFile; TString InProgressStatisticsOutputFile; EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson; diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 1a500f7d1152..f58eba9ee99e 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -9,6 +9,7 @@ #include #include +#include namespace NKqpRun { @@ -159,8 +160,12 @@ class TKqpRunner::TImpl { TYdbSetup::StopTraceOpt(); + if (!meta.Plan) { + meta.Plan = ExecutionMeta_.Plan; + } + PrintScriptAst(meta.Ast); - PrintScriptProgress(ExecutionMeta_.Plan); + PrintScriptProgress(meta.Plan); PrintScriptPlan(meta.Plan); PrintScriptFinish(meta, queryTypeStr); @@ -262,6 +267,7 @@ class TKqpRunner::TImpl { } PrintScriptAst(ExecutionMeta_.Ast); + PrintScriptProgress(ExecutionMeta_.Plan); PrintScriptPlan(ExecutionMeta_.Plan); PrintScriptFinish(ExecutionMeta_, "Script"); @@ -352,6 +358,15 @@ class TKqpRunner::TImpl { outputStream << "\nPlan visualization:" << Endl; PrintPlan(convertedPlan, &outputStream); + outputStream.Finish(); + } + if (Options_.ScriptQueryTimelineFile) { + TFileOutput outputStream(Options_.ScriptQueryTimelineFile); + + TPlanVisualizer planVisualizer; + planVisualizer.LoadPlans(plan); + outputStream.Write(planVisualizer.PrintSvg()); + outputStream.Finish(); } } diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index 4270d16b16f3..64203adfb19e 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -121,6 +122,18 @@ class TYdbSetup::TImpl { serverSettings.SetFrFactory(functionRegistryFactory); } + void SetStorageSettings(NKikimr::Tests::TServerSettings& serverSettings) const { + const NKikimr::NFake::TStorage storage = { + .UseDisk = Settings_.UseRealPDisks, + .SectorSize = NKikimr::TTestStorageFactory::SECTOR_SIZE, + .ChunkSize = Settings_.UseRealPDisks ? NKikimr::TTestStorageFactory::CHUNK_SIZE : NKikimr::TTestStorageFactory::MEM_CHUNK_SIZE, + .DiskSize = Settings_.DiskSize + }; + + serverSettings.SetEnableMockOnSingleNode(!Settings_.DisableDiskMock && !Settings_.UseRealPDisks); + serverSettings.SetCustomDiskParams(storage); + } + NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) { const ui32 msgBusPort = PortManager_.GetPort(); @@ -147,6 +160,7 @@ class TYdbSetup::TImpl { SetLoggerSettings(serverSettings); SetFunctionRegistry(serverSettings); + SetStorageSettings(serverSettings); if (Settings_.MonitoringEnabled) { serverSettings.InitKikimrRunConfig(); @@ -425,6 +439,10 @@ class TYdbSetup::TImpl { request->SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL); request->SetDatabase(GetDatabasePath(query.Database)); request->SetPoolId(query.PoolId); + + if (Settings_.RequestsTimeout) { + request->SetTimeoutMs(Settings_.RequestsTimeout.MilliSeconds()); + } } void FillScriptRequest(const TRequestOptions& script, NKikimrKqp::TEvQueryRequest& event) const {