Skip to content

Commit

Permalink
SQL command ANALYZE (#6996)
Browse files Browse the repository at this point in the history
  • Loading branch information
pashandor789 authored Aug 5, 2024
1 parent b2e3ab3 commit 59654e9
Show file tree
Hide file tree
Showing 28 changed files with 710 additions and 8 deletions.
24 changes: 24 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "kqp_executer_impl.h"

#include <ydb/core/kqp/gateway/actors/scheme.h>
#include <ydb/core/kqp/gateway/actors/analyze_actor.h>
#include <ydb/core/kqp/gateway/local_rpc/helper.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
Expand Down Expand Up @@ -307,6 +308,29 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
break;
}

case NKqpProto::TKqpSchemeOperation::kAnalyzeTable: {
// ANALYZE: start a TAnalyzeActor for the requested table and columns, and
// forward whatever result it produces back to this executer as TEvPrivate::TEvResult.
const auto& analyzeOperation = schemeOp.GetAnalyzeTable();

// The promise is handed to the actor; the subscription below fires once it has a value.
auto analyzePromise = NewPromise<IKqpGateway::TGenericResult>();

// Copy the column names out of the operation into a TVector for the actor's constructor.
TVector<TString> columns{analyzeOperation.columns().begin(), analyzeOperation.columns().end()};
IActor* analyzeActor = new TAnalyzeActor(analyzeOperation.GetTablePath(), columns, analyzePromise);

// The callback captures the actor system pointer and our actor id by value,
// so it does not touch `this`.
auto actorSystem = TlsActivationContext->AsActorContext().ExecutorThread.ActorSystem;
RegisterWithSameMailbox(analyzeActor);

auto selfId = SelfId();
analyzePromise.GetFuture().Subscribe([actorSystem, selfId](const TFuture<IKqpGateway::TGenericResult>& future) {
// Wrap the finished result into an event and send it to the executer's id.
auto ev = MakeHolder<TEvPrivate::TEvResult>();
ev->Result = future.GetValue();

actorSystem->Send(selfId, ev.Release());
});

// Switch to ExecuteState and leave the function.
// NOTE(review): presumably ExecuteState handles TEvPrivate::TEvResult — confirm.
Become(&TKqpSchemeExecuter::ExecuteState);
return;
}

default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
<< (ui32) schemeOp.GetOperationCase());
Expand Down
227 changes: 227 additions & 0 deletions ydb/core/kqp/gateway/actors/analyze_actor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
#include "analyze_actor.h"

#include <ydb/core/base/path.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/services/services.pb.h>


namespace NKikimr::NKqp {

// Cookies stamped on scheme cache navigate requests so that the reply handler
// can tell which lookup a TEvNavigateKeySetResult belongs to.
enum {
    // Lookup of the analyzed table by path (sent from Bootstrap).
    FirstRoundCookie = 0,
    // Follow-up lookup of a domain by its path id.
    SecondRoundCookie = 1,
};

using TNavigate = NSchemeCache::TSchemeCacheNavigate;

void TAnalyzeActor::Bootstrap() {
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
auto navigate = std::make_unique<TNavigate>();
auto& entry = navigate->ResultSet.emplace_back();
entry.Path = SplitPath(TablePath);
entry.Operation = TNavigate::EOp::OpTable;
entry.RequestType = TNavigate::TEntry::ERequestType::ByPath;
navigate->Cookie = FirstRoundCookie;

Send(NKikimr::MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));

Become(&TAnalyzeActor::StateWork);
}

// Sends a TEvAnalyzeStatus request for PathId to the Statistics Aggregator
// through the per-node pipe cache. Aborts if the aggregator id is not set yet.
void TAnalyzeActor::SendAnalyzeStatus() {
    Y_ABORT_UNLESS(StatisticsAggregatorId.has_value());
    const auto aggregatorId = StatisticsAggregatorId.value();

    auto statusRequest = std::make_unique<NStat::TEvStatistics::TEvAnalyzeStatus>();
    PathIdFromPathId(PathId, statusRequest->Record.MutablePathId());

    auto* forward = new TEvPipeCache::TEvForward(statusRequest.release(), aggregatorId, true);
    Send(MakePipePerNodeCacheID(false), forward);
}

void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ev);
Y_UNUSED(ctx);

SendAnalyzeStatus();
}

void TAnalyzeActor::Handle(TEvAnalyzePrivate::TEvAnalyzeStatusCheck::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ev);
Y_UNUSED(ctx);

SendAnalyzeStatus();
}

