Skip to content

Commit

Permalink
KIKIMR-20079: create/alter/drop group (ydb-platform#501)
Browse files Browse the repository at this point in the history
* KIKIMR-20079: create/alter/drop group

* fix test

* fix mistake

* resolve issues
  • Loading branch information
VPolka authored and CyberROFL committed Dec 19, 2023
1 parent b3a885e commit 8e58ccd
Show file tree
Hide file tree
Showing 34 changed files with 819 additions and 58 deletions.
32 changes: 32 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,49 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kAlterUser: {
auto modifyScheme = schemeOp.GetAlterUser();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kDropUser: {
auto modifyScheme = schemeOp.GetDropUser();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kCreateGroup: {
auto modifyScheme = schemeOp.GetCreateGroup();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kAddGroupMembership: {
auto modifyScheme = schemeOp.GetAddGroupMembership();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kRemoveGroupMembership: {
auto modifyScheme = schemeOp.GetRemoveGroupMembership();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kRenameGroup: {
auto modifyScheme = schemeOp.GetRenameGroup();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kDropGroup: {
auto modifyScheme = schemeOp.GetDropGroup();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
<< (ui32) schemeOp.GetOperationCase());
Expand Down
15 changes: 11 additions & 4 deletions ydb/core/kqp/gateway/actors/scheme.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,17 @@ class TSchemeOpRequestHandler: public TRequestHandlerBase<
{
LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, "Successful completion of scheme request"
<< ", TxId: " << response.GetTxId());

TResult result;
result.SetSuccess();
Promise.SetValue(std::move(result));

if (!response.GetIssues().empty()) {
NYql::TIssues issues;
NYql::IssuesFromMessage(response.GetIssues(), issues);
Promise.SetValue(NYql::NCommon::ResultFromIssues<TResult>(NYql::TIssuesIds::SUCCESS, "", issues));
} else {
TResult result;
result.SetSuccess();
Promise.SetValue(std::move(result));
}

this->Die(ctx);
return;
}
Expand Down
53 changes: 44 additions & 9 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1772,6 +1772,47 @@ class TKikimrIcGateway : public IKqpGateway {
}
}

TFuture<TGenericResult> RenameGroup(const TString& cluster, NYql::TRenameGroupSettings& settings) override {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

try {
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
}

TString database;
if (!GetDatabaseForLoginOperation(database)) {
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name"));
}

TPromise<TGenericResult> renameGroupPromise = NewPromise<TGenericResult>();

auto ev = MakeHolder<TRequest>();
ev->Record.SetDatabaseName(database);
if (UserToken) {
ev->Record.SetUserToken(UserToken->GetSerializedToken());
}
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
schemeTx.SetWorkingDir(database);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);
auto& renameGroup = *schemeTx.MutableAlterLogin()->MutableRenameGroup();

renameGroup.SetGroup(settings.GroupName);
renameGroup.SetNewName(settings.NewName);

SendSchemeRequest(ev.Release()).Apply(
[renameGroupPromise](const TFuture<TGenericResult>& future) mutable {
renameGroupPromise.SetValue(future.GetValue());
}
);

return renameGroupPromise.GetFuture();
}
catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
}
}

TFuture<TGenericResult> DropGroup(const TString& cluster, const NYql::TDropGroupSettings& settings) override {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

Expand All @@ -1798,17 +1839,11 @@ class TKikimrIcGateway : public IKqpGateway {
auto& dropGroup = *schemeTx.MutableAlterLogin()->MutableRemoveGroup();

dropGroup.SetGroup(settings.GroupName);
dropGroup.SetMissingOk(settings.Force);

SendSchemeRequest(ev.Release()).Apply(
[dropGroupPromise, &settings](const TFuture<TGenericResult>& future) mutable {
const auto& realResult = future.GetValue();
if (!realResult.Success() && realResult.Status() == TIssuesIds::DEFAULT_ERROR && settings.Force) {
IKqpGateway::TGenericResult fakeResult;
fakeResult.SetSuccess();
dropGroupPromise.SetValue(std::move(fakeResult));
} else {
dropGroupPromise.SetValue(realResult);
}
[dropGroupPromise](const TFuture<TGenericResult>& future) mutable {
dropGroupPromise.SetValue(future.GetValue());
}
);

Expand Down
153 changes: 150 additions & 3 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -908,15 +908,133 @@ class TKqpGatewayProxy : public IKikimrGateway {
}

TFuture<TGenericResult> CreateGroup(const TString& cluster, const TCreateGroupSettings& settings) override {
FORWARD_ENSURE_NO_PREPARE(CreateGroup, cluster, settings);
CHECK_PREPARED_DDL(CreateGroup);

auto createGroupPromise = NewPromise<TGenericResult>();

TString database;
if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) {
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name"));
}

NKikimrSchemeOp::TModifyScheme schemeTx;
schemeTx.SetWorkingDir(database);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);
auto& createGroup = *schemeTx.MutableAlterLogin()->MutableCreateGroup();
createGroup.SetGroup(settings.GroupName);

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);

phyTx.MutableSchemeOperation()->MutableCreateGroup()->Swap(&schemeTx);

if (settings.Roles.size()) {
AddUsersToGroup(database, settings.GroupName, settings.Roles, NYql::TAlterGroupSettings::EAction::AddRoles);
}

TGenericResult result;
result.SetSuccess();
createGroupPromise.SetValue(result);
} else {
return Gateway->CreateGroup(cluster, settings);
}

return createGroupPromise.GetFuture();
}

TFuture<TGenericResult> AlterGroup(const TString& cluster, TAlterGroupSettings& settings) override {
FORWARD_ENSURE_NO_PREPARE(AlterGroup, cluster, settings);
CHECK_PREPARED_DDL(UpdateGroup);

auto alterGroupPromise = NewPromise<TGenericResult>();

TString database;
if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) {
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name"));
}

if (!settings.Roles.size()) {
return MakeFuture(ResultFromError<TGenericResult>("No roles given for AlterGroup request"));
}

if (IsPrepare()) {
AddUsersToGroup(database, settings.GroupName, settings.Roles, settings.Action);

TGenericResult result;
result.SetSuccess();
alterGroupPromise.SetValue(result);
} else {
return Gateway->AlterGroup(cluster, settings);
}

return alterGroupPromise.GetFuture();
}

