Skip to content

Commit

Permalink
Merge branch 'master' into topsql-enable
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Dec 31, 2021
2 parents a3f7178 + 1dc1695 commit 4bfbd23
Show file tree
Hide file tree
Showing 10 changed files with 962 additions and 7 deletions.
489 changes: 489 additions & 0 deletions br/pkg/storage/azblob.go

Large diffs are not rendered by default.

301 changes: 301 additions & 0 deletions br/pkg/storage/azblob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.

package storage

import (
"context"
"io"
"os"
"strings"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/stretchr/testify/require"
)

// use shared key to access azurite
type sharedKeyAzuriteClientBuilder struct {
}

func (b *sharedKeyAzuriteClientBuilder) GetServiceClient() (azblob.ServiceClient, error) {
connStr := "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
return azblob.NewServiceClientFromConnectionString(connStr, nil)
}

func (b *sharedKeyAzuriteClientBuilder) GetAccountName() string {
return "devstoreaccount1"
}

func TestAzblob(t *testing.T) {
ctx := context.Background()
options := &backuppb.AzureBlobStorage{
Bucket: "test",
Prefix: "a/b/",
}

azblobStorage, err := newAzureBlobStorageWithClientBuilder(ctx, options, &sharedKeyAzuriteClientBuilder{})
if err != nil {
if strings.Contains(err.Error(), "connect: connection refused") {
t.Log("azurite is not running, skip test")
return
}
}
require.NoError(t, err)

err = azblobStorage.WriteFile(ctx, "key", []byte("data"))
require.NoError(t, err)

err = azblobStorage.WriteFile(ctx, "key1", []byte("data1"))
require.NoError(t, err)

err = azblobStorage.WriteFile(ctx, "key2", []byte("data22223346757222222222289722222"))
require.NoError(t, err)

d, err := azblobStorage.ReadFile(ctx, "key")
require.NoError(t, err)
require.Equal(t, []byte("data"), d)

exist, err := azblobStorage.FileExists(ctx, "key")
require.NoError(t, err)
require.True(t, exist)

exist, err = azblobStorage.FileExists(ctx, "key_not_exist")
require.NoError(t, err)
require.False(t, exist)

keyDelete := "key_delete"
exist, err = azblobStorage.FileExists(ctx, keyDelete)
require.NoError(t, err)
require.False(t, exist)

err = azblobStorage.WriteFile(ctx, keyDelete, []byte("data"))
require.NoError(t, err)

exist, err = azblobStorage.FileExists(ctx, keyDelete)
require.NoError(t, err)
require.True(t, exist)

err = azblobStorage.DeleteFile(ctx, keyDelete)
require.NoError(t, err)

exist, err = azblobStorage.FileExists(ctx, keyDelete)
require.NoError(t, err)
require.False(t, exist)

list := ""
var totalSize int64 = 0
err = azblobStorage.WalkDir(ctx, nil, func(name string, size int64) error {
list += name
totalSize += size
return nil
})
require.NoError(t, err)
require.Equal(t, "keykey1key2", list)
require.Equal(t, int64(42), totalSize)

efr, err := azblobStorage.Open(ctx, "key2")
require.NoError(t, err)

p := make([]byte, 10)
n, err := efr.Read(p)
require.NoError(t, err)
require.Equal(t, 10, n)
require.Equal(t, "data222233", string(p))

p = make([]byte, 40)
n, err = efr.Read(p)
require.NoError(t, err)
require.Equal(t, 23, n)
require.Equal(t, "46757222222222289722222", string(p[:23]))

p = make([]byte, 5)
offs, err := efr.Seek(3, io.SeekStart)
require.NoError(t, err)
require.Equal(t, int64(3), offs)

n, err = efr.Read(p)
require.NoError(t, err)
require.Equal(t, 5, n)
require.Equal(t, "a2222", string(p))

p = make([]byte, 5)
offs, err = efr.Seek(3, io.SeekCurrent)
require.NoError(t, err)
require.Equal(t, int64(11), offs)

n, err = efr.Read(p)
require.NoError(t, err)
require.Equal(t, 5, n)
require.Equal(t, "67572", string(p))

p = make([]byte, 5)
offs, err = efr.Seek(int64(-7), io.SeekEnd)
require.NoError(t, err)
// Note, change it to maxCnt - offs
require.Equal(t, int64(26), offs)

n, err = efr.Read(p)
require.NoError(t, err)
require.Equal(t, 5, n)
require.Equal(t, "97222", string(p))

err = efr.Close()
require.NoError(t, err)

require.Equal(t, "azure://test/a/b/", azblobStorage.URI())
}

