Skip to content

Commit

Permalink
Enhance storage registry with virtual views and regular expressions. (c…
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 authored and root committed Apr 20, 2021
1 parent 8a74120 commit 374d671
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 123 deletions.
9 changes: 9 additions & 0 deletions changelog/unreleased/storage-registry-refactor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Enhancement: Enhance storage registry with virtual views and regular expressions

Add functionality to the storage registry service to handle user requests for
references that span multiple storage providers. This is particularly useful
when directories are sharded across providers or when virtual views are
expected.

https://github.com/cs3org/cs3apis/pull/116
https://github.com/cs3org/reva/pull/1570
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cheggaaa/pb v1.0.29
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e
github.com/cs3org/go-cs3apis v0.0.0-20210316113645-e4a74cb8761c
github.com/cs3org/go-cs3apis v0.0.0-20210322124405-872bbbf14d0b
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59
github.com/ffurano/grpc-proto v0.0.0-20210312134900-65801a1ca184
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJffz4pz0o1WuQxJ28+5x5JgaHD8=
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4=
github.com/cs3org/go-cs3apis v0.0.0-20210316113645-e4a74cb8761c h1:2vcWjiaFkJMhMZHeTbkkXWwhhAOTAIKpul8yjAo95UU=
github.com/cs3org/go-cs3apis v0.0.0-20210316113645-e4a74cb8761c/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/go-cs3apis v0.0.0-20210322124405-872bbbf14d0b h1:80DK9Yufaj1YJ0fPb6x1WZfijHWA+CMstq3MEZs/8To=
github.com/cs3org/go-cs3apis v0.0.0-20210322124405-872bbbf14d0b/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down
210 changes: 168 additions & 42 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/url"
"path"
"strings"
"sync"
"time"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
Expand All @@ -35,10 +36,9 @@ import (
"github.com/cs3org/reva/pkg/rgrpc/status"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/pkg/storage/utils/etag"
"github.com/cs3org/reva/pkg/storage/utils/templates"
"github.com/cs3org/reva/pkg/user"
"github.com/cs3org/reva/pkg/utils"
"github.com/dgrijalva/jwt-go"
"github.com/google/uuid"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -193,6 +193,7 @@ func (s *svc) getHome(_ context.Context) string {
// TODO(labkode): issue #601, /home will be hardcoded.
return "/home"
}

func (s *svc) InitiateFileDownload(ctx context.Context, req *provider.InitiateFileDownloadRequest) (*gateway.InitiateFileDownloadResponse, error) {
log := appctx.GetLogger(ctx)
p, st := s.getPath(ctx, req.Ref)
Expand Down Expand Up @@ -366,6 +367,7 @@ func (s *svc) InitiateFileDownload(ctx context.Context, req *provider.InitiateFi
}

func (s *svc) initiateFileDownload(ctx context.Context, req *provider.InitiateFileDownloadRequest) (*gateway.InitiateFileDownloadResponse, error) {
// TODO(ishank011): enable downloading references spread across storage providers, eg. /eos
c, err := s.find(ctx, req.Ref)
if err != nil {
return &gateway.InitiateFileDownloadResponse{
Expand Down Expand Up @@ -857,6 +859,7 @@ func (s *svc) Delete(ctx context.Context, req *provider.DeleteRequest) (*provide
}

func (s *svc) delete(ctx context.Context, req *provider.DeleteRequest) (*provider.DeleteResponse, error) {
// TODO(ishank011): enable deleting references spread across storage providers, eg. /eos
c, err := s.find(ctx, req.Ref)
if err != nil {
return &provider.DeleteResponse{
Expand Down Expand Up @@ -974,19 +977,20 @@ func (s *svc) Move(ctx context.Context, req *provider.MoveRequest) (*provider.Mo
}

func (s *svc) move(ctx context.Context, req *provider.MoveRequest) (*provider.MoveResponse, error) {
srcP, err := s.findProvider(ctx, req.Source)
srcList, err := s.findProviders(ctx, req.Source)
if err != nil {
return &provider.MoveResponse{
Status: status.NewStatusFromErrType(ctx, "move src="+req.Source.String(), err),
}, nil
}

dstP, err := s.findProvider(ctx, req.Destination)
dstList, err := s.findProviders(ctx, req.Destination)
if err != nil {
return &provider.MoveResponse{
Status: status.NewStatusFromErrType(ctx, "move dst="+req.Destination.String(), err),
}, nil
}
srcP, dstP := srcList[0], dstList[0]

// if providers are not the same we do not implement cross storage copy yet.
if srcP.Address != dstP.Address {
Expand All @@ -1007,6 +1011,7 @@ func (s *svc) move(ctx context.Context, req *provider.MoveRequest) (*provider.Mo
}

func (s *svc) SetArbitraryMetadata(ctx context.Context, req *provider.SetArbitraryMetadataRequest) (*provider.SetArbitraryMetadataResponse, error) {
// TODO(ishank011): enable for references spread across storage providers, eg. /eos
c, err := s.find(ctx, req.Ref)
if err != nil {
return &provider.SetArbitraryMetadataResponse{
Expand All @@ -1023,6 +1028,7 @@ func (s *svc) SetArbitraryMetadata(ctx context.Context, req *provider.SetArbitra
}

func (s *svc) UnsetArbitraryMetadata(ctx context.Context, req *provider.UnsetArbitraryMetadataRequest) (*provider.UnsetArbitraryMetadataResponse, error) {
// TODO(ishank011): enable for references spread across storage providers, eg. /eos
c, err := s.find(ctx, req.Ref)
if err != nil {
return &provider.UnsetArbitraryMetadataResponse{
Expand Down Expand Up @@ -1142,14 +1148,89 @@ func (s *svc) statSharesFolder(ctx context.Context) (*provider.StatResponse, err
}

// stat returns the metadata of req.Ref after resolving the reference to the
// storage providers registered for it.
//
// When exactly one provider matches and the reference either carries no path
// or its path starts with that provider's ProviderPath, the request is
// forwarded to that provider unchanged. Otherwise every matching provider is
// queried concurrently and a synthetic container is returned whose Size is the
// sum of the sizes reported by the providers.
//
// Lookup and per-provider failures are reported through the Status field of
// the response with a nil error; only the forwarded single-provider Stat call
// can yield a non-nil error.
func (s *svc) stat(ctx context.Context, req *provider.StatRequest) (*provider.StatResponse, error) {
	providers, err := s.findProviders(ctx, req.Ref)
	if err != nil {
		return &provider.StatResponse{
			Status: status.NewStatusFromErrType(ctx, "stat ref: "+req.Ref.String(), err),
		}, nil
	}

	resPath := req.Ref.GetPath()
	if len(providers) == 1 && (resPath == "" || strings.HasPrefix(resPath, providers[0].ProviderPath)) {
		c, err := s.getStorageProviderClient(ctx, providers[0])
		if err != nil {
			return &provider.StatResponse{
				Status: status.NewInternal(ctx, err, "error connecting to storage provider="+providers[0].Address),
			}, nil
		}
		return c.Stat(ctx, req)
	}

	infoFromProviders := make([]*provider.ResourceInfo, len(providers))
	// Named errs so the slice does not shadow the errors package.
	errs := make([]error, len(providers))
	var wg sync.WaitGroup

	for i, p := range providers {
		// statOnProvider writes its result through the pointer it receives, so
		// the slot has to be allocated before the goroutine starts; handing it
		// a nil pointer would leave the result unreachable from here.
		infoFromProviders[i] = &provider.ResourceInfo{}
		wg.Add(1)
		go s.statOnProvider(ctx, req, infoFromProviders[i], p, &errs[i], &wg)
	}
	wg.Wait()

	var totalSize uint64
	for i := range providers {
		if errs[i] != nil {
			return &provider.StatResponse{
				Status: status.NewStatusFromErrType(ctx, "stat ref: "+req.Ref.String(), errs[i]),
			}, nil
		}
		totalSize += infoFromProviders[i].Size
	}

	// TODO(ishank011): aggregate other properties for references spread across storage providers, eg. /eos
	return &provider.StatResponse{
		Status: status.NewOK(ctx),
		Info: &provider.ResourceInfo{
			// The virtual container has no backing resource, so it gets a
			// freshly generated opaque ID on every call.
			Id: &provider.ResourceId{
				StorageId: "/",
				OpaqueId:  uuid.New().String(),
			},
			Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER,
			Path: resPath,
			Size: totalSize,
		},
	}, nil
}

// statOnProvider stats the reference of req on the single provider p and
// copies the returned ResourceInfo into *res.
//
// It is written to be run as a goroutine: it calls wg.Done on return and
// reports failures by storing them in *e rather than returning them.
//
// When the requested path does not start with p.ProviderPath, the provider's
// own ProviderPath is stat'ed instead of the requested path.
//
// res must point to an allocated ResourceInfo for the result to reach the
// caller; with a nil res the result is dropped.
func (s *svc) statOnProvider(ctx context.Context, req *provider.StatRequest, res *provider.ResourceInfo, p *registry.ProviderInfo, e *error, wg *sync.WaitGroup) {
	defer wg.Done()
	c, err := s.getStorageProviderClient(ctx, p)
	if err != nil {
		*e = errors.Wrap(err, "error connecting to storage provider="+p.Address)
		return
	}

	// NOTE(review): path.Clean turns an empty path into ".", so the
	// resPath != "" test below never fails; confirm whether references
	// without a path were meant to be passed through untouched.
	resPath := path.Clean(req.Ref.GetPath())
	newPath := req.Ref.GetPath()
	if resPath != "" && !strings.HasPrefix(resPath, p.ProviderPath) {
		newPath = p.ProviderPath
	}
	r, err := c.Stat(ctx, &provider.StatRequest{
		Ref: &provider.Reference{
			Spec: &provider.Reference_Path{
				Path: newPath,
			},
		},
	})
	if err != nil {
		*e = errors.Wrap(err, "gateway: error calling Stat")
		return
	}

	// Nothing to copy, or nowhere to copy it to. Reassigning a nil res would
	// only change the local parameter, and dereferencing a nil Info would panic.
	if res == nil || r.Info == nil {
		return
	}
	*res = *r.Info
}

func (s *svc) Stat(ctx context.Context, req *provider.StatRequest) (*provider.StatResponse, error) {
Expand Down Expand Up @@ -1454,19 +1535,88 @@ func (s *svc) listSharesFolder(ctx context.Context) (*provider.ListContainerResp
}

// listContainer lists the children of req.Ref across every storage provider
// registered for the reference.
//
// All providers are queried concurrently. Entries whose parent directory is
// the listed path are returned as they are. Entries that sit deeper are
// grouped under their first path segment below the listed path, and each
// group is represented by one synthetic container whose etag is derived from
// the grouped entries.
//
// Lookup and per-provider failures are reported through the Status field of
// the response; the returned error is always nil.
func (s *svc) listContainer(ctx context.Context, req *provider.ListContainerRequest) (*provider.ListContainerResponse, error) {
	providers, err := s.findProviders(ctx, req.Ref)
	if err != nil {
		return &provider.ListContainerResponse{
			Status: status.NewStatusFromErrType(ctx, "listContainer ref: "+req.Ref.String(), err),
		}, nil
	}

	resPath := path.Clean(req.Ref.GetPath())
	infoFromProviders := make([][]*provider.ResourceInfo, len(providers))
	// Named errs so the slice does not shadow the errors package.
	errs := make([]error, len(providers))
	var wg sync.WaitGroup

	for i, p := range providers {
		wg.Add(1)
		go s.listContainerOnProvider(ctx, req, &infoFromProviders[i], p, &errs[i], &wg)
	}
	wg.Wait()

	infos := []*provider.ResourceInfo{}
	indirects := make(map[string][]*provider.ResourceInfo)
	for i := range providers {
		if errs[i] != nil {
			return &provider.ListContainerResponse{
				Status: status.NewStatusFromErrType(ctx, "listContainer ref: "+req.Ref.String(), errs[i]),
			}, nil
		}
		for _, inf := range infoFromProviders[i] {
			if resPath == "" || path.Dir(inf.Path) == resPath {
				// Direct child of the listed path.
				infos = append(infos, inf)
				continue
			}
			// Strip the listed path and any separator left in front, then keep
			// the first remaining segment. Trimming the separator explicitly
			// avoids indexing past the end of the split result and picks the
			// right segment when the listed path is "/".
			rel := strings.TrimPrefix(strings.TrimPrefix(inf.Path, resPath), "/")
			child := path.Join(resPath, strings.SplitN(rel, "/", 2)[0])
			indirects[child] = append(indirects[child], inf)
		}
	}

	for k, v := range indirects {
		inf := &provider.ResourceInfo{
			// Synthetic containers have no backing resource, so they get a
			// freshly generated opaque ID on every call.
			Id: &provider.ResourceId{
				StorageId: "/",
				OpaqueId:  uuid.New().String(),
			},
			Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER,
			Etag: etag.GenerateEtagFromResources(nil, v),
			Path: k,
			Size: 0,
		}
		infos = append(infos, inf)
	}

	return &provider.ListContainerResponse{
		Status: status.NewOK(ctx),
		Infos:  infos,
	}, nil
}

// listContainerOnProvider lists the reference of req on the single provider p
// and stores the returned entries in *res.
//
// It is written to be run as a goroutine: it calls wg.Done on return and
// reports failures by storing them in *e, since it has no return values.
//
// When the requested path does not start with p.ProviderPath, the provider's
// own ProviderPath is listed instead of the requested path.
func (s *svc) listContainerOnProvider(ctx context.Context, req *provider.ListContainerRequest, res *[]*provider.ResourceInfo, p *registry.ProviderInfo, e *error, wg *sync.WaitGroup) {
	defer wg.Done()
	c, err := s.getStorageProviderClient(ctx, p)
	if err != nil {
		*e = errors.Wrap(err, "error connecting to storage provider="+p.Address)
		return
	}

	// NOTE(review): path.Clean turns an empty path into ".", so the
	// resPath != "" test below never fails; confirm whether references
	// without a path were meant to be passed through untouched.
	resPath := path.Clean(req.Ref.GetPath())
	newPath := req.Ref.GetPath()
	if resPath != "" && !strings.HasPrefix(resPath, p.ProviderPath) {
		newPath = p.ProviderPath
	}
	r, err := c.ListContainer(ctx, &provider.ListContainerRequest{
		Ref: &provider.Reference{
			Spec: &provider.Reference_Path{
				Path: newPath,
			},
		},
	})
	if err != nil {
		*e = errors.Wrap(err, "gateway: error calling ListContainer")
		return
	}
	*res = r.Infos
}

func (s *svc) ListContainer(ctx context.Context, req *provider.ListContainerRequest) (*provider.ListContainerResponse, error) {
Expand Down Expand Up @@ -1888,11 +2038,11 @@ func (s *svc) findByPath(ctx context.Context, path string) (provider.ProviderAPI
}

// find returns a client for the first storage provider that findProviders
// reports for ref. It returns an error when the lookup fails, when the lookup
// yields no provider, or when the client cannot be obtained.
func (s *svc) find(ctx context.Context, ref *provider.Reference) (provider.ProviderAPIClient, error) {
	providers, err := s.findProviders(ctx, ref)
	if err != nil {
		return nil, err
	}
	// Guard the index below: an empty, non-nil list would otherwise panic.
	if len(providers) == 0 {
		return nil, errors.New("gateway: provider is nil")
	}
	return s.getStorageProviderClient(ctx, providers[0])
}

func (s *svc) getStorageProviderClient(_ context.Context, p *registry.ProviderInfo) (provider.ProviderAPIClient, error) {
Expand All @@ -1905,37 +2055,13 @@ func (s *svc) getStorageProviderClient(_ context.Context, p *registry.ProviderIn
return c, nil
}

// findProvider resolves ref to a single storage provider.
//
// If a home mapping is configured, the reference path starts with the home
// prefix and the context carries a user, the home prefix is first replaced by
// the mapping template rendered for that user and the lookup is tried on the
// rewritten path. A not-found result on the rewritten path falls back to a
// lookup with the original reference; any other error is returned as is.
func (s *svc) findProvider(ctx context.Context, ref *provider.Reference) (*registry.ProviderInfo, error) {
	homePrefix := s.getHome(ctx)
	refPath := ref.GetPath()

	if !strings.HasPrefix(refPath, homePrefix) || s.c.HomeMapping == "" {
		return s.getStorageProvider(ctx, ref)
	}

	u, hasUser := user.ContextGetUser(ctx)
	if !hasUser {
		return s.getStorageProvider(ctx, ref)
	}

	mappedHome := templates.WithUser(u, s.c.HomeMapping)
	mappedRef := &provider.Reference{
		Spec: &provider.Reference_Path{
			Path: path.Join(mappedHome, strings.TrimPrefix(refPath, homePrefix)),
		},
	}

	info, err := s.getStorageProvider(ctx, mappedRef)
	if err == nil {
		return info, nil
	}
	// Only a NotFound error falls through to the original reference.
	if _, notFound := err.(errtypes.IsNotFound); !notFound {
		return nil, err
	}
	return s.getStorageProvider(ctx, ref)
}

func (s *svc) getStorageProvider(ctx context.Context, ref *provider.Reference) (*registry.ProviderInfo, error) {
func (s *svc) findProviders(ctx context.Context, ref *provider.Reference) ([]*registry.ProviderInfo, error) {
c, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint)
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting storage registry client")
}

res, err := c.GetStorageProvider(ctx, &registry.GetStorageProviderRequest{
res, err := c.GetStorageProviders(ctx, &registry.GetStorageProvidersRequest{
Ref: ref,
})

Expand All @@ -1958,11 +2084,11 @@ func (s *svc) getStorageProvider(ctx context.Context, ref *provider.Reference) (
}
}

if res.Provider == nil {
if res.Providers == nil {
return nil, errors.New("gateway: provider is nil")
}

return res.Provider, nil
return res.Providers, nil
}

type etagWithTS struct {
Expand Down
Loading

0 comments on commit 374d671

Please sign in to comment.