Skip to content

Commit

Permalink
Add support for single composite action group (#5728)
Browse files Browse the repository at this point in the history
  • Loading branch information
pixcc authored Jul 1, 2024
1 parent 823fbb3 commit c98299a
Show file tree
Hide file tree
Showing 15 changed files with 234 additions and 28 deletions.
69 changes: 51 additions & 18 deletions ydb/core/cms/api_adapters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,25 @@ class TListClusterNodes: public TAdapterActor<

}; // TListClusterNodes

class TCompositeActionGroupHandler {
protected:
template <typename TResult>
Ydb::Maintenance::ActionGroupStates* GetActionGroupState(TResult& result) const {
if (HasSingleCompositeActionGroup && !result.action_group_states().empty()) {
return result.mutable_action_group_states(0);
}
return result.add_action_group_states();
}

protected:
bool HasSingleCompositeActionGroup = false;
};

template <typename TDerived, typename TEvRequest>
class TPermissionResponseProcessor: public TAdapterActor<TDerived, TEvRequest, TEvCms::TEvMaintenanceTaskResponse> {
class TPermissionResponseProcessor
: public TAdapterActor<TDerived, TEvRequest, TEvCms::TEvMaintenanceTaskResponse>
, public TCompositeActionGroupHandler
{
protected:
using TBase = TPermissionResponseProcessor<TDerived, TEvRequest>;

Expand All @@ -232,11 +249,11 @@ class TPermissionResponseProcessor: public TAdapterActor<TDerived, TEvRequest, T
switch (record.GetStatus().GetCode()) {
case NKikimrCms::TStatus::ALLOW:
case NKikimrCms::TStatus::ALLOW_PARTIAL:
case NKikimrCms::TStatus::DISALLOW:
case NKikimrCms::TStatus::DISALLOW_TEMP:
break;
case NKikimrCms::TStatus::ERROR_TEMP:
return this->Reply(Ydb::StatusIds::UNAVAILABLE, record.GetStatus().GetReason());
case NKikimrCms::TStatus::DISALLOW:
case NKikimrCms::TStatus::WRONG_REQUEST:
case NKikimrCms::TStatus::ERROR:
case NKikimrCms::TStatus::NO_SUCH_HOST:
Expand Down Expand Up @@ -266,7 +283,7 @@ class TPermissionResponseProcessor: public TAdapterActor<TDerived, TEvRequest, T
// performed actions: new permissions
for (const auto& permission : record.GetPermissions()) {
permissionsSeen.insert(permission.GetId());
ConvertPermission(taskUid, permission, *result.add_action_group_states()->add_action_states());
ConvertPermission(taskUid, permission, *GetActionGroupState(result)->add_action_states());
}

auto cmsState = this->GetCmsState();
Expand All @@ -279,15 +296,15 @@ class TPermissionResponseProcessor: public TAdapterActor<TDerived, TEvRequest, T
}

ConvertPermission(taskUid, cmsState->Permissions.at(id),
*result.add_action_group_states()->add_action_states());
*GetActionGroupState(result)->add_action_states());
}
}

// pending actions
if (cmsState->ScheduledRequests.contains(record.GetRequestId())) {
const auto& request = cmsState->ScheduledRequests.at(record.GetRequestId());
for (const auto& action : request.Request.GetActions()) {
ConvertAction(action, *result.add_action_group_states()->add_action_states());
ConvertAction(action, *GetActionGroupState(result)->add_action_states());
}
}

Expand Down Expand Up @@ -336,8 +353,17 @@ class TCreateMaintenanceTask: public TPermissionResponseProcessor<
if (group.actions().size() < 1) {
Reply(Ydb::StatusIds::BAD_REQUEST, "Empty actions");
return false;
} else if (group.actions().size() > 1) {
Reply(Ydb::StatusIds::UNSUPPORTED, "Composite action groups are not supported");
}

if (!GetCmsState()->EnableSingleCompositeActionGroup && group.actions().size() > 1) {
Reply(Ydb::StatusIds::UNSUPPORTED, "Feature flag EnableSingleCompositeActionGroup is off");
return false;
}

if (request.action_groups().size() > 1 && group.actions().size() > 1) {
Reply(Ydb::StatusIds::UNSUPPORTED, TStringBuilder()
<< "A task can have either a single composite action group or many action groups"
<< " with only one action");
return false;
}

Expand Down Expand Up @@ -368,7 +394,7 @@ class TCreateMaintenanceTask: public TPermissionResponseProcessor<
}
}

