Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3051 add AST compression #4151

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2073,7 +2073,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId)));

// Create finalize script service
auto finalize = NKqp::CreateKqpFinalizeScriptService(Config.GetQueryServiceConfig().GetFinalizeScriptServiceConfig(), Config.GetMetadataProviderConfig(), federatedQuerySetupFactory);
auto finalize = NKqp::CreateKqpFinalizeScriptService(Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), federatedQuerySetupFactory);
setup->LocalServices.push_back(std::make_pair(
NKqp::MakeKqpFinalizeScriptServiceId(NodeId),
TActorSetupCmd(finalize, TMailboxType::HTSwap, appData->UserPoolId)));
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ struct TEvScriptFinalizeRequest : public NActors::TEventLocal<TEvScriptFinalizeR
std::optional<TString> QueryPlan;
std::optional<TString> QueryAst;
std::optional<ui64> LeaseGeneration;
std::optional<TString> QueryAstCompressionMethod;
};

TEvScriptFinalizeRequest(EFinalizationStatus finalizationStatus, const TString& executionId, const TString& database,
Expand Down
125 changes: 75 additions & 50 deletions ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#include "kqp_finalize_script_actor.h"

#include <ydb/core/fq/libs/common/compression.h>
#include <ydb/core/fq/libs/events/events.h>

#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h>
#include <ydb/core/kqp/proxy_service/kqp_script_executions.h>

#include <ydb/core/tx/datashard/const.h>

#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h>
Expand All @@ -18,21 +21,42 @@ namespace {
class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
public:
TScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request,
const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
: ReplyActor_(request->Sender)
, ExecutionId_(request->Get()->Description.ExecutionId)
, Database_(request->Get()->Description.Database)
, FinalizationStatus_(request->Get()->Description.FinalizationStatus)
, Request_(std::move(request))
, FinalizationTimeout_(TDuration::Seconds(finalizeScriptServiceConfig.GetScriptFinalizationTimeoutSeconds()))
, MaximalSecretsSnapshotWaitTime_(2 * TDuration::Seconds(metadataProviderConfig.GetRefreshPeriodSeconds()))
, FederatedQuerySetup_(federatedQuerySetup)
: ReplyActor(request->Sender)
, ExecutionId(request->Get()->Description.ExecutionId)
, Database(request->Get()->Description.Database)
, FinalizationStatus(request->Get()->Description.FinalizationStatus)
, Request(std::move(request))
, FinalizationTimeout(TDuration::Seconds(queryServiceConfig.GetFinalizeScriptServiceConfig().GetScriptFinalizationTimeoutSeconds()))
, MaximalSecretsSnapshotWaitTime(2 * TDuration::Seconds(metadataProviderConfig.GetRefreshPeriodSeconds()))
, FederatedQuerySetup(federatedQuerySetup)
, Compressor(queryServiceConfig.GetQueryArtifactsCompressionMethod(), queryServiceConfig.GetQueryArtifactsCompressionMinSize())
{}

void CompressScriptArtifacts() const {
auto& description = Request->Get()->Description;
auto ast = description.QueryAst;
if (Compressor.IsEnabled() && ast) {
const auto& [astCompressionMethod, astCompressed] = Compressor.Compress(*ast);
description.QueryAstCompressionMethod = astCompressionMethod;
description.QueryAst = astCompressed;
}

if (description.QueryAst && description.QueryAst->size() > NDataShard::NLimits::MaxWriteValueSize) {
NYql::TIssue astTruncatedIssue(TStringBuilder() << "Query ast size is " << description.QueryAst->size() << " bytes, that is larger than allowed limit " << NDataShard::NLimits::MaxWriteValueSize << " bytes, ast was truncated");
astTruncatedIssue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
description.Issues.AddIssue(astTruncatedIssue);

description.QueryAst = ast->substr(0, NDataShard::NLimits::MaxWriteValueSize - 1_KB) + "...\n(TRUNCATED)";
description.QueryAstCompressionMethod = std::nullopt;
}
}

void Bootstrap() {
Register(CreateSaveScriptFinalStatusActor(SelfId(), std::move(Request_)));
CompressScriptArtifacts();
Register(CreateSaveScriptFinalStatusActor(SelfId(), std::move(Request)));
Become(&TScriptFinalizerActor::FetchState);
}

Expand All @@ -46,17 +70,17 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
return;
}

Schedule(FinalizationTimeout_, new TEvents::TEvWakeup());
Schedule(FinalizationTimeout, new TEvents::TEvWakeup());
Become(&TScriptFinalizerActor::PrepareState);

CustomerSuppliedId_ = ev->Get()->CustomerSuppliedId;
Sinks_ = std::move(ev->Get()->Sinks);
UserToken_ = ev->Get()->UserToken;
SecretNames_ = std::move(ev->Get()->SecretNames);
CustomerSuppliedId = ev->Get()->CustomerSuppliedId;
Sinks = std::move(ev->Get()->Sinks);
UserToken = ev->Get()->UserToken;
SecretNames = std::move(ev->Get()->SecretNames);

if (Sinks_.empty()) {
if (Sinks.empty()) {
FinishScriptFinalization();
} else if (SecretNames_.empty()) {
} else if (SecretNames.empty()) {
ComputeScriptExternalEffect();
} else {
FetchSecrets();
Expand All @@ -75,7 +99,7 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
}

void FetchSecrets() {
RegisterDescribeSecretsActor(SelfId(), UserToken_, SecretNames_, ActorContext().ActorSystem(), MaximalSecretsSnapshotWaitTime_);
RegisterDescribeSecretsActor(SelfId(), UserToken, SecretNames, ActorContext().ActorSystem(), MaximalSecretsSnapshotWaitTime);
}

void Handle(TEvDescribeSecretsResponse::TPtr& ev) {
Expand All @@ -90,15 +114,15 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
void FillSecureParams(const std::vector<TString>& secretValues) {
std::map<TString, TString> secretsMap;
for (size_t i = 0; i < secretValues.size(); ++i) {
secretsMap.emplace(SecretNames_[i], secretValues[i]);
secretsMap.emplace(SecretNames[i], secretValues[i]);
}

for (const auto& sink : Sinks_) {
auto sinkName = sink.GetSinkName();
for (const auto& sink : Sinks) {
const auto& sinkName = sink.GetSinkName();

if (sinkName) {
const auto& structuredToken = NYql::CreateStructuredTokenParser(sink.GetAuthInfo()).ToBuilder().ReplaceReferences(secretsMap).ToJson();
SecureParams_.emplace(sinkName, structuredToken);
SecureParams.emplace(sinkName, structuredToken);
}
}

Expand All @@ -122,7 +146,7 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
NYql::NDqProto::TExternalEffect externalEffectS3;
externalEffectS3.SetProviderName(TString(NYql::S3ProviderName));

for (const auto& sink : Sinks_) {
for (const auto& sink : Sinks) {
const TString& sinkType = sink.GetType();

if (sinkType == "S3Sink") {
Expand All @@ -137,20 +161,20 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
}

void RunS3ApplicatorActor(const NYql::NDqProto::TExternalEffect& externalEffect) {
if (!FederatedQuerySetup_) {
if (!FederatedQuerySetup) {
FinishScriptFinalization(Ydb::StatusIds::INTERNAL_ERROR, "unable to aplicate s3 external effect, invalid federated query setup");
return;
}

Register(NYql::NDq::MakeS3ApplicatorActor(
SelfId(),
FederatedQuerySetup_->HttpGateway,
FederatedQuerySetup->HttpGateway,
CreateGuidAsString(),
CustomerSuppliedId_,
CustomerSuppliedId,
std::nullopt,
FinalizationStatus_ == EFinalizationStatus::FS_COMMIT,
THashMap<TString, TString>(SecureParams_.begin(), SecureParams_.end()),
FederatedQuerySetup_->CredentialsFactory,
FinalizationStatus == EFinalizationStatus::FS_COMMIT,
THashMap<TString, TString>(SecureParams.begin(), SecureParams.end()),
FederatedQuerySetup->CredentialsFactory,
externalEffect
).Release());
}
Expand All @@ -172,7 +196,7 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
)

void FinishScriptFinalization(std::optional<Ydb::StatusIds::StatusCode> status, NYql::TIssues issues) {
Register(CreateScriptFinalizationFinisherActor(SelfId(), ExecutionId_, Database_, status, std::move(issues)));
Register(CreateScriptFinalizationFinisherActor(SelfId(), ExecutionId, Database, status, std::move(issues)));
Become(&TScriptFinalizerActor::FinishState);
}

Expand All @@ -189,38 +213,39 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
}

void Reply(bool operationAlreadyFinalized, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
Send(ReplyActor_, new TEvScriptExecutionFinished(operationAlreadyFinalized, status, std::move(issues)));
Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), new TEvScriptFinalizeResponse(ExecutionId_));
Send(ReplyActor, new TEvScriptExecutionFinished(operationAlreadyFinalized, status, std::move(issues)));
Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), new TEvScriptFinalizeResponse(ExecutionId));

PassAway();
}

private:
TActorId ReplyActor_;
TString ExecutionId_;
TString Database_;
EFinalizationStatus FinalizationStatus_;
TEvScriptFinalizeRequest::TPtr Request_;

TDuration FinalizationTimeout_;
TDuration MaximalSecretsSnapshotWaitTime_;
const std::optional<TKqpFederatedQuerySetup>& FederatedQuerySetup_;

TString CustomerSuppliedId_;
std::vector<NKqpProto::TKqpExternalSink> Sinks_;

TString UserToken_;
std::vector<TString> SecretNames_;
std::unordered_map<TString, TString> SecureParams_;
const TActorId ReplyActor;
const TString ExecutionId;
const TString Database;
const EFinalizationStatus FinalizationStatus;
TEvScriptFinalizeRequest::TPtr Request;

const TDuration FinalizationTimeout;
const TDuration MaximalSecretsSnapshotWaitTime;
const std::optional<TKqpFederatedQuerySetup>& FederatedQuerySetup;
const NFq::TCompressor Compressor;

TString CustomerSuppliedId;
std::vector<NKqpProto::TKqpExternalSink> Sinks;

TString UserToken;
std::vector<TString> SecretNames;
std::unordered_map<TString, TString> SecureParams;
};

} // anonymous namespace

