Skip to content

Commit

Permalink
YQ-4046 KqpRun fixed storage settings (#14065)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jan 31, 2025
1 parent 79d9940 commit 9b70c59
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 84 deletions.
23 changes: 16 additions & 7 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -752,10 +752,12 @@ class TMain : public TMainClassArgs {
.DefaultValue(0)
.StoreResult(&RunnerOptions.YdbSettings.AsyncQueriesSettings.InFlightLimit);

options.AddLongOption("verbose", "Common verbose level (max level 2)")
options.AddLongOption("verbose", TStringBuilder() << "Common verbose level (max level " << static_cast<ui32>(TYdbSetupSettings::EVerbose::Max) - 1 << ")")
.RequiredArgument("uint")
.DefaultValue(1)
.StoreResult(&RunnerOptions.YdbSettings.VerboseLevel);
.DefaultValue(static_cast<ui8>(TYdbSetupSettings::EVerbose::Info))
.StoreMappedResultT<ui8>(&RunnerOptions.YdbSettings.VerboseLevel, [](ui8 value) {
return static_cast<TYdbSetupSettings::EVerbose>(std::min(value, static_cast<ui8>(TYdbSetupSettings::EVerbose::Max)));
});

TChoices<TAsyncQueriesSettings::EVerbose> verbose({
{"each-query", TAsyncQueriesSettings::EVerbose::EachQuery},
Expand Down Expand Up @@ -857,10 +859,17 @@ class TMain : public TMainClassArgs {
.NoArgument()
.SetFlag(&EmulateYt);

options.AddLongOption('H', "health-check", "Level of health check before start (max level 2)")
options.AddLongOption('H', "health-check", TStringBuilder() << "Level of health check before start (max level " << static_cast<ui32>(TYdbSetupSettings::EHealthCheck::Max) - 1 << ")")
.RequiredArgument("uint")
.DefaultValue(static_cast<ui8>(TYdbSetupSettings::EHealthCheck::NodesCount))
.StoreMappedResultT<ui8>(&RunnerOptions.YdbSettings.HealthCheckLevel, [](ui8 value) {
return static_cast<TYdbSetupSettings::EHealthCheck>(std::min(value, static_cast<ui8>(TYdbSetupSettings::EHealthCheck::Max)));
});

options.AddLongOption("health-check-timeout", "Health check timeout in seconds")
.RequiredArgument("uint")
.DefaultValue(1)
.StoreResult(&RunnerOptions.YdbSettings.HealthCheckLevel);
.DefaultValue(10)
.StoreMappedResultT<ui64>(&RunnerOptions.YdbSettings.HealthCheckTimeout, &TDuration::Seconds<ui64>);

options.AddLongOption("domain", "Test cluster domain name")
.RequiredArgument("name")
Expand All @@ -879,7 +888,7 @@ class TMain : public TMainClassArgs {
.RequiredArgument("path")
.InsertTo(&RunnerOptions.YdbSettings.ServerlessTenants);

options.AddLongOption("storage-size", "Domain storage size in gigabytes (32 GiB by default)")
options.AddLongOption("storage-size", TStringBuilder() << "Domain storage size in gigabytes (" << NKikimr::NBlobDepot::FormatByteSize(DEFAULT_STORAGE_SIZE) << " by default)")
.RequiredArgument("uint")
.StoreMappedResultT<ui32>(&RunnerOptions.YdbSettings.DiskSize, [](ui32 diskSize) {
return static_cast<ui64>(diskSize) << 30;
Expand Down
130 changes: 79 additions & 51 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,19 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
}

void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
if (ProgressCallback_) {
ProgressCallback_(QueryId_, ev->Get()->Record);
try {
if (ProgressCallback_) {
ProgressCallback_(QueryId_, ev->Get()->Record);
}
} catch (...) {
Cerr << CerrColors_.Red() << "Got unexpected exception during progress callback: " << CurrentExceptionMessage() << CerrColors_.Default() << Endl;
}
}

private:
const ui32 TargetNode_ = 0;
const size_t QueryId_ = 0;
const NColorizer::TColors CerrColors_ = NColorizer::AutoColors(Cerr);

std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
NThreading::TPromise<TQueryResponse> Promise_;
Expand Down Expand Up @@ -229,48 +234,61 @@ class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
};

class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaiterActor> {
using IRetryPolicy = IRetryPolicy<bool>;
using EVerbose = TYdbSetupSettings::EVerbose;
using EHealthCheck = TYdbSetupSettings::EHealthCheck;

static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10);

public:
// Builds the waiter: copies the settings, prepares an exponential backoff retry policy
// and stores the promise that is later resolved by Finish() or Fail().
TResourcesWaiterActor(NThreading::TPromise<void> promise, const TWaitResourcesSettings& settings)
: Settings_(settings)
// Retryable() classifies every retry as short or long; delays start from REFRESH_PERIOD.
// NOTE(review): the remaining arguments are presumably (min long-retry delay, max delay,
// max retries, max total time) -- confirm against IRetryPolicy::GetExponentialBackoffPolicy.
, RetryPolicy_(IRetryPolicy::GetExponentialBackoffPolicy(
&TResourcesWaiterActor::Retryable, REFRESH_PERIOD,
TDuration::MilliSeconds(100), TDuration::Seconds(1),
// The last argument is never smaller than two refresh periods, even for a tiny HealthCheckTimeout.
std::numeric_limits<size_t>::max(), std::max(2 * REFRESH_PERIOD, Settings_.HealthCheckTimeout)
))
, Promise_(promise)
{}

void Bootstrap() {
if (Settings_.HealthCheckLevel < 1) {
Become(&TResourcesWaiterActor::StateFunc);

HealthCheckStage_ = EHealthCheck::NodesCount;
DoHealthCheck();
}

void DoHealthCheck() {
if (Settings_.HealthCheckLevel < HealthCheckStage_) {
Finish();
return;
}

Become(&TResourcesWaiterActor::StateWaitNodeCont);
CheckResourcesPublish();
}
switch (HealthCheckStage_) {
case TYdbSetupSettings::EHealthCheck::NodesCount:
CheckResourcesPublish();
break;

void HandleWaitNodeCountWakeup() {
CheckResourcesPublish();
}
case TYdbSetupSettings::EHealthCheck::ScriptRequest:
StartScriptQuery();
break;

void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) {
const auto nodeCont = ev->Get()->NodeCount;
if (nodeCont == Settings_.ExpectedNodeCount) {
if (Settings_.HealthCheckLevel < 2) {
case TYdbSetupSettings::EHealthCheck::None:
case TYdbSetupSettings::EHealthCheck::Max:
Finish();
} else {
Become(&TResourcesWaiterActor::StateWaitScript);
StartScriptQuery();
}
return;
break;
}
}

if (Settings_.VerboseLevel >= 2) {
Cout << CoutColors_.Cyan() << "Retry invalid node count, got " << nodeCont << ", expected " << Settings_.ExpectedNodeCount << CoutColors_.Default() << Endl;
void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) {
const auto nodeCount = ev->Get()->NodeCount;
if (nodeCount == Settings_.ExpectedNodeCount) {
HealthCheckStage_ = EHealthCheck::ScriptRequest;
DoHealthCheck();
return;
}
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
}

void HandleWaitScriptWakeup() {
StartScriptQuery();
Retry(TStringBuilder() << "invalid node count, got " << nodeCount << ", expected " << Settings_.ExpectedNodeCount, true);
}

void Handle(NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) {
Expand All @@ -280,45 +298,26 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
return;
}

if (Settings_.VerboseLevel >= 2) {
Cout << CoutColors_.Cyan() << "Retry script creation fail with status " << status << ", reason:\n" << CoutColors_.Default() << ev->Get()->Issues.ToString() << Endl;
}
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
Retry(TStringBuilder() << "script creation fail with status " << status << ", reason:\n" << CoutColors_.Default() << ev->Get()->Issues.ToString(), true);
}

STRICT_STFUNC(StateWaitNodeCont,
sFunc(NActors::TEvents::TEvWakeup, HandleWaitNodeCountWakeup);
STRICT_STFUNC(StateFunc,
sFunc(NActors::TEvents::TEvWakeup, DoHealthCheck);
hFunc(TEvPrivate::TEvResourcesInfo, Handle);
)

STRICT_STFUNC(StateWaitScript,
sFunc(NActors::TEvents::TEvWakeup, HandleWaitScriptWakeup);
hFunc(NKikimr::NKqp::TEvKqp::TEvScriptResponse, Handle);
)

private:
void CheckResourcesPublish() {
GetResourceManager();

if (!ResourceManager_) {
if (Settings_.VerboseLevel >= 2) {
Cout << CoutColors_.Cyan() << "Retry uninitialized resource manager" << CoutColors_.Default() << Endl;
}
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
return;
ResourceManager_ = NKikimr::NKqp::TryGetKqpResourceManager(SelfId().NodeId());
}

UpdateResourcesInfo();
}

void GetResourceManager() {
if (ResourceManager_) {
if (!ResourceManager_) {
Retry("uninitialized resource manager", true);
return;
}
ResourceManager_ = NKikimr::NKqp::TryGetKqpResourceManager(SelfId().NodeId());
}

void UpdateResourcesInfo() const {
ResourceManager_->RequestClusterResourcesInfo(
[selfId = SelfId(), actorContext = ActorContext()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
actorContext.Send(selfId, new TEvPrivate::TEvResourcesInfo(resources.size()));
Expand All @@ -338,20 +337,49 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
Send(NKikimr::NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
}

// Schedules one more health check attempt, or fails the wait when the retry
// state does not provide a further delay.
//   message    - human readable reason for the retry, printed at verbose level InitLogs and above
//   shortRetry - forwarded to the retry state to choose the retry class
void Retry(const TString& message, bool shortRetry) {
    // The retry state is created lazily on the first retry and reused by all later ones.
    if (!RetryState_) {
        RetryState_ = RetryPolicy_->CreateRetryState();
    }

    const auto nextDelay = RetryState_->GetNextRetryDelay(shortRetry);
    if (!nextDelay) {
        // No delay returned: report the failure through the promise.
        Fail(TStringBuilder() << "Health check timeout " << Settings_.HealthCheckTimeout << " exceeded, use --health-check-timeout for increasing it or check out health check logs by using --verbose " << static_cast<ui32>(EVerbose::InitLogs));
        return;
    }

    const bool printRetryLog = Settings_.VerboseLevel >= EVerbose::InitLogs;
    if (printRetryLog) {
        Cout << CoutColors_.Cyan() << "Retry in " << *nextDelay << " " << message << CoutColors_.Default() << Endl;
    }

    // A wakeup event is scheduled to this actor after the computed delay.
    Schedule(*nextDelay, new NActors::TEvents::TEvWakeup());
}

// Completes the wait successfully: resolves Promise_ with SetValue() and then calls PassAway().
void Finish() {
Promise_.SetValue();
PassAway();
}

// Completes the wait with a failure: passes `error` to Promise_.SetException() and then calls PassAway().
void Fail(const TString& error) {
Promise_.SetException(error);
PassAway();
}

// Retry classifier handed to the retry policy: maps the flag passed to
// GetNextRetryDelay() onto a retry class (true -> ShortRetry, false -> LongRetry).
static ERetryErrorClass Retryable(bool shortRetry) {
    if (shortRetry) {
        return ERetryErrorClass::ShortRetry;
    }
    return ERetryErrorClass::LongRetry;
}

private:
const TWaitResourcesSettings Settings_;
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
const IRetryPolicy::TPtr RetryPolicy_;
IRetryPolicy::IRetryState::TPtr RetryState_ = nullptr;
NThreading::TPromise<void> Promise_;

EHealthCheck HealthCheckStage_ = EHealthCheck::None;
std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
};

class TSessionHolderActor : public NActors::TActorBootstrapped<TSessionHolderActor> {
using EVerbose = TYdbSetupSettings::EVerbose;

public:
TSessionHolderActor(TCreateSessionRequest request, NThreading::TPromise<TString> openPromise, NThreading::TPromise<void> closePromise)
: TargetNode_(request.TargetNode)
Expand All @@ -375,7 +403,7 @@ class TSessionHolderActor : public NActors::TActorBootstrapped<TSessionHolderAct
}

SessionId_ = response.GetResponse().GetSessionId();
if (VerboseLevel_ >= 1) {
if (VerboseLevel_ >= EVerbose::Info) {
Cout << CoutColors_.Cyan() << "Created new session on node " << TargetNode_ << " with id " << SessionId_ << "\n";
}

Expand Down Expand Up @@ -453,7 +481,7 @@ class TSessionHolderActor : public NActors::TActorBootstrapped<TSessionHolderAct
private:
const ui32 TargetNode_;
const TString TraceId_;
const ui8 VerboseLevel_;
const EVerbose VerboseLevel_;
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);

std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvCreateSessionRequest> Request_;
Expand Down
7 changes: 4 additions & 3 deletions ydb/tests/tools/kqprun/src/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ struct TQueryRequest {
struct TCreateSessionRequest {
std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvCreateSessionRequest> Event;
ui32 TargetNode;
ui8 VerboseLevel;
TYdbSetupSettings::EVerbose VerboseLevel;
};

struct TWaitResourcesSettings {
i32 ExpectedNodeCount;
ui8 HealthCheckLevel;
ui8 VerboseLevel;
TYdbSetupSettings::EHealthCheck HealthCheckLevel;
TDuration HealthCheckTimeout;
TYdbSetupSettings::EVerbose VerboseLevel;
TString Database;
};

Expand Down
22 changes: 19 additions & 3 deletions ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
namespace NKqpRun {

constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN";
constexpr ui64 DEFAULT_STORAGE_SIZE = 32_GB;

struct TAsyncQueriesSettings {
enum class EVerbose {
Expand All @@ -30,13 +31,28 @@ struct TAsyncQueriesSettings {
};

struct TYdbSetupSettings {
// Verbosity levels. The enumerators are ordered, so levels are compared with >=
// (for example `VerboseLevel >= EVerbose::InitLogs`).
enum class EVerbose {
    None = 0,     // Lowest level
    Info,         // Default level of TYdbSetupSettings::VerboseLevel
    QueriesText,  // NOTE(review): presumably also prints query texts -- confirm at the usage sites
    InitLogs,     // Additionally prints health check retry logs
    Max           // Sentinel that follows the last real level; keep it last
};

// Health check stages performed before start. The enumerators are ordered, so a
// requested level can be compared against the current stage with <.
enum class EHealthCheck {
    None = 0,       // No health check
    NodesCount,     // Wait until the expected number of nodes is reported
    ScriptRequest,  // Additionally run a script query
    Max             // Sentinel that follows the last real stage; keep it last
};

ui32 NodeCount = 1;
TString DomainName = "Root";
std::unordered_set<TString> DedicatedTenants;
std::unordered_set<TString> SharedTenants;
std::unordered_set<TString> ServerlessTenants;
TDuration InitializationTimeout = TDuration::Seconds(10);
ui8 HealthCheckLevel = 1;
TDuration HealthCheckTimeout = TDuration::Seconds(10);
EHealthCheck HealthCheckLevel = EHealthCheck::NodesCount;
bool SameSession = false;

bool DisableDiskMock = false;
Expand All @@ -52,7 +68,7 @@ struct TYdbSetupSettings {

bool TraceOptEnabled = false;
TString LogOutputFile;
ui8 VerboseLevel = 1;
EVerbose VerboseLevel = EVerbose::Info;

TString YqlToken;
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry;
Expand Down
Loading

0 comments on commit 9b70c59

Please sign in to comment.