op: New package containing the high-level upload code #3

Merged · 1 commit · Jun 12, 2024
1 change: 1 addition & 0 deletions go.mod
@@ -5,6 +5,7 @@ go 1.20
 require (
 	github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.2
 	github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2
+	github.com/coreos/pkg v0.0.0-20240122114842-bbd7aa9bf6fb
 	gopkg.in/urfave/cli.v1 v1.20.0
 )

2 changes: 2 additions & 0 deletions go.sum
@@ -9,6 +9,8 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 h1:YUUxeiOWgdAQE3pXt
 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2/go.mod h1:dmXQgZuiSubAecswZE+Sm8jkvEa7kQgTPVRvwL/nd0E=
 github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU=
 github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
+github.com/coreos/pkg v0.0.0-20240122114842-bbd7aa9bf6fb h1:GIzvVQ9UkUlOhSDlqmrQAAAUd6R3E+caIisNEyWXvNE=
+github.com/coreos/pkg v0.0.0-20240122114842-bbd7aa9bf6fb/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
 github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
264 changes: 264 additions & 0 deletions op/upload.go
@@ -0,0 +1,264 @@
package op

import (
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"runtime"
	"strings"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
	"github.com/coreos/pkg/multierror"

	"github.com/flatcar/azure-vhd-utils/upload"
	"github.com/flatcar/azure-vhd-utils/upload/metadata"
	"github.com/flatcar/azure-vhd-utils/vhdcore/common"
	"github.com/flatcar/azure-vhd-utils/vhdcore/diskstream"
	"github.com/flatcar/azure-vhd-utils/vhdcore/validator"
)

type Error int

const (
	MissingVHDSuffix Error = iota
	BlobAlreadyExists
	MissingUploadMetadata
)

func (e Error) Error() string {
	switch e {
	case MissingVHDSuffix:
		return "missing .vhd suffix in blob name"
	case BlobAlreadyExists:
		return "blob already exists"
	case MissingUploadMetadata:
		return "blob has no upload metadata"
	default:
		return "unknown upload error"
	}
}

func ErrorIsAnyOf(err error, errs ...Error) bool {
	var opError Error
	if !errors.As(err, &opError) {
		return false
	}

	for _, e := range errs {
		if opError == e {
			return true
		}
	}

	return false
}

type UploadOptions struct {
	Overwrite   bool
	Parallelism int
	Logger      func(string)
}

func noopLogger(s string) {
}

func Upload(ctx context.Context, blobServiceClient *service.Client, container, blob, vhd string, opts *UploadOptions) error {
	const PageBlobPageSize int64 = 512
	const PageBlobPageSetSize int64 = 4 * 1024 * 1024

	if !strings.HasSuffix(strings.ToLower(blob), ".vhd") {
		return MissingVHDSuffix
	}

	if opts == nil {
		opts = &UploadOptions{}
	}

	parallelism := 8 * runtime.NumCPU()
	if opts.Parallelism > 0 {
		parallelism = opts.Parallelism
	}
	overwrite := opts.Overwrite
	logger := opts.Logger
	// Fall back to the no-op logger so a caller-supplied options struct
	// without a Logger does not cause a nil-pointer panic below.
	if logger == nil {
		logger = noopLogger
	}

	if err := ensureVHDSanity(vhd); err != nil {
		return err
	}

	diskStream, err := diskstream.CreateNewDiskStream(vhd)
	if err != nil {
		return err
	}
	defer diskStream.Close()

	containerClient := blobServiceClient.NewContainerClient(container)
	pageblobClient := containerClient.NewPageBlobClient(blob)
	blobClient := pageblobClient.BlobClient()

	_, err = containerClient.Create(ctx, nil)
	if err != nil && !bloberror.HasCode(err, bloberror.ContainerAlreadyExists, bloberror.ResourceAlreadyExists) {
		return err
	}

	blobExists := true
	blobProperties, err := blobClient.GetProperties(ctx, nil)
	if err != nil {
		if !bloberror.HasCode(err, bloberror.BlobNotFound, bloberror.ResourceNotFound) {
			return err
		}
		blobExists = false
	}

	resume := false
	var blobMetadata *metadata.Metadata
	if blobExists {
		if !overwrite {
			if len(blobProperties.ContentMD5) > 0 {
				return BlobAlreadyExists
			}
			blobMetadata, err = metadata.NewMetadataFromBlobMetadata(blobProperties.Metadata)
			if err != nil {
				return err
			}
			if blobMetadata == nil {
				return MissingUploadMetadata
			}
		}
		resume = true
		logger(fmt.Sprintf("Blob with name '%s' already exists, checking upload can be resumed", blob))
	}

	localMetadata, err := metadata.NewMetadataFromLocalVHD(vhd)
	if err != nil {
		return err
	}

	var rangesToSkip []*common.IndexRange
	if resume {
		if errs := metadata.CompareMetadata(blobMetadata, localMetadata); len(errs) > 0 {
			return multierror.Error(errs)
		}
		ranges, err := getAlreadyUploadedBlobRanges(ctx, pageblobClient)
		if err != nil {
			return err
		}
		rangesToSkip = ranges
	} else {
		if err := createBlob(ctx, pageblobClient, diskStream.GetSize(), localMetadata); err != nil {
			return err
		}
	}

	uploadableRanges, err := upload.LocateUploadableRanges(diskStream, rangesToSkip, PageBlobPageSize, PageBlobPageSetSize)
	if err != nil {
		return err
	}

	uploadableRanges, err = upload.DetectEmptyRanges(diskStream, uploadableRanges)
	if err != nil {
		return err
	}

	uploadContext := &upload.DiskUploadContext{
		VhdStream:             diskStream,
		AlreadyProcessedBytes: diskStream.GetSize() - common.TotalRangeLength(uploadableRanges),
		UploadableRanges:      uploadableRanges,
		PageblobClient:        pageblobClient,
		Parallelism:           parallelism,
		Resume:                resume,
	}

	err = upload.Upload(ctx, uploadContext)
	if err != nil {
		return err
	}

	if err := setBlobMD5Hash(ctx, blobClient, localMetadata); err != nil {
		return err
	}
	logger("Upload completed")
	return nil
}

// ensureVHDSanity ensures the VHD is valid for Azure.
func ensureVHDSanity(vhd string) error {
	if err := validator.ValidateVhd(vhd); err != nil {
		return err
	}

	if err := validator.ValidateVhdSize(vhd); err != nil {
		return err
	}

	return nil
}

// createBlob creates a page blob of a specific size and sets custom
// metadata. The parameter client is the Azure pageblob client
// representing a blob in a container, size is the size of the new
// page blob in bytes, and vhdMetadata is the custom metadata to be
// associated with the page blob.
func createBlob(ctx context.Context, client *pageblob.Client, size int64, vhdMetadata *metadata.Metadata) error {
	m, err := vhdMetadata.ToMap()
	if err != nil {
		return err
	}
	opts := pageblob.CreateOptions{
		Metadata: m,
	}
	_, err = client.Create(ctx, size, &opts)
	return err
}

// setBlobMD5Hash sets the MD5 hash of the blob in its properties.
func setBlobMD5Hash(ctx context.Context, client *blob.Client, vhdMetadata *metadata.Metadata) error {
	if vhdMetadata.FileMetadata == nil || len(vhdMetadata.FileMetadata.MD5Hash) == 0 {
		return nil
	}
	buf := make([]byte, base64.StdEncoding.EncodedLen(len(vhdMetadata.FileMetadata.MD5Hash)))
	base64.StdEncoding.Encode(buf, vhdMetadata.FileMetadata.MD5Hash)
	blobHeaders := blob.HTTPHeaders{
		BlobContentMD5: buf,
	}
	_, err := client.SetHTTPHeaders(ctx, blobHeaders, nil)
	return err
}

// getAlreadyUploadedBlobRanges returns the ranges of a page blob that
// have already been uploaded. The parameter client is the Azure
// pageblob client representing a blob in a container.
func getAlreadyUploadedBlobRanges(ctx context.Context, client *pageblob.Client) ([]*common.IndexRange, error) {
	var (
		marker       *string
		rangesToSkip []*common.IndexRange
	)
	for {
		opts := pageblob.GetPageRangesOptions{
			Marker: marker,
		}
		pager := client.NewGetPageRangesPager(&opts)
		for pager.More() {
			response, err := pager.NextPage(ctx)
			if err != nil {
				return nil, err
			}
			tmpRanges := make([]*common.IndexRange, len(response.PageRange))
			for i, page := range response.PageRange {
				tmpRanges[i] = common.NewIndexRange(*page.Start, *page.End)
			}
			rangesToSkip = append(rangesToSkip, tmpRanges...)
			marker = response.NextMarker
		}
		if marker == nil || *marker == "" {
			break
		}
	}
	return rangesToSkip, nil
}
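
For reviewers who want to try the new package, here is a minimal sketch of how a caller might drive op.Upload and check its sentinel errors. The account URL, container, blob name, and VHD path are invented for illustration; only op.Upload, op.UploadOptions, and op.ErrorIsAnyOf come from this PR, the rest is the stock Azure SDK.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"

	"github.com/flatcar/azure-vhd-utils/op"
)

func main() {
	ctx := context.Background()

	// Authenticate via the default chain (env vars, managed identity, az CLI).
	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		log.Fatal(err)
	}

	// Hypothetical storage account URL.
	client, err := service.NewClient("https://myaccount.blob.core.windows.net/", cred, nil)
	if err != nil {
		log.Fatal(err)
	}

	err = op.Upload(ctx, client, "images", "flatcar.vhd", "/tmp/flatcar.vhd", &op.UploadOptions{
		Parallelism: 4,
		Logger:      func(s string) { fmt.Println(s) },
	})
	if op.ErrorIsAnyOf(err, op.BlobAlreadyExists) {
		log.Fatal("blob exists with a finished upload; set Overwrite to replace it")
	}
	if err != nil {
		log.Fatal(err)
	}
}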
21 changes: 11 additions & 10 deletions upload/upload.go
@@ -46,34 +46,34 @@ func (byteReadSeekCloser) Close() error {
 
 var _ io.ReadSeekCloser = byteReadSeekCloser{}
 
-// Upload uploads the disk ranges described by the parameter cxt, this parameter describes the disk stream to
+// Upload uploads the disk ranges described by the parameter uctx, this parameter describes the disk stream to
 // read from, the ranges of the stream to read, the destination blob and it's container, the client to communicate
 // with Azure storage and the number of parallel go-routines to use for upload.
-func Upload(cxt *DiskUploadContext) error {
+func Upload(ctx context.Context, uctx *DiskUploadContext) error {
 	// Get the channel that contains stream of disk data to upload
-	dataWithRangeChan, streamReadErrChan := GetDataWithRanges(cxt.VhdStream, cxt.UploadableRanges)
+	dataWithRangeChan, streamReadErrChan := GetDataWithRanges(uctx.VhdStream, uctx.UploadableRanges)
 
 	// The channel to send upload request to load-balancer
 	requtestChan := make(chan *concurrent.Request, 0)
 
-	// Prepare and start the load-balancer that load request across 'cxt.Parallelism' workers
-	loadBalancer := concurrent.NewBalancer(cxt.Parallelism)
+	// Prepare and start the load-balancer that load request across 'uctx.Parallelism' workers
+	loadBalancer := concurrent.NewBalancer(uctx.Parallelism)
 	loadBalancer.Init()
 	workerErrorChan, allWorkersFinishedChan := loadBalancer.Run(requtestChan)
 
 	// Calculate the actual size of the data to upload
 	uploadSizeInBytes := int64(0)
-	for _, r := range cxt.UploadableRanges {
+	for _, r := range uctx.UploadableRanges {
 		uploadSizeInBytes += r.Length()
 	}
-	fmt.Printf("\nEffective upload size: %.2f MB (from %.2f MB originally)", float64(uploadSizeInBytes)/oneMB, float64(cxt.VhdStream.GetSize())/oneMB)
+	fmt.Printf("\nEffective upload size: %.2f MB (from %.2f MB originally)", float64(uploadSizeInBytes)/oneMB, float64(uctx.VhdStream.GetSize())/oneMB)
 
 	// Prepare and start the upload progress tracker
-	uploadProgress := progress.NewStatus(cxt.Parallelism, cxt.AlreadyProcessedBytes, uploadSizeInBytes, progress.NewComputestateDefaultSize())
+	uploadProgress := progress.NewStatus(uctx.Parallelism, uctx.AlreadyProcessedBytes, uploadSizeInBytes, progress.NewComputestateDefaultSize())
 	progressChan := uploadProgress.Run()
 
 	// read progress status from progress tracker and print it
-	go readAndPrintProgress(progressChan, cxt.Resume)
+	go readAndPrintProgress(progressChan, uctx.Resume)
 
 	// listen for errors reported by workers and print it
 	var allWorkSucceeded = true
@@ -98,7 +98,8 @@ L:
 		//
 		req := &concurrent.Request{
 			Work: func() error {
-				_, err := cxt.PageblobClient.UploadPages(context.TODO(),
+				_, err := uctx.PageblobClient.UploadPages(
+					ctx,
 					newByteReadSeekCloser(dataWithRange.Data),
 					blob.HTTPRange{
 						Offset: dataWithRange.Range.Start,
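
The hunk above replaces the hard-coded context.TODO() with the caller-supplied context, so a long-running page upload can now be aborted. A sketch of what that enables, assuming the op package from this PR; the function name and all parameters are illustrative, and whether the SDK surfaces the cancellation as context.Canceled is an assumption.

import (
	"context"
	"errors"
	"log"
	"os"
	"os/signal"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"

	"github.com/flatcar/azure-vhd-utils/op"
)

// uploadWithCancel aborts the page-blob upload when the process receives
// SIGINT; an interrupted upload can later be resumed because the blob
// keeps its upload metadata until the final MD5 property is set.
func uploadWithCancel(client *service.Client, container, blobName, vhdPath string) error {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	err := op.Upload(ctx, client, container, blobName, vhdPath, nil)
	if err != nil && errors.Is(err, context.Canceled) {
		// Assumed: the SDK propagates the cancelled context here.
		log.Print("upload cancelled; re-run to resume")
	}
	return err
}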