Commit

rework locking when uploading
Signed-off-by: jkoberg <jkoberg@owncloud.com>
kobergj committed Dec 9, 2022
1 parent 1ff975d commit 97ee2a4
Showing 4 changed files with 74 additions and 59 deletions.
5 changes: 3 additions & 2 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
@@ -300,6 +300,8 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) {
); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event")
}

/* LETS KEEP THIS COMMENTED UNTIL VIRUSSCANNING IS BACKMERGED
case events.VirusscanFinished:
if ev.ErrorMsg != "" {
// scan failed somehow
@@ -385,7 +387,6 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) {
fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
continue
}

default:
// uploadid is not empty -> this is an async upload
up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
@@ -410,7 +411,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) {
// remove cache entry in gateway
fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})

*/
default:
log.Error().Interface("event", ev).Msg("Unknown event")
}
99 changes: 58 additions & 41 deletions pkg/storage/utils/decomposedfs/upload/processing.go
@@ -21,10 +21,12 @@ package upload
import (
"context"
"encoding/json"
"fmt"
iofs "io/fs"
"os"
"path/filepath"
"strconv"
"strings"
"time"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
@@ -42,6 +44,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/gofrs/flock"
"github.com/google/uuid"
"github.com/pkg/errors"
tusd "github.com/tus/tusd/pkg/handler"
@@ -234,7 +237,7 @@ func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot stri
}

// CreateNodeForUpload will create the target node for the Upload
func CreateNodeForUpload(upload *Upload) (*node.Node, error) {
func CreateNodeForUpload(upload *Upload, initAttrs map[string]string) (*node.Node, error) {
fi, err := os.Stat(upload.binPath)
if err != nil {
return nil, err
@@ -262,19 +265,29 @@ func CreateNodeForUpload(upload *Upload) (*node.Node, error) {
return nil, err
}

var lock *flock.Flock
switch n.ID {
case "":
err = initNewNode(upload, n, uint64(fsize))
lock, err = initNewNode(upload, n, uint64(fsize))
default:
err = updateExistingNode(upload, n, spaceID, uint64(fsize))
lock, err = updateExistingNode(upload, n, spaceID, uint64(fsize))
}

defer filelocks.ReleaseLock(lock)
if err != nil {
return nil, err
}

// create/update node info
if err := n.WriteAllNodeMetadata(); err != nil {
// overwrite technical information
initAttrs[xattrs.ParentidAttr] = n.ParentID
initAttrs[xattrs.NameAttr] = n.Name
initAttrs[xattrs.BlobIDAttr] = n.BlobID
initAttrs[xattrs.BlobsizeAttr] = strconv.FormatInt(n.Blobsize, 10)
initAttrs[xattrs.StatusPrefix] = node.ProcessingStatus

// update node metadata with new blobid etc
err = n.SetXattrsWithLock(initAttrs, lock)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: could not write metadata")
}

@@ -284,56 +297,61 @@ func CreateNodeForUpload(upload *Upload) (*node.Node, error) {
return nil, err
}

return n, n.MarkProcessing()
return n, nil
}

func initNewNode(upload *Upload, n *node.Node, fsize uint64) error {
func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*flock.Flock, error) {
n.ID = uuid.New().String()

// create folder structure (if needed)
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
return err
return nil, err
}

if _, err := os.Create(n.InternalPath()); err != nil {
return err
return nil, err
}

lock, err := filelocks.AcquireWriteLock(n.InternalPath())
if err != nil {
// we cannot acquire a lock - we error for safety
return lock, err
}

if _, err := node.CheckQuota(n.SpaceRoot, false, 0, fsize); err != nil {
return err
return lock, err
}

// link child name to parent if it is new
childNameLink := filepath.Join(n.ParentInternalPath(), n.Name)
var link string
link, err := os.Readlink(childNameLink)
if err == nil && link != "../"+n.ID {
if err := os.Remove(childNameLink); err != nil {
return errors.Wrap(err, "Decomposedfs: could not remove symlink child entry")
return lock, errors.Wrap(err, "Decomposedfs: could not remove symlink child entry")
}
}
if errors.Is(err, iofs.ErrNotExist) || link != "../"+n.ID {
relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2))
if err = os.Symlink(relativeNodePath, childNameLink); err != nil {
return errors.Wrap(err, "Decomposedfs: could not symlink child entry")
return lock, errors.Wrap(err, "Decomposedfs: could not symlink child entry")
}
}

// on a new file the sizeDiff is the fileSize
upload.sizeDiff = int64(fsize)
upload.Info.MetaData["sizeDiff"] = strconv.Itoa(int(upload.sizeDiff))
return nil
return lock, nil
}

func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) error {
func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) (*flock.Flock, error) {
old, _ := node.ReadNode(upload.Ctx, upload.lu, spaceID, n.ID, false)
if _, err := node.CheckQuota(n.SpaceRoot, true, uint64(old.Blobsize), fsize); err != nil {
return err
return nil, err
}

vfi, err := os.Stat(old.InternalPath())
if err != nil {
return err
return nil, err
}

// When the if-match header was set we need to check if the
@@ -342,9 +360,9 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint
targetEtag, err := node.CalculateEtag(n.ID, vfi.ModTime())
switch {
case err != nil:
return errtypes.InternalError(err.Error())
return nil, errtypes.InternalError(err.Error())
case ifMatch != targetEtag:
return errtypes.Aborted("etag mismatch")
return nil, errtypes.Aborted("etag mismatch")
}
}

@@ -358,36 +376,35 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint
lock, err := filelocks.AcquireWriteLock(targetPath)
if err != nil {
// we cannot acquire a lock - we error for safety
return err
return nil, err
}

// create version node
if _, err := os.Create(upload.versionsPath); err != nil {
return lock, err
}
defer filelocks.ReleaseLock(lock)

// This move drops all metadata!!! We copy it below with CopyMetadata
if err = os.Rename(targetPath, upload.versionsPath); err != nil {
return err
// copy blob metadata to version node
if err := xattrs.CopyMetadataWithSourceLock(targetPath, upload.versionsPath, func(attributeName string) bool {
return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) ||
attributeName == xattrs.BlobIDAttr ||
attributeName == xattrs.BlobsizeAttr
}, lock); err != nil {
return lock, err
}

if _, err := os.Create(targetPath); err != nil {
return err
// keep mtime from previous version
if err := os.Chtimes(upload.versionsPath, vfi.ModTime(), vfi.ModTime()); err != nil {
return lock, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err))
}

// copy grant and arbitrary metadata
// NOTE: now restoring an older revision might bring back a grant that was removed!
if err := xattrs.CopyMetadata(upload.versionsPath, targetPath, func(attributeName string) bool {
return true
// TODO determine all attributes that must be copied, currently we just copy all and overwrite changed properties
/*
[>
return strings.HasPrefix(attributeName, xattrs.GrantPrefix) || // for grants
strings.HasPrefix(attributeName, xattrs.MetadataPrefix) || // for arbitrary metadata
strings.HasPrefix(attributeName, xattrs.FavPrefix) || // for favorites
strings.HasPrefix(attributeName, xattrs.SpaceNameAttr) || // for a shared file
*/
}); err != nil {
return err
// update mtime of current version
mtime := time.Now()
if err := os.Chtimes(n.InternalPath(), mtime, mtime); err != nil {
return nil, err
}

return nil
return lock, nil
}

// lookupNode looks up nodes by path.
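
Note: the hunks above change initNewNode and updateExistingNode to return the flock they acquire, so CreateNodeForUpload can write all node metadata while that lock is held and release it exactly once via defer. A condensed sketch of the pattern inside CreateNodeForUpload (not a verbatim excerpt):

var (
	lock *flock.Flock
	err  error
)
switch n.ID {
case "":
	lock, err = initNewNode(upload, n, uint64(fsize)) // acquires a write lock on the new node
default:
	lock, err = updateExistingNode(upload, n, spaceID, uint64(fsize)) // acquires a write lock on the existing node
}
defer filelocks.ReleaseLock(lock) // released once, even on the error paths below
if err != nil {
	return nil, err
}

// parent id, name, blob id/size, processing status and checksums are
// persisted in a single call while the lock is still held
if err := n.SetXattrsWithLock(initAttrs, lock); err != nil {
	return nil, errors.Wrap(err, "Decomposedfs: could not write metadata")
}
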
25 changes: 9 additions & 16 deletions pkg/storage/utils/decomposedfs/upload/upload.go
@@ -42,6 +42,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/golang-jwt/jwt"
"github.com/pkg/errors"
@@ -227,19 +228,21 @@ func (upload *Upload) FinishUpload(_ context.Context) error {
}
}

n, err := CreateNodeForUpload(upload)
// update checksums
attrs := map[string]string{
xattrs.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)),
xattrs.ChecksumPrefix + "md5": string(md5h.Sum(nil)),
xattrs.ChecksumPrefix + "adler32": string(adler32h.Sum(nil)),
}

n, err := CreateNodeForUpload(upload, attrs)
if err != nil {
Cleanup(upload, true, false)
return err
}

upload.Node = n

// now try write all checksums
tryWritingChecksum(log, upload.Node, "sha1", sha1h)
tryWritingChecksum(log, upload.Node, "md5", md5h)
tryWritingChecksum(log, upload.Node, "adler32", adler32h)

if upload.pub != nil {
u, _ := ctxpkg.ContextGetUser(upload.Ctx)
s, err := upload.URL(upload.Ctx)
@@ -459,13 +462,3 @@ func joinurl(paths ...string) string {

return s.String()
}

func tryWritingChecksum(log *zerolog.Logger, n *node.Node, algo string, h hash.Hash) {
if err := n.SetChecksum(algo, h); err != nil {
log.Err(err).
Str("csType", algo).
Bytes("hash", h.Sum(nil)).
Msg("Decomposedfs: could not write checksum")
// this is not critical, the bytes are there so we will continue
}
}
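
For context, FinishUpload now collects the checksums computed during the upload into the initial attribute map and hands them to CreateNodeForUpload, instead of writing each checksum separately through tryWritingChecksum afterwards. A minimal sketch of the call site, mirroring the hunk above (sha1h, md5h and adler32h are the hash handles defined elsewhere in upload.go):

attrs := map[string]string{
	xattrs.ChecksumPrefix + "sha1":    string(sha1h.Sum(nil)),
	xattrs.ChecksumPrefix + "md5":     string(md5h.Sum(nil)),
	xattrs.ChecksumPrefix + "adler32": string(adler32h.Sum(nil)),
}

// the checksums are persisted together with the other node metadata
// in the single SetXattrsWithLock call inside CreateNodeForUpload
n, err := CreateNodeForUpload(upload, attrs)
if err != nil {
	Cleanup(upload, true, false)
	return err
}
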
4 changes: 4 additions & 0 deletions pkg/storage/utils/filelocks/filelocks.go
@@ -164,6 +164,10 @@ func AcquireWriteLock(file string) (*flock.Flock, error) {
// ReleaseLock releases a lock from a file that was previously created
// by AcquireReadLock or AcquireWriteLock.
func ReleaseLock(lock *flock.Flock) error {
if lock == nil {
return errors.New("cannot unlock nil lock")
}

// there is a probability that if the file can not be unlocked,
// we also can not remove the file. We will only try to remove if it
// was successfully unlocked.
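
The nil check added to ReleaseLock matters for the deferred release in CreateNodeForUpload above: when initNewNode or updateExistingNode fails before acquiring the lock, the deferred call receives a nil *flock.Flock. A small sketch of the interaction this guards, assuming the signatures shown in this diff:

lock, err := initNewNode(upload, n, uint64(fsize)) // may return a nil lock when it fails early
defer filelocks.ReleaseLock(lock)                  // safe: ReleaseLock now rejects a nil lock with an error
if err != nil {
	return nil, err
}
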