// Handles the Statistics Aggregator's answer to a TEvAnalyzeStatus request.
//
//  * STATUS_NO_OPERATION            -> report success and stop.
//  * STATUS_ENQUEUED / IN_PROGRESS  -> poll again after 10 / 5 seconds.
//  * STATUS_UNSPECIFIED             -> report an error and stop.
//  * any other value                -> report an error and stop.
//
// Every path either schedules another check or fulfils the promise and dies,
// so the subscriber of the promise is never left waiting on an unrecognised status.
void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeStatusResponse::TPtr& ev, const TActorContext& ctx) {
    const auto& record = ev->Get()->Record;
    switch (record.GetStatus()) {
        case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_UNSPECIFIED: {
            Promise.SetValue(
                NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
                    YqlIssue(
                        {}, NYql::TIssuesIds::UNEXPECTED,
                        TStringBuilder() << "Statistics Aggregator unspecified error"
                    )
                )
            );
            this->Die(ctx);
            return;
        }
        case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION: {
            NYql::IKikimrGateway::TGenericResult result;
            result.SetSuccess();
            Promise.SetValue(std::move(result));

            this->Die(ctx);
            return;
        }
        case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED: {
            Schedule(TDuration::Seconds(10), new TEvAnalyzePrivate::TEvAnalyzeStatusCheck());
            return;
        }
        case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS: {
            Schedule(TDuration::Seconds(5), new TEvAnalyzePrivate::TEvAnalyzeStatusCheck());
            return;
        }
    }

    // Reached only for a status value none of the cases above recognise.
    // The fallback sits after the switch rather than in a `default:` label so
    // that the switch still lists every known enumerator explicitly.
    Promise.SetValue(
        NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
            YqlIssue(
                {}, NYql::TIssuesIds::UNEXPECTED,
                TStringBuilder() << "Statistics Aggregator returned unexpected status: "
                    << static_cast<int>(record.GetStatus())
            )
        )
    );
    this->Die(ctx);
}

// Handles scheme cache replies for both navigation rounds.
//
// First round (FirstRoundCookie): the table was resolved by path. Its PathId
// is remembered; if the table's domain is not serverless and already exposes a
// Statistics Aggregator, the analyze request is sent right away. Otherwise a
// second lookup is issued for the domain key (or, for a serverless domain, the
// resources domain key).
//
// Second round (SecondRoundCookie): the domain was resolved. The analyze
// request is sent if the domain has a Statistics Aggregator; otherwise the
// promise is completed with an error and the actor dies.
//
// On any non-Ok navigate status the promise is completed with an issue code
// derived from that status and the actor dies.
void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
    std::unique_ptr<TNavigate> navigate(ev->Get()->Request.Release());
    Y_ABORT_UNLESS(navigate->ResultSet.size() == 1);
    auto& entry = navigate->ResultSet.front();

    if (entry.Status != TNavigate::EStatus::Ok) {
        // Each group ends with `break`; without it every status would fall
        // through and be reported as DEFAULT_ERROR.
        NYql::EYqlIssueCode error = NYql::TIssuesIds::DEFAULT_ERROR;
        switch (entry.Status) {
            case TNavigate::EStatus::PathErrorUnknown:
            case TNavigate::EStatus::RootUnknown:
            case TNavigate::EStatus::PathNotTable:
            case TNavigate::EStatus::TableCreationNotComplete:
                error = NYql::TIssuesIds::KIKIMR_SCHEME_ERROR;
                break;
            case TNavigate::EStatus::LookupError:
            case TNavigate::EStatus::RedirectLookupError:
                error = NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE;
                break;
            default:
                break;
        }
        Promise.SetValue(
            NYql::NCommon::ResultFromIssues<NYql::IKikimrGateway::TGenericResult>(
                error,
                TStringBuilder() << "Can't get statistics aggregator ID. " << entry.Status,
                {}
            )
        );
        this->Die(ctx);
        return;
    }

    if (navigate->Cookie == SecondRoundCookie) {
        if (entry.DomainInfo->Params.HasStatisticsAggregator()) {
            // The actor must stay alive here: it still has to receive the
            // aggregator's replies and fulfil the promise. The callee handles
            // its own failure path (it sets the promise and dies on an unknown
            // column), so Die() must not be called again after it returns.
            // NOTE(review): in this round `entry` describes the domain rather
            // than the table, so the column lookup in the callee presumably
            // sees no table columns — confirm for ANALYZE with a column list.
            SendStatisticsAggregatorAnalyze(entry, ctx);
            return;
        }

        Promise.SetValue(
            NYql::NCommon::ResultFromIssues<NYql::IKikimrGateway::TGenericResult>(
                NYql::TIssuesIds::DEFAULT_ERROR,
                TStringBuilder() << "Can't get statistics aggregator ID.", {}
            )
        );
        this->Die(ctx);
        return;
    }

    // First round: remember which table is being analyzed.
    PathId = entry.TableId.PathId;

    auto& domainInfo = entry.DomainInfo;

    // Issues the second-round lookup for the given domain path id.
    auto navigateDomainKey = [this] (TPathId domainKey) {
        using TNavigate = NSchemeCache::TSchemeCacheNavigate;
        auto navigate = std::make_unique<TNavigate>();
        auto& entry = navigate->ResultSet.emplace_back();
        entry.TableId = TTableId(domainKey.OwnerId, domainKey.LocalPathId);
        entry.Operation = TNavigate::EOp::OpPath;
        entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
        entry.RedirectRequired = false;
        navigate->Cookie = SecondRoundCookie;

        Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
    };

    if (!domainInfo->IsServerless()) {
        if (domainInfo->Params.HasStatisticsAggregator()) {
            SendStatisticsAggregatorAnalyze(entry, ctx);
            return;
        }

        navigateDomainKey(domainInfo->DomainKey);
    } else {
        navigateDomainKey(domainInfo->ResourcesDomainKey);
    }
}

