From dd57ad18a45fc881a4e3f82003236fff5f13e3f5 Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Mon, 27 May 2024 14:53:38 +0300 Subject: [PATCH] return relax compute actors requirement --- .../kqp/executer_actor/kqp_data_executer.cpp | 4 +++- ydb/core/kqp/executer_actor/kqp_planner.cpp | 21 +++++++++++++++++-- ydb/core/kqp/executer_actor/kqp_planner.h | 2 ++ ydb/core/tx/datashard/datashard_ut_trace.cpp | 12 +++++------ 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 3d50dde944d2..34966c6b67db 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2463,6 +2463,7 @@ class TKqpDataExecuter : public TKqpExecuterBasePlanExecution(); diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 7c07e4bced68..4b00d43ddd93 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -55,7 +55,7 @@ bool TKqpPlanner::UseMockEmptyPlanner = false; // Task can allocate extra memory during execution. // So, we estimate total memory amount required for task as apriori task size multiplied by this constant. constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2; -constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4; +constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 8; TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args) : TxId(args.TxId) @@ -80,6 +80,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args) , FederatedQuerySetup(args.FederatedQuerySetup) , OutputChunkMaxSize(args.OutputChunkMaxSize) , GUCSettings(std::move(args.GUCSettings)) + , MayRunTasksLocally(args.MayRunTasksLocally) { if (!Database) { // a piece of magic for tests @@ -435,7 +436,7 @@ std::unique_ptr TKqpPlanner::PlanExecution() { PendingComputeTasks.insert(taskId); } - for (auto& [shardId, tasks] : TasksPerNode) { + for (auto& [nodeId, tasks] : TasksPerNode) { for (ui64 taskId : tasks) { PendingComputeTasks.insert(taskId); } @@ -446,7 +447,23 @@ std::unique_ptr TKqpPlanner::PlanExecution() { return err; } + if (MayRunTasksLocally) { + // temporary flag until common ca factory is implemented. + auto tasksOnNodeIt = TasksPerNode.find(ExecuterId.NodeId()); + if (tasksOnNodeIt != TasksPerNode.end()) { + auto& tasks = tasksOnNodeIt->second; + const bool shareMailbox = (tasks.size() <= 1); + for (ui64 taskId: tasks) { + ExecuteDataComputeTask(taskId, shareMailbox, /* optimizeProtoForLocalExecution = */ true); + PendingComputeTasks.erase(taskId); + } + } + } + for(auto& [nodeId, tasks] : TasksPerNode) { + if (MayRunTasksLocally && ExecuterId.NodeId() == nodeId) + continue; + SortUnique(tasks); auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId); request.SerializedRequest = SerializeRequest(request); diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 29507ae895fd..01efd3e79454 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -63,6 +63,7 @@ class TKqpPlanner { const std::optional& FederatedQuerySetup; const ui64 OutputChunkMaxSize = 0; const TGUCSettings::TPtr GUCSettings; + const bool MayRunTasksLocally = false; }; TKqpPlanner(TKqpPlanner::TArgs&& args); @@ -126,6 +127,7 @@ class TKqpPlanner { const std::optional FederatedQuerySetup; const ui64 OutputChunkMaxSize; const TGUCSettings::TPtr GUCSettings; + const bool MayRunTasksLocally; public: static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error diff --git a/ydb/core/tx/datashard/datashard_ut_trace.cpp b/ydb/core/tx/datashard/datashard_ut_trace.cpp index 13a8df250854..029f78b4deaa 100644 --- a/ydb/core/tx/datashard/datashard_ut_trace.cpp +++ b/ydb/core/tx/datashard/datashard_ut_trace.cpp @@ -231,7 +231,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) " ", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) " - ", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read " + ", (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read " "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) " ", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) " ", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) " @@ -239,18 +239,18 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { "-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) " ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) " ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog " - "-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])"; + "-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor), (RunTasks)])])"; if (bTreeIndex) { // no index nodes (levels = 0) canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) " ", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) " - ", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read " + ", (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read " "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) " ", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) " ", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read -> [(Tablet.Transaction " "-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) " ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog " - "-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])"; + "-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor) , (RunTasks)])])"; } } else { @@ -346,12 +346,12 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , " "(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , " - "(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , " + "(ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , " "(Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , " "(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])" "]) , (Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> " "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> " - "[(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])"; + "[(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor) , (RunTasks)])])"; UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString()); }