
rewrite finish upload to get atomic size diff
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
butonic committed Nov 17, 2022
1 parent 3dbdcc5 commit fced7bd
Showing 1 changed file with 145 additions and 71 deletions.
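
In essence, the rewrite brackets the old-size read and the metadata write with one file lock, so the size diff is calculated atomically and propagation only happens after the lock is released. A condensed sketch of that flow, reusing the helpers visible in the diff below (signatures assumed from this commit; quota checks, versioning and cleanup omitted):

// sizeDiffSketch is illustrative only, not part of this commit.
func sizeDiffSketch(n *node.Node, newBlobsize int64) (int64, error) {
	targetPath := n.InternalPath()

	// lock the metadata node so concurrent uploads cannot interleave
	lock, err := filelocks.AcquireWriteLock(targetPath)
	if err != nil {
		return 0, errtypes.InternalError(err.Error())
	}

	// read the old size under the lock, bypassing any caches
	oldSize, err := node.ReadBlobSizeAttr(targetPath)
	if err != nil {
		filelocks.ReleaseLock(lock)
		return 0, err
	}

	// write the new size while still holding the lock
	attrs := map[string]string{
		xattrs.BlobsizeAttr: strconv.FormatInt(newBlobsize, 10),
	}
	if err := n.SetXattrsWithLock(attrs, lock); err != nil {
		filelocks.ReleaseLock(lock)
		return 0, err
	}

	// the diff is now fixed (positive when the file grew);
	// propagation can run outside the lock
	return newBlobsize - oldSize, filelocks.ReleaseLock(lock)
}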
216 changes: 145 additions & 71 deletions pkg/storage/utils/decomposedfs/upload.go
@@ -48,11 +48,11 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs"
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rs/zerolog"
)

var defaultFilePerm = os.FileMode(0664)
@@ -566,6 +566,20 @@ func (upload *fileUpload) writeInfo() error {
}

// FinishUpload finishes an upload and moves the file to the internal destination
//
// # upload steps
// check if-match header to fail early
// copy blob
// lock metadata node
// check if-match header again as a safeguard
// read metadata
// create version node with current metadata
// update node metadata with new blobid etc
// remember size diff
// unlock metadata
// propagate size diff and new etag
// - propagation can happen outside the metadata lock because the diff calculation happens inside the lock and the order in which diffs are applied to the parent is irrelevant
// - propagation needs to pass on the calculated size diff
func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {

// ensure cleanup
@@ -599,25 +613,27 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {
}

overwrite := n.ID != ""
var oldSize uint64
var oldSize int64
if overwrite {
// read size from existing node
old, _ := node.ReadNode(ctx, upload.fs.lu, spaceID, n.ID, false)
oldSize = uint64(old.Blobsize)
oldSize = old.Blobsize
} else {
// create new fileid
n.ID = uuid.New().String()
upload.info.Storage["NodeId"] = n.ID
}

if _, err = node.CheckQuota(n.SpaceRoot, overwrite, oldSize, uint64(fi.Size())); err != nil {
if _, err = node.CheckQuota(n.SpaceRoot, overwrite, uint64(oldSize), uint64(fi.Size())); err != nil {
return err
}

targetPath := n.InternalPath()
sublog := appctx.GetLogger(upload.ctx).
With().
Interface("info", upload.info).
Str("spaceid", spaceID).
Str("nodeid", n.ID).
Str("binPath", upload.binPath).
Str("targetPath", targetPath).
Logger()
@@ -669,8 +685,9 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {

// defer writing the checksums until the node is in place

// if target exists create new version
versionsPath := ""
// upload steps
// check if-match header to fail early

if fi, err = os.Stat(targetPath); err == nil {
// When the if-match header was set we need to check if the
// etag still matches before finishing the upload.
@@ -684,24 +701,10 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {
return errtypes.Aborted("etag mismatch")
}
}

// FIXME move versioning to blobs ... no need to copy all the metadata! well ... it does if we want to version metadata...
// versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries
versionsPath = upload.fs.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano))

// This move drops all metadata!!! We copy it below with CopyMetadata
// FIXME the node must remain the same. otherwise we might restore share metadata
if err = os.Rename(targetPath, versionsPath); err != nil {
sublog.Err(err).
Str("binPath", upload.binPath).
Str("versionsPath", versionsPath).
Msg("Decomposedfs: could not create version")
return
}

}

// upload the data to the blobstore
// copy blob

file, err := os.Open(upload.binPath)
if err != nil {
return err
@@ -712,25 +715,72 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {
return errors.Wrap(err, "failed to upload file to blobstore")
}

// now truncate the upload (the payload stays in the blobstore) and move it to the target path
// TODO put uploads on the same underlying storage as the destination dir?
// TODO trigger a workflow as the final rename might eg involve antivirus scanning
if err = os.Truncate(upload.binPath, 0); err != nil {
sublog.Err(err).
Msg("Decomposedfs: could not truncate")
return
// prepare discarding the blob if something changed while writing it
discardBlob := func() {
err = upload.fs.tp.DeleteBlob(n)
if err != nil {
sublog.Err(err).Str("blobid", n.BlobID).Msg("Decomposedfs: failed to discard blob in blostore")
}
}
if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil {
sublog.Warn().Err(err).Msg("Decomposedfs: could not create node dir, trying to write file anyway")

// lock metadata node
lock, err := filelocks.AcquireWriteLock(targetPath)
if err != nil {
return errtypes.InternalError(err.Error())
}
if err = os.Rename(upload.binPath, targetPath); err != nil {
sublog.Error().Err(err).Msg("Decomposedfs: could not rename")
return

// check if-match header again as a safeguard
versionsPath := ""
if fi, err = os.Stat(targetPath); err == nil {
// When the if-match header was set we need to check if the
// etag still matches before finishing the upload.
if ifMatch, ok := upload.info.MetaData["if-match"]; ok {
var targetEtag string
targetEtag, err = node.CalculateEtag(n.ID, fi.ModTime())
if err != nil {
discardBlob()
return errtypes.InternalError(err.Error())
}
if ifMatch != targetEtag {
discardBlob()
return errtypes.Aborted("etag mismatch")
}
}

// FIXME move versioning to blobs ... no need to copy all the metadata! well ... it does if we want to version metadata...
// versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries
versionsPath = upload.fs.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano))
}

// read metadata

// attributes that will change
attrs := map[string]string{
xattrs.BlobIDAttr: n.BlobID,
xattrs.BlobsizeAttr: strconv.FormatInt(n.Blobsize, 10),

// update checksums
xattrs.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)),
xattrs.ChecksumPrefix + "md5": string(md5h.Sum(nil)),
xattrs.ChecksumPrefix + "shadler32a1": string(adler32h.Sum(nil)),
}

// create version node with current metadata

// if file already exists
if versionsPath != "" {
// copy grant and arbitrary metadata
// touch version node
file, err := os.Create(versionsPath)
if err != nil {
discardBlob()
sublog.Err(err).Str("version", versionsPath).Msg("could not create version")
return errtypes.InternalError("could not create version")
}
file.Close()

// copy grant and arbitrary metadata to version node
// FIXME ... now restoring an older revision might bring back a grant that was removed!
err = xattrs.CopyMetadata(versionsPath, targetPath, func(attributeName string) bool {
err = xattrs.CopyMetadataWithSourceLock(targetPath, versionsPath, func(attributeName string) bool {
return true
// TODO determine all attributes that must be copied, currently we just copy all and overwrite changed properties
/*
@@ -739,23 +789,68 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {
strings.HasPrefix(attributeName, xattrs.FavPrefix) || // for favorites
strings.HasPrefix(attributeName, xattrs.SpaceNameAttr) || // for a shared file
*/
})
}, lock)
if err != nil {
sublog.Info().Err(err).Msg("Decomposedfs: failed to copy xattrs")
discardBlob()
sublog.Err(err).Str("version", versionsPath).Msg("failed to copy xattrs to version node")
return errtypes.InternalError("failed to copy xattrs to version node")
}
}

// now try write all checksums
tryWritingChecksum(&sublog, n, "sha1", sha1h)
tryWritingChecksum(&sublog, n, "md5", md5h)
tryWritingChecksum(&sublog, n, "adler32", adler32h)
// we MUST bypass any cache here as we have to calculate the size diff atomically
oldSize, err = node.ReadBlobSizeAttr(targetPath)
if err != nil {
discardBlob()
sublog.Err(err).Str("version", versionsPath).Msg("failed to copy xattrs to version node")
return errtypes.InternalError("failed to copy xattrs to version node")
}

} else {
// create dir to node
if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil {
discardBlob()
sublog.Err(err).Msg("could not create node dir")
return errtypes.InternalError("could not create node dir")
}
// touch metadata node
file, err := os.Create(targetPath)
if err != nil {
discardBlob()
sublog.Err(err).Msg("could not create metadata node")
return errtypes.InternalError("could not create version")
}
file.Close()

// basic node metadata
attrs[xattrs.ParentidAttr] = n.ParentID
attrs[xattrs.NameAttr] = n.Name
oldSize = 0
}

// who will become the owner? the owner of the parent actually ... not the currently logged in user
err = n.WriteAllNodeMetadata()
// update node metadata with new blobid etc
err = n.SetXattrsWithLock(attrs, lock)
if err != nil {
discardBlob()
return errors.Wrap(err, "Decomposedfs: could not write metadata")
}

// use set arbitrary metadata?
if upload.info.MetaData["mtime"] != "" {
err := n.SetMtime(ctx, upload.info.MetaData["mtime"])
if err != nil {
sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not set mtime metadata")
return err
}
}

// remember size diff
//sizeDiff := oldSize - n.Blobsize

// unlock metadata
err = filelocks.ReleaseLock(lock)
if err != nil {
return errtypes.InternalError(err.Error())
}

// link child name to parent if it is new
childNameLink := filepath.Join(n.ParentInternalPath(), n.Name)
var link string
@@ -778,23 +873,6 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {
}
}

// only delete the upload if it was successfully written to the storage
if err = os.Remove(upload.infoPath); err != nil {
if !errors.Is(err, iofs.ErrNotExist) {
sublog.Err(err).Msg("Decomposedfs: could not delete upload info")
return
}
}
// use set arbitrary metadata?
if upload.info.MetaData["mtime"] != "" {
err := n.SetMtime(ctx, upload.info.MetaData["mtime"])
if err != nil {
sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not set mtime metadata")
return err
}

}

// fill metadata with current mtime
if fi, err = os.Stat(targetPath); err == nil {
upload.info.MetaData["mtime"] = fmt.Sprintf("%d.%d", fi.ModTime().Unix(), fi.ModTime().Nanosecond())
@@ -803,6 +881,11 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {

n.Exists = true

// propagate size diff and new etag
// propagation can happen outside the metadata lock because the diff calculation happens inside the lock and the order in which diffs are applied to the parent is irrelevant
// propagation needs to pass on the calculated size diff

//return upload.fs.tp.Propagate(upload.ctx, n, sizeDiff)
return upload.fs.tp.Propagate(upload.ctx, n)
}

@@ -813,15 +896,6 @@ func (upload *fileUpload) checkHash(expected string, h hash.Hash) error {
}
return nil
}
func tryWritingChecksum(log *zerolog.Logger, n *node.Node, algo string, h hash.Hash) {
if err := n.SetChecksum(algo, h); err != nil {
log.Err(err).
Str("csType", algo).
Bytes("hash", h.Sum(nil)).
Msg("Decomposedfs: could not write checksum")
// this is not critical, the bytes are there so we will continue
}
}

func (upload *fileUpload) discardChunk() {
if err := os.Remove(upload.binPath); err != nil {
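The three checksums written into the attrs map above are typically fed while the upload's bytes stream in, rather than in a second pass over the file. A minimal, standard-library-only sketch of that pattern (the payload path is a placeholder; the repo's actual chunk handling differs):

package main

import (
	"crypto/md5"
	"crypto/sha1"
	"fmt"
	"hash/adler32"
	"io"
	"os"
)

func main() {
	f, err := os.Open("payload.bin") // placeholder for the upload's binPath
	if err != nil {
		panic(err)
	}
	defer f.Close()

	sha1h := sha1.New()
	md5h := md5.New()
	adler32h := adler32.New()

	// every byte read from f is fed to all three hashes in one pass
	if _, err := io.Copy(io.MultiWriter(sha1h, md5h, adler32h), f); err != nil {
		panic(err)
	}

	// Sum(nil) yields the raw digest bytes that end up under
	// xattrs.ChecksumPrefix + "<algo>"
	fmt.Printf("sha1=%x md5=%x adler32=%x\n", sha1h.Sum(nil), md5h.Sum(nil), adler32h.Sum(nil))
}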