Skip to content

Commit

Permalink
return error handling (#7822)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadavsteindler committed Jul 9, 2024
1 parent a24b87d commit e3e6f73
Showing 1 changed file with 83 additions and 7 deletions.
90 changes: 83 additions & 7 deletions pkg/block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net/http"
"net/url"
"sort"
"strings"
"time"

Expand All @@ -25,7 +26,10 @@ const (
)

var (
ErrMismatchPartETag = errors.New("mismatch part ETag")
ErrMismatchPartName = errors.New("mismatch part name")
ErrMaxMultipartObjects = errors.New("maximum multipart object reached")
ErrPartListMismatch = errors.New("multipart part list mismatch")
ErrMissingTargetAttrs = errors.New("missing target attributes")
)

Expand Down Expand Up @@ -499,7 +503,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP
})
lg.Debug("started multipart upload")

parts, err := a.getPartNames(bucketName, uploadID, multipartList)
parts, err := a.getPartNamesWithValidation(ctx, bucketName, uploadID, multipartList)
if err != nil {
return nil, err
}
Expand All @@ -524,18 +528,90 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP
}, nil
}

func (a *Adapter) getPartNames(bucketName, uploadID string, multipartList *block.MultipartUploadCompletion) ([]string, error) {
if len(multipartList.Part) > MaxMultipartObjects {
return nil, fmt.Errorf("listing bucket '%s' upload '%s': %w", bucketName, uploadID, ErrMaxMultipartObjects)
func (a *Adapter) getPartNamesWithValidation(ctx context.Context, bucketName, uploadID string, multipartList *block.MultipartUploadCompletion) ([]string, error) {

Check failure on line 531 in pkg/block/gs/adapter.go

View workflow job for this annotation

GitHub Actions / Run Linters and Checkers

unnecessary leading newline (whitespace)

// list bucket parts and validate request match
bucketParts, err := a.listMultipartUploadParts(ctx, bucketName, uploadID)
if err != nil {
return nil, err
}
// validate bucketParts match the request multipartList
err = a.validateMultipartUploadParts(uploadID, multipartList, bucketParts)
if err != nil {
return nil, err
}

parts := make([]string, len(multipartList.Part))
for i := 0; i < len(parts); i++ {
parts[i] = formatMultipartFilename(uploadID, i+1)
// prepare names
parts := make([]string, len(bucketParts))
for i, part := range bucketParts {
parts[i] = part.Name
}
return parts, nil
}

func (a *Adapter) validateMultipartUploadParts(uploadID string, multipartList *block.MultipartUploadCompletion, bucketParts []*storage.ObjectAttrs) error {
if len(multipartList.Part) != len(bucketParts) {
return fmt.Errorf("part list mismatch - expected %d parts, got %d: %w", len(bucketParts), len(multipartList.Part), ErrPartListMismatch)
}
for i, p := range multipartList.Part {
objName := formatMultipartFilename(uploadID, p.PartNumber)
if objName != bucketParts[i].Name {
return fmt.Errorf("invalid part at position %d: %w", i, ErrMismatchPartName)
}
if p.ETag != bucketParts[i].Etag {
return fmt.Errorf("invalid part at position %d: %w", i, ErrMismatchPartETag)
}
}
return nil
}

func (a *Adapter) listMultipartUploadParts(ctx context.Context, bucketName string, uploadID string) ([]*storage.ObjectAttrs, error) {
bucket := a.client.Bucket(bucketName)
var bucketParts []*storage.ObjectAttrs
query := &storage.Query{
Delimiter: delimiter,
Prefix: uploadID + partSuffix,
}
err := query.SetAttrSelection([]string{"Name", "Etag"})
if err != nil {
return nil, err
}
it := bucket.Objects(ctx, query)
for {
attrs, err := it.Next()
if errors.Is(err, iterator.Done) {
break
}
if err != nil {
return nil, fmt.Errorf("listing bucket '%s' upload '%s': %w", bucketName, uploadID, err)
}

// filter out invalid part names
if !a.isPartName(attrs.Name) {
continue
}

bucketParts = append(bucketParts, attrs)
if len(bucketParts) > MaxMultipartObjects {
return nil, fmt.Errorf("listing bucket '%s' upload '%s': %w", bucketName, uploadID, ErrMaxMultipartObjects)
}
}
// sort by name - assume natual sort order
sort.Slice(bucketParts, func(i, j int) bool {
return bucketParts[i].Name < bucketParts[j].Name
})
return bucketParts, nil
}

// isPartName checks it's a valid part name, as opposed to an already merged group of parts
func (a *Adapter) isPartName(name string) bool {
if len(name) < len(partSuffix)+5 {
return false
}
suffixSubstring := name[len(name)-5-len(partSuffix) : len(name)-5]
return partSuffix == suffixSubstring
}

func (a *Adapter) composeMultipartUploadParts(ctx context.Context, bucketName string, uploadID string, parts []string) (*storage.ObjectAttrs, error) {
// compose target from all parts
bucket := a.client.Bucket(bucketName)
Expand Down

0 comments on commit e3e6f73

Please sign in to comment.