diff --git a/internal/grpc/services/gateway/storageprovider.go b/internal/grpc/services/gateway/storageprovider.go index 21fbcb2a3ea..14065545b86 100644 --- a/internal/grpc/services/gateway/storageprovider.go +++ b/internal/grpc/services/gateway/storageprovider.go @@ -115,37 +115,118 @@ func (s *svc) CreateStorageSpace(ctx context.Context, req *provider.CreateStorag func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSpacesRequest) (*provider.ListStorageSpacesResponse, error) { log := appctx.GetLogger(ctx) - // TODO: needs to be fixed var id *provider.StorageSpaceId for _, f := range req.Filters { if f.Type == provider.ListStorageSpacesRequest_Filter_TYPE_ID { id = f.GetId() } } - parts := strings.SplitN(id.OpaqueId, "!", 2) - if len(parts) != 2 { - return &provider.ListStorageSpacesResponse{ - Status: status.NewInvalidArg(ctx, "space id must be separated by !"), - }, nil + + var providers []*registry.ProviderInfo + var err error + c, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint) + if err != nil { + return nil, errors.Wrap(err, "gateway: error getting storage registry client") } - c, err := s.find(ctx, &provider.Reference{ResourceId: &provider.ResourceId{ - StorageId: parts[0], // FIXME REFERENCE the StorageSpaceId is a storageid + an opaqueid - OpaqueId: parts[1], - }}) + + if id != nil { + // query that specific story provider + parts := strings.SplitN(id.OpaqueId, "!", 2) + if len(parts) != 2 { + return &provider.ListStorageSpacesResponse{ + Status: status.NewInvalidArg(ctx, "space id must be separated by !"), + }, nil + } + res, err := c.GetStorageProviders(ctx, ®istry.GetStorageProvidersRequest{ + Ref: &provider.Reference{ResourceId: &provider.ResourceId{ + StorageId: parts[0], // FIXME REFERENCE the StorageSpaceId is a storageid + an opaqueid + OpaqueId: parts[1], + }}, + }) + if err != nil { + return &provider.ListStorageSpacesResponse{ + Status: status.NewStatusFromErrType(ctx, "ListStorageSpaces filters: req "+req.String(), err), + }, nil + } + if res.Status.Code != rpc.Code_CODE_OK { + return &provider.ListStorageSpacesResponse{ + Status: res.Status, + }, nil + } + providers = res.Providers + } else { + // get list of all storage providers + res, err := c.ListStorageProviders(ctx, ®istry.ListStorageProvidersRequest{}) + + if err != nil { + return &provider.ListStorageSpacesResponse{ + Status: status.NewStatusFromErrType(ctx, "error listing providers", err), + }, nil + } + if res.Status.Code != rpc.Code_CODE_OK { + return &provider.ListStorageSpacesResponse{ + Status: res.Status, + }, nil + } + + providers = []*registry.ProviderInfo{} + // FIXME filter only providers that have an id set ... currently none have? + // bug? only ProviderPath is set + for i := range res.Providers { + // use only providers whose path does not start with a /? + if strings.HasPrefix(res.Providers[i].ProviderPath, "/") { + continue + } + providers = append(providers, res.Providers[i]) + } + } + + spacesFromProviders := make([][]*provider.StorageSpace, len(providers)) + errors := make([]error, len(providers)) + var wg sync.WaitGroup + + for i, p := range providers { + wg.Add(1) + go s.listStorageSpacesOnProvider(ctx, req, &spacesFromProviders[i], p, &errors[i], &wg) + } + wg.Wait() + + uniqueSpaces := map[string]*provider.StorageSpace{} + for i := range providers { + if errors[i] != nil { + log.Debug().Err(errors[i]).Msg("skipping provider") + continue + } + for j := range spacesFromProviders[i] { + uniqueSpaces[spacesFromProviders[i][j].Id.OpaqueId] = spacesFromProviders[i][j] + } + } + spaces := []*provider.StorageSpace{} + for spaceID := range uniqueSpaces { + spaces = append(spaces, uniqueSpaces[spaceID]) + } + + return &provider.ListStorageSpacesResponse{ + Status: status.NewOK(ctx), + StorageSpaces: spaces, + }, nil +} + +func (s *svc) listStorageSpacesOnProvider(ctx context.Context, req *provider.ListStorageSpacesRequest, res *[]*provider.StorageSpace, p *registry.ProviderInfo, e *error, wg *sync.WaitGroup) { + defer wg.Done() + c, err := s.getStorageProviderClient(ctx, p) if err != nil { - return &provider.ListStorageSpacesResponse{ - Status: status.NewStatusFromErrType(ctx, "error finding path", err), - }, nil + *e = errors.Wrap(err, "error connecting to storage provider="+p.Address) + return } - res, err := c.ListStorageSpaces(ctx, req) + r, err := c.ListStorageSpaces(ctx, req) if err != nil { - log.Err(err).Msg("gateway: error listing storage space on storage provider") - return &provider.ListStorageSpacesResponse{ - Status: status.NewInternal(ctx, err, "error calling ListStorageSpaces"), - }, nil + *e = errors.Wrap(err, "gateway: error calling ListStorageSpaces") + return } - return res, nil + + *res = r.StorageSpaces } func (s *svc) UpdateStorageSpace(ctx context.Context, req *provider.UpdateStorageSpaceRequest) (*provider.UpdateStorageSpaceResponse, error) { diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index b3ab295260c..d3b435a196c 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -451,6 +451,8 @@ func (s *service) ListStorageSpaces(ctx context.Context, req *provider.ListStora st = status.NewNotFound(ctx, "not found when listing spaces") case errtypes.PermissionDenied: st = status.NewPermissionDenied(ctx, err, "permission denied") + case errtypes.NotSupported: + st = status.NewUnimplemented(ctx, err, "not implemented") default: st = status.NewInternal(ctx, err, "error listing spaces") }