TFuture<TGenericResult> RenameGroup(const TString& cluster, TRenameGroupSettings& settings) override {
CHECK_PREPARED_DDL(RenameGroup);

auto renameGroupPromise = NewPromise<TGenericResult>();

TString database;
if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) {
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name"));
}

NKikimrSchemeOp::TModifyScheme schemeTx;
schemeTx.SetWorkingDir(database);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);
auto& renameGroup = *schemeTx.MutableAlterLogin()->MutableRenameGroup();
renameGroup.SetGroup(settings.GroupName);
renameGroup.SetNewName(settings.NewName);

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);

phyTx.MutableSchemeOperation()->MutableRenameGroup()->Swap(&schemeTx);
TGenericResult result;
result.SetSuccess();
renameGroupPromise.SetValue(result);
} else {
return Gateway->RenameGroup(cluster, settings);
}

return renameGroupPromise.GetFuture();
}

TFuture<TGenericResult> DropGroup(const TString& cluster, const TDropGroupSettings& settings) override {
FORWARD_ENSURE_NO_PREPARE(DropGroup, cluster, settings);
CHECK_PREPARED_DDL(DropGroup);

auto dropGroupPromise = NewPromise<TGenericResult>();

TString database;
if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) {
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name"));
}

NKikimrSchemeOp::TModifyScheme schemeTx;
schemeTx.SetWorkingDir(database);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);
auto& dropGroup = *schemeTx.MutableAlterLogin()->MutableRemoveGroup();
dropGroup.SetGroup(settings.GroupName);
dropGroup.SetMissingOk(settings.Force);

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);

