Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

global sort: support setting cloud storage uri variable #46407

Merged
merged 18 commits into from
Sep 11, 2023
6 changes: 5 additions & 1 deletion br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"flags.go",
"gcs.go",
"hdfs.go",
"helper.go",
"local.go",
"local_unix.go",
"local_windows.go",
Expand All @@ -23,6 +24,7 @@ go_library(
deps = [
"//br/pkg/errors",
"//br/pkg/logutil",
"//sessionctx/variable",
"@com_github_aliyun_alibaba_cloud_sdk_go//sdk/auth/credentials",
"@com_github_aliyun_alibaba_cloud_sdk_go//sdk/auth/credentials/providers",
"@com_github_aws_aws_sdk_go//aws",
Expand Down Expand Up @@ -68,6 +70,7 @@ go_test(
"azblob_test.go",
"compress_test.go",
"gcs_test.go",
"helper_test.go",
"local_test.go",
"memstore_test.go",
"parse_test.go",
Expand All @@ -77,9 +80,10 @@ go_test(
],
embed = [":storage"],
flaky = True,
shard_count = 48,
shard_count = 49,
deps = [
"//br/pkg/mock",
"//sessionctx/variable",
"@com_github_aws_aws_sdk_go//aws",
"@com_github_aws_aws_sdk_go//aws/awserr",
"@com_github_aws_aws_sdk_go//aws/request",
Expand Down
30 changes: 30 additions & 0 deletions br/pkg/storage/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.

package storage

import (
"context"

"github.com/pingcap/tidb/sessionctx/variable"
)

func init() {
variable.RedactCloudStorageURI = RedactURL
variable.ValidateCloudStorageURI = ValidateCloudStorageURI
}

// ValidateCloudStorageURI makes validation for tidb_cloud_storage_uri.
func ValidateCloudStorageURI(ctx context.Context, uri string) error {
b, err := ParseBackend(uri, nil)
if err != nil {
return err
}
_, err = New(ctx, b, &ExternalStorageOptions{
CheckPermissions: []Permission{
ListObjects,
GetObject,
AccessBuckets,
},
})
return err

Check warning on line 29 in br/pkg/storage/helper.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/helper.go#L22-L29

Added lines #L22 - L29 were not covered by tests
}
62 changes: 62 additions & 0 deletions br/pkg/storage/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.

package storage

import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/pingcap/tidb/sessionctx/variable"
"github.com/stretchr/testify/require"
)

func TestSetTiDBCloudStorageURI(t *testing.T) {
vars := variable.NewSessionVars(nil)
mock := variable.NewMockGlobalAccessor4Tests()
mock.SessionVars = vars
vars.GlobalVarsAccessor = mock
cloudStorageURI := variable.GetSysVar(variable.TiDBCloudStorageURI)
require.Len(t, variable.CloudStorageURI.Load(), 0)
defer func() {
variable.CloudStorageURI.Store("")
}()

// Default empty
require.Len(t, cloudStorageURI.Value, 0)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Set to noop
noopURI := "noop://blackhole?access-key=hello&secret-access-key=world"
err := mock.SetGlobalSysVar(ctx, variable.TiDBCloudStorageURI, noopURI)
require.NoError(t, err)
val, err1 := mock.SessionVars.GetSessionOrGlobalSystemVar(ctx, variable.TiDBCloudStorageURI)
require.NoError(t, err1)
require.Equal(t, noopURI, val)
require.Equal(t, noopURI, variable.CloudStorageURI.Load())

// Set to s3, should fail
err = mock.SetGlobalSysVar(ctx, variable.TiDBCloudStorageURI, "s3://blackhole")
require.ErrorContains(t, err, "bucket blackhole")

s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}))
defer s.Close()

// Set to s3, should return uri without variable
s3URI := "s3://tiflow-test/?access-key=testid&secret-access-key=testkey8&session-token=testtoken&endpoint=" + s.URL
err = mock.SetGlobalSysVar(ctx, variable.TiDBCloudStorageURI, s3URI)
require.NoError(t, err)
val, err1 = mock.SessionVars.GetSessionOrGlobalSystemVar(ctx, variable.TiDBCloudStorageURI)
require.NoError(t, err1)
require.True(t, strings.HasPrefix(val, "s3://tiflow-test/"))
require.Contains(t, val, "access-key=redacted")
require.Contains(t, val, "secret-access-key=redacted")
require.Contains(t, val, "session-token=redacted")
require.Equal(t, s3URI, variable.CloudStorageURI.Load())
cancel()
}
2 changes: 1 addition & 1 deletion br/pkg/storage/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@
// see below on why we normalize key
// https://github.com/pingcap/tidb/blob/a7c0d95f16ea2582bb569278c3f829403e6c3a7e/br/pkg/storage/parse.go#L163
normalizedKey := strings.ToLower(strings.ReplaceAll(k, "_", "-"))
if normalizedKey == "access-key" || normalizedKey == "secret-access-key" {
if normalizedKey == "access-key" || normalizedKey == "secret-access-key" || normalizedKey == "session-token" {

Check warning on line 227 in br/pkg/storage/parse.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/parse.go#L227

Added line #L227 was not covered by tests
values[k] = []string{"redacted"}
}
}
Expand Down
1 change: 1 addition & 0 deletions ddl/util/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
1 change: 1 addition & 0 deletions domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
Expand Down
10 changes: 10 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2256,6 +2256,16 @@
DDLDiskQuota.Store(TidbOptUint64(val, DefTiDBDDLDiskQuota))
return nil
}},
// can't assign validate function here. Because validation function will run after GetGlobal function
{Scope: ScopeGlobal, Name: TiDBCloudStorageURI, Value: "", Type: TypeStr, GetGlobal: func(ctx context.Context, sv *SessionVars) (string, error) {
return RedactCloudStorageURI(CloudStorageURI.Load())

Check warning on line 2261 in sessionctx/variable/sysvar.go

View check run for this annotation

Codecov / codecov/patch

sessionctx/variable/sysvar.go#L2260-L2261

Added lines #L2260 - L2261 were not covered by tests
}, SetGlobal: func(ctx context.Context, s *SessionVars, val string) error {
if err := ValidateCloudStorageURI(ctx, val); err != nil {
return err
}
CloudStorageURI.Store(val)
return nil

Check warning on line 2267 in sessionctx/variable/sysvar.go

View check run for this annotation

Codecov / codecov/patch

sessionctx/variable/sysvar.go#L2266-L2267

Added lines #L2266 - L2267 were not covered by tests
}},
{Scope: ScopeSession, Name: TiDBConstraintCheckInPlacePessimistic, Value: BoolToOnOff(config.GetGlobalConfig().PessimisticTxn.ConstraintCheckInPlacePessimistic), Type: TypeBool,
SetSession: func(s *SessionVars, val string) error {
s.ConstraintCheckInPlacePessimistic = TiDBOptOn(val)
Expand Down
51 changes: 51 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"encoding/json"
"fmt"
"math"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1356,3 +1359,51 @@ func TestTiDBTiFlashReplicaRead(t *testing.T) {
require.NoError(t, err)
require.Equal(t, DefTiFlashReplicaRead, val)
}

func TestSetTiDBCloudStorageURI(t *testing.T) {
vars := NewSessionVars(nil)
mock := NewMockGlobalAccessor4Tests()
mock.SessionVars = vars
vars.GlobalVarsAccessor = mock
cloudStorageURI := GetSysVar(TiDBCloudStorageURI)
require.Len(t, CloudStorageURI.Load(), 0)
defer func() {
CloudStorageURI.Store("")
}()

// Default empty
require.Len(t, cloudStorageURI.Value, 0)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Set to noop
noopURI := "noop://blackhole?access-key=hello&secret-access-key=world"
err := mock.SetGlobalSysVar(ctx, TiDBCloudStorageURI, noopURI)
require.NoError(t, err)
val, err1 := mock.SessionVars.GetSessionOrGlobalSystemVar(ctx, TiDBCloudStorageURI)
require.NoError(t, err1)
require.Equal(t, noopURI, val)
require.Equal(t, noopURI, CloudStorageURI.Load())

// Set to s3, should fail
err = mock.SetGlobalSysVar(ctx, TiDBCloudStorageURI, "s3://blackhole")
require.ErrorContains(t, err, "bucket blackhole")

s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}))
defer s.Close()

