Skip to content

Commit

Permalink
Make jsoncs3 more robust (#4101)
Browse files Browse the repository at this point in the history
* Move sync responsibility into the caches. Make sure to initialize.

* Fix preventing overwriting changed resources

* map status to correct error

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* Fix deadlock

* Set MTime of uploaded files

* Add comment about the X-OC-Mtime header

* make tests compile again

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* Add changelog

* Check errors

* Get rid of PersistWithTime. We always set time.Now() anyway

* Add note to make Sync private

* Fix tests

* Fix linter issue

* Fix tests

---------

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
Co-authored-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
aduffeck and butonic authored Aug 7, 2023
1 parent 1913d35 commit 5856b2e
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 232 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-jsoncs3-indexes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Make the jsoncs3 share manager indexes more robust

We fixed a problem where the jsoncs3 share manager indexes could get out of sync.

https://github.com/cs3org/reva/pull/4101
117 changes: 20 additions & 97 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,17 +337,6 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla

eg.Go(func() error {
err := m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.Cache.Sync(ctx, md.Id.StorageId, md.Id.SpaceId); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s)
// TODO try more often?
}

if err != nil {
span.RecordError(err)
Expand All @@ -359,17 +348,6 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla

eg.Go(func() error {
err := m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID)
// TODO try more often?
}

if err != nil {
span.RecordError(err)
Expand All @@ -391,16 +369,6 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
State: collaboration.ShareState_SHARE_STATE_PENDING,
}
err := m.UserReceivedStates.Add(ctx, userid, spaceID, rs)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.UserReceivedStates.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs)
// TODO try more often?
}

