Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add CMS request priorities KIKIMR-9024 #1620

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ydb/core/cms/api_adapters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ class TCreateMaintenanceTask: public TPermissionResponseProcessor<
cmsRequest.SetAvailabilityMode(ConvertAvailabilityMode(opts.availability_mode()));
cmsRequest.SetPartialPermissionAllowed(true);
cmsRequest.SetSchedule(true);
cmsRequest.SetPriority(opts.priority());

for (const auto& group : request.action_groups()) {
Y_ABORT_UNLESS(group.actions().size() == 1);
Expand Down Expand Up @@ -559,7 +560,8 @@ class TGetMaintenanceTask: public TAdapterActor<
opts.set_task_uid(taskUid);
opts.set_description(request.GetReason());
opts.set_availability_mode(ConvertAvailabilityMode(request.GetAvailabilityMode()));

opts.set_priority(request.GetPriority());

// pending actions
for (const auto& action : request.GetActions()) {
ConvertAction(action, *result.add_action_group_states()->add_action_states());
Expand Down
65 changes: 41 additions & 24 deletions ydb/core/cms/cluster_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ bool TLockableItem::IsLocked(TErrorInfo &error, TDuration defaultRetryTime,
return true;
}

if (!ScheduledLocks.empty() && ScheduledLocks.begin()->Order < DeactivatedLocksOrder) {
if (!ScheduledLocks.empty() && ScheduledLocks.begin()->Priority < DeactivatedLocksPriority) {
error.Code = TStatus::DISALLOW_TEMP;
error.Reason = Sprintf("%s has scheduled action %s owned by %s (order %" PRIu64 " vs %" PRIu64 ")",
error.Reason = Sprintf("%s has scheduled action %s owned by %s (priority %" PRIi32 " vs %" PRIi32 ")",
PrettyItemName().data(), ScheduledLocks.begin()->RequestId.data(),
ScheduledLocks.begin()->Owner.data(), ScheduledLocks.begin()->Order,
DeactivatedLocksOrder);
ScheduledLocks.begin()->Owner.data(), ScheduledLocks.begin()->Priority,
DeactivatedLocksPriority);
error.Deadline = now + defaultRetryTime;
return true;
}
Expand Down Expand Up @@ -113,12 +113,12 @@ void TLockableItem::RollbackLocks(ui64 point)

void TLockableItem::ReactivateScheduledLocks()
{
DeactivatedLocksOrder = Max<ui64>();
DeactivatedLocksPriority = Max<i32>();
}

void TLockableItem::DeactivateScheduledLocks(ui64 order)
void TLockableItem::DeactivateScheduledLocks(i32 priority)
{
DeactivatedLocksOrder = order;
DeactivatedLocksPriority = priority;
}

void TLockableItem::RemoveScheduledLocks(const TString &requestId)
Expand Down Expand Up @@ -650,21 +650,30 @@ void TClusterInfo::ApplyActionWithoutLog(const NKikimrCms::TAction &action)
case TAction::REBOOT_HOST:
if (auto nodes = NodePtrs(action.GetHost(), MakeServices(action))) {
for (const auto node : nodes) {
for (auto &nodeGroup: node->NodeGroups)
nodeGroup->LockNode(node->NodeId);
for (auto &nodeGroup: node->NodeGroups) {
if (!nodeGroup->IsNodeLocked(node->NodeId)) {
nodeGroup->LockNode(node->NodeId);
}
}
}
}
break;
case TAction::REPLACE_DEVICES:
for (const auto &device : action.GetDevices()) {
if (HasPDisk(device)) {
auto pdisk = &PDiskRef(device);
for (auto &nodeGroup: NodeRef(pdisk->NodeId).NodeGroups)
nodeGroup->LockNode(pdisk->NodeId);
for (auto &nodeGroup: NodeRef(pdisk->NodeId).NodeGroups) {
if (!nodeGroup->IsNodeLocked(pdisk->NodeId)) {
nodeGroup->LockNode(pdisk->NodeId);
}
}
} else if (HasVDisk(device)) {
auto vdisk = &VDiskRef(device);
for (auto &nodeGroup: NodeRef(vdisk->NodeId).NodeGroups)
nodeGroup->LockNode(vdisk->NodeId);
for (auto &nodeGroup: NodeRef(vdisk->NodeId).NodeGroups) {
if (!nodeGroup->IsNodeLocked(vdisk->NodeId)) {
nodeGroup->LockNode(vdisk->NodeId);
}
}
}
}
break;
Expand Down Expand Up @@ -756,7 +765,7 @@ ui64 TClusterInfo::AddLocks(const TPermissionInfo &permission, const TActorConte
|| permission.Action.GetType() == TAction::REBOOT_HOST
|| permission.Action.GetType() == TAction::REPLACE_DEVICES)) {
item->State = RESTART;
lock = true;;
lock = true;
}

if (lock) {
Expand Down Expand Up @@ -854,7 +863,7 @@ ui64 TClusterInfo::ScheduleActions(const TRequestInfo &request, const TActorCont
auto items = FindLockedItems(action, ctx);

for (auto item : items)
item->ScheduleLock({action, request.Owner, request.RequestId, request.Order});
item->ScheduleLock({action, request.Owner, request.RequestId, request.Priority});

locks += items.size();
}
Expand All @@ -868,10 +877,10 @@ void TClusterInfo::UnscheduleActions(const TString &requestId)
entry.second->RemoveScheduledLocks(requestId);
}

void TClusterInfo::DeactivateScheduledLocks(ui64 order)
void TClusterInfo::DeactivateScheduledLocks(i32 priority)
{
for (auto &entry : LockableItems)
entry.second->DeactivateScheduledLocks(order);
entry.second->DeactivateScheduledLocks(priority);
}

void TClusterInfo::ReactivateScheduledLocks()
Expand Down Expand Up @@ -1020,22 +1029,30 @@ void TOperationLogManager::ApplyAction(const NKikimrCms::TAction &action,
case NKikimrCms::TAction::REBOOT_HOST:
if (auto nodes = clusterState->NodePtrs(action.GetHost(), MakeServices(action))) {
for (const auto node : nodes) {
for (auto &nodeGroup: node->NodeGroups)
AddNodeLockOperation(node->NodeId, nodeGroup);
for (auto &nodeGroup: node->NodeGroups) {
if (!nodeGroup->IsNodeLocked(node->NodeId)) {
AddNodeLockOperation(node->NodeId, nodeGroup);
}
}
}
}
break;
case NKikimrCms::TAction::REPLACE_DEVICES:
for (const auto &device : action.GetDevices()) {
if (clusterState->HasPDisk(device)) {
auto pdisk = &clusterState->PDisk(device);
for (auto &nodeGroup: clusterState->NodeRef(pdisk->NodeId).NodeGroups)
AddNodeLockOperation(pdisk->NodeId, nodeGroup);

for (auto &nodeGroup: clusterState->NodeRef(pdisk->NodeId).NodeGroups) {
if (!nodeGroup->IsNodeLocked(pdisk->NodeId)) {
AddNodeLockOperation(pdisk->NodeId, nodeGroup);
}
}
} else if (clusterState->HasVDisk(device)) {
auto vdisk = &clusterState->VDisk(device);
for (auto &nodeGroup: clusterState->NodeRef(vdisk->NodeId).NodeGroups)
AddNodeLockOperation(vdisk->NodeId, nodeGroup);
for (auto &nodeGroup: clusterState->NodeRef(vdisk->NodeId).NodeGroups) {
if (!nodeGroup->IsNodeLocked(vdisk->NodeId)) {
AddNodeLockOperation(vdisk->NodeId, nodeGroup);
}
}
}
}
break;
Expand Down
17 changes: 9 additions & 8 deletions ydb/core/cms/cluster_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,13 @@ struct TRequestInfo {
request.SetPartialPermissionAllowed(Request.GetPartialPermissionAllowed());
request.SetReason(Request.GetReason());
request.SetAvailabilityMode(Request.GetAvailabilityMode());
request.SetPriority(Priority);
}

TString RequestId;
TString Owner;
ui64 Order = 0;
i32 Priority = 0;
NKikimrCms::TPermissionRequest Request;
};

Expand Down Expand Up @@ -203,10 +205,10 @@ class TLockableItem : public TThrRefBase {
};

struct TScheduledLock : TBaseLock {
TScheduledLock(const NKikimrCms::TAction &action, const TString &owner, const TString &requestId, ui64 order)
TScheduledLock(const NKikimrCms::TAction &action, const TString &owner, const TString &requestId, i32 priority)
: TBaseLock(owner, action)
, RequestId(requestId)
, Order(order)
, Priority(priority)
{
}

Expand All @@ -217,7 +219,7 @@ class TLockableItem : public TThrRefBase {
TScheduledLock &operator=(TScheduledLock &&other) = default;

TString RequestId;
ui64 Order = 0;
i32 Priority = 0;
};

struct TTemporaryLock : TBaseLock {
Expand Down Expand Up @@ -268,7 +270,7 @@ class TLockableItem : public TThrRefBase {

void ScheduleLock(TScheduledLock &&lock) {
auto pos = LowerBound(ScheduledLocks.begin(), ScheduledLocks.end(), lock, [](auto &l, auto &r) {
return l.Order < r.Order;
return l.Priority < r.Priority;
});
ScheduledLocks.insert(pos, lock);
}
Expand All @@ -278,7 +280,7 @@ class TLockableItem : public TThrRefBase {

void RollbackLocks(ui64 point);

void DeactivateScheduledLocks(ui64 order);
void DeactivateScheduledLocks(i32 priority);
void ReactivateScheduledLocks();
void RemoveScheduledLocks(const TString &requestId);

Expand All @@ -296,7 +298,7 @@ class TLockableItem : public TThrRefBase {
std::list<TExternalLock> ExternalLocks;
std::list<TScheduledLock> ScheduledLocks;
TVector<TTemporaryLock> TempLocks;
ui64 DeactivatedLocksOrder = Max<ui64>();
i32 DeactivatedLocksPriority = Max<i32>();
THashSet<NKikimrCms::EMarker> Markers;
};

Expand Down Expand Up @@ -667,7 +669,6 @@ class TClusterInfo : public TThrRefBase {
TOperationLogManager LogManager;
TOperationLogManager ScheduledLogManager;

void ApplyActionToOperationLog(const NKikimrCms::TAction &action);
void ApplyActionWithoutLog(const NKikimrCms::TAction &action);
void ApplyNodeLimits(ui32 clusterLimit, ui32 clusterRatioLimit, ui32 tenantLimit, ui32 tenantRatioLimit);

Expand Down Expand Up @@ -912,7 +913,7 @@ class TClusterInfo : public TThrRefBase {
ui64 AddTempLocks(const NKikimrCms::TAction &action, const TActorContext *ctx);
ui64 ScheduleActions(const TRequestInfo &request, const TActorContext *ctx);
void UnscheduleActions(const TString &requestId);
void DeactivateScheduledLocks(ui64 order);
void DeactivateScheduledLocks(i32 priority);
void ReactivateScheduledLocks();

void RollbackLocks(ui64 point);
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/cms/cluster_info_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,33 +199,33 @@ void AddActions(TRequestInfo &request, const NKikimrCms::TAction &action, Ts...
}

template<typename... Ts>
TRequestInfo MakeRequest(const TString &id, const TString &owner, ui64 order, Ts... actions)
TRequestInfo MakeRequest(const TString &id, const TString &owner, i32 priority, Ts... actions)
{
TRequestInfo res;
res.RequestId = id;
res.Owner = owner;
res.Order = order;
res.Priority = priority;
AddActions(res, actions...);
return res;
}

template<typename I>
void CheckScheduledLocks(I pos, I end, const TString &id, const TString &owner, ui64 order)
void CheckScheduledLocks(I pos, I end, const TString &id, const TString &owner, i32 priority)
{
UNIT_ASSERT(pos != end);
UNIT_ASSERT_VALUES_EQUAL(pos->RequestId, id);
UNIT_ASSERT_VALUES_EQUAL(pos->Owner, owner);
UNIT_ASSERT_VALUES_EQUAL(pos->Order, order);
UNIT_ASSERT_VALUES_EQUAL(pos->Priority, priority);
UNIT_ASSERT(++pos == end);
}

template<typename I, typename... Ts>
void CheckScheduledLocks(I pos, I end, const TString &id, const TString &owner, ui64 order, Ts... locks)
void CheckScheduledLocks(I pos, I end, const TString &id, const TString &owner, i32 priority, Ts... locks)
{
UNIT_ASSERT(pos != end);
UNIT_ASSERT_VALUES_EQUAL(pos->RequestId, id);
UNIT_ASSERT_VALUES_EQUAL(pos->Owner, owner);
UNIT_ASSERT_VALUES_EQUAL(pos->Order, order);
UNIT_ASSERT_VALUES_EQUAL(pos->Priority, priority);
CheckScheduledLocks(++pos, end, locks...);
}

Expand Down Expand Up @@ -442,7 +442,7 @@ Y_UNIT_TEST_SUITE(TClusterInfoTest) {
"request-3", "user-3", 3,
"request-4", "user-4", 4);

cluster->DeactivateScheduledLocks(request2.Order);
cluster->DeactivateScheduledLocks(request2.Priority);

TErrorInfo error;
UNIT_ASSERT(cluster->Node(1).IsLocked(error, TDuration(), Now(), TDuration()));
Expand Down
Loading
Loading