From 12f1bc05c61e6a430ff79cbe6f05a140ca2cdc1c Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 10 Sep 2024 15:51:20 +0000 Subject: [PATCH 1/6] Fixed AS pools in kqprun and added validations --- ydb/core/testlib/test_client.cpp | 58 +++--- ydb/tests/tools/kqprun/.gitignore | 1 + .../kqprun/configuration/app_config.conf | 46 +++++ ydb/tests/tools/kqprun/flame_graph.sh | 11 ++ ydb/tests/tools/kqprun/kqprun.cpp | 172 ++++++++++++++++-- ydb/tests/tools/kqprun/src/common.h | 6 + ydb/tests/tools/kqprun/src/kqp_runner.cpp | 13 +- ydb/tests/tools/kqprun/src/ydb_setup.cpp | 18 ++ 8 files changed, 280 insertions(+), 45 deletions(-) create mode 100755 ydb/tests/tools/kqprun/flame_graph.sh diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index e41442337311..b1c77a9a1ce9 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -796,7 +796,7 @@ namespace Tests { NKikimr::NConfig::TConfigsDispatcherInitInfo { .InitialConfig = initial, }); - auto aid = Runtime->Register(dispatcher, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); + auto aid = Runtime->Register(dispatcher, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } if (Settings->IsEnableMetadataProvider()) { @@ -813,7 +813,7 @@ namespace Tests { } if (Settings->IsEnableExternalIndex()) { auto* actor = NCSIndex::CreateService(NCSIndex::TConfig()); - const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); + const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } { @@ -839,7 +839,7 @@ namespace Tests { Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); auto sysViewService = NSysView::CreateSysViewServiceForTests(); - TActorId sysViewServiceId = Runtime->Register(sysViewService.Release(), nodeIdx); + TActorId sysViewServiceId = Runtime->Register(sysViewService.Release(), nodeIdx, appData.UserPoolId); Runtime->RegisterService(NSysView::MakeSysViewServiceID(Runtime->GetNodeId(nodeIdx)), sysViewServiceId, nodeIdx); auto tenantPublisher = CreateTenantNodeEnumerationPublisher(); @@ -857,11 +857,13 @@ namespace Tests { } void TServer::SetupProxies(ui32 nodeIdx) { + const ui32 userPoolId = Runtime->GetAppData(nodeIdx).UserPoolId; + Runtime->SetTxAllocatorTabletIds({ChangeStateStorage(TxAllocator, Settings->Domain)}); { if (Settings->AuthConfig.HasLdapAuthentication()) { IActor* ldapAuthProvider = NKikimr::CreateLdapAuthProvider(Settings->AuthConfig.GetLdapAuthentication()); - TActorId ldapAuthProviderId = Runtime->Register(ldapAuthProvider, nodeIdx); + TActorId ldapAuthProviderId = Runtime->Register(ldapAuthProvider, nodeIdx, userPoolId); Runtime->RegisterService(MakeLdapAuthProviderID(), ldapAuthProviderId, nodeIdx); } TTicketParserSettings ticketParserSettings { @@ -873,19 +875,19 @@ namespace Tests { } }; IActor* ticketParser = Settings->CreateTicketParser(ticketParserSettings); - TActorId ticketParserId = Runtime->Register(ticketParser, nodeIdx); + TActorId ticketParserId = Runtime->Register(ticketParser, nodeIdx, userPoolId); Runtime->RegisterService(MakeTicketParserID(), ticketParserId, nodeIdx); } { IActor* healthCheck = NHealthCheck::CreateHealthCheckService(); - TActorId healthCheckId = Runtime->Register(healthCheck, nodeIdx); + TActorId healthCheckId = Runtime->Register(healthCheck, nodeIdx, userPoolId); Runtime->RegisterService(NHealthCheck::MakeHealthCheckID(), healthCheckId, nodeIdx); } { const auto& appData = Runtime->GetAppData(nodeIdx); IActor* metadataCache = CreateDatabaseMetadataCache(appData.TenantName, appData.Counters).release(); - TActorId metadataCacheId = Runtime->Register(metadataCache, nodeIdx); + TActorId metadataCacheId = Runtime->Register(metadataCache, nodeIdx, userPoolId); Runtime->RegisterService(MakeDatabaseMetadataCacheId(Runtime->GetNodeId(nodeIdx)), metadataCacheId, nodeIdx); } { @@ -893,7 +895,7 @@ namespace Tests { IActor* kqpRmService = NKqp::CreateKqpResourceManagerActor( Settings->AppConfig->GetTableServiceConfig().GetResourceManager(), nullptr, {}, kqpProxySharedResources, Runtime->GetNodeId(nodeIdx)); - TActorId kqpRmServiceId = Runtime->Register(kqpRmService, nodeIdx); + TActorId kqpRmServiceId = Runtime->Register(kqpRmService, nodeIdx, userPoolId); Runtime->RegisterService(NKqp::MakeKqpRmServiceID(Runtime->GetNodeId(nodeIdx)), kqpRmServiceId, nodeIdx); if (!KqpLoggerScope) { @@ -916,14 +918,14 @@ namespace Tests { auto httpProxyActorId = NFq::MakeYqlAnalyticsHttpProxyId(); Runtime->RegisterService( httpProxyActorId, - Runtime->Register(NHttp::CreateHttpProxy(), nodeIdx), + Runtime->Register(NHttp::CreateHttpProxy(), nodeIdx, userPoolId), nodeIdx ); auto databaseResolverActorId = NFq::MakeDatabaseResolverActorId(); Runtime->RegisterService( databaseResolverActorId, - Runtime->Register(NFq::CreateDatabaseResolver(httpProxyActorId, Settings->CredentialsFactory), nodeIdx), + Runtime->Register(NFq::CreateDatabaseResolver(httpProxyActorId, Settings->CredentialsFactory), nodeIdx, userPoolId), nodeIdx ); @@ -957,50 +959,50 @@ namespace Tests { TVector(Settings->KqpSettings), nullptr, std::move(kqpProxySharedResources), federatedQuerySetupFactory, Settings->S3ActorsFactory); - TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx); + TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx, userPoolId); Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx); IActor* scriptFinalizeService = NKqp::CreateKqpFinalizeScriptService( Settings->AppConfig->GetQueryServiceConfig(), federatedQuerySetupFactory, Settings->S3ActorsFactory ); - TActorId scriptFinalizeServiceId = Runtime->Register(scriptFinalizeService, nodeIdx); + TActorId scriptFinalizeServiceId = Runtime->Register(scriptFinalizeService, nodeIdx, userPoolId); Runtime->RegisterService(NKqp::MakeKqpFinalizeScriptServiceId(Runtime->GetNodeId(nodeIdx)), scriptFinalizeServiceId, nodeIdx); } { IActor* txProxy = CreateTxProxy(Runtime->GetTxAllocatorTabletIds()); - TActorId txProxyId = Runtime->Register(txProxy, nodeIdx); + TActorId txProxyId = Runtime->Register(txProxy, nodeIdx, userPoolId); Runtime->RegisterService(MakeTxProxyID(), txProxyId, nodeIdx); } { IActor* compileService = CreateMiniKQLCompileService(100000); - TActorId compileServiceId = Runtime->Register(compileService, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId, TMailboxType::Revolving, 0); + TActorId compileServiceId = Runtime->Register(compileService, nodeIdx, userPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(MakeMiniKQLCompileServiceID(), compileServiceId, nodeIdx); } { IActor* longTxService = NLongTxService::CreateLongTxService(); - TActorId longTxServiceId = Runtime->Register(longTxService, nodeIdx); + TActorId longTxServiceId = Runtime->Register(longTxService, nodeIdx, userPoolId); Runtime->RegisterService(NLongTxService::MakeLongTxServiceID(Runtime->GetNodeId(nodeIdx)), longTxServiceId, nodeIdx); } { IActor* sequenceProxy = NSequenceProxy::CreateSequenceProxy(); - TActorId sequenceProxyId = Runtime->Register(sequenceProxy, nodeIdx); + TActorId sequenceProxyId = Runtime->Register(sequenceProxy, nodeIdx, userPoolId); Runtime->RegisterService(NSequenceProxy::MakeSequenceProxyServiceID(), sequenceProxyId, nodeIdx); } if (BusServer && nodeIdx == 0) { // MsgBus and GRPC are run now only on first node { IActor* proxy = BusServer->CreateProxy(); - TActorId proxyId = Runtime->Register(proxy, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId, TMailboxType::Revolving, 0); + TActorId proxyId = Runtime->Register(proxy, nodeIdx, userPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NMsgBusProxy::CreateMsgBusProxyId(), proxyId, nodeIdx); } } { IActor* icNodeCache = NIcNodeCache::CreateICNodesInfoCacheService(Runtime->GetDynamicCounters()); - TActorId icCacheId = Runtime->Register(icNodeCache, nodeIdx); + TActorId icCacheId = Runtime->Register(icNodeCache, nodeIdx, userPoolId); Runtime->RegisterService(NIcNodeCache::CreateICNodesInfoCacheServiceId(), icCacheId, nodeIdx); } { @@ -1013,12 +1015,12 @@ namespace Tests { { IActor* pqClusterTracker = NPQ::NClusterTracker::CreateClusterTracker(); - TActorId pqClusterTrackerId = Runtime->Register(pqClusterTracker, nodeIdx); + TActorId pqClusterTrackerId = Runtime->Register(pqClusterTracker, nodeIdx, userPoolId); Runtime->RegisterService(NPQ::NClusterTracker::MakeClusterTrackerID(), pqClusterTrackerId, nodeIdx); } { IActor* pqReadCacheService = NPQ::CreatePQDReadCacheService(Runtime->GetDynamicCounters()); - TActorId readCacheId = Runtime->Register(pqReadCacheService, nodeIdx); + TActorId readCacheId = Runtime->Register(pqReadCacheService, nodeIdx, userPoolId); Runtime->RegisterService(NPQ::MakePQDReadCacheServiceActorId(), readCacheId, nodeIdx); } @@ -1028,7 +1030,7 @@ namespace Tests { new ::NMonitoring::TDynamicCounters(), TDuration::Seconds(1) ); - TActorId pqMetaCacheId = Runtime->Register(pqMetaCache, nodeIdx); + TActorId pqMetaCacheId = Runtime->Register(pqMetaCache, nodeIdx, userPoolId); Runtime->RegisterService(NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), pqMetaCacheId, nodeIdx); } } @@ -1039,7 +1041,7 @@ namespace Tests { try { fileBackend = MakeHolder(Settings->MeteringFilePath); auto meteringActor = NMetering::CreateMeteringWriter(std::move(fileBackend)); - TActorId meteringId = Runtime->Register(meteringActor.Release(), nodeIdx); + TActorId meteringId = Runtime->Register(meteringActor.Release(), nodeIdx, Runtime->GetAppData(nodeIdx).IOPoolId); Runtime->RegisterService(NMetering::MakeMeteringServiceID(), meteringId, nodeIdx); } catch (const TFileError &ex) { @@ -1051,19 +1053,19 @@ namespace Tests { { IActor* kesusService = NKesus::CreateKesusProxyService(); - TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx); + TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx, userPoolId); Runtime->RegisterService(NKesus::MakeKesusProxyServiceId(), kesusServiceId, nodeIdx); } { IActor* netClassifier = NNetClassifier::CreateNetClassifier(); - TActorId netClassifierId = Runtime->Register(netClassifier, nodeIdx); + TActorId netClassifierId = Runtime->Register(netClassifier, nodeIdx, userPoolId); Runtime->RegisterService(NNetClassifier::MakeNetClassifierID(), netClassifierId, nodeIdx); } { IActor* actor = CreatePollerActor(); - TActorId actorId = Runtime->Register(actor, nodeIdx); + TActorId actorId = Runtime->Register(actor, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId); Runtime->RegisterService(MakePollerActorId(), actorId, nodeIdx); } @@ -1075,11 +1077,11 @@ namespace Tests { } IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig->GetKafkaProxyConfig()); - TActorId actorId = Runtime->Register(actor, nodeIdx); + TActorId actorId = Runtime->Register(actor, nodeIdx, userPoolId); Runtime->RegisterService(TActorId{}, actorId, nodeIdx); IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{Runtime->GetAppData().Counters->GetSubgroup("counters", "kafka_proxy")}); - TActorId metricsActorId = Runtime->Register(metricsActor, nodeIdx); + TActorId metricsActorId = Runtime->Register(metricsActor, nodeIdx, userPoolId); Runtime->RegisterService(NKafka::MakeKafkaMetricsServiceID(), metricsActorId, nodeIdx); } @@ -1206,7 +1208,7 @@ namespace Tests { IActor* viewer = CreateViewer(*Settings->KikimrRunConfig); SetupPQVirtualHandlers(dynamic_cast(viewer)); SetupDBVirtualHandlers(dynamic_cast(viewer)); - TActorId viewerId = Runtime->Register(viewer, nodeIdx); + TActorId viewerId = Runtime->Register(viewer, nodeIdx, Runtime->GetAppData(nodeIdx).BatchPoolId); Runtime->RegisterService(MakeViewerID(nodeIdx), viewerId, nodeIdx); } } diff --git a/ydb/tests/tools/kqprun/.gitignore b/ydb/tests/tools/kqprun/.gitignore index 807aadd42e70..8dba86c0931f 100644 --- a/ydb/tests/tools/kqprun/.gitignore +++ b/ydb/tests/tools/kqprun/.gitignore @@ -6,3 +6,4 @@ udfs *.sql *.bin *.txt +*.svg diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index b57e03854ae4..1c9c56cebd20 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -1,3 +1,49 @@ +ActorSystemConfig { + Executor { + Type: BASIC + Threads: 1 + SpinThreshold: 10 + Name: "System" + } + Executor { + Type: BASIC + Threads: 6 + SpinThreshold: 1 + Name: "User" + } + Executor { + Type: BASIC + Threads: 1 + SpinThreshold: 1 + Name: "Batch" + } + Executor { + Type: IO + Threads: 1 + Name: "IO" + } + Executor { + Type: BASIC + Threads: 2 + SpinThreshold: 10 + Name: "IC" + TimePerMailboxMicroSecs: 100 + } + Scheduler { + Resolution: 64 + SpinThreshold: 0 + ProgressThreshold: 10000 + } + SysExecutor: 0 + UserExecutor: 1 + IoExecutor: 3 + BatchExecutor: 2 + ServiceExecutor { + ServiceName: "Interconnect" + ExecutorId: 4 + } +} + ColumnShardConfig { DisabledOnSchemeShard: false } diff --git a/ydb/tests/tools/kqprun/flame_graph.sh b/ydb/tests/tools/kqprun/flame_graph.sh new file mode 100755 index 000000000000..ead8de30683a --- /dev/null +++ b/ydb/tests/tools/kqprun/flame_graph.sh @@ -0,0 +1,11 @@ +#!/usr/bin/env bash + +# For svg graph download https://github.com/brendangregg/FlameGraph +# and run `FlameGraph/stackcollapse-perf.pl profdata.txt | FlameGraph/flamegraph.pl > profdata.svg` + +pid=$(pgrep -u $USER kqprun) + +echo "Target process id: ${pid}" + +sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $pid -v -o profdata -- sleep 30 +sudo perf script -i profdata > profdata.txt diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 3e4df13110a9..b45c994d7f38 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -44,25 +44,29 @@ struct TExecutionOptions { std::vector TraceIds; std::vector PoolIds; std::vector UserSIDs; + ui64 ResultsRowsLimit = 0; const TString DefaultTraceId = "kqprun"; bool HasResults() const { - if (ScriptQueries.empty()) { - return false; - } - - for (size_t i = 0; i < ExecutionCases.size(); ++i) { + for (size_t i = 0; i < ScriptQueries.size(); ++i) { if (GetScriptQueryAction(i) != NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE) { continue; } - if (ExecutionCases[i] != EExecutionCase::AsyncQuery) { + if (GetExecutionCase(i) != EExecutionCase::AsyncQuery) { return true; } } return false; } + bool HasExecutionCase(EExecutionCase executionCase) const { + if (ExecutionCases.empty()) { + return executionCase == EExecutionCase::GenericScript; + } + return std::find(ExecutionCases.begin(), ExecutionCases.end(), executionCase) != ExecutionCases.end(); + } + EExecutionCase GetExecutionCase(size_t index) const { return GetValue(index, ExecutionCases, EExecutionCase::GenericScript); } @@ -106,6 +110,113 @@ struct TExecutionOptions { }; } + void Validate(const NKqpRun::TRunnerOptions& runnerOptions) const { + if (!SchemeQuery && ScriptQueries.empty() && !runnerOptions.YdbSettings.MonitoringEnabled && !runnerOptions.YdbSettings.GrpcEnabled) { + ythrow yexception() << "Nothing to execute and is not running as daemon"; + } + + ValidateOptionsSizes(); + ValidateSchemeQueryOptions(runnerOptions); + ValidateScriptExecutionOptions(runnerOptions); + ValidateAsyncOptions(runnerOptions.YdbSettings.AsyncQueriesSettings); + ValidateTraceOpt(runnerOptions.TraceOptType); + } + +private: + void ValidateOptionsSizes() const { + const auto checker = [numberQueries = ScriptQueries.size()](size_t checkSize, const TString& optionName) { + if (checkSize > numberQueries) { + ythrow yexception() << "Too many " << optionName << ". Specified " << checkSize << ", when number of queries is " << numberQueries; + } + }; + + checker(ExecutionCases.size(), "execution cases"); + checker(ScriptQueryActions.size(), "script query actions"); + checker(Databases.size(), "databases"); + checker(TraceIds.size(), "trace ids"); + checker(PoolIds.size(), "pool ids"); + checker(UserSIDs.size(), "user SIDs"); + } + + void ValidateSchemeQueryOptions(const NKqpRun::TRunnerOptions& runnerOptions) const { + if (SchemeQuery) { + return; + } + if (runnerOptions.SchemeQueryAstOutput) { + ythrow yexception() << "Scheme query AST output can not be used without scheme query"; + } + } + + void ValidateScriptExecutionOptions(const NKqpRun::TRunnerOptions& runnerOptions) const { + // Script specific + if (HasExecutionCase(EExecutionCase::GenericScript)) { + return; + } + if (ForgetExecution) { + ythrow yexception() << "Forget execution can not be used without generic script queries"; + } + if (runnerOptions.ScriptCancelAfter) { + ythrow yexception() << "Cancel after can not be used without generic script queries"; + } + + // Script/Query specific + if (HasExecutionCase(EExecutionCase::GenericQuery)) { + return; + } + if (ResultsRowsLimit) { + ythrow yexception() << "Result rows limit can not be used without script queries"; + } + if (runnerOptions.InProgressStatisticsOutputFile) { + ythrow yexception() << "Script statistics can not be used without script queries"; + } + + // Common specific + if (HasExecutionCase(EExecutionCase::YqlScript)) { + return; + } + if (runnerOptions.ScriptQueryAstOutput) { + ythrow yexception() << "Script query AST output can not be used without script/yql queries"; + } + if (runnerOptions.ScriptQueryPlanOutput) { + ythrow yexception() << "Script query plan output can not be used without script/yql queries"; + } + } + + void ValidateAsyncOptions(const NKqpRun::TAsyncQueriesSettings& asyncQueriesSettings) const { + if (asyncQueriesSettings.InFlightLimit && !HasExecutionCase(EExecutionCase::AsyncQuery)) { + ythrow yexception() << "In flight limit can not be used without async queries"; + } + + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + if (LoopCount && asyncQueriesSettings.InFlightLimit && asyncQueriesSettings.InFlightLimit > ScriptQueries.size() * LoopCount) { + Cout << colors.Red() << "Warning: inflight limit is " << asyncQueriesSettings.InFlightLimit << ", that is larger than max possible number of queries " << ScriptQueries.size() * LoopCount << colors.Default() << Endl; + } + } + + void ValidateTraceOpt(NKqpRun::TRunnerOptions::ETraceOptType traceOptType) const { + switch (traceOptType) { + case NKqpRun::TRunnerOptions::ETraceOptType::Scheme: { + if (!SchemeQuery) { + ythrow yexception() << "Trace opt type scheme cannot be used without scheme query"; + } + break; + } + case NKqpRun::TRunnerOptions::ETraceOptType::Script: { + if (ScriptQueries.empty()) { + ythrow yexception() << "Trace opt type script cannot be used without script queries"; + } + } + case NKqpRun::TRunnerOptions::ETraceOptType::All: { + if (!SchemeQuery && ScriptQueries.empty()) { + ythrow yexception() << "Trace opt type all cannot be used without any queries"; + } + } + case NKqpRun::TRunnerOptions::ETraceOptType::Disabled: { + break; + } + } + } + private: template static TValue GetValue(size_t index, const std::vector& values, TValue defaultValue) { @@ -278,7 +389,6 @@ class TMain : public TMainClassArgs { TVector UdfsPaths; TString UdfsDirectory; bool ExcludeLinkedUdfs = false; - ui64 ResultsRowsLimit = 1000; bool EmulateYt = false; static TString LoadFile(const TString& file) { @@ -415,8 +525,8 @@ class TMain : public TMainClassArgs { .StoreMappedResultT(&RunnerOptions.ResultOutput, &GetDefaultOutput); options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results") .RequiredArgument("uint") - .DefaultValue(ResultsRowsLimit) - .StoreResult(&ResultsRowsLimit); + .DefaultValue(0) + .StoreResult(&ExecutionOptions.ResultsRowsLimit); TChoices resultFormat({ {"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson}, {"full-json", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson}, @@ -441,7 +551,12 @@ class TMain : public TMainClassArgs { .StoreMappedResultT(&RunnerOptions.ScriptQueryPlanOutput, &GetDefaultOutput); options.AddLongOption("script-statistics", "File with script inprogress statistics") .RequiredArgument("file") - .StoreResult(&RunnerOptions.InProgressStatisticsOutputFile); + .StoreMappedResultT(&RunnerOptions.InProgressStatisticsOutputFile, [](const TString& file) { + if (file == "-") { + ythrow yexception() << "Script in progress statistics cannot be printed to stdout, please specify file name"; + } + return file; + }); TChoices planFormat({ {"pretty", NYdb::NConsoleClient::EDataFormat::Pretty}, {"table", NYdb::NConsoleClient::EDataFormat::PrettyTable}, @@ -453,6 +568,10 @@ class TMain : public TMainClassArgs { .Choices(planFormat.GetChoices()) .StoreMappedResultT(&RunnerOptions.PlanOutputFormat, planFormat); + options.AddLongOption("script-timeline-svg", "File with script query timline in svg format (use '-' to write in stdout)") + .RequiredArgument("file") + .StoreMappedResultT(&RunnerOptions.ScriptQueryTimelineOutput, &GetDefaultOutput); + // Pipeline settings TChoices executionCase({ @@ -463,7 +582,6 @@ class TMain : public TMainClassArgs { }); options.AddLongOption('C', "execution-case", "Type of query for -p argument") .RequiredArgument("query-type") - .DefaultValue("script") .Choices(executionCase.GetChoices()) .Handler1([this, executionCase](const NLastGetopt::TOptsParser* option) { TString choice(option->CurValOrDef()); @@ -489,13 +607,16 @@ class TMain : public TMainClassArgs { }); options.AddLongOption('A', "script-action", "Script query execute action") .RequiredArgument("script-action") - .DefaultValue("execute") .Choices(scriptAction.GetChoices()) .Handler1([this, scriptAction](const NLastGetopt::TOptsParser* option) { TString choice(option->CurValOrDef()); ExecutionOptions.ScriptQueryActions.emplace_back(scriptAction(choice)); }); + options.AddLongOption("timeout", "Reauests timeout in milliseconds") + .RequiredArgument("uint") + .StoreMappedResultT(&RunnerOptions.YdbSettings.RequestsTimeout, &TDuration::MilliSeconds); + options.AddLongOption("cancel-after", "Cancel script execution operation after specified delay in milliseconds") .RequiredArgument("uint") .StoreMappedResultT(&RunnerOptions.ScriptCancelAfter, &TDuration::MilliSeconds); @@ -510,7 +631,7 @@ class TMain : public TMainClassArgs { .StoreResult(&ExecutionOptions.LoopCount); options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps") .RequiredArgument("uint") - .DefaultValue(1000) + .DefaultValue(0) .StoreMappedResultT(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds); options.AddLongOption('D', "database", "Database path for -p queries") @@ -576,6 +697,21 @@ class TMain : public TMainClassArgs { .RequiredArgument("path") .InsertTo(&RunnerOptions.YdbSettings.ServerlessTenants); + options.AddLongOption("storage-size", "Domain storage size in gigabytes") + .RequiredArgument("uint") + .DefaultValue(32) + .StoreMappedResultT(&RunnerOptions.YdbSettings.DiskSize, [](ui32 diskSize) { + return static_cast(diskSize) << 30; + }); + + options.AddLongOption("real-pdisks", "Use real PDisks instead of in memory PDisks (also disable disk mock)") + .NoArgument() + .SetFlag(&RunnerOptions.YdbSettings.UseRealPDisks); + + options.AddLongOption("disable-disk-mock", "Disable disk mock on single node cluster") + .NoArgument() + .SetFlag(&RunnerOptions.YdbSettings.DisableDiskMock); + TChoices> backtrace({ {"heavy", &NKikimr::EnableYDBBacktraceFormat}, {"light", []() { SetFormatBackTraceFn(FormatBackTrace); }} @@ -591,13 +727,17 @@ class TMain : public TMainClassArgs { } int DoRun(NLastGetopt::TOptsParseResult&&) override { - if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled && !RunnerOptions.YdbSettings.GrpcEnabled) { - ythrow yexception() << "Nothing to execute"; + ExecutionOptions.Validate(RunnerOptions); + + if (RunnerOptions.YdbSettings.DisableDiskMock && RunnerOptions.YdbSettings.NodeCount + RunnerOptions.YdbSettings.SharedTenants.size() + RunnerOptions.YdbSettings.DedicatedTenants.size() > 1) { + ythrow yexception() << "Disable disk mock cannot be used for multi node clusters"; } RunnerOptions.YdbSettings.YqlToken = YqlToken; RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get(); - RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ResultsRowsLimit); + if (ExecutionOptions.ResultsRowsLimit) { + RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit); + } if (EmulateYt) { const auto& fileStorageConfig = RunnerOptions.YdbSettings.AppConfig.GetQueryServiceConfig().GetFileStorage(); diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index 8e588c3f620d..3a60c4b94615 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -32,6 +32,11 @@ struct TYdbSetupSettings { std::unordered_set SharedTenants; std::unordered_set ServerlessTenants; TDuration InitializationTimeout = TDuration::Seconds(10); + TDuration RequestsTimeout; + + bool DisableDiskMock = false; + bool UseRealPDisks = false; + ui64 DiskSize = 32_GB; bool MonitoringEnabled = false; ui16 MonitoringPortOffset = 0; @@ -69,6 +74,7 @@ struct TRunnerOptions { IOutputStream* SchemeQueryAstOutput = nullptr; IOutputStream* ScriptQueryAstOutput = nullptr; IOutputStream* ScriptQueryPlanOutput = nullptr; + IOutputStream* ScriptQueryTimelineOutput = nullptr; TString InProgressStatisticsOutputFile; EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson; diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 1a500f7d1152..d6afdf6b3824 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -9,6 +9,7 @@ #include #include +#include namespace NKqpRun { @@ -159,8 +160,12 @@ class TKqpRunner::TImpl { TYdbSetup::StopTraceOpt(); + if (!meta.Plan) { + meta.Plan = ExecutionMeta_.Plan; + } + PrintScriptAst(meta.Ast); - PrintScriptProgress(ExecutionMeta_.Plan); + PrintScriptProgress(meta.Plan); PrintScriptPlan(meta.Plan); PrintScriptFinish(meta, queryTypeStr); @@ -323,6 +328,12 @@ class TKqpRunner::TImpl { Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl; PrintPlan(plan, Options_.ScriptQueryPlanOutput); } + if (Options_.ScriptQueryTimelineOutput) { + Cout << CoutColors_.Cyan() << "Writing script query timeline" << CoutColors_.Default() << Endl; + TPlanVisualizer planVisualizer; + planVisualizer.LoadPlans(plan); + Options_.ScriptQueryTimelineOutput->Write(planVisualizer.PrintSvg()); + } } void PrintScriptProgress(const TString& plan) const { diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index 4270d16b16f3..64203adfb19e 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -121,6 +122,18 @@ class TYdbSetup::TImpl { serverSettings.SetFrFactory(functionRegistryFactory); } + void SetStorageSettings(NKikimr::Tests::TServerSettings& serverSettings) const { + const NKikimr::NFake::TStorage storage = { + .UseDisk = Settings_.UseRealPDisks, + .SectorSize = NKikimr::TTestStorageFactory::SECTOR_SIZE, + .ChunkSize = Settings_.UseRealPDisks ? NKikimr::TTestStorageFactory::CHUNK_SIZE : NKikimr::TTestStorageFactory::MEM_CHUNK_SIZE, + .DiskSize = Settings_.DiskSize + }; + + serverSettings.SetEnableMockOnSingleNode(!Settings_.DisableDiskMock && !Settings_.UseRealPDisks); + serverSettings.SetCustomDiskParams(storage); + } + NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) { const ui32 msgBusPort = PortManager_.GetPort(); @@ -147,6 +160,7 @@ class TYdbSetup::TImpl { SetLoggerSettings(serverSettings); SetFunctionRegistry(serverSettings); + SetStorageSettings(serverSettings); if (Settings_.MonitoringEnabled) { serverSettings.InitKikimrRunConfig(); @@ -425,6 +439,10 @@ class TYdbSetup::TImpl { request->SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL); request->SetDatabase(GetDatabasePath(query.Database)); request->SetPoolId(query.PoolId); + + if (Settings_.RequestsTimeout) { + request->SetTimeoutMs(Settings_.RequestsTimeout.MilliSeconds()); + } } void FillScriptRequest(const TRequestOptions& script, NKikimrKqp::TEvQueryRequest& event) const { From 966dc91a689653491e68010e789381dd38b18dc6 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 10 Sep 2024 15:53:08 +0000 Subject: [PATCH 2/6] Added *.old in gitignore --- ydb/tests/tools/kqprun/.gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/tests/tools/kqprun/.gitignore b/ydb/tests/tools/kqprun/.gitignore index 8dba86c0931f..a4578942a676 100644 --- a/ydb/tests/tools/kqprun/.gitignore +++ b/ydb/tests/tools/kqprun/.gitignore @@ -7,3 +7,4 @@ udfs *.bin *.txt *.svg +*.old From 233e1b1b26e67abce9673392622f432140ecd2ca Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 11 Sep 2024 07:02:02 +0000 Subject: [PATCH 3/6] Fixed timeline printing --- ydb/tests/tools/kqprun/kqprun.cpp | 9 +++++++-- ydb/tests/tools/kqprun/src/common.h | 2 +- ydb/tests/tools/kqprun/src/kqp_runner.cpp | 16 ++++++++++------ 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index b45c994d7f38..88802d2aead9 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -568,9 +568,14 @@ class TMain : public TMainClassArgs { .Choices(planFormat.GetChoices()) .StoreMappedResultT(&RunnerOptions.PlanOutputFormat, planFormat); - options.AddLongOption("script-timeline-svg", "File with script query timline in svg format (use '-' to write in stdout)") + options.AddLongOption("script-timeline-file", "File with script query timline in svg format") .RequiredArgument("file") - .StoreMappedResultT(&RunnerOptions.ScriptQueryTimelineOutput, &GetDefaultOutput); + .StoreMappedResultT(&RunnerOptions.ScriptQueryTimelineFile, [](const TString& file) { + if (file == "-") { + ythrow yexception() << "Script timline cannot be printed to stdout, please specify file name"; + } + return file; + }); // Pipeline settings diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index 3a60c4b94615..0bd74b487012 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -74,7 +74,7 @@ struct TRunnerOptions { IOutputStream* SchemeQueryAstOutput = nullptr; IOutputStream* ScriptQueryAstOutput = nullptr; IOutputStream* ScriptQueryPlanOutput = nullptr; - IOutputStream* ScriptQueryTimelineOutput = nullptr; + TString ScriptQueryTimelineFile; TString InProgressStatisticsOutputFile; EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson; diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index d6afdf6b3824..f58eba9ee99e 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -267,6 +267,7 @@ class TKqpRunner::TImpl { } PrintScriptAst(ExecutionMeta_.Ast); + PrintScriptProgress(ExecutionMeta_.Plan); PrintScriptPlan(ExecutionMeta_.Plan); PrintScriptFinish(ExecutionMeta_, "Script"); @@ -328,12 +329,6 @@ class TKqpRunner::TImpl { Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl; PrintPlan(plan, Options_.ScriptQueryPlanOutput); } - if (Options_.ScriptQueryTimelineOutput) { - Cout << CoutColors_.Cyan() << "Writing script query timeline" << CoutColors_.Default() << Endl; - TPlanVisualizer planVisualizer; - planVisualizer.LoadPlans(plan); - Options_.ScriptQueryTimelineOutput->Write(planVisualizer.PrintSvg()); - } } void PrintScriptProgress(const TString& plan) const { @@ -363,6 +358,15 @@ class TKqpRunner::TImpl { outputStream << "\nPlan visualization:" << Endl; PrintPlan(convertedPlan, &outputStream); + outputStream.Finish(); + } + if (Options_.ScriptQueryTimelineFile) { + TFileOutput outputStream(Options_.ScriptQueryTimelineFile); + + TPlanVisualizer planVisualizer; + planVisualizer.LoadPlans(plan); + outputStream.Write(planVisualizer.PrintSvg()); + outputStream.Finish(); } } From b5726c567d7e7cdf0a589ef57008f0b42211a333 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 12 Sep 2024 22:12:19 +0000 Subject: [PATCH 4/6] Fixed inprogress stats for -C query on multi node tenant --- ydb/tests/tools/kqprun/src/ydb_setup.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index 64203adfb19e..d9052649d35b 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -329,7 +329,7 @@ class TYdbSetup::TImpl { TQueryResponse QueryRequest(const TRequestOptions& query, TProgressCallback progressCallback) const { auto request = GetQueryRequest(query); auto promise = NThreading::NewPromise(); - GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback), 0, GetRuntime()->GetAppData().UserPoolId); + GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback), request.TargetNode - GetRuntime()->GetFirstNodeId(), GetRuntime()->GetAppData().UserPoolId); return promise.GetFuture().GetValueSync(); } From 5ec43d7c347bb08016bd2053b8b13ab6fc0560fc Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Fri, 13 Sep 2024 09:03:06 +0000 Subject: [PATCH 5/6] Removed transaction from script execution --- ydb/tests/tools/kqprun/src/ydb_setup.cpp | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index d9052649d35b..d481ef254281 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -321,7 +321,7 @@ class TYdbSetup::TImpl { NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr ScriptRequest(const TRequestOptions& script) const { auto event = MakeHolder(); - FillScriptRequest(script, event->Record); + FillQueryRequest(script, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, event->Record); return RunKqpProxyRequest(std::move(event), script.Database); } @@ -445,16 +445,6 @@ class TYdbSetup::TImpl { } } - void FillScriptRequest(const TRequestOptions& script, NKikimrKqp::TEvQueryRequest& event) const { - FillQueryRequest(script, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, event); - - auto request = event.MutableRequest(); - if (script.Action == NKikimrKqp::QUERY_ACTION_EXECUTE) { - request->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - request->MutableTxControl()->set_commit_tx(true); - } - } - TQueryRequest GetQueryRequest(const TRequestOptions& query) const { auto event = std::make_unique(); FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, event->Record); From 49b4ac1fdf0837c9c275203bbf0edde1fdb95e8a Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Fri, 13 Sep 2024 11:03:07 +0000 Subject: [PATCH 6/6] Fixed grpc pools --- ydb/core/testlib/test_client.cpp | 20 ++++++++++++-------- ydb/core/testlib/test_client.h | 4 ++-- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index b1c77a9a1ce9..b75af98027d6 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -338,11 +338,11 @@ namespace Tests { } } - void TServer::EnableGRpc(const NYdbGrpc::TServerOptions& options) { + void TServer::EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId) { GRpcServer.reset(new NYdbGrpc::TGRpcServer(options)); auto grpcService = new NGRpcProxy::TGRpcService(); - auto system(Runtime->GetAnyNodeActorSystem()); + auto system(Runtime->GetActorSystem(grpcServiceNodeId)); if (Settings->Verbose) { Cerr << "TServer::EnableGrpc on GrpcPort " << options.Port << ", node " << system->NodeId << Endl; @@ -352,21 +352,23 @@ namespace Tests { TVector grpcRequestProxies; grpcRequestProxies.reserve(proxyCount); - auto& appData = Runtime->GetAppData(); + auto& appData = Runtime->GetAppData(grpcServiceNodeId); NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator(appData.TimeProvider, appData.RandomProvider); for (size_t i = 0; i < proxyCount; ++i) { auto grpcRequestProxy = NGRpcService::CreateGRpcRequestProxy(*Settings->AppConfig, tracingConfigurator.GetControl()); - auto grpcRequestProxyId = system->Register(grpcRequestProxy, TMailboxType::ReadAsFilled); + auto grpcRequestProxyId = system->Register(grpcRequestProxy, TMailboxType::ReadAsFilled, appData.UserPoolId); system->RegisterLocalService(NGRpcService::CreateGRpcRequestProxyId(), grpcRequestProxyId); grpcRequestProxies.push_back(grpcRequestProxyId); } system->Register( - NConsole::CreateJaegerTracingConfigurator(std::move(tracingConfigurator), Settings->AppConfig->GetTracingConfig()) + NConsole::CreateJaegerTracingConfigurator(std::move(tracingConfigurator), Settings->AppConfig->GetTracingConfig()), + TMailboxType::ReadAsFilled, + appData.UserPoolId ); - auto grpcMon = system->Register(NGRpcService::CreateGrpcMonService(), TMailboxType::ReadAsFilled); + auto grpcMon = system->Register(NGRpcService::CreateGrpcMonService(), TMailboxType::ReadAsFilled, appData.UserPoolId); system->RegisterLocalService(NGRpcService::GrpcMonServiceId(), grpcMon); GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>(); @@ -428,6 +430,7 @@ namespace Tests { GRpcServer->AddService(new NGRpcService::TGRpcYdbObjectStorageService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NQuoter::TRateLimiterGRpcService(system, counters, grpcRequestProxies[0])); GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcYmqService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies, true, 1)); GRpcServer->AddService(new NGRpcService::TGRpcYdbTabletService(system, counters, grpcRequestProxies, true, 1)); @@ -448,11 +451,12 @@ namespace Tests { GRpcServer->Start(); } - void TServer::EnableGRpc(ui16 port) { + void TServer::EnableGRpc(ui16 port, ui32 grpcServiceNodeId) { EnableGRpc(NYdbGrpc::TServerOptions() .SetHost("localhost") .SetPort(port) - .SetLogger(NYdbGrpc::CreateActorSystemLogger(*Runtime->GetAnyNodeActorSystem(), NKikimrServices::GRPC_SERVER)) + .SetLogger(NYdbGrpc::CreateActorSystemLogger(*Runtime->GetActorSystem(grpcServiceNodeId), NKikimrServices::GRPC_SERVER)), + grpcServiceNodeId ); } diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 7e476a05dab1..db3a73415ed5 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -295,8 +295,8 @@ namespace Tests { TServer& operator =(TServer&& server) = default; virtual ~TServer(); - void EnableGRpc(const NYdbGrpc::TServerOptions& options); - void EnableGRpc(ui16 port); + void EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId = 0); + void EnableGRpc(ui16 port, ui32 grpcServiceNodeId = 0); void SetupRootStoragePools(const TActorId sender) const; void SetupDefaultProfiles();