Use mtimesyncedcache.Map to prevent concurrent reads/writes
aduffeck committed Jan 11, 2024
1 parent b74d280 commit 4c2970b
Showing 1 changed file with 32 additions and 28 deletions.
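
The change swaps the plain Go map guarding the per-user share index for reva's mtimesyncedcache.Map, so lookups and writes coming from concurrent requests no longer touch an unsynchronized map. The real type lives in pkg/storage/utils/decomposedfs/mtimesyncedcache; as a rough sketch (an assumption about its shape, not the actual implementation), the generic Load/Store/LoadOrStore surface the diff below relies on could be as small as a typed wrapper around sync.Map:

package mtimesyncedcache

import "sync"

// Map is sketched here as a thin generic wrapper around sync.Map: callers get
// typed Load/Store/LoadOrStore, and the zero value is ready to use, which is
// why the diff can initialize the field with a plain composite literal.
// The real type presumably also tracks mtimes (hence the package name); that
// part is omitted because the share cache does not use it here.
type Map[K comparable, V any] struct {
	m sync.Map
}

// Load returns the value stored under key, if any.
func (m *Map[K, V]) Load(key K) (V, bool) {
	v, ok := m.m.Load(key)
	if !ok {
		var zero V
		return zero, false
	}
	return v.(V), true
}

// Store unconditionally sets key to value.
func (m *Map[K, V]) Store(key K, value V) {
	m.m.Store(key, value)
}

// LoadOrStore returns the existing value for key if present; otherwise it
// stores value and returns it. loaded reports whether a value already existed.
func (m *Map[K, V]) LoadOrStore(key K, value V) (V, bool) {
	v, loaded := m.m.LoadOrStore(key, value)
	return v.(V), loaded
}
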
60 changes: 32 additions & 28 deletions pkg/share/manager/jsoncs3/sharecache/sharecache.go
@@ -31,6 +31,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/shareid"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/mtimesyncedcache"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
@@ -45,7 +46,7 @@ const tracerName = "sharecache"
type Cache struct {
lockMap sync.Map

UserShares map[string]*UserShareCache
UserShares mtimesyncedcache.Map[string, *UserShareCache]

storage metadata.Storage
namespace string
@@ -76,7 +77,7 @@ func (c *Cache) lockUser(userID string) func() {
// New returns a new Cache instance
func New(s metadata.Storage, namespace, filename string, ttl time.Duration) Cache {
return Cache{
UserShares: map[string]*UserShareCache{},
UserShares: mtimesyncedcache.Map[string, *UserShareCache]{},
storage: s,
namespace: namespace,
filename: filename,
@@ -93,7 +94,7 @@ func (c *Cache) Add(ctx context.Context, userid, shareID string) error {
span.SetAttributes(attribute.String("cs3.userid", userid))
defer unlock()

if c.UserShares[userid] == nil {
if _, ok := c.UserShares.Load(userid); !ok {
err := c.syncWithLock(ctx, userid)
if err != nil {
return err
@@ -111,7 +112,8 @@ func (c *Cache) Add(ctx context.Context, userid, shareID string) error {
c.initializeIfNeeded(userid, ssid)

// add share id
c.UserShares[userid].UserShares[ssid].IDs[shareID] = struct{}{}
us, _ := c.UserShares.Load(userid)
us.UserShares[ssid].IDs[shareID] = struct{}{}
return c.Persist(ctx, userid)
}

@@ -158,7 +160,7 @@ func (c *Cache) Remove(ctx context.Context, userid, shareID string) error {
span.SetAttributes(attribute.String("cs3.userid", userid))
defer unlock()

if c.UserShares[userid] == nil {
if _, ok := c.UserShares.Load(userid); !ok {
err := c.syncWithLock(ctx, userid)
if err != nil {
return err
@@ -173,15 +175,13 @@ func (c *Cache) Remove(ctx context.Context, userid, shareID string) error {
ssid := storageid + shareid.IDDelimiter + spaceid

persistFunc := func() error {
if c.UserShares[userid] == nil {
c.UserShares[userid] = &UserShareCache{
UserShares: map[string]*SpaceShareIDs{},
}
}
us, loaded := c.UserShares.LoadOrStore(userid, &UserShareCache{
UserShares: map[string]*SpaceShareIDs{},
})

if c.UserShares[userid].UserShares[ssid] != nil {
if loaded {
// remove share id
delete(c.UserShares[userid].UserShares[ssid].IDs, shareID)
delete(us.UserShares[ssid].IDs, shareID)
}

return c.Persist(ctx, userid)
@@ -234,11 +234,12 @@ func (c *Cache) List(ctx context.Context, userid string) (map[string]SpaceShareI
}

r := map[string]SpaceShareIDs{}
if c.UserShares[userid] == nil {
us, ok := c.UserShares.Load(userid)
if !ok {
return r, nil
}

for ssid, cached := range c.UserShares[userid].UserShares {
for ssid, cached := range us.UserShares {
r[ssid] = SpaceShareIDs{
IDs: cached.IDs,
}
@@ -261,8 +262,8 @@ func (c *Cache) syncWithLock(ctx context.Context, userID string) error {
dlreq := metadata.DownloadRequest{
Path: userCreatedPath,
}
if c.UserShares[userID].Etag != "" {
dlreq.IfNoneMatch = []string{c.UserShares[userID].Etag}
if us, ok := c.UserShares.Load(userID); ok && us.Etag != "" {
dlreq.IfNoneMatch = []string{us.Etag}
}

dlres, err := c.storage.Download(ctx, dlreq)
@@ -290,7 +291,7 @@ func (c *Cache) syncWithLock(ctx context.Context, userID string) error {
}
newShareCache.Etag = dlres.Etag

c.UserShares[userID] = newShareCache
c.UserShares.Store(userID, newShareCache)
span.SetStatus(codes.Ok, "")
return nil
}
@@ -301,7 +302,12 @@ func (c *Cache) Persist(ctx context.Context, userid string) error {
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userid))

createdBytes, err := json.Marshal(c.UserShares[userid])
us, ok := c.UserShares.Load(userid)
if !ok {
span.SetStatus(codes.Ok, "no user shares")
return nil
}
createdBytes, err := json.Marshal(us)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
@@ -317,11 +323,11 @@ func (c *Cache) Persist(ctx context.Context, userid string) error {
ur := metadata.UploadRequest{
Path: jsonPath,
Content: createdBytes,
IfMatchEtag: c.UserShares[userid].Etag,
IfMatchEtag: us.Etag,
}
// when there is no etag in memory make sure the file has not been created on the server, see https://www.rfc-editor.org/rfc/rfc9110#field.if-match
// > If the field value is "*", the condition is false if the origin server has a current representation for the target resource.
if c.UserShares[userid].Etag == "" {
if us.Etag == "" {
ur.IfNoneMatch = []string{"*"}
}
res, err := c.storage.Upload(ctx, ur)
@@ -330,7 +336,7 @@ func (c *Cache) Persist(ctx context.Context, userid string) error {
span.SetStatus(codes.Error, err.Error())
return err
}
c.UserShares[userid].Etag = res.Etag
us.Etag = res.Etag
span.SetStatus(codes.Ok, "")
return nil
}
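
The Persist hunks keep the existing optimistic-concurrency scheme intact and only read the etag from the loaded cache entry instead of indexing the map again: the upload sends the last seen etag as If-Match, and when no etag is cached yet it sends If-None-Match: * so the write fails rather than silently overwriting a file another instance created first (the RFC 9110 rule quoted in the code comment above). A hypothetical HTTP-level sketch of that choice (the real request is built inside reva's metadata.Storage implementation, not by the cache):

package main

import (
	"bytes"
	"net/http"
)

// buildConditionalUpload is a hypothetical helper, not part of reva; it only
// illustrates how the UploadRequest fields set in Persist map onto HTTP
// conditional headers.
func buildConditionalUpload(url, etag string, body []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	if etag != "" {
		// overwrite only if the server still holds the revision we last read
		req.Header.Set("If-Match", etag)
	} else {
		// nothing cached yet: create only, and fail if another node already wrote the file
		req.Header.Set("If-None-Match", "*")
	}
	return req, nil
}
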
@@ -340,13 +346,11 @@ func (c *Cache) userCreatedPath(userid string) string {
}

func (c *Cache) initializeIfNeeded(userid, ssid string) {
if c.UserShares[userid] == nil {
c.UserShares[userid] = &UserShareCache{
UserShares: map[string]*SpaceShareIDs{},
}
}
if ssid != "" && c.UserShares[userid].UserShares[ssid] == nil {
c.UserShares[userid].UserShares[ssid] = &SpaceShareIDs{
us, _ := c.UserShares.LoadOrStore(userid, &UserShareCache{
UserShares: map[string]*SpaceShareIDs{},
})
if ssid != "" && us.UserShares[ssid] == nil {
us.UserShares[ssid] = &SpaceShareIDs{
IDs: map[string]struct{}{},
}
}
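Taken together, the pattern applied throughout the file is mechanical: every direct index into the plain map (c.UserShares[userid]) becomes a Load, Store, or LoadOrStore call, so a reader in List or Persist can no longer race with a writer in Add, Remove, or syncWithLock, while mutations inside a single user's entry stay serialized by the existing per-user lock (lockUser). A self-contained sketch of the safe check-then-initialize step, using plain sync.Map and simplified stand-in types rather than the reva ones:

package main

import (
	"fmt"
	"sync"
)

// userShareCache is a simplified stand-in for the UserShareCache type in the diff.
type userShareCache struct {
	IDs map[string]struct{}
}

func main() {
	// Indexing a plain map[string]*userShareCache from these goroutines would be
	// reported by `go run -race` and can abort the runtime with
	// "concurrent map writes"; sync.Map (which mtimesyncedcache.Map is assumed
	// to build on) makes the lookups safe.
	var shares sync.Map

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			user := fmt.Sprintf("user-%d", n%2)

			// LoadOrStore makes the check-then-initialize step atomic, mirroring
			// how Remove and initializeIfNeeded use it in the diff above.
			v, _ := shares.LoadOrStore(user, &userShareCache{IDs: map[string]struct{}{}})
			_ = v.(*userShareCache)
		}(i)
	}
	wg.Wait()

	shares.Range(func(key, _ any) bool {
		fmt.Println("cached entry for", key)
		return true
	})
}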