if err != nil {
span.RecordError(err)
Expand All @@ -413,16 +381,6 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
eg.Go(func() error {
groupid := g.Grantee.GetGroupId().GetOpaqueId()
err := m.GroupReceivedCache.Add(ctx, groupid, shareID)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.GroupReceivedCache.Sync(ctx, groupid); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

err = m.GroupReceivedCache.Add(ctx, groupid, shareID)
// TODO try more often?
}

if err != nil {
span.RecordError(err)
Expand All @@ -445,13 +403,11 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
// getByID must be called in a lock-controlled block.
func (m *Manager) getByID(ctx context.Context, id *collaboration.ShareId) (*collaboration.Share, error) {
storageID, spaceID, _ := shareid.Decode(id.OpaqueId)
// sync cache, maybe our data is outdated
err := m.Cache.Sync(ctx, storageID, spaceID)

share, err := m.Cache.Get(ctx, storageID, spaceID, id.OpaqueId)
if err != nil {
return nil, err
}

share := m.Cache.Get(storageID, spaceID, id.OpaqueId)
if share == nil {
return nil, errtypes.NotFound(id.String())
}
Expand All @@ -460,12 +416,10 @@ func (m *Manager) getByID(ctx context.Context, id *collaboration.ShareId) (*coll

// getByKey must be called in a lock-controlled block.
func (m *Manager) getByKey(ctx context.Context, key *collaboration.ShareKey) (*collaboration.Share, error) {
err := m.Cache.Sync(ctx, key.ResourceId.StorageId, key.ResourceId.SpaceId)
spaceShares, err := m.Cache.ListSpace(ctx, key.ResourceId.StorageId, key.ResourceId.SpaceId)
if err != nil {
return nil, err
}

spaceShares := m.Cache.ListSpace(key.ResourceId.StorageId, key.ResourceId.SpaceId)
for _, share := range spaceShares.Shares {
if utils.GranteeEqual(key.Grantee, share.Grantee) && utils.ResourceIDEqual(share.ResourceId, key.ResourceId) {
return share, nil
Expand Down Expand Up @@ -678,15 +632,11 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
var ss []*collaboration.Share
for providerID, spaces := range providerSpaces {
for spaceID := range spaces {
err := m.Cache.Sync(ctx, providerID, spaceID)
shares, err := m.Cache.ListSpace(ctx, providerID, spaceID)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

shares := m.Cache.ListSpace(providerID, spaceID)

for _, s := range shares.Shares {
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
Expand Down Expand Up @@ -737,20 +687,20 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "listCreatedShares")
defer span.End()

var ss []*collaboration.Share

if err := m.CreatedCache.Sync(ctx, user.Id.OpaqueId); err != nil {
list, err := m.CreatedCache.List(ctx, user.Id.OpaqueId)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return ss, err
return nil, err
}
for ssid, spaceShareIDs := range m.CreatedCache.List(user.Id.OpaqueId) {

var ss []*collaboration.Share
for ssid, spaceShareIDs := range list {
storageID, spaceID, _ := shareid.Decode(ssid)
err := m.Cache.Sync(ctx, storageID, spaceID)
spaceShares, err := m.Cache.ListSpace(ctx, storageID, spaceID)
if err != nil {
continue
}
spaceShares := m.Cache.ListSpace(storageID, spaceID)
for shareid := range spaceShareIDs.IDs {
s := spaceShares.Shares[shareid]
if s == nil {
Expand Down Expand Up @@ -800,10 +750,11 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati

// first collect all spaceids the user has access to as a group member
for _, group := range user.Groups {
if err := m.GroupReceivedCache.Sync(ctx, group); err != nil {
list, err := m.GroupReceivedCache.List(ctx, group)
if err != nil {
continue // ignore error, cache will be updated on next read
}
for ssid, spaceShareIDs := range m.GroupReceivedCache.List(group) {
for ssid, spaceShareIDs := range list {
// add a pending entry, the state will be updated
// when reading the received shares below if they have already been accepted or denied
var rs *receivedsharecache.Space
Expand All @@ -825,6 +776,7 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
}

// add all spaces the user has receved shares for, this includes mount points and share state for groups
// TODO: rewrite this code to not use the internal strucs anymore (e.g. by adding a List method). Sync can then be made private.
_ = m.UserReceivedStates.Sync(ctx, user.Id.OpaqueId) // ignore error, cache will be updated on next read

if m.UserReceivedStates.ReceivedSpaces[user.Id.OpaqueId] != nil {
Expand Down Expand Up @@ -872,13 +824,9 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
g.Go(func() error {
for w := range work {
storageID, spaceID, _ := shareid.Decode(w.ssid)
err := m.Cache.Sync(ctx, storageID, spaceID)
if err != nil {
continue
}
for shareID, state := range w.rspace.States {
s := m.Cache.Get(storageID, spaceID, shareID)
if s == nil {
s, err := m.Cache.Get(ctx, storageID, spaceID, shareID)
if err != nil || s == nil {
continue
}
if share.IsExpired(s) {
Expand Down Expand Up @@ -952,9 +900,8 @@ func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.S

storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)

_ = m.UserReceivedStates.Sync(ctx, userID) // ignore error, cache will be updated on next read
state := m.UserReceivedStates.Get(userID, storageID+shareid.IDDelimiter+spaceID, s.Id.GetOpaqueId())
if state != nil {
state, err := m.UserReceivedStates.Get(ctx, userID, storageID+shareid.IDDelimiter+spaceID, s.Id.GetOpaqueId())
if err == nil && state != nil {
rs.State = state.State
rs.MountPoint = state.MountPoint
}
Expand Down Expand Up @@ -1031,14 +978,6 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab
userID := ctxpkg.ContextMustGetUser(ctx)

err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.IDDelimiter+rs.Share.ResourceId.SpaceId, rs)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
// when persisting fails, download, readd and persist again
if err := m.UserReceivedStates.Sync(ctx, userID.GetId().GetOpaqueId()); err != nil {
return nil, err
}
err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.IDDelimiter+rs.Share.ResourceId.SpaceId, rs)
// TODO try more often?
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1120,29 +1059,13 @@ func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share) error
eg.Go(func() error {
storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.Cache.Sync(ctx, storageID, spaceID); err != nil {
return err
}
err = m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
// TODO try more often?
}

return err
})

eg.Go(func() error {
// remove from created cache
err := m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
return err
}
err = m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId)
// TODO try more often?
}

return err
return m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId)
})

// TODO remove from grantee cache
Expand Down
Loading

0 comments on commit 5856b2e

Please sign in to comment.