phyTx.MutableSchemeOperation()->MutableAlterUser()->Swap(&schemeTx);
TGenericResult result;
result.SetSuccess();
dropGroupPromise.SetValue(result);
} else {
return Gateway->DropGroup(cluster, settings);
}

return dropGroupPromise.GetFuture();
}

TFuture<TGenericResult> CreateColumnTable(TKikimrTableMetadataPtr metadata, bool createDir) override {
Expand Down Expand Up @@ -1000,6 +1118,35 @@ class TKqpGatewayProxy : public IKikimrGateway {
return Gateway->GetDomainName();
}

void AddUsersToGroup(const TString& database, const TString& group, const std::vector<TString>& roles, const NYql::TAlterGroupSettings::EAction& action) {
for (const auto& role : roles) {
NKikimrSchemeOp::TModifyScheme schemeTx;
schemeTx.SetWorkingDir(database);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);

auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);

switch (action) {
case NYql::TAlterGroupSettings::EAction::AddRoles: {
auto& alterGroup = *schemeTx.MutableAlterLogin()->MutableAddGroupMembership();
alterGroup.SetGroup(group);
alterGroup.SetMember(role);
phyTx.MutableSchemeOperation()->MutableAddGroupMembership()->Swap(&schemeTx);
break;
}
case NYql::TAlterGroupSettings::EAction::RemoveRoles: {
auto& alterGroup = *schemeTx.MutableAlterLogin()->MutableRemoveGroupMembership();
alterGroup.SetGroup(group);
alterGroup.SetMember(role);
phyTx.MutableSchemeOperation()->MutableRemoveGroupMembership()->Swap(&schemeTx);
break;
}
}
}
}

private:
TIntrusivePtr<IKqpGateway> Gateway;
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
Expand Down
20 changes: 20 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_datasink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
return TStatus::Error;
}

TStatus HandleRenameGroup(TKiRenameGroup node, TExprContext& ctx) override {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "RenameGroup is not yet implemented for intent determination transformer"));
return TStatus::Error;
}

TStatus HandleDropGroup(TKiDropGroup node, TExprContext& ctx) override {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "DropGroup is not yet implemented for intent determination transformer"));
Expand Down Expand Up @@ -466,6 +472,7 @@ class TKikimrDataSink : public TDataProviderBase
|| node.IsCallable(TKiDropUser::CallableName())
|| node.IsCallable(TKiCreateGroup::CallableName())
|| node.IsCallable(TKiAlterGroup::CallableName())
|| node.IsCallable(TKiRenameGroup::CallableName())
|| node.IsCallable(TKiDropGroup::CallableName())
|| node.IsCallable(TKiUpsertObject::CallableName())
|| node.IsCallable(TKiCreateObject::CallableName())
Expand Down Expand Up @@ -916,6 +923,7 @@ class TKikimrDataSink : public TDataProviderBase
.World(node->Child(0))
.DataSink(node->Child(1))
.GroupName().Build(key.GetRoleName())
.Roles(settings.Roles.Cast())
.Done()
.Ptr();
} else if (mode == "addUsersToGroup" || mode == "dropUsersFromGroup") {
Expand All @@ -927,6 +935,14 @@ class TKikimrDataSink : public TDataProviderBase
.Roles(settings.Roles.Cast())
.Done()
.Ptr();
} else if (mode == "renameGroup") {
return Build<TKiRenameGroup>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.GroupName().Build(key.GetRoleName())
.NewName(settings.NewName.Cast())
.Done()
.Ptr();
} else if (mode == "dropGroup") {
return Build<TKiDropGroup>(ctx, node->Pos())
.World(node->Child(0))
Expand Down Expand Up @@ -1103,6 +1119,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt
return HandleAlterGroup(node.Cast(), ctx);
}

if (auto node = TMaybeNode<TKiRenameGroup>(input)) {
return HandleRenameGroup(node.Cast(), ctx);
}

if (auto node = TMaybeNode<TKiDropGroup>(input)) {
return HandleDropGroup(node.Cast(), ctx);
}
Expand Down
Loading

0 comments on commit 8e58ccd

Please sign in to comment.