Skip to content

Commit

Permalink
YQ-3051 add AST compression (#3963)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Apr 26, 2024
1 parent 7999411 commit 9d869e0
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 91 deletions.
2 changes: 1 addition & 1 deletion ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2073,7 +2073,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId)));

// Create finalize script service
auto finalize = NKqp::CreateKqpFinalizeScriptService(Config.GetQueryServiceConfig().GetFinalizeScriptServiceConfig(), Config.GetMetadataProviderConfig(), federatedQuerySetupFactory);
auto finalize = NKqp::CreateKqpFinalizeScriptService(Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), federatedQuerySetupFactory);
setup->LocalServices.push_back(std::make_pair(
NKqp::MakeKqpFinalizeScriptServiceId(NodeId),
TActorSetupCmd(finalize, TMailboxType::HTSwap, appData->UserPoolId)));
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ struct TEvScriptFinalizeRequest : public NActors::TEventLocal<TEvScriptFinalizeR
std::optional<TString> QueryPlan;
std::optional<TString> QueryAst;
std::optional<ui64> LeaseGeneration;
std::optional<TString> QueryAstCompressionMethod;
};

TEvScriptFinalizeRequest(EFinalizationStatus finalizationStatus, const TString& executionId, const TString& database,
Expand Down
125 changes: 75 additions & 50 deletions ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#include "kqp_finalize_script_actor.h"

#include <ydb/core/fq/libs/common/compression.h>
#include <ydb/core/fq/libs/events/events.h>

#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h>
#include <ydb/core/kqp/proxy_service/kqp_script_executions.h>

#include <ydb/core/tx/datashard/const.h>

#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h>
Expand All @@ -18,21 +21,42 @@ namespace {
class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
public:
TScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request,
const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
: ReplyActor_(request->Sender)
, ExecutionId_(request->Get()->Description.ExecutionId)
, Database_(request->Get()->Description.Database)
, FinalizationStatus_(request->Get()->Description.FinalizationStatus)
, Request_(std::move(request))
, FinalizationTimeout_(TDuration::Seconds(finalizeScriptServiceConfig.GetScriptFinalizationTimeoutSeconds()))
, MaximalSecretsSnapshotWaitTime_(2 * TDuration::Seconds(metadataProviderConfig.GetRefreshPeriodSeconds()))
, FederatedQuerySetup_(federatedQuerySetup)
: ReplyActor(request->Sender)
, ExecutionId(request->Get()->Description.ExecutionId)
, Database(request->Get()->Description.Database)
, FinalizationStatus(request->Get()->Description.FinalizationStatus)
, Request(std::move(request))
, FinalizationTimeout(TDuration::Seconds(queryServiceConfig.GetFinalizeScriptServiceConfig().GetScriptFinalizationTimeoutSeconds()))
, MaximalSecretsSnapshotWaitTime(2 * TDuration::Seconds(metadataProviderConfig.GetRefreshPeriodSeconds()))
, FederatedQuerySetup(federatedQuerySetup)
, Compressor(queryServiceConfig.GetQueryArtifactsCompressionMethod(), queryServiceConfig.GetQueryArtifactsCompressionMinSize())
{}

void CompressScriptArtifacts() const {
auto& description = Request->Get()->Description;
auto ast = description.QueryAst;
if (Compressor.IsEnabled() && ast) {
const auto& [astCompressionMethod, astCompressed] = Compressor.Compress(*ast);
description.QueryAstCompressionMethod = astCompressionMethod;
description.QueryAst = astCompressed;
}

if (description.QueryAst && description.QueryAst->size() > NDataShard::NLimits::MaxWriteValueSize) {
NYql::TIssue astTruncatedIssue(TStringBuilder() << "Query ast size is " << description.QueryAst->size() << " bytes, that is larger than allowed limit " << NDataShard::NLimits::MaxWriteValueSize << " bytes, ast was truncated");
astTruncatedIssue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
description.Issues.AddIssue(astTruncatedIssue);

description.QueryAst = ast->substr(0, NDataShard::NLimits::MaxWriteValueSize - 1_KB) + "...\n(TRUNCATED)";
description.QueryAstCompressionMethod = std::nullopt;
}
}

void Bootstrap() {
Register(CreateSaveScriptFinalStatusActor(SelfId(), std::move(Request_)));
CompressScriptArtifacts();
Register(CreateSaveScriptFinalStatusActor(SelfId(), std::move(Request)));
Become(&TScriptFinalizerActor::FetchState);
}

Expand All @@ -46,17 +70,17 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
return;
}

Schedule(FinalizationTimeout_, new TEvents::TEvWakeup());
Schedule(FinalizationTimeout, new TEvents::TEvWakeup());
Become(&TScriptFinalizerActor::PrepareState);

CustomerSuppliedId_ = ev->Get()->CustomerSuppliedId;
Sinks_ = std::move(ev->Get()->Sinks);
UserToken_ = ev->Get()->UserToken;
SecretNames_ = std::move(ev->Get()->SecretNames);
CustomerSuppliedId = ev->Get()->CustomerSuppliedId;
Sinks = std::move(ev->Get()->Sinks);
UserToken = ev->Get()->UserToken;
SecretNames = std::move(ev->Get()->SecretNames);

if (Sinks_.empty()) {
if (Sinks.empty()) {
FinishScriptFinalization();
} else if (SecretNames_.empty()) {
} else if (SecretNames.empty()) {
ComputeScriptExternalEffect();
} else {
FetchSecrets();
Expand All @@ -75,7 +99,7 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
}

void FetchSecrets() {
RegisterDescribeSecretsActor(SelfId(), UserToken_, SecretNames_, ActorContext().ActorSystem(), MaximalSecretsSnapshotWaitTime_);
RegisterDescribeSecretsActor(SelfId(), UserToken, SecretNames, ActorContext().ActorSystem(), MaximalSecretsSnapshotWaitTime);
}

void Handle(TEvDescribeSecretsResponse::TPtr& ev) {
Expand All @@ -90,15 +114,15 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
void FillSecureParams(const std::vector<TString>& secretValues) {
std::map<TString, TString> secretsMap;
for (size_t i = 0; i < secretValues.size(); ++i) {
secretsMap.emplace(SecretNames_[i], secretValues[i]);
secretsMap.emplace(SecretNames[i], secretValues[i]);
}

for (const auto& sink : Sinks_) {
auto sinkName = sink.GetSinkName();
for (const auto& sink : Sinks) {
const auto& sinkName = sink.GetSinkName();

if (sinkName) {
const auto& structuredToken = NYql::CreateStructuredTokenParser(sink.GetAuthInfo()).ToBuilder().ReplaceReferences(secretsMap).ToJson();
SecureParams_.emplace(sinkName, structuredToken);
SecureParams.emplace(sinkName, structuredToken);
}
}

Expand All @@ -122,7 +146,7 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
NYql::NDqProto::TExternalEffect externalEffectS3;
externalEffectS3.SetProviderName(TString(NYql::S3ProviderName));

for (const auto& sink : Sinks_) {
for (const auto& sink : Sinks) {
const TString& sinkType = sink.GetType();

if (sinkType == "S3Sink") {
Expand All @@ -137,20 +161,20 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
}

void RunS3ApplicatorActor(const NYql::NDqProto::TExternalEffect& externalEffect) {
if (!FederatedQuerySetup_) {
if (!FederatedQuerySetup) {
FinishScriptFinalization(Ydb::StatusIds::INTERNAL_ERROR, "unable to aplicate s3 external effect, invalid federated query setup");
return;
}

Register(NYql::NDq::MakeS3ApplicatorActor(
SelfId(),
FederatedQuerySetup_->HttpGateway,
FederatedQuerySetup->HttpGateway,
CreateGuidAsString(),
CustomerSuppliedId_,
CustomerSuppliedId,
std::nullopt,
FinalizationStatus_ == EFinalizationStatus::FS_COMMIT,
THashMap<TString, TString>(SecureParams_.begin(), SecureParams_.end()),
FederatedQuerySetup_->CredentialsFactory,
FinalizationStatus == EFinalizationStatus::FS_COMMIT,
THashMap<TString, TString>(SecureParams.begin(), SecureParams.end()),
FederatedQuerySetup->CredentialsFactory,
externalEffect
).Release());
}
Expand All @@ -172,7 +196,7 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
)

void FinishScriptFinalization(std::optional<Ydb::StatusIds::StatusCode> status, NYql::TIssues issues) {
Register(CreateScriptFinalizationFinisherActor(SelfId(), ExecutionId_, Database_, status, std::move(issues)));
Register(CreateScriptFinalizationFinisherActor(SelfId(), ExecutionId, Database, status, std::move(issues)));
Become(&TScriptFinalizerActor::FinishState);
}

Expand All @@ -189,38 +213,39 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
}

void Reply(bool operationAlreadyFinalized, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
Send(ReplyActor_, new TEvScriptExecutionFinished(operationAlreadyFinalized, status, std::move(issues)));
Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), new TEvScriptFinalizeResponse(ExecutionId_));
Send(ReplyActor, new TEvScriptExecutionFinished(operationAlreadyFinalized, status, std::move(issues)));
Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), new TEvScriptFinalizeResponse(ExecutionId));

PassAway();
}

private:
TActorId ReplyActor_;
TString ExecutionId_;
TString Database_;
EFinalizationStatus FinalizationStatus_;
TEvScriptFinalizeRequest::TPtr Request_;

TDuration FinalizationTimeout_;
TDuration MaximalSecretsSnapshotWaitTime_;
const std::optional<TKqpFederatedQuerySetup>& FederatedQuerySetup_;

TString CustomerSuppliedId_;
std::vector<NKqpProto::TKqpExternalSink> Sinks_;

TString UserToken_;
std::vector<TString> SecretNames_;
std::unordered_map<TString, TString> SecureParams_;
const TActorId ReplyActor;
const TString ExecutionId;
const TString Database;
const EFinalizationStatus FinalizationStatus;
TEvScriptFinalizeRequest::TPtr Request;

const TDuration FinalizationTimeout;
const TDuration MaximalSecretsSnapshotWaitTime;
const std::optional<TKqpFederatedQuerySetup>& FederatedQuerySetup;
const NFq::TCompressor Compressor;

TString CustomerSuppliedId;
std::vector<NKqpProto::TKqpExternalSink> Sinks;

TString UserToken;
std::vector<TString> SecretNames;
std::unordered_map<TString, TString> SecureParams;
};

} // anonymous namespace

