From 63eda99568227a5d83420c5a2779f209e66edae5 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Fri, 9 Dec 2022 11:28:35 +0100 Subject: [PATCH] rework locking when uploading Signed-off-by: jkoberg --- .../utils/decomposedfs/decomposedfs.go | 5 +- .../utils/decomposedfs/upload/processing.go | 95 +++++++++++-------- .../utils/decomposedfs/upload/upload.go | 25 ++--- pkg/storage/utils/filelocks/filelocks.go | 4 + 4 files changed, 69 insertions(+), 60 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index a87a67d3d53..d6c4dfa179f 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -300,6 +300,8 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) { ); err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event") } + + /* LETS KEEP THIS COMMENTED UNTIL VIRUSSCANNING IS BACKMERGED case events.VirusscanFinished: if ev.ErrorMsg != "" { // scan failed somehow @@ -385,7 +387,6 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) { fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) continue } - default: // uploadid is not empty -> this is an async upload up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) @@ -410,7 +411,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) { // remove cache entry in gateway fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) - + */ default: log.Error().Interface("event", ev).Msg("Unknown event") } diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go index a7ae23db2d0..f94a5102b8a 100644 --- a/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -21,10 +21,12 @@ package upload import ( "context" "encoding/json" + "fmt" iofs "io/fs" "os" "path/filepath" "strconv" + "strings" "time" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" @@ -42,6 +44,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" + "github.com/gofrs/flock" "github.com/google/uuid" "github.com/pkg/errors" tusd "github.com/tus/tusd/pkg/handler" @@ -234,7 +237,7 @@ func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot stri } // CreateNodeForUpload will create the target node for the Upload -func CreateNodeForUpload(upload *Upload) (*node.Node, error) { +func CreateNodeForUpload(upload *Upload, initAttrs map[string]string) (*node.Node, error) { fi, err := os.Stat(upload.binPath) if err != nil { return nil, err @@ -262,19 +265,29 @@ func CreateNodeForUpload(upload *Upload) (*node.Node, error) { return nil, err } + var lock *flock.Flock + defer filelocks.ReleaseLock(lock) + switch n.ID { case "": - err = initNewNode(upload, n, uint64(fsize)) + lock, err = initNewNode(upload, n, uint64(fsize)) default: - err = updateExistingNode(upload, n, spaceID, uint64(fsize)) + lock, err = updateExistingNode(upload, n, spaceID, uint64(fsize)) } - if err != nil { return nil, err } - // create/update node info - if err := n.WriteAllNodeMetadata(); err != nil { + // overwrite technical information + initAttrs[xattrs.ParentidAttr] = n.ParentID + initAttrs[xattrs.NameAttr] = n.Name + initAttrs[xattrs.BlobIDAttr] = n.BlobID + initAttrs[xattrs.BlobsizeAttr] = strconv.FormatInt(n.Blobsize, 10) + initAttrs[xattrs.StatusPrefix] = node.ProcessingStatus + + // update node metadata with new blobid etc + err = n.SetXattrsWithLock(initAttrs, lock) + if err != nil { return nil, errors.Wrap(err, "Decomposedfs: could not write metadata") } @@ -284,56 +297,61 @@ func CreateNodeForUpload(upload *Upload) (*node.Node, error) { return nil, err } - return n, n.MarkProcessing() + return n, nil } -func initNewNode(upload *Upload, n *node.Node, fsize uint64) error { +func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*flock.Flock, error) { n.ID = uuid.New().String() // create folder structure (if needed) if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { - return err + return nil, err } if _, err := os.Create(n.InternalPath()); err != nil { - return err + return nil, err + } + + lock, err := filelocks.AcquireWriteLock(n.InternalPath()) + if err != nil { + // we cannot acquire a lock - we error for safety + return lock, err } if _, err := node.CheckQuota(n.SpaceRoot, false, 0, fsize); err != nil { - return err + return lock, err } // link child name to parent if it is new childNameLink := filepath.Join(n.ParentInternalPath(), n.Name) - var link string link, err := os.Readlink(childNameLink) if err == nil && link != "../"+n.ID { if err := os.Remove(childNameLink); err != nil { - return errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") + return lock, errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") } } if errors.Is(err, iofs.ErrNotExist) || link != "../"+n.ID { relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) if err = os.Symlink(relativeNodePath, childNameLink); err != nil { - return errors.Wrap(err, "Decomposedfs: could not symlink child entry") + return lock, errors.Wrap(err, "Decomposedfs: could not symlink child entry") } } // on a new file the sizeDiff is the fileSize upload.sizeDiff = int64(fsize) upload.Info.MetaData["sizeDiff"] = strconv.Itoa(int(upload.sizeDiff)) - return nil + return lock, nil } -func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) error { +func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) (*flock.Flock, error) { old, _ := node.ReadNode(upload.Ctx, upload.lu, spaceID, n.ID, false) if _, err := node.CheckQuota(n.SpaceRoot, true, uint64(old.Blobsize), fsize); err != nil { - return err + return nil, err } vfi, err := os.Stat(old.InternalPath()) if err != nil { - return err + return nil, err } // When the if-match header was set we need to check if the @@ -342,9 +360,9 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint targetEtag, err := node.CalculateEtag(n.ID, vfi.ModTime()) switch { case err != nil: - return errtypes.InternalError(err.Error()) + return nil, errtypes.InternalError(err.Error()) case ifMatch != targetEtag: - return errtypes.Aborted("etag mismatch") + return nil, errtypes.Aborted("etag mismatch") } } @@ -358,36 +376,29 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint lock, err := filelocks.AcquireWriteLock(targetPath) if err != nil { // we cannot acquire a lock - we error for safety - return err + return nil, err } - defer filelocks.ReleaseLock(lock) - // This move drops all metadata!!! We copy it below with CopyMetadata - if err = os.Rename(targetPath, upload.versionsPath); err != nil { - return err + // create version node + if _, err := os.Create(upload.versionsPath); err != nil { + return lock, err } - if _, err := os.Create(targetPath); err != nil { - return err + // copy blob metadata to version node + if err := xattrs.CopyMetadataWithSourceLock(targetPath, upload.versionsPath, func(attributeName string) bool { + return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || + attributeName == xattrs.BlobIDAttr || + attributeName == xattrs.BlobsizeAttr + }, lock); err != nil { + return lock, err } - // copy grant and arbitrary metadata - // NOTE: now restoring an older revision might bring back a grant that was removed! - if err := xattrs.CopyMetadata(upload.versionsPath, targetPath, func(attributeName string) bool { - return true - // TODO determine all attributes that must be copied, currently we just copy all and overwrite changed properties - /* - [> - return strings.HasPrefix(attributeName, xattrs.GrantPrefix) || // for grants - strings.HasPrefix(attributeName, xattrs.MetadataPrefix) || // for arbitrary metadata - strings.HasPrefix(attributeName, xattrs.FavPrefix) || // for favorites - strings.HasPrefix(attributeName, xattrs.SpaceNameAttr) || // for a shared file - */ - }); err != nil { - return err + // keep mtime from previous version + if err := os.Chtimes(upload.versionsPath, vfi.ModTime(), vfi.ModTime()); err != nil { + return lock, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err)) } - return nil + return lock, nil } // lookupNode looks up nodes by path. diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index 7164396dfd1..490a1ab9da0 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -42,6 +42,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" "github.com/cs3org/reva/v2/pkg/utils" "github.com/golang-jwt/jwt" "github.com/pkg/errors" @@ -227,7 +228,14 @@ func (upload *Upload) FinishUpload(_ context.Context) error { } } - n, err := CreateNodeForUpload(upload) + // update checksums + attrs := map[string]string{ + xattrs.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)), + xattrs.ChecksumPrefix + "md5": string(md5h.Sum(nil)), + xattrs.ChecksumPrefix + "adler32": string(adler32h.Sum(nil)), + } + + n, err := CreateNodeForUpload(upload, attrs) if err != nil { Cleanup(upload, true, false) return err @@ -235,11 +243,6 @@ func (upload *Upload) FinishUpload(_ context.Context) error { upload.Node = n - // now try write all checksums - tryWritingChecksum(log, upload.Node, "sha1", sha1h) - tryWritingChecksum(log, upload.Node, "md5", md5h) - tryWritingChecksum(log, upload.Node, "adler32", adler32h) - if upload.pub != nil { u, _ := ctxpkg.ContextGetUser(upload.Ctx) s, err := upload.URL(upload.Ctx) @@ -459,13 +462,3 @@ func joinurl(paths ...string) string { return s.String() } - -func tryWritingChecksum(log *zerolog.Logger, n *node.Node, algo string, h hash.Hash) { - if err := n.SetChecksum(algo, h); err != nil { - log.Err(err). - Str("csType", algo). - Bytes("hash", h.Sum(nil)). - Msg("Decomposedfs: could not write checksum") - // this is not critical, the bytes are there so we will continue - } -} diff --git a/pkg/storage/utils/filelocks/filelocks.go b/pkg/storage/utils/filelocks/filelocks.go index 79e91332ea2..c98a499851a 100644 --- a/pkg/storage/utils/filelocks/filelocks.go +++ b/pkg/storage/utils/filelocks/filelocks.go @@ -164,6 +164,10 @@ func AcquireWriteLock(file string) (*flock.Flock, error) { // ReleaseLock releases a lock from a file that was previously created // by AcquireReadLock or AcquireWriteLock. func ReleaseLock(lock *flock.Flock) error { + if lock == nil { + return errors.New("cannot unlock nil lock") + } + // there is a probability that if the file can not be unlocked, // we also can not remove the file. We will only try to remove if it // was successfully unlocked.