Skip to content

Commit

Permalink
list spaces from all providers
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
butonic committed Jun 17, 2021
1 parent b82b779 commit 14827d9
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 19 deletions.
119 changes: 100 additions & 19 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,37 +115,118 @@ func (s *svc) CreateStorageSpace(ctx context.Context, req *provider.CreateStorag

func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSpacesRequest) (*provider.ListStorageSpacesResponse, error) {
log := appctx.GetLogger(ctx)
// TODO: needs to be fixed
var id *provider.StorageSpaceId
for _, f := range req.Filters {
if f.Type == provider.ListStorageSpacesRequest_Filter_TYPE_ID {
id = f.GetId()
}
}
parts := strings.SplitN(id.OpaqueId, "!", 2)
if len(parts) != 2 {
return &provider.ListStorageSpacesResponse{
Status: status.NewInvalidArg(ctx, "space id must be separated by !"),
}, nil

var providers []*registry.ProviderInfo
var err error
c, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint)
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting storage registry client")
}
c, err := s.find(ctx, &provider.Reference{ResourceId: &provider.ResourceId{
StorageId: parts[0], // FIXME REFERENCE the StorageSpaceId is a storageid + an opaqueid
OpaqueId: parts[1],
}})

if id != nil {
// query that specific story provider
parts := strings.SplitN(id.OpaqueId, "!", 2)
if len(parts) != 2 {
return &provider.ListStorageSpacesResponse{
Status: status.NewInvalidArg(ctx, "space id must be separated by !"),
}, nil
}
res, err := c.GetStorageProviders(ctx, &registry.GetStorageProvidersRequest{
Ref: &provider.Reference{ResourceId: &provider.ResourceId{
StorageId: parts[0], // FIXME REFERENCE the StorageSpaceId is a storageid + an opaqueid
OpaqueId: parts[1],
}},
})
if err != nil {
return &provider.ListStorageSpacesResponse{
Status: status.NewStatusFromErrType(ctx, "ListStorageSpaces filters: req "+req.String(), err),
}, nil
}
if res.Status.Code != rpc.Code_CODE_OK {
return &provider.ListStorageSpacesResponse{
Status: res.Status,
}, nil
}
providers = res.Providers
} else {
// get list of all storage providers
res, err := c.ListStorageProviders(ctx, &registry.ListStorageProvidersRequest{})

if err != nil {
return &provider.ListStorageSpacesResponse{
Status: status.NewStatusFromErrType(ctx, "error listing providers", err),
}, nil
}
if res.Status.Code != rpc.Code_CODE_OK {
return &provider.ListStorageSpacesResponse{
Status: res.Status,
}, nil
}

providers = []*registry.ProviderInfo{}
// FIXME filter only providers that have an id set ... currently none have?
// bug? only ProviderPath is set
for i := range res.Providers {
// use only providers whose path does not start with a /?
if strings.HasPrefix(res.Providers[i].ProviderPath, "/") {
continue
}
providers = append(providers, res.Providers[i])
}
}

spacesFromProviders := make([][]*provider.StorageSpace, len(providers))
errors := make([]error, len(providers))
var wg sync.WaitGroup

for i, p := range providers {
wg.Add(1)
go s.listStorageSpacesOnProvider(ctx, req, &spacesFromProviders[i], p, &errors[i], &wg)
}
wg.Wait()

uniqueSpaces := map[string]*provider.StorageSpace{}
for i := range providers {
if errors[i] != nil {
log.Debug().Err(errors[i]).Msg("skipping provider")
continue
}
for j := range spacesFromProviders[i] {
uniqueSpaces[spacesFromProviders[i][j].Id.OpaqueId] = spacesFromProviders[i][j]
}
}
spaces := []*provider.StorageSpace{}
for spaceID := range uniqueSpaces {
spaces = append(spaces, uniqueSpaces[spaceID])
}

return &provider.ListStorageSpacesResponse{
Status: status.NewOK(ctx),
StorageSpaces: spaces,
}, nil
}

func (s *svc) listStorageSpacesOnProvider(ctx context.Context, req *provider.ListStorageSpacesRequest, res *[]*provider.StorageSpace, p *registry.ProviderInfo, e *error, wg *sync.WaitGroup) {
defer wg.Done()
c, err := s.getStorageProviderClient(ctx, p)
if err != nil {
return &provider.ListStorageSpacesResponse{
Status: status.NewStatusFromErrType(ctx, "error finding path", err),
}, nil
*e = errors.Wrap(err, "error connecting to storage provider="+p.Address)
return
}

res, err := c.ListStorageSpaces(ctx, req)
r, err := c.ListStorageSpaces(ctx, req)
if err != nil {
log.Err(err).Msg("gateway: error listing storage space on storage provider")
return &provider.ListStorageSpacesResponse{
Status: status.NewInternal(ctx, err, "error calling ListStorageSpaces"),
}, nil
*e = errors.Wrap(err, "gateway: error calling ListStorageSpaces")
return
}
return res, nil

*res = r.StorageSpaces
}

func (s *svc) UpdateStorageSpace(ctx context.Context, req *provider.UpdateStorageSpaceRequest) (*provider.UpdateStorageSpaceResponse, error) {
Expand Down
2 changes: 2 additions & 0 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ func (s *service) ListStorageSpaces(ctx context.Context, req *provider.ListStora
st = status.NewNotFound(ctx, "not found when listing spaces")
case errtypes.PermissionDenied:
st = status.NewPermissionDenied(ctx, err, "permission denied")
case errtypes.NotSupported:
st = status.NewUnimplemented(ctx, err, "not implemented")
default:
st = status.NewInternal(ctx, err, "error listing spaces")
}
Expand Down

0 comments on commit 14827d9

Please sign in to comment.