Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIKIMR-20079: create/alter/drop group #501

Merged
merged 4 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,49 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kAlterUser: {
auto modifyScheme = schemeOp.GetAlterUser();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kDropUser: {
auto modifyScheme = schemeOp.GetDropUser();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kCreateGroup: {
auto modifyScheme = schemeOp.GetCreateGroup();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kAddGroupMembership: {
auto modifyScheme = schemeOp.GetAddGroupMembership();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kRemoveGroupMembership: {
auto modifyScheme = schemeOp.GetRemoveGroupMembership();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kRenameGroup: {
auto modifyScheme = schemeOp.GetRenameGroup();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kDropGroup: {
auto modifyScheme = schemeOp.GetDropGroup();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
<< (ui32) schemeOp.GetOperationCase());
Expand Down
15 changes: 11 additions & 4 deletions ydb/core/kqp/gateway/actors/scheme.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,17 @@ class TSchemeOpRequestHandler: public TRequestHandlerBase<
{
LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, "Successful completion of scheme request"
<< ", TxId: " << response.GetTxId());

TResult result;
result.SetSuccess();
Promise.SetValue(std::move(result));

if (!response.GetIssues().empty()) {
NYql::TIssues issues;
NYql::IssuesFromMessage(response.GetIssues(), issues);
Promise.SetValue(NYql::NCommon::ResultFromIssues<TResult>(NYql::TIssuesIds::SUCCESS, "", issues));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you put SUCCESS status here or maybe infer it from response.GetIssues()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Condition in case is already a status of response
This changes in ExecComplete status, so it means that result is success

} else {
TResult result;
result.SetSuccess();
Promise.SetValue(std::move(result));
}

this->Die(ctx);
return;
}
Expand Down
53 changes: 44 additions & 9 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1772,6 +1772,47 @@ class TKikimrIcGateway : public IKqpGateway {
}
}

TFuture<TGenericResult> RenameGroup(const TString& cluster, NYql::TRenameGroupSettings& settings) override {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

try {
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
}

TString database;
if (!GetDatabaseForLoginOperation(database)) {
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name"));
}

TPromise<TGenericResult> renameGroupPromise = NewPromise<TGenericResult>();

auto ev = MakeHolder<TRequest>();
ev->Record.SetDatabaseName(database);
if (UserToken) {
ev->Record.SetUserToken(UserToken->GetSerializedToken());
}
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
schemeTx.SetWorkingDir(database);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);
auto& renameGroup = *schemeTx.MutableAlterLogin()->MutableRenameGroup();

renameGroup.SetGroup(settings.GroupName);
renameGroup.SetNewName(settings.NewName);

SendSchemeRequest(ev.Release()).Apply(
[renameGroupPromise](const TFuture<TGenericResult>& future) mutable {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe [&renameGroupPromise]

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renameGroupPromise is local variable. So it will be error to capture it by reference. Promises have reference counting, it is safe to copy renameGroupPromise into lambda.

renameGroupPromise.SetValue(future.GetValue());
}
);

return renameGroupPromise.GetFuture();
}
catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
}
}

TFuture<TGenericResult> DropGroup(const TString& cluster, const NYql::TDropGroupSettings& settings) override {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

Expand All @@ -1798,17 +1839,11 @@ class TKikimrIcGateway : public IKqpGateway {
auto& dropGroup = *schemeTx.MutableAlterLogin()->MutableRemoveGroup();

dropGroup.SetGroup(settings.GroupName);
dropGroup.SetMissingOk(settings.Force);

SendSchemeRequest(ev.Release()).Apply(
[dropGroupPromise, &settings](const TFuture<TGenericResult>& future) mutable {
const auto& realResult = future.GetValue();
if (!realResult.Success() && realResult.Status() == TIssuesIds::DEFAULT_ERROR && settings.Force) {
IKqpGateway::TGenericResult fakeResult;
fakeResult.SetSuccess();
dropGroupPromise.SetValue(std::move(fakeResult));
} else {
dropGroupPromise.SetValue(realResult);
}
[dropGroupPromise](const TFuture<TGenericResult>& future) mutable {
dropGroupPromise.SetValue(future.GetValue());
}
);

Expand Down
153 changes: 150 additions & 3 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -908,15 +908,133 @@ class TKqpGatewayProxy : public IKikimrGateway {
}

TFuture<TGenericResult> CreateGroup(const TString& cluster, const TCreateGroupSettings& settings) override {
FORWARD_ENSURE_NO_PREPARE(CreateGroup, cluster, settings);
CHECK_PREPARED_DDL(CreateGroup);

auto createGroupPromise = NewPromise<TGenericResult>();

TString database;
if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) {
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name"));
}

NKikimrSchemeOp::TModifyScheme schemeTx;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With these changes we get a new code that does the same things as in kqp_ic_gateway.cpp. Can we make some library with helpers that fill TModifyScheme proto and is used by both gateways?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not a strict requirement. I think we need to see if it will bring us a profit. Maybe in this particular cases protos are very simple. If so, maybe it would be better to leave this code in both gateways.

schemeTx.SetWorkingDir(database);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);
auto& createGroup = *schemeTx.MutableAlterLogin()->MutableCreateGroup();
createGroup.SetGroup(settings.GroupName);

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);

phyTx.MutableSchemeOperation()->MutableCreateGroup()->Swap(&schemeTx);

if (settings.Roles.size()) {
AddUsersToGroup(database, settings.GroupName, settings.Roles, NYql::TAlterGroupSettings::EAction::AddRoles);
}

TGenericResult result;
result.SetSuccess();
createGroupPromise.SetValue(result);
} else {
return Gateway->CreateGroup(cluster, settings);
}

return createGroupPromise.GetFuture();
}

