Skip to content

Commit

Permalink
KqpRun sync stable version with main (ydb-platform#6671)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored and uzhastik committed Sep 11, 2024
1 parent 68e6a16 commit 142702a
Show file tree
Hide file tree
Showing 11 changed files with 948 additions and 292 deletions.
1 change: 1 addition & 0 deletions ydb/tests/tools/kqprun/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ udfs
*.json
*.sql
*.bin
*.txt
6 changes: 6 additions & 0 deletions ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ LogConfig {

QueryServiceConfig {
MdbTransformHost: false
ProgressStatsPeriodMs: 1000
QueryArtifactsCompressionMethod: "zstd_6"
ScriptResultRowsLimit: 0
ScriptResultSizeLimit: 10485760
Expand Down Expand Up @@ -66,6 +67,10 @@ QueryServiceConfig {
MinDesiredDirectoriesOfFilesPerQuery: 1000
RegexpCacheSize: 100

DefaultSettings {
Name: "AtomicUploadCommit"
Value: "true"
}
DefaultSettings {
Name: "UseBlocksSource"
Value: "true"
Expand All @@ -91,6 +96,7 @@ ResourceBrokerConfig {
TableServiceConfig {
BindingsMode: BM_DROP
CompileTimeoutMs: 600000
EnableOlapSink: true
SessionsLimitPerNode: 1000

QueryLimits {
Expand Down
610 changes: 383 additions & 227 deletions ydb/tests/tools/kqprun/kqprun.cpp

Large diffs are not rendered by default.

229 changes: 209 additions & 20 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include "actors.h"

#include <library/cpp/colorizer/colors.h>

#include <ydb/core/kqp/common/simple/services.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>


namespace NKqpRun {
Expand All @@ -9,33 +12,33 @@ namespace {

class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMock> {
public:
TRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets)
: Request_(std::move(request))
TRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback)
: TargetNode_(request.TargetNode)
, Request_(std::move(request.Event))
, Promise_(promise)
, ResultRowsLimit_(std::numeric_limits<ui64>::max())
, ResultSizeLimit_(std::numeric_limits<i64>::max())
, ResultSets_(resultSets)
, ProgressCallback_(progressCallback)
{
if (resultRowsLimit) {
ResultRowsLimit_ = resultRowsLimit;
if (request.ResultRowsLimit) {
ResultRowsLimit_ = request.ResultRowsLimit;
}
if (resultSizeLimit) {
ResultSizeLimit_ = resultSizeLimit;
if (request.ResultSizeLimit) {
ResultSizeLimit_ = request.ResultSizeLimit;
}
}

void Bootstrap() {
NActors::ActorIdToProto(SelfId(), Request_->Record.MutableRequestActorId());
Send(NKikimr::NKqp::MakeKqpProxyID(SelfId().NodeId()), std::move(Request_));
Send(NKikimr::NKqp::MakeKqpProxyID(TargetNode_), std::move(Request_));

Become(&TRunScriptActorMock::StateFunc);
}

STRICT_STFUNC(StateFunc,
hFunc(NKikimr::NKqp::TEvKqpExecuter::TEvStreamData, Handle);
hFunc(NKikimr::NKqp::TEvKqp::TEvQueryResponse, Handle);
hFunc(NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
)

void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
Expand All @@ -46,47 +49,233 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();
if (resultSetIndex >= ResultSets_.size()) {
ResultSets_.resize(resultSetIndex + 1);
ResultSetSizes_.resize(resultSetIndex + 1, 0);
}

if (!ResultSets_[resultSetIndex].truncated()) {
ui64& resultSetSize = ResultSetSizes_[resultSetIndex];
for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) {
if (static_cast<ui64>(ResultSets_[resultSetIndex].rows_size()) >= ResultRowsLimit_) {
ResultSets_[resultSetIndex].set_truncated(true);
break;
}

if (ResultSets_[resultSetIndex].ByteSizeLong() + row.ByteSizeLong() > ResultSizeLimit_) {
auto rowSize = row.ByteSizeLong();
if (resultSetSize + rowSize > ResultSizeLimit_) {
ResultSets_[resultSetIndex].set_truncated(true);
break;
}

resultSetSize += rowSize;
*ResultSets_[resultSetIndex].add_rows() = std::move(row);
}
*ResultSets_[resultSetIndex].mutable_columns() = ev->Get()->Record.GetResultSet().columns();
if (!ResultSets_[resultSetIndex].columns_size()) {
*ResultSets_[resultSetIndex].mutable_columns() = ev->Get()->Record.GetResultSet().columns();
}
}

Send(ev->Sender, response.Release());
}

void Handle(NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
Promise_.SetValue(std::move(ev));
Promise_.SetValue(TQueryResponse{.Response = std::move(ev), .ResultSets = std::move(ResultSets_)});
PassAway();
}

void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
if (ProgressCallback_) {
ProgressCallback_(ev->Get()->Record);
}
}

private:
THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> Promise_;
ui32 TargetNode_ = 0;
std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
NThreading::TPromise<TQueryResponse> Promise_;
ui64 ResultRowsLimit_;
ui64 ResultSizeLimit_;
std::vector<Ydb::ResultSet>& ResultSets_;
TProgressCallback ProgressCallback_;
std::vector<Ydb::ResultSet> ResultSets_;
std::vector<ui64> ResultSetSizes_;
};

class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
using TBase = NActors::TActor<TAsyncQueryRunnerActor>;

public:
TAsyncQueryRunnerActor(ui64 inFlightLimit)
: TBase(&TAsyncQueryRunnerActor::StateFunc)
, InFlightLimit_(inFlightLimit)
{
RunningRequests_.reserve(InFlightLimit_);
}

STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvStartAsyncQuery, Handle);
hFunc(TEvPrivate::TEvAsyncQueryFinished, Handle);
hFunc(TEvPrivate::TEvFinalizeAsyncQueryRunner, Handle);
)

void Handle(TEvPrivate::TEvStartAsyncQuery::TPtr& ev) {
DelayedRequests_.emplace(std::move(ev));
StartDelayedRequests();
}

void Handle(TEvPrivate::TEvAsyncQueryFinished::TPtr& ev) {
const ui64 requestId = ev->Get()->RequestId;
RunningRequests_.erase(requestId);

const auto& response = ev->Get()->Result.Response->Get()->Record.GetRef();
const auto status = response.GetYdbStatus();

if (status == Ydb::StatusIds::SUCCESS) {
Completed_++;
Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl;
} else {
Failed_++;
NYql::TIssues issues;
NYql::IssuesFromMessage(response.GetResponse().GetQueryIssues(), issues);
Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << status << ". " << CoutColors_.Yellow() << GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << issues.ToString() << CoutColors_.Default();
}

StartDelayedRequests();
TryFinalize();
}

void Handle(TEvPrivate::TEvFinalizeAsyncQueryRunner::TPtr& ev) {
FinalizePromise_ = ev->Get()->FinalizePromise;
if (!TryFinalize()) {
Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for " << DelayedRequests_.size() + RunningRequests_.size() << " async queries..." << CoutColors_.Default() << Endl;
}
}

private:
void StartDelayedRequests() {
while (!DelayedRequests_.empty() && (!InFlightLimit_ || RunningRequests_.size() < InFlightLimit_)) {
auto request = std::move(DelayedRequests_.front());
DelayedRequests_.pop();

auto promise = NThreading::NewPromise<TQueryResponse>();
Register(CreateRunScriptActorMock(std::move(request->Get()->Request), promise, nullptr));
RunningRequests_[RequestId_] = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture<TQueryResponse>& f) {
Send(SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue())));
});

MaxInFlight_ = std::max(MaxInFlight_, RunningRequests_.size());
Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";

RequestId_++;
request->Get()->StartPromise.SetValue();
}
}

bool TryFinalize() {
if (!FinalizePromise_ || !RunningRequests_.empty()) {
return false;
}

FinalizePromise_->SetValue();
PassAway();
return true;
}

TString GetInfoString() const {
return TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_;
}

private:
const ui64 InFlightLimit_;
const TInstant StartTime_ = TInstant::Now();
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);

std::optional<NThreading::TPromise<void>> FinalizePromise_;
std::queue<TEvPrivate::TEvStartAsyncQuery::TPtr> DelayedRequests_;
std::unordered_map<ui64, NThreading::TFuture<TQueryResponse>> RunningRequests_;

ui64 RequestId_ = 1;
ui64 MaxInFlight_ = 0;
ui64 Completed_ = 0;
ui64 Failed_ = 0;
};

class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaiterActor> {
static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10);

public:
TResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount)
: ExpectedNodeCount_(expectedNodeCount)
, Promise_(promise)
{}

