Skip to content

Commit

Permalink
YQ-3346 improve secrets fetching (ydb-platform#5850)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jun 24, 2024
1 parent 193ba44 commit 48f5f33
Show file tree
Hide file tree
Showing 24 changed files with 68 additions and 122 deletions.
4 changes: 2 additions & 2 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2130,15 +2130,15 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
auto federatedQuerySetupFactory = NKqp::MakeKqpFederatedQuerySetupFactory(setup, appData, Config);

auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(),
Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources),
Config.GetQueryServiceConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources),
federatedQuerySetupFactory
);
setup->LocalServices.push_back(std::make_pair(
NKqp::MakeKqpProxyID(NodeId),
TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId)));

// Create finalize script service
auto finalize = NKqp::CreateKqpFinalizeScriptService(Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), federatedQuerySetupFactory);
auto finalize = NKqp::CreateKqpFinalizeScriptService(Config.GetQueryServiceConfig(), federatedQuerySetupFactory);
setup->LocalServices.push_back(std::make_pair(
NKqp::MakeKqpFinalizeScriptServiceId(NodeId),
TActorSetupCmd(finalize, TMailboxType::HTSwap, appData->UserPoolId)));
Expand Down
8 changes: 2 additions & 6 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
const TTableServiceConfig& tableServiceConfig,
const TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const TString& uid, const TKqpQueryId& queryId,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
Expand All @@ -64,7 +63,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
, DbCounters(dbCounters)
, Config(MakeIntrusive<TKikimrConfiguration>())
, QueryServiceConfig(queryServiceConfig)
, MetadataProviderConfig(metadataProviderConfig)
, CompilationTimeout(TDuration::MilliSeconds(tableServiceConfig.GetCompileTimeoutMs()))
, UserRequestContext(userRequestContext)
, CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
Expand Down Expand Up @@ -176,7 +174,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
counters->TxProxyMon = new NTxProxy::TTxProxyMon(AppData(ctx)->Counters);
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
std::make_shared<TKqpTableMetadataLoader>(
QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState);
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader),
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig);
Gateway->SetToken(QueryId.Cluster, UserToken);
Expand Down Expand Up @@ -463,7 +461,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
TKqpDbCountersPtr DbCounters;
TKikimrConfiguration::TPtr Config;
TQueryServiceConfig QueryServiceConfig;
TMetadataProviderConfig MetadataProviderConfig;
TDuration CompilationTimeout;
TInstant StartTime;
TDuration CompileCpuTime;
Expand Down Expand Up @@ -523,15 +520,14 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
const TTableServiceConfig& tableServiceConfig,
const TQueryServiceConfig& queryServiceConfig,
const TMetadataProviderConfig& metadataProviderConfig,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
TKqpDbCountersPtr dbCounters, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState,
ECompileActorAction compileAction, TMaybe<TQueryAst> astResult, bool collectFullDiagnostics)
{
return new TKqpCompileActor(owner, kqpSettings, tableServiceConfig, queryServiceConfig, metadataProviderConfig,
return new TKqpCompileActor(owner, kqpSettings, tableServiceConfig, queryServiceConfig,
moduleResolverState, counters,
uid, query, userToken, dbCounters,
federatedQuerySetup, userRequestContext,
Expand Down
11 changes: 4 additions & 7 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,14 +394,13 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
}

TKqpCompileService(const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig,
const TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings,
const TKqpSettings::TConstPtr& kqpSettings,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup
)
: TableServiceConfig(tableServiceConfig)
, QueryServiceConfig(queryServiceConfig)
, MetadataProviderConfig(metadataProviderConfig)
, KqpSettings(kqpSettings)
, ModuleResolverState(moduleResolverState)
, Counters(counters)
Expand Down Expand Up @@ -962,7 +961,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
}

void StartCompilation(TKqpCompileRequest&& request, const TActorContext& ctx) {
auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, TableServiceConfig, QueryServiceConfig, MetadataProviderConfig, ModuleResolverState, Counters,
auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, TableServiceConfig, QueryServiceConfig, ModuleResolverState, Counters,
request.Uid, request.Query, request.UserToken, FederatedQuerySetup, request.DbCounters, request.UserRequestContext,
request.CompileServiceSpan.GetTraceId(), request.TempTablesState, request.Action, std::move(request.AstResult), CollectDiagnostics);
auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap,
Expand Down Expand Up @@ -1044,7 +1043,6 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
private:
TTableServiceConfig TableServiceConfig;
TQueryServiceConfig QueryServiceConfig;
TMetadataProviderConfig MetadataProviderConfig;
TKqpSettings::TConstPtr KqpSettings;
TIntrusivePtr<TModuleResolverState> ModuleResolverState;
TIntrusivePtr<TKqpCounters> Counters;
Expand All @@ -1059,13 +1057,12 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
};