TFuture<TGenericResult> AlterGroup(const TString& cluster, TAlterGroupSettings& settings) override {
FORWARD_ENSURE_NO_PREPARE(AlterGroup, cluster, settings);
CHECK_PREPARED_DDL(UpdateGroup);

auto alterGroupPromise = NewPromise<TGenericResult>();

TString database;
if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) {
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name"));
}

if (!settings.Roles.size()) {
return MakeFuture(ResultFromError<TGenericResult>("No roles given for AlterGroup request"));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't see the same check for CreateGroup. I think they should be equivalent in this case

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateGroup can be without any users, but before calling AddUsersToGroup I check that settings.Roles.size() is not 0
https://github.com/ydb-platform/ydb/pull/501/files#diff-fb95cf18d73802eec5aabc3807b82c4e98f0b3cfae62523917dab93336b0c5ebR933

}

if (IsPrepare()) {
AddUsersToGroup(database, settings.GroupName, settings.Roles, settings.Action);

TGenericResult result;
result.SetSuccess();
alterGroupPromise.SetValue(result);
} else {
return Gateway->AlterGroup(cluster, settings);
}

return alterGroupPromise.GetFuture();
}

TFuture<TGenericResult> RenameGroup(const TString& cluster, TRenameGroupSettings& settings) override {
CHECK_PREPARED_DDL(RenameGroup);

auto renameGroupPromise = NewPromise<TGenericResult>();

TString database;
if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) {
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name"));
}

NKikimrSchemeOp::TModifyScheme schemeTx;
schemeTx.SetWorkingDir(database);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);
auto& renameGroup = *schemeTx.MutableAlterLogin()->MutableRenameGroup();
renameGroup.SetGroup(settings.GroupName);
renameGroup.SetNewName(settings.NewName);

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);

phyTx.MutableSchemeOperation()->MutableRenameGroup()->Swap(&schemeTx);
TGenericResult result;
result.SetSuccess();
renameGroupPromise.SetValue(result);
} else {
return Gateway->RenameGroup(cluster, settings);
}

return renameGroupPromise.GetFuture();
}

TFuture<TGenericResult> DropGroup(const TString& cluster, const TDropGroupSettings& settings) override {
FORWARD_ENSURE_NO_PREPARE(DropGroup, cluster, settings);
CHECK_PREPARED_DDL(DropGroup);

auto dropGroupPromise = NewPromise<TGenericResult>();

TString database;
if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) {
return MakeFuture(ResultFromError<TGenericResult>("Couldn't get domain name"));
}

NKikimrSchemeOp::TModifyScheme schemeTx;
schemeTx.SetWorkingDir(database);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);
auto& dropGroup = *schemeTx.MutableAlterLogin()->MutableRemoveGroup();
dropGroup.SetGroup(settings.GroupName);
dropGroup.SetMissingOk(settings.Force);

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);