static void ConvertRequest(const TString& user, const Ydb::Maintenance::CreateMaintenanceTaskRequest& request,
void ConvertRequest(const TString& user, const Ydb::Maintenance::CreateMaintenanceTaskRequest& request,
NKikimrCms::TPermissionRequest& cmsRequest)
{
const auto& opts = request.task_options();
Expand All @@ -378,16 +404,19 @@ class TCreateMaintenanceTask: public TPermissionResponseProcessor<
cmsRequest.SetDryRun(opts.dry_run());
cmsRequest.SetReason(opts.description());
cmsRequest.SetAvailabilityMode(ConvertAvailabilityMode(opts.availability_mode()));
cmsRequest.SetPartialPermissionAllowed(true);
cmsRequest.SetSchedule(true);

i32 priority = opts.priority();
if (priority != 0) {
cmsRequest.SetPriority(priority);
}

HasSingleCompositeActionGroup = request.action_groups().size() == 1
&& request.action_groups(0).actions().size() > 1;
cmsRequest.SetPartialPermissionAllowed(!HasSingleCompositeActionGroup);

for (const auto& group : request.action_groups()) {
Y_ABORT_UNLESS(group.actions().size() == 1);
Y_ABORT_UNLESS(HasSingleCompositeActionGroup || group.actions().size() == 1);
for (const auto& action : group.actions()) {
if (action.has_lock_action()) {
ConvertAction(action.lock_action(), *cmsRequest.AddActions());
Expand Down Expand Up @@ -441,6 +470,7 @@ class TRefreshMaintenanceTask: public TPermissionResponseProcessor<
}

const auto& task = tit->second;
HasSingleCompositeActionGroup = task.HasSingleCompositeActionGroup;

auto rit = cmsState->ScheduledRequests.find(task.RequestId);
if (rit == cmsState->ScheduledRequests.end()) {
Expand All @@ -457,7 +487,7 @@ class TRefreshMaintenanceTask: public TPermissionResponseProcessor<
}

ConvertPermission(taskUid, cmsState->Permissions.at(id),
*result.add_action_group_states()->add_action_states());
*GetActionGroupState(result)->add_action_states());
}

if (result.action_group_states().empty()) {
Expand Down Expand Up @@ -486,10 +516,12 @@ class TRefreshMaintenanceTask: public TPermissionResponseProcessor<

}; // TRefreshMaintenanceTask

class TGetMaintenanceTask: public TAdapterActor<
TGetMaintenanceTask,
TEvCms::TEvGetMaintenanceTaskRequest,
TEvCms::TEvGetMaintenanceTaskResponse>
class TGetMaintenanceTask
: public TAdapterActor<
TGetMaintenanceTask,
TEvCms::TEvGetMaintenanceTaskRequest,
TEvCms::TEvGetMaintenanceTaskResponse>
, public TCompositeActionGroupHandler
{
public:
using TBase::TBase;
Expand All @@ -504,6 +536,7 @@ class TGetMaintenanceTask: public TAdapterActor<
}

const auto& task = it->second;
HasSingleCompositeActionGroup = task.HasSingleCompositeActionGroup;

if (!cmsState->ScheduledRequests.contains(task.RequestId)) {
auto response = MakeHolder<TEvCms::TEvGetMaintenanceTaskResponse>();
Expand All @@ -519,7 +552,7 @@ class TGetMaintenanceTask: public TAdapterActor<
}

ConvertPermission(taskUid, cmsState->Permissions.at(id),
*result.add_action_group_states()->add_action_states());
*GetActionGroupState(result)->add_action_states());
}

return Reply(std::move(response));
Expand Down Expand Up @@ -570,7 +603,7 @@ class TGetMaintenanceTask: public TAdapterActor<

// pending actions
for (const auto& action : request.GetActions()) {
ConvertAction(action, *result.add_action_group_states()->add_action_states());
ConvertAction(action, *GetActionGroupState(result)->add_action_states());
}
}

Expand All @@ -584,7 +617,7 @@ class TGetMaintenanceTask: public TAdapterActor<
}

ConvertPermission(taskUid, cmsState->Permissions.at(id),
*result.add_action_group_states()->add_action_states());
*GetActionGroupState(result)->add_action_states());
}
}

Expand Down
8 changes: 5 additions & 3 deletions ydb/core/cms/cms.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ void TCms::DefaultSignalTabletActive(const TActorContext &)