IActor* CreateKqpCompileService(const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig,
const TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup
)
{
return new TKqpCompileService(tableServiceConfig, queryServiceConfig, metadataProviderConfig, kqpSettings, moduleResolverState, counters,
return new TKqpCompileService(tableServiceConfig, queryServiceConfig, kqpSettings, moduleResolverState, counters,
std::move(queryReplayFactory), federatedQuerySetup);
}

Expand Down
2 changes: 0 additions & 2 deletions ydb/core/kqp/compile_service/kqp_compile_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ enum class ECompileActorAction {

IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup
Expand All @@ -27,7 +26,6 @@ IActor* CreateKqpCompileComputationPatternService(const NKikimrConfig::TTableSer
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const TString& uid, const TKqpQueryId& query,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink)
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter", streamResult
userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter", streamResult
)
, AsyncIoFactory(std::move(asyncIoFactory))
, EnableOlapSink(enableOlapSink)
Expand Down Expand Up @@ -2442,10 +2442,10 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool enableOlapSink)
const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool enableOlapSink)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig,
std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink);
std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, userRequestContext, enableOlapSink);
}

} // namespace NKqp
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink);

IActor* CreateKqpSchemeExecuter(
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
YQL_ENSURE(request.LocksOp == ELocksOp::Commit || request.LocksOp == ELocksOp::Rollback);
return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink);
return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, userRequestContext, enableOlapSink);
}

TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
Expand All @@ -107,13 +107,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
switch (*txsType) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink);
return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, userRequestContext, enableOlapSink);

case NKqpProto::TKqpPhyTx::TYPE_SCAN:
return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext);
return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, userRequestContext);

case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink);
return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, userRequestContext, enableOlapSink);

default:
YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType);
Expand Down
10 changes: 4 additions & 6 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase", bool streamResult = false)
: Request(std::move(request))
, Database(database)
Expand All @@ -135,7 +135,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
, ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName)
, Planner(nullptr)
, ExecuterRetriesConfig(executerRetriesConfig)
, MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
, AggregationSettings(aggregation)
, HasOlapTable(false)
, StreamResult(streamResult)
Expand Down Expand Up @@ -1598,7 +1597,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}

void GetSecretsSnapshot() {
RegisterDescribeSecretsActor(this->SelfId(), UserToken ? UserToken->GetUserSID() : "", SecretNames, this->ActorContext().ActorSystem(), MaximalSecretsSnapshotWaitTime);
RegisterDescribeSecretsActor(this->SelfId(), UserToken ? UserToken->GetUserSID() : "", SecretNames, this->ActorContext().ActorSystem());
}

void GetResourcesSnapshot() {
Expand Down Expand Up @@ -1924,7 +1923,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

std::vector<TString> SecretNames;
std::map<TString, TString> SecureParams;
TDuration MaximalSecretsSnapshotWaitTime;

const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings;
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
Expand All @@ -1950,15 +1948,15 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink);

IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext);
const TIntrusivePtr<TUserRequestContext>& userRequestContext);

} // namespace NKqp
} // namespace NKikimr
Loading

0 comments on commit 48f5f33

Please sign in to comment.