-
Notifications
You must be signed in to change notification settings - Fork 606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KIKIMR-20079: create/alter/drop group #501
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1772,6 +1772,47 @@ class TKikimrIcGateway : public IKqpGateway { | |
} | ||
} | ||
|
||
TFuture<TGenericResult> RenameGroup(const TString& cluster, NYql::TRenameGroupSettings& settings) override { | ||
using TRequest = TEvTxUserProxy::TEvProposeTransaction; | ||
|
||
try { | ||
if (!CheckCluster(cluster)) { | ||
return InvalidCluster<TGenericResult>(cluster); | ||
} | ||
|
||
TString database; | ||
if (!GetDatabaseForLoginOperation(database)) { | ||
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name")); | ||
} | ||
|
||
TPromise<TGenericResult> renameGroupPromise = NewPromise<TGenericResult>(); | ||
|
||
auto ev = MakeHolder<TRequest>(); | ||
ev->Record.SetDatabaseName(database); | ||
if (UserToken) { | ||
ev->Record.SetUserToken(UserToken->GetSerializedToken()); | ||
} | ||
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); | ||
schemeTx.SetWorkingDir(database); | ||
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin); | ||
auto& renameGroup = *schemeTx.MutableAlterLogin()->MutableRenameGroup(); | ||
|
||
renameGroup.SetGroup(settings.GroupName); | ||
renameGroup.SetNewName(settings.NewName); | ||
|
||
SendSchemeRequest(ev.Release()).Apply( | ||
[renameGroupPromise](const TFuture<TGenericResult>& future) mutable { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe [&renameGroupPromise] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
renameGroupPromise.SetValue(future.GetValue()); | ||
} | ||
); | ||
|
||
return renameGroupPromise.GetFuture(); | ||
} | ||
catch (yexception& e) { | ||
return MakeFuture(ResultFromException<TGenericResult>(e)); | ||
} | ||
} | ||
|
||
TFuture<TGenericResult> DropGroup(const TString& cluster, const NYql::TDropGroupSettings& settings) override { | ||
using TRequest = TEvTxUserProxy::TEvProposeTransaction; | ||
|
||
|
@@ -1798,17 +1839,11 @@ class TKikimrIcGateway : public IKqpGateway { | |
auto& dropGroup = *schemeTx.MutableAlterLogin()->MutableRemoveGroup(); | ||
|
||
dropGroup.SetGroup(settings.GroupName); | ||
dropGroup.SetMissingOk(settings.Force); | ||
|
||
SendSchemeRequest(ev.Release()).Apply( | ||
[dropGroupPromise, &settings](const TFuture<TGenericResult>& future) mutable { | ||
const auto& realResult = future.GetValue(); | ||
if (!realResult.Success() && realResult.Status() == TIssuesIds::DEFAULT_ERROR && settings.Force) { | ||
IKqpGateway::TGenericResult fakeResult; | ||
fakeResult.SetSuccess(); | ||
dropGroupPromise.SetValue(std::move(fakeResult)); | ||
} else { | ||
dropGroupPromise.SetValue(realResult); | ||
} | ||
[dropGroupPromise](const TFuture<TGenericResult>& future) mutable { | ||
dropGroupPromise.SetValue(future.GetValue()); | ||
} | ||
); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -908,15 +908,133 @@ class TKqpGatewayProxy : public IKikimrGateway { | |
} | ||
|
||
TFuture<TGenericResult> CreateGroup(const TString& cluster, const TCreateGroupSettings& settings) override { | ||
FORWARD_ENSURE_NO_PREPARE(CreateGroup, cluster, settings); | ||
CHECK_PREPARED_DDL(CreateGroup); | ||
|
||
auto createGroupPromise = NewPromise<TGenericResult>(); | ||
|
||
TString database; | ||
if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) { | ||
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name")); | ||
} | ||
|
||
NKikimrSchemeOp::TModifyScheme schemeTx; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With these changes we get a new code that does the same things as in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not a strict requirement. I think we need to see if it will bring us a profit. Maybe in this particular cases protos are very simple. If so, maybe it would be better to leave this code in both gateways. |
||
schemeTx.SetWorkingDir(database); | ||
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin); | ||
auto& createGroup = *schemeTx.MutableAlterLogin()->MutableCreateGroup(); | ||
createGroup.SetGroup(settings.GroupName); | ||
|
||
if (IsPrepare()) { | ||
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); | ||
auto& phyTx = *phyQuery.AddTransactions(); | ||
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); | ||
|
||
phyTx.MutableSchemeOperation()->MutableCreateGroup()->Swap(&schemeTx); | ||
|
||
if (settings.Roles.size()) { | ||
AddUsersToGroup(database, settings.GroupName, settings.Roles, NYql::TAlterGroupSettings::EAction::AddRoles); | ||
} | ||
|
||
TGenericResult result; | ||
result.SetSuccess(); | ||
createGroupPromise.SetValue(result); | ||
} else { | ||
return Gateway->CreateGroup(cluster, settings); | ||
} | ||
|
||
return createGroupPromise.GetFuture(); | ||
} | ||
|
||
TFuture<TGenericResult> AlterGroup(const TString& cluster, TAlterGroupSettings& settings) override { | ||
FORWARD_ENSURE_NO_PREPARE(AlterGroup, cluster, settings); | ||
CHECK_PREPARED_DDL(UpdateGroup); | ||
|
||
auto alterGroupPromise = NewPromise<TGenericResult>(); | ||
|
||
TString database; | ||
if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) { | ||
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name")); | ||
} | ||
|
||
if (!settings.Roles.size()) { | ||
return MakeFuture(ResultFromError<TGenericResult>("No roles given for AlterGroup request")); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can't see the same check for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
|
||
if (IsPrepare()) { | ||
AddUsersToGroup(database, settings.GroupName, settings.Roles, settings.Action); | ||
|
||
TGenericResult result; | ||
result.SetSuccess(); | ||
alterGroupPromise.SetValue(result); | ||
} else { | ||
return Gateway->AlterGroup(cluster, settings); | ||
} | ||
|
||
return alterGroupPromise.GetFuture(); | ||
} | ||
|
||
TFuture<TGenericResult> RenameGroup(const TString& cluster, TRenameGroupSettings& settings) override { | ||
CHECK_PREPARED_DDL(RenameGroup); | ||
|
||
auto renameGroupPromise = NewPromise<TGenericResult>(); | ||
|
||
TString database; | ||
if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) { | ||
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name")); | ||
} | ||
|
||
NKikimrSchemeOp::TModifyScheme schemeTx; | ||
schemeTx.SetWorkingDir(database); | ||
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin); | ||
auto& renameGroup = *schemeTx.MutableAlterLogin()->MutableRenameGroup(); | ||
renameGroup.SetGroup(settings.GroupName); | ||
renameGroup.SetNewName(settings.NewName); | ||
|
||
if (IsPrepare()) { | ||
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); | ||
auto& phyTx = *phyQuery.AddTransactions(); | ||
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); | ||
|
||
phyTx.MutableSchemeOperation()->MutableRenameGroup()->Swap(&schemeTx); | ||
TGenericResult result; | ||
result.SetSuccess(); | ||
renameGroupPromise.SetValue(result); | ||
} else { | ||
return Gateway->RenameGroup(cluster, settings); | ||
} | ||
|
||
return renameGroupPromise.GetFuture(); | ||
} | ||
|
||
TFuture<TGenericResult> DropGroup(const TString& cluster, const TDropGroupSettings& settings) override { | ||
FORWARD_ENSURE_NO_PREPARE(DropGroup, cluster, settings); | ||
CHECK_PREPARED_DDL(DropGroup); | ||
|
||
auto dropGroupPromise = NewPromise<TGenericResult>(); | ||
|
||
TString database; | ||
if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) { | ||
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name")); | ||
} | ||
|
||
NKikimrSchemeOp::TModifyScheme schemeTx; | ||
schemeTx.SetWorkingDir(database); | ||
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin); | ||
auto& dropGroup = *schemeTx.MutableAlterLogin()->MutableRemoveGroup(); | ||
dropGroup.SetGroup(settings.GroupName); | ||
dropGroup.SetMissingOk(settings.Force); | ||
|
||
if (IsPrepare()) { | ||
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); | ||
auto& phyTx = *phyQuery.AddTransactions(); | ||
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); | ||
|
||
phyTx.MutableSchemeOperation()->MutableAlterUser()->Swap(&schemeTx); | ||
TGenericResult result; | ||
result.SetSuccess(); | ||
dropGroupPromise.SetValue(result); | ||
} else { | ||
return Gateway->DropGroup(cluster, settings); | ||
} | ||
|
||
return dropGroupPromise.GetFuture(); | ||
} | ||
|
||
TFuture<TGenericResult> CreateColumnTable(TKikimrTableMetadataPtr metadata, bool createDir) override { | ||
|
@@ -1000,6 +1118,35 @@ class TKqpGatewayProxy : public IKikimrGateway { | |
return Gateway->GetDomainName(); | ||
} | ||
|
||
void AddUsersToGroup(const TString& database, const TString& group, const std::vector<TString>& roles, const NYql::TAlterGroupSettings::EAction& action) { | ||
for (const auto& role : roles) { | ||
NKikimrSchemeOp::TModifyScheme schemeTx; | ||
schemeTx.SetWorkingDir(database); | ||
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin); | ||
|
||
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); | ||
auto& phyTx = *phyQuery.AddTransactions(); | ||
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); | ||
|
||
switch (action) { | ||
case NYql::TAlterGroupSettings::EAction::AddRoles: { | ||
auto& alterGroup = *schemeTx.MutableAlterLogin()->MutableAddGroupMembership(); | ||
alterGroup.SetGroup(group); | ||
alterGroup.SetMember(role); | ||
phyTx.MutableSchemeOperation()->MutableAddGroupMembership()->Swap(&schemeTx); | ||
break; | ||
} | ||
case NYql::TAlterGroupSettings::EAction::RemoveRoles: { | ||
auto& alterGroup = *schemeTx.MutableAlterLogin()->MutableRemoveGroupMembership(); | ||
alterGroup.SetGroup(group); | ||
alterGroup.SetMember(role); | ||
phyTx.MutableSchemeOperation()->MutableRemoveGroupMembership()->Swap(&schemeTx); | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
|
||
private: | ||
TIntrusivePtr<IKqpGateway> Gateway; | ||
TIntrusivePtr<TKikimrSessionContext> SessionCtx; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should you put SUCCESS status here or maybe infer it from response.GetIssues()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Condition in case is already a status of response
This changes in ExecComplete status, so it means that result is success