diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index c8e9bce63a59..34966c6b67db 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2462,7 +2462,8 @@ class TKqpDataExecuter : public TKqpExecuterBasePlanExecution(); diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 627f6eb4c590..4b00d43ddd93 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -55,7 +55,7 @@ bool TKqpPlanner::UseMockEmptyPlanner = false; // Task can allocate extra memory during execution. // So, we estimate total memory amount required for task as apriori task size multiplied by this constant. constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2; -constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4; +constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 8; TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args) : TxId(args.TxId) @@ -80,6 +80,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args) , FederatedQuerySetup(args.FederatedQuerySetup) , OutputChunkMaxSize(args.OutputChunkMaxSize) , GUCSettings(std::move(args.GUCSettings)) + , MayRunTasksLocally(args.MayRunTasksLocally) { if (!Database) { // a piece of magic for tests @@ -255,6 +256,10 @@ std::unique_ptr TKqpPlanner::AssignTasksToNodes() { return nullptr; } + if (ResourcesSnapshot.empty()) { + ResourcesSnapshot = std::move(GetKqpResourceManager()->GetClusterResources()); + } + if (ResourcesSnapshot.empty() || (ResourcesSnapshot.size() == 1 && ResourcesSnapshot[0].GetNodeId() == ExecuterId.NodeId())) { // try to run without memory overflow settings if (LocalRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] && @@ -407,6 +412,8 @@ std::unique_ptr TKqpPlanner::PlanExecution() { nComputeTasks = ComputeTasks.size(); + // explicit requirement to execute task on the same node because it has dependencies + // on datashard tx. if (LocalComputeTasks) { bool shareMailbox = (ComputeTasks.size() <= 1); for (ui64 taskId : ComputeTasks) { @@ -429,7 +436,7 @@ std::unique_ptr TKqpPlanner::PlanExecution() { PendingComputeTasks.insert(taskId); } - for (auto& [shardId, tasks] : TasksPerNode) { + for (auto& [nodeId, tasks] : TasksPerNode) { for (ui64 taskId : tasks) { PendingComputeTasks.insert(taskId); } @@ -440,7 +447,23 @@ std::unique_ptr TKqpPlanner::PlanExecution() { return err; } + if (MayRunTasksLocally) { + // temporary flag until common ca factory is implemented. + auto tasksOnNodeIt = TasksPerNode.find(ExecuterId.NodeId()); + if (tasksOnNodeIt != TasksPerNode.end()) { + auto& tasks = tasksOnNodeIt->second; + const bool shareMailbox = (tasks.size() <= 1); + for (ui64 taskId: tasks) { + ExecuteDataComputeTask(taskId, shareMailbox, /* optimizeProtoForLocalExecution = */ true); + PendingComputeTasks.erase(taskId); + } + } + } + for(auto& [nodeId, tasks] : TasksPerNode) { + if (MayRunTasksLocally && ExecuterId.NodeId() == nodeId) + continue; + SortUnique(tasks); auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId); request.SerializedRequest = SerializeRequest(request); diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index fabda6bf1bf9..01efd3e79454 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -63,6 +63,7 @@ class TKqpPlanner { const std::optional& FederatedQuerySetup; const ui64 OutputChunkMaxSize = 0; const TGUCSettings::TPtr GUCSettings; + const bool MayRunTasksLocally = false; }; TKqpPlanner(TKqpPlanner::TArgs&& args); @@ -104,7 +105,7 @@ class TKqpPlanner { const bool WithSpilling; const TMaybe RlPath; THashSet TrackingNodes; - const TVector ResourcesSnapshot; + TVector ResourcesSnapshot; NWilson::TSpan& ExecuterSpan; const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig; ui64 LocalRunMemoryEst = 0; @@ -126,6 +127,7 @@ class TKqpPlanner { const std::optional FederatedQuerySetup; const ui64 OutputChunkMaxSize; const TGUCSettings::TPtr GUCSettings; + const bool MayRunTasksLocally; public: static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 56078128bd66..3d793b683525 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -475,6 +475,23 @@ class TKqpResourceManager : public IKqpResourceManager { FireResourcesPublishing(); } + TVector GetClusterResources() const override { + TVector resources; + Y_ABORT_UNLESS(PublishResourcesByExchanger); + + if (PublishResourcesByExchanger) { + std::shared_ptr> infos; + with_lock (ResourceSnapshotState->Lock) { + infos = ResourceSnapshotState->Snapshot; + } + if (infos != nullptr) { + resources = *infos; + } + } + + return resources; + } + void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override { LOG_AS_D("Schedule Snapshot request"); if (PublishResourcesByExchanger) { diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index bc2039927366..4f5a83a5377a 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -92,6 +92,7 @@ class IKqpResourceManager : private TNonCopyable { virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0; + virtual TVector GetClusterResources() const = 0; virtual TKqpLocalNodeResources GetLocalResources() const = 0; virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0; diff --git a/ydb/core/tx/datashard/datashard_ut_trace.cpp b/ydb/core/tx/datashard/datashard_ut_trace.cpp index 272928a4eaad..029f78b4deaa 100644 --- a/ydb/core/tx/datashard/datashard_ut_trace.cpp +++ b/ydb/core/tx/datashard/datashard_ut_trace.cpp @@ -87,10 +87,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { auto [runtime, server, sender] = TestCreateServer(); CreateShardedTable(server, sender, "/Root", "table-1", 1, false); - + TFakeWilsonUploader *uploader = new TFakeWilsonUploader(); TActorId uploaderId = runtime.Register(uploader, 0); - runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); + runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); runtime.SimulateSleep(TDuration::Seconds(10)); const bool usesVolatileTxs = runtime.GetAppData(0).FeatureFlags.GetEnableDataShardVolatileTransactions(); @@ -129,7 +129,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { CheckTxHasDatashardUnits(propose, 3); auto progress = tabletTxs[1]; - CheckTxHasWriteLog(progress); + CheckTxHasWriteLog(progress); CheckTxHasDatashardUnits(progress, usesVolatileTxs ? 6 : 11); } @@ -166,12 +166,12 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { Y_UNIT_TEST(TestTraceDistributedSelect) { auto [runtime, server, sender] = TestCreateServer(); bool bTreeIndex = runtime.GetAppData().FeatureFlags.GetEnableLocalDBBtreeIndex(); - + CreateShardedTable(server, sender, "/Root", "table-1", 1, false); - + TFakeWilsonUploader *uploader = new TFakeWilsonUploader(); TActorId uploaderId = runtime.Register(uploader, 0); - runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); + runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); runtime.SimulateSleep(TDuration::Seconds(10)); SplitTable(runtime, server, 5); @@ -230,8 +230,8 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { UNIT_ASSERT_VALUES_EQUAL(dsReads.size(), 2); canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) " - ", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) " - ", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read " + ", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) " + ", (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read " "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) " ", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) " ", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) " @@ -239,18 +239,18 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { "-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) " ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) " ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog " - "-> [(Tablet.WriteLog.LogEntry)])])])])])])])"; + "-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor), (RunTasks)])])"; if (bTreeIndex) { // no index nodes (levels = 0) canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) " - ", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) " - ", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read " + ", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) " + ", (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read " "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) " ", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) " ", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read -> [(Tablet.Transaction " "-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) " ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog " - "-> [(Tablet.WriteLog.LogEntry)])])])])])])])"; + "-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor) , (RunTasks)])])"; } } else { @@ -281,7 +281,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { } canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) " - ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , " + ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (RunTasks) , " "(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , " "(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " @@ -290,10 +290,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " "(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> " - "[(Tablet.WriteLog.LogEntry)])])])])])"; + "[(Tablet.WriteLog.LogEntry)])])]) , (ComputeActor)])])"; } - - + + UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString()); } @@ -345,13 +345,13 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { UNIT_ASSERT_VALUES_EQUAL(dsReads.size(), 2); std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , " - "(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , " - "(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , " + "(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , " + "(ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , " "(Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , " "(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])" "]) , (Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> " "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> " - "[(Tablet.WriteLog.LogEntry)])])])])])])])"; + "[(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor) , (RunTasks)])])"; UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString()); } @@ -363,7 +363,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { TFakeWilsonUploader *uploader = new TFakeWilsonUploader(); TActorId uploaderId = runtime.Register(uploader, 0); - runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); + runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); runtime.SimulateSleep(TDuration::Seconds(10)); NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId(15, 4095); @@ -380,16 +380,16 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { UNIT_ASSERT_VALUES_EQUAL(1, uploader->Traces.size()); TFakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second; - + auto wtSpan = trace.Root.BFSFindOne("Datashard.WriteTransaction"); UNIT_ASSERT(wtSpan); - + auto tabletTxs = wtSpan->get().FindAll("Tablet.Transaction"); UNIT_ASSERT_VALUES_EQUAL(1, tabletTxs.size()); auto writeTx = tabletTxs[0]; - CheckTxHasWriteLog(writeTx); - CheckTxHasDatashardUnits(writeTx, 5); + CheckTxHasWriteLog(writeTx); + CheckTxHasDatashardUnits(writeTx, 5); std::string canon = "(Datashard.WriteTransaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> " "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "