From c7a0497273555dc78b86607f9e95a36271eb0bcc Mon Sep 17 00:00:00 2001 From: Barak Amar Date: Wed, 26 Aug 2020 17:40:23 +0300 Subject: [PATCH] fix multipart using only uploadID --- block/gs/adapter.go | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/block/gs/adapter.go b/block/gs/adapter.go index 77cb844d116..ec622df399e 100644 --- a/block/gs/adapter.go +++ b/block/gs/adapter.go @@ -278,13 +278,12 @@ func (a *Adapter) CompleteMultiPartUpload(obj block.ObjectPointer, uploadID stri }) // list bucket parts and validate request match - bucket := a.client.Bucket(qualifiedKey.StorageNamespace) - bucketParts, err := a.listMultipartUploadParts(bucket, uploadID, qualifiedKey) + bucketParts, err := a.listMultipartUploadParts(qualifiedKey.StorageNamespace, uploadID) if err != nil { return nil, 0, err } // validate bucketParts match the request multipartList - err = a.validateMultipartUploadParts(qualifiedKey, multipartList, bucketParts) + err = a.validateMultipartUploadParts(uploadID, multipartList, bucketParts) if err != nil { return nil, 0, err } @@ -296,13 +295,14 @@ func (a *Adapter) CompleteMultiPartUpload(obj block.ObjectPointer, uploadID stri } // compose target object - targetAttrs, err := a.composeMultipartUploadParts(bucket, qualifiedKey, parts) + targetAttrs, err := a.composeMultipartUploadParts(qualifiedKey.StorageNamespace, uploadID, parts) if err != nil { lg.WithError(err).Error("CompleteMultipartUpload failed") return nil, 0, err } // delete marker + bucket := a.client.Bucket(qualifiedKey.StorageNamespace) objMarker := bucket.Object(formatMultipartMarkerFilename(uploadID)) if err := objMarker.Delete(a.ctx); err != nil { a.log().WithError(err).Warn("Failed to delete multipart upload marker") @@ -311,7 +311,7 @@ func (a *Adapter) CompleteMultiPartUpload(obj block.ObjectPointer, uploadID stri return &targetAttrs.Etag, targetAttrs.Size, nil } -func (a *Adapter) validateMultipartUploadParts(qualifiedKey block.QualifiedKey, multipartList *block.MultipartUploadCompletion, bucketParts []*storage.ObjectAttrs) error { +func (a *Adapter) validateMultipartUploadParts(uploadID string, multipartList *block.MultipartUploadCompletion, bucketParts []*storage.ObjectAttrs) error { if len(multipartList.Part) != len(bucketParts) { return ErrPartListMismatch } @@ -322,7 +322,7 @@ func (a *Adapter) validateMultipartUploadParts(qualifiedKey block.QualifiedKey, if p.ETag == nil { return fmt.Errorf("invalid part at position %d: %w", i, ErrMissingPartETag) } - objName := formatMultipartFilename(qualifiedKey.Key, *p.PartNumber) + objName := formatMultipartFilename(uploadID, *p.PartNumber) if objName != bucketParts[i].Name { return fmt.Errorf("invalid part at position %d: %w", i, ErrMismatchPartName) } @@ -333,7 +333,8 @@ func (a *Adapter) validateMultipartUploadParts(qualifiedKey block.QualifiedKey, return nil } -func (a *Adapter) listMultipartUploadParts(bucket *storage.BucketHandle, uploadID string, qualifiedKey block.QualifiedKey) ([]*storage.ObjectAttrs, error) { +func (a *Adapter) listMultipartUploadParts(bucketName string, uploadID string) ([]*storage.ObjectAttrs, error) { + bucket := a.client.Bucket(bucketName) var bucketParts []*storage.ObjectAttrs it := bucket.Objects(a.ctx, &storage.Query{ Delimiter: delimiter, @@ -345,11 +346,11 @@ func (a *Adapter) listMultipartUploadParts(bucket *storage.BucketHandle, uploadI break } if err != nil { - return nil, fmt.Errorf("bucket(%s).Objects(): %w", qualifiedKey.StorageNamespace, err) + return nil, fmt.Errorf("listing bucket '%s' upload '%s': %w", bucketName, uploadID, err) } bucketParts = append(bucketParts, attrs) if len(bucketParts) > MaxMultipartObjects { - return nil, fmt.Errorf("bucket(%s).Objects(): %w", qualifiedKey.StorageNamespace, ErrMaxMultipartObjects) + return nil, fmt.Errorf("listing bucket '%s' upload '%s': %w", bucketName, uploadID, ErrMaxMultipartObjects) } } // sort by name - assume natual sort order @@ -359,10 +360,11 @@ func (a *Adapter) listMultipartUploadParts(bucket *storage.BucketHandle, uploadI return bucketParts, nil } -func (a *Adapter) composeMultipartUploadParts(bucket *storage.BucketHandle, qualifiedKey block.QualifiedKey, parts []string) (*storage.ObjectAttrs, error) { +func (a *Adapter) composeMultipartUploadParts(bucketName string, uploadID string, parts []string) (*storage.ObjectAttrs, error) { // compose target from all parts + bucket := a.client.Bucket(bucketName) var targetAttrs *storage.ObjectAttrs - err := ComposeAll(qualifiedKey.Key, parts, func(target string, parts []string) error { + err := ComposeAll(uploadID, parts, func(target string, parts []string) error { objs := make([]*storage.ObjectHandle, len(parts)) for i := range parts { objs[i] = bucket.Object(parts[i]) @@ -372,13 +374,16 @@ func (a *Adapter) composeMultipartUploadParts(bucket *storage.BucketHandle, qual if err != nil { return err } - if target == qualifiedKey.Key { + if target == uploadID { targetAttrs = attrs } // delete parts for _, o := range objs { if err := o.Delete(a.ctx); err != nil { - a.log().WithError(err).Warn("Failed to delete multipart upload part while compose") + a.log().WithError(err).WithFields(logging.Fields{ + "bucket": bucketName, + "parts": parts, + }).Warn("Failed to delete multipart upload part while compose") } } return nil