Skip to content

Commit

Permalink
Merge dfcbae3 into 6cf49bf
Browse files Browse the repository at this point in the history
  • Loading branch information
pixcc authored Feb 7, 2024
2 parents 6cf49bf + dfcbae3 commit d98d101
Show file tree
Hide file tree
Showing 20 changed files with 442 additions and 55 deletions.
4 changes: 3 additions & 1 deletion ydb/core/cms/api_adapters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ class TCreateMaintenanceTask: public TPermissionResponseProcessor<
cmsRequest.SetAvailabilityMode(ConvertAvailabilityMode(opts.availability_mode()));
cmsRequest.SetPartialPermissionAllowed(true);
cmsRequest.SetSchedule(true);
cmsRequest.SetPriority(opts.priority());

for (const auto& group : request.action_groups()) {
Y_ABORT_UNLESS(group.actions().size() == 1);
Expand Down Expand Up @@ -559,7 +560,8 @@ class TGetMaintenanceTask: public TAdapterActor<
opts.set_task_uid(taskUid);
opts.set_description(request.GetReason());
opts.set_availability_mode(ConvertAvailabilityMode(request.GetAvailabilityMode()));

opts.set_priority(request.GetPriority());

// pending actions
for (const auto& action : request.GetActions()) {
ConvertAction(action, *result.add_action_group_states()->add_action_states());
Expand Down
65 changes: 41 additions & 24 deletions ydb/core/cms/cluster_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ bool TLockableItem::IsLocked(TErrorInfo &error, TDuration defaultRetryTime,
return true;
}

if (!ScheduledLocks.empty() && ScheduledLocks.begin()->Order < DeactivatedLocksOrder) {
if (!ScheduledLocks.empty() && ScheduledLocks.begin()->Priority < DeactivatedLocksPriority) {
error.Code = TStatus::DISALLOW_TEMP;
error.Reason = Sprintf("%s has scheduled action %s owned by %s (order %" PRIu64 " vs %" PRIu64 ")",
error.Reason = Sprintf("%s has scheduled action %s owned by %s (priority %" PRIi32 " vs %" PRIi32 ")",
PrettyItemName().data(), ScheduledLocks.begin()->RequestId.data(),
ScheduledLocks.begin()->Owner.data(), ScheduledLocks.begin()->Order,
DeactivatedLocksOrder);
ScheduledLocks.begin()->Owner.data(), ScheduledLocks.begin()->Priority,
DeactivatedLocksPriority);
error.Deadline = now + defaultRetryTime;
return true;
}
Expand Down Expand Up @@ -113,12 +113,12 @@ void TLockableItem::RollbackLocks(ui64 point)

void TLockableItem::ReactivateScheduledLocks()
{
DeactivatedLocksOrder = Max<ui64>();
DeactivatedLocksPriority = Max<i32>();
}

void TLockableItem::DeactivateScheduledLocks(ui64 order)
void TLockableItem::DeactivateScheduledLocks(i32 priority)
{
DeactivatedLocksOrder = order;
DeactivatedLocksPriority = priority;
}

void TLockableItem::RemoveScheduledLocks(const TString &requestId)
Expand Down Expand Up @@ -650,21 +650,30 @@ void TClusterInfo::ApplyActionWithoutLog(const NKikimrCms::TAction &action)
case TAction::REBOOT_HOST:
if (auto nodes = NodePtrs(action.GetHost(), MakeServices(action))) {
for (const auto node : nodes) {
for (auto &nodeGroup: node->NodeGroups)
nodeGroup->LockNode(node->NodeId);
for (auto &nodeGroup: node->NodeGroups) {
if (!nodeGroup->IsNodeLocked(node->NodeId)) {
nodeGroup->LockNode(node->NodeId);
}
}
}
}
break;
case TAction::REPLACE_DEVICES:
for (const auto &device : action.GetDevices()) {
if (HasPDisk(device)) {
auto pdisk = &PDiskRef(device);
for (auto &nodeGroup: NodeRef(pdisk->NodeId).NodeGroups)
nodeGroup->LockNode(pdisk->NodeId);
for (auto &nodeGroup: NodeRef(pdisk->NodeId).NodeGroups) {
if (!nodeGroup->IsNodeLocked(pdisk->NodeId)) {
nodeGroup->LockNode(pdisk->NodeId);
}
}
} else if (HasVDisk(device)) {
auto vdisk = &VDiskRef(device);
for (auto &nodeGroup: NodeRef(vdisk->NodeId).NodeGroups)
nodeGroup->LockNode(vdisk->NodeId);
for (auto &nodeGroup: NodeRef(vdisk->NodeId).NodeGroups) {
if (!nodeGroup->IsNodeLocked(vdisk->NodeId)) {
nodeGroup->LockNode(vdisk->NodeId);
}
}
}
}
break;
Expand Down Expand Up @@ -756,7 +765,7 @@ ui64 TClusterInfo::AddLocks(const TPermissionInfo &permission, const TActorConte
|| permission.Action.GetType() == TAction::REBOOT_HOST
|| permission.Action.GetType() == TAction::REPLACE_DEVICES)) {
item->State = RESTART;
lock = true;;
lock = true;
}

if (lock) {
Expand Down Expand Up @@ -854,7 +863,7 @@ ui64 TClusterInfo::ScheduleActions(const TRequestInfo &request, const TActorCont
auto items = FindLockedItems(action, ctx);

for (auto item : items)
item->ScheduleLock({action, request.Owner, request.RequestId, request.Order});
item->ScheduleLock({action, request.Owner, request.RequestId, request.Priority});

locks += items.size();
}
Expand All @@ -868,10 +877,10 @@ void TClusterInfo::UnscheduleActions(const TString &requestId)
entry.second->RemoveScheduledLocks(requestId);
}

void TClusterInfo::DeactivateScheduledLocks(ui64 order)
void TClusterInfo::DeactivateScheduledLocks(i32 priority)
{
for (auto &entry : LockableItems)
entry.second->DeactivateScheduledLocks(order);
entry.second->DeactivateScheduledLocks(priority);
}

void TClusterInfo::ReactivateScheduledLocks()
Expand Down Expand Up @@ -1020,22 +1029,30 @@ void TOperationLogManager::ApplyAction(const NKikimrCms::TAction &action,
case NKikimrCms::TAction::REBOOT_HOST:
if (auto nodes = clusterState->NodePtrs(action.GetHost(), MakeServices(action))) {
for (const auto node : nodes) {
for (auto &nodeGroup: node->NodeGroups)
AddNodeLockOperation(node->NodeId, nodeGroup);
for (auto &nodeGroup: node->NodeGroups) {
if (!nodeGroup->IsNodeLocked(node->NodeId)) {
AddNodeLockOperation(node->NodeId, nodeGroup);
}
}
}
}
break;
case NKikimrCms::TAction::REPLACE_DEVICES:
for (const auto &device : action.GetDevices()) {
if (clusterState->HasPDisk(device)) {
auto pdisk = &clusterState->PDisk(device);
for (auto &nodeGroup: clusterState->NodeRef(pdisk->NodeId).NodeGroups)
AddNodeLockOperation(pdisk->NodeId, nodeGroup);

for (auto &nodeGroup: clusterState->NodeRef(pdisk->NodeId).NodeGroups) {
if (!nodeGroup->IsNodeLocked(pdisk->NodeId)) {
AddNodeLockOperation(pdisk->NodeId, nodeGroup);
}
}
} else if (clusterState->HasVDisk(device)) {
auto vdisk = &clusterState->VDisk(device);
for (auto &nodeGroup: clusterState->NodeRef(vdisk->NodeId).NodeGroups)
AddNodeLockOperation(vdisk->NodeId, nodeGroup);
for (auto &nodeGroup: clusterState->NodeRef(vdisk->NodeId).NodeGroups) {
if (!nodeGroup->IsNodeLocked(vdisk->NodeId)) {
AddNodeLockOperation(vdisk->NodeId, nodeGroup);
}
}
}
}
break;
Expand Down
17 changes: 9 additions & 8 deletions ydb/core/cms/cluster_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,13 @@ struct TRequestInfo {
request.SetPartialPermissionAllowed(Request.GetPartialPermissionAllowed());
request.SetReason(Request.GetReason());
request.SetAvailabilityMode(Request.GetAvailabilityMode());
request.SetPriority(Priority);
}

TString RequestId;
TString Owner;
ui64 Order = 0;
i32 Priority = 0;
NKikimrCms::TPermissionRequest Request;
};

Expand Down Expand Up @@ -203,10 +205,10 @@ class TLockableItem : public TThrRefBase {
};

struct TScheduledLock : TBaseLock {
TScheduledLock(const NKikimrCms::TAction &action, const TString &owner, const TString &requestId, ui64 order)
TScheduledLock(const NKikimrCms::TAction &action, const TString &owner, const TString &requestId, i32 priority)
: TBaseLock(owner, action)
, RequestId(requestId)
, Order(order)
, Priority(priority)
{
}

Expand All @@ -217,7 +219,7 @@ class TLockableItem : public TThrRefBase {
TScheduledLock &operator=(TScheduledLock &&other) = default;

TString RequestId;
ui64 Order = 0;
i32 Priority = 0;
};

struct TTemporaryLock : TBaseLock {
Expand Down Expand Up @@ -268,7 +270,7 @@ class TLockableItem : public TThrRefBase {

void ScheduleLock(TScheduledLock &&lock) {
auto pos = LowerBound(ScheduledLocks.begin(), ScheduledLocks.end(), lock, [](auto &l, auto &r) {
return l.Order < r.Order;
return l.Priority < r.Priority;
});
ScheduledLocks.insert(pos, lock);
}
Expand All @@ -278,7 +280,7 @@ class TLockableItem : public TThrRefBase {

void RollbackLocks(ui64 point);

void DeactivateScheduledLocks(ui64 order);
void DeactivateScheduledLocks(i32 priority);
void ReactivateScheduledLocks();
void RemoveScheduledLocks(const TString &requestId);

Expand All @@ -296,7 +298,7 @@ class TLockableItem : public TThrRefBase {
std::list<TExternalLock> ExternalLocks;
std::list<TScheduledLock> ScheduledLocks;
TVector<TTemporaryLock> TempLocks;
ui64 DeactivatedLocksOrder = Max<ui64>();
i32 DeactivatedLocksPriority = Max<i32>();
THashSet<NKikimrCms::EMarker> Markers;
};

Expand Down Expand Up @@ -667,7 +669,6 @@ class TClusterInfo : public TThrRefBase {
TOperationLogManager LogManager;
TOperationLogManager ScheduledLogManager;

void ApplyActionToOperationLog(const NKikimrCms::TAction &action);
void ApplyActionWithoutLog(const NKikimrCms::TAction &action);
void ApplyNodeLimits(ui32 clusterLimit, ui32 clusterRatioLimit, ui32 tenantLimit, ui32 tenantRatioLimit);

Expand Down Expand Up @@ -912,7 +913,7 @@ class TClusterInfo : public TThrRefBase {
ui64 AddTempLocks(const NKikimrCms::TAction &action, const TActorContext *ctx);
ui64 ScheduleActions(const TRequestInfo &request, const TActorContext *ctx);
void UnscheduleActions(const TString &requestId);
void DeactivateScheduledLocks(ui64 order);
void DeactivateScheduledLocks(i32 priority);
void ReactivateScheduledLocks();

void RollbackLocks(ui64 point);
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/cms/cluster_info_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,33 +199,33 @@ void AddActions(TRequestInfo &request, const NKikimrCms::TAction &action, Ts...
}

template<typename... Ts>
TRequestInfo MakeRequest(const TString &id, const TString &owner, ui64 order, Ts... actions)
TRequestInfo MakeRequest(const TString &id, const TString &owner, i32 priority, Ts... actions)
{
TRequestInfo res;
res.RequestId = id;
res.Owner = owner;
res.Order = order;
res.Priority = priority;
AddActions(res, actions...);
return res;
}

template<typename I>
void CheckScheduledLocks(I pos, I end, const TString &id, const TString &owner, ui64 order)
void CheckScheduledLocks(I pos, I end, const TString &id, const TString &owner, i32 priority)
{
UNIT_ASSERT(pos != end);
UNIT_ASSERT_VALUES_EQUAL(pos->RequestId, id);
UNIT_ASSERT_VALUES_EQUAL(pos->Owner, owner);
UNIT_ASSERT_VALUES_EQUAL(pos->Order, order);
UNIT_ASSERT_VALUES_EQUAL(pos->Priority, priority);
UNIT_ASSERT(++pos == end);
}

template<typename I, typename... Ts>
void CheckScheduledLocks(I pos, I end, const TString &id, const TString &owner, ui64 order, Ts... locks)
void CheckScheduledLocks(I pos, I end, const TString &id, const TString &owner, i32 priority, Ts... locks)
{
UNIT_ASSERT(pos != end);
UNIT_ASSERT_VALUES_EQUAL(pos->RequestId, id);
UNIT_ASSERT_VALUES_EQUAL(pos->Owner, owner);
UNIT_ASSERT_VALUES_EQUAL(pos->Order, order);
UNIT_ASSERT_VALUES_EQUAL(pos->Priority, priority);
CheckScheduledLocks(++pos, end, locks...);
}

Expand Down Expand Up @@ -442,7 +442,7 @@ Y_UNIT_TEST_SUITE(TClusterInfoTest) {
"request-3", "user-3", 3,
"request-4", "user-4", 4);

cluster->DeactivateScheduledLocks(request2.Order);
cluster->DeactivateScheduledLocks(request2.Priority);

TErrorInfo error;
UNIT_ASSERT(cluster->Node(1).IsLocked(error, TDuration(), Now(), TDuration()));
Expand Down
Loading

0 comments on commit d98d101

Please sign in to comment.