Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KqpRun add results into clear execution #1661

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/tests/tools/kqprun/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
sync_dir
example
udfs
*.log
*.sql
*.bin
76 changes: 50 additions & 26 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "src/kqp_runner.h"

#include <cstdio>

#include <contrib/libs/protobuf/src/google/protobuf/text_format.h>

#include <library/cpp/colorizer/colors.h>
Expand All @@ -22,7 +24,7 @@ struct TExecutionOptions {
TString ScriptTraceId = "kqprun";

bool HasResults() const {
return ScriptQuery && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE && !ClearExecution;
return ScriptQuery && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE;
}
};

Expand All @@ -46,6 +48,9 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
if (!runner.ExecuteScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.ScriptTraceId)) {
ythrow yexception() << "Script execution failed";
}
if (!runner.FetchScriptResults()) {
ythrow yexception() << "Fetch script results failed";
}
} else {
if (!runner.ExecuteQuery(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.ScriptTraceId)) {
ythrow yexception() << "Query execution failed";
Expand All @@ -54,10 +59,7 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
}

if (executionOptions.HasResults()) {
Cout << colors.Yellow() << "Writing script results..." << colors.Default() << Endl;
if (!runner.WriteScriptResults()) {
ythrow yexception() << "Writing script results failed";
}
runner.PrintScriptResults();
}
}

Expand All @@ -74,6 +76,20 @@ THolder<TFileOutput> SetupDefaultFileOutput(const TString& filePath, IOutputStre
}


TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> CreateFunctionRegistry(const TString& udfsDirectory, TVector<TString> udfsPaths) {
if (!udfsDirectory.empty() || !udfsPaths.empty()) {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
Cout << colors.Yellow() << "Fetching udfs..." << colors.Default() << Endl;
}

NKikimr::NMiniKQL::FindUdfsInDir(udfsDirectory, &udfsPaths);
auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone();
NKikimr::NMiniKQL::FillStaticModules(*functionRegistry);

return functionRegistry;
}