// Set to s3, should return uri without variable
s3URI := "s3://tiflow-test/?access-key=testid&secret-access-key=testkey8&session-token=testtoken&endpoint=" + s.URL
err = mock.SetGlobalSysVar(ctx, TiDBCloudStorageURI, s3URI)
require.NoError(t, err)
val, err1 = mock.SessionVars.GetSessionOrGlobalSystemVar(ctx, TiDBCloudStorageURI)
require.NoError(t, err1)
require.True(t, strings.HasPrefix(val, "s3://tiflow-test/"))
require.Contains(t, val, "access-key=redacted")
require.Contains(t, val, "secret-access-key=redacted")
require.Contains(t, val, "session-token=redacted")
require.Equal(t, s3URI, CloudStorageURI.Load())
cancel()
}
7 changes: 7 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,8 @@ const (
TiDBDDLEnableFastReorg = "tidb_ddl_enable_fast_reorg"
// TiDBDDLDiskQuota used to set disk quota for lightning add index.
TiDBDDLDiskQuota = "tidb_ddl_disk_quota"
// TiDBCloudStorageURI used to set a cloud storage uri for ddl add index and import into.
TiDBCloudStorageURI = "tidb_cloud_storage_uri"
// TiDBAutoBuildStatsConcurrency is used to set the build concurrency of auto-analyze.
TiDBAutoBuildStatsConcurrency = "tidb_auto_build_stats_concurrency"
// TiDBSysProcScanConcurrency is used to set the scan concurrency of for backend system processes, like auto-analyze.
Expand Down Expand Up @@ -1504,6 +1506,7 @@ var (
SkipMissingPartitionStats = atomic.NewBool(DefTiDBSkipMissingPartitionStats)
ServiceScope = atomic.NewString("")
SchemaVersionCacheLimit = atomic.NewInt64(DefTiDBSchemaVersionCacheLimit)
CloudStorageURI = atomic.NewString("")
)

var (
Expand All @@ -1527,6 +1530,10 @@ var (
GetExternalTimestamp func(ctx context.Context) (uint64, error)
// SetGlobalResourceControl is the func registered by domain to set cluster resource control.
SetGlobalResourceControl atomic.Pointer[func(bool)]
// RedactCloudStorageURI redacts the cloud storage URI.
RedactCloudStorageURI func(uri string) (string, error)
// ValidateCloudStorageURI validates the cloud storage URI.
ValidateCloudStorageURI func(ctx context.Context, uri string) error
)

// Hooks functions for Cluster Resource Control.
Expand Down
1 change: 1 addition & 0 deletions util/expensivequery/expensivequerey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
1 change: 1 addition & 0 deletions util/mock/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
1 change: 1 addition & 0 deletions util/sem/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
1 change: 1 addition & 0 deletions util/sqlexec/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}