void Bootstrap() {
Become(&TResourcesWaiterActor::StateFunc);
CheckResourcesPublish();
}

void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
CheckResourcesPublish();
}

void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) {
if (ev->Get()->NodeCount == ExpectedNodeCount_) {
Promise_.SetValue();
PassAway();
return;
}

Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
}

STRICT_STFUNC(StateFunc,
hFunc(NActors::TEvents::TEvWakeup, Handle);
hFunc(TEvPrivate::TEvResourcesInfo, Handle);
)

private:
void CheckResourcesPublish() {
GetResourceManager();

if (!ResourceManager_) {
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
return;
}

UpdateResourcesInfo();
}

void GetResourceManager() {
if (ResourceManager_) {
return;
}
ResourceManager_ = NKikimr::NKqp::TryGetKqpResourceManager(SelfId().NodeId());
}

void UpdateResourcesInfo() const {
ResourceManager_->RequestClusterResourcesInfo(
[selfId = SelfId(), actorContext = ActorContext()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
actorContext.Send(selfId, new TEvPrivate::TEvResourcesInfo(resources.size()));
});
}

private:
const i32 ExpectedNodeCount_;
NThreading::TPromise<void> Promise_;

std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
};

} // anonymous namespace

NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets) {
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets);
NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback) {
return new TRunScriptActorMock(std::move(request), promise, progressCallback);
}

NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit) {
return new TAsyncQueryRunnerActor(inFlightLimit);
}

NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount) {
return new TResourcesWaiterActor(promise, expectedNodeCount);
}

} // namespace NKqpRun
Loading

0 comments on commit 142702a

Please sign in to comment.