Skip to content

Commit

Permalink
fix multipart using only uploadID
Browse files Browse the repository at this point in the history
  • Loading branch information
nopcoder committed Aug 26, 2020
1 parent 24a7ba8 commit c7a0497
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,12 @@ func (a *Adapter) CompleteMultiPartUpload(obj block.ObjectPointer, uploadID stri
})

// list bucket parts and validate request match
bucket := a.client.Bucket(qualifiedKey.StorageNamespace)
bucketParts, err := a.listMultipartUploadParts(bucket, uploadID, qualifiedKey)
bucketParts, err := a.listMultipartUploadParts(qualifiedKey.StorageNamespace, uploadID)
if err != nil {
return nil, 0, err
}
// validate bucketParts match the request multipartList
err = a.validateMultipartUploadParts(qualifiedKey, multipartList, bucketParts)
err = a.validateMultipartUploadParts(uploadID, multipartList, bucketParts)
if err != nil {
return nil, 0, err
}
Expand All @@ -296,13 +295,14 @@ func (a *Adapter) CompleteMultiPartUpload(obj block.ObjectPointer, uploadID stri
}

// compose target object
targetAttrs, err := a.composeMultipartUploadParts(bucket, qualifiedKey, parts)
targetAttrs, err := a.composeMultipartUploadParts(qualifiedKey.StorageNamespace, uploadID, parts)
if err != nil {
lg.WithError(err).Error("CompleteMultipartUpload failed")
return nil, 0, err
}

// delete marker
bucket := a.client.Bucket(qualifiedKey.StorageNamespace)
objMarker := bucket.Object(formatMultipartMarkerFilename(uploadID))
if err := objMarker.Delete(a.ctx); err != nil {
a.log().WithError(err).Warn("Failed to delete multipart upload marker")
Expand All @@ -311,7 +311,7 @@ func (a *Adapter) CompleteMultiPartUpload(obj block.ObjectPointer, uploadID stri
return &targetAttrs.Etag, targetAttrs.Size, nil
}

func (a *Adapter) validateMultipartUploadParts(qualifiedKey block.QualifiedKey, multipartList *block.MultipartUploadCompletion, bucketParts []*storage.ObjectAttrs) error {
func (a *Adapter) validateMultipartUploadParts(uploadID string, multipartList *block.MultipartUploadCompletion, bucketParts []*storage.ObjectAttrs) error {
if len(multipartList.Part) != len(bucketParts) {
return ErrPartListMismatch
}
Expand All @@ -322,7 +322,7 @@ func (a *Adapter) validateMultipartUploadParts(qualifiedKey block.QualifiedKey,
if p.ETag == nil {
return fmt.Errorf("invalid part at position %d: %w", i, ErrMissingPartETag)
}
objName := formatMultipartFilename(qualifiedKey.Key, *p.PartNumber)
objName := formatMultipartFilename(uploadID, *p.PartNumber)
if objName != bucketParts[i].Name {
return fmt.Errorf("invalid part at position %d: %w", i, ErrMismatchPartName)
}
Expand All @@ -333,7 +333,8 @@ func (a *Adapter) validateMultipartUploadParts(qualifiedKey block.QualifiedKey,
return nil
}

func (a *Adapter) listMultipartUploadParts(bucket *storage.BucketHandle, uploadID string, qualifiedKey block.QualifiedKey) ([]*storage.ObjectAttrs, error) {
func (a *Adapter) listMultipartUploadParts(bucketName string, uploadID string) ([]*storage.ObjectAttrs, error) {
bucket := a.client.Bucket(bucketName)
var bucketParts []*storage.ObjectAttrs
it := bucket.Objects(a.ctx, &storage.Query{
Delimiter: delimiter,
Expand All @@ -345,11 +346,11 @@ func (a *Adapter) listMultipartUploadParts(bucket *storage.BucketHandle, uploadI
break
}
if err != nil {
return nil, fmt.Errorf("bucket(%s).Objects(): %w", qualifiedKey.StorageNamespace, err)
return nil, fmt.Errorf("listing bucket '%s' upload '%s': %w", bucketName, uploadID, err)
}
bucketParts = append(bucketParts, attrs)
if len(bucketParts) > MaxMultipartObjects {
return nil, fmt.Errorf("bucket(%s).Objects(): %w", qualifiedKey.StorageNamespace, ErrMaxMultipartObjects)
return nil, fmt.Errorf("listing bucket '%s' upload '%s': %w", bucketName, uploadID, ErrMaxMultipartObjects)
}
}
// sort by name - assume natual sort order
Expand All @@ -359,10 +360,11 @@ func (a *Adapter) listMultipartUploadParts(bucket *storage.BucketHandle, uploadI
return bucketParts, nil
}

func (a *Adapter) composeMultipartUploadParts(bucket *storage.BucketHandle, qualifiedKey block.QualifiedKey, parts []string) (*storage.ObjectAttrs, error) {
func (a *Adapter) composeMultipartUploadParts(bucketName string, uploadID string, parts []string) (*storage.ObjectAttrs, error) {
// compose target from all parts
bucket := a.client.Bucket(bucketName)
var targetAttrs *storage.ObjectAttrs
err := ComposeAll(qualifiedKey.Key, parts, func(target string, parts []string) error {
err := ComposeAll(uploadID, parts, func(target string, parts []string) error {
objs := make([]*storage.ObjectHandle, len(parts))
for i := range parts {
objs[i] = bucket.Object(parts[i])
Expand All @@ -372,13 +374,16 @@ func (a *Adapter) composeMultipartUploadParts(bucket *storage.BucketHandle, qual
if err != nil {
return err
}
if target == qualifiedKey.Key {
if target == uploadID {
targetAttrs = attrs
}
// delete parts
for _, o := range objs {
if err := o.Delete(a.ctx); err != nil {
a.log().WithError(err).Warn("Failed to delete multipart upload part while compose")
a.log().WithError(err).WithFields(logging.Fields{
"bucket": bucketName,
"parts": parts,
}).Warn("Failed to delete multipart upload part while compose")
}
}
return nil
Expand Down

0 comments on commit c7a0497

Please sign in to comment.