Skip to content

Commit

Permalink
prototype concurrent implementation (#7822)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadavsteindler committed Jul 7, 2024
1 parent 11b9351 commit c1cc708
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 90 deletions.
34 changes: 2 additions & 32 deletions pkg/block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,41 +612,11 @@ func (a *Adapter) listMultipartUploadParts(ctx context.Context, bucketName strin
// composeMultipartUploadParts composes the object named uploadID in bucket
// bucketName from the given part objects and returns the attributes of the
// composed object. It returns ErrMissingTargetAttrs when composing reports no
// error but yields no attributes for the final target.
//
// NOTE(review): this span comes from a rendered diff and appears to interleave
// the pre-change body (the inline ComposeAll callback plus the trailing
// `if err != nil` / `return targetAttrs, nil`) with the post-change body (the
// single ComposeAll(ctx, a, ...) call plus `return targetAttrs, err`).
// Confirm against the actual file before relying on it.
func (a *Adapter) composeMultipartUploadParts(ctx context.Context, bucketName string, uploadID string, parts []string) (*storage.ObjectAttrs, error) {
// compose target from all parts
bucket := a.client.Bucket(bucketName)
var targetAttrs *storage.ObjectAttrs
// Callback form: ComposeAll invokes the function once per compose step,
// passing the step's target name and the parts that feed it.
err := ComposeAll(uploadID, parts, func(target string, parts []string) error {
objs := make([]*storage.ObjectHandle, len(parts))
for i := range parts {
h := storageObjectHandle{bucket.Object(parts[i])}
objs[i] = h.withReadHandle(ctx, a).ObjectHandle
}
// compose target from parts
h := storageObjectHandle{bucket.Object(target)}
composer := h.withWriteHandle(a).newComposer(a, objs...)
attrs, err := composer.Run(ctx)
if err != nil {
return err
}
// Only the attributes of the final object (named after the upload ID) are
// kept; attributes of other compose targets are discarded.
if target == uploadID {
targetAttrs = attrs
}
// delete parts
// A failed delete is logged as a warning and does not fail the compose.
for _, o := range objs {
if err := o.Delete(ctx); err != nil {
a.log(ctx).WithError(err).WithFields(logging.Fields{
"bucket": bucketName,
"parts": parts,
}).Warn("Failed to delete multipart upload part while compose")
}
}
return nil
})
// Direct form: ComposeAll performs the composition itself and returns the
// attributes of the final target.
targetAttrs, err := ComposeAll(ctx, a, uploadID, parts, bucketName, bucket)
// A compose that reports success must have produced target attributes.
if err == nil && targetAttrs == nil {
return nil, ErrMissingTargetAttrs
}
if err != nil {
return nil, err
}
return targetAttrs, nil
return targetAttrs, err
}

func (a *Adapter) Close() error {
Expand Down
55 changes: 48 additions & 7 deletions pkg/block/gs/compose.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package gs

import (
"context"
"fmt"
"sync"

"cloud.google.com/go/storage"
"github.com/treeverse/lakefs/pkg/logging"
)

const MaxPartsInCompose = 32

type ComposeFunc func(target string, parts []string) error

func ComposeAll(target string, parts []string, composeFunc ComposeFunc) error {
func ComposeAll(ctx context.Context, a *Adapter, target string, parts []string, bucketName string, bucket *storage.BucketHandle) (*storage.ObjectAttrs, error) {
var wg sync.WaitGroup
for layer := 1; len(parts) > MaxPartsInCompose; layer++ {
var nextParts []string
for i := 0; i < len(parts); i += MaxPartsInCompose {
Expand All @@ -21,13 +25,50 @@ func ComposeAll(target string, parts []string, composeFunc ComposeFunc) error {
nextParts = append(nextParts, chunk...)
} else {
targetName := fmt.Sprintf("%s_%d", chunk[0], layer)
if err := composeFunc(targetName, chunk); err != nil {
return err
}
wg.Add(1)
go composeChunkConcurrent(ctx, a, targetName, chunk, bucketName, bucket, &wg)
nextParts = append(nextParts, targetName)
}
}
parts = nextParts
}
return composeFunc(target, parts)
wg.Wait()

// now compose the target from the chunks we made
attrs, err := composeChunk(ctx, a, target, parts, bucketName, bucket)
return attrs, err
}

// composeChunkConcurrent composes target from parts by calling composeChunk
// and then marks one unit of work as done on wg. It is meant to be started
// with `go` after the caller has called wg.Add(1).
//
// The function has no return value, so a compose failure cannot be handed back
// to the caller; it is reported through the adapter's logger instead.
// NOTE(review): callers that need to abort on a failed chunk must detect the
// failure some other way (presumably the later compose that reads the missing
// target will fail) — confirm.
func composeChunkConcurrent(ctx context.Context, a *Adapter, target string, parts []string, bucketName string, bucket *storage.BucketHandle, wg *sync.WaitGroup) {
	// Deferred so the WaitGroup is released on every exit path, including a
	// panic inside composeChunk; otherwise wg.Wait() would block forever.
	defer wg.Done()

	if _, err := composeChunk(ctx, a, target, parts, bucketName, bucket); err != nil {
		// Use the adapter's contextual logger (as composeChunk does) rather
		// than writing to stdout, so the failure reaches the service logs.
		a.log(ctx).WithError(err).Warn("Failed to compose multipart upload chunk " + target)
	}
}

// composeChunk composes the object named target in bucket from the objects
// named in parts, then deletes the source parts.
//
// It returns the attributes reported by the composer. If the compose itself
// fails, it returns a nil attrs and the error, and no parts are deleted.
// A failure to delete a part after a successful compose is logged as a warning
// (tagged with bucketName and the part list) and does not fail the call.
func composeChunk(ctx context.Context, a *Adapter, target string, parts []string, bucketName string, bucket *storage.BucketHandle) (*storage.ObjectAttrs, error) {
	// Resolve a read handle for every source part, preserving order.
	sources := make([]*storage.ObjectHandle, 0, len(parts))
	for _, name := range parts {
		src := storageObjectHandle{bucket.Object(name)}
		sources = append(sources, src.withReadHandle(ctx, a).ObjectHandle)
	}

	// Compose the destination object from the collected sources.
	dst := storageObjectHandle{bucket.Object(target)}
	composer := dst.withWriteHandle(a).newComposer(a, sources...)
	attrs, err := composer.Run(ctx)
	if err != nil {
		return nil, err
	}

	// Best-effort cleanup: the sources are no longer needed once composed.
	for _, src := range sources {
		delErr := src.Delete(ctx)
		if delErr == nil {
			continue
		}
		a.log(ctx).WithError(delErr).WithFields(logging.Fields{
			"bucket": bucketName,
			"parts":  parts,
		}).Warn("Failed to delete multipart upload part while compose")
	}
	return attrs, nil
}
51 changes: 0 additions & 51 deletions pkg/block/gs/compose_test.go

This file was deleted.

0 comments on commit c1cc708

Please sign in to comment.