Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ kqprun fixed AS pools and added validations #9030

Merged
merged 8 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 39 additions & 34 deletions ydb/core/testlib/test_client.cpp

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions ydb/core/testlib/test_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ namespace Tests {
TServer& operator =(TServer&& server) = default;
virtual ~TServer();

void EnableGRpc(const NYdbGrpc::TServerOptions& options);
void EnableGRpc(ui16 port);
void EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId = 0);
void EnableGRpc(ui16 port, ui32 grpcServiceNodeId = 0);
void SetupRootStoragePools(const TActorId sender) const;

void SetupDefaultProfiles();
Expand Down
2 changes: 2 additions & 0 deletions ydb/tests/tools/kqprun/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ udfs
*.sql
*.bin
*.txt
*.svg
*.old
46 changes: 46 additions & 0 deletions ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,49 @@
ActorSystemConfig {
Executor {
Type: BASIC
Threads: 1
SpinThreshold: 10
Name: "System"
}
Executor {
Type: BASIC
Threads: 6
SpinThreshold: 1
Name: "User"
}
Executor {
Type: BASIC
Threads: 1
SpinThreshold: 1
Name: "Batch"
}
Executor {
Type: IO
Threads: 1
Name: "IO"
}
Executor {
Type: BASIC
Threads: 2
SpinThreshold: 10
Name: "IC"
TimePerMailboxMicroSecs: 100
}
Scheduler {
Resolution: 64
SpinThreshold: 0
ProgressThreshold: 10000
}
SysExecutor: 0
UserExecutor: 1
IoExecutor: 3
BatchExecutor: 2
ServiceExecutor {
ServiceName: "Interconnect"
ExecutorId: 4
}
}

ColumnShardConfig {
DisabledOnSchemeShard: false
}
Expand Down
11 changes: 11 additions & 0 deletions ydb/tests/tools/kqprun/flame_graph.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/usr/bin/env bash

# For svg graph download https://github.com/brendangregg/FlameGraph
# and run `FlameGraph/stackcollapse-perf.pl profdata.txt | FlameGraph/flamegraph.pl > profdata.svg`

pid=$(pgrep -u $USER kqprun)

echo "Target process id: ${pid}"

sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $pid -v -o profdata -- sleep 30
sudo perf script -i profdata > profdata.txt
177 changes: 161 additions & 16 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,29 @@ struct TExecutionOptions {
std::vector<TString> TraceIds;
std::vector<TString> PoolIds;
std::vector<TString> UserSIDs;
ui64 ResultsRowsLimit = 0;

const TString DefaultTraceId = "kqprun";

bool HasResults() const {
if (ScriptQueries.empty()) {
return false;
}

for (size_t i = 0; i < ExecutionCases.size(); ++i) {
for (size_t i = 0; i < ScriptQueries.size(); ++i) {
if (GetScriptQueryAction(i) != NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE) {
continue;
}
if (ExecutionCases[i] != EExecutionCase::AsyncQuery) {
if (GetExecutionCase(i) != EExecutionCase::AsyncQuery) {
return true;
}
}
return false;
}

bool HasExecutionCase(EExecutionCase executionCase) const {
if (ExecutionCases.empty()) {
return executionCase == EExecutionCase::GenericScript;
}
return std::find(ExecutionCases.begin(), ExecutionCases.end(), executionCase) != ExecutionCases.end();
}

EExecutionCase GetExecutionCase(size_t index) const {
return GetValue(index, ExecutionCases, EExecutionCase::GenericScript);
}
Expand Down Expand Up @@ -106,6 +110,113 @@ struct TExecutionOptions {
};
}

void Validate(const NKqpRun::TRunnerOptions& runnerOptions) const {
if (!SchemeQuery && ScriptQueries.empty() && !runnerOptions.YdbSettings.MonitoringEnabled && !runnerOptions.YdbSettings.GrpcEnabled) {
ythrow yexception() << "Nothing to execute and is not running as daemon";
}

ValidateOptionsSizes();
ValidateSchemeQueryOptions(runnerOptions);
ValidateScriptExecutionOptions(runnerOptions);
ValidateAsyncOptions(runnerOptions.YdbSettings.AsyncQueriesSettings);
ValidateTraceOpt(runnerOptions.TraceOptType);
}

private:
void ValidateOptionsSizes() const {
const auto checker = [numberQueries = ScriptQueries.size()](size_t checkSize, const TString& optionName) {
if (checkSize > numberQueries) {
ythrow yexception() << "Too many " << optionName << ". Specified " << checkSize << ", when number of queries is " << numberQueries;
}
};

checker(ExecutionCases.size(), "execution cases");
checker(ScriptQueryActions.size(), "script query actions");
checker(Databases.size(), "databases");
checker(TraceIds.size(), "trace ids");
checker(PoolIds.size(), "pool ids");
checker(UserSIDs.size(), "user SIDs");
}

void ValidateSchemeQueryOptions(const NKqpRun::TRunnerOptions& runnerOptions) const {
if (SchemeQuery) {
return;
}
if (runnerOptions.SchemeQueryAstOutput) {
ythrow yexception() << "Scheme query AST output can not be used without scheme query";
}
}

void ValidateScriptExecutionOptions(const NKqpRun::TRunnerOptions& runnerOptions) const {
// Script specific
if (HasExecutionCase(EExecutionCase::GenericScript)) {
return;
}
if (ForgetExecution) {
ythrow yexception() << "Forget execution can not be used without generic script queries";
}
if (runnerOptions.ScriptCancelAfter) {
ythrow yexception() << "Cancel after can not be used without generic script queries";
}

// Script/Query specific
if (HasExecutionCase(EExecutionCase::GenericQuery)) {
return;
}
if (ResultsRowsLimit) {
ythrow yexception() << "Result rows limit can not be used without script queries";
}
if (runnerOptions.InProgressStatisticsOutputFile) {
ythrow yexception() << "Script statistics can not be used without script queries";
}

// Common specific
if (HasExecutionCase(EExecutionCase::YqlScript)) {
return;
}
if (runnerOptions.ScriptQueryAstOutput) {
ythrow yexception() << "Script query AST output can not be used without script/yql queries";
}
if (runnerOptions.ScriptQueryPlanOutput) {
ythrow yexception() << "Script query plan output can not be used without script/yql queries";
}
}

void ValidateAsyncOptions(const NKqpRun::TAsyncQueriesSettings& asyncQueriesSettings) const {
if (asyncQueriesSettings.InFlightLimit && !HasExecutionCase(EExecutionCase::AsyncQuery)) {
ythrow yexception() << "In flight limit can not be used without async queries";
}

NColorizer::TColors colors = NColorizer::AutoColors(Cout);
if (LoopCount && asyncQueriesSettings.InFlightLimit && asyncQueriesSettings.InFlightLimit > ScriptQueries.size() * LoopCount) {
Cout << colors.Red() << "Warning: inflight limit is " << asyncQueriesSettings.InFlightLimit << ", that is larger than max possible number of queries " << ScriptQueries.size() * LoopCount << colors.Default() << Endl;
}
}

void ValidateTraceOpt(NKqpRun::TRunnerOptions::ETraceOptType traceOptType) const {
switch (traceOptType) {
case NKqpRun::TRunnerOptions::ETraceOptType::Scheme: {
if (!SchemeQuery) {
ythrow yexception() << "Trace opt type scheme cannot be used without scheme query";
}
break;
}
case NKqpRun::TRunnerOptions::ETraceOptType::Script: {
if (ScriptQueries.empty()) {
ythrow yexception() << "Trace opt type script cannot be used without script queries";
}
}
case NKqpRun::TRunnerOptions::ETraceOptType::All: {
if (!SchemeQuery && ScriptQueries.empty()) {
ythrow yexception() << "Trace opt type all cannot be used without any queries";
}
}
case NKqpRun::TRunnerOptions::ETraceOptType::Disabled: {
break;
}
}
}

private:
template <typename TValue>
static TValue GetValue(size_t index, const std::vector<TValue>& values, TValue defaultValue) {
Expand Down Expand Up @@ -278,7 +389,6 @@ class TMain : public TMainClassArgs {
TVector<TString> UdfsPaths;
TString UdfsDirectory;
bool ExcludeLinkedUdfs = false;
ui64 ResultsRowsLimit = 1000;
bool EmulateYt = false;

static TString LoadFile(const TString& file) {
Expand Down Expand Up @@ -415,8 +525,8 @@ class TMain : public TMainClassArgs {
.StoreMappedResultT<TString>(&RunnerOptions.ResultOutput, &GetDefaultOutput);
options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results")
.RequiredArgument("uint")
.DefaultValue(ResultsRowsLimit)
.StoreResult(&ResultsRowsLimit);
.DefaultValue(0)
.StoreResult(&ExecutionOptions.ResultsRowsLimit);
TChoices<NKqpRun::TRunnerOptions::EResultOutputFormat> resultFormat({
{"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson},
{"full-json", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson},
Expand All @@ -441,7 +551,12 @@ class TMain : public TMainClassArgs {
.StoreMappedResultT<TString>(&RunnerOptions.ScriptQueryPlanOutput, &GetDefaultOutput);
options.AddLongOption("script-statistics", "File with script inprogress statistics")
.RequiredArgument("file")
.StoreResult(&RunnerOptions.InProgressStatisticsOutputFile);
.StoreMappedResultT<TString>(&RunnerOptions.InProgressStatisticsOutputFile, [](const TString& file) {
if (file == "-") {
ythrow yexception() << "Script in progress statistics cannot be printed to stdout, please specify file name";
}
return file;
});
TChoices<NYdb::NConsoleClient::EDataFormat> planFormat({
{"pretty", NYdb::NConsoleClient::EDataFormat::Pretty},
{"table", NYdb::NConsoleClient::EDataFormat::PrettyTable},
Expand All @@ -453,6 +568,15 @@ class TMain : public TMainClassArgs {
.Choices(planFormat.GetChoices())
.StoreMappedResultT<TString>(&RunnerOptions.PlanOutputFormat, planFormat);

options.AddLongOption("script-timeline-file", "File with script query timline in svg format")
.RequiredArgument("file")
.StoreMappedResultT<TString>(&RunnerOptions.ScriptQueryTimelineFile, [](const TString& file) {
if (file == "-") {
ythrow yexception() << "Script timline cannot be printed to stdout, please specify file name";
}
return file;
});

// Pipeline settings

TChoices<TExecutionOptions::EExecutionCase> executionCase({
Expand All @@ -463,7 +587,6 @@ class TMain : public TMainClassArgs {
});
options.AddLongOption('C', "execution-case", "Type of query for -p argument")
.RequiredArgument("query-type")
.DefaultValue("script")
.Choices(executionCase.GetChoices())
.Handler1([this, executionCase](const NLastGetopt::TOptsParser* option) {
TString choice(option->CurValOrDef());
Expand All @@ -489,13 +612,16 @@ class TMain : public TMainClassArgs {
});
options.AddLongOption('A', "script-action", "Script query execute action")
.RequiredArgument("script-action")
.DefaultValue("execute")
.Choices(scriptAction.GetChoices())
.Handler1([this, scriptAction](const NLastGetopt::TOptsParser* option) {
TString choice(option->CurValOrDef());
ExecutionOptions.ScriptQueryActions.emplace_back(scriptAction(choice));
});

options.AddLongOption("timeout", "Reauests timeout in milliseconds")
.RequiredArgument("uint")
.StoreMappedResultT<ui64>(&RunnerOptions.YdbSettings.RequestsTimeout, &TDuration::MilliSeconds<ui64>);

options.AddLongOption("cancel-after", "Cancel script execution operation after specified delay in milliseconds")
.RequiredArgument("uint")
.StoreMappedResultT<ui64>(&RunnerOptions.ScriptCancelAfter, &TDuration::MilliSeconds<ui64>);
Expand All @@ -510,7 +636,7 @@ class TMain : public TMainClassArgs {
.StoreResult(&ExecutionOptions.LoopCount);
options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps")
.RequiredArgument("uint")
.DefaultValue(1000)
.DefaultValue(0)
.StoreMappedResultT<ui64>(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds<ui64>);

options.AddLongOption('D', "database", "Database path for -p queries")
Expand Down Expand Up @@ -576,6 +702,21 @@ class TMain : public TMainClassArgs {
.RequiredArgument("path")
.InsertTo(&RunnerOptions.YdbSettings.ServerlessTenants);

options.AddLongOption("storage-size", "Domain storage size in gigabytes")
.RequiredArgument("uint")
.DefaultValue(32)
.StoreMappedResultT<ui32>(&RunnerOptions.YdbSettings.DiskSize, [](ui32 diskSize) {
return static_cast<ui64>(diskSize) << 30;
});

options.AddLongOption("real-pdisks", "Use real PDisks instead of in memory PDisks (also disable disk mock)")
.NoArgument()
.SetFlag(&RunnerOptions.YdbSettings.UseRealPDisks);

options.AddLongOption("disable-disk-mock", "Disable disk mock on single node cluster")
.NoArgument()
.SetFlag(&RunnerOptions.YdbSettings.DisableDiskMock);

TChoices<std::function<void()>> backtrace({
{"heavy", &NKikimr::EnableYDBBacktraceFormat},
{"light", []() { SetFormatBackTraceFn(FormatBackTrace); }}
Expand All @@ -591,13 +732,17 @@ class TMain : public TMainClassArgs {
}

int DoRun(NLastGetopt::TOptsParseResult&&) override {
if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled && !RunnerOptions.YdbSettings.GrpcEnabled) {
ythrow yexception() << "Nothing to execute";
ExecutionOptions.Validate(RunnerOptions);

if (RunnerOptions.YdbSettings.DisableDiskMock && RunnerOptions.YdbSettings.NodeCount + RunnerOptions.YdbSettings.SharedTenants.size() + RunnerOptions.YdbSettings.DedicatedTenants.size() > 1) {
ythrow yexception() << "Disable disk mock cannot be used for multi node clusters";
}

RunnerOptions.YdbSettings.YqlToken = YqlToken;
RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get();
RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ResultsRowsLimit);
if (ExecutionOptions.ResultsRowsLimit) {
RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit);
}

if (EmulateYt) {
const auto& fileStorageConfig = RunnerOptions.YdbSettings.AppConfig.GetQueryServiceConfig().GetFileStorage();
Expand Down
6 changes: 6 additions & 0 deletions ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ struct TYdbSetupSettings {
std::unordered_set<TString> SharedTenants;
std::unordered_set<TString> ServerlessTenants;
TDuration InitializationTimeout = TDuration::Seconds(10);
TDuration RequestsTimeout;

bool DisableDiskMock = false;
bool UseRealPDisks = false;
ui64 DiskSize = 32_GB;

bool MonitoringEnabled = false;
ui16 MonitoringPortOffset = 0;
Expand Down Expand Up @@ -69,6 +74,7 @@ struct TRunnerOptions {
IOutputStream* SchemeQueryAstOutput = nullptr;
IOutputStream* ScriptQueryAstOutput = nullptr;
IOutputStream* ScriptQueryPlanOutput = nullptr;
TString ScriptQueryTimelineFile;
TString InProgressStatisticsOutputFile;

EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson;
Expand Down
Loading
Loading