phyTx.MutableSchemeOperation()->MutableAlterUser()->Swap(&schemeTx);
TGenericResult result;
result.SetSuccess();
dropGroupPromise.SetValue(result);
} else {
return Gateway->DropGroup(cluster, settings);
}

return dropGroupPromise.GetFuture();
}

TFuture<TGenericResult> CreateColumnTable(TKikimrTableMetadataPtr metadata, bool createDir) override {
Expand Down Expand Up @@ -1000,6 +1118,35 @@ class TKqpGatewayProxy : public IKikimrGateway {
return Gateway->GetDomainName();
}

void AddUsersToGroup(const TString& database, const TString& group, const std::vector<TString>& roles, const NYql::TAlterGroupSettings::EAction& action) {
for (const auto& role : roles) {
NKikimrSchemeOp::TModifyScheme schemeTx;
schemeTx.SetWorkingDir(database);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);

auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);

switch (action) {
case NYql::TAlterGroupSettings::EAction::AddRoles: {
auto& alterGroup = *schemeTx.MutableAlterLogin()->MutableAddGroupMembership();
alterGroup.SetGroup(group);
alterGroup.SetMember(role);
phyTx.MutableSchemeOperation()->MutableAddGroupMembership()->Swap(&schemeTx);
break;
}
case NYql::TAlterGroupSettings::EAction::RemoveRoles: {
auto& alterGroup = *schemeTx.MutableAlterLogin()->MutableRemoveGroupMembership();
alterGroup.SetGroup(group);
alterGroup.SetMember(role);
phyTx.MutableSchemeOperation()->MutableRemoveGroupMembership()->Swap(&schemeTx);
break;
}
}
}
}

private:
TIntrusivePtr<IKqpGateway> Gateway;
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
Expand Down
20 changes: 20 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_datasink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
return TStatus::Error;
}

TStatus HandleRenameGroup(TKiRenameGroup node, TExprContext& ctx) override {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "RenameGroup is not yet implemented for intent determination transformer"));
return TStatus::Error;
}

TStatus HandleDropGroup(TKiDropGroup node, TExprContext& ctx) override {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "DropGroup is not yet implemented for intent determination transformer"));
Expand Down Expand Up @@ -466,6 +472,7 @@ class TKikimrDataSink : public TDataProviderBase
|| node.IsCallable(TKiDropUser::CallableName())
|| node.IsCallable(TKiCreateGroup::CallableName())
|| node.IsCallable(TKiAlterGroup::CallableName())
|| node.IsCallable(TKiRenameGroup::CallableName())
|| node.IsCallable(TKiDropGroup::CallableName())
|| node.IsCallable(TKiUpsertObject::CallableName())
|| node.IsCallable(TKiCreateObject::CallableName())
Expand Down Expand Up @@ -916,6 +923,7 @@ class TKikimrDataSink : public TDataProviderBase
.World(node->Child(0))
.DataSink(node->Child(1))
.GroupName().Build(key.GetRoleName())
.Roles(settings.Roles.Cast())
.Done()
.Ptr();
} else if (mode == "addUsersToGroup" || mode == "dropUsersFromGroup") {
Expand All @@ -927,6 +935,14 @@ class TKikimrDataSink : public TDataProviderBase
.Roles(settings.Roles.Cast())
.Done()
.Ptr();
} else if (mode == "renameGroup") {
return Build<TKiRenameGroup>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.GroupName().Build(key.GetRoleName())
.NewName(settings.NewName.Cast())
.Done()
.Ptr();
} else if (mode == "dropGroup") {
return Build<TKiDropGroup>(ctx, node->Pos())
.World(node->Child(0))
Expand Down Expand Up @@ -1103,6 +1119,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt
return HandleAlterGroup(node.Cast(), ctx);
}

if (auto node = TMaybeNode<TKiRenameGroup>(input)) {
return HandleRenameGroup(node.Cast(), ctx);
}

if (auto node = TMaybeNode<TKiDropGroup>(input)) {
return HandleDropGroup(node.Cast(), ctx);
}
Expand Down
Loading
Loading