Skip to content

Commit

Permalink
Use designated initializers to pass arguments in to KQP planer
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik committed Apr 2, 2024
1 parent 4ddc2cb commit f9e99e5
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 67 deletions.
25 changes: 22 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2398,9 +2398,28 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const bool useDataQueryPool = !(HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty());
const bool localComputeTasks = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.empty());

Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(),
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(),
ExecuterSpan, std::move(ResourceSnapshot), ExecuterRetriesConfig, useDataQueryPool, localComputeTasks, Request.MkqlMemoryLimit, AsyncIoFactory, singlePartitionOptAllowed, GetUserRequestContext(), FederatedQuerySetup);
Planner = CreateKqpPlanner({
.TasksGraph = TasksGraph,
.TxId = TxId,
.Executer = SelfId(),
.Snapshot = GetSnapshot(),
.Database = Database,
.UserToken = UserToken,
.Deadline = Deadline.GetOrElse(TInstant::Zero()),
.StatsMode = Request.StatsMode,
.WithSpilling = false,
.RlPath = Nothing(),
.ExecuterSpan = ExecuterSpan,
.ResourcesSnapshot = std::move(ResourceSnapshot),
.ExecuterRetriesConfig = ExecuterRetriesConfig,
.UseDataQueryPool = useDataQueryPool,
.LocalComputeTasks = localComputeTasks,
.MkqlMemoryLimit = Request.MkqlMemoryLimit,
.AsyncIoFactory = AsyncIoFactory,
.AllowSinglePartitionOpt = singlePartitionOptAllowed,
.UserRequestContext = GetUserRequestContext(),
.FederatedQuerySetup = FederatedQuerySetup
});

auto err = Planner->PlanExecution();
if (err) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include "kqp_table_resolver.h"
#include "kqp_shards_resolver.h"


#include <ydb/core/kqp/common/kqp_ru_calc.h>
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
#include <ydb/core/kqp/runtime/kqp_transport.h>
Expand All @@ -26,6 +25,7 @@
#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h>
#include <ydb/core/kqp/opt/kqp_query_plan.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/grpc_services/local_rate_limiter.h>

#include <ydb/services/metadata/secret/fetcher.h>
Expand Down
64 changes: 23 additions & 41 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,34 +57,27 @@ bool TKqpPlanner::UseMockEmptyPlanner = false;
constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;
constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4;

TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
bool allowSinglePartitionOpt, const TIntrusivePtr<TUserRequestContext>& userRequestContext, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
: TxId(txId)
, ExecuterId(executer)
, Snapshot(snapshot)
, Database(database)
, UserToken(userToken)
, Deadline(deadline)
, StatsMode(statsMode)
, WithSpilling(withSpilling)
, RlPath(rlPath)
, ResourcesSnapshot(std::move(resourcesSnapshot))
, ExecuterSpan(executerSpan)
, ExecuterRetriesConfig(executerRetriesConfig)
, TasksGraph(graph)
, UseDataQueryPool(useDataQueryPool)
, LocalComputeTasks(localComputeTasks)
, MkqlMemoryLimit(mkqlMemoryLimit)
, AsyncIoFactory(asyncIoFactory)
, AllowSinglePartitionOpt(allowSinglePartitionOpt)
, UserRequestContext(userRequestContext)
, FederatedQuerySetup(federatedQuerySetup)
TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
: TxId(args.TxId)
, ExecuterId(args.Executer)
, Snapshot(args.Snapshot)
, Database(args.Database)
, UserToken(args.UserToken)
, Deadline(args.Deadline)
, StatsMode(args.StatsMode)
, WithSpilling(args.WithSpilling)
, RlPath(args.RlPath)
, ResourcesSnapshot(std::move(args.ResourcesSnapshot))
, ExecuterSpan(args.ExecuterSpan)
, ExecuterRetriesConfig(args.ExecuterRetriesConfig)
, TasksGraph(args.TasksGraph)
, UseDataQueryPool(args.UseDataQueryPool)
, LocalComputeTasks(args.LocalComputeTasks)
, MkqlMemoryLimit(args.MkqlMemoryLimit)
, AsyncIoFactory(args.AsyncIoFactory)
, AllowSinglePartitionOpt(args.AllowSinglePartitionOpt)
, UserRequestContext(args.UserRequestContext)
, FederatedQuerySetup(args.FederatedQuerySetup)
{
if (!Database) {
// a piece of magic for tests
Expand Down Expand Up @@ -516,19 +509,8 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer,
const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
{
return std::make_unique<TKqpPlanner>(tasksGraph, txId, executer, snapshot,
database, userToken, deadline, statsMode, withSpilling, rlPath, executerSpan,
std::move(resourcesSnapshot), executerRetriesConfig, useDataQueryPool,
localComputeTasks, mkqlMemoryLimit, asyncIoFactory, doOptimization, userRequestContext, federatedQuerySetup);
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpPlanner::TArgs args) {
return std::make_unique<TKqpPlanner>(std::move(args));
}

} // namespace NKikimr::NKqp
42 changes: 24 additions & 18 deletions ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <ydb/core/kqp/executer_actor/kqp_executer_stats.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/node_service/kqp_node_service.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/kqp/rm_service/kqp_resource_estimation.h>

#include <ydb/library/actors/core/actor.h>
Expand Down Expand Up @@ -41,14 +40,30 @@ class TKqpPlanner {
using TLogFunc = std::function<void(TStringBuf message)>;

public:
TKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool allowSinglePartitionOpt,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);
struct TArgs {
TKqpTasksGraph& TasksGraph;
const ui64 TxId;
const TActorId& Executer;
const IKqpGateway::TKqpSnapshot& Snapshot;
const TString& Database;
const TIntrusiveConstPtr<NACLib::TUserToken>& UserToken;
const TInstant Deadline;
const Ydb::Table::QueryStatsCollection::Mode& StatsMode;
const bool WithSpilling;
const TMaybe<NKikimrKqp::TRlPath>& RlPath;
NWilson::TSpan& ExecuterSpan;
TVector<NKikimrKqp::TKqpNodeResources>&& ResourcesSnapshot;
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig;
const bool UseDataQueryPool;
const bool LocalComputeTasks;
const ui64 MkqlMemoryLimit;
const NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
const bool AllowSinglePartitionOpt;
const TIntrusivePtr<TUserRequestContext>& UserRequestContext;
const std::optional<TKqpFederatedQuerySetup>& FederatedQuerySetup;
};

TKqpPlanner(TKqpPlanner::TArgs&& args);
bool SendStartKqpTasksRequest(ui32 requestId, const TActorId& target);
std::unique_ptr<IEventHandle> PlanExecution();
std::unique_ptr<IEventHandle> AssignTasksToNodes();
Expand Down Expand Up @@ -112,15 +127,6 @@ class TKqpPlanner {
static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error
};

std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer,
const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig,
bool useDataQueryPool, bool localComputeTasks,
ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpPlanner::TArgs args);

} // namespace NKikimr::NKqp
26 changes: 22 additions & 4 deletions ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,28 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
private:
void ExecuteScanTx() {

Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(),
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, AppData()->EnableKqpSpilling,
Request.RlPath, ExecuterSpan, std::move(ResourcesSnapshot), ExecuterRetriesConfig, /* useDataQueryPool */ false, /* localComputeTasks */ false,
Request.MkqlMemoryLimit, nullptr, false, GetUserRequestContext(), std::nullopt);
Planner = CreateKqpPlanner({
.TasksGraph = TasksGraph,
.TxId = TxId,
.Executer = SelfId(),
.Snapshot = GetSnapshot(),
.Database = Database,
.UserToken = UserToken,
.Deadline = Deadline.GetOrElse(TInstant::Zero()),
.StatsMode = Request.StatsMode,
.WithSpilling = AppData()->EnableKqpSpilling,
.RlPath = Request.RlPath,
.ExecuterSpan = ExecuterSpan,
.ResourcesSnapshot = std::move(ResourcesSnapshot),
.ExecuterRetriesConfig = ExecuterRetriesConfig,
.UseDataQueryPool = false,
.LocalComputeTasks = false,
.MkqlMemoryLimit = Request.MkqlMemoryLimit,
.AsyncIoFactory = nullptr,
.AllowSinglePartitionOpt = false,
.UserRequestContext = GetUserRequestContext(),
.FederatedQuerySetup = std::nullopt
});

LOG_D("Execute scan tx, PendingComputeTasks: " << TasksGraph.GetTasks().size());
auto err = Planner->PlanExecution();
Expand Down

0 comments on commit f9e99e5

Please sign in to comment.