diff --git a/ydb/tests/tools/kqprun/.gitignore b/ydb/tests/tools/kqprun/.gitignore index e240c8c1f76b..807aadd42e70 100644 --- a/ydb/tests/tools/kqprun/.gitignore +++ b/ydb/tests/tools/kqprun/.gitignore @@ -5,3 +5,4 @@ udfs *.json *.sql *.bin +*.txt diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index cdf48036a0e0..bd8b36376df6 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -25,6 +25,7 @@ LogConfig { QueryServiceConfig { MdbTransformHost: false + ProgressStatsPeriodMs: 1000 QueryArtifactsCompressionMethod: "zstd_6" ScriptResultRowsLimit: 0 ScriptResultSizeLimit: 10485760 @@ -66,6 +67,10 @@ QueryServiceConfig { MinDesiredDirectoriesOfFilesPerQuery: 1000 RegexpCacheSize: 100 + DefaultSettings { + Name: "AtomicUploadCommit" + Value: "true" + } DefaultSettings { Name: "UseBlocksSource" Value: "true" @@ -91,6 +96,7 @@ ResourceBrokerConfig { TableServiceConfig { BindingsMode: BM_DROP CompileTimeoutMs: 600000 + EnableOlapSink: true SessionsLimitPerNode: 1000 QueryLimits { diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 0b3f6677d846..7b841ae47209 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -6,120 +6,186 @@ #include #include +#include #include #include +#include + +#include #include #include -#include struct TExecutionOptions { - enum class EClearExecutionCase { - Disabled, + enum class EExecutionCase { + GenericScript, GenericQuery, - YqlScript + YqlScript, + AsyncQuery }; - TString ScriptQuery; + std::vector ScriptQueries; TString SchemeQuery; + ui32 LoopCount = 1; + TDuration LoopDelay; + bool ForgetExecution = false; - EClearExecutionCase ClearExecution = EClearExecutionCase::Disabled; + std::vector ExecutionCases; NKikimrKqp::EQueryAction ScriptQueryAction = NKikimrKqp::QUERY_ACTION_EXECUTE; - TString TraceId = "kqprun"; + const TString TraceId = "kqprun_" + CreateGuidAsString(); bool HasResults() const { - return ScriptQuery && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE; + if (ScriptQueries.empty() || ScriptQueryAction != NKikimrKqp::QUERY_ACTION_EXECUTE) { + return false; + } + + for (EExecutionCase executionCase : ExecutionCases) { + if (executionCase != EExecutionCase::AsyncQuery) { + return true; + } + } + return false; + } + + EExecutionCase GetExecutionCase(size_t index) const { + Y_ABORT_UNLESS(!ExecutionCases.empty()); + return ExecutionCases[std::min(index, ExecutionCases.size() - 1)]; } }; -void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunnerOptions& runnerOptions) { +void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqpRunner& runner) { NColorizer::TColors colors = NColorizer::AutoColors(Cout); - Cout << colors.Yellow() << "Initialization of kqp runner..." << colors.Default() << Endl; - NKqpRun::TKqpRunner runner(runnerOptions); - if (executionOptions.SchemeQuery) { - Cout << colors.Yellow() << "Executing scheme query..." << colors.Default() << Endl; + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing scheme query..." << colors.Default() << Endl; if (!runner.ExecuteSchemeQuery(executionOptions.SchemeQuery, executionOptions.TraceId)) { - ythrow yexception() << "Scheme query execution failed"; + ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Scheme query execution failed"; } } - if (executionOptions.ScriptQuery) { - Cout << colors.Yellow() << "Executing script..." << colors.Default() << Endl; - switch (executionOptions.ClearExecution) { - case TExecutionOptions::EClearExecutionCase::Disabled: - if (!runner.ExecuteScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.TraceId)) { - ythrow yexception() << "Script execution failed"; + const size_t numberQueries = executionOptions.ScriptQueries.size(); + const size_t numberLoops = executionOptions.LoopCount; + for (size_t queryId = 0; queryId < numberQueries * numberLoops || numberLoops == 0; ++queryId) { + size_t id = queryId % numberQueries; + if (id == 0 && queryId > 0) { + Sleep(executionOptions.LoopDelay); + } + + const auto executionCase = executionOptions.GetExecutionCase(id); + if (executionCase != TExecutionOptions::EExecutionCase::AsyncQuery) { + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script"; + if (numberQueries > 1) { + Cout << " " << id; + } + if (numberLoops != 1) { + Cout << ", loop " << queryId / numberQueries; } - Cout << colors.Yellow() << "Fetching script results..." << colors.Default() << Endl; + Cout << "..." << colors.Default() << Endl; + } + + TInstant startTime = TInstant::Now(); + switch (executionCase) { + case TExecutionOptions::EExecutionCase::GenericScript: + if (!runner.ExecuteScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { + ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed"; + } + Cout << colors.Cyan() << "Script request finished. Time: " << TInstant::Now() - startTime << colors.Default() << Endl; + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl; if (!runner.FetchScriptResults()) { - ythrow yexception() << "Fetch script results failed"; + ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Fetch script results failed"; } if (executionOptions.ForgetExecution) { - Cout << colors.Yellow() << "Forgetting script execution operation..." << colors.Default() << Endl; + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Forgetting script execution operation..." << colors.Default() << Endl; if (!runner.ForgetExecutionOperation()) { - ythrow yexception() << "Forget script execution operation failed"; + ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Forget script execution operation failed"; } } break; - case TExecutionOptions::EClearExecutionCase::GenericQuery: - if (!runner.ExecuteQuery(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.TraceId)) { - ythrow yexception() << "Query execution failed"; + case TExecutionOptions::EExecutionCase::GenericQuery: + if (!runner.ExecuteQuery(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { + ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed"; } + Cout << colors.Cyan() << "Generic request finished. Time: " << TInstant::Now() - startTime << colors.Default() << Endl; break; - case TExecutionOptions::EClearExecutionCase::YqlScript: - if (!runner.ExecuteYqlScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.TraceId)) { - ythrow yexception() << "Yql script execution failed"; + case TExecutionOptions::EExecutionCase::YqlScript: + if (!runner.ExecuteYqlScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { + ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed"; } + Cout << colors.Cyan() << "Yql script request finished. Time: " << TInstant::Now() - startTime << colors.Default() << Endl; + break; + + case TExecutionOptions::EExecutionCase::AsyncQuery: + runner.ExecuteQueryAsync(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId); break; } } + runner.WaitAsyncQueries(); if (executionOptions.HasResults()) { - runner.PrintScriptResults(); + try { + runner.PrintScriptResults(); + } catch (...) { + ythrow yexception() << "Failed to print script results, reason:\n" << CurrentExceptionMessage(); + } } - - Cout << colors.Yellow() << "Finalization of kqp runner..." << colors.Default() << Endl; } -THolder SetupDefaultFileOutput(const TString& filePath, IOutputStream*& stream) { - THolder fileHolder; - if (filePath == "-") { - stream = &Cout; - } else if (filePath) { - fileHolder.Reset(new TFileOutput(filePath)); - stream = fileHolder.Get(); +void RunAsDaemon() { + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Started reading commands" << colors.Default() << Endl; + while (true) { + TString command; + Cin >> command; + + if (command == "exit") { + break; + } + Cerr << colors.Red() << TInstant::Now().ToIsoStringLocal() << " Invalid command '" << command << "'" << colors.Default() << Endl; } - return fileHolder; } -void ReplaceTemplate(const TString& variableName, const TString& variableValue, TString& query) { - TString variableTemplate = TStringBuilder() << "${" << variableName << "}"; - for (size_t position = query.find(variableTemplate); position != TString::npos; position = query.find(variableTemplate, position)) { - query.replace(position, variableTemplate.size(), variableValue); - position += variableValue.size(); +void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunnerOptions& runnerOptions) { + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization of kqp runner..." << colors.Default() << Endl; + NKqpRun::TKqpRunner runner(runnerOptions); + + try { + RunArgumentQueries(executionOptions, runner); + } catch (const yexception& exception) { + if (runnerOptions.YdbSettings.MonitoringEnabled) { + Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl; + } else { + throw exception; + } + } + + if (runnerOptions.YdbSettings.MonitoringEnabled) { + RunAsDaemon(); } + + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Finalization of kqp runner..." << colors.Default() << Endl; } TIntrusivePtr CreateFunctionRegistry(const TString& udfsDirectory, TVector udfsPaths, bool excludeLinkedUdfs) { if (!udfsDirectory.empty() || !udfsPaths.empty()) { NColorizer::TColors colors = NColorizer::AutoColors(Cout); - Cout << colors.Yellow() << "Fetching udfs..." << colors.Default() << Endl; + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching udfs..." << colors.Default() << Endl; } NKikimr::NMiniKQL::FindUdfsInDir(udfsDirectory, &udfsPaths); - auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone(); + auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone(); if (excludeLinkedUdfs) { for (const auto& wrapper : NYql::NUdf::GetStaticUdfModuleWrapperList()) { @@ -136,194 +202,271 @@ TIntrusivePtr CreateFunctionRegistr } -void RunMain(int argc, const char* argv[]) { - TExecutionOptions executionOptions; - NKqpRun::TRunnerOptions runnerOptions; - - TString scriptQueryFile; - TString schemeQueryFile; - TString resultOutputFile = "-"; - TString schemeQueryAstFile; - TString scriptQueryAstFile; - TString scriptQueryPlanFile; - TString logFile = "-"; - TString appConfigFile = "./configuration/app_config.conf"; - - TString clearExecutionType = "disabled"; - TString traceOptType = "disabled"; - TString scriptQueryAction = "execute"; - TString planOutputFormat = "pretty"; - TString resultOutputFormat = "rows"; - i64 resultsRowsLimit = 1000; - - TVector udfsPaths; - TString udfsDirectory; - bool excludeLinkedUdfs = false; - - NLastGetopt::TOpts options = NLastGetopt::TOpts::Default(); - options.AddLongOption('p', "script-query", "Script query to execute") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&scriptQueryFile); - options.AddLongOption('s', "scheme-query", "Scheme query to execute") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&schemeQueryFile); - options.AddLongOption('c', "app-config", "File with app config (TAppConfig)") - .Optional() - .RequiredArgument("FILE") - .DefaultValue(appConfigFile) - .StoreResult(&appConfigFile); - - options.AddLongOption("log-file", "File with execution logs (use '-' to write in stderr)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&logFile); - options.AddLongOption("result-file", "File with script execution results (use '-' to write in stdout)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&resultOutputFile); - options.AddLongOption("scheme-ast-file", "File with scheme query ast (use '-' to write in stdout)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&schemeQueryAstFile); - options.AddLongOption("script-ast-file", "File with script query ast (use '-' to write in stdout)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&scriptQueryAstFile); - options.AddLongOption("script-plan-file", "File with script query plan (use '-' to write in stdout)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&scriptQueryPlanFile); - - options.AddLongOption('C', "clear-execution", "Execute script query without creating additional tables, one of { query | yql-script }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(clearExecutionType) - .StoreResult(&clearExecutionType); - options.AddLongOption('F', "forget", "Forget script execution operation after fetching results, cannot be used with -C") - .Optional() - .NoArgument() - .DefaultValue(executionOptions.ForgetExecution) - .SetFlag(&executionOptions.ForgetExecution); - options.AddLongOption('T', "trace-opt", "print AST in the begin of each transformation, one of { scheme | script | all }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(traceOptType) - .StoreResult(&traceOptType); - options.AddLongOption('A', "script-action", "Script query execute action, one of { execute | explain }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(scriptQueryAction) - .StoreResult(&scriptQueryAction); - options.AddLongOption('P', "plan-format", "Script query plan format, one of { pretty | table | json }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(planOutputFormat) - .StoreResult(&planOutputFormat); - options.AddLongOption('R', "result-format", "Script query result format, one of { rows | full }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(resultOutputFormat) - .StoreResult(&resultOutputFormat); - options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results") - .Optional() - .RequiredArgument("INT") - .DefaultValue(resultsRowsLimit) - .StoreResult(&resultsRowsLimit); - - options.AddLongOption('u', "udf", "Load shared library with UDF by given path") - .Optional() - .RequiredArgument("FILE") - .AppendTo(&udfsPaths); - options.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory") - .Optional() - .RequiredArgument("PATH") - .StoreResult(&udfsDirectory); - options.AddLongOption("exclude-linked-udfs", "Exclude linked udfs when same udf passed from -u or --udfs-dir") - .Optional() - .NoArgument() - .DefaultValue(excludeLinkedUdfs) - .SetFlag(&excludeLinkedUdfs); - - NLastGetopt::TOptsParseResult parsedOptions(&options, argc, argv); - - // Environment variables - - const TString& yqlToken = GetEnv(NKqpRun::YQL_TOKEN_VARIABLE); - - // Execution options - - if (!schemeQueryFile && !scriptQueryFile) { - ythrow yexception() << "Nothing to execute"; - } - if (schemeQueryFile) { - executionOptions.SchemeQuery = TFileInput(schemeQueryFile).ReadAll(); - ReplaceTemplate(NKqpRun::YQL_TOKEN_VARIABLE, yqlToken, executionOptions.SchemeQuery); +class TMain : public TMainClassArgs { + inline static const TString YqlToken = GetEnv(NKqpRun::YQL_TOKEN_VARIABLE); + inline static std::vector> FileHolders; + + TExecutionOptions ExecutionOptions; + NKqpRun::TRunnerOptions RunnerOptions; + + TVector UdfsPaths; + TString UdfsDirectory; + bool ExcludeLinkedUdfs = false; + ui64 ResultsRowsLimit = 1000; + + static TString LoadFile(const TString& file) { + return TFileInput(file).ReadAll(); } - if (scriptQueryFile) { - executionOptions.ScriptQuery = TFileInput(scriptQueryFile).ReadAll(); + + static void ReplaceTemplate(const TString& variableName, const TString& variableValue, TString& query) { + TString variableTemplate = TStringBuilder() << "${" << variableName << "}"; + for (size_t position = query.find(variableTemplate); position != TString::npos; position = query.find(variableTemplate, position)) { + query.replace(position, variableTemplate.size(), variableValue); + position += variableValue.size(); + } } - executionOptions.ClearExecution = - (clearExecutionType == TStringBuf("query")) ? TExecutionOptions::EClearExecutionCase::GenericQuery - : (clearExecutionType == TStringBuf("yql-script")) ? TExecutionOptions::EClearExecutionCase::YqlScript - : (clearExecutionType == TStringBuf("disabled")) ? TExecutionOptions::EClearExecutionCase::Disabled - : TExecutionOptions::EClearExecutionCase::Disabled; - - executionOptions.ScriptQueryAction = - (scriptQueryAction == TStringBuf("execute")) ? NKikimrKqp::QUERY_ACTION_EXECUTE - : (scriptQueryAction == TStringBuf("explain")) ? NKikimrKqp::QUERY_ACTION_EXPLAIN - : NKikimrKqp::QUERY_ACTION_EXECUTE; - - // Runner options - - THolder resultFileHolder = SetupDefaultFileOutput(resultOutputFile, runnerOptions.ResultOutput); - THolder schemeQueryAstFileHolder = SetupDefaultFileOutput(schemeQueryAstFile, runnerOptions.SchemeQueryAstOutput); - THolder scriptQueryAstFileHolder = SetupDefaultFileOutput(scriptQueryAstFile, runnerOptions.ScriptQueryAstOutput); - THolder scriptQueryPlanFileHolder = SetupDefaultFileOutput(scriptQueryPlanFile, runnerOptions.ScriptQueryPlanOutput); - - runnerOptions.TraceOptType = - (traceOptType == TStringBuf("all")) ? NKqpRun::TRunnerOptions::ETraceOptType::All - : (traceOptType == TStringBuf("scheme")) ? NKqpRun::TRunnerOptions::ETraceOptType::Scheme - : (traceOptType == TStringBuf("script")) ? NKqpRun::TRunnerOptions::ETraceOptType::Script - : (traceOptType == TStringBuf("disabled")) ? NKqpRun::TRunnerOptions::ETraceOptType::Disabled - : NKqpRun::TRunnerOptions::ETraceOptType::All; - runnerOptions.YdbSettings.TraceOptEnabled = runnerOptions.TraceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled; - - runnerOptions.ResultOutputFormat = - (resultOutputFormat == TStringBuf("rows")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson - : (resultOutputFormat == TStringBuf("full")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson - : NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson; - - runnerOptions.PlanOutputFormat = - (planOutputFormat == TStringBuf("pretty")) ? NYdb::NConsoleClient::EOutputFormat::Pretty - : (planOutputFormat == TStringBuf("table")) ? NYdb::NConsoleClient::EOutputFormat::PrettyTable - : (planOutputFormat == TStringBuf("json")) ? NYdb::NConsoleClient::EOutputFormat::JsonUnicode - : NYdb::NConsoleClient::EOutputFormat::Default; - - // Ydb settings - - if (logFile != "-") { - runnerOptions.YdbSettings.LogOutputFile = logFile; - std::remove(logFile.c_str()); + static IOutputStream* GetDefaultOutput(const TString& file) { + if (file == "-") { + return &Cout; + } + if (file) { + FileHolders.emplace_back(new TFileOutput(file)); + return FileHolders.back().get(); + } + return nullptr; } - runnerOptions.YdbSettings.YqlToken = yqlToken; - runnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(udfsDirectory, udfsPaths, excludeLinkedUdfs).Get(); + template + class TChoices { + public: + explicit TChoices(std::map choicesMap) + : ChoicesMap(std::move(choicesMap)) + {} - TString appConfigData = TFileInput(appConfigFile).ReadAll(); - if (!google::protobuf::TextFormat::ParseFromString(appConfigData, &runnerOptions.YdbSettings.AppConfig)) { - ythrow yexception() << "Bad format of app configuration"; + TResult operator()(const TString& choice) const { + return ChoicesMap.at(choice); + } + + TVector GetChoices() const { + TVector choices; + choices.reserve(ChoicesMap.size()); + for (const auto& [choice, _] : ChoicesMap) { + choices.emplace_back(choice); + } + return choices; + } + + private: + const std::map ChoicesMap; + }; + +protected: + void RegisterOptions(NLastGetopt::TOpts& options) override { + options.SetTitle("KqpRun -- tool to execute queries by using kikimr provider (instead of dq provider in DQrun tool)"); + options.AddHelpOption('h'); + options.SetFreeArgsNum(0); + + // Inputs + + options.AddLongOption('s', "scheme-query", "Scheme query to execute (typically DDL/DCL query)") + .RequiredArgument("file") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + ExecutionOptions.SchemeQuery = LoadFile(option->CurVal()); + ReplaceTemplate(NKqpRun::YQL_TOKEN_VARIABLE, YqlToken, ExecutionOptions.SchemeQuery); + }); + options.AddLongOption('p', "script-query", "Script query to execute (typically DML query)") + .RequiredArgument("file") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + ExecutionOptions.ScriptQueries.emplace_back(LoadFile(option->CurVal())); + }); + + options.AddLongOption('c', "app-config", "File with app config (TAppConfig for ydb tennant)") + .RequiredArgument("file") + .DefaultValue("./configuration/app_config.conf") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + TString file(option->CurValOrDef()); + if (file.EndsWith(".yaml")) { + auto document = NKikimr::NFyaml::TDocument::Parse(LoadFile(file)); + RunnerOptions.YdbSettings.AppConfig = NKikimr::NYamlConfig::YamlToProto(document.Root()); + } else if (!google::protobuf::TextFormat::ParseFromString(LoadFile(file), &RunnerOptions.YdbSettings.AppConfig)) { + ythrow yexception() << "Bad format of app configuration"; + } + }); + + options.AddLongOption('u', "udf", "Load shared library with UDF by given path") + .RequiredArgument("file") + .EmplaceTo(&UdfsPaths); + options.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory") + .RequiredArgument("directory") + .StoreResult(&UdfsDirectory); + options.AddLongOption("exclude-linked-udfs", "Exclude linked udfs when same udf passed from -u or --udfs-dir") + .NoArgument() + .SetFlag(&ExcludeLinkedUdfs); + + // Outputs + + options.AddLongOption("log-file", "File with execution logs (writes in stderr if empty)") + .RequiredArgument("file") + .StoreResult(&RunnerOptions.YdbSettings.LogOutputFile) + .Handler1([](const NLastGetopt::TOptsParser* option) { + if (const TString& file = option->CurVal()) { + std::remove(file.c_str()); + } + }); + TChoices traceOpt({ + {"all", NKqpRun::TRunnerOptions::ETraceOptType::All}, + {"scheme", NKqpRun::TRunnerOptions::ETraceOptType::Scheme}, + {"script", NKqpRun::TRunnerOptions::ETraceOptType::Script}, + {"disabled", NKqpRun::TRunnerOptions::ETraceOptType::Disabled} + }); + options.AddLongOption('T', "trace-opt", "print AST in the begin of each transformation") + .RequiredArgument("trace-opt-query") + .DefaultValue("disabled") + .Choices(traceOpt.GetChoices()) + .StoreMappedResultT(&RunnerOptions.TraceOptType, [this, traceOpt](const TString& choise) { + auto traceOptType = traceOpt(choise); + RunnerOptions.YdbSettings.TraceOptEnabled = traceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled; + return traceOptType; + }); + + options.AddLongOption("result-file", "File with script execution results (use '-' to write in stdout)") + .RequiredArgument("file") + .DefaultValue("-") + .StoreMappedResultT(&RunnerOptions.ResultOutput, &GetDefaultOutput); + options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results") + .RequiredArgument("uint") + .DefaultValue(ResultsRowsLimit) + .StoreResult(&ResultsRowsLimit); + TChoices resultFormat({ + {"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson}, + {"full-json", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson}, + {"full-proto", NKqpRun::TRunnerOptions::EResultOutputFormat::FullProto} + }); + options.AddLongOption('R', "result-format", "Script query result format") + .RequiredArgument("result-format") + .DefaultValue("rows") + .Choices(resultFormat.GetChoices()) + .StoreMappedResultT(&RunnerOptions.ResultOutputFormat, resultFormat); + + options.AddLongOption("scheme-ast-file", "File with scheme query ast (use '-' to write in stdout)") + .RequiredArgument("file") + .StoreMappedResultT(&RunnerOptions.SchemeQueryAstOutput, &GetDefaultOutput); + + options.AddLongOption("script-ast-file", "File with script query ast (use '-' to write in stdout)") + .RequiredArgument("file") + .StoreMappedResultT(&RunnerOptions.ScriptQueryAstOutput, &GetDefaultOutput); + + options.AddLongOption("script-plan-file", "File with script query plan (use '-' to write in stdout)") + .RequiredArgument("file") + .StoreMappedResultT(&RunnerOptions.ScriptQueryPlanOutput, &GetDefaultOutput); + options.AddLongOption("script-statistics", "File with script inprogress statistics") + .RequiredArgument("file") + .StoreResult(&RunnerOptions.InProgressStatisticsOutputFile); + TChoices planFormat({ + {"pretty", NYdb::NConsoleClient::EOutputFormat::Pretty}, + {"table", NYdb::NConsoleClient::EOutputFormat::PrettyTable}, + {"json", NYdb::NConsoleClient::EOutputFormat::JsonUnicode}, + }); + options.AddLongOption('P', "plan-format", "Script query plan format") + .RequiredArgument("plan-format") + .DefaultValue("pretty") + .Choices(planFormat.GetChoices()) + .StoreMappedResultT(&RunnerOptions.PlanOutputFormat, planFormat); + + // Pipeline settings + + TChoices executionCase({ + {"script", TExecutionOptions::EExecutionCase::GenericScript}, + {"query", TExecutionOptions::EExecutionCase::GenericQuery}, + {"yql-script", TExecutionOptions::EExecutionCase::YqlScript}, + {"async", TExecutionOptions::EExecutionCase::AsyncQuery} + }); + options.AddLongOption('C', "execution-case", "Type of query for -p argument") + .RequiredArgument("query-type") + .DefaultValue("script") + .Choices(executionCase.GetChoices()) + .Handler1([this, executionCase](const NLastGetopt::TOptsParser* option) { + TString choice(option->CurValOrDef()); + ExecutionOptions.ExecutionCases.emplace_back(executionCase(choice)); + }); + options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)") + .RequiredArgument("uint") + .DefaultValue(RunnerOptions.YdbSettings.InFlightLimit) + .StoreResult(&RunnerOptions.YdbSettings.InFlightLimit); + + TChoices scriptAction({ + {"execute", NKikimrKqp::QUERY_ACTION_EXECUTE}, + {"explain", NKikimrKqp::QUERY_ACTION_EXPLAIN} + }); + options.AddLongOption('A', "script-action", "Script query execute action") + .RequiredArgument("script-action") + .DefaultValue("execute") + .Choices(scriptAction.GetChoices()) + .StoreMappedResultT(&ExecutionOptions.ScriptQueryAction, scriptAction); + + options.AddLongOption('F', "forget", "Forget script execution operation after fetching results") + .NoArgument() + .SetFlag(&ExecutionOptions.ForgetExecution); + + options.AddLongOption("loop-count", "Number of runs of the script query (use 0 to start infinite loop)") + .RequiredArgument("uint") + .DefaultValue(ExecutionOptions.LoopCount) + .StoreResult(&ExecutionOptions.LoopCount); + options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps") + .RequiredArgument("uint") + .DefaultValue(1000) + .StoreMappedResultT(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds); + + // Cluster settings + + options.AddLongOption('N', "node-count", "Number of nodes to create") + .RequiredArgument("uint") + .DefaultValue(RunnerOptions.YdbSettings.NodeCount) + .StoreMappedResultT(&RunnerOptions.YdbSettings.NodeCount, [](ui32 nodeCount) { + if (nodeCount < 1) { + ythrow yexception() << "Number of nodes less than one"; + } + return nodeCount; + }); + + options.AddLongOption('M', "monitoring", "Embedded UI port (use 0 to start on random free port), if used kqprun will be runs as daemon") + .RequiredArgument("uint") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + if (const TString& port = option->CurVal()) { + RunnerOptions.YdbSettings.MonitoringEnabled = true; + RunnerOptions.YdbSettings.MonitoringPortOffset = FromString(port); + } + }); + + TChoices> backtrace({ + {"heavy", &NKikimr::EnableYDBBacktraceFormat}, + {"light", []() { SetFormatBackTraceFn(FormatBackTrace); }} + }); + options.AddLongOption("backtrace", "Default backtrace format function") + .RequiredArgument("backtrace-type") + .DefaultValue("heavy") + .Choices(backtrace.GetChoices()) + .Handler1([backtrace](const NLastGetopt::TOptsParser* option) { + TString choice(option->CurValOrDef()); + backtrace(choice)(); + }); } - if (resultsRowsLimit < 0) { - ythrow yexception() << "Results rows limit less than zero"; + int DoRun(NLastGetopt::TOptsParseResult&&) override { + if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled) { + ythrow yexception() << "Nothing to execute"; + } + + RunnerOptions.YdbSettings.YqlToken = YqlToken; + RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get(); + RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ResultsRowsLimit); + + RunScript(ExecutionOptions, RunnerOptions); + return 0; } - runnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(resultsRowsLimit); +}; - RunScript(executionOptions, runnerOptions); -} void KqprunTerminateHandler() { NColorizer::TColors colors = NColorizer::AutoColors(Cerr); @@ -335,11 +478,24 @@ void KqprunTerminateHandler() { abort(); } + +void SegmentationFaultHandler(int) { + NColorizer::TColors colors = NColorizer::AutoColors(Cerr); + + Cerr << colors.Red() << "======= segmentation fault call stack ========" << colors.Default() << Endl; + FormatBackTrace(&Cerr); + Cerr << colors.Red() << "==============================================" << colors.Default() << Endl; + + abort(); +} + + int main(int argc, const char* argv[]) { std::set_terminate(KqprunTerminateHandler); + signal(SIGSEGV, &SegmentationFaultHandler); try { - RunMain(argc, argv); + TMain().Run(argc, argv); } catch (...) { NColorizer::TColors colors = NColorizer::AutoColors(Cerr); diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index 264eb6abcba6..5e962c59157e 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -1,6 +1,9 @@ #include "actors.h" +#include + #include +#include namespace NKqpRun { @@ -9,26 +12,25 @@ namespace { class TRunScriptActorMock : public NActors::TActorBootstrapped { public: - TRunScriptActorMock(THolder request, - NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets) - : Request_(std::move(request)) + TRunScriptActorMock(TQueryRequest request, NThreading::TPromise promise, TProgressCallback progressCallback) + : TargetNode_(request.TargetNode) + , Request_(std::move(request.Event)) , Promise_(promise) , ResultRowsLimit_(std::numeric_limits::max()) , ResultSizeLimit_(std::numeric_limits::max()) - , ResultSets_(resultSets) + , ProgressCallback_(progressCallback) { - if (resultRowsLimit) { - ResultRowsLimit_ = resultRowsLimit; + if (request.ResultRowsLimit) { + ResultRowsLimit_ = request.ResultRowsLimit; } - if (resultSizeLimit) { - ResultSizeLimit_ = resultSizeLimit; + if (request.ResultSizeLimit) { + ResultSizeLimit_ = request.ResultSizeLimit; } } void Bootstrap() { NActors::ActorIdToProto(SelfId(), Request_->Record.MutableRequestActorId()); - Send(NKikimr::NKqp::MakeKqpProxyID(SelfId().NodeId()), std::move(Request_)); + Send(NKikimr::NKqp::MakeKqpProxyID(TargetNode_), std::move(Request_)); Become(&TRunScriptActorMock::StateFunc); } @@ -36,6 +38,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrappedGet()->Record.GetQueryResultIndex(); if (resultSetIndex >= ResultSets_.size()) { ResultSets_.resize(resultSetIndex + 1); + ResultSetSizes_.resize(resultSetIndex + 1, 0); } if (!ResultSets_[resultSetIndex].truncated()) { + ui64& resultSetSize = ResultSetSizes_[resultSetIndex]; for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) { if (static_cast(ResultSets_[resultSetIndex].rows_size()) >= ResultRowsLimit_) { ResultSets_[resultSetIndex].set_truncated(true); break; } - if (ResultSets_[resultSetIndex].ByteSizeLong() + row.ByteSizeLong() > ResultSizeLimit_) { + auto rowSize = row.ByteSizeLong(); + if (resultSetSize + rowSize > ResultSizeLimit_) { ResultSets_[resultSetIndex].set_truncated(true); break; } + resultSetSize += rowSize; *ResultSets_[resultSetIndex].add_rows() = std::move(row); } - *ResultSets_[resultSetIndex].mutable_columns() = ev->Get()->Record.GetResultSet().columns(); + if (!ResultSets_[resultSetIndex].columns_size()) { + *ResultSets_[resultSetIndex].mutable_columns() = ev->Get()->Record.GetResultSet().columns(); + } } Send(ev->Sender, response.Release()); } void Handle(NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { - Promise_.SetValue(std::move(ev)); + Promise_.SetValue(TQueryResponse{.Response = std::move(ev), .ResultSets = std::move(ResultSets_)}); PassAway(); } + void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) { + if (ProgressCallback_) { + ProgressCallback_(ev->Get()->Record); + } + } + private: - THolder Request_; - NThreading::TPromise Promise_; + ui32 TargetNode_ = 0; + std::unique_ptr Request_; + NThreading::TPromise Promise_; ui64 ResultRowsLimit_; ui64 ResultSizeLimit_; - std::vector& ResultSets_; + TProgressCallback ProgressCallback_; + std::vector ResultSets_; + std::vector ResultSetSizes_; +}; + +class TAsyncQueryRunnerActor : public NActors::TActor { + using TBase = NActors::TActor; + +public: + TAsyncQueryRunnerActor(ui64 inFlightLimit) + : TBase(&TAsyncQueryRunnerActor::StateFunc) + , InFlightLimit_(inFlightLimit) + { + RunningRequests_.reserve(InFlightLimit_); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvStartAsyncQuery, Handle); + hFunc(TEvPrivate::TEvAsyncQueryFinished, Handle); + hFunc(TEvPrivate::TEvFinalizeAsyncQueryRunner, Handle); + ) + + void Handle(TEvPrivate::TEvStartAsyncQuery::TPtr& ev) { + DelayedRequests_.emplace(std::move(ev)); + StartDelayedRequests(); + } + + void Handle(TEvPrivate::TEvAsyncQueryFinished::TPtr& ev) { + const ui64 requestId = ev->Get()->RequestId; + RunningRequests_.erase(requestId); + + const auto& response = ev->Get()->Result.Response->Get()->Record.GetRef(); + const auto status = response.GetYdbStatus(); + + if (status == Ydb::StatusIds::SUCCESS) { + Completed_++; + Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl; + } else { + Failed_++; + NYql::TIssues issues; + NYql::IssuesFromMessage(response.GetResponse().GetQueryIssues(), issues); + Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << status << ". " << CoutColors_.Yellow() << GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << issues.ToString() << CoutColors_.Default(); + } + + StartDelayedRequests(); + TryFinalize(); + } + + void Handle(TEvPrivate::TEvFinalizeAsyncQueryRunner::TPtr& ev) { + FinalizePromise_ = ev->Get()->FinalizePromise; + if (!TryFinalize()) { + Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for " << DelayedRequests_.size() + RunningRequests_.size() << " async queries..." << CoutColors_.Default() << Endl; + } + } + +private: + void StartDelayedRequests() { + while (!DelayedRequests_.empty() && (!InFlightLimit_ || RunningRequests_.size() < InFlightLimit_)) { + auto request = std::move(DelayedRequests_.front()); + DelayedRequests_.pop(); + + auto promise = NThreading::NewPromise(); + Register(CreateRunScriptActorMock(std::move(request->Get()->Request), promise, nullptr)); + RunningRequests_[RequestId_] = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture& f) { + Send(SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue()))); + }); + + MaxInFlight_ = std::max(MaxInFlight_, RunningRequests_.size()); + Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n"; + + RequestId_++; + request->Get()->StartPromise.SetValue(); + } + } + + bool TryFinalize() { + if (!FinalizePromise_ || !RunningRequests_.empty()) { + return false; + } + + FinalizePromise_->SetValue(); + PassAway(); + return true; + } + + TString GetInfoString() const { + return TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_; + } + +private: + const ui64 InFlightLimit_; + const TInstant StartTime_ = TInstant::Now(); + const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout); + + std::optional> FinalizePromise_; + std::queue DelayedRequests_; + std::unordered_map> RunningRequests_; + + ui64 RequestId_ = 1; + ui64 MaxInFlight_ = 0; + ui64 Completed_ = 0; + ui64 Failed_ = 0; +}; + +class TResourcesWaiterActor : public NActors::TActorBootstrapped { + static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10); + +public: + TResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount) + : ExpectedNodeCount_(expectedNodeCount) + , Promise_(promise) + {} + + void Bootstrap() { + Become(&TResourcesWaiterActor::StateFunc); + CheckResourcesPublish(); + } + + void Handle(NActors::TEvents::TEvWakeup::TPtr&) { + CheckResourcesPublish(); + } + + void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) { + if (ev->Get()->NodeCount == ExpectedNodeCount_) { + Promise_.SetValue(); + PassAway(); + return; + } + + Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup()); + } + + STRICT_STFUNC(StateFunc, + hFunc(NActors::TEvents::TEvWakeup, Handle); + hFunc(TEvPrivate::TEvResourcesInfo, Handle); + ) + +private: + void CheckResourcesPublish() { + GetResourceManager(); + + if (!ResourceManager_) { + Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup()); + return; + } + + UpdateResourcesInfo(); + } + + void GetResourceManager() { + if (ResourceManager_) { + return; + } + ResourceManager_ = NKikimr::NKqp::TryGetKqpResourceManager(SelfId().NodeId()); + } + + void UpdateResourcesInfo() const { + ResourceManager_->RequestClusterResourcesInfo( + [selfId = SelfId(), actorContext = ActorContext()](TVector&& resources) { + actorContext.Send(selfId, new TEvPrivate::TEvResourcesInfo(resources.size())); + }); + } + +private: + const i32 ExpectedNodeCount_; + NThreading::TPromise Promise_; + + std::shared_ptr ResourceManager_; }; } // anonymous namespace -NActors::IActor* CreateRunScriptActorMock(THolder request, - NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets) { - return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets); +NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise promise, TProgressCallback progressCallback) { + return new TRunScriptActorMock(std::move(request), promise, progressCallback); +} + +NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit) { + return new TAsyncQueryRunnerActor(inFlightLimit); +} + +NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount) { + return new TResourcesWaiterActor(promise, expectedNodeCount); } } // namespace NKqpRun diff --git a/ydb/tests/tools/kqprun/src/actors.h b/ydb/tests/tools/kqprun/src/actors.h index 9e7a251d14ff..8dc4e731a4ea 100644 --- a/ydb/tests/tools/kqprun/src/actors.h +++ b/ydb/tests/tools/kqprun/src/actors.h @@ -1,11 +1,79 @@ +#pragma once + #include #include namespace NKqpRun { -NActors::IActor* CreateRunScriptActorMock(THolder request, - NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets); +struct TQueryResponse { + NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr Response; + std::vector ResultSets; +}; + +struct TQueryRequest { + std::unique_ptr Event; + ui32 TargetNode; + ui64 ResultRowsLimit; + ui64 ResultSizeLimit; +}; + +struct TEvPrivate { + enum EEv : ui32 { + EvStartAsyncQuery = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvAsyncQueryFinished, + EvFinalizeAsyncQueryRunner, + + EvResourcesInfo, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + struct TEvStartAsyncQuery : public NActors::TEventLocal { + TEvStartAsyncQuery(TQueryRequest request, NThreading::TPromise startPromise) + : Request(std::move(request)) + , StartPromise(startPromise) + {} + + TQueryRequest Request; + NThreading::TPromise StartPromise; + }; + + struct TEvAsyncQueryFinished : public NActors::TEventLocal { + TEvAsyncQueryFinished(ui64 requestId, TQueryResponse result) + : RequestId(requestId) + , Result(std::move(result)) + {} + + const ui64 RequestId; + const TQueryResponse Result; + }; + + struct TEvFinalizeAsyncQueryRunner : public NActors::TEventLocal { + explicit TEvFinalizeAsyncQueryRunner(NThreading::TPromise finalizePromise) + : FinalizePromise(finalizePromise) + {} + + NThreading::TPromise FinalizePromise; + }; + + struct TEvResourcesInfo : public NActors::TEventLocal { + explicit TEvResourcesInfo(i32 nodeCount) + : NodeCount(nodeCount) + {} + + const i32 NodeCount; + }; +}; + +using TProgressCallback = std::function; + +NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise promise, TProgressCallback progressCallback); + +NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit); + +NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount); } // namespace NKqpRun diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index e69331ab9aed..c702bfe8a3c1 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -3,6 +3,7 @@ #include #include +#include #include @@ -12,14 +13,20 @@ namespace NKqpRun { constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN"; struct TYdbSetupSettings { + ui32 NodeCount = 1; TString DomainName = "Root"; + TDuration InitializationTimeout = TDuration::Seconds(10); + bool MonitoringEnabled = false; + ui16 MonitoringPortOffset = 0; bool TraceOptEnabled = false; - TMaybe LogOutputFile; + TString LogOutputFile; TString YqlToken; TIntrusivePtr FunctionRegistry; NKikimrConfig::TAppConfig AppConfig; + + ui64 InFlightLimit = 0; }; @@ -34,12 +41,14 @@ struct TRunnerOptions { enum class EResultOutputFormat { RowsJson, // Rows in json format FullJson, // Columns, rows and types in json format + FullProto, // Columns, rows and types in proto string format }; - IOutputStream* ResultOutput = &Cout; + IOutputStream* ResultOutput = nullptr; IOutputStream* SchemeQueryAstOutput = nullptr; IOutputStream* ScriptQueryAstOutput = nullptr; IOutputStream* ScriptQueryPlanOutput = nullptr; + TString InProgressStatisticsOutputFile; EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson; NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default; diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 1975d86bd368..d7c3cfdfe1b3 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -2,6 +2,10 @@ #include "ydb_setup.h" #include +#include + +#include +#include #include #include @@ -9,18 +13,75 @@ namespace NKqpRun { +namespace { + +// Function adds thousands separators +// 123456789 -> 123.456.789 +TString FormatNumber(i64 number) { + struct TSeparator : public std::numpunct { + char do_thousands_sep() const final { + return '.'; + } + + std::string do_grouping() const final { + return "\03"; + } + }; + + std::ostringstream stream; + stream.imbue(std::locale(stream.getloc(), new TSeparator())); + stream << number; + return stream.str(); +} + +void PrintStatistics(const TString& fullStat, const NFq::TPublicStat& publicStat, IOutputStream& output) { + output << "\nPublic statistics:" << Endl; + if (auto memoryUsageBytes = publicStat.MemoryUsageBytes) { + output << "MemoryUsage = " << NKikimr::NBlobDepot::FormatByteSize(*memoryUsageBytes) << Endl; + } + if (auto cpuUsageUs = publicStat.CpuUsageUs) { + output << "CpuUsage = " << NFq::FormatDurationUs(*cpuUsageUs) << Endl; + } + if (auto inputBytes = publicStat.InputBytes) { + output << "InputSize = " << NKikimr::NBlobDepot::FormatByteSize(*inputBytes) << Endl; + } + if (auto outputBytes = publicStat.OutputBytes) { + output << "OutputSize = " << NKikimr::NBlobDepot::FormatByteSize(*outputBytes) << Endl; + } + if (auto sourceInputRecords = publicStat.SourceInputRecords) { + output << "SourceInputRecords = " << FormatNumber(*sourceInputRecords) << Endl; + } + if (auto sinkOutputRecords = publicStat.SinkOutputRecords) { + output << "SinkOutputRecords = " << FormatNumber(*sinkOutputRecords) << Endl; + } + if (auto runningTasks = publicStat.RunningTasks) { + output << "RunningTasks = " << FormatNumber(*runningTasks) << Endl; + } + + output << "\nFull statistics:" << Endl; + NJson::TJsonValue statsJson; + NJson::ReadJsonTree(fullStat, &statsJson); + NJson::WriteJson(&output, &statsJson, true, true, true); + output << Endl; +} + +} // anonymous namespace + + //// TKqpRunner::TImpl class TKqpRunner::TImpl { public: enum class EQueryType { ScriptQuery, - YqlScriptQuery + YqlScriptQuery, + AsyncQuery }; explicit TImpl(const TRunnerOptions& options) : Options_(options) , YdbSetup_(options.YdbSettings) + , StatProcessor_(NFq::CreateStatProcessor("stat_full")) , CerrColors_(NColorizer::AutoColors(Cerr)) , CoutColors_(NColorizer::AutoColors(Cout)) {} @@ -62,18 +123,24 @@ class TKqpRunner::TImpl { TRequestResult status; switch (queryType) { case EQueryType::ScriptQuery: - status = YdbSetup_.QueryRequest(query, action, traceId, meta, ResultSets_); + status = YdbSetup_.QueryRequest(query, action, traceId, meta, ResultSets_, GetProgressCallback()); break; case EQueryType::YqlScriptQuery: status = YdbSetup_.YqlScriptRequest(query, action, traceId, meta, ResultSets_); break; + + case EQueryType::AsyncQuery: + YdbSetup_.QueryRequestAsync(query, action, traceId); + return true; } TYdbSetup::StopTraceOpt(); PrintScriptAst(meta.Ast); + PrintScriptPlan(meta.Plan); + if (!status.IsSuccess()) { Cerr << CerrColors_.Red() << "Failed to execute query, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl; return false; @@ -83,11 +150,13 @@ class TKqpRunner::TImpl { Cerr << CerrColors_.Red() << "Request finished with issues:" << CerrColors_.Default() << Endl << status.Issues.ToString() << Endl; } - PrintScriptPlan(meta.Plan); - return true; } + void WaitAsyncQueries() const { + YdbSetup_.WaitAsyncQueries(); + } + bool FetchScriptResults() { TYdbSetup::StopTraceOpt(); @@ -136,9 +205,17 @@ class TKqpRunner::TImpl { private: bool WaitScriptExecutionOperation() { + ExecutionMeta_ = TExecutionMeta(); + + TDuration getOperationPeriod = TDuration::Seconds(1); + if (auto progressStatsPeriodMs = Options_.YdbSettings.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) { + getOperationPeriod = TDuration::MilliSeconds(progressStatsPeriodMs); + } + TRequestResult status; while (true) { status = YdbSetup_.GetScriptExecutionOperationRequest(ExecutionOperation_, ExecutionMeta_); + PrintScriptProgress(ExecutionMeta_.Plan); if (ExecutionMeta_.Ready) { break; @@ -149,11 +226,13 @@ class TKqpRunner::TImpl { return false; } - Sleep(TDuration::Seconds(1)); + Sleep(getOperationPeriod); } PrintScriptAst(ExecutionMeta_.Ast); + PrintScriptPlan(ExecutionMeta_.Plan); + if (!status.IsSuccess() || ExecutionMeta_.ExecutionStatus != NYdb::NQuery::EExecStatus::Completed) { Cerr << CerrColors_.Red() << "Failed to execute script, invalid final status, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl; return false; @@ -163,8 +242,6 @@ class TKqpRunner::TImpl { Cerr << CerrColors_.Red() << "Request finished with issues:" << CerrColors_.Default() << Endl << status.Issues.ToString() << Endl; } - PrintScriptPlan(ExecutionMeta_.Plan); - return true; } @@ -194,15 +271,60 @@ class TKqpRunner::TImpl { } } + void PrintPlan(const TString& plan, IOutputStream* output) const { + if (!plan) { + return; + } + + NJson::TJsonValue planJson; + NJson::ReadJsonTree(plan, &planJson, true); + if (!planJson.GetMapSafe().contains("meta")) { + return; + } + + NYdb::NConsoleClient::TQueryPlanPrinter printer(Options_.PlanOutputFormat, true, *output); + printer.Print(plan); + } + void PrintScriptPlan(const TString& plan) const { if (Options_.ScriptQueryPlanOutput) { Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl; + PrintPlan(plan, Options_.ScriptQueryPlanOutput); + } + } + + void PrintScriptProgress(const TString& plan) const { + if (Options_.InProgressStatisticsOutputFile) { + TFileOutput outputStream(Options_.InProgressStatisticsOutputFile); + outputStream << TInstant::Now().ToIsoStringLocal() << " Script in progress statistics" << Endl; + + auto convertedPlan = plan; + try { + double cpuUsage = 0.0; + auto fullStat = StatProcessor_->GetQueryStat(convertedPlan, cpuUsage); + auto publicStat = StatProcessor_->GetPublicStat(fullStat); + + outputStream << "\nCPU usage: " << cpuUsage << Endl; + PrintStatistics(fullStat, publicStat, outputStream); + } catch (const NJson::TJsonException& ex) { + outputStream << "Error stat conversion: " << ex.what() << Endl; + } - NYdb::NConsoleClient::TQueryPlanPrinter printer(Options_.PlanOutputFormat, true, *Options_.ScriptQueryPlanOutput); - printer.Print(plan); + outputStream << "\nPlan visualization:" << Endl; + PrintPlan(convertedPlan, &outputStream); + + outputStream.Finish(); } } + TProgressCallback GetProgressCallback() { + return [this](const NKikimrKqp::TEvExecuterProgress& executerProgress) mutable { + const TString& plan = executerProgress.GetQueryPlan(); + ExecutionMeta_.Plan = plan; + PrintScriptProgress(plan); + }; + } + void PrintScriptResult(const Ydb::ResultSet& resultSet) const { switch (Options_.ResultOutputFormat) { case TRunnerOptions::EResultOutputFormat::RowsJson: { @@ -219,6 +341,16 @@ class TKqpRunner::TImpl { case TRunnerOptions::EResultOutputFormat::FullJson: resultSet.PrintJSON(*Options_.ResultOutput); + *Options_.ResultOutput << Endl; + break; + + case TRunnerOptions::EResultOutputFormat::FullProto: + TString resultSetString; + google::protobuf::TextFormat::Printer printer; + printer.SetSingleLineMode(false); + printer.SetUseUtf8StringEscaping(true); + printer.PrintToString(resultSet, &resultSetString); + *Options_.ResultOutput << resultSetString; break; } } @@ -227,6 +359,7 @@ class TKqpRunner::TImpl { TRunnerOptions Options_; TYdbSetup YdbSetup_; + std::unique_ptr StatProcessor_; NColorizer::TColors CerrColors_; NColorizer::TColors CoutColors_; @@ -258,6 +391,14 @@ bool TKqpRunner::ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::YqlScriptQuery); } +void TKqpRunner::ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::AsyncQuery); +} + +void TKqpRunner::WaitAsyncQueries() const { + Impl_->WaitAsyncQueries(); +} + bool TKqpRunner::FetchScriptResults() { return Impl_->FetchScriptResults(); } diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.h b/ydb/tests/tools/kqprun/src/kqp_runner.h index b263bbedbf55..3687a7cbda06 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.h +++ b/ydb/tests/tools/kqprun/src/kqp_runner.h @@ -18,6 +18,10 @@ class TKqpRunner { bool ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + + void WaitAsyncQueries() const; + bool FetchScriptResults(); bool ForgetExecutionOperation(); diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index b2ed012e5f57..9929aa5cadea 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -1,6 +1,7 @@ -#include "actors.h" #include "ydb_setup.h" +#include + #include #include @@ -8,7 +9,6 @@ #include - namespace NKqpRun { namespace { @@ -68,7 +68,7 @@ class TYdbSetup::TImpl { private: TAutoPtr CreateLogBackend() const { if (Settings_.LogOutputFile) { - return NActors::CreateFileBackend(*Settings_.LogOutputFile); + return NActors::CreateFileBackend(Settings_.LogOutputFile); } else { return NActors::CreateStderrBackend(); } @@ -113,12 +113,19 @@ class TYdbSetup::TImpl { NKikimr::Tests::TServerSettings GetServerSettings() { ui32 msgBusPort = PortManager_.GetPort(); - NKikimr::Tests::TServerSettings serverSettings(msgBusPort); - serverSettings.SetNodeCount(1); + NKikimr::Tests::TServerSettings serverSettings(msgBusPort, Settings_.AppConfig.GetAuthConfig(), Settings_.AppConfig.GetPQConfig()); + serverSettings.SetNodeCount(Settings_.NodeCount); serverSettings.SetDomainName(Settings_.DomainName); serverSettings.SetAppConfig(Settings_.AppConfig); serverSettings.SetFeatureFlags(Settings_.AppConfig.GetFeatureFlags()); + serverSettings.SetControls(Settings_.AppConfig.GetImmediateControlsConfig()); + serverSettings.SetCompactionConfig(Settings_.AppConfig.GetCompactionConfig()); + serverSettings.PQClusterDiscoveryConfig = Settings_.AppConfig.GetPQClusterDiscoveryConfig(); + serverSettings.NetClassifierConfig = Settings_.AppConfig.GetNetClassifierConfig(); + + const auto& kqpSettings = Settings_.AppConfig.GetKQPConfig().GetSettings(); + serverSettings.SetKqpSettings({kqpSettings.begin(), kqpSettings.end()}); serverSettings.SetCredentialsFactory(std::make_shared(Settings_.YqlToken)); serverSettings.SetInitializeFederatedQuerySetupFactory(true); @@ -126,6 +133,10 @@ class TYdbSetup::TImpl { SetLoggerSettings(serverSettings); SetFunctionRegistry(serverSettings); + if (Settings_.MonitoringEnabled) { + serverSettings.InitKikimrRunConfig(); + } + return serverSettings; } @@ -162,17 +173,36 @@ class TYdbSetup::TImpl { NYql::NLog::InitLogger(NActors::CreateNullBackend()); } + void WaitResourcesPublishing() const { + auto promise = NThreading::NewPromise(); + GetRuntime()->Register(CreateResourcesWaiterActor(promise, Settings_.NodeCount)); + + try { + promise.GetFuture().GetValue(Settings_.InitializationTimeout); + } catch (...) { + ythrow yexception() << "Failed to initialize all resources: " << CurrentExceptionMessage(); + } + } + public: explicit TImpl(const TYdbSetupSettings& settings) : Settings_(settings) + , CoutColors_(NColorizer::AutoColors(Cout)) { InitializeYqlLogger(); InitializeServer(); + WaitResourcesPublishing(); + + if (Settings_.MonitoringEnabled) { + for (ui32 nodeIndex = 0; nodeIndex < Settings_.NodeCount; ++nodeIndex) { + Cout << CoutColors_.Cyan() << "Monitoring port" << (Settings_.NodeCount > 1 ? TStringBuilder() << " for node " << nodeIndex + 1 : TString()) << ": " << CoutColors_.Default() << Server_->GetRuntime()->GetMonPort(nodeIndex) << Endl; + } + } } NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr SchemeQueryRequest(const TString& query, const TString& traceId) const { auto event = MakeHolder(); - FillSchemeRequest(query, traceId, event->Record); + FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_DDL, NKikimrKqp::QUERY_ACTION_EXECUTE, traceId, event->Record); return RunKqpProxyRequest(std::move(event)); } @@ -184,21 +214,17 @@ class TYdbSetup::TImpl { return RunKqpProxyRequest(std::move(event)); } - NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, std::vector& resultSets) const { - auto event = MakeHolder(); - FillScriptRequest(query, action, traceId, event->Record); - - auto promise = NThreading::NewPromise(); - auto rowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit(); - auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(); - GetRuntime()->Register(CreateRunScriptActorMock(std::move(event), promise, rowsLimit, sizeLimit, resultSets)); + TQueryResponse QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const { + auto request = GetQueryRequest(query, action, traceId); + auto promise = NThreading::NewPromise(); + GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback)); return promise.GetFuture().GetValueSync(); } NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { auto event = MakeHolder(); - FillYqlScriptRequest(query, action, traceId, event->Record); + FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_SCRIPT, action, traceId, event->Record); return RunKqpProxyRequest(std::move(event)); } @@ -213,12 +239,13 @@ class TYdbSetup::TImpl { NKikimr::NKqp::TEvFetchScriptResultsResponse::TPtr FetchScriptExecutionResultsRequest(const TString& operation, i32 resultSetId) const { TString executionId = *NKikimr::NKqp::ScriptExecutionIdFromOperation(operation); - NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor(); + ui32 nodeIndex = RandomNumber(Settings_.NodeCount); + NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor(nodeIndex); auto rowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit(); auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(); NActors::IActor* fetchActor = NKikimr::NKqp::CreateGetScriptExecutionResultActor(edgeActor, Settings_.DomainName, executionId, resultSetId, 0, rowsLimit, sizeLimit, TInstant::Max()); - GetRuntime()->Register(fetchActor); + GetRuntime()->Register(fetchActor, nodeIndex); return GetRuntime()->GrabEdgeEvent(edgeActor); } @@ -230,6 +257,29 @@ class TYdbSetup::TImpl { return RunKqpProxyRequest(std::move(event)); } + void QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) { + if (!AsyncQueryRunnerActorId_) { + AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.InFlightLimit)); + } + + auto request = GetQueryRequest(query, action, traceId); + auto startPromise = NThreading::NewPromise(); + GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new TEvPrivate::TEvStartAsyncQuery(std::move(request), startPromise)); + + return startPromise.GetFuture().GetValueSync(); + } + + void WaitAsyncQueries() const { + if (!AsyncQueryRunnerActorId_) { + return; + } + + auto finalizePromise = NThreading::NewPromise(); + GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new TEvPrivate::TEvFinalizeAsyncQueryRunner(finalizePromise)); + + return finalizePromise.GetFuture().GetValueSync(); + } + void StartTraceOpt() const { if (!Settings_.TraceOptEnabled) { ythrow yexception() << "Trace opt was disabled"; @@ -249,10 +299,11 @@ class TYdbSetup::TImpl { template typename TResponse::TPtr RunKqpProxyRequest(THolder event) const { - NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor(); - NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId()); + ui32 nodeIndex = RandomNumber(Settings_.NodeCount); + NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor(nodeIndex); + NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId(nodeIndex)); - GetRuntime()->Send(kqpProxy, edgeActor, event.Release()); + GetRuntime()->Send(kqpProxy, edgeActor, event.Release(), nodeIndex); return GetRuntime()->GrabEdgeEvent(edgeActor); } @@ -270,10 +321,6 @@ class TYdbSetup::TImpl { request->SetDatabase(Settings_.DomainName); } - void FillSchemeRequest(const TString& query, const TString& traceId, NKikimrKqp::TEvQueryRequest& event) const { - FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_DDL, NKikimrKqp::QUERY_ACTION_EXECUTE, traceId, event); - } - void FillScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, NKikimrKqp::TEvQueryRequest& event) const { FillQueryRequest(script, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, action, traceId, event); @@ -284,16 +331,31 @@ class TYdbSetup::TImpl { } } - void FillYqlScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, NKikimrKqp::TEvQueryRequest& event) const { - FillQueryRequest(script, NKikimrKqp::QUERY_TYPE_SQL_SCRIPT, action, traceId, event); + TQueryRequest GetQueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + auto event = std::make_unique(); + FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, action, traceId, event->Record); + + if (auto progressStatsPeriodMs = Settings_.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) { + event->SetProgressStatsPeriod(TDuration::MilliSeconds(progressStatsPeriodMs)); + } + + return { + .Event = std::move(event), + .TargetNode = GetRuntime()->GetNodeId(RandomNumber(Settings_.NodeCount)), + .ResultRowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit(), + .ResultSizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit() + }; } private: TYdbSetupSettings Settings_; + NColorizer::TColors CoutColors_; THolder Server_; THolder Client_; TPortManager PortManager_; + + std::optional AsyncQueryRunnerActorId_; }; @@ -346,14 +408,18 @@ TRequestResult TYdbSetup::ScriptRequest(const TString& script, NKikimrKqp::EQuer return TRequestResult(scriptExecutionOperation->Get()->Status, scriptExecutionOperation->Get()->Issues); } -TRequestResult TYdbSetup::QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const { +TRequestResult TYdbSetup::QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const { resultSets.clear(); - auto queryOperationResponse = Impl_->QueryRequest(query, action, traceId, resultSets)->Get()->Record.GetRef(); + TQueryResponse queryResponse = Impl_->QueryRequest(query, action, traceId, progressCallback); + const auto& queryOperationResponse = queryResponse.Response->Get()->Record.GetRef(); const auto& responseRecord = queryOperationResponse.GetResponse(); + resultSets = std::move(queryResponse.ResultSets); meta.Ast = responseRecord.GetQueryAst(); - meta.Plan = responseRecord.GetQueryPlan(); + if (const auto& plan = responseRecord.GetQueryPlan()) { + meta.Plan = plan; + } return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues()); } @@ -389,7 +455,9 @@ TRequestResult TYdbSetup::GetScriptExecutionOperationRequest(const TString& oper meta.ExecutionStatus = static_cast(deserializedMeta.exec_status()); meta.ResultSetsCount = deserializedMeta.result_sets_meta_size(); meta.Ast = deserializedMeta.exec_stats().query_ast(); - meta.Plan = deserializedMeta.exec_stats().query_plan(); + if (deserializedMeta.exec_stats().query_plan() != "{}") { + meta.Plan = deserializedMeta.exec_stats().query_plan(); + } } return TRequestResult(scriptExecutionOperation->Get()->Status, scriptExecutionOperation->Get()->Issues); @@ -409,6 +477,14 @@ TRequestResult TYdbSetup::ForgetScriptExecutionOperationRequest(const TString& o return TRequestResult(forgetScriptExecutionOperationResponse->Get()->Status, forgetScriptExecutionOperationResponse->Get()->Issues); } +void TYdbSetup::QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + Impl_->QueryRequestAsync(query, action, traceId); +} + +void TYdbSetup::WaitAsyncQueries() const { + Impl_->WaitAsyncQueries(); +} + void TYdbSetup::StartTraceOpt() const { Impl_->StartTraceOpt(); } diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.h b/ydb/tests/tools/kqprun/src/ydb_setup.h index f31a3b249891..017ab6e18ede 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.h +++ b/ydb/tests/tools/kqprun/src/ydb_setup.h @@ -1,6 +1,7 @@ #pragma once #include "common.h" +#include "actors.h" #include #include @@ -54,7 +55,7 @@ class TYdbSetup { TRequestResult ScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, TString& operation) const; - TRequestResult QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const; + TRequestResult QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const; TRequestResult YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const; @@ -64,6 +65,10 @@ class TYdbSetup { TRequestResult ForgetScriptExecutionOperationRequest(const TString& operation) const; + void QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + + void WaitAsyncQueries() const; + void StartTraceOpt() const; static void StopTraceOpt(); diff --git a/ydb/tests/tools/kqprun/ya.make b/ydb/tests/tools/kqprun/ya.make index 7ba9c73011e5..db64989879a5 100644 --- a/ydb/tests/tools/kqprun/ya.make +++ b/ydb/tests/tools/kqprun/ya.make @@ -15,6 +15,7 @@ PEERDIR( PEERDIR( ydb/library/yql/udfs/common/datetime2 + ydb/library/yql/udfs/common/re2 ydb/library/yql/udfs/common/string ydb/library/yql/udfs/common/yson2 )