// Builds a TEvAnalyze request for PathId and the requested columns and sends
// it to the Statistics Aggregator named in `entry`'s domain parameters.
//
// Column names from Columns are translated to column ids using entry.Columns.
// If a name is not found, the promise is completed with an error, the actor
// dies and nothing is sent.
void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TActorContext& ctx) {
    const auto& domainParams = entry.DomainInfo->Params;
    Y_ABORT_UNLESS(domainParams.HasStatisticsAggregator());

    // Remembered so that later status polls know where to go.
    StatisticsAggregatorId = domainParams.GetStatisticsAggregator();

    auto request = std::make_unique<NStat::TEvStatistics::TEvAnalyze>();
    auto* tableRecord = request->Record.AddTables();
    PathIdFromPathId(PathId, tableRecord->MutablePathId());

    // Index of column id by column name.
    THashMap<TString, ui32> tagByColumnName;
    for (const auto& [_, columnInfo] : entry.Columns) {
        tagByColumnName[TString(columnInfo.Name)] = columnInfo.Id;
    }

    for (const auto& columnName : Columns) {
        const auto found = tagByColumnName.find(columnName);
        if (found == tagByColumnName.end()) {
            Promise.SetValue(
                NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
                    YqlIssue(
                        {}, NYql::TIssuesIds::UNEXPECTED,
                        TStringBuilder() << "No such column: " << columnName << " in the " << TablePath
                    )
                )
            );
            this->Die(ctx);
            return;
        }

        *tableRecord->MutableColumnTags()->Add() = found->second;
    }

    auto* forward = new TEvPipeCache::TEvForward(request.release(), domainParams.GetStatisticsAggregator(), true);
    Send(MakePipePerNodeCacheID(false), forward, IEventHandle::FlagTrackDelivery);
}

// Fallback for any event type StateWork has no handler for: logs it at CRIT
// level, completes the promise with an UNEXPECTED issue and stops the actor.
void TAnalyzeActor::HandleUnexpectedEvent(ui32 typeRewrite) {
    // The stream expression is a macro argument, so it must not carry its own
    // trailing semicolon.
    ALOG_CRIT(
        NKikimrServices::KQP_GATEWAY,
        "TAnalyzeActor, unexpected event, request type: " << typeRewrite
    );

    Promise.SetValue(
        NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
            YqlIssue(
                {}, NYql::TIssuesIds::UNEXPECTED,
                TStringBuilder() << "Unexpected event: " << typeRewrite
            )
        )
    );

    this->PassAway();
}

}// end of NKikimr::NKqp
66 changes: 66 additions & 0 deletions ydb/core/kqp/gateway/actors/analyze_actor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/statistics/events.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>

#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>


namespace NKikimr::NKqp {


// Private events of TAnalyzeActor, numbered from the start of the ES_PRIVATE
// event space.
struct TEvAnalyzePrivate {
    enum EEv {
        EvAnalyzeStatusCheck = EventSpaceBegin(TEvents::ES_PRIVATE),
        EvEnd
    };