void RunMain(int argc, const char* argv[]) {
TExecutionOptions executionOptions;
NKqpRun::TRunnerOptions runnerOptions;
Expand All @@ -87,9 +103,11 @@ void RunMain(int argc, const char* argv[]) {
TString logFile = "-";
TString appConfigFile = "./configuration/app_config.conf";

TString traceOptType = "disabled";
TString scriptQueryAction = "execute";
TString planOutputFormat = "pretty";
TString resultOutputFormat = "rows";
i64 resultsRowsLimit = 1000;

TVector<TString> udfsPaths;
TString udfsDirectory;
Expand All @@ -103,7 +121,7 @@ void RunMain(int argc, const char* argv[]) {
.Optional()
.RequiredArgument("FILE")
.StoreResult(&schemeQueryFile);
options.AddLongOption("app-config", "File with app config (TAppConfig)")
options.AddLongOption('c', "app-config", "File with app config (TAppConfig)")
.Optional()
.RequiredArgument("FILE")
.DefaultValue(appConfigFile)
Expand Down Expand Up @@ -135,33 +153,33 @@ void RunMain(int argc, const char* argv[]) {
.NoArgument()
.DefaultValue(executionOptions.ClearExecution)
.SetFlag(&executionOptions.ClearExecution);
options.AddLongOption("trace-opt", "print AST in the begin of each transformation")
options.AddLongOption('T', "trace-opt", "print AST in the begin of each transformation, one of { scheme | script | all }")
.Optional()
.NoArgument()
.DefaultValue(runnerOptions.YdbSettings.TraceOpt)
.SetFlag(&runnerOptions.YdbSettings.TraceOpt);
options.AddLongOption("script-action", "Script query execute action, one of { execute | explain }")
.RequiredArgument("STR")
.DefaultValue(traceOptType)
.StoreResult(&traceOptType);
options.AddLongOption('A', "script-action", "Script query execute action, one of { execute | explain }")
.Optional()
.RequiredArgument("STR")
.DefaultValue(scriptQueryAction)
.StoreResult(&scriptQueryAction);
options.AddLongOption("plan-format", "Script query plan format, one of { pretty | table | json }")
options.AddLongOption('P', "plan-format", "Script query plan format, one of { pretty | table | json }")
.Optional()
.RequiredArgument("STR")
.DefaultValue(planOutputFormat)
.StoreResult(&planOutputFormat);
options.AddLongOption("result-format", "Script query result format, one of { rows | full }")
options.AddLongOption('R', "result-format", "Script query result format, one of { rows | full }")
.Optional()
.RequiredArgument("STR")
.DefaultValue(resultOutputFormat)
.StoreResult(&resultOutputFormat);
options.AddLongOption("result-rows-limit", "Rows limit for script execution results")
options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results")
.Optional()
.RequiredArgument("INT")
.DefaultValue(runnerOptions.ResultsRowsLimit)
.StoreResult(&runnerOptions.ResultsRowsLimit);
.DefaultValue(resultsRowsLimit)
.StoreResult(&resultsRowsLimit);

options.AddLongOption("udf", "Load shared library with UDF by given path")
options.AddLongOption('u', "udf", "Load shared library with UDF by given path")
.Optional()
.RequiredArgument("FILE")
.AppendTo(&udfsPaths);
Expand Down Expand Up @@ -191,15 +209,19 @@ void RunMain(int argc, const char* argv[]) {

// Runner options

if (runnerOptions.ResultsRowsLimit < 0) {
ythrow yexception() << "Results rows limit less than zero";
}

THolder<TFileOutput> resultFileHolder = SetupDefaultFileOutput(resultOutputFile, runnerOptions.ResultOutput);
THolder<TFileOutput> schemeQueryAstFileHolder = SetupDefaultFileOutput(schemeQueryAstFile, runnerOptions.SchemeQueryAstOutput);
THolder<TFileOutput> scriptQueryAstFileHolder = SetupDefaultFileOutput(scriptQueryAstFile, runnerOptions.ScriptQueryAstOutput);
THolder<TFileOutput> scriptQueryPlanFileHolder = SetupDefaultFileOutput(scriptQueryPlanFile, runnerOptions.ScriptQueryPlanOutput);

runnerOptions.TraceOptType =
(traceOptType == TStringBuf("all")) ? NKqpRun::TRunnerOptions::ETraceOptType::All
: (traceOptType == TStringBuf("scheme")) ? NKqpRun::TRunnerOptions::ETraceOptType::Scheme
: (traceOptType == TStringBuf("script")) ? NKqpRun::TRunnerOptions::ETraceOptType::Script
: (traceOptType == TStringBuf("disabled")) ? NKqpRun::TRunnerOptions::ETraceOptType::Disabled
: NKqpRun::TRunnerOptions::ETraceOptType::All;
runnerOptions.YdbSettings.TraceOptEnabled = runnerOptions.TraceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled;

runnerOptions.ResultOutputFormat =
(resultOutputFormat == TStringBuf("rows")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson
: (resultOutputFormat == TStringBuf("full")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson
Expand All @@ -215,20 +237,22 @@ void RunMain(int argc, const char* argv[]) {

if (logFile != "-") {
runnerOptions.YdbSettings.LogOutputFile = logFile;
std::remove(logFile.c_str());
}

runnerOptions.YdbSettings.YqlToken = GetEnv("YQL_TOKEN");

NKikimr::NMiniKQL::FindUdfsInDir(udfsDirectory, &udfsPaths);
auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone();
NKikimr::NMiniKQL::FillStaticModules(*functionRegistry);
runnerOptions.YdbSettings.FunctionRegistry = functionRegistry.Get();
runnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(udfsDirectory, udfsPaths).Get();

TString appConfigData = TFileInput(appConfigFile).ReadAll();
if (!google::protobuf::TextFormat::ParseFromString(appConfigData, &runnerOptions.YdbSettings.AppConfig)) {
ythrow yexception() << "Bad format of app configuration";
}

if (resultsRowsLimit < 0) {
ythrow yexception() << "Results rows limit less than zero";
}
runnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(resultsRowsLimit);

RunScript(executionOptions, runnerOptions);
}

Expand Down
43 changes: 38 additions & 5 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,19 @@ namespace {

class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMock> {
public:
TRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request, NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise, ui64 resultSizeLimit)
TRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets)
: Request_(std::move(request))
, Promise_(promise)
, ResultRowsLimit_(std::numeric_limits<ui64>::max())
, ResultSizeLimit_(std::numeric_limits<i64>::max())
, ResultSets_(resultSets)
{
if (resultSizeLimit && resultSizeLimit < std::numeric_limits<i64>::max()) {
if (resultRowsLimit) {
ResultRowsLimit_ = resultRowsLimit;
}
if (resultSizeLimit) {
ResultSizeLimit_ = resultSizeLimit;
}
}
Expand All @@ -36,6 +43,28 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
response->Record.SetFreeSpace(ResultSizeLimit_);

auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();
if (resultSetIndex >= ResultSets_.size()) {
ResultSets_.resize(resultSetIndex + 1);
}

if (!ResultSets_[resultSetIndex].truncated()) {
for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) {
if (static_cast<ui64>(ResultSets_[resultSetIndex].rows_size()) >= ResultRowsLimit_) {
ResultSets_[resultSetIndex].set_truncated(true);
break;
}

if (ResultSets_[resultSetIndex].ByteSizeLong() + row.ByteSizeLong() > ResultSizeLimit_) {
ResultSets_[resultSetIndex].set_truncated(true);
break;
}

*ResultSets_[resultSetIndex].add_rows() = std::move(row);
}
*ResultSets_[resultSetIndex].mutable_columns() = ev->Get()->Record.GetResultSet().columns();
}

Send(ev->Sender, response.Release());
}

Expand All @@ -47,13 +76,17 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
private:
THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> Promise_;
i64 ResultSizeLimit_;
ui64 ResultRowsLimit_;
ui64 ResultSizeLimit_;
std::vector<Ydb::ResultSet>& ResultSets_;
};

} // anonymous namespace

NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request, NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise, ui64 resultSizeLimit) {
return new TRunScriptActorMock(std::move(request), promise, resultSizeLimit);
NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets) {
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets);
}

} // namespace NKqpRun
4 changes: 3 additions & 1 deletion ydb/tests/tools/kqprun/src/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace NKqpRun {

NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request, NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise, ui64 resultSizeLimit);
NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets);

} // namespace NKqpRun
14 changes: 10 additions & 4 deletions ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,36 @@ namespace NKqpRun {
struct TYdbSetupSettings {
TString DomainName = "Root";

bool TraceOpt = false;
bool TraceOptEnabled = false;
TMaybe<TString> LogOutputFile;

TString YqlToken;
NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry = nullptr;
NKikimrConfig::TAppConfig AppConfig;
};


struct TRunnerOptions {
enum class ETraceOptType {
Disabled,
Scheme,
Script,
All,
};

enum class EResultOutputFormat {
RowsJson, // Rows in json format
FullJson, // Columns, rows and types in json format
};

i64 ResultsRowsLimit = 1000;

IOutputStream* ResultOutput = &Cout;
IOutputStream* SchemeQueryAstOutput = nullptr;
IOutputStream* ScriptQueryAstOutput = nullptr;
IOutputStream* ScriptQueryPlanOutput = nullptr;

EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson;
NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default;
ETraceOptType TraceOptType = ETraceOptType::Disabled;

TYdbSetupSettings YdbSettings;
};
Expand Down
Loading
Loading