Skip to content

Commit

Permalink
return relax compute actors requirement
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit committed May 27, 2024
1 parent a356b24 commit dd57ad1
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 9 deletions.
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2463,6 +2463,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const bool singlePartitionOptAllowed = !HasOlapTable && !UnknownAffectedShardCount && !HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty();
const bool useDataQueryPool = !(HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty());
const bool localComputeTasks = !DatashardTxs.empty();
const bool mayRunTasksLocally = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.empty());

Planner = CreateKqpPlanner({
.TasksGraph = TasksGraph,
Expand All @@ -2486,7 +2487,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
.UserRequestContext = GetUserRequestContext(),
.FederatedQuerySetup = FederatedQuerySetup,
.OutputChunkMaxSize = Request.OutputChunkMaxSize,
.GUCSettings = GUCSettings
.GUCSettings = GUCSettings,
.MayRunTasksLocally = mayRunTasksLocally
});

auto err = Planner->PlanExecution();
Expand Down
21 changes: 19 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ bool TKqpPlanner::UseMockEmptyPlanner = false;
// Task can allocate extra memory during execution.
// So, we estimate total memory amount required for task as apriori task size multiplied by this constant.
constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;
constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4;
constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 8;

TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
: TxId(args.TxId)
Expand All @@ -80,6 +80,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
, FederatedQuerySetup(args.FederatedQuerySetup)
, OutputChunkMaxSize(args.OutputChunkMaxSize)
, GUCSettings(std::move(args.GUCSettings))
, MayRunTasksLocally(args.MayRunTasksLocally)
{
if (!Database) {
// a piece of magic for tests
Expand Down Expand Up @@ -435,7 +436,7 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
PendingComputeTasks.insert(taskId);
}

for (auto& [shardId, tasks] : TasksPerNode) {
for (auto& [nodeId, tasks] : TasksPerNode) {
for (ui64 taskId : tasks) {
PendingComputeTasks.insert(taskId);
}
Expand All @@ -446,7 +447,23 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
return err;
}

if (MayRunTasksLocally) {
// temporary flag until common ca factory is implemented.
auto tasksOnNodeIt = TasksPerNode.find(ExecuterId.NodeId());
if (tasksOnNodeIt != TasksPerNode.end()) {
auto& tasks = tasksOnNodeIt->second;
const bool shareMailbox = (tasks.size() <= 1);
for (ui64 taskId: tasks) {
ExecuteDataComputeTask(taskId, shareMailbox, /* optimizeProtoForLocalExecution = */ true);
PendingComputeTasks.erase(taskId);
}
}
}

for(auto& [nodeId, tasks] : TasksPerNode) {
if (MayRunTasksLocally && ExecuterId.NodeId() == nodeId)
continue;

SortUnique(tasks);
auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId);
request.SerializedRequest = SerializeRequest(request);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class TKqpPlanner {
const std::optional<TKqpFederatedQuerySetup>& FederatedQuerySetup;
const ui64 OutputChunkMaxSize = 0;
const TGUCSettings::TPtr GUCSettings;
const bool MayRunTasksLocally = false;
};

TKqpPlanner(TKqpPlanner::TArgs&& args);
Expand Down Expand Up @@ -126,6 +127,7 @@ class TKqpPlanner {
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const ui64 OutputChunkMaxSize;
const TGUCSettings::TPtr GUCSettings;
const bool MayRunTasksLocally;

public:
static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/datashard/datashard_ut_trace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,26 +231,26 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {

canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) "
", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
", (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) "
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) "
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read -> [(Tablet.Transaction "
"-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
"-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])";
"-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor), (RunTasks)])])";

if (bTreeIndex) { // no index nodes (levels = 0)
canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) "
", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
", (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) "
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read -> [(Tablet.Transaction "
"-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
"-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])";
"-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor) , (RunTasks)])])";
}

} else {
Expand Down Expand Up @@ -346,12 +346,12 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {

std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , "
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , "
"(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
"(ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
"(Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
"(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])"
"]) , (Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
"[(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])";
"[(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor) , (RunTasks)])])";
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
}

Expand Down

0 comments on commit dd57ad1

Please sign in to comment.