Skip to content

Commit

Permalink
WIP to implement shares caching into the gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
glpatcern committed Oct 17, 2024
1 parent ee10143 commit 1b80e63
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 50 deletions.
78 changes: 61 additions & 17 deletions internal/grpc/services/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rgrpc"
"github.com/cs3org/reva/pkg/share/cache"
"github.com/cs3org/reva/pkg/sharedconf"
"github.com/cs3org/reva/pkg/token"
"github.com/cs3org/reva/pkg/token/manager/registry"
"github.com/cs3org/reva/pkg/utils/cfg"
"github.com/cs3org/reva/pkg/utils/resourceid"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -65,13 +67,18 @@ type config struct {
TransferExpires int64 `mapstructure:"transfer_expires"`
TokenManager string `mapstructure:"token_manager"`
// ShareFolder is the location where to create shares in the recipient's storage provider.
ShareFolder string `mapstructure:"share_folder"`
DataTransfersFolder string `mapstructure:"data_transfers_folder"`
HomeMapping string `mapstructure:"home_mapping"`
TokenManagers map[string]map[string]interface{} `mapstructure:"token_managers"`
EtagCacheTTL int `mapstructure:"etag_cache_ttl"`
AllowedUserAgents map[string][]string `mapstructure:"allowed_user_agents"` // map[path][]user-agent
CreateHomeCacheTTL int `mapstructure:"create_home_cache_ttl"`
ShareFolder string `mapstructure:"share_folder"`
DataTransfersFolder string `mapstructure:"data_transfers_folder"`
HomeMapping string `mapstructure:"home_mapping"`
TokenManagers map[string]map[string]interface{} `mapstructure:"token_managers"`
AllowedUserAgents map[string][]string `mapstructure:"allowed_user_agents"` // map[path][]user-agent
CacheWarmupDriver string `mapstructure:"cache_warmup_driver"`
CacheWarmupDrivers map[string]map[string]interface{} `mapstructure:"cache_warmup_drivers"`
EtagCacheTTL int `mapstructure:"etag_cache_ttl"`
CreateHomeCacheTTL int `mapstructure:"create_home_cache_ttl"`
ResourceInfoCacheDriver string `mapstructure:"resource_info_cache_type"`
ResourceInfoCacheTTL int `mapstructure:"resource_info_cache_ttl"`
ResourceInfoCacheDrivers map[string]map[string]interface{} `mapstructure:"resource_info_caches"`
}

// sets defaults.
Expand Down Expand Up @@ -116,11 +123,13 @@ func (c *config) ApplyDefaults() {
}

type svc struct {
c *config
dataGatewayURL url.URL
tokenmgr token.Manager
etagCache *ttlcache.Cache `mapstructure:"etag_cache"`
createHomeCache *ttlcache.Cache `mapstructure:"create_home_cache"`
c *config
dataGatewayURL url.URL
tokenmgr token.Manager
etagCache *ttlcache.Cache `mapstructure:"etag_cache"`
createHomeCache *ttlcache.Cache `mapstructure:"create_home_cache"`
resourceInfoCache cache.ResourceInfoCache
resourceInfoCacheTTL time.Duration
}

