Skip to content

Commit

Permalink
Merge d08e57e into acee834
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Sep 13, 2024
2 parents acee834 + d08e57e commit c9c3658
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 63 deletions.
73 changes: 39 additions & 34 deletions ydb/core/testlib/test_client.cpp

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions ydb/core/testlib/test_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ namespace Tests {
TServer& operator =(TServer&& server) = default;
virtual ~TServer();

void EnableGRpc(const NYdbGrpc::TServerOptions& options);
void EnableGRpc(ui16 port);
void EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId = 0);
void EnableGRpc(ui16 port, ui32 grpcServiceNodeId = 0);
void SetupRootStoragePools(const TActorId sender) const;

void SetupDefaultProfiles();
Expand Down
2 changes: 2 additions & 0 deletions ydb/tests/tools/kqprun/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ udfs
*.sql
*.bin
*.txt
*.svg
*.old
46 changes: 46 additions & 0 deletions ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,49 @@
ActorSystemConfig {
Executor {
Type: BASIC
Threads: 1
SpinThreshold: 10
Name: "System"
}
Executor {
Type: BASIC
Threads: 6
SpinThreshold: 1
Name: "User"
}
Executor {
Type: BASIC
Threads: 1
SpinThreshold: 1
Name: "Batch"
}
Executor {
Type: IO
Threads: 1
Name: "IO"
}
Executor {
Type: BASIC
Threads: 2
SpinThreshold: 10
Name: "IC"
TimePerMailboxMicroSecs: 100
}
Scheduler {
Resolution: 64
SpinThreshold: 0
ProgressThreshold: 10000
}
SysExecutor: 0
UserExecutor: 1
IoExecutor: 3
BatchExecutor: 2
ServiceExecutor {
ServiceName: "Interconnect"
ExecutorId: 4
}
}

ColumnShardConfig {
DisabledOnSchemeShard: false
}
Expand Down
11 changes: 11 additions & 0 deletions ydb/tests/tools/kqprun/flame_graph.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/usr/bin/env bash

# For svg graph download https://github.com/brendangregg/FlameGraph
# and run `FlameGraph/stackcollapse-perf.pl profdata.txt | FlameGraph/flamegraph.pl > profdata.svg`

pid=$(pgrep -u $USER kqprun)

echo "Target process id: ${pid}"

sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $pid -v -o profdata -- sleep 30
sudo perf script -i profdata > profdata.txt
177 changes: 161 additions & 16 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,29 @@ struct TExecutionOptions {
std::vector<TString> TraceIds;
std::vector<TString> PoolIds;
std::vector<TString> UserSIDs;
ui64 ResultsRowsLimit = 0;

const TString DefaultTraceId = "kqprun";

bool HasResults() const {
if (ScriptQueries.empty()) {
return false;
}

for (size_t i = 0; i < ExecutionCases.size(); ++i) {
for (size_t i = 0; i < ScriptQueries.size(); ++i) {
if (GetScriptQueryAction(i) != NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE) {
continue;
}
if (ExecutionCases[i] != EExecutionCase::AsyncQuery) {
if (GetExecutionCase(i) != EExecutionCase::AsyncQuery) {
return true;
}
}
return false;
}

bool HasExecutionCase(EExecutionCase executionCase) const {
if (ExecutionCases.empty()) {
return executionCase == EExecutionCase::GenericScript;
}
return std::find(ExecutionCases.begin(), ExecutionCases.end(), executionCase) != ExecutionCases.end();
}

EExecutionCase GetExecutionCase(size_t index) const {
return GetValue(index, ExecutionCases, EExecutionCase::GenericScript);
}
Expand Down Expand Up @@ -106,6 +110,113 @@ struct TExecutionOptions {
};
}

void Validate(const NKqpRun::TRunnerOptions& runnerOptions) const {
if (!SchemeQuery && ScriptQueries.empty() && !runnerOptions.YdbSettings.MonitoringEnabled && !runnerOptions.YdbSettings.GrpcEnabled) {
ythrow yexception() << "Nothing to execute and is not running as daemon";
}

ValidateOptionsSizes();
ValidateSchemeQueryOptions(runnerOptions);
ValidateScriptExecutionOptions(runnerOptions);
ValidateAsyncOptions(runnerOptions.YdbSettings.AsyncQueriesSettings);
ValidateTraceOpt(runnerOptions.TraceOptType);
}

private:
void ValidateOptionsSizes() const {
const auto checker = [numberQueries = ScriptQueries.size()](size_t checkSize, const TString& optionName) {
if (checkSize > numberQueries) {
ythrow yexception() << "Too many " << optionName << ". Specified " << checkSize << ", when number of queries is " << numberQueries;
}
};

checker(ExecutionCases.size(), "execution cases");
checker(ScriptQueryActions.size(), "script query actions");
checker(Databases.size(), "databases");
checker(TraceIds.size(), "trace ids");
checker(PoolIds.size(), "pool ids");
checker(UserSIDs.size(), "user SIDs");
}

void ValidateSchemeQueryOptions(const NKqpRun::TRunnerOptions& runnerOptions) const {
if (SchemeQuery) {
return;
}
if (runnerOptions.SchemeQueryAstOutput) {
ythrow yexception() << "Scheme query AST output can not be used without scheme query";
}
}

void ValidateScriptExecutionOptions(const NKqpRun::TRunnerOptions& runnerOptions) const {
// Script specific
if (HasExecutionCase(EExecutionCase::GenericScript)) {
return;
}
if (ForgetExecution) {
ythrow yexception() << "Forget execution can not be used without generic script queries";
}
if (runnerOptions.ScriptCancelAfter) {
ythrow yexception() << "Cancel after can not be used without generic script queries";
}

// Script/Query specific
if (HasExecutionCase(EExecutionCase::GenericQuery)) {
return;
}
if (ResultsRowsLimit) {
ythrow yexception() << "Result rows limit can not be used without script queries";
}
if (runnerOptions.InProgressStatisticsOutputFile) {
ythrow yexception() << "Script statistics can not be used without script queries";
}

// Common specific
if (HasExecutionCase(EExecutionCase::YqlScript)) {
return;
}
if (runnerOptions.ScriptQueryAstOutput) {
ythrow yexception() << "Script query AST output can not be used without script/yql queries";
}
if (runnerOptions.ScriptQueryPlanOutput) {
ythrow yexception() << "Script query plan output can not be used without script/yql queries";
}
}

void ValidateAsyncOptions(const NKqpRun::TAsyncQueriesSettings& asyncQueriesSettings) const {
if (asyncQueriesSettings.InFlightLimit && !HasExecutionCase(EExecutionCase::AsyncQuery)) {
ythrow yexception() << "In flight limit can not be used without async queries";
}

NColorizer::TColors colors = NColorizer::AutoColors(Cout);
if (LoopCount && asyncQueriesSettings.InFlightLimit && asyncQueriesSettings.InFlightLimit > ScriptQueries.size() * LoopCount) {
Cout << colors.Red() << "Warning: inflight limit is " << asyncQueriesSettings.InFlightLimit << ", that is larger than max possible number of queries " << ScriptQueries.size() * LoopCount << colors.Default() << Endl;
}
}

void ValidateTraceOpt(NKqpRun::TRunnerOptions::ETraceOptType traceOptType) const {
switch (traceOptType) {
case NKqpRun::TRunnerOptions::ETraceOptType::Scheme: {
if (!SchemeQuery) {
ythrow yexception() << "Trace opt type scheme cannot be used without scheme query";
}
break;
}
case NKqpRun::TRunnerOptions::ETraceOptType::Script: {
if (ScriptQueries.empty()) {
ythrow yexception() << "Trace opt type script cannot be used without script queries";
}
}
case NKqpRun::TRunnerOptions::ETraceOptType::All: {
if (!SchemeQuery && ScriptQueries.empty()) {
ythrow yexception() << "Trace opt type all cannot be used without any queries";
}
}
case NKqpRun::TRunnerOptions::ETraceOptType::Disabled: {
break;
}
}
}

private:
template <typename TValue>
static TValue GetValue(size_t index, const std::vector<TValue>& values, TValue defaultValue) {
Expand Down Expand Up @@ -278,7 +389,6 @@ class TMain : public TMainClassArgs {
TVector<TString> UdfsPaths;
TString UdfsDirectory;
bool ExcludeLinkedUdfs = false;
ui64 ResultsRowsLimit = 1000;
bool EmulateYt = false;

static TString LoadFile(const TString& file) {
Expand Down Expand Up @@ -415,8 +525,8 @@ class TMain : public TMainClassArgs {
.StoreMappedResultT<TString>(&RunnerOptions.ResultOutput, &GetDefaultOutput);
options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results")
.RequiredArgument("uint")
.DefaultValue(ResultsRowsLimit)
.StoreResult(&ResultsRowsLimit);
.DefaultValue(0)
.StoreResult(&ExecutionOptions.ResultsRowsLimit);
TChoices<NKqpRun::TRunnerOptions::EResultOutputFormat> resultFormat({
{"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson},
{"full-json", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson},
Expand All @@ -441,7 +551,12 @@ class TMain : public TMainClassArgs {
.StoreMappedResultT<TString>(&RunnerOptions.ScriptQueryPlanOutput, &GetDefaultOutput);
options.AddLongOption("script-statistics", "File with script inprogress statistics")
.RequiredArgument("file")
.StoreResult(&RunnerOptions.InProgressStatisticsOutputFile);
.StoreMappedResultT<TString>(&RunnerOptions.InProgressStatisticsOutputFile, [](const TString& file) {
if (file == "-") {
ythrow yexception() << "Script in progress statistics cannot be printed to stdout, please specify file name";
}
return file;
});
TChoices<NYdb::NConsoleClient::EDataFormat> planFormat({
{"pretty", NYdb::NConsoleClient::EDataFormat::Pretty},
{"table", NYdb::NConsoleClient::EDataFormat::PrettyTable},
Expand All @@ -453,6 +568,15 @@ class TMain : public TMainClassArgs {
.Choices(planFormat.GetChoices())
.StoreMappedResultT<TString>(&RunnerOptions.PlanOutputFormat, planFormat);

options.AddLongOption("script-timeline-file", "File with script query timline in svg format")
.RequiredArgument("file")
.StoreMappedResultT<TString>(&RunnerOptions.ScriptQueryTimelineFile, [](const TString& file) {
if (file == "-") {
ythrow yexception() << "Script timline cannot be printed to stdout, please specify file name";
}
return file;
});

// Pipeline settings

TChoices<TExecutionOptions::EExecutionCase> executionCase({
Expand All @@ -463,7 +587,6 @@ class TMain : public TMainClassArgs {
});
options.AddLongOption('C', "execution-case", "Type of query for -p argument")
.RequiredArgument("query-type")
.DefaultValue("script")
.Choices(executionCase.GetChoices())
.Handler1([this, executionCase](const NLastGetopt::TOptsParser* option) {
TString choice(option->CurValOrDef());
Expand All @@ -489,13 +612,16 @@ class TMain : public TMainClassArgs {
});
options.AddLongOption('A', "script-action", "Script query execute action")
.RequiredArgument("script-action")
.DefaultValue("execute")
.Choices(scriptAction.GetChoices())
.Handler1([this, scriptAction](const NLastGetopt::TOptsParser* option) {
TString choice(option->CurValOrDef());
ExecutionOptions.ScriptQueryActions.emplace_back(scriptAction(choice));
});

options.AddLongOption("timeout", "Reauests timeout in milliseconds")
.RequiredArgument("uint")
.StoreMappedResultT<ui64>(&RunnerOptions.YdbSettings.RequestsTimeout, &TDuration::MilliSeconds<ui64>);

options.AddLongOption("cancel-after", "Cancel script execution operation after specified delay in milliseconds")
.RequiredArgument("uint")
.StoreMappedResultT<ui64>(&RunnerOptions.ScriptCancelAfter, &TDuration::MilliSeconds<ui64>);
Expand All @@ -510,7 +636,7 @@ class TMain : public TMainClassArgs {
.StoreResult(&ExecutionOptions.LoopCount);
options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps")
.RequiredArgument("uint")
.DefaultValue(1000)
.DefaultValue(0)
.StoreMappedResultT<ui64>(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds<ui64>);

options.AddLongOption('D', "database", "Database path for -p queries")
Expand Down Expand Up @@ -576,6 +702,21 @@ class TMain : public TMainClassArgs {
.RequiredArgument("path")
.InsertTo(&RunnerOptions.YdbSettings.ServerlessTenants);

options.AddLongOption("storage-size", "Domain storage size in gigabytes")
.RequiredArgument("uint")
.DefaultValue(32)
.StoreMappedResultT<ui32>(&RunnerOptions.YdbSettings.DiskSize, [](ui32 diskSize) {
return static_cast<ui64>(diskSize) << 30;
});

options.AddLongOption("real-pdisks", "Use real PDisks instead of in memory PDisks (also disable disk mock)")
.NoArgument()
.SetFlag(&RunnerOptions.YdbSettings.UseRealPDisks);

options.AddLongOption("disable-disk-mock", "Disable disk mock on single node cluster")
.NoArgument()
.SetFlag(&RunnerOptions.YdbSettings.DisableDiskMock);

TChoices<std::function<void()>> backtrace({
{"heavy", &NKikimr::EnableYDBBacktraceFormat},
{"light", []() { SetFormatBackTraceFn(FormatBackTrace); }}
Expand All @@ -591,13 +732,17 @@ class TMain : public TMainClassArgs {
}

int DoRun(NLastGetopt::TOptsParseResult&&) override {
if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled && !RunnerOptions.YdbSettings.GrpcEnabled) {
ythrow yexception() << "Nothing to execute";
ExecutionOptions.Validate(RunnerOptions);

if (RunnerOptions.YdbSettings.DisableDiskMock && RunnerOptions.YdbSettings.NodeCount + RunnerOptions.YdbSettings.SharedTenants.size() + RunnerOptions.YdbSettings.DedicatedTenants.size() > 1) {
ythrow yexception() << "Disable disk mock cannot be used for multi node clusters";
}

RunnerOptions.YdbSettings.YqlToken = YqlToken;
RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get();
RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ResultsRowsLimit);
if (ExecutionOptions.ResultsRowsLimit) {
RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit);
}

if (EmulateYt) {
const auto& fileStorageConfig = RunnerOptions.YdbSettings.AppConfig.GetQueryServiceConfig().GetFileStorage();
Expand Down
6 changes: 6 additions & 0 deletions ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ struct TYdbSetupSettings {
std::unordered_set<TString> SharedTenants;
std::unordered_set<TString> ServerlessTenants;
TDuration InitializationTimeout = TDuration::Seconds(10);
TDuration RequestsTimeout;

bool DisableDiskMock = false;
bool UseRealPDisks = false;
ui64 DiskSize = 32_GB;

bool MonitoringEnabled = false;
ui16 MonitoringPortOffset = 0;
Expand Down Expand Up @@ -69,6 +74,7 @@ struct TRunnerOptions {
IOutputStream* SchemeQueryAstOutput = nullptr;
IOutputStream* ScriptQueryAstOutput = nullptr;
IOutputStream* ScriptQueryPlanOutput = nullptr;
TString ScriptQueryTimelineFile;
TString InProgressStatisticsOutputFile;

EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson;
Expand Down
Loading

0 comments on commit c9c3658

Please sign in to comment.