    // Payload-free event the actor schedules for itself to trigger the next
    // status poll.
    struct TEvAnalyzeStatusCheck : TEventLocal<TEvAnalyzeStatusCheck, EvAnalyzeStatusCheck> {};
};

// Actor that drives a single ANALYZE request.
//
// It resolves the table through the scheme cache, sends TEvAnalyze to the
// domain's Statistics Aggregator, polls the operation status with
// TEvAnalyzeStatus, and reports the outcome through the promise passed to the
// constructor.
class TAnalyzeActor : public NActors::TActorBootstrapped<TAnalyzeActor> {
public:
    // tablePath - path of the table to analyze.
    // columns   - names of the columns to analyze.
    // promise   - completed with the result of the operation.
    //
    // The arguments are taken by value and moved into the members, so callers
    // passing temporaries pay for no extra copy.
    TAnalyzeActor(TString tablePath, TVector<TString> columns, NThreading::TPromise<NYql::IKikimrGateway::TGenericResult> promise)
        : TablePath(std::move(tablePath))
        , Columns(std::move(columns))
        , Promise(std::move(promise))
    {}

    void Bootstrap();

    // The only state of the actor; events without a handler go to
    // HandleUnexpectedEvent.
    STFUNC(StateWork) {
        switch(ev->GetTypeRewrite()) {
            HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
            HFunc(NStat::TEvStatistics::TEvAnalyzeResponse, Handle);
            HFunc(NStat::TEvStatistics::TEvAnalyzeStatusResponse, Handle);
            HFunc(TEvAnalyzePrivate::TEvAnalyzeStatusCheck, Handle);
            default:
                HandleUnexpectedEvent(ev->GetTypeRewrite());
        }
    }

private:
    void Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, const TActorContext& ctx);

    void Handle(NStat::TEvStatistics::TEvAnalyzeStatusResponse::TPtr& ev, const TActorContext& ctx);

    void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);

    void Handle(TEvAnalyzePrivate::TEvAnalyzeStatusCheck::TPtr& ev, const TActorContext& ctx);

    void HandleUnexpectedEvent(ui32 typeRewrite);

    void SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeCacheNavigate::TEntry&, const TActorContext&);

    void SendAnalyzeStatus();

private:
    TString TablePath;
    TVector<TString> Columns;
    NThreading::TPromise<NYql::IKikimrGateway::TGenericResult> Promise;
    // For Statistics Aggregator
    // Empty until SendStatisticsAggregatorAnalyze has stored the aggregator id.
    std::optional<ui64> StatisticsAggregatorId;
    // Path id of the analyzed table, taken from the first navigate reply.
    TPathId PathId;
};

} // end of NKikimr::NKqp
2 changes: 2 additions & 0 deletions ydb/core/kqp/gateway/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ LIBRARY()

SRCS(
scheme.cpp
analyze_actor.cpp
)

PEERDIR(
Expand All @@ -11,6 +12,7 @@ PEERDIR(
ydb/library/yql/providers/common/gateway
ydb/core/tx/schemeshard
ydb/library/actors/core
ydb/library/services
)

YQL_LAST_ABI_VERSION()
Expand Down
17 changes: 17 additions & 0 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "kqp_gateway.h"
#include "actors/kqp_ic_gateway_actors.h"
#include "actors/analyze_actor.h"
#include "actors/scheme.h"
#include "kqp_metadata_loader.h"
#include "local_rpc/helper.h"
Expand Down Expand Up @@ -1386,6 +1387,22 @@ class TKikimrIcGateway : public IKqpGateway {
}
}

// Starts an ANALYZE for settings.TablePath / settings.Columns by registering a
// TAnalyzeActor and returns the future the actor will complete. An unknown
// cluster yields the InvalidCluster result; a yexception thrown on the way is
// converted into a failed result.
TFuture<TGenericResult> Analyze(const TString& cluster, const NYql::TAnalyzeSettings& settings) override {
    try {
        if (CheckCluster(cluster)) {
            auto promise = NewPromise<TGenericResult>();

            IActor* actor = new TAnalyzeActor(settings.TablePath, settings.Columns, promise);
            RegisterActor(actor);

            return promise.GetFuture();
        }

        return InvalidCluster<TGenericResult>(cluster);
    } catch (yexception& e) {
        return MakeFuture(ResultFromException<TGenericResult>(e));
    }
}

template <class TSettings>
class IObjectModifier {
public:
Expand Down
Loading

0 comments on commit 59654e9

Please sign in to comment.