diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index 46aa4efb3265d..7eb3daffa60da 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "flags.go", "gcs.go", "hdfs.go", + "helper.go", "local.go", "local_unix.go", "local_windows.go", @@ -23,6 +24,7 @@ go_library( deps = [ "//br/pkg/errors", "//br/pkg/logutil", + "//sessionctx/variable", "@com_github_aliyun_alibaba_cloud_sdk_go//sdk/auth/credentials", "@com_github_aliyun_alibaba_cloud_sdk_go//sdk/auth/credentials/providers", "@com_github_aws_aws_sdk_go//aws", @@ -68,6 +70,7 @@ go_test( "azblob_test.go", "compress_test.go", "gcs_test.go", + "helper_test.go", "local_test.go", "memstore_test.go", "parse_test.go", @@ -77,9 +80,10 @@ go_test( ], embed = [":storage"], flaky = True, - shard_count = 48, + shard_count = 49, deps = [ "//br/pkg/mock", + "//sessionctx/variable", "@com_github_aws_aws_sdk_go//aws", "@com_github_aws_aws_sdk_go//aws/awserr", "@com_github_aws_aws_sdk_go//aws/request", diff --git a/br/pkg/storage/helper.go b/br/pkg/storage/helper.go new file mode 100644 index 0000000000000..48c3608af0bf1 --- /dev/null +++ b/br/pkg/storage/helper.go @@ -0,0 +1,30 @@ +// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0. + +package storage + +import ( + "context" + + "github.com/pingcap/tidb/sessionctx/variable" +) + +func init() { + variable.RedactCloudStorageURI = RedactURL + variable.ValidateCloudStorageURI = ValidateCloudStorageURI +} + +// ValidateCloudStorageURI makes validation for tidb_cloud_storage_uri. +func ValidateCloudStorageURI(ctx context.Context, uri string) error { + b, err := ParseBackend(uri, nil) + if err != nil { + return err + } + _, err = New(ctx, b, &ExternalStorageOptions{ + CheckPermissions: []Permission{ + ListObjects, + GetObject, + AccessBuckets, + }, + }) + return err +} diff --git a/br/pkg/storage/helper_test.go b/br/pkg/storage/helper_test.go new file mode 100644 index 0000000000000..36df44a76c841 --- /dev/null +++ b/br/pkg/storage/helper_test.go @@ -0,0 +1,62 @@ +// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0. + +package storage + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/stretchr/testify/require" +) + +func TestSetTiDBCloudStorageURI(t *testing.T) { + vars := variable.NewSessionVars(nil) + mock := variable.NewMockGlobalAccessor4Tests() + mock.SessionVars = vars + vars.GlobalVarsAccessor = mock + cloudStorageURI := variable.GetSysVar(variable.TiDBCloudStorageURI) + require.Len(t, variable.CloudStorageURI.Load(), 0) + defer func() { + variable.CloudStorageURI.Store("") + }() + + // Default empty + require.Len(t, cloudStorageURI.Value, 0) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Set to noop + noopURI := "noop://blackhole?access-key=hello&secret-access-key=world" + err := mock.SetGlobalSysVar(ctx, variable.TiDBCloudStorageURI, noopURI) + require.NoError(t, err) + val, err1 := mock.SessionVars.GetSessionOrGlobalSystemVar(ctx, variable.TiDBCloudStorageURI) + require.NoError(t, err1) + require.Equal(t, noopURI, val) + require.Equal(t, noopURI, variable.CloudStorageURI.Load()) + + // Set to s3, should fail + err = mock.SetGlobalSysVar(ctx, variable.TiDBCloudStorageURI, "s3://blackhole") + require.ErrorContains(t, err, "bucket blackhole") + + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + })) + defer s.Close() + + // Set to s3, should return uri without variable + s3URI := "s3://tiflow-test/?access-key=testid&secret-access-key=testkey8&session-token=testtoken&endpoint=" + s.URL + err = mock.SetGlobalSysVar(ctx, variable.TiDBCloudStorageURI, s3URI) + require.NoError(t, err) + val, err1 = mock.SessionVars.GetSessionOrGlobalSystemVar(ctx, variable.TiDBCloudStorageURI) + require.NoError(t, err1) + require.True(t, strings.HasPrefix(val, "s3://tiflow-test/")) + require.Contains(t, val, "access-key=redacted") + require.Contains(t, val, "secret-access-key=redacted") + require.Contains(t, val, "session-token=redacted") + require.Equal(t, s3URI, variable.CloudStorageURI.Load()) + cancel() +} diff --git a/br/pkg/storage/parse.go b/br/pkg/storage/parse.go index d57ca566b402d..320d0022d6017 100644 --- a/br/pkg/storage/parse.go +++ b/br/pkg/storage/parse.go @@ -224,7 +224,7 @@ func RedactURL(str string) (string, error) { // see below on why we normalize key // https://github.com/pingcap/tidb/blob/a7c0d95f16ea2582bb569278c3f829403e6c3a7e/br/pkg/storage/parse.go#L163 normalizedKey := strings.ToLower(strings.ReplaceAll(k, "_", "-")) - if normalizedKey == "access-key" || normalizedKey == "secret-access-key" { + if normalizedKey == "access-key" || normalizedKey == "secret-access-key" || normalizedKey == "session-token" { values[k] = []string{"redacted"} } } diff --git a/ddl/util/main_test.go b/ddl/util/main_test.go index 247e3c0e91f0e..59738797650c6 100644 --- a/ddl/util/main_test.go +++ b/ddl/util/main_test.go @@ -27,6 +27,7 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), } goleak.VerifyTestMain(m, opts...) } diff --git a/domain/infosync/info_test.go b/domain/infosync/info_test.go index 0b7867b6cd22a..ad3957481c282 100644 --- a/domain/infosync/info_test.go +++ b/domain/infosync/info_test.go @@ -43,6 +43,7 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), } goleak.VerifyTestMain(m, opts...) } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 4ad158bfd23f8..bf358da6f53fd 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -2256,6 +2256,16 @@ var defaultSysVars = []*SysVar{ DDLDiskQuota.Store(TidbOptUint64(val, DefTiDBDDLDiskQuota)) return nil }}, + // can't assign validate function here. Because validation function will run after GetGlobal function + {Scope: ScopeGlobal, Name: TiDBCloudStorageURI, Value: "", Type: TypeStr, GetGlobal: func(ctx context.Context, sv *SessionVars) (string, error) { + return RedactCloudStorageURI(CloudStorageURI.Load()) + }, SetGlobal: func(ctx context.Context, s *SessionVars, val string) error { + if err := ValidateCloudStorageURI(ctx, val); err != nil { + return err + } + CloudStorageURI.Store(val) + return nil + }}, {Scope: ScopeSession, Name: TiDBConstraintCheckInPlacePessimistic, Value: BoolToOnOff(config.GetGlobalConfig().PessimisticTxn.ConstraintCheckInPlacePessimistic), Type: TypeBool, SetSession: func(s *SessionVars, val string) error { s.ConstraintCheckInPlacePessimistic = TiDBOptOn(val) diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index ad7dcbc2df4f9..d49405e7ce3e8 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -19,7 +19,10 @@ import ( "encoding/json" "fmt" "math" + "net/http" + "net/http/httptest" "strconv" + "strings" "sync/atomic" "testing" "time" @@ -1356,3 +1359,51 @@ func TestTiDBTiFlashReplicaRead(t *testing.T) { require.NoError(t, err) require.Equal(t, DefTiFlashReplicaRead, val) } + +func TestSetTiDBCloudStorageURI(t *testing.T) { + vars := NewSessionVars(nil) + mock := NewMockGlobalAccessor4Tests() + mock.SessionVars = vars + vars.GlobalVarsAccessor = mock + cloudStorageURI := GetSysVar(TiDBCloudStorageURI) + require.Len(t, CloudStorageURI.Load(), 0) + defer func() { + CloudStorageURI.Store("") + }() + + // Default empty + require.Len(t, cloudStorageURI.Value, 0) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Set to noop + noopURI := "noop://blackhole?access-key=hello&secret-access-key=world" + err := mock.SetGlobalSysVar(ctx, TiDBCloudStorageURI, noopURI) + require.NoError(t, err) + val, err1 := mock.SessionVars.GetSessionOrGlobalSystemVar(ctx, TiDBCloudStorageURI) + require.NoError(t, err1) + require.Equal(t, noopURI, val) + require.Equal(t, noopURI, CloudStorageURI.Load()) + + // Set to s3, should fail + err = mock.SetGlobalSysVar(ctx, TiDBCloudStorageURI, "s3://blackhole") + require.ErrorContains(t, err, "bucket blackhole") + + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + })) + defer s.Close() + + // Set to s3, should return uri without variable + s3URI := "s3://tiflow-test/?access-key=testid&secret-access-key=testkey8&session-token=testtoken&endpoint=" + s.URL + err = mock.SetGlobalSysVar(ctx, TiDBCloudStorageURI, s3URI) + require.NoError(t, err) + val, err1 = mock.SessionVars.GetSessionOrGlobalSystemVar(ctx, TiDBCloudStorageURI) + require.NoError(t, err1) + require.True(t, strings.HasPrefix(val, "s3://tiflow-test/")) + require.Contains(t, val, "access-key=redacted") + require.Contains(t, val, "secret-access-key=redacted") + require.Contains(t, val, "session-token=redacted") + require.Equal(t, s3URI, CloudStorageURI.Load()) + cancel() +} diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 988c8cd503a72..4ccec91b710a6 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -982,6 +982,8 @@ const ( TiDBDDLEnableFastReorg = "tidb_ddl_enable_fast_reorg" // TiDBDDLDiskQuota used to set disk quota for lightning add index. TiDBDDLDiskQuota = "tidb_ddl_disk_quota" + // TiDBCloudStorageURI used to set a cloud storage uri for ddl add index and import into. + TiDBCloudStorageURI = "tidb_cloud_storage_uri" // TiDBAutoBuildStatsConcurrency is used to set the build concurrency of auto-analyze. TiDBAutoBuildStatsConcurrency = "tidb_auto_build_stats_concurrency" // TiDBSysProcScanConcurrency is used to set the scan concurrency of for backend system processes, like auto-analyze. @@ -1504,6 +1506,7 @@ var ( SkipMissingPartitionStats = atomic.NewBool(DefTiDBSkipMissingPartitionStats) ServiceScope = atomic.NewString("") SchemaVersionCacheLimit = atomic.NewInt64(DefTiDBSchemaVersionCacheLimit) + CloudStorageURI = atomic.NewString("") ) var ( @@ -1527,6 +1530,10 @@ var ( GetExternalTimestamp func(ctx context.Context) (uint64, error) // SetGlobalResourceControl is the func registered by domain to set cluster resource control. SetGlobalResourceControl atomic.Pointer[func(bool)] + // RedactCloudStorageURI redacts the cloud storage URI. + RedactCloudStorageURI func(uri string) (string, error) + // ValidateCloudStorageURI validates the cloud storage URI. + ValidateCloudStorageURI func(ctx context.Context, uri string) error ) // Hooks functions for Cluster Resource Control. diff --git a/util/expensivequery/expensivequerey_test.go b/util/expensivequery/expensivequerey_test.go index d5306ad69a7e8..7a7d6c8815a5d 100644 --- a/util/expensivequery/expensivequerey_test.go +++ b/util/expensivequery/expensivequerey_test.go @@ -27,6 +27,7 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), } goleak.VerifyTestMain(m, opts...) } diff --git a/util/mock/main_test.go b/util/mock/main_test.go index f5bc77f62930e..2dc36e6230b7a 100644 --- a/util/mock/main_test.go +++ b/util/mock/main_test.go @@ -27,6 +27,7 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), } goleak.VerifyTestMain(m, opts...) } diff --git a/util/sem/main_test.go b/util/sem/main_test.go index 64ba584bbe162..51f622233ed16 100644 --- a/util/sem/main_test.go +++ b/util/sem/main_test.go @@ -27,6 +27,7 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), } goleak.VerifyTestMain(m, opts...) } diff --git a/util/sqlexec/main_test.go b/util/sqlexec/main_test.go index 51f112627a42c..48381c898f1f1 100644 --- a/util/sqlexec/main_test.go +++ b/util/sqlexec/main_test.go @@ -27,6 +27,7 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), } goleak.VerifyTestMain(m, opts...) }