IActor* CreateScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request,
const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup) {
return new TScriptFinalizerActor(std::move(request), finalizeScriptServiceConfig, metadataProviderConfig, federatedQuerySetup);
return new TScriptFinalizerActor(std::move(request), queryServiceConfig, metadataProviderConfig, federatedQuerySetup);
}

} // namespace NKikimr::NKqp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace NKikimr::NKqp {

IActor* CreateScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request,
const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ namespace {

class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptService> {
public:
TKqpFinalizeScriptService(const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
TKqpFinalizeScriptService(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory)
: FinalizeScriptServiceConfig_(finalizeScriptServiceConfig)
, MetadataProviderConfig_(metadataProviderConfig)
, FederatedQuerySetupFactory_(federatedQuerySetupFactory)
: QueryServiceConfig(queryServiceConfig)
, MetadataProviderConfig(metadataProviderConfig)
, FederatedQuerySetupFactory(federatedQuerySetupFactory)
{}

void Bootstrap(const TActorContext &ctx) {
FederatedQuerySetup_ = FederatedQuerySetupFactory_->Make(ctx.ActorSystem());
FederatedQuerySetup = FederatedQuerySetupFactory->Make(ctx.ActorSystem());

Become(&TKqpFinalizeScriptService::MainState);
}
Expand All @@ -40,10 +40,10 @@ class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptSe
void Handle(TEvScriptFinalizeRequest::TPtr& ev) {
TString executionId = ev->Get()->Description.ExecutionId;

if (!FinalizationRequestsQueue_.contains(executionId)) {
WaitingFinalizationExecutions_.push(executionId);
if (!FinalizationRequestsQueue.contains(executionId)) {
WaitingFinalizationExecutions.push(executionId);
}
FinalizationRequestsQueue_[executionId].emplace_back(std::move(ev));
FinalizationRequestsQueue[executionId].emplace_back(std::move(ev));

TryStartFinalizeRequest();
}
Expand All @@ -60,39 +60,39 @@ class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptSe

private:
void TryStartFinalizeRequest() {
if (FinalizationRequestsInFlight_ >= FinalizeScriptServiceConfig_.GetMaxInFlightFinalizationsCount() || WaitingFinalizationExecutions_.empty()) {
if (FinalizationRequestsInFlight >= QueryServiceConfig.GetFinalizeScriptServiceConfig().GetMaxInFlightFinalizationsCount() || WaitingFinalizationExecutions.empty()) {
return;
}

TString executionId = WaitingFinalizationExecutions_.front();
WaitingFinalizationExecutions_.pop();
TString executionId = WaitingFinalizationExecutions.front();
WaitingFinalizationExecutions.pop();

auto& queue = FinalizationRequestsQueue_[executionId];
auto& queue = FinalizationRequestsQueue[executionId];
Y_ENSURE(!queue.empty());

StartFinalizeRequest(std::move(queue.back()));
queue.pop_back();
}

void StartFinalizeRequest(TEvScriptFinalizeRequest::TPtr request) {
++FinalizationRequestsInFlight_;
++FinalizationRequestsInFlight;

Register(CreateScriptFinalizerActor(
std::move(request),
FinalizeScriptServiceConfig_,
MetadataProviderConfig_,
FederatedQuerySetup_
QueryServiceConfig,
MetadataProviderConfig,
FederatedQuerySetup
));
}

void Handle(TEvScriptFinalizeResponse::TPtr& ev) {
--FinalizationRequestsInFlight_;
--FinalizationRequestsInFlight;
TString executionId = ev->Get()->ExecutionId;

if (!FinalizationRequestsQueue_[executionId].empty()) {
WaitingFinalizationExecutions_.push(executionId);
if (!FinalizationRequestsQueue[executionId].empty()) {
WaitingFinalizationExecutions.push(executionId);
} else {
FinalizationRequestsQueue_.erase(executionId);
FinalizationRequestsQueue.erase(executionId);
}
TryStartFinalizeRequest();
}
Expand Down Expand Up @@ -122,23 +122,23 @@ class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptSe
}

private:
NKikimrConfig::TFinalizeScriptServiceConfig FinalizeScriptServiceConfig_;
NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig_;
const NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
const NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;

IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory_;
std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup_;
IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory;
std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;

ui32 FinalizationRequestsInFlight_ = 0;
std::queue<TString> WaitingFinalizationExecutions_;
std::unordered_map<TString, std::vector<TEvScriptFinalizeRequest::TPtr>> FinalizationRequestsQueue_;
ui32 FinalizationRequestsInFlight = 0;
std::queue<TString> WaitingFinalizationExecutions;
std::unordered_map<TString, std::vector<TEvScriptFinalizeRequest::TPtr>> FinalizationRequestsQueue;
};

} // anonymous namespace

IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory) {
return new TKqpFinalizeScriptService(finalizeScriptServiceConfig, metadataProviderConfig, std::move(federatedQuerySetupFactory));
return new TKqpFinalizeScriptService(queryServiceConfig, metadataProviderConfig, std::move(federatedQuerySetupFactory));
}

} // namespace NKikimr::NKqp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace NKikimr::NKqp {

IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory);

Expand Down
Loading
Loading