Skip to content

Commit

Permalink
Return relaxed requirements (#4887)
Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored May 28, 2024
1 parent 3cc510d commit 66cb6b8
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 29 deletions.
6 changes: 4 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2462,7 +2462,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

const bool singlePartitionOptAllowed = !HasOlapTable && !UnknownAffectedShardCount && !HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty();
const bool useDataQueryPool = !(HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty());
const bool localComputeTasks = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.empty());
const bool localComputeTasks = !DatashardTxs.empty();
const bool mayRunTasksLocally = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.empty());

Planner = CreateKqpPlanner({
.TasksGraph = TasksGraph,
Expand All @@ -2486,7 +2487,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
.UserRequestContext = GetUserRequestContext(),
.FederatedQuerySetup = FederatedQuerySetup,
.OutputChunkMaxSize = Request.OutputChunkMaxSize,
.GUCSettings = GUCSettings
.GUCSettings = GUCSettings,
.MayRunTasksLocally = mayRunTasksLocally
});

auto err = Planner->PlanExecution();
Expand Down
27 changes: 25 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ bool TKqpPlanner::UseMockEmptyPlanner = false;
// A task can allocate extra memory during execution.
// So we estimate the total memory required for a task as the a priori task size multiplied by this constant.
constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;
constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4;
constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 8;

TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
: TxId(args.TxId)
Expand All @@ -80,6 +80,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
, FederatedQuerySetup(args.FederatedQuerySetup)
, OutputChunkMaxSize(args.OutputChunkMaxSize)
, GUCSettings(std::move(args.GUCSettings))
, MayRunTasksLocally(args.MayRunTasksLocally)
{
if (!Database) {
// a piece of magic for tests
Expand Down Expand Up @@ -255,6 +256,10 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
return nullptr;
}

if (ResourcesSnapshot.empty()) {
ResourcesSnapshot = std::move(GetKqpResourceManager()->GetClusterResources());
}

if (ResourcesSnapshot.empty() || (ResourcesSnapshot.size() == 1 && ResourcesSnapshot[0].GetNodeId() == ExecuterId.NodeId())) {
// try to run without memory overflow settings
if (LocalRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
Expand Down Expand Up @@ -407,6 +412,8 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {

nComputeTasks = ComputeTasks.size();

// explicit requirement to execute task on the same node because it has dependencies
// on datashard tx.
if (LocalComputeTasks) {
bool shareMailbox = (ComputeTasks.size() <= 1);
for (ui64 taskId : ComputeTasks) {
Expand All @@ -429,7 +436,7 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
PendingComputeTasks.insert(taskId);
}

for (auto& [shardId, tasks] : TasksPerNode) {
for (auto& [nodeId, tasks] : TasksPerNode) {
for (ui64 taskId : tasks) {
PendingComputeTasks.insert(taskId);
}
Expand All @@ -440,7 +447,23 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
return err;
}

if (MayRunTasksLocally) {
// temporary flag until common ca factory is implemented.
auto tasksOnNodeIt = TasksPerNode.find(ExecuterId.NodeId());
if (tasksOnNodeIt != TasksPerNode.end()) {
auto& tasks = tasksOnNodeIt->second;
const bool shareMailbox = (tasks.size() <= 1);
for (ui64 taskId: tasks) {
ExecuteDataComputeTask(taskId, shareMailbox, /* optimizeProtoForLocalExecution = */ true);
PendingComputeTasks.erase(taskId);
}
}
}

for(auto& [nodeId, tasks] : TasksPerNode) {
if (MayRunTasksLocally && ExecuterId.NodeId() == nodeId)
continue;

SortUnique(tasks);
auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId);
request.SerializedRequest = SerializeRequest(request);
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class TKqpPlanner {
const std::optional<TKqpFederatedQuerySetup>& FederatedQuerySetup;
const ui64 OutputChunkMaxSize = 0;
const TGUCSettings::TPtr GUCSettings;
const bool MayRunTasksLocally = false;
};

TKqpPlanner(TKqpPlanner::TArgs&& args);
Expand Down Expand Up @@ -104,7 +105,7 @@ class TKqpPlanner {
const bool WithSpilling;
const TMaybe<NKikimrKqp::TRlPath> RlPath;
THashSet<ui32> TrackingNodes;
const TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
NWilson::TSpan& ExecuterSpan;
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig;
ui64 LocalRunMemoryEst = 0;
Expand All @@ -126,6 +127,7 @@ class TKqpPlanner {
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const ui64 OutputChunkMaxSize;
const TGUCSettings::TPtr GUCSettings;
const bool MayRunTasksLocally;

public:
static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error
Expand Down
17 changes: 17 additions & 0 deletions ydb/core/kqp/rm_service/kqp_rm_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,23 @@ class TKqpResourceManager : public IKqpResourceManager {
FireResourcesPublishing();
}

TVector<NKikimrKqp::TKqpNodeResources> GetClusterResources() const override {
    // Returns the latest published snapshot of per-node KQP resources,
    // or an empty vector when no snapshot has been published yet.
    //
    // Invariant: this accessor is only valid when resource publishing goes
    // through the exchanger; Y_ABORT_UNLESS terminates the process otherwise,
    // so the former `if (PublishResourcesByExchanger)` re-check was dead code
    // and has been removed.
    Y_ABORT_UNLESS(PublishResourcesByExchanger);

    TVector<NKikimrKqp::TKqpNodeResources> resources;

    // Grab the shared snapshot pointer under the lock, but copy its contents
    // outside the critical section to keep lock hold time minimal.
    // NOTE(review): ResourceSnapshotState is dereferenced unconditionally,
    // matching the original — presumably it is always non-null here; confirm.
    std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> snapshot;
    with_lock (ResourceSnapshotState->Lock) {
        snapshot = ResourceSnapshotState->Snapshot;
    }
    if (snapshot != nullptr) {
        resources = *snapshot;
    }

    return resources;
}

void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override {
LOG_AS_D("Schedule Snapshot request");
if (PublishResourcesByExchanger) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/rm_service/kqp_rm_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class IKqpResourceManager : private TNonCopyable {

virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0;

virtual TVector<NKikimrKqp::TKqpNodeResources> GetClusterResources() const = 0;
virtual TKqpLocalNodeResources GetLocalResources() const = 0;
virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0;

Expand Down
48 changes: 24 additions & 24 deletions ydb/core/tx/datashard/datashard_ut_trace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
auto [runtime, server, sender] = TestCreateServer();

CreateShardedTable(server, sender, "/Root", "table-1", 1, false);

TFakeWilsonUploader *uploader = new TFakeWilsonUploader();
TActorId uploaderId = runtime.Register(uploader, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.SimulateSleep(TDuration::Seconds(10));

const bool usesVolatileTxs = runtime.GetAppData(0).FeatureFlags.GetEnableDataShardVolatileTransactions();
Expand Down Expand Up @@ -129,7 +129,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
CheckTxHasDatashardUnits(propose, 3);

auto progress = tabletTxs[1];
CheckTxHasWriteLog(progress);
CheckTxHasWriteLog(progress);
CheckTxHasDatashardUnits(progress, usesVolatileTxs ? 6 : 11);
}

Expand Down Expand Up @@ -166,12 +166,12 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
Y_UNIT_TEST(TestTraceDistributedSelect) {
auto [runtime, server, sender] = TestCreateServer();
bool bTreeIndex = runtime.GetAppData().FeatureFlags.GetEnableLocalDBBtreeIndex();

CreateShardedTable(server, sender, "/Root", "table-1", 1, false);

TFakeWilsonUploader *uploader = new TFakeWilsonUploader();
TActorId uploaderId = runtime.Register(uploader, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.SimulateSleep(TDuration::Seconds(10));

SplitTable(runtime, server, 5);
Expand Down Expand Up @@ -230,27 +230,27 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
UNIT_ASSERT_VALUES_EQUAL(dsReads.size(), 2);

canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) "
", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) "
", (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) "
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) "
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read -> [(Tablet.Transaction "
"-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
"-> [(Tablet.WriteLog.LogEntry)])])])])])])])";
"-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor), (RunTasks)])])";

if (bTreeIndex) { // no index nodes (levels = 0)
canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) "
", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) "
", (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) "
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read -> [(Tablet.Transaction "
"-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
"-> [(Tablet.WriteLog.LogEntry)])])])])])])])";
"-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor) , (RunTasks)])])";
}

} else {
Expand Down Expand Up @@ -281,7 +281,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
}

canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , "
", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (RunTasks) , "
"(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
"(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
"(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
Expand All @@ -290,10 +290,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
"(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
"(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
"[(Tablet.WriteLog.LogEntry)])])])])])";
"[(Tablet.WriteLog.LogEntry)])])]) , (ComputeActor)])])";
}


UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
}

Expand Down Expand Up @@ -345,13 +345,13 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
UNIT_ASSERT_VALUES_EQUAL(dsReads.size(), 2);

std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , "
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , "
"(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , "
"(ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
"(Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
"(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])"
"]) , (Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
"[(Tablet.WriteLog.LogEntry)])])])])])])])";
"[(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor) , (RunTasks)])])";
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
}

Expand All @@ -363,7 +363,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {

TFakeWilsonUploader *uploader = new TFakeWilsonUploader();
TActorId uploaderId = runtime.Register(uploader, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.SimulateSleep(TDuration::Seconds(10));

NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId(15, 4095);
Expand All @@ -380,16 +380,16 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
UNIT_ASSERT_VALUES_EQUAL(1, uploader->Traces.size());

TFakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second;

auto wtSpan = trace.Root.BFSFindOne("Datashard.WriteTransaction");
UNIT_ASSERT(wtSpan);

auto tabletTxs = wtSpan->get().FindAll("Tablet.Transaction");
UNIT_ASSERT_VALUES_EQUAL(1, tabletTxs.size());
auto writeTx = tabletTxs[0];

CheckTxHasWriteLog(writeTx);
CheckTxHasDatashardUnits(writeTx, 5);
CheckTxHasWriteLog(writeTx);
CheckTxHasDatashardUnits(writeTx, 5);

std::string canon = "(Datashard.WriteTransaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
Expand Down

0 comments on commit 66cb6b8

Please sign in to comment.