func TestNewAzblobStorage(t *testing.T) {
{
options := &backuppb.AzureBlobStorage{
Endpoint: "http://127.0.0.1:1000",
Bucket: "test",
Prefix: "a/b",
AccountName: "user",
SharedKey: "cGFzc3dk",
}
builder, err := getAzureServiceClientBuilder(options, nil)
require.NoError(t, err)
b, ok := builder.(*sharedKeyClientBuilder)
require.True(t, ok)
require.Equal(t, "user", b.GetAccountName())
require.Equal(t, "http://127.0.0.1:1000", b.serviceURL)
}

{
options := &backuppb.AzureBlobStorage{
Bucket: "test",
Prefix: "a/b",
AccountName: "user",
SharedKey: "cGFzc3dk",
}
builder, err := getAzureServiceClientBuilder(options, nil)
require.NoError(t, err)
b, ok := builder.(*sharedKeyClientBuilder)
require.True(t, ok)
require.Equal(t, "user", b.GetAccountName())
require.Equal(t, "https://user.blob.core.windows.net", b.serviceURL)
}

err := os.Setenv("AZURE_STORAGE_ACCOUNT", "env_user")
require.NoError(t, err)
defer os.Unsetenv("AZURE_STORAGE_ACCOUNT")

{
options := &backuppb.AzureBlobStorage{
Bucket: "test",
Prefix: "a/b",
AccountName: "user",
SharedKey: "cGFzc3dk",
}
builder, err := getAzureServiceClientBuilder(options, nil)
require.NoError(t, err)
b, ok := builder.(*sharedKeyClientBuilder)
require.True(t, ok)
require.Equal(t, "user", b.GetAccountName())
require.Equal(t, "https://user.blob.core.windows.net", b.serviceURL)
}

{
options := &backuppb.AzureBlobStorage{
Bucket: "test",
Prefix: "a/b",
SharedKey: "cGFzc3dk",
}
_, err := getAzureServiceClientBuilder(options, nil)
require.Error(t, err)
}

err = os.Setenv("AZURE_STORAGE_KEY", "cGFzc3dk")
require.NoError(t, err)
defer os.Unsetenv("AZURE_STORAGE_KEY")

{
options := &backuppb.AzureBlobStorage{
Endpoint: "http://127.0.0.1:1000",
Bucket: "test",
Prefix: "a/b",
SharedKey: "cGFzc2dk",
}
builder, err := getAzureServiceClientBuilder(options, nil)
require.NoError(t, err)
b, ok := builder.(*sharedKeyClientBuilder)
require.True(t, ok)
require.Equal(t, "env_user", b.GetAccountName())
require.Equal(t, "http://127.0.0.1:1000", b.serviceURL)
}

err = os.Setenv("AZURE_CLIENT_ID", "321")
require.NoError(t, err)
defer os.Unsetenv("AZURE_CLIENT_ID")

err = os.Setenv("AZURE_TENANT_ID", "321")
require.NoError(t, err)
defer os.Unsetenv("AZURE_TENANT_ID")

err = os.Setenv("AZURE_CLIENT_SECRET", "321")
require.NoError(t, err)
defer os.Unsetenv("AZURE_CLIENT_SECRET")

{
options := &backuppb.AzureBlobStorage{
Endpoint: "http://127.0.0.1:1000",
Bucket: "test",
Prefix: "a/b",
}
builder, err := getAzureServiceClientBuilder(options, nil)
require.NoError(t, err)
b, ok := builder.(*tokenClientBuilder)
require.True(t, ok)
require.Equal(t, "env_user", b.GetAccountName())
require.Equal(t, "http://127.0.0.1:1000", b.serviceURL)
}

{
options := &backuppb.AzureBlobStorage{
Endpoint: "http://127.0.0.1:1000",
Bucket: "test",
Prefix: "a/b",
SharedKey: "cGFzc2dk",
}
builder, err := getAzureServiceClientBuilder(options, nil)
require.NoError(t, err)
b, ok := builder.(*tokenClientBuilder)
require.True(t, ok)
require.Equal(t, "env_user", b.GetAccountName())
require.Equal(t, "http://127.0.0.1:1000", b.serviceURL)
}

{
options := &backuppb.AzureBlobStorage{
Endpoint: "http://127.0.0.1:1000",
Bucket: "test",
Prefix: "a/b",
AccountName: "user",
SharedKey: "cGFzc3dk",
}
builder, err := getAzureServiceClientBuilder(options, nil)
require.NoError(t, err)
b, ok := builder.(*sharedKeyClientBuilder)
require.True(t, ok)
require.Equal(t, "user", b.GetAccountName())
require.Equal(t, "http://127.0.0.1:1000", b.serviceURL)
}

{
options := &backuppb.AzureBlobStorage{
Endpoint: "http://127.0.0.1:1000",
Bucket: "test",
Prefix: "a/b",
AccountName: "user",
}
builder, err := getAzureServiceClientBuilder(options, nil)
require.NoError(t, err)
b, ok := builder.(*tokenClientBuilder)
require.True(t, ok)
require.Equal(t, "user", b.GetAccountName())
require.Equal(t, "http://127.0.0.1:1000", b.serviceURL)
}

}
9 changes: 8 additions & 1 deletion br/pkg/storage/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,19 @@ import (
func DefineFlags(flags *pflag.FlagSet) {
defineS3Flags(flags)
defineGCSFlags(flags)
defineAzblobFlags(flags)
}

// ParseFromFlags obtains the backend options from the flag set.
func (options *BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error {
if err := options.S3.parseFromFlags(flags); err != nil {
return errors.Trace(err)
}
return options.GCS.parseFromFlags(flags)
if err := options.GCS.parseFromFlags(flags); err != nil {
return errors.Trace(err)
}
if err := options.Azblob.parseFromFlags(flags); err != nil {
return errors.Trace(err)
}
return nil
}
23 changes: 21 additions & 2 deletions br/pkg/storage/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
// BackendOptions further configures the storage backend not expressed by the
// storage URL.
type BackendOptions struct {
S3 S3BackendOptions `json:"s3" toml:"s3"`
GCS GCSBackendOptions `json:"gcs" toml:"gcs"`
S3 S3BackendOptions `json:"s3" toml:"s3"`
GCS GCSBackendOptions `json:"gcs" toml:"gcs"`
Azblob AzblobBackendOptions `json:"azblob" toml:"azblob"`
}

// ParseRawURL parse raw url to url object.
Expand Down Expand Up @@ -95,6 +96,20 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBack
}
return &backuppb.StorageBackend{Backend: &backuppb.StorageBackend_Gcs{Gcs: gcs}}, nil

case "azure", "azblob":
if u.Host == "" {
return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "please specify the bucket for azblob in %s", rawURL)
}
prefix := strings.Trim(u.Path, "/")
azblob := &backuppb.AzureBlobStorage{Bucket: u.Host, Prefix: prefix}
if options == nil {
options = &BackendOptions{}
}
ExtractQueryParameters(u, &options.Azblob)
if err := options.Azblob.apply(azblob); err != nil {
return nil, errors.Trace(err)
}
return &backuppb.StorageBackend{Backend: &backuppb.StorageBackend_AzureBlobStorage{AzureBlobStorage: azblob}}, nil
default:
return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "storage %s not support yet", u.Scheme)
}
Expand Down Expand Up @@ -170,6 +185,10 @@ func FormatBackendURL(backend *backuppb.StorageBackend) (u url.URL) {
u.Scheme = "gcs"
u.Host = b.Gcs.Bucket
u.Path = b.Gcs.Prefix
case *backuppb.StorageBackend_AzureBlobStorage:
u.Scheme = "azure"
u.Host = b.AzureBlobStorage.Bucket
u.Path = b.AzureBlobStorage.Prefix
}
return
}
21 changes: 21 additions & 0 deletions br/pkg/storage/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,16 @@ func TestCreateStorage(t *testing.T) {
require.Equal(t, "backup", gcs.Prefix)
require.Equal(t, "fakeCreds2", gcs.CredentialsBlob)

s, err = ParseBackend(`azure://bucket1/prefix/path?account-name=user&account-key=cGFzc3dk&endpoint=http://127.0.0.1/user`, nil)
require.NoError(t, err)
azblob := s.GetAzureBlobStorage()
require.NotNil(t, azblob)
require.Equal(t, "bucket1", azblob.Bucket)
require.Equal(t, "prefix/path", azblob.Prefix)
require.Equal(t, "http://127.0.0.1/user", azblob.Endpoint)
require.Equal(t, "user", azblob.AccountName)
require.Equal(t, "cGFzc3dk", azblob.SharedKey)

s, err = ParseBackend("/test", nil)
require.NoError(t, err)
local := s.GetLocal()
Expand Down Expand Up @@ -172,4 +182,15 @@ func TestFormatBackendURL(t *testing.T) {
},
})
require.Equal(t, "gcs://bucket/some%20prefix/", backendURL.String())

backendURL = FormatBackendURL(&backuppb.StorageBackend{
Backend: &backuppb.StorageBackend_AzureBlobStorage{
AzureBlobStorage: &backuppb.AzureBlobStorage{
Bucket: "bucket",
Prefix: "/some prefix/",
Endpoint: "https://azure.example.com/",
},
},
})
require.Equal(t, "azure://bucket/some%20prefix/", backendURL.String())
}
Loading

0 comments on commit 4bfbd23

Please sign in to comment.