Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

global sort: support setting cloud storage uri variable #46407

Merged
merged 18 commits into from
Sep 11, 2023
2 changes: 1 addition & 1 deletion br/pkg/storage/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@
// see below on why we normalize key
// https://github.com/pingcap/tidb/blob/a7c0d95f16ea2582bb569278c3f829403e6c3a7e/br/pkg/storage/parse.go#L163
normalizedKey := strings.ToLower(strings.ReplaceAll(k, "_", "-"))
if normalizedKey == "access-key" || normalizedKey == "secret-access-key" {
if normalizedKey == "access-key" || normalizedKey == "secret-access-key" || normalizedKey == "session-token" {

Check warning on line 227 in br/pkg/storage/parse.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/parse.go#L227

Added line #L227 was not covered by tests
values[k] = []string{"redacted"}
}
}
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
importpath = "github.com/pingcap/tidb/sessionctx/variable",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/storage",
"//config",
"//domain/resourcegroup",
"//errno",
Expand Down
11 changes: 11 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use the RedactURL in parser/ast/misc.go instead?

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -2256,6 +2257,16 @@
DDLDiskQuota.Store(TidbOptUint64(val, DefTiDBDDLDiskQuota))
return nil
}},
// can't assign validate function here. Because validation function will run after GetGlobal function
{Scope: ScopeGlobal, Name: TiDBCloudStorageURI, Value: "", Type: TypeStr, GetGlobal: func(ctx context.Context, sv *SessionVars) (string, error) {
return storage.RedactURL(CloudStorageURI.Load())

Check warning on line 2262 in sessionctx/variable/sysvar.go

View check run for this annotation

Codecov / codecov/patch

sessionctx/variable/sysvar.go#L2261-L2262

Added lines #L2261 - L2262 were not covered by tests
}, SetGlobal: func(ctx context.Context, s *SessionVars, val string) error {
if _, err := ParseCloudStorageURI(ctx, val, true); err != nil {
return err
}
CloudStorageURI.Store(val)
return nil

Check warning on line 2268 in sessionctx/variable/sysvar.go

View check run for this annotation

Codecov / codecov/patch

sessionctx/variable/sysvar.go#L2267-L2268

Added lines #L2267 - L2268 were not covered by tests
}},
{Scope: ScopeSession, Name: TiDBConstraintCheckInPlacePessimistic, Value: BoolToOnOff(config.GetGlobalConfig().PessimisticTxn.ConstraintCheckInPlacePessimistic), Type: TypeBool,
SetSession: func(s *SessionVars, val string) error {
s.ConstraintCheckInPlacePessimistic = TiDBOptOn(val)
Expand Down
51 changes: 51 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"encoding/json"
"fmt"
"math"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1356,3 +1359,51 @@ func TestTiDBTiFlashReplicaRead(t *testing.T) {
require.NoError(t, err)
require.Equal(t, DefTiFlashReplicaRead, val)
}

func TestSetTiDBCloudStorageURI(t *testing.T) {
vars := NewSessionVars(nil)
mock := NewMockGlobalAccessor4Tests()
mock.SessionVars = vars
vars.GlobalVarsAccessor = mock
cloudStorageURI := GetSysVar(TiDBCloudStorageURI)
require.Len(t, CloudStorageURI.Load(), 0)
defer func() {
CloudStorageURI.Store("")
}()

// Default empty
require.Len(t, cloudStorageURI.Value, 0)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Set to noop
noopURI := "noop://blackhole?access-key=hello&secret-access-key=world"
err := mock.SetGlobalSysVar(ctx, TiDBCloudStorageURI, noopURI)
require.NoError(t, err)
val, err1 := mock.SessionVars.GetSessionOrGlobalSystemVar(ctx, TiDBCloudStorageURI)
require.NoError(t, err1)
require.Equal(t, noopURI, val)
require.Equal(t, noopURI, CloudStorageURI.Load())

// Set to s3, should fail
err = mock.SetGlobalSysVar(ctx, TiDBCloudStorageURI, "s3://blackhole")
require.ErrorContains(t, err, "bucket blackhole")

s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}))
defer s.Close()

// Set to s3, should return uri without variable
s3URI := "s3://tiflow-test/?access-key=testid&secret-access-key=testkey8&session-token=testtoken&endpoint=" + s.URL
err = mock.SetGlobalSysVar(ctx, TiDBCloudStorageURI, s3URI)
require.NoError(t, err)
val, err1 = mock.SessionVars.GetSessionOrGlobalSystemVar(ctx, TiDBCloudStorageURI)
require.NoError(t, err1)
require.True(t, strings.HasPrefix(val, "s3://tiflow-test/"))
require.Contains(t, val, "access-key=redacted")
require.Contains(t, val, "secret-access-key=redacted")
require.Contains(t, val, "session-token=redacted")
require.Equal(t, s3URI, CloudStorageURI.Load())
cancel()
}
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,8 @@ const (
TiDBDDLEnableFastReorg = "tidb_ddl_enable_fast_reorg"
// TiDBDDLDiskQuota used to set disk quota for lightning add index.
TiDBDDLDiskQuota = "tidb_ddl_disk_quota"
// TiDBCloudStorageURI used to set a cloud storage uri for ddl add index and import into.
TiDBCloudStorageURI = "tidb_cloud_storage_uri"
// TiDBAutoBuildStatsConcurrency is used to set the build concurrency of auto-analyze.
TiDBAutoBuildStatsConcurrency = "tidb_auto_build_stats_concurrency"
// TiDBSysProcScanConcurrency is used to set the scan concurrency of for backend system processes, like auto-analyze.
Expand Down Expand Up @@ -1504,6 +1506,7 @@ var (
SkipMissingPartitionStats = atomic.NewBool(DefTiDBSkipMissingPartitionStats)
ServiceScope = atomic.NewString("")
SchemaVersionCacheLimit = atomic.NewInt64(DefTiDBSchemaVersionCacheLimit)
CloudStorageURI = atomic.NewString("")
)

var (
Expand Down
24 changes: 24 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -586,3 +587,26 @@
}
return skipTypes
}

// ParseCloudStorageURI makes validation for tidb_cloud_storage_uri.
func ParseCloudStorageURI(ctx context.Context, uri string, checkPermission bool) (storage.ExternalStorage, error) {
b, err := storage.ParseBackend(uri, nil)
if err != nil {
return nil, err
}
var checkPermissions []storage.Permission
if checkPermission {
checkPermissions = []storage.Permission{
storage.ListObjects,
storage.GetObject,
storage.AccessBuckets,
}
}
backend, err := storage.New(ctx, b, &storage.ExternalStorageOptions{
CheckPermissions: checkPermissions,
})
if err != nil {
return nil, err
}
return backend, nil

Check warning on line 611 in sessionctx/variable/varsutil.go

View check run for this annotation

Codecov / codecov/patch

sessionctx/variable/varsutil.go#L597-L611

Added lines #L597 - L611 were not covered by tests
}
1 change: 1 addition & 0 deletions util/expensivequery/expensivequerey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
1 change: 1 addition & 0 deletions util/mock/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
1 change: 1 addition & 0 deletions util/sem/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
1 change: 1 addition & 0 deletions util/sqlexec/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}