From 2f51dc9aa6820de07fcc340acfc0aadd3f74f385 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Thu, 28 Oct 2021 11:24:26 +0200 Subject: [PATCH 1/2] Aggregate resource info properties for virtual views --- changelog/unreleased/eos-virtual-views.md | 3 + .../grpc/services/gateway/storageprovider.go | 224 +++++++----------- .../storageprovider/storageprovider.go | 128 +++++++++- pkg/storage/utils/eosfs/eosfs.go | 1 + pkg/utils/utils.go | 8 + 5 files changed, 217 insertions(+), 147 deletions(-) create mode 100644 changelog/unreleased/eos-virtual-views.md diff --git a/changelog/unreleased/eos-virtual-views.md b/changelog/unreleased/eos-virtual-views.md new file mode 100644 index 0000000000..e6c590f708 --- /dev/null +++ b/changelog/unreleased/eos-virtual-views.md @@ -0,0 +1,3 @@ +Enhancement: Aggregate resource info properties for virtual views + +https://github.com/cs3org/reva/pull/2215 \ No newline at end of file diff --git a/internal/grpc/services/gateway/storageprovider.go b/internal/grpc/services/gateway/storageprovider.go index da46ffbd8e..2d4b297769 100644 --- a/internal/grpc/services/gateway/storageprovider.go +++ b/internal/grpc/services/gateway/storageprovider.go @@ -23,7 +23,6 @@ import ( "fmt" "net/url" "path" - "path/filepath" "strings" "sync" "time" @@ -1283,6 +1282,7 @@ func (s *svc) stat(ctx context.Context, req *provider.StatRequest) (*provider.St Status: status.NewStatusFromErrType(ctx, "stat ref: "+req.Ref.String(), err), }, nil } + providers = getUniqueProviders(providers) resPath := req.Ref.GetPath() if len(providers) == 1 && (utils.IsRelativeReference(req.Ref) || resPath == "" || strings.HasPrefix(resPath, providers[0].ProviderPath)) { @@ -1303,69 +1303,43 @@ func (s *svc) stat(ctx context.Context, req *provider.StatRequest) (*provider.St } func (s *svc) statAcrossProviders(ctx context.Context, req *provider.StatRequest, providers []*registry.ProviderInfo) (*provider.StatResponse, error) { - log := appctx.GetLogger(ctx) - - infoFromProviders := 
make([]*provider.ResourceInfo, len(providers)) - errors := make([]error, len(providers)) - var wg sync.WaitGroup - - for i, p := range providers { - wg.Add(1) - go s.statOnProvider(ctx, req, &infoFromProviders[i], p, &errors[i], &wg) + // TODO(ishank011): aggregrate properties such as etag, checksum, etc. + info := &provider.ResourceInfo{ + Id: &provider.ResourceId{ + StorageId: "/", + OpaqueId: uuid.New().String(), + }, + Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER, + Path: req.Ref.GetPath(), + MimeType: "httpd/unix-directory", + Size: 0, + Mtime: &types.Timestamp{}, } - wg.Wait() - var totalSize uint64 - for i := range providers { - if errors[i] != nil { - log.Warn().Msgf("statting on provider %s returned err %+v", providers[i].ProviderPath, errors[i]) - continue + for _, p := range providers { + c, err := s.getStorageProviderClient(ctx, p) + if err != nil { + return nil, errors.Wrap(err, "error connecting to storage provider="+p.Address) + } + resp, err := c.Stat(ctx, req) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("gateway: error calling Stat %s: %+v", req.Ref.String(), p)) } - if infoFromProviders[i] != nil { - totalSize += infoFromProviders[i].Size + if err != nil || resp.Status.Code != rpc.Code_CODE_OK { + return resp, err + } + if resp.Info != nil { + info.Size += resp.Info.Size + info.Mtime = utils.LaterTS(info.Mtime, resp.Info.Mtime) } } - // TODO(ishank011): aggregrate other properties for references spread across storage providers, eg. 
/eos return &provider.StatResponse{ Status: status.NewOK(ctx), - Info: &provider.ResourceInfo{ - Id: &provider.ResourceId{ - StorageId: "/", - OpaqueId: uuid.New().String(), - }, - Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER, - Path: req.Ref.GetPath(), - Size: totalSize, - }, + Info: info, }, nil } -func (s *svc) statOnProvider(ctx context.Context, req *provider.StatRequest, res **provider.ResourceInfo, p *registry.ProviderInfo, e *error, wg *sync.WaitGroup) { - defer wg.Done() - c, err := s.getStorageProviderClient(ctx, p) - if err != nil { - *e = errors.Wrap(err, "error connecting to storage provider="+p.Address) - return - } - - if utils.IsAbsoluteReference(req.Ref) { - resPath := path.Clean(req.Ref.GetPath()) - newPath := req.Ref.GetPath() - if resPath != "." && !strings.HasPrefix(resPath, p.ProviderPath) { - newPath = p.ProviderPath - } - req.Ref = &provider.Reference{Path: newPath} - } - - r, err := c.Stat(ctx, req) - if err != nil { - *e = errors.Wrap(err, fmt.Sprintf("gateway: error calling Stat %s on %+v", req.Ref, p)) - return - } - *res = r.Info -} - func (s *svc) Stat(ctx context.Context, req *provider.StatRequest) (*provider.StatResponse, error) { if utils.IsRelativeReference(req.Ref) { @@ -1651,60 +1625,70 @@ func (s *svc) listSharesFolder(ctx context.Context) (*provider.ListContainerResp } func (s *svc) listContainer(ctx context.Context, req *provider.ListContainerRequest) (*provider.ListContainerResponse, error) { - log := appctx.GetLogger(ctx) providers, err := s.findProviders(ctx, req.Ref) if err != nil { return &provider.ListContainerResponse{ Status: status.NewStatusFromErrType(ctx, "listContainer ref: "+req.Ref.String(), err), }, nil } + providers = getUniqueProviders(providers) - infoFromProviders := make([][]*provider.ResourceInfo, len(providers)) - errors := make([]error, len(providers)) - indirects := make([]bool, len(providers)) - var wg sync.WaitGroup - - for i, p := range providers { - wg.Add(1) - go 
s.listContainerOnProvider(ctx, req, &infoFromProviders[i], p, &indirects[i], &errors[i], &wg) + resPath := req.Ref.GetPath() + if len(providers) == 1 && (utils.IsRelativeReference(req.Ref) || resPath == "" || strings.HasPrefix(resPath, providers[0].ProviderPath)) { + c, err := s.getStorageProviderClient(ctx, providers[0]) + if err != nil { + return &provider.ListContainerResponse{ + Status: status.NewInternal(ctx, err, "error connecting to storage provider="+providers[0].Address), + }, nil + } + rsp, err := c.ListContainer(ctx, req) + if err != nil || rsp.Status.Code != rpc.Code_CODE_OK { + return rsp, err + } + return rsp, nil } - wg.Wait() - infos := []*provider.ResourceInfo{} - nestedInfos := make(map[string][]*provider.ResourceInfo) - for i := range providers { - if errors[i] != nil { - // return if there's only one mount, else skip this one - if len(providers) == 1 { - return &provider.ListContainerResponse{ - Status: status.NewStatusFromErrType(ctx, "listContainer ref: "+req.Ref.String(), errors[i]), - }, nil - } - log.Warn().Msgf("listing container on provider %s returned err %+v", providers[i].ProviderPath, errors[i]) - continue + return s.listContainerAcrossProviders(ctx, req, providers) +} + +func (s *svc) listContainerAcrossProviders(ctx context.Context, req *provider.ListContainerRequest, providers []*registry.ProviderInfo) (*provider.ListContainerResponse, error) { + nestedInfos := make(map[string]*provider.ResourceInfo) + + for _, p := range providers { + c, err := s.getStorageProviderClient(ctx, p) + if err != nil { + return nil, errors.Wrap(err, "error connecting to storage provider="+p.Address) + } + resp, err := c.ListContainer(ctx, req) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("gateway: error calling ListContainer %s: %+v", req.Ref.String(), p)) + } + if err != nil || resp.Status.Code != rpc.Code_CODE_OK { + return resp, err } - for _, inf := range infoFromProviders[i] { - if indirects[i] { - p := inf.Path - // TODO do we need 
to trim prefix here for relative references? - nestedInfos[p] = append(nestedInfos[p], inf) + + for _, info := range resp.Infos { + if p, ok := nestedInfos[info.Path]; ok { + // Since more than one providers contribute to this path, + // use a generic ID + p.Id = &provider.ResourceId{ + StorageId: "/", + OpaqueId: uuid.New().String(), + } + // TODO(ishank011): aggregrate properties such as etag, checksum, etc. + p.Size += info.Size + p.Mtime = utils.LaterTS(p.Mtime, info.Mtime) + p.Type = provider.ResourceType_RESOURCE_TYPE_CONTAINER + p.MimeType = "httpd/unix-directory" } else { - infos = append(infos, inf) + nestedInfos[info.Path] = info } } } - for k := range nestedInfos { - inf := &provider.ResourceInfo{ - Id: &provider.ResourceId{ - StorageId: "/", - OpaqueId: uuid.New().String(), - }, - Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER, - Path: k, - Size: 0, - } - infos = append(infos, inf) + infos := make([]*provider.ResourceInfo, 0, len(nestedInfos)) + for _, info := range nestedInfos { + infos = append(infos, info) } return &provider.ListContainerResponse{ @@ -1713,50 +1697,6 @@ func (s *svc) listContainer(ctx context.Context, req *provider.ListContainerRequ }, nil } -func (s *svc) listContainerOnProvider(ctx context.Context, req *provider.ListContainerRequest, res *[]*provider.ResourceInfo, p *registry.ProviderInfo, ind *bool, e *error, wg *sync.WaitGroup) { - defer wg.Done() - c, err := s.getStorageProviderClient(ctx, p) - if err != nil { - *e = errors.Wrap(err, "error connecting to storage provider="+p.Address) - return - } - - if utils.IsAbsoluteReference(req.Ref) { - resPath := path.Clean(req.Ref.GetPath()) - if resPath != "" && !strings.HasPrefix(resPath, p.ProviderPath) { - // The path which we're supposed to list encompasses this provider - // so just return the first child and mark it as indirect - rel, err := filepath.Rel(resPath, p.ProviderPath) - if err != nil { - *e = err - return - } - parts := strings.Split(rel, "/") - p := 
path.Join(resPath, parts[0]) - *ind = true - *res = []*provider.ResourceInfo{ - { - Id: &provider.ResourceId{ - StorageId: "/", - OpaqueId: uuid.New().String(), - }, - Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER, - Path: p, - Size: 0, - }, - } - return - } - } - - r, err := c.ListContainer(ctx, req) - if err != nil { - *e = errors.Wrap(err, "gateway: error calling ListContainer") - return - } - *res = r.Infos -} - func (s *svc) ListContainer(ctx context.Context, req *provider.ListContainerRequest) (*provider.ListContainerResponse, error) { log := appctx.GetLogger(ctx) @@ -2197,6 +2137,18 @@ func (s *svc) findProviders(ctx context.Context, ref *provider.Reference) ([]*re return res.Providers, nil } +func getUniqueProviders(providers []*registry.ProviderInfo) []*registry.ProviderInfo { + unique := make(map[string]bool) + for _, p := range providers { + unique[p.Address] = true + } + p := make([]*registry.ProviderInfo, 0, len(unique)) + for addr := range unique { + p = append(p, &registry.ProviderInfo{Address: addr}) + } + return p +} + type etagWithTS struct { Etag string Timestamp time.Time diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index 562abcd7a3..a7e067147b 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -25,6 +25,7 @@ import ( "net/url" "os" "path" + "path/filepath" "sort" "strconv" "strings" @@ -41,6 +42,7 @@ import ( "github.com/cs3org/reva/pkg/storage/fs/registry" rtrace "github.com/cs3org/reva/pkg/trace" "github.com/cs3org/reva/pkg/utils" + "github.com/google/uuid" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "go.opentelemetry.io/otel/attribute" @@ -631,9 +633,10 @@ func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provide newRef, err := s.unwrap(ctx, req.Ref) if err != nil { - return &provider.StatResponse{ - Status: 
status.NewInternal(ctx, err, "error unwrapping path"), - }, nil + // The path might be a virtual view; handle that case + if utils.IsAbsolutePathReference(req.Ref) && strings.HasPrefix(s.mountPath, req.Ref.Path) { + return s.statVirtualView(ctx, req.Ref) + } } md, err := s.storage.GetMD(ctx, newRef, req.ArbitraryMetadataKeys) @@ -641,11 +644,11 @@ func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provide var st *rpc.Status switch err.(type) { case errtypes.IsNotFound: - st = status.NewNotFound(ctx, "path not found when stating") + st = status.NewNotFound(ctx, "path not found when statting") case errtypes.PermissionDenied: st = status.NewPermissionDenied(ctx, err, "permission denied") default: - st = status.NewInternal(ctx, err, "error stating: "+req.Ref.String()) + st = status.NewInternal(ctx, err, "error statting: "+req.Ref.String()) } return &provider.StatResponse{ Status: st, @@ -664,6 +667,40 @@ func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provide return res, nil } +func (s *service) statVirtualView(ctx context.Context, ref *provider.Reference) (*provider.StatResponse, error) { + // The reference in the request encompasses this provider + // So we need to stat root, and update the required path + md, err := s.storage.GetMD(ctx, &provider.Reference{Path: "/"}, []string{}) + if err != nil { + var st *rpc.Status + switch err.(type) { + case errtypes.IsNotFound: + st = status.NewNotFound(ctx, "path not found when statting") + case errtypes.PermissionDenied: + st = status.NewPermissionDenied(ctx, err, "permission denied") + default: + st = status.NewInternal(ctx, err, "error statting root") + } + return &provider.StatResponse{ + Status: st, + }, nil + } + + if err := s.wrap(ctx, md, true); err != nil { + return &provider.StatResponse{ + Status: status.NewInternal(ctx, err, "error wrapping path"), + }, nil + } + + // Don't expose the underlying path + md.Path = ref.Path + + return &provider.StatResponse{ + Status: 
status.NewOK(ctx), + Info: md, + }, nil +} + func (s *service) ListContainerStream(req *provider.ListContainerStreamRequest, ss provider.ProviderAPI_ListContainerStreamServer) error { ctx := ss.Context() log := appctx.GetLogger(ctx) @@ -729,6 +766,11 @@ func (s *service) ListContainerStream(req *provider.ListContainerStreamRequest, func (s *service) ListContainer(ctx context.Context, req *provider.ListContainerRequest) (*provider.ListContainerResponse, error) { newRef, err := s.unwrap(ctx, req.Ref) if err != nil { + // The path might be a virtual view; handle that case + if utils.IsAbsolutePathReference(req.Ref) && strings.HasPrefix(s.mountPath, req.Ref.Path) { + return s.listVirtualView(ctx, req.Ref) + } + return &provider.ListContainerResponse{ Status: status.NewInternal(ctx, err, "error unwrapping path"), }, nil @@ -767,6 +809,74 @@ func (s *service) ListContainer(ctx context.Context, req *provider.ListContainer return res, nil } +func (s *service) listVirtualView(ctx context.Context, ref *provider.Reference) (*provider.ListContainerResponse, error) { + // The reference in the request encompasses this provider + // So we need to list root, merge the responses and return only the immediate children + mds, err := s.storage.ListFolder(ctx, &provider.Reference{Path: "/"}, []string{}) + if err != nil { + var st *rpc.Status + switch err.(type) { + case errtypes.IsNotFound: + st = status.NewNotFound(ctx, "path not found when listing root") + case errtypes.PermissionDenied: + st = status.NewPermissionDenied(ctx, err, "permission denied") + default: + st = status.NewInternal(ctx, err, "error listing root") + } + return &provider.ListContainerResponse{ + Status: st, + }, nil + } + + nestedInfos := make(map[string]*provider.ResourceInfo) + infos := make([]*provider.ResourceInfo, 0, len(mds)) + + for _, info := range mds { + // Get the path prefixed with the mount point + if err := s.wrap(ctx, info, true); err != nil { + continue + } + + // If info is an immediate child of 
the path in request, just use that + if path.Dir(info.Path) == path.Clean(ref.Path) { + infos = append(infos, info) + continue + } + + // info is a nested resource, so link it to its parent closest to the path in request + rel, err := filepath.Rel(ref.Path, info.Path) + if err != nil { + continue + } + parent := path.Join(ref.Path, strings.Split(rel, "/")[0]) + + if p, ok := nestedInfos[parent]; ok { + p.Size += info.Size + p.Mtime = utils.LaterTS(p.Mtime, info.Mtime) + } else { + nestedInfos[parent] = &provider.ResourceInfo{ + Path: parent, + Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER, + Id: &provider.ResourceId{ + OpaqueId: uuid.New().String(), + }, + Size: info.Size, + Mtime: info.Mtime, + MimeType: "httpd/unix-directory", + } + } + } + + for _, info := range nestedInfos { + infos = append(infos, info) + } + + return &provider.ListContainerResponse{ + Status: status.NewOK(ctx), + Infos: infos, + }, nil +} + func (s *service) ListFileVersions(ctx context.Context, req *provider.ListFileVersionsRequest) (*provider.ListFileVersionsResponse, error) { newRef, err := s.unwrap(ctx, req.Ref) if err != nil { @@ -1252,15 +1362,11 @@ func (s *service) unwrap(ctx context.Context, ref *provider.Reference) (*provide } // TODO move mount path trimming to the gateway - fn := ref.GetPath() - fsfn, err := s.trimMountPrefix(fn) + fn, err := s.trimMountPrefix(ref.GetPath()) if err != nil { return nil, err } - - pathRef := &provider.Reference{Path: fsfn} - - return pathRef, nil + return &provider.Reference{Path: fn}, nil } func (s *service) trimMountPrefix(fn string) (string, error) { diff --git a/pkg/storage/utils/eosfs/eosfs.go b/pkg/storage/utils/eosfs/eosfs.go index 0f923d59ac..b8a27281b0 100644 --- a/pkg/storage/utils/eosfs/eosfs.go +++ b/pkg/storage/utils/eosfs/eosfs.go @@ -215,6 +215,7 @@ func NewEOSFS(c *Config) (storage.FS, error) { func (fs *eosfs) userIDcacheWarmup() { if !fs.conf.EnableHome { + time.Sleep(2 * time.Second) ctx := context.Background() paths := 
[]string{fs.wrap(ctx, "/")} auth, _ := fs.getRootAuth(ctx) diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index d71d80f10a..f76c2c14b4 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -134,6 +134,14 @@ func TSToTime(ts *types.Timestamp) time.Time { return time.Unix(int64(ts.Seconds), int64(ts.Nanos)) } +// LaterTS returns the timestamp which occurs later. +func LaterTS(t1 *types.Timestamp, t2 *types.Timestamp) *types.Timestamp { + if TSToUnixNano(t1) > TSToUnixNano(t2) { + return t1 + } + return t2 +} + // ExtractGranteeID returns the ID, user or group, set in the GranteeId object func ExtractGranteeID(grantee *provider.Grantee) (*userpb.UserId, *grouppb.GroupId) { switch t := grantee.Id.(type) { From 9b1d711caddf7cf54909c67b8a3c840bf6c2c9a6 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Thu, 28 Oct 2021 12:34:56 +0200 Subject: [PATCH 2/2] Ignore errors when statting or listing across providers --- .../grpc/services/gateway/storageprovider.go | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/internal/grpc/services/gateway/storageprovider.go b/internal/grpc/services/gateway/storageprovider.go index 2d4b297769..a3d94538ea 100644 --- a/internal/grpc/services/gateway/storageprovider.go +++ b/internal/grpc/services/gateway/storageprovider.go @@ -1304,6 +1304,7 @@ func (s *svc) stat(ctx context.Context, req *provider.StatRequest) (*provider.St func (s *svc) statAcrossProviders(ctx context.Context, req *provider.StatRequest, providers []*registry.ProviderInfo) (*provider.StatResponse, error) { // TODO(ishank011): aggregrate properties such as etag, checksum, etc. 
+ log := appctx.GetLogger(ctx) info := &provider.ResourceInfo{ Id: &provider.ResourceId{ StorageId: "/", @@ -1319,14 +1320,17 @@ func (s *svc) statAcrossProviders(ctx context.Context, req *provider.StatRequest for _, p := range providers { c, err := s.getStorageProviderClient(ctx, p) if err != nil { - return nil, errors.Wrap(err, "error connecting to storage provider="+p.Address) + log.Err(err).Msg("error connecting to storage provider=" + p.Address) + continue } resp, err := c.Stat(ctx, req) if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("gateway: error calling Stat %s: %+v", req.Ref.String(), p)) + log.Err(err).Msgf("gateway: error calling Stat %s: %+v", req.Ref.String(), p) + continue } - if err != nil || resp.Status.Code != rpc.Code_CODE_OK { - return resp, err + if resp.Status.Code != rpc.Code_CODE_OK { + log.Err(status.NewErrorFromCode(rpc.Code_CODE_OK, "gateway")) + continue } if resp.Info != nil { info.Size += resp.Info.Size @@ -1653,18 +1657,22 @@ func (s *svc) listContainer(ctx context.Context, req *provider.ListContainerRequ func (s *svc) listContainerAcrossProviders(ctx context.Context, req *provider.ListContainerRequest, providers []*registry.ProviderInfo) (*provider.ListContainerResponse, error) { nestedInfos := make(map[string]*provider.ResourceInfo) + log := appctx.GetLogger(ctx) for _, p := range providers { c, err := s.getStorageProviderClient(ctx, p) if err != nil { - return nil, errors.Wrap(err, "error connecting to storage provider="+p.Address) + log.Err(err).Msg("error connecting to storage provider=" + p.Address) + continue } resp, err := c.ListContainer(ctx, req) if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("gateway: error calling ListContainer %s: %+v", req.Ref.String(), p)) + log.Err(err).Msgf("gateway: error calling Stat %s: %+v", req.Ref.String(), p) + continue } - if err != nil || resp.Status.Code != rpc.Code_CODE_OK { - return resp, err + if resp.Status.Code != rpc.Code_CODE_OK { + 
log.Err(status.NewErrorFromCode(rpc.Code_CODE_OK, "gateway")) + continue } for _, info := range resp.Infos {