IActor* CreateScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request,
const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup) {
return new TScriptFinalizerActor(std::move(request), finalizeScriptServiceConfig, metadataProviderConfig, federatedQuerySetup);
return new TScriptFinalizerActor(std::move(request), queryServiceConfig, metadataProviderConfig, federatedQuerySetup);
}

} // namespace NKikimr::NKqp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace NKikimr::NKqp {

IActor* CreateScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request,
const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ namespace {

class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptService> {
public:
TKqpFinalizeScriptService(const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
TKqpFinalizeScriptService(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory)
: FinalizeScriptServiceConfig_(finalizeScriptServiceConfig)
, MetadataProviderConfig_(metadataProviderConfig)
, FederatedQuerySetupFactory_(federatedQuerySetupFactory)
: QueryServiceConfig(queryServiceConfig)
, MetadataProviderConfig(metadataProviderConfig)
, FederatedQuerySetupFactory(federatedQuerySetupFactory)
{}

void Bootstrap(const TActorContext &ctx) {
FederatedQuerySetup_ = FederatedQuerySetupFactory_->Make(ctx.ActorSystem());
FederatedQuerySetup = FederatedQuerySetupFactory->Make(ctx.ActorSystem());

Become(&TKqpFinalizeScriptService::MainState);
}
Expand All @@ -40,10 +40,10 @@ class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptSe
void Handle(TEvScriptFinalizeRequest::TPtr& ev) {
TString executionId = ev->Get()->Description.ExecutionId;

if (!FinalizationRequestsQueue_.contains(executionId)) {
WaitingFinalizationExecutions_.push(executionId);
if (!FinalizationRequestsQueue.contains(executionId)) {
WaitingFinalizationExecutions.push(executionId);
}
FinalizationRequestsQueue_[executionId].emplace_back(std::move(ev));
FinalizationRequestsQueue[executionId].emplace_back(std::move(ev));

TryStartFinalizeRequest();
}
Expand All @@ -60,39 +60,39 @@ class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptSe

private:
void TryStartFinalizeRequest() {
if (FinalizationRequestsInFlight_ >= FinalizeScriptServiceConfig_.GetMaxInFlightFinalizationsCount() || WaitingFinalizationExecutions_.empty()) {
if (FinalizationRequestsInFlight >= QueryServiceConfig.GetFinalizeScriptServiceConfig().GetMaxInFlightFinalizationsCount() || WaitingFinalizationExecutions.empty()) {
return;
}

TString executionId = WaitingFinalizationExecutions_.front();
WaitingFinalizationExecutions_.pop();
TString executionId = WaitingFinalizationExecutions.front();
WaitingFinalizationExecutions.pop();

auto& queue = FinalizationRequestsQueue_[executionId];
auto& queue = FinalizationRequestsQueue[executionId];
Y_ENSURE(!queue.empty());

StartFinalizeRequest(std::move(queue.back()));
queue.pop_back();
}

void StartFinalizeRequest(TEvScriptFinalizeRequest::TPtr request) {
++FinalizationRequestsInFlight_;
++FinalizationRequestsInFlight;

Register(CreateScriptFinalizerActor(
std::move(request),
FinalizeScriptServiceConfig_,
MetadataProviderConfig_,
FederatedQuerySetup_
QueryServiceConfig,
MetadataProviderConfig,
FederatedQuerySetup
));
}

void Handle(TEvScriptFinalizeResponse::TPtr& ev) {
--FinalizationRequestsInFlight_;
--FinalizationRequestsInFlight;
TString executionId = ev->Get()->ExecutionId;

if (!FinalizationRequestsQueue_[executionId].empty()) {
WaitingFinalizationExecutions_.push(executionId);
if (!FinalizationRequestsQueue[executionId].empty()) {
WaitingFinalizationExecutions.push(executionId);
} else {
FinalizationRequestsQueue_.erase(executionId);
FinalizationRequestsQueue.erase(executionId);
}
TryStartFinalizeRequest();
}
Expand Down Expand Up @@ -122,23 +122,23 @@ class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptSe
}

private:
NKikimrConfig::TFinalizeScriptServiceConfig FinalizeScriptServiceConfig_;
NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig_;
const NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
const NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;

IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory_;
std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup_;
IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory;
std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;

ui32 FinalizationRequestsInFlight_ = 0;
std::queue<TString> WaitingFinalizationExecutions_;
std::unordered_map<TString, std::vector<TEvScriptFinalizeRequest::TPtr>> FinalizationRequestsQueue_;
ui32 FinalizationRequestsInFlight = 0;
std::queue<TString> WaitingFinalizationExecutions;
std::unordered_map<TString, std::vector<TEvScriptFinalizeRequest::TPtr>> FinalizationRequestsQueue;
};

} // anonymous namespace

IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory) {
return new TKqpFinalizeScriptService(finalizeScriptServiceConfig, metadataProviderConfig, std::move(federatedQuerySetupFactory));
return new TKqpFinalizeScriptService(queryServiceConfig, metadataProviderConfig, std::move(federatedQuerySetupFactory));
}

} // namespace NKikimr::NKqp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace NKikimr::NKqp {

IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory);

Expand Down
Loading

0 comments on commit 9d869e0

Please sign in to comment.