// New creates a new gateway svc that acts as a proxy for any grpc operation.
Expand Down Expand Up @@ -151,12 +160,21 @@ func New(ctx context.Context, m map[string]interface{}) (rgrpc.Service, error) {
_ = createHomeCache.SetTTL(time.Duration(c.CreateHomeCacheTTL) * time.Second)
createHomeCache.SkipTTLExtensionOnHit(true)

rCache, _ := getCacheManager(c)
if c.ResourceInfoCacheTTL > 0 {
cwm, err := getCacheWarmupManager(c)
if err == nil {
go startCacheWarmup(cwm, rCache, c.ResourceInfoCacheTTL)
}
}

s := &svc{
c: &c,
dataGatewayURL: *u,
tokenmgr: tokenManager,
etagCache: etagCache,
createHomeCache: createHomeCache,
c: &c,
dataGatewayURL: *u,
tokenmgr: tokenManager,
etagCache: etagCache,
createHomeCache: createHomeCache,
resourceInfoCache: rCache,
}

return s, nil
Expand Down Expand Up @@ -217,3 +235,29 @@ func getTokenManager(manager string, m map[string]map[string]interface{}) (token

return nil, errtypes.NotFound(fmt.Sprintf("driver %s not found for token manager", manager))
}

func getCacheManager(c *config.Config) (cache.ResourceInfoCache, error) {

Check failure on line 239 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

config.Config is not a type

Check failure on line 239 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

config.Config is not a type

Check failure on line 239 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integration

config.Config is not a type

Check failure on line 239 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unit-tests

config.Config is not a type

Check failure on line 239 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integration

config.Config is not a type

Check failure on line 239 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

config.Config is not a type

Check failure on line 239 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

config.Config is not a type

Check failure on line 239 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unit-tests

config.Config is not a type
if f, ok := cachereg.NewFuncs[c.ResourceInfoCacheDriver]; ok {

Check failure on line 240 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

undefined: cachereg

Check failure on line 240 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

undefined: cachereg

Check failure on line 240 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integration

undefined: cachereg

Check failure on line 240 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unit-tests

undefined: cachereg

Check failure on line 240 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integration

undefined: cachereg

Check failure on line 240 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

undefined: cachereg

Check failure on line 240 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

undefined: cachereg

Check failure on line 240 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unit-tests

undefined: cachereg
return f(c.ResourceInfoCacheDrivers[c.ResourceInfoCacheDriver])
}
return nil, fmt.Errorf("driver not found: %s", c.ResourceInfoCacheDriver)
}

func getCacheWarmupManager(c *config.Config) (cache.Warmup, error) {

Check failure on line 246 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

config.Config is not a type

Check failure on line 246 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

config.Config is not a type

Check failure on line 246 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integration

config.Config is not a type

Check failure on line 246 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unit-tests

config.Config is not a type

Check failure on line 246 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integration

config.Config is not a type

Check failure on line 246 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

config.Config is not a type

Check failure on line 246 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

config.Config is not a type

Check failure on line 246 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unit-tests

config.Config is not a type
if f, ok := warmupreg.NewFuncs[c.CacheWarmupDriver]; ok {

Check failure on line 247 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

undefined: warmupreg

Check failure on line 247 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integration

undefined: warmupreg

Check failure on line 247 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unit-tests

undefined: warmupreg

Check failure on line 247 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integration

undefined: warmupreg

Check failure on line 247 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

undefined: warmupreg

Check failure on line 247 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unit-tests

undefined: warmupreg
return f(c.CacheWarmupDrivers[c.CacheWarmupDriver])
}
return nil, fmt.Errorf("driver not found: %s", c.CacheWarmupDriver)
}

func startCacheWarmup(cw cache.Warmup, rCache cache.ResourceInfoCache, ttl Duration) {

Check failure on line 253 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

undefined: Duration

Check failure on line 253 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integration

undefined: Duration

Check failure on line 253 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unit-tests

undefined: Duration

Check failure on line 253 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integration

undefined: Duration

Check failure on line 253 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

undefined: Duration

Check failure on line 253 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unit-tests

undefined: Duration
time.Sleep(2 * time.Second)
infos, err := cw.GetResourceInfos()
if err != nil {
return
}
for _, r := range infos {
key := resourceid.OwnCloudResourceIDWrap(r.Id)
_ = h.resourceInfoCache.SetWithExpire(key, r, h.resourceInfoCacheTTL)

Check failure on line 261 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

undefined: h (typecheck)

Check failure on line 261 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integration

undefined: h

Check failure on line 261 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unit-tests

undefined: h

Check failure on line 261 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / integration

undefined: h

Check failure on line 261 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / lint

undefined: h (typecheck)

Check failure on line 261 in internal/grpc/services/gateway/gateway.go

View workflow job for this annotation

GitHub Actions / unit-tests

undefined: h
}
}
1 change: 1 addition & 0 deletions internal/grpc/services/gateway/usershareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ func (s *svc) ListExistingReceivedShares(ctx context.Context, req *collaboration
}

// TODO(lopresti) incorporate the cache layer from internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go
//s.resourceInfoCache
stat, err := s.Stat(ctx, &provider.StatRequest{
Ref: &provider.Reference{
ResourceId: rs.Share.ResourceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"github.com/cs3org/reva/internal/http/services/owncloud/ocs/conversions"
"github.com/cs3org/reva/internal/http/services/owncloud/ocs/response"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/spaces"

"github.com/cs3org/reva/pkg/notification"
"github.com/cs3org/reva/pkg/notification/notificationhelper"
Expand Down Expand Up @@ -150,24 +149,18 @@ func (h *Handler) startCacheWarmup(c cache.Warmup) {
}
}

func (h *Handler) extractReference(r *http.Request) (*provider.Reference, error) {
func (h *Handler) extractReference(r *http.Request) (provider.Reference, error) {
var ref provider.Reference
if spaceID := r.FormValue("space_ref"); spaceID != "" {
_, base, _, ok := spaces.DecodeResourceID(spaceID)
if !ok {
return nil, errors.New("bad space id format")
}

ref.Path = base
}
if p := r.FormValue("path"); p != "" {
if ref.Path == "" {
ref.Path = path.Join(h.homeNamespace, p)
} else {
ref.Path = path.Join(ref.Path, p)
ref = provider.Reference{Path: path.Join(h.homeNamespace, p)}
} else if spaceRef := r.FormValue("space_ref"); spaceRef != "" {
var err error
ref, err = utils.ParseStorageSpaceReference(spaceRef)
if err != nil {
return provider.Reference{}, err
}
}
return &ref, nil
return ref, nil
}

// CreateShare handles POST requests on /apps/files_sharing/api/v1/shares.
Expand All @@ -193,7 +186,7 @@ func (h *Handler) CreateShare(w http.ResponseWriter, r *http.Request) {
}

statReq := provider.StatRequest{
Ref: ref,
Ref: &ref,
}

log := appctx.GetLogger(ctx).With().Interface("ref", ref).Logger()
Expand Down Expand Up @@ -1117,12 +1110,8 @@ func (h *Handler) addFilters(w http.ResponseWriter, r *http.Request, prefix stri
return nil, nil, err
}

target, err := h.extractReference(r)
if err != nil {
response.WriteOCSError(w, r, response.MetaServerError.StatusCode, "error extracting reference from request", err)
return nil, nil, err
}
info, status, err := h.getResourceInfoByPath(ctx, client, target.Path)
target := path.Join(prefix, r.FormValue("path"))
info, status, err := h.getResourceInfoByPath(ctx, client, target)
if err != nil {
response.WriteOCSError(w, r, response.MetaServerError.StatusCode, "error sending a grpc stat request", err)
return nil, nil, err
Expand All @@ -1145,10 +1134,6 @@ func (h *Handler) addFilters(w http.ResponseWriter, r *http.Request, prefix stri
return collaborationFilters, linkFilters, nil
}

func relativePathToSpaceID(info *provider.ResourceInfo) string {
return strings.TrimPrefix(info.Path, info.Id.SpaceId)
}

func (h *Handler) addFileInfo(ctx context.Context, s *conversions.ShareData, info *provider.ResourceInfo) error {
log := appctx.GetLogger(ctx)
if info != nil {
Expand All @@ -1161,14 +1146,12 @@ func (h *Handler) addFileInfo(ctx context.Context, s *conversions.ShareData, inf
s.MimeType = parsedMt
// TODO STime: &types.Timestamp{Seconds: info.Mtime.Seconds, Nanos: info.Mtime.Nanos},
// TODO Storage: int
itemID := spaces.EncodeResourceID(info.Id)

s.ItemSource = itemID
s.ItemSource = resourceid.OwnCloudResourceIDWrap(info.Id)
s.FileSource = s.ItemSource
switch {
case h.sharePrefix == "/":
s.FileTarget = relativePathToSpaceID(info)
s.Path = relativePathToSpaceID(info)
s.FileTarget = info.Path
s.Path = info.Path
case s.ShareType == conversions.ShareTypePublicLink:
s.FileTarget = path.Join("/", path.Base(info.Path))
s.Path = path.Join("/", path.Base(info.Path))
Expand Down Expand Up @@ -1404,8 +1387,7 @@ func mapState(state collaboration.ShareState) int {
var mapped int
switch state {
case collaboration.ShareState_SHARE_STATE_PENDING:
mapped = ocsStateAccepted
// mapped = ocsStatePending
mapped = ocsStatePending
case collaboration.ShareState_SHARE_STATE_ACCEPTED:
mapped = ocsStateAccepted
case collaboration.ShareState_SHARE_STATE_REJECTED:
Expand Down

0 comments on commit 1b80e63

Please sign in to comment.