Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rest user/group drivers: refactor and fix user type for lw accounts #3821

Merged
merged 6 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions changelog/unreleased/revamp-rest-used-group-drivers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Enhancement: Revamp user/group drivers and fix user type
for lightweight accounts

* Fix the user type for lightweight accounts, using the
source field to differentiate between a primary and lw account
* Remove all the code with manual parsing of the json returned
by the CERN provider
* Introduce pagination for `GetMembers` method in the group driver
* Reduced network transfer size by requesting only needed fields for `GetMembers` method

https://github.com/cs3org/reva/pull/3821
8 changes: 0 additions & 8 deletions pkg/cbox/group/rest/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,6 @@ func (m *manager) getVal(key string) (string, error) {
return "", errors.New("rest: unable to get connection from redis pool")
}

func (m *manager) fetchCachedInternalID(gid *grouppb.GroupId) (string, error) {
return m.getVal(groupPrefix + groupInternalIDPrefix + gid.OpaqueId)
}

func (m *manager) cacheInternalID(gid *grouppb.GroupId, internalID string) error {
return m.setVal(groupPrefix+groupInternalIDPrefix+gid.OpaqueId, internalID, -1)
}

func (m *manager) findCachedGroups(query string) ([]*grouppb.Group, error) {
conn := m.redisPool.Get()
defer conn.Close()
Expand Down
140 changes: 65 additions & 75 deletions pkg/cbox/group/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package rest

import (
"context"
"errors"
"fmt"
"net/url"
"os"
"os/signal"
"strings"
Expand All @@ -31,9 +31,11 @@ import (
grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
user "github.com/cs3org/reva/pkg/cbox/user/rest"
utils "github.com/cs3org/reva/pkg/cbox/utils"
"github.com/cs3org/reva/pkg/group"
"github.com/cs3org/reva/pkg/group/manager/registry"
"github.com/cs3org/reva/pkg/utils/list"
"github.com/gomodule/redigo/redis"
"github.com/mitchellh/mapstructure"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -126,12 +128,12 @@ func New(m map[string]interface{}) (group.Manager, error) {
redisPool: redisPool,
apiTokenManager: apiTokenManager,
}
go mgr.fetchAllGroups()
go mgr.fetchAllGroups(context.Background())
return mgr, nil
}

func (m *manager) fetchAllGroups() {
_ = m.fetchAllGroupAccounts()
func (m *manager) fetchAllGroups(ctx context.Context) {
_ = m.fetchAllGroupAccounts(ctx)
ticker := time.NewTicker(time.Duration(m.conf.GroupFetchInterval) * time.Second)
work := make(chan os.Signal, 1)
signal.Notify(work, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT)
Expand All @@ -141,89 +143,76 @@ func (m *manager) fetchAllGroups() {
case <-work:
return
case <-ticker.C:
_ = m.fetchAllGroupAccounts()
_ = m.fetchAllGroupAccounts(ctx)
}
}
}

func (m *manager) fetchAllGroupAccounts() error {
ctx := context.Background()
// Group contains the information about a group.
type Group struct {
GroupIdentifier string `json:"groupIdentifier"`
DisplayName string `json:"displayName"`
GID int `json:"gid,omitempty"`
IsComputingGroup bool `json:"isComputingGroup"`
}

// GroupsResponse contains the expected response from grappa
// when getting the list of groups.
type GroupsResponse struct {
Pagination struct {
Links struct {
Next *string `json:"next"`
} `json:"links"`
} `json:"pagination"`
Data []*Group `json:"data"`
}

func (m *manager) fetchAllGroupAccounts(ctx context.Context) error {
url := fmt.Sprintf("%s/api/v1.0/Group?field=groupIdentifier&field=displayName&field=gid&field=isComputingGroup", m.conf.APIBaseURL)

for url != "" {
result, err := m.apiTokenManager.SendAPIGetRequest(ctx, url, false)
if err != nil {
var r GroupsResponse
for {
if err := m.apiTokenManager.SendAPIGetRequest(ctx, url, false, &r); err != nil {
return err
}

responseData, ok := result["data"].([]interface{})
if !ok {
return errors.New("rest: error in type assertion")
}
for _, usr := range responseData {
groupData, ok := usr.(map[string]interface{})
if !ok {
continue
}

// filter computing groups
if is, ok := groupData["isComputingGroup"].(bool); ok && is {
for _, g := range r.Data {
if g.IsComputingGroup {
continue
}

_, err = m.parseAndCacheGroup(ctx, groupData)
if err != nil {
if _, err := m.parseAndCacheGroup(ctx, g); err != nil {
continue
}
}

url = ""
if pagination, ok := result["pagination"].(map[string]interface{}); ok {
if links, ok := pagination["links"].(map[string]interface{}); ok {
if next, ok := links["next"].(string); ok {
url = fmt.Sprintf("%s%s", m.conf.APIBaseURL, next)
}
}
if r.Pagination.Links.Next == nil {
break
}
url = fmt.Sprintf("%s%s", m.conf.APIBaseURL, *r.Pagination.Links.Next)
}

return nil
}

func (m *manager) parseAndCacheGroup(ctx context.Context, groupData map[string]interface{}) (*grouppb.Group, error) {
id, ok := groupData["groupIdentifier"].(string)
if !ok {
return nil, errors.New("rest: missing upn in user data")
}

name, _ := groupData["displayName"].(string)
func (m *manager) parseAndCacheGroup(ctx context.Context, g *Group) (*grouppb.Group, error) {
groupID := &grouppb.GroupId{
OpaqueId: id,
Idp: m.conf.IDProvider,
OpaqueId: g.GroupIdentifier,
}
gid, ok := groupData["gid"].(int64)
if !ok {
gid = 0
}
g := &grouppb.Group{

group := &grouppb.Group{
Id: groupID,
GroupName: id,
Mail: id + "@cern.ch",
DisplayName: name,
GidNumber: gid,
GroupName: g.GroupIdentifier,
Mail: g.GroupIdentifier + "@cern.ch",
DisplayName: g.DisplayName,
GidNumber: int64(g.GID),
}

if err := m.cacheGroupDetails(g); err != nil {
if err := m.cacheGroupDetails(group); err != nil {
log.Error().Err(err).Msg("rest: error caching group details")
}

if internalID, ok := groupData["id"].(string); ok {
if err := m.cacheInternalID(groupID, internalID); err != nil {
log.Error().Err(err).Msg("rest: error caching group details")
}
}

return g, nil
return group, nil
}

func (m *manager) GetGroup(ctx context.Context, gid *grouppb.GroupId, skipFetchingMembers bool) (*grouppb.Group, error) {
Expand Down Expand Up @@ -288,38 +277,39 @@ func (m *manager) GetMembers(ctx context.Context, gid *grouppb.GroupId) ([]*user
return users, nil
}

internalID, err := m.fetchCachedInternalID(gid)
if err != nil {
return nil, err
}
url := fmt.Sprintf("%s/api/v1.0/Group/%s/memberidentities/precomputed", m.conf.APIBaseURL, internalID)
result, err := m.apiTokenManager.SendAPIGetRequest(ctx, url, false)
url, err := url.JoinPath(m.conf.APIBaseURL, "/api/v1.0/Group", gid.OpaqueId, "/memberidentities/precomputed?limit=10&field=upn&field=primaryAccountEmail&field=displayName&field=uid&field=gid&field=type&field=source")
if err != nil {
return nil, err
}

userData := result["data"].([]interface{})
users = []*userpb.UserId{}

for _, u := range userData {
userInfo, ok := u.(map[string]interface{})
if !ok {
return nil, errors.New("rest: error in type assertion")
var r user.IdentitiesResponse
members := []*userpb.UserId{}
for {
if err := m.apiTokenManager.SendAPIGetRequest(ctx, url, false, &r); err != nil {
return nil, err
}
if id, ok := userInfo["upn"].(string); ok {
users = append(users, &userpb.UserId{OpaqueId: id, Idp: m.conf.IDProvider})

users := list.Map(r.Data, func(i *user.Identity) *userpb.UserId {
return &userpb.UserId{OpaqueId: i.Upn, Idp: m.conf.IDProvider, Type: i.UserType()}
})
members = append(members, users...)

if r.Pagination.Links.Next == nil {
break
}
url = fmt.Sprintf("%s%s", m.conf.APIBaseURL, *r.Pagination.Links.Next)
}

if err = m.cacheGroupMembers(gid, users); err != nil {
log := appctx.GetLogger(ctx)
log.Error().Err(err).Msg("rest: error caching group members")
if err = m.cacheGroupMembers(gid, members); err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("rest: error caching group members")
}

return users, nil
}

func (m *manager) HasMember(ctx context.Context, gid *grouppb.GroupId, uid *userpb.UserId) (bool, error) {
// TODO (gdelmont): this can be improved storing the users a group is composed of as a list in redis
// and, instead of returning all the members, use the redis apis to check if the user is in the list.
groupMemers, err := m.GetMembers(ctx, gid)
if err != nil {
return false, err
Expand Down
Loading