void TCms::OnActivateExecutor(const TActorContext &ctx)
{
EnableCMSRequestPriorities = AppData(ctx)->FeatureFlags.GetEnableCMSRequestPriorities();
State->EnableCMSRequestPriorities = AppData(ctx)->FeatureFlags.GetEnableCMSRequestPriorities();
State->EnableSingleCompositeActionGroup = AppData(ctx)->FeatureFlags.GetEnableSingleCompositeActionGroup();

Executor()->RegisterExternalTabletCounters(TabletCountersPtr.Release());

Expand Down Expand Up @@ -1469,7 +1470,7 @@ void TCms::CheckAndEnqueueRequest(TEvCms::TEvPermissionRequest::TPtr &ev, const
}
}

if (rec.HasPriority() && !EnableCMSRequestPriorities) {
if (rec.HasPriority() && !State->EnableCMSRequestPriorities) {
if (rec.GetUser() == WALLE_CMS_USER) {
rec.ClearPriority();
} else {
Expand Down Expand Up @@ -2271,7 +2272,8 @@ void TCms::Handle(TEvConsole::TEvConfigNotificationRequest::TPtr &ev,
{
const auto& appConfig = ev->Get()->Record.GetConfig();
if (appConfig.HasFeatureFlags()) {
EnableCMSRequestPriorities = appConfig.GetFeatureFlags().GetEnableCMSRequestPriorities();
State->EnableCMSRequestPriorities = appConfig.GetFeatureFlags().GetEnableCMSRequestPriorities();
State->EnableSingleCompositeActionGroup = appConfig.GetFeatureFlags().GetEnableSingleCompositeActionGroup();
}

if (ev->Get()->Record.HasLocal() && ev->Get()->Record.GetLocal()) {
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/cms/cms_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,6 @@ class TCms : public TActor<TCms>, public TTabletExecutedFlat {

TInstant InfoCollectorStartTime;

bool EnableCMSRequestPriorities = false;

private:
TString GenerateStat();
void GenerateNodeState(IOutputStream&);
Expand Down
74 changes: 74 additions & 0 deletions ydb/core/cms/cms_maintenance_api_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#include "cms_ut_common.h"

#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr::NCmsTest {

using namespace Ydb::Maintenance;

Y_UNIT_TEST_SUITE(TMaintenanceApiTest) {
Y_UNIT_TEST(ManyActionGroupsWithSingleAction) {
TCmsTestEnv env(8);

auto response = env.CheckMaintenanceTaskCreate("task-1", Ydb::StatusIds::SUCCESS,
MakeActionGroup(
MakeLockAction(env.GetNodeId(0), TDuration::Minutes(10))
),
MakeActionGroup(
MakeLockAction(env.GetNodeId(1), TDuration::Minutes(10))
)
);

UNIT_ASSERT_VALUES_EQUAL(response.action_group_states().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(response.action_group_states(0).action_states().size(), 1);
const auto &a1 = response.action_group_states(0).action_states(0);
UNIT_ASSERT_VALUES_EQUAL(response.action_group_states(1).action_states().size(), 1);
const auto &a2 = response.action_group_states(1).action_states(0);

bool hasPerformedAction = a1.status() == ActionState::ACTION_STATUS_PERFORMED
|| a2.status() == ActionState::ACTION_STATUS_PERFORMED;
bool hasPendingAction = a1.status() == ActionState::ACTION_STATUS_PENDING
|| a2.status() == ActionState::ACTION_STATUS_PENDING;
UNIT_ASSERT(hasPerformedAction && hasPendingAction);
}

Y_UNIT_TEST(SingleCompositeActionGroup) {
TCmsTestEnv env(16);

// lock a node to prevent task-2 from taking a lock on a node from the same storage group
env.CheckMaintenanceTaskCreate("task-1", Ydb::StatusIds::SUCCESS,
MakeActionGroup(
MakeLockAction(env.GetNodeId(0), TDuration::Minutes(10))
)
);

auto response = env.CheckMaintenanceTaskCreate("task-2", Ydb::StatusIds::SUCCESS,
MakeActionGroup(
MakeLockAction(env.GetNodeId(1), TDuration::Minutes(10)),
MakeLockAction(env.GetNodeId(9), TDuration::Minutes(10))
)
);

UNIT_ASSERT_VALUES_EQUAL(response.action_group_states().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(response.action_group_states(0).action_states().size(), 2);
const auto &a1 = response.action_group_states(0).action_states(0);
const auto &a2 = response.action_group_states(0).action_states(1);

bool allActionsPending = a1.status() == ActionState::ACTION_STATUS_PENDING
&& a2.status() == ActionState::ACTION_STATUS_PENDING;
UNIT_ASSERT(allActionsPending);
}

Y_UNIT_TEST(CompositeActionGroupSameStorageGroup) {
TCmsTestEnv env(8);

env.CheckMaintenanceTaskCreate("task-2", Ydb::StatusIds::BAD_REQUEST,
MakeActionGroup(
MakeLockAction(env.GetNodeId(0), TDuration::Minutes(10)),
MakeLockAction(env.GetNodeId(1), TDuration::Minutes(10))
)
);
}
}

} // namespace NKikimr::NCmsTest
5 changes: 5 additions & 0 deletions ydb/core/cms/cms_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ struct TTaskInfo {
TString RequestId;
TString Owner;
TSet<TString> Permissions;
bool HasSingleCompositeActionGroup = false;

TString ToString() const {
return TStringBuilder() << "{"
<< " TaskId: " << TaskId
<< " RequestId: " << RequestId
<< " Owner: " << Owner
<< " Permissions: [" << JoinSeq(", ", Permissions) << "]"
<< " HasSingleCompositeActionGroup: " << HasSingleCompositeActionGroup
<< " }";
}
};
Expand Down Expand Up @@ -63,6 +65,9 @@ struct TCmsState : public TAtomicRefCount<TCmsState> {
TActorId CmsActorId;
TActorId BSControllerPipe;
TActorId Sentinel;

bool EnableCMSRequestPriorities = false;
bool EnableSingleCompositeActionGroup = false;
};

using TCmsStatePtr = TIntrusivePtr<TCmsState>;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/cms/cms_tx_load_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,14 @@ class TCms::TTxLoadState : public TTransactionBase<TCms> {
TString taskId = maintenanceTasksRowset.GetValue<Schema::MaintenanceTasks::TaskID>();
TString requestId = maintenanceTasksRowset.GetValue<Schema::MaintenanceTasks::RequestID>();
TString owner = maintenanceTasksRowset.GetValue<Schema::MaintenanceTasks::Owner>();
bool hasSingleCompositeActionGroup = maintenanceTasksRowset.GetValue<Schema::MaintenanceTasks::HasSingleCompositeActionGroup>();

state->MaintenanceRequests.emplace(requestId, taskId);
state->MaintenanceTasks.emplace(taskId, TTaskInfo{
.TaskId = taskId,
.RequestId = requestId,
.Owner = owner,
.HasSingleCompositeActionGroup = hasSingleCompositeActionGroup
});

LOG_DEBUG(ctx, NKikimrServices::CMS, "Loaded maintenance task %s mapped to request %s",
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/cms/cms_tx_store_permissions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ class TCms::TTxStorePermissions : public TTransactionBase<TCms> {
.TaskId = *MaintenanceTaskId,
.RequestId = Scheduled->RequestId,
.Owner = Scheduled->Owner,
.HasSingleCompositeActionGroup = !Scheduled->Request.GetPartialPermissionAllowed()
});

db.Table<Schema::MaintenanceTasks>().Key(*MaintenanceTaskId).Update(
NIceDb::TUpdate<Schema::MaintenanceTasks::RequestID>(Scheduled->RequestId),
NIceDb::TUpdate<Schema::MaintenanceTasks::Owner>(Scheduled->Owner)
NIceDb::TUpdate<Schema::MaintenanceTasks::Owner>(Scheduled->Owner),
NIceDb::TUpdate<Schema::MaintenanceTasks::HasSingleCompositeActionGroup>(!Scheduled->Request.GetPartialPermissionAllowed())
);
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/cms/cms_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ static void SetupServices(TTestActorRuntime &runtime, const TTestEnvOpts &option
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableBootstrapConfig()->CopyFrom(TFakeNodeWhiteboardService::BootstrapConfig);
appConfig.MutableFeatureFlags()->SetEnableCMSRequestPriorities(options.EnableCMSRequestPriorities);
appConfig.MutableFeatureFlags()->SetEnableSingleCompositeActionGroup(options.EnableSingleCompositeActionGroup);
runtime.AddLocalService(
MakeConfigsDispatcherID(
runtime.GetNodeId(0)),
Expand Down
Loading

0 comments on commit c98299